- Create RandomForestModel and XGBoostModel wrappers with class-weight support
- Implement temporal and random train/validation/test splitting
- Add MLflow experiment tracking with full parameter and metric logging
- Create an evaluation module for confusion matrices, feature importance, and classification reports
- Implement model training with sklearn/XGBoost flavor logging and optional model-registry registration
- Store training run metadata in PostgreSQL
- Wire the training stage into the pipeline.py orchestrator
- Support both RandomForest and XGBoost models with configurable hyperparameters
220 lines
6.3 KiB
Python
220 lines
6.3 KiB
Python
"""
|
|
ML Pipeline orchestrator.
|
|
|
|
Runs feature engineering, annotation ingestion, training, and inference stages
|
|
based on configuration.
|
|
"""
|
|
|
|
import argparse
import logging
import sys
from pathlib import Path
from typing import Optional, Sequence

from app.config import load_config, PipelineConfig
|
|
|
|
|
|
# Root logging setup: INFO and above, each record stamped with time,
# logger name and level. basicConfig only takes effect if the root logger
# has no handlers yet.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger shared by every stage function in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def run_feature_engineering(config: PipelineConfig) -> None:
    """
    Execute the feature engineering stage when it is enabled.

    The real work is delegated to
    ``features.engineer.run_feature_engineering_stage``. This wrapper only
    prints the stage banner, honours the stage's ``enabled`` flag, and logs
    the configured raw input and enriched output locations around the call.

    Args:
        config: Pipeline configuration.
    """
    rule = "=" * 60
    for banner_line in (rule, "FEATURE ENGINEERING STAGE", rule):
        logger.info(banner_line)

    stage_settings = config.stages.feature_engineering
    if not stage_settings.enabled:
        logger.info("Feature engineering disabled in config, skipping")
        return

    # Deferred import: importing at module level would create a circular
    # dependency with the features package.
    from features.engineer import run_feature_engineering_stage

    logger.info(f"Reading raw data from: {config.data.raw_path}")
    run_feature_engineering_stage(config)
    logger.info(f"Enriched data written to: {config.data.enriched_path}")
    logger.info("Feature engineering stage completed successfully")
|
|
|
|
|
|
def run_annotation_ingestion(config: PipelineConfig) -> None:
    """
    Execute the annotation ingestion stage when it is enabled.

    Delegates to ``app.annotation_ingestion.run_annotation_ingestion``,
    handing it the stage's own settings plus the enriched-data and
    annotation input paths and the labeled-data output path from
    ``config.data``.

    Args:
        config: Pipeline configuration.
    """
    rule = "=" * 60
    for banner_line in (rule, "ANNOTATION INGESTION STAGE", rule):
        logger.info(banner_line)

    ingestion_settings = config.stages.annotation_ingestion
    if not ingestion_settings.enabled:
        logger.info("Annotation ingestion disabled in config, skipping")
        return

    # Deferred import: a module-level import would be circular.
    from app.annotation_ingestion import run_annotation_ingestion as run_ingestion

    data_cfg = config.data
    logger.info(f"Reading enriched data from: {data_cfg.enriched_path}")
    logger.info(f"Reading annotations from: {data_cfg.annotations_path}")

    run_ingestion(
        config=ingestion_settings,
        enriched_path=data_cfg.enriched_path,
        annotations_path=data_cfg.annotations_path,
        output_path=data_cfg.labeled_path,
    )

    logger.info(f"Labeled data written to: {config.data.labeled_path}")
    logger.info("Annotation ingestion stage completed successfully")
|
|
|
|
|
|
def run_training(config: PipelineConfig) -> None:
    """
    Execute the model training stage when it is enabled.

    Calls ``training.train.train`` with the full config, the labeled-data
    path, and an output model path taken from
    ``config.stages.inference.local_model_path``. Presumably this is so the
    inference stage loads the freshly trained model (confirm). The MLflow
    run ID returned by ``train`` is logged.

    Args:
        config: Pipeline configuration.
    """
    rule = "=" * 60
    for banner_line in (rule, "TRAINING STAGE", rule):
        logger.info(banner_line)

    if not config.stages.training.enabled:
        logger.info("Training disabled in config, skipping")
        return

    # Deferred import: a module-level import would be circular.
    from training.train import train

    labeled_data = config.data.labeled_path
    logger.info(f"Reading labeled data from: {labeled_data}")

    # The trained model is written where the inference stage is configured
    # to look for a local model.
    model_destination = Path(config.stages.inference.local_model_path)

    mlflow_run_id = train(config, Path(labeled_data), model_destination)
    logger.info(f"Training completed. MLflow run ID: {mlflow_run_id}")
    logger.info("Training stage completed successfully")
|
|
|
|
|
|
def run_pipeline(
    config: PipelineConfig,
    stage: Optional[str] = None
) -> None:
    """
    Run the full pipeline or a single named stage.

    Args:
        config: Pipeline configuration.
        stage: Optional stage name to run. If None, runs every stage in
            order (feature_engineering, annotation_ingestion, training).
            Each stage function still skips itself when disabled in
            ``config``, including when it is requested explicitly.
            Valid values: "feature_engineering", "annotation_ingestion",
            "training".

    Raises:
        ValueError: If ``stage`` is given but is not a known stage name.
    """
    # Insertion order of this dict is the execution order of a full run, and
    # its keys are the single source of truth for valid stage names.
    stages = {
        "feature_engineering": run_feature_engineering,
        "annotation_ingestion": run_annotation_ingestion,
        "training": run_training,
    }

    logger.info("Starting ML pipeline")
    # `config` is the parsed object, not the file path it came from. Dump it
    # only in verbose mode, lazily, so a large config (which may also carry
    # connection settings; confirm redaction) is not stringified or logged
    # on every run.
    logger.debug("Pipeline configuration: %s", config)

    # `is not None` (rather than truthiness) so an empty string is rejected
    # as an invalid stage instead of silently running the whole pipeline.
    if stage is not None:
        logger.info(f"Running single stage: {stage}")
        try:
            stage_fn = stages[stage]
        except KeyError:
            raise ValueError(
                f"Invalid stage: {stage}. "
                f"Valid stages: {', '.join(stages)}"
            ) from None
        stage_fn(config)
    else:
        logger.info("Running all enabled stages")
        for stage_fn in stages.values():
            stage_fn(config)

    logger.info("=" * 60)
    logger.info("PIPELINE COMPLETED")
    logger.info("=" * 60)
|
|
|
|
|
|
def main(argv: Optional[Sequence[str]] = None) -> int:
    """
    Command-line entry point for the pipeline.

    Args:
        argv: Arguments to parse, excluding the program name. When None,
            argparse falls back to ``sys.argv[1:]``, as before.

    Returns:
        Process exit code: 0 on success, 1 on a configuration or pipeline
        failure. (argparse itself exits with status 2 on invalid arguments
        and 0 for ``--help``.)
    """
    parser = argparse.ArgumentParser(
        description="ML Pipeline for candlestick pattern recognition",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Run full pipeline with default config
  python pipeline.py

  # Run with custom config
  python pipeline.py --config config/custom_pipeline.yaml

  # Run only feature engineering stage
  python pipeline.py --stage feature_engineering

  # Run only training stage with custom config
  python pipeline.py --config config/pipeline.yaml --stage training
""",
    )

    parser.add_argument(
        "--config",
        type=str,
        default="config/pipeline.yaml",
        help="Path to pipeline configuration YAML file (default: config/pipeline.yaml)",
    )

    parser.add_argument(
        "--stage",
        type=str,
        choices=["feature_engineering", "annotation_ingestion", "training"],
        default=None,
        help="Run a specific stage only. If not specified, runs all enabled stages.",
    )

    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Enable verbose debug logging",
    )

    args = parser.parse_args(argv)

    # Set logging level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
        logger.debug("Verbose logging enabled")

    # Configuration errors are handled around load_config only. Previously a
    # FileNotFoundError or ValueError raised inside a stage (e.g. a missing
    # data file) was misreported as a configuration problem, and its
    # traceback was lost.
    logger.info(f"Loading configuration from: {args.config}")
    try:
        config = load_config(args.config)
    except FileNotFoundError as e:
        logger.error(f"Configuration file not found: {e}")
        return 1
    except ValueError as e:
        logger.error(f"Configuration validation error: {e}")
        return 1
    except Exception as e:
        logger.exception(f"Pipeline failed with error: {e}")
        return 1

    try:
        run_pipeline(config, stage=args.stage)
    except Exception as e:
        # Top-level boundary: log with full traceback and report failure via
        # the exit code rather than crashing.
        logger.exception(f"Pipeline failed with error: {e}")
        return 1

    return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
exit(main())
|