async-patterns.md 8.4 KB

SQLite Async Patterns

Python aiosqlite patterns for async applications.

Async Connection

import aiosqlite

async def get_async_connection(db_path: str) -> aiosqlite.Connection:
    """Create async connection with best practices."""
    conn = await aiosqlite.connect(db_path)
    conn.row_factory = aiosqlite.Row
    await conn.execute("PRAGMA journal_mode=WAL")
    await conn.execute("PRAGMA foreign_keys=ON")
    return conn

Context Manager Pattern

async def query_items(db_path: str, status: str) -> list[dict]:
    """Query with automatic connection cleanup."""
    async with aiosqlite.connect(db_path) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            "SELECT * FROM items WHERE status = ?", (status,)
        ) as cursor:
            rows = await cursor.fetchall()
            return [dict(row) for row in rows]

Async CRUD Operations

Create

async def create_item(db_path: str, name: str, data: dict) -> int:
    """Insert and return new ID."""
    async with aiosqlite.connect(db_path) as db:
        cursor = await db.execute(
            "INSERT INTO items (name, data) VALUES (?, ?)",
            (name, json.dumps(data))
        )
        await db.commit()
        return cursor.lastrowid

Read

async def get_item(db_path: str, item_id: int) -> dict | None:
    """Get single item by ID."""
    async with aiosqlite.connect(db_path) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            "SELECT * FROM items WHERE id = ?", (item_id,)
        ) as cursor:
            row = await cursor.fetchone()
            return dict(row) if row else None

Update

async def update_item(db_path: str, item_id: int, **updates) -> bool:
    """Update item fields."""
    if not updates:
        return False

    set_clause = ", ".join(f"{k} = ?" for k in updates.keys())
    values = list(updates.values()) + [item_id]

    async with aiosqlite.connect(db_path) as db:
        cursor = await db.execute(
            f"UPDATE items SET {set_clause} WHERE id = ?",
            values
        )
        await db.commit()
        return cursor.rowcount > 0

Delete

async def delete_item(db_path: str, item_id: int) -> bool:
    """Delete item by ID."""
    async with aiosqlite.connect(db_path) as db:
        cursor = await db.execute(
            "DELETE FROM items WHERE id = ?", (item_id,)
        )
        await db.commit()
        return cursor.rowcount > 0

Batch Operations

Batch Insert

async def batch_insert(db_path: str, items: list[dict]) -> int:
    """Insert multiple items efficiently."""
    async with aiosqlite.connect(db_path) as db:
        await db.executemany(
            "INSERT INTO items (name, data) VALUES (?, ?)",
            [(i["name"], json.dumps(i.get("data", {}))) for i in items]
        )
        await db.commit()
        return len(items)

Batch Update

async def batch_update_status(db_path: str, ids: list[int], status: str) -> int:
    """Update status for multiple items."""
    async with aiosqlite.connect(db_path) as db:
        cursor = await db.executemany(
            "UPDATE items SET status = ? WHERE id = ?",
            [(status, id) for id in ids]
        )
        await db.commit()
        return len(ids)

Batch with Transaction

async def batch_transfer(db_path: str, transfers: list[tuple[int, int, float]]) -> None:
    """Transfer amounts between accounts atomically."""
    async with aiosqlite.connect(db_path) as db:
        try:
            for from_id, to_id, amount in transfers:
                await db.execute(
                    "UPDATE accounts SET balance = balance - ? WHERE id = ?",
                    (amount, from_id)
                )
                await db.execute(
                    "UPDATE accounts SET balance = balance + ? WHERE id = ?",
                    (amount, to_id)
                )
            await db.commit()
        except Exception:
            await db.rollback()
            raise

Connection Pool Pattern

from contextlib import asynccontextmanager
import asyncio

class AsyncDBPool:
    """Simple connection pool for aiosqlite."""

    def __init__(self, db_path: str, max_connections: int = 5):
        self.db_path = db_path
        self.max_connections = max_connections
        self._pool: asyncio.Queue[aiosqlite.Connection] = asyncio.Queue()
        self._created = 0
        self._lock = asyncio.Lock()

    async def _create_connection(self) -> aiosqlite.Connection:
        conn = await aiosqlite.connect(self.db_path)
        conn.row_factory = aiosqlite.Row
        await conn.execute("PRAGMA journal_mode=WAL")
        return conn

    @asynccontextmanager
    async def acquire(self):
        # Try to get from pool
        try:
            conn = self._pool.get_nowait()
        except asyncio.QueueEmpty:
            # Create new if under limit
            async with self._lock:
                if self._created < self.max_connections:
                    conn = await self._create_connection()
                    self._created += 1
                else:
                    # Wait for one to be returned
                    conn = await self._pool.get()

        try:
            yield conn
        finally:
            # Return to pool
            await self._pool.put(conn)

    async def close_all(self):
        while not self._pool.empty():
            conn = await self._pool.get()
            await conn.close()
        self._created = 0

# Usage
pool = AsyncDBPool("mydb.sqlite")

async def get_user(user_id: int):
    async with pool.acquire() as db:
        async with db.execute(
            "SELECT * FROM users WHERE id = ?", (user_id,)
        ) as cursor:
            return await cursor.fetchone()

Streaming Large Results

async def stream_items(db_path: str, batch_size: int = 1000):
    """Yield items in batches to avoid memory issues."""
    async with aiosqlite.connect(db_path) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute("SELECT * FROM items ORDER BY id") as cursor:
            while True:
                rows = await cursor.fetchmany(batch_size)
                if not rows:
                    break
                for row in rows:
                    yield dict(row)

Concurrent Queries

async def get_dashboard_data(db_path: str, user_id: int) -> dict:
    """Run multiple queries concurrently."""
    async with aiosqlite.connect(db_path) as db:
        db.row_factory = aiosqlite.Row

        # Execute queries concurrently
        user_task = db.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        orders_task = db.execute(
            "SELECT * FROM orders WHERE user_id = ? ORDER BY created_at DESC LIMIT 10",
            (user_id,)
        )
        stats_task = db.execute(
            "SELECT COUNT(*) as count, SUM(total) as total FROM orders WHERE user_id = ?",
            (user_id,)
        )

        # Await all
        user_cursor, orders_cursor, stats_cursor = await asyncio.gather(
            user_task, orders_task, stats_task
        )

        return {
            "user": dict(await user_cursor.fetchone()),
            "recent_orders": [dict(r) for r in await orders_cursor.fetchall()],
            "stats": dict(await stats_cursor.fetchone()),
        }

Error Handling

import aiosqlite
from sqlite3 import IntegrityError, OperationalError

async def safe_insert(db_path: str, data: dict) -> tuple[bool, str]:
    """Insert with comprehensive error handling."""
    try:
        async with aiosqlite.connect(db_path) as db:
            await db.execute(
                "INSERT INTO items (name, value) VALUES (?, ?)",
                (data["name"], data["value"])
            )
            await db.commit()
            return True, "Success"

    except IntegrityError as e:
        if "UNIQUE constraint" in str(e):
            return False, "Duplicate entry"
        elif "FOREIGN KEY constraint" in str(e):
            return False, "Referenced record not found"
        return False, f"Integrity error: {e}"

    except OperationalError as e:
        if "database is locked" in str(e):
            return False, "Database busy, try again"
        elif "no such table" in str(e):
            return False, "Table not found"
        return False, f"Database error: {e}"

    except Exception as e:
        return False, f"Unexpected error: {e}"