candle-annotator/services/ml/features/engineer.py
Marko Djordjevic fd29ab91e0 feat(ml): implement feature engineering pipeline
- Create pipeline.py with CLI argument parsing for running stages
- Implement TA-Lib indicator computation with multi-output support
- Add candle feature extraction (body_size, wicks, ratios, etc.)
- Create custom feature loader with dynamic module import
- Wire all feature engineering stages with NaN handling
- Tasks completed: 2.2, 2.3, 3.1, 3.2, 3.3, 3.4, 3.5
2026-02-15 12:22:59 +01:00

143 lines
4.6 KiB
Python

"""
Feature engineering stage orchestrator.
Coordinates TA-Lib indicators, candle features, and custom features.
"""
import logging
from pathlib import Path
import pandas as pd
from app.config import PipelineConfig
from features.talib_features import compute_talib_indicators
from features.candle_features import compute_candle_features, validate_candle_data
from features.custom_loader import load_custom_features
# Module logger, named after this module so its records propagate through the
# standard dotted logger hierarchy (and any handlers configured on parents).
logger = logging.getLogger(name=__name__)
def run_feature_engineering_stage(config: PipelineConfig) -> None:
    """
    Run the complete feature engineering stage.

    Steps:
    1. Load raw OHLCV data
    2. Validate OHLC data consistency
    3. Compute TA-Lib indicators (if enabled)
    4. Compute candle features (if enabled)
    5. Load custom features (if configured)
    6. Handle NaN values from indicator warmup periods
    7. Write enriched CSV

    Args:
        config: Pipeline configuration. Reads ``config.data.raw_path``,
            ``config.data.enriched_path`` and the ``talib_indicators``,
            ``candle_features`` and ``custom_features`` settings of
            ``config.stages.feature_engineering``.

    Raises:
        FileNotFoundError: If the raw data file doesn't exist.
        ValueError: If the raw data file contains no rows, if data validation
            fails, or if no rows remain after NaN handling. An empty enriched
            CSV would only cause a confusing failure in a later stage.
    """
    fe_config = config.stages.feature_engineering
    data_config = config.data

    # Load raw data
    raw_path = Path(data_config.raw_path)
    if not raw_path.exists():
        raise FileNotFoundError(
            f"Raw data file not found: {raw_path}. "
            "Please ensure OHLCV data is available at this path."
        )
    logger.info(f"Loading raw OHLCV data from: {raw_path}")
    df = pd.read_csv(raw_path)
    logger.info(f"Loaded {len(df)} rows with columns: {list(df.columns)}")

    # A header-only CSV parses into a 0-row frame. Fail here rather than
    # running every feature step on nothing and writing an empty output file.
    if df.empty:
        raise ValueError(f"Raw data file contains no rows: {raw_path}")

    # Validate OHLC data
    validate_candle_data(df)
    original_rows = len(df)

    # Compute TA-Lib indicators
    if fe_config.talib_indicators:
        logger.info(f"Computing {len(fe_config.talib_indicators)} TA-Lib indicators")
        df = compute_talib_indicators(df, fe_config.talib_indicators)
    else:
        logger.info("No TA-Lib indicators configured, skipping")

    # Compute candle features
    if fe_config.candle_features:
        logger.info("Computing candle features")
        df = compute_candle_features(df)
    else:
        logger.info("Candle features disabled, skipping")

    # Load custom features
    if fe_config.custom_features:
        logger.info(f"Loading {len(fe_config.custom_features)} custom feature(s)")
        df = load_custom_features(df, fe_config.custom_features)
    else:
        logger.info("No custom features configured, skipping")

    # Handle NaN values from indicator warmup periods
    df = handle_indicator_warmup(df, original_rows)

    # If every row had a NaN somewhere (indicator periods longer than the
    # data, or an all-NaN column), writing an empty CSV would only defer
    # the failure to a downstream stage.
    if df.empty:
        raise ValueError(
            f"No rows remain after dropping NaN values "
            f"({original_rows} rows before feature engineering). "
            "Check for indicator periods longer than the available data "
            "or feature columns that contain no valid values."
        )

    # Create output directory if it doesn't exist
    enriched_path = Path(data_config.enriched_path)
    enriched_path.parent.mkdir(parents=True, exist_ok=True)

    # Write enriched data
    logger.info(f"Writing enriched data to: {enriched_path}")
    df.to_csv(enriched_path, index=False)
    logger.info(
        f"Feature engineering complete: {original_rows} rows -> {len(df)} rows "
        f"({original_rows - len(df)} dropped), {len(df.columns)} columns"
    )
def handle_indicator_warmup(
    df: pd.DataFrame,
    original_rows: int,
    *,
    max_drop_fraction: float = 0.1,
) -> pd.DataFrame:
    """
    Handle NaN values introduced by indicator warmup periods.

    Rows with NaN values in any column are dropped. This is necessary because
    indicators like RSI, MACD, etc. need a warmup period before producing
    valid values. Every column is checked, so NaNs that did not come from an
    indicator (for example gaps in the raw data) also cause rows to be dropped.

    Args:
        df: DataFrame with computed indicators.
        original_rows: Number of rows before computing indicators. This is the
            baseline for the "% of original data" figure and the warning
            threshold. If it is 0 or negative, ``len(df)`` is used instead.
        max_drop_fraction: Fraction of the baseline above which a warning is
            logged. Defaults to 0.1 (10%).

    Returns:
        A new DataFrame with every NaN-containing row removed. The input frame
        is not modified, and the surviving rows keep their original index labels.
    """
    # Count NaN values before dropping
    nan_counts = df.isnull().sum()
    cols_with_nan = nan_counts[nan_counts > 0]
    if not cols_with_nan.empty:
        logger.info("Columns with NaN values (indicator warmup):")
        for col, count in cols_with_nan.items():
            logger.info(f" {col}: {count} NaN values")

        # A column with no valid value at all makes dropna() discard every
        # row. That is almost never warmup, so name the culprits explicitly.
        all_nan_cols = cols_with_nan[cols_with_nan == len(df)]
        if not all_nan_cols.empty:
            logger.warning(
                f"Columns with no non-NaN values (all rows will be dropped): "
                f"{list(all_nan_cols.index)}"
            )

    # Drop rows with any NaN values
    df_clean = df.dropna()

    # Count only the rows removed here. Rows lost in earlier steps are not
    # caused by NaNs and must not be reported as warmup losses.
    rows_dropped = len(df) - len(df_clean)

    if rows_dropped > 0:
        # rows_dropped > 0 implies len(df) > 0, so the fallback is non-zero.
        baseline = original_rows if original_rows > 0 else len(df)
        drop_fraction = rows_dropped / baseline
        logger.info(
            f"Dropped {rows_dropped} rows due to indicator warmup "
            f"({drop_fraction * 100:.1f}% of original data)"
        )
        # Warn if too much data was dropped
        if drop_fraction > max_drop_fraction:
            logger.warning(
                f"More than {max_drop_fraction * 100:g}% of data was dropped "
                "due to indicator warmup. "
                "Consider reducing indicator periods or using more historical data."
            )
    else:
        logger.info("No rows dropped (no NaN values from indicators)")

    return df_clean