diff --git a/manager/src/manager.rs b/manager/src/manager.rs index 419a6f3..10af918 100644 --- a/manager/src/manager.rs +++ b/manager/src/manager.rs @@ -1,8 +1,13 @@ use crate::provider::{BulbProvider, BulbUpdate}; +use eyre::Context; use lighter_lib::{BulbColor, BulbId, BulbMode}; use serde::Deserialize; -use std::collections::BTreeMap; +use std::collections::HashMap; +use std::future::Future; +use std::sync::atomic::Ordering; use std::sync::Arc; +use std::{collections::BTreeMap, sync::atomic::AtomicU64}; +use tokio::select; use tokio::{ sync::{futures::Notified, mpsc, Notify, RwLock, RwLockReadGuard}, task, @@ -30,12 +35,17 @@ pub enum BulbCommand { SetColor(BulbSelector, BulbColor), } -#[derive(Clone)] +type InstanceId = u64; + +/// A handle to a bulb manager. Can be cloned. pub struct BulbManager { + id: InstanceId, state: Arc, } struct BulbsState { + next_id: AtomicU64, + /// Notify on any change to the bulbs notify: Notify, @@ -44,8 +54,11 @@ struct BulbsState { /// State of all bulbs bulbs: RwLock>, + + exclusive_bulbs: RwLock)>>, } +/// State of the main bulb manager thread. struct ManagerState

{ #[allow(dead_code)] config: BulbsConfig, @@ -64,6 +77,7 @@ impl BulbManager { let (command_tx, command_rx) = mpsc::channel(100); let bulbs_state = Arc::new(BulbsState { + next_id: AtomicU64::new(1), notify: Notify::new(), command: command_tx, bulbs: RwLock::new( @@ -74,6 +88,7 @@ impl BulbManager { .map(|id| (id, Default::default())) .collect(), ), + exclusive_bulbs: Default::default(), }); let state = ManagerState { @@ -83,7 +98,10 @@ impl BulbManager { state: Arc::clone(&bulbs_state), }; - let manager = BulbManager { state: bulbs_state }; + let manager = BulbManager { + id: 0, + state: bulbs_state, + }; task::spawn(run(state)); @@ -95,16 +113,74 @@ impl BulbManager { } pub async fn send_command(&self, command: BulbCommand) { - info!("sending command {command:?}"); + let exclusive_bulbs = self.state.exclusive_bulbs.read().await; + + match command.selector() { + BulbSelector::All => { + // TODO + } + BulbSelector::Id(bulb) => { + if let Some((id, interrupt)) = exclusive_bulbs.get(bulb) { + if id != &self.id { + interrupt.notify_one(); + } + } + } + } + + debug!("sending command {command:?}"); if let Err(e) = self.state.command.send(command).await { error!("error sending bulb command: {e:#}"); } - info!("sent command"); + debug!("sent command"); } pub async fn bulbs(&self) -> RwLockReadGuard<'_, BTreeMap> { self.state.bulbs.read().await } + + /// Run the provided future until it finishes, or some other BulbManager sends a command to + /// the specified bulb. + pub async fn until_interrupted(&self, bulb: BulbId, f: F) -> Option + where + F: Future, + { + let interrupt = Arc::new(Notify::new()); + if let Some((id, prev)) = self + .state + .exclusive_bulbs + .write() + .await + .insert(bulb, (self.id, Arc::clone(&interrupt))) + { + if id != self.id { + prev.notify_one(); + } + } + + select! { + _ = interrupt.notified() => None, + t = f => Some(t), + } + } +} + +impl Clone for BulbManager { + fn clone(&self) -> Self { + Self { + id: self.state.next_id.fetch_add(1, Ordering::SeqCst), + state: self.state.clone(), + } + } +} + +impl BulbCommand { + pub fn selector(&self) -> &BulbSelector { + match self { + BulbCommand::SetPower(s, _) => s, + BulbCommand::SetColor(s, _) => s, + } + } } async fn run

(state: ManagerState

) @@ -113,10 +189,11 @@ where { debug!("manager task running"); if let Err(e) = run_inner(state).await { - error!("bulb manage exited with error: {e:#}"); + error!("bulb manager exited with error: {e:?}"); } info!("manager task exited"); } + async fn run_inner

(mut state: ManagerState

) -> eyre::Result<()> where P: BulbProvider + Send, @@ -128,13 +205,15 @@ where info!("handle closed, shutting down"); return Ok(()); }; - info!("command received: {command:?}"); - state.provider.send_command(command.clone()).await?; + debug!("command received: {command:?}"); + state.provider.send_command(command.clone()).await + .wrap_err("Failed to send command to BulbProvider")?; } update = state.provider.listen() => { - let (id, update) = update?; + let (id, update) = update + .wrap_err("Error listening to BulbProvider")?; - info!("update received: {id:?} {update:?}"); + debug!("update received: {id:?} {update:?}"); let mut bulbs = state.state.bulbs.write().await; let Some(bulb) = bulbs.get_mut(&id) else { diff --git a/manager/src/provider.rs b/manager/src/provider.rs index 8f2294d..6431686 100644 --- a/manager/src/provider.rs +++ b/manager/src/provider.rs @@ -16,10 +16,16 @@ pub enum BulbUpdate { // An interface that allows communication with bulbs. #[async_trait] pub trait BulbProvider { - // Send a command to some bulbs to update their state + // Send a command to some bulbs to update their state. + // + // This function should only return fatal errors. + // Recoverable error should incurr a retry. async fn send_command(&mut self, cmd: BulbCommand) -> eyre::Result<()>; // Wait for any bulb to send an update + // + // This function should only return fatal errors. + // Recoverable error should incurr a retry. async fn listen(&mut self) -> eyre::Result<(BulbId, BulbUpdate)>; } diff --git a/manager/src/provider/mqtt.rs b/manager/src/provider/mqtt.rs index 681bc6e..a77367d 100644 --- a/manager/src/provider/mqtt.rs +++ b/manager/src/provider/mqtt.rs @@ -1,6 +1,7 @@ use std::{collections::HashSet, str, time::Duration}; use async_trait::async_trait; +use eyre::{Context, Error}; use lighter_lib::BulbId; use mqtt::{ packet::{PublishPacket, QoSWithPacketIdentifier, SubscribePacket, VariablePacket}, @@ -48,7 +49,7 @@ impl BulbsMqtt { } impl SocketState { - async fn get_connection(&mut self) -> eyre::Result<&mut TcpStream> { + async fn get_connection(&mut self) -> Result<&mut TcpStream, FailMode> { let socket = &mut self.socket; if let Some(socket) = socket { @@ -63,8 +64,8 @@ impl SocketState { self.last_connection_attempt = Instant::now(); info!("connecting to MQTT (attempt {attempt})"); - let mut new_socket = self.mqtt_config.connect().await?; - subscribe(&mut new_socket).await?; + let mut new_socket = self.mqtt_config.connect().await.map_err(retry)?; + subscribe(&mut new_socket).await.map_err(retry)?; info!("connected to MQTT"); self.failed_connect_attempts = 0; @@ -75,108 +76,138 @@ impl SocketState { #[async_trait] impl BulbProvider for BulbsMqtt { async fn send_command(&mut self, command: BulbCommand) -> eyre::Result<()> { - debug!("mqtt sending command {command:?}"); - let socket = self.socket.get_connection().await?; - async fn send( all_bulbs: &HashSet, selector: BulbSelector, publish: &mut Publish<'_, P>, - ) -> eyre::Result<()> { + ) -> Result<(), FailMode> { match selector { - BulbSelector::Id(id) => publish.send(&id).await?, + BulbSelector::Id(id) => publish.send(&id).await.map_err(retry)?, BulbSelector::All => { for id in all_bulbs { - publish.send(id).await?; + publish.send(id).await.map_err(retry)?; } } } Ok(()) } - match command { - BulbCommand::SetPower(selector, power) => { - let payload = if power { "ON" } else { "OFF" }; - let mut publish = Publish { - topic_prefix: "cmnd", - topic_suffix: "POWER", - payload, - socket, - }; - send(&self.known_bulbs, selector, &mut publish).await?; + async fn inner(this: &mut BulbsMqtt, command: &BulbCommand) -> Result<(), FailMode> { + let socket = this.socket.get_connection().await?; + + match command.clone() { + BulbCommand::SetPower(selector, power) => { + let payload = if power { "ON" } else { "OFF" }; + let mut publish = Publish { + topic_prefix: "cmnd", + topic_suffix: "POWER", + payload, + socket, + }; + send(&this.known_bulbs, selector, &mut publish).await?; + } + BulbCommand::SetColor(selector, color) => { + let mut publish = Publish { + topic_prefix: "cmnd", + topic_suffix: "COLOR", + payload: color.color_string(), + socket, + }; + send(&this.known_bulbs, selector, &mut publish).await?; + } } - BulbCommand::SetColor(selector, color) => { - let mut publish = Publish { - topic_prefix: "cmnd", - topic_suffix: "COLOR", - payload: color.color_string(), - socket, - }; - send(&self.known_bulbs, selector, &mut publish).await?; + Ok(()) + } + + debug!("mqtt sending command {command:?}"); + + loop { + match inner(self, &command).await { + Ok(t) => break Ok(t), + Err(FailMode::Retry(e)) => info!("Retrying on error: {e:?}"), + Err(FailMode::Fatal(e)) => break Err(e), } } - Ok(()) } async fn listen(&mut self) -> eyre::Result<(BulbId, BulbUpdate)> { debug!("mqtt listening for updates"); - let socket = self.socket.get_connection().await?; - loop { - let packet = VariablePacket::parse(socket).await?; + async fn inner(this: &mut BulbsMqtt) -> Result<(BulbId, BulbUpdate), FailMode> { + let socket = this.socket.get_connection().await?; - let VariablePacket::PublishPacket(publish) = &packet else { - continue; - }; + loop { + let packet = VariablePacket::parse(socket).await.map_err(retry)?; - let topic_name = publish.topic_name(); - let topic_segments: Vec<&str> = topic_name.split('/').collect(); - match &topic_segments[..] { - [prefix, id @ .., suffix] => { - let id = BulbId(id.join("/")); + let VariablePacket::PublishPacket(publish) = &packet else { + continue; + }; - if !self.known_bulbs.contains(&id) { - warn!("ignoring publish from unknown bulb {id}"); - continue; - } + let topic_name = publish.topic_name(); + let topic_segments: Vec<&str> = topic_name.split('/').collect(); + match &topic_segments[..] { + [prefix, id @ .., suffix] => { + let id = BulbId(id.join("/")); - let payload = str::from_utf8(publish.payload())?; - - let update = match (*prefix, *suffix) { - ("stat", "POWER") => BulbUpdate::Power(payload == "ON"), - ("stat", "RESULT") => { - let result: BulbResult = serde_json::from_str(payload)?; - - // TODO: color and power can be updated at the same time? - if let Some(color) = result.color { - BulbUpdate::Color(color.parse()?) - } else if let Some(power) = result.power { - BulbUpdate::Power(power == "ON") - } else { - continue; - } - } - // TODO: handle STATE message - //("tele", "STATE") => todo!(), - - // ignore known useless messages - ("cmnd", _) => continue, - ("tele", "LWT") => continue, - - _ => { - warn!("unrecognized topic: {topic_name} payload={payload:?}"); + if !this.known_bulbs.contains(&id) { + warn!("ignoring publish from unknown bulb {id}"); continue; } - }; - return Ok((id, update)); - } - _ => { - warn!("unrecognized topic: {topic_name}"); - continue; + let payload = str::from_utf8(publish.payload()) + .wrap_err("Failed to decode pulish message payload") + .map_err(retry)?; + + let update = match (*prefix, *suffix) { + ("stat", "POWER") => BulbUpdate::Power(payload == "ON"), + ("stat", "RESULT") => { + let result: BulbResult = serde_json::from_str(payload) + .wrap_err("Failed to decode BulbResult") + .map_err(retry)?; + + // TODO: color and power can be updated at the same time? + if let Some(color) = result.color { + let color = color + .parse() + .wrap_err("Failed to decode bulb color") + .map_err(retry)?; + BulbUpdate::Color(color) + } else if let Some(power) = result.power { + BulbUpdate::Power(power == "ON") + } else { + continue; + } + } + // TODO: handle STATE message + //("tele", "STATE") => todo!(), + + // ignore known useless messages + ("cmnd", _) => continue, + ("tele", "LWT") => continue, + + _ => { + warn!("unrecognized topic: {topic_name} payload={payload:?}"); + continue; + } + }; + + return Ok((id, update)); + } + _ => { + warn!("unrecognized topic: {topic_name}"); + continue; + } } } } + + loop { + match inner(self).await { + Ok(t) => break Ok(t), + Err(FailMode::Retry(e)) => info!("Retrying on error: {e:?}"), + Err(FailMode::Fatal(e)) => break Err(e), + } + } } } @@ -232,3 +263,15 @@ struct BulbResult { #[serde(rename(deserialize = "Color"))] color: Option, } + +fn retry>(err: X) -> FailMode { + FailMode::Retry(err.into()) +} + +enum FailMode { + /// A non-fatal error that should prompt a retry. + Retry(E), + + /// A fatal error that can't be recovered from. + Fatal(E), +}