#!/usr/bin/env python3
"""
SQLite to PostgreSQL Migration Script

Migrates data from the legacy SQLite database to PostgreSQL.

Configuration is read from the environment:
    DATABASE_PATH  path to the SQLite file (default: ./data/candles.db)
    DATABASE_URL   PostgreSQL connection string (required)

Rows whose ``id`` already exists in the target table are skipped, so the
script can be re-run after a partial migration.
"""

import json
import os
import re
import sqlite3
import sys
from datetime import datetime

import psycopg2
import psycopg2.extras

# Configuration
SQLITE_PATH = os.getenv('DATABASE_PATH', './data/candles.db')
POSTGRES_URL = os.getenv('DATABASE_URL')

if not POSTGRES_URL:
    print('ERROR: DATABASE_URL environment variable is required')
    sys.exit(1)


def _mask_password(url):
    """Return *url* with any embedded password replaced by ``****``.

    Errs on the side of hiding too much: everything between the first ``:``
    of the user-info part and the LAST ``@`` is masked, so passwords that
    themselves contain ``:``, ``@`` or ``/`` never leak partially. Strings
    that are not URL-shaped (e.g. ``host=... password=...`` DSNs) are masked
    entirely.
    """
    if '://' not in url:
        return '****'
    masked = re.sub(r'(?s)(://[^:/@]*:)(.*)(@)', r'\1****\3', url, count=1)
    # Also cover credentials passed as a query parameter.
    return re.sub(r'(?i)(password=)[^&]*', r'\1****', masked)


print('=' * 60)
print('SQLite to PostgreSQL Migration')
print('=' * 60)
print(f'SQLite source: {SQLITE_PATH}')
print(f'PostgreSQL target: {_mask_password(POSTGRES_URL)}')
print('=' * 60)
print()

# Connect to databases
sqlite_conn = sqlite3.connect(SQLITE_PATH)
sqlite_conn.row_factory = sqlite3.Row
sqlite_cur = sqlite_conn.cursor()

pg_conn = psycopg2.connect(POSTGRES_URL)
pg_cur = pg_conn.cursor()

# Per-table counters, filled in by migrate_table() and printed in the summary.
stats = {}


def migrate_table(table_name, columns, transform_fn=None):
    """Copy every row of *table_name* from SQLite into PostgreSQL.

    Args:
        table_name: Table to migrate; must exist in both databases and have
            an ``id`` column. Interpolated into SQL, so it must be a trusted
            identifier (all callers in this script pass literals).
        columns: Target column names, in the order the values are supplied.
        transform_fn: Optional callable taking a ``sqlite3.Row`` and
            returning the list of values matching *columns*. When omitted,
            the values are read from the row by column name.

    Rows whose id already exists in PostgreSQL are counted as skipped. A row
    that fails is reported, counted as an error and rolled back on its own;
    the remaining rows are still migrated. Results are recorded in ``stats``
    and the table's work is committed at the end.
    """
    print(f'Migrating {table_name}...')

    # Get data from SQLite
    sqlite_cur.execute(f'SELECT * FROM {table_name}')
    rows = sqlite_cur.fetchall()

    # Loop-invariant SQL, built once per table.
    placeholders = ', '.join(['%s'] * len(columns))
    exists_sql = f'SELECT 1 FROM {table_name} WHERE id = %s'
    insert_sql = (
        f'INSERT INTO {table_name} ({", ".join(columns)}) '
        f'VALUES ({placeholders})'
    )

    migrated = 0
    skipped = 0
    errors = 0

    for row in rows:
        # PostgreSQL aborts the whole transaction after a failed statement.
        # Without a savepoint, one bad row would make every later statement
        # fail and the final commit would discard rows already inserted.
        pg_cur.execute('SAVEPOINT migrate_row')
        try:
            # Transform data if needed
            if transform_fn:
                values = transform_fn(row)
            else:
                values = [row[col] for col in columns]

            # Check if already exists (by id)
            pg_cur.execute(exists_sql, (row['id'],))
            if pg_cur.fetchone():
                skipped += 1
            else:
                pg_cur.execute(insert_sql, values)
                migrated += 1
        except Exception as e:
            print(f' Error migrating {table_name} id={row["id"]}: {e}')
            errors += 1
            # Undo only this row and return the transaction to a usable state.
            pg_cur.execute('ROLLBACK TO SAVEPOINT migrate_row')
        pg_cur.execute('RELEASE SAVEPOINT migrate_row')

    pg_conn.commit()

    stats[table_name] = {
        'source': len(rows),
        'migrated': migrated,
        'skipped': skipped,
        'errors': errors,
    }
    print(f' {table_name}: {migrated} migrated, {skipped} skipped, {errors} errors\n')


# Migration functions with type conversions

def _to_datetime(value):
    """Convert a Unix timestamp to a ``datetime``; falsy values become None.

    NOTE(review): ``datetime.fromtimestamp`` yields a naive datetime in the
    machine's local timezone, and a timestamp of 0 is treated as missing.
    Confirm both match what the target schema expects.
    """
    return datetime.fromtimestamp(value) if value else None


def _to_jsonb(value):
    """Wrap a JSON string for a JSON/JSONB column; falsy values become None."""
    return psycopg2.extras.Json(json.loads(value)) if value else None


def transform_charts(row):
    """Build the value list for a ``charts`` row."""
    return [
        row['id'],
        row['name'],
        _to_datetime(row['created_at']),
    ]


def transform_candles(row):
    """Build the value list for a ``candles`` row."""
    return [
        row['id'],
        row['chart_id'],
        _to_datetime(row['time']),
        row['open'],
        row['high'],
        row['low'],
        row['close'],
    ]


def transform_annotation_types(row):
    """Build the value list for an ``annotation_types`` row."""
    return [
        row['id'],
        row['name'],
        row['display_name'],
        row['color'],
        row['category'],
        row['icon'],
        bool(row['is_active']),  # SQLite value coerced to a real boolean
        _to_datetime(row['created_at']),
    ]


def transform_annotations(row):
    """Build the value list for an ``annotations`` row.

    ``geometry`` is stored as a JSON string in SQLite; it is parsed and
    wrapped in ``psycopg2.extras.Json`` so psycopg2 adapts it as JSON.
    """
    return [
        row['id'],
        row['chart_id'],
        _to_datetime(row['timestamp']),
        row['label_type'],
        _to_jsonb(row['geometry']),
        row['color'] or '#3b82f6',
        _to_datetime(row['created_at']),
    ]


def transform_span_label_types(row):
    """Build the value list for a ``span_label_types`` row."""
    return [
        row['id'],
        row['name'],
        row['display_name'],
        row['color'],
        row['hotkey'],
        bool(row['is_active']),
        row['sort_order'] or 0,
        _to_datetime(row['created_at']),
    ]


def transform_span_annotations(row):
    """Build the value list for a ``span_annotations`` row.

    ``sub_spans`` and ``model_prediction`` are JSON strings in SQLite and
    are converted the same way as ``annotations.geometry``.
    """
    return [
        row['id'],
        row['chart_id'],
        _to_datetime(row['start_time']),
        _to_datetime(row['end_time']),
        row['label'],
        row['confidence'],
        row['outcome'],
        row['notes'],
        _to_jsonb(row['sub_spans']),
        row['color'] or '#2196F3',
        row['source'] or 'human',
        _to_jsonb(row['model_prediction']),
        _to_datetime(row['created_at']),
    ]


# Migrate tables in dependency order
try:
    migrate_table('charts', ['id', 'name', 'created_at'], transform_charts)
    migrate_table(
        'candles',
        ['id', 'chart_id', 'time', 'open', 'high', 'low', 'close'],
        transform_candles,
    )
    migrate_table(
        'annotation_types',
        ['id', 'name', 'display_name', 'color', 'category', 'icon',
         'is_active', 'created_at'],
        transform_annotation_types,
    )
    migrate_table(
        'annotations',
        ['id', 'chart_id', 'timestamp', 'label_type', 'geometry', 'color',
         'created_at'],
        transform_annotations,
    )
    migrate_table(
        'span_label_types',
        ['id', 'name', 'display_name', 'color', 'hotkey', 'is_active',
         'sort_order', 'created_at'],
        transform_span_label_types,
    )
    migrate_table(
        'span_annotations',
        ['id', 'chart_id', 'start_time', 'end_time', 'label', 'confidence',
         'outcome', 'notes', 'sub_spans', 'color', 'source',
         'model_prediction', 'created_at'],
        transform_span_annotations,
    )

    # Print summary
    print('=' * 60)
    print('Migration Summary')
    print('=' * 60)
    print()
    print(f'{"Table":<25} | {"Source":>6} | {"Migrated":>8} | {"Skipped":>7} | {"Errors":>6}')
    print('-' * 60)

    total_source = 0
    total_migrated = 0
    total_skipped = 0
    total_errors = 0

    for table, stat in stats.items():
        print(f'{table:<25} | {stat["source"]:>6} | {stat["migrated"]:>8} | {stat["skipped"]:>7} | {stat["errors"]:>6}')
        total_source += stat['source']
        total_migrated += stat['migrated']
        total_skipped += stat['skipped']
        total_errors += stat['errors']

    print('-' * 60)
    print(f'{"TOTAL":<25} | {total_source:>6} | {total_migrated:>8} | {total_skipped:>7} | {total_errors:>6}')
    print('=' * 60)

    if total_errors > 0:
        print(f'\n⚠️ Migration completed with {total_errors} errors. Check logs above.')
    else:
        print('\n✅ Migration completed successfully!')

except Exception as e:
    # Anything not handled per-row (missing table, lost connection, ...)
    # aborts the run; tables committed earlier stay committed.
    print(f'\n❌ Migration failed: {e}')
    pg_conn.rollback()
    sys.exit(1)

finally:
    sqlite_conn.close()
    pg_conn.close()