Python aiosqlite patterns for async applications.
import aiosqlite
async def get_async_connection(db_path: str) -> aiosqlite.Connection:
"""Create async connection with best practices."""
conn = await aiosqlite.connect(db_path)
conn.row_factory = aiosqlite.Row
await conn.execute("PRAGMA journal_mode=WAL")
await conn.execute("PRAGMA foreign_keys=ON")
return conn
async def query_items(db_path: str, status: str) -> list[dict]:
"""Query with automatic connection cleanup."""
async with aiosqlite.connect(db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM items WHERE status = ?", (status,)
) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def create_item(db_path: str, name: str, data: dict) -> int:
"""Insert and return new ID."""
async with aiosqlite.connect(db_path) as db:
cursor = await db.execute(
"INSERT INTO items (name, data) VALUES (?, ?)",
(name, json.dumps(data))
)
await db.commit()
return cursor.lastrowid
async def get_item(db_path: str, item_id: int) -> dict | None:
"""Get single item by ID."""
async with aiosqlite.connect(db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM items WHERE id = ?", (item_id,)
) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
async def update_item(db_path: str, item_id: int, **updates) -> bool:
"""Update item fields."""
if not updates:
return False
set_clause = ", ".join(f"{k} = ?" for k in updates.keys())
values = list(updates.values()) + [item_id]
async with aiosqlite.connect(db_path) as db:
cursor = await db.execute(
f"UPDATE items SET {set_clause} WHERE id = ?",
values
)
await db.commit()
return cursor.rowcount > 0
async def delete_item(db_path: str, item_id: int) -> bool:
"""Delete item by ID."""
async with aiosqlite.connect(db_path) as db:
cursor = await db.execute(
"DELETE FROM items WHERE id = ?", (item_id,)
)
await db.commit()
return cursor.rowcount > 0
async def batch_insert(db_path: str, items: list[dict]) -> int:
"""Insert multiple items efficiently."""
async with aiosqlite.connect(db_path) as db:
await db.executemany(
"INSERT INTO items (name, data) VALUES (?, ?)",
[(i["name"], json.dumps(i.get("data", {}))) for i in items]
)
await db.commit()
return len(items)
async def batch_update_status(db_path: str, ids: list[int], status: str) -> int:
"""Update status for multiple items."""
async with aiosqlite.connect(db_path) as db:
cursor = await db.executemany(
"UPDATE items SET status = ? WHERE id = ?",
[(status, id) for id in ids]
)
await db.commit()
return len(ids)
async def batch_transfer(db_path: str, transfers: list[tuple[int, int, float]]) -> None:
"""Transfer amounts between accounts atomically."""
async with aiosqlite.connect(db_path) as db:
try:
for from_id, to_id, amount in transfers:
await db.execute(
"UPDATE accounts SET balance = balance - ? WHERE id = ?",
(amount, from_id)
)
await db.execute(
"UPDATE accounts SET balance = balance + ? WHERE id = ?",
(amount, to_id)
)
await db.commit()
except Exception:
await db.rollback()
raise
from contextlib import asynccontextmanager
import asyncio
class AsyncDBPool:
"""Simple connection pool for aiosqlite."""
def __init__(self, db_path: str, max_connections: int = 5):
self.db_path = db_path
self.max_connections = max_connections
self._pool: asyncio.Queue[aiosqlite.Connection] = asyncio.Queue()
self._created = 0
self._lock = asyncio.Lock()
async def _create_connection(self) -> aiosqlite.Connection:
conn = await aiosqlite.connect(self.db_path)
conn.row_factory = aiosqlite.Row
await conn.execute("PRAGMA journal_mode=WAL")
return conn
@asynccontextmanager
async def acquire(self):
# Try to get from pool
try:
conn = self._pool.get_nowait()
except asyncio.QueueEmpty:
# Create new if under limit
async with self._lock:
if self._created < self.max_connections:
conn = await self._create_connection()
self._created += 1
else:
# Wait for one to be returned
conn = await self._pool.get()
try:
yield conn
finally:
# Return to pool
await self._pool.put(conn)
async def close_all(self):
while not self._pool.empty():
conn = await self._pool.get()
await conn.close()
self._created = 0
# Usage
pool = AsyncDBPool("mydb.sqlite")
async def get_user(user_id: int):
async with pool.acquire() as db:
async with db.execute(
"SELECT * FROM users WHERE id = ?", (user_id,)
) as cursor:
return await cursor.fetchone()
async def stream_items(db_path: str, batch_size: int = 1000):
"""Yield items in batches to avoid memory issues."""
async with aiosqlite.connect(db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("SELECT * FROM items ORDER BY id") as cursor:
while True:
rows = await cursor.fetchmany(batch_size)
if not rows:
break
for row in rows:
yield dict(row)
async def get_dashboard_data(db_path: str, user_id: int) -> dict:
"""Run multiple queries concurrently."""
async with aiosqlite.connect(db_path) as db:
db.row_factory = aiosqlite.Row
# Execute queries concurrently
user_task = db.execute("SELECT * FROM users WHERE id = ?", (user_id,))
orders_task = db.execute(
"SELECT * FROM orders WHERE user_id = ? ORDER BY created_at DESC LIMIT 10",
(user_id,)
)
stats_task = db.execute(
"SELECT COUNT(*) as count, SUM(total) as total FROM orders WHERE user_id = ?",
(user_id,)
)
# Await all
user_cursor, orders_cursor, stats_cursor = await asyncio.gather(
user_task, orders_task, stats_task
)
return {
"user": dict(await user_cursor.fetchone()),
"recent_orders": [dict(r) for r in await orders_cursor.fetchall()],
"stats": dict(await stats_cursor.fetchone()),
}
import aiosqlite
from sqlite3 import IntegrityError, OperationalError
async def safe_insert(db_path: str, data: dict) -> tuple[bool, str]:
"""Insert with comprehensive error handling."""
try:
async with aiosqlite.connect(db_path) as db:
await db.execute(
"INSERT INTO items (name, value) VALUES (?, ?)",
(data["name"], data["value"])
)
await db.commit()
return True, "Success"
except IntegrityError as e:
if "UNIQUE constraint" in str(e):
return False, "Duplicate entry"
elif "FOREIGN KEY constraint" in str(e):
return False, "Referenced record not found"
return False, f"Integrity error: {e}"
except OperationalError as e:
if "database is locked" in str(e):
return False, "Database busy, try again"
elif "no such table" in str(e):
return False, "Table not found"
return False, f"Database error: {e}"
except Exception as e:
return False, f"Unexpected error: {e}"