diff --git a/openspec/changes/code-review-fix/tasks.md b/openspec/changes/code-review-fix/tasks.md index de1ef52..6913574 100644 --- a/openspec/changes/code-review-fix/tasks.md +++ b/openspec/changes/code-review-fix/tasks.md @@ -47,7 +47,7 @@ - [x] 5.4 `[sonnet]` Add date range validation (max 1 year) to `POST /predict/batch` in `services/ml/app/main.py` - [x] 5.5 `[sonnet]` Add candle time-sort validation/auto-sort to `POST /predict` in `services/ml/app/main.py` - [x] 5.6 `[sonnet]` Implement real health checks: `SELECT 1` for PostgreSQL, MLflow API ping in `services/ml/app/main.py:396-409` -- [ ] 5.7 `[sonnet]` Add training resource limits: 500MB dataset size check, 30-minute timeout with status update on expiry in `services/ml/app/main.py:907-1030` +- [x] 5.7 `[sonnet]` Add training resource limits: 500MB dataset size check, 30-minute timeout with status update on expiry in `services/ml/app/main.py:907-1030` - [ ] 5.8 `[haiku]` Add `run_id` format validation to `DELETE /training/runs/{run_id}` and `GET /training/runs/{run_id}` endpoints ## 6. Infrastructure & Docker diff --git a/services/ml/app/main.py b/services/ml/app/main.py index 152fb95..3a00171 100644 --- a/services/ml/app/main.py +++ b/services/ml/app/main.py @@ -4,6 +4,7 @@ FastAPI inference service for candlestick pattern prediction. Provides REST API endpoints for model serving, health checks, and prediction. """ +import concurrent.futures import hashlib import logging import os @@ -1166,6 +1167,18 @@ def _run_training_background(run_id: str, model_type: str, config: PipelineConfi if "label" not in df.columns: raise ValueError("Labeled dataset must have 'label' column") + # Check dataset size: reject if it exceeds 500MB in memory + _DATASET_SIZE_LIMIT_BYTES = 500 * 1024 * 1024 # 500 MB + dataset_size_bytes = df.memory_usage(deep=True).sum() + if dataset_size_bytes > _DATASET_SIZE_LIMIT_BYTES: + raise ValueError( + f"Dataset too large. Maximum size is 500MB " + f"(current size: {dataset_size_bytes / (1024 * 1024):.1f}MB)." + ) + logger.info( + f"Dataset size check passed: {dataset_size_bytes / (1024 * 1024):.1f}MB" + ) + feature_cols = [ col for col in df.columns if col not in ("label", "time", "timestamp") @@ -1183,11 +1196,45 @@ def _run_training_background(run_id: str, model_type: str, config: PipelineConfi X, y, training_cfg.test_split, training_cfg.validation_split ) - # Train model - model_instance = create_model( - model_type, training_cfg.hyperparameters, training_cfg.class_weights - ) - model_instance.fit(X_train, y_train) + # Run model training with a 30-minute timeout + _TRAINING_TIMEOUT_SECONDS = 1800 # 30 minutes + + def _do_train(): + _model = create_model( + model_type, training_cfg.hyperparameters, training_cfg.class_weights + ) + _model.fit(X_train, y_train) + return _model + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(_do_train) + try: + model_instance = future.result(timeout=_TRAINING_TIMEOUT_SECONDS) + except concurrent.futures.TimeoutError: + logger.error( + f"Training timed out after 30 minutes: run_id={run_id}" + ) + try: + with get_db() as db: + stmt = ( + sa_update(TrainingRun) + .where(TrainingRun.run_id == run_id) + .values( + status="failed", + completed_at=datetime.utcnow(), + metrics_summary={ + "error": "Training timed out after 30 minutes" + }, + ) + ) + db.execute(stmt) + db.commit() + except Exception as db_exc: + logger.error( + f"Failed to update DB for timed-out run {run_id}: {db_exc}" + ) + return + logger.info("Model training complete") # Evaluate