- Create pipeline.py with CLI argument parsing for running stages
- Implement TA-Lib indicator computation with multi-output support
- Add candle feature extraction (body_size, wicks, ratios, etc.)
- Create custom feature loader with dynamic module import
- Wire all feature engineering stages with NaN handling
- Tasks completed: 2.2, 2.3, 3.1, 3.2, 3.3, 3.4, 3.5
143 lines
4.6 KiB
Python
143 lines
4.6 KiB
Python
"""
Feature engineering stage orchestrator.

Coordinates TA-Lib indicators, candle features, and custom features.
"""
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
import pandas as pd
|
|
|
|
from app.config import PipelineConfig
|
|
from features.talib_features import compute_talib_indicators
|
|
from features.candle_features import compute_candle_features, validate_candle_data
|
|
from features.custom_loader import load_custom_features
|
|
|
|
|
|
# Module-level logger, keyed by this module's import name.
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
|
|
|
def run_feature_engineering_stage(config: PipelineConfig) -> None:
    """
    Run the complete feature engineering stage.

    Steps:
        1. Load raw OHLCV data
        2. Validate OHLC data consistency
        3. Compute TA-Lib indicators (if enabled)
        4. Compute candle features (if enabled)
        5. Load custom features (if configured)
        6. Handle NaN values from indicator warmup periods
        7. Write enriched CSV

    Args:
        config: Pipeline configuration. Reads ``config.data.raw_path``,
            ``config.data.enriched_path`` and
            ``config.stages.feature_engineering`` (``talib_indicators``,
            ``candle_features``, ``custom_features``).

    Raises:
        FileNotFoundError: If the raw data path is missing or is not a
            regular file.
        ValueError: If data validation fails, or if no rows remain after
            NaN handling (instead of silently writing an empty CSV).
    """
    fe_config = config.stages.feature_engineering
    data_config = config.data

    # Load raw data. ``is_file`` (not ``exists``) so that a directory path is
    # reported here with a clear message instead of failing in ``pd.read_csv``.
    raw_path = Path(data_config.raw_path)
    if not raw_path.is_file():
        raise FileNotFoundError(
            f"Raw data file not found: {raw_path}. "
            "Please ensure OHLCV data is available at this path."
        )

    logger.info(f"Loading raw OHLCV data from: {raw_path}")
    df = pd.read_csv(raw_path)
    logger.info(f"Loaded {len(df)} rows with columns: {list(df.columns)}")

    # Validate OHLC data before computing anything from it
    validate_candle_data(df)

    # Row count of the raw data, used when reporting how much NaN handling removed
    original_rows = len(df)

    # Compute TA-Lib indicators
    if fe_config.talib_indicators:
        logger.info(f"Computing {len(fe_config.talib_indicators)} TA-Lib indicators")
        df = compute_talib_indicators(df, fe_config.talib_indicators)
    else:
        logger.info("No TA-Lib indicators configured, skipping")

    # Compute candle features
    if fe_config.candle_features:
        logger.info("Computing candle features")
        df = compute_candle_features(df)
    else:
        logger.info("Candle features disabled, skipping")

    # Load custom features
    if fe_config.custom_features:
        logger.info(f"Loading {len(fe_config.custom_features)} custom feature(s)")
        df = load_custom_features(df, fe_config.custom_features)
    else:
        logger.info("No custom features configured, skipping")

    # Handle NaN values from indicator warmup periods
    df = handle_indicator_warmup(df, original_rows)

    # Refuse to write an enriched CSV that contains no data rows.
    if len(df) == 0:
        raise ValueError(
            f"No rows remain after NaN handling ({original_rows} rows loaded "
            f"from {raw_path}). Check for indicator periods longer than the "
            "data or feature columns that are entirely NaN."
        )

    # Create output directory if it doesn't exist
    enriched_path = Path(data_config.enriched_path)
    enriched_path.parent.mkdir(parents=True, exist_ok=True)

    # Write enriched data
    logger.info(f"Writing enriched data to: {enriched_path}")
    df.to_csv(enriched_path, index=False)

    logger.info(
        f"Feature engineering complete: {original_rows} rows -> {len(df)} rows "
        f"({original_rows - len(df)} dropped), {len(df.columns)} columns"
    )
|
|
|
|
|
|
def handle_indicator_warmup(
    df: pd.DataFrame,
    original_rows: int,
    *,
    max_drop_fraction: float = 0.1,
) -> pd.DataFrame:
    """
    Handle NaN values introduced by indicator warmup periods.

    Rows with NaN values in any column are dropped. This is necessary because
    indicators like RSI, MACD, etc. need a warmup period before producing valid
    values. Note that ``dropna`` removes *every* row containing a NaN, not only
    the leading warmup rows, and the surviving rows keep their original index
    labels.

    Args:
        df: DataFrame with computed indicators
        original_rows: Number of rows before computing indicators. Used for
            reporting only; if the feature steps changed the row count, the
            change is logged separately.
        max_drop_fraction: Fraction of ``df``'s rows above which a warning is
            logged. Defaults to 0.1 (10%).

    Returns:
        DataFrame with NaN rows dropped
    """
    # Count NaN values per column before dropping
    nan_counts = df.isnull().sum()
    cols_with_nan = nan_counts[nan_counts > 0]

    if not cols_with_nan.empty:
        logger.info("Columns with NaN values (indicator warmup):")
        for col, count in cols_with_nan.items():
            logger.info(f" {col}: {count} NaN values")

    input_rows = len(df)
    if input_rows != original_rows:
        logger.info(
            f"Row count changed during feature computation: "
            f"{original_rows} -> {input_rows}"
        )

    # Drop rows with any NaN values
    df_clean = df.dropna()

    # Measure against the frame actually passed in, so rows added or removed by
    # earlier feature steps are not misattributed to NaN dropping.
    rows_dropped = input_rows - len(df_clean)

    if rows_dropped > 0:
        # input_rows > 0 is guaranteed here, so the division is safe.
        drop_fraction = rows_dropped / input_rows
        logger.info(
            f"Dropped {rows_dropped} rows due to indicator warmup "
            f"({drop_fraction * 100:.1f}% of original data)"
        )

        # Warn if too much data was dropped
        if drop_fraction > max_drop_fraction:
            logger.warning(
                f"More than {max_drop_fraction * 100:g}% of data was dropped due to indicator warmup. "
                "Consider reducing indicator periods or using more historical data."
            )
    else:
        logger.info("No rows dropped (no NaN values from indicators)")

    return df_clean
|