# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "keras==3.8.0",
#     "marimo",
#     "numpy==2.2.2",
# ]
# ///

import marimo

__generated_with = "0.10.17"
app = marimo.App(width="medium")

@app.cell(hide_code=True)
def title():
    import marimo as mo
    notebook_name = 'train_perfspec.py'

    from lib_perfspec import perfspec_vars
    (_, _defs) = perfspec_vars.run()
    perfspec = _defs['perfspec']

    from lib_perfspec import perfspec_header
    (_, _defs) = perfspec_header.run()
    lib_header = _defs['header']
    lib_intro = _defs['intro']

    mo.md(
        f"""
        {lib_header(notebook_name)}

        ## Train **{perfspec['app']['train_mode']}** model
        """
    )
    return (
        lib_header,
        lib_intro,
        mo,
        notebook_name,
        perfspec,
        perfspec_header,
        perfspec_vars,
    )

@app.cell(hide_code=True)
def imports():
    import tensorflow as tf
    import numpy as np
    from pathlib import Path
    import keras
    return Path, keras, np, tf

@app.cell(hide_code=True)
def intro_load(Path, lib_intro, mo, notebook_name, perfspec):
    verbose = perfspec['settings']['verbose']
    perfspec['vars'] = {}

    from lib_perfspec import perfspec_args
    (_, _defs) = perfspec_args.run()

    if not Path(perfspec['defaults']['models_dirpath']).exists():
        exit(f"Trained models dir path not found: {perfspec['defaults']['models_dirpath']}")

    if not Path(perfspec['defaults']['checkpoints_dirpath']).exists():
        exit(f"Trained checkpoints models dir path not found: {perfspec['defaults']['checkpoints_dirpath']}")

    if not Path(perfspec['defaults']['data_dirpath']).exists():
        exit(f"Data dir path not found: {perfspec['defaults']['data_dirpath']}")

    from lib_perfspec import perfspec_load_actions
    (_, _defs) = perfspec_load_actions.run()
    lib_load_actions = _defs['load_actions']

    from lib_perfspec import perfspec_input_sequence
    (_, _defs) = perfspec_input_sequence.run()
    lib_get_input_sequence = _defs['get_input_sequence']

    from lib_perfspec import perfspec_predict
    (_, _defs) = perfspec_predict.run()
    lib_predict_action = _defs['predict_action']

    verbose = perfspec['settings'].get('verbose')

    perfspec['vars']['model'] = None
    perfspec['vars']['history'] = None

    (perfspec['vars']['actions'],
     perfspec['vars']['unique_actions'],
     perfspec['vars']['label_encoder'],
     perfspec['vars']['encoded_actions']
    ) = lib_load_actions(
        actions_path=perfspec['settings'].get('actions_filepath'),
        verbose=verbose
    )

    perfspec['vars']['input_sequence'] = lib_get_input_sequence(
        input_str=perfspec['settings']['input_str'],
        unique_actions=perfspec['vars']['unique_actions']
    )

    mo.md(
        f"""
        {lib_intro(notebook_name)}
        """
    )
    return (
        lib_get_input_sequence,
        lib_load_actions,
        lib_predict_action,
        perfspec_args,
        perfspec_input_sequence,
        perfspec_load_actions,
        perfspec_predict,
        verbose,
    )

@app.cell(hide_code=True)
def setting(mo, notebook_name, perfspec):
    from lib_perfspec import perfspec_out_settings
    (_, _defs) = perfspec_out_settings.run()
    out_settings = _defs['out_settings']

    mo.md(
        f"""
        Settings are defined in: [lib_perfspec.py]({mo.notebook_dir()}/lib_perfspec.py)

        {out_settings(notebook_name)}

        Total of <u>values</u> from data: **{len(perfspec['vars']['actions'])}**
        with total of <u>unique values</u>: **{len(perfspec['vars']['unique_actions'])}**

        > <h4>Train size <u>{perfspec['defaults']['train_size']}</u> to train model: **{
        int(perfspec['defaults']['train_size'] * len(perfspec['vars']['actions']))
        }** with sequence of **{perfspec['defaults']['sequence_length']}**</h4>

        Values can be overridden from the **command line** (see options below).
        """
    )
    return out_settings, perfspec_out_settings

@app.cell(hide_code=True)
def command_line(mo, notebook_name):
    from lib_perfspec import perfspec_cli_ops
    (_, _defs) = perfspec_cli_ops.run()
    out_cli_ops = _defs['out_cli_ops']
    mo.accordion({
        "Show command-line options": out_cli_ops(notebook_name)
    })
    return out_cli_ops, perfspec_cli_ops

@app.cell(hide_code=True)
def perfspec_define_confusion_matrix(
    load_model_from_path,
    mo,
    np,
    prepare_train,
):
    def make_confusion_matrix(perfspec):
        from sklearn.preprocessing import LabelEncoder
        from sklearn.model_selection import train_test_split
        from sklearn.metrics import confusion_matrix
        import matplotlib.pyplot as plt
        import seaborn as sns

        if perfspec['vars']['model'] is None:
            model = load_model_from_path(perfspec, perfspec['settings']['verbose'])
            if model is None:
                print("No model found")
                return
        else:
            model = perfspec['vars']['model']

        (perfspec['vars']['X'], perfspec['vars']['y'], perfspec['vars']['_y']) = prepare_train(perfspec)

        X = perfspec['vars']['X']
        y = perfspec['vars']['_y']
        encoder = LabelEncoder()
        y_encoded = encoder.fit_transform(y)

        # Split the dataset into training and testing sets
        X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.2, random_state=42)

        # Predict class probabilities with the model on the test split
        y_pred = model.predict(X_test)
        y_pred_classes = np.argmax(y_pred, axis=1)  # Get class predictions from probabilities

        # Plot confusion matrix
        cm = confusion_matrix(y_test, y_pred_classes)
        ax = sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels='auto', yticklabels='auto')
        ax.set_xticklabels(ax.get_xticklabels(), fontsize=7)
        ax.set_yticklabels(ax.get_yticklabels(), fontsize=7)

        plt.title("Confusion Matrix")
        plt.xlabel("Predicted Labels")
        plt.ylabel("True Labels")
        plt.show()

    mo.md(
        r"""
        ### Define Confusion Matrix
        """
    )
    return (make_confusion_matrix,)

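# --- Added illustrative sketch (not part of the original pipeline): a minimal, pure-NumPy
# confusion matrix for a 3-class toy example, showing what sklearn's confusion_matrix()
# computes before make_confusion_matrix() plots it. All values below are hypothetical.
@app.cell(hide_code=True)
def example_confusion_matrix_by_hand(np):
    _demo_true = np.array([0, 1, 2, 2, 1, 0, 2])
    _demo_pred = np.array([0, 2, 2, 2, 1, 0, 1])
    _n_classes = 3
    _demo_cm = np.zeros((_n_classes, _n_classes), dtype=int)
    for _t, _p in zip(_demo_true, _demo_pred):
        _demo_cm[_t, _p] += 1  # rows: true labels, columns: predicted labels
    print("toy confusion matrix (rows=true, cols=pred):\n", _demo_cm)
    return
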
@app.cell(hide_code=True)
def confusion_matrix_callback(keras, mo, np, tf):
    from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
    import matplotlib.pyplot as plt

    @keras.saving.register_keras_serializable(package="Custom")
    class ConfusionMatrixCallback(tf.keras.callbacks.Callback):
        def __init__(self, val_data, label_encoder=None, display=False):
            """
            Custom callback to compute and display a confusion matrix at the end of each epoch.

            Parameters:
            - val_data: Tuple (X_val, y_val) for validation data.
            - label_encoder: Optional, if labels are encoded, to map back to original labels.
            - display: Whether to display the confusion matrix as a plot.
            """
            super().__init__()
            self.val_data = val_data
            self.label_encoder = label_encoder
            self.display = display

        def on_epoch_end(self, epoch, logs=None):
            # Get the validation data
            val_X, val_y = self.val_data

            # Predict the classes for the validation data
            y_pred = self.model.predict(val_X, verbose=0)
            y_pred_classes = np.argmax(y_pred, axis=1)

            # If a label encoder is provided, decode the labels
            if self.label_encoder:
                val_y = self.label_encoder.inverse_transform(val_y)
                y_pred_classes = self.label_encoder.inverse_transform(y_pred_classes)

            # Compute the confusion matrix
            cm = confusion_matrix(val_y, y_pred_classes)

            # Optionally display the confusion matrix as a plot
            if self.display:
                self.plot_confusion_matrix(cm, epoch)

            # Print the confusion matrix to the console
            print(f"Epoch {epoch + 1}: Confusion Matrix\n{cm}")

        def plot_confusion_matrix(self, cm, epoch):
            """
            Plot and display the confusion matrix using matplotlib.
            """
            plt.figure(figsize=(8, 8))
            disp = ConfusionMatrixDisplay(confusion_matrix=cm)
            disp.plot(cmap='viridis', values_format='d')
            plt.title(f'Confusion Matrix - Epoch {epoch + 1}')
            plt.show()

        def get_config(self):
            # Return the configuration of the callback as a dictionary
            config = super().get_config()
            return config

    mo.md(
        r"""
        ### Confusion Matrix Callback
        """
    )
    return (ConfusionMatrixCallback,)

@app.cell(hide_code=True)
def precision_metric(keras, mo, tf):
    @keras.saving.register_keras_serializable(package="Custom")
    class PrecisionMetric(tf.keras.metrics.Metric):
        def __init__(self, name="precision", **kwargs):
            super(PrecisionMetric, self).__init__(name=name, **kwargs)
            self.true_positives = self.add_weight(name="tp", initializer="zeros")
            self.false_positives = self.add_weight(name="fp", initializer="zeros")

        def update_state(self, y_true, y_pred, sample_weight=None):
            y_pred = tf.argmax(y_pred, axis=1)
            y_true = tf.cast(y_true, tf.int32)
            y_pred = tf.cast(y_pred, tf.int32)

            true_positives = tf.reduce_sum(tf.cast(tf.equal(y_true, y_pred) & tf.equal(y_true, 1), tf.float32))
            false_positives = tf.reduce_sum(tf.cast(tf.not_equal(y_true, y_pred) & tf.equal(y_pred, 1), tf.float32))

            self.true_positives.assign_add(true_positives)
            self.false_positives.assign_add(false_positives)

        def result(self):
            return self.true_positives / (self.true_positives + self.false_positives + tf.keras.backend.epsilon())

        def reset_states(self):
            self.true_positives.assign(0.0)
            self.false_positives.assign(0.0)

        def get_config(self):
            # Return the configuration of the metric as a dictionary
            config = super().get_config()
            return config

        @classmethod
        def from_config(cls, config):
            return cls(**config)

    mo.md(
        r"""
        ### Precision Metric

        **Precision** measures how many of the predicted positive labels are actually correct. It is calculated as:

        \[
        \text{Precision} = \frac{\text{True Positives}}{\text{True Positives} + \text{False Positives}}
        \]

        - **True Positives (TP)**: Cases where the model predicted `1` (positive) and the actual label is also `1`.
        - **False Positives (FP)**: Cases where the model predicted `1` (positive), but the actual label is not `1`.
        - The logical conditions check whether predictions match and filter for the positive class (`1`).
        - These counts are accumulated over batches.
        """
    )
    return (PrecisionMetric,)

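# --- Added illustrative sketch: a hand-computed check of the precision formula used by
# PrecisionMetric above, with the positive class encoded as 1. Toy arrays only; they assume
# nothing about the real action data.
@app.cell(hide_code=True)
def example_precision_by_hand(np):
    _demo_true = np.array([1, 0, 1, 1, 0, 1])
    _demo_pred = np.array([1, 1, 1, 0, 0, 1])
    _tp = int(np.sum((_demo_pred == 1) & (_demo_true == 1)))  # predicted 1 and actually 1
    _fp = int(np.sum((_demo_pred == 1) & (_demo_true == 0)))  # predicted 1 but actually 0
    print(f"toy precision = TP / (TP + FP) = {_tp} / {_tp + _fp} = {_tp / (_tp + _fp):.4f}")
    return
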
@app.cell(hide_code=True)
def recall_metric(keras, mo, tf):
    @keras.saving.register_keras_serializable(package="Custom")
    class RecallMetric(tf.keras.metrics.Metric):
        def __init__(self, name="recall", **kwargs):
            super(RecallMetric, self).__init__(name=name, **kwargs)
            self.true_positives = self.add_weight(name="tp", initializer="zeros")
            self.false_negatives = self.add_weight(name="fn", initializer="zeros")

        def update_state(self, y_true, y_pred, sample_weight=None):
            y_pred = tf.argmax(y_pred, axis=1)
            y_true = tf.cast(y_true, tf.int32)
            y_pred = tf.cast(y_pred, tf.int32)

            true_positives = tf.reduce_sum(tf.cast(tf.equal(y_true, y_pred) & tf.equal(y_true, 1), tf.float32))
            false_negatives = tf.reduce_sum(tf.cast(tf.not_equal(y_true, y_pred) & tf.equal(y_true, 1), tf.float32))

            self.true_positives.assign_add(true_positives)
            self.false_negatives.assign_add(false_negatives)

        def result(self):
            return self.true_positives / (self.true_positives + self.false_negatives + tf.keras.backend.epsilon())

        def reset_states(self):
            self.true_positives.assign(0.0)
            self.false_negatives.assign(0.0)

        def get_config(self):
            # Return the configuration of the metric as a dictionary
            config = super().get_config()
            return config

        @classmethod
        def from_config(cls, config):
            return cls(**config)

    mo.md(
        r"""
        ### Define Recall Metric

        **Recall** measures how many actual positive labels were correctly predicted. It is calculated as:

        \[
        \text{Recall} = \frac{\text{True Positives}}{\text{True Positives} + \text{False Negatives}}
        \]

        - **False Negatives (FN)**: Cases where the model predicted `0` (negative), but the actual label is `1`.
        - This is similar to precision but focuses on capturing all actual positives.
        """
    )
    return (RecallMetric,)

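# --- Added illustrative sketch: the matching recall computation on the same kind of toy labels,
# mirroring RecallMetric above (positive class encoded as 1). Demo values only.
@app.cell(hide_code=True)
def example_recall_by_hand(np):
    _demo_true = np.array([1, 0, 1, 1, 0, 1])
    _demo_pred = np.array([1, 1, 1, 0, 0, 1])
    _tp = int(np.sum((_demo_pred == 1) & (_demo_true == 1)))  # predicted 1 and actually 1
    _fn = int(np.sum((_demo_pred == 0) & (_demo_true == 1)))  # predicted 0 but actually 1
    print(f"toy recall = TP / (TP + FN) = {_tp} / {_tp + _fn} = {_tp / (_tp + _fn):.4f}")
    return
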
@app.cell(hide_code=True)
def f1_score_metric(PrecisionMetric, RecallMetric, keras, mo, tf):
    @keras.saving.register_keras_serializable(package="Custom")
    class F1ScoreMetric(tf.keras.metrics.Metric):
        def __init__(self, name="f1_score", **kwargs):
            super(F1ScoreMetric, self).__init__(name=name, **kwargs)
            self.precision = PrecisionMetric()
            self.recall = RecallMetric()

        def update_state(self, y_true, y_pred, sample_weight=None):
            self.precision.update_state(y_true, y_pred, sample_weight)
            self.recall.update_state(y_true, y_pred, sample_weight)

        def result(self):
            precision = self.precision.result()
            recall = self.recall.result()
            return 2 * (precision * recall) / (precision + recall + tf.keras.backend.epsilon())

        def reset_states(self):
            self.precision.reset_states()
            self.recall.reset_states()

        def get_config(self):
            # Return the configuration of the metric as a dictionary
            config = super().get_config()
            return config

        @classmethod
        def from_config(cls, config):
            return cls(**config)

    mo.md(
        r"""
        ### Define F1 Score Metric

        F1-score balances precision and recall, which is especially useful when the dataset is imbalanced. It is the harmonic mean of precision and recall:

        \[
        \text{F1-Score} = 2 \cdot \frac{\text{Precision} \cdot \text{Recall}}{\text{Precision} + \text{Recall}}
        \]

        - `epsilon` is added to avoid division by zero.
        """
    )
    return (F1ScoreMetric,)

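# --- Added illustrative sketch: the harmonic mean computed by F1ScoreMetric above, using the
# toy precision/recall values from the two sketches before it (hypothetical numbers).
@app.cell(hide_code=True)
def example_f1_by_hand():
    _precision = 0.75
    _recall = 0.75
    _f1 = 2 * (_precision * _recall) / (_precision + _recall + 1e-7)
    print(f"toy F1 = 2*P*R/(P+R) = {_f1:.4f}")
    return
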
@app.cell(hide_code=True)
def custom_validation_metrics(mo, tf):
    # Custom callback to compute metrics on validation data
    class CustomValidationMetrics(tf.keras.callbacks.Callback):
        def __init__(self, X_val, y_val):
            super().__init__()  # Initialize the parent class
            self.X_val = X_val
            self.y_val = y_val

        def on_epoch_end(self, epoch, logs=None):
            val_predictions = self.model.predict(self.X_val, verbose=0)
            val_predictions = (val_predictions > 0.5).astype(int)  # Binarize predictions

            # Compute precision, recall, and f1-score
            precision = tf.keras.metrics.Precision()(self.y_val, val_predictions)
            recall = tf.keras.metrics.Recall()(self.y_val, val_predictions)
            f1_score = 2 * (precision * recall) / (precision + recall + 1e-7)

            print(f"\nEpoch {epoch + 1} Validation Metrics - Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1_score:.4f}")

    mo.md(
        r"""
        ### Custom Validation Metrics

        To apply metrics only to the validation set in Keras, use a custom callback that computes the metrics on the validation data at the end of each epoch.

        Keras does not seem to natively support specifying metrics exclusively for validation data in the `compile()` method.
        """
    )
    return (CustomValidationMetrics,)

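# --- Added illustrative sketch: the same Precision/Recall calls the callback above makes, on a
# tiny tensor, to show the 0.5-threshold binarization step. Toy data only; not wired into training.
@app.cell(hide_code=True)
def example_validation_metrics_call(tf):
    _y_val = tf.constant([1.0, 0.0, 1.0, 1.0])
    _val_predictions = tf.constant([0.9, 0.6, 0.4, 0.8])
    _val_predictions = tf.cast(_val_predictions > 0.5, tf.float32)  # binarize as in the callback
    _precision = tf.keras.metrics.Precision()(_y_val, _val_predictions)
    _recall = tf.keras.metrics.Recall()(_y_val, _val_predictions)
    print(f"toy precision={float(_precision):.4f} recall={float(_recall):.4f}")
    return
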
@app.cell(hide_code=True)
def perfspec_prepare_model_train(mo, np):
    def prepare_train(perfspec):
        from tensorflow.keras.utils import to_categorical
        from tensorflow.keras.preprocessing.sequence import pad_sequences

        # Parameters
        X = []
        y = []
        sequence_length = perfspec['settings']['sequence_length']

        # Generate input-output pairs
        for j in range(len(perfspec['vars']['encoded_actions']) - sequence_length):
            X.append(perfspec['vars']['encoded_actions'][j:j + sequence_length])  # Input sequence
            y.append(perfspec['vars']['encoded_actions'][j + sequence_length])    # Target (next action)

        X = np.array(X)
        y = np.array(y)

        # Ensure X has the correct shape for the LSTM
        X = pad_sequences(X, maxlen=sequence_length, padding='pre')
        X = np.expand_dims(X, axis=-1)  # Shape: (num_samples, sequence_length, 1)

        _y = y
        vocab_size = len(perfspec['vars']['label_encoder'].classes_)  # Total number of unique actions
        # One-hot encode y for classification
        y = to_categorical(y, num_classes=vocab_size)

        return (X, y, _y)

    mo.md(
        r"""
        ## Prepare train model input

        Load the input and **shape X, y** for the model
        """
    )
    return (prepare_train,)

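# --- Added illustrative sketch: how the sliding window in prepare_train() turns a sequence of
# encoded actions into (X, y) pairs. The action codes below are made-up stand-ins for
# perfspec['vars']['encoded_actions'].
@app.cell(hide_code=True)
def example_sliding_window(np):
    _encoded_demo = np.array([3, 1, 4, 1, 5, 9, 2])
    _seq_len = 3
    _X_demo, _y_demo = [], []
    for _j in range(len(_encoded_demo) - _seq_len):
        _X_demo.append(_encoded_demo[_j:_j + _seq_len])  # input window
        _y_demo.append(_encoded_demo[_j + _seq_len])     # next action to predict
    _X_demo = np.expand_dims(np.array(_X_demo), axis=-1)  # (num_samples, seq_len, 1), as the LSTM expects
    print("X windows shape:", _X_demo.shape, "targets:", _y_demo)
    return
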
@app.cell(hide_code=True)
def show_train_model_shape(mo, perfspec, prepare_train, verbose):
    (perfspec['vars']['X'], perfspec['vars']['y'], perfspec['vars']['_y']) = prepare_train(perfspec)

    if verbose is not None or mo.cli_args().get("verbose") is not None or mo.running_in_notebook():
        print(f"X shape : {perfspec['vars']['X'].shape}")
        print(f"y shape : {perfspec['vars']['y'].shape}")

    mo.md(
        r"""
        ### Show train model shape
        """
    )
    return

@app.cell(hide_code=True)
def make_model(CustomValidationMetrics, mo, np, perfspec):
    # Define the LSTM model
    def make_model(X=[], y=[], label_encoder=[], encoded_actions=[]):
        if len(X) == 0 or len(y) == 0:
            print("make_model: no values found for X or y")
            return
        from tensorflow.keras.models import Sequential
        from tensorflow.keras.layers import Input, LSTM, Dense, Embedding, Dropout
        from tensorflow.keras.optimizers import Adam
        from tensorflow.keras.callbacks import (
            EarlyStopping,
            ReduceLROnPlateau,
            Callback,
            ModelCheckpoint,
        )

        vocab_size = len(
            label_encoder.classes_
        )  # Get the number of unique action labels

        # Model architecture
        embedding_dim = 50
        perfspec['vars']['model'] = Sequential(
            [
                #Embedding(input_dim=vocab_size, output_dim=embedding_dim),
                Input(shape=(perfspec['settings']['sequence_length'], 1)),
                LSTM(
                    perfspec['settings']['lstm_units_1'],
                    return_sequences=True,
                    recurrent_dropout=perfspec['settings']['dropout_rate'],
                ),
                LSTM(
                    perfspec['settings']['lstm_units_2'],
                    return_sequences=False,
                    recurrent_dropout=perfspec['settings']['dropout_rate'],
                ),
                Dropout(0.5),
                Dense(
                    vocab_size, activation="softmax"
                ),  # Softmax for multi-class classification
            ]
        )

        # Split the data for training and validation
        train_size = int(perfspec['defaults']['train_size'] * len(X))
        train_X, val_X = X[:train_size], X[train_size:]
        train_y, val_y = y[:train_size], y[train_size:]

        if train_y.ndim > 1 and train_y.shape[-1] == vocab_size:
            loss_function = "categorical_crossentropy"
        else:
            # Convert targets to integers if needed
            train_y = np.argmax(train_y, axis=-1) if train_y.ndim > 1 else train_y
            val_y = np.argmax(val_y, axis=-1) if val_y.ndim > 1 else val_y
            loss_function = "sparse_categorical_crossentropy"

        # Compile the model
        perfspec['vars']['model'].compile(
            optimizer=Adam(),
            loss=loss_function,
            metrics=[
                "accuracy"
                # PrecisionMetric(),
                # RecallMetric(),
                # F1ScoreMetric()
            ],
        )

        # Create confusion matrix callback
        #confusion_callback = ConfusionMatrixCallback(
        #    val_data=(val_X, val_y),
        #    label_encoder=label_encoder,
        #    display=True
        #)

        # Callbacks
        early_stopping = EarlyStopping(
            monitor="val_loss", patience=10, restore_best_weights=True
        )
        lr_reduction = ReduceLROnPlateau(
            monitor="val_loss", patience=8, factor=0.8, min_lr=0.0001
        )
        # Report precision/recall/F1 on the held-out validation split
        custom_metrics_callback = CustomValidationMetrics(val_X, val_y)
        checkpoint = None
        if perfspec['settings']['checkpoint_mode'] == "weights":
            # Save only the best model (lowest validation loss)
            checkpoint = ModelCheckpoint(
                filepath=perfspec['settings']['checkpoint_filepath'].as_posix(),
                save_best_only=True,   # True to save only when validation loss improves
                monitor='val_loss',    # Metric to monitor
                mode='min',            # Minimize the validation loss
                verbose=1              # Print messages when saving
            )
        elif perfspec['settings']['checkpoint_mode'] == "epochs":
            # Save a checkpoint at the end of every epoch via callback
            checkpoint = ModelCheckpoint(
                filepath=perfspec['settings']['checkpoint_filepath'].as_posix(),
                save_best_only=False,  # True to save only when validation loss improves
                verbose=1              # Print messages when saving
            )

        callbacks = [early_stopping, lr_reduction]
        callbacks.append(custom_metrics_callback)
        if checkpoint is not None:
            callbacks.append(checkpoint)

        # Print the model summary
        perfspec['vars']['model'].summary()
        print(" train size: {}".format(train_size))
        print(" train_X size: {}".format(len(train_X)))
        print(" train_y size: {}".format(len(train_y)))
        print(" val_X size: {}".format(len(val_X)))
        print(" val_y size: {}".format(len(val_y)))

        # Train the model
        perfspec['vars']['history'] = perfspec['vars']['model'].fit(
            train_X,
            train_y,
            batch_size=perfspec['settings']['batch_size'],
            epochs=perfspec['defaults']['epochs'],
            validation_data=(val_X, val_y),
            verbose=2,
            callbacks=callbacks
        )
        return

    mo.md(
        r"""
        ## Make Model train

        This is where the **model** is created and **fit**.

        The results are saved in `perfspec['vars']` as `model` and `history`.
        """
    )
    return (make_model,)

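# --- Added illustrative sketch: the target-format check make_model() uses to pick its loss.
# One-hot targets keep "categorical_crossentropy"; integer targets (or anything reduced with
# argmax) fall back to "sparse_categorical_crossentropy". Toy shapes only.
@app.cell(hide_code=True)
def example_loss_selection(np):
    _vocab = 4
    _one_hot_y = np.eye(_vocab)[[0, 2, 3]]   # shape (3, 4): one-hot rows
    _int_y = np.argmax(_one_hot_y, axis=-1)  # shape (3,): integer class ids
    for _targets in (_one_hot_y, _int_y):
        _loss = (
            "categorical_crossentropy"
            if _targets.ndim > 1 and _targets.shape[-1] == _vocab
            else "sparse_categorical_crossentropy"
        )
        print(_targets.shape, "->", _loss)
    return
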
@app.cell(hide_code=True)
def main(mo):
    mo.md(r"""<a id='main' />""")
    return

@app.cell(hide_code=True)
def use_or_make_model(
    Path,
    load_model_from_path,
    make_model,
    mo,
    perfspec,
    verbose,
):
    if Path(perfspec['settings']['model_filepath']).exists():
        if mo.running_in_notebook() or (verbose is not None or mo.cli_args().get("verbose") is not None):
            print(f"Trained model path already exists, to train the model DELETE the existing path: {perfspec['settings']['model_filepath']}")
        perfspec['vars']['model'] = load_model_from_path(perfspec, perfspec['settings']['verbose'])
    else:
        if perfspec['vars'].get('X').any() and perfspec['vars'].get('y').any():
            make_model(
                X=perfspec['vars']['X'],
                y=perfspec['vars']['y'],
                label_encoder=perfspec['vars']['label_encoder'],
                encoded_actions=perfspec['vars']['encoded_actions']
            )
            #evaluate_model(the_model, X, y)

    mo.md(
        f"""
        ### Use or Train Model

        **IMPORTANT**: if `model_filepath` does not exist the model will be <u>trained and created</u>, otherwise it will be <u>loaded</u> from `model_filepath`.

        Path: "🍃" {perfspec['settings']['model_filepath']}
        """
    )
    return

@app.cell(hide_code=True)
def perfspec_load_model_from_path(Path, mo, perfspec):
    def load_model_from_path(perfspec, verbose):
        from keras.models import load_model
        custom_objects = {
            # "PrecisionMetric": PrecisionMetric,
            # "RecallMetric": RecallMetric,
            # "F1ScoreMetric": F1ScoreMetric,
        }
        if Path(perfspec['settings']['model_filepath']).exists():
            perfspec['vars']['model'] = load_model(
                perfspec['settings']['model_filepath'],
                #custom_objects=custom_objects
            )
            if verbose is not None and not mo.running_in_notebook():
                print(f"Trained model loaded from: {perfspec['settings']['model_filepath']}")
            return perfspec['vars']['model']
        else:
            return None

    mo.md(
        f"""
        ### Load model from path

        Load trained model from file **model_filepath** 🍃

        Path: {perfspec['settings']['model_filepath']}
        """
    )
    return (load_model_from_path,)

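# --- Added illustrative sketch: if the custom metrics were compiled into the saved model,
# keras.models.load_model() would need them passed back as custom_objects (they are also
# registered via @keras.saving.register_keras_serializable above). This cell only builds the
# mapping; it does not load anything.
@app.cell(hide_code=True)
def example_custom_objects(F1ScoreMetric, PrecisionMetric, RecallMetric):
    _custom_objects_demo = {
        "PrecisionMetric": PrecisionMetric,
        "RecallMetric": RecallMetric,
        "F1ScoreMetric": F1ScoreMetric,
    }
    print("custom_objects a load_model() call would need:", sorted(_custom_objects_demo))
    return
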
@app.cell(hide_code=True)
def perfspec_save_model(Path, mo, perfspec):
    def save_model():
        if not Path(perfspec['settings']['model_filepath']).exists() and perfspec['vars']['model'] is not None:
            # Save the model
            print(f"Save trained model to: {perfspec['settings']['model_filepath']}")
            perfspec['vars']['model'].save(perfspec['settings']['model_filepath'])
            if perfspec['vars']['history'] is not None:
                import json
                try:
                    with open(perfspec['settings']['model_history_filepath'], 'w') as output_file:
                        json.dump(perfspec['vars']['history'].history, output_file)
                    print(f"Save trained model history to: {perfspec['settings']['model_history_filepath']}")
                except IOError as e:
                    print(f"Error writing to file: {e}")
                except TypeError as e:
                    # json.dump raises TypeError for values that cannot be serialized
                    print(f"Error encoding JSON: {e}")

    save_model()
    mo.md(
        f"""
        ## Save Model

        to `model_filepath` "🍃" {perfspec['settings']['model_filepath']}

        Path can be changed via **command-line** with **--model** `model-filepath`
        """
    )
    return (save_model,)

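# --- Added illustrative sketch: the history dict that save_model() writes is plain JSON, so it
# can be round-tripped the same way plot_history() reads it back. Uses an in-memory buffer and
# made-up values instead of model_history_filepath.
@app.cell(hide_code=True)
def example_history_roundtrip():
    import io
    import json
    _demo_history = {"loss": [1.2, 0.8, 0.6], "val_loss": [1.3, 0.9, 0.7]}
    _buffer = io.StringIO()
    json.dump(_demo_history, _buffer)  # what save_model() does with history.history
    _buffer.seek(0)
    _loaded = json.load(_buffer)       # what plot_history() does when the model is reloaded
    print("round-tripped keys:", sorted(_loaded))
    return
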
@app.cell(hide_code=True)
def perfspec_plot_history(Path, mo):
    def plot_history(perfspec):
        import json
        from keras.src.callbacks import History
        if 'vars' not in perfspec:
            return None
        if perfspec['vars']['history'] is not None:
            if isinstance(perfspec['vars']['history'], History):
                _model_history = perfspec['vars']['history'].history
            else:
                _model_history = perfspec['vars']['history']
        else:
            if not Path(perfspec['settings']['model_history_filepath']).exists():
                print(f"Not found: {perfspec['settings']['model_history_filepath']}")
                _model_history = {}
            else:
                with open(perfspec['settings']['model_history_filepath']) as history_file:
                    perfspec['vars']['history'] = json.load(history_file)
                    _model_history = perfspec['vars']['history']
        return _model_history

    mo.md(
        f"""
        ## Plot Model history
        """
    )
    #if 'plt_accuracy' in perfspec['vars']: mo.as_html(perfspec['vars']['plt_accuracy'].gcf())
    return (plot_history,)

@app.cell(hide_code=True)
def perfspec_plot_defs(mo, plot_history):
    def plot_accuracy(perfspec):
        _model_history = plot_history(perfspec)
        if _model_history is not None and mo.running_in_notebook():
            import matplotlib.pyplot as plt
            import seaborn as sns
            if _model_history.get('accuracy'):
                plt.style.use('dark_background')
                # Plot training & validation accuracy values
                (_, acc) = plt.subplots()
                acc.plot(_model_history['accuracy'], color='white')
                acc.plot(_model_history['val_accuracy'], color='orange')
                acc.set_title('Model Accuracy')
                acc.set_xlabel('Epochs')
                acc.set_ylabel('Accuracy')
                acc.legend(['Train', 'Test'], loc='upper left')
                return plt
        elif perfspec['settings']['verbose'] == 2:
            if 'accuracy' in _model_history:
                print('model val_accuracy')
                for idx, it in enumerate(_model_history.get('val_accuracy')):
                    print(f"epoch: {idx} value: {it}")
        return None

    def plot_loss(perfspec):
        _model_history = plot_history(perfspec)
        if _model_history is not None and mo.running_in_notebook():
            import matplotlib.pyplot as plt
            import seaborn as sns
            if _model_history.get('loss'):
                plt.style.use('dark_background')
                (_, loss) = plt.subplots()
                loss.plot(_model_history['loss'])
                loss.plot(_model_history['val_loss'])
                loss.set_title('Model Loss')
                loss.set_xlabel('Epochs')
                loss.set_ylabel('Loss')
                loss.legend(['Train', 'Test'], loc='upper left')
                return plt
        elif perfspec['settings']['verbose'] == 2:
            if 'val_loss' in _model_history:
                print('model val_loss')
                for idx, it in enumerate(_model_history.get('val_loss')):
                    print(f"epoch: {idx} value: {it}")
        return None

    def plot_precision(perfspec):
        _model_history = plot_history(perfspec)
        if _model_history is not None and mo.running_in_notebook():
            import matplotlib.pyplot as plt
            import seaborn as sns
            if _model_history.get('precision'):
                plt.style.use('dark_background')
                # Plot learning rate and precision/recall/F1 values
                plt.plot(_model_history['learning_rate'])
                plt.plot(_model_history['precision'])
                plt.plot(_model_history['recall'])
                plt.plot(_model_history['f1_score'])
                plt.title('Model learning')
                plt.xlabel('Epochs')
                plt.ylabel('Learning')
                plt.legend(['Rate', 'Precision', 'Recall', 'f1_score'], loc='upper left')
                return plt
        return None
    return plot_accuracy, plot_loss, plot_precision

@app.cell(hide_code=True)
def perfspec_plot_accuracy(mo, perfspec, plot_accuracy):
    _plot_acc = plot_accuracy(perfspec)

    if _plot_acc is not None and perfspec['vars']['history'] is not None and mo.running_in_notebook():
        _output = mo.as_html(_plot_acc.gcf())
    else:
        _output = None

    mo.md(
        f"""
        ## Model Accuracy history

        From model train plot accuracy and epochs

        {_output}
        """
    )
    return

@app.cell(hide_code=True)
def perfspec_plot_loss(mo, perfspec, plot_loss):
    _plot_loss = plot_loss(perfspec)

    if _plot_loss is not None and perfspec['vars']['history'] is not None and mo.running_in_notebook():
        _output = mo.as_html(_plot_loss.gcf())
    else:
        _output = None

    mo.md(
        f"""
        ## Model loss history

        From model train loss

        {_output}
        """
    )
    return

@app.cell(hide_code=True)
def perfspec_plot_precision(mo, perfspec, plot_precision):
    _plot_pre = plot_precision(perfspec)
    if _plot_pre is not None and perfspec['vars']['history'] is not None and mo.running_in_notebook():
        mo.md(
            f"""
            From model train plot Precision

            {mo.as_html(_plot_pre.gcf())}
            """
        )
    return

@app.cell(hide_code=True)
def perfspec_confusion_matrix(
    load_model_from_path,
    make_confusion_matrix,
    mo,
    perfspec,
):
    if mo.running_in_notebook():
        if perfspec['vars'].get('model') is None:
            load_model_from_path(perfspec, perfspec['settings']['verbose'])
        if perfspec['vars'].get('model') is not None:
            make_confusion_matrix(perfspec)

    mo.md("### Confusion Matrix")
    return

@app.cell(hide_code=True)
def perfspec_evaluate_model(Path, mo, np, prepare_train):
    def evaluate_model(test_input=[], test_labels=[], model=None):
        if model is None:
            return
        if len(test_input) > 0:
            # Evaluate the model on test data
            #(test_loss, test_precision, test_recall, test_f1_score) = perfspec['vars']['model'].evaluate(test_input, test_labels)
            (test_loss, test_accuracy) = model.evaluate(test_input, test_labels)
            print(f"Test Accuracy: {test_accuracy:2.4f}")
            print(f"Test Loss: {test_loss:2.4f}")
            #print(f"Precision: {test_precision}")
            #print(f"Recall: {test_recall}")
            #print(f"F1 Score: {test_f1_score}")

    def run_evaluate(perfspec):
        # load_trained_model()
        if perfspec['vars']['model'] is None:
            return
        (perfspec['vars']['X'], perfspec['vars']['y'], perfspec['vars']['_y']) = prepare_train(perfspec)
        print("\nEvaluate Model")
        X = perfspec['vars']['X']
        y = perfspec['vars']['y']
        train_size = int(perfspec['defaults']['train_size'] * len(X))
        train_X, val_X = X[:train_size], X[train_size:]
        train_y, val_y = y[:train_size], y[train_size:]
        evaluate_model(val_X, val_y, perfspec['vars']['model'])

    def history_info(perfspec):
        import json
        from keras.src.callbacks import History
        if perfspec['vars']['history'] is not None:
            if isinstance(perfspec['vars']['history'], History):
                model_history = perfspec['vars']['history'].history
            else:
                model_history = perfspec['vars']['history']
        else:
            if not Path(perfspec['settings']['model_history_filepath']).exists():
                print(f"Not found: {perfspec['settings']['model_history_filepath']}")
                model_history = {}
            else:
                with open(perfspec['settings']['model_history_filepath']) as history_file:
                    model_history = json.load(history_file)
        if model_history:
            from prettytable import PrettyTable
            train_loss = model_history['loss']
            val_loss = model_history['val_loss']
            train_acc = model_history['accuracy']
            val_acc = model_history['val_accuracy']
            table_find = PrettyTable()
            # Find the epoch with the minimum validation loss
            best_epoch_val_loss = np.argmin(val_loss)
            # Find the epoch with the maximum validation accuracy
            best_epoch_val_acc = np.argmax(val_acc)
            #print("\nFrom Model History")
            table_find.field_names = ["description", "value"]
            table_find.align["description"] = "l"
            table_find.align["value"] = "r"
            table_find.border = True
            table_find.add_rows([
                ["Best Epoch (Validation Loss)", (best_epoch_val_loss + 1)],
                ["Validation Loss at Best Epoch", "{:2.4f}".format(val_loss[best_epoch_val_loss])],
                ["Best Epoch (Validation Accuracy)", (best_epoch_val_acc + 1)],
                ["Validation Accuracy at Best Epoch", "{:2.4f}".format(val_acc[best_epoch_val_acc])]
            ])
            if mo.running_in_notebook():
                return table_find.get_formatted_string('html')
            else:
                return table_find.get_formatted_string('text')
        else:
            return ""
    return evaluate_model, history_info, run_evaluate

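# --- Added illustrative sketch: the np.argmin/np.argmax step history_info() uses to report the
# best epoch, on a made-up validation curve (epoch numbers are 1-based in the printed table).
@app.cell(hide_code=True)
def example_best_epoch(np):
    _val_loss_demo = [1.10, 0.85, 0.80, 0.83]
    _val_acc_demo = [0.40, 0.55, 0.61, 0.60]
    _best_loss_epoch = int(np.argmin(_val_loss_demo)) + 1
    _best_acc_epoch = int(np.argmax(_val_acc_demo)) + 1
    print(f"best epoch by val_loss: {_best_loss_epoch}, by val_accuracy: {_best_acc_epoch}")
    return
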
@app.cell(hide_code=True)
def perfspec_run_evaluate_model_run(
    history_info,
    mo,
    perfspec,
    run_evaluate,
):
    #with mo.capture_stdout() as buffer:
    #    run_evaluate(perfspec)
    #_output = buffer.getvalue()

    run_evaluate(perfspec)

    mo.md(
        f"""
        ## Evaluate Model

        {history_info(perfspec)}
        """
    )
    return

@app.cell(hide_code=True)
def perfspec_test_model_prediction(
    lib_get_input_sequence,
    lib_predict_action,
    load_model_from_path,
    mo,
    perfspec,
    verbose,
):
    _verbose = "1" if mo.running_in_notebook() else perfspec['settings']['verbose']
    _input_sequence = lib_get_input_sequence(perfspec['settings']['input_str'], perfspec['vars']['unique_actions'])
    encoded_input = None
    predicted_probabilities = None
    if len(_input_sequence) > 0:
        if perfspec['vars']['model'] is not None:
            _model = perfspec['vars']['model']
        else:
            _model = load_model_from_path(perfspec, verbose)
        if _model is not None:
            (encoded_input, predicted_probabilities) = lib_predict_action(
                _model,
                perfspec['settings']['sequence_length'],
                _input_sequence,
                perfspec['vars']['label_encoder'],
                _verbose
            )

    mo.md(
        f"""
        ## Test Model Prediction

        Use **{_input_sequence}** with the trained model <u>created</u> or <u>loaded</u>
        <br>from {perfspec['settings']['model_filepath']}

        The input value can be changed on the **command-line** with the **--input** `value` argument<br>
        with the **--verbose** option more info is shown in **command-line** mode
        """
    )
    return encoded_input, predicted_probabilities

if __name__ == "__main__":
    app.run()