Implement CPU usage monitor

2022-05-12 12:50:42 +02:00
parent 7ff5f62413
commit 30b5bb77e8
11 changed files with 483 additions and 122 deletions

Cargo.lock generated

@ -8,6 +8,17 @@ version = "1.0.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27"
[[package]]
name = "async-trait"
version = "0.1.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atty"
version = "0.2.14"
@ -93,6 +104,7 @@ name = "composers"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"clap",
"crossterm 0.23.2",
"serde",
@ -219,6 +231,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "memchr"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "mio"
version = "0.7.14"
@ -511,9 +529,11 @@ checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee"
dependencies = [
"bytes",
"libc",
"memchr",
"mio 0.8.2",
"num_cpus",
"once_cell",
"parking_lot 0.12.0",
"pin-project-lite",
"signal-hook-registry",
"tokio-macros",

Cargo.toml

@ -10,7 +10,8 @@ tui = "0.17.0"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
anyhow = "1"
async-trait = "0.1.53"
[dependencies.tokio]
version = "1.17.0"
features = ["rt-multi-thread", "fs", "macros", "process", "sync", "time"]
features = ["rt-multi-thread", "fs", "macros", "process", "sync", "time", "io-util", "parking_lot"]

src/collector.rs

@ -1,66 +1,70 @@
use crate::docker;
use crate::state::{StackStats, StateEvent};
use crate::stack::{self, StackInfo};
use std::collections::HashMap;
use tokio::sync::mpsc;
use tokio::task;
use tokio::time::{sleep, Duration};
#[derive(Debug)]
pub enum StateEvent {
Put { name: String, info: StackInfo },
Delete { name: String },
}
pub(crate) async fn start_collector(events: mpsc::Sender<StateEvent>) -> anyhow::Result<()> {
let mut old_stacks: HashMap<String, StackStats> = HashMap::new();
//let mut stacks: HashMap<String, StackInfo> = HashMap::new();
let mut stack_monitors: HashMap<String, mpsc::Receiver<StackInfo>> = HashMap::new();
loop {
let new_stacks = collect_data().await?;
let stacks = docker::list_stacks().await?;
for (name, stats) in &new_stacks {
let mut send_put = false;
if let Some(old_stats) = old_stacks.get(name) {
if old_stats != stats {
send_put = true;
for stack in &stacks {
if let Some(monitor) = stack_monitors.get_mut(&stack.name) {
match monitor.try_recv() {
Ok(info) => {
events
.send(StateEvent::Put {
name: stack.name.clone(),
info,
})
.await?
}
Err(mpsc::error::TryRecvError::Empty) => continue,
Err(_) => {
stack_monitors.remove(&stack.name);
}
}
} else {
send_put = true;
}
if send_put {
events
.send(StateEvent::Put {
name: name.clone(),
stats: stats.clone(),
})
.await?;
stack_monitors.insert(stack.name.clone(), stack::spawn_monitor(stack.clone()));
}
}
for name in old_stacks.keys() {
if !new_stacks.contains_key(name) {
events
.send(StateEvent::Delete { name: name.clone() })
.await?;
}
}
let delete: Vec<_> = stack_monitors
.keys()
.filter(|&name| !stacks.iter().any(|stack| &stack.name == name))
.cloned()
.collect();
old_stacks = new_stacks;
for name in delete {
stack_monitors.remove(&name);
events.send(StateEvent::Delete { name }).await?;
}
sleep(Duration::from_secs(1)).await;
}
}
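The collector itself no longer gathers stats: it spawns one background monitor per stack (stack::spawn_monitor), drains each receiver with try_recv, forwards fresh data to the UI as StateEvent::Put, and prunes monitors while emitting StateEvent::Delete for stacks that have disappeared. A minimal consumer-side sketch of these events (the actual handler lives in src/ui.rs; `stacks` here stands in for its BTreeMap):

// hedged sketch, not part of the commit
while let Some(event) = events.recv().await {
    match event {
        StateEvent::Put { name, info } => { stacks.insert(name, info); }
        StateEvent::Delete { name } => { stacks.remove(&name); }
    }
}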
/*
pub(crate) async fn collect_data() -> anyhow::Result<HashMap<String, StackStats>> {
let docker_stacks = docker::list_stacks().await?;
let mut stack_jobs = Vec::new();
for docker_stack in docker_stacks {
stack_jobs.push(task::spawn(collect_stack_data(docker_stack)));
}
let mut out = HashMap::with_capacity(stack_jobs.len());
for stack_job in stack_jobs {
let (name, stats) = stack_job.await??;
out.insert(name, stats);
}
Ok(out)
docker::list_stacks()
.await?
.into_iter()
// collect data for all stacks concurrently
.map(collect_stack_data)
.run_concurrent::<Vec<_>>()
.await?
// convert to Result<HashMap>
.into_iter()
.collect()
}
async fn collect_stack_data(docker_stack: docker::Stack) -> anyhow::Result<(String, StackStats)> {
@ -93,3 +97,4 @@ async fn collect_stack_data(docker_stack: docker::Stack) -> anyhow::Result<(Stri
Ok((docker_stack.name, stats))
}
*/

src/debug.rs Normal file

@ -0,0 +1,17 @@
use std::fmt::Display;
use std::future::Future;
use tokio::sync::Mutex;
pub static LOG: Mutex<Vec<String>> = Mutex::const_new(Vec::new());
pub async fn log_on_error<T, E: Display>(prefix: &str, f: impl Future<Output = Result<T, E>>) {
if let Err(e) = f.await {
error(format!("{prefix}: {e}")).await;
}
}
pub async fn error(msg: String) {
let mut log = LOG.lock().await;
log.push(msg);
}
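log_on_error is the glue that keeps fire-and-forget tasks from failing silently: the error is formatted and pushed into the in-memory LOG, which the UI later reads and renders. Usage sketch, mirroring the call in src/process.rs:

// task::spawn(log_on_error("process monitor failed", monitor_proc(tx, pid)));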

src/docker.rs

@ -1,9 +1,10 @@
use serde::Deserialize;
use tokio::fs;
use tokio::process::Command;
use tokio::task;
//! Fetching container and stack info from Docker
#[derive(Deserialize)]
use crate::util::NextN;
use serde::Deserialize;
use tokio::process::Command;
#[derive(Clone, Deserialize)]
pub struct Stack {
#[serde(rename = "ConfigFiles")]
pub config_file: String,
@ -33,6 +34,12 @@ pub struct Container {
pub project: String,
}
impl Container {
pub fn is_running(&self) -> bool {
self.state.contains("running")
}
}
/// Run `docker compose ls` and parse the output
pub async fn list_stacks() -> anyhow::Result<Vec<Stack>> {
let output = Command::new("docker")
@ -56,6 +63,7 @@ pub async fn list_containers(stack: &Stack) -> anyhow::Result<Vec<Container>> {
Ok(serde_json::from_str(&stdout)?)
}
#[derive(PartialEq, Eq, Hash)]
pub struct Process {
pub uid: String,
pub pid: u32,
@ -78,8 +86,6 @@ pub async fn list_processes(stack: &Stack, container: &Container) -> anyhow::Res
let mut processes = Vec::new();
let mut proc_info_set = Vec::new();
for line in stdout.lines().skip(2) {
if line.trim().is_empty() {
continue;
@ -87,32 +93,35 @@ pub async fn list_processes(stack: &Stack, container: &Container) -> anyhow::Res
let mut words = line.split_whitespace();
let err = || anyhow::format_err!("invalid docker top output");
let uid = words.next().ok_or_else(err)?.to_string();
let pid = words.next().ok_or_else(err)?.parse()?;
let ppid = words.next().ok_or_else(err)?.parse()?;
let _c = words.next().ok_or_else(err)?;
let _stime = words.next().ok_or_else(err)?;
let _tty = words.next().ok_or_else(err)?;
let _time = words.next().ok_or_else(err)?;
let [uid, pid, ppid, _c, _stime, _tty, _time] = words
.next_n()
.ok_or_else(|| anyhow::format_err!("invalid docker top output"))?;
let cmd: String = words.collect();
proc_info_set.push(task::spawn(fs::read_to_string(format!(
"/proc/{pid}/status"
))));
processes.push(Process {
uid,
pid,
ppid,
uid: uid.to_string(),
pid: pid.parse()?,
ppid: ppid.parse()?,
cmd,
memory_usage: 0,
})
}
for (process, proc_info) in processes.iter_mut().zip(proc_info_set.into_iter()) {
let proc_info = proc_info.await??;
for (key, value) in proc_info.lines().flat_map(|line| line.split_once(':')) {
/*
let proc_statuses: Vec<_> = processes
.iter()
.map(|p| fs::read_to_string(format!("/proc/{}/status", p.pid)))
.run_concurrent()
.await?;
for (process, proc_status) in processes.iter_mut().zip(proc_statuses) {
let proc_status = match proc_status {
Ok(proc_status) => proc_status,
Err(_) => continue, // discard errors
};
for (key, value) in proc_status.lines().flat_map(|line| line.split_once(':')) {
let value = value.trim();
match key {
@ -123,6 +132,7 @@ pub async fn list_processes(stack: &Stack, container: &Container) -> anyhow::Res
}
}
}
*/
Ok(processes)
}
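For context on the fixed-width destructuring in list_processes: docker top typically emits ps -ef style columns, so the seven fields pulled with next_n correspond to UID, PID, PPID, C, STIME, TTY and TIME, with the remaining words of the line collected into CMD. An illustrative (made-up) data row:

root  1234  1200  0  09:30  ?  00:00:02  nginx: master process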

src/main.rs

@ -1,8 +1,11 @@
mod circle;
mod collector;
mod debug;
mod docker;
mod state;
mod process;
mod stack;
mod ui;
mod util;
use tokio::sync::mpsc;
use tokio::task;

src/process.rs Normal file

@ -0,0 +1,97 @@
//! Process monitoring
use crate::debug::log_on_error;
use crate::util::NextN;
use std::time::Duration;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc;
use tokio::task;
use tokio::time::sleep;
#[derive(Default, Debug)]
pub struct ProcInfo {
pub pid: u32,
pub memory_usage: usize,
pub cpu_percent: f64,
}
pub fn spawn_monitor(pid: u32) -> mpsc::Receiver<ProcInfo> {
let (tx, rx) = mpsc::channel(64);
task::spawn(log_on_error(
"process monitor failed",
monitor_proc(tx, pid),
));
rx
}
async fn monitor_proc(tx: mpsc::Sender<ProcInfo>, pid: u32) -> anyhow::Result<()> {
let mut buf = String::new();
let status_path = format!("/proc/{pid}/status");
let stat_path = format!("/proc/{pid}/stat");
let uptime_path = "/proc/uptime";
let mut last_uptime = 0.0;
let mut last_total_time = 0.0;
let hertz = 100.0; // TODO
loop {
buf.clear();
let mut status = File::open(&status_path).await?;
status.read_to_string(&mut buf).await?;
let mut proc_info = ProcInfo::default();
for (key, value) in buf.lines().flat_map(|line| line.split_once(':')) {
let value = value.trim();
match key {
"VmRSS" => {
proc_info.memory_usage = value.trim_end_matches(" kB").parse()?;
}
_ => {}
}
}
buf.clear();
let mut uptime = File::open(&uptime_path).await?;
uptime.read_to_string(&mut buf).await?;
let uptime: f64 = buf
.split_whitespace()
.next()
.ok_or_else(|| anyhow::format_err!("error parsing /proc/uptime"))?
.parse()?;
buf.clear();
let mut stat = File::open(&stat_path).await?;
stat.read_to_string(&mut buf).await?;
let [_pid, _tcomm, _state, _ppid, _pgrp, _sid, _tty_nr, _tty_pgrp, _flags, _min_flt, _cmin_flt, _maj_flt, _cmaj_flt, utime, stime, _cutime, cstime, _priority, _nice, _num_threads, _, start_time, _vsize, _rss, _rsslim, _start_code, _end_code, _esp, _eip, _pending, _blocked, _sigign, _sigcatch, _, _, _, _exit_signal, _task_cpu, _rt_priority, _policy, _blkio_ticks, _gtime, _cgtime, _start_data, _end_data, _start_brk, _arg_start, _arg_end, _env_start, _env_end, _exit_code] =
buf.split_whitespace()
.next_n()
.ok_or_else(|| anyhow::format_err!("error parsing /proc/<pid>/stat"))?;
let utime: f64 = utime.parse()?;
let stime: f64 = stime.parse()?;
let cstime: f64 = cstime.parse()?;
// time when process started, measured in clock ticks since boot
let start_time: f64 = start_time.parse()?;
let uptime_chunk = uptime - last_uptime;
let seconds = uptime_chunk - (start_time / hertz);
let total_time = utime + stime + cstime;
let time = total_time - last_total_time;
proc_info.cpu_percent = (time / hertz) / seconds;
last_total_time = total_time;
last_uptime = uptime;
tx.send(proc_info).await?;
sleep(Duration::from_secs(1)).await;
}
}
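The cpu_percent value is built from deltas between one-second samples: process CPU time comes from the utime/stime/cstime tick counters in /proc/<pid>/stat, wall time from /proc/uptime, and hertz (hard-coded to 100 here; sysconf(_SC_CLK_TCK) is the portable source) converts ticks to seconds. For reference, a minimal sketch of the usual per-interval formula, with assumed argument names and not taken from this commit:

// Busy ticks converted to seconds, divided by the wall-clock seconds between samples.
fn cpu_fraction(total_ticks: f64, last_total_ticks: f64, uptime: f64, last_uptime: f64, hertz: f64) -> f64 {
    let busy = (total_ticks - last_total_ticks) / hertz;
    let wall = uptime - last_uptime;
    if wall > 0.0 { busy / wall } else { 0.0 }
}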

src/stack.rs Normal file

@ -0,0 +1,89 @@
use crate::docker;
use crate::process::{self, ProcInfo};
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task;
use tokio::time::sleep;
#[derive(Clone, Copy, Default, Debug, PartialEq)]
pub struct StackInfo {
pub containers: u32,
pub running_containers: u32,
pub stopped_containers: u32,
pub process_count: u32,
pub cpu_percent: f64,
pub memory_usage: usize,
pub memory_percent: f64,
}
pub fn spawn_monitor(stack: docker::Stack) -> mpsc::Receiver<StackInfo> {
let (tx, rx) = mpsc::channel(64);
task::spawn(monitor_proc(tx, stack));
rx
}
async fn monitor_proc(tx: mpsc::Sender<StackInfo>, stack: docker::Stack) -> anyhow::Result<()> {
let mut proc_monitors: HashMap<u32, mpsc::Receiver<ProcInfo>> = HashMap::new();
let mut processes = HashMap::<u32, ProcInfo>::new();
let mut last_stack_info = StackInfo::default();
loop {
let mut stack_info = StackInfo::default();
let containers = docker::list_containers(&stack).await?;
stack_info.containers = containers.len() as u32;
stack_info.running_containers = containers.iter().filter(|c| c.is_running()).count() as u32;
stack_info.stopped_containers = stack_info.containers - stack_info.running_containers;
processes.clear();
for container in &containers {
for process in docker::list_processes(&stack, container).await? {
processes.insert(process.pid, Default::default());
if !proc_monitors.contains_key(&process.pid) {
proc_monitors.insert(process.pid, process::spawn_monitor(process.pid));
}
}
}
proc_monitors.retain(|&pid, monitor| {
if !processes.contains_key(&pid) {
return false;
}
match monitor.try_recv() {
Ok(info) => {
processes.insert(pid, info);
true
}
Err(mpsc::error::TryRecvError::Empty) => true,
Err(_) => false,
}
});
let memory_usage = processes.values().map(|p| p.memory_usage).sum();
let host_memory = (1usize << 20) * 16; // 16 GiB in kB // TODO
stack_info = StackInfo {
process_count: processes.len() as u32,
memory_usage,
memory_percent: memory_usage as f64 / host_memory as f64,
cpu_percent: processes
.values()
.map(|p| p.cpu_percent)
.sum::<f64>()
.max(0.0),
..stack_info
};
if stack_info != last_stack_info {
last_stack_info = stack_info;
tx.send(stack_info).await?;
}
sleep(Duration::from_secs(1)).await;
}
}
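The memory percentage above divides the summed VmRSS of all processes (in kB) by a hard-coded 16 GiB host total (see the TODO). If that placeholder is ever replaced, one option is reading MemTotal from /proc/meminfo, which reports kB as well; a sketch with an assumed helper name, not part of this commit:

async fn host_memory_kb() -> anyhow::Result<usize> {
    // MemTotal is reported in kB, matching the units used for memory_usage
    let meminfo = tokio::fs::read_to_string("/proc/meminfo").await?;
    for line in meminfo.lines() {
        if let Some(value) = line.strip_prefix("MemTotal:") {
            return Ok(value.trim().trim_end_matches(" kB").trim().parse()?);
        }
    }
    anyhow::bail!("MemTotal not found in /proc/meminfo")
}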

src/state.rs (deleted)

@ -1,16 +0,0 @@
#[derive(Clone, Debug, PartialEq)]
pub struct StackStats {
pub containers: u32,
pub running_containers: u32,
pub stopped_containers: u32,
pub process_count: u32,
pub cpu_percent: f64,
pub memory_usage: usize,
pub memory_percent: f64,
}
#[derive(Debug)]
pub enum StateEvent {
Put { name: String, stats: StackStats },
Delete { name: String },
}

src/ui.rs

@ -1,5 +1,7 @@
use crate::circle::Circle;
use crate::state::{StackStats, StateEvent};
use crate::collector::StateEvent;
use crate::debug;
use crate::stack::StackInfo;
use crossterm::{
event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode},
execute,
@ -23,7 +25,7 @@ const KEYS_DOWN: &[KeyCode] = &[KeyCode::Down, KeyCode::PageDown, KeyCode::Char(
const KEYS_UP: &[KeyCode] = &[KeyCode::Up, KeyCode::PageUp, KeyCode::Char('k')];
pub struct Ui {
stacks: BTreeMap<String, StackStats>,
stacks: BTreeMap<String, StackInfo>,
events: mpsc::Receiver<StateEvent>,
event_log: [String; 7],
event_count: usize,
@ -109,9 +111,9 @@ impl Ui {
log_msg = format!("{:.4}: DELETE {name}", self.event_count);
self.stacks.remove(&name);
}
StateEvent::Put { name, stats } => {
StateEvent::Put { name, info } => {
log_msg = format!("{:.4}: UPDATE {name}", self.event_count);
self.stacks.insert(name, stats);
self.stacks.insert(name, info);
}
}
@ -120,7 +122,7 @@ impl Ui {
self.event_count += 1;
}
fn draw<B: Backend>(&self, f: &mut Frame<'_, B>) {
fn draw<'a, B: Backend>(&self, f: &mut Frame<'a, B>) {
let size = f.size();
const BOX_HEIGHT: u16 = 9;
@ -148,20 +150,25 @@ impl Ui {
//self.draw_info(f, chunks[0]);
let mut stacks = self
let stack_views = self
.stacks
.iter()
.skip((self.scroll * fitted_boxes_x as usize).saturating_sub(1));
.map(|(name, info)| Box::new(StackView { name, info }) as Box<dyn BoxView<B>>);
let mut first = self.scroll == 0;
let meta_views = [
Box::new(DebugLog) as Box<dyn BoxView<B>>,
Box::new(self.make_info_view()),
];
let mut views = meta_views
.into_iter()
.chain(stack_views)
.skip(self.scroll * fitted_boxes_x as usize);
'outer: for &y_chunk in &y_chunks[..fitted_boxes_y as usize] {
for x_chunk in x_layout.split(y_chunk) {
if first {
first = false;
self.draw_info(f, x_chunk);
} else if let Some((name, info)) = stacks.next() {
self.draw_stack(f, x_chunk, name, info);
if let Some(view) = views.next() {
view.draw(f, x_chunk);
} else {
break 'outer;
}
@ -169,7 +176,70 @@ impl Ui {
}
}
fn draw_info<B: Backend>(&self, f: &mut Frame<B>, area: Rect) {
fn make_info_view(&self) -> GlobalInfo {
let unhealthy_count = self
.stacks
.values()
.filter(|stack| stack.stopped_containers > 0)
.count();
GlobalInfo {
stack_count: self.stacks.len(),
unhealthy_count,
event_log: &self.event_log,
}
}
}
fn fmt_kilobytes<'a>(kbs: usize) -> String {
let gibi = 1 << 20;
let mebi = 1 << 10;
if kbs > gibi * 2 {
format!("{} GBs", kbs / gibi)
} else if kbs > mebi * 10 {
format!("{} MBs", kbs / mebi)
} else {
format!("{} KBs", kbs)
}
}
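Worked example of the thresholds: 3 << 20 kB (3 GiB) is above the 2 GiB cutoff and formats as "3 GBs", 512 << 10 kB is above 10 MiB and formats as "512 MBs", and 256 kB falls through to "256 KBs".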
pub trait BoxView<B: Backend> {
fn draw(&self, f: &mut Frame<'_, B>, area: Rect);
}
struct DebugLog;
impl<B: Backend> BoxView<B> for DebugLog {
fn draw(&self, f: &mut Frame<'_, B>, area: Rect) {
let block = Block::default().borders(Borders::ALL);
let inner = block.inner(area);
f.render_widget(block, area);
let log_style = Style::default()
.fg(Color::Red)
.add_modifier(Modifier::ITALIC);
let log = debug::LOG.blocking_lock().clone();
let debug_log = Paragraph::new(
log.into_iter()
.map(|msg| Span::styled(msg, log_style))
.map(Spans::from)
.collect::<Vec<_>>(),
);
f.render_widget(debug_log, inner);
}
}
struct GlobalInfo<'a> {
stack_count: usize,
unhealthy_count: usize,
event_log: &'a [String; 7],
}
impl<'a, B: Backend> BoxView<B> for GlobalInfo<'a> {
fn draw(&self, f: &mut Frame<'_, B>, area: Rect) {
let block = Block::default().borders(Borders::ALL);
let inner = block.inner(area);
f.render_widget(block, area);
@ -179,19 +249,13 @@ impl Ui {
.constraints([Constraint::Length(20), Constraint::Min(10)])
.split(inner);
let unhealthy_stacks = self
.stacks
.values()
.filter(|stack| stack.stopped_containers > 0)
.count();
let notices = Paragraph::new(vec![
Spans::from("Status"),
Spans::from(format!("stacks: {}", self.stacks.len())),
Spans::from(format!("stacks: {}", self.stack_count)),
Spans::from(""),
if unhealthy_stacks > 0 {
if self.unhealthy_count > 0 {
let style = Style::default().fg(Color::Red);
Span::styled(format!("unhealthy: {}", unhealthy_stacks), style).into()
Span::styled(format!("unhealthy: {}", self.unhealthy_count), style).into()
} else {
Spans::from("")
},
@ -201,6 +265,7 @@ impl Ui {
let log_style = Style::default()
.fg(Color::LightBlue)
.add_modifier(Modifier::ITALIC);
let event_log = Paragraph::new(
self.event_log
.clone()
@ -210,8 +275,18 @@ impl Ui {
);
f.render_widget(event_log, chunks[1]);
}
}
struct StackView<'a> {
name: &'a str,
info: &'a StackInfo,
}
impl<'a, B: Backend> BoxView<B> for StackView<'a> {
fn draw(&self, f: &mut Frame<'_, B>, area: Rect) {
let name = self.name;
let info = self.info;
fn draw_stack<B: Backend>(&self, f: &mut Frame<B>, area: Rect, name: &str, info: &StackStats) {
let title_style = Style::default().fg(Color::LightMagenta).bg(Color::Black);
let block = Block::default()
@ -282,16 +357,3 @@ impl Ui {
f.render_widget(gauge_canvas(info.memory_percent, "MEM"), chunks[2]);
}
}
fn fmt_kilobytes<'a>(kbs: usize) -> String {
let gibi = 1 << 20;
let mebi = 1 << 10;
if kbs > gibi * 2 {
format!("{} GBs", kbs / gibi)
} else if kbs > mebi * 10 {
format!("{} MBs", kbs / mebi)
} else {
format!("{} KBs", kbs)
}
}

src/util.rs Normal file

@ -0,0 +1,73 @@
use async_trait::async_trait;
use std::future::Future;
use tokio::task;
use tokio::task::JoinError;
pub trait NextN {
type Item;
/// Puts the next N elements of an iterator into a fixed-size array.
///
/// Returns None if the iterator did not have enough elements to fill the array.
fn next_n<const N: usize>(&mut self) -> Option<[Self::Item; N]>;
}
impl<I: Iterator> NextN for I {
type Item = I::Item;
fn next_n<const N: usize>(&mut self) -> Option<[<Self as NextN>::Item; N]> {
let mut array = [(); N].map(|_| None);
let mut i = 0;
for item in self {
if i == N {
break;
}
array[i] = Some(item);
i += 1;
}
if i < N {
None
} else {
Some(array.map(|option| option.unwrap()))
}
}
}
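A short usage sketch for next_n; the const parameter N is inferred from the destructuring pattern, which is how the call sites in docker.rs and process.rs use it:

// let mut words = "root 42 1 rest of line".split_whitespace();
// let [uid, pid, ppid] = words.next_n().expect("at least three fields");
// assert_eq!((uid, pid, ppid), ("root", "42", "1"));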
#[async_trait]
pub trait RunConcurrent<F, T>
where
Self: Sized,
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
async fn run_concurrent<C>(self) -> Result<C, JoinError>
where
C: Default + Extend<T> + Send;
}
#[async_trait]
impl<I, F, T> RunConcurrent<F, T> for I
where
I: Iterator<Item = F> + Send,
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
async fn run_concurrent<C>(self) -> Result<C, JoinError>
where
C: Default + Extend<T> + Send,
{
let tasks = self.map(|job| task::spawn(job)).collect::<Vec<_>>();
let mut results = C::default();
for task in tasks {
results.extend(Some(task.await?));
}
Ok(results)
}
}
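run_concurrent spawns one tokio task per future and awaits them in order, extending the chosen collection with each result. A usage sketch mirroring the commented-out concurrent collect_data in src/collector.rs (those call sites are currently disabled):

// let results: Vec<anyhow::Result<(String, StackStats)>> = docker::list_stacks()
//     .await?
//     .into_iter()
//     .map(collect_stack_data)
//     .run_concurrent::<Vec<_>>()
//     .await?;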