Update deps

This commit is contained in:
2023-09-30 22:05:16 +02:00
parent 4cf6b628ee
commit ac306eece4
5 changed files with 747 additions and 678 deletions

View File

@ -1,10 +1,20 @@
use crate::page;
use common::{ClientMessage, ServerMessage};
use gloo_console::error;
use gloo_net::websocket;
use gloo_net::websocket::futures::WebSocket;
use seed::app::orders::OrdersContainer;
use seed::futures::channel::mpsc::channel;
use seed::futures::channel::mpsc::Sender;
use seed::futures::select_biased;
use seed::futures::SinkExt;
use seed::futures::StreamExt;
use seed::prelude::*;
use seed::{log, window};
use seed::window;
use seed::FutureExt;
use seed_router::Router;
use std::collections::VecDeque;
use std::error::Error;
pub type AppOrders = OrdersContainer<Msg, Model, Vec<Node<Msg>>>;
@ -14,8 +24,16 @@ const TIMEOUT_CONNECT_DELAYS: &[u32] = &[2, 5, 10, 10, 10, 20, 30, 60, 120, 300]
pub struct Model {
page: Pages,
send_queue: VecDeque<ClientMessage>,
socket: WebSocket,
/// Websocket URL
ws_url: String,
/// Channel to send websocket messages.
socket: Option<Sender<websocket::Message>>,
/// Handle to the websocket task.
socket_task: Option<CmdHandle>,
timeout_count: usize,
}
@ -47,14 +65,14 @@ pub enum Msg {
// Global
Connect,
SocketOpened(),
SocketClosed(CloseEvent),
SocketError(),
SocketMessage(WebSocketMessage),
SocketOpened(Sender<websocket::Message>),
SocketClosed,
SocketMessage(websocket::Message),
}
pub fn init(url: Url, orders: &mut impl Orders<Msg>) -> Model {
orders.subscribe(Msg::SendMessage);
orders.send_msg(Msg::Connect);
let location = window().location();
let host = location.host().expect("Failed to get hostname");
@ -69,73 +87,109 @@ pub fn init(url: Url, orders: &mut impl Orders<Msg>) -> Model {
page: Pages::from_url(url, &mut orders.proxy(Msg::Page))
.unwrap_or(Pages::NotFound(Default::default())),
send_queue: Default::default(),
socket: open_socket(&ws_url, orders),
ws_url,
socket_task: None,
socket: None,
timeout_count: 0,
}
}
fn open_socket(url: &str, orders: &mut impl Orders<Msg>) -> WebSocket {
WebSocket::builder(url, orders)
.on_open(Msg::SocketOpened)
.on_close(Msg::SocketClosed)
.on_error(Msg::SocketError)
.on_message(Msg::SocketMessage)
.build_and_open()
.expect("failed to open websocket")
fn open_socket(url: String, orders: &mut impl Orders<Msg>) -> CmdHandle {
let update_fn = orders.msg_sender();
orders.perform_cmd_with_handle(async move {
let mut ws = WebSocket::open(&url).expect("Failed to open websocket");
let (tx, mut rx) = channel(128);
update_fn(Some(Msg::SocketOpened(tx)));
loop {
select_biased! {
message = rx.next().fuse() => {
let Some(message) = message else { return; };
if let Err(e) = ws.send(message).await {
error!(format!("websocket error: {e:?}"));
return;
}
}
message = ws.next().fuse() => match message {
Some(Ok(message)) => update_fn(Some(Msg::SocketMessage(message))),
Some(Err(e)) => {
error!(format!("websocket error: {e:?}"));
update_fn(Some(Msg::SocketClosed));
return;
}
None => {
update_fn(Some(Msg::SocketClosed));
return;
}
}
}
}
})
}
pub fn update(msg: Msg, model: &mut Model, orders: &mut AppOrders) {
#[cfg(debug_assertions)]
log!(format!("{msg:?}"));
gloo_console::debug!(format!("{msg:?}"));
match msg {
Msg::Page(msg) => model.page.update(msg, &mut orders.proxy(Msg::Page)),
Msg::FlushMessageQueue => {
while let Some(message) = model.send_queue.pop_front() {
let serialized = ron::to_string(&message).unwrap();
if let Some(socket) = model.socket.as_mut() {
while let Some(message) = model.send_queue.pop_front() {
let serialized = ron::to_string(&message).expect("failed to serialize ron");
if let Err(e) = model.socket.send_text(serialized) {
model.send_queue.push_front(message);
log!(e);
return;
let ws_message = websocket::Message::Text(serialized);
if socket.try_send(ws_message).is_err() {
error!("websocket queue full");
model.send_queue.push_front(message);
return;
}
}
}
}
Msg::Connect => {
model.socket = open_socket(&model.ws_url, orders);
model.socket_task = Some(open_socket(model.ws_url.clone(), orders));
}
Msg::SendMessage(message) => {
model.send_queue.push_back(message);
orders.send_msg(Msg::FlushMessageQueue);
}
Msg::SocketOpened() => {
Msg::SocketOpened(socket) => {
model.socket = Some(socket);
model.timeout_count = 0;
orders.send_msg(Msg::FlushMessageQueue);
}
Msg::SocketClosed(_event) => {
Msg::SocketClosed => {
model.socket_task = None;
model.socket = None;
let timeout_sec = TIMEOUT_CONNECT_DELAYS[model.timeout_count];
let timeout_ms = timeout_sec * 1000;
orders.perform_cmd(cmds::timeout(timeout_ms, || Msg::Connect));
log!(format!(
error!(format!(
"Socket closed, trying to reconnect in {timeout_sec} seconds"
));
model.timeout_count = TIMEOUT_CONNECT_DELAYS.len().min(model.timeout_count + 1);
}
Msg::SocketError() => {}
Msg::SocketMessage(message) => {
if let Err(e) = handle_ws_msg(message, orders) {
log!(e);
error!(format!("{e:?}"));
}
}
}
}
fn handle_ws_msg(message: WebSocketMessage, orders: &mut impl Orders<Msg>) -> anyhow::Result<()> {
let message = message.text().map_err(|e| anyhow::format_err!("{e:?}"))?;
let message: ServerMessage = ron::from_str(&message)?;
fn handle_ws_msg(
message: websocket::Message,
orders: &mut impl Orders<Msg>,
) -> Result<(), Box<dyn Error>> {
let websocket::Message::Text(text) = &message else {
return Err("Server is sending us raw bytes on the websocket! Argh!".into());
};
let message: ServerMessage = ron::from_str(text)?;
orders.notify(message);
Ok(())
@ -143,9 +197,4 @@ fn handle_ws_msg(message: WebSocketMessage, orders: &mut impl Orders<Msg>) -> an
pub fn view(model: &Model) -> Vec<Node<Msg>> {
vec![model.page.view().map_msg(Msg::Page)]
//match &model.page {
// Pages::NotFound => vec![h1!["Not Found"]],
// Pages::InfoScreen => vec![div![C![C.info_box], raw![&model.info_page]]],
// Pages::Lights(page) => vec![],
//}
}