# Connection Pool Configuration
Database connection pool patterns for production.
## SQLAlchemy Pool Settings

```python
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine

# Sync engine with pool config
engine = create_engine(
    "postgresql://user:pass@localhost/db",
    # Pool size
    pool_size=5,          # Persistent connections (default: 5)
    max_overflow=10,      # Extra connections when the pool is exhausted
    # Total max connections = pool_size + max_overflow = 15
    # Timeouts
    pool_timeout=30,      # Seconds to wait for a connection
    pool_recycle=3600,    # Recycle connections after N seconds
    pool_pre_ping=True,   # Test connections before use
    # Driver-level connection args
    connect_args={
        "connect_timeout": 10,
        "options": "-c statement_timeout=30000",  # 30s query timeout
    },
)

# Async engine
async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=3600,
    pool_pre_ping=True,
)
```
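The health-check example later in this section calls an `async_session_factory`; a minimal sketch of building one from the `async_engine` above:

```python
from sqlalchemy.ext.asyncio import async_sessionmaker

# Session factory bound to the async engine above; the health check below uses it
async_session_factory = async_sessionmaker(async_engine, expire_on_commit=False)
```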
## Pool Sizing Guidelines

```python
"""
Connection Pool Sizing

Rule of thumb (sync workloads):
    pool_size = (CPU cores × 2) + disk spindles

For async applications:
    pool_size = expected_concurrent_requests / avg_queries_per_request

Examples:
- Web app, 4 cores, SSD: pool_size=10, max_overflow=10
- Worker, 4 cores, HDD: pool_size=12, max_overflow=5
- High-traffic API: pool_size=20, max_overflow=30
"""
import os


def calculate_pool_size() -> tuple[int, int]:
    """Calculate pool size based on environment."""
    cpu_count = os.cpu_count() or 4
    if os.getenv("ENV") == "production":
        pool_size = cpu_count * 2 + 4
        max_overflow = pool_size
    else:
        pool_size = 5
        max_overflow = 5
    return pool_size, max_overflow


pool_size, max_overflow = calculate_pool_size()
```
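A minimal sketch of feeding the calculated values into an engine; `sized_engine` and the URL are placeholders for illustration:

```python
from sqlalchemy.ext.asyncio import create_async_engine

# Hypothetical wiring: apply the environment-derived sizing to an engine.
sized_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=pool_size,
    max_overflow=max_overflow,
    pool_pre_ping=True,
)
```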
## Pool Events and Monitoring

```python
from sqlalchemy import event
from sqlalchemy.pool import Pool
import logging

logger = logging.getLogger(__name__)


@event.listens_for(Pool, "connect")
def on_connect(dbapi_conn, connection_record):
    """Called when a new connection is created."""
    logger.debug("New database connection created")


@event.listens_for(Pool, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
    """Called when a connection is retrieved from the pool."""
    logger.debug("Connection checked out from pool")


@event.listens_for(Pool, "checkin")
def on_checkin(dbapi_conn, connection_record):
    """Called when a connection is returned to the pool."""
    logger.debug("Connection returned to pool")


@event.listens_for(Pool, "invalidate")
def on_invalidate(dbapi_conn, connection_record, exception):
    """Called when a connection is invalidated."""
    logger.warning(f"Connection invalidated: {exception}")


# Pool statistics
def log_pool_status(engine):
    """Log current pool status."""
    pool = engine.pool
    logger.info(
        f"Pool status: "
        f"size={pool.size()}, "
        f"checked_out={pool.checkedout()}, "
        f"overflow={pool.overflow()}, "
        f"checkedin={pool.checkedin()}"
    )
```
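To capture these statistics on a schedule rather than on demand, one option is a small background task; a sketch assuming an asyncio application (the `monitor_pool` helper and 60-second interval are this example's own choices):

```python
import asyncio


async def monitor_pool(engine, interval: float = 60.0) -> None:
    """Log pool status every `interval` seconds until cancelled."""
    while True:
        log_pool_status(engine)
        await asyncio.sleep(interval)

# Start during application startup, cancel on shutdown:
# monitor_task = asyncio.create_task(monitor_pool(async_engine))
```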
## Health Check Endpoint

```python
from fastapi import FastAPI, HTTPException
from sqlalchemy import text
import asyncio

app = FastAPI()


async def check_database_health(timeout: float = 5.0) -> dict:
    """Check database connectivity and response time."""
    try:
        start = asyncio.get_running_loop().time()
        # async_session_factory is built from async_engine (see Pool Settings above)
        async with async_session_factory() as session:
            await asyncio.wait_for(
                session.execute(text("SELECT 1")),
                timeout=timeout,
            )
        latency = (asyncio.get_running_loop().time() - start) * 1000
        return {
            "status": "healthy",
            "latency_ms": round(latency, 2),
            "pool_size": async_engine.pool.size(),
            "pool_checked_out": async_engine.pool.checkedout(),
        }
    except asyncio.TimeoutError:
        return {"status": "unhealthy", "error": "timeout"}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}


@app.get("/health/db")
async def database_health():
    health = await check_database_health()
    if health["status"] != "healthy":
        raise HTTPException(status_code=503, detail=health)
    return health
```
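One way to exercise the endpoint without a running server is httpx's ASGI transport; a sketch assuming pytest with the pytest-asyncio plugin installed:

```python
import httpx
import pytest


@pytest.mark.asyncio
async def test_database_health_endpoint():
    # Drive the FastAPI app in-process; no network socket needed.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.get("/health/db")
        assert response.status_code in (200, 503)  # healthy or degraded
```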
## Connection Pool per Service

```python
from dataclasses import dataclass
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine


@dataclass
class DatabaseConfig:
    url: str
    pool_size: int = 5
    max_overflow: int = 10
    pool_timeout: int = 30
    pool_recycle: int = 3600


class DatabasePool:
    """Manage multiple database connections."""

    def __init__(self):
        self._engines: dict[str, AsyncEngine] = {}

    def add_database(self, name: str, config: DatabaseConfig):
        """Add a database connection pool."""
        self._engines[name] = create_async_engine(
            config.url,
            pool_size=config.pool_size,
            max_overflow=config.max_overflow,
            pool_timeout=config.pool_timeout,
            pool_recycle=config.pool_recycle,
            pool_pre_ping=True,
        )

    def get_engine(self, name: str) -> AsyncEngine:
        return self._engines[name]

    async def close_all(self):
        """Close all connection pools."""
        for engine in self._engines.values():
            await engine.dispose()


# Usage
db_pool = DatabasePool()
db_pool.add_database("primary", DatabaseConfig(
    url="postgresql+asyncpg://user:pass@primary/db",
    pool_size=10,
))
db_pool.add_database("replica", DatabaseConfig(
    url="postgresql+asyncpg://user:pass@replica/db",
    pool_size=20,  # More connections for the read replica
))
```
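A sketch of pulling sessions from the named engines and closing everything at shutdown; the `query_replica` helper and on-demand session factory are this example's own choices:

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_sessionmaker


async def query_replica() -> None:
    # Build a session factory from the named engine and run a simple query.
    replica_sessions = async_sessionmaker(
        db_pool.get_engine("replica"), expire_on_commit=False
    )
    async with replica_sessions() as session:
        await session.execute(text("SELECT 1"))

# At application shutdown (e.g. in a lifespan handler):
# await db_pool.close_all()
```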
## Read/Write Splitting

```python
from typing import Annotated

from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

# Separate session factories for read/write
write_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@primary/db",
    pool_size=10,
)
read_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@replica/db",
    pool_size=20,
)

write_session = async_sessionmaker(write_engine, expire_on_commit=False)
read_session = async_sessionmaker(read_engine, expire_on_commit=False)


# FastAPI dependencies
async def get_write_db():
    async with write_session() as session:
        yield session


async def get_read_db():
    async with read_session() as session:
        yield session


WriteDB = Annotated[AsyncSession, Depends(get_write_db)]
ReadDB = Annotated[AsyncSession, Depends(get_read_db)]


@app.get("/users")
async def list_users(db: ReadDB):  # Read from replica
    result = await db.execute(select(User))
    return result.scalars().all()


@app.post("/users")
async def create_user(user: UserCreate, db: WriteDB):  # Write to primary
    # User / UserCreate are assumed to be defined in the application's models
    db_user = User(**user.model_dump())
    db.add(db_user)
    await db.commit()
    return db_user
```
## Graceful Shutdown

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup - engines already created
    yield
    # Shutdown - close all pools gracefully
    await async_engine.dispose()
    logger.info("Database connections closed")


app = FastAPI(lifespan=lifespan)
```
## Quick Reference

| Setting | Purpose | Typical Value |
|---|---|---|
| `pool_size` | Persistent connections | 5-20 |
| `max_overflow` | Extra connections | 10-30 |
| `pool_timeout` | Wait for connection | 30s |
| `pool_recycle` | Recycle connection age | 3600s |
| `pool_pre_ping` | Test before use | True |

| Scenario | pool_size | max_overflow |
|---|---|---|
| Development | 5 | 5 |
| Small API | 10 | 10 |
| High-traffic | 20 | 30 |
| Background worker | 5 | 5 |