chore: fix for parallel metrics processing and parser

Jesús Pérez 2024-10-19 14:41:47 +01:00
parent 2c781647aa
commit a1489cb24b
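
The reworked parser keeps the input format implied by the regex (\d+) (\w+) (\d+): a Unix timestamp, a metric name, and an integer value per line. A minimal sketch of what parse_line extracts from one well-formed line, assuming that format (the sample line is assembled from the test fixtures further down, not taken from the commit itself):

    use regex::Regex;

    // Sketch only: what parse_line extracts from one well-formed line.
    // The regex is the one used by the commit; the sample line is illustrative.
    fn main() {
        let re = Regex::new(r"(\d+) (\w+) (\d+)").unwrap();
        let line = "1650973075 cpu 47";
        let caps = re.captures(line).expect("line should match");
        let timestamp: i64 = caps[1].parse().unwrap(); // 1650973075
        let name = caps[2].to_string();                // "cpu"
        let value: f64 = caps[3].parse().unwrap();     // 47.0
        println!("{} {} {}", timestamp, name, value);
    }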

@@ -1,65 +1,86 @@
-use std::{collections::BTreeMap, process::exit};
-use std::error::Error;
-use std::fs::File;
-use std::io::Read;
-use std::time::{SystemTime,Duration, UNIX_EPOCH};
+use std::{
+    collections::BTreeMap,
+    error::Error,
+    time::{SystemTime,Duration, UNIX_EPOCH,Instant},
+    io::{prelude::*, BufReader},
+    fs::File,
+    thread::spawn,
+    sync::{ mpsc, mpsc::{Sender, Receiver}},
+};
 use chrono::{DateTime,Utc};
 use regex::Regex;
-fn parse(
-    mut file: File,
-) -> Result<BTreeMap<String, BTreeMap<SystemTime, f64>>, Box<dyn Error>> {
-    let mut contents = String::new();
-    file.read_to_string(&mut contents)?;
-    let re = regex::Regex::new(r"(\d+) (\w+) (\d+)")?;
-    Ok(parse_content(contents, re))
-}
-fn parse_content(
-    contents: String,
-    re: Regex,
-) -> BTreeMap<String, BTreeMap<SystemTime, f64>> {
-    dbg!(&contents);
-    let mut metrics: BTreeMap<String, BTreeMap<SystemTime, Vec<f64>>> = BTreeMap::new();
-    let show_invalid_line = | index: usize, line: &str | println!("invalid line: {} {}", index, line);
-    for (index, line) in contents.lines().enumerate() {
-        if let Some(caps) = re.captures(line) {
-            let timestamp_raw = &caps[1];
-            let metric_name = &caps[2];
-            let metric_value_raw = &caps[3];
-            let timestamp = timestamp_raw.parse::<i64>().unwrap_or_else(|e|{
-                println!("Parse timestamp {} error {}",timestamp_raw,e);
-                0
-            });
-            if timestamp == 0 {
-                show_invalid_line(index, line);
-                continue;
-            }
-            let metric_value = metric_value_raw.parse::<f64>().unwrap_or_else(|e|{
-                println!("Parse metric_value {} error {}",metric_value_raw,e);
-                0 as f64
-            });
-            if metric_value == 0 as f64 {
-                show_invalid_line(index, line);
-                continue;
-            }
-            let minute = UNIX_EPOCH + Duration::from_secs((timestamp - (timestamp % 60)) as u64);
-            if let Some(metric) = metrics.get_mut(metric_name) {
-                if let Some(metric_data) = metric.get_mut(&minute) {
-                    metric_data.push(metric_value);
-                } else {
-                    metric.insert(minute, vec![metric_value]);
-                }
-            } else {
-                let metric_time: BTreeMap <SystemTime, Vec<f64>> = [(minute, vec![metric_value])].into_iter().collect();
-                metrics.entry(metric_name.to_string()).or_insert( metric_time);
-            }
-        } else {
-            show_invalid_line(index, line);
-        }
-    }
-    let mut aggregated_metrics: BTreeMap<String, BTreeMap<SystemTime, f64>> = BTreeMap::new();
+type MetricMap = BTreeMap<String, BTreeMap<SystemTime, f64>>;
+type MetricMapVec = BTreeMap<String, BTreeMap<SystemTime, Vec<f64>>>;
+
+#[derive(Debug)]
+struct MetricLine {
+    timestamp: i64,
+    name: String,
+    value: f64
+}
+
+fn show_invalid_line(index: usize, line: &str) {
+    println!("invalid line: {} {}", index, line);
+}
+
+fn parse_line(line: &str, index: usize, re: &Regex) -> Option<MetricLine> {
+    if let Some(caps) = re.captures(&line) {
+        let timestamp = match caps[1].parse::<i64>() {
+            Ok(value) => value,
+            Err(e) => {
+                println!("Parse timestamp {} error {}",&caps[1],e);
+                show_invalid_line(index, line);
+                return None;
+            }
+        };
+        let metric_value = match caps[3].parse::<f64>() {
+            Ok(value) => value,
+            Err(e) => {
+                println!("Parse metric_value {} error {}",&caps[3],e);
+                show_invalid_line(index, line);
+                return None;
+            }
+        };
+        Some( MetricLine {
+            timestamp,
+            name: caps[2].to_string(),
+            value: metric_value
+        })
+    } else {
+        show_invalid_line(index, &line);
+        None
+    }
+}
+
+fn parse(
+    file: File,
+) -> Result<MetricMap, Box<dyn Error>> {
+    let re = Regex::new(r"(\d+) (\w+) (\d+)")?;
+    let mut metrics: MetricMapVec = BTreeMap::new();
+    let buf = BufReader::new(file);
+    buf.lines().enumerate().into_iter().for_each(|(index, read_line)|
+        match read_line {
+            Ok(line) => {
+                if let Some(metric_line) = parse_line(&line, index, &re) {
+                    let minute =
+                        UNIX_EPOCH + Duration::from_secs((metric_line.timestamp - (metric_line.timestamp % 60)) as u64);
+                    if let Some(metric) = metrics.get_mut(&metric_line.name) {
+                        if let Some(metric_data) = metric.get_mut(&minute) {
+                            metric_data.push(metric_line.value);
+                        } else {
+                            metric.insert(minute, vec![metric_line.value]);
+                        }
+                    } else {
+                        let metric_time: BTreeMap <SystemTime, Vec<f64>> = [(minute, vec![metric_line.value])].into_iter().collect();
+                        metrics.entry(metric_line.name.to_string()).or_insert( metric_time);
+                    }
+                }
+            },
+            Err(e) => {
+                eprintln!("Error reading line {}: {}", index, e);
+            },
+        }
+    );
+    let mut aggregated_metrics: MetricMap = BTreeMap::new();
     metrics.into_iter().for_each(|(metric_name, time_val_list)| {
         time_val_list.into_iter().for_each(|(time, values)| {
             let average = values.iter().sum::<f64>() / values.len() as f64;
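
The aggregation at the end of this hunk buckets every sample into the minute its timestamp falls in (timestamp - timestamp % 60 seconds since the epoch) and then replaces each bucket with the mean of its values. A small worked sketch with two assumed samples five seconds apart:

    use std::time::{Duration, UNIX_EPOCH};

    // Sketch only: two assumed samples, five seconds apart, land in the same
    // minute bucket (1650973020) and are replaced by their mean.
    fn main() {
        let samples = [(1650973070_i64, 47.0_f64), (1650973075, 53.0)];
        for (t, _) in samples {
            let minute = UNIX_EPOCH + Duration::from_secs((t - (t % 60)) as u64);
            println!("{} -> {:?}", t, minute); // same SystemTime for both
        }
        let values: Vec<f64> = samples.iter().map(|(_, v)| *v).collect();
        let average = values.iter().sum::<f64>() / values.len() as f64;
        assert_eq!(average, 50.0); // (47.0 + 53.0) / 2
    }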
@@ -75,10 +96,9 @@ fn parse_content(
             }
         })
     });
-    aggregated_metrics
+    Ok(aggregated_metrics)
 }
-
-fn load_input(file_path: &str) -> Result<BTreeMap<String, BTreeMap<SystemTime, f64>>, Box<dyn Error>> {
+fn load_input(file_path: &str) -> Result<MetricMap, Box<dyn Error>> {
     let file = File::open(&file_path)
         .map_err(|err| format!("Error reading file: {} {}", &file_path, err))?;
     let metrics = parse(file)
@@ -103,17 +123,48 @@ fn show_metrics(metrics: BTreeMap<String, BTreeMap<SystemTime, f64>>, output_pat
     );
     output
 }
-fn main() {
-    let default_input = String::from("input.txt");
-    match load_input(&default_input) {
-        Ok(metrics) => {
-            let _ = show_metrics(metrics, "");
-        },
-        Err(err) => {
-            eprint!("Error: {}", err);
-            exit(1);
-        }
-    }
-}
+fn generate_metrics(inputs_list: Vec<String>) {
+    let n_items = inputs_list.len();
+    let mut input_threads = Vec::with_capacity(n_items);
+    let (tx, rx): (Sender<(String,MetricMap)>, Receiver<(String,MetricMap)>) = mpsc::channel();
+    for input in inputs_list.clone() {
+        let thread_tx = tx.clone();
+        input_threads.push(spawn(move || {
+            let start = Instant::now();
+            match load_input(&input) {
+                Ok(metrics) => {
+                    let _ = thread_tx.send((input.clone(), metrics)).unwrap_or_default();
+                },
+                Err(err) => {
+                    eprint!("Error: {}", err);
+                    let _ = thread_tx.send((input.clone(), BTreeMap::new())).unwrap_or_default();
+                }
+            }
+            println!("\nProcessing {} took: {:?} ms", &input, start.elapsed().as_millis())
+        }));
+    }
+    let mut inputs_metrics = Vec::with_capacity(n_items);
+    for _ in 0..input_threads.len() {
+        match rx.recv() {
+            Ok(result) => inputs_metrics.push(result),
+            Err(e) => eprint!("Error: {}", e),
+        }
+    }
+    for thread in input_threads {
+        let _ = thread.join();
+    }
+    inputs_metrics.into_iter().for_each(|data_metrics| {
+        let (name, metrics) = data_metrics;
+        println!("\n{}: ---------------\n", name);
+        show_metrics(metrics, "");
+    });
+}
+
+fn main() {
+    let main_start = Instant::now();
+    let inputs_list = vec![String::from("input.txt"), String::from("input_2.txt")];
+    generate_metrics(inputs_list);
+    println!("\nALL Processing took: {:?} ms", main_start.elapsed().as_millis())
+}
 
 #[cfg(test)]
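
generate_metrics above fans the work out to one thread per input file and collects the results over an mpsc channel: every worker sends exactly one (name, metrics) pair, even when load_input fails, so the receiving loop can call recv once per thread and only then join the handles. A minimal standalone sketch of that fan-out/fan-in shape, with illustrative inputs and the real load_input call stubbed out:

    use std::sync::mpsc;
    use std::thread::spawn;

    // Sketch only: the fan-out/fan-in shape used by generate_metrics.
    // File names are illustrative; the real worker body calls load_input.
    fn main() {
        let inputs = vec![String::from("input.txt"), String::from("input_2.txt")];
        let (tx, rx) = mpsc::channel();
        let handles: Vec<_> = inputs
            .into_iter()
            .map(|input| {
                let tx = tx.clone();
                spawn(move || {
                    // Always send exactly one message per worker so the
                    // receive loop below never blocks on a missing result.
                    tx.send(input).unwrap();
                })
            })
            .collect();
        for _ in 0..handles.len() {
            println!("finished: {}", rx.recv().unwrap());
        }
        for handle in handles {
            let _ = handle.join();
        }
    }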
@@ -132,23 +183,19 @@ mod tests {
         let contents = String::from("1650973075 cpu A47\n");
         let re = regex::Regex::new(r"(\d+) (\w+) (\d+)")
             .map_err(|err| format!("Error regex: {}", err))?;
-        let result = parse_content(contents.clone(), re);
-        if result.len() == 0 {
-            Ok(())
-        } else {
-            Err(format!("Error invalid line value: {}", contents).into())
+        match parse_line(&contents, 1, &re) {
+            Some(_) => Err(format!("Error invalid line value: {}", contents).into()),
+            None => Ok(()),
         }
     }
     #[test]
     fn test_invalid_line_time() -> Result<(), String>{
         let contents = String::from("1650973075A cpu 47\n");
         let re = regex::Regex::new(r"(\d+) (\w+) (\d+)")
             .map_err(|err| format!("Error regex: {}", err))?;
-        let result = parse_content(contents.clone(), re);
-        if result.len() == 0 {
-            Ok(())
-        } else {
-            Err(format!("Error invalid line time: {}", contents).into())
+        match parse_line(&contents, 1, &re) {
+            Some(_) => Err(format!("Error invalid line value: {}", contents).into()),
+            None => Ok(()),
         }
     }
     #[test]