schema-patterns.md 5.5 KB

SQLite Schema Patterns

Common schema designs for state management, caching, logging, and deduplication.

State/Config Storage

Key-value store with automatic timestamps:

CREATE TABLE IF NOT EXISTS app_state (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL,
    updated_at TEXT DEFAULT (datetime('now'))
);

-- Upsert pattern (insert or update)
INSERT INTO app_state (key, value) VALUES ('last_sync', '2024-01-15')
ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = datetime('now');

-- Get value
SELECT value FROM app_state WHERE key = 'last_sync';

-- Get all state
SELECT * FROM app_state ORDER BY updated_at DESC;

Cache Table

Time-based cache with expiry cleanup:

CREATE TABLE IF NOT EXISTS cache (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    created_at TEXT DEFAULT (datetime('now'))
);

-- Create index for expiry cleanup
CREATE INDEX IF NOT EXISTS idx_cache_expires ON cache(expires_at);

-- Insert with 1 hour TTL
INSERT INTO cache (key, value, expires_at)
VALUES ('user:123', '{"name": "Alice"}', datetime('now', '+1 hour'))
ON CONFLICT(key) DO UPDATE SET
    value = excluded.value,
    expires_at = excluded.expires_at;

-- Get non-expired value
SELECT value FROM cache
WHERE key = 'user:123' AND expires_at > datetime('now');

-- Cleanup expired entries
DELETE FROM cache WHERE expires_at < datetime('now');

Event/Log Table

Append-only event log with type indexing:

CREATE TABLE IF NOT EXISTS events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    event_type TEXT NOT NULL,
    payload TEXT,  -- JSON
    created_at TEXT DEFAULT (datetime('now'))
);

-- Index for type + date queries
CREATE INDEX IF NOT EXISTS idx_events_type_date ON events(event_type, created_at);

-- Insert event
INSERT INTO events (event_type, payload)
VALUES ('user_login', '{"user_id": 123, "ip": "10.0.0.1"}');

-- Get recent events by type
SELECT * FROM events
WHERE event_type = 'user_login'
AND created_at > datetime('now', '-1 day')
ORDER BY created_at DESC;

-- Count by type
SELECT event_type, COUNT(*) as count
FROM events
GROUP BY event_type;

Deduplication Table

Track seen items to avoid reprocessing:

CREATE TABLE IF NOT EXISTS seen_items (
    hash TEXT PRIMARY KEY,
    source TEXT NOT NULL,
    first_seen TEXT DEFAULT (datetime('now'))
);

-- Check if seen
SELECT 1 FROM seen_items WHERE hash = ? LIMIT 1;

-- Mark as seen
INSERT OR IGNORE INTO seen_items (hash, source) VALUES (?, ?);

-- Get sources for hash
SELECT source, first_seen FROM seen_items WHERE hash = ?;

-- Cleanup old entries
DELETE FROM seen_items WHERE first_seen < datetime('now', '-30 days');

Queue Table

Simple job queue with status tracking:

CREATE TABLE IF NOT EXISTS job_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_type TEXT NOT NULL,
    payload TEXT NOT NULL,  -- JSON
    status TEXT DEFAULT 'pending',  -- pending, processing, completed, failed
    priority INTEGER DEFAULT 0,
    created_at TEXT DEFAULT (datetime('now')),
    started_at TEXT,
    completed_at TEXT,
    error TEXT
);

-- Index for fetching next job
CREATE INDEX IF NOT EXISTS idx_jobs_status_priority
ON job_queue(status, priority DESC, created_at);

-- Claim next job (atomic with transaction)
UPDATE job_queue
SET status = 'processing', started_at = datetime('now')
WHERE id = (
    SELECT id FROM job_queue
    WHERE status = 'pending'
    ORDER BY priority DESC, created_at
    LIMIT 1
)
RETURNING *;

-- Complete job
UPDATE job_queue
SET status = 'completed', completed_at = datetime('now')
WHERE id = ?;

-- Fail job
UPDATE job_queue
SET status = 'failed', completed_at = datetime('now'), error = ?
WHERE id = ?;

Session Table

User sessions with expiry:

CREATE TABLE IF NOT EXISTS sessions (
    token TEXT PRIMARY KEY,
    user_id INTEGER NOT NULL,
    data TEXT,  -- JSON
    created_at TEXT DEFAULT (datetime('now')),
    expires_at TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id);
CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at);

-- Create session (1 week expiry)
INSERT INTO sessions (token, user_id, data, expires_at)
VALUES (?, ?, '{}', datetime('now', '+7 days'));

-- Get valid session
SELECT * FROM sessions
WHERE token = ? AND expires_at > datetime('now');

-- Extend session
UPDATE sessions
SET expires_at = datetime('now', '+7 days')
WHERE token = ?;

-- Delete session
DELETE FROM sessions WHERE token = ?;

-- Cleanup expired
DELETE FROM sessions WHERE expires_at < datetime('now');

Full-Text Search Table

Using SQLite FTS5:

-- Create FTS table
CREATE VIRTUAL TABLE IF NOT EXISTS documents_fts USING fts5(
    title,
    content,
    content='documents',
    content_rowid='id'
);

-- Trigger to keep FTS in sync
CREATE TRIGGER documents_ai AFTER INSERT ON documents BEGIN
    INSERT INTO documents_fts(rowid, title, content)
    VALUES (new.id, new.title, new.content);
END;

CREATE TRIGGER documents_ad AFTER DELETE ON documents BEGIN
    INSERT INTO documents_fts(documents_fts, rowid, title, content)
    VALUES('delete', old.id, old.title, old.content);
END;

CREATE TRIGGER documents_au AFTER UPDATE ON documents BEGIN
    INSERT INTO documents_fts(documents_fts, rowid, title, content)
    VALUES('delete', old.id, old.title, old.content);
    INSERT INTO documents_fts(rowid, title, content)
    VALUES (new.id, new.title, new.content);
END;

-- Search
SELECT * FROM documents_fts WHERE documents_fts MATCH 'search query';

-- Ranked search
SELECT *, rank FROM documents_fts
WHERE documents_fts MATCH 'query'
ORDER BY rank;