migration-patterns.md 8.8 KB

SQLite Migration Patterns

Version-controlled schema migrations for SQLite databases.

Basic Migration Pattern

import sqlite3

# Ordered SQL scripts, one per schema version. migrate() applies entries by
# position: the entry at list index N is recorded in schema_version as
# version N + 1. New migrations therefore belong at the end of the list;
# an entry whose version is already recorded is never executed again.
MIGRATIONS = [
    # Version 1: Initial schema
    """
    CREATE TABLE IF NOT EXISTS items (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        created_at TEXT DEFAULT (datetime('now'))
    );
    """,
    # Version 2: Add status column (and an index for filtering by it)
    """
    ALTER TABLE items ADD COLUMN status TEXT DEFAULT 'active';
    CREATE INDEX IF NOT EXISTS idx_items_status ON items(status);
    """,
    # Version 3: Add user reference (and an index for lookups by user)
    """
    ALTER TABLE items ADD COLUMN user_id INTEGER;
    CREATE INDEX IF NOT EXISTS idx_items_user ON items(user_id);
    """,
]

def migrate(conn: sqlite3.Connection):
    """Apply pending migrations."""
    # Create version tracking table
    conn.execute("""
        CREATE TABLE IF NOT EXISTS schema_version (
            version INTEGER PRIMARY KEY,
            applied_at TEXT DEFAULT (datetime('now'))
        )
    """)

    # Get current version
    result = conn.execute(
        "SELECT MAX(version) FROM schema_version"
    ).fetchone()
    current = result[0] if result[0] is not None else 0

    # Apply pending migrations
    for i, migration in enumerate(MIGRATIONS[current:], start=current + 1):
        print(f"Applying migration {i}...")
        conn.executescript(migration)
        conn.execute(
            "INSERT INTO schema_version (version) VALUES (?)",
            (i,)
        )
        conn.commit()
        print(f"Migration {i} complete")

    print(f"Database at version {len(MIGRATIONS)}")

Named Migrations

from dataclasses import dataclass
from datetime import datetime

@dataclass
class Migration:
    """A named schema migration: forward SQL plus optional rollback SQL."""
    # Identifier recorded in the migrations table by migrate_up() and used by
    # migrate_down() to find this entry again.
    name: str
    # SQL script executed when the migration is applied.
    up: str
    # SQL script executed on rollback; None (the default) means the migration
    # cannot be rolled back.
    down: str | None = None

# Named migrations, listed in the order they are applied. migrate_up() skips
# any entry whose name is already recorded in the migrations table;
# migrate_down() runs the `down` script of the most recently recorded entries.
MIGRATIONS = [
    Migration(
        name="001_initial_schema",
        up="""
            CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                email TEXT UNIQUE NOT NULL,
                name TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE INDEX idx_users_email ON users(email);
        """,
        down="""
            DROP INDEX IF EXISTS idx_users_email;
            DROP TABLE IF EXISTS users;
        """
    ),
    Migration(
        name="002_add_orders",
        up="""
            CREATE TABLE orders (
                id INTEGER PRIMARY KEY,
                user_id INTEGER NOT NULL REFERENCES users(id),
                total REAL NOT NULL,
                status TEXT DEFAULT 'pending',
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE INDEX idx_orders_user ON orders(user_id);
            CREATE INDEX idx_orders_status ON orders(status);
        """,
        # NOTE(review): no DROP INDEX statements here, unlike 001 — this
        # relies on SQLite dropping a table's indexes together with the table.
        down="""
            DROP TABLE IF EXISTS orders;
        """
    ),
]

def migrate_up(conn: sqlite3.Connection, target: int | None = None):
    """Apply migrations up to target version."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS migrations (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            applied_at TEXT DEFAULT (datetime('now'))
        )
    """)

    applied = {
        row[0] for row in
        conn.execute("SELECT name FROM migrations").fetchall()
    }

    target = target or len(MIGRATIONS)

    for i, migration in enumerate(MIGRATIONS[:target]):
        if migration.name not in applied:
            print(f"Applying: {migration.name}")
            conn.executescript(migration.up)
            conn.execute(
                "INSERT INTO migrations (name) VALUES (?)",
                (migration.name,)
            )
            conn.commit()

def migrate_down(
    conn: sqlite3.Connection,
    steps: int = 1,
    migrations: "list[Migration] | None" = None,
):
    """Roll back the most recently applied migrations.

    Migrations are undone newest first. Each ``down`` script and the removal
    of its bookkeeping row are committed as one transaction; if the script
    fails, its partial effects are rolled back and the migration stays
    recorded as applied. Rollback stops at the first migration that has no
    ``down`` script.

    Args:
        conn: Open SQLite connection that already has the migrations table.
        steps: Maximum number of migrations to roll back. ``0`` does nothing.
        migrations: Migration objects exposing ``name`` and ``down``.
            Defaults to the module-level ``MIGRATIONS`` list. ``down``
            scripts must not contain their own BEGIN/COMMIT statements.

    Raises:
        ValueError: If ``steps`` is negative.
        LookupError: If an applied migration is missing from the list.
        sqlite3.Error: If a ``down`` script fails.
    """
    # SQLite reads a negative LIMIT as "no limit", which would silently roll
    # back every applied migration.
    if steps < 0:
        raise ValueError(f"steps must be >= 0, got {steps}")

    known = MIGRATIONS if migrations is None else migrations

    applied = conn.execute(
        "SELECT name FROM migrations ORDER BY id DESC LIMIT ?",
        (steps,)
    ).fetchall()

    for row in applied:
        name = row[0]
        migration = next((m for m in known if m.name == name), None)
        if migration is None:
            # Fail with a clear message instead of a bare StopIteration.
            raise LookupError(
                f"Applied migration {name!r} is not defined in the migration list"
            )
        if not migration.down:
            print(f"Cannot rollback {name}: no down migration")
            break

        print(f"Rolling back: {name}")
        try:
            # executescript() runs in autocommit mode, so the transaction has
            # to be opened from inside the script itself.
            conn.executescript("BEGIN;\n" + migration.down)
            conn.execute("DELETE FROM migrations WHERE name = ?", (name,))
            conn.commit()
        except sqlite3.Error:
            # Undo whatever part of the script already ran.
            conn.rollback()
            raise

Async Migrations

import aiosqlite

async def async_migrate(db_path: str, migrations: list[str]):
    """Apply pending migrations asynchronously, each one atomically.

    Every script runs inside its own transaction together with the INSERT
    that records its version, so a script that fails part-way is rolled back
    completely and can be retried after it is fixed.

    Args:
        db_path: Path of the SQLite database file to open.
        migrations: Ordered SQL scripts; the script at position N (1-based)
            produces schema version N. Scripts must not contain their own
            BEGIN/COMMIT statements.

    Raises:
        aiosqlite.Error: If a migration script fails. Migrations applied
            before the failing one stay committed.
    """
    async with aiosqlite.connect(db_path) as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS schema_version (
                version INTEGER PRIMARY KEY
            )
        """)

        # MAX() yields NULL on an empty table, i.e. nothing applied yet.
        cursor = await db.execute("SELECT MAX(version) FROM schema_version")
        row = await cursor.fetchone()
        current = row[0] if row[0] is not None else 0

        for version, script in enumerate(migrations[current:], start=current + 1):
            try:
                # executescript() runs in autocommit mode, so the transaction
                # has to be opened from inside the script itself.
                await db.executescript("BEGIN;\n" + script)
                await db.execute(
                    "INSERT INTO schema_version (version) VALUES (?)",
                    (version,)
                )
                await db.commit()
            except aiosqlite.Error:
                # Undo whatever part of the script already ran.
                await db.rollback()
                raise

Safe Column Operations

SQLite has limited ALTER TABLE support. Here are safe patterns:

Adding Columns

-- Safe: Add column with default
ALTER TABLE items ADD COLUMN status TEXT DEFAULT 'active';

-- Safe: Add nullable column
ALTER TABLE items ADD COLUMN notes TEXT;

Renaming Columns (SQLite 3.25+)

-- Safe in SQLite 3.25+
ALTER TABLE items RENAME COLUMN old_name TO new_name;

Recreate Table Pattern

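For complex changes (changing column types or constraints, or dropping columns on SQLite versions older than 3.35, which added ALTER TABLE ... DROP COLUMN):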

def recreate_table(conn: sqlite3.Connection):
    """Rebuild the ``items`` table with a new schema, atomically.

    The replacement table is built under a temporary name, filled from the
    existing table, and only then swapped in. The live table is never renamed
    away first, because SQLite may rewrite references held by other schema
    objects to follow the renamed table. The whole rebuild runs in one
    transaction, so a failure (for example a row violating a new constraint)
    leaves the original table untouched.

    NOTE: if other tables reference ``items`` and foreign-key enforcement is
    on, switch it off (``PRAGMA foreign_keys=OFF``) before calling this and
    back on afterwards; the pragma has no effect inside a transaction.

    Args:
        conn: Open SQLite connection. With the default transaction handling,
            ``executescript`` commits any transaction already pending on it.

    Raises:
        sqlite3.Error: If any step fails; all changes are rolled back.
    """
    try:
        conn.executescript("""
            BEGIN;

            -- 1. Create the replacement table with the desired schema
            CREATE TABLE items_new (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                status TEXT DEFAULT 'active',
                -- dropped: old_column
                -- changed: type of some_column
                created_at TEXT DEFAULT (datetime('now'))
            );

            -- 2. Copy data (mapping columns as needed)
            INSERT INTO items_new (id, name, status, created_at)
            SELECT id, name, COALESCE(status, 'active'), created_at
            FROM items;

            -- 3. Drop the old table (its indexes are dropped with it)
            DROP TABLE items;

            -- 4. Move the replacement into place
            ALTER TABLE items_new RENAME TO items;

            -- 5. Recreate indexes
            CREATE INDEX idx_items_status ON items(status);

            COMMIT;
        """)
    except sqlite3.Error:
        # Undo the partial rebuild so `items` is exactly as it was.
        conn.rollback()
        raise

JSON in SQLite

Storing JSON

import json

def store_json(conn: sqlite3.Connection, key: str, data: dict):
    """Serialize ``data`` to JSON and save it under ``key``.

    A row that already exists for ``key`` is replaced. The change is
    committed before returning.
    """
    serialized = json.dumps(data)
    upsert_sql = "INSERT OR REPLACE INTO json_store (key, data) VALUES (?, ?)"
    conn.execute(upsert_sql, (key, serialized))
    conn.commit()

def get_json(conn: sqlite3.Connection, key: str) -> dict | None:
    """Retrieve JSON data."""
    result = conn.execute(
        "SELECT data FROM json_store WHERE key = ?", (key,)
    ).fetchone()
    return json.loads(result[0]) if result else None

Querying JSON (JSON functions are built in by default since SQLite 3.38; older builds need the JSON1 extension)

-- Create table with JSON column
CREATE TABLE events (
    id INTEGER PRIMARY KEY,
    payload TEXT NOT NULL  -- JSON
);

-- Extract JSON field
SELECT json_extract(payload, '$.type') as event_type FROM events;

-- Filter by JSON value
SELECT * FROM events
WHERE json_extract(payload, '$.user_id') = 123;

-- Get nested value
SELECT json_extract(payload, '$.metadata.source') FROM events;

-- Check if key exists
SELECT * FROM events
WHERE json_type(payload, '$.optional_field') IS NOT NULL;

-- Array operations
SELECT json_extract(payload, '$.tags[0]') FROM events;
SELECT json_array_length(json_extract(payload, '$.tags')) FROM events;

JSON with Python

def query_json_field(conn: sqlite3.Connection, field: str, value: object) -> list[dict]:
    """Return all ``events`` rows whose JSON ``payload`` has ``field`` equal to ``value``.

    Args:
        conn: Open SQLite connection with an ``events`` table.
        field: Key (or dotted path) below the JSON root, e.g. ``"type"`` or
            ``"metadata.source"``.
        value: Value to compare the extracted field against.

    Returns:
        One dict per matching row, keyed by column name.
    """
    # Use a cursor-local row factory so the caller's connection settings are
    # left as they were.
    cursor = conn.cursor()
    cursor.row_factory = sqlite3.Row
    # Bind the JSON path as a parameter; interpolating `field` into the SQL
    # text would let a crafted field name inject SQL.
    cursor.execute(
        "SELECT * FROM events WHERE json_extract(payload, ?) = ?",
        (f"$.{field}", value)
    )
    return [dict(row) for row in cursor.fetchall()]

def update_json_field(conn: sqlite3.Connection, event_id: int, field: str, value: object):
    """Set one field inside the JSON ``payload`` of an ``events`` row.

    The value is stored as real JSON: dicts and lists become nested
    objects/arrays (not quoted strings), booleans become ``true``/``false``
    and ``None`` becomes ``null``. The change is committed before returning.

    Args:
        conn: Open SQLite connection with an ``events`` table.
        event_id: ``id`` of the row to update.
        field: Key (or dotted path) below the JSON root to create or replace.
        value: Any value ``json.dumps`` can serialize.
    """
    # json(?) makes SQLite parse the bound text as JSON; a plain text
    # parameter would be stored by json_set as a JSON string.
    # The path is bound as a parameter so a crafted field name cannot inject
    # SQL.
    conn.execute(
        "UPDATE events SET payload = json_set(payload, ?, json(?)) WHERE id = ?",
        (f"$.{field}", json.dumps(value), event_id)
    )
    conn.commit()

CLI Quick Reference

# Run migration from file
sqlite3 mydb.sqlite < migrations/001_initial.sql

# Check schema version
sqlite3 mydb.sqlite "SELECT * FROM schema_version"

# Export schema
sqlite3 mydb.sqlite ".schema" > schema.sql

# Dump with data
sqlite3 mydb.sqlite ".dump" > backup.sql

# Restore from dump
sqlite3 newdb.sqlite < backup.sql

# Compare schemas
diff <(sqlite3 db1.sqlite ".schema") <(sqlite3 db2.sqlite ".schema")