Initial Commit
This commit is contained in:
31
backend/Cargo.toml
Normal file
31
backend/Cargo.toml
Normal file
@ -0,0 +1,31 @@
|
||||
[package]
|
||||
name = "hemma"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
futures-util = "0.3.21"
|
||||
log = "0.4.17"
|
||||
pretty_env_logger = "0.4.0"
|
||||
tokio = { version = "1.19.2", features = ["full"] }
|
||||
warp = "0.3.2"
|
||||
ron = "0.7.1"
|
||||
reqwest = { version = "0.11.11", default_features = false, features = ["rustls-tls"] }
|
||||
async-trait = "0.1.56"
|
||||
anyhow = "1.0.58"
|
||||
markdown = "0.3.0"
|
||||
clap = { version = "3.2.6", features = ["derive"] }
|
||||
toml = "0.5.9"
|
||||
serde = { version = "1.0.138", features = ["derive"] }
|
||||
futures = "0.3.21"
|
||||
|
||||
[dependencies.common]
|
||||
path = "../common"
|
||||
|
||||
[dependencies.lighter_lib]
|
||||
git = "https://git.nubo.sh/hulthe/lighter.git"
|
||||
#path = "../../lighter/lib"
|
||||
|
||||
[dependencies.lighter_manager]
|
||||
git = "https://git.nubo.sh/hulthe/lighter.git"
|
||||
#path = "../../lighter/manager"
|
||||
31
backend/example.config.toml
Normal file
31
backend/example.config.toml
Normal file
@ -0,0 +1,31 @@
|
||||
[mqtt]
|
||||
#address = "hostname"
|
||||
#port = 1883
|
||||
#username = "user"
|
||||
#password = "password"
|
||||
|
||||
[collectors]
|
||||
markdown_web_links = [
|
||||
"https://example.org/lmao.md"
|
||||
]
|
||||
|
||||
[[bulbs]]
|
||||
id = "light/bedroom"
|
||||
|
||||
[[bulbs]]
|
||||
id = "light/living_room"
|
||||
|
||||
[[groups]]
|
||||
name = "Living Room"
|
||||
bulbs = ["light/living_room"]
|
||||
x = 0
|
||||
y = 0
|
||||
shape = { Rectangle = { w = 10, h = 10 } }
|
||||
|
||||
[[groups]]
|
||||
name = "Bedroom"
|
||||
bulbs = ["light/bedroom"]
|
||||
x = 11
|
||||
y = 0
|
||||
shape = { Rectangle = { w = 10, h = 10 } }
|
||||
|
||||
15
backend/src/collector.rs
Normal file
15
backend/src/collector.rs
Normal file
@ -0,0 +1,15 @@
|
||||
mod markdown_web;
|
||||
|
||||
pub use markdown_web::MarkdownWeb;
|
||||
|
||||
use serde::Deserialize;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Collector {
|
||||
async fn collect(&mut self) -> anyhow::Result<String>;
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct CollectorConfig {
|
||||
pub markdown_web_links: Vec<String>,
|
||||
}
|
||||
16
backend/src/collector/markdown_web.rs
Normal file
16
backend/src/collector/markdown_web.rs
Normal file
@ -0,0 +1,16 @@
|
||||
use crate::collector::Collector;
|
||||
use reqwest::get;
|
||||
|
||||
pub struct MarkdownWeb {
|
||||
pub url: String,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Collector for MarkdownWeb {
|
||||
async fn collect(&mut self) -> anyhow::Result<String> {
|
||||
let text = get(&self.url).await?.text().await?;
|
||||
let html = markdown::to_html(&text);
|
||||
|
||||
Ok(html)
|
||||
}
|
||||
}
|
||||
285
backend/src/main.rs
Normal file
285
backend/src/main.rs
Normal file
@ -0,0 +1,285 @@
|
||||
mod collector;
|
||||
|
||||
use clap::Parser;
|
||||
use collector::{Collector, CollectorConfig, MarkdownWeb};
|
||||
use common::{BulbMap, ClientMessage, ServerMessage};
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use lighter_manager::{
|
||||
manager::{BulbCommand, BulbManager, BulbSelector, BulbsConfig},
|
||||
mqtt_conf::MqttConfig,
|
||||
};
|
||||
use log::LevelFilter;
|
||||
use serde::Deserialize;
|
||||
use std::convert::Infallible;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
use tokio::time::{sleep, Duration};
|
||||
use tokio::{fs, select, task};
|
||||
use warp::ws::{self, WebSocket};
|
||||
use warp::{Filter, Rejection, Reply};
|
||||
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
#[derive(Parser)]
|
||||
struct Opt {
|
||||
/// More logging
|
||||
#[clap(short, long, parse(from_occurrences))]
|
||||
verbose: u8,
|
||||
|
||||
/// Supress non-error logs
|
||||
#[clap(short, long)]
|
||||
quiet: bool,
|
||||
|
||||
#[clap(long, short, default_value = "127.0.0.0:8000")]
|
||||
bind: SocketAddr,
|
||||
|
||||
#[clap(long, default_value = "./www")]
|
||||
frontend: String,
|
||||
|
||||
#[clap(long, short)]
|
||||
config: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Config {
|
||||
mqtt: MqttConfig,
|
||||
|
||||
collectors: CollectorConfig,
|
||||
|
||||
#[serde(flatten)]
|
||||
bulbs: BulbsConfig,
|
||||
|
||||
#[serde(flatten)]
|
||||
bulb_map: BulbMap,
|
||||
}
|
||||
|
||||
struct State {
|
||||
config: Config,
|
||||
client_message: broadcast::Sender<ClientRequest>,
|
||||
server_message: broadcast::Sender<ServerMessage>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let opt = Opt::parse();
|
||||
|
||||
let log_level = match opt.verbose {
|
||||
_ if opt.quiet => LevelFilter::Error,
|
||||
0 => LevelFilter::Info,
|
||||
1 => LevelFilter::Debug,
|
||||
2.. => LevelFilter::Trace,
|
||||
};
|
||||
|
||||
pretty_env_logger::formatted_builder()
|
||||
.default_format()
|
||||
.filter_level(log_level)
|
||||
.init();
|
||||
|
||||
let config = fs::read_to_string(&opt.config)
|
||||
.await
|
||||
.expect("Failed to read config");
|
||||
let config: Config = toml::from_str(&config).expect("Failed to parse config");
|
||||
|
||||
// we keep a receiver here to keep the channel active when no clients are connected
|
||||
let (server_message, _receiver) = broadcast::channel(100);
|
||||
let (client_message, _) = broadcast::channel(100);
|
||||
|
||||
let state = State {
|
||||
config,
|
||||
client_message,
|
||||
server_message,
|
||||
};
|
||||
let state = Box::leak(Box::new(state));
|
||||
|
||||
task::spawn(info_collector(state));
|
||||
task::spawn(lights_collector(state));
|
||||
|
||||
let ws = warp::path("ws")
|
||||
// The `ws()` filter will prepare the Websocket handshake.
|
||||
.and(warp::ws())
|
||||
.map(|ws: warp::ws::Ws| {
|
||||
// And then our closure will be called when it completes...
|
||||
ws.on_upgrade(|websocket| {
|
||||
info!("Client connected {:?}", websocket);
|
||||
|
||||
client_handler(websocket, state)
|
||||
})
|
||||
});
|
||||
|
||||
let api = warp::path("api").and(ws.recover(not_found));
|
||||
|
||||
let index_html = format!("{}/index.html", opt.frontend);
|
||||
let frontend = warp::fs::dir(opt.frontend).or(warp::fs::file(index_html));
|
||||
|
||||
let routes = api.or(frontend);
|
||||
|
||||
warp::serve(routes).run(opt.bind).await;
|
||||
}
|
||||
|
||||
async fn not_found(_: Rejection) -> Result<impl Reply, Infallible> {
|
||||
Ok(warp::http::StatusCode::NOT_FOUND)
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ClientRequest {
|
||||
message: ClientMessage,
|
||||
response: mpsc::Sender<ServerMessage>,
|
||||
}
|
||||
|
||||
async fn lights_collector(state: &State) {
|
||||
let config = &state.config;
|
||||
let server_message = &state.server_message;
|
||||
let mut client_message = state.client_message.subscribe();
|
||||
|
||||
let (cmd, bulb_states) = BulbManager::launch(config.bulbs.clone(), config.mqtt.clone())
|
||||
.await
|
||||
.expect("Failed to launch bulb manager");
|
||||
|
||||
loop {
|
||||
let notify = bulb_states.notify_on_change();
|
||||
sleep(Duration::from_millis(1000 / 10)).await; // limit to 10 updates/second
|
||||
select! {
|
||||
_ = notify => {
|
||||
for (id, mode) in bulb_states.bulbs().await.clone().into_iter() {
|
||||
if let Err(e) = server_message.send(ServerMessage::BulbMode { id, mode }) {
|
||||
error!("broadcast channel error: {e}");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
request = client_message.recv() => {
|
||||
let request = match request {
|
||||
Ok(r) => r,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
match request.message {
|
||||
ClientMessage::SetBulbColor { id, color } => {
|
||||
if let Err(e) = cmd.send(BulbCommand::SetColor(BulbSelector::Id(id), color)).await {
|
||||
error!("bulb manager error: {e}");
|
||||
}
|
||||
}
|
||||
ClientMessage::SetBulbPower { id, power } => {
|
||||
if let Err(e) = cmd.send(BulbCommand::SetPower(BulbSelector::Id(id), power)).await {
|
||||
error!("bulb manager error: {e}");
|
||||
}
|
||||
}
|
||||
ClientMessage::GetBulbs => {
|
||||
if let Err(e) = request.response.send(ServerMessage::BulbMap(config.bulb_map.clone())).await {
|
||||
error!("GetBulbs response channel error: {e}");
|
||||
return;
|
||||
}
|
||||
for (id, mode) in bulb_states.bulbs().await.clone().into_iter() {
|
||||
if let Err(e) = request.response.send(ServerMessage::BulbMode { id, mode }).await {
|
||||
error!("GetBulbs response channel error: {e}");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn info_collector(state: &State) {
|
||||
let mut collectors: Vec<Box<dyn Collector + Send>> = vec![];
|
||||
|
||||
for url in &state.config.collectors.markdown_web_links {
|
||||
collectors.push(Box::new(MarkdownWeb {
|
||||
url: url.to_string(),
|
||||
}));
|
||||
}
|
||||
|
||||
let mut collectors = collectors.into_boxed_slice();
|
||||
|
||||
let server_message = &state.server_message;
|
||||
|
||||
let collectors_len = collectors.len();
|
||||
let next = move |i: usize| (i + 1) % collectors_len;
|
||||
let mut i = 0;
|
||||
|
||||
loop {
|
||||
sleep(Duration::from_secs(30)).await;
|
||||
|
||||
// don't bother collecting if no clients are connected
|
||||
// there is always 1 receiver held by main process
|
||||
if server_message.receiver_count() <= 1 {
|
||||
continue;
|
||||
}
|
||||
|
||||
i = next(i);
|
||||
let collector = &mut collectors[i];
|
||||
|
||||
let msg = match collector.collect().await {
|
||||
Ok(html) => ServerMessage::InfoPage { html },
|
||||
Err(e) => {
|
||||
warn!("collector error: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = server_message.send(msg) {
|
||||
error!("broadcast channel error: {e}");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn client_handler(mut socket: WebSocket, state: &State) {
|
||||
let mut server_message = state.server_message.subscribe();
|
||||
let (server_responder, mut server_responses) = mpsc::channel(100);
|
||||
|
||||
async fn handle_server_message(socket: &mut WebSocket, message: ServerMessage) {
|
||||
let message = match ron::to_string(&message) {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => return error!("Failed to serialize message: {e}"),
|
||||
};
|
||||
|
||||
if let Err(e) = socket.send(ws::Message::text(message)).await {
|
||||
return warn!("client error: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
select! {
|
||||
response = server_responses.recv() => {
|
||||
if let Some(message) = response {
|
||||
handle_server_message(&mut socket, message).await;
|
||||
}
|
||||
}
|
||||
message = server_message.recv() => {
|
||||
let message = match message {
|
||||
Ok(msg) => msg,
|
||||
Err(RecvError::Lagged(_)) => continue,
|
||||
Err(RecvError::Closed) => return,
|
||||
};
|
||||
handle_server_message(&mut socket, message).await;
|
||||
|
||||
}
|
||||
message = socket.next() => {
|
||||
match message {
|
||||
None => return info!("stream closed"),
|
||||
Some(Err(e)) => return warn!("client error: {e}"),
|
||||
Some(Ok(message)) => {
|
||||
let message = ron::from_str(message.to_str().unwrap()).unwrap();
|
||||
|
||||
let request = ClientRequest {
|
||||
message,
|
||||
response: server_responder.clone(),
|
||||
};
|
||||
|
||||
if let Err(e) = state.client_message.send(request) {
|
||||
return error!("client message handlers error: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user