chore: fmt, bound generic for generate_metrics, adjust types to generic, remove some mut and fix tests

This commit is contained in:
Jesús Pérez 2024-10-22 12:46:22 +01:00
parent c29aed3316
commit de89086e90
No known key found for this signature in database
8 changed files with 104 additions and 90 deletions

View File

@ -2,12 +2,16 @@
//! - Abstraction via **traits** //! - Abstraction via **traits**
//! - Generic process / tasks / steps required for metrics traitment. //! - Generic process / tasks / steps required for metrics traitment.
use std::{error::Error, fs::File};
use regex::Regex; use regex::Regex;
use std::{error::Error, fs::File};
pub trait MetricParser { pub trait MetricParser
fn load_input(&mut self) -> Result<bool, Box<dyn Error>>; where
fn parse(&mut self, file: File, reg_exp: Regex); Self: Sized,
{
fn input(&self) -> String;
fn load_input(&self) -> Result<Self, Box<dyn Error>>;
fn parse(&self, file: File, reg_exp: Regex) -> Self;
fn collect_aggregates(&mut self); fn collect_aggregates(&mut self);
fn show_metrics(&mut self) -> Result<Vec<String>, std::io::Error>; fn show_metrics(&self) -> Result<Vec<String>, std::io::Error>;
} }

View File

@ -70,47 +70,40 @@ mod tests;
/// Main threads control flow for each `target_list` item <br> /// Main threads control flow for each `target_list` item <br>
/// All process are collected and finally <br> /// All process are collected and finally <br>
/// [`MetricsConsumerTarget`] to **show_metrics** is called to get **print** or **write** results <br> /// [`MetricsConsumerTarget`] as **T** to **show_metrics** is called to get **print** or **write** results <br>
/// **be_quiet** attribute is just to avoid (true) all messages around processing and parsing operations <br> /// **be_quiet** attribute is just to avoid (true) all messages around processing and parsing operations <br>
/// ## For paralellism /// ## For paralellism
/// This can be done with [Tokio](https://tokio.rs/) as alternative to [std::thread](https://doc.rust-lang.org/std/thread/) <br> /// This can be done with [Tokio](https://tokio.rs/) as alternative to [std::thread](https://doc.rust-lang.org/std/thread/) <br>
/// It will require load other **crates** and feature customizations<br> /// It will require load other **crates** and feature customizations<br>
/// Another alternative could be [Coroutines](https://doc.rust-lang.org/std/ops/trait.Coroutine.html) <br> /// Another alternative could be [Coroutines](https://doc.rust-lang.org/std/ops/trait.Coroutine.html) <br>
/// As experimental features is a **nightly-only** (October 2024) /// As experimental features is a **nightly-only** (October 2024)
fn generate_metrics(targets_list: Vec<MetricsConsumerTarget>, be_quiet: bool) { fn generate_metrics<T: MetricParser + Sync + Send + 'static>(targets_list: Vec<T>, be_quiet: bool) {
let n_items = targets_list.len(); let n_items = targets_list.len();
let mut input_threads = Vec::with_capacity(n_items); let mut input_threads = Vec::with_capacity(n_items);
let (tx, rx): ( let (tx, rx): (Sender<Option<T>>, Receiver<Option<T>>) = mpsc::channel();
Sender<MetricsConsumerTarget>, targets_list.into_iter().for_each(|metrics_item| {
Receiver<MetricsConsumerTarget>,
) = mpsc::channel();
for mut metrics_consumer_data in targets_list.clone() {
let thread_tx = tx.clone(); let thread_tx = tx.clone();
input_threads.push(spawn(move || { input_threads.push(spawn(move || {
let start = Instant::now(); let start = Instant::now();
match metrics_consumer_data.load_input() { match metrics_item.load_input() {
Ok(_) => { Ok(mut result) => {
metrics_consumer_data.collect_aggregates(); result.collect_aggregates();
thread_tx thread_tx.send(Some(result)).unwrap_or_default();
.send(metrics_consumer_data.clone())
.unwrap_or_default();
} }
Err(err) => { Err(err) => {
eprint!("Error: {}", err); eprint!("Error: {}", err);
thread_tx thread_tx.send(None).unwrap_or_default();
.send(metrics_consumer_data.clone())
.unwrap_or_default();
} }
} }
if !be_quiet { if !be_quiet {
println!( println!(
"Processing {} took: {:?} ms", "Processing {} took: {:?} ms",
&metrics_consumer_data.input, &metrics_item.input(),
start.elapsed().as_millis() start.elapsed().as_millis()
) )
} }
})); }));
} });
let mut inputs_metrics = Vec::with_capacity(n_items); let mut inputs_metrics = Vec::with_capacity(n_items);
for _ in 0..input_threads.len() { for _ in 0..input_threads.len() {
match rx.recv() { match rx.recv() {
@ -121,11 +114,13 @@ fn generate_metrics(targets_list: Vec<MetricsConsumerTarget>, be_quiet: bool) {
for thread in input_threads { for thread in input_threads {
let _ = thread.join(); let _ = thread.join();
} }
inputs_metrics.iter_mut().for_each(|metrics_consumer_data| { inputs_metrics.iter().for_each(|metrics_item| {
if !be_quiet { if let Some(metrics_data) = metrics_item {
println!("\n{}: ---------------\n", &metrics_consumer_data.input); if !be_quiet {
println!("\n{}: ---------------\n", &metrics_data.input());
}
let _ = metrics_data.show_metrics();
} }
let _ = metrics_consumer_data.show_metrics();
}); });
} }
/// Keep it short: /// Keep it short:
@ -152,7 +147,7 @@ fn main() {
if !config.be_quiet { if !config.be_quiet {
println!("Loaded config from: {}", &args_settings.config_path); println!("Loaded config from: {}", &args_settings.config_path);
} }
let targets_list: Vec<MetricsConsumerTarget> = config let targets_list = config
.targets .targets
.iter() .iter()
.map(|item| MetricsConsumerTarget { .map(|item| MetricsConsumerTarget {

View File

@ -31,8 +31,9 @@ pub(crate) struct MetricsConsumerTarget {
/// Implement generic metrics operations / tasks for [`MetricsConsumerData`] /// Implement generic metrics operations / tasks for [`MetricsConsumerData`]
/// ///
impl MetricParser for MetricsConsumerTarget { impl MetricParser for MetricsConsumerTarget {
fn parse(&mut self, file: File, reg_exp: Regex) { fn parse(&self, file: File, reg_exp: Regex) -> Self {
let buf = BufReader::new(file); let buf = BufReader::new(file);
let mut consumer_target = self.clone();
buf.lines() buf.lines()
.enumerate() .enumerate()
.for_each(|(index, read_line)| match read_line { .for_each(|(index, read_line)| match read_line {
@ -42,7 +43,7 @@ impl MetricParser for MetricsConsumerTarget {
let minute = let minute =
UNIX_EPOCH + Duration::from_secs((timestamp - (timestamp % 60)) as u64); UNIX_EPOCH + Duration::from_secs((timestamp - (timestamp % 60)) as u64);
let mut not_found = true; let mut not_found = true;
for metric in self.metrics.iter_mut() { for metric in consumer_target.metrics.iter_mut() {
if metric.name() == name.to_lowercase() { if metric.name() == name.to_lowercase() {
if let Some(metric_data) = metric.time_data() { if let Some(metric_data) = metric.time_data() {
if metric_data.time == minute { if metric_data.time == minute {
@ -54,11 +55,13 @@ impl MetricParser for MetricsConsumerTarget {
} }
} }
if not_found { if not_found {
self.metrics.push(MetricsConsumerData::from_values( consumer_target
&name, .metrics
minute, .push(MetricsConsumerData::from_values(
Vec::from([value]), &name,
)); minute,
Vec::from([value]),
));
} }
} }
} }
@ -66,16 +69,17 @@ impl MetricParser for MetricsConsumerTarget {
eprintln!("Error reading line {}: {}", index, e); eprintln!("Error reading line {}: {}", index, e);
} }
}); });
consumer_target
} }
fn load_input(&mut self) -> Result<bool, Box<dyn Error>> { fn load_input(&self) -> Result<Self, Box<dyn Error>> {
let file = File::open(&self.input) let file = File::open(&self.input)
.map_err(|err| format!("Error reading file: {} {}", &self.input, err))?; .map_err(|err| format!("Error reading file: {} {}", &self.input, err))?;
if self.reg_exp.is_empty() { if self.reg_exp.is_empty() {
return Err(String::from("Error invalid reg expression").into()); return Err(String::from("Error invalid reg expression").into());
} }
let reg_exp = Regex::new(&self.reg_exp)?; let reg_exp = Regex::new(&self.reg_exp)?;
self.parse(file, reg_exp);
Ok(true) Ok(self.parse(file, reg_exp))
} }
fn collect_aggregates(&mut self) { fn collect_aggregates(&mut self) {
self.metrics.iter().for_each(|metric_data| { self.metrics.iter().for_each(|metric_data| {
@ -105,10 +109,11 @@ impl MetricParser for MetricsConsumerTarget {
} }
}) })
} }
fn show_metrics(&mut self) -> Result<Vec<String>, std::io::Error> { fn show_metrics(&self) -> Result<Vec<String>, std::io::Error> {
let mut output = Vec::new(); let mut output = Vec::new();
self.aggregates.sort(); let mut aggregates = self.aggregates.clone();
self.aggregates.iter().for_each(|metric_data| { aggregates.sort();
aggregates.iter().for_each(|metric_data| {
if metric_data.time_data().is_some() { if metric_data.time_data().is_some() {
let output_line = format!("{}", metric_data); let output_line = format!("{}", metric_data);
match self.output.as_str() { match self.output.as_str() {
@ -130,12 +135,22 @@ impl MetricParser for MetricsConsumerTarget {
let file = File::options().append(true).open(&self.output)?; let file = File::options().append(true).open(&self.output)?;
let mut writer = BufWriter::new(file); let mut writer = BufWriter::new(file);
writer.write_all(output.join("\n").as_bytes())?; writer.write_all(output.join("\n").as_bytes())?;
let text_overwrite = if self.out_overwrite { String::from ("overwriten")} else { String::from("") }; let text_overwrite = if self.out_overwrite {
println!("Metrics for '{}' are saved in '{}' {}", &self.input, &self.output, &text_overwrite); String::from("overwriten")
} else {
String::from("")
};
println!(
"Metrics for '{}' are saved in '{}' {}",
&self.input, &self.output, &text_overwrite
);
} }
}; };
Ok(output.to_owned()) Ok(output.to_owned())
} }
fn input(&self) -> String {
self.input.to_string()
}
} }
/// Specific implementations like **parse_line** /// Specific implementations like **parse_line**
impl MetricsConsumerTarget { impl MetricsConsumerTarget {

View File

@ -5,7 +5,7 @@
use super::*; use super::*;
#[test] #[test]
fn test_load_input() -> Result<(), String> { fn test_load_input() -> Result<(), String> {
let mut metrics_target = MetricsConsumerTarget { let metrics_target = MetricsConsumerTarget {
input: String::from(DEFAULT_INPUT_PATH), input: String::from(DEFAULT_INPUT_PATH),
reg_exp: String::from(DEFAULT_REG_EXP), reg_exp: String::from(DEFAULT_REG_EXP),
..Default::default() ..Default::default()
@ -51,15 +51,15 @@ fn test_expected_metrics() {
fs::File, fs::File,
io::{prelude::*, BufReader}, io::{prelude::*, BufReader},
}; };
let mut metrics_target = MetricsConsumerTarget { let metrics_target = MetricsConsumerTarget {
input: String::from(DEFAULT_INPUT_PATH), input: String::from(DEFAULT_INPUT_PATH),
output: String::from("vec"), output: String::from("vec"),
reg_exp: String::from(DEFAULT_REG_EXP), reg_exp: String::from(DEFAULT_REG_EXP),
..Default::default() ..Default::default()
}; };
metrics_target.load_input().unwrap_or_default(); let mut result = metrics_target.load_input().unwrap_or_default();
metrics_target.collect_aggregates(); result.collect_aggregates();
let data_metrics = metrics_target.show_metrics().unwrap_or_default(); let data_metrics = result.show_metrics().unwrap_or_default();
let expected_output = String::from("output_expected.txt"); let expected_output = String::from("output_expected.txt");
let file = File::open(expected_output.clone()) let file = File::open(expected_output.clone())