candle-annotator/services/ml/app/config.py
Marko Djordjevic ea339a54a7 feat(ml): add database schema, config parser, and DVC setup
- Initialize DVC with local storage backend (task 1.6)
- Create PostgreSQL schema for training_runs table (task 1.7)
- Add SQLAlchemy database connection setup (task 1.8)
- Create Pydantic config models for pipeline.yaml (task 2.1)
- Add migration runner for database setup
- Fix pyproject.toml package discovery config
2026-02-15 12:08:53 +01:00

147 lines
4.6 KiB
Python

"""
Pipeline configuration module.
Pydantic models for validating and loading the pipeline.yaml configuration.
"""
from typing import List, Dict, Any, Optional, Literal
from pathlib import Path
import yaml
from pydantic import BaseModel, Field, field_validator
class TALibIndicator(BaseModel):
    """Configuration for a single TA-Lib indicator."""

    # Required string; there is no default, so omitting it fails validation.
    name: str
    # Arbitrary keyword/value pairs. Each instance gets its own empty dict
    # when the key is omitted (default_factory, not a shared mutable default).
    params: dict[str, Any] = Field(default_factory=dict)
class FeatureEngineeringConfig(BaseModel):
    """Feature engineering stage configuration."""

    # Boolean flag, True unless overridden.
    enabled: bool = True
    # Each entry is validated as a TALibIndicator; empty list when omitted.
    talib_indicators: list[TALibIndicator] = Field(default_factory=list)
    # Boolean flag, True unless overridden.
    candle_features: bool = True
    # List of strings; empty list when omitted. How the names are resolved is
    # not visible from this module.
    custom_features: list[str] = Field(default_factory=list)
class ProgrammaticLabelsConfig(BaseModel):
    """Configuration for programmatic TA-Lib pattern labels."""

    # Boolean flag, True unless overridden.
    enabled: bool = True
    # List of pattern identifiers as plain strings; empty list when omitted.
    # The strings are not checked against any known set here.
    talib_patterns: list[str] = Field(default_factory=list)
class AnnotationIngestionConfig(BaseModel):
    """Annotation ingestion stage configuration."""

    # Boolean flag, True unless overridden.
    enabled: bool = True
    # Only "window" or "bio" pass validation.
    label_encoding: Literal["window", "bio"] = "window"
    # Plain integers: no range constraints are declared on these three fields.
    # NOTE(review): units (presumably candles / a confidence score) are not
    # stated in this module - confirm against the ingestion stage.
    window_size: int = 30
    context_padding: int = 20
    min_confidence: int = 1
    # Nested model; a fresh ProgrammaticLabelsConfig is built when omitted.
    programmatic_labels: ProgrammaticLabelsConfig = Field(default_factory=ProgrammaticLabelsConfig)
    # Only the three listed strategy names pass validation.
    merge_strategy: Literal[
        "human_priority",
        "programmatic_priority",
        "both",
    ] = "human_priority"
class MLflowConfig(BaseModel):
    """MLflow experiment tracking configuration."""

    # Plain string; not validated as a URL by this model.
    tracking_uri: str = "http://mlflow:5000"
    # Plain string naming the experiment.
    experiment_name: str = "candlestick_patterns"
    # Boolean flag, True unless overridden.
    log_artifacts: bool = True
    # Boolean flag, False unless overridden.
    register_model: bool = False
class TrainingConfig(BaseModel):
    """Training stage configuration."""

    # `model_type` starts with `model_`, which pydantic v2 treats as a
    # protected namespace by default on releases that reserve the bare prefix,
    # emitting a UserWarning when the class is created. Clearing the setting
    # silences that warning; field validation is unaffected.
    model_config = {"protected_namespaces": ()}

    # Boolean flag, True unless overridden.
    enabled: bool = True
    # Only "random_forest" or "xgboost" pass validation.
    model_type: Literal["random_forest", "xgboost"] = "random_forest"
    # Only "temporal" or "random" pass validation.
    split_method: Literal["temporal", "random"] = "temporal"
    # Fractions constrained to the closed interval [0.0, 1.0].
    # NOTE(review): nothing here checks that the two fractions together leave
    # room for training data - confirm how the training stage combines them.
    test_split: float = Field(0.2, ge=0.0, le=1.0)
    validation_split: float = Field(0.1, ge=0.0, le=1.0)
    # Either the string "balanced" or None.
    class_weights: Optional[Literal["balanced"]] = "balanced"
    # Arbitrary keyword/value pairs; a fresh empty dict when omitted.
    hyperparameters: Dict[str, Any] = Field(default_factory=dict)
    # Nested model; a fresh MLflowConfig is built when omitted.
    mlflow: MLflowConfig = Field(default_factory=MLflowConfig)

    @field_validator("test_split", "validation_split")
    @classmethod
    def validate_split(cls, v: float) -> float:
        """Reject split fractions outside the closed interval [0.0, 1.0].

        This overlaps with the ``ge``/``le`` bounds declared on both fields;
        it is kept so the class still exposes ``validate_split``.

        Args:
            v: Candidate split fraction.

        Returns:
            The value unchanged when it lies within [0.0, 1.0].

        Raises:
            ValueError: If the value lies outside [0.0, 1.0].
        """
        if not 0.0 <= v <= 1.0:
            raise ValueError("Split must be between 0.0 and 1.0")
        return v
class InferenceConfig(BaseModel):
    """Inference stage configuration."""

    # `model_source` starts with `model_`, which pydantic v2 treats as a
    # protected namespace by default on releases that reserve the bare prefix,
    # emitting a UserWarning when the class is created. Clearing the setting
    # silences that warning; field validation is unaffected.
    model_config = {"protected_namespaces": ()}

    # Boolean flag, True unless overridden.
    enabled: bool = True
    # Only "mlflow" or "local" pass validation.
    model_source: Literal["mlflow", "local"] = "local"
    # String or None.
    mlflow_model_name: Optional[str] = "candlestick_pattern_v1"
    # One of three strings. Note that "None" here is the literal string,
    # not Python's None.
    mlflow_model_stage: Literal["Production", "Staging", "None"] = "Production"
    # Plain string path; existence is not checked by this model.
    local_model_path: str = "models/best_model.pkl"
    # Plain integer; no positivity constraint is declared.
    batch_size: int = 1000
    # Boolean flag, True unless overridden.
    use_training_config: bool = True
class DataConfig(BaseModel):
    """Data paths configuration."""

    # All four fields are plain strings holding relative paths by default.
    # The model neither resolves them nor checks that they exist.
    raw_path: str = "data/raw/OHLCV.csv"
    enriched_path: str = "data/enriched/features.csv"
    labeled_path: str = "data/labeled/dataset.csv"
    annotations_path: str = "data/annotations/export.json"
class StagesConfig(BaseModel):
    """All pipeline stages configuration."""

    # Every stage is a nested model. When a stage key is omitted, a fresh
    # instance of that stage's model is built with its own defaults.
    feature_engineering: FeatureEngineeringConfig = Field(default_factory=FeatureEngineeringConfig)
    annotation_ingestion: AnnotationIngestionConfig = Field(default_factory=AnnotationIngestionConfig)
    training: TrainingConfig = Field(
        default_factory=TrainingConfig,
    )
    inference: InferenceConfig = Field(
        default_factory=InferenceConfig,
    )
class PipelineConfig(BaseModel):
    """Root pipeline configuration."""

    # Both sections are optional in the input; each falls back to a freshly
    # constructed model populated with its own defaults.
    data: DataConfig = Field(
        default_factory=DataConfig,
    )
    stages: StagesConfig = Field(
        default_factory=StagesConfig,
    )
def load_config(config_path: str | Path) -> PipelineConfig:
    """
    Load and validate pipeline configuration from YAML file.

    An empty YAML document (for example a file holding only comments) is
    treated as an empty mapping, so every setting takes its default value.

    Args:
        config_path: Path to pipeline.yaml file

    Returns:
        Validated PipelineConfig object

    Raises:
        FileNotFoundError: If config file doesn't exist
        ValueError: If the top-level YAML value is not a mapping, or if
            config validation fails
        yaml.YAMLError: If YAML parsing fails
    """
    config_path = Path(config_path)
    if not config_path.exists():
        raise FileNotFoundError(f"Config file not found: {config_path}")

    # Read as UTF-8 explicitly so parsing does not depend on the platform's
    # locale encoding.
    with open(config_path, "r", encoding="utf-8") as f:
        config_dict = yaml.safe_load(f)

    if config_dict is None:
        # safe_load returns None for an empty document; unpacking None with
        # ** would fail with an unhelpful "must be a mapping" error.
        config_dict = {}
    elif not isinstance(config_dict, dict):
        raise ValueError(
            "Config validation failed: top-level YAML value must be a mapping, "
            f"got {type(config_dict).__name__}"
        )

    try:
        return PipelineConfig(**config_dict)
    except Exception as e:
        # Deliberately broad: any construction failure is reported to callers
        # as ValueError, with the original exception chained for debugging.
        raise ValueError(f"Config validation failed: {e}") from e
def get_default_config() -> PipelineConfig:
    """
    Build a pipeline configuration without reading any file.

    Returns:
        A new PipelineConfig in which every field holds its declared default
    """
    defaults = PipelineConfig()
    return defaults