Make sure not to miss bulb power results

This commit is contained in:
2024-02-23 23:42:05 +01:00
parent ab46f56753
commit e38eb38579

View File

@ -1,4 +1,8 @@
use std::{collections::HashSet, str, time::Duration}; use std::{
collections::{HashSet, VecDeque},
str,
time::Duration,
};
use async_trait::async_trait; use async_trait::async_trait;
use eyre::{Context, Error}; use eyre::{Context, Error};
@ -25,6 +29,7 @@ const RECONNECT_DELAYS: &[u64] = &[0, 1, 2, 5, 10, 10, 10, 20];
pub struct BulbsMqtt { pub struct BulbsMqtt {
known_bulbs: HashSet<BulbId>, known_bulbs: HashSet<BulbId>,
socket: SocketState, socket: SocketState,
updates: VecDeque<(BulbId, BulbUpdate)>,
} }
struct SocketState { struct SocketState {
@ -44,6 +49,7 @@ impl BulbsMqtt {
failed_connect_attempts: 0, failed_connect_attempts: 0,
socket: None, socket: None,
}, },
updates: Default::default(),
} }
} }
} }
@ -140,6 +146,10 @@ impl BulbProvider for BulbsMqtt {
let socket = this.socket.get_connection().await?; let socket = this.socket.get_connection().await?;
loop { loop {
if let Some(id_update) = this.updates.pop_front() {
return Ok(id_update);
}
let packet = VariablePacket::parse(socket).await.map_err(retry)?; let packet = VariablePacket::parse(socket).await.map_err(retry)?;
let VariablePacket::PublishPacket(publish) = &packet else { let VariablePacket::PublishPacket(publish) = &packet else {
@ -147,6 +157,7 @@ impl BulbProvider for BulbsMqtt {
}; };
let topic_name = publish.topic_name(); let topic_name = publish.topic_name();
let topic_segments: Vec<&str> = topic_name.split('/').collect(); let topic_segments: Vec<&str> = topic_name.split('/').collect();
match &topic_segments[..] { match &topic_segments[..] {
[prefix, id @ .., suffix] => { [prefix, id @ .., suffix] => {
@ -161,8 +172,11 @@ impl BulbProvider for BulbsMqtt {
.wrap_err("Failed to decode pulish message payload") .wrap_err("Failed to decode pulish message payload")
.map_err(retry)?; .map_err(retry)?;
let update = match (*prefix, *suffix) { match (*prefix, *suffix) {
("stat", "POWER") => BulbUpdate::Power(payload == "ON"), ("stat", "POWER") => {
this.updates
.push_back((id, BulbUpdate::Power(payload == "ON")));
}
("stat", "RESULT") => { ("stat", "RESULT") => {
let result: BulbResult = serde_json::from_str(payload) let result: BulbResult = serde_json::from_str(payload)
.wrap_err("Failed to decode BulbResult") .wrap_err("Failed to decode BulbResult")
@ -174,27 +188,26 @@ impl BulbProvider for BulbsMqtt {
.parse() .parse()
.wrap_err("Failed to decode bulb color") .wrap_err("Failed to decode bulb color")
.map_err(retry)?; .map_err(retry)?;
BulbUpdate::Color(color) this.updates
} else if let Some(power) = result.power { .push_back((id.clone(), BulbUpdate::Color(color)));
BulbUpdate::Power(power == "ON") }
} else {
continue; if let Some(power) = result.power {
this.updates
.push_back((id, BulbUpdate::Power(power == "ON")));
} }
} }
// TODO: handle STATE message // TODO: handle STATE message
//("tele", "STATE") => todo!(), //("tele", "STATE") => todo!(),
// ignore known useless messages // ignore known useless messages
("cmnd", _) => continue, ("cmnd", _) => {}
("tele", "LWT") => continue, ("tele", "LWT") => {}
_ => { _ => {
warn!("unrecognized topic: {topic_name} payload={payload:?}"); warn!("unrecognized topic: {topic_name} payload={payload:?}");
continue;
} }
}; };
return Ok((id, update));
} }
_ => { _ => {
warn!("unrecognized topic: {topic_name}"); warn!("unrecognized topic: {topic_name}");
@ -261,7 +274,7 @@ async fn subscribe(socket: &mut TcpStream) -> eyre::Result<()> {
Ok(()) Ok(())
} }
#[derive(Deserialize)] #[derive(Deserialize, Debug)]
struct BulbResult { struct BulbResult {
#[serde(rename(deserialize = "POWER"))] #[serde(rename(deserialize = "POWER"))]
power: Option<String>, power: Option<String>,