diff --git a/Cargo.lock b/Cargo.lock index 13354e1..fff3149 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2243,6 +2243,7 @@ dependencies = [ "color-eyre", "eyre", "log", + "rand", "redb", "rmp-serde", "rocket", diff --git a/snitch-cli/src/main.rs b/snitch-cli/src/main.rs index f986372..b3a1d19 100644 --- a/snitch-cli/src/main.rs +++ b/snitch-cli/src/main.rs @@ -68,7 +68,7 @@ fn log_lines(opt: &Opt, r: impl BufRead) -> eyre::Result<()> { let line = line.wrap_err("Failed to read from stdin")?; let line = line.trim(); if !line.is_empty() { - log_message(&opt, &line)?; + log_message(opt, line)?; } } diff --git a/snitch-lib/src/message.rs b/snitch-lib/src/message.rs index ba0b595..36b2126 100644 --- a/snitch-lib/src/message.rs +++ b/snitch-lib/src/message.rs @@ -32,13 +32,17 @@ pub struct LogMsg { } impl LogMsg { - pub fn new(service: String, message: String, hostname: String) -> Self { + pub fn new( + service: impl Into, + message: impl Into, + hostname: impl Into, + ) -> Self { Self { time: Some(Local::now()), severity: Severity::Error, - service, - message, - hostname, + service: service.into(), + message: message.into(), + hostname: hostname.into(), file: None, line: None, } diff --git a/snitch-web/Cargo.toml b/snitch-web/Cargo.toml index 1df5df4..7d50da3 100644 --- a/snitch-web/Cargo.toml +++ b/snitch-web/Cargo.toml @@ -17,3 +17,6 @@ eyre = "0.6.12" color-eyre = "0.6.3" redb = { version = "2.1.3", features = ["logging"] } rmp-serde = "1.3.0" + +[dev-dependencies] +rand = "0.8.5" diff --git a/snitch-web/src/dashboard.rs b/snitch-web/src/dashboard.rs index 5cfbb92..afd3ac3 100644 --- a/snitch-web/src/dashboard.rs +++ b/snitch-web/src/dashboard.rs @@ -9,6 +9,11 @@ use crate::database::Database; #[get("/")] pub async fn index(db: &State) -> Result, Status> { + db.drop_old_messages().map_err(|e| { + log::error!("failed to query database: {e}"); + Status::InternalServerError + })?; + // TODO: // maybe only show most recent error for each service in dashboard view, i.e.: // select distinct on (service) time, message from (select * from record order by time desc) as foo; diff --git a/snitch-web/src/database.rs b/snitch-web/src/database.rs index 9859124..6706440 100644 --- a/snitch-web/src/database.rs +++ b/snitch-web/src/database.rs @@ -1,9 +1,13 @@ use std::{ fmt::Debug, marker::PhantomData, - sync::atomic::{AtomicU64, Ordering}, + sync::{ + atomic::{AtomicU64, Ordering}, + Mutex, + }, }; +use chrono::{DateTime, Duration, Utc}; use eyre::{eyre, WrapErr}; use redb::ReadableTable; use serde::{de::DeserializeOwned, Serialize}; @@ -22,28 +26,6 @@ struct Serialized { _type: PhantomData, } -pub(crate) fn open(opt: &Opt) -> eyre::Result { - let inner = redb::Database::create(&opt.db) - .context(eyre!("Failed to open database at {}", opt.db.display()))?; - - let txn = inner.begin_write()?; - let next_log_id = txn - .open_table(table::LOG_MESSAGE)? - .range::(..)? - .next_back() - .transpose() - .ok() - .flatten() - .map(|(id, _)| id.value() + 1) - .unwrap_or_default(); - txn.commit()?; - - Ok(Database { - inner, - next_log_id: next_log_id.into(), - }) -} - mod table { use super::Serialized; use redb::TableDefinition; @@ -53,7 +35,31 @@ mod table { TableDefinition::new("log_message"); } +pub const MAX_MESSAGE_AGE: Duration = Duration::weeks(8); + impl Database { + pub(crate) fn open(opt: &Opt) -> eyre::Result { + let inner = redb::Database::create(&opt.db) + .context(eyre!("Failed to open database at {}", opt.db.display()))?; + + let txn = inner.begin_write()?; + let next_log_id = txn + .open_table(table::LOG_MESSAGE)? + .range::(..)? + .next_back() + .transpose() + .ok() + .flatten() + .map(|(id, _)| id.value() + 1) + .unwrap_or_default(); + txn.commit()?; + + Ok(Database { + inner, + next_log_id: next_log_id.into(), + }) + } + pub fn write_log(&self, log: &LogMsg) -> eyre::Result { let txn = self.inner.begin_write()?; let id = self.next_log_id.fetch_add(1, Ordering::SeqCst); @@ -77,6 +83,63 @@ impl Database { }) .collect() } + + /// Drop any messages older than [MAX_MESSAGE_AGE] + pub fn drop_old_messages(&self) -> eyre::Result { + static LAST_RUN: Mutex> = Mutex::new(DateTime::::MIN_UTC); + + let now = Utc::now(); + + // Only run this once a day + { + let mut last_run = LAST_RUN.lock().unwrap(); + if now > *last_run + Duration::days(1) { + *last_run = now; + } else { + return Ok(0); + } + } + + log::info!("cleaning up old messages"); + + let txn = self.inner.begin_write()?; + let mut n = 0; + txn.open_table(table::LOG_MESSAGE)? + .retain_in(0.., |_id, log_msg| { + let keep = match log_msg.deserialize() { + Ok(log_msg) => match log_msg.time { + Some(time) => { + let too_old = now > time + MAX_MESSAGE_AGE; + if too_old { + log::info!("removing old message posted at {time}"); + } + !too_old + } + + // Timestamp should always be set. + // TODO: make it not be an option + None => { + log::warn!("removing message with no timestamp"); + false + } + }, + Err(_) => { + log::warn!("removing message that failed to deserialize"); + false + } + }; + + if !keep { + n += 1; + } + + keep + })?; + + txn.commit()?; + + Ok(n) + } } impl Serialized @@ -100,11 +163,13 @@ impl redb::Value for Serialized where T: Debug + Serialize + DeserializeOwned, { - type SelfType<'a> = Self + type SelfType<'a> + = Self where Self: 'a; - type AsBytes<'a> = &'a [u8] + type AsBytes<'a> + = &'a [u8] where Self: 'a; @@ -134,3 +199,40 @@ where redb::TypeName::new(std::any::type_name::>()) } } + +#[cfg(test)] +mod tests { + use chrono::Local; + + use super::*; + + #[test] + fn clear_old_messages() { + let tmp: u32 = rand::random(); + let tmp = format!("/tmp/{tmp:08x}.db"); + + let opt = Opt { db: tmp.into() }; + let db = Database::open(&opt).unwrap(); + + let now = Local::now(); + let msg_with_time = |time| LogMsg { + time: Some(time), + ..LogMsg::new("service", "message", "hostname") + }; + + // not old mesages + db.write_log(&msg_with_time(now)).unwrap(); + db.write_log(&msg_with_time(now - MAX_MESSAGE_AGE + Duration::days(1))) + .unwrap(); + + // old messages + db.write_log(&msg_with_time(now - MAX_MESSAGE_AGE - Duration::days(1))) + .unwrap(); + db.write_log(&msg_with_time(now - MAX_MESSAGE_AGE - Duration::days(365))) + .unwrap(); + db.write_log(&msg_with_time(now - MAX_MESSAGE_AGE - Duration::seconds(1))) + .unwrap(); + + assert_eq!(db.drop_old_messages().unwrap(), 3); + } +} diff --git a/snitch-web/src/main.rs b/snitch-web/src/main.rs index 11bbdab..1cfc3c7 100644 --- a/snitch-web/src/main.rs +++ b/snitch-web/src/main.rs @@ -8,6 +8,7 @@ mod database; use std::path::PathBuf; use clap::Parser; +use database::Database; use rocket_dyn_templates::Template; #[derive(Parser)] @@ -24,7 +25,7 @@ async fn rocket() -> _ { let opt = Opt::parse(); color_eyre::install().unwrap(); - let db = database::open(&opt).expect("open database"); + let db = Database::open(&opt).expect("open database"); rocket::build() .manage(db)