mod collector; mod persistence; mod tasks; use clap::Parser; use collector::CollectorConfig; use common::{BulbMap, ClientMessage, ServerMessage}; use futures_util::{SinkExt, StreamExt}; use lighter_manager::{manager::BulbsConfig, mqtt_conf::MqttConfig}; use log::LevelFilter; use persistence::Persistence; use serde::Deserialize; use std::convert::Infallible; use std::net::SocketAddr; use std::path::PathBuf; use tokio::sync::broadcast::error::RecvError; use tokio::sync::{broadcast, mpsc}; use tokio::{fs, select, task}; use warp::ws::{self, WebSocket}; use warp::{Filter, Rejection, Reply}; #[macro_use] extern crate log; #[derive(Parser)] struct Opt { /// More logging #[clap(short, long, parse(from_occurrences))] verbose: u8, /// Supress non-error logs #[clap(short, long)] quiet: bool, #[clap(long, short, default_value = "127.0.0.0:8000")] bind: SocketAddr, #[clap(long, default_value = "./www")] frontend: String, #[clap(long, short)] config: PathBuf, } #[derive(Deserialize)] pub struct Config { mqtt: MqttConfig, collectors: CollectorConfig, persistence_dir: Option, #[serde(flatten)] bulbs: BulbsConfig, #[serde(flatten)] bulb_map: BulbMap, } pub struct State { config: Config, persistence: Persistence, client_message: broadcast::Sender, server_message: broadcast::Sender, } #[tokio::main] async fn main() { let opt = Opt::parse(); let log_level = match opt.verbose { _ if opt.quiet => LevelFilter::Error, 0 => LevelFilter::Info, 1 => LevelFilter::Debug, 2.. => LevelFilter::Trace, }; pretty_env_logger::formatted_builder() .default_format() .filter_level(log_level) .init(); let config = fs::read_to_string(&opt.config) .await .expect("Failed to read config"); let config: Config = toml::from_str(&config).expect("Failed to parse config"); // we keep a receiver here to keep the channel active when no clients are connected let (server_message, _receiver) = broadcast::channel(100); let (client_message, _) = broadcast::channel(100); let state = State { client_message, server_message, persistence: match &config.persistence_dir { Some(path) => Persistence::new_persistence(path.to_owned()) .await .expect("Failed to open persistence dir"), None => Persistence::new().await, }, config, }; let state = Box::leak(Box::new(state)); task::spawn(tasks::lights_task(state)); task::spawn(tasks::info_task(state)); let ws = warp::path("ws") // The `ws()` filter will prepare the Websocket handshake. .and(warp::ws()) .map(|ws: warp::ws::Ws| { // And then our closure will be called when it completes... ws.on_upgrade(|websocket| { info!("Client connected {:?}", websocket); client_handler(websocket, state) }) }); let api = warp::path("api").and(ws.recover(not_found)); let index_html = format!("{}/index.html", opt.frontend); let frontend = warp::fs::dir(opt.frontend).or(warp::fs::file(index_html)); let routes = api.or(frontend); warp::serve(routes).run(opt.bind).await; } async fn not_found(_: Rejection) -> Result { Ok(warp::http::StatusCode::NOT_FOUND) } #[derive(Clone)] struct ClientRequest { message: ClientMessage, response: mpsc::Sender, } async fn client_handler(mut socket: WebSocket, state: &State) { let mut server_message = state.server_message.subscribe(); let (server_responder, mut server_responses) = mpsc::channel(100); async fn handle_server_message(socket: &mut WebSocket, message: ServerMessage) { let message = match ron::to_string(&message) { Ok(msg) => msg, Err(e) => return error!("Failed to serialize message: {e}"), }; if let Err(e) = socket.send(ws::Message::text(message)).await { warn!("client error: {e}"); } } loop { select! { response = server_responses.recv() => { if let Some(message) = response { handle_server_message(&mut socket, message).await; } } message = server_message.recv() => { let message = match message { Ok(msg) => msg, Err(RecvError::Lagged(_)) => continue, Err(RecvError::Closed) => return, }; handle_server_message(&mut socket, message).await; } message = socket.next() => { match message { None => return info!("stream closed"), Some(Err(e)) => return warn!("client error: {e}"), Some(Ok(message)) => { let message = match message.to_str().ok() { Some(text) => text, None => continue, }; let message = match ron::from_str(message) { Ok(message) => message, Err(e) => { return error!("failed to deserialize websocket message: {e}"); } }; let request = ClientRequest { message, response: server_responder.clone(), }; if let Err(e) = state.client_message.send(request) { return error!("client message handlers error: {e}"); } } } } } } }