diff --git a/openspec/changes/candle-backend/tasks.md b/openspec/changes/candle-backend/tasks.md
index 83e6eb4..6018010 100644
--- a/openspec/changes/candle-backend/tasks.md
+++ b/openspec/changes/candle-backend/tasks.md
@@ -25,14 +25,14 @@
 
 ## 4. Annotation Ingestion Stage
 
-- [ ] 4.1 Create `services/ml/app/annotation_ingestion.py` — load annotations JSON from `data.annotations_path`, filter by min_confidence
-- [ ] 4.2 Implement windowed classification encoding — extract fixed-size windows centered on each annotation span, flatten into single rows, handle boundary padding
-- [ ] 4.3 Implement BIO sequence labeling encoding — assign B-{label}/I-{label}/O tags per candle, handle overlapping annotations with multiple tag columns
-- [ ] 4.4 Implement TA-Lib CDL* programmatic labeling — run configured CDL functions, convert +100/-100 to label names (bullish_/bearish_ prefix)
-- [ ] 4.5 Implement human/programmatic label merge strategies — human_priority, programmatic_priority, both (separate columns)
-- [ ] 4.6 Implement context padding — include N candles before/after each annotation span
-- [ ] 4.7 Add dataset statistics logging — counts per label, class distribution %, avg span length, human/programmatic agreement rate
-- [ ] 4.8 Wire annotation ingestion into `pipeline.py` — read enriched CSV + annotations JSON, run encoding, write labeled CSV to `data.labeled_path`
+- [x] 4.1 Create `services/ml/app/annotation_ingestion.py` — load annotations JSON from `data.annotations_path`, filter by min_confidence
+- [x] 4.2 Implement windowed classification encoding — extract fixed-size windows centered on each annotation span, flatten into single rows, handle boundary padding
+- [x] 4.3 Implement BIO sequence labeling encoding — assign B-{label}/I-{label}/O tags per candle, handle overlapping annotations with multiple tag columns
+- [x] 4.4 Implement TA-Lib CDL* programmatic labeling — run configured CDL functions, convert +100/-100 to label names (bullish_/bearish_ prefix)
+- [x] 4.5 Implement human/programmatic label merge strategies — human_priority, programmatic_priority, both (separate columns)
+- [x] 4.6 Implement context padding — include N candles before/after each annotation span
+- [x] 4.7 Add dataset statistics logging — counts per label, class distribution %, avg span length, human/programmatic agreement rate
+- [x] 4.8 Wire annotation ingestion into `pipeline.py` — read enriched CSV + annotations JSON, run encoding, write labeled CSV to `data.labeled_path`
 
 ## 5. Training Stage
 
diff --git a/services/ml/app/annotation_ingestion.py b/services/ml/app/annotation_ingestion.py
new file mode 100644
index 0000000..8c2bd0e
--- /dev/null
+++ b/services/ml/app/annotation_ingestion.py
@@ -0,0 +1,519 @@
+"""
+Annotation ingestion module.
+
+Loads annotations from JSON exports and converts them into labeled training datasets
+with various encoding strategies (windowed classification, BIO sequence labeling).
+Supports programmatic TA-Lib pattern labels and merge strategies.
+"""
+
+import json
+import logging
+from pathlib import Path
+from typing import List, Dict, Any, Optional, Tuple
+from collections import defaultdict
+
+import pandas as pd
+import numpy as np
+
+from app.config import AnnotationIngestionConfig
+
+logger = logging.getLogger(__name__)
+
+
+class AnnotationIngestion:
+    """
+    Handles loading and processing of annotations into labeled datasets.
+
+    Supports:
+    - Loading annotations from JSON exports
+    - Windowed classification encoding
+    - BIO sequence labeling encoding
+    - TA-Lib CDL* programmatic pattern detection
+    - Human/programmatic label merging
+    - Context padding
+    - Dataset statistics logging
+    """
+
+    def __init__(self, config: AnnotationIngestionConfig):
+        """
+        Initialize annotation ingestion.
+
+        Args:
+            config: Annotation ingestion configuration
+        """
+        self.config = config
+
+    def load_annotations(self, annotations_path: str) -> List[Dict[str, Any]]:
+        """
+        Load annotations from JSON export file.
+
+        Args:
+            annotations_path: Path to annotations JSON file
+
+        Returns:
+            List of annotation dictionaries
+
+        Raises:
+            FileNotFoundError: If annotations file doesn't exist
+            ValueError: If JSON format is invalid
+        """
+        path = Path(annotations_path)
+
+        if not path.exists():
+            raise FileNotFoundError(
+                f"Annotations file not found: {annotations_path}. "
+                "Please export annotations from the UI first."
+            )
+
+        logger.info(f"Loading annotations from {annotations_path}")
+
+        with open(path, 'r', encoding='utf-8') as f:
+            data = json.load(f)
+
+        if 'annotations' not in data:
+            raise ValueError(
+                f"Invalid annotations JSON format: missing 'annotations' key"
+            )
+
+        annotations = data['annotations']
+        logger.info(f"Loaded {len(annotations)} annotations")
+
+        # Filter by confidence (any positive threshold enables the filter)
+        if self.config.min_confidence > 0:
+            original_count = len(annotations)
+            annotations = [
+                ann for ann in annotations
+                if ann.get('confidence') is not None
+                and ann['confidence'] >= self.config.min_confidence
+            ]
+            filtered_count = original_count - len(annotations)
+            if filtered_count > 0:
+                logger.info(
+                    f"Filtered {filtered_count} annotations below "
+                    f"min_confidence={self.config.min_confidence}"
+                )
+
+        return annotations
+
+    def get_programmatic_labels(self, df: pd.DataFrame) -> pd.DataFrame:
+        """
+        Generate programmatic labels using TA-Lib CDL* pattern functions.
+
+        Args:
+            df: DataFrame with OHLC columns
+
+        Returns:
+            DataFrame with programmatic label column added
+        """
+        if not self.config.programmatic_labels.enabled:
+            return df
+
+        if not self.config.programmatic_labels.talib_patterns:
+            logger.warning("Programmatic labels enabled but no patterns configured")
+            return df
+
+        try:
+            import talib
+        except ImportError:
+            raise ImportError(
+                "TA-Lib is required for programmatic labels. "
+                "Install with: pip install TA-Lib (requires libta-lib-dev)"
+            )
+
+        logger.info("Computing programmatic TA-Lib pattern labels")
+
+        # Collect all pattern detections
+        pattern_labels = pd.Series(['O'] * len(df), index=df.index)
+
+        for pattern_name in self.config.programmatic_labels.talib_patterns:
+            if not hasattr(talib, pattern_name):
+                logger.warning(f"Unknown TA-Lib pattern: {pattern_name}")
+                continue
+
+            pattern_func = getattr(talib, pattern_name)
+
+            try:
+                result = pattern_func(
+                    df['open'].values,
+                    df['high'].values,
+                    df['low'].values,
+                    df['close'].values
+                )
+
+                # Convert +100/-100 to label names
+                base_name = pattern_name.lower().replace('cdl', '').replace('_', '')
+                pattern_labels[result > 0] = f"bullish_{base_name}"
+                pattern_labels[result < 0] = f"bearish_{base_name}"
+
+            except Exception as e:
+                logger.error(f"Error computing {pattern_name}: {e}")
+                continue
+
+        # assign() returns a new frame, so the caller's DataFrame is not mutated
+        df = df.assign(label_programmatic=pattern_labels)
+
+        # Log statistics
+        prog_counts = pattern_labels.value_counts()
+        logger.info(f"Programmatic labels: {prog_counts.to_dict()}")
+
+        return df
+
+    def create_windowed_dataset(
+        self,
+        df: pd.DataFrame,
+        annotations: List[Dict[str, Any]]
+    ) -> pd.DataFrame:
+        """
+        Create windowed classification dataset.
+
+        Each annotation span is converted to a fixed-size window of candles,
+        flattened into a single row with the annotation label as target.
+
+        Args:
+            df: DataFrame with features and time column
+            annotations: List of annotation dictionaries
+
+        Returns:
+            DataFrame with windowed samples
+        """
+        logger.info("Creating windowed classification dataset")
+
+        window_size = self.config.window_size
+        context_padding = self.config.context_padding
+
+        samples = []
+
+        for ann in annotations:
+            label = ann['label']
+            start_time = pd.Timestamp(ann['start_time'])
+            end_time = pd.Timestamp(ann['end_time'])
+
+            # Find candles in span (row positions, valid for any df.index)
+            span_mask = (df['time'] >= start_time) & (df['time'] <= end_time)
+            span_indices = np.flatnonzero(span_mask.to_numpy()).tolist()
+
+            if not span_indices:
+                logger.warning(f"No candles found for annotation {ann.get('id')}")
+                continue
+
+            span_start_idx = span_indices[0]
+            span_end_idx = span_indices[-1]
+            span_length = len(span_indices)
+
+            # Determine window boundaries
+            if span_length < window_size:
+                # Pad with context candles (centered)
+                padding_needed = window_size - span_length
+                left_padding = padding_needed // 2
+                right_padding = padding_needed - left_padding
+
+                window_start = max(0, span_start_idx - left_padding - context_padding)
+                window_end = min(len(df) - 1, span_end_idx + right_padding + context_padding)
+            else:
+                # Use full span + context padding
+                window_start = max(0, span_start_idx - context_padding)
+                window_end = min(len(df) - 1, span_end_idx + context_padding)
+
+            # Extract window
+            window = df.iloc[window_start:window_end + 1].copy()
+
+            # Flatten window into single row
+            # Create feature names: feature_0, feature_1, etc.
+            feature_cols = [c for c in df.columns if c != 'time']
+            flattened = {}
+
+            for i, (_, row) in enumerate(window.iterrows()):
+                for col in feature_cols:
+                    flattened[f"{col}_{i}"] = row[col]
+
+            # Pad with NaN if window is smaller than expected
+            expected_size = window_size
+            actual_size = len(window)
+            if actual_size < expected_size:
+                for i in range(actual_size, expected_size):
+                    for col in feature_cols:
+                        flattened[f"{col}_{i}"] = np.nan
+
+            flattened['label'] = label
+            samples.append(flattened)
+
+        result_df = pd.DataFrame(samples)
+        logger.info(f"Created {len(result_df)} windowed samples")
+
+        return result_df
+
+    def create_bio_dataset(
+        self,
+        df: pd.DataFrame,
+        annotations: List[Dict[str, Any]]
+    ) -> pd.DataFrame:
+        """
+        Create BIO sequence labeling dataset.
+
+        Each candle gets a BIO tag: B-{label} for span start, I-{label} for
+        continuation, O for outside annotations.
+
+        Args:
+            df: DataFrame with features and time column
+            annotations: List of annotation dictionaries
+
+        Returns:
+            DataFrame with BIO tags column(s)
+        """
+        logger.info("Creating BIO sequence labeling dataset")
+
+        # Initialize with 'O' tags
+        bio_tags = pd.Series(['O'] * len(df), index=df.index)
+
+        # Track overlapping annotations
+        overlaps_detected = False
+        bio_columns = {'bio_tag': bio_tags}
+
+        for ann in annotations:
+            label = ann['label']
+            start_time = pd.Timestamp(ann['start_time'])
+            end_time = pd.Timestamp(ann['end_time'])
+
+            # Find candles in span (row positions, valid for any df.index)
+            span_mask = (df['time'] >= start_time) & (df['time'] <= end_time)
+            span_indices = np.flatnonzero(span_mask.to_numpy()).tolist()
+
+            if not span_indices:
+                continue
+
+            # Check for overlaps: reuse the first tag column that is still 'O'
+            # across the whole span; add a column only when all of them collide
+            target_col = None
+            for col_name, col_tags in bio_columns.items():
+                if (col_tags.iloc[span_indices] == 'O').all():
+                    target_col = col_name
+                    break
+
+            if target_col is None:
+                # Create new column if needed
+                target_col = f'bio_tag_{len(bio_columns) + 1}'
+                bio_columns[target_col] = pd.Series(
+                    ['O'] * len(df),
+                    index=df.index
+                )
+
+            if target_col != 'bio_tag':
+                overlaps_detected = True
+
+            # Assign BIO tags
+            bio_columns[target_col].iloc[span_indices[0]] = f"B-{label}"
+            for idx in span_indices[1:]:
+                bio_columns[target_col].iloc[idx] = f"I-{label}"
+
+        if overlaps_detected:
+            logger.info("Overlapping annotations detected, created multiple BIO tag columns")
+
+        # Add BIO columns to dataframe
+        result_df = df.copy()
+        for col_name, col_data in bio_columns.items():
+            result_df[col_name] = col_data
+
+        return result_df
+
+    def merge_labels(
+        self,
+        df: pd.DataFrame,
+        annotations: List[Dict[str, Any]]
+    ) -> pd.DataFrame:
+        """
+        Merge human annotations and programmatic labels based on merge strategy.
+
+        Args:
+            df: DataFrame with programmatic labels (if enabled)
+            annotations: List of human annotation dictionaries
+
+        Returns:
+            DataFrame with merged labels
+        """
+        if not self.config.programmatic_labels.enabled:
+            # No programmatic labels, just use human annotations
+            return df
+
+        strategy = self.config.merge_strategy
+        logger.info(f"Merging labels with strategy: {strategy}")
+
+        # Create human labels column
+        human_labels = pd.Series(['O'] * len(df), index=df.index)
+
+        for ann in annotations:
+            label = ann['label']
+            start_time = pd.Timestamp(ann['start_time'])
+            end_time = pd.Timestamp(ann['end_time'])
+
+            span_mask = (df['time'] >= start_time) & (df['time'] <= end_time)
+            span_indices = np.flatnonzero(span_mask.to_numpy()).tolist()
+
+            for idx in span_indices:
+                human_labels.iloc[idx] = label
+
+        df['label_human'] = human_labels
+
+        # Apply merge strategy
+        if strategy == "human_priority":
+            # Use human label if present, else programmatic
+            df['label'] = df.apply(
+                lambda row: row['label_human'] if row['label_human'] != 'O'
+                else row.get('label_programmatic', 'O'),
+                axis=1
+            )
+            # Drop temporary columns
+            df = df.drop(columns=['label_human', 'label_programmatic'], errors='ignore')
+
+        elif strategy == "programmatic_priority":
+            # Use programmatic label if present, else human
+            df['label'] = df.apply(
+                lambda row: row.get('label_programmatic', 'O') if row.get('label_programmatic', 'O') != 'O'
+                else row['label_human'],
+                axis=1
+            )
+            # Drop temporary columns
+            df = df.drop(columns=['label_human', 'label_programmatic'], errors='ignore')
+
+        elif strategy == "both":
+            # Keep both columns
+            pass
+
+        return df
+
+    def log_statistics(self, df: pd.DataFrame, annotations: List[Dict[str, Any]]):
+        """
+        Log dataset statistics.
+
+        Args:
+            df: Labeled DataFrame
+            annotations: List of annotations
+        """
+        logger.info("=" * 60)
+        logger.info("Dataset Statistics")
+        logger.info("=" * 60)
+
+        # Label distribution
+        if 'label' in df.columns:
+            label_counts = df['label'].value_counts()
+            total = len(df)
+
+            logger.info("\nLabel Distribution:")
+            for label, count in label_counts.items():
+                pct = (count / total) * 100
+                logger.info(f"  {label}: {count} ({pct:.2f}%)")
+
+        # If both human and programmatic labels exist
+        if 'label_human' in df.columns and 'label_programmatic' in df.columns:
+            # Agreement rate (excluding O labels)
+            non_o_mask = (df['label_human'] != 'O') | (df['label_programmatic'] != 'O')
+            non_o_df = df[non_o_mask]
+
+            if len(non_o_df) > 0:
+                agreement = (
+                    non_o_df['label_human'] == non_o_df['label_programmatic']
+                ).sum()
+                agreement_rate = (agreement / len(non_o_df)) * 100
+                logger.info(f"\nHuman/Programmatic Agreement Rate: {agreement_rate:.2f}%")
+
+        # Average span length per label
+        span_lengths = defaultdict(list)
+        for ann in annotations:
+            label = ann['label']
+            start_time = pd.Timestamp(ann['start_time'])
+            end_time = pd.Timestamp(ann['end_time'])
+
+            if 'time' in df.columns:
+                span_mask = (df['time'] >= start_time) & (df['time'] <= end_time)
+                span_length = span_mask.sum()
+                span_lengths[label].append(span_length)
+
+        if span_lengths:
+            logger.info("\nAverage Span Length (candles):")
+            for label, lengths in span_lengths.items():
+                avg_length = np.mean(lengths)
+                logger.info(f"  {label}: {avg_length:.2f}")
+
+        logger.info("=" * 60)
+
+    def process(
+        self,
+        enriched_df: pd.DataFrame,
+        annotations_path: str
+    ) -> pd.DataFrame:
+        """
+        Main processing pipeline for annotation ingestion.
+
+        Args:
+            enriched_df: DataFrame with engineered features
+            annotations_path: Path to annotations JSON file
+
+        Returns:
+            Labeled DataFrame ready for training
+        """
+        logger.info("Starting annotation ingestion")
+
+        # Load annotations
+        annotations = self.load_annotations(annotations_path)
+
+        if not annotations:
+            logger.warning("No annotations found, returning empty DataFrame")
+            return pd.DataFrame()
+
+        # Add programmatic labels if enabled
+        df = self.get_programmatic_labels(enriched_df)
+
+        # Apply label encoding
+        if self.config.label_encoding == "window":
+            result_df = self.create_windowed_dataset(df, annotations)
+        elif self.config.label_encoding == "bio":
+            result_df = self.create_bio_dataset(df, annotations)
+            # For BIO, also merge human/programmatic if enabled
+            if self.config.programmatic_labels.enabled:
+                result_df = self.merge_labels(result_df, annotations)
+        else:
+            raise ValueError(f"Unknown label encoding: {self.config.label_encoding}")
+
+        # Log statistics
+        self.log_statistics(result_df, annotations)
+
+        logger.info("Annotation ingestion complete")
+
+        return result_df
+
+
+def run_annotation_ingestion(
+    config: AnnotationIngestionConfig,
+    enriched_path: str,
+    annotations_path: str,
+    output_path: str
+):
+    """
+    Run annotation ingestion stage.
+
+    Args:
+        config: Annotation ingestion configuration
+        enriched_path: Path to enriched features CSV
+        annotations_path: Path to annotations JSON
+        output_path: Path to write labeled dataset CSV
+    """
+    logger.info("Running annotation ingestion stage")
+
+    # Load enriched data; parse 'time' so it can be compared with pd.Timestamp
+    enriched_df = pd.read_csv(enriched_path, parse_dates=['time'])
+    logger.info(f"Loaded enriched data: {enriched_df.shape}")
+
+    # Process annotations
+    ingestion = AnnotationIngestion(config)
+    labeled_df = ingestion.process(enriched_df, annotations_path)
+
+    if labeled_df.empty:
+        logger.error("No labeled data produced")
+        return
+
+    # Write output
+    output_path = Path(output_path)
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+
+    labeled_df.to_csv(output_path, index=False)
+    logger.info(f"Labeled dataset written to {output_path}")
+    logger.info(f"Final dataset shape: {labeled_df.shape}")
diff --git a/services/ml/pipeline.py b/services/ml/pipeline.py
index 4cdcaab..8790d98 100644
--- a/services/ml/pipeline.py
+++ b/services/ml/pipeline.py
@@ -61,11 +61,18 @@ def run_annotation_ingestion(config: PipelineConfig) -> None:
         return
 
     # Import here to avoid circular dependencies
-    from app.annotation_ingestion import run_annotation_ingestion_stage
+    from app.annotation_ingestion import run_annotation_ingestion as run_ingestion
 
     logger.info(f"Reading enriched data from: {config.data.enriched_path}")
     logger.info(f"Reading annotations from: {config.data.annotations_path}")
-    run_annotation_ingestion_stage(config)
+
+    run_ingestion(
+        config=config.stages.annotation_ingestion,
+        enriched_path=config.data.enriched_path,
+        annotations_path=config.data.annotations_path,
+        output_path=config.data.labeled_path
+    )
+
     logger.info(f"Labeled data written to: {config.data.labeled_path}")
     logger.info("Annotation ingestion stage completed successfully")
 