candle-annotator/services/ml/app/annotation_ingestion.py
Marko Djordjevic bfe437857b feat: add Python migration script and successfully test SQLite to PostgreSQL data migration
- Created scripts/migrate-sqlite-to-postgres.py as alternative to TypeScript version
- Handles all type conversions: timestamps, booleans, and JSONB fields
- Successfully migrated all 2,836 rows from SQLite to PostgreSQL
- Verified data integrity: all 6 tables migrated correctly
- Charts: 1, Candles: 2,592, Annotations: 4, Span annotations: 223
2026-02-17 14:01:21 +01:00

629 lines
22 KiB
Python

"""
Annotation ingestion module.
Loads annotations from JSON exports and converts them into labeled training datasets
with various encoding strategies (windowed classification, BIO sequence labeling).
Supports programmatic TA-Lib pattern labels and merge strategies.
"""
import json
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from collections import defaultdict
import pandas as pd
import numpy as np
from app.config import AnnotationIngestionConfig
from app.data_access import DataAccess
logger = logging.getLogger(__name__)
class AnnotationIngestion:
    """
    Handles loading and processing of annotations into labeled datasets.

    Supports:
    - Loading annotations from JSON exports or directly from the database
    - Windowed classification encoding
    - BIO sequence labeling encoding
    - TA-Lib CDL* programmatic pattern detection
    - Human/programmatic label merging
    - Context padding
    - Dataset statistics logging

    Annotation spans are located by comparing their start/end timestamps
    (inclusive) against the DataFrame's ``time`` column. All span lookups
    work on row *positions*, so input DataFrames may carry any index.
    """

    def __init__(self, config: "AnnotationIngestionConfig"):
        """
        Initialize annotation ingestion.

        Args:
            config: Annotation ingestion configuration
        """
        self.config = config

    # ------------------------------------------------------------------
    # Span helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _align_timestamp(value: Any, times: pd.Series) -> pd.Timestamp:
        """Parse *value* and match its timezone-awareness to the *times* column."""
        ts = pd.Timestamp(value)
        if ts is pd.NaT:
            # Missing timestamps match no candle.
            return ts
        # ``.dt`` raises AttributeError on non-datetime columns -> treat as naive.
        column_tz = getattr(getattr(times, 'dt', None), 'tz', None)
        if column_tz is None:
            # Naive column: drop any offset while keeping the wall-clock time,
            # mirroring the convention the windowed encoder has always used.
            return ts.tz_localize(None) if ts.tzinfo is not None else ts
        if ts.tzinfo is None:
            return ts.tz_localize(column_tz)
        return ts.tz_convert(column_tz)

    @classmethod
    def _span_positions(cls, df: pd.DataFrame, ann: Dict[str, Any]) -> np.ndarray:
        """Return ascending row positions of *df* whose ``time`` lies inside *ann*'s span (inclusive)."""
        times = df['time']
        start_time = cls._align_timestamp(ann['start_time'], times)
        end_time = cls._align_timestamp(ann['end_time'], times)
        in_span = (times >= start_time) & (times <= end_time)
        # Positions (not index labels) so .iloc / numpy indexing stay correct
        # for DataFrames whose index is not a 0..n-1 RangeIndex.
        return np.flatnonzero(in_span.to_numpy())

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------

    def load_annotations(self, annotations_path: str) -> List[Dict[str, Any]]:
        """
        Load annotations from a JSON export file.

        The file must contain a top-level JSON object with an ``annotations``
        key. When ``config.min_confidence`` is greater than 1, annotations
        whose ``confidence`` is missing or below that threshold are dropped.

        Args:
            annotations_path: Path to annotations JSON file

        Returns:
            List of annotation dictionaries

        Raises:
            FileNotFoundError: If annotations file doesn't exist
            ValueError: If the file is not valid JSON, or its top level is not
                an object containing an ``annotations`` key
        """
        path = Path(annotations_path)
        if not path.exists():
            raise FileNotFoundError(
                f"Annotations file not found: {annotations_path}. "
                "Please export annotations from the UI first."
            )

        logger.info(f"Loading annotations from {annotations_path}")
        # JSON is UTF-8 by specification; don't depend on the locale encoding.
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)  # json.JSONDecodeError is a ValueError subclass

        if not isinstance(data, dict) or 'annotations' not in data:
            raise ValueError(
                "Invalid annotations JSON format: missing 'annotations' key"
            )

        annotations = data['annotations']
        logger.info(f"Loaded {len(annotations)} annotations")

        # NOTE(review): a threshold of 1 disables filtering, which suggests a
        # 1-based confidence scale (e.g. 1-5) -- confirm against the config.
        min_confidence = self.config.min_confidence
        if min_confidence > 1:
            original_count = len(annotations)
            annotations = [
                ann for ann in annotations
                if ann.get('confidence') is not None
                and ann['confidence'] >= min_confidence
            ]
            filtered_count = original_count - len(annotations)
            if filtered_count > 0:
                logger.info(
                    f"Filtered {filtered_count} annotations below "
                    f"min_confidence={min_confidence}"
                )

        return annotations

    def load_annotations_from_db(
        self,
        chart_name: str,
        source: str = "human"
    ) -> List[Dict[str, Any]]:
        """
        Load annotations directly from the database.

        Queries span annotations for the named chart through ``DataAccess``
        instead of reading a JSON export. The confidence threshold is passed
        to the query only when ``config.min_confidence`` is greater than 1.

        Args:
            chart_name: Name of the chart to load annotations for
            source: Filter by annotation source ('human', 'model', 'hybrid')

        Returns:
            List of annotation dictionaries in the same shape as
            ``load_annotations`` produces (timestamps as ISO strings, or None
            when missing)

        Raises:
            ValueError: If no chart with ``chart_name`` exists
        """
        logger.info(f"Loading annotations from database for chart: {chart_name}")

        data_access = DataAccess()

        chart = data_access.get_chart_by_name(chart_name)
        if not chart:
            raise ValueError(f"Chart not found: {chart_name}")

        min_confidence = self.config.min_confidence
        annotations_df = data_access.get_span_annotations(
            chart_id=chart['id'],
            source=source,
            min_confidence=min_confidence if min_confidence > 1 else None
        )

        if annotations_df.empty:
            logger.warning(f"No annotations found for chart: {chart_name}")
            return []

        annotations = []
        for _, row in annotations_df.iterrows():
            annotations.append({
                'id': row['id'],
                'label': row['label'],
                'start_time': row['start_time'].isoformat() if pd.notna(row['start_time']) else None,
                'end_time': row['end_time'].isoformat() if pd.notna(row['end_time']) else None,
                'confidence': row.get('confidence'),
                'outcome': row.get('outcome'),
                'notes': row.get('notes'),
                'source': row['source'],
            })

        logger.info(f"Loaded {len(annotations)} annotations from database")
        return annotations

    # ------------------------------------------------------------------
    # Labeling
    # ------------------------------------------------------------------

    def get_programmatic_labels(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Generate programmatic labels using TA-Lib CDL* pattern functions.

        Each configured pattern is evaluated on the OHLC columns. Rows where it
        returns a positive value are labeled ``bullish_<name>``, negative
        values ``bearish_<name>``, and all other rows stay ``'O'``. When several
        patterns fire on the same row, the one listed last in the config wins.
        Patterns that are unknown or fail to compute are logged and skipped.

        Args:
            df: DataFrame with ``open``, ``high``, ``low`` and ``close`` columns

        Returns:
            A copy of *df* with a ``label_programmatic`` column added, or *df*
            itself when programmatic labels are disabled or no patterns are
            configured. The input DataFrame is never modified.

        Raises:
            ImportError: If TA-Lib is not installed
        """
        prog_config = self.config.programmatic_labels
        if not prog_config.enabled:
            return df

        if not prog_config.talib_patterns:
            logger.warning("Programmatic labels enabled but no patterns configured")
            return df

        try:
            import talib
        except ImportError as err:
            raise ImportError(
                "TA-Lib is required for programmatic labels. "
                "Install with: pip install TA-Lib (requires libta-lib-dev)"
            ) from err

        logger.info("Computing programmatic TA-Lib pattern labels")

        pattern_labels = np.full(len(df), 'O', dtype=object)

        for pattern_name in prog_config.talib_patterns:
            if not hasattr(talib, pattern_name):
                logger.warning(f"Unknown TA-Lib pattern: {pattern_name}")
                continue

            pattern_func = getattr(talib, pattern_name)
            try:
                # The TA-Lib wrapper expects float64 ("double") arrays; cast so
                # integer OHLC columns are not rejected.
                result = np.asarray(pattern_func(
                    df['open'].to_numpy(dtype=np.float64),
                    df['high'].to_numpy(dtype=np.float64),
                    df['low'].to_numpy(dtype=np.float64),
                    df['close'].to_numpy(dtype=np.float64),
                ))
                # Map the +100/-100 style signals to label names in one pass.
                suffix = pattern_name.lower().replace('cdl', '').replace('_', '')
                pattern_labels[result > 0] = f"bullish_{suffix}"
                pattern_labels[result < 0] = f"bearish_{suffix}"
            except Exception as e:
                logger.error(f"Error computing {pattern_name}: {e}")
                continue

        df = df.copy()
        df['label_programmatic'] = pattern_labels

        prog_counts = df['label_programmatic'].value_counts()
        logger.info(f"Programmatic labels: {prog_counts.to_dict()}")

        return df

    def create_windowed_dataset(
        self,
        df: pd.DataFrame,
        annotations: List[Dict[str, Any]]
    ) -> pd.DataFrame:
        """
        Create windowed classification dataset.

        Each annotation span becomes one sample. The span's candles plus any
        surrounding context are flattened into a single row of
        ``<column>_<i>`` entries (every column except ``time``), and the
        annotation label goes in a ``label`` column.

        Window boundaries:
        - A span shorter than ``window_size`` is centred and widened to
          ``window_size`` candles, then extended by ``context_padding`` on
          each side.
        - A longer span keeps its full length plus ``context_padding`` on
          each side.

        Windows are clipped at the DataFrame edges. Windows shorter than
        ``window_size`` are NaN-padded up to ``window_size`` positions.
        Windows can also be longer than ``window_size``, so samples may have
        different column sets; the resulting DataFrame fills the gaps with
        NaN. Annotations that cover no candle are logged and skipped.

        Args:
            df: DataFrame with features and time column
            annotations: List of annotation dictionaries

        Returns:
            DataFrame with one windowed sample per matched annotation
        """
        logger.info("Creating windowed classification dataset")

        window_size = self.config.window_size
        context_padding = self.config.context_padding
        last_pos = len(df) - 1
        feature_cols = [c for c in df.columns if c != 'time']

        samples = []
        for ann in annotations:
            label = ann['label']
            positions = self._span_positions(df, ann)
            if positions.size == 0:
                logger.warning(f"No candles found for annotation {ann.get('id')}")
                continue

            span_start = int(positions[0])
            span_end = int(positions[-1])
            span_length = len(positions)

            if span_length < window_size:
                # Short span: centre it inside a window_size-wide window.
                padding_needed = window_size - span_length
                left_padding = padding_needed // 2
                right_padding = padding_needed - left_padding
            else:
                left_padding = right_padding = 0

            window_start = max(0, span_start - left_padding - context_padding)
            window_end = min(last_pos, span_end + right_padding + context_padding)
            window = df.iloc[window_start:window_end + 1]

            flattened = {}
            for i, (_, row) in enumerate(window.iterrows()):
                for col in feature_cols:
                    flattened[f"{col}_{i}"] = row[col]
            # Windows clipped at the data edges are padded with NaN.
            for i in range(len(window), window_size):
                for col in feature_cols:
                    flattened[f"{col}_{i}"] = np.nan

            flattened['label'] = label
            samples.append(flattened)

        result_df = pd.DataFrame(samples)
        logger.info(f"Created {len(result_df)} windowed samples")
        return result_df

    def create_bio_dataset(
        self,
        df: pd.DataFrame,
        annotations: List[Dict[str, Any]]
    ) -> pd.DataFrame:
        """
        Create BIO sequence labeling dataset.

        Each candle gets a BIO tag: ``B-{label}`` on a span's first candle,
        ``I-{label}`` on its remaining candles, and ``O`` outside annotations.
        Annotations are placed in ``bio_tag`` when the whole span is free
        there. Otherwise they go into the first of ``bio_tag_2``,
        ``bio_tag_3``, ... where the span is free. A new column is created
        only when no existing one can hold the span, so tags are never
        overwritten.

        Args:
            df: DataFrame with features and time column
            annotations: List of annotation dictionaries

        Returns:
            Copy of *df* with the BIO tag column(s) appended
        """
        logger.info("Creating BIO sequence labeling dataset")

        n_rows = len(df)
        bio_columns: Dict[str, np.ndarray] = {
            'bio_tag': np.full(n_rows, 'O', dtype=object)
        }

        for ann in annotations:
            label = ann['label']
            positions = self._span_positions(df, ann)
            if positions.size == 0:
                continue

            # First-fit: the first column whose entire span is still untagged.
            target_col = next(
                (name for name, tags in bio_columns.items()
                 if (tags[positions] == 'O').all()),
                None,
            )
            if target_col is None:
                # Columns are only ever appended, so this yields bio_tag_2, _3, ...
                target_col = f'bio_tag_{len(bio_columns) + 1}'
                bio_columns[target_col] = np.full(n_rows, 'O', dtype=object)

            tags = bio_columns[target_col]
            tags[positions[0]] = f"B-{label}"
            tags[positions[1:]] = f"I-{label}"

        if len(bio_columns) > 1:
            logger.info("Overlapping annotations detected, created multiple BIO tag columns")

        result_df = df.copy()
        for col_name, tags in bio_columns.items():
            result_df[col_name] = tags

        return result_df

    def merge_labels(
        self,
        df: pd.DataFrame,
        annotations: List[Dict[str, Any]]
    ) -> pd.DataFrame:
        """
        Merge human annotations and programmatic labels based on merge strategy.

        Human labels are painted per candle, with later annotations winning
        where spans overlap. Then, per ``config.merge_strategy``:
        - ``human_priority``: ``label`` is the human label, falling back to
          the programmatic one where the human label is ``'O'``.
        - ``programmatic_priority``: the reverse.
        - ``both``: keep ``label_human`` and ``label_programmatic`` side by
          side and add no ``label`` column.
        An unknown strategy is logged and treated like ``both``.

        Args:
            df: DataFrame with a ``time`` column and programmatic labels
                (if enabled)
            annotations: List of human annotation dictionaries

        Returns:
            DataFrame with merged labels (a copy; *df* is not modified), or
            *df* unchanged when programmatic labels are disabled
        """
        if not self.config.programmatic_labels.enabled:
            # No programmatic labels, just use human annotations
            return df

        strategy = self.config.merge_strategy
        logger.info(f"Merging labels with strategy: {strategy}")

        df = df.copy()

        human_labels = np.full(len(df), 'O', dtype=object)
        for ann in annotations:
            human_labels[self._span_positions(df, ann)] = ann['label']
        df['label_human'] = human_labels

        if 'label_programmatic' in df.columns:
            programmatic = df['label_programmatic']
        else:
            programmatic = pd.Series('O', index=df.index, dtype=object)
        human = df['label_human']

        if strategy == "human_priority":
            df['label'] = human.where(human != 'O', programmatic)
            df = df.drop(columns=['label_human', 'label_programmatic'], errors='ignore')
        elif strategy == "programmatic_priority":
            df['label'] = programmatic.where(programmatic != 'O', human)
            df = df.drop(columns=['label_human', 'label_programmatic'], errors='ignore')
        elif strategy == "both":
            pass
        else:
            logger.warning(
                f"Unknown merge strategy {strategy!r}; keeping both label columns"
            )

        return df

    # ------------------------------------------------------------------
    # Reporting and pipeline
    # ------------------------------------------------------------------

    def log_statistics(self, df: pd.DataFrame, annotations: List[Dict[str, Any]]):
        """
        Log dataset statistics.

        Logs the ``label`` distribution (if present), the human/programmatic
        agreement rate over rows where either label is not ``'O'`` (if both
        columns exist), and the mean span length in candles per label (if
        *df* has a ``time`` column).

        Args:
            df: Labeled DataFrame
            annotations: List of annotations
        """
        separator = "=" * 60
        logger.info(separator)
        logger.info("Dataset Statistics")
        logger.info(separator)

        if 'label' in df.columns:
            label_counts = df['label'].value_counts()
            total = len(df)
            logger.info("\nLabel Distribution:")
            for label, count in label_counts.items():
                pct = (count / total) * 100
                logger.info(f" {label}: {count} ({pct:.2f}%)")

        if 'label_human' in df.columns and 'label_programmatic' in df.columns:
            non_o_mask = (df['label_human'] != 'O') | (df['label_programmatic'] != 'O')
            non_o_df = df[non_o_mask]
            if len(non_o_df) > 0:
                agreement = (
                    non_o_df['label_human'] == non_o_df['label_programmatic']
                ).sum()
                agreement_rate = (agreement / len(non_o_df)) * 100
                logger.info(f"\nHuman/Programmatic Agreement Rate: {agreement_rate:.2f}%")

        # Windowed datasets have no 'time' column, so span lengths are skipped.
        span_lengths = defaultdict(list)
        if 'time' in df.columns:
            for ann in annotations:
                span_lengths[ann['label']].append(len(self._span_positions(df, ann)))

        if span_lengths:
            logger.info("\nAverage Span Length (candles):")
            for label, lengths in span_lengths.items():
                avg_length = np.mean(lengths)
                logger.info(f" {label}: {avg_length:.2f}")

        logger.info(separator)

    def _build_dataset(
        self,
        enriched_df: pd.DataFrame,
        annotations: List[Dict[str, Any]]
    ) -> pd.DataFrame:
        """Apply programmatic labels and the configured encoding, then log statistics."""
        df = self.get_programmatic_labels(enriched_df)

        encoding = self.config.label_encoding
        if encoding == "window":
            result_df = self.create_windowed_dataset(df, annotations)
        elif encoding == "bio":
            result_df = self.create_bio_dataset(df, annotations)
            # For BIO, also merge human/programmatic if enabled
            if self.config.programmatic_labels.enabled:
                result_df = self.merge_labels(result_df, annotations)
        else:
            raise ValueError(f"Unknown label encoding: {self.config.label_encoding}")

        self.log_statistics(result_df, annotations)
        logger.info("Annotation ingestion complete")
        return result_df

    def process(
        self,
        enriched_df: pd.DataFrame,
        annotations_path: str
    ) -> pd.DataFrame:
        """
        Main processing pipeline for annotation ingestion from a JSON export.

        Args:
            enriched_df: DataFrame with engineered features (not modified)
            annotations_path: Path to annotations JSON file

        Returns:
            Labeled DataFrame ready for training, or an empty DataFrame when
            no annotations survive loading

        Raises:
            ValueError: If ``config.label_encoding`` is not 'window' or 'bio'
        """
        logger.info("Starting annotation ingestion")

        annotations = self.load_annotations(annotations_path)
        if not annotations:
            logger.warning("No annotations found, returning empty DataFrame")
            return pd.DataFrame()

        return self._build_dataset(enriched_df, annotations)

    def process_from_db(
        self,
        enriched_df: pd.DataFrame,
        chart_name: str,
        source: str = "human"
    ) -> pd.DataFrame:
        """
        Main processing pipeline using direct database access.

        Args:
            enriched_df: DataFrame with engineered features (not modified)
            chart_name: Name of the chart to load annotations for
            source: Filter by annotation source ('human', 'model', 'hybrid')

        Returns:
            Labeled DataFrame ready for training, or an empty DataFrame when
            no annotations are found

        Raises:
            ValueError: If the chart does not exist or
                ``config.label_encoding`` is not 'window' or 'bio'
        """
        logger.info(f"Starting annotation ingestion from database for chart: {chart_name}")

        annotations = self.load_annotations_from_db(chart_name, source)
        if not annotations:
            logger.warning("No annotations found, returning empty DataFrame")
            return pd.DataFrame()

        return self._build_dataset(enriched_df, annotations)
def run_annotation_ingestion(
    config: AnnotationIngestionConfig,
    enriched_path: str,
    annotations_path: str,
    output_path: str
):
    """
    Execute the annotation-ingestion pipeline stage end to end.

    Reads the enriched feature CSV (parsing its ``time`` column as dates),
    labels it via ``AnnotationIngestion.process`` using the annotations JSON,
    and writes the labeled result as CSV without the index. Nothing is
    written when labeling yields an empty DataFrame; an error is logged
    instead.

    Args:
        config: Annotation ingestion configuration
        enriched_path: Path to enriched features CSV
        annotations_path: Path to annotations JSON
        output_path: Path to write labeled dataset CSV (missing parent
            directories are created)
    """
    logger.info("Running annotation ingestion stage")

    features = pd.read_csv(enriched_path, parse_dates=['time'])
    logger.info(f"Loaded enriched data: {features.shape}")

    labeled = AnnotationIngestion(config).process(features, annotations_path)
    if labeled.empty:
        logger.error("No labeled data produced")
        return

    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    labeled.to_csv(destination, index=False)

    logger.info(f"Labeled dataset written to {destination}")
    logger.info(f"Final dataset shape: {labeled.shape}")