use std::collections::HashMap; use chrono::{Datelike, Local, NaiveTime, Weekday}; use common::{ClientMessage, ServerMessage}; use lighter_lib::{BulbColor, BulbId}; use lighter_manager::manager::{BulbCommand, BulbManager, BulbSelector}; use serde::{Deserialize, Serialize}; use std::time::Duration; use tokio::select; use tokio::sync::{broadcast, mpsc}; use tokio::task::{spawn, JoinHandle}; use tokio::time::sleep; use crate::persistence::PersistenceFile; use crate::{ClientRequest, State}; #[derive(Default, Clone, PartialEq, Eq, Serialize, Deserialize)] struct LightsState { wake_schedule: HashMap>, } pub async fn lights_task(state: &State) { let config = &state.config; let server_message = &state.server_message; let mut client_message = state.client_message.subscribe(); let mut lights_state: PersistenceFile = state .persistence .open("lights".into()) .await .expect("Failed to open lights config"); let (cmd, bulb_states) = BulbManager::launch(config.bulbs.clone(), config.mqtt.clone()) .await .expect("Failed to launch bulb manager"); let mut wake_tasks: HashMap<(BulbId, Weekday), JoinHandle<()>> = lights_state .get() .wake_schedule .iter() .flat_map(|(bulb, schedule)| schedule.iter().map(move |(day, time)| (bulb, day, time))) .map(|(bulb, day, time)| { let handle = spawn(wake_task( state.client_message.subscribe(), cmd.clone(), bulb.clone(), *day, *time, )); ((bulb.clone(), *day), handle) }) .collect(); loop { let notify = bulb_states.notify_on_change(); sleep(tokio::time::Duration::from_millis(1000 / 10)).await; // limit to 10 updates/second select! { _ = notify => { let lights_state = lights_state.get(); for (id, mode) in bulb_states.bulbs().await.clone().into_iter() { let wake_schedule = lights_state.wake_schedule.get(&id).cloned().unwrap_or_default(); let msg = ServerMessage::BulbState { id, mode, wake_schedule }; if let Err(e) = server_message.send(msg) { error!("broadcast channel error: {e}"); return; } } } request = client_message.recv() => { let request = match request { Ok(r) => r, Err(_) => continue, }; match request.message { ClientMessage::SetBulbColor { id, color } => { if let Err(e) = cmd.send(BulbCommand::SetColor(BulbSelector::Id(id), color)).await { error!("bulb manager error: {e}"); } } ClientMessage::SetBulbPower { id, power } => { if let Err(e) = cmd.send(BulbCommand::SetPower(BulbSelector::Id(id), power)).await { error!("bulb manager error: {e}"); } } ClientMessage::GetBulbs => { if let Err(e) = request.response.send(ServerMessage::BulbMap(config.bulb_map.clone())).await { error!("GetBulbs response channel error: {e}"); return; } let lights_state = lights_state.get(); for (id, mode) in bulb_states.bulbs().await.clone().into_iter() { let wake_schedule = lights_state.wake_schedule.get(&id).cloned().unwrap_or_default(); let msg = ServerMessage::BulbState { id, mode, wake_schedule }; if let Err(e) = request.response.send(msg).await { error!("GetBulbs response channel error: {e}"); return; } } } ClientMessage::SetBulbWakeTime { id, day, time } => { if let Err(e) = lights_state.update(|lights_state| { let schedule = lights_state.wake_schedule.entry(id.clone()).or_default(); if let Some(time) = time { schedule.insert(day, time); } else { schedule.remove(&day); } }).await { error!("Failed to save wake schedule: {e}"); }; if let Some(time) = time { let handle = spawn(wake_task( state.client_message.subscribe(), cmd.clone(), id.clone(), day, time, )); if let Some(old_handle) = wake_tasks.insert((id, day), handle) { old_handle.abort(); }} else { if let Some(old_handle) = wake_tasks.remove(&(id, day)) { old_handle.abort(); } } } _ => {} } } } } } async fn wake_task( mut client_messages: broadcast::Receiver, cmd: mpsc::Sender, id: BulbId, day: Weekday, time: NaiveTime, ) { let now = Local::now(); let day_num = day.num_days_from_monday(); let now_day = now.weekday(); let now_day_num = now_day.num_days_from_monday(); let mut alarm = now; if day_num >= now_day_num { // next alarm is this week alarm += chrono::Duration::days((day_num - now_day_num).into()); alarm = alarm.date().and_time(time).unwrap(); } else { // next alarm is next week alarm += chrono::Duration::weeks(1); alarm -= chrono::Duration::days((now_day_num - day_num).into()); alarm = alarm.date().and_time(time).unwrap(); } loop { info!("sleeping until {alarm}"); sleep((alarm - Local::now()).to_std().unwrap()).await; alarm += chrono::Duration::weeks(1); // slowly turn up brightness of bulb for brightness in (1..=75).map(|i| (i as f32) * 0.01) { select! { // abort if the client pokes the bulb _ = wait_for_bulb_command(&id, &mut client_messages) => break, _ = sleep(Duration::from_secs(12)) => {} }; if cmd .send(BulbCommand::SetColor( BulbSelector::Id(id.clone()), BulbColor::Kelvin { t: 0.0, b: brightness, }, )) .await .is_err() { return; }; } } } /// Wait until we receive a client request that mutates the given bulb async fn wait_for_bulb_command( bulb_id: &BulbId, client_messages: &mut broadcast::Receiver, ) { loop { match client_messages.recv().await { Err(_) => return, Ok(request) => match request.message { ClientMessage::SetBulbColor { id, .. } | ClientMessage::SetBulbPower { id, .. } | ClientMessage::SetBulbWakeTime { id, .. } if &id == bulb_id => { break } _ => continue, }, } } }