performance.md 10 KB

Async Performance Optimization

Performance patterns for high-throughput async applications.

uvloop - Drop-in Event Loop Replacement

# Install: pip install uvloop

# Option 1: Install as default (before any asyncio calls)
import uvloop
uvloop.install()

# Then use asyncio normally
import asyncio

async def main():
    # Now using uvloop
    pass

asyncio.run(main())


# Option 2: Use explicitly
import asyncio
import uvloop

async def main():
    pass

with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
    runner.run(main())


# Option 3: Check if available
def get_event_loop_policy():
    try:
        import uvloop
        return uvloop.EventLoopPolicy()
    except ImportError:
        return asyncio.DefaultEventLoopPolicy()

asyncio.set_event_loop_policy(get_event_loop_policy())

Performance gains:

  • 2-4x faster than default asyncio event loop
  • Significant improvement for I/O-bound workloads
  • Based on libuv (same as Node.js)

Connection Pool Tuning

import aiohttp

# Optimal connector settings
connector = aiohttp.TCPConnector(
    limit=100,              # Total connection limit
    limit_per_host=30,      # Per-host limit (prevents overwhelming one server)
    ttl_dns_cache=300,      # DNS cache TTL (seconds)
    use_dns_cache=True,     # Enable DNS caching
    keepalive_timeout=30,   # Keep connections alive (seconds)
    enable_cleanup_closed=True,  # Clean up closed connections
)

async with aiohttp.ClientSession(connector=connector) as session:
    # Use session
    pass


# Database connection pool (asyncpg)
import asyncpg

pool = await asyncpg.create_pool(
    dsn="postgresql://user:pass@localhost/db",
    min_size=5,        # Minimum connections to keep
    max_size=20,       # Maximum connections allowed
    max_inactive_connection_lifetime=300.0,  # Close idle connections
    command_timeout=60.0,  # Query timeout
)


# Redis connection pool (aioredis/redis-py)
import redis.asyncio as redis

pool = redis.ConnectionPool.from_url(
    "redis://localhost",
    max_connections=50,
    decode_responses=True,
)
client = redis.Redis(connection_pool=pool)

Pool Sizing Guidelines

Service Type Min Size Max Size Notes
Database (heavy) 10 50 Match CPU cores × 2-4
Database (light) 5 20 Standard web apps
HTTP external API N/A 100 Limited by rate limits
HTTP per-host N/A 30 Prevent overwhelming
Redis 10 50 Very fast, less critical

Buffer Sizing

# aiohttp response reading
async def fetch_large(session, url):
    async with session.get(url) as response:
        # Default: reads entire response into memory
        data = await response.read()

        # For large responses, stream:
        chunks = []
        async for chunk in response.content.iter_chunked(8192):
            chunks.append(chunk)


# Custom buffer sizes for TCP
import asyncio

async def create_connection():
    reader, writer = await asyncio.open_connection(
        "localhost", 8888,
        limit=2**20,  # 1MB read buffer (default is 64KB)
    )
    return reader, writer


# aiohttp server with custom limits
from aiohttp import web

app = web.Application(
    client_max_size=1024 * 1024 * 100,  # 100MB max request body
)

Batching Requests

import asyncio
from collections import defaultdict
from typing import TypeVar, Callable

T = TypeVar("T")

class BatchProcessor:
    """Batch multiple requests into single operations."""

    def __init__(
        self,
        batch_func: Callable[[list[str]], dict[str, T]],
        max_batch_size: int = 100,
        max_delay: float = 0.01  # 10ms
    ):
        self._batch_func = batch_func
        self._max_batch_size = max_batch_size
        self._max_delay = max_delay
        self._pending: dict[str, asyncio.Future] = {}
        self._batch: list[str] = []
        self._lock = asyncio.Lock()
        self._timer: asyncio.Task | None = None

    async def get(self, key: str) -> T:
        """Get single item (batched with other requests)."""
        async with self._lock:
            if key in self._pending:
                return await self._pending[key]

            future = asyncio.get_event_loop().create_future()
            self._pending[key] = future
            self._batch.append(key)

            if len(self._batch) >= self._max_batch_size:
                await self._flush()
            elif not self._timer:
                self._timer = asyncio.create_task(self._delayed_flush())

        return await future

    async def _delayed_flush(self):
        await asyncio.sleep(self._max_delay)
        async with self._lock:
            await self._flush()

    async def _flush(self):
        if not self._batch:
            return

        batch = self._batch
        pending = self._pending
        self._batch = []
        self._pending = {}
        self._timer = None

        try:
            results = await self._batch_func(batch)
            for key in batch:
                if key in results:
                    pending[key].set_result(results[key])
                else:
                    pending[key].set_exception(KeyError(key))
        except Exception as e:
            for key in batch:
                pending[key].set_exception(e)


# Usage
async def batch_fetch_users(user_ids: list[str]) -> dict[str, User]:
    # Single database query for multiple users
    return {u.id: u for u in await db.fetch_users(user_ids)}

user_batcher = BatchProcessor(batch_fetch_users, max_batch_size=50)

# These will be batched together:
user1 = await user_batcher.get("user-1")
user2 = await user_batcher.get("user-2")

Task Prioritization

import asyncio
import heapq
from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedTask:
    priority: int
    item: Any = field(compare=False)

class PriorityQueue:
    """Async priority queue for task ordering."""

    def __init__(self):
        self._queue: list[PrioritizedTask] = []
        self._condition = asyncio.Condition()

    async def put(self, priority: int, item: Any):
        async with self._condition:
            heapq.heappush(self._queue, PrioritizedTask(priority, item))
            self._condition.notify()

    async def get(self) -> Any:
        async with self._condition:
            while not self._queue:
                await self._condition.wait()
            return heapq.heappop(self._queue).item


# Usage
queue = PriorityQueue()

# Lower number = higher priority
await queue.put(1, "critical task")
await queue.put(10, "low priority task")
await queue.put(5, "normal task")

Memory Optimization

import asyncio
from weakref import WeakValueDictionary

# Use weak references for caches
class AsyncCache:
    """Memory-efficient async cache using weak references."""

    def __init__(self, fetch_func):
        self._cache = WeakValueDictionary()
        self._fetch_func = fetch_func
        self._locks: dict[str, asyncio.Lock] = {}

    async def get(self, key: str):
        if key in self._cache:
            return self._cache[key]

        if key not in self._locks:
            self._locks[key] = asyncio.Lock()

        async with self._locks[key]:
            if key in self._cache:
                return self._cache[key]

            value = await self._fetch_func(key)
            self._cache[key] = value
            return value


# Limit concurrent operations to prevent memory spikes
async def process_large_dataset(items: list, concurrency: int = 10):
    """Process items with limited concurrency."""
    semaphore = asyncio.Semaphore(concurrency)

    async def process_one(item):
        async with semaphore:
            result = await heavy_processing(item)
            return result

    # Process in chunks to avoid memory issues with huge lists
    chunk_size = 1000
    all_results = []

    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]
        results = await asyncio.gather(*[process_one(item) for item in chunk])
        all_results.extend(results)

    return all_results

Profiling Async Code

import asyncio
import time
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_timer(name: str):
    """Context manager to time async operations."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"{name}: {elapsed:.3f}s")


# Usage
async with async_timer("fetch_all"):
    results = await fetch_all(urls)


# Detailed profiling
class AsyncProfiler:
    def __init__(self):
        self.timings: dict[str, list[float]] = {}

    @asynccontextmanager
    async def profile(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            elapsed = time.perf_counter() - start
            if name not in self.timings:
                self.timings[name] = []
            self.timings[name].append(elapsed)

    def report(self):
        for name, times in self.timings.items():
            avg = sum(times) / len(times)
            total = sum(times)
            print(f"{name}: avg={avg:.3f}s, total={total:.3f}s, count={len(times)}")


# Use yappi for comprehensive profiling
# pip install yappi
import yappi

yappi.set_clock_type("wall")  # For async code
yappi.start()

asyncio.run(main())

yappi.stop()
yappi.get_func_stats().print_all()

Quick Reference

Optimization Impact When to Use
uvloop 2-4x throughput Always (production)
Connection pooling Reduce latency Any external service
Request batching N requests → 1 Database, APIs
Semaphore limiting Memory control Large datasets
Streaming Memory efficiency Large responses
Priority queue Latency SLAs Mixed workloads

Performance Checklist

- [ ] uvloop installed and configured
- [ ] Connection pools properly sized
- [ ] Timeouts on all external calls
- [ ] Semaphores limiting concurrency
- [ ] Large responses streamed
- [ ] DNS caching enabled
- [ ] Connection keep-alive configured
- [ ] Profiling in place for hot paths