candle-annotator/ml-pipeline-prompt.md

17 KiB

ML Pipeline for Candlestick Pattern Recognition

Build a modular ML pipeline for forex candlestick pattern recognition. The pipeline has distinct stages that can be enabled/disabled independently via config. Each stage reads from and writes to well-defined file formats so stages can run standalone.

Architecture Overview

[Stage 1: Feature Engineering]  ← optional, uses TA-Lib
        ↓ outputs enriched OHLCV CSV
[Stage 2: Annotation Ingestion]  ← optional, merges human labels
        ↓ outputs labeled dataset CSV
[Stage 3: Training]              ← always runs
        ↓ logs to MLflow, saves model
[Stage 4: Inference]             ← loads model, returns predictions
        ↓ serves predictions via API or direct call

Pipeline Config

A single YAML config controls the entire pipeline:

pipeline:
  name: "candlestick_pattern_v1"
  pair: "EURUSD"
  timeframe: "1H"

data:
  raw_ohlcv_path: "data/raw/EURUSD_1H.csv"    # input: raw OHLCV data
  enriched_path: "data/enriched/EURUSD_1H.csv" # output of stage 1
  labeled_path: "data/labeled/EURUSD_1H.csv"   # output of stage 2
  annotations_path: "data/annotations/annotations.json"  # from annotation tool

stages:
  feature_engineering:
    enabled: true          # set false to skip, use raw OHLCV only
    talib_indicators:
      - name: "RSI"
        params: { timeperiod: 14 }
      - name: "ATR"
        params: { timeperiod: 14 }
      - name: "EMA"
        params: { timeperiod: 20 }
      - name: "MACD"
        params: { fastperiod: 12, slowperiod: 26, signalperiod: 9 }
      - name: "BBANDS"
        params: { timeperiod: 20 }
    candle_features: true  # compute body_size, wick_ratio, body_direction, gap, etc.
    custom_features:       # user-defined feature functions
      - "features.custom.trend_slope"
      - "features.custom.volume_zscore"

  annotation_ingestion:
    enabled: true           # set false to skip, use programmatic labels only
    source: "span"          # "span" (from annotation tool) or "point" (single candle marks)
    context_padding: 20     # include N candles before/after each span
    label_encoding: "bio"   # "bio" for sequence models, "window" for classification
    window_size: 30         # fixed window size for classification (pad/truncate spans)
    min_confidence: 3       # discard annotations below this confidence score
    programmatic_labels:
      enabled: true         # also generate labels from TA-Lib pattern functions
      talib_patterns:       # TA-Lib's CDL* pattern recognition functions
        - "CDL_ENGULFING"
        - "CDL_MORNINGSTAR"
        - "CDL_EVENINGSTAR"
        - "CDL_HAMMER"
        - "CDL_SHOOTINGSTAR"
        - "CDL_DOJI"
      merge_strategy: "human_priority"  # when human and programmatic disagree:
                                         # "human_priority" = keep human label
                                         # "programmatic_priority" = keep TA-Lib label
                                         # "both" = keep both as separate label columns

  training:
    enabled: true
    model_type: "xgboost"   # "xgboost", "lightgbm", "cnn_1d", "lstm", "transformer"
    task: "classification"  # "classification" or "sequence_labeling"
    target_column: "label"  # or "bio_tag" for sequence models
    test_split: 0.2
    validation_split: 0.1
    split_method: "temporal" # "temporal" (time-based) or "random"
                              # IMPORTANT: always use temporal for financial data
    class_weights: "balanced" # handle imbalanced pattern classes
    hyperparameters:          # model-specific, these are xgboost defaults
      n_estimators: 500
      max_depth: 6
      learning_rate: 0.01
      subsample: 0.8
    mlflow:
      tracking_uri: "http://localhost:5000"
      experiment_name: "candlestick_patterns"
      log_artifacts: true     # log feature importance plots, confusion matrices
      register_model: true    # auto-register if metric beats current best

  inference:
    model_source: "mlflow"    # "mlflow" (load from registry) or "local" (load from file)
    mlflow_model_name: "candlestick_pattern_v1"
    mlflow_model_stage: "Production"
    local_model_path: "models/best_model.pkl"
    serve_mode: "api"         # "api" (REST endpoint) or "library" (direct Python call)
    api_port: 8001
    batch_size: 64            # for batch inference on historical data

Stage 1: Feature Engineering (Optional)

Input: Raw OHLCV CSV (data.raw_ohlcv_path) Output: Enriched CSV (data.enriched_path) with additional feature columns Skip condition: If stages.feature_engineering.enabled is false, Stage 2/3 reads raw OHLCV directly.

What this stage does

  1. Load raw OHLCV data (columns: time, open, high, low, close, volume)
  2. If talib_indicators is configured, compute each indicator using TA-Lib and append as new columns. Column naming: lowercase indicator name, e.g. rsi_14, atr_14, ema_20, macd, macd_signal, macd_hist, bbands_upper, bbands_middle, bbands_lower
  3. If candle_features is true, compute these derived features for each candle:
    • body_size = abs(close - open)
    • body_direction = 1 if close >= open, else -1
    • upper_wick = high - max(open, close)
    • lower_wick = min(open, close) - low
    • wick_ratio = upper_wick / lower_wick (handle div by zero)
    • body_to_range = body_size / (high - low) (handle div by zero)
    • gap = open - previous close
    • range = high - low
  4. If custom_features is configured, import and call each function. Each custom feature function receives the full DataFrame and returns a Series.
  5. Handle NaN values from indicator warmup periods (drop or forward-fill based on config).
  6. Save enriched CSV.

TA-Lib installation note

TA-Lib requires a C library installation before the Python wrapper works:

  • macOS: brew install ta-lib
  • Ubuntu: apt-get install libta-lib-dev
  • Windows: download from ta-lib.org
  • Python wrapper: pip install TA-Lib

If TA-Lib is not installed and stage is enabled, fail with a clear error message and instructions, do not silently skip.


Stage 2: Annotation Ingestion (Optional)

Input: Enriched CSV (or raw OHLCV if Stage 1 skipped) + annotations JSON from the annotation tool Output: Labeled dataset CSV (data.labeled_path) Skip condition: If stages.annotation_ingestion.enabled is false, a target column must already exist in the input data (e.g., from a previous run or manual CSV column).

What this stage does

  1. Load the enriched/raw OHLCV CSV.
  2. Load annotations JSON (exported from the span annotation tool). Expected format:
{
  "annotations": [
    {
      "id": "uuid",
      "start_time": "2024-03-15T09:00:00Z",
      "end_time": "2024-03-15T16:00:00Z",
      "label": "bull_flag",
      "confidence": 4,
      "outcome": "win",
      "sub_spans": [...]
    }
  ]
}
  1. If label_encoding is "bio": For each candle in the dataset, assign a BIO tag based on annotations:

    • First candle of an annotation span → B-{label}
    • Subsequent candles in the span → I-{label}
    • Candles outside any span → O
    • For overlapping annotations, create multiple tag columns (bio_tag_1, bio_tag_2)
  2. If label_encoding is "window": For each annotation, extract a fixed-size window of window_size candles centered on the annotation span. If the span is shorter than window_size, pad with context candles. If longer, use the full span. Each window becomes one row in the output with flattened OHLCV + feature columns.

  3. Filter annotations below min_confidence.

  4. If programmatic_labels.enabled: Also run TA-Lib's CDL* functions on the OHLC data. These return +100/-100/0 for bullish/bearish/no-pattern. Convert to label names. Merge with human annotations using the configured merge_strategy.

  5. Add context_padding candles before and after each annotation span in the output (for models that need trend context).

  6. Log dataset statistics:

    • Total annotations by label
    • Class distribution
    • Average span length per label
    • Agreement rate between human and programmatic labels (if both enabled)
  7. Save labeled dataset CSV.


Stage 3: Training

Input: Labeled dataset CSV (data.labeled_path) Output: Trained model (logged to MLflow and/or saved locally) This stage always runs when the pipeline is executed.

What this stage does

  1. Load labeled dataset.

  2. Split into train/validation/test using split_method:

    • temporal: sort by time, first N% train, next M% validation, last K% test. Never shuffle financial time series.
    • random: standard sklearn split (not recommended for financial data, but available).
  3. Separate features (X) from target (y) using target_column.

  4. Apply class_weights to handle imbalanced labels (common: you'll have way more "O" / no-pattern than any specific pattern).

  5. Start an MLflow run:

import mlflow

mlflow.set_tracking_uri(config.training.mlflow.tracking_uri)
mlflow.set_experiment(config.training.mlflow.experiment_name)

with mlflow.start_run():
    # Log the full pipeline config
    mlflow.log_dict(config, "pipeline_config.yaml")

    # Log dataset info
    mlflow.log_param("dataset_version", dvc_version_hash)  # if using DVC
    mlflow.log_param("total_samples", len(X_train) + len(X_test))
    mlflow.log_param("num_classes", len(unique_labels))
    mlflow.log_param("model_type", config.training.model_type)
    mlflow.log_param("window_size", config.annotation_ingestion.window_size)
    mlflow.log_param("feature_engineering_enabled", config.stages.feature_engineering.enabled)
    mlflow.log_param("annotations_enabled", config.stages.annotation_ingestion.enabled)

    # Log per-class sample counts
    for label, count in label_counts.items():
        mlflow.log_param(f"samples_{label}", count)

    # Log all hyperparameters
    mlflow.log_params(config.training.hyperparameters)

    # Train model
    model = train(config.training.model_type, X_train, y_train, config.training.hyperparameters)

    # Evaluate
    y_pred = model.predict(X_test)

    # Log overall metrics
    mlflow.log_metric("accuracy", accuracy_score(y_test, y_pred))
    mlflow.log_metric("f1_macro", f1_score(y_test, y_pred, average='macro'))
    mlflow.log_metric("f1_weighted", f1_score(y_test, y_pred, average='weighted'))

    # Log PER-CLASS metrics (critical for imbalanced pattern data)
    report = classification_report(y_test, y_pred, output_dict=True)
    for label, metrics in report.items():
        if isinstance(metrics, dict):
            mlflow.log_metric(f"precision_{label}", metrics['precision'])
            mlflow.log_metric(f"recall_{label}", metrics['recall'])
            mlflow.log_metric(f"f1_{label}", metrics['f1-score'])

    # Log artifacts
    if config.training.mlflow.log_artifacts:
        # Confusion matrix plot
        fig = plot_confusion_matrix(y_test, y_pred, labels=unique_labels)
        mlflow.log_figure(fig, "confusion_matrix.png")

        # Feature importance (for tree models)
        if hasattr(model, 'feature_importances_'):
            fig = plot_feature_importance(model, feature_names)
            mlflow.log_figure(fig, "feature_importance.png")

        # Classification report as text
        mlflow.log_text(classification_report(y_test, y_pred), "classification_report.txt")

    # Log model
    mlflow.sklearn.log_model(model, "model")  # or mlflow.xgboost, mlflow.pytorch etc.

    # Register model if configured
    if config.training.mlflow.register_model:
        mlflow.register_model(f"runs:/{run.info.run_id}/model", config.inference.mlflow_model_name)
  1. Model-specific training logic:

    • xgboost / lightgbm: Direct fit on tabular features. Works with windowed classification format.
    • cnn_1d: Reshape windowed data into (samples, timesteps, features) tensor. Use Conv1D → MaxPool → Dense layers.
    • lstm: Same reshape as CNN. Use LSTM → Dense layers. Consider bidirectional LSTM for pattern detection.
    • transformer: Use a lightweight transformer encoder. Positional encoding is important for candle sequence order.

    For sequence labeling task (task: "sequence_labeling"):

    • BiLSTM-CRF: Use BIO-tagged data. Input is full candle sequence, output is tag per candle.

Stage 4: Inference

Input: New OHLCV data (with features if Stage 1 is enabled) + trained model Output: Pattern predictions for each candle or window

What this stage does

  1. Load model:
    • If model_source is "mlflow": load from MLflow model registry using model name + stage
    • If model_source is "local": load from file path
if config.inference.model_source == "mlflow":
    import mlflow.pyfunc
    model = mlflow.pyfunc.load_model(
        model_uri=f"models:/{config.inference.mlflow_model_name}/{config.inference.mlflow_model_stage}"
    )
else:
    import joblib
    model = joblib.load(config.inference.local_model_path)
  1. Preprocessing must match training exactly. If Stage 1 was enabled during training, it must also run during inference with the same config. Same indicators, same candle features, same custom features, same parameter values. Log the pipeline config as an MLflow artifact during training so inference can replicate it.

  2. If serve_mode is "api": Start a REST endpoint (Flask/FastAPI) on api_port:

POST /predict
Content-Type: application/json

{
  "candles": [
    {"time": "2024-03-15T09:00:00Z", "open": 1.0921, "high": 1.0935, "low": 1.0918, "close": 1.0933, "volume": 1200},
    ...
  ]
}

Response:
{
  "predictions": [
    {"time": "2024-03-15T09:00:00Z", "label": "bull_flag", "confidence": 0.87},
    {"time": "2024-03-15T10:00:00Z", "label": "bull_flag", "confidence": 0.82},
    {"time": "2024-03-15T11:00:00Z", "label": "O", "confidence": 0.95},
    ...
  ],
  "model_version": "v3",
  "pipeline_config_hash": "abc123"
}

The API must:

  • Accept raw OHLCV candles
  • Run Stage 1 (feature engineering) internally if it was used during training
  • Run the model
  • Return predictions with confidence scores
  • Return model version for traceability
  1. If serve_mode is "library": Expose a Python function that the annotation tool can call directly:
from pipeline import predict

results = predict(candles_df)  # returns DataFrame with label + confidence columns
  1. Batch inference: For historical data, process in chunks of batch_size to avoid memory issues. Output a full CSV with predictions alongside the original OHLCV data.

  2. Integration with annotation tool: The annotation tool can call the inference API to show predicted patterns as a separate visual layer (e.g., dashed outline vs. solid for human annotations). This enables the user to compare their annotations against the model's predictions and find disagreements.


Directory Structure

project/
├── config/
│   └── pipeline.yaml
├── data/
│   ├── raw/              # raw OHLCV CSVs
│   ├── enriched/         # after feature engineering
│   ├── labeled/          # after annotation ingestion
│   └── annotations/      # JSON exports from annotation tool
├── features/
│   ├── talib_features.py
│   ├── candle_features.py
│   └── custom/
│       ├── trend_slope.py
│       └── volume_zscore.py
├── training/
│   ├── train.py          # main training entry point
│   ├── models/           # model architecture definitions
│   │   ├── xgb.py
│   │   ├── cnn.py
│   │   ├── lstm.py
│   │   └── transformer.py
│   └── evaluation.py     # metrics, plots, reports
├── inference/
│   ├── serve.py          # REST API server
│   ├── predict.py        # library-mode prediction
│   └── preprocess.py     # must mirror training preprocessing exactly
├── pipeline.py           # orchestrates all stages
├── requirements.txt
└── README.md

Running the Pipeline

# Run full pipeline
python pipeline.py --config config/pipeline.yaml

# Run individual stages
python pipeline.py --config config/pipeline.yaml --stage feature_engineering
python pipeline.py --config config/pipeline.yaml --stage annotation_ingestion
python pipeline.py --config config/pipeline.yaml --stage training
python pipeline.py --config config/pipeline.yaml --stage inference

# Start inference API
python inference/serve.py --config config/pipeline.yaml

Key Principles

  1. Every stage is optional except training. The pipeline should work with raw OHLCV + no annotations (using only programmatic TA-Lib labels), with annotations but no TA-Lib features, or with everything enabled.
  2. Preprocessing parity. Whatever transformations run during training MUST run identically during inference. Log the full config as an MLflow artifact.
  3. Temporal splits only. Never randomly shuffle financial time series data. Future data must never leak into training.
  4. Per-class metrics matter more than overall accuracy. A model that predicts "no pattern" for everything will have high accuracy but is useless. Track precision/recall for each pattern class.
  5. MLflow is the observer, not the engine. It logs, stores, and serves — your code does all computation.