# SQLite Migration Patterns Version-controlled schema migrations for SQLite databases. ## Basic Migration Pattern ```python import sqlite3 MIGRATIONS = [ # Version 1: Initial schema """ CREATE TABLE IF NOT EXISTS items ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')) ); """, # Version 2: Add status column """ ALTER TABLE items ADD COLUMN status TEXT DEFAULT 'active'; CREATE INDEX IF NOT EXISTS idx_items_status ON items(status); """, # Version 3: Add user reference """ ALTER TABLE items ADD COLUMN user_id INTEGER; CREATE INDEX IF NOT EXISTS idx_items_user ON items(user_id); """, ] def migrate(conn: sqlite3.Connection): """Apply pending migrations.""" # Create version tracking table conn.execute(""" CREATE TABLE IF NOT EXISTS schema_version ( version INTEGER PRIMARY KEY, applied_at TEXT DEFAULT (datetime('now')) ) """) # Get current version result = conn.execute( "SELECT MAX(version) FROM schema_version" ).fetchone() current = result[0] if result[0] is not None else 0 # Apply pending migrations for i, migration in enumerate(MIGRATIONS[current:], start=current + 1): print(f"Applying migration {i}...") conn.executescript(migration) conn.execute( "INSERT INTO schema_version (version) VALUES (?)", (i,) ) conn.commit() print(f"Migration {i} complete") print(f"Database at version {len(MIGRATIONS)}") ``` ## Named Migrations ```python from dataclasses import dataclass from datetime import datetime @dataclass class Migration: name: str up: str down: str | None = None MIGRATIONS = [ Migration( name="001_initial_schema", up=""" CREATE TABLE users ( id INTEGER PRIMARY KEY, email TEXT UNIQUE NOT NULL, name TEXT, created_at TEXT DEFAULT (datetime('now')) ); CREATE INDEX idx_users_email ON users(email); """, down=""" DROP INDEX IF EXISTS idx_users_email; DROP TABLE IF EXISTS users; """ ), Migration( name="002_add_orders", up=""" CREATE TABLE orders ( id INTEGER PRIMARY KEY, user_id INTEGER NOT NULL REFERENCES users(id), total REAL NOT NULL, status TEXT DEFAULT 'pending', created_at TEXT DEFAULT (datetime('now')) ); CREATE INDEX idx_orders_user ON orders(user_id); CREATE INDEX idx_orders_status ON orders(status); """, down=""" DROP TABLE IF EXISTS orders; """ ), ] def migrate_up(conn: sqlite3.Connection, target: int | None = None): """Apply migrations up to target version.""" conn.execute(""" CREATE TABLE IF NOT EXISTS migrations ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, applied_at TEXT DEFAULT (datetime('now')) ) """) applied = { row[0] for row in conn.execute("SELECT name FROM migrations").fetchall() } target = target or len(MIGRATIONS) for i, migration in enumerate(MIGRATIONS[:target]): if migration.name not in applied: print(f"Applying: {migration.name}") conn.executescript(migration.up) conn.execute( "INSERT INTO migrations (name) VALUES (?)", (migration.name,) ) conn.commit() def migrate_down(conn: sqlite3.Connection, steps: int = 1): """Rollback migrations.""" applied = conn.execute( "SELECT name FROM migrations ORDER BY id DESC LIMIT ?", (steps,) ).fetchall() for (name,) in applied: migration = next(m for m in MIGRATIONS if m.name == name) if migration.down: print(f"Rolling back: {name}") conn.executescript(migration.down) conn.execute("DELETE FROM migrations WHERE name = ?", (name,)) conn.commit() else: print(f"Cannot rollback {name}: no down migration") break ``` ## Async Migrations ```python import aiosqlite async def async_migrate(db_path: str, migrations: list[str]): """Apply migrations asynchronously.""" async with aiosqlite.connect(db_path) as db: await db.execute(""" CREATE TABLE IF NOT EXISTS schema_version ( version INTEGER PRIMARY KEY ) """) result = await db.execute("SELECT MAX(version) FROM schema_version") row = await result.fetchone() current = row[0] if row[0] is not None else 0 for i, migration in enumerate(migrations[current:], start=current + 1): await db.executescript(migration) await db.execute( "INSERT INTO schema_version (version) VALUES (?)", (i,) ) await db.commit() ``` ## Safe Column Operations SQLite has limited ALTER TABLE support. Here are safe patterns: ### Adding Columns ```sql -- Safe: Add column with default ALTER TABLE items ADD COLUMN status TEXT DEFAULT 'active'; -- Safe: Add nullable column ALTER TABLE items ADD COLUMN notes TEXT; ``` ### Renaming Columns (SQLite 3.25+) ```sql -- Safe in SQLite 3.25+ ALTER TABLE items RENAME COLUMN old_name TO new_name; ``` ### Recreate Table Pattern For complex changes (dropping columns, changing types): ```python def recreate_table(conn: sqlite3.Connection): """Safely modify table structure by recreating.""" conn.executescript(""" -- 1. Rename old table ALTER TABLE items RENAME TO items_old; -- 2. Create new table with desired schema CREATE TABLE items ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, status TEXT DEFAULT 'active', -- dropped: old_column -- changed: type of some_column created_at TEXT DEFAULT (datetime('now')) ); -- 3. Copy data (mapping columns as needed) INSERT INTO items (id, name, status, created_at) SELECT id, name, COALESCE(status, 'active'), created_at FROM items_old; -- 4. Drop old table DROP TABLE items_old; -- 5. Recreate indexes CREATE INDEX idx_items_status ON items(status); """) conn.commit() ``` ## JSON in SQLite ### Storing JSON ```python import json def store_json(conn: sqlite3.Connection, key: str, data: dict): """Store JSON data.""" conn.execute( "INSERT OR REPLACE INTO json_store (key, data) VALUES (?, ?)", (key, json.dumps(data)) ) conn.commit() def get_json(conn: sqlite3.Connection, key: str) -> dict | None: """Retrieve JSON data.""" result = conn.execute( "SELECT data FROM json_store WHERE key = ?", (key,) ).fetchone() return json.loads(result[0]) if result else None ``` ### Querying JSON (SQLite 3.38+) ```sql -- Create table with JSON column CREATE TABLE events ( id INTEGER PRIMARY KEY, payload TEXT NOT NULL -- JSON ); -- Extract JSON field SELECT json_extract(payload, '$.type') as event_type FROM events; -- Filter by JSON value SELECT * FROM events WHERE json_extract(payload, '$.user_id') = 123; -- Get nested value SELECT json_extract(payload, '$.metadata.source') FROM events; -- Check if key exists SELECT * FROM events WHERE json_type(payload, '$.optional_field') IS NOT NULL; -- Array operations SELECT json_extract(payload, '$.tags[0]') FROM events; SELECT json_array_length(json_extract(payload, '$.tags')) FROM events; ``` ### JSON with Python ```python def query_json_field(conn: sqlite3.Connection, field: str, value: any) -> list: """Query by JSON field value.""" conn.row_factory = sqlite3.Row cursor = conn.execute( f"SELECT * FROM events WHERE json_extract(payload, '$.{field}') = ?", (value,) ) return [dict(row) for row in cursor.fetchall()] def update_json_field(conn: sqlite3.Connection, event_id: int, field: str, value: any): """Update specific JSON field.""" conn.execute( f"UPDATE events SET payload = json_set(payload, '$.{field}', ?) WHERE id = ?", (json.dumps(value) if isinstance(value, (dict, list)) else value, event_id) ) conn.commit() ``` ## CLI Quick Reference ```bash # Run migration from file sqlite3 mydb.sqlite < migrations/001_initial.sql # Check schema version sqlite3 mydb.sqlite "SELECT * FROM schema_version" # Export schema sqlite3 mydb.sqlite ".schema" > schema.sql # Dump with data sqlite3 mydb.sqlite ".dump" > backup.sql # Restore from dump sqlite3 newdb.sqlite < backup.sql # Compare schemas diff <(sqlite3 db1.sqlite ".schema") <(sqlite3 db2.sqlite ".schema") ```