From e38eb38579b5a4a5efbbcabe43e422522b20caf7 Mon Sep 17 00:00:00 2001 From: Joakim Hulthe Date: Fri, 23 Feb 2024 23:42:05 +0100 Subject: [PATCH] Make sure not to miss bulb power results --- manager/src/provider/mqtt.rs | 41 ++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/manager/src/provider/mqtt.rs b/manager/src/provider/mqtt.rs index 52133fd..da972d2 100644 --- a/manager/src/provider/mqtt.rs +++ b/manager/src/provider/mqtt.rs @@ -1,4 +1,8 @@ -use std::{collections::HashSet, str, time::Duration}; +use std::{ + collections::{HashSet, VecDeque}, + str, + time::Duration, +}; use async_trait::async_trait; use eyre::{Context, Error}; @@ -25,6 +29,7 @@ const RECONNECT_DELAYS: &[u64] = &[0, 1, 2, 5, 10, 10, 10, 20]; pub struct BulbsMqtt { known_bulbs: HashSet, socket: SocketState, + updates: VecDeque<(BulbId, BulbUpdate)>, } struct SocketState { @@ -44,6 +49,7 @@ impl BulbsMqtt { failed_connect_attempts: 0, socket: None, }, + updates: Default::default(), } } } @@ -140,6 +146,10 @@ impl BulbProvider for BulbsMqtt { let socket = this.socket.get_connection().await?; loop { + if let Some(id_update) = this.updates.pop_front() { + return Ok(id_update); + } + let packet = VariablePacket::parse(socket).await.map_err(retry)?; let VariablePacket::PublishPacket(publish) = &packet else { @@ -147,6 +157,7 @@ impl BulbProvider for BulbsMqtt { }; let topic_name = publish.topic_name(); + let topic_segments: Vec<&str> = topic_name.split('/').collect(); match &topic_segments[..] { [prefix, id @ .., suffix] => { @@ -161,8 +172,11 @@ impl BulbProvider for BulbsMqtt { .wrap_err("Failed to decode pulish message payload") .map_err(retry)?; - let update = match (*prefix, *suffix) { - ("stat", "POWER") => BulbUpdate::Power(payload == "ON"), + match (*prefix, *suffix) { + ("stat", "POWER") => { + this.updates + .push_back((id, BulbUpdate::Power(payload == "ON"))); + } ("stat", "RESULT") => { let result: BulbResult = serde_json::from_str(payload) .wrap_err("Failed to decode BulbResult") @@ -174,27 +188,26 @@ impl BulbProvider for BulbsMqtt { .parse() .wrap_err("Failed to decode bulb color") .map_err(retry)?; - BulbUpdate::Color(color) - } else if let Some(power) = result.power { - BulbUpdate::Power(power == "ON") - } else { - continue; + this.updates + .push_back((id.clone(), BulbUpdate::Color(color))); + } + + if let Some(power) = result.power { + this.updates + .push_back((id, BulbUpdate::Power(power == "ON"))); } } // TODO: handle STATE message //("tele", "STATE") => todo!(), // ignore known useless messages - ("cmnd", _) => continue, - ("tele", "LWT") => continue, + ("cmnd", _) => {} + ("tele", "LWT") => {} _ => { warn!("unrecognized topic: {topic_name} payload={payload:?}"); - continue; } }; - - return Ok((id, update)); } _ => { warn!("unrecognized topic: {topic_name}"); @@ -261,7 +274,7 @@ async fn subscribe(socket: &mut TcpStream) -> eyre::Result<()> { Ok(()) } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] struct BulbResult { #[serde(rename(deserialize = "POWER"))] power: Option,