Automatically delete old messages
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -2243,6 +2243,7 @@ dependencies = [
|
|||||||
"color-eyre",
|
"color-eyre",
|
||||||
"eyre",
|
"eyre",
|
||||||
"log",
|
"log",
|
||||||
|
"rand",
|
||||||
"redb",
|
"redb",
|
||||||
"rmp-serde",
|
"rmp-serde",
|
||||||
"rocket",
|
"rocket",
|
||||||
|
|||||||
@ -68,7 +68,7 @@ fn log_lines(opt: &Opt, r: impl BufRead) -> eyre::Result<()> {
|
|||||||
let line = line.wrap_err("Failed to read from stdin")?;
|
let line = line.wrap_err("Failed to read from stdin")?;
|
||||||
let line = line.trim();
|
let line = line.trim();
|
||||||
if !line.is_empty() {
|
if !line.is_empty() {
|
||||||
log_message(&opt, &line)?;
|
log_message(opt, line)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -32,13 +32,17 @@ pub struct LogMsg {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl LogMsg {
|
impl LogMsg {
|
||||||
pub fn new(service: String, message: String, hostname: String) -> Self {
|
pub fn new(
|
||||||
|
service: impl Into<String>,
|
||||||
|
message: impl Into<String>,
|
||||||
|
hostname: impl Into<String>,
|
||||||
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
time: Some(Local::now()),
|
time: Some(Local::now()),
|
||||||
severity: Severity::Error,
|
severity: Severity::Error,
|
||||||
service,
|
service: service.into(),
|
||||||
message,
|
message: message.into(),
|
||||||
hostname,
|
hostname: hostname.into(),
|
||||||
file: None,
|
file: None,
|
||||||
line: None,
|
line: None,
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,3 +17,6 @@ eyre = "0.6.12"
|
|||||||
color-eyre = "0.6.3"
|
color-eyre = "0.6.3"
|
||||||
redb = { version = "2.1.3", features = ["logging"] }
|
redb = { version = "2.1.3", features = ["logging"] }
|
||||||
rmp-serde = "1.3.0"
|
rmp-serde = "1.3.0"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
rand = "0.8.5"
|
||||||
|
|||||||
@ -9,6 +9,11 @@ use crate::database::Database;
|
|||||||
|
|
||||||
#[get("/")]
|
#[get("/")]
|
||||||
pub async fn index(db: &State<Database>) -> Result<RawHtml<Template>, Status> {
|
pub async fn index(db: &State<Database>) -> Result<RawHtml<Template>, Status> {
|
||||||
|
db.drop_old_messages().map_err(|e| {
|
||||||
|
log::error!("failed to query database: {e}");
|
||||||
|
Status::InternalServerError
|
||||||
|
})?;
|
||||||
|
|
||||||
// TODO:
|
// TODO:
|
||||||
// maybe only show most recent error for each service in dashboard view, i.e.:
|
// maybe only show most recent error for each service in dashboard view, i.e.:
|
||||||
// select distinct on (service) time, message from (select * from record order by time desc) as foo;
|
// select distinct on (service) time, message from (select * from record order by time desc) as foo;
|
||||||
|
|||||||
@ -1,9 +1,13 @@
|
|||||||
use std::{
|
use std::{
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
sync::atomic::{AtomicU64, Ordering},
|
sync::{
|
||||||
|
atomic::{AtomicU64, Ordering},
|
||||||
|
Mutex,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use chrono::{DateTime, Duration, Utc};
|
||||||
use eyre::{eyre, WrapErr};
|
use eyre::{eyre, WrapErr};
|
||||||
use redb::ReadableTable;
|
use redb::ReadableTable;
|
||||||
use serde::{de::DeserializeOwned, Serialize};
|
use serde::{de::DeserializeOwned, Serialize};
|
||||||
@ -22,28 +26,6 @@ struct Serialized<T> {
|
|||||||
_type: PhantomData<T>,
|
_type: PhantomData<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn open(opt: &Opt) -> eyre::Result<Database> {
|
|
||||||
let inner = redb::Database::create(&opt.db)
|
|
||||||
.context(eyre!("Failed to open database at {}", opt.db.display()))?;
|
|
||||||
|
|
||||||
let txn = inner.begin_write()?;
|
|
||||||
let next_log_id = txn
|
|
||||||
.open_table(table::LOG_MESSAGE)?
|
|
||||||
.range::<u64>(..)?
|
|
||||||
.next_back()
|
|
||||||
.transpose()
|
|
||||||
.ok()
|
|
||||||
.flatten()
|
|
||||||
.map(|(id, _)| id.value() + 1)
|
|
||||||
.unwrap_or_default();
|
|
||||||
txn.commit()?;
|
|
||||||
|
|
||||||
Ok(Database {
|
|
||||||
inner,
|
|
||||||
next_log_id: next_log_id.into(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
mod table {
|
mod table {
|
||||||
use super::Serialized;
|
use super::Serialized;
|
||||||
use redb::TableDefinition;
|
use redb::TableDefinition;
|
||||||
@ -53,7 +35,31 @@ mod table {
|
|||||||
TableDefinition::new("log_message");
|
TableDefinition::new("log_message");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub const MAX_MESSAGE_AGE: Duration = Duration::weeks(8);
|
||||||
|
|
||||||
impl Database {
|
impl Database {
|
||||||
|
pub(crate) fn open(opt: &Opt) -> eyre::Result<Self> {
|
||||||
|
let inner = redb::Database::create(&opt.db)
|
||||||
|
.context(eyre!("Failed to open database at {}", opt.db.display()))?;
|
||||||
|
|
||||||
|
let txn = inner.begin_write()?;
|
||||||
|
let next_log_id = txn
|
||||||
|
.open_table(table::LOG_MESSAGE)?
|
||||||
|
.range::<u64>(..)?
|
||||||
|
.next_back()
|
||||||
|
.transpose()
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.map(|(id, _)| id.value() + 1)
|
||||||
|
.unwrap_or_default();
|
||||||
|
txn.commit()?;
|
||||||
|
|
||||||
|
Ok(Database {
|
||||||
|
inner,
|
||||||
|
next_log_id: next_log_id.into(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
pub fn write_log(&self, log: &LogMsg) -> eyre::Result<u64> {
|
pub fn write_log(&self, log: &LogMsg) -> eyre::Result<u64> {
|
||||||
let txn = self.inner.begin_write()?;
|
let txn = self.inner.begin_write()?;
|
||||||
let id = self.next_log_id.fetch_add(1, Ordering::SeqCst);
|
let id = self.next_log_id.fetch_add(1, Ordering::SeqCst);
|
||||||
@ -77,6 +83,63 @@ impl Database {
|
|||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Drop any messages older than [MAX_MESSAGE_AGE]
|
||||||
|
pub fn drop_old_messages(&self) -> eyre::Result<usize> {
|
||||||
|
static LAST_RUN: Mutex<DateTime<Utc>> = Mutex::new(DateTime::<Utc>::MIN_UTC);
|
||||||
|
|
||||||
|
let now = Utc::now();
|
||||||
|
|
||||||
|
// Only run this once a day
|
||||||
|
{
|
||||||
|
let mut last_run = LAST_RUN.lock().unwrap();
|
||||||
|
if now > *last_run + Duration::days(1) {
|
||||||
|
*last_run = now;
|
||||||
|
} else {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log::info!("cleaning up old messages");
|
||||||
|
|
||||||
|
let txn = self.inner.begin_write()?;
|
||||||
|
let mut n = 0;
|
||||||
|
txn.open_table(table::LOG_MESSAGE)?
|
||||||
|
.retain_in(0.., |_id, log_msg| {
|
||||||
|
let keep = match log_msg.deserialize() {
|
||||||
|
Ok(log_msg) => match log_msg.time {
|
||||||
|
Some(time) => {
|
||||||
|
let too_old = now > time + MAX_MESSAGE_AGE;
|
||||||
|
if too_old {
|
||||||
|
log::info!("removing old message posted at {time}");
|
||||||
|
}
|
||||||
|
!too_old
|
||||||
|
}
|
||||||
|
|
||||||
|
// Timestamp should always be set.
|
||||||
|
// TODO: make it not be an option
|
||||||
|
None => {
|
||||||
|
log::warn!("removing message with no timestamp");
|
||||||
|
false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
log::warn!("removing message that failed to deserialize");
|
||||||
|
false
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !keep {
|
||||||
|
n += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
keep
|
||||||
|
})?;
|
||||||
|
|
||||||
|
txn.commit()?;
|
||||||
|
|
||||||
|
Ok(n)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Serialized<T>
|
impl<T> Serialized<T>
|
||||||
@ -100,11 +163,13 @@ impl<T> redb::Value for Serialized<T>
|
|||||||
where
|
where
|
||||||
T: Debug + Serialize + DeserializeOwned,
|
T: Debug + Serialize + DeserializeOwned,
|
||||||
{
|
{
|
||||||
type SelfType<'a> = Self
|
type SelfType<'a>
|
||||||
|
= Self
|
||||||
where
|
where
|
||||||
Self: 'a;
|
Self: 'a;
|
||||||
|
|
||||||
type AsBytes<'a> = &'a [u8]
|
type AsBytes<'a>
|
||||||
|
= &'a [u8]
|
||||||
where
|
where
|
||||||
Self: 'a;
|
Self: 'a;
|
||||||
|
|
||||||
@ -134,3 +199,40 @@ where
|
|||||||
redb::TypeName::new(std::any::type_name::<Serialized<T>>())
|
redb::TypeName::new(std::any::type_name::<Serialized<T>>())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use chrono::Local;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn clear_old_messages() {
|
||||||
|
let tmp: u32 = rand::random();
|
||||||
|
let tmp = format!("/tmp/{tmp:08x}.db");
|
||||||
|
|
||||||
|
let opt = Opt { db: tmp.into() };
|
||||||
|
let db = Database::open(&opt).unwrap();
|
||||||
|
|
||||||
|
let now = Local::now();
|
||||||
|
let msg_with_time = |time| LogMsg {
|
||||||
|
time: Some(time),
|
||||||
|
..LogMsg::new("service", "message", "hostname")
|
||||||
|
};
|
||||||
|
|
||||||
|
// not old mesages
|
||||||
|
db.write_log(&msg_with_time(now)).unwrap();
|
||||||
|
db.write_log(&msg_with_time(now - MAX_MESSAGE_AGE + Duration::days(1)))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// old messages
|
||||||
|
db.write_log(&msg_with_time(now - MAX_MESSAGE_AGE - Duration::days(1)))
|
||||||
|
.unwrap();
|
||||||
|
db.write_log(&msg_with_time(now - MAX_MESSAGE_AGE - Duration::days(365)))
|
||||||
|
.unwrap();
|
||||||
|
db.write_log(&msg_with_time(now - MAX_MESSAGE_AGE - Duration::seconds(1)))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(db.drop_old_messages().unwrap(), 3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -8,6 +8,7 @@ mod database;
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
|
use database::Database;
|
||||||
use rocket_dyn_templates::Template;
|
use rocket_dyn_templates::Template;
|
||||||
|
|
||||||
#[derive(Parser)]
|
#[derive(Parser)]
|
||||||
@ -24,7 +25,7 @@ async fn rocket() -> _ {
|
|||||||
let opt = Opt::parse();
|
let opt = Opt::parse();
|
||||||
color_eyre::install().unwrap();
|
color_eyre::install().unwrap();
|
||||||
|
|
||||||
let db = database::open(&opt).expect("open database");
|
let db = Database::open(&opt).expect("open database");
|
||||||
|
|
||||||
rocket::build()
|
rocket::build()
|
||||||
.manage(db)
|
.manage(db)
|
||||||
|
|||||||
Reference in New Issue
Block a user