production-patterns.md 10 KB

# Production Async Patterns

Production-ready patterns for deploying async Python applications.

## Graceful Shutdown

import asyncio
import signal
from contextlib import asynccontextmanager

class GracefulShutdown:
    """Handle graceful shutdown with signal handlers.

    Holds an ``asyncio.Event`` that signal handlers set through
    ``trigger_shutdown`` and a set of tracked tasks that ``cleanup``
    cancels and waits for.
    """

    def __init__(self):
        self._shutdown = asyncio.Event()
        # Tracked tasks; each one removes itself via a done callback.
        self._tasks: set[asyncio.Task] = set()

    @property
    def should_exit(self) -> bool:
        """True once shutdown has been triggered."""
        return self._shutdown.is_set()

    async def wait_for_shutdown(self):
        """Block until shutdown signal received."""
        await self._shutdown.wait()

    def trigger_shutdown(self):
        """Signal shutdown to all waiting coroutines (safe as a signal handler)."""
        self._shutdown.set()

    def register_task(self, task: asyncio.Task):
        """Track task for cleanup on shutdown."""
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def cleanup(self, timeout: float = 30.0) -> set[asyncio.Task]:
        """Cancel all tracked tasks and wait up to ``timeout`` seconds for them.

        Args:
            timeout: total seconds to wait for the cancelled tasks to finish.

        Returns:
            The tasks still running when the timeout expired (empty if every
            task finished), so callers can log or escalate an incomplete
            shutdown instead of having it pass silently.
        """
        # Snapshot: done callbacks discard from self._tasks while we wait.
        tasks = set(self._tasks)
        if not tasks:
            # asyncio.wait() rejects an empty collection.
            return set()

        for task in tasks:
            task.cancel()

        _done, pending = await asyncio.wait(tasks, timeout=timeout)
        return pending


async def main():
    """Run the background worker until SIGTERM or SIGINT, then tear down.

    Signal handlers only set the shutdown event; all teardown lives in the
    ``finally`` block so it also runs if waiting is interrupted.
    """
    shutdown = GracefulShutdown()
    loop = asyncio.get_running_loop()
    handled_signals = (signal.SIGTERM, signal.SIGINT)

    for signum in handled_signals:
        loop.add_signal_handler(signum, shutdown.trigger_shutdown)

    try:
        shutdown.register_task(asyncio.create_task(background_worker(shutdown)))
        await shutdown.wait_for_shutdown()
    finally:
        # Cancel and await tracked tasks first, then detach the handlers.
        await shutdown.cleanup(timeout=30.0)
        for signum in handled_signals:
            loop.remove_signal_handler(signum)


async def background_worker(shutdown: GracefulShutdown):
    """Process items one at a time until shutdown is requested.

    The shutdown flag is checked before each item. If the task is cancelled
    mid-item, ``finish_current_work()`` is awaited before the cancellation
    is re-raised.
    """
    while True:
        if shutdown.should_exit:
            return
        try:
            await process_next_item()
        except asyncio.CancelledError:
            # Finish current work before exiting
            await finish_current_work()
            raise

## Lifespan Context Manager

from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan():
    """Application lifespan manager for startup/shutdown.

    Yields a dict with the ``"db"`` pool and the ``"redis"`` client.
    Resources are released in reverse order of creation. Each one is
    released even if creating a later resource, the managed body, or
    releasing a later resource raises.
    """
    # Startup. Every resource gets its own try/finally so that a failure in
    # create_redis_client() (or in redis.close()) cannot leak db_pool.
    db_pool = await create_db_pool()
    try:
        redis = await create_redis_client()
        try:
            yield {"db": db_pool, "redis": redis}
        finally:
            # Shutdown (always runs)
            await redis.close()
    finally:
        await db_pool.close()


# Usage with FastAPI
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: open the DB pool on startup, close it on shutdown.

    The ``yield`` sits in ``try``/``finally`` so the pool is closed even when
    the application exits with an exception. Without it, an exception thrown
    in at the ``yield`` would skip the close.
    """
    # Startup
    app.state.db = await create_db_pool()
    try:
        yield
    finally:
        # Shutdown (always runs)
        await app.state.db.close()


app = FastAPI(lifespan=lifespan)

## Health Check Endpoints

import asyncio
from dataclasses import dataclass
from enum import Enum

class HealthStatus(str, Enum):
    """Health state of a single component or of the service as a whole.

    Mixing in ``str`` makes each member compare equal to its string value
    (e.g. ``HealthStatus.HEALTHY == "healthy"``); ``.value`` is the string
    placed in health-check responses.
    """
    # Listed from best to worst; aggregation lets UNHEALTHY override DEGRADED.
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

@dataclass
class ComponentHealth:
    """Result of checking one dependency (database, cache, ...)."""

    name: str  # component label reported in the health response
    status: HealthStatus
    latency_ms: float | None = None  # probe round-trip in ms; the check_* helpers leave it None on failure
    error: str | None = None  # failure description when the probe failed, otherwise None

async def check_database(pool, timeout: float | None = None) -> ComponentHealth:
    """Check database connectivity by running ``SELECT 1`` on a pooled connection.

    Args:
        pool: connection pool whose ``acquire()`` is an async context manager
            yielding a connection with an awaitable ``execute()``.
        timeout: optional limit in seconds for acquire + query combined, so a
            hung database cannot hang the health check. ``None`` (the
            default) means no limit.

    Returns:
        HEALTHY with latency in milliseconds, or UNHEALTHY with an error
        description if acquiring, querying, or the timeout failed.
    """
    # get_running_loop(): the non-deprecated way to reach the loop clock
    # from inside a coroutine.
    loop = asyncio.get_running_loop()

    async def _probe():
        async with pool.acquire() as conn:
            await conn.execute("SELECT 1")

    start = loop.time()
    try:
        await asyncio.wait_for(_probe(), timeout=timeout)
    except Exception as e:
        # str() of some exceptions (e.g. TimeoutError) is empty; fall back
        # to the type name so the report is never blank.
        return ComponentHealth(
            "database", HealthStatus.UNHEALTHY, error=str(e) or type(e).__name__
        )
    latency = (loop.time() - start) * 1000
    return ComponentHealth("database", HealthStatus.HEALTHY, latency)

async def check_redis(client, timeout: float | None = None) -> ComponentHealth:
    """Check Redis connectivity with a ``PING``.

    Args:
        client: Redis client exposing an awaitable ``ping()``.
        timeout: optional limit in seconds for the ping, so an unresponsive
            server cannot hang the health check. ``None`` (the default)
            means no limit.

    Returns:
        HEALTHY with round-trip latency in milliseconds, or UNHEALTHY with an
        error description if the ping raised or timed out.
    """
    # get_running_loop(): the non-deprecated way to reach the loop clock
    # from inside a coroutine.
    loop = asyncio.get_running_loop()
    start = loop.time()
    try:
        await asyncio.wait_for(client.ping(), timeout=timeout)
    except Exception as e:
        # str() of some exceptions (e.g. TimeoutError) is empty; fall back
        # to the type name so the report is never blank.
        return ComponentHealth(
            "redis", HealthStatus.UNHEALTHY, error=str(e) or type(e).__name__
        )
    latency = (loop.time() - start) * 1000
    return ComponentHealth("redis", HealthStatus.HEALTHY, latency)

async def health_check(pool, redis) -> dict:
    """Run all component checks concurrently and aggregate the results.

    The overall status is the worst component status: any UNHEALTHY makes
    the result UNHEALTHY; otherwise any DEGRADED makes it DEGRADED.

    Returns:
        ``{"status": <overall value>, "components": [{"name", "status",
        "latency_ms", "error"}, ...]}`` with one entry per check.
    """
    # Must stay in the same order as the gather() arguments below.
    names = ("database", "redis")
    checks = await asyncio.gather(
        check_database(pool),
        check_redis(redis),
        return_exceptions=True,
    )

    components = []
    overall = HealthStatus.HEALTHY

    for name, check in zip(names, checks):
        # BaseException, not Exception: with return_exceptions=True a
        # cancelled check comes back as an asyncio.CancelledError instance,
        # which does not subclass Exception (Python 3.8+). Testing only
        # Exception would send it to `.status` and crash.
        if isinstance(check, BaseException):
            check = ComponentHealth(
                name,
                HealthStatus.UNHEALTHY,
                error=str(check) or type(check).__name__,
            )
        components.append(check)
        if check.status == HealthStatus.UNHEALTHY:
            overall = HealthStatus.UNHEALTHY
        elif check.status == HealthStatus.DEGRADED and overall == HealthStatus.HEALTHY:
            overall = HealthStatus.DEGRADED

    return {
        "status": overall.value,
        "components": [
            {"name": c.name, "status": c.status.value, "latency_ms": c.latency_ms, "error": c.error}
            for c in components
        ],
    }

## Liveness vs Readiness Probes

class HealthProbes:
    """Kubernetes-style liveness, readiness and startup probes.

    Readiness toggles with ``set_ready``/``set_not_ready``. Startup latches
    to True at the first ``set_ready`` and stays True afterwards, so draining
    traffic during shutdown does not make the app look un-initialized.
    """

    def __init__(self):
        self._ready = asyncio.Event()
        # Latched by the first set_ready(); never cleared.
        self._started = False
        self._alive = True

    def set_ready(self):
        """Mark application as ready to receive traffic (and as started)."""
        self._started = True
        self._ready.set()

    def set_not_ready(self):
        """Mark application as not ready (drain traffic); startup stays complete."""
        self._ready.clear()

    def set_not_alive(self):
        """Mark application as dead (trigger restart)."""
        self._alive = False

    async def liveness(self) -> bool:
        """
        Liveness probe - is the process healthy?
        Failing this triggers a container restart.
        """
        return self._alive

    async def readiness(self) -> bool:
        """
        Readiness probe - can the app handle traffic?
        Failing this removes the pod from service.
        """
        return self._ready.is_set()

    async def startup(self) -> bool:
        """
        Startup probe - has the app finished initializing?
        Prevents liveness checks during slow startup. Once initialization
        has completed this keeps returning True, even while draining.
        """
        return self._started


# Usage: one process-wide probe object shared by the hooks below.
probes = HealthProbes()


async def startup():
    """Bring up dependencies, then begin advertising readiness."""
    await initialize_db()
    await warm_caches()
    # Flip readiness last, only after every initialization step succeeded.
    probes.set_ready()


async def shutdown():
    """Stop advertising readiness, then let in-flight requests finish."""
    # Stop accepting new requests before draining the existing ones.
    probes.set_not_ready()
    await drain_connections()

## Resource Cleanup on Cancellation

async def process_with_cleanup():
    """Do work on a resource, cleaning up even when cancelled.

    On cancellation the resource is flushed before the cancellation is
    re-raised; on every exit path (success, error, cancellation) the
    resource is closed afterwards.
    """
    resource = await acquire_resource()
    try:
        try:
            await do_work(resource)
        except asyncio.CancelledError:
            # Essential cleanup that only applies to the cancelled path.
            await resource.flush()
            raise
    finally:
        # Runs after the flush above, whatever happened.
        await resource.close()


async def shielded_cleanup():
    """Protect critical cleanup from cancellation.

    ``resource.close()`` runs as its own task behind ``asyncio.shield``.
    Holding ``close_task`` in a local keeps a strong reference to the
    shielded task, as the asyncio docs require. If this coroutine is
    cancelled while closing, it waits (still shielded) for the close to
    finish before re-raising, instead of returning while the close is still
    in flight. A second cancellation during that wait propagates
    immediately, while the close keeps running.
    """
    resource = await acquire_resource()
    try:
        await do_work(resource)
    finally:
        close_task = asyncio.ensure_future(resource.close())
        try:
            await asyncio.shield(close_task)
        except asyncio.CancelledError:
            # shield() protected close_task, but our own await was cancelled.
            # Finish the close before propagating the cancellation.
            await asyncio.shield(close_task)
            raise

## Background Task Management

class BackgroundTaskManager:
    """Start, track, and stop named long-running asyncio tasks.

    Each task is registered under a unique name and unregisters itself when
    it finishes. Failures other than cancellation are logged with traceback
    via the module-level ``logger``.
    """

    def __init__(self):
        self._tasks: dict[str, asyncio.Task] = {}
        # Set by shutdown(). NOTE(review): not read anywhere in this class.
        self._shutdown = asyncio.Event()

    def start(self, name: str, coro):
        """Schedule ``coro`` as a task registered under ``name`` and return it.

        Raises:
            ValueError: if a task with this name is still registered.
        """
        if name in self._tasks:
            raise ValueError(f"Task {name} already running")

        task = asyncio.create_task(coro, name=name)
        task.add_done_callback(lambda t: self._task_done(name, t))
        self._tasks[name] = task
        return task

    def _task_done(self, name: str, task: asyncio.Task):
        """Done-callback: unregister the task and log it if it failed."""
        self._tasks.pop(name, None)

        if task.cancelled():
            return
        # Calling exception() also marks it retrieved, so asyncio does not
        # emit "Task exception was never retrieved" for it.
        exc = task.exception()
        if exc is not None:
            # exc_info keeps the traceback instead of only str(exc).
            logger.error("Task %s failed: %s", name, exc, exc_info=exc)

    async def stop(self, name: str, timeout: float = 10.0):
        """Cancel the task named ``name`` and wait up to ``timeout`` seconds.

        Does nothing if no such task is registered. A task that outlives the
        timeout is left running. The task's own failure is reported by the
        done callback, not raised here. Cancellation of the *caller*
        propagates normally.
        """
        task = self._tasks.get(name)
        if task is None:
            return
        task.cancel()
        # asyncio.wait() never raises the task's CancelledError or timeout,
        # so no blanket `except CancelledError` is needed. Such a handler
        # would also swallow a cancellation aimed at the caller of stop().
        await asyncio.wait({task}, timeout=timeout)

    async def shutdown(self, timeout: float = 30.0):
        """Cancel every registered task and wait up to ``timeout`` seconds in total."""
        self._shutdown.set()

        # Snapshot: done callbacks remove entries from self._tasks.
        tasks = list(self._tasks.values())
        for task in tasks:
            task.cancel()

        if tasks:
            await asyncio.wait(tasks, timeout=timeout)

## Periodic Tasks

async def periodic_task(
    interval: float,
    coro_func,
    shutdown_event: asyncio.Event | None = None
):
    """Run a coroutine periodically."""
    while True:
        if shutdown_event and shutdown_event.is_set():
            break

        try:
            await coro_func()
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error(f"Periodic task error: {e}")

        # Wait for interval or shutdown
        if shutdown_event:
            try:
                await asyncio.wait_for(
                    shutdown_event.wait(),
                    timeout=interval
                )
                break  # Shutdown signaled
            except asyncio.TimeoutError:
                pass  # Continue loop
        else:
            await asyncio.sleep(interval)

## Quick Reference

| Pattern | Use Case |
|---|---|
| `GracefulShutdown` | SIGTERM/SIGINT handling |
| `lifespan` context | Startup/shutdown resources |
| `HealthProbes` | Kubernetes health checks |
| `asyncio.shield()` | Protect critical cleanup |
| `BackgroundTaskManager` | Long-running task lifecycle |
| `periodic_task` | Scheduled background work |