connection-pooling.md 7.7 KB

Connection Pool Configuration

Database connection pool patterns for production.

SQLAlchemy Pool Settings

from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine

# Sync engine with pool config ("postgresql://" uses the psycopg2 driver)
engine = create_engine(
    "postgresql://user:pass@localhost/db",

    # Pool size
    pool_size=5,           # Persistent connections (default: 5)
    max_overflow=10,       # Extra connections when pool exhausted
    # Total max connections = pool_size + max_overflow = 15

    # Timeouts
    pool_timeout=30,       # Wait for connection (seconds)
    pool_recycle=3600,     # Recycle connections after N seconds
    pool_pre_ping=True,    # Test connections before use

    # Connection args: passed straight to the DBAPI driver (libpq keywords here)
    connect_args={
        "connect_timeout": 10,
        "options": "-c statement_timeout=30000",  # 30s query timeout
    },
)


# Async engine (asyncpg driver). asyncpg does not accept libpq's
# "connect_timeout"/"options", so the same limits use asyncpg's own keywords.
async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=3600,
    pool_pre_ping=True,
    connect_args={
        "timeout": 10,  # Connection establishment timeout (seconds)
        "server_settings": {"statement_timeout": "30000"},  # 30s query timeout
    },
)

Pool Sizing Guidelines

"""
Connection Pool Sizing

Rule of thumb (starting point, then measure):
    pool_size = (CPU cores × 2) + disk spindles

For async applications, size for the connections actually busy at peak:
    pool_size ≈ peak_concurrent_requests × (time spent in DB / total request time)

Every worker process owns a separate pool, so the database can see up to
    workers × (pool_size + max_overflow)
connections in total; keep that below PostgreSQL's max_connections.

Examples:
    - Web app, 4 cores, SSD:  pool_size=10, max_overflow=10
    - Worker, 4 cores, HDD:   pool_size=12, max_overflow=5
    - High-traffic API:       pool_size=20, max_overflow=30
"""

import os

def calculate_pool_size() -> tuple[int, int]:
    """Return ``(pool_size, max_overflow)`` for the current environment.

    With ``ENV=production`` the pool is sized from this host's CPU count as
    ``cores * 2 + 4`` (the constant presumably standing in for the spindle
    term of the rule of thumb), and ``max_overflow`` equals ``pool_size``,
    so bursts can double the connection count. Any other or missing ``ENV``
    gets a small fixed pool of 5 + 5.
    """
    if os.getenv("ENV") != "production":
        return 5, 5

    # os.cpu_count() can return None when the count is undeterminable.
    cores = os.cpu_count() or 4
    # NOTE: this is per process -- multiply by the worker count when checking
    # against the database's max_connections.
    size = cores * 2 + 4
    return size, size

pool_size, max_overflow = calculate_pool_size()

Pool Events and Monitoring

from sqlalchemy import event
from sqlalchemy.pool import Pool
import logging

logger = logging.getLogger(__name__)

# Listening on the Pool *class* attaches these hooks to every pool in the
# process (sync and async engines alike). To scope them to a single engine,
# pass that engine's pool (``engine.pool``) as the target instead.


@event.listens_for(Pool, "connect")
def on_connect(dbapi_conn, connection_record):
    """Log that the pool opened a brand-new DBAPI connection."""
    logger.debug("New database connection created")


@event.listens_for(Pool, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
    """Log that a connection was handed out from the pool."""
    # Fires on every checkout; a constant message keeps this hot path cheap.
    logger.debug("Connection checked out from pool")


@event.listens_for(Pool, "checkin")
def on_checkin(dbapi_conn, connection_record):
    """Log that a connection was given back to the pool."""
    logger.debug("Connection returned to pool")


@event.listens_for(Pool, "invalidate")
def on_invalidate(dbapi_conn, connection_record, exception):
    """Warn that a pooled connection was invalidated.

    ``exception`` is the error behind the invalidation, if any; it may be
    ``None`` (e.g. explicit invalidation), in which case "None" is logged.
    """
    # Lazy %-formatting: the message is only rendered if WARNING is enabled.
    logger.warning("Connection invalidated: %s", exception)


# Pool statistics
def log_pool_status(engine):
    """Log a one-line snapshot of ``engine``'s connection pool.

    QueuePool-style pools (the default, including the async-adapted variant
    used by asyncpg engines) expose size/checkedout/overflow/checkedin
    counters. Pools without those methods, such as ``NullPool``, are logged
    through their ``status()`` summary instead of raising ``AttributeError``.
    """
    pool = engine.pool
    counters = ("size", "checkedout", "overflow", "checkedin")
    if not all(hasattr(pool, name) for name in counters):
        logger.info("Pool status: %s", pool.status())
        return

    # NOTE(review): QueuePool's overflow() appears to be counted relative to
    # pool_size, so it can read negative until pool_size connections exist --
    # confirm for your SQLAlchemy version before alerting on it.
    logger.info(
        "Pool status: size=%s, checked_out=%s, overflow=%s, checkedin=%s",
        pool.size(),
        pool.checkedout(),
        pool.overflow(),
        pool.checkedin(),
    )

Health Check Endpoint

from fastapi import FastAPI, HTTPException
from sqlalchemy import text
import asyncio

app = FastAPI()

async def check_database_health(timeout: float = 5.0) -> dict:
    """Check database connectivity and response time.

    Runs ``SELECT 1`` on a connection checked out from ``async_engine`` (the
    same engine whose pool statistics are reported). ``timeout`` bounds the
    whole probe: waiting for a free pooled connection *and* running the
    query. An exhausted pool is therefore reported as unhealthy rather than
    blocking for the engine's ``pool_timeout``.

    Returns:
        ``{"status": "healthy", "latency_ms", "pool_size", "pool_checked_out"}``
        on success, otherwise ``{"status": "unhealthy", "error": ...}``.
    """

    async def probe() -> None:
        # Checkout happens in __aenter__, so it is covered by wait_for too.
        async with async_engine.connect() as conn:
            await conn.execute(text("SELECT 1"))

    loop = asyncio.get_running_loop()
    try:
        start = loop.time()
        await asyncio.wait_for(probe(), timeout=timeout)
        latency = (loop.time() - start) * 1000

        pool = async_engine.pool
        return {
            "status": "healthy",
            "latency_ms": round(latency, 2),
            "pool_size": pool.size(),
            "pool_checked_out": pool.checkedout(),
        }
    except asyncio.TimeoutError:
        return {"status": "unhealthy", "error": "timeout"}
    except Exception as e:
        # Health-check boundary: any failure means "unhealthy".
        # NOTE(review): str(e) may expose driver/server details; if this
        # endpoint is public, consider logging it and returning a generic text.
        return {"status": "unhealthy", "error": str(e)}


@app.get("/health/db")
async def database_health():
    """Return pool health, or HTTP 503 with the failure details."""
    health = await check_database_health()
    if health["status"] != "healthy":
        raise HTTPException(status_code=503, detail=health)
    return health

Connection Pool per Service

from dataclasses import dataclass
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

@dataclass
class DatabaseConfig:
    """Pool settings for one named database."""

    url: str
    pool_size: int = 5          # Persistent connections kept open
    max_overflow: int = 10      # Extra connections allowed under burst
    pool_timeout: int = 30      # Seconds to wait for a free connection
    pool_recycle: int = 3600    # Seconds before a connection is replaced
    pool_pre_ping: bool = True  # Test each connection on checkout

class DatabasePool:
    """Manage multiple database connections: one async engine (pool) per name."""

    def __init__(self):
        self._engines: dict[str, AsyncEngine] = {}

    def add_database(self, name: str, config: DatabaseConfig):
        """Create an async engine from ``config`` and register it as ``name``.

        Raises:
            ValueError: if ``name`` is already registered. Silently replacing
                it would orphan the previous engine, whose pooled connections
                could then never be closed by ``close_all()``.
        """
        if name in self._engines:
            raise ValueError(f"Database {name!r} is already registered")
        self._engines[name] = create_async_engine(
            config.url,
            pool_size=config.pool_size,
            max_overflow=config.max_overflow,
            pool_timeout=config.pool_timeout,
            pool_recycle=config.pool_recycle,
            pool_pre_ping=config.pool_pre_ping,
        )

    def get_engine(self, name: str) -> AsyncEngine:
        """Return the engine registered as ``name`` (``KeyError`` if unknown)."""
        return self._engines[name]

    async def close_all(self):
        """Dispose every registered engine's connection pool.

        Keeps going if one ``dispose()`` fails, so a single bad pool cannot
        leave the others open; the first failure is re-raised at the end.
        """
        first_error = None
        for engine in self._engines.values():
            try:
                await engine.dispose()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error


# Usage: a single registry holding one pool per database.
db_pool = DatabasePool()

# Primary database.
db_pool.add_database(
    "primary",
    DatabaseConfig(url="postgresql+asyncpg://user:pass@primary/db", pool_size=10),
)

# Read replica: gets a larger pool because it serves the read traffic.
db_pool.add_database(
    "replica",
    DatabaseConfig(url="postgresql+asyncpg://user:pass@replica/db", pool_size=20),
)

Read/Write Splitting

from typing import Annotated

from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

# `User` (ORM model) and `UserCreate` (Pydantic schema) are assumed to be
# defined elsewhere in the application.

# Separate engines (and therefore separate pools) for primary and replica.
# pool_pre_ping matches the other engines in this guide, so a restart or
# failover on either side doesn't hand out dead connections.
write_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@primary/db",
    pool_size=10,
    pool_pre_ping=True,
)

read_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@replica/db",
    pool_size=20,
    pool_pre_ping=True,
)

# expire_on_commit=False keeps loaded attributes valid after commit, so ORM
# objects can be returned from endpoints without being reloaded.
write_session = async_sessionmaker(write_engine, expire_on_commit=False)
read_session = async_sessionmaker(read_engine, expire_on_commit=False)


# FastAPI dependencies: one session per request, closed when the request ends.
async def get_write_db():
    """Yield a session bound to the primary; use for writes."""
    async with write_session() as session:
        yield session

async def get_read_db():
    """Yield a session bound to the replica; reads only."""
    async with read_session() as session:
        yield session

WriteDB = Annotated[AsyncSession, Depends(get_write_db)]
ReadDB = Annotated[AsyncSession, Depends(get_read_db)]


@app.get("/users")
async def list_users(db: ReadDB):  # Read from replica
    """List users from the read replica.

    Replica reads may lag the primary, so a user created moments ago might
    not be visible yet.
    """
    result = await db.execute(select(User))
    return result.scalars().all()

@app.post("/users")
async def create_user(user: UserCreate, db: WriteDB):  # Write to primary
    """Insert a user on the primary and return the stored row."""
    db_user = User(**user.model_dump())
    db.add(db_user)
    await db.commit()
    # Reload the row so database-generated values are present. Touching an
    # unloaded attribute during response serialization would otherwise
    # trigger implicit lazy IO, which AsyncSession does not allow.
    await db.refresh(db_user)
    return db_user

Graceful Shutdown

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Close every connection pool when the application shuts down.

    Engines are created at module import, so startup has nothing to do.
    Disposal sits in ``finally`` so pools are closed even when the
    application exits with an error.
    """
    try:
        yield
    finally:
        # Shutdown - close all pools gracefully: the default async engine,
        # the read/write engines and every engine in the per-service registry.
        await async_engine.dispose()
        await write_engine.dispose()
        await read_engine.dispose()
        await db_pool.close_all()
        logger.info("Database connections closed")

# Create the app exactly once, with lifespan attached, *before* registering
# routes: rebinding `app` to a new FastAPI() instance discards every route
# declared on the previous one (e.g. an earlier bare `app = FastAPI()`).
app = FastAPI(lifespan=lifespan)

Quick Reference

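| Setting | Purpose | Typical Value |
|---|---|---|
| `pool_size` | Persistent connections | 5–20 |
| `max_overflow` | Extra connections under burst | 10–30 |
| `pool_timeout` | Wait for a free connection | 30 s |
| `pool_recycle` | Maximum connection age before recycling | 3600 s |
| `pool_pre_ping` | Test connection before use | `True` |

| Scenario | `pool_size` | `max_overflow` |
|---|---|---|
| Development | 5 | 5 |
| Small API | 10 | 10 |
| High-traffic | 20 | 30 |
| Background worker | 5 | 5 |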