Compare commits

...

16 Commits

4 changed files with 295 additions and 66 deletions

CHANGES.md Normal file

@@ -0,0 +1,28 @@
# Backend internal interview (rust)
## CHANGES: fixed branch
- Move from **HashMap** to **BTreeMap** to preserve the loading order of lines; nevertheless it still looks very cumbersome.
- Fix the **line parser** so the **metrics** and **aggregated_metrics** maps are handled properly.
- Fix **unwrap / expect** calls for obvious reasons; add simple error messages via **map_err**, **unwrap_or_else** and **closures**.
- Fix the order of some source code lines, the output format and conversions.
- Change some **iterators** to a <u>functional style</u>.
- Separate tasks into independent functions for better isolation, fewer responsibilities per function and easier **test** handling.
- Add **output_path** to support different result output options.
- Add **elapsed time** to show how many milliseconds each metric file took, and the total processing time.
- Create type aliases to make the long **Map types** more readable.
- Create a **MetricLine** struct to make the parser easier (a lot of work remains, but it requires a full refactoring).
- Add **input_2.txt** to process multiple metrics in parallel (done via **std::thread**), collect the results via channels (**std::mpsc**) and show them properly.
- Create a **generate_metrics** function to process each metric file; it is also used in **test_expected_metrics**.
- The following **unit tests** are implemented:
  - <u>test_load_input</u> tests loading the input file **input.txt**.
  - <u>test_invalid_line_value</u> tests parsing a line with an invalid value.
  - <u>test_invalid_line_time</u> tests parsing a line with an invalid timestamp.
  - <u>test_expected_metrics</u> loads input data from **input.txt** and compares the result with **output_expected.txt**.

> [!NOTE]
> The code simply **works as expected**, preserving the initial approach without too many disruptive changes.<br>
> It is able to **process multiple metrics in parallel** (input.txt and input_2.txt). <br>
> A full refactoring is still needed for <u>better quality, maintenance and readability</u> (structs, implementations, settings for multiple inputs, etc.). <br>
Next round in: [Improve branch](/NewRelic/be-technical-interview-rust/src/branch/improved)
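
The HashMap-to-BTreeMap change listed above can be illustrated with a minimal standalone sketch (the keys and values here are illustrative, not taken from the repository):

```rust
use std::collections::BTreeMap;

fn main() {
    // A BTreeMap iterates its entries in sorted key order, so metrics keyed
    // by timestamp come back in stable chronological order; a HashMap gives
    // no ordering guarantee at all.
    let mut metrics: BTreeMap<u64, f64> = BTreeMap::new();
    metrics.insert(1650973171, 55.0);
    metrics.insert(1650973147, 49.0);
    metrics.insert(1650973159, 51.0);
    let keys: Vec<u64> = metrics.keys().copied().collect();
    // Sorted key order, regardless of insertion order.
    assert_eq!(keys, vec![1650973147, 1650973159, 1650973171]);
    println!("{:?}", keys);
}
```

The trade-off is that BTreeMap operations are O(log n) instead of HashMap's average O(1), which is irrelevant at this input size.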


@@ -1,4 +1,17 @@
# Backend internal interview (rust) - basic-fixed
> [!TIP]
> Use the [Improve branch](/jesus/be-technical-interview-rust/src/branch/improved) for a full refactoring with <u>better quality, maintenance and readability</u> (structs, implementations, settings for multiple inputs, etc.).

> [!NOTE]
> The code in this branch simply **works as expected**, preserving the initial approach without too many disruptive changes.<br>
> It is able to **process multiple metrics in parallel** (input.txt and input_2.txt). <br>
> A full refactoring is still needed for <u>better quality, maintenance and readability</u> (structs, implementations, settings for multiple inputs, etc.). <br>

> [!CAUTION]
> The [Improve branch](/jesus/be-technical-interview-rust/src/branch/improved) should try to make the code more modular and simple, avoid hardcoding, etc.
See [main changes](/jesus/be-technical-interview-rust/src/branch/basic-fixed/CHANGES.md)
A **Refactor metric-consumer** task
@@ -6,10 +19,12 @@ A **Refactor metric-consumer** task
There are several branches developed as proposals:
- [Basic fixed one](/jesus/be-technical-interview-rust/src/branch/basic-fixed)
> A basic code review, fixed with minor and essential changes to work as expected. <br>
> Tests are included for verification.
- [Improve one](/jesus/be-technical-interview-rust/src/branch/improved)
- [Multiple input one](multi-input)
- [x] Define a basic model, easy to extend and modify. **Abstraction / Generics**.
- [x] Structs and implementations for specific metric treatments. **Modular approach**.
- [x] Settings and configuration for interactive and non-interactive processing (batch mode). **Customize on context**.
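
The parallel processing mentioned in the notes above follows a thread-per-input fan-in over an mpsc channel. A minimal sketch of that pattern (the real code sends parsed metric maps; this sketch sends placeholder strings instead):

```rust
use std::{sync::mpsc, thread};

fn main() {
    // One worker thread per input file, all reporting back on one channel.
    let inputs = vec!["input.txt".to_string(), "input_2.txt".to_string()];
    let (tx, rx) = mpsc::channel();
    let handles: Vec<_> = inputs
        .into_iter()
        .map(|input| {
            let tx = tx.clone();
            thread::spawn(move || {
                // Stand-in for load_input(): just echo the file name back.
                tx.send(format!("processed {}", input)).unwrap();
            })
        })
        .collect();
    drop(tx); // close the sending side so the receive loop terminates
    for result in rx {
        println!("{}", result);
    }
    for h in handles {
        h.join().unwrap();
    }
}
```

Dropping the last `Sender` ends the `for result in rx` loop cleanly; the branch's code instead receives a fixed count of messages, one per spawned thread, which achieves the same thing.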

input_2.txt Normal file

@@ -0,0 +1,19 @@
1650973147 mem 1761992
1650973159 cpu 49
1650973171 mem 1858502
1650973183 cpu 51
1650973195 cpu 55
1650973207 mem 1076203
1650973219 cpu 60
1650973231 mem 640005
1650973243 mem 324911
1650973255 mem 1024
1650973267 cpu 56
1650973279 cpu 58
1650973291 mem 1024
1650973303 mem 1024
1650973315 mem 1024
1650973327 mem 1024
1650973339 cpu 49
1650973351 mem 1024
1650973363 cpu 49
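
The consumer aggregates lines like these per metric and per minute, truncating each timestamp with `timestamp - timestamp % 60` as in the parser below. A small sketch of that bucketing, using the three cpu samples from the first minute of this file (49, 51, 55):

```rust
fn main() {
    // The three cpu samples from the file that share one minute bucket.
    let cpu_samples = [(1650973159_u64, 49.0_f64), (1650973183, 51.0), (1650973195, 55.0)];
    // Same truncation the parser applies: drop the seconds within the minute.
    let bucket = |ts: u64| ts - ts % 60;
    assert!(cpu_samples.iter().all(|&(ts, _)| bucket(ts) == 1650973140));
    // Values sharing a bucket are averaged: (49 + 51 + 55) / 3.
    let avg = cpu_samples.iter().map(|&(_, v)| v).sum::<f64>() / cpu_samples.len() as f64;
    println!("minute {} cpu avg {:.2}", bucket(cpu_samples[0].0), avg); // prints "minute 1650973140 cpu avg 51.67"
}
```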


@@ -1,76 +1,243 @@
use chrono::{DateTime, Utc};
use regex::Regex;
use std::{
    collections::BTreeMap,
    error::Error,
    fs::File,
    io::{prelude::*, BufReader},
    sync::{
        mpsc,
        mpsc::{Receiver, Sender},
    },
    thread::spawn,
    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

// Type aliases for the long nested map types.
type MetricMap = BTreeMap<String, BTreeMap<SystemTime, f64>>;
type MetricMapVec = BTreeMap<String, BTreeMap<SystemTime, Vec<f64>>>;

#[derive(Debug)]
struct MetricLine {
    timestamp: i64,
    name: String,
    value: f64,
}

fn show_invalid_line(index: usize, line: &str) {
    println!("invalid line: {} {}", index, line);
}

/// Parses a single "timestamp name value" line; returns None (and reports
/// the line) when the regex does not match or a field fails to parse.
fn parse_line(line: &str, index: usize, re: &Regex) -> Option<MetricLine> {
    if let Some(caps) = re.captures(line) {
        let timestamp = match caps[1].parse::<i64>() {
            Ok(value) => value,
            Err(e) => {
                println!("Parse timestamp {} error {}", &caps[1], e);
                show_invalid_line(index, line);
                return None;
            }
        };
        let metric_value = match caps[3].parse::<f64>() {
            Ok(value) => value,
            Err(e) => {
                println!("Parse metric_value {} error {}", &caps[3], e);
                show_invalid_line(index, line);
                return None;
            }
        };
        Some(MetricLine {
            timestamp,
            name: caps[2].to_string(),
            value: metric_value,
        })
    } else {
        show_invalid_line(index, line);
        None
    }
}

/// Collects values per metric and per minute bucket, then averages each bucket.
fn parse(file: File) -> Result<MetricMap, Box<dyn Error>> {
    let re = Regex::new(r"(\d+) (\w+) (\d+)")?;
    let mut metrics: MetricMapVec = BTreeMap::new();
    let buf = BufReader::new(file);
    buf.lines()
        .enumerate()
        .for_each(|(index, read_line)| match read_line {
            Ok(line) => {
                if let Some(metric_line) = parse_line(&line, index, &re) {
                    // Truncate the timestamp to the start of its minute.
                    let minute = UNIX_EPOCH
                        + Duration::from_secs(
                            (metric_line.timestamp - (metric_line.timestamp % 60)) as u64,
                        );
                    if let Some(metric) = metrics.get_mut(&metric_line.name) {
                        if let Some(metric_data) = metric.get_mut(&minute) {
                            metric_data.push(metric_line.value);
                        } else {
                            metric.insert(minute, vec![metric_line.value]);
                        }
                    } else {
                        let metric_time: BTreeMap<SystemTime, Vec<f64>> =
                            [(minute, vec![metric_line.value])].into_iter().collect();
                        metrics
                            .entry(metric_line.name.to_string())
                            .or_insert(metric_time);
                    }
                }
            }
            Err(e) => {
                eprintln!("Error reading line {}: {}", index, e);
            }
        });
    let mut aggregated_metrics: MetricMap = BTreeMap::new();
    metrics
        .into_iter()
        .for_each(|(metric_name, time_val_list)| {
            time_val_list.into_iter().for_each(|(time, values)| {
                let average = values.iter().sum::<f64>() / values.len() as f64;
                if let Some(metric) = aggregated_metrics.get_mut(&metric_name) {
                    if let Some(metric_data) = metric.get_mut(&time) {
                        *metric_data = average;
                    } else {
                        metric.insert(time, average);
                    }
                } else {
                    let metric_time: BTreeMap<SystemTime, f64> =
                        [(time, average)].into_iter().collect();
                    aggregated_metrics
                        .entry(metric_name.to_string())
                        .or_insert(metric_time);
                }
            })
        });
    Ok(aggregated_metrics)
}

fn load_input(file_path: &str) -> Result<MetricMap, Box<dyn Error>> {
    let file = File::open(file_path)
        .map_err(|err| format!("Error reading file: {} {}", &file_path, err))?;
    let metrics = parse(file).map_err(|err| format!("Unable to parse: {} {}", &file_path, err))?;
    Ok(metrics)
}

/// Formats the aggregated metrics; collects into a Vec when output_path is
/// "vec", otherwise prints each line.
fn show_metrics(
    metrics: BTreeMap<String, BTreeMap<SystemTime, f64>>,
    output_path: &str,
) -> Vec<String> {
    let mut output = Vec::new();
    metrics.into_iter().for_each(|(metric_name, time_val)| {
        for (time, value) in time_val {
            let output_line = format!(
                "{} {} {:?}",
                DateTime::<Utc>::from(time).format("%Y-%m-%dT%H:%M:%SZ"),
                metric_name,
                value
            );
            match output_path {
                "vec" => output.push(output_line),
                _ => println!("{}", output_line),
            }
        }
    });
    output
}

/// Processes each input file in its own thread, collecting the per-file
/// metric maps through an mpsc channel.
fn generate_metrics(inputs_list: Vec<String>) {
    let n_items = inputs_list.len();
    let mut input_threads = Vec::with_capacity(n_items);
    type KeyMetricMap = (String, MetricMap);
    let (tx, rx): (Sender<KeyMetricMap>, Receiver<KeyMetricMap>) = mpsc::channel();
    for input in inputs_list {
        let thread_tx = tx.clone();
        input_threads.push(spawn(move || {
            let start = Instant::now();
            match load_input(&input) {
                Ok(metrics) => {
                    thread_tx.send((input.clone(), metrics)).unwrap_or_default();
                }
                Err(err) => {
                    eprintln!("Error: {}", err);
                    thread_tx
                        .send((input.clone(), BTreeMap::new()))
                        .unwrap_or_default();
                }
            }
            println!(
                "\nProcessing {} took: {:?} ms",
                &input,
                start.elapsed().as_millis()
            )
        }));
    }
    let mut inputs_metrics = Vec::with_capacity(n_items);
    // Receive exactly one result per spawned thread.
    for _ in 0..input_threads.len() {
        match rx.recv() {
            Ok(result) => inputs_metrics.push(result),
            Err(e) => eprintln!("Error: {}", e),
        }
    }
    for thread in input_threads {
        let _ = thread.join();
    }
    inputs_metrics.into_iter().for_each(|data_metrics| {
        let (name, metrics) = data_metrics;
        println!("\n{}: ---------------\n", name);
        show_metrics(metrics, "");
    });
}

fn main() {
    let main_start = Instant::now();
    let inputs_list = vec![String::from("input.txt"), String::from("input_2.txt")];
    generate_metrics(inputs_list);
    println!(
        "\nALL Processing took: {:?} ms",
        main_start.elapsed().as_millis()
    )
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_load_input() -> Result<(), String> {
        let default_input = String::from("input.txt");
        match load_input(&default_input) {
            Ok(_) => Ok(()),
            Err(e) => Err(format!("Error: {}", e)),
        }
    }

    #[test]
    fn test_invalid_line_value() -> Result<(), String> {
        let contents = String::from("1650973075 cpu A47\n");
        let re = Regex::new(r"(\d+) (\w+) (\d+)")
            .map_err(|err| format!("Error regex: {}", err))?;
        match parse_line(&contents, 1, &re) {
            Some(_) => Err(format!("Error invalid line value: {}", contents)),
            None => Ok(()),
        }
    }

    #[test]
    fn test_invalid_line_time() -> Result<(), String> {
        let contents = String::from("1650973075A cpu 47\n");
        let re = Regex::new(r"(\d+) (\w+) (\d+)")
            .map_err(|err| format!("Error regex: {}", err))?;
        match parse_line(&contents, 1, &re) {
            Some(_) => Err(format!("Error invalid line value: {}", contents)),
            None => Ok(()),
        }
    }

    #[test]
    fn test_expected_metrics() {
        let default_input = String::from("input.txt");
        let metrics = load_input(&default_input).unwrap_or_default();
        let data_metrics = show_metrics(metrics, "vec");
        let expected_output = String::from("output_expected.txt");
        let file = File::open(expected_output.clone())
            .expect(format!("no such file: {}", expected_output).as_str());
        let buf = BufReader::new(file);
        let lines: Vec<String> = buf
            .lines()
            .map(|l| l.expect("Could not parse line"))
            .collect();
        assert_eq!(lines.join("\n"), data_metrics.join("\n"));
    }
}