Common schema designs for state management, caching, logging, and deduplication.
Key-value store with automatic timestamps:
CREATE TABLE IF NOT EXISTS app_state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT DEFAULT (datetime('now'))
);
-- Upsert pattern (insert or update)
INSERT INTO app_state (key, value) VALUES ('last_sync', '2024-01-15')
ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = datetime('now');
-- Get value
SELECT value FROM app_state WHERE key = 'last_sync';
-- Get all state
SELECT * FROM app_state ORDER BY updated_at DESC;
Time-based cache with expiry cleanup:
CREATE TABLE IF NOT EXISTS cache (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
expires_at TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
-- Create index for expiry cleanup
CREATE INDEX IF NOT EXISTS idx_cache_expires ON cache(expires_at);
-- Insert with 1 hour TTL
INSERT INTO cache (key, value, expires_at)
VALUES ('user:123', '{"name": "Alice"}', datetime('now', '+1 hour'))
ON CONFLICT(key) DO UPDATE SET
value = excluded.value,
expires_at = excluded.expires_at;
-- Get non-expired value
SELECT value FROM cache
WHERE key = 'user:123' AND expires_at > datetime('now');
-- Cleanup expired entries
DELETE FROM cache WHERE expires_at < datetime('now');
Append-only event log with type indexing:
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_type TEXT NOT NULL,
payload TEXT, -- JSON
created_at TEXT DEFAULT (datetime('now'))
);
-- Index for type + date queries
CREATE INDEX IF NOT EXISTS idx_events_type_date ON events(event_type, created_at);
-- Insert event
INSERT INTO events (event_type, payload)
VALUES ('user_login', '{"user_id": 123, "ip": "10.0.0.1"}');
-- Get recent events by type
SELECT * FROM events
WHERE event_type = 'user_login'
AND created_at > datetime('now', '-1 day')
ORDER BY created_at DESC;
-- Count by type
SELECT event_type, COUNT(*) as count
FROM events
GROUP BY event_type;
Track seen items to avoid reprocessing:
CREATE TABLE IF NOT EXISTS seen_items (
hash TEXT PRIMARY KEY,
source TEXT NOT NULL,
first_seen TEXT DEFAULT (datetime('now'))
);
-- Check if seen
SELECT 1 FROM seen_items WHERE hash = ? LIMIT 1;
-- Mark as seen
INSERT OR IGNORE INTO seen_items (hash, source) VALUES (?, ?);
-- Get sources for hash
SELECT source, first_seen FROM seen_items WHERE hash = ?;
-- Cleanup old entries
DELETE FROM seen_items WHERE first_seen < datetime('now', '-30 days');
Simple job queue with status tracking:
CREATE TABLE IF NOT EXISTS job_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_type TEXT NOT NULL,
payload TEXT NOT NULL, -- JSON
status TEXT DEFAULT 'pending', -- pending, processing, completed, failed
priority INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
started_at TEXT,
completed_at TEXT,
error TEXT
);
-- Index for fetching next job
CREATE INDEX IF NOT EXISTS idx_jobs_status_priority
ON job_queue(status, priority DESC, created_at);
-- Claim next job (atomic with transaction)
UPDATE job_queue
SET status = 'processing', started_at = datetime('now')
WHERE id = (
SELECT id FROM job_queue
WHERE status = 'pending'
ORDER BY priority DESC, created_at
LIMIT 1
)
RETURNING *;
-- Complete job
UPDATE job_queue
SET status = 'completed', completed_at = datetime('now')
WHERE id = ?;
-- Fail job
UPDATE job_queue
SET status = 'failed', completed_at = datetime('now'), error = ?
WHERE id = ?;
User sessions with expiry:
CREATE TABLE IF NOT EXISTS sessions (
token TEXT PRIMARY KEY,
user_id INTEGER NOT NULL,
data TEXT, -- JSON
created_at TEXT DEFAULT (datetime('now')),
expires_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id);
CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at);
-- Create session (1 week expiry)
INSERT INTO sessions (token, user_id, data, expires_at)
VALUES (?, ?, '{}', datetime('now', '+7 days'));
-- Get valid session
SELECT * FROM sessions
WHERE token = ? AND expires_at > datetime('now');
-- Extend session
UPDATE sessions
SET expires_at = datetime('now', '+7 days')
WHERE token = ?;
-- Delete session
DELETE FROM sessions WHERE token = ?;
-- Cleanup expired
DELETE FROM sessions WHERE expires_at < datetime('now');
Using SQLite FTS5:
-- Create FTS table
CREATE VIRTUAL TABLE IF NOT EXISTS documents_fts USING fts5(
title,
content,
content='documents',
content_rowid='id'
);
-- Trigger to keep FTS in sync
CREATE TRIGGER documents_ai AFTER INSERT ON documents BEGIN
INSERT INTO documents_fts(rowid, title, content)
VALUES (new.id, new.title, new.content);
END;
CREATE TRIGGER documents_ad AFTER DELETE ON documents BEGIN
INSERT INTO documents_fts(documents_fts, rowid, title, content)
VALUES('delete', old.id, old.title, old.content);
END;
CREATE TRIGGER documents_au AFTER UPDATE ON documents BEGIN
INSERT INTO documents_fts(documents_fts, rowid, title, content)
VALUES('delete', old.id, old.title, old.content);
INSERT INTO documents_fts(rowid, title, content)
VALUES (new.id, new.title, new.content);
END;
-- Search
SELECT * FROM documents_fts WHERE documents_fts MATCH 'search query';
-- Ranked search
SELECT *, rank FROM documents_fts
WHERE documents_fts MATCH 'query'
ORDER BY rank;