""" Feature engineering stage orchestrator. Coordinates TA-Lib indicators, candle features, and custom features. """ import logging from pathlib import Path import pandas as pd from app.config import PipelineConfig from features.talib_features import compute_talib_indicators from features.candle_features import compute_candle_features, validate_candle_data from features.custom_loader import load_custom_features logger = logging.getLogger(__name__) def run_feature_engineering_stage(config: PipelineConfig) -> None: """ Run the complete feature engineering stage. Steps: 1. Load raw OHLCV data 2. Validate OHLC data consistency 3. Compute TA-Lib indicators (if enabled) 4. Compute candle features (if enabled) 5. Load custom features (if configured) 6. Handle NaN values from indicator warmup periods 7. Write enriched CSV Args: config: Pipeline configuration Raises: FileNotFoundError: If raw data file doesn't exist ValueError: If data validation fails """ fe_config = config.stages.feature_engineering data_config = config.data # Load raw data raw_path = Path(data_config.raw_path) if not raw_path.exists(): raise FileNotFoundError( f"Raw data file not found: {raw_path}. " f"Please ensure OHLCV data is available at this path." ) logger.info(f"Loading raw OHLCV data from: {raw_path}") df = pd.read_csv(raw_path) logger.info(f"Loaded {len(df)} rows with columns: {list(df.columns)}") # Validate OHLC data validate_candle_data(df) original_rows = len(df) # Compute TA-Lib indicators if fe_config.talib_indicators: logger.info(f"Computing {len(fe_config.talib_indicators)} TA-Lib indicators") df = compute_talib_indicators(df, fe_config.talib_indicators) else: logger.info("No TA-Lib indicators configured, skipping") # Compute candle features if fe_config.candle_features: logger.info("Computing candle features") df = compute_candle_features(df) else: logger.info("Candle features disabled, skipping") # Load custom features if fe_config.custom_features: logger.info(f"Loading {len(fe_config.custom_features)} custom feature(s)") df = load_custom_features(df, fe_config.custom_features) else: logger.info("No custom features configured, skipping") # Handle NaN values from indicator warmup periods df = handle_indicator_warmup(df, original_rows) # Create output directory if it doesn't exist enriched_path = Path(data_config.enriched_path) enriched_path.parent.mkdir(parents=True, exist_ok=True) # Write enriched data logger.info(f"Writing enriched data to: {enriched_path}") df.to_csv(enriched_path, index=False) logger.info( f"Feature engineering complete: {original_rows} rows -> {len(df)} rows " f"({original_rows - len(df)} dropped), {len(df.columns)} columns" ) def handle_indicator_warmup(df: pd.DataFrame, original_rows: int) -> pd.DataFrame: """ Handle NaN values introduced by indicator warmup periods. Rows with NaN values in any column are dropped. This is necessary because indicators like RSI, MACD, etc. need a warmup period before producing valid values. Args: df: DataFrame with computed indicators original_rows: Number of rows before computing indicators Returns: DataFrame with NaN rows dropped """ # Count NaN values before dropping nan_counts = df.isnull().sum() cols_with_nan = nan_counts[nan_counts > 0] if not cols_with_nan.empty: logger.info("Columns with NaN values (indicator warmup):") for col, count in cols_with_nan.items(): logger.info(f" {col}: {count} NaN values") # Drop rows with any NaN values df_clean = df.dropna() rows_dropped = original_rows - len(df_clean) if rows_dropped > 0: logger.info( f"Dropped {rows_dropped} rows due to indicator warmup " f"({rows_dropped / original_rows * 100:.1f}% of original data)" ) # Warn if too much data was dropped if rows_dropped / original_rows > 0.1: logger.warning( f"More than 10% of data was dropped due to indicator warmup. " f"Consider reducing indicator periods or using more historical data." ) else: logger.info("No rows dropped (no NaN values from indicators)") return df_clean