Add cli & lib crates

This commit is contained in:
2021-04-22 15:13:28 +02:00
parent e39cffa3f6
commit 3a9ecc398a
53 changed files with 5065 additions and 99 deletions

1
cli/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

1711
cli/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

36
cli/Cargo.toml Normal file
View File

@ -0,0 +1,36 @@
[package]
name = "stl_cli"
version = "2.5.0"
authors = ["Joakim Hulthe <joakim@hulthe.net>"]
license = "MPL-2.0"
edition = "2018"
[[bin]]
name = "stl"
path = "src/main.rs"
[dependencies]
thiserror = "1.0.24"
notify = "4.0.16"
log = "0.4.14"
pretty_env_logger = "0.4.0"
uuid = { version = "0.8", features = ["serde", "v4"] }
chrono = { version = "0.4", features = ["serde"] }
structopt = "0.3.21"
syscalls = { version = "0.3", default-features = false }
futures = "0.3"
[dependencies.stl_lib]
path = "../lib"
[dependencies.serde]
version = "1"
features = ["derive"]
[dependencies.reqwest]
version = "0.11.3"
features = ["json"]
[dependencies.tokio]
version = "1.5.0"
features = ["fs", "rt", "rt-multi-thread", "macros"]

0
cli/src/api.rs Normal file
View File

161
cli/src/daemon.rs Normal file
View File

@ -0,0 +1,161 @@
use crate::util::proxy_channel;
use crate::DaemonOpt;
use crate::Error;
use futures::poll;
use futures::task::Poll;
use notify::{INotifyWatcher, Op, RecursiveMode, Watcher};
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::mpsc;
use std::time::{Duration, Instant};
use stl_lib::api::url;
use stl_lib::v2::trees::category;
use tokio::fs;
use tokio::runtime::Runtime;
use tokio::task::{spawn, JoinHandle};
pub fn run(opt: DaemonOpt) -> Result<(), Error> {
let opt: &'static DaemonOpt = Box::leak(Box::new(opt));
let rt = Runtime::new()?;
rt.block_on(async {
let (tx, rx) = mpsc::channel();
info!("creating output file");
fs::write(&opt.file, b"").await?;
let mut watcher = INotifyWatcher::new_raw(tx).unwrap();
watcher
.watch(&opt.file, RecursiveMode::NonRecursive)
.expect("failed to watch file");
update_category_list(opt).await?;
let mut last_update = Instant::now();
let mut wfe: JoinHandle<_> = spawn(wait_for_event(opt));
let mut rx = proxy_channel(rx);
loop {
let event = match rx.recv().await {
Some(event) => event,
None => return Err(Error::ChannelClosed),
};
debug!("event op {:?}", event.op);
match event.op {
Ok(Op::CHMOD) => { /* YEESSS */ }
_ => continue,
}
debug!("CHMOD event detected");
let update_categories;
let restart_wfe;
match poll!(&mut wfe) {
Poll::Ready(Ok(Ok(_wfe))) => {
restart_wfe = true;
update_categories = Some(true);
}
Poll::Ready(Ok(Err(e))) => {
error!("error waiting for event (falling back to timing): {}", e);
restart_wfe = false;
update_categories = None;
}
Poll::Ready(Err(_)) => {
restart_wfe = false;
update_categories = None;
}
Poll::Pending => {
restart_wfe = false;
update_categories = Some(false);
}
}
let elapsed = last_update.elapsed();
let update_categories =
update_categories.unwrap_or_else(|| elapsed > Duration::from_secs(opt.poll_delay));
if update_categories {
update_category_list(opt).await?;
last_update = Instant::now();
}
if update_categories || restart_wfe {
wfe = spawn(wait_for_event(opt));
}
}
})
}
#[derive(Deserialize)]
struct WaitForEvent {
#[allow(dead_code)]
timeout: bool,
}
async fn wait_for_event(opt: &DaemonOpt) -> Result<WaitForEvent, Error> {
info!("calling wait_for_event");
let client = reqwest::Client::new();
let wfe: WaitForEvent = client
.get(format!(
"{}{}?timeout={}",
&opt.api_uri,
url::WAIT_FOR_EVENT,
opt.poll_delay
))
.header("Cookie", &opt.cookie)
.send()
.await?
.json()
.await?;
Ok(wfe)
}
async fn update_category_list(opt: &DaemonOpt) -> Result<(), Error> {
let categories = get_active_categories(opt).await?;
info!("writing output file");
let data = categories.join("|");
let data = if data.is_empty() {
b"--"
} else {
data.as_bytes()
};
fs::write(&opt.file, data).await?;
Ok(())
}
async fn get_active_categories(opt: &DaemonOpt) -> Result<Vec<String>, Error> {
info!("downloading category list");
let client = reqwest::Client::new();
let response = client
.get(format!("{}{}", &opt.api_uri, url::SESSIONS))
.header("Cookie", &opt.cookie)
.send()
.await?;
let categories: HashMap<category::K, category::V> = response.json().await?;
// sort by the longest running log
let mut categories: Vec<category::V> = categories.into_iter().map(|(_, v)| v).collect();
categories.sort_by_key(|c| (c.started));
Ok(categories
.into_iter()
.inspect(|category| {
info!(
" name={} active={}",
category.name,
category.started.is_some()
)
})
.filter(|category| category.started.is_some())
.map(|category| category.name)
.collect())
}

80
cli/src/main.rs Normal file
View File

@ -0,0 +1,80 @@
#[macro_use]
extern crate log;
pub mod daemon;
pub mod stat;
pub mod util;
use std::path::PathBuf;
use structopt::StructOpt;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("request error {0:?}")]
Request(#[from] reqwest::Error),
#[error("input/output error")]
IO(#[from] tokio::io::Error),
#[error("notify channel closed")]
ChannelClosed,
#[error("syscall error, code = {error_code}")]
SysCall { error_code: i64 },
}
#[derive(StructOpt)]
pub enum Opt {
/// Launch the daemon
Daemon(DaemonOpt),
/// touch the stat file if it exists
/// otherwise exit with status code 1
Stat(StatOpt),
}
#[derive(StructOpt)]
pub struct DaemonOpt {
/// The path to the output file
#[structopt(short, long)]
file: PathBuf,
/// The server uri
#[structopt(short, long, default_value = "https://stl.nubo.sh")]
api_uri: String,
/// The amount of time to wait between polls
#[structopt(short = "d", long, default_value = "30")]
poll_delay: u64,
/// The authorization cookie for the server
#[structopt(short, long)]
cookie: String,
}
#[derive(StructOpt)]
pub struct StatOpt {
#[structopt(short, long)]
file: PathBuf,
}
fn main() {
pretty_env_logger::init();
let opt = Opt::from_args();
let result = match opt {
Opt::Daemon(opt) => daemon::run(opt),
Opt::Stat(opt) => stat::run(&opt),
};
match result {
Ok(_) => {}
Err(e) => {
error!("{}", e);
}
}
info!("stld starting");
}

34
cli/src/stat.rs Normal file
View File

@ -0,0 +1,34 @@
use crate::{Error, StatOpt};
use std::fs;
use std::os::unix::io::AsRawFd;
use std::process::exit;
use syscalls::syscall;
pub fn run(opt: &StatOpt) -> Result<(), Error> {
let file = match fs::OpenOptions::new()
.write(true)
.read(false)
.create(false)
.open(&opt.file)
{
Ok(file) => file,
Err(_) => exit(1),
};
let fd: i32 = file.as_raw_fd();
let result: i64 = unsafe {
// update file mtime (to trigger inotify) in the daemon
//
// SAFETY:
// - fd is a valid file descriptor, since it was created using a File
// - passing NULL to all other parameters is valid according to `man 2 utimensat`
syscall!(SYS_utimensat, fd, 0, 0, 0)
}
.map_err(|error_code| Error::SysCall { error_code })?;
// 0 should be the only possible return value, since error cases are handled by the result
assert_eq!(result, 0);
Ok(())
}

24
cli/src/util.rs Normal file
View File

@ -0,0 +1,24 @@
use std::sync::mpsc as std_mpsc;
use tokio::sync::mpsc as tokio_mpsc;
use tokio::task;
pub fn proxy_channel<T>(std_rx: std_mpsc::Receiver<T>) -> tokio_mpsc::Receiver<T>
where
T: 'static + Send + Sync,
{
let (tx, tokio_rx) = tokio_mpsc::channel(255);
task::spawn_blocking(move || loop {
let msg = match std_rx.recv() {
Ok(msg) => msg,
Err(_) => return,
};
match tx.blocking_send(msg) {
Ok(_) => {}
Err(_) => return,
}
});
tokio_rx
}