candle-annotator/services/ml/app/preprocessing.py
2026-02-18 23:53:38 +01:00

313 lines
11 KiB
Python

"""
Preprocessing module for inference.
Replicates feature engineering pipeline to ensure preprocessing parity
between training and inference.
"""
import logging
import re
from typing import List, Tuple, Optional
import pandas as pd
import numpy as np
from app.config import PipelineConfig
from features.talib_features import compute_talib_indicators
from features.candle_features import compute_candle_features, validate_candle_data
from features.custom_loader import load_custom_features
logger = logging.getLogger(__name__)
# Window size used during training (number of candles per flattened sample)
TRAINING_WINDOW_SIZE = 94
# Per-candle features expected by the model, in order
TRAINING_FEATURE_ORDER = [
'open', 'high', 'low', 'close', 'volume', 'average', 'barCount',
'rsi_14', 'ema_20', 'ema_50',
'macd_macd', 'macd_signal', 'macd_hist',
'bbands_upper', 'bbands_middle', 'bbands_lower',
'atr_14', 'adx_14', 'cci_14', 'mfi_14',
'stoch_slowk', 'stoch_slowd',
'body_size', 'body_direction', 'upper_wick', 'lower_wick',
'wick_ratio', 'range', 'body_to_range', 'gap',
]
def _parse_training_feature_columns(
feature_columns: List[str]
) -> Tuple[int, List[str]]:
"""
Derive window size and per-candle feature order from flattened training columns.
Expected column format: "<feature>_<index>" (e.g., "open_0", "rsi_14_12").
"""
if not feature_columns:
raise ValueError("Training feature columns are empty")
feature_order: List[str] = []
max_idx = -1
idx_set = set()
for col in feature_columns:
match = re.match(r"^(.*)_([0-9]+)$", col)
if not match:
raise ValueError(f"Invalid training feature column format: {col}")
base = match.group(1)
idx = int(match.group(2))
if idx == 0:
feature_order.append(base)
if idx > max_idx:
max_idx = idx
idx_set.add(idx)
window_size = max_idx + 1
if window_size <= 0:
raise ValueError("Could not derive window size from training feature columns")
missing_idx = set(range(window_size)) - idx_set
if missing_idx:
raise ValueError(f"Missing window indices in training feature columns: {sorted(missing_idx)[:5]}")
if not feature_order:
raise ValueError("Could not derive per-candle feature order from training feature columns")
return window_size, feature_order
def preprocess_candles(
candles: List[dict],
pipeline_config: PipelineConfig,
training_feature_columns: Optional[List[str]] = None
) -> Tuple[pd.DataFrame, np.ndarray]:
"""
Preprocess candle data for inference.
Applies the same feature engineering steps as used during training,
then creates sliding windows and flattens them to match training format.
Args:
candles: List of candle dictionaries with time, open, high, low, close, volume
pipeline_config: Pipeline configuration (must match training config)
Returns:
Tuple of:
- X: DataFrame with flattened windowed features (one row per window)
- window_times: Array of time values, one per window (time of last candle)
Raises:
ValueError: If data validation fails or too many rows dropped
"""
# Convert to DataFrame
df = pd.DataFrame(candles)
# Ensure time column exists for tracking
if 'time' not in df.columns:
raise ValueError("Candles must include 'time' field")
original_rows = len(df)
logger.info(f"Preprocessing {original_rows} candles")
# Validate OHLC data
try:
validate_candle_data(df)
except Exception as e:
raise ValueError(f"Candle data validation failed: {e}")
# Handle missing or all-null volume column
if 'volume' not in df.columns or df['volume'].isna().all():
logger.warning("Volume data missing from candles, filling with 0")
df['volume'] = 0.0
# Add missing columns that were present in training data
for col in ['average', 'barCount']:
if col not in df.columns:
df[col] = 0.0
# Get feature engineering config
fe_config = pipeline_config.stages.feature_engineering
if not fe_config.enabled:
logger.warning("Feature engineering disabled in config - returning raw OHLCV")
raise ValueError("Feature engineering must be enabled for windowed inference")
# Compute ALL TA-Lib indicators (including volume-dependent ones)
if fe_config.talib_indicators:
logger.info(f"Computing {len(fe_config.talib_indicators)} TA-Lib indicators")
try:
df = compute_talib_indicators(df, fe_config.talib_indicators)
except Exception as e:
logger.error(f"Failed to compute TA-Lib indicators: {e}")
raise ValueError(f"Indicator computation failed: {e}")
# Compute candle features
if fe_config.candle_features:
logger.info("Computing candle features")
try:
df = compute_candle_features(df)
except Exception as e:
logger.error(f"Failed to compute candle features: {e}")
raise ValueError(f"Candle feature computation failed: {e}")
# Load custom features
if fe_config.custom_features:
logger.info(f"Loading {len(fe_config.custom_features)} custom feature(s)")
try:
df = load_custom_features(df, fe_config.custom_features)
except Exception as e:
logger.error(f"Failed to load custom features: {e}")
raise ValueError(f"Custom feature loading failed: {e}")
# Fill NaN values from indicator warmup and missing data with 0
# (instead of dropping rows, since we need contiguous windows)
nan_counts = df.isna().sum()
nan_cols = nan_counts[nan_counts > 0]
if not nan_cols.empty:
logger.info(f"Filling NaN values in {len(nan_cols)} columns (indicator warmup + missing data)")
df = df.fillna(0.0)
# Determine expected feature order and window size
if training_feature_columns:
window_size, feature_order = _parse_training_feature_columns(training_feature_columns)
logger.info(f"Using training feature columns: {len(feature_order)} features, window_size={window_size}")
else:
window_size = TRAINING_WINDOW_SIZE
feature_order = TRAINING_FEATURE_ORDER
# Ensure all expected per-candle features exist
for col in feature_order:
if col not in df.columns:
logger.warning(f"Missing expected feature column '{col}', filling with 0")
df[col] = 0.0
logger.info(f"Preprocessing complete: {len(df)} candles with {len(feature_order)} features each")
# Create sliding windows and flatten
X, window_times = create_sliding_windows(df, window_size, feature_order)
return X, window_times
def create_sliding_windows(
df: pd.DataFrame,
window_size: int,
feature_cols: List[str]
) -> Tuple[np.ndarray, np.ndarray]:
"""
Create sliding windows from per-candle features and flatten.
Each window of `window_size` consecutive candles is flattened into a single
row of features: [feat0_candle0, feat1_candle0, ..., featN_candle0,
feat0_candle1, ..., featN_candleM]
Args:
df: DataFrame with per-candle features and 'time' column
window_size: Number of candles per window
feature_cols: Ordered list of feature column names
Returns:
Tuple of:
- X: numpy array of shape (n_windows, window_size * n_features)
- window_times: array of time values (last candle time in each window)
"""
n_candles = len(df)
n_features = len(feature_cols)
if n_candles < window_size:
raise ValueError(
f"Not enough candles ({n_candles}) for window size {window_size}. "
f"Need at least {window_size} candles."
)
# Extract feature matrix in correct column order
feature_matrix = df[feature_cols].values # shape: (n_candles, n_features)
times = df['time'].values
n_windows = n_candles - window_size + 1
# Create flattened windows using stride tricks for efficiency
# Each window: candle features are interleaved as col_0, col_1, ..., col_N for each candle index
X = np.zeros((n_windows, window_size * n_features), dtype=np.float64)
window_times = np.zeros(n_windows, dtype=times.dtype)
for i in range(n_windows):
window = feature_matrix[i:i + window_size] # shape: (window_size, n_features)
# Flatten: row-major means [candle0_feat0, candle0_feat1, ..., candle1_feat0, ...]
# But training used {col}_{candle_idx} ordering, which is column-first per candle
# i.e., open_0, high_0, ..., gap_0, open_1, high_1, ..., gap_1, ...
X[i] = window.flatten() # row-major: candle0_all_feats, candle1_all_feats, ...
window_times[i] = times[i + window_size - 1] # last candle in window
logger.info(f"Created {n_windows} sliding windows of size {window_size} "
f"({n_windows * n_features * window_size} total features)")
return X, window_times
def extract_feature_columns(
df: pd.DataFrame,
exclude_columns: List[str] = None
) -> pd.DataFrame:
"""
Extract only feature columns for model prediction.
Removes metadata columns like 'time' that should not be used as features.
Args:
df: Preprocessed DataFrame
exclude_columns: Columns to exclude (default: ['time'])
Returns:
DataFrame with only feature columns
"""
if exclude_columns is None:
exclude_columns = ['time']
feature_cols = [col for col in df.columns if col not in exclude_columns]
logger.info(f"Using {len(feature_cols)} feature columns for prediction")
return df[feature_cols]
def validate_feature_parity(
inference_features: List[str],
training_features: List[str]
) -> bool:
"""
Validate that inference features match training features.
Args:
inference_features: Feature column names from inference preprocessing
training_features: Feature column names from training
Returns:
True if features match exactly
Raises:
ValueError: If features don't match
"""
inference_set = set(inference_features)
training_set = set(training_features)
missing = training_set - inference_set
extra = inference_set - training_set
if missing or extra:
error_msg = "Feature mismatch detected between training and inference:\n"
if missing:
error_msg += f" Missing features: {sorted(missing)}\n"
if extra:
error_msg += f" Extra features: {sorted(extra)}\n"
error_msg += "\nThis indicates preprocessing parity is broken. "
error_msg += "Ensure the pipeline config used for inference matches training."
logger.error(error_msg)
raise ValueError(error_msg)
logger.info("Feature parity validated: inference features match training")
return True