use std::collections::HashMap; use chrono::{DateTime, Datelike, Local, NaiveTime, Weekday}; use common::{ClientMessage, ServerMessage}; use lighter_lib::{BulbColor, BulbId}; use lighter_manager::manager::{BulbCommand, BulbManager, BulbSelector}; use serde::{Deserialize, Serialize}; use std::time::Duration; use tokio::select; use tokio::sync::{broadcast, mpsc}; use tokio::task::{spawn, JoinHandle}; use tokio::time::sleep; use crate::persistence::PersistenceFile; use crate::{ClientRequest, State}; #[derive(Default, Clone, PartialEq, Eq, Serialize, Deserialize)] struct LightsState { wake_schedule: HashMap>, } pub async fn lights_task(state: &State) { let config = &state.config; let server_message = &state.server_message; let mut client_message = state.client_message.subscribe(); let mut lights_state: PersistenceFile = state .persistence .open("lights".into()) .await .expect("Failed to open lights config"); let (cmd, bulb_states) = BulbManager::launch(config.bulbs.clone(), config.mqtt.clone()) .await .expect("Failed to launch bulb manager"); let mut wake_tasks: HashMap<(BulbId, Weekday), JoinHandle<()>> = lights_state .get() .wake_schedule .iter() .flat_map(|(bulb, schedule)| schedule.iter().map(move |(day, time)| (bulb, day, time))) .map(|(bulb, day, time)| { let handle = spawn(wake_task( state.client_message.clone(), cmd.clone(), bulb.clone(), *day, *time, )); ((bulb.clone(), *day), handle) }) .collect(); loop { let notify = bulb_states.notify_on_change(); select! { _ = notify => { let lights_state = lights_state.get(); for (id, mode) in bulb_states.bulbs().await.clone().into_iter() { let wake_schedule = lights_state.wake_schedule.get(&id).cloned().unwrap_or_default(); let msg = ServerMessage::BulbState { id, mode, wake_schedule }; if let Err(e) = server_message.send(msg) { error!("broadcast channel error: {e}"); return; } } } request = client_message.recv() => { let request = match request { Ok(r) => r, Err(_) => continue, }; match request.message { ClientMessage::SetBulbColor { id, color } => { if let Err(e) = cmd.send(BulbCommand::SetColor(BulbSelector::Id(id), color)).await { error!("bulb manager error: {e}"); } } ClientMessage::SetBulbPower { id, power } => { if let Err(e) = cmd.send(BulbCommand::SetPower(BulbSelector::Id(id), power)).await { error!("bulb manager error: {e}"); } } ClientMessage::GetBulbs => { if let Err(e) = request.response.send(ServerMessage::BulbMap(config.bulb_map.clone())).await { error!("GetBulbs response channel error: {e}"); return; } let lights_state = lights_state.get(); for (id, mode) in bulb_states.bulbs().await.clone().into_iter() { let wake_schedule = lights_state.wake_schedule.get(&id).cloned().unwrap_or_default(); let msg = ServerMessage::BulbState { id, mode, wake_schedule }; if let Err(e) = request.response.send(msg).await { error!("GetBulbs response channel error: {e}"); return; } } } ClientMessage::SetBulbWakeTime { id, day, time } => { if let Err(e) = lights_state.update(|lights_state| { let schedule = lights_state.wake_schedule.entry(id.clone()).or_default(); if let Some(time) = time { schedule.insert(day, time); } else { schedule.remove(&day); } }).await { error!("Failed to save wake schedule: {e}"); }; if let Some(time) = time { let handle = spawn(wake_task( state.client_message.clone(), cmd.clone(), id.clone(), day, time, )); if let Some(old_handle) = wake_tasks.insert((id, day), handle) { old_handle.abort(); } } else if let Some(old_handle) = wake_tasks.remove(&(id, day)) { old_handle.abort(); } } _ => {} } } } } } async fn wake_task( client_messages: broadcast::Sender, cmd: mpsc::Sender, id: BulbId, day: Weekday, time: NaiveTime, ) { let mut alarm = next_alarm(Local::now(), day, time); loop { info!("sleeping until {alarm}"); sleep((alarm - Local::now()).to_std().unwrap()).await; // slowly turn up brightness of bulb for brightness in (1..=75).map(|i| (i as f32) * 0.01) { select! { // abort if the client pokes the bulb _ = wait_for_bulb_command(&id, client_messages.subscribe()) => break, _ = sleep(Duration::from_secs(12)) => {} }; if cmd .send(BulbCommand::SetColor( BulbSelector::Id(id.clone()), BulbColor::Kelvin { t: 0.0, b: brightness, }, )) .await .is_err() { return; }; } alarm = next_alarm(Local::now(), day, time); } } /// Get the next alarm, from a weekday+time schedule. fn next_alarm(now: DateTime, day: Weekday, time: NaiveTime) -> DateTime { let day_of_alarm = day.num_days_from_monday() as i64; let day_now = now.weekday().num_days_from_monday() as i64; let alarm = now + chrono::Duration::days(day_of_alarm - day_now); let mut alarm = alarm .date_naive() .and_time(time) .and_local_timezone(Local) .unwrap(); if alarm <= now { alarm += chrono::Duration::weeks(1); } alarm } /// Wait until we receive a client request that mutates the given bulb async fn wait_for_bulb_command( bulb_id: &BulbId, mut client_messages: broadcast::Receiver, ) { loop { match client_messages.recv().await { Err(_) => return, Ok(request) => match request.message { ClientMessage::SetBulbColor { id, .. } | ClientMessage::SetBulbPower { id, .. } | ClientMessage::SetBulbWakeTime { id, .. } if &id == bulb_id => { break } _ => continue, }, } } } #[cfg(test)] mod test { use chrono::{offset::TimeZone, Local, NaiveTime, Weekday}; use super::next_alarm; #[test] fn test_alarm_date() { const FMT: &str = "%Y-%m-%d %H:%M"; let now = Local.datetime_from_str("2022-10-18 15:30", FMT).unwrap(); let test_values = [ (Weekday::Tue, (16, 30), "2022-10-18 16:30"), (Weekday::Tue, (14, 30), "2022-10-25 14:30"), (Weekday::Wed, (15, 30), "2022-10-19 15:30"), (Weekday::Mon, (15, 30), "2022-10-24 15:30"), ]; for (day, (hour, min), expected) in test_values { let expected = Local.datetime_from_str(expected, FMT).unwrap(); assert_eq!( next_alarm(now, day, NaiveTime::from_hms(hour, min, 0)), expected ); } } }