Files
hemma/frontend/src/app.rs
2023-09-30 22:05:16 +02:00

201 lines
6.0 KiB
Rust

use crate::page;
use common::{ClientMessage, ServerMessage};
use gloo_console::error;
use gloo_net::websocket;
use gloo_net::websocket::futures::WebSocket;
use seed::app::orders::OrdersContainer;
use seed::futures::channel::mpsc::channel;
use seed::futures::channel::mpsc::Sender;
use seed::futures::select_biased;
use seed::futures::SinkExt;
use seed::futures::StreamExt;
use seed::prelude::*;
use seed::window;
use seed::FutureExt;
use seed_router::Router;
use std::collections::VecDeque;
use std::error::Error;
pub type AppOrders = OrdersContainer<Msg, Model, Vec<Node<Msg>>>;
/// Delays between successive attempts to reconnect in case the socket breaks. In seconds.
const TIMEOUT_CONNECT_DELAYS: &[u32] = &[2, 5, 10, 10, 10, 20, 30, 60, 120, 300];
pub struct Model {
page: Pages,
send_queue: VecDeque<ClientMessage>,
/// Websocket URL
ws_url: String,
/// Channel to send websocket messages.
socket: Option<Sender<websocket::Message>>,
/// Handle to the websocket task.
socket_task: Option<CmdHandle>,
timeout_count: usize,
}
#[derive(Router)]
pub enum Pages {
#[page("404", NotFound)]
NotFound(page::not_found::Model),
#[page("info", Info)]
Info(page::info::Model),
#[page("lights", Lights)]
Lights(page::lights::Model),
}
#[derive(Debug)]
pub enum PageMsg {
NotFound(page::not_found::Msg),
Info(page::info::Msg),
Lights(page::lights::Msg),
}
#[derive(Debug)]
pub enum Msg {
Page(PageMsg),
SendMessage(ClientMessage),
FlushMessageQueue,
// Global
Connect,
SocketOpened(Sender<websocket::Message>),
SocketClosed,
SocketMessage(websocket::Message),
}
pub fn init(url: Url, orders: &mut impl Orders<Msg>) -> Model {
orders.subscribe(Msg::SendMessage);
orders.send_msg(Msg::Connect);
let location = window().location();
let host = location.host().expect("Failed to get hostname");
let ws_protocol = match location.protocol().ok().as_deref() {
Some("http:") => "ws",
_ => "wss",
};
let ws_url = format!("{ws_protocol}://{host}/api/ws");
Model {
page: Pages::from_url(url, &mut orders.proxy(Msg::Page))
.unwrap_or(Pages::NotFound(Default::default())),
send_queue: Default::default(),
ws_url,
socket_task: None,
socket: None,
timeout_count: 0,
}
}
fn open_socket(url: String, orders: &mut impl Orders<Msg>) -> CmdHandle {
let update_fn = orders.msg_sender();
orders.perform_cmd_with_handle(async move {
let mut ws = WebSocket::open(&url).expect("Failed to open websocket");
let (tx, mut rx) = channel(128);
update_fn(Some(Msg::SocketOpened(tx)));
loop {
select_biased! {
message = rx.next().fuse() => {
let Some(message) = message else { return; };
if let Err(e) = ws.send(message).await {
error!(format!("websocket error: {e:?}"));
return;
}
}
message = ws.next().fuse() => match message {
Some(Ok(message)) => update_fn(Some(Msg::SocketMessage(message))),
Some(Err(e)) => {
error!(format!("websocket error: {e:?}"));
update_fn(Some(Msg::SocketClosed));
return;
}
None => {
update_fn(Some(Msg::SocketClosed));
return;
}
}
}
}
})
}
pub fn update(msg: Msg, model: &mut Model, orders: &mut AppOrders) {
#[cfg(debug_assertions)]
gloo_console::debug!(format!("{msg:?}"));
match msg {
Msg::Page(msg) => model.page.update(msg, &mut orders.proxy(Msg::Page)),
Msg::FlushMessageQueue => {
if let Some(socket) = model.socket.as_mut() {
while let Some(message) = model.send_queue.pop_front() {
let serialized = ron::to_string(&message).expect("failed to serialize ron");
let ws_message = websocket::Message::Text(serialized);
if socket.try_send(ws_message).is_err() {
error!("websocket queue full");
model.send_queue.push_front(message);
return;
}
}
}
}
Msg::Connect => {
model.socket_task = Some(open_socket(model.ws_url.clone(), orders));
}
Msg::SendMessage(message) => {
model.send_queue.push_back(message);
orders.send_msg(Msg::FlushMessageQueue);
}
Msg::SocketOpened(socket) => {
model.socket = Some(socket);
model.timeout_count = 0;
orders.send_msg(Msg::FlushMessageQueue);
}
Msg::SocketClosed => {
model.socket_task = None;
model.socket = None;
let timeout_sec = TIMEOUT_CONNECT_DELAYS[model.timeout_count];
let timeout_ms = timeout_sec * 1000;
orders.perform_cmd(cmds::timeout(timeout_ms, || Msg::Connect));
error!(format!(
"Socket closed, trying to reconnect in {timeout_sec} seconds"
));
model.timeout_count = TIMEOUT_CONNECT_DELAYS.len().min(model.timeout_count + 1);
}
Msg::SocketMessage(message) => {
if let Err(e) = handle_ws_msg(message, orders) {
error!(format!("{e:?}"));
}
}
}
}
fn handle_ws_msg(
message: websocket::Message,
orders: &mut impl Orders<Msg>,
) -> Result<(), Box<dyn Error>> {
let websocket::Message::Text(text) = &message else {
return Err("Server is sending us raw bytes on the websocket! Argh!".into());
};
let message: ServerMessage = ron::from_str(text)?;
orders.notify(message);
Ok(())
}
pub fn view(model: &Model) -> Vec<Node<Msg>> {
vec![model.page.view().map_msg(Msg::Page)]
}