From cbb921b4a767c0335260b9f457ee2465083845d9 Mon Sep 17 00:00:00 2001 From: Marko Djordjevic Date: Fri, 20 Feb 2026 13:46:00 +0100 Subject: [PATCH] Scope MLflow experiment names to include user ID (Task 14.2) - Updated FastAPI /training/start endpoint to extract X-User-ID header via get_user_id() dependency - Modified _run_training_background to accept and use user_id parameter - Added MLflow experiment setup with user scoping: experiments are named user_{user_id}_training when user_id is provided, falling back to default experiment name otherwise - Updated database record insertion to store scoped experiment name - Updated training/train.py train() function to accept user_id parameter and use it for experiment naming - Mark task 14.2 as complete in tasks.md Co-Authored-By: Claude Sonnet 4.6 --- openspec/changes/user-accounts/tasks.md | 2 +- services/ml/app/main.py | 42 ++++++++++++++++++++++--- services/ml/training/train.py | 27 ++++++++++------ 3 files changed, 55 insertions(+), 16 deletions(-) diff --git a/openspec/changes/user-accounts/tasks.md b/openspec/changes/user-accounts/tasks.md index 6e30090..9316b12 100644 --- a/openspec/changes/user-accounts/tasks.md +++ b/openspec/changes/user-accounts/tasks.md @@ -82,7 +82,7 @@ ## 14. ML Service User Scoping - [x] 14.1 `[haiku]` Update FastAPI service to read `X-User-ID` header from incoming requests -- [ ] 14.2 `[haiku]` Scope MLflow experiment names to include user ID (e.g., `user_{uuid}_training`) +- [x] 14.2 `[haiku]` Scope MLflow experiment names to include user ID (e.g., `user_{uuid}_training`) - [ ] 14.3 `[sonnet]` Scope training run queries in FastAPI to filter by user ID ## 15. Documentation & Deployment diff --git a/services/ml/app/main.py b/services/ml/app/main.py index a7f2cde..5f6b40f 100644 --- a/services/ml/app/main.py +++ b/services/ml/app/main.py @@ -1230,19 +1230,40 @@ def _run_training_background( model_type: str, config: PipelineConfig, chart_id: Optional[int] = None, + user_id: Optional[str] = None, ) -> None: """ Background thread target: build dataset then train a model. Uses the pre-inserted TrainingRun record identified by ``run_id``. + + Args: + run_id: Training run ID + model_type: Type of model to train + config: Pipeline configuration + chart_id: Optional chart ID to train on + user_id: Optional user ID for scoped experiment naming """ - logger.info(f"Training thread started: run_id={run_id}, model_type={model_type}") + logger.info(f"Training thread started: run_id={run_id}, model_type={model_type}, user_id={user_id}") try: # Import training utilities here to avoid circular import issues from training.train import create_model, temporal_split from sklearn.metrics import accuracy_score, f1_score + # Set up MLflow experiment with user scoping + mlflow_config = config.stages.training.mlflow + mlflow.set_tracking_uri(mlflow_config.tracking_uri) + + # Use user-scoped experiment name if user_id provided, otherwise use default + if user_id: + experiment_name = f"user_{user_id}_training" + else: + experiment_name = mlflow_config.experiment_name + + mlflow.set_experiment(experiment_name) + logger.info(f"MLflow experiment set to: {experiment_name}") + # Build dataset from database (feature engineering + annotation ingestion) logger.info("Building dataset from database...") build_dataset_from_db(config, chart_id=chart_id) @@ -1407,12 +1428,16 @@ def _run_training_background( @app.post("/training/start", response_model=TrainingStartResponse, dependencies=[Depends(verify_api_key)]) -async def training_start(request: TrainingStartRequest): +async def training_start(request: TrainingStartRequest, user_id: Optional[str] = Depends(get_user_id)): """ Start a training run in a background thread. Returns immediately with run_id and status "running". Rejects concurrent runs with HTTP 409. + + Args: + request: Training request parameters + user_id: Optional user ID from X-User-ID header for scoped experiments """ # Validate model type if request.model_type not in SUPPORTED_MODEL_TYPES: @@ -1455,11 +1480,18 @@ async def training_start(request: TrainingStartRequest): # Pre-insert the run record so callers can track it immediately try: + # Compute scoped experiment name + mlflow_config = config.stages.training.mlflow + if user_id: + experiment_name = f"user_{user_id}_training" + else: + experiment_name = mlflow_config.experiment_name + with get_db() as db: training_run = TrainingRun( run_id=run_id, model_type=request.model_type, - experiment_name=config.stages.training.mlflow.experiment_name, + experiment_name=experiment_name, pipeline_config_hash=config_hash, status="running", created_at=datetime.now(timezone.utc), @@ -1479,13 +1511,13 @@ async def training_start(request: TrainingStartRequest): # Launch background thread (daemon so it doesn't block process exit) thread = threading.Thread( target=_run_training_background, - args=(run_id, request.model_type, config, request.chart_id), + args=(run_id, request.model_type, config, request.chart_id, user_id), daemon=True, name=f"training-{run_id[:8]}", ) thread.start() - logger.info(f"Training started: run_id={run_id}, model_type={request.model_type}") + logger.info(f"Training started: run_id={run_id}, model_type={request.model_type}, user_id={user_id or 'default'}") return TrainingStartResponse(run_id=run_id, status="running") diff --git a/services/ml/training/train.py b/services/ml/training/train.py index 4f3ba51..a71cdec 100644 --- a/services/ml/training/train.py +++ b/services/ml/training/train.py @@ -172,33 +172,40 @@ def compute_config_hash(config: PipelineConfig) -> str: def train( config: PipelineConfig, labeled_data_path: Path, - output_model_path: Optional[Path] = None + output_model_path: Optional[Path] = None, + user_id: Optional[str] = None ) -> str: """ Main training function. - + Loads labeled data, splits, trains model, evaluates, logs to MLflow, and stores metadata in PostgreSQL. - + Args: config: Pipeline configuration labeled_data_path: Path to labeled CSV file output_model_path: Optional path to save model locally (for inference) - + user_id: Optional user ID for scoped experiment naming (e.g., user_{uuid}_training) + Returns: MLflow run ID """ training_config = config.stages.training mlflow_config = training_config.mlflow - + # Initialize database init_db() - + # Set MLflow tracking URI mlflow.set_tracking_uri(mlflow_config.tracking_uri) - - # Set experiment - mlflow.set_experiment(mlflow_config.experiment_name) + + # Set experiment with user scoping if user_id is provided + if user_id: + experiment_name = f"user_{user_id}_training" + else: + experiment_name = mlflow_config.experiment_name + + mlflow.set_experiment(experiment_name) logger.info(f"Loading labeled data from {labeled_data_path}") df = pd.read_csv(labeled_data_path) @@ -247,7 +254,7 @@ def train( training_run = TrainingRun( run_id=run_id, model_type=training_config.model_type, - experiment_name=mlflow_config.experiment_name, + experiment_name=experiment_name, pipeline_config_hash=compute_config_hash(config), dataset_version=None, # TODO: Add DVC hash if available metrics_summary={},