feat(ml): implement annotation ingestion with windowed/BIO encoding and TA-Lib patterns
This commit is contained in:
parent
fd29ab91e0
commit
16763b967e
3 changed files with 541 additions and 10 deletions
524
services/ml/app/annotation_ingestion.py
Normal file
524
services/ml/app/annotation_ingestion.py
Normal file
|
|
@ -0,0 +1,524 @@
|
|||
"""
|
||||
Annotation ingestion module.
|
||||
|
||||
Loads annotations from JSON exports and converts them into labeled training datasets
|
||||
with various encoding strategies (windowed classification, BIO sequence labeling).
|
||||
Supports programmatic TA-Lib pattern labels and merge strategies.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any, Optional, Tuple
|
||||
from collections import defaultdict
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
from app.config import AnnotationIngestionConfig
|
||||
|
||||
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AnnotationIngestion:
    """
    Handles loading and processing of annotations into labeled datasets.

    Supports:
    - Loading annotations from JSON exports
    - Windowed classification encoding
    - BIO sequence labeling encoding
    - TA-Lib CDL* programmatic pattern detection
    - Human/programmatic label merging
    - Context padding
    - Dataset statistics logging

    Annotations are dictionaries with at least ``label``, ``start_time`` and
    ``end_time`` keys (``id`` and ``confidence`` are read when present).
    Candles are matched to an annotation when their ``time`` value lies in the
    inclusive range ``[start_time, end_time]``. All matching is done on row
    *positions*, so the input DataFrame may carry any index.

    The configuration object is read for: ``min_confidence``,
    ``window_size``, ``context_padding``, ``merge_strategy``,
    ``label_encoding``, ``programmatic_labels.enabled`` and
    ``programmatic_labels.talib_patterns``.
    """

    def __init__(self, config: "AnnotationIngestionConfig"):
        """
        Initialize annotation ingestion.

        Args:
            config: Annotation ingestion configuration
        """
        self.config = config

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_utc(value: Any) -> pd.Timestamp:
        """Parse ``value`` into a UTC-aware Timestamp (naive input is taken as UTC)."""
        stamp = pd.Timestamp(value)
        if stamp is pd.NaT:
            return stamp
        if stamp.tzinfo is None:
            return stamp.tz_localize('UTC')
        return stamp.tz_convert('UTC')

    @staticmethod
    def _candle_times(df: pd.DataFrame) -> pd.Series:
        """
        Return the ``time`` column as UTC-aware datetimes.

        Strings (as produced by ``pd.read_csv``) are parsed, tz-naive values
        are interpreted as UTC and tz-aware values are converted to UTC, so
        they can always be compared with the output of ``_as_utc``.

        Raises:
            KeyError: If the DataFrame has no ``time`` column
            TypeError: If the ``time`` column is numeric (ambiguous epoch unit)
        """
        times = df['time']
        if len(times) and pd.api.types.is_numeric_dtype(times):
            # Refuse to guess the epoch unit; a silent mis-parse would simply
            # match no candles at all.
            raise TypeError(
                "'time' column must contain datetimes or datetime strings, "
                f"got numeric dtype {times.dtype}"
            )
        return pd.to_datetime(times, utc=True)

    @classmethod
    def _span_positions(cls, times: pd.Series, ann: Dict[str, Any]) -> np.ndarray:
        """Return the row positions whose time lies in the annotation's inclusive span."""
        start_time = cls._as_utc(ann['start_time'])
        end_time = cls._as_utc(ann['end_time'])
        in_span = (times >= start_time) & (times <= end_time)
        return np.flatnonzero(in_span.to_numpy())

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------

    def load_annotations(self, annotations_path: str) -> List[Dict[str, Any]]:
        """
        Load annotations from JSON export file.

        Args:
            annotations_path: Path to annotations JSON file

        Returns:
            List of annotation dictionaries

        Raises:
            FileNotFoundError: If annotations file doesn't exist
            ValueError: If JSON format is invalid
        """
        path = Path(annotations_path)

        if not path.exists():
            raise FileNotFoundError(
                f"Annotations file not found: {annotations_path}. "
                "Please export annotations from the UI first."
            )

        logger.info(f"Loading annotations from {annotations_path}")

        # JSON is UTF-8 by specification; do not depend on the platform locale.
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # A non-dict root (list, number, ...) is a format error as well.
        if not isinstance(data, dict) or 'annotations' not in data:
            raise ValueError(
                "Invalid annotations JSON format: missing 'annotations' key"
            )

        annotations = data['annotations']
        logger.info(f"Loaded {len(annotations)} annotations")

        # Filter by confidence.
        # NOTE(review): filtering only runs when min_confidence exceeds 1 --
        # presumably confidence is a rating whose lowest value is 1, so a
        # threshold of 1 keeps everything. If confidence is a 0-1 score this
        # guard disables filtering entirely; confirm against the exporter.
        min_confidence = self.config.min_confidence
        if min_confidence > 1:
            original_count = len(annotations)
            annotations = [
                ann for ann in annotations
                if ann.get('confidence') is not None
                and ann['confidence'] >= min_confidence
            ]
            filtered_count = original_count - len(annotations)
            if filtered_count > 0:
                logger.info(
                    f"Filtered {filtered_count} annotations below "
                    f"min_confidence={min_confidence}"
                )

        return annotations

    # ------------------------------------------------------------------
    # Programmatic labels
    # ------------------------------------------------------------------

    def get_programmatic_labels(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Generate programmatic labels using TA-Lib CDL* pattern functions.

        Each configured pattern is evaluated over the whole frame; positive
        results become ``bullish_<pattern>`` and negative results
        ``bearish_<pattern>``. When several patterns fire on the same candle
        the one listed last in the configuration wins. Candles without any
        detection are labelled ``'O'``.

        Args:
            df: DataFrame with OHLC columns (``open``, ``high``, ``low``,
                ``close``)

        Returns:
            The input DataFrame unchanged when programmatic labels are
            disabled or no patterns are configured; otherwise a copy with a
            ``label_programmatic`` column added (the input is not mutated).

        Raises:
            ImportError: If TA-Lib is not installed
        """
        if not self.config.programmatic_labels.enabled:
            return df

        patterns = self.config.programmatic_labels.talib_patterns
        if not patterns:
            logger.warning("Programmatic labels enabled but no patterns configured")
            return df

        try:
            import talib
        except ImportError as exc:
            raise ImportError(
                "TA-Lib is required for programmatic labels. "
                "Install with: pip install TA-Lib (requires libta-lib-dev)"
            ) from exc

        logger.info("Computing programmatic TA-Lib pattern labels")

        labels = np.full(len(df), 'O', dtype=object)

        # TA-Lib only accepts float64 arrays; convert once for all patterns.
        try:
            ohlc = [
                df[col].to_numpy(dtype=np.float64)
                for col in ('open', 'high', 'low', 'close')
            ]
        except (KeyError, TypeError, ValueError) as e:
            logger.error(f"Cannot compute TA-Lib patterns, OHLC data unusable: {e}")
            ohlc = None

        if ohlc is not None:
            for pattern_name in patterns:
                pattern_func = getattr(talib, pattern_name, None)
                if pattern_func is None:
                    logger.warning(f"Unknown TA-Lib pattern: {pattern_name}")
                    continue

                # Best effort: one failing pattern must not abort the others.
                try:
                    result = np.asarray(pattern_func(*ohlc))
                except Exception as e:
                    logger.error(f"Error computing {pattern_name}: {e}")
                    continue

                # Convert positive/negative detections to label names.
                suffix = pattern_name.lower().replace('cdl', '').replace('_', '')
                labels[result > 0] = f"bullish_{suffix}"
                labels[result < 0] = f"bearish_{suffix}"

        labeled = df.copy()
        labeled['label_programmatic'] = labels

        # Log statistics
        prog_counts = pd.Series(labels, dtype=object).value_counts()
        logger.info(f"Programmatic labels: {prog_counts.to_dict()}")

        return labeled

    # ------------------------------------------------------------------
    # Encodings
    # ------------------------------------------------------------------

    def create_windowed_dataset(
        self,
        df: pd.DataFrame,
        annotations: List[Dict[str, Any]]
    ) -> pd.DataFrame:
        """
        Create windowed classification dataset.

        Each annotation span is converted to a window of candles that is
        flattened into a single row (``<column>_<offset>`` per candle and
        feature column) with the annotation label stored in ``label``.

        Spans shorter than ``window_size`` are widened symmetrically to
        ``window_size`` candles; ``context_padding`` extra candles are then
        added on each side. Windows are clipped at the frame boundaries and
        NaN-filled up to ``window_size`` offsets when clipping leaves fewer.

        NOTE(review): spans longer than ``window_size`` (and any non-zero
        ``context_padding``) produce wider windows, so rows can have different
        numbers of populated offsets; confirm this is intended.

        Args:
            df: DataFrame with features and time column
            annotations: List of annotation dictionaries

        Returns:
            DataFrame with one row per annotation that matched candles
        """
        logger.info("Creating windowed classification dataset")

        window_size = self.config.window_size
        context_padding = self.config.context_padding

        last_position = len(df) - 1
        times = self._candle_times(df)
        feature_cols = [c for c in df.columns if c != 'time']

        samples = []

        for ann in annotations:
            label = ann['label']
            positions = self._span_positions(times, ann)

            if positions.size == 0:
                logger.warning(f"No candles found for annotation {ann.get('id')}")
                continue

            span_start = int(positions[0])
            span_end = int(positions[-1])
            span_length = int(positions.size)

            # Determine window boundaries
            if span_length < window_size:
                # Pad with context candles (centered)
                padding_needed = window_size - span_length
                left_padding = padding_needed // 2
                right_padding = padding_needed - left_padding
            else:
                # Use full span + context padding
                left_padding = right_padding = 0

            window_start = max(0, span_start - left_padding - context_padding)
            window_end = min(last_position, span_end + right_padding + context_padding)

            # Extract window (positional slice, end inclusive)
            window = df.iloc[window_start:window_end + 1]

            # Flatten window into single row, candle by candle so the column
            # order is <all features of candle 0>, <all features of candle 1>, ...
            flattened = {}
            rows = window[feature_cols].itertuples(index=False, name=None)
            for offset, values in enumerate(rows):
                for col, value in zip(feature_cols, values):
                    flattened[f"{col}_{offset}"] = value

            # Pad with NaN if window is smaller than expected
            for offset in range(len(window), window_size):
                for col in feature_cols:
                    flattened[f"{col}_{offset}"] = np.nan

            flattened['label'] = label
            samples.append(flattened)

        result_df = pd.DataFrame(samples)
        logger.info(f"Created {len(result_df)} windowed samples")

        return result_df

    def create_bio_dataset(
        self,
        df: pd.DataFrame,
        annotations: List[Dict[str, Any]]
    ) -> pd.DataFrame:
        """
        Create BIO sequence labeling dataset.

        Each candle gets a BIO tag: B-{label} for span start, I-{label} for
        continuation, O for outside annotations.

        Tags are written to ``bio_tag``. An annotation that overlaps tags
        already present there (on *any* of its candles) is written to the
        first overflow column ``bio_tag_2``, ``bio_tag_3``, ... in which its
        whole span is still free; a new column is created only when none is.

        Args:
            df: DataFrame with features and time column
            annotations: List of annotation dictionaries

        Returns:
            Copy of ``df`` with BIO tags column(s) added
        """
        logger.info("Creating BIO sequence labeling dataset")

        n_rows = len(df)
        times = self._candle_times(df)

        # Column name -> tag array, initialised with 'O' tags. Insertion order
        # is the search order for a free column.
        tag_columns = {'bio_tag': np.full(n_rows, 'O', dtype=object)}
        overlaps_detected = False

        for ann in annotations:
            label = ann['label']
            positions = self._span_positions(times, ann)

            if positions.size == 0:
                logger.warning(f"No candles found for annotation {ann.get('id')}")
                continue

            # First column whose cells are all free over the whole span.
            target = None
            for column_name, tags in tag_columns.items():
                if all(tag == 'O' for tag in tags[positions]):
                    target = tags
                    if column_name != 'bio_tag':
                        overlaps_detected = True
                    break

            if target is None:
                # Every existing column is occupied somewhere in the span.
                overlaps_detected = True
                target = np.full(n_rows, 'O', dtype=object)
                tag_columns[f'bio_tag_{len(tag_columns) + 1}'] = target

            # Assign BIO tags
            target[positions[0]] = f"B-{label}"
            target[positions[1:]] = f"I-{label}"

        if overlaps_detected:
            logger.info("Overlapping annotations detected, created multiple BIO tag columns")

        # Add BIO columns to dataframe
        result_df = df.copy()
        for column_name, tags in tag_columns.items():
            result_df[column_name] = tags

        return result_df

    # ------------------------------------------------------------------
    # Merging
    # ------------------------------------------------------------------

    def merge_labels(
        self,
        df: pd.DataFrame,
        annotations: List[Dict[str, Any]]
    ) -> pd.DataFrame:
        """
        Merge human annotations and programmatic labels based on merge strategy.

        Strategies:
        - ``human_priority``: human label where present, else programmatic;
          result in ``label``, helper columns dropped
        - ``programmatic_priority``: programmatic label where present, else
          human; result in ``label``, helper columns dropped
        - ``both``: keep ``label_human`` and ``label_programmatic`` side by side

        A missing ``label_programmatic`` column is treated as all ``'O'``.
        When human annotations overlap, the one listed last wins.

        Args:
            df: DataFrame with programmatic labels (if enabled)
            annotations: List of human annotation dictionaries

        Returns:
            The input DataFrame unchanged when programmatic labels are
            disabled; otherwise a copy with merged labels (the input is not
            mutated).
        """
        if not self.config.programmatic_labels.enabled:
            # No programmatic labels, just use human annotations
            return df

        strategy = self.config.merge_strategy
        logger.info(f"Merging labels with strategy: {strategy}")

        # Create human labels column
        times = self._candle_times(df)
        human = np.full(len(df), 'O', dtype=object)
        for ann in annotations:
            label = ann['label']
            human[self._span_positions(times, ann)] = label

        merged = df.copy()
        merged['label_human'] = human

        if 'label_programmatic' in merged.columns:
            programmatic = merged['label_programmatic'].to_numpy(dtype=object)
        else:
            programmatic = np.full(len(merged), 'O', dtype=object)

        helper_columns = ['label_human', 'label_programmatic']

        # Apply merge strategy
        if strategy == "human_priority":
            # Use human label if present, else programmatic
            has_human = pd.Series(human, dtype=object).ne('O').to_numpy()
            merged['label'] = np.where(has_human, human, programmatic)
            merged = merged.drop(columns=helper_columns, errors='ignore')

        elif strategy == "programmatic_priority":
            # Use programmatic label if present, else human
            has_programmatic = pd.Series(programmatic, dtype=object).ne('O').to_numpy()
            merged['label'] = np.where(has_programmatic, programmatic, human)
            merged = merged.drop(columns=helper_columns, errors='ignore')

        elif strategy == "both":
            # Keep both columns
            pass

        else:
            # Previously ignored silently; surface the misconfiguration while
            # keeping the same output (both columns retained, no 'label').
            logger.warning(
                f"Unknown merge strategy '{strategy}', keeping both label columns"
            )

        return merged

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def log_statistics(self, df: pd.DataFrame, annotations: List[Dict[str, Any]]):
        """
        Log dataset statistics.

        Logs the label distribution (when a ``label`` column exists), the
        human/programmatic agreement rate over candles where at least one of
        the two is not ``'O'`` (when both columns exist), and the average span
        length in candles per label (when a ``time`` column exists).

        Args:
            df: Labeled DataFrame
            annotations: List of annotations
        """
        separator = "=" * 60
        logger.info(separator)
        logger.info("Dataset Statistics")
        logger.info(separator)

        # Label distribution
        if 'label' in df.columns:
            label_counts = df['label'].value_counts()
            total = len(df)

            logger.info("\nLabel Distribution:")
            for label, count in label_counts.items():
                pct = (count / total) * 100
                logger.info(f"  {label}: {count} ({pct:.2f}%)")

        # If both human and programmatic labels exist
        if 'label_human' in df.columns and 'label_programmatic' in df.columns:
            # Agreement rate (excluding candles where both are O)
            non_o_mask = (df['label_human'] != 'O') | (df['label_programmatic'] != 'O')
            non_o_df = df[non_o_mask]

            if len(non_o_df) > 0:
                agreement = (
                    non_o_df['label_human'] == non_o_df['label_programmatic']
                ).sum()
                agreement_rate = (agreement / len(non_o_df)) * 100
                logger.info(f"\nHuman/Programmatic Agreement Rate: {agreement_rate:.2f}%")

        # Average span length per label; needs candle times, which the
        # flattened windowed dataset does not carry.
        span_lengths = defaultdict(list)
        if 'time' in df.columns:
            times = self._candle_times(df)
            for ann in annotations:
                span_length = int(self._span_positions(times, ann).size)
                span_lengths[ann['label']].append(span_length)

        if span_lengths:
            logger.info("\nAverage Span Length (candles):")
            for label, lengths in span_lengths.items():
                avg_length = np.mean(lengths)
                logger.info(f"  {label}: {avg_length:.2f}")

        logger.info(separator)

    # ------------------------------------------------------------------
    # Pipeline
    # ------------------------------------------------------------------

    def process(
        self,
        enriched_df: pd.DataFrame,
        annotations_path: str
    ) -> pd.DataFrame:
        """
        Main processing pipeline for annotation ingestion.

        Loads the annotations, adds programmatic labels when enabled, applies
        the configured label encoding (``"window"`` or ``"bio"``; for BIO the
        human and programmatic labels are merged as well) and logs statistics.

        Args:
            enriched_df: DataFrame with engineered features
            annotations_path: Path to annotations JSON file

        Returns:
            Labeled DataFrame ready for training; an empty DataFrame when the
            annotations file contains no (remaining) annotations

        Raises:
            FileNotFoundError: If annotations file doesn't exist
            ValueError: If the annotations JSON or the configured label
                encoding is invalid
        """
        logger.info("Starting annotation ingestion")

        # Load annotations
        annotations = self.load_annotations(annotations_path)

        if not annotations:
            logger.warning("No annotations found, returning empty DataFrame")
            return pd.DataFrame()

        # Add programmatic labels if enabled
        df = self.get_programmatic_labels(enriched_df)

        # Apply label encoding
        label_encoding = self.config.label_encoding
        if label_encoding == "window":
            result_df = self.create_windowed_dataset(df, annotations)
        elif label_encoding == "bio":
            result_df = self.create_bio_dataset(df, annotations)
            # For BIO, also merge human/programmatic if enabled
            if self.config.programmatic_labels.enabled:
                result_df = self.merge_labels(result_df, annotations)
        else:
            raise ValueError(f"Unknown label encoding: {label_encoding}")

        # Log statistics
        self.log_statistics(result_df, annotations)

        logger.info("Annotation ingestion complete")

        return result_df
|
||||
|
||||
|
||||
def run_annotation_ingestion(
    config: AnnotationIngestionConfig,
    enriched_path: str,
    annotations_path: str,
    output_path: str
):
    """
    Run annotation ingestion stage.

    Reads the enriched feature CSV, labels it with ``AnnotationIngestion``
    and writes the labeled dataset as CSV (without the index). Nothing is
    written when no labeled data is produced; an error is logged instead.

    Args:
        config: Annotation ingestion configuration
        enriched_path: Path to enriched features CSV
        annotations_path: Path to annotations JSON
        output_path: Path to write labeled dataset CSV (parent directories
            are created as needed)
    """
    logger.info("Running annotation ingestion stage")

    # Load enriched data
    enriched_df = pd.read_csv(enriched_path)
    logger.info(f"Loaded enriched data: {enriched_df.shape}")

    # CSV carries no datetime type, so 'time' arrives as plain strings, which
    # cannot be compared with the annotation timestamps. Numeric columns are
    # left untouched because their epoch unit cannot be known here.
    if (
        'time' in enriched_df.columns
        and not pd.api.types.is_numeric_dtype(enriched_df['time'])
    ):
        enriched_df['time'] = pd.to_datetime(enriched_df['time'])

    # Process annotations
    ingestion = AnnotationIngestion(config)
    labeled_df = ingestion.process(enriched_df, annotations_path)

    if labeled_df.empty:
        logger.error("No labeled data produced")
        return

    # Write output
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    labeled_df.to_csv(destination, index=False)
    logger.info(f"Labeled dataset written to {destination}")
    logger.info(f"Final dataset shape: {labeled_df.shape}")
|
||||
|
|
@ -61,11 +61,18 @@ def run_annotation_ingestion(config: PipelineConfig) -> None:
|
|||
return
|
||||
|
||||
# Import here to avoid circular dependencies
|
||||
from app.annotation_ingestion import run_annotation_ingestion_stage
|
||||
from app.annotation_ingestion import run_annotation_ingestion as run_ingestion
|
||||
|
||||
logger.info(f"Reading enriched data from: {config.data.enriched_path}")
|
||||
logger.info(f"Reading annotations from: {config.data.annotations_path}")
|
||||
run_annotation_ingestion_stage(config)
|
||||
|
||||
run_ingestion(
|
||||
config=config.stages.annotation_ingestion,
|
||||
enriched_path=config.data.enriched_path,
|
||||
annotations_path=config.data.annotations_path,
|
||||
output_path=config.data.labeled_path
|
||||
)
|
||||
|
||||
logger.info(f"Labeled data written to: {config.data.labeled_path}")
|
||||
logger.info("Annotation ingestion stage completed successfully")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue