Modern async database patterns with SQLAlchemy 2.0.
from sqlalchemy.ext.asyncio import (
AsyncSession,
AsyncEngine,
async_sessionmaker,
create_async_engine,
)
# Create async engine
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
echo=False, # SQL logging
pool_size=5, # Connection pool size
max_overflow=10, # Extra connections allowed
pool_pre_ping=True, # Test connections before use
pool_recycle=3600, # Recycle connections after 1 hour
)
# Session factory (not the session itself)
async_session_factory = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False, # Don't expire objects after commit
)
# Usage with context manager
async def get_users():
async with async_session_factory() as session:
result = await session.execute(select(User))
return result.scalars().all()
# Per-request (FastAPI dependency)
async def get_db():
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# Explicit transaction control
async def transfer_funds(from_id: int, to_id: int, amount: Decimal):
async with async_session_factory() as session:
async with session.begin(): # Auto-commit on success
from_account = await session.get(Account, from_id)
to_account = await session.get(Account, to_id)
from_account.balance -= amount
to_account.balance += amount
# Commits automatically if no exception
# Nested transactions (savepoints)
async def complex_operation():
async with async_session_factory() as session:
async with session.begin():
# Outer transaction
user = User(name="Test")
session.add(user)
try:
async with session.begin_nested(): # Savepoint
# Inner operation that might fail
await risky_operation(session)
except RiskyOperationError:
# Savepoint rolled back, outer continues
pass
await session.commit()
from sqlalchemy.orm import selectinload, joinedload, subqueryload
# WRONG - lazy loading doesn't work in async
async def bad_example():
async with async_session_factory() as session:
user = await session.get(User, 1)
# This raises an error!
print(user.posts) # MissingGreenlet error
# CORRECT - eager loading
async def good_example():
async with async_session_factory() as session:
# Option 1: selectinload (separate query per relationship)
stmt = select(User).options(selectinload(User.posts))
result = await session.execute(stmt)
user = result.scalar_one()
print(user.posts) # Works!
# Option 2: joinedload (single JOIN query)
stmt = select(User).options(joinedload(User.profile))
result = await session.execute(stmt)
user = result.scalar_one()
# With nested relationships
stmt = select(User).options(
selectinload(User.posts).selectinload(Post.comments)
)
from fastapi import Depends
from typing import Annotated, AsyncGenerator
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""Dependency for FastAPI."""
async with async_session_factory() as session:
yield session
DB = Annotated[AsyncSession, Depends(get_db)]
# With automatic transaction handling
async def get_db_with_transaction() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
from sqlalchemy import insert, update, delete
# Bulk insert
async def bulk_create_users(users_data: list[dict]):
async with async_session_factory() as session:
stmt = insert(User).values(users_data)
await session.execute(stmt)
await session.commit()
# Bulk update
async def deactivate_users(user_ids: list[int]):
async with async_session_factory() as session:
stmt = (
update(User)
.where(User.id.in_(user_ids))
.values(is_active=False)
)
result = await session.execute(stmt)
await session.commit()
return result.rowcount
# Bulk delete
async def delete_old_posts(before_date: datetime):
async with async_session_factory() as session:
stmt = delete(Post).where(Post.created_at < before_date)
result = await session.execute(stmt)
await session.commit()
return result.rowcount
# Batch processing with chunks
async def process_all_users(batch_size: int = 100):
async with async_session_factory() as session:
offset = 0
while True:
stmt = select(User).offset(offset).limit(batch_size)
result = await session.execute(stmt)
users = result.scalars().all()
if not users:
break
for user in users:
await process_user(user)
await session.commit()
offset += batch_size
from sqlalchemy import select
async def stream_large_table():
"""Process large tables without loading all into memory."""
async with async_session_factory() as session:
stmt = select(User).execution_options(yield_per=100)
result = await session.stream(stmt)
async for user in result.scalars():
await process_user(user)
# Partitioned streaming
async def stream_partitioned():
async with async_session_factory() as session:
stmt = select(User).execution_options(yield_per=100)
result = await session.stream(stmt)
async for partition in result.scalars().partitions(100):
# partition is a list of 100 users
await process_batch(partition)
from sqlalchemy import text
async def raw_query():
async with async_session_factory() as session:
# Simple query
result = await session.execute(
text("SELECT * FROM users WHERE is_active = :active"),
{"active": True}
)
rows = result.fetchall()
# With column access
for row in rows:
print(row.id, row.name)
async def raw_insert():
async with async_session_factory() as session:
await session.execute(
text("INSERT INTO logs (message) VALUES (:msg)"),
{"msg": "Test log"}
)
await session.commit()
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
@pytest_asyncio.fixture(scope="session")
async def async_engine():
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
await engine.dispose()
@pytest_asyncio.fixture
async def async_session(async_engine):
async with AsyncSession(async_engine) as session:
async with session.begin():
yield session
await session.rollback()
@pytest.mark.asyncio
async def test_create_user(async_session):
user = User(name="Test", email="test@example.com")
async_session.add(user)
await async_session.flush()
assert user.id is not None
| Pattern | Async SQLAlchemy |
|---|---|
| Create engine | create_async_engine(url) |
| Session factory | async_sessionmaker(engine) |
| Get session | async with factory() as session: |
| Execute | await session.execute(stmt) |
| Get one | result.scalar_one_or_none() |
| Get all | result.scalars().all() |
| Stream | await session.stream(stmt) |
| Commit | await session.commit() |
| Transaction | async with session.begin(): |
| Eager load | .options(selectinload(rel)) |