diff --git a/src/main.rs b/src/main.rs index 220a631..52e67c2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,159 +1,180 @@ +use chrono::{DateTime, Utc}; +use regex::Regex; use std::{ collections::BTreeMap, error::Error, - time::{SystemTime,Duration, UNIX_EPOCH,Instant}, - io::{prelude::*, BufReader}, fs::File, + io::{prelude::*, BufReader}, + sync::{ + mpsc, + mpsc::{Receiver, Sender}, + }, thread::spawn, - sync::{ mpsc, mpsc::{Sender, Receiver}}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; -use chrono::{DateTime,Utc}; -use regex::Regex; -type MetricMap = BTreeMap>; +type MetricMap = BTreeMap>; type MetricMapVec = BTreeMap>>; #[derive(Debug)] -struct MetricLine { +struct MetricLine { timestamp: i64, name: String, - value: f64 + value: f64, } fn show_invalid_line(index: usize, line: &str) { println!("invalid line: {} {}", index, line); } fn parse_line(line: &str, index: usize, re: &Regex) -> Option { - if let Some(caps) = re.captures(&line) { - let timestamp = match caps[1].parse::() { - Ok(value) => value, - Err(e) => { - println!("Parse timestamp {} error {}",&caps[1],e); + if let Some(caps) = re.captures(line) { + let timestamp = match caps[1].parse::() { + Ok(value) => value, + Err(e) => { + println!("Parse timestamp {} error {}", &caps[1], e); show_invalid_line(index, line); - return None; + return None; } }; let metric_value = match caps[3].parse::() { Ok(value) => value, - Err(e) => { - println!("Parse metric_value {} error {}",&caps[3],e); + Err(e) => { + println!("Parse metric_value {} error {}", &caps[3], e); show_invalid_line(index, line); - return None; + return None; } }; - Some( MetricLine { + Some(MetricLine { timestamp, name: caps[2].to_string(), - value: metric_value + value: metric_value, }) } else { - show_invalid_line(index, &line); + show_invalid_line(index, line); None } } -fn parse( - file: File, -) -> Result> { - let re = Regex::new(r"(\d+) (\w+) (\d+)")?; +fn parse(file: File) -> Result> { + let re = Regex::new(r"(\d+) (\w+) (\d+)")?; let mut metrics: MetricMapVec = BTreeMap::new(); let buf = BufReader::new(file); - buf.lines().enumerate().into_iter().for_each(|(index, read_line)| - match read_line { + buf.lines() + .enumerate() + .for_each(|(index, read_line)| match read_line { Ok(line) => { - if let Some(metric_line) = parse_line(&line, index, &re) { - let minute = - UNIX_EPOCH + Duration::from_secs((metric_line.timestamp - (metric_line.timestamp % 60)) as u64); - if let Some(metric) = metrics.get_mut(&metric_line.name) { + if let Some(metric_line) = parse_line(&line, index, &re) { + let minute = UNIX_EPOCH + + Duration::from_secs( + (metric_line.timestamp - (metric_line.timestamp % 60)) as u64, + ); + if let Some(metric) = metrics.get_mut(&metric_line.name) { if let Some(metric_data) = metric.get_mut(&minute) { - metric_data.push(metric_line.value); + metric_data.push(metric_line.value); } else { - metric.insert(minute, vec![metric_line.value]); + metric.insert(minute, vec![metric_line.value]); } } else { - let metric_time: BTreeMap > = [(minute, vec![metric_line.value])].into_iter().collect(); - metrics.entry(metric_line.name.to_string()).or_insert( metric_time); + let metric_time: BTreeMap> = + [(minute, vec![metric_line.value])].into_iter().collect(); + metrics + .entry(metric_line.name.to_string()) + .or_insert(metric_time); } } - }, - Err(e) => { - eprintln!("Error reading line {}: {}", index, e); - }, - } - ); - let mut aggregated_metrics: MetricMap = BTreeMap::new(); - metrics.into_iter().for_each(|(metric_name, time_val_list)| { - time_val_list.into_iter().for_each(|(time, values)| { - let average = values.iter().sum::() / values.len() as f64; - if let Some(metric) = aggregated_metrics.get_mut(&metric_name) { - if let Some(metric_data) = metric.get_mut(&time) { - *metric_data = average; - } else { - metric.insert(time, average); - } - } else { - let metric_time: BTreeMap = [(time, average)].into_iter().collect(); - aggregated_metrics.entry(metric_name.to_string()).or_insert( metric_time); } - }) - }); + Err(e) => { + eprintln!("Error reading line {}: {}", index, e); + } + }); + let mut aggregated_metrics: MetricMap = BTreeMap::new(); + metrics + .into_iter() + .for_each(|(metric_name, time_val_list)| { + time_val_list.into_iter().for_each(|(time, values)| { + let average = values.iter().sum::() / values.len() as f64; + if let Some(metric) = aggregated_metrics.get_mut(&metric_name) { + if let Some(metric_data) = metric.get_mut(&time) { + *metric_data = average; + } else { + metric.insert(time, average); + } + } else { + let metric_time: BTreeMap = + [(time, average)].into_iter().collect(); + aggregated_metrics + .entry(metric_name.to_string()) + .or_insert(metric_time); + } + }) + }); Ok(aggregated_metrics) } fn load_input(file_path: &str) -> Result> { - let file = File::open(&file_path) + let file = File::open(file_path) .map_err(|err| format!("Error reading file: {} {}", &file_path, err))?; - let metrics = parse(file) - .map_err(|err| format!("Unable to parse: {} {}", &file_path, err))?; + let metrics = parse(file).map_err(|err| format!("Unable to parse: {} {}", &file_path, err))?; Ok(metrics) } -fn show_metrics(metrics: BTreeMap>, output_path: &str) -> Vec { +fn show_metrics( + metrics: BTreeMap>, + output_path: &str, +) -> Vec { let mut output = Vec::new(); - metrics.into_iter().for_each(|(metric_name, time_val)| - for (time, value) in time_val { - let output_line = format!( - "{} {} {:?}", - DateTime::::from(time).format("%Y-%m-%dT%H:%M:%SZ"), - metric_name, - value - ); - match output_path { - "vec" => output.push(output_line), - "print" | _ => println!("{}", output_line), - } + metrics.into_iter().for_each(|(metric_name, time_val)| { + for (time, value) in time_val { + let output_line = format!( + "{} {} {:?}", + DateTime::::from(time).format("%Y-%m-%dT%H:%M:%SZ"), + metric_name, + value + ); + match output_path { + "vec" => output.push(output_line), + "print" => println!("{}", output_line), + _ => println!("{}", output_line), } - ); + } + }); output } fn generate_metrics(inputs_list: Vec) { let n_items = inputs_list.len(); let mut input_threads = Vec::with_capacity(n_items); - let (tx, rx): (Sender<(String,MetricMap)>, Receiver<(String,MetricMap)>) = mpsc::channel(); - for input in inputs_list.clone() { + type KeyMetricMap = (String, MetricMap); + let (tx, rx): (Sender, Receiver) = mpsc::channel(); + for input in inputs_list.clone() { let thread_tx = tx.clone(); input_threads.push(spawn(move || { let start = Instant::now(); match load_input(&input) { Ok(metrics) => { - let _ = thread_tx.send((input.clone(), metrics)).unwrap_or_default(); - }, + thread_tx.send((input.clone(), metrics)).unwrap_or_default(); + } Err(err) => { eprint!("Error: {}", err); - let _ = thread_tx.send((input.clone(), BTreeMap::new())).unwrap_or_default(); + thread_tx + .send((input.clone(), BTreeMap::new())) + .unwrap_or_default(); } } - println!("\nProcessing {} took: {:?} ms", &input, start.elapsed().as_millis()) + println!( + "\nProcessing {} took: {:?} ms", + &input, + start.elapsed().as_millis() + ) })); } - let mut inputs_metrics= Vec::with_capacity(n_items); + let mut inputs_metrics = Vec::with_capacity(n_items); for _ in 0..input_threads.len() { match rx.recv() { Ok(result) => inputs_metrics.push(result), Err(e) => eprint!("Error: {}", e), } - } + } for thread in input_threads { let _ = thread.join(); } - inputs_metrics.into_iter().for_each(|data_metrics|{ + inputs_metrics.into_iter().for_each(|data_metrics| { let (name, metrics) = data_metrics; println!("\n{}: ---------------\n", name); show_metrics(metrics, ""); @@ -164,22 +185,25 @@ fn main() { let inputs_list = vec![String::from("input.txt"), String::from("input_2.txt")]; generate_metrics(inputs_list); - println!("\nALL Processing took: {:?} ms", main_start.elapsed().as_millis()) + println!( + "\nALL Processing took: {:?} ms", + main_start.elapsed().as_millis() + ) } #[cfg(test)] mod tests { use super::*; #[test] - fn test_load_input() -> Result<(), String>{ + fn test_load_input() -> Result<(), String> { let default_input = String::from("input.txt"); match load_input(&default_input) { Ok(_) => Ok(()), - Err(e) => Err(format!("Error: {}",e).into()), + Err(e) => Err(format!("Error: {}", e).into()), } } #[test] - fn test_invalid_line_value() -> Result<(), String>{ + fn test_invalid_line_value() -> Result<(), String> { let contents = String::from("1650973075 cpu A47\n"); let re = regex::Regex::new(r"(\d+) (\w+) (\d+)") .map_err(|err| format!("Error regex: {}", err))?; @@ -189,7 +213,7 @@ mod tests { } } #[test] - fn test_invalid_line_time() -> Result<(), String>{ + fn test_invalid_line_time() -> Result<(), String> { let contents = String::from("1650973075A cpu 47\n"); let re = regex::Regex::new(r"(\d+) (\w+) (\d+)") .map_err(|err| format!("Error regex: {}", err))?; @@ -206,10 +230,14 @@ mod tests { let data_metrics = show_metrics(metrics, "vec"); let expected_output = String::from("output_expected.txt"); - let file = File::open(expected_output.clone()).expect(format!("no such file: {}", expected_output).as_str()); + let file = File::open(expected_output.clone()) + .expect(format!("no such file: {}", expected_output).as_str()); let buf = BufReader::new(file); - let lines: Vec = buf.lines().map(|l| l.expect("Could not parse line")).collect(); + let lines: Vec = buf + .lines() + .map(|l| l.expect("Could not parse line")) + .collect(); - assert_eq!(lines.join("\n"),data_metrics.join("\n")); + assert_eq!(lines.join("\n"), data_metrics.join("\n")); } -} \ No newline at end of file +}