chore: upload source code
This commit is contained in:
parent
972b510bc4
commit
0e347bb70b
12
Cargo.toml
12
Cargo.toml
@ -1,8 +1,20 @@
|
||||
[package]
|
||||
name = "be-technical-interview-rust"
|
||||
description = "Backend internal interview (rust) October 2024"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
authors = ["Jesús Pérez <jpl@jesusperez.pro>"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
publish = false
|
||||
|
||||
[dependencies]
|
||||
regex = "1.10.4"
|
||||
chrono = "0.4"
|
||||
|
||||
# This are to parse and load toml [`Config`] files
|
||||
serde = { version = "1.0.210", features = ["derive"] }
|
||||
serde_derive = "1.0.210"
|
||||
toml = "0.8.19"
|
||||
|
||||
# This is for command line options [`Cli`]
|
||||
clap = {version = "4.5.20", features = [ "derive"] }
|
20
src/defs.rs
Normal file
20
src/defs.rs
Normal file
@ -0,0 +1,20 @@
|
||||
//! ## Definitions (settings and in common types)
|
||||
//! - Group some types definitions in a directory
|
||||
//! - Includes global **const** as **&str**
|
||||
//! - Export / shared to other code files in crate or public
|
||||
mod cli;
|
||||
mod config;
|
||||
pub mod metric_data;
|
||||
pub mod metrics;
|
||||
|
||||
pub(crate) use cli::{parse_args, CliSettings};
|
||||
pub(crate) use config::{load_from_file, Config};
|
||||
pub(crate) use metric_data::MetricsConsumerData;
|
||||
pub(crate) use metrics::MetricParser;
|
||||
|
||||
pub const PKG_NAME: &str = env!("CARGO_PKG_NAME");
|
||||
pub const PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
pub const CFG_FILE_EXTENSION: &str = ".toml";
|
||||
pub const DEFAULT_CONFIG_PATH: &str = "config.toml";
|
||||
pub const DEFAULT_INPUT_PATH: &str = "input.txt";
|
||||
pub const DEFAULT_REG_EXP: &str = r"(\d+) (\w+) (\d+)";
|
60
src/defs/cli.rs
Normal file
60
src/defs/cli.rs
Normal file
@ -0,0 +1,60 @@
|
||||
//! ## Cli definitions, arguments parsing
|
||||
//! - It uses [clap](https://docs.rs/clap/latest/clap/)
|
||||
//! - It includes a help options with **-h**
|
||||
//! - Alows to use differents **config paths** for batch processing
|
||||
//! - Alows to be used interactively via terminal as a command for a single input / output file
|
||||
|
||||
use clap::Parser;
|
||||
|
||||
use crate::{DEFAULT_CONFIG_PATH, PKG_NAME, PKG_VERSION};
|
||||
|
||||
/// Use [clap](https://docs.rs/clap/latest/clap/) to parse command line options with **derive mode**
|
||||
#[derive(Parser, Debug)]
|
||||
pub struct Cli {
|
||||
/// Config path to load targets settings (-c) command args override config values
|
||||
#[clap(short = 'c', long = "config", value_parser, display_order = 1)]
|
||||
pub config_path: Option<String>,
|
||||
|
||||
/// Quiet mode only data print (-q)
|
||||
#[clap(short = 'q', long = "quiet", action, display_order = 3)]
|
||||
pub be_quiet: bool,
|
||||
|
||||
/// Output path to load input data (-i)
|
||||
#[clap(short = 'i', long = "input", value_parser, display_order = 3)]
|
||||
pub input_path: Option<String>,
|
||||
|
||||
/// Output path to save metric aggreates (-o)
|
||||
#[clap(short = 'o', long = "output", value_parser, display_order = 4)]
|
||||
pub output_path: Option<String>,
|
||||
|
||||
/// Show version
|
||||
#[clap(short = 'v', long = "version", action, display_order = 5)]
|
||||
pub version: bool,
|
||||
}
|
||||
/// Collect settings for metric targets <br>
|
||||
/// Only one **input** and **output** item, for more than one it is much better to use **defs::Config** for better customization for each target.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CliSettings {
|
||||
pub config_path: String,
|
||||
pub be_quiet: Option<bool>,
|
||||
pub input: Option<String>,
|
||||
pub output: Option<String>,
|
||||
}
|
||||
/// Runs some options from command line <br>
|
||||
/// Set TOML config-path to load settings
|
||||
pub fn parse_args() -> CliSettings {
|
||||
let args = Cli::parse();
|
||||
if args.version {
|
||||
println!("{} version: {}", PKG_NAME, PKG_VERSION);
|
||||
std::process::exit(0);
|
||||
}
|
||||
let config_path = args
|
||||
.config_path
|
||||
.unwrap_or(String::from(DEFAULT_CONFIG_PATH));
|
||||
CliSettings {
|
||||
config_path,
|
||||
be_quiet: if args.be_quiet { Some(true) } else { None },
|
||||
input: args.input_path,
|
||||
output: args.output_path,
|
||||
}
|
||||
}
|
138
src/defs/config.rs
Normal file
138
src/defs/config.rs
Normal file
@ -0,0 +1,138 @@
|
||||
//! # Config settings definitions
|
||||
//! To load config values from TOML file path, it can be provided via command-line arguments <br>
|
||||
//! It use [serde](https://serde.rs/) via [`load_from_file`]
|
||||
//
|
||||
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::{Deserialize, Serialize}; // ,Deserializer,Serializer};
|
||||
use std::io::{Error, ErrorKind, Result};
|
||||
|
||||
use crate::{defs::CliSettings, CFG_FILE_EXTENSION, DEFAULT_INPUT_PATH, DEFAULT_REG_EXP};
|
||||
|
||||
fn default_config_input() -> String {
|
||||
String::from("input.txt")
|
||||
}
|
||||
fn default_config_output() -> String {
|
||||
String::from("")
|
||||
}
|
||||
fn default_config_overwrite() -> bool {
|
||||
true
|
||||
}
|
||||
fn default_config_be_quiet() -> bool {
|
||||
false
|
||||
}
|
||||
fn default_config_reg_exp() -> String {
|
||||
String::from(DEFAULT_REG_EXP)
|
||||
}
|
||||
fn default_config_targets() -> Vec<ConfigTarget> {
|
||||
vec![ConfigTarget::default()]
|
||||
}
|
||||
/// Settings for each target metric defined in **config path**
|
||||
/// **config.toml** content example:
|
||||
/// ```toml
|
||||
/// be_quiet = false
|
||||
/// [[targets]]
|
||||
/// input = "input.txt"
|
||||
///
|
||||
/// [[targets]]
|
||||
/// input = "input_2.txt"
|
||||
/// ```
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ConfigTarget {
|
||||
#[serde(default = "default_config_input")]
|
||||
pub input: String,
|
||||
#[serde(default = "default_config_output")]
|
||||
pub output: String,
|
||||
#[serde(default = "default_config_overwrite")]
|
||||
pub out_overwrite: bool,
|
||||
#[serde(default = "default_config_reg_exp")]
|
||||
pub reg_exp: String,
|
||||
}
|
||||
|
||||
impl Default for ConfigTarget {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
input: String::from(DEFAULT_INPUT_PATH),
|
||||
output: String::from(""),
|
||||
out_overwrite: true,
|
||||
reg_exp: String::from(DEFAULT_REG_EXP),
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Config Settings with target metric settings [`ConfigTarget`]
|
||||
/// **config.toml** content example:
|
||||
/// ```toml
|
||||
/// be_quiet = false
|
||||
/// [[targets]]
|
||||
/// input = "input.txt"
|
||||
///
|
||||
/// [[targets]]
|
||||
/// input = "input_2.txt"
|
||||
/// ```
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Config {
|
||||
#[serde(default = "default_config_be_quiet")]
|
||||
pub be_quiet: bool,
|
||||
#[serde(default = "default_config_targets")]
|
||||
pub targets: Vec<ConfigTarget>,
|
||||
}
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
be_quiet: false,
|
||||
targets: vec![ConfigTarget::default()],
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Config {
|
||||
// To override [`Config`] values with [`CliSettings`] provided via command line arguments and loaded via [`parse_args`]
|
||||
pub fn add_cli_settings(&self, cli_settings: CliSettings) -> Self {
|
||||
let be_quiet = if let Some(be_quiet) = cli_settings.be_quiet {
|
||||
be_quiet
|
||||
} else {
|
||||
self.be_quiet
|
||||
};
|
||||
let targets = if let Some(input) = cli_settings.input {
|
||||
let mut target = ConfigTarget {
|
||||
input,
|
||||
..Default::default()
|
||||
};
|
||||
if let Some(output) = cli_settings.output {
|
||||
target.output = output
|
||||
}
|
||||
vec![target]
|
||||
} else {
|
||||
self.targets.to_owned()
|
||||
};
|
||||
Self { be_quiet, targets }
|
||||
}
|
||||
}
|
||||
/// To load config settings and **Deserialize** content to [`Config`] struct <br>
|
||||
/// It use **T** as generic, allowing to load from a file to a **T** type
|
||||
/// It use **fs::read_to_string** as it is expecting short files size, no need to buffers.
|
||||
pub fn load_from_file<T: DeserializeOwned>(file_cfg: &str) -> Result<T> {
|
||||
let file_path = if file_cfg.contains(CFG_FILE_EXTENSION) {
|
||||
file_cfg.to_string()
|
||||
} else {
|
||||
format!("{}{}", file_cfg, CFG_FILE_EXTENSION)
|
||||
};
|
||||
let config_content = match std::fs::read_to_string(&file_path) {
|
||||
Ok(cfgcontent) => cfgcontent,
|
||||
Err(e) => {
|
||||
return Err(Error::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("Error read {}: {}", &file_path, e),
|
||||
))
|
||||
}
|
||||
};
|
||||
let item_cfg = match toml::from_str::<T>(&config_content) {
|
||||
Ok(cfg) => cfg,
|
||||
Err(e) => {
|
||||
return Err(Error::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("Error loading config {}: {}", &file_path, e),
|
||||
))
|
||||
}
|
||||
};
|
||||
Ok(item_cfg)
|
||||
}
|
101
src/defs/metric_data.rs
Normal file
101
src/defs/metric_data.rs
Normal file
@ -0,0 +1,101 @@
|
||||
//! ## Metrics Data for "Consumer Metrics"
|
||||
//! - Collecting, grouping and differentiate name, values, etc.
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use std::{cmp::Ordering, time::SystemTime};
|
||||
|
||||
/// Associate type for metric name in [`MetricsConsumerData`] <br>
|
||||
/// It allows to collect several **values** related with same **time**
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MetricTimeData {
|
||||
pub time: SystemTime,
|
||||
pub values: Vec<f64>,
|
||||
}
|
||||
/// Magic rust **enum** to clasify metrics names and their associated values in [`MetricTimeData`] <br>
|
||||
/// - It can combine different items with different types with associated values
|
||||
/// - Can be extended easily, **rust** will enforce consistence and definitions
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub enum MetricsConsumerData {
|
||||
Mem(MetricTimeData),
|
||||
Cpu(MetricTimeData),
|
||||
#[default]
|
||||
Unknown,
|
||||
}
|
||||
/// As **sort** is needed for some use cases, like **output** <br>
|
||||
/// Some implementations has to be written here, are not auto generated via **derive** macros, associtated types with types like **f64** will not allow autogeneration
|
||||
impl Eq for MetricsConsumerData {}
|
||||
impl Ord for MetricsConsumerData {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.name().cmp(&other.name())
|
||||
}
|
||||
}
|
||||
impl PartialOrd for MetricsConsumerData {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.name().cmp(&other.name()))
|
||||
}
|
||||
}
|
||||
impl PartialEq for MetricsConsumerData {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.name() == other.name()
|
||||
}
|
||||
}
|
||||
/// Display per item here
|
||||
impl std::fmt::Display for MetricsConsumerData {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
MetricsConsumerData::Mem(data) => write!(
|
||||
f,
|
||||
"{} mem {}",
|
||||
DateTime::<Utc>::from(data.time).format("%Y-%m-%dT%H:%M:%SZ"),
|
||||
format!("{:?}", data.values)
|
||||
.replace("[", "")
|
||||
.replace("]", "")
|
||||
),
|
||||
MetricsConsumerData::Cpu(data) => write!(
|
||||
f,
|
||||
"{} cpu {}",
|
||||
DateTime::<Utc>::from(data.time).format("%Y-%m-%dT%H:%M:%SZ"),
|
||||
format!("{:?}", data.values)
|
||||
.replace("[", "")
|
||||
.replace("]", "")
|
||||
),
|
||||
MetricsConsumerData::Unknown => write!(f, "anonymous"),
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Some implementations:
|
||||
/// - to get item **name** or **data** values
|
||||
/// - to **add_data** to exiting [`MetricTimeData`] **values**
|
||||
/// - **from_values** allows create an **enum** item
|
||||
impl MetricsConsumerData {
|
||||
pub fn name(&self) -> String {
|
||||
match self {
|
||||
MetricsConsumerData::Mem(_) => String::from("mem"),
|
||||
MetricsConsumerData::Cpu(_) => String::from("cpu"),
|
||||
MetricsConsumerData::Unknown => String::from("unknown"),
|
||||
}
|
||||
}
|
||||
pub fn time_data(&self) -> Option<MetricTimeData> {
|
||||
match self {
|
||||
MetricsConsumerData::Mem(data) => Some(data.to_owned()),
|
||||
MetricsConsumerData::Cpu(data) => Some(data.to_owned()),
|
||||
MetricsConsumerData::Unknown => None,
|
||||
}
|
||||
}
|
||||
pub fn add_data(&mut self, value: f64) {
|
||||
match self {
|
||||
MetricsConsumerData::Mem(data) => data.values.push(value),
|
||||
MetricsConsumerData::Cpu(data) => data.values.push(value),
|
||||
MetricsConsumerData::Unknown => (),
|
||||
}
|
||||
}
|
||||
pub fn from_values(name: &str, time: SystemTime, values: Vec<f64>) -> MetricsConsumerData {
|
||||
let metric_time_data = MetricTimeData { time, values };
|
||||
match name {
|
||||
"mem" | "Mem" | "MEM" => MetricsConsumerData::Mem(metric_time_data),
|
||||
"cpu" | "Cpu" | "CPU" => MetricsConsumerData::Cpu(metric_time_data),
|
||||
"unknown" | "Unknown" => MetricsConsumerData::Unknown,
|
||||
_ => MetricsConsumerData::default(),
|
||||
}
|
||||
}
|
||||
}
|
13
src/defs/metrics.rs
Normal file
13
src/defs/metrics.rs
Normal file
@ -0,0 +1,13 @@
|
||||
//! ## metrics definitions generic models.
|
||||
//! - Abstraction via **traits**
|
||||
//! - Generic process / tasks / steps required for metrics traitment.
|
||||
|
||||
use std::{error::Error, fs::File};
|
||||
use regex::Regex;
|
||||
|
||||
pub trait MetricParser {
|
||||
fn load_input(&mut self) -> Result<bool, Box<dyn Error>>;
|
||||
fn parse(&mut self, file: File, reg_exp: Regex);
|
||||
fn collect_aggregates(&mut self);
|
||||
fn show_metrics(&mut self) -> Result<Vec<String>, std::io::Error>;
|
||||
}
|
334
src/main.rs
334
src/main.rs
@ -1,215 +1,173 @@
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
error::Error,
|
||||
time::{SystemTime,Duration, UNIX_EPOCH,Instant},
|
||||
io::{prelude::*, BufReader},
|
||||
fs::File,
|
||||
thread::spawn,
|
||||
sync::{ mpsc, mpsc::{Sender, Receiver}},
|
||||
//! # Backend internal interview (rust) - improved
|
||||
//!
|
||||
//! This **Improved** branch is a rather disruptive approach to the [initial proposal](https://repo.jesusperez.pro/NewRelic/be-technical-interview-rust)<br>
|
||||
//! [Branch basic-fixed](https://repo.jesusperez.pro/NewRelic/be-technical-interview-rust/src/branch/basic-fixed) tried to solve proposal from initial code
|
||||
//! as a **continuity effort** with the necessary changes and some improvement adjustments such as the **parallel input processing**
|
||||
//!
|
||||
//! ## In summary
|
||||
//! - [x] Define a basic model, easily to extend and modify. **Abstraction / Generic**.
|
||||
//! - [x] Structs and implementations to specific metricis traitments. **Modular appoach**.
|
||||
//! - [x] Settings and configuration for interactive and non interactive processing (batch mode) **Customize on context**.
|
||||
//!
|
||||
//! ## Main improvements
|
||||
//! - Create abstractions / models to load and parse metrics, can be easily extended
|
||||
//! - Define basic operations via [`MetricParser`] trait path **defs/metrics.rs**
|
||||
//! - Use **structs** to group metrics attributes and implement operations like [`metrics_consumer::MetricsConsumerTarget`]
|
||||
//! - Use **enums** with associated values like [`defs::metric_data::MetricsConsumerData`] path **defs/metric_data** to group attributes and values
|
||||
//! - Remove **Maps collections**, use [Vectors](https://doc.rust-lang.org/std/vec/struct.Vec.html)
|
||||
//! - Use **const** for DEFAULT values on [`defs`] module
|
||||
//! - Remove all <i>hardcoded</i> values to [`Config`] settings, a **TOML config file** (see config.toml) can be used to define several metrics input and customize output
|
||||
//! - **Output** write to file (append o rewrite mode) is working (for now I/O is sync)
|
||||
//! - **Command line** arguments are processed [`parse_args`] with [`defs::CliSettings`] via [clap](https://docs.rs/clap/latest/clap/) and **overide** the ones loaded in **config files**
|
||||
//! - **Tests** have been accommodated from previous [Branch basic-fixed](https://repo.jesusperez.pro/NewRelic/be-technical-interview-rust/src/branch/basic-fixed) version to **imporved** approach
|
||||
//!
|
||||
//! ## Benefits
|
||||
//!
|
||||
//! | Element | Benefit |
|
||||
//! |----------------- |---------|
|
||||
//! | Generic traits | Group generic task for metric processing, steps / tasks separation |
|
||||
//! | Structs | Customize atributes and implementation for specific target or patterns |
|
||||
//! | Enums with values| Associate attributes metrics and values, easy to add new attributes or combine with different values at once |
|
||||
//! | Vectors | Simplify types / grouped in structs, priorize vectors type, easy to iterate, filter, sort, etc |
|
||||
//! | Const and Config | Group main const, define metric targes and operations in declarative mode for non intective traitment |
|
||||
//! | Command line args| Help to run in terminal as a cli |
|
||||
//! | Unit Tests | Verify some operations results |
|
||||
//!
|
||||
//! ## Ideas not included
|
||||
//! - **Async I/O** to scale and performance ?
|
||||
//! - Other Thread alternatives like [Tokio](https://tokio.rs/) or/and [Coroutines](https://doc.rust-lang.org/std/ops/trait.Coroutine.html)
|
||||
//! - Benchmarking for optimization
|
||||
//! - More **tests**
|
||||
//! - Run as **API mode** not only as batch processing
|
||||
//! <br>
|
||||
//!
|
||||
//! Code is in the private repository with several varias branches[^note].
|
||||
//! [^note]: Link to [branch repository improved](https://repo.jesusperez.pro/NewRelic/be-technical-interview-rust/src/branch/improved)
|
||||
|
||||
#![doc(html_logo_url = "https://info.jesusperez.pro/img/jesusperez-logo-b.png")]
|
||||
#![doc = include_str!("../assets/howto.md")]
|
||||
use crate::defs::{
|
||||
load_from_file, parse_args, Config, MetricParser, CFG_FILE_EXTENSION, DEFAULT_CONFIG_PATH,
|
||||
DEFAULT_INPUT_PATH, DEFAULT_REG_EXP, PKG_NAME, PKG_VERSION,
|
||||
};
|
||||
use crate::metrics_consumer::MetricsConsumerTarget;
|
||||
#[doc = include_str!("../README.md")]
|
||||
use std::{
|
||||
sync::{
|
||||
mpsc,
|
||||
mpsc::{Receiver, Sender},
|
||||
},
|
||||
thread::spawn,
|
||||
time::Instant,
|
||||
};
|
||||
use chrono::{DateTime,Utc};
|
||||
use regex::Regex;
|
||||
|
||||
type MetricMap = BTreeMap<String, BTreeMap<SystemTime, f64>>;
|
||||
type MetricMapVec = BTreeMap<String, BTreeMap<SystemTime, Vec<f64>>>;
|
||||
mod defs;
|
||||
mod metrics_consumer;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct MetricLine {
|
||||
timestamp: i64,
|
||||
name: String,
|
||||
value: f64
|
||||
}
|
||||
// Tests are in a separated module for easy access
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
fn show_invalid_line(index: usize, line: &str) {
|
||||
println!("invalid line: {} {}", index, line);
|
||||
}
|
||||
fn parse_line(line: &str, index: usize, re: &Regex) -> Option<MetricLine> {
|
||||
if let Some(caps) = re.captures(&line) {
|
||||
let timestamp = match caps[1].parse::<i64>() {
|
||||
Ok(value) => value,
|
||||
Err(e) => {
|
||||
println!("Parse timestamp {} error {}",&caps[1],e);
|
||||
show_invalid_line(index, line);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
let metric_value = match caps[3].parse::<f64>() {
|
||||
Ok(value) => value,
|
||||
Err(e) => {
|
||||
println!("Parse metric_value {} error {}",&caps[3],e);
|
||||
show_invalid_line(index, line);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
Some( MetricLine {
|
||||
timestamp,
|
||||
name: caps[2].to_string(),
|
||||
value: metric_value
|
||||
})
|
||||
} else {
|
||||
show_invalid_line(index, &line);
|
||||
None
|
||||
}
|
||||
}
|
||||
fn parse(
|
||||
file: File,
|
||||
) -> Result<MetricMap, Box<dyn Error>> {
|
||||
let re = Regex::new(r"(\d+) (\w+) (\d+)")?;
|
||||
let mut metrics: MetricMapVec = BTreeMap::new();
|
||||
let buf = BufReader::new(file);
|
||||
buf.lines().enumerate().into_iter().for_each(|(index, read_line)|
|
||||
match read_line {
|
||||
Ok(line) => {
|
||||
if let Some(metric_line) = parse_line(&line, index, &re) {
|
||||
let minute =
|
||||
UNIX_EPOCH + Duration::from_secs((metric_line.timestamp - (metric_line.timestamp % 60)) as u64);
|
||||
if let Some(metric) = metrics.get_mut(&metric_line.name) {
|
||||
if let Some(metric_data) = metric.get_mut(&minute) {
|
||||
metric_data.push(metric_line.value);
|
||||
} else {
|
||||
metric.insert(minute, vec![metric_line.value]);
|
||||
}
|
||||
} else {
|
||||
let metric_time: BTreeMap <SystemTime, Vec<f64>> = [(minute, vec![metric_line.value])].into_iter().collect();
|
||||
metrics.entry(metric_line.name.to_string()).or_insert( metric_time);
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
eprintln!("Error reading line {}: {}", index, e);
|
||||
},
|
||||
}
|
||||
);
|
||||
let mut aggregated_metrics: MetricMap = BTreeMap::new();
|
||||
metrics.into_iter().for_each(|(metric_name, time_val_list)| {
|
||||
time_val_list.into_iter().for_each(|(time, values)| {
|
||||
let average = values.iter().sum::<f64>() / values.len() as f64;
|
||||
if let Some(metric) = aggregated_metrics.get_mut(&metric_name) {
|
||||
if let Some(metric_data) = metric.get_mut(&time) {
|
||||
*metric_data = average;
|
||||
} else {
|
||||
metric.insert(time, average);
|
||||
}
|
||||
} else {
|
||||
let metric_time: BTreeMap <SystemTime,f64> = [(time, average)].into_iter().collect();
|
||||
aggregated_metrics.entry(metric_name.to_string()).or_insert( metric_time);
|
||||
}
|
||||
})
|
||||
});
|
||||
Ok(aggregated_metrics)
|
||||
}
|
||||
fn load_input(file_path: &str) -> Result<MetricMap, Box<dyn Error>> {
|
||||
let file = File::open(&file_path)
|
||||
.map_err(|err| format!("Error reading file: {} {}", &file_path, err))?;
|
||||
let metrics = parse(file)
|
||||
.map_err(|err| format!("Unable to parse: {} {}", &file_path, err))?;
|
||||
Ok(metrics)
|
||||
}
|
||||
fn show_metrics(metrics: BTreeMap<String, BTreeMap<SystemTime, f64>>, output_path: &str) -> Vec<String> {
|
||||
let mut output = Vec::new();
|
||||
metrics.into_iter().for_each(|(metric_name, time_val)|
|
||||
for (time, value) in time_val {
|
||||
let output_line = format!(
|
||||
"{} {} {:?}",
|
||||
DateTime::<Utc>::from(time).format("%Y-%m-%dT%H:%M:%SZ"),
|
||||
metric_name,
|
||||
value
|
||||
);
|
||||
match output_path {
|
||||
"vec" => output.push(output_line),
|
||||
"print" | _ => println!("{}", output_line),
|
||||
}
|
||||
}
|
||||
);
|
||||
output
|
||||
}
|
||||
fn generate_metrics(inputs_list: Vec<String>) {
|
||||
let n_items = inputs_list.len();
|
||||
/// Main threads control flow for each `target_list` item <br>
|
||||
/// All process are collected and finally <br>
|
||||
/// [`MetricsConsumerTarget`] to **show_metrics** is called to get **print** or **write** results <br>
|
||||
/// **be_quiet** attribute is just to avoid (true) all messages around processing and parsing operations <br>
|
||||
/// ## For paralellism
|
||||
/// This can be done with [Tokio](https://tokio.rs/) as alternative to [std::thread](https://doc.rust-lang.org/std/thread/) <br>
|
||||
/// It will require load other **crates** and feature customizations<br>
|
||||
/// Another alternative could be [Coroutines](https://doc.rust-lang.org/std/ops/trait.Coroutine.html) <br>
|
||||
/// As experimental features is a **nightly-only** (October 2024)
|
||||
fn generate_metrics(targets_list: Vec<MetricsConsumerTarget>, be_quiet: bool) {
|
||||
let n_items = targets_list.len();
|
||||
let mut input_threads = Vec::with_capacity(n_items);
|
||||
let (tx, rx): (Sender<(String,MetricMap)>, Receiver<(String,MetricMap)>) = mpsc::channel();
|
||||
for input in inputs_list.clone() {
|
||||
let (tx, rx): (
|
||||
Sender<MetricsConsumerTarget>,
|
||||
Receiver<MetricsConsumerTarget>,
|
||||
) = mpsc::channel();
|
||||
for mut metrics_consumer_data in targets_list.clone() {
|
||||
let thread_tx = tx.clone();
|
||||
input_threads.push(spawn(move || {
|
||||
let start = Instant::now();
|
||||
match load_input(&input) {
|
||||
Ok(metrics) => {
|
||||
let _ = thread_tx.send((input.clone(), metrics)).unwrap_or_default();
|
||||
},
|
||||
match metrics_consumer_data.load_input() {
|
||||
Ok(_) => {
|
||||
metrics_consumer_data.collect_aggregates();
|
||||
thread_tx
|
||||
.send(metrics_consumer_data.clone())
|
||||
.unwrap_or_default();
|
||||
}
|
||||
Err(err) => {
|
||||
eprint!("Error: {}", err);
|
||||
let _ = thread_tx.send((input.clone(), BTreeMap::new())).unwrap_or_default();
|
||||
thread_tx
|
||||
.send(metrics_consumer_data.clone())
|
||||
.unwrap_or_default();
|
||||
}
|
||||
}
|
||||
println!("\nProcessing {} took: {:?} ms", &input, start.elapsed().as_millis())
|
||||
if !be_quiet {
|
||||
println!(
|
||||
"Processing {} took: {:?} ms",
|
||||
&metrics_consumer_data.input,
|
||||
start.elapsed().as_millis()
|
||||
)
|
||||
}
|
||||
}));
|
||||
}
|
||||
let mut inputs_metrics= Vec::with_capacity(n_items);
|
||||
let mut inputs_metrics = Vec::with_capacity(n_items);
|
||||
for _ in 0..input_threads.len() {
|
||||
match rx.recv() {
|
||||
Ok(result) => inputs_metrics.push(result),
|
||||
Err(e) => eprint!("Error: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
for thread in input_threads {
|
||||
let _ = thread.join();
|
||||
}
|
||||
inputs_metrics.into_iter().for_each(|data_metrics|{
|
||||
let (name, metrics) = data_metrics;
|
||||
println!("\n{}: ---------------\n", name);
|
||||
show_metrics(metrics, "");
|
||||
inputs_metrics.iter_mut().for_each(|metrics_consumer_data| {
|
||||
if !be_quiet {
|
||||
println!("\n{}: ---------------\n", &metrics_consumer_data.input);
|
||||
}
|
||||
let _ = metrics_consumer_data.show_metrics();
|
||||
});
|
||||
}
|
||||
/// Keep it short:
|
||||
/// - Parse [`defs::CliSettings`] from command-line arguments
|
||||
/// - Load [`Config`] file settings from **config_path**
|
||||
/// - Override [`Config`] settings with cli arguments parsed
|
||||
/// - Create **targets list** as [`MetricsConsumerTarget`] vector
|
||||
/// - Call to [`generate_metrics`] to do the job
|
||||
/// - If not **be_quiet** mode print elapsed time in milliseconds
|
||||
/// <br>
|
||||
/// > This is not running async and not expect any **Result**.
|
||||
fn main() {
|
||||
let main_start = Instant::now();
|
||||
let inputs_list = vec![String::from("input.txt"), String::from("input_2.txt")];
|
||||
|
||||
generate_metrics(inputs_list);
|
||||
println!("\nALL Processing took: {:?} ms", main_start.elapsed().as_millis())
|
||||
let args_settings = parse_args();
|
||||
let config: Config = if std::path::Path::new(&args_settings.config_path).exists() {
|
||||
load_from_file(&args_settings.config_path).unwrap_or_else(|e| {
|
||||
eprintln!("Settings error: {}", e);
|
||||
Config::default()
|
||||
})
|
||||
} else {
|
||||
Config::default()
|
||||
};
|
||||
let config = config.add_cli_settings(args_settings.clone());
|
||||
if !config.be_quiet {
|
||||
println!("Loaded config from: {}", &args_settings.config_path);
|
||||
}
|
||||
let targets_list: Vec<MetricsConsumerTarget> = config
|
||||
.targets
|
||||
.iter()
|
||||
.map(|item| MetricsConsumerTarget {
|
||||
input: String::from(&item.input),
|
||||
output: String::from(&item.output),
|
||||
out_overwrite: item.out_overwrite,
|
||||
reg_exp: String::from(&item.reg_exp),
|
||||
..Default::default()
|
||||
})
|
||||
.collect();
|
||||
generate_metrics(targets_list, config.be_quiet);
|
||||
if !config.be_quiet {
|
||||
println!(
|
||||
"\nALL Processing took: {:?} ms",
|
||||
main_start.elapsed().as_millis()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
#[test]
|
||||
fn test_load_input() -> Result<(), String>{
|
||||
let default_input = String::from("input.txt");
|
||||
match load_input(&default_input) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => Err(format!("Error: {}",e).into()),
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn test_invalid_line_value() -> Result<(), String>{
|
||||
let contents = String::from("1650973075 cpu A47\n");
|
||||
let re = regex::Regex::new(r"(\d+) (\w+) (\d+)")
|
||||
.map_err(|err| format!("Error regex: {}", err))?;
|
||||
match parse_line(&contents, 1, &re) {
|
||||
Some(_) => Err(format!("Error invalid line value: {}", contents).into()),
|
||||
None => Ok(()),
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn test_invalid_line_time() -> Result<(), String>{
|
||||
let contents = String::from("1650973075A cpu 47\n");
|
||||
let re = regex::Regex::new(r"(\d+) (\w+) (\d+)")
|
||||
.map_err(|err| format!("Error regex: {}", err))?;
|
||||
match parse_line(&contents, 1, &re) {
|
||||
Some(_) => Err(format!("Error invalid line value: {}", contents).into()),
|
||||
None => Ok(()),
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn test_expected_metrics() {
|
||||
use std::io::{prelude::*, BufReader};
|
||||
let default_input = String::from("input.txt");
|
||||
let metrics = load_input(&default_input).unwrap_or_default();
|
||||
let data_metrics = show_metrics(metrics, "vec");
|
||||
|
||||
let expected_output = String::from("output_expected.txt");
|
||||
let file = File::open(expected_output.clone()).expect(format!("no such file: {}", expected_output).as_str());
|
||||
let buf = BufReader::new(file);
|
||||
let lines: Vec<String> = buf.lines().map(|l| l.expect("Could not parse line")).collect();
|
||||
|
||||
assert_eq!(lines.join("\n"),data_metrics.join("\n"));
|
||||
}
|
||||
}
|
170
src/metrics_consumer.rs
Normal file
170
src/metrics_consumer.rs
Normal file
@ -0,0 +1,170 @@
|
||||
//! ## MetricsConsumerTarget definitions and implementations
|
||||
//! Specific metric class **Consumer Metric** using generic metric operations <br>
|
||||
//! Save source / result paths, parsing regular_expression and collect metrics values and their aggreates using [`MetricsConsumerData`]
|
||||
//! From this modular approach other metrics classes can be defined and implemented
|
||||
|
||||
use regex::Regex;
|
||||
use std::{
|
||||
error::Error,
|
||||
fs::File,
|
||||
io::{prelude::*, BufReader, BufWriter},
|
||||
time::{Duration, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use crate::defs::{MetricParser, MetricsConsumerData};
|
||||
|
||||
/// Attributes definition
|
||||
/// - **reg_exp** has some difficulties to be used with [regex](https://docs.rs/regex/latest/regex/), it works better with [string literals](https://doc.rust-lang.org/reference/expressions/literal-expr.html#string-literal-expressions)
|
||||
/// - **metrics** and **aggregates** are vectors of [`MetricsConsumerData`] enums values to group input lines or save aggregates values
|
||||
/// - **metrics** and **aggregates** have same type for implementation simplification, **aggregates** only use <u>first vector value</u> <br>
|
||||
/// it can be easily used or extended to also save other computed values like: max, min, etc.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct MetricsConsumerTarget {
|
||||
pub input: String,
|
||||
pub output: String,
|
||||
pub out_overwrite: bool,
|
||||
pub reg_exp: String,
|
||||
pub metrics: Vec<MetricsConsumerData>,
|
||||
pub aggregates: Vec<MetricsConsumerData>,
|
||||
}
|
||||
|
||||
/// Implement generic metrics operations / tasks for [`MetricsConsumerData`]
|
||||
///
|
||||
impl MetricParser for MetricsConsumerTarget {
|
||||
fn parse(&mut self, file: File, reg_exp: Regex) {
|
||||
let buf = BufReader::new(file);
|
||||
buf.lines()
|
||||
.enumerate()
|
||||
.for_each(|(index, read_line)| match read_line {
|
||||
Ok(line) => {
|
||||
if let Some(metric_line) = self.parse_line(&line, index, ®_exp) {
|
||||
let (timestamp, name, value) = metric_line;
|
||||
let minute =
|
||||
UNIX_EPOCH + Duration::from_secs((timestamp - (timestamp % 60)) as u64);
|
||||
let mut not_found = true;
|
||||
for metric in self.metrics.iter_mut() {
|
||||
if metric.name() == name.to_lowercase() {
|
||||
if let Some(metric_data) = metric.time_data() {
|
||||
if metric_data.time == minute {
|
||||
metric.add_data(value);
|
||||
not_found = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if not_found {
|
||||
self.metrics.push(MetricsConsumerData::from_values(
|
||||
&name,
|
||||
minute,
|
||||
Vec::from([value]),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Error reading line {}: {}", index, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
fn load_input(&mut self) -> Result<bool, Box<dyn Error>> {
|
||||
let file = File::open(&self.input)
|
||||
.map_err(|err| format!("Error reading file: {} {}", &self.input, err))?;
|
||||
if self.reg_exp.is_empty() {
|
||||
return Err(String::from("Error invalid reg expression").into());
|
||||
}
|
||||
let reg_exp = Regex::new(&self.reg_exp)?;
|
||||
self.parse(file, reg_exp);
|
||||
Ok(true)
|
||||
}
|
||||
fn collect_aggregates(&mut self) {
|
||||
self.metrics.iter().for_each(|metric_data| {
|
||||
let name = metric_data.name();
|
||||
if let Some(metric_time_data) = metric_data.time_data() {
|
||||
let average = metric_time_data.values.iter().sum::<f64>()
|
||||
/ metric_time_data.values.len() as f64;
|
||||
let mut not_found = true;
|
||||
for metric in self.aggregates.iter_mut() {
|
||||
if metric.name() == name.to_lowercase() {
|
||||
if let Some(metric_data) = metric.time_data() {
|
||||
if metric_data.time == metric_time_data.time {
|
||||
metric.add_data(average);
|
||||
not_found = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if not_found {
|
||||
self.aggregates.push(MetricsConsumerData::from_values(
|
||||
&name,
|
||||
metric_time_data.time,
|
||||
Vec::from([average]),
|
||||
));
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
fn show_metrics(&mut self) -> Result<Vec<String>, std::io::Error> {
|
||||
let mut output = Vec::new();
|
||||
self.aggregates.sort();
|
||||
self.aggregates.iter().for_each(|metric_data| {
|
||||
if metric_data.time_data().is_some() {
|
||||
let output_line = format!("{}", metric_data);
|
||||
match self.output.as_str() {
|
||||
"vec" => output.push(output_line),
|
||||
"print" | "" => println!("{}", output_line),
|
||||
_ => output.push(output_line),
|
||||
}
|
||||
}
|
||||
});
|
||||
match self.output.as_str() {
|
||||
"vec" | "print" | "" => return Ok(output.to_owned()),
|
||||
_ => {
|
||||
if self.out_overwrite && std::path::Path::new(&self.output).exists() {
|
||||
std::fs::remove_file(&self.output)?;
|
||||
}
|
||||
if !std::path::Path::new(&self.output).exists() {
|
||||
File::create(&self.output)?;
|
||||
}
|
||||
let file = File::options().append(true).open(&self.output)?;
|
||||
let mut writer = BufWriter::new(file);
|
||||
writer.write_all(output.join("\n").as_bytes())?;
|
||||
let text_overwrite = if self.out_overwrite { String::from ("overwriten")} else { String::from("") };
|
||||
println!("Metrics for '{}' are saved in '{}' {}", &self.input, &self.output, &text_overwrite);
|
||||
}
|
||||
};
|
||||
Ok(output.to_owned())
|
||||
}
|
||||
}
|
||||
/// Specific implementations like **parse_line**
|
||||
impl MetricsConsumerTarget {
|
||||
fn show_invalid_line(&self, index: usize, line: &str) {
|
||||
println!("invalid line: {} {}", index, line);
|
||||
}
|
||||
/// Check metric line values
|
||||
pub fn parse_line(&self, line: &str, index: usize, re: &Regex) -> Option<(i64, String, f64)> {
|
||||
if let Some(caps) = re.captures(line) {
|
||||
let timestamp = match caps[1].parse::<i64>() {
|
||||
Ok(value) => value,
|
||||
Err(e) => {
|
||||
println!("Parse timestamp {} error {}", &caps[1], e);
|
||||
self.show_invalid_line(index, line);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
let metric_value = match caps[3].parse::<f64>() {
|
||||
Ok(value) => value,
|
||||
Err(e) => {
|
||||
println!("Parse metric_value {} error {}", &caps[3], e);
|
||||
self.show_invalid_line(index, line);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
Some((timestamp, caps[2].to_string(), metric_value))
|
||||
} else {
|
||||
self.show_invalid_line(index, line);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
73
src/tests.rs
Normal file
73
src/tests.rs
Normal file
@ -0,0 +1,73 @@
|
||||
//! ## Tests
|
||||
//! Some unitary tests grouped here <br>
|
||||
//! [`test_expected_metrics`] the more important one to verify results with **output_expected.txt**
|
||||
//!
|
||||
use super::*;
|
||||
#[test]
|
||||
fn test_load_input() -> Result<(), String> {
|
||||
let mut metrics_target = MetricsConsumerTarget {
|
||||
input: String::from(DEFAULT_INPUT_PATH),
|
||||
reg_exp: String::from(DEFAULT_REG_EXP),
|
||||
..Default::default()
|
||||
};
|
||||
match metrics_target.load_input() {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => Err(format!("Error: {}", e).into()),
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn test_invalid_line_value() -> Result<(), String> {
|
||||
let metrics_target = MetricsConsumerTarget {
|
||||
input: String::from(DEFAULT_INPUT_PATH),
|
||||
reg_exp: String::from(DEFAULT_REG_EXP),
|
||||
..Default::default()
|
||||
};
|
||||
let contents = String::from("1650973075 cpu A47\n");
|
||||
let re = regex::Regex::new(&metrics_target.reg_exp)
|
||||
.map_err(|err| format!("Error regex: {}", err))?;
|
||||
match metrics_target.parse_line(&contents, 1, &re) {
|
||||
Some(_) => Err(format!("Error invalid line value: {}", contents).into()),
|
||||
None => Ok(()),
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn test_invalid_line_time() -> Result<(), String> {
|
||||
let metrics_target = MetricsConsumerTarget {
|
||||
input: String::from(DEFAULT_INPUT_PATH),
|
||||
reg_exp: String::from(DEFAULT_REG_EXP),
|
||||
..Default::default()
|
||||
};
|
||||
let contents = String::from("1650973075A cpu 47\n");
|
||||
let re = regex::Regex::new(&metrics_target.reg_exp)
|
||||
.map_err(|err| format!("Error regex: {}", err))?;
|
||||
match metrics_target.parse_line(&contents, 1, &re) {
|
||||
Some(_) => Err(format!("Error invalid line value: {}", contents).into()),
|
||||
None => Ok(()),
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn test_expected_metrics() {
|
||||
use std::{
|
||||
fs::File,
|
||||
io::{prelude::*, BufReader},
|
||||
};
|
||||
let mut metrics_target = MetricsConsumerTarget {
|
||||
input: String::from(DEFAULT_INPUT_PATH),
|
||||
output: String::from("vec"),
|
||||
reg_exp: String::from(DEFAULT_REG_EXP),
|
||||
..Default::default()
|
||||
};
|
||||
metrics_target.load_input().unwrap_or_default();
|
||||
metrics_target.collect_aggregates();
|
||||
let data_metrics = metrics_target.show_metrics().unwrap_or_default();
|
||||
|
||||
let expected_output = String::from("output_expected.txt");
|
||||
let file = File::open(expected_output.clone())
|
||||
.expect(format!("no such file: {}", expected_output).as_str());
|
||||
let buf = BufReader::new(file);
|
||||
let lines: Vec<String> = buf
|
||||
.lines()
|
||||
.map(|l| l.expect("Could not parse line"))
|
||||
.collect();
|
||||
assert_eq!(lines.join("\n"), data_metrics.join("\n"));
|
||||
}
|
Loading…
Reference in New Issue
Block a user