# ML Pipeline for Candlestick Pattern Recognition

Build a modular ML pipeline for forex candlestick pattern recognition. The pipeline has distinct stages that can be enabled or disabled independently via config. Each stage reads from and writes to well-defined file formats, so stages can run standalone.

## Architecture Overview

```
[Stage 1: Feature Engineering]     ← optional, uses TA-Lib
        ↓ outputs enriched OHLCV CSV
[Stage 2: Annotation Ingestion]    ← optional, merges human labels
        ↓ outputs labeled dataset CSV
[Stage 3: Training]                ← always runs
        ↓ logs to MLflow, saves model
[Stage 4: Inference]               ← loads model, returns predictions
        ↓ serves predictions via API or direct call
```

## Pipeline Config

A single YAML config controls the entire pipeline:

```yaml
pipeline:
  name: "candlestick_pattern_v1"
  pair: "EURUSD"
  timeframe: "1H"

data:
  raw_ohlcv_path: "data/raw/EURUSD_1H.csv"                # input: raw OHLCV data
  enriched_path: "data/enriched/EURUSD_1H.csv"            # output of stage 1
  labeled_path: "data/labeled/EURUSD_1H.csv"              # output of stage 2
  annotations_path: "data/annotations/annotations.json"   # from annotation tool

stages:
  feature_engineering:
    enabled: true                    # set false to skip, use raw OHLCV only
    talib_indicators:
      - name: "RSI"
        params: { timeperiod: 14 }
      - name: "ATR"
        params: { timeperiod: 14 }
      - name: "EMA"
        params: { timeperiod: 20 }
      - name: "MACD"
        params: { fastperiod: 12, slowperiod: 26, signalperiod: 9 }
      - name: "BBANDS"
        params: { timeperiod: 20 }
    candle_features: true            # compute body_size, wick_ratio, body_direction, gap, etc.
    custom_features:                 # user-defined feature functions
      - "features.custom.trend_slope"
      - "features.custom.volume_zscore"

  annotation_ingestion:
    enabled: true                    # set false to skip, use programmatic labels only
    source: "span"                   # "span" (from annotation tool) or "point" (single candle marks)
    context_padding: 20              # include N candles before/after each span
    label_encoding: "bio"            # "bio" for sequence models, "window" for classification
    window_size: 30                  # fixed window size for classification (pad/truncate spans)
    min_confidence: 3                # discard annotations below this confidence score
    programmatic_labels:
      enabled: true                  # also generate labels from TA-Lib pattern functions
      talib_patterns:                # TA-Lib's CDL* pattern recognition functions
        - "CDLENGULFING"
        - "CDLMORNINGSTAR"
        - "CDLEVENINGSTAR"
        - "CDLHAMMER"
        - "CDLSHOOTINGSTAR"
        - "CDLDOJI"
      merge_strategy: "human_priority"   # when human and programmatic disagree:
                                         # "human_priority" = keep human label
                                         # "programmatic_priority" = keep TA-Lib label
                                         # "both" = keep both as separate label columns

training:
  enabled: true
  model_type: "xgboost"              # "xgboost", "lightgbm", "cnn_1d", "lstm", "transformer"
  task: "classification"             # "classification" or "sequence_labeling"
  target_column: "label"             # or "bio_tag" for sequence models
  test_split: 0.2
  validation_split: 0.1
  split_method: "temporal"           # "temporal" (time-based) or "random"
                                     # IMPORTANT: always use temporal for financial data
  class_weights: "balanced"          # handle imbalanced pattern classes
  hyperparameters:                   # model-specific; starting values for xgboost
    n_estimators: 500
    max_depth: 6
    learning_rate: 0.01
    subsample: 0.8
  mlflow:
    tracking_uri: "http://localhost:5000"
    experiment_name: "candlestick_patterns"
    log_artifacts: true              # log feature importance plots, confusion matrices
    register_model: true             # auto-register if metric beats current best

inference:
  model_source: "mlflow"             # "mlflow" (load from registry) or "local" (load from file)
  mlflow_model_name: "candlestick_pattern_v1"
  mlflow_model_stage: "Production"
  local_model_path: "models/best_model.pkl"
  serve_mode: "api"                  # "api" (REST endpoint) or "library" (direct Python call)
  api_port: 8001
  batch_size: 64                     # for batch inference on historical data
```

---

## Stage 1: Feature Engineering (Optional)

**Input:** Raw OHLCV CSV (`data.raw_ohlcv_path`)
**Output:** Enriched CSV (`data.enriched_path`) with additional feature columns
**Skip condition:** If `stages.feature_engineering.enabled` is false, Stages 2 and 3 read raw OHLCV directly.

### What this stage does

1. Load raw OHLCV data (columns: `time, open, high, low, close, volume`).
2. If `talib_indicators` is configured, compute each indicator using TA-Lib and append it as new columns. Column naming: lowercase indicator name, e.g. `rsi_14`, `atr_14`, `ema_20`, `macd`, `macd_signal`, `macd_hist`, `bbands_upper`, `bbands_middle`, `bbands_lower`.
3. If `candle_features` is true, compute these derived features for each candle:
   - `body_size` = abs(close - open)
   - `body_direction` = 1 if close >= open, else -1
   - `upper_wick` = high - max(open, close)
   - `lower_wick` = min(open, close) - low
   - `wick_ratio` = upper_wick / lower_wick (handle div by zero)
   - `body_to_range` = body_size / (high - low) (handle div by zero)
   - `gap` = open - previous close
   - `range` = high - low
4. If `custom_features` is configured, import and call each function. Each custom feature function receives the full DataFrame and returns a Series.
5. Handle NaN values from indicator warmup periods (drop or forward-fill based on config). Warmup NaNs sit at the start of the series, where forward-fill has no earlier value to copy, so those leading rows still need to be dropped.
6. Save enriched CSV.

### TA-Lib installation note

TA-Lib requires a C library installation before the Python wrapper works:

- macOS: `brew install ta-lib`
- Ubuntu: `apt-get install libta-lib-dev`
- Windows: download from ta-lib.org
- Python wrapper: `pip install TA-Lib`

If TA-Lib is not installed and the stage is enabled, fail with a clear error message and installation instructions; do not silently skip.

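
The candle features from step 3 of Stage 1 can be sketched without any dependencies, as below. This is a minimal illustration, not the pipeline's implementation: the function name `candle_features` and the `prev_close` parameter are hypothetical, and returning `0.0` when a denominator is zero is one assumed way to "handle div by zero" (the spec does not say which convention to use). A real implementation would more likely vectorize this over the DataFrame.

```python
from typing import Optional


def candle_features(candle: dict, prev_close: Optional[float] = None) -> dict:
    """Derived features for one OHLC candle, named as in Stage 1, step 3."""
    o, h, l, c = candle["open"], candle["high"], candle["low"], candle["close"]
    body_top, body_bottom = max(o, c), min(o, c)
    rng = h - l
    body_size = abs(c - o)
    upper_wick = h - body_top
    lower_wick = body_bottom - l
    return {
        "body_size": body_size,
        "body_direction": 1 if c >= o else -1,
        "upper_wick": upper_wick,
        "lower_wick": lower_wick,
        # Assumed convention: a zero denominator yields 0.0 instead of inf/NaN.
        "wick_ratio": upper_wick / lower_wick if lower_wick > 0 else 0.0,
        "body_to_range": body_size / rng if rng > 0 else 0.0,
        # The first candle has no previous close, so its gap is left as None.
        "gap": None if prev_close is None else o - prev_close,
        "range": rng,
    }


# Usage: walk the candles in time order, carrying the previous close forward.
candles = [
    {"open": 1.0921, "high": 1.0935, "low": 1.0918, "close": 1.0933},
    {"open": 1.0934, "high": 1.0940, "low": 1.0925, "close": 1.0928},
]
prev = None
rows = []
for candle in candles:
    rows.append(candle_features(candle, prev))
    prev = candle["close"]
```
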

---

## Stage 2: Annotation Ingestion (Optional)

**Input:** Enriched CSV (or raw OHLCV if Stage 1 was skipped) + annotations JSON from the annotation tool
**Output:** Labeled dataset CSV (`data.labeled_path`)
**Skip condition:** If `stages.annotation_ingestion.enabled` is false, a target column must already exist in the input data (e.g., from a previous run or a manually added CSV column).

### What this stage does

1. Load the enriched/raw OHLCV CSV.
2. Load annotations JSON (exported from the span annotation tool). Expected format:

```json
{
  "annotations": [
    {
      "id": "uuid",
      "start_time": "2024-03-15T09:00:00Z",
      "end_time": "2024-03-15T16:00:00Z",
      "label": "bull_flag",
      "confidence": 4,
      "outcome": "win",
      "sub_spans": [...]
    }
  ]
}
```

3. **If `label_encoding` is "bio":** For each candle in the dataset, assign a BIO tag based on annotations:
   - First candle of an annotation span → `B-{label}`
   - Subsequent candles in the span → `I-{label}`
   - Candles outside any span → `O`
   - For overlapping annotations, create multiple tag columns (`bio_tag_1`, `bio_tag_2`)
4. **If `label_encoding` is "window":** For each annotation, extract a fixed-size window of `window_size` candles centered on the annotation span. If the span is shorter than `window_size`, pad with context candles. If it is longer, truncate it to `window_size`, matching the pad/truncate behavior described in the config, so every row has the same width. Each window becomes one row in the output with flattened OHLCV + feature columns.
5. Filter out annotations below `min_confidence`. Apply this filter before the encoding in steps 3 and 4 so discarded spans never produce tags or windows.
6. **If `programmatic_labels.enabled`:** Also run TA-Lib's CDL* functions on the OHLC data. These return +100/-100/0 for bullish/bearish/no-pattern. Convert to label names. Merge with human annotations using the configured `merge_strategy`.
7. Add `context_padding` candles before and after each annotation span in the output (for models that need trend context).
8. Log dataset statistics:
   - Total annotations by label
   - Class distribution
   - Average span length per label
   - Agreement rate between human and programmatic labels (if both enabled)
9. Save labeled dataset CSV.

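
The BIO encoding and confidence filter described in Stage 2 can be sketched as follows. This is a simplified illustration under stated assumptions: the helper names `parse_time` and `bio_tags` are hypothetical, candle times are assumed to be sorted ascending, `end_time` is treated as inclusive, and overlapping spans are not handled (the spec puts those in extra `bio_tag_N` columns, which this sketch omits).

```python
from datetime import datetime


def parse_time(ts: str) -> datetime:
    # Older Python versions reject a trailing "Z" in fromisoformat, so normalise it.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def bio_tags(candle_times, annotations, min_confidence=3):
    """Return one BIO tag per candle; candles outside every span get "O"."""
    times = [parse_time(t) for t in candle_times]
    tags = ["O"] * len(times)
    for ann in annotations:
        # Drop low-confidence annotations before any tags are written.
        if ann["confidence"] < min_confidence:
            continue
        start, end = parse_time(ann["start_time"]), parse_time(ann["end_time"])
        first = True
        for i, t in enumerate(times):
            if start <= t <= end:  # assumption: end_time is inclusive
                tags[i] = ("B-" if first else "I-") + ann["label"]
                first = False
    return tags


# Usage with five hourly candles and two annotations (the second is filtered out).
candle_times = [f"2024-03-15T{hour:02d}:00:00Z" for hour in range(8, 13)]
annotations = [
    {"start_time": "2024-03-15T09:00:00Z", "end_time": "2024-03-15T11:00:00Z",
     "label": "bull_flag", "confidence": 4},
    {"start_time": "2024-03-15T12:00:00Z", "end_time": "2024-03-15T12:00:00Z",
     "label": "doji", "confidence": 2},
]
tags = bio_tags(candle_times, annotations)
```
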

---

## Stage 3: Training

**Input:** Labeled dataset CSV (`data.labeled_path`)
**Output:** Trained model (logged to MLflow and/or saved locally)
**This stage always runs when the pipeline is executed.**

### What this stage does

1. Load labeled dataset.
2. Split into train/validation/test using `split_method`:
   - `temporal`: sort by time, first N% train, next M% validation, last K% test. **Never shuffle financial time series.**
   - `random`: standard sklearn split (not recommended for financial data, but available).
3. Separate features (X) from target (y) using `target_column`.
4. Apply `class_weights` to handle imbalanced labels (expect far more `O` / no-pattern candles than any specific pattern).
5. **Start an MLflow run:**

```python
import mlflow
from sklearn.metrics import accuracy_score, classification_report, f1_score

mlflow.set_tracking_uri(config.training.mlflow.tracking_uri)
mlflow.set_experiment(config.training.mlflow.experiment_name)

# Bind the run so run.info.run_id is available for model registration below
with mlflow.start_run() as run:
    # Log the full pipeline config (log_dict expects a plain, serializable dict)
    mlflow.log_dict(config, "pipeline_config.yaml")

    # Log dataset info
    mlflow.log_param("dataset_version", dvc_version_hash)  # if using DVC
    mlflow.log_param("total_samples", len(X_train) + len(X_val) + len(X_test))  # all three splits
    mlflow.log_param("num_classes", len(unique_labels))
    mlflow.log_param("model_type", config.training.model_type)
    mlflow.log_param("window_size", config.stages.annotation_ingestion.window_size)
    mlflow.log_param("feature_engineering_enabled", config.stages.feature_engineering.enabled)
    mlflow.log_param("annotations_enabled", config.stages.annotation_ingestion.enabled)

    # Log per-class sample counts
    for label, count in label_counts.items():
        mlflow.log_param(f"samples_{label}", count)

    # Log all hyperparameters
    mlflow.log_params(config.training.hyperparameters)

    # Train model (train is the project's own dispatch function)
    model = train(config.training.model_type, X_train, y_train, config.training.hyperparameters)

    # Evaluate
    y_pred = model.predict(X_test)

    # Log overall metrics
    mlflow.log_metric("accuracy", accuracy_score(y_test, y_pred))
    mlflow.log_metric("f1_macro", f1_score(y_test, y_pred, average='macro'))
    mlflow.log_metric("f1_weighted", f1_score(y_test, y_pred, average='weighted'))

    # Log PER-CLASS metrics (critical for imbalanced pattern data)
    report = classification_report(y_test, y_pred, output_dict=True)
    for label, metrics in report.items():
        if isinstance(metrics, dict):  # skips the scalar "accuracy" entry
            mlflow.log_metric(f"precision_{label}", metrics['precision'])
            mlflow.log_metric(f"recall_{label}", metrics['recall'])
            mlflow.log_metric(f"f1_{label}", metrics['f1-score'])

    # Log artifacts (plot_* are project helpers, e.g. in training/evaluation.py)
    if config.training.mlflow.log_artifacts:
        # Confusion matrix plot
        fig = plot_confusion_matrix(y_test, y_pred, labels=unique_labels)
        mlflow.log_figure(fig, "confusion_matrix.png")

        # Feature importance (for tree models)
        if hasattr(model, 'feature_importances_'):
            fig = plot_feature_importance(model, feature_names)
            mlflow.log_figure(fig, "feature_importance.png")

        # Classification report as text
        mlflow.log_text(classification_report(y_test, y_pred), "classification_report.txt")

    # Log model
    mlflow.sklearn.log_model(model, "model")  # or mlflow.xgboost, mlflow.pytorch etc.

    # Register model if configured
    if config.training.mlflow.register_model:
        mlflow.register_model(f"runs:/{run.info.run_id}/model", config.inference.mlflow_model_name)
```

6. **Model-specific training logic:**
   - **xgboost / lightgbm:** Direct fit on tabular features. Works with windowed classification format.
   - **cnn_1d:** Reshape windowed data into a (samples, timesteps, features) tensor. Use Conv1D → MaxPool → Dense layers.
   - **lstm:** Same reshape as CNN. Use LSTM → Dense layers. Consider a bidirectional LSTM for pattern detection.
   - **transformer:** Use a lightweight transformer encoder. Positional encoding is important for candle sequence order.

   For the sequence labeling task (`task: "sequence_labeling"`):
   - **BiLSTM-CRF:** Use BIO-tagged data. Input is the full candle sequence, output is one tag per candle.

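
The `temporal` split from step 2 of Stage 3 can be sketched as below. The function name `temporal_split` and its `time_key` parameter are hypothetical, and two assumptions are made that the spec does not state: `test_split` and `validation_split` are both fractions of the whole dataset, and timestamps are ISO-8601 strings in a single format and time zone, so that sorting them as strings is chronological.

```python
def temporal_split(rows, test_split=0.2, validation_split=0.1, time_key="time"):
    """Split rows chronologically: oldest for train, then validation, newest for test."""
    # Sort by time and never shuffle, so no future candle leaks into training.
    ordered = sorted(rows, key=lambda row: row[time_key])
    n = len(ordered)
    n_test = int(n * test_split)
    n_val = int(n * validation_split)
    n_train = n - n_val - n_test
    train = ordered[:n_train]
    val = ordered[n_train:n_train + n_val]
    test = ordered[n_train + n_val:]
    return train, val, test


# Usage: ten daily rows supplied out of order still split 7 / 1 / 2 by time.
rows = [{"time": f"2024-01-{day:02d}T00:00:00Z", "close": float(day)} for day in range(1, 11)]
train, val, test = temporal_split(list(reversed(rows)))
```
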

---

## Stage 4: Inference

**Input:** New OHLCV data (with features if Stage 1 is enabled) + trained model
**Output:** Pattern predictions for each candle or window

### What this stage does

1. **Load model:**
   - If `model_source` is "mlflow": load from the MLflow model registry using model name + stage
   - If `model_source` is "local": load from file path

```python
if config.inference.model_source == "mlflow":
    import mlflow.pyfunc
    model = mlflow.pyfunc.load_model(
        model_uri=f"models:/{config.inference.mlflow_model_name}/{config.inference.mlflow_model_stage}"
    )
else:
    import joblib
    model = joblib.load(config.inference.local_model_path)
```

2. **Preprocessing must match training exactly.** If Stage 1 was enabled during training, it must also run during inference with the same config. Same indicators, same candle features, same custom features, same parameter values. Log the pipeline config as an MLflow artifact during training so inference can replicate it.

3. **If `serve_mode` is "api":** Start a REST endpoint (Flask/FastAPI) on `api_port`:

```
POST /predict
Content-Type: application/json

{
  "candles": [
    {"time": "2024-03-15T09:00:00Z", "open": 1.0921, "high": 1.0935, "low": 1.0918, "close": 1.0933, "volume": 1200},
    ...
  ]
}

Response:
{
  "predictions": [
    {"time": "2024-03-15T09:00:00Z", "label": "bull_flag", "confidence": 0.87},
    {"time": "2024-03-15T10:00:00Z", "label": "bull_flag", "confidence": 0.82},
    {"time": "2024-03-15T11:00:00Z", "label": "O", "confidence": 0.95},
    ...
  ],
  "model_version": "v3",
  "pipeline_config_hash": "abc123"
}
```

   The API must:
   - Accept raw OHLCV candles
   - Run Stage 1 (feature engineering) internally if it was used during training
   - Run the model
   - Return predictions with confidence scores
   - Return model version for traceability

4. **If `serve_mode` is "library":** Expose a Python function that the annotation tool can call directly:

```python
from pipeline import predict
results = predict(candles_df)  # returns DataFrame with label + confidence columns
```

5. **Batch inference:** For historical data, process in chunks of `batch_size` to avoid memory issues. Output a full CSV with predictions alongside the original OHLCV data.

6. **Integration with annotation tool:** The annotation tool can call the inference API to show predicted patterns as a separate visual layer (e.g., dashed outline vs. solid for human annotations). This lets the user compare their annotations against the model's predictions and find disagreements.

---

## Directory Structure

```
project/
├── config/
│   └── pipeline.yaml
├── data/
│   ├── raw/                    # raw OHLCV CSVs
│   ├── enriched/               # after feature engineering
│   ├── labeled/                # after annotation ingestion
│   └── annotations/            # JSON exports from annotation tool
├── features/
│   ├── talib_features.py
│   ├── candle_features.py
│   └── custom/
│       ├── trend_slope.py
│       └── volume_zscore.py
├── training/
│   ├── train.py                # main training entry point
│   ├── models/                 # model architecture definitions
│   │   ├── xgb.py
│   │   ├── cnn.py
│   │   ├── lstm.py
│   │   └── transformer.py
│   └── evaluation.py           # metrics, plots, reports
├── inference/
│   ├── serve.py                # REST API server
│   ├── predict.py              # library-mode prediction
│   └── preprocess.py           # must mirror training preprocessing exactly
├── pipeline.py                 # orchestrates all stages
├── requirements.txt
└── README.md
```

## Running the Pipeline

```bash
# Run full pipeline
python pipeline.py --config config/pipeline.yaml

# Run individual stages
python pipeline.py --config config/pipeline.yaml --stage feature_engineering
python pipeline.py --config config/pipeline.yaml --stage annotation_ingestion
python pipeline.py --config config/pipeline.yaml --stage training
python pipeline.py --config config/pipeline.yaml --stage inference

# Start inference API
python inference/serve.py --config config/pipeline.yaml
```

## Key Principles

1. **Every stage is optional except training.** The pipeline should work with raw OHLCV + no annotations (using only programmatic TA-Lib labels), with annotations but no TA-Lib features, or with everything enabled.
2. **Preprocessing parity.** Whatever transformations run during training MUST run identically during inference. Log the full config as an MLflow artifact.
3. **Temporal splits only.** Never randomly shuffle financial time series data. Future data must never leak into training.
4. **Per-class metrics matter more than overall accuracy.** A model that predicts "no pattern" for everything will have high accuracy but is useless. Track precision/recall for each pattern class.
5. **MLflow is the observer, not the engine.** It logs, stores, and serves; your code does all computation.
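
Principle 4 can be made concrete with a small worked example. The label counts below are made up purely for illustration (95 no-pattern candles and 5 `bull_flag` candles), and the helper names `accuracy` and `per_class_recall` are hypothetical; in the pipeline itself, scikit-learn's `classification_report` already provides these numbers.

```python
from collections import Counter


def accuracy(y_true, y_pred):
    """Fraction of candles whose predicted label matches the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def per_class_recall(y_true, y_pred):
    """Recall per label: correctly predicted candles / true candles of that label."""
    totals = Counter(y_true)
    hits = Counter(t for t, p in zip(y_true, y_pred) if t == p)
    return {label: hits[label] / totals[label] for label in totals}


# Illustrative data: a model that always predicts "O" (no pattern).
y_true = ["O"] * 95 + ["bull_flag"] * 5
y_pred = ["O"] * 100

overall = accuracy(y_true, y_pred)        # 0.95
recall = per_class_recall(y_true, y_pred)  # {'O': 1.0, 'bull_flag': 0.0}
```

The 95% accuracy hides the fact that not a single pattern was detected, which is why the per-class recall of 0.0 for `bull_flag` is the number worth tracking.
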