diff --git a/backend/src/main.rs b/backend/src/main.rs index 201c369..93c4390 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,13 +1,11 @@ mod collector; +mod tasks; use clap::Parser; -use collector::{Collector, CollectorConfig, MarkdownWeb, WeatherApi}; +use collector::CollectorConfig; use common::{BulbMap, ClientMessage, ServerMessage}; use futures_util::{SinkExt, StreamExt}; -use lighter_manager::{ - manager::{BulbCommand, BulbManager, BulbSelector, BulbsConfig}, - mqtt_conf::MqttConfig, -}; +use lighter_manager::{manager::BulbsConfig, mqtt_conf::MqttConfig}; use log::LevelFilter; use serde::Deserialize; use std::convert::Infallible; @@ -15,7 +13,6 @@ use std::net::SocketAddr; use std::path::PathBuf; use tokio::sync::broadcast::error::RecvError; use tokio::sync::{broadcast, mpsc}; -use tokio::time::{sleep, Duration}; use tokio::{fs, select, task}; use warp::ws::{self, WebSocket}; use warp::{Filter, Rejection, Reply}; @@ -44,7 +41,7 @@ struct Opt { } #[derive(Deserialize)] -struct Config { +pub struct Config { mqtt: MqttConfig, collectors: CollectorConfig, @@ -56,7 +53,7 @@ struct Config { bulb_map: BulbMap, } -struct State { +pub struct State { config: Config, client_message: broadcast::Sender, server_message: broadcast::Sender, @@ -94,8 +91,8 @@ async fn main() { }; let state = Box::leak(Box::new(state)); - task::spawn(info_collector(state)); - task::spawn(lights_collector(state)); + task::spawn(tasks::lights_task(state)); + task::spawn(tasks::info_task(state)); let ws = warp::path("ws") // The `ws()` filter will prepare the Websocket handshake. @@ -129,123 +126,6 @@ struct ClientRequest { response: mpsc::Sender, } -async fn lights_collector(state: &State) { - let config = &state.config; - let server_message = &state.server_message; - let mut client_message = state.client_message.subscribe(); - - let (cmd, bulb_states) = BulbManager::launch(config.bulbs.clone(), config.mqtt.clone()) - .await - .expect("Failed to launch bulb manager"); - - loop { - let notify = bulb_states.notify_on_change(); - sleep(Duration::from_millis(1000 / 10)).await; // limit to 10 updates/second - select! { - _ = notify => { - for (id, mode) in bulb_states.bulbs().await.clone().into_iter() { - if let Err(e) = server_message.send(ServerMessage::BulbMode { id, mode }) { - error!("broadcast channel error: {e}"); - return; - } - } - } - request = client_message.recv() => { - let request = match request { - Ok(r) => r, - Err(_) => continue, - }; - - match request.message { - ClientMessage::SetBulbColor { id, color } => { - if let Err(e) = cmd.send(BulbCommand::SetColor(BulbSelector::Id(id), color)).await { - error!("bulb manager error: {e}"); - } - } - ClientMessage::SetBulbPower { id, power } => { - if let Err(e) = cmd.send(BulbCommand::SetPower(BulbSelector::Id(id), power)).await { - error!("bulb manager error: {e}"); - } - } - ClientMessage::GetBulbs => { - if let Err(e) = request.response.send(ServerMessage::BulbMap(config.bulb_map.clone())).await { - error!("GetBulbs response channel error: {e}"); - return; - } - for (id, mode) in bulb_states.bulbs().await.clone().into_iter() { - if let Err(e) = request.response.send(ServerMessage::BulbMode { id, mode }).await { - error!("GetBulbs response channel error: {e}"); - return; - } - } - } - _ => {} - } - } - } - } -} - -async fn info_collector(state: &State) { - let mut collectors: Vec> = vec![]; - - for url in &state.config.collectors.markdown_web_links { - collectors.push(Box::new(MarkdownWeb { - url: url.to_string(), - })); - } - - if !state.config.collectors.weatherapi_locations.is_empty() { - let api_key = state - .config - .collectors - .weatherapi_key - .as_deref() - .expect("Missing weatherapi_key"); - - for location in state.config.collectors.weatherapi_locations.iter().cloned() { - collectors.push(Box::new(WeatherApi { - api_key: api_key.to_string(), - location, - })); - } - } - - let mut collectors = collectors.into_boxed_slice(); - - let server_message = &state.server_message; - - let collectors_len = collectors.len(); - let next = move |i: usize| (i + 1) % collectors_len; - let mut i = 0; - - loop { - sleep(Duration::from_secs(30)).await; - - // don't bother collecting if no clients are connected - // there is always 1 receiver held by main process - if server_message.receiver_count() <= 1 { - continue; - } - - i = next(i); - let collector = &mut collectors[i]; - - let msg = match collector.collect().await { - Ok(html) => ServerMessage::InfoPage { html }, - Err(e) => { - warn!("collector error: {e}"); - continue; - } - }; - - if let Err(e) = server_message.send(msg) { - error!("broadcast channel error: {e}"); - return; - } - } -} - async fn client_handler(mut socket: WebSocket, state: &State) { let mut server_message = state.server_message.subscribe(); let (server_responder, mut server_responses) = mpsc::channel(100); @@ -257,7 +137,7 @@ async fn client_handler(mut socket: WebSocket, state: &State) { }; if let Err(e) = socket.send(ws::Message::text(message)).await { - return warn!("client error: {e}"); + warn!("client error: {e}"); } } diff --git a/backend/src/tasks/info.rs b/backend/src/tasks/info.rs new file mode 100644 index 0000000..096dcd7 --- /dev/null +++ b/backend/src/tasks/info.rs @@ -0,0 +1,69 @@ +use std::time::Duration; + +use common::ServerMessage; +use tokio::time::sleep; + +use crate::{ + collector::{Collector, MarkdownWeb, WeatherApi}, + State, +}; + +pub async fn info_task(state: &State) { + let mut collectors: Vec> = vec![]; + + for url in &state.config.collectors.markdown_web_links { + collectors.push(Box::new(MarkdownWeb { + url: url.to_string(), + })); + } + + if !state.config.collectors.weatherapi_locations.is_empty() { + let api_key = state + .config + .collectors + .weatherapi_key + .as_deref() + .expect("Missing weatherapi_key"); + + for location in state.config.collectors.weatherapi_locations.iter().cloned() { + collectors.push(Box::new(WeatherApi { + api_key: api_key.to_string(), + location, + })); + } + } + + let mut collectors = collectors.into_boxed_slice(); + + let server_message = &state.server_message; + + let collectors_len = collectors.len(); + let next = move |i: usize| (i + 1) % collectors_len; + let mut i = 0; + + loop { + sleep(Duration::from_secs(30)).await; + + // don't bother collecting if no clients are connected + // there is always 1 receiver held by main process + if server_message.receiver_count() <= 1 { + continue; + } + + i = next(i); + let collector = &mut collectors[i]; + + let msg = match collector.collect().await { + Ok(html) => ServerMessage::InfoPage { html }, + Err(e) => { + warn!("collector error: {e}"); + continue; + } + }; + + if let Err(e) = server_message.send(msg) { + error!("broadcast channel error: {e}"); + return; + } + } +} diff --git a/backend/src/tasks/lights.rs b/backend/src/tasks/lights.rs new file mode 100644 index 0000000..4821afd --- /dev/null +++ b/backend/src/tasks/lights.rs @@ -0,0 +1,63 @@ +use common::{ClientMessage, ServerMessage}; +use lighter_manager::manager::{BulbCommand, BulbManager, BulbSelector}; +use tokio::select; +use tokio::time::{sleep, Duration}; + +use crate::State; + +pub async fn lights_task(state: &State) { + let config = &state.config; + let server_message = &state.server_message; + let mut client_message = state.client_message.subscribe(); + + let (cmd, bulb_states) = BulbManager::launch(config.bulbs.clone(), config.mqtt.clone()) + .await + .expect("Failed to launch bulb manager"); + + loop { + let notify = bulb_states.notify_on_change(); + sleep(Duration::from_millis(1000 / 10)).await; // limit to 10 updates/second + select! { + _ = notify => { + for (id, mode) in bulb_states.bulbs().await.clone().into_iter() { + if let Err(e) = server_message.send(ServerMessage::BulbMode { id, mode }) { + error!("broadcast channel error: {e}"); + return; + } + } + } + request = client_message.recv() => { + let request = match request { + Ok(r) => r, + Err(_) => continue, + }; + + match request.message { + ClientMessage::SetBulbColor { id, color } => { + if let Err(e) = cmd.send(BulbCommand::SetColor(BulbSelector::Id(id), color)).await { + error!("bulb manager error: {e}"); + } + } + ClientMessage::SetBulbPower { id, power } => { + if let Err(e) = cmd.send(BulbCommand::SetPower(BulbSelector::Id(id), power)).await { + error!("bulb manager error: {e}"); + } + } + ClientMessage::GetBulbs => { + if let Err(e) = request.response.send(ServerMessage::BulbMap(config.bulb_map.clone())).await { + error!("GetBulbs response channel error: {e}"); + return; + } + for (id, mode) in bulb_states.bulbs().await.clone().into_iter() { + if let Err(e) = request.response.send(ServerMessage::BulbMode { id, mode }).await { + error!("GetBulbs response channel error: {e}"); + return; + } + } + } + _ => {} + } + } + } + } +} diff --git a/backend/src/tasks/mod.rs b/backend/src/tasks/mod.rs new file mode 100644 index 0000000..f991cae --- /dev/null +++ b/backend/src/tasks/mod.rs @@ -0,0 +1,5 @@ +pub mod info; +pub mod lights; + +pub use info::info_task; +pub use lights::lights_task; diff --git a/common/src/lib.rs b/common/src/lib.rs index 3ddcac0..e1714ce 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -50,16 +50,16 @@ pub enum BulbGroupShape { impl BulbGroupShape { pub fn height(&self) -> u32 { - match self { - &Self::Circle { r } => r, - &Self::Rectangle { h, .. } => h, + match *self { + Self::Circle { r } => r, + Self::Rectangle { h, .. } => h, } } pub fn width(&self) -> u32 { - match self { - &Self::Circle { r } => r, - &Self::Rectangle { w, .. } => w, + match *self { + Self::Circle { r } => r, + Self::Rectangle { w, .. } => w, } } }