chore: fmt and clippy review

This commit is contained in:
Jesús Pérez 2024-10-21 03:37:28 +01:00
parent b83f2fd501
commit 278cd7a2f7
No known key found for this signature in database

View File

@ -1,159 +1,180 @@
use chrono::{DateTime, Utc};
use regex::Regex;
use std::{ use std::{
collections::BTreeMap, collections::BTreeMap,
error::Error, error::Error,
time::{SystemTime,Duration, UNIX_EPOCH,Instant},
io::{prelude::*, BufReader},
fs::File, fs::File,
io::{prelude::*, BufReader},
sync::{
mpsc,
mpsc::{Receiver, Sender},
},
thread::spawn, thread::spawn,
sync::{ mpsc, mpsc::{Sender, Receiver}}, time::{Duration, Instant, SystemTime, UNIX_EPOCH},
}; };
use chrono::{DateTime,Utc};
use regex::Regex;
type MetricMap = BTreeMap<String, BTreeMap<SystemTime, f64>>; type MetricMap = BTreeMap<String, BTreeMap<SystemTime, f64>>;
type MetricMapVec = BTreeMap<String, BTreeMap<SystemTime, Vec<f64>>>; type MetricMapVec = BTreeMap<String, BTreeMap<SystemTime, Vec<f64>>>;
#[derive(Debug)] #[derive(Debug)]
struct MetricLine { struct MetricLine {
timestamp: i64, timestamp: i64,
name: String, name: String,
value: f64 value: f64,
} }
fn show_invalid_line(index: usize, line: &str) { fn show_invalid_line(index: usize, line: &str) {
println!("invalid line: {} {}", index, line); println!("invalid line: {} {}", index, line);
} }
fn parse_line(line: &str, index: usize, re: &Regex) -> Option<MetricLine> { fn parse_line(line: &str, index: usize, re: &Regex) -> Option<MetricLine> {
if let Some(caps) = re.captures(&line) { if let Some(caps) = re.captures(line) {
let timestamp = match caps[1].parse::<i64>() { let timestamp = match caps[1].parse::<i64>() {
Ok(value) => value, Ok(value) => value,
Err(e) => { Err(e) => {
println!("Parse timestamp {} error {}",&caps[1],e); println!("Parse timestamp {} error {}", &caps[1], e);
show_invalid_line(index, line); show_invalid_line(index, line);
return None; return None;
} }
}; };
let metric_value = match caps[3].parse::<f64>() { let metric_value = match caps[3].parse::<f64>() {
Ok(value) => value, Ok(value) => value,
Err(e) => { Err(e) => {
println!("Parse metric_value {} error {}",&caps[3],e); println!("Parse metric_value {} error {}", &caps[3], e);
show_invalid_line(index, line); show_invalid_line(index, line);
return None; return None;
} }
}; };
Some( MetricLine { Some(MetricLine {
timestamp, timestamp,
name: caps[2].to_string(), name: caps[2].to_string(),
value: metric_value value: metric_value,
}) })
} else { } else {
show_invalid_line(index, &line); show_invalid_line(index, line);
None None
} }
} }
fn parse( fn parse(file: File) -> Result<MetricMap, Box<dyn Error>> {
file: File, let re = Regex::new(r"(\d+) (\w+) (\d+)")?;
) -> Result<MetricMap, Box<dyn Error>> {
let re = Regex::new(r"(\d+) (\w+) (\d+)")?;
let mut metrics: MetricMapVec = BTreeMap::new(); let mut metrics: MetricMapVec = BTreeMap::new();
let buf = BufReader::new(file); let buf = BufReader::new(file);
buf.lines().enumerate().into_iter().for_each(|(index, read_line)| buf.lines()
match read_line { .enumerate()
.for_each(|(index, read_line)| match read_line {
Ok(line) => { Ok(line) => {
if let Some(metric_line) = parse_line(&line, index, &re) { if let Some(metric_line) = parse_line(&line, index, &re) {
let minute = let minute = UNIX_EPOCH
UNIX_EPOCH + Duration::from_secs((metric_line.timestamp - (metric_line.timestamp % 60)) as u64); + Duration::from_secs(
if let Some(metric) = metrics.get_mut(&metric_line.name) { (metric_line.timestamp - (metric_line.timestamp % 60)) as u64,
);
if let Some(metric) = metrics.get_mut(&metric_line.name) {
if let Some(metric_data) = metric.get_mut(&minute) { if let Some(metric_data) = metric.get_mut(&minute) {
metric_data.push(metric_line.value); metric_data.push(metric_line.value);
} else { } else {
metric.insert(minute, vec![metric_line.value]); metric.insert(minute, vec![metric_line.value]);
} }
} else { } else {
let metric_time: BTreeMap <SystemTime, Vec<f64>> = [(minute, vec![metric_line.value])].into_iter().collect(); let metric_time: BTreeMap<SystemTime, Vec<f64>> =
metrics.entry(metric_line.name.to_string()).or_insert( metric_time); [(minute, vec![metric_line.value])].into_iter().collect();
metrics
.entry(metric_line.name.to_string())
.or_insert(metric_time);
} }
} }
},
Err(e) => {
eprintln!("Error reading line {}: {}", index, e);
},
}
);
let mut aggregated_metrics: MetricMap = BTreeMap::new();
metrics.into_iter().for_each(|(metric_name, time_val_list)| {
time_val_list.into_iter().for_each(|(time, values)| {
let average = values.iter().sum::<f64>() / values.len() as f64;
if let Some(metric) = aggregated_metrics.get_mut(&metric_name) {
if let Some(metric_data) = metric.get_mut(&time) {
*metric_data = average;
} else {
metric.insert(time, average);
}
} else {
let metric_time: BTreeMap <SystemTime,f64> = [(time, average)].into_iter().collect();
aggregated_metrics.entry(metric_name.to_string()).or_insert( metric_time);
} }
}) Err(e) => {
}); eprintln!("Error reading line {}: {}", index, e);
}
});
let mut aggregated_metrics: MetricMap = BTreeMap::new();
metrics
.into_iter()
.for_each(|(metric_name, time_val_list)| {
time_val_list.into_iter().for_each(|(time, values)| {
let average = values.iter().sum::<f64>() / values.len() as f64;
if let Some(metric) = aggregated_metrics.get_mut(&metric_name) {
if let Some(metric_data) = metric.get_mut(&time) {
*metric_data = average;
} else {
metric.insert(time, average);
}
} else {
let metric_time: BTreeMap<SystemTime, f64> =
[(time, average)].into_iter().collect();
aggregated_metrics
.entry(metric_name.to_string())
.or_insert(metric_time);
}
})
});
Ok(aggregated_metrics) Ok(aggregated_metrics)
} }
fn load_input(file_path: &str) -> Result<MetricMap, Box<dyn Error>> { fn load_input(file_path: &str) -> Result<MetricMap, Box<dyn Error>> {
let file = File::open(&file_path) let file = File::open(file_path)
.map_err(|err| format!("Error reading file: {} {}", &file_path, err))?; .map_err(|err| format!("Error reading file: {} {}", &file_path, err))?;
let metrics = parse(file) let metrics = parse(file).map_err(|err| format!("Unable to parse: {} {}", &file_path, err))?;
.map_err(|err| format!("Unable to parse: {} {}", &file_path, err))?;
Ok(metrics) Ok(metrics)
} }
fn show_metrics(metrics: BTreeMap<String, BTreeMap<SystemTime, f64>>, output_path: &str) -> Vec<String> { fn show_metrics(
metrics: BTreeMap<String, BTreeMap<SystemTime, f64>>,
output_path: &str,
) -> Vec<String> {
let mut output = Vec::new(); let mut output = Vec::new();
metrics.into_iter().for_each(|(metric_name, time_val)| metrics.into_iter().for_each(|(metric_name, time_val)| {
for (time, value) in time_val { for (time, value) in time_val {
let output_line = format!( let output_line = format!(
"{} {} {:?}", "{} {} {:?}",
DateTime::<Utc>::from(time).format("%Y-%m-%dT%H:%M:%SZ"), DateTime::<Utc>::from(time).format("%Y-%m-%dT%H:%M:%SZ"),
metric_name, metric_name,
value value
); );
match output_path { match output_path {
"vec" => output.push(output_line), "vec" => output.push(output_line),
"print" | _ => println!("{}", output_line), "print" => println!("{}", output_line),
} _ => println!("{}", output_line),
} }
); }
});
output output
} }
fn generate_metrics(inputs_list: Vec<String>) { fn generate_metrics(inputs_list: Vec<String>) {
let n_items = inputs_list.len(); let n_items = inputs_list.len();
let mut input_threads = Vec::with_capacity(n_items); let mut input_threads = Vec::with_capacity(n_items);
let (tx, rx): (Sender<(String,MetricMap)>, Receiver<(String,MetricMap)>) = mpsc::channel(); type KeyMetricMap = (String, MetricMap);
for input in inputs_list.clone() { let (tx, rx): (Sender<KeyMetricMap>, Receiver<KeyMetricMap>) = mpsc::channel();
for input in inputs_list.clone() {
let thread_tx = tx.clone(); let thread_tx = tx.clone();
input_threads.push(spawn(move || { input_threads.push(spawn(move || {
let start = Instant::now(); let start = Instant::now();
match load_input(&input) { match load_input(&input) {
Ok(metrics) => { Ok(metrics) => {
let _ = thread_tx.send((input.clone(), metrics)).unwrap_or_default(); thread_tx.send((input.clone(), metrics)).unwrap_or_default();
}, }
Err(err) => { Err(err) => {
eprint!("Error: {}", err); eprint!("Error: {}", err);
let _ = thread_tx.send((input.clone(), BTreeMap::new())).unwrap_or_default(); thread_tx
.send((input.clone(), BTreeMap::new()))
.unwrap_or_default();
} }
} }
println!("\nProcessing {} took: {:?} ms", &input, start.elapsed().as_millis()) println!(
"\nProcessing {} took: {:?} ms",
&input,
start.elapsed().as_millis()
)
})); }));
} }
let mut inputs_metrics= Vec::with_capacity(n_items); let mut inputs_metrics = Vec::with_capacity(n_items);
for _ in 0..input_threads.len() { for _ in 0..input_threads.len() {
match rx.recv() { match rx.recv() {
Ok(result) => inputs_metrics.push(result), Ok(result) => inputs_metrics.push(result),
Err(e) => eprint!("Error: {}", e), Err(e) => eprint!("Error: {}", e),
} }
} }
for thread in input_threads { for thread in input_threads {
let _ = thread.join(); let _ = thread.join();
} }
inputs_metrics.into_iter().for_each(|data_metrics|{ inputs_metrics.into_iter().for_each(|data_metrics| {
let (name, metrics) = data_metrics; let (name, metrics) = data_metrics;
println!("\n{}: ---------------\n", name); println!("\n{}: ---------------\n", name);
show_metrics(metrics, ""); show_metrics(metrics, "");
@ -164,22 +185,25 @@ fn main() {
let inputs_list = vec![String::from("input.txt"), String::from("input_2.txt")]; let inputs_list = vec![String::from("input.txt"), String::from("input_2.txt")];
generate_metrics(inputs_list); generate_metrics(inputs_list);
println!("\nALL Processing took: {:?} ms", main_start.elapsed().as_millis()) println!(
"\nALL Processing took: {:?} ms",
main_start.elapsed().as_millis()
)
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
#[test] #[test]
fn test_load_input() -> Result<(), String>{ fn test_load_input() -> Result<(), String> {
let default_input = String::from("input.txt"); let default_input = String::from("input.txt");
match load_input(&default_input) { match load_input(&default_input) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(e) => Err(format!("Error: {}",e).into()), Err(e) => Err(format!("Error: {}", e).into()),
} }
} }
#[test] #[test]
fn test_invalid_line_value() -> Result<(), String>{ fn test_invalid_line_value() -> Result<(), String> {
let contents = String::from("1650973075 cpu A47\n"); let contents = String::from("1650973075 cpu A47\n");
let re = regex::Regex::new(r"(\d+) (\w+) (\d+)") let re = regex::Regex::new(r"(\d+) (\w+) (\d+)")
.map_err(|err| format!("Error regex: {}", err))?; .map_err(|err| format!("Error regex: {}", err))?;
@ -189,7 +213,7 @@ mod tests {
} }
} }
#[test] #[test]
fn test_invalid_line_time() -> Result<(), String>{ fn test_invalid_line_time() -> Result<(), String> {
let contents = String::from("1650973075A cpu 47\n"); let contents = String::from("1650973075A cpu 47\n");
let re = regex::Regex::new(r"(\d+) (\w+) (\d+)") let re = regex::Regex::new(r"(\d+) (\w+) (\d+)")
.map_err(|err| format!("Error regex: {}", err))?; .map_err(|err| format!("Error regex: {}", err))?;
@ -206,10 +230,14 @@ mod tests {
let data_metrics = show_metrics(metrics, "vec"); let data_metrics = show_metrics(metrics, "vec");
let expected_output = String::from("output_expected.txt"); let expected_output = String::from("output_expected.txt");
let file = File::open(expected_output.clone()).expect(format!("no such file: {}", expected_output).as_str()); let file = File::open(expected_output.clone())
.expect(format!("no such file: {}", expected_output).as_str());
let buf = BufReader::new(file); let buf = BufReader::new(file);
let lines: Vec<String> = buf.lines().map(|l| l.expect("Could not parse line")).collect(); let lines: Vec<String> = buf
.lines()
.map(|l| l.expect("Could not parse line"))
.collect();
assert_eq!(lines.join("\n"),data_metrics.join("\n")); assert_eq!(lines.join("\n"), data_metrics.join("\n"));
} }
} }