feat: add training resource limits (500MB size check + 30-min timeout)

- Import concurrent.futures for timeout support
- In _run_training_background: check df.memory_usage(deep=True).sum()
  after loading the labeled dataset; raise ValueError if > 500MB
- Wrap model.fit() in a ThreadPoolExecutor with a 1800s timeout;
  on TimeoutError update DB status to "failed" with message
  "Training timed out after 30 minutes" and return early
- Mark task 5.7 as done in openspec/changes/code-review-fix/tasks.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Marko Djordjevic 2026-02-18 11:31:33 +01:00
parent f94d16c6ab
commit 3dc0014328
2 changed files with 53 additions and 6 deletions

View file

@ -47,7 +47,7 @@
- [x] 5.4 `[sonnet]` Add date range validation (max 1 year) to `POST /predict/batch` in `services/ml/app/main.py` - [x] 5.4 `[sonnet]` Add date range validation (max 1 year) to `POST /predict/batch` in `services/ml/app/main.py`
- [x] 5.5 `[sonnet]` Add candle time-sort validation/auto-sort to `POST /predict` in `services/ml/app/main.py` - [x] 5.5 `[sonnet]` Add candle time-sort validation/auto-sort to `POST /predict` in `services/ml/app/main.py`
- [x] 5.6 `[sonnet]` Implement real health checks: `SELECT 1` for PostgreSQL, MLflow API ping in `services/ml/app/main.py:396-409` - [x] 5.6 `[sonnet]` Implement real health checks: `SELECT 1` for PostgreSQL, MLflow API ping in `services/ml/app/main.py:396-409`
- [ ] 5.7 `[sonnet]` Add training resource limits: 500MB dataset size check, 30-minute timeout with status update on expiry in `services/ml/app/main.py:907-1030` - [x] 5.7 `[sonnet]` Add training resource limits: 500MB dataset size check, 30-minute timeout with status update on expiry in `services/ml/app/main.py:907-1030`
- [ ] 5.8 `[haiku]` Add `run_id` format validation to `DELETE /training/runs/{run_id}` and `GET /training/runs/{run_id}` endpoints - [ ] 5.8 `[haiku]` Add `run_id` format validation to `DELETE /training/runs/{run_id}` and `GET /training/runs/{run_id}` endpoints
## 6. Infrastructure & Docker ## 6. Infrastructure & Docker

View file

@ -4,6 +4,7 @@ FastAPI inference service for candlestick pattern prediction.
Provides REST API endpoints for model serving, health checks, and prediction. Provides REST API endpoints for model serving, health checks, and prediction.
""" """
import concurrent.futures
import hashlib import hashlib
import logging import logging
import os import os
@ -1166,6 +1167,18 @@ def _run_training_background(run_id: str, model_type: str, config: PipelineConfi
if "label" not in df.columns: if "label" not in df.columns:
raise ValueError("Labeled dataset must have 'label' column") raise ValueError("Labeled dataset must have 'label' column")
# Check dataset size: reject if it exceeds 500MB in memory
_DATASET_SIZE_LIMIT_BYTES = 500 * 1024 * 1024 # 500 MB
dataset_size_bytes = df.memory_usage(deep=True).sum()
if dataset_size_bytes > _DATASET_SIZE_LIMIT_BYTES:
raise ValueError(
f"Dataset too large. Maximum size is 500MB "
f"(current size: {dataset_size_bytes / (1024 * 1024):.1f}MB)."
)
logger.info(
f"Dataset size check passed: {dataset_size_bytes / (1024 * 1024):.1f}MB"
)
feature_cols = [ feature_cols = [
col for col in df.columns col for col in df.columns
if col not in ("label", "time", "timestamp") if col not in ("label", "time", "timestamp")
@ -1183,11 +1196,45 @@ def _run_training_background(run_id: str, model_type: str, config: PipelineConfi
X, y, training_cfg.test_split, training_cfg.validation_split X, y, training_cfg.test_split, training_cfg.validation_split
) )
# Train model # Run model training with a 30-minute timeout
model_instance = create_model( _TRAINING_TIMEOUT_SECONDS = 1800 # 30 minutes
def _do_train():
_model = create_model(
model_type, training_cfg.hyperparameters, training_cfg.class_weights model_type, training_cfg.hyperparameters, training_cfg.class_weights
) )
model_instance.fit(X_train, y_train) _model.fit(X_train, y_train)
return _model
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(_do_train)
try:
model_instance = future.result(timeout=_TRAINING_TIMEOUT_SECONDS)
except concurrent.futures.TimeoutError:
logger.error(
f"Training timed out after 30 minutes: run_id={run_id}"
)
try:
with get_db() as db:
stmt = (
sa_update(TrainingRun)
.where(TrainingRun.run_id == run_id)
.values(
status="failed",
completed_at=datetime.utcnow(),
metrics_summary={
"error": "Training timed out after 30 minutes"
},
)
)
db.execute(stmt)
db.commit()
except Exception as db_exc:
logger.error(
f"Failed to update DB for timed-out run {run_id}: {db_exc}"
)
return
logger.info("Model training complete") logger.info("Model training complete")
# Evaluate # Evaluate