candle-annotator/services/ml/pipeline.py
Marko Djordjevic f4c0f9a836 feat(ml): implement training stage with MLflow tracking and model wrappers
- Create RandomForestModel and XGBoostModel wrappers with class weight support
- Implement temporal and random train/val/test splitting
- Add MLflow experiment tracking with full parameter and metric logging
- Create evaluation module for confusion matrix, feature importance, and classification reports
- Implement model training with sklearn/xgboost flavor logging and optional registry registration
- Store training run metadata in PostgreSQL
- Wire training stage into pipeline.py orchestrator
- Support both RandomForest and XGBoost models with configurable hyperparameters
2026-02-15 14:22:19 +01:00

220 lines
6.3 KiB
Python

"""
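ML Pipeline orchestrator.
Runs the feature engineering, annotation ingestion, and training stages
based on configuration. No inference stage is dispatched from here; the
training stage only reuses the inference config's local model path as its
output location.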
"""
import argparse
import logging
from pathlib import Path
from typing import Optional
from app.config import load_config, PipelineConfig
# Root logging setup, applied once at import time: INFO level, with the
# timestamp, logger name and level prefixed to every record. Note that
# ``basicConfig`` does nothing if the root logger already has handlers.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(name=__name__)
def run_feature_engineering(config: PipelineConfig) -> None:
    """
    Execute the feature-engineering stage when the config enables it.

    Delegates the actual work to ``features.engineer.run_feature_engineering_stage``
    and logs the raw input and enriched output locations from ``config.data``.

    Args:
        config: Pipeline configuration; reads
            ``stages.feature_engineering.enabled``, ``data.raw_path`` and
            ``data.enriched_path``.
    """
    separator = "=" * 60
    for banner_line in (separator, "FEATURE ENGINEERING STAGE", separator):
        logger.info(banner_line)

    stage_cfg = config.stages.feature_engineering
    if stage_cfg.enabled:
        # Deferred import to avoid circular dependencies at module load time.
        from features.engineer import run_feature_engineering_stage

        logger.info(f"Reading raw data from: {config.data.raw_path}")
        run_feature_engineering_stage(config)
        logger.info(f"Enriched data written to: {config.data.enriched_path}")
        logger.info("Feature engineering stage completed successfully")
    else:
        logger.info("Feature engineering disabled in config, skipping")
def run_annotation_ingestion(config: PipelineConfig) -> None:
    """
    Execute the annotation-ingestion stage when the config enables it.

    Delegates to ``app.annotation_ingestion.run_annotation_ingestion``, passing
    the stage's own config plus the enriched, annotations and labeled-output
    paths taken from ``config.data``.

    Args:
        config: Pipeline configuration; reads
            ``stages.annotation_ingestion`` and ``data.*_path`` entries.
    """
    separator = "=" * 60
    for banner_line in (separator, "ANNOTATION INGESTION STAGE", separator):
        logger.info(banner_line)

    stage_cfg = config.stages.annotation_ingestion
    if stage_cfg.enabled:
        # Deferred import to avoid circular dependencies; aliased so it does
        # not shadow this function's own name.
        from app.annotation_ingestion import (
            run_annotation_ingestion as ingest_annotations,
        )

        data = config.data
        logger.info(f"Reading enriched data from: {data.enriched_path}")
        logger.info(f"Reading annotations from: {data.annotations_path}")
        ingest_annotations(
            config=stage_cfg,
            enriched_path=data.enriched_path,
            annotations_path=data.annotations_path,
            output_path=data.labeled_path,
        )
        logger.info(f"Labeled data written to: {data.labeled_path}")
        logger.info("Annotation ingestion stage completed successfully")
    else:
        logger.info("Annotation ingestion disabled in config, skipping")
def run_training(config: PipelineConfig) -> None:
    """
    Execute the training stage when the config enables it.

    Calls ``training.train.train`` with the config, the labeled-data path and
    an output model path, then logs the run ID it returns.

    Args:
        config: Pipeline configuration; reads ``stages.training.enabled``,
            ``data.labeled_path`` and ``stages.inference.local_model_path``.
    """
    separator = "=" * 60
    for banner_line in (separator, "TRAINING STAGE", separator):
        logger.info(banner_line)

    if config.stages.training.enabled:
        # Deferred import to avoid circular dependencies at module load time.
        from training.train import train

        labeled_path = config.data.labeled_path
        logger.info(f"Reading labeled data from: {labeled_path}")
        # The model is written to the inference stage's local_model_path
        # (presumably so inference picks up the freshly trained model).
        model_out = Path(config.stages.inference.local_model_path)
        mlflow_run_id = train(config, Path(labeled_path), model_out)
        logger.info(f"Training completed. MLflow run ID: {mlflow_run_id}")
        logger.info("Training stage completed successfully")
    else:
        logger.info("Training disabled in config, skipping")
def run_pipeline(
    config: PipelineConfig,
    stage: Optional[str] = None
) -> None:
    """
    Run the full pipeline or a single named stage.

    Args:
        config: Pipeline configuration.
        stage: Optional stage name to run. If None, every stage is invoked in
            order; each stage still checks its own ``enabled`` flag and skips
            itself when disabled. Valid values: "feature_engineering",
            "annotation_ingestion", "training".

    Raises:
        ValueError: If ``stage`` is given but is not a valid stage name
            (including the empty string).
    """
    # Insertion order doubles as the execution order for a full run.
    stages = {
        "feature_engineering": run_feature_engineering,
        "annotation_ingestion": run_annotation_ingestion,
        "training": run_training,
    }

    logger.info("Starting ML pipeline")
    # ``config`` is the parsed configuration object, not its source path
    # (main() already logs the path); only dump it when debugging.
    logger.debug("Pipeline configuration: %s", config)

    if stage is not None:
        logger.info(f"Running single stage: {stage}")
        try:
            stage_fn = stages[stage]
        except KeyError:
            raise ValueError(
                f"Invalid stage: {stage}. "
                f"Valid stages: {', '.join(stages)}"
            ) from None
        stage_fn(config)
    else:
        logger.info("Running all enabled stages")
        for stage_fn in stages.values():
            stage_fn(config)

    logger.info("=" * 60)
    logger.info("PIPELINE COMPLETED")
    logger.info("=" * 60)
def main() -> int:
    """
    Command-line entry point for the pipeline.

    Parses CLI arguments, loads the configuration file and runs the requested
    stage (or all stages).

    Returns:
        Process exit code: 0 on success, 1 on any handled failure.
    """
    parser = argparse.ArgumentParser(
        description="ML Pipeline for candlestick pattern recognition",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Run full pipeline with default config
python pipeline.py
# Run with custom config
python pipeline.py --config config/custom_pipeline.yaml
# Run only feature engineering stage
python pipeline.py --stage feature_engineering
# Run only training stage with custom config
python pipeline.py --config config/pipeline.yaml --stage training
"""
    )
    parser.add_argument(
        "--config",
        type=str,
        default="config/pipeline.yaml",
        help="Path to pipeline configuration YAML file (default: config/pipeline.yaml)"
    )
    parser.add_argument(
        "--stage",
        type=str,
        choices=["feature_engineering", "annotation_ingestion", "training"],
        default=None,
        help="Run a specific stage only. If not specified, runs all enabled stages."
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Enable verbose debug logging"
    )
    args = parser.parse_args()

    if args.verbose:
        # Lower the root logger so DEBUG records from every module are emitted.
        logging.getLogger().setLevel(logging.DEBUG)
        logger.debug("Verbose logging enabled")

    # Config loading and pipeline execution are guarded separately. Otherwise
    # a missing data file or a ValueError raised inside a stage would be
    # misreported as a configuration problem, and its traceback would be lost.
    logger.info(f"Loading configuration from: {args.config}")
    try:
        config = load_config(args.config)
    except FileNotFoundError as e:
        logger.error(f"Configuration file not found: {e}")
        return 1
    except ValueError as e:
        logger.error(f"Configuration validation error: {e}")
        return 1
    except Exception as e:
        logger.error(f"Failed to load configuration: {e}", exc_info=True)
        return 1

    try:
        run_pipeline(config, stage=args.stage)
    except Exception as e:
        # Top-level boundary: log with traceback and report failure via exit code.
        logger.error(f"Pipeline failed with error: {e}", exc_info=True)
        return 1
    return 0
if __name__ == "__main__":
    # The bare ``exit`` builtin is injected by the ``site`` module for
    # interactive use and may be missing (e.g. ``python -S``, frozen builds).
    # Raising SystemExit directly is what ``sys.exit`` does, with no import.
    raise SystemExit(main())