Move lights and info tasks to module
This commit is contained in:
@ -1,13 +1,11 @@
|
|||||||
mod collector;
|
mod collector;
|
||||||
|
mod tasks;
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use collector::{Collector, CollectorConfig, MarkdownWeb, WeatherApi};
|
use collector::CollectorConfig;
|
||||||
use common::{BulbMap, ClientMessage, ServerMessage};
|
use common::{BulbMap, ClientMessage, ServerMessage};
|
||||||
use futures_util::{SinkExt, StreamExt};
|
use futures_util::{SinkExt, StreamExt};
|
||||||
use lighter_manager::{
|
use lighter_manager::{manager::BulbsConfig, mqtt_conf::MqttConfig};
|
||||||
manager::{BulbCommand, BulbManager, BulbSelector, BulbsConfig},
|
|
||||||
mqtt_conf::MqttConfig,
|
|
||||||
};
|
|
||||||
use log::LevelFilter;
|
use log::LevelFilter;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::convert::Infallible;
|
use std::convert::Infallible;
|
||||||
@ -15,7 +13,6 @@ use std::net::SocketAddr;
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use tokio::sync::broadcast::error::RecvError;
|
use tokio::sync::broadcast::error::RecvError;
|
||||||
use tokio::sync::{broadcast, mpsc};
|
use tokio::sync::{broadcast, mpsc};
|
||||||
use tokio::time::{sleep, Duration};
|
|
||||||
use tokio::{fs, select, task};
|
use tokio::{fs, select, task};
|
||||||
use warp::ws::{self, WebSocket};
|
use warp::ws::{self, WebSocket};
|
||||||
use warp::{Filter, Rejection, Reply};
|
use warp::{Filter, Rejection, Reply};
|
||||||
@ -44,7 +41,7 @@ struct Opt {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
struct Config {
|
pub struct Config {
|
||||||
mqtt: MqttConfig,
|
mqtt: MqttConfig,
|
||||||
|
|
||||||
collectors: CollectorConfig,
|
collectors: CollectorConfig,
|
||||||
@ -56,7 +53,7 @@ struct Config {
|
|||||||
bulb_map: BulbMap,
|
bulb_map: BulbMap,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct State {
|
pub struct State {
|
||||||
config: Config,
|
config: Config,
|
||||||
client_message: broadcast::Sender<ClientRequest>,
|
client_message: broadcast::Sender<ClientRequest>,
|
||||||
server_message: broadcast::Sender<ServerMessage>,
|
server_message: broadcast::Sender<ServerMessage>,
|
||||||
@ -94,8 +91,8 @@ async fn main() {
|
|||||||
};
|
};
|
||||||
let state = Box::leak(Box::new(state));
|
let state = Box::leak(Box::new(state));
|
||||||
|
|
||||||
task::spawn(info_collector(state));
|
task::spawn(tasks::lights_task(state));
|
||||||
task::spawn(lights_collector(state));
|
task::spawn(tasks::info_task(state));
|
||||||
|
|
||||||
let ws = warp::path("ws")
|
let ws = warp::path("ws")
|
||||||
// The `ws()` filter will prepare the Websocket handshake.
|
// The `ws()` filter will prepare the Websocket handshake.
|
||||||
@ -129,123 +126,6 @@ struct ClientRequest {
|
|||||||
response: mpsc::Sender<ServerMessage>,
|
response: mpsc::Sender<ServerMessage>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn lights_collector(state: &State) {
|
|
||||||
let config = &state.config;
|
|
||||||
let server_message = &state.server_message;
|
|
||||||
let mut client_message = state.client_message.subscribe();
|
|
||||||
|
|
||||||
let (cmd, bulb_states) = BulbManager::launch(config.bulbs.clone(), config.mqtt.clone())
|
|
||||||
.await
|
|
||||||
.expect("Failed to launch bulb manager");
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let notify = bulb_states.notify_on_change();
|
|
||||||
sleep(Duration::from_millis(1000 / 10)).await; // limit to 10 updates/second
|
|
||||||
select! {
|
|
||||||
_ = notify => {
|
|
||||||
for (id, mode) in bulb_states.bulbs().await.clone().into_iter() {
|
|
||||||
if let Err(e) = server_message.send(ServerMessage::BulbMode { id, mode }) {
|
|
||||||
error!("broadcast channel error: {e}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
request = client_message.recv() => {
|
|
||||||
let request = match request {
|
|
||||||
Ok(r) => r,
|
|
||||||
Err(_) => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
match request.message {
|
|
||||||
ClientMessage::SetBulbColor { id, color } => {
|
|
||||||
if let Err(e) = cmd.send(BulbCommand::SetColor(BulbSelector::Id(id), color)).await {
|
|
||||||
error!("bulb manager error: {e}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ClientMessage::SetBulbPower { id, power } => {
|
|
||||||
if let Err(e) = cmd.send(BulbCommand::SetPower(BulbSelector::Id(id), power)).await {
|
|
||||||
error!("bulb manager error: {e}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ClientMessage::GetBulbs => {
|
|
||||||
if let Err(e) = request.response.send(ServerMessage::BulbMap(config.bulb_map.clone())).await {
|
|
||||||
error!("GetBulbs response channel error: {e}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
for (id, mode) in bulb_states.bulbs().await.clone().into_iter() {
|
|
||||||
if let Err(e) = request.response.send(ServerMessage::BulbMode { id, mode }).await {
|
|
||||||
error!("GetBulbs response channel error: {e}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn info_collector(state: &State) {
|
|
||||||
let mut collectors: Vec<Box<dyn Collector + Send>> = vec![];
|
|
||||||
|
|
||||||
for url in &state.config.collectors.markdown_web_links {
|
|
||||||
collectors.push(Box::new(MarkdownWeb {
|
|
||||||
url: url.to_string(),
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
if !state.config.collectors.weatherapi_locations.is_empty() {
|
|
||||||
let api_key = state
|
|
||||||
.config
|
|
||||||
.collectors
|
|
||||||
.weatherapi_key
|
|
||||||
.as_deref()
|
|
||||||
.expect("Missing weatherapi_key");
|
|
||||||
|
|
||||||
for location in state.config.collectors.weatherapi_locations.iter().cloned() {
|
|
||||||
collectors.push(Box::new(WeatherApi {
|
|
||||||
api_key: api_key.to_string(),
|
|
||||||
location,
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut collectors = collectors.into_boxed_slice();
|
|
||||||
|
|
||||||
let server_message = &state.server_message;
|
|
||||||
|
|
||||||
let collectors_len = collectors.len();
|
|
||||||
let next = move |i: usize| (i + 1) % collectors_len;
|
|
||||||
let mut i = 0;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
sleep(Duration::from_secs(30)).await;
|
|
||||||
|
|
||||||
// don't bother collecting if no clients are connected
|
|
||||||
// there is always 1 receiver held by main process
|
|
||||||
if server_message.receiver_count() <= 1 {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
i = next(i);
|
|
||||||
let collector = &mut collectors[i];
|
|
||||||
|
|
||||||
let msg = match collector.collect().await {
|
|
||||||
Ok(html) => ServerMessage::InfoPage { html },
|
|
||||||
Err(e) => {
|
|
||||||
warn!("collector error: {e}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(e) = server_message.send(msg) {
|
|
||||||
error!("broadcast channel error: {e}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn client_handler(mut socket: WebSocket, state: &State) {
|
async fn client_handler(mut socket: WebSocket, state: &State) {
|
||||||
let mut server_message = state.server_message.subscribe();
|
let mut server_message = state.server_message.subscribe();
|
||||||
let (server_responder, mut server_responses) = mpsc::channel(100);
|
let (server_responder, mut server_responses) = mpsc::channel(100);
|
||||||
@ -257,7 +137,7 @@ async fn client_handler(mut socket: WebSocket, state: &State) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
if let Err(e) = socket.send(ws::Message::text(message)).await {
|
if let Err(e) = socket.send(ws::Message::text(message)).await {
|
||||||
return warn!("client error: {e}");
|
warn!("client error: {e}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
69
backend/src/tasks/info.rs
Normal file
69
backend/src/tasks/info.rs
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use common::ServerMessage;
|
||||||
|
use tokio::time::sleep;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
collector::{Collector, MarkdownWeb, WeatherApi},
|
||||||
|
State,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub async fn info_task(state: &State) {
|
||||||
|
let mut collectors: Vec<Box<dyn Collector + Send>> = vec![];
|
||||||
|
|
||||||
|
for url in &state.config.collectors.markdown_web_links {
|
||||||
|
collectors.push(Box::new(MarkdownWeb {
|
||||||
|
url: url.to_string(),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !state.config.collectors.weatherapi_locations.is_empty() {
|
||||||
|
let api_key = state
|
||||||
|
.config
|
||||||
|
.collectors
|
||||||
|
.weatherapi_key
|
||||||
|
.as_deref()
|
||||||
|
.expect("Missing weatherapi_key");
|
||||||
|
|
||||||
|
for location in state.config.collectors.weatherapi_locations.iter().cloned() {
|
||||||
|
collectors.push(Box::new(WeatherApi {
|
||||||
|
api_key: api_key.to_string(),
|
||||||
|
location,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut collectors = collectors.into_boxed_slice();
|
||||||
|
|
||||||
|
let server_message = &state.server_message;
|
||||||
|
|
||||||
|
let collectors_len = collectors.len();
|
||||||
|
let next = move |i: usize| (i + 1) % collectors_len;
|
||||||
|
let mut i = 0;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
sleep(Duration::from_secs(30)).await;
|
||||||
|
|
||||||
|
// don't bother collecting if no clients are connected
|
||||||
|
// there is always 1 receiver held by main process
|
||||||
|
if server_message.receiver_count() <= 1 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
i = next(i);
|
||||||
|
let collector = &mut collectors[i];
|
||||||
|
|
||||||
|
let msg = match collector.collect().await {
|
||||||
|
Ok(html) => ServerMessage::InfoPage { html },
|
||||||
|
Err(e) => {
|
||||||
|
warn!("collector error: {e}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = server_message.send(msg) {
|
||||||
|
error!("broadcast channel error: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
63
backend/src/tasks/lights.rs
Normal file
63
backend/src/tasks/lights.rs
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
use common::{ClientMessage, ServerMessage};
|
||||||
|
use lighter_manager::manager::{BulbCommand, BulbManager, BulbSelector};
|
||||||
|
use tokio::select;
|
||||||
|
use tokio::time::{sleep, Duration};
|
||||||
|
|
||||||
|
use crate::State;
|
||||||
|
|
||||||
|
pub async fn lights_task(state: &State) {
|
||||||
|
let config = &state.config;
|
||||||
|
let server_message = &state.server_message;
|
||||||
|
let mut client_message = state.client_message.subscribe();
|
||||||
|
|
||||||
|
let (cmd, bulb_states) = BulbManager::launch(config.bulbs.clone(), config.mqtt.clone())
|
||||||
|
.await
|
||||||
|
.expect("Failed to launch bulb manager");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let notify = bulb_states.notify_on_change();
|
||||||
|
sleep(Duration::from_millis(1000 / 10)).await; // limit to 10 updates/second
|
||||||
|
select! {
|
||||||
|
_ = notify => {
|
||||||
|
for (id, mode) in bulb_states.bulbs().await.clone().into_iter() {
|
||||||
|
if let Err(e) = server_message.send(ServerMessage::BulbMode { id, mode }) {
|
||||||
|
error!("broadcast channel error: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
request = client_message.recv() => {
|
||||||
|
let request = match request {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
match request.message {
|
||||||
|
ClientMessage::SetBulbColor { id, color } => {
|
||||||
|
if let Err(e) = cmd.send(BulbCommand::SetColor(BulbSelector::Id(id), color)).await {
|
||||||
|
error!("bulb manager error: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ClientMessage::SetBulbPower { id, power } => {
|
||||||
|
if let Err(e) = cmd.send(BulbCommand::SetPower(BulbSelector::Id(id), power)).await {
|
||||||
|
error!("bulb manager error: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ClientMessage::GetBulbs => {
|
||||||
|
if let Err(e) = request.response.send(ServerMessage::BulbMap(config.bulb_map.clone())).await {
|
||||||
|
error!("GetBulbs response channel error: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (id, mode) in bulb_states.bulbs().await.clone().into_iter() {
|
||||||
|
if let Err(e) = request.response.send(ServerMessage::BulbMode { id, mode }).await {
|
||||||
|
error!("GetBulbs response channel error: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
5
backend/src/tasks/mod.rs
Normal file
5
backend/src/tasks/mod.rs
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
pub mod info;
|
||||||
|
pub mod lights;
|
||||||
|
|
||||||
|
pub use info::info_task;
|
||||||
|
pub use lights::lights_task;
|
||||||
@ -50,16 +50,16 @@ pub enum BulbGroupShape {
|
|||||||
|
|
||||||
impl BulbGroupShape {
|
impl BulbGroupShape {
|
||||||
pub fn height(&self) -> u32 {
|
pub fn height(&self) -> u32 {
|
||||||
match self {
|
match *self {
|
||||||
&Self::Circle { r } => r,
|
Self::Circle { r } => r,
|
||||||
&Self::Rectangle { h, .. } => h,
|
Self::Rectangle { h, .. } => h,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn width(&self) -> u32 {
|
pub fn width(&self) -> u32 {
|
||||||
match self {
|
match *self {
|
||||||
&Self::Circle { r } => r,
|
Self::Circle { r } => r,
|
||||||
&Self::Rectangle { w, .. } => w,
|
Self::Rectangle { w, .. } => w,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user