# SQLite Async Patterns Python aiosqlite patterns for async applications. ## Async Connection ```python import aiosqlite async def get_async_connection(db_path: str) -> aiosqlite.Connection: """Create async connection with best practices.""" conn = await aiosqlite.connect(db_path) conn.row_factory = aiosqlite.Row await conn.execute("PRAGMA journal_mode=WAL") await conn.execute("PRAGMA foreign_keys=ON") return conn ``` ## Context Manager Pattern ```python async def query_items(db_path: str, status: str) -> list[dict]: """Query with automatic connection cleanup.""" async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row async with db.execute( "SELECT * FROM items WHERE status = ?", (status,) ) as cursor: rows = await cursor.fetchall() return [dict(row) for row in rows] ``` ## Async CRUD Operations ### Create ```python async def create_item(db_path: str, name: str, data: dict) -> int: """Insert and return new ID.""" async with aiosqlite.connect(db_path) as db: cursor = await db.execute( "INSERT INTO items (name, data) VALUES (?, ?)", (name, json.dumps(data)) ) await db.commit() return cursor.lastrowid ``` ### Read ```python async def get_item(db_path: str, item_id: int) -> dict | None: """Get single item by ID.""" async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row async with db.execute( "SELECT * FROM items WHERE id = ?", (item_id,) ) as cursor: row = await cursor.fetchone() return dict(row) if row else None ``` ### Update ```python async def update_item(db_path: str, item_id: int, **updates) -> bool: """Update item fields.""" if not updates: return False set_clause = ", ".join(f"{k} = ?" for k in updates.keys()) values = list(updates.values()) + [item_id] async with aiosqlite.connect(db_path) as db: cursor = await db.execute( f"UPDATE items SET {set_clause} WHERE id = ?", values ) await db.commit() return cursor.rowcount > 0 ``` ### Delete ```python async def delete_item(db_path: str, item_id: int) -> bool: """Delete item by ID.""" async with aiosqlite.connect(db_path) as db: cursor = await db.execute( "DELETE FROM items WHERE id = ?", (item_id,) ) await db.commit() return cursor.rowcount > 0 ``` ## Batch Operations ### Batch Insert ```python async def batch_insert(db_path: str, items: list[dict]) -> int: """Insert multiple items efficiently.""" async with aiosqlite.connect(db_path) as db: await db.executemany( "INSERT INTO items (name, data) VALUES (?, ?)", [(i["name"], json.dumps(i.get("data", {}))) for i in items] ) await db.commit() return len(items) ``` ### Batch Update ```python async def batch_update_status(db_path: str, ids: list[int], status: str) -> int: """Update status for multiple items.""" async with aiosqlite.connect(db_path) as db: cursor = await db.executemany( "UPDATE items SET status = ? WHERE id = ?", [(status, id) for id in ids] ) await db.commit() return len(ids) ``` ### Batch with Transaction ```python async def batch_transfer(db_path: str, transfers: list[tuple[int, int, float]]) -> None: """Transfer amounts between accounts atomically.""" async with aiosqlite.connect(db_path) as db: try: for from_id, to_id, amount in transfers: await db.execute( "UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_id) ) await db.execute( "UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_id) ) await db.commit() except Exception: await db.rollback() raise ``` ## Connection Pool Pattern ```python from contextlib import asynccontextmanager import asyncio class AsyncDBPool: """Simple connection pool for aiosqlite.""" def __init__(self, db_path: str, max_connections: int = 5): self.db_path = db_path self.max_connections = max_connections self._pool: asyncio.Queue[aiosqlite.Connection] = asyncio.Queue() self._created = 0 self._lock = asyncio.Lock() async def _create_connection(self) -> aiosqlite.Connection: conn = await aiosqlite.connect(self.db_path) conn.row_factory = aiosqlite.Row await conn.execute("PRAGMA journal_mode=WAL") return conn @asynccontextmanager async def acquire(self): # Try to get from pool try: conn = self._pool.get_nowait() except asyncio.QueueEmpty: # Create new if under limit async with self._lock: if self._created < self.max_connections: conn = await self._create_connection() self._created += 1 else: # Wait for one to be returned conn = await self._pool.get() try: yield conn finally: # Return to pool await self._pool.put(conn) async def close_all(self): while not self._pool.empty(): conn = await self._pool.get() await conn.close() self._created = 0 # Usage pool = AsyncDBPool("mydb.sqlite") async def get_user(user_id: int): async with pool.acquire() as db: async with db.execute( "SELECT * FROM users WHERE id = ?", (user_id,) ) as cursor: return await cursor.fetchone() ``` ## Streaming Large Results ```python async def stream_items(db_path: str, batch_size: int = 1000): """Yield items in batches to avoid memory issues.""" async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row async with db.execute("SELECT * FROM items ORDER BY id") as cursor: while True: rows = await cursor.fetchmany(batch_size) if not rows: break for row in rows: yield dict(row) ``` ## Concurrent Queries ```python async def get_dashboard_data(db_path: str, user_id: int) -> dict: """Run multiple queries concurrently.""" async with aiosqlite.connect(db_path) as db: db.row_factory = aiosqlite.Row # Execute queries concurrently user_task = db.execute("SELECT * FROM users WHERE id = ?", (user_id,)) orders_task = db.execute( "SELECT * FROM orders WHERE user_id = ? ORDER BY created_at DESC LIMIT 10", (user_id,) ) stats_task = db.execute( "SELECT COUNT(*) as count, SUM(total) as total FROM orders WHERE user_id = ?", (user_id,) ) # Await all user_cursor, orders_cursor, stats_cursor = await asyncio.gather( user_task, orders_task, stats_task ) return { "user": dict(await user_cursor.fetchone()), "recent_orders": [dict(r) for r in await orders_cursor.fetchall()], "stats": dict(await stats_cursor.fetchone()), } ``` ## Error Handling ```python import aiosqlite from sqlite3 import IntegrityError, OperationalError async def safe_insert(db_path: str, data: dict) -> tuple[bool, str]: """Insert with comprehensive error handling.""" try: async with aiosqlite.connect(db_path) as db: await db.execute( "INSERT INTO items (name, value) VALUES (?, ?)", (data["name"], data["value"]) ) await db.commit() return True, "Success" except IntegrityError as e: if "UNIQUE constraint" in str(e): return False, "Duplicate entry" elif "FOREIGN KEY constraint" in str(e): return False, "Referenced record not found" return False, f"Integrity error: {e}" except OperationalError as e: if "database is locked" in str(e): return False, "Database busy, try again" elif "no such table" in str(e): return False, "Table not found" return False, f"Database error: {e}" except Exception as e: return False, f"Unexpected error: {e}" ```