candle-annotator/services/ml/training/train.py
Marko Djordjevic cbb921b4a7 Scope MLflow experiment names to include user ID (Task 14.2)
- Updated FastAPI /training/start endpoint to extract X-User-ID header via get_user_id() dependency
- Modified _run_training_background to accept and use user_id parameter
- Added MLflow experiment setup with user scoping: experiments are named user_{user_id}_training when user_id is provided, falling back to default experiment name otherwise
- Updated database record insertion to store scoped experiment name
- Updated training/train.py train() function to accept user_id parameter and use it for experiment naming
- Mark task 14.2 as complete in tasks.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 13:46:00 +01:00

476 lines
16 KiB
Python

"""
Model training module.
Main training entry point: load labeled CSV, split, train, evaluate, log to MLflow.
"""
import hashlib
import logging
from datetime import datetime
from pathlib import Path
from typing import Tuple, Optional
import warnings
import pandas as pd
import numpy as np
import mlflow
import mlflow.sklearn
import mlflow.xgboost
from sklearn.metrics import accuracy_score, f1_score, precision_recall_fscore_support
from app.config import PipelineConfig, TrainingConfig
from app.db import get_db, TrainingRun, init_db
from training.models.random_forest import RandomForestModel
from training.models.xgboost_model import XGBoostModel
from training import evaluation
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def temporal_split(
X: np.ndarray,
y: np.ndarray,
test_split: float,
validation_split: float
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Split data temporally into train/validation/test sets.
Data is assumed to already be sorted by time.
Split ratios:
- test_split: fraction for test set (from the end)
- validation_split: fraction for validation set (from remaining data after test)
- remainder: training set
Args:
X: Feature matrix (n_samples, n_features)
y: Labels (n_samples,)
test_split: Test set fraction (0.0-1.0)
validation_split: Validation set fraction (0.0-1.0)
Returns:
X_train, X_val, X_test, y_train, y_val, y_test
"""
n_samples = len(X)
# Calculate split indices
n_test = int(n_samples * test_split)
n_val = int((n_samples - n_test) * validation_split)
n_train = n_samples - n_test - n_val
# Split
X_train = X[:n_train]
y_train = y[:n_train]
X_val = X[n_train:n_train + n_val]
y_val = y[n_train:n_train + n_val]
X_test = X[n_train + n_val:]
y_test = y[n_train + n_val:]
logger.info(f"Temporal split: train={n_train}, val={n_val}, test={n_test}")
return X_train, X_val, X_test, y_train, y_val, y_test
def random_split(
X: np.ndarray,
y: np.ndarray,
test_split: float,
validation_split: float,
random_state: int = 42
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Split data randomly into train/validation/test sets.
WARNING: Not recommended for financial time series data.
Args:
X: Feature matrix (n_samples, n_features)
y: Labels (n_samples,)
test_split: Test set fraction (0.0-1.0)
validation_split: Validation set fraction (0.0-1.0)
random_state: Random seed
Returns:
X_train, X_val, X_test, y_train, y_val, y_test
"""
from sklearn.model_selection import train_test_split
warnings.warn(
"Random splitting is not recommended for financial time series data. "
"Use temporal splitting instead to avoid data leakage.",
UserWarning
)
logger.warning("Using random split (not recommended for time series)")
# First split: train+val vs test
X_temp, X_test, y_temp, y_test = train_test_split(
X, y,
test_size=test_split,
random_state=random_state,
stratify=y
)
# Second split: train vs val
val_size = validation_split / (1 - test_split)
X_train, X_val, y_train, y_val = train_test_split(
X_temp, y_temp,
test_size=val_size,
random_state=random_state,
stratify=y_temp
)
logger.info(f"Random split: train={len(X_train)}, val={len(X_val)}, test={len(X_test)}")
return X_train, X_val, X_test, y_train, y_val, y_test
def create_model(model_type: str, hyperparameters: dict, class_weights: Optional[str]):
"""
Create model instance based on model_type.
Args:
model_type: "random_forest" or "xgboost"
hyperparameters: Model hyperparameters
class_weights: "balanced" or None
Returns:
Model instance
Raises:
ValueError: If model_type is not supported
"""
if model_type == "random_forest":
return RandomForestModel(hyperparameters, class_weights)
elif model_type == "xgboost":
return XGBoostModel(hyperparameters, class_weights)
else:
supported_types = ["random_forest", "xgboost"]
raise ValueError(
f"Unsupported model type: {model_type}. "
f"Supported types: {supported_types}"
)
def compute_config_hash(config: PipelineConfig) -> str:
"""
Compute hash of pipeline configuration.
Args:
config: Pipeline configuration
Returns:
SHA256 hash (first 16 chars)
"""
import json
config_str = json.dumps(config.model_dump(), sort_keys=True)
return hashlib.sha256(config_str.encode()).hexdigest()[:16]
def train(
config: PipelineConfig,
labeled_data_path: Path,
output_model_path: Optional[Path] = None,
user_id: Optional[str] = None
) -> str:
"""
Main training function.
Loads labeled data, splits, trains model, evaluates, logs to MLflow,
and stores metadata in PostgreSQL.
Args:
config: Pipeline configuration
labeled_data_path: Path to labeled CSV file
output_model_path: Optional path to save model locally (for inference)
user_id: Optional user ID for scoped experiment naming (e.g., user_{uuid}_training)
Returns:
MLflow run ID
"""
training_config = config.stages.training
mlflow_config = training_config.mlflow
# Initialize database
init_db()
# Set MLflow tracking URI
mlflow.set_tracking_uri(mlflow_config.tracking_uri)
# Set experiment with user scoping if user_id is provided
if user_id:
experiment_name = f"user_{user_id}_training"
else:
experiment_name = mlflow_config.experiment_name
mlflow.set_experiment(experiment_name)
logger.info(f"Loading labeled data from {labeled_data_path}")
df = pd.read_csv(labeled_data_path)
# Separate features and labels
if 'label' not in df.columns:
raise ValueError("Labeled dataset must have 'label' column")
label_col = 'label'
# Exclude label columns, time columns, and programmatic label columns (which contain string values)
feature_cols = [col for col in df.columns
if col not in ['label', 'time', 'timestamp']
and not col.startswith('label_programmatic_')]
X = df[feature_cols].values
y = df[label_col].values
feature_names = feature_cols
logger.info(f"Loaded {len(X)} samples with {len(feature_cols)} features")
logger.info(f"Class distribution: {dict(zip(*np.unique(y, return_counts=True)))}")
# Split data
if training_config.split_method == "temporal":
X_train, X_val, X_test, y_train, y_val, y_test = temporal_split(
X, y,
training_config.test_split,
training_config.validation_split
)
elif training_config.split_method == "random":
X_train, X_val, X_test, y_train, y_val, y_test = random_split(
X, y,
training_config.test_split,
training_config.validation_split,
random_state=training_config.hyperparameters.get('random_state', 42)
)
else:
raise ValueError(f"Unknown split_method: {training_config.split_method}")
# Start MLflow run
with mlflow.start_run() as run:
run_id = run.info.run_id
logger.info(f"Started MLflow run: {run_id}")
# Create training run record in PostgreSQL
with get_db() as db:
training_run = TrainingRun(
run_id=run_id,
model_type=training_config.model_type,
experiment_name=experiment_name,
pipeline_config_hash=compute_config_hash(config),
dataset_version=None, # TODO: Add DVC hash if available
metrics_summary={},
status="running",
created_at=datetime.utcnow()
)
db.add(training_run)
db.commit()
# Log parameters
mlflow.log_param("model_type", training_config.model_type)
mlflow.log_param("split_method", training_config.split_method)
mlflow.log_param("test_split", training_config.test_split)
mlflow.log_param("validation_split", training_config.validation_split)
mlflow.log_param("class_weights", training_config.class_weights)
mlflow.log_param("n_train_samples", len(X_train))
mlflow.log_param("n_val_samples", len(X_val))
mlflow.log_param("n_test_samples", len(X_test))
mlflow.log_param("n_features", X.shape[1])
mlflow.log_param("n_classes", len(np.unique(y)))
# Log per-class sample counts
for label, count in zip(*np.unique(y_train, return_counts=True)):
mlflow.log_param(f"train_samples_{label}", int(count))
# Log all hyperparameters
for param, value in training_config.hyperparameters.items():
mlflow.log_param(param, value)
# Log pipeline config as artifact
import yaml
config_dict = config.model_dump()
config_yaml = yaml.dump(config_dict, default_flow_style=False)
mlflow.log_text(config_yaml, "pipeline_config.yaml")
# Create and train model
logger.info(f"Training {training_config.model_type} model")
model = create_model(
training_config.model_type,
training_config.hyperparameters,
training_config.class_weights
)
model.fit(X_train, y_train)
logger.info("Training complete")
# Evaluate on validation set
y_val_pred = model.predict(X_val)
val_accuracy = accuracy_score(y_val, y_val_pred)
val_f1_macro = f1_score(y_val, y_val_pred, average='macro')
val_f1_weighted = f1_score(y_val, y_val_pred, average='weighted')
mlflow.log_metric("val_accuracy", val_accuracy)
mlflow.log_metric("val_f1_macro", val_f1_macro)
mlflow.log_metric("val_f1_weighted", val_f1_weighted)
# Evaluate on test set
y_test_pred = model.predict(X_test)
test_accuracy = accuracy_score(y_test, y_test_pred)
test_f1_macro = f1_score(y_test, y_test_pred, average='macro')
test_f1_weighted = f1_score(y_test, y_test_pred, average='weighted')
mlflow.log_metric("test_accuracy", test_accuracy)
mlflow.log_metric("test_f1_macro", test_f1_macro)
mlflow.log_metric("test_f1_weighted", test_f1_weighted)
logger.info(f"Test accuracy: {test_accuracy:.4f}")
logger.info(f"Test F1 (macro): {test_f1_macro:.4f}")
logger.info(f"Test F1 (weighted): {test_f1_weighted:.4f}")
# Compute per-class metrics
classes = model.classes_
precision, recall, f1, support = precision_recall_fscore_support(
y_test, y_test_pred, labels=classes, average=None
)
for i, label in enumerate(classes):
mlflow.log_metric(f"test_precision_{label}", precision[i])
mlflow.log_metric(f"test_recall_{label}", recall[i])
mlflow.log_metric(f"test_f1_{label}", f1[i])
logger.info(f"Class {label}: P={precision[i]:.4f}, R={recall[i]:.4f}, F1={f1[i]:.4f}")
# Generate and log artifacts if enabled
if mlflow_config.log_artifacts:
logger.info("Generating evaluation artifacts")
# Confusion matrix
cm_bytes = evaluation.generate_confusion_matrix_plot(
y_test, y_test_pred, labels=classes.tolist()
)
import tempfile
with tempfile.NamedTemporaryFile(mode='wb', suffix='.png', delete=False) as f:
f.write(cm_bytes)
cm_path = f.name
mlflow.log_artifact(cm_path, "confusion_matrix.png")
Path(cm_path).unlink()
# Feature importance (if available)
if hasattr(model, 'feature_importances_'):
fi_bytes = evaluation.generate_feature_importance_plot(
feature_names, model.feature_importances_
)
with tempfile.NamedTemporaryFile(mode='wb', suffix='.png', delete=False) as f:
f.write(fi_bytes)
fi_path = f.name
mlflow.log_artifact(fi_path, "feature_importance.png")
Path(fi_path).unlink()
# Classification report
report_text = evaluation.generate_classification_report_text(
y_test, y_test_pred, labels=classes.tolist()
)
mlflow.log_text(report_text, "classification_report.txt")
# Log model to MLflow
logger.info("Logging model to MLflow")
if training_config.model_type == "random_forest":
mlflow.sklearn.log_model(
model.model,
"model",
registered_model_name=(
config.stages.inference.mlflow_model_name
if mlflow_config.register_model else None
)
)
elif training_config.model_type == "xgboost":
mlflow.xgboost.log_model(
model.model,
"model",
registered_model_name=(
config.stages.inference.mlflow_model_name
if mlflow_config.register_model else None
)
)
# Save model locally if path provided
if output_model_path:
import joblib
output_model_path = Path(output_model_path)
output_model_path.parent.mkdir(parents=True, exist_ok=True)
labels = []
try:
if hasattr(model, "classes_"):
labels = [str(c) for c in model.classes_]
elif hasattr(model, "model") and hasattr(model.model, "classes_"):
labels = [str(c) for c in model.model.classes_]
except Exception:
labels = []
model_data = {
"model": model,
"metadata": {
"model_type": training_config.model_type,
"trained_at": datetime.utcnow().isoformat(),
"run_id": run_id,
"feature_columns": feature_cols,
"feature_engineering_enabled": config.stages.feature_engineering.enabled,
"labels": labels,
},
}
joblib.dump(model_data, output_model_path)
logger.info(f"Saved model to {output_model_path}")
# Update training run record in PostgreSQL
metrics_summary = {
"test_accuracy": float(test_accuracy),
"test_f1_macro": float(test_f1_macro),
"test_f1_weighted": float(test_f1_weighted),
"val_accuracy": float(val_accuracy),
"val_f1_macro": float(val_f1_macro),
"val_f1_weighted": float(val_f1_weighted)
}
with get_db() as db:
training_run = db.query(TrainingRun).filter(
TrainingRun.run_id == run_id
).first()
if training_run:
training_run.metrics_summary = metrics_summary
training_run.status = "completed"
training_run.completed_at = datetime.utcnow()
db.commit()
logger.info(f"Training run {run_id} completed successfully")
return run_id
if __name__ == "__main__":
import argparse
from app.config import load_config
parser = argparse.ArgumentParser(description="Train candlestick pattern model")
parser.add_argument(
"--config",
type=str,
default="config/pipeline.yaml",
help="Path to pipeline config file"
)
parser.add_argument(
"--output-model",
type=str,
default="models/best_model.pkl",
help="Path to save trained model"
)
args = parser.parse_args()
config = load_config(args.config)
labeled_data_path = Path(config.data.labeled_path)
if not labeled_data_path.exists():
logger.error(f"Labeled data not found: {labeled_data_path}")
logger.error("Run annotation ingestion stage first")
exit(1)
run_id = train(config, labeled_data_path, Path(args.output_model))
print(f"Training complete. Run ID: {run_id}")