""" Annotation ingestion module. Loads annotations from JSON exports and converts them into labeled training datasets with various encoding strategies (windowed classification, BIO sequence labeling). Supports programmatic TA-Lib pattern labels and merge strategies. """ import json import logging from pathlib import Path from typing import List, Dict, Any, Optional, Tuple from collections import defaultdict import pandas as pd import numpy as np from app.config import AnnotationIngestionConfig logger = logging.getLogger(__name__) class AnnotationIngestion: """ Handles loading and processing of annotations into labeled datasets. Supports: - Loading annotations from JSON exports - Windowed classification encoding - BIO sequence labeling encoding - TA-Lib CDL* programmatic pattern detection - Human/programmatic label merging - Context padding - Dataset statistics logging """ def __init__(self, config: AnnotationIngestionConfig): """ Initialize annotation ingestion. Args: config: Annotation ingestion configuration """ self.config = config def load_annotations(self, annotations_path: str) -> List[Dict[str, Any]]: """ Load annotations from JSON export file. Args: annotations_path: Path to annotations JSON file Returns: List of annotation dictionaries Raises: FileNotFoundError: If annotations file doesn't exist ValueError: If JSON format is invalid """ path = Path(annotations_path) if not path.exists(): raise FileNotFoundError( f"Annotations file not found: {annotations_path}. " "Please export annotations from the UI first." ) logger.info(f"Loading annotations from {annotations_path}") with open(path, 'r') as f: data = json.load(f) if 'annotations' not in data: raise ValueError( f"Invalid annotations JSON format: missing 'annotations' key" ) annotations = data['annotations'] logger.info(f"Loaded {len(annotations)} annotations") # Filter by confidence if self.config.min_confidence > 1: original_count = len(annotations) annotations = [ ann for ann in annotations if ann.get('confidence') is not None and ann['confidence'] >= self.config.min_confidence ] filtered_count = original_count - len(annotations) if filtered_count > 0: logger.info( f"Filtered {filtered_count} annotations below " f"min_confidence={self.config.min_confidence}" ) return annotations def get_programmatic_labels(self, df: pd.DataFrame) -> pd.DataFrame: """ Generate programmatic labels using TA-Lib CDL* pattern functions. Args: df: DataFrame with OHLC columns Returns: DataFrame with programmatic label column added """ if not self.config.programmatic_labels.enabled: return df if not self.config.programmatic_labels.talib_patterns: logger.warning("Programmatic labels enabled but no patterns configured") return df try: import talib except ImportError: raise ImportError( "TA-Lib is required for programmatic labels. " "Install with: pip install TA-Lib (requires libta-lib-dev)" ) logger.info("Computing programmatic TA-Lib pattern labels") # Collect all pattern detections pattern_labels = pd.Series(['O'] * len(df), index=df.index) for pattern_name in self.config.programmatic_labels.talib_patterns: if not hasattr(talib, pattern_name): logger.warning(f"Unknown TA-Lib pattern: {pattern_name}") continue pattern_func = getattr(talib, pattern_name) try: result = pattern_func( df['open'].values, df['high'].values, df['low'].values, df['close'].values ) # Convert +100/-100 to label names for i, value in enumerate(result): if value > 0: label = f"bullish_{pattern_name.lower().replace('cdl', '').replace('_', '')}" pattern_labels.iloc[i] = label elif value < 0: label = f"bearish_{pattern_name.lower().replace('cdl', '').replace('_', '')}" pattern_labels.iloc[i] = label except Exception as e: logger.error(f"Error computing {pattern_name}: {e}") continue df['label_programmatic'] = pattern_labels # Log statistics prog_counts = pattern_labels.value_counts() logger.info(f"Programmatic labels: {prog_counts.to_dict()}") return df def create_windowed_dataset( self, df: pd.DataFrame, annotations: List[Dict[str, Any]] ) -> pd.DataFrame: """ Create windowed classification dataset. Each annotation span is converted to a fixed-size window of candles, flattened into a single row with the annotation label as target. Args: df: DataFrame with features and time column annotations: List of annotation dictionaries Returns: DataFrame with windowed samples """ logger.info("Creating windowed classification dataset") window_size = self.config.window_size context_padding = self.config.context_padding samples = [] for ann in annotations: label = ann['label'] start_time = pd.Timestamp(ann['start_time']) end_time = pd.Timestamp(ann['end_time']) # Find candles in span span_mask = (df['time'] >= start_time) & (df['time'] <= end_time) span_indices = df.index[span_mask].tolist() if not span_indices: logger.warning(f"No candles found for annotation {ann.get('id')}") continue span_start_idx = span_indices[0] span_end_idx = span_indices[-1] span_length = len(span_indices) # Determine window boundaries if span_length < window_size: # Pad with context candles (centered) padding_needed = window_size - span_length left_padding = padding_needed // 2 right_padding = padding_needed - left_padding window_start = max(0, span_start_idx - left_padding - context_padding) window_end = min(len(df) - 1, span_end_idx + right_padding + context_padding) else: # Use full span + context padding window_start = max(0, span_start_idx - context_padding) window_end = min(len(df) - 1, span_end_idx + context_padding) # Extract window window = df.iloc[window_start:window_end + 1].copy() # Flatten window into single row # Create feature names: feature_0, feature_1, etc. feature_cols = [c for c in df.columns if c != 'time'] flattened = {} for i, (_, row) in enumerate(window.iterrows()): for col in feature_cols: flattened[f"{col}_{i}"] = row[col] # Pad with NaN if window is smaller than expected expected_size = window_size actual_size = len(window) if actual_size < expected_size: for i in range(actual_size, expected_size): for col in feature_cols: flattened[f"{col}_{i}"] = np.nan flattened['label'] = label samples.append(flattened) result_df = pd.DataFrame(samples) logger.info(f"Created {len(result_df)} windowed samples") return result_df def create_bio_dataset( self, df: pd.DataFrame, annotations: List[Dict[str, Any]] ) -> pd.DataFrame: """ Create BIO sequence labeling dataset. Each candle gets a BIO tag: B-{label} for span start, I-{label} for continuation, O for outside annotations. Args: df: DataFrame with features and time column annotations: List of annotation dictionaries Returns: DataFrame with BIO tags column(s) """ logger.info("Creating BIO sequence labeling dataset") # Initialize with 'O' tags bio_tags = pd.Series(['O'] * len(df), index=df.index) # Track overlapping annotations overlaps_detected = False bio_columns = {'bio_tag': bio_tags} for ann in annotations: label = ann['label'] start_time = pd.Timestamp(ann['start_time']) end_time = pd.Timestamp(ann['end_time']) # Find candles in span span_mask = (df['time'] >= start_time) & (df['time'] <= end_time) span_indices = df.index[span_mask].tolist() if not span_indices: continue # Check for overlaps first_idx = span_indices[0] if bio_tags.iloc[first_idx] != 'O': # Overlap detected, create additional column overlaps_detected = True col_num = 2 while f'bio_tag_{col_num}' in bio_columns: col_num += 1 # Create new column if needed if f'bio_tag_{col_num}' not in bio_columns: bio_columns[f'bio_tag_{col_num}'] = pd.Series( ['O'] * len(df), index=df.index ) # Use this column for the annotation target_col = f'bio_tag_{col_num}' else: target_col = 'bio_tag' # Assign BIO tags bio_columns[target_col].iloc[span_indices[0]] = f"B-{label}" for idx in span_indices[1:]: bio_columns[target_col].iloc[idx] = f"I-{label}" if overlaps_detected: logger.info("Overlapping annotations detected, created multiple BIO tag columns") # Add BIO columns to dataframe result_df = df.copy() for col_name, col_data in bio_columns.items(): result_df[col_name] = col_data return result_df def merge_labels( self, df: pd.DataFrame, annotations: List[Dict[str, Any]] ) -> pd.DataFrame: """ Merge human annotations and programmatic labels based on merge strategy. Args: df: DataFrame with programmatic labels (if enabled) annotations: List of human annotation dictionaries Returns: DataFrame with merged labels """ if not self.config.programmatic_labels.enabled: # No programmatic labels, just use human annotations return df strategy = self.config.merge_strategy logger.info(f"Merging labels with strategy: {strategy}") # Create human labels column human_labels = pd.Series(['O'] * len(df), index=df.index) for ann in annotations: label = ann['label'] start_time = pd.Timestamp(ann['start_time']) end_time = pd.Timestamp(ann['end_time']) span_mask = (df['time'] >= start_time) & (df['time'] <= end_time) span_indices = df.index[span_mask].tolist() for idx in span_indices: human_labels.iloc[idx] = label df['label_human'] = human_labels # Apply merge strategy if strategy == "human_priority": # Use human label if present, else programmatic df['label'] = df.apply( lambda row: row['label_human'] if row['label_human'] != 'O' else row.get('label_programmatic', 'O'), axis=1 ) # Drop temporary columns df = df.drop(columns=['label_human', 'label_programmatic'], errors='ignore') elif strategy == "programmatic_priority": # Use programmatic label if present, else human df['label'] = df.apply( lambda row: row.get('label_programmatic', 'O') if row.get('label_programmatic', 'O') != 'O' else row['label_human'], axis=1 ) # Drop temporary columns df = df.drop(columns=['label_human', 'label_programmatic'], errors='ignore') elif strategy == "both": # Keep both columns pass return df def log_statistics(self, df: pd.DataFrame, annotations: List[Dict[str, Any]]): """ Log dataset statistics. Args: df: Labeled DataFrame annotations: List of annotations """ logger.info("=" * 60) logger.info("Dataset Statistics") logger.info("=" * 60) # Label distribution if 'label' in df.columns: label_counts = df['label'].value_counts() total = len(df) logger.info("\nLabel Distribution:") for label, count in label_counts.items(): pct = (count / total) * 100 logger.info(f" {label}: {count} ({pct:.2f}%)") # If both human and programmatic labels exist if 'label_human' in df.columns and 'label_programmatic' in df.columns: # Agreement rate (excluding O labels) non_o_mask = (df['label_human'] != 'O') | (df['label_programmatic'] != 'O') non_o_df = df[non_o_mask] if len(non_o_df) > 0: agreement = ( non_o_df['label_human'] == non_o_df['label_programmatic'] ).sum() agreement_rate = (agreement / len(non_o_df)) * 100 logger.info(f"\nHuman/Programmatic Agreement Rate: {agreement_rate:.2f}%") # Average span length per label span_lengths = defaultdict(list) for ann in annotations: label = ann['label'] start_time = pd.Timestamp(ann['start_time']) end_time = pd.Timestamp(ann['end_time']) if 'time' in df.columns: span_mask = (df['time'] >= start_time) & (df['time'] <= end_time) span_length = span_mask.sum() span_lengths[label].append(span_length) if span_lengths: logger.info("\nAverage Span Length (candles):") for label, lengths in span_lengths.items(): avg_length = np.mean(lengths) logger.info(f" {label}: {avg_length:.2f}") logger.info("=" * 60) def process( self, enriched_df: pd.DataFrame, annotations_path: str ) -> pd.DataFrame: """ Main processing pipeline for annotation ingestion. Args: enriched_df: DataFrame with engineered features annotations_path: Path to annotations JSON file Returns: Labeled DataFrame ready for training """ logger.info("Starting annotation ingestion") # Load annotations annotations = self.load_annotations(annotations_path) if not annotations: logger.warning("No annotations found, returning empty DataFrame") return pd.DataFrame() # Add programmatic labels if enabled df = self.get_programmatic_labels(enriched_df) # Apply label encoding if self.config.label_encoding == "window": result_df = self.create_windowed_dataset(df, annotations) elif self.config.label_encoding == "bio": result_df = self.create_bio_dataset(df, annotations) # For BIO, also merge human/programmatic if enabled if self.config.programmatic_labels.enabled: result_df = self.merge_labels(result_df, annotations) else: raise ValueError(f"Unknown label encoding: {self.config.label_encoding}") # Log statistics self.log_statistics(result_df, annotations) logger.info("Annotation ingestion complete") return result_df def run_annotation_ingestion( config: AnnotationIngestionConfig, enriched_path: str, annotations_path: str, output_path: str ): """ Run annotation ingestion stage. Args: config: Annotation ingestion configuration enriched_path: Path to enriched features CSV annotations_path: Path to annotations JSON output_path: Path to write labeled dataset CSV """ logger.info("Running annotation ingestion stage") # Load enriched data enriched_df = pd.read_csv(enriched_path) logger.info(f"Loaded enriched data: {enriched_df.shape}") # Process annotations ingestion = AnnotationIngestion(config) labeled_df = ingestion.process(enriched_df, annotations_path) if labeled_df.empty: logger.error("No labeled data produced") return # Write output output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) labeled_df.to_csv(output_path, index=False) logger.info(f"Labeled dataset written to {output_path}") logger.info(f"Final dataset shape: {labeled_df.shape}")