Version-controlled schema migrations for SQLite databases.
import sqlite3
MIGRATIONS = [
# Version 1: Initial schema
"""
CREATE TABLE IF NOT EXISTS items (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
""",
# Version 2: Add status column
"""
ALTER TABLE items ADD COLUMN status TEXT DEFAULT 'active';
CREATE INDEX IF NOT EXISTS idx_items_status ON items(status);
""",
# Version 3: Add user reference
"""
ALTER TABLE items ADD COLUMN user_id INTEGER;
CREATE INDEX IF NOT EXISTS idx_items_user ON items(user_id);
""",
]
def migrate(conn: sqlite3.Connection):
"""Apply pending migrations."""
# Create version tracking table
conn.execute("""
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at TEXT DEFAULT (datetime('now'))
)
""")
# Get current version
result = conn.execute(
"SELECT MAX(version) FROM schema_version"
).fetchone()
current = result[0] if result[0] is not None else 0
# Apply pending migrations
for i, migration in enumerate(MIGRATIONS[current:], start=current + 1):
print(f"Applying migration {i}...")
conn.executescript(migration)
conn.execute(
"INSERT INTO schema_version (version) VALUES (?)",
(i,)
)
conn.commit()
print(f"Migration {i} complete")
print(f"Database at version {len(MIGRATIONS)}")
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Migration:
name: str
up: str
down: str | None = None
MIGRATIONS = [
Migration(
name="001_initial_schema",
up="""
CREATE TABLE users (
id INTEGER PRIMARY KEY,
email TEXT UNIQUE NOT NULL,
name TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX idx_users_email ON users(email);
""",
down="""
DROP INDEX IF EXISTS idx_users_email;
DROP TABLE IF EXISTS users;
"""
),
Migration(
name="002_add_orders",
up="""
CREATE TABLE orders (
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id),
total REAL NOT NULL,
status TEXT DEFAULT 'pending',
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX idx_orders_user ON orders(user_id);
CREATE INDEX idx_orders_status ON orders(status);
""",
down="""
DROP TABLE IF EXISTS orders;
"""
),
]
def migrate_up(conn: sqlite3.Connection, target: int | None = None):
"""Apply migrations up to target version."""
conn.execute("""
CREATE TABLE IF NOT EXISTS migrations (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at TEXT DEFAULT (datetime('now'))
)
""")
applied = {
row[0] for row in
conn.execute("SELECT name FROM migrations").fetchall()
}
target = target or len(MIGRATIONS)
for i, migration in enumerate(MIGRATIONS[:target]):
if migration.name not in applied:
print(f"Applying: {migration.name}")
conn.executescript(migration.up)
conn.execute(
"INSERT INTO migrations (name) VALUES (?)",
(migration.name,)
)
conn.commit()
def migrate_down(conn: sqlite3.Connection, steps: int = 1):
"""Rollback migrations."""
applied = conn.execute(
"SELECT name FROM migrations ORDER BY id DESC LIMIT ?",
(steps,)
).fetchall()
for (name,) in applied:
migration = next(m for m in MIGRATIONS if m.name == name)
if migration.down:
print(f"Rolling back: {name}")
conn.executescript(migration.down)
conn.execute("DELETE FROM migrations WHERE name = ?", (name,))
conn.commit()
else:
print(f"Cannot rollback {name}: no down migration")
break
import aiosqlite
async def async_migrate(db_path: str, migrations: list[str]):
"""Apply migrations asynchronously."""
async with aiosqlite.connect(db_path) as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY
)
""")
result = await db.execute("SELECT MAX(version) FROM schema_version")
row = await result.fetchone()
current = row[0] if row[0] is not None else 0
for i, migration in enumerate(migrations[current:], start=current + 1):
await db.executescript(migration)
await db.execute(
"INSERT INTO schema_version (version) VALUES (?)",
(i,)
)
await db.commit()
SQLite has limited ALTER TABLE support. Here are safe patterns:
-- Safe: Add column with default
ALTER TABLE items ADD COLUMN status TEXT DEFAULT 'active';
-- Safe: Add nullable column
ALTER TABLE items ADD COLUMN notes TEXT;
-- Safe in SQLite 3.25+
ALTER TABLE items RENAME COLUMN old_name TO new_name;
For complex changes (dropping columns, changing types):
def recreate_table(conn: sqlite3.Connection):
"""Safely modify table structure by recreating."""
conn.executescript("""
-- 1. Rename old table
ALTER TABLE items RENAME TO items_old;
-- 2. Create new table with desired schema
CREATE TABLE items (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
status TEXT DEFAULT 'active',
-- dropped: old_column
-- changed: type of some_column
created_at TEXT DEFAULT (datetime('now'))
);
-- 3. Copy data (mapping columns as needed)
INSERT INTO items (id, name, status, created_at)
SELECT id, name, COALESCE(status, 'active'), created_at
FROM items_old;
-- 4. Drop old table
DROP TABLE items_old;
-- 5. Recreate indexes
CREATE INDEX idx_items_status ON items(status);
""")
conn.commit()
import json
def store_json(conn: sqlite3.Connection, key: str, data: dict):
"""Store JSON data."""
conn.execute(
"INSERT OR REPLACE INTO json_store (key, data) VALUES (?, ?)",
(key, json.dumps(data))
)
conn.commit()
def get_json(conn: sqlite3.Connection, key: str) -> dict | None:
"""Retrieve JSON data."""
result = conn.execute(
"SELECT data FROM json_store WHERE key = ?", (key,)
).fetchone()
return json.loads(result[0]) if result else None
-- Create table with JSON column
CREATE TABLE events (
id INTEGER PRIMARY KEY,
payload TEXT NOT NULL -- JSON
);
-- Extract JSON field
SELECT json_extract(payload, '$.type') as event_type FROM events;
-- Filter by JSON value
SELECT * FROM events
WHERE json_extract(payload, '$.user_id') = 123;
-- Get nested value
SELECT json_extract(payload, '$.metadata.source') FROM events;
-- Check if key exists
SELECT * FROM events
WHERE json_type(payload, '$.optional_field') IS NOT NULL;
-- Array operations
SELECT json_extract(payload, '$.tags[0]') FROM events;
SELECT json_array_length(json_extract(payload, '$.tags')) FROM events;
def query_json_field(conn: sqlite3.Connection, field: str, value: any) -> list:
"""Query by JSON field value."""
conn.row_factory = sqlite3.Row
cursor = conn.execute(
f"SELECT * FROM events WHERE json_extract(payload, '$.{field}') = ?",
(value,)
)
return [dict(row) for row in cursor.fetchall()]
def update_json_field(conn: sqlite3.Connection, event_id: int, field: str, value: any):
"""Update specific JSON field."""
conn.execute(
f"UPDATE events SET payload = json_set(payload, '$.{field}', ?) WHERE id = ?",
(json.dumps(value) if isinstance(value, (dict, list)) else value, event_id)
)
conn.commit()
# Run migration from file
sqlite3 mydb.sqlite < migrations/001_initial.sql
# Check schema version
sqlite3 mydb.sqlite "SELECT * FROM schema_version"
# Export schema
sqlite3 mydb.sqlite ".schema" > schema.sql
# Dump with data
sqlite3 mydb.sqlite ".dump" > backup.sql
# Restore from dump
sqlite3 newdb.sqlite < backup.sql
# Compare schemas
diff <(sqlite3 db1.sqlite ".schema") <(sqlite3 db2.sqlite ".schema")