diff --git a/learning/python/train_perfspec.py b/learning/python/train_perfspec.py
new file mode 100644
index 0000000..63fe262
--- /dev/null
+++ b/learning/python/train_perfspec.py
@@ -0,0 +1,1102 @@
+import marimo
+
+__generated_with = "0.10.16"
+app = marimo.App(width="medium")
+
+
+@app.cell(hide_code=True)
+def title():
+ import marimo as mo
+ notebook_name = 'train_perfspec.py'
+
+ from lib_perfspec import perfspec_vars
+ (_,_defs) = perfspec_vars.run()
+ perfspec = _defs['perfspec']
+
+ from lib_perfspec import perfspec_header
+ (_,_defs) = perfspec_header.run()
+ lib_header = _defs['header']
+ lib_intro = _defs['intro']
+
+ mo.md(
+ f"""
+ {lib_header(notebook_name)}
+
+ ## Train **{perfspec['app']['train_mode']}** model
+ """
+ )
+ return (
+ lib_header,
+ lib_intro,
+ mo,
+ notebook_name,
+ perfspec,
+ perfspec_header,
+ perfspec_vars,
+ )
+
+
+@app.cell(hide_code=True)
+def imports():
+ import tensorflow as tf
+ import numpy as np
+ from pathlib import Path
+ import keras
+ return Path, keras, np, tf
+
+
+@app.cell(hide_code=True)
+def intro_load(Path, lib_intro, mo, notebook_name, perfspec):
+ verbose = perfspec['settings']['verbose']
+ perfspec['vars'] = {}
+
+ from lib_perfspec import perfspec_args
+ (_,_defs) = perfspec_args.run()
+
+ if not Path(perfspec['defaults']['models_dirpath']).exists():
+ exit(f"Trained models dir path not found: {perfspec['defaults']['models_dirpath']}")
+
+ if not Path(perfspec['defaults']['checkpoints_dirpath']).exists():
+ exit(f"Trained model checkpoints dir path not found: {perfspec['defaults']['checkpoints_dirpath']}")
+
+ if not Path(perfspec['defaults']['data_dirpath']).exists():
+ exit(f"data dir path not found: {perfspec['defaults']['data_dirpath']}")
+
+ from lib_perfspec import perfspec_load_actions
+ (_,_defs) = perfspec_load_actions.run()
+ lib_load_actions = _defs['load_actions']
+
+ from lib_perfspec import perfspec_input_sequence
+ (_,_defs) = perfspec_input_sequence.run()
+ lib_get_input_sequence = _defs['get_input_sequence']
+
+ from lib_perfspec import perfspec_predict
+ _, _defs = perfspec_predict.run()
+ lib_predict_action = _defs['predict_action']
+
+ verbose=perfspec['settings'].get('verbose')
+
+ perfspec['vars']['model'] = None
+ perfspec['vars']['history'] = None
+
+ (perfspec['vars']['actions'],
+ perfspec['vars']['unique_actions'],
+ perfspec['vars']['label_encoder'],
+ perfspec['vars']['encoded_actions']
+ ) = lib_load_actions(
+ actions_path=perfspec['settings'].get('actions_filepath'),
+ verbose=verbose
+ )
+
+ perfspec['vars']['input_sequence'] = lib_get_input_sequence(
+ input_str=perfspec['settings']['input_str'],
+ unique_actions=perfspec['vars']['unique_actions']
+ )
+
+ mo.md(
+ f"""
+ {lib_intro(notebook_name)}
+
+ """
+ )
+ return (
+ lib_get_input_sequence,
+ lib_load_actions,
+ lib_predict_action,
+ perfspec_args,
+ perfspec_input_sequence,
+ perfspec_load_actions,
+ perfspec_predict,
+ verbose,
+ )
+
+
+@app.cell(hide_code=True)
+def setting(mo, notebook_name, perfspec):
+ from lib_perfspec import perfspec_out_settings
+ (_,_defs) = perfspec_out_settings.run()
+ out_settings = _defs['out_settings']
+
+ mo.md(
+ f"""
+ Settings are defined in: [lib_perfspec.py]({mo.notebook_dir()}/lib_perfspec.py)
+
+ {out_settings(notebook_name)}
+
+ Total values from data: **{len(perfspec['vars']['actions'])}**
+ with **{len(perfspec['vars']['unique_actions'])}** unique values
+
+ >
+ Train size {perfspec['defaults']['train_size']} yields **{
+ int(perfspec['defaults']['train_size'] * len(perfspec['vars']['actions']))
+ }** samples to train the model, with a sequence length of **{perfspec['defaults']['sequence_length']}**
+
+ Values can be overridden from the **command-line** (see options below).
+ """
+ )
+ return out_settings, perfspec_out_settings
+
+
+@app.cell(hide_code=True)
+def command_line(mo, notebook_name):
+ from lib_perfspec import perfspec_cli_ops
+ (_,_defs) = perfspec_cli_ops.run()
+ out_cli_ops = _defs['out_cli_ops']
+ mo.accordion({
+ "Show command-line options": out_cli_ops(notebook_name)
+ })
+ return out_cli_ops, perfspec_cli_ops
+
+
+@app.cell(hide_code=True)
+def perfspec_define_confusion_matrix(
+ load_model_from_path,
+ mo,
+ np,
+ prepare_train,
+):
+ def make_confusion_matrix(perfspec):
+ from tensorflow.keras.utils import to_categorical
+ from tensorflow.keras.preprocessing.sequence import pad_sequences
+ from sklearn.preprocessing import LabelEncoder
+ from sklearn.metrics import confusion_matrix
+ from sklearn.model_selection import train_test_split
+ from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
+ import matplotlib.pyplot as plt
+ import seaborn as sns
+
+ if perfspec['vars']['model'] is None:
+ model = load_model_from_path(perfspec, perfspec['settings']['verbose'])
+ if model is None:
+ print("No model found")
+ return
+ else:
+ model = perfspec['vars']['model']
+
+ (perfspec['vars']['X'],perfspec['vars']['y'],perfspec['vars']['_y']) = prepare_train(perfspec)
+
+ X = perfspec['vars']['X']
+ y = perfspec['vars']['_y']
+ encoder = LabelEncoder()
+ y_encoded = encoder.fit_transform(y)
+ #y_encoded = encoder.fit_transform(perfspec['vars']['unique_actions'])
+ #y_encoded = encoded_input
+
+ # Split the dataset into training and testing sets
+ X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.2, random_state=42)
+
+ # Predict class probabilities on the test split with the trained model
+ y_pred = model.predict(X_test)
+ y_pred_classes = np.argmax(y_pred, axis=1) # Get class predictions from probabilities
+
+ # Plot confusion matrix
+ cm = confusion_matrix(y_test, y_pred_classes)
+ ax = sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels='auto', yticklabels='auto')
+ ax.set_xticklabels(ax.get_xticklabels(), fontsize=7)
+ ax.set_yticklabels(ax.get_yticklabels(), fontsize=7)
+
+ plt.title("Confusion Matrix")
+ plt.xlabel("Predicted Labels")
+ plt.ylabel("True Labels")
+ plt.show()
+
+ mo.md(
+ r"""
+ ### Define Confusion Matrix
+
+ Encode the targets, split them into train/test sets, predict on the test split with the trained model, and plot the resulting confusion matrix as a heatmap.
+ """
+ )
+ return (make_confusion_matrix,)
+
+
+@app.cell(hide_code=True)
+def confusion_matrix_callback(keras, mo, np, tf):
+ from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
+ import matplotlib.pyplot as plt
+
+ @keras.saving.register_keras_serializable(package="Custom")
+ class ConfusionMatrixCallback(tf.keras.callbacks.Callback):
+ def __init__(self, val_data, label_encoder=None, display=False):
+ """
+ Custom callback to compute and display a confusion matrix at the end of each epoch.
+
+ Parameters:
+ - val_data: Tuple (X_val, y_val) for validation data.
+ - label_encoder: Optional, if labels are encoded, to map back to original labels.
+ - display: Whether to display the confusion matrix as a plot.
+ """
+ super().__init__()
+ self.val_data = val_data
+ self.label_encoder = label_encoder
+ self.display = display
+
+ def on_epoch_end(self, epoch, logs=None):
+ # Get the validation data
+ val_X, val_y = self.val_data
+
+ # Predict the classes for validation data
+ y_pred = self.model.predict(val_X, verbose=0)
+ y_pred_classes = np.argmax(y_pred, axis=1)
+
+ # If label encoding is provided, decode the labels
+ if self.label_encoder:
+ val_y = self.label_encoder.inverse_transform(val_y)
+ y_pred_classes = self.label_encoder.inverse_transform(y_pred_classes)
+
+ # Compute the confusion matrix
+ cm = confusion_matrix(val_y, y_pred_classes)
+
+ # Optionally display or save the confusion matrix
+ if self.display:
+ self.plot_confusion_matrix(cm, epoch)
+
+ # Print the confusion matrix to console
+ print(f"Epoch {epoch + 1}: Confusion Matrix\n{cm}")
+
+ def plot_confusion_matrix(self, cm, epoch):
+ """
+ Plot and display the confusion matrix using matplotlib.
+ """
+ plt.figure(figsize=(8, 8))
+ disp = ConfusionMatrixDisplay(confusion_matrix=cm)
+ disp.plot(cmap='viridis', values_format='d')
+ plt.title(f'Confusion Matrix - Epoch {epoch + 1}')
+ plt.show()
+
+ def get_config(self):
+ # Return the configuration of the metric as a dictionary
+ config = super().get_config()
+ return config
+
+ mo.md(
+ r"""
+ ### Confusion Matrix Callback
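+
+ A usage sketch, mirroring the (currently commented-out) wiring in `make_model` below; `model`, `train_X`, `train_y`, `val_X`, `val_y` and `label_encoder` stand for the model, splits and encoder built there:
+
+ ```python
+ confusion_callback = ConfusionMatrixCallback(
+     val_data=(val_X, val_y),
+     label_encoder=label_encoder,
+     display=True,
+ )
+ model.fit(train_X, train_y, validation_data=(val_X, val_y),
+           callbacks=[confusion_callback])
+ ```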
+ """
+ )
+ return (ConfusionMatrixCallback,)
+
+
+@app.cell(hide_code=True)
+def precision_metric(keras, mo, tf):
+ @keras.saving.register_keras_serializable(package="Custom")
+ class PrecisionMetric(tf.keras.metrics.Metric):
+ def __init__(self, name="precision", **kwargs):
+ super(PrecisionMetric, self).__init__(name=name, **kwargs)
+ self.true_positives = self.add_weight(name="tp", initializer="zeros")
+ self.false_positives = self.add_weight(name="fp", initializer="zeros")
+
+ def update_state(self, y_true, y_pred, sample_weight=None):
+ y_pred = tf.argmax(y_pred, axis=1)
+ y_true = tf.cast(y_true, tf.int32)
+ y_pred = tf.cast(y_pred, tf.int32)
+
+ true_positives = tf.reduce_sum(tf.cast(tf.equal(y_true, y_pred) & tf.equal(y_true, 1), tf.float32))
+ false_positives = tf.reduce_sum(tf.cast(tf.not_equal(y_true, y_pred) & tf.equal(y_pred, 1), tf.float32))
+
+ self.true_positives.assign_add(true_positives)
+ self.false_positives.assign_add(false_positives)
+
+ def result(self):
+ return self.true_positives / (self.true_positives + self.false_positives + tf.keras.backend.epsilon())
+
+ def reset_states(self):
+ self.true_positives.assign(0.0)
+ self.false_positives.assign(0.0)
+
+ def get_config(self):
+ # Return the configuration of the metric as a dictionary
+ config = super().get_config()
+ return config
+
+
+ @classmethod
+ def from_config(cls, config):
+ return cls(**config)
+
+ mo.md(
+ r"""
+ ### Precision Metric
+
+ **Precision** measures how many of the predicted positive labels are actually correct. It is calculated as:
+
+ \[
+ \text{Precision} = \frac{\text{True Positives}}{\text{True Positives} + \text{False Positives}}
+ \]
+
+ - **True Positives (TP)**: Cases where the model predicted `1` (positive) and the actual label is also `1`.
+ - **False Positives (FP)**: Cases where the model predicted `1` (positive), but the actual label is not `1`.
+ - The logical conditions check if predictions match and filter for the positive class (`1`).
+ - These counts are accumulated over batches.
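+
+ A minimal NumPy sketch of the same computation on toy labels (illustrative only, not part of the training pipeline):
+
+ ```python
+ import numpy as np
+ y_true = np.array([1, 0, 1, 1])                    # actual classes
+ y_pred = np.array([1, 1, 0, 1])                    # predicted classes (after argmax)
+ tp = np.sum((y_true == y_pred) & (y_true == 1))    # 2 true positives
+ fp = np.sum((y_true != y_pred) & (y_pred == 1))    # 1 false positive
+ precision = tp / (tp + fp)                         # 2 / 3 ≈ 0.67
+ ```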
+ """
+ )
+ return (PrecisionMetric,)
+
+
+@app.cell(hide_code=True)
+def recall_metric(keras, mo, tf):
+ @keras.saving.register_keras_serializable(package="Custom")
+ class RecallMetric(tf.keras.metrics.Metric):
+ def __init__(self, name="recall", **kwargs):
+ super(RecallMetric, self).__init__(name=name, **kwargs)
+ self.true_positives = self.add_weight(name="tp", initializer="zeros")
+ self.false_negatives = self.add_weight(name="fn", initializer="zeros")
+
+ def update_state(self, y_true, y_pred, sample_weight=None):
+ y_pred = tf.argmax(y_pred, axis=1)
+ y_true = tf.cast(y_true, tf.int32)
+ y_pred = tf.cast(y_pred, tf.int32)
+
+ true_positives = tf.reduce_sum(tf.cast(tf.equal(y_true, y_pred) & tf.equal(y_true, 1), tf.float32))
+ false_negatives = tf.reduce_sum(tf.cast(tf.not_equal(y_true, y_pred) & tf.equal(y_true, 1), tf.float32))
+
+ self.true_positives.assign_add(true_positives)
+ self.false_negatives.assign_add(false_negatives)
+
+ def result(self):
+ return self.true_positives / (self.true_positives + self.false_negatives + tf.keras.backend.epsilon())
+
+ def reset_states(self):
+ self.true_positives.assign(0.0)
+ self.false_negatives.assign(0.0)
+
+ def get_config(self):
+ # Return the configuration of the metric as a dictionary
+ config = super().get_config()
+ return config
+
+ @classmethod
+ def from_config(cls, config):
+ return cls(**config)
+
+ mo.md(
+ r"""
+ ### Define Recall Metric
+
+ **Recall** measures how many actual positive labels were correctly predicted. It is calculated as:
+
+ \[
+ \text{Recall} = \frac{\text{True Positives}}{\text{True Positives} + \text{False Negatives}}
+ \]
+
+ - **False Negatives (FN)**: Cases where the model predicted `0` (negative), but the actual label is `1`.
+ - This is similar to precision but focuses on capturing all actual positives.
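+
+ The same toy labels as in the precision sketch give (illustrative only):
+
+ ```python
+ import numpy as np
+ y_true = np.array([1, 0, 1, 1])
+ y_pred = np.array([1, 1, 0, 1])
+ tp = np.sum((y_true == y_pred) & (y_true == 1))    # 2 true positives
+ fn = np.sum((y_true != y_pred) & (y_true == 1))    # 1 false negative
+ recall = tp / (tp + fn)                            # 2 / 3 ≈ 0.67
+ ```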
+ """
+ )
+ return (RecallMetric,)
+
+
+@app.cell(hide_code=True)
+def f1_score_metric(PrecisionMetric, RecallMetric, keras, mo, tf):
+ @keras.saving.register_keras_serializable(package="Custom")
+ class F1ScoreMetric(tf.keras.metrics.Metric):
+ def __init__(self, name="f1_score", **kwargs):
+ super(F1ScoreMetric, self).__init__(name=name, **kwargs)
+ self.precision = PrecisionMetric()
+ self.recall = RecallMetric()
+
+ def update_state(self, y_true, y_pred, sample_weight=None):
+ self.precision.update_state(y_true, y_pred, sample_weight)
+ self.recall.update_state(y_true, y_pred, sample_weight)
+
+ def result(self):
+ precision = self.precision.result()
+ recall = self.recall.result()
+ return 2 * (precision * recall) / (precision + recall + tf.keras.backend.epsilon())
+
+ def reset_states(self):
+ self.precision.reset_states()
+ self.recall.reset_states()
+
+ def get_config(self):
+ # Return the configuration of the metric as a dictionary
+ config = super().get_config()
+ # config = super(F1Score, self).get_config()
+ return config
+
+ @classmethod
+ def from_config(cls, config):
+ return cls(**config)
+
+ mo.md(
+ r"""
+ ### Define F1 Score Metric
+
+ F1-score balances precision and recall, especially useful when the dataset is imbalanced. It is the harmonic mean of precision and recall:
+
+ \[
+ \text{F1-Score} = 2 \cdot \frac{\text{Precision} \cdot \text{Recall}}{\text{Precision} + \text{Recall}}
+ \]
+
+ - `epsilon` is added to avoid division by zero.
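+
+ A quick numeric check of the formula (arbitrary example values):
+
+ ```python
+ precision, recall = 0.75, 0.60
+ f1 = 2 * precision * recall / (precision + recall + 1e-7)   # ≈ 0.667
+ ```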
+ """
+ )
+ return (F1ScoreMetric,)
+
+
+@app.cell(hide_code=True)
+def custom_validation_metrics(mo, tf):
+ # Custom callback to compute precision/recall/F1 on the validation data at each epoch end
+ class CustomValidationMetrics(tf.keras.callbacks.Callback):
+ def __init__(self, val_data):
+ super().__init__()
+ self.val_data = val_data
+
+ def on_epoch_end(self, epoch, logs=None):
+ X_val, y_val = self.val_data
+ val_predictions = self.model.predict(X_val, verbose=0)
+ val_predictions = (val_predictions > 0.5).astype(int) # Binarize predictions
+
+ precision = float(tf.keras.metrics.Precision()(y_val, val_predictions))
+ recall = float(tf.keras.metrics.Recall()(y_val, val_predictions))
+ f1_score = 2 * (precision * recall) / (precision + recall + 1e-7)
+
+ print(f"\nEpoch {epoch + 1} Validation Metrics - Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1_score:.4f}")
+
+ mo.md(
+ r"""
+ ### Custom Validation Metrics
+
+ To apply metrics only to the validation set in Keras, use a custom callback that computes them on the validation data at the end of each epoch.
+
+ Keras does not seem to natively support specifying metrics exclusively for validation data in the `compile()` method.
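+
+ A usage sketch (`model`, `train_X`, `train_y`, `X_val`, `y_val` are hypothetical names for a compiled model and its data splits):
+
+ ```python
+ model.fit(
+     train_X, train_y,
+     validation_data=(X_val, y_val),
+     callbacks=[CustomValidationMetrics(val_data=(X_val, y_val))],
+ )
+ ```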
+ """
+ )
+ return (CustomValidationMetrics,)
+
+
+@app.cell(hide_code=True)
+def perfspec_prepare_model_train(mo, np):
+ def prepare_train(perfspec):
+ from tensorflow.keras.utils import to_categorical
+ from tensorflow.keras.preprocessing.sequence import pad_sequences
+ #from sklearn.preprocessing import LabelEncoder
+
+ # Parameters
+ X = []
+ y = []
+ sequence_length = perfspec['settings']['sequence_length']
+
+ # Generate input-output pairs
+ for j in range(len(perfspec['vars']['encoded_actions']) - sequence_length):
+ X.append(perfspec['vars']['encoded_actions'][j:j + sequence_length]) # Input sequence
+ y.append(perfspec['vars']['encoded_actions'][j + sequence_length]) # Target (next action)
+
+ X = np.array(X)
+ y = np.array(y)
+
+ # Ensure _X has the correct shape for LSTM
+ X = pad_sequences(X, maxlen=sequence_length, padding='pre')
+ X = np.expand_dims(X, axis=-1) # Shape: (num_samples, sequence_length, 1)
+
+ _y = y
+ vocab_size =len(perfspec['vars']['label_encoder'].classes_) # Total number of unique actions
+ # One-hot encode _y for classification
+ y = to_categorical(y, num_classes=vocab_size)
+
+ return (X,y,_y)
+
+ mo.md(
+ r"""
+ ## Prepare train model input
+
+ Load input and **shape X,y** for model
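+
+ A minimal sketch of the sliding-window pairing used below, with `sequence_length = 3` on a toy encoded stream:
+
+ ```python
+ encoded = [4, 7, 1, 9, 2]
+ X = [encoded[j:j + 3] for j in range(len(encoded) - 3)]   # [[4, 7, 1], [7, 1, 9]]
+ y = [encoded[j + 3] for j in range(len(encoded) - 3)]     # [9, 2]
+ ```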
+
+ """
+ )
+ return (prepare_train,)
+
+
+@app.cell(hide_code=True)
+def show_train_model_shape(mo, perfspec, prepare_train, verbose):
+ (perfspec['vars']['X'], perfspec['vars']['y'],perfspec['vars']['_y']) = prepare_train(perfspec)
+
+ if verbose != None or mo.cli_args().get("verbose") != None or mo.running_in_notebook():
+ print(f"X shape : {perfspec['vars']['X'].shape}")
+ print(f"y shape : {perfspec['vars']['y'].shape}")
+
+ mo.md(
+ r"""
+ ### Show train model shape
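+
+ With `N` generated windows and `V` unique actions, the shapes printed here follow the pattern below (actual numbers depend on the loaded data):
+
+ ```python
+ X.shape   # (N, sequence_length, 1)   padded windows, one feature per step
+ y.shape   # (N, V)                    one-hot encoded next-action targets
+ ```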
+
+ """
+ )
+ return
+
+
+@app.cell(hide_code=True)
+def make_model(mo, np, perfspec):
+ # Define the LSTM model
+ def make_model(X=[],y=[],label_encoder=[], encoded_actions=[]):
+ if len(X) == 0 or len(y) == 0:
+ print ("make_model: no values found for X or y")
+ return
+ from tensorflow.keras.models import Sequential
+ from tensorflow.keras.layers import Input, LSTM, Dense, Embedding, Dropout
+ from tensorflow.keras.optimizers import Adam
+ from tensorflow.keras.callbacks import (
+ EarlyStopping,
+ ReduceLROnPlateau,
+ Callback,
+ ModelCheckpoint,
+ )
+
+ vocab_size = len(
+ label_encoder.classes_
+ ) # Get the number of unique action labels
+
+ # Model architecture
+ embedding_dim = 50
+ #input_shape = (perfspec['settings']['input_length'],perfspec['settings']['input_length'],vocab_size),
+ perfspec['vars']['model'] = Sequential(
+ [
+ Input(shape=(perfspec['settings']['sequence_length'], 1)),
+ #Embedding(input_dim=vocab_size, output_dim=embedding_dim),
+ LSTM(
+ perfspec['settings']['lstm_units_1'],
+ return_sequences=True,
+ recurrent_dropout=perfspec['settings']['dropout_rate'],
+ ),
+ LSTM(
+ perfspec['settings']['lstm_units_2'],
+ return_sequences=False,
+ recurrent_dropout=perfspec['settings']['dropout_rate'],
+ ),
+ Dropout(0.5),
+ Dense(
+ vocab_size, activation="softmax"
+ ), # Softmax for multi-class classification
+ ]
+ )
+
+ # Split the data for training and validation
+ train_size = int(perfspec['defaults']['train_size'] * len(X))
+ train_X, val_X = X[:train_size], X[train_size:]
+ train_y, val_y = y[:train_size], y[train_size:]
+
+ if train_y.ndim > 1 and train_y.shape[-1] == vocab_size:
+ loss_function = "categorical_crossentropy"
+ else:
+ # Convert targets to integers if needed
+ train_y = np.argmax(train_y, axis=-1) if train_y.ndim > 1 else train_y
+ val_y = np.argmax(val_y, axis=-1) if val_y.ndim > 1 else val_y
+ loss_function = "sparse_categorical_crossentropy"
+
+ # Compile the model
+ perfspec['vars']['model'].compile(
+ optimizer=Adam(),
+ loss=loss_function,
+ metrics=[
+ "accuracy"
+ # PrecisionMetric(),
+ # RecallMetric(),
+ # F1ScoreMetric()
+ ],
+ )
+
+ # Create confusion matrix callback
+ #confusion_callback = ConfusionMatrixCallback(
+ # val_data=(val_X, val_y),
+ # label_encoder=label_encoder,
+ # display=True
+ #)
+
+ # Callbacks
+ early_stopping = EarlyStopping(
+ monitor="val_loss", patience=5, restore_best_weights=True
+ )
+ lr_reduction = ReduceLROnPlateau(
+ monitor="val_loss", patience=3, factor=0.5, min_lr=0.0001
+ )
+ checkpoint = None
+ if perfspec['settings']['checkpoint_mode'] == "weights":
+ # Save only the weights of the model instead of the full model.
+ checkpoint = ModelCheckpoint(
+ filepath=perfspec['settings']['checkpoint_filepath'].as_posix(),
+ save_best_only=True, # True to save only when validation loss improves
+ monitor='val_loss', # Metric to monitor
+ mode='min', # Minimize the validation loss
+ verbose=1 # Print messages when saving
+ )
+ elif perfspec['settings']['checkpoint_mode'] == "epochs":
+ # Define to collect save checkpoint at the end of every epoch via callback
+ #_checkpoint_filepath = checkpoint_dirpath + '/' + 'model_at_epoch_{epoch:02d}.h5'
+ checkpoint = ModelCheckpoint(
+ filepath=perfspec['settings']['checkpoint_filepath'].as_posix(),
+ save_best_only=False, # True to save only when validation loss improves
+ verbose=1 # Print messages when saving
+ )
+
+ callbacks = [early_stopping, lr_reduction] #,CustomValidationMetrics]
+ if checkpoint is not None:
+ callbacks.append(checkpoint)
+
+ # Print the model summary
+ perfspec['vars']['model'].summary()
+ print (" train size: {}".format(train_size))
+ print (" train_X size: {}".format(len(train_X)))
+ print (" train_y size: {}".format(len(train_y)))
+ print (" val_X size: {}".format(len(val_X)))
+ print (" val_y size: {}".format(len(val_y)))
+
+ # Train the model
+ perfspec['vars']['history'] = perfspec['vars']['model'].fit(
+ train_X,
+ train_y,
+ batch_size=perfspec['settings']['batch_size'],
+ epochs=perfspec['defaults']['epochs'],
+ validation_data=(val_X, val_y),
+ verbose=2,
+ callbacks=callbacks
+ )
+ return
+
+ mo.md(
+ r"""
+ ## Make and Train Model
+
+ This is where the **model** is created and **fit**
+
+ Saved in `perfspec['vars']` as `model` and `history`
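+
+ A condensed sketch of the stack assembled in `make_model` below (names are placeholders for the values in `perfspec['settings']`):
+
+ ```python
+ Sequential([
+     Input(shape=(sequence_length, 1)),
+     LSTM(lstm_units_1, return_sequences=True, recurrent_dropout=dropout_rate),
+     LSTM(lstm_units_2, return_sequences=False, recurrent_dropout=dropout_rate),
+     Dropout(0.5),
+     Dense(vocab_size, activation="softmax"),
+ ])
+ ```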
+ """
+ )
+ return (make_model,)
+
+
+@app.cell(hide_code=True)
+def main(mo):
+ mo.md(r"""""")
+ return
+
+
+@app.cell(hide_code=True)
+def use_or_make_model(
+ Path,
+ load_model_from_path,
+ make_model,
+ mo,
+ perfspec,
+ verbose,
+):
+ if Path(perfspec['settings']['model_filepath']).exists():
+ if mo.running_in_notebook() or (verbose != None or mo.cli_args().get("verbose") != None):
+ print (f"Trained model path already exists; to retrain the model, DELETE the existing path: {perfspec['settings']['model_filepath']}")
+ perfspec['vars']['model']=load_model_from_path(perfspec,perfspec['settings']['verbose'])
+ else:
+ if perfspec['vars'].get('X').any() and perfspec['vars'].get('y').any():
+ make_model(
+ X=perfspec['vars']['X'],
+ y=perfspec['vars']['y'],
+ label_encoder=perfspec['vars']['label_encoder'],
+ encoded_actions=perfspec['vars']['encoded_actions']
+ )
+ #evaluate_model(the_model,X,y)
+
+ mo.md(
+ f"""
+ ### Use or Train Model
+
+ **IMPORTANT**: if `model_filepath` does not exist, the model will be trained and created; otherwise it will be loaded from `model_filepath`
+
+ Path: "🍃" {perfspec['settings']['model_filepath']}
+ """
+ )
+ return
+
+
+@app.cell(hide_code=True)
+def perfspec_load_model_from_path(Path, mo, perfspec):
+ def load_model_from_path(perfspec,verbose):
+ from keras.models import load_model
+ custom_objects = {
+ # "PrecisionMetric": PrecisionMetric,
+ # "RecallMetric": RecallMetric,
+ # "F1ScoreMetric": F1ScoreMetric,
+ }
+ if Path(perfspec['settings']['model_filepath']).exists():
+ perfspec['vars']['model'] = load_model(
+ perfspec['settings']['model_filepath'],
+ #custom_objects=custom_objects
+ )
+ if verbose != None and not mo.running_in_notebook():
+ print (f"Trained model loaded from: {perfspec['settings']['model_filepath']}")
+ return perfspec['vars']['model']
+ else:
+ return None
+
+ mo.md(
+ f"""
+ ### Load model from path
+
+ Load trained model from file **model_filepath** 🍃
+
+ Path: {perfspec['settings']['model_filepath']}
+ """
+ )
+ return (load_model_from_path,)
+
+
+@app.cell(hide_code=True)
+def perfspec_save_model(Path, mo, perfspec):
+ def save_model():
+ if not Path(perfspec['settings']['model_filepath']).exists() and perfspec['vars']['model'] != None:
+ # Save the model
+ print (f"Save trained model to: {perfspec['settings']['model_filepath']}")
+ perfspec['vars']['model'].save(perfspec['settings']['model_filepath'])
+ if perfspec['vars']['history'] != None:
+ import json
+ try:
+ with open(perfspec['settings']['model_history_filepath'], 'w') as output_file:
+ json.dump(perfspec['vars']['history'].history,output_file)
+ print (f"Save trained model history to: {perfspec['settings']['model_history_filepath']}")
+ except IOError as e:
+ print(f"Error writing to file: {e}")
+ except TypeError as e:
+ print(f"Error encoding JSON: {e}")
+
+ save_model()
+ mo.md(
+ f"""
+ ## Save Model
+
+ to `model_filepath` "🍃" {perfspec['settings']['model_filepath']}
+
+ Path can be changed via **command-line** with **--model** `model-filepath`
+
+ """
+ )
+ return (save_model,)
+
+
+@app.cell(hide_code=True)
+def perfspec_plot_history(Path, mo):
+ def plot_history(perfspec):
+ import json
+ if 'vars' not in perfspec:
+ return None
+ if perfspec['vars']['history'] != None:
+ if hasattr(perfspec['vars']['history'], 'history'):
+ _model_history = perfspec['vars']['history'].history
+ else:
+ _model_history = perfspec['vars']['history']
+ else:
+ if not Path(perfspec['settings']['model_history_filepath']).exists():
+ print(f"Not found: {perfspec['settings']['model_history_filepath']}")
+ _model_history = {}
+ else:
+ with open(perfspec['settings']['model_history_filepath']) as history_file:
+ perfspec['vars']['history'] = json.load(history_file)
+ _model_history = perfspec['vars']['history']
+ return _model_history
+
+ mo.md(
+ f"""
+ ## Plot Model history
+
+ """
+ )
+ #if 'plt_accuracy' in perfspec['vars']: mo.as_html(perfspec['vars']['plt_accuracy'].gcf())
+ return (plot_history,)
+
+
+@app.cell(hide_code=True)
+def perfspec_plot_defs(mo, plot_history):
+ def plot_accuracy(perfspec):
+ _model_history = plot_history(perfspec)
+ if _model_history != None and mo.running_in_notebook():
+ import matplotlib.pyplot as plt
+ import seaborn as sns
+ if _model_history.get('accuracy'):
+ plt.style.use('dark_background')
+ # Plot training & validation accuracy values
+ (_, acc) = plt.subplots()
+ acc.plot(_model_history['accuracy'],color='white')
+ acc.plot(_model_history['val_accuracy'], color='orange')
+ acc.set_title('Model Accuracy')
+ acc.set_xlabel('Epochs')
+ acc.set_ylabel('Accuracy')
+ acc.legend(['Train', 'Test'], loc='upper left')
+ return plt
+ elif perfspec['settings']['verbose'] == 2:
+ if 'val_accuracy' in _model_history:
+ print ('model val_accuracy')
+ for idx,it in enumerate(_model_history.get('val_accuracy')):
+ print(f"epoch: {idx} value: {it}")
+ return None
+
+ def plot_loss(perfspec):
+ _model_history = plot_history(perfspec)
+ if _model_history != None and mo.running_in_notebook():
+ import matplotlib.pyplot as plt
+ import seaborn as sns
+ if _model_history.get('loss'):
+ plt.style.use('dark_background')
+ (_, loss) = plt.subplots()
+ loss.plot(_model_history['loss'])
+ loss.plot(_model_history['val_loss'])
+ loss.set_title('Model Loss')
+ loss.set_xlabel('Epochs')
+ loss.set_ylabel('Loss')
+ loss.legend(['Train', 'Test'], loc='upper left')
+ return plt
+ elif perfspec['settings']['verbose'] == 2:
+ if 'val_loss' in _model_history:
+ print ('model val_loss')
+ for idx,it in enumerate(_model_history.get('val_loss')):
+ print(f"epoch: {idx} value: {it}")
+ return None
+
+ def plot_precision(perfspec):
+ _model_history = plot_history(perfspec)
+ if _model_history != None and mo.running_in_notebook():
+ import matplotlib.pyplot as plt
+ import seaborn as sns
+ if _model_history.get('precision'):
+ plt.style.use('dark_background')
+ # Plot training & validation accuracy values
+ plt.plot(_model_history['learning_rate'])
+ plt.plot(_model_history['precision'])
+ plt.plot(_model_history['recall'])
+ plt.plot(_model_history['f1_score'])
+ plt.title('Model learning')
+ plt.xlabel('Epochs')
+ plt.ylabel('Learning')
+ plt.legend(['Rate','Precision','Recall', 'f1_score'], loc='upper left')
+ return plt
+ else:
+ return None
+ return plot_accuracy, plot_loss, plot_precision
+
+
+@app.cell(hide_code=True)
+def perfspec_plot_accuracy(mo, perfspec, plot_accuracy):
+ _plot_acc=plot_accuracy(perfspec)
+
+ if perfspec['vars']['history'] != None and mo.running_in_notebook():
+ _output = mo.as_html(_plot_acc.gcf())
+ else:
+ _output = None
+
+ mo.md(
+ f"""
+ ## Model Accuracy history
+
+ Training and validation accuracy per epoch
+
+ {_output}
+
+ """
+ )
+ return
+
+
+@app.cell(hide_code=True)
+def perfspec_plot_loss(mo, perfspec, plot_loss):
+ _plot_loss = plot_loss(perfspec)
+
+ if perfspec['vars']['history'] != None and mo.running_in_notebook():
+ _output = mo.as_html(_plot_loss.gcf())
+ else:
+ _output = None
+
+ mo.md(
+ f"""
+ ## Model loss history
+ Training and validation loss per epoch
+
+ {_output}
+
+ """
+ )
+ return
+
+
+@app.cell(hide_code=True)
+def perfspec_plot_precision(mo, perfspec, plot_precision):
+ _plot_pre = plot_precision(perfspec)
+ if perfspec['vars']['history'] != None and mo.running_in_notebook():
+ mo.md(
+ f"""
+
+ Training precision, recall, F1 score and learning rate per epoch
+
+ {mo.as_html(_plot_pre.gcf())}
+
+ """
+ )
+ return
+
+
+@app.cell(hide_code=True)
+def perfspec_confusion_matrix(
+ load_model_from_path,
+ make_confusion_matrix,
+ mo,
+ perfspec,
+):
+ if mo.running_in_notebook():
+ if perfspec['vars'].get('model') is None:
+ load_model_from_path(perfspec, perfspec['settings']['verbose'])
+ if perfspec['vars'].get('model') != None:
+ make_confusion_matrix(perfspec)
+
+ mo.md("### Confusion Matrix")
+ return
+
+
+@app.cell(hide_code=True)
+def perfspec_evaluate_model(Path, mo, np, prepare_train):
+ def evaluate_model(test_input=[], test_labels=[],model=None):
+ if model == None:
+ return
+ if len(test_input) > 0:
+ #Evaluate the model on test data
+ #(test_loss, test_precision, test_recall, test_f1_score) = perfspec['vars']['model'].evaluate(test_input, test_labels)
+ (test_loss,test_accuracy) = model.evaluate(test_input, test_labels)
+ print(f"Test Accuracy: {test_accuracy:2.4f}")
+ print(f"Test Loss: {test_loss:2.4f}")
+ #print(f"Precision: {test_precision}")
+ #print(f"Recall: {test_recall}")
+ #print(f"F1 Score: {test_f1_score}")
+
+ def run_evaluate(perfspec):
+ # load_trained_model()
+ if perfspec['vars']['model'] == None:
+ return
+ (perfspec['vars']['X'], perfspec['vars']['y'], perfspec['vars']['_y']) = prepare_train(perfspec)
+ print ("\nEvaluate Model")
+ X = perfspec['vars']['X']
+ y = perfspec['vars']['y']
+ train_size = int(perfspec['defaults']['train_size'] * len(X))
+ train_X, val_X = X[:train_size], X[train_size:]
+ train_y, val_y = y[:train_size], y[train_size:]
+ evaluate_model(val_X,val_y,perfspec['vars']['model'])
+
+ def history_info(perfspec):
+ import json
+ if perfspec['vars']['history'] != None:
+ if hasattr(perfspec['vars']['history'], 'history'):
+ model_history = perfspec['vars']['history'].history
+ else:
+ model_history = perfspec['vars']['history']
+ else:
+ if not Path(perfspec['settings']['model_history_filepath']).exists():
+ print(f"Not found: {perfspec['settings']['model_history_filepath']}")
+ model_history = {}
+ else:
+ with open(perfspec['settings']['model_history_filepath']) as history_file:
+ model_history = json.load(history_file)
+ if model_history:
+ from prettytable import PrettyTable
+ train_loss = model_history['loss']
+ val_loss = model_history['val_loss']
+ train_acc = model_history['accuracy']
+ val_acc = model_history['val_accuracy']
+ table_find = PrettyTable()
+ # Find the epoch with the minimum validation loss
+ best_epoch_val_loss = np.argmin(val_loss) # The index of the minimum validation loss
+ #Find the epoch with the maximum validation accuracy
+ best_epoch_val_acc = np.argmax(val_acc) # The index of the maximum validation accuracy
+ #print ("\nFrom Model History")
+ table_find.field_names = ["description", "value"]
+ table_find.align["description"] = "l"
+ table_find.align["value"] = "r"
+ table_find.border = True
+ table_find.add_rows([
+ ["Best Epoch (Validation Loss)",(best_epoch_val_loss + 1)],
+ ["Validation Loss at Best Epoch","{:2.4f}".format(val_loss[best_epoch_val_loss])],
+ ["Best Epoch (Validation Accuracy)",(best_epoch_val_acc + 1)],
+ ["Validation Accuracy at Best Epoch","{:2.4f}".format(val_acc[best_epoch_val_acc])]
+ ])
+ if mo.running_in_notebook() :
+ return table_find.get_formatted_string('html')
+ else:
+ return table_find.get_formatted_string('text')
+ else:
+ return ""
+ return evaluate_model, history_info, run_evaluate
+
+
+@app.cell(hide_code=True)
+def perfspec_run_evaluate_model_run(
+ history_info,
+ mo,
+ perfspec,
+ run_evaluate,
+):
+ #with mo.capture_stdout() as buffer:
+ # run_evaluate(perfspec)
+ #_output = buffer.getvalue()
+
+ _output=run_evaluate(perfspec)
+ #_history_info = history_info(perfspec)
+
+ mo.md(
+ f"""
+ ## Evaluate Model
+ {mo.md(history_info(perfspec))}
+ """
+ )
+ return
+
+
+@app.cell(hide_code=True)
+def perfspec_test_model_prediction(
+ lib_get_input_sequence,
+ lib_predict_action,
+ load_model_from_path,
+ mo,
+ perfspec,
+ verbose,
+):
+ _verbose = "1" if mo.running_in_notebook() else perfspec['settings']['verbose']
+ _input_sequence = lib_get_input_sequence(perfspec['settings']['input_str'],perfspec['vars']['unique_actions'])
+ encoded_input, predicted_probabilities = None, None
+ if len(_input_sequence) > 0:
+ if perfspec['vars']['model'] != None:
+ _model = perfspec['vars']['model']
+ else:
+ _model = load_model_from_path(perfspec,verbose)
+ if _model != None:
+ (encoded_input,predicted_probabilities) = lib_predict_action(
+ _model,
+ perfspec['settings']['sequence_length'],
+ _input_sequence,
+ perfspec['vars']['label_encoder'],
+ _verbose
+ )
+
+ mo.md(
+ f"""
+ ## Test Model Prediction
+
+ Use **{perfspec['settings']['input_str']}** with the trained model created or loaded
+
+ from {perfspec['settings']['model_filepath']}
+
+ The input value can be changed on the **command-line** with the **--input** `value` argument;
+ with the **--verbose** option, more info is shown in **command-line** mode
+ """
+ )
+ return encoded_input, predicted_probabilities
+
+
+if __name__ == "__main__":
+ app.run()