feat: auto-build training dataset from DB annotations before training

- Add build_dataset_from_db() that exports candles from DB, runs feature
  engineering, and ingests span annotations into labeled CSV
- Call it automatically in _run_training_background before training starts
- Add POST /training/build-dataset endpoint for standalone use
- Add Next.js proxy route /api/training/build-dataset
- Update TrainingPanel: remove dataset-missing block on Start Training,
  show informational message that dataset builds automatically
This commit is contained in:
Marko Djordjevic 2026-02-18 00:24:39 +01:00
parent b4956f3fb9
commit d3dcfcea7d
4 changed files with 148 additions and 5 deletions

1
RAZNO/annotations.csv Normal file
View file

@ -0,0 +1 @@
timestamp,label_type,price
1 timestamp label_type price

View file

@ -907,9 +907,82 @@ class DatasetInfoResponse(BaseModel):
row_count: Optional[int] = None row_count: Optional[int] = None
def build_dataset_from_db(config: PipelineConfig) -> dict:
    """
    Build the labeled training dataset directly from the database.

    Steps:
        1. Export candles from PostgreSQL to raw CSV
        2. Run feature engineering (TA-Lib indicators, candle features)
        3. Run annotation ingestion from DB (span_annotations -> labeled CSV)

    Args:
        config: Pipeline configuration. ``config.data.raw_path``,
            ``config.data.enriched_path`` and ``config.data.labeled_path``
            give the CSV locations; ``config.stages.annotation_ingestion``
            is handed to the annotation ingestion stage.

    Returns:
        dict with keys: chart_name, n_candles, n_samples, n_features,
        labeled_path. ``n_features`` counts every column of the labeled
        frame other than ``"label"``.

    Raises:
        ValueError: If the database has no charts, the selected chart has
            no candles, or annotation ingestion yields no labeled samples.
    """
    # Imported lazily, as in the rest of this module's training helpers.
    from app.data_access import DataAccess
    from app.annotation_ingestion import AnnotationIngestion
    from features.engineer import run_feature_engineering_stage

    data_access = DataAccess()

    # Find all charts, use the first one (single-chart app)
    charts_df = data_access.get_all_charts()
    if charts_df.empty:
        raise ValueError("No charts found in database. Upload candle data first.")

    chart = charts_df.iloc[0]
    chart_name = chart["name"]
    chart_id = int(chart["id"])
    logger.info(f"Building dataset for chart: {chart_name} (id={chart_id})")

    # Step 1: Export candles to raw CSV
    candles_df = data_access.get_candles(chart_id)
    if candles_df.empty:
        raise ValueError(f"No candles found for chart: {chart_name}")

    raw_path = Path(config.data.raw_path)
    raw_path.parent.mkdir(parents=True, exist_ok=True)

    # Only the timestamp and OHLC columns are written to the raw CSV.
    export_df = candles_df[["time", "open", "high", "low", "close"]].copy()
    export_df.to_csv(raw_path, index=False)
    logger.info(f"Exported {len(export_df)} candles to {raw_path}")

    # Step 2: Run feature engineering; the enriched CSV is then read back
    # from config.data.enriched_path.
    run_feature_engineering_stage(config)
    enriched_path = Path(config.data.enriched_path)
    logger.info(f"Feature engineering complete: {enriched_path}")

    # Step 3: Run annotation ingestion from database
    enriched_df = pd.read_csv(enriched_path, parse_dates=["time"])
    ingestion = AnnotationIngestion(config.stages.annotation_ingestion)
    labeled_df = ingestion.process_from_db(enriched_df, chart_name, source="human")
    if labeled_df.empty:
        raise ValueError(
            "No labeled samples produced. "
            f"Ensure you have span annotations on chart '{chart_name}'."
        )

    # Write labeled dataset
    labeled_path = Path(config.data.labeled_path)
    labeled_path.parent.mkdir(parents=True, exist_ok=True)
    labeled_df.to_csv(labeled_path, index=False)

    result = {
        "chart_name": chart_name,
        "n_candles": len(export_df),
        "n_samples": len(labeled_df),
        "n_features": sum(1 for column in labeled_df.columns if column != "label"),
        "labeled_path": str(labeled_path),
    }
    logger.info(f"Dataset built: {result}")
    return result
def _run_training_background(run_id: str, model_type: str, config: PipelineConfig) -> None: def _run_training_background(run_id: str, model_type: str, config: PipelineConfig) -> None:
""" """
Background thread target: train a model, update DB on completion or failure. Background thread target: build dataset then train a model.
Uses the pre-inserted TrainingRun record identified by ``run_id``. Uses the pre-inserted TrainingRun record identified by ``run_id``.
""" """
@ -920,6 +993,10 @@ def _run_training_background(run_id: str, model_type: str, config: PipelineConfi
from training.train import create_model, temporal_split from training.train import create_model, temporal_split
from sklearn.metrics import accuracy_score, f1_score from sklearn.metrics import accuracy_score, f1_score
# Build dataset from database (feature engineering + annotation ingestion)
logger.info("Building dataset from database...")
build_dataset_from_db(config)
labeled_path = Path(config.data.labeled_path) labeled_path = Path(config.data.labeled_path)
if not labeled_path.exists(): if not labeled_path.exists():
raise FileNotFoundError(f"Labeled dataset not found: {labeled_path}") raise FileNotFoundError(f"Labeled dataset not found: {labeled_path}")
@ -1256,6 +1333,38 @@ async def training_dataset_info():
) )
class BuildDatasetResponse(BaseModel):
    """Summary of a dataset build, returned by POST /training/build-dataset."""

    # Name of the chart the dataset was built from.
    chart_name: str
    # Number of candle rows exported to the raw CSV.
    n_candles: int
    # Number of rows in the labeled dataset.
    n_samples: int
    # Number of labeled-dataset columns other than "label".
    n_features: int
    # Location of the labeled CSV that was written.
    labeled_path: str
@app.post("/training/build-dataset", response_model=BuildDatasetResponse)
async def training_build_dataset():
    """
    Build the labeled training dataset from database annotations.

    Exports candles, runs feature engineering, and ingests span annotations
    into a labeled CSV ready for training.

    Returns:
        BuildDatasetResponse describing the dataset that was written.

    Raises:
        HTTPException: 400 when the build raises ``ValueError`` (for example
            no charts, no candles, or no labeled samples); 500 for any other
            failure.
    """
    # Local import keeps this handler self-contained.
    import asyncio

    config = state.pipeline_config or get_default_config()
    try:
        # build_dataset_from_db is synchronous (DB reads, CSV I/O, feature
        # engineering). Running it in the default executor keeps the event
        # loop free to serve other requests while the dataset is built.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, build_dataset_from_db, config)
        return BuildDatasetResponse(**result)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)
        ) from exc
    except Exception as exc:
        logger.error(f"Failed to build dataset: {exc}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to build dataset: {exc}",
        ) from exc
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Model Loading Endpoint # Model Loading Endpoint
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View file

@ -0,0 +1,34 @@
import { NextResponse } from 'next/server';
const INFERENCE_API_URL = process.env.INFERENCE_API_URL || 'http://localhost:8001';

// Fall back to the default when INFERENCE_API_TIMEOUT is unset, non-numeric, or not
// positive: a NaN or 0 delay would make the abort timer fire almost immediately.
const DEFAULT_INFERENCE_API_TIMEOUT_MS = 120000;
const parsedInferenceTimeout = Number.parseInt(process.env.INFERENCE_API_TIMEOUT || '', 10);
const INFERENCE_API_TIMEOUT =
  Number.isFinite(parsedInferenceTimeout) && parsedInferenceTimeout > 0
    ? parsedInferenceTimeout
    : DEFAULT_INFERENCE_API_TIMEOUT_MS;

/**
 * Proxy for POST /training/build-dataset on the inference service.
 *
 * Responses:
 * - upstream success: the upstream JSON body is passed through
 * - upstream error: `{ error }` with the upstream status code
 * - 502 when a successful upstream response does not contain valid JSON
 * - 503 when the inference service cannot be reached
 * - 504 when the request exceeds INFERENCE_API_TIMEOUT milliseconds
 * - 500 for any other failure
 */
export async function POST() {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), INFERENCE_API_TIMEOUT);

  try {
    const response = await fetch(`${INFERENCE_API_URL}/training/build-dataset`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      signal: controller.signal,
    });

    // The upstream body is not guaranteed to be JSON (for example an HTML error page
    // from an intermediary), so a parse failure must not hide the upstream status.
    let data: any = undefined;
    let bodyIsJson = false;
    try {
      data = await response.json();
      bodyIsJson = true;
    } catch (parseError: any) {
      // A timeout while reading the body must still be reported as a timeout.
      if (parseError?.name === 'AbortError') {
        throw parseError;
      }
    }

    if (!response.ok) {
      return NextResponse.json(
        { error: data?.detail || 'Failed to build dataset' },
        { status: response.status },
      );
    }

    if (!bodyIsJson) {
      return NextResponse.json(
        { error: 'Invalid response from inference service' },
        { status: 502 },
      );
    }

    return NextResponse.json(data);
  } catch (error: any) {
    if (error.name === 'AbortError') {
      return NextResponse.json({ error: 'Dataset build timed out' }, { status: 504 });
    }
    if (error.cause?.code === 'ECONNREFUSED' || error.message?.includes('fetch failed')) {
      return NextResponse.json({ error: 'Inference service unavailable' }, { status: 503 });
    }
    console.error('training/build-dataset proxy error:', error);
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 });
  } finally {
    // Single cleanup point; the timer stays armed while the body is being read.
    clearTimeout(timeoutId);
  }
}

View file

@ -228,8 +228,7 @@ export default function TrainingPanel() {
} }
}; };
const datasetMissing = datasetInfo !== null && !datasetInfo.exists; const canTrain = !isTraining;
const canTrain = !isTraining && !datasetMissing && datasetInfo !== null;
return ( return (
<div> <div>
@ -272,8 +271,8 @@ export default function TrainingPanel() {
)} )}
</> </>
) : ( ) : (
<p className="text-orange-500"> <p className="text-muted-foreground">
No training dataset found. Export annotations first. No cached dataset. It will be built automatically from your annotations when training starts.
</p> </p>
)} )}
</div> </div>