This commit is contained in:
2023-12-04 21:17:37 +01:00
parent 4294d5c79e
commit e1c1fa71af
3 changed files with 213 additions and 85 deletions

View File

@ -1,8 +1,13 @@
use crate::provider::{BulbProvider, BulbUpdate};
use eyre::Context;
use lighter_lib::{BulbColor, BulbId, BulbMode};
use serde::Deserialize;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::future::Future;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::{collections::BTreeMap, sync::atomic::AtomicU64};
use tokio::select;
use tokio::{
sync::{futures::Notified, mpsc, Notify, RwLock, RwLockReadGuard},
task,
@ -30,12 +35,17 @@ pub enum BulbCommand {
SetColor(BulbSelector, BulbColor),
}
#[derive(Clone)]
type InstanceId = u64;
/// A handle to a bulb manager. Can be cloned.
pub struct BulbManager {
id: InstanceId,
state: Arc<BulbsState>,
}
struct BulbsState {
next_id: AtomicU64,
/// Notify on any change to the bulbs
notify: Notify,
@ -44,8 +54,11 @@ struct BulbsState {
/// State of all bulbs
bulbs: RwLock<BTreeMap<BulbId, BulbMode>>,
exclusive_bulbs: RwLock<HashMap<BulbId, (InstanceId, Arc<Notify>)>>,
}
/// State of the main bulb manager thread.
struct ManagerState<P> {
#[allow(dead_code)]
config: BulbsConfig,
@ -64,6 +77,7 @@ impl BulbManager {
let (command_tx, command_rx) = mpsc::channel(100);
let bulbs_state = Arc::new(BulbsState {
next_id: AtomicU64::new(1),
notify: Notify::new(),
command: command_tx,
bulbs: RwLock::new(
@ -74,6 +88,7 @@ impl BulbManager {
.map(|id| (id, Default::default()))
.collect(),
),
exclusive_bulbs: Default::default(),
});
let state = ManagerState {
@ -83,7 +98,10 @@ impl BulbManager {
state: Arc::clone(&bulbs_state),
};
let manager = BulbManager { state: bulbs_state };
let manager = BulbManager {
id: 0,
state: bulbs_state,
};
task::spawn(run(state));
@ -95,16 +113,74 @@ impl BulbManager {
}
pub async fn send_command(&self, command: BulbCommand) {
info!("sending command {command:?}");
let exclusive_bulbs = self.state.exclusive_bulbs.read().await;
match command.selector() {
BulbSelector::All => {
// TODO
}
BulbSelector::Id(bulb) => {
if let Some((id, interrupt)) = exclusive_bulbs.get(bulb) {
if id != &self.id {
interrupt.notify_one();
}
}
}
}
debug!("sending command {command:?}");
if let Err(e) = self.state.command.send(command).await {
error!("error sending bulb command: {e:#}");
}
info!("sent command");
debug!("sent command");
}
pub async fn bulbs(&self) -> RwLockReadGuard<'_, BTreeMap<BulbId, BulbMode>> {
self.state.bulbs.read().await
}
/// Run the provided future until it finishes, or some other BulbManager sends a command to
/// the specified bulb.
pub async fn until_interrupted<F, T>(&self, bulb: BulbId, f: F) -> Option<T>
where
F: Future<Output = T>,
{
let interrupt = Arc::new(Notify::new());
if let Some((id, prev)) = self
.state
.exclusive_bulbs
.write()
.await
.insert(bulb, (self.id, Arc::clone(&interrupt)))
{
if id != self.id {
prev.notify_one();
}
}
select! {
_ = interrupt.notified() => None,
t = f => Some(t),
}
}
}
impl Clone for BulbManager {
fn clone(&self) -> Self {
Self {
id: self.state.next_id.fetch_add(1, Ordering::SeqCst),
state: self.state.clone(),
}
}
}
impl BulbCommand {
pub fn selector(&self) -> &BulbSelector {
match self {
BulbCommand::SetPower(s, _) => s,
BulbCommand::SetColor(s, _) => s,
}
}
}
async fn run<P>(state: ManagerState<P>)
@ -113,10 +189,11 @@ where
{
debug!("manager task running");
if let Err(e) = run_inner(state).await {
error!("bulb manage exited with error: {e:#}");
error!("bulb manager exited with error: {e:?}");
}
info!("manager task exited");
}
async fn run_inner<P>(mut state: ManagerState<P>) -> eyre::Result<()>
where
P: BulbProvider + Send,
@ -128,13 +205,15 @@ where
info!("handle closed, shutting down");
return Ok(());
};
info!("command received: {command:?}");
state.provider.send_command(command.clone()).await?;
debug!("command received: {command:?}");
state.provider.send_command(command.clone()).await
.wrap_err("Failed to send command to BulbProvider")?;
}
update = state.provider.listen() => {
let (id, update) = update?;
let (id, update) = update
.wrap_err("Error listening to BulbProvider")?;
info!("update received: {id:?} {update:?}");
debug!("update received: {id:?} {update:?}");
let mut bulbs = state.state.bulbs.write().await;
let Some(bulb) = bulbs.get_mut(&id) else {

View File

@ -16,10 +16,16 @@ pub enum BulbUpdate {
// An interface that allows communication with bulbs.
#[async_trait]
pub trait BulbProvider {
// Send a command to some bulbs to update their state
// Send a command to some bulbs to update their state.
//
// This function should only return fatal errors.
// Recoverable error should incurr a retry.
async fn send_command(&mut self, cmd: BulbCommand) -> eyre::Result<()>;
// Wait for any bulb to send an update
//
// This function should only return fatal errors.
// Recoverable error should incurr a retry.
async fn listen(&mut self) -> eyre::Result<(BulbId, BulbUpdate)>;
}

View File

@ -1,6 +1,7 @@
use std::{collections::HashSet, str, time::Duration};
use async_trait::async_trait;
use eyre::{Context, Error};
use lighter_lib::BulbId;
use mqtt::{
packet::{PublishPacket, QoSWithPacketIdentifier, SubscribePacket, VariablePacket},
@ -48,7 +49,7 @@ impl BulbsMqtt {
}
impl SocketState {
async fn get_connection(&mut self) -> eyre::Result<&mut TcpStream> {
async fn get_connection(&mut self) -> Result<&mut TcpStream, FailMode<Error>> {
let socket = &mut self.socket;
if let Some(socket) = socket {
@ -63,8 +64,8 @@ impl SocketState {
self.last_connection_attempt = Instant::now();
info!("connecting to MQTT (attempt {attempt})");
let mut new_socket = self.mqtt_config.connect().await?;
subscribe(&mut new_socket).await?;
let mut new_socket = self.mqtt_config.connect().await.map_err(retry)?;
subscribe(&mut new_socket).await.map_err(retry)?;
info!("connected to MQTT");
self.failed_connect_attempts = 0;
@ -75,26 +76,26 @@ impl SocketState {
#[async_trait]
impl BulbProvider for BulbsMqtt {
async fn send_command(&mut self, command: BulbCommand) -> eyre::Result<()> {
debug!("mqtt sending command {command:?}");
let socket = self.socket.get_connection().await?;
async fn send<P: ToString>(
all_bulbs: &HashSet<BulbId>,
selector: BulbSelector,
publish: &mut Publish<'_, P>,
) -> eyre::Result<()> {
) -> Result<(), FailMode<Error>> {
match selector {
BulbSelector::Id(id) => publish.send(&id).await?,
BulbSelector::Id(id) => publish.send(&id).await.map_err(retry)?,
BulbSelector::All => {
for id in all_bulbs {
publish.send(id).await?;
publish.send(id).await.map_err(retry)?;
}
}
}
Ok(())
}
match command {
async fn inner(this: &mut BulbsMqtt, command: &BulbCommand) -> Result<(), FailMode<Error>> {
let socket = this.socket.get_connection().await?;
match command.clone() {
BulbCommand::SetPower(selector, power) => {
let payload = if power { "ON" } else { "OFF" };
let mut publish = Publish {
@ -103,7 +104,7 @@ impl BulbProvider for BulbsMqtt {
payload,
socket,
};
send(&self.known_bulbs, selector, &mut publish).await?;
send(&this.known_bulbs, selector, &mut publish).await?;
}
BulbCommand::SetColor(selector, color) => {
let mut publish = Publish {
@ -112,18 +113,31 @@ impl BulbProvider for BulbsMqtt {
payload: color.color_string(),
socket,
};
send(&self.known_bulbs, selector, &mut publish).await?;
send(&this.known_bulbs, selector, &mut publish).await?;
}
}
Ok(())
}
async fn listen(&mut self) -> eyre::Result<(BulbId, BulbUpdate)> {
debug!("mqtt listening for updates");
let socket = self.socket.get_connection().await?;
debug!("mqtt sending command {command:?}");
loop {
let packet = VariablePacket::parse(socket).await?;
match inner(self, &command).await {
Ok(t) => break Ok(t),
Err(FailMode::Retry(e)) => info!("Retrying on error: {e:?}"),
Err(FailMode::Fatal(e)) => break Err(e),
}
}
}
async fn listen(&mut self) -> eyre::Result<(BulbId, BulbUpdate)> {
debug!("mqtt listening for updates");
async fn inner(this: &mut BulbsMqtt) -> Result<(BulbId, BulbUpdate), FailMode<Error>> {
let socket = this.socket.get_connection().await?;
loop {
let packet = VariablePacket::parse(socket).await.map_err(retry)?;
let VariablePacket::PublishPacket(publish) = &packet else {
continue;
@ -135,21 +149,29 @@ impl BulbProvider for BulbsMqtt {
[prefix, id @ .., suffix] => {
let id = BulbId(id.join("/"));
if !self.known_bulbs.contains(&id) {
if !this.known_bulbs.contains(&id) {
warn!("ignoring publish from unknown bulb {id}");
continue;
}
let payload = str::from_utf8(publish.payload())?;
let payload = str::from_utf8(publish.payload())
.wrap_err("Failed to decode pulish message payload")
.map_err(retry)?;
let update = match (*prefix, *suffix) {
("stat", "POWER") => BulbUpdate::Power(payload == "ON"),
("stat", "RESULT") => {
let result: BulbResult = serde_json::from_str(payload)?;
let result: BulbResult = serde_json::from_str(payload)
.wrap_err("Failed to decode BulbResult")
.map_err(retry)?;
// TODO: color and power can be updated at the same time?
if let Some(color) = result.color {
BulbUpdate::Color(color.parse()?)
let color = color
.parse()
.wrap_err("Failed to decode bulb color")
.map_err(retry)?;
BulbUpdate::Color(color)
} else if let Some(power) = result.power {
BulbUpdate::Power(power == "ON")
} else {
@ -178,6 +200,15 @@ impl BulbProvider for BulbsMqtt {
}
}
}
loop {
match inner(self).await {
Ok(t) => break Ok(t),
Err(FailMode::Retry(e)) => info!("Retrying on error: {e:?}"),
Err(FailMode::Fatal(e)) => break Err(e),
}
}
}
}
struct Publish<'a, P: ToString> {
@ -232,3 +263,15 @@ struct BulbResult {
#[serde(rename(deserialize = "Color"))]
color: Option<String>,
}
fn retry<X, Y: From<X>>(err: X) -> FailMode<Y> {
FailMode::Retry(err.into())
}
enum FailMode<E> {
/// A non-fatal error that should prompt a retry.
Retry(E),
/// A fatal error that can't be recovered from.
Fatal(E),
}