diff --git a/src/main.rs b/src/main.rs index 1904eb9..220a631 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,65 +1,86 @@ -use std::{collections::BTreeMap, process::exit}; -use std::error::Error; -use std::fs::File; -use std::io::Read; -use std::time::{SystemTime,Duration, UNIX_EPOCH}; +use std::{ + collections::BTreeMap, + error::Error, + time::{SystemTime,Duration, UNIX_EPOCH,Instant}, + io::{prelude::*, BufReader}, + fs::File, + thread::spawn, + sync::{ mpsc, mpsc::{Sender, Receiver}}, +}; use chrono::{DateTime,Utc}; use regex::Regex; -fn parse( - mut file: File, -) -> Result>, Box> { - let mut contents = String::new(); - file.read_to_string(&mut contents)?; - let re = regex::Regex::new(r"(\d+) (\w+) (\d+)")?; - Ok(parse_content(contents, re)) +type MetricMap = BTreeMap>; +type MetricMapVec = BTreeMap>>; + +#[derive(Debug)] +struct MetricLine { + timestamp: i64, + name: String, + value: f64 } -fn parse_content( - contents: String, - re: Regex, -) -> BTreeMap> { - dbg!(&contents); - let mut metrics: BTreeMap>> = BTreeMap::new(); - let show_invalid_line = | index: usize, line: &str | println!("invalid line: {} {}", index, line); - for (index,line ) in contents.lines().enumerate() { - if let Some(caps) = re.captures(line) { - let timestamp_raw = &caps[1]; - let metric_name = &caps[2]; - let metric_value_raw = &caps[3]; - let timestamp = timestamp_raw.parse::().unwrap_or_else(|e|{ - println!("Parse timestamp {} error {}",timestamp_raw,e); - 0 - }); - if timestamp == 0 { +fn show_invalid_line(index: usize, line: &str) { + println!("invalid line: {} {}", index, line); +} +fn parse_line(line: &str, index: usize, re: &Regex) -> Option { + if let Some(caps) = re.captures(&line) { + let timestamp = match caps[1].parse::() { + Ok(value) => value, + Err(e) => { + println!("Parse timestamp {} error {}",&caps[1],e); show_invalid_line(index, line); - continue; - } - let metric_value = metric_value_raw.parse::().unwrap_or_else(|e|{ - println!("Parse metric_value {} error {}",metric_value_raw,e); - 0 as f64 - }); - if metric_value == 0 as f64 { - show_invalid_line(index, line); - continue; - } - let minute = UNIX_EPOCH + Duration::from_secs((timestamp - (timestamp % 60)) as u64); - if let Some(metric) = metrics.get_mut(metric_name) { - if let Some(metric_data) = metric.get_mut(&minute) { - metric_data.push(metric_value); - } else { - metric.insert(minute, vec![metric_value]); - } - } else { - let metric_time: BTreeMap > = [(minute, vec![metric_value])].into_iter().collect(); - metrics.entry(metric_name.to_string()).or_insert( metric_time); + return None; } - } else { - show_invalid_line(index, line); - } + }; + let metric_value = match caps[3].parse::() { + Ok(value) => value, + Err(e) => { + println!("Parse metric_value {} error {}",&caps[3],e); + show_invalid_line(index, line); + return None; + } + }; + Some( MetricLine { + timestamp, + name: caps[2].to_string(), + value: metric_value + }) + } else { + show_invalid_line(index, &line); + None } - - let mut aggregated_metrics: BTreeMap> = BTreeMap::new(); +} +fn parse( + file: File, +) -> Result> { + let re = Regex::new(r"(\d+) (\w+) (\d+)")?; + let mut metrics: MetricMapVec = BTreeMap::new(); + let buf = BufReader::new(file); + buf.lines().enumerate().into_iter().for_each(|(index, read_line)| + match read_line { + Ok(line) => { + if let Some(metric_line) = parse_line(&line, index, &re) { + let minute = + UNIX_EPOCH + Duration::from_secs((metric_line.timestamp - (metric_line.timestamp % 60)) as u64); + if let Some(metric) = metrics.get_mut(&metric_line.name) { + if let Some(metric_data) = metric.get_mut(&minute) { + metric_data.push(metric_line.value); + } else { + metric.insert(minute, vec![metric_line.value]); + } + } else { + let metric_time: BTreeMap > = [(minute, vec![metric_line.value])].into_iter().collect(); + metrics.entry(metric_line.name.to_string()).or_insert( metric_time); + } + } + }, + Err(e) => { + eprintln!("Error reading line {}: {}", index, e); + }, + } + ); + let mut aggregated_metrics: MetricMap = BTreeMap::new(); metrics.into_iter().for_each(|(metric_name, time_val_list)| { time_val_list.into_iter().for_each(|(time, values)| { let average = values.iter().sum::() / values.len() as f64; @@ -75,10 +96,9 @@ fn parse_content( } }) }); - aggregated_metrics + Ok(aggregated_metrics) } - -fn load_input(file_path: &str) -> Result>, Box> { +fn load_input(file_path: &str) -> Result> { let file = File::open(&file_path) .map_err(|err| format!("Error reading file: {} {}", &file_path, err))?; let metrics = parse(file) @@ -103,17 +123,48 @@ fn show_metrics(metrics: BTreeMap>, output_pat ); output } -fn main() { - let default_input = String::from("input.txt"); - match load_input(&default_input) { - Ok(metrics) => { - let _ = show_metrics(metrics, ""); - }, - Err(err) => { - eprint!("Error: {}", err); - exit(1); - } +fn generate_metrics(inputs_list: Vec) { + let n_items = inputs_list.len(); + let mut input_threads = Vec::with_capacity(n_items); + let (tx, rx): (Sender<(String,MetricMap)>, Receiver<(String,MetricMap)>) = mpsc::channel(); + for input in inputs_list.clone() { + let thread_tx = tx.clone(); + input_threads.push(spawn(move || { + let start = Instant::now(); + match load_input(&input) { + Ok(metrics) => { + let _ = thread_tx.send((input.clone(), metrics)).unwrap_or_default(); + }, + Err(err) => { + eprint!("Error: {}", err); + let _ = thread_tx.send((input.clone(), BTreeMap::new())).unwrap_or_default(); + } + } + println!("\nProcessing {} took: {:?} ms", &input, start.elapsed().as_millis()) + })); } + let mut inputs_metrics= Vec::with_capacity(n_items); + for _ in 0..input_threads.len() { + match rx.recv() { + Ok(result) => inputs_metrics.push(result), + Err(e) => eprint!("Error: {}", e), + } + } + for thread in input_threads { + let _ = thread.join(); + } + inputs_metrics.into_iter().for_each(|data_metrics|{ + let (name, metrics) = data_metrics; + println!("\n{}: ---------------\n", name); + show_metrics(metrics, ""); + }); +} +fn main() { + let main_start = Instant::now(); + let inputs_list = vec![String::from("input.txt"), String::from("input_2.txt")]; + + generate_metrics(inputs_list); + println!("\nALL Processing took: {:?} ms", main_start.elapsed().as_millis()) } #[cfg(test)] @@ -132,23 +183,19 @@ mod tests { let contents = String::from("1650973075 cpu A47\n"); let re = regex::Regex::new(r"(\d+) (\w+) (\d+)") .map_err(|err| format!("Error regex: {}", err))?; - let result = parse_content(contents.clone(), re); - if result.len() == 0 { - Ok(()) - } else { - Err(format!("Error invalid line value: {}", contents).into()) + match parse_line(&contents, 1, &re) { + Some(_) => Err(format!("Error invalid line value: {}", contents).into()), + None => Ok(()), } } - #[test] + #[test] fn test_invalid_line_time() -> Result<(), String>{ let contents = String::from("1650973075A cpu 47\n"); let re = regex::Regex::new(r"(\d+) (\w+) (\d+)") .map_err(|err| format!("Error regex: {}", err))?; - let result = parse_content(contents.clone(), re); - if result.len() == 0 { - Ok(()) - } else { - Err(format!("Error invalid line time: {}", contents).into()) + match parse_line(&contents, 1, &re) { + Some(_) => Err(format!("Error invalid line value: {}", contents).into()), + None => Ok(()), } } #[test]