313 lines
11 KiB
Python
313 lines
11 KiB
Python
"""
Preprocessing module for inference.

Replicates the training feature-engineering pipeline so that preprocessing
stays in parity between training and inference.
"""
|
|
|
|
import logging
|
|
import re
|
|
from typing import List, Tuple, Optional
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
from app.config import PipelineConfig
|
|
from features.talib_features import compute_talib_indicators
|
|
from features.candle_features import compute_candle_features, validate_candle_data
|
|
from features.custom_loader import load_custom_features
|
|
|
|
logger = logging.getLogger(__name__)


# Default number of candles flattened into one sample. preprocess_candles
# falls back to this when no training feature columns are supplied.
TRAINING_WINDOW_SIZE = 94


# Default per-candle feature order expected by the model. Windows are
# flattened candle-major, so the position of each name matters.
TRAINING_FEATURE_ORDER = [
    # Bar fields ('average' / 'barCount' are zero-filled when absent)
    'open',
    'high',
    'low',
    'close',
    'volume',
    'average',
    'barCount',
    # Indicator columns (presumably produced by compute_talib_indicators)
    'rsi_14',
    'ema_20',
    'ema_50',
    'macd_macd',
    'macd_signal',
    'macd_hist',
    'bbands_upper',
    'bbands_middle',
    'bbands_lower',
    'atr_14',
    'adx_14',
    'cci_14',
    'mfi_14',
    'stoch_slowk',
    'stoch_slowd',
    # Candle-shape columns (presumably produced by compute_candle_features)
    'body_size',
    'body_direction',
    'upper_wick',
    'lower_wick',
    'wick_ratio',
    'range',
    'body_to_range',
    'gap',
]
|
|
|
|
|
|
def _parse_training_feature_columns(
    feature_columns: List[str]
) -> Tuple[int, List[str]]:
    """
    Derive window size and per-candle feature order from flattened training columns.

    Expected column format: "<feature>_<index>" (e.g., "open_0", "rsi_14_12").
    The feature part is matched greedily, so only the final "_<digits>" is
    taken as the candle index ("rsi_14_12" -> feature "rsi_14", index 12).

    The columns must form a complete, duplicate-free feature x window grid
    laid out candle-major -- "<f0>_0, <f1>_0, ..., <fN>_0, <f0>_1, ..." --
    because inference windows are flattened row-major in exactly that order
    and handed to the model positionally. Any other layout would silently
    misalign features with the trained model, so it is rejected.

    Args:
        feature_columns: Flattened training feature column names, in the
            order the model was trained on.

    Returns:
        Tuple of (window_size, feature_order), where feature_order lists the
        per-candle feature names in the order they appear at index 0.

    Raises:
        ValueError: If the list is empty, a column does not match the
            "<feature>_<index>" format, window indices are missing, column
            names are duplicated, the feature/index grid is incomplete, or
            the columns are not in candle-major order.
    """
    if not feature_columns:
        raise ValueError("Training feature columns are empty")

    feature_order: List[str] = []
    max_idx = -1
    idx_set = set()

    for col in feature_columns:
        match = re.match(r"^(.*)_([0-9]+)$", col)
        if not match:
            raise ValueError(f"Invalid training feature column format: {col}")
        base = match.group(1)
        idx = int(match.group(2))
        if idx == 0:
            feature_order.append(base)
        max_idx = max(max_idx, idx)
        idx_set.add(idx)

    window_size = max_idx + 1
    if window_size <= 0:
        raise ValueError("Could not derive window size from training feature columns")

    missing_idx = set(range(window_size)) - idx_set
    if missing_idx:
        raise ValueError(f"Missing window indices in training feature columns: {sorted(missing_idx)[:5]}")

    if not feature_order:
        raise ValueError("Could not derive per-candle feature order from training feature columns")

    # Duplicate names would make the grid/order comparison below ambiguous.
    seen = set()
    duplicates = set()
    for col in feature_columns:
        if col in seen:
            duplicates.add(col)
        seen.add(col)
    if duplicates:
        raise ValueError(f"Duplicate training feature columns: {sorted(duplicates)[:5]}")

    # The exact layout create_sliding_windows produces: all features of
    # candle 0, then all features of candle 1, and so on.
    expected = [f"{feat}_{i}" for i in range(window_size) for feat in feature_order]
    if list(feature_columns) != expected:
        expected_set = set(expected)
        actual_set = set(feature_columns)
        missing_cols = sorted(expected_set - actual_set)
        extra_cols = sorted(actual_set - expected_set)
        if missing_cols or extra_cols:
            raise ValueError(
                "Training feature columns do not form a complete feature x window grid "
                f"(missing: {missing_cols[:5]}, unexpected: {extra_cols[:5]})"
            )
        raise ValueError(
            "Training feature columns are not in candle-major order "
            "(expected <f0>_0, <f1>_0, ..., <f0>_1, ...); "
            "inference windows would be misaligned with the model"
        )

    return window_size, feature_order
|
|
|
|
|
|
def preprocess_candles(
    candles: List[dict],
    pipeline_config: PipelineConfig,
    training_feature_columns: Optional[List[str]] = None
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Preprocess candle data for inference.

    Applies the same feature engineering steps as used during training,
    then creates sliding windows and flattens them to match training format.

    Args:
        candles: List of candle dictionaries. Each must include 'time'; OHLC
            fields are checked by ``validate_candle_data``. 'volume' is set
            to 0.0 when absent or entirely null, and 'average' / 'barCount'
            are set to 0.0 when absent.
        pipeline_config: Pipeline configuration (must match training config).
        training_feature_columns: Optional flattened training column names
            ("<feature>_<index>"). When given, the window size and per-candle
            feature order are derived from them; otherwise the module
            defaults TRAINING_WINDOW_SIZE / TRAINING_FEATURE_ORDER are used.

    Returns:
        Tuple of:
        - X: numpy array of flattened windowed features, one row per window,
          shape (n_windows, window_size * n_features)
        - window_times: Array of time values, one per window (time of last candle)

    Raises:
        ValueError: If no candles are given, 'time' is missing, candle
            validation or any feature-engineering step fails, feature
            engineering is disabled, the training columns are malformed, or
            there are fewer candles than the window size.
    """
    df = pd.DataFrame(candles)
    if len(df) == 0:
        raise ValueError("No candles provided")

    # 'time' is not a model feature, but it labels each output window.
    if 'time' not in df.columns:
        raise ValueError("Candles must include 'time' field")

    # NOTE(review): windows assume candles arrive in chronological order;
    # nothing here sorts by 'time' -- confirm with callers.
    logger.info(f"Preprocessing {len(df)} candles")

    try:
        validate_candle_data(df)
    except Exception as e:
        raise ValueError(f"Candle data validation failed: {e}") from e

    # Volume-dependent indicators still need a numeric column to work on.
    if 'volume' not in df.columns or df['volume'].isna().all():
        logger.warning("Volume data missing from candles, filling with 0")
        df['volume'] = 0.0

    # Columns present in the training data but not necessarily in live candles.
    for col in ['average', 'barCount']:
        if col not in df.columns:
            df[col] = 0.0

    fe_config = pipeline_config.stages.feature_engineering
    if not fe_config.enabled:
        logger.error("Feature engineering disabled in config - cannot build windowed inference features")
        raise ValueError("Feature engineering must be enabled for windowed inference")

    # Compute ALL TA-Lib indicators (including volume-dependent ones)
    if fe_config.talib_indicators:
        logger.info(f"Computing {len(fe_config.talib_indicators)} TA-Lib indicators")
        try:
            df = compute_talib_indicators(df, fe_config.talib_indicators)
        except Exception as e:
            logger.error(f"Failed to compute TA-Lib indicators: {e}")
            raise ValueError(f"Indicator computation failed: {e}") from e

    if fe_config.candle_features:
        logger.info("Computing candle features")
        try:
            df = compute_candle_features(df)
        except Exception as e:
            logger.error(f"Failed to compute candle features: {e}")
            raise ValueError(f"Candle feature computation failed: {e}") from e

    if fe_config.custom_features:
        logger.info(f"Loading {len(fe_config.custom_features)} custom feature(s)")
        try:
            df = load_custom_features(df, fe_config.custom_features)
        except Exception as e:
            logger.error(f"Failed to load custom features: {e}")
            raise ValueError(f"Custom feature loading failed: {e}") from e

    # Fill NaN values from indicator warmup and missing data with 0
    # (instead of dropping rows, since windows must stay contiguous).
    nan_counts = df.isna().sum()
    nan_cols = nan_counts[nan_counts > 0]
    if not nan_cols.empty:
        logger.info(f"Filling NaN values in {len(nan_cols)} columns (indicator warmup + missing data)")
    df = df.fillna(0.0)

    # Determine expected feature order and window size
    if training_feature_columns:
        window_size, feature_order = _parse_training_feature_columns(training_feature_columns)
        logger.info(f"Using training feature columns: {len(feature_order)} features, window_size={window_size}")
    else:
        window_size = TRAINING_WINDOW_SIZE
        # Copy so downstream code can never mutate the module default.
        feature_order = list(TRAINING_FEATURE_ORDER)

    # Best-effort: a feature the pipeline did not produce is zero-filled
    # (with a warning) rather than failing the request.
    for col in feature_order:
        if col not in df.columns:
            logger.warning(f"Missing expected feature column '{col}', filling with 0")
            df[col] = 0.0

    logger.info(f"Preprocessing complete: {len(df)} candles with {len(feature_order)} features each")

    X, window_times = create_sliding_windows(df, window_size, feature_order)
    return X, window_times
|
|
|
|
|
|
def create_sliding_windows(
    df: pd.DataFrame,
    window_size: int,
    feature_cols: List[str]
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Create sliding windows from per-candle features and flatten.

    Each window of `window_size` consecutive candles is flattened candle-major
    into a single row:
        [feat0_candle0, feat1_candle0, ..., featN_candle0,
         feat0_candle1, ..., featN_candleM]
    i.e. the same layout as training columns named "<feature>_<candle_idx>"
    ordered open_0, high_0, ..., gap_0, open_1, ...

    Args:
        df: DataFrame with per-candle features and 'time' column
        window_size: Number of candles per window (must be >= 1)
        feature_cols: Ordered list of feature column names

    Returns:
        Tuple of:
        - X: float64 numpy array of shape (n_windows, window_size * n_features),
          where n_windows = len(df) - window_size + 1
        - window_times: array of time values (last candle time in each window)

    Raises:
        ValueError: If window_size < 1, there are fewer candles than
            window_size, or a feature column cannot be converted to float64.
    """
    # A non-positive size would otherwise yield nonsense (index -1 wraps).
    if window_size < 1:
        raise ValueError(f"Window size must be a positive integer, got {window_size}")

    n_candles = len(df)
    n_features = len(feature_cols)

    if n_candles < window_size:
        raise ValueError(
            f"Not enough candles ({n_candles}) for window size {window_size}. "
            f"Need at least {window_size} candles."
        )

    # Feature matrix in the requested column order: (n_candles, n_features)
    feature_matrix = df[feature_cols].to_numpy(dtype=np.float64)
    times = df['time'].values

    n_windows = n_candles - window_size + 1

    # Row i of `window_idx` holds candle positions i .. i + window_size - 1.
    window_idx = np.arange(n_windows)[:, None] + np.arange(window_size)[None, :]
    # Gather -> (n_windows, window_size, n_features); the C-order reshape keeps
    # each candle's features contiguous, giving the candle-major layout.
    X = feature_matrix[window_idx].reshape(n_windows, window_size * n_features)

    # Each window is labelled with the time of its last candle.
    window_times = np.array(times[window_size - 1:])

    logger.info(f"Created {n_windows} sliding windows of size {window_size} "
                f"({window_size * n_features} features per window)")

    return X, window_times
|
|
|
|
|
|
def extract_feature_columns(
    df: pd.DataFrame,
    exclude_columns: Optional[List[str]] = None
) -> pd.DataFrame:
    """
    Extract only feature columns for model prediction.

    Removes metadata columns like 'time' that should not be used as features.
    The remaining columns keep their original order.

    Args:
        df: Preprocessed DataFrame
        exclude_columns: Columns to exclude (default: ['time']). Pass an
            empty list to keep every column.

    Returns:
        DataFrame with only feature columns
    """
    # Set membership keeps the filter linear in the number of columns.
    excluded = {'time'} if exclude_columns is None else set(exclude_columns)

    feature_cols = [col for col in df.columns if col not in excluded]

    logger.info(f"Using {len(feature_cols)} feature columns for prediction")

    return df[feature_cols]
|
|
|
|
|
|
def validate_feature_parity(
    inference_features: List[str],
    training_features: List[str],
    *,
    check_order: bool = False
) -> bool:
    """
    Validate that inference features match training features.

    By default only the *set* of names is compared, so a different order or
    duplicated names are not detected. Pass ``check_order=True`` when the
    features feed a positional input (such as a numpy array), where the
    sequences must be identical element-by-element.

    Args:
        inference_features: Feature column names from inference preprocessing
        training_features: Feature column names from training
        check_order: If True, additionally require identical order and length.

    Returns:
        True if the features match.

    Raises:
        ValueError: If features don't match
    """
    # Materialise once so one-shot iterables are not consumed twice.
    inference_list = list(inference_features)
    training_list = list(training_features)

    inference_set = set(inference_list)
    training_set = set(training_list)

    missing = training_set - inference_set
    extra = inference_set - training_set

    if missing or extra:
        error_msg = "Feature mismatch detected between training and inference:\n"
        if missing:
            error_msg += f"  Missing features: {sorted(missing)}\n"
        if extra:
            error_msg += f"  Extra features: {sorted(extra)}\n"
        error_msg += "\nThis indicates preprocessing parity is broken. "
        error_msg += "Ensure the pipeline config used for inference matches training."
        logger.error(error_msg)
        raise ValueError(error_msg)

    if check_order and inference_list != training_list:
        # First differing index, or the shorter length when one list is a
        # prefix of the other (same names, extra duplicates).
        position = next(
            (i for i, (inf, trn) in enumerate(zip(inference_list, training_list)) if inf != trn),
            min(len(inference_list), len(training_list)),
        )
        error_msg = (
            "Feature order mismatch detected between training and inference: "
            f"first difference at position {position} "
            f"(inference has {len(inference_list)} features, training has {len(training_list)}).\n"
            "\nThis indicates preprocessing parity is broken. "
            "Ensure the pipeline config used for inference matches training."
        )
        logger.error(error_msg)
        raise ValueError(error_msg)

    logger.info("Feature parity validated: inference features match training")
    return True
|