From 30b5bb77e8e210ad349a5d7eef32b69a08e8c6a3 Mon Sep 17 00:00:00 2001 From: Joakim Hulthe Date: Thu, 12 May 2022 12:50:42 +0200 Subject: [PATCH] Implement cpu usage monitor --- Cargo.lock | 20 +++++++ Cargo.toml | 3 +- src/collector.rs | 89 ++++++++++++++++--------------- src/debug.rs | 17 ++++++ src/docker.rs | 60 ++++++++++++--------- src/main.rs | 5 +- src/process.rs | 97 +++++++++++++++++++++++++++++++++ src/stack.rs | 89 +++++++++++++++++++++++++++++++ src/state.rs | 16 ------ src/ui.rs | 136 ++++++++++++++++++++++++++++++++++------------- src/util.rs | 73 +++++++++++++++++++++++++ 11 files changed, 483 insertions(+), 122 deletions(-) create mode 100644 src/debug.rs create mode 100644 src/process.rs create mode 100644 src/stack.rs delete mode 100644 src/state.rs create mode 100644 src/util.rs diff --git a/Cargo.lock b/Cargo.lock index 4dbc9ff..94b4e14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,17 @@ version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" +[[package]] +name = "async-trait" +version = "0.1.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atty" version = "0.2.14" @@ -93,6 +104,7 @@ name = "composers" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "clap", "crossterm 0.23.2", "serde", @@ -219,6 +231,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "memchr" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + [[package]] name = "mio" version = "0.7.14" @@ -511,9 +529,11 @@ checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" dependencies = [ "bytes", "libc", + "memchr", "mio 0.8.2", "num_cpus", "once_cell", + "parking_lot 0.12.0", "pin-project-lite", "signal-hook-registry", "tokio-macros", diff --git a/Cargo.toml b/Cargo.toml index efe48dd..069bfa1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,8 @@ tui = "0.17.0" serde = { version = "1", features = ["derive"] } serde_json = "1" anyhow = "1" +async-trait = "0.1.53" [dependencies.tokio] version = "1.17.0" -features = ["rt-multi-thread", "fs", "macros", "process", "sync", "time"] +features = ["rt-multi-thread", "fs", "macros", "process", "sync", "time", "io-util", "parking_lot"] diff --git a/src/collector.rs b/src/collector.rs index dfa82a3..4d0abef 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -1,66 +1,70 @@ use crate::docker; -use crate::state::{StackStats, StateEvent}; +use crate::stack::{self, StackInfo}; use std::collections::HashMap; use tokio::sync::mpsc; -use tokio::task; use tokio::time::{sleep, Duration}; +#[derive(Debug)] +pub enum StateEvent { + Put { name: String, info: StackInfo }, + Delete { name: String }, +} + pub(crate) async fn start_collector(events: mpsc::Sender) -> anyhow::Result<()> { - let mut old_stacks: HashMap = HashMap::new(); + //let mut stacks: HashMap = HashMap::new(); + let mut stack_monitors: HashMap> = HashMap::new(); loop { - let new_stacks = collect_data().await?; + let stacks = docker::list_stacks().await?; - for (name, stats) in &new_stacks { - let mut send_put = false; - if let Some(old_stats) = old_stacks.get(name) { - if old_stats != stats { - send_put = true; + for stack in &stacks { + if let Some(monitor) = stack_monitors.get_mut(&stack.name) { + match monitor.try_recv() { + Ok(info) => { + events + .send(StateEvent::Put { + name: stack.name.clone(), + info, + }) + .await? + } + Err(mpsc::error::TryRecvError::Empty) => continue, + Err(_) => { + stack_monitors.remove(&stack.name); + } } } else { - send_put = true; - } - - if send_put { - events - .send(StateEvent::Put { - name: name.clone(), - stats: stats.clone(), - }) - .await?; + stack_monitors.insert(stack.name.clone(), stack::spawn_monitor(stack.clone())); } } - for name in old_stacks.keys() { - if !new_stacks.contains_key(name) { - events - .send(StateEvent::Delete { name: name.clone() }) - .await?; - } - } + let delete: Vec<_> = stack_monitors + .keys() + .filter(|&name| !stacks.iter().any(|stack| &stack.name == name)) + .cloned() + .collect(); - old_stacks = new_stacks; + for name in delete { + stack_monitors.remove(&name); + events.send(StateEvent::Delete { name }).await?; + } sleep(Duration::from_secs(1)).await; } } +/* pub(crate) async fn collect_data() -> anyhow::Result> { - let docker_stacks = docker::list_stacks().await?; - - let mut stack_jobs = Vec::new(); - for docker_stack in docker_stacks { - stack_jobs.push(task::spawn(collect_stack_data(docker_stack))); - } - - let mut out = HashMap::with_capacity(stack_jobs.len()); - - for stack_job in stack_jobs { - let (name, stats) = stack_job.await??; - out.insert(name, stats); - } - - Ok(out) + docker::list_stacks() + .await? + .into_iter() + // collect data for all stacks concurrently + .map(collect_stack_data) + .run_concurrent::>() + .await? + // convert to Result + .into_iter() + .collect() } async fn collect_stack_data(docker_stack: docker::Stack) -> anyhow::Result<(String, StackStats)> { @@ -93,3 +97,4 @@ async fn collect_stack_data(docker_stack: docker::Stack) -> anyhow::Result<(Stri Ok((docker_stack.name, stats)) } +*/ diff --git a/src/debug.rs b/src/debug.rs new file mode 100644 index 0000000..fa54832 --- /dev/null +++ b/src/debug.rs @@ -0,0 +1,17 @@ +use std::fmt::Display; +use std::future::Future; +use tokio::sync::Mutex; + +pub static LOG: Mutex> = Mutex::const_new(Vec::new()); + +pub async fn log_on_error(prefix: &str, f: impl Future>) { + if let Err(e) = f.await { + error(format!("{prefix}: {e}")).await; + } +} + +pub async fn error(msg: String) { + let mut log = LOG.lock().await; + + log.push(msg); +} diff --git a/src/docker.rs b/src/docker.rs index 8befba8..33ea92f 100644 --- a/src/docker.rs +++ b/src/docker.rs @@ -1,9 +1,10 @@ -use serde::Deserialize; -use tokio::fs; -use tokio::process::Command; -use tokio::task; +//! Fetching container and stack info from Docker -#[derive(Deserialize)] +use crate::util::NextN; +use serde::Deserialize; +use tokio::process::Command; + +#[derive(Clone, Deserialize)] pub struct Stack { #[serde(rename = "ConfigFiles")] pub config_file: String, @@ -33,6 +34,12 @@ pub struct Container { pub project: String, } +impl Container { + pub fn is_running(&self) -> bool { + self.state.contains("running") + } +} + /// Run `docker compose ls` and parse the output pub async fn list_stacks() -> anyhow::Result> { let output = Command::new("docker") @@ -56,6 +63,7 @@ pub async fn list_containers(stack: &Stack) -> anyhow::Result> { Ok(serde_json::from_str(&stdout)?) } +#[derive(PartialEq, Eq, Hash)] pub struct Process { pub uid: String, pub pid: u32, @@ -78,8 +86,6 @@ pub async fn list_processes(stack: &Stack, container: &Container) -> anyhow::Res let mut processes = Vec::new(); - let mut proc_info_set = Vec::new(); - for line in stdout.lines().skip(2) { if line.trim().is_empty() { continue; @@ -87,32 +93,35 @@ pub async fn list_processes(stack: &Stack, container: &Container) -> anyhow::Res let mut words = line.split_whitespace(); - let err = || anyhow::format_err!("invalid docker top output"); - let uid = words.next().ok_or_else(err)?.to_string(); - let pid = words.next().ok_or_else(err)?.parse()?; - let ppid = words.next().ok_or_else(err)?.parse()?; - let _c = words.next().ok_or_else(err)?; - let _stime = words.next().ok_or_else(err)?; - let _tty = words.next().ok_or_else(err)?; - let _time = words.next().ok_or_else(err)?; + let [uid, pid, ppid, _c, _stime, _tty, _time] = words + .next_n() + .ok_or_else(|| anyhow::format_err!("invalid docker top output"))?; + let cmd: String = words.collect(); - proc_info_set.push(task::spawn(fs::read_to_string(format!( - "/proc/{pid}/status" - )))); - processes.push(Process { - uid, - pid, - ppid, + uid: uid.to_string(), + pid: pid.parse()?, + ppid: ppid.parse()?, cmd, memory_usage: 0, }) } - for (process, proc_info) in processes.iter_mut().zip(proc_info_set.into_iter()) { - let proc_info = proc_info.await??; - for (key, value) in proc_info.lines().flat_map(|line| line.split_once(':')) { + /* + let proc_statuses: Vec<_> = processes + .iter() + .map(|p| fs::read_to_string(format!("/proc/{}/status", p.pid))) + .run_concurrent() + .await?; + + for (process, proc_status) in processes.iter_mut().zip(proc_statuses) { + let proc_status = match proc_status { + Ok(proc_status) => proc_status, + Err(_) => continue, // discard errors + }; + + for (key, value) in proc_status.lines().flat_map(|line| line.split_once(':')) { let value = value.trim(); match key { @@ -123,6 +132,7 @@ pub async fn list_processes(stack: &Stack, container: &Container) -> anyhow::Res } } } + */ Ok(processes) } diff --git a/src/main.rs b/src/main.rs index 560817e..2c5ef00 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,11 @@ mod circle; mod collector; +mod debug; mod docker; -mod state; +mod process; +mod stack; mod ui; +mod util; use tokio::sync::mpsc; use tokio::task; diff --git a/src/process.rs b/src/process.rs new file mode 100644 index 0000000..2d447e2 --- /dev/null +++ b/src/process.rs @@ -0,0 +1,97 @@ +//! Process monitoring +use crate::debug::log_on_error; +use crate::util::NextN; +use std::time::Duration; +use tokio::fs::File; +use tokio::io::AsyncReadExt; +use tokio::sync::mpsc; +use tokio::task; +use tokio::time::sleep; + +#[derive(Default, Debug)] +pub struct ProcInfo { + pub pid: u32, + pub memory_usage: usize, + pub cpu_percent: f64, +} + +pub fn spawn_monitor(pid: u32) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(64); + + task::spawn(log_on_error( + "process monitor failed", + monitor_proc(tx, pid), + )); + + rx +} + +async fn monitor_proc(tx: mpsc::Sender, pid: u32) -> anyhow::Result<()> { + let mut buf = String::new(); + let status_path = format!("/proc/{pid}/status"); + let stat_path = format!("/proc/{pid}/stat"); + let uptime_path = "/proc/uptime"; + + let mut last_uptime = 0.0; + let mut last_total_time = 0.0; + + let hertz = 100.0; // TODO + + loop { + buf.clear(); + let mut status = File::open(&status_path).await?; + status.read_to_string(&mut buf).await?; + + let mut proc_info = ProcInfo::default(); + + for (key, value) in buf.lines().flat_map(|line| line.split_once(':')) { + let value = value.trim(); + + match key { + "VmRSS" => { + proc_info.memory_usage = value.trim_end_matches(" kB").parse()?; + } + _ => {} + } + } + + buf.clear(); + let mut uptime = File::open(&uptime_path).await?; + uptime.read_to_string(&mut buf).await?; + let uptime: f64 = buf + .split_whitespace() + .next() + .ok_or_else(|| anyhow::format_err!("error parsing /proc/uptime"))? + .parse()?; + + buf.clear(); + let mut stat = File::open(&stat_path).await?; + stat.read_to_string(&mut buf).await?; + + let [_pid, _tcomm, _state, _ppid, _pgrp, _sid, _tty_nr, _tty_pgrp, _flags, _min_flt, _cmin_flt, _maj_flt, _cmaj_flt, utime, stime, _cutime, cstime, _priority, _nice, _num_threads, _, start_time, _vsize, _rss, _rsslim, _start_code, _end_code, _esp, _eip, _pending, _blocked, _sigign, _sigcatch, _, _, _, _exit_signal, _task_cpu, _rt_priority, _policy, _blkio_ticks, _gtime, _cgtime, _start_data, _end_data, _start_brk, _arg_start, _arg_end, _env_start, _env_end, _exit_code] = + buf.split_whitespace() + .next_n() + .ok_or_else(|| anyhow::format_err!("error parsing /proc//stat"))?; + + let utime: f64 = utime.parse()?; + let stime: f64 = stime.parse()?; + let cstime: f64 = cstime.parse()?; + + // time when process started, measured in clock ticks since boot + let start_time: f64 = start_time.parse()?; + + let uptime_chunk = uptime - last_uptime; + let seconds = uptime_chunk - (start_time / hertz); + + let total_time = utime + stime + cstime; + let time = total_time - last_total_time; + proc_info.cpu_percent = (time / hertz) / seconds; + + last_total_time = total_time; + last_uptime = uptime; + + tx.send(proc_info).await?; + + sleep(Duration::from_secs(1)).await; + } +} diff --git a/src/stack.rs b/src/stack.rs new file mode 100644 index 0000000..99af1cf --- /dev/null +++ b/src/stack.rs @@ -0,0 +1,89 @@ +use crate::docker; +use crate::process::{self, ProcInfo}; +use std::collections::HashMap; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::task; +use tokio::time::sleep; + +#[derive(Clone, Copy, Default, Debug, PartialEq)] +pub struct StackInfo { + pub containers: u32, + pub running_containers: u32, + pub stopped_containers: u32, + pub process_count: u32, + pub cpu_percent: f64, + pub memory_usage: usize, + pub memory_percent: f64, +} + +pub fn spawn_monitor(stack: docker::Stack) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(64); + + task::spawn(monitor_proc(tx, stack)); + + rx +} + +async fn monitor_proc(tx: mpsc::Sender, stack: docker::Stack) -> anyhow::Result<()> { + let mut proc_monitors: HashMap> = HashMap::new(); + let mut processes = HashMap::::new(); + let mut last_stack_info = StackInfo::default(); + + loop { + let mut stack_info = StackInfo::default(); + + let containers = docker::list_containers(&stack).await?; + stack_info.containers = containers.len() as u32; + stack_info.running_containers = containers.iter().filter(|c| c.is_running()).count() as u32; + stack_info.stopped_containers = stack_info.containers - stack_info.running_containers; + + processes.clear(); + for container in &containers { + for process in docker::list_processes(&stack, container).await? { + processes.insert(process.pid, Default::default()); + if !proc_monitors.contains_key(&process.pid) { + proc_monitors.insert(process.pid, process::spawn_monitor(process.pid)); + } + } + } + + proc_monitors.retain(|&pid, monitor| { + if !processes.contains_key(&pid) { + return false; + } + + match monitor.try_recv() { + Ok(info) => { + processes.insert(pid, info); + true + } + Err(mpsc::error::TryRecvError::Empty) => true, + Err(_) => false, + } + }); + + let memory_usage = processes.values().map(|p| p.memory_usage).sum(); + let host_memory = (1usize << 20) * 16; // 10 GiB // TODO + + stack_info = StackInfo { + process_count: processes.len() as u32, + memory_usage, + memory_percent: memory_usage as f64 / host_memory as f64, + cpu_percent: processes + .values() + .map(|p| p.cpu_percent) + .sum::() + .max(0.0), + ..stack_info + }; + + if stack_info != last_stack_info { + last_stack_info = stack_info; + + tx.send(stack_info).await?; + } + + sleep(Duration::from_secs(1)).await; + } +} diff --git a/src/state.rs b/src/state.rs deleted file mode 100644 index b45e641..0000000 --- a/src/state.rs +++ /dev/null @@ -1,16 +0,0 @@ -#[derive(Clone, Debug, PartialEq)] -pub struct StackStats { - pub containers: u32, - pub running_containers: u32, - pub stopped_containers: u32, - pub process_count: u32, - pub cpu_percent: f64, - pub memory_usage: usize, - pub memory_percent: f64, -} - -#[derive(Debug)] -pub enum StateEvent { - Put { name: String, stats: StackStats }, - Delete { name: String }, -} diff --git a/src/ui.rs b/src/ui.rs index df3b2b4..6b7ab86 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -1,5 +1,7 @@ use crate::circle::Circle; -use crate::state::{StackStats, StateEvent}; +use crate::collector::StateEvent; +use crate::debug; +use crate::stack::StackInfo; use crossterm::{ event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode}, execute, @@ -23,7 +25,7 @@ const KEYS_DOWN: &[KeyCode] = &[KeyCode::Down, KeyCode::PageDown, KeyCode::Char( const KEYS_UP: &[KeyCode] = &[KeyCode::Up, KeyCode::PageUp, KeyCode::Char('k')]; pub struct Ui { - stacks: BTreeMap, + stacks: BTreeMap, events: mpsc::Receiver, event_log: [String; 7], event_count: usize, @@ -109,9 +111,9 @@ impl Ui { log_msg = format!("{:.4}: DELETE {name}", self.event_count); self.stacks.remove(&name); } - StateEvent::Put { name, stats } => { + StateEvent::Put { name, info } => { log_msg = format!("{:.4}: UPDATE {name}", self.event_count); - self.stacks.insert(name, stats); + self.stacks.insert(name, info); } } @@ -120,7 +122,7 @@ impl Ui { self.event_count += 1; } - fn draw(&self, f: &mut Frame<'_, B>) { + fn draw<'a, B: Backend>(&self, f: &mut Frame<'a, B>) { let size = f.size(); const BOX_HEIGHT: u16 = 9; @@ -148,20 +150,25 @@ impl Ui { //self.draw_info(f, chunks[0]); - let mut stacks = self + let stack_views = self .stacks .iter() - .skip((self.scroll * fitted_boxes_x as usize).saturating_sub(1)); + .map(|(name, info)| Box::new(StackView { name, info }) as Box>); - let mut first = self.scroll == 0; + let meta_views = [ + Box::new(DebugLog) as Box>, + Box::new(self.make_info_view()), + ]; + + let mut views = meta_views + .into_iter() + .chain(stack_views) + .skip(self.scroll * fitted_boxes_x as usize); 'outer: for &y_chunk in &y_chunks[..fitted_boxes_y as usize] { for x_chunk in x_layout.split(y_chunk) { - if first { - first = false; - self.draw_info(f, x_chunk); - } else if let Some((name, info)) = stacks.next() { - self.draw_stack(f, x_chunk, name, info); + if let Some(view) = views.next() { + view.draw(f, x_chunk); } else { break 'outer; } @@ -169,7 +176,70 @@ impl Ui { } } - fn draw_info(&self, f: &mut Frame, area: Rect) { + fn make_info_view(&self) -> GlobalInfo { + let unhealthy_count = self + .stacks + .values() + .filter(|stack| stack.stopped_containers > 0) + .count(); + + GlobalInfo { + stack_count: self.stacks.len(), + unhealthy_count, + event_log: &self.event_log, + } + } +} + +fn fmt_kilobytes<'a>(kbs: usize) -> String { + let gibi = 1 << 20; + let mebi = 1 << 10; + + if kbs > gibi * 2 { + format!("{} GBs", kbs / gibi) + } else if kbs > mebi * 10 { + format!("{} MBs", kbs / mebi) + } else { + format!("{} KBs", kbs) + } +} + +pub trait BoxView { + fn draw(&self, f: &mut Frame<'_, B>, area: Rect); +} + +struct DebugLog; + +impl BoxView for DebugLog { + fn draw(&self, f: &mut Frame<'_, B>, area: Rect) { + let block = Block::default().borders(Borders::ALL); + let inner = block.inner(area); + f.render_widget(block, area); + + let log_style = Style::default() + .fg(Color::Red) + .add_modifier(Modifier::ITALIC); + + let log = debug::LOG.blocking_lock().clone(); + + let debug_log = Paragraph::new( + log.into_iter() + .map(|msg| Span::styled(msg, log_style)) + .map(Spans::from) + .collect::>(), + ); + f.render_widget(debug_log, inner); + } +} + +struct GlobalInfo<'a> { + stack_count: usize, + unhealthy_count: usize, + event_log: &'a [String; 7], +} + +impl<'a, B: Backend> BoxView for GlobalInfo<'a> { + fn draw(&self, f: &mut Frame<'_, B>, area: Rect) { let block = Block::default().borders(Borders::ALL); let inner = block.inner(area); f.render_widget(block, area); @@ -179,19 +249,13 @@ impl Ui { .constraints([Constraint::Length(20), Constraint::Min(10)]) .split(inner); - let unhealthy_stacks = self - .stacks - .values() - .filter(|stack| stack.stopped_containers > 0) - .count(); - let notices = Paragraph::new(vec![ Spans::from("Status"), - Spans::from(format!("stacks: {}", self.stacks.len())), + Spans::from(format!("stacks: {}", self.stack_count)), Spans::from(""), - if unhealthy_stacks > 0 { + if self.unhealthy_count > 0 { let style = Style::default().fg(Color::Red); - Span::styled(format!("unhealthy: {}", unhealthy_stacks), style).into() + Span::styled(format!("unhealthy: {}", self.unhealthy_count), style).into() } else { Spans::from("") }, @@ -201,6 +265,7 @@ impl Ui { let log_style = Style::default() .fg(Color::LightBlue) .add_modifier(Modifier::ITALIC); + let event_log = Paragraph::new( self.event_log .clone() @@ -210,8 +275,18 @@ impl Ui { ); f.render_widget(event_log, chunks[1]); } +} + +struct StackView<'a> { + name: &'a str, + info: &'a StackInfo, +} + +impl<'a, B: Backend> BoxView for StackView<'a> { + fn draw(&self, f: &mut Frame<'_, B>, area: Rect) { + let name = self.name; + let info = self.info; - fn draw_stack(&self, f: &mut Frame, area: Rect, name: &str, info: &StackStats) { let title_style = Style::default().fg(Color::LightMagenta).bg(Color::Black); let block = Block::default() @@ -282,16 +357,3 @@ impl Ui { f.render_widget(gauge_canvas(info.memory_percent, "MEM"), chunks[2]); } } - -fn fmt_kilobytes<'a>(kbs: usize) -> String { - let gibi = 1 << 20; - let mebi = 1 << 10; - - if kbs > gibi * 2 { - format!("{} GBs", kbs / gibi) - } else if kbs > mebi * 10 { - format!("{} MBs", kbs / mebi) - } else { - format!("{} KBs", kbs) - } -} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..083f8a7 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,73 @@ +use async_trait::async_trait; +use std::future::Future; +use tokio::task; +use tokio::task::JoinError; + +pub trait NextN { + type Item; + + /// Puts the next N elements of an iterator into a fized-size array. + /// + /// Returns None if the iterator did not have enough elements to fill the array. + fn next_n(&mut self) -> Option<[Self::Item; N]>; +} + +impl NextN for I { + type Item = I::Item; + + fn next_n(&mut self) -> Option<[::Item; N]> { + let mut array = [(); N].map(|_| None); + + let mut i = 0; + for item in self { + if i == N { + break; + } + + array[i] = Some(item); + + i += 1; + } + + if i < N { + None + } else { + Some(array.map(|option| option.unwrap())) + } + } +} + +#[async_trait] +pub trait RunConcurrent +where + Self: Sized, + F: Future + Send + 'static, + T: Send + 'static, +{ + async fn run_concurrent(self) -> Result + where + C: Default + Extend + Send; +} + +#[async_trait] +impl RunConcurrent for I +where + I: Iterator + Send, + F: Future + Send + 'static, + T: Send + 'static, +{ + async fn run_concurrent(self) -> Result + where + C: Default + Extend + Send, + { + let tasks = self.map(|job| task::spawn(job)).collect::>(); + + let mut results = C::default(); + + for task in tasks { + results.extend(Some(task.await?)); + } + + Ok(results) + } +}