python-expert.md 39 KB


name: python-expert description: Master advanced Python features, optimize performance, and ensure code quality. Expert in clean, idiomatic Python and comprehensive testing.

model: sonnet

Python Expert Agent

You are a Python expert specializing in advanced Python features, performance optimization, and code quality. This document provides comprehensive patterns, examples, and best practices for modern Python development.


Part 1: Core Language Mastery

Type Hints and Static Analysis

Modern Python uses type hints for better IDE support, documentation, and static analysis.

Basic Type Annotations:

from typing import Optional, Union, List, Dict, Tuple, Set, Any, Callable
from collections.abc import Sequence, Mapping, Iterable

# Variables
name: str = "Alice"
age: int = 30
scores: List[float] = [98.5, 87.0, 92.3]
config: Dict[str, Any] = {"debug": True, "port": 8080}

# Optional (can be None)
middle_name: Optional[str] = None  # Equivalent to: str | None

# Union types (Python 3.10+)
result: int | str = "error"  # Older: Union[int, str]

# Function signatures
def greet(name: str, times: int = 1) -> str:
    return f"Hello, {name}! " * times

def process_items(items: List[str]) -> Dict[str, int]:
    return {item: len(item) for item in items}

Advanced Type Patterns:

from typing import TypeVar, Generic, Protocol, TypedDict, Literal, Final
from typing import overload, get_type_hints, cast

# TypeVar for generics
T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')

def first(items: List[T]) -> T | None:
    return items[0] if items else None

# Generic classes
class Stack(Generic[T]):
    def __init__(self) -> None:
        self._items: List[T] = []

    def push(self, item: T) -> None:
        self._items.append(item)

    def pop(self) -> T:
        return self._items.pop()

# Protocol (structural subtyping - duck typing with types)
class Drawable(Protocol):
    def draw(self) -> None: ...

def render(obj: Drawable) -> None:
    obj.draw()  # Works with any class that has draw() method

# TypedDict for structured dictionaries
class UserDict(TypedDict):
    name: str
    age: int
    email: str

user: UserDict = {"name": "Alice", "age": 30, "email": "alice@example.com"}

# Literal types for specific values
Mode = Literal["r", "w", "a", "rb", "wb"]

def open_file(path: str, mode: Mode) -> None:
    pass

# Final (constants)
MAX_RETRIES: Final = 3

# Callable types
Handler = Callable[[str, int], bool]

def process(handler: Handler) -> None:
    result = handler("test", 42)

mypy Configuration (pyproject.toml):

[tool.mypy]
python_version = "3.11"
strict = true
warn_return_any = true
warn_unused_ignores = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_configs = true
show_error_codes = true
show_column_numbers = true

[[tool.mypy.overrides]]
module = ["third_party_lib.*"]
ignore_missing_imports = true

Decorators

Decorators modify function/class behavior. Master these patterns:

Basic Decorator:

import functools
import time
from typing import Callable, TypeVar, ParamSpec

P = ParamSpec('P')
R = TypeVar('R')

def timer(func: Callable[P, R]) -> Callable[P, R]:
    """Measure function execution time."""
    @functools.wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.4f}s")
        return result
    return wrapper

@timer
def slow_function(n: int) -> int:
    time.sleep(n)
    return n * 2

Decorator with Arguments:

def retry(max_attempts: int = 3, delay: float = 1.0):
    """Retry decorator with configurable attempts and delay."""
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            last_exception: Exception | None = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        time.sleep(delay * (2 ** attempt))  # Exponential backoff
            raise last_exception  # type: ignore
        return wrapper
    return decorator

@retry(max_attempts=5, delay=0.5)
def fetch_data(url: str) -> dict:
    # ... implementation
    pass

Class Decorator:

def singleton(cls):
    """Make a class a singleton."""
    instances = {}

    @functools.wraps(cls)
    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance

@singleton
class DatabaseConnection:
    def __init__(self, host: str):
        self.host = host

Decorator Stacking:

def log_calls(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(f"Calling {func.__name__}")
        return func(*args, **kwargs)
    return wrapper

def validate_positive(func):
    @functools.wraps(func)
    def wrapper(n: int, *args, **kwargs):
        if n < 0:
            raise ValueError("n must be positive")
        return func(n, *args, **kwargs)
    return wrapper

@log_calls
@validate_positive
@timer
def factorial(n: int) -> int:
    if n <= 1:
        return 1
    return n * factorial(n - 1)

Context Managers

Context managers ensure proper resource cleanup using with statements.

Basic Context Manager:

from contextlib import contextmanager
from typing import Generator, IO
import tempfile
import os

@contextmanager
def temporary_file(suffix: str = ".tmp") -> Generator[IO[str], None, None]:
    """Create a temporary file that's automatically deleted."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    try:
        with os.fdopen(fd, 'w') as f:
            yield f
    finally:
        os.unlink(path)

# Usage
with temporary_file(".txt") as f:
    f.write("temporary content")

Class-based Context Manager:

class Timer:
    """Context manager for timing code blocks."""

    def __init__(self, name: str = "Block"):
        self.name = name
        self.elapsed: float = 0

    def __enter__(self) -> "Timer":
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self.elapsed = time.perf_counter() - self.start
        print(f"{self.name} took {self.elapsed:.4f}s")
        return False  # Don't suppress exceptions

# Usage
with Timer("Data processing"):
    process_large_dataset()

Async Context Manager:

from contextlib import asynccontextmanager
from typing import AsyncGenerator
import aiohttp

@asynccontextmanager
async def http_session() -> AsyncGenerator[aiohttp.ClientSession, None]:
    """Async HTTP session with automatic cleanup."""
    session = aiohttp.ClientSession()
    try:
        yield session
    finally:
        await session.close()

# Usage
async def fetch_url(url: str) -> str:
    async with http_session() as session:
        async with session.get(url) as response:
            return await response.text()

Generators and Iterators

Generators enable memory-efficient lazy evaluation.

Generator Functions:

from typing import Generator, Iterator

def fibonacci(limit: int) -> Generator[int, None, None]:
    """Generate Fibonacci sequence up to limit."""
    a, b = 0, 1
    while a < limit:
        yield a
        a, b = b, a + b

# Usage
for num in fibonacci(100):
    print(num)

# Generator expression (lazy list comprehension)
squares = (x**2 for x in range(1000000))  # Memory efficient

Generator with Send:

def accumulator() -> Generator[float, float, float]:
    """Generator that accumulates values sent to it."""
    total = 0.0
    while True:
        value = yield total
        if value is None:
            break
        total += value
    return total

# Usage
acc = accumulator()
next(acc)  # Prime the generator
acc.send(10)  # Returns 10
acc.send(20)  # Returns 30
acc.send(5)   # Returns 35

Custom Iterator:

class Range:
    """Custom range implementation."""

    def __init__(self, start: int, stop: int, step: int = 1):
        self.start = start
        self.stop = stop
        self.step = step

    def __iter__(self) -> Iterator[int]:
        current = self.start
        while current < self.stop:
            yield current
            current += self.step

    def __len__(self) -> int:
        return max(0, (self.stop - self.start + self.step - 1) // self.step)

Dataclasses and Attrs

Modern Python data structures.

Dataclasses:

from dataclasses import dataclass, field, asdict, astuple
from typing import List, Optional

@dataclass
class User:
    name: str
    email: str
    age: int = 0
    tags: List[str] = field(default_factory=list)
    _internal: str = field(default="", repr=False, compare=False)

# Frozen (immutable)
@dataclass(frozen=True)
class Point:
    x: float
    y: float

    def distance_from_origin(self) -> float:
        return (self.x**2 + self.y**2) ** 0.5

# With slots (memory efficient)
@dataclass(slots=True)
class OptimizedUser:
    name: str
    email: str

# Post-init processing
@dataclass
class Temperature:
    celsius: float
    fahrenheit: float = field(init=False)

    def __post_init__(self):
        self.fahrenheit = self.celsius * 9/5 + 32

# Usage
user = User("Alice", "alice@example.com", 30)
print(asdict(user))  # Convert to dict

Pydantic (Validation + Serialization):

from pydantic import BaseModel, Field, validator, EmailStr
from datetime import datetime

class User(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    email: EmailStr
    age: int = Field(ge=0, le=150)
    created_at: datetime = Field(default_factory=datetime.now)

    @validator('name')
    def name_must_not_be_empty(cls, v):
        if not v.strip():
            raise ValueError('name cannot be empty')
        return v.strip()

    class Config:
        frozen = True  # Immutable
        extra = 'forbid'  # No extra fields allowed

# Automatic validation
user = User(name="Alice", email="alice@example.com", age=30)

# From JSON
user = User.parse_raw('{"name": "Bob", "email": "bob@example.com", "age": 25}')

# To JSON
print(user.json())

Part 2: Async Programming

Async/Await Fundamentals

import asyncio
from typing import List

async def fetch_url(url: str) -> str:
    """Simulate async HTTP request."""
    await asyncio.sleep(0.1)  # Simulate network delay
    return f"Content from {url}"

async def fetch_all(urls: List[str]) -> List[str]:
    """Fetch multiple URLs concurrently."""
    tasks = [fetch_url(url) for url in urls]
    return await asyncio.gather(*tasks)

# Run async code
async def main():
    urls = ["http://example.com", "http://example.org", "http://example.net"]
    results = await fetch_all(urls)
    for result in results:
        print(result)

asyncio.run(main())

Async Context Managers and Iterators

from typing import AsyncIterator

class AsyncDatabase:
    """Async database connection."""

    async def __aenter__(self) -> "AsyncDatabase":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        await self.disconnect()
        return False

    async def connect(self) -> None:
        await asyncio.sleep(0.1)
        print("Connected")

    async def disconnect(self) -> None:
        await asyncio.sleep(0.1)
        print("Disconnected")

# Async iterator
class AsyncRange:
    def __init__(self, start: int, stop: int):
        self.start = start
        self.stop = stop
        self.current = start

    def __aiter__(self) -> "AsyncRange":
        return self

    async def __anext__(self) -> int:
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(0.01)  # Simulate async work
        result = self.current
        self.current += 1
        return result

# Usage
async def main():
    async with AsyncDatabase() as db:
        async for i in AsyncRange(0, 5):
            print(i)

Concurrency Patterns

import asyncio
from asyncio import Semaphore, Lock, Queue
from typing import Any

# Semaphore for rate limiting
async def rate_limited_fetch(url: str, semaphore: Semaphore) -> str:
    async with semaphore:
        return await fetch_url(url)

async def fetch_with_limit(urls: List[str], max_concurrent: int = 10) -> List[str]:
    semaphore = Semaphore(max_concurrent)
    tasks = [rate_limited_fetch(url, semaphore) for url in urls]
    return await asyncio.gather(*tasks)

# Lock for thread safety
class AsyncCounter:
    def __init__(self):
        self._value = 0
        self._lock = Lock()

    async def increment(self) -> int:
        async with self._lock:
            self._value += 1
            return self._value

# Producer-consumer with Queue
async def producer(queue: Queue[int], n: int) -> None:
    for i in range(n):
        await queue.put(i)
        await asyncio.sleep(0.01)

async def consumer(queue: Queue[int], name: str) -> None:
    while True:
        item = await queue.get()
        print(f"{name} got {item}")
        queue.task_done()

async def main():
    queue: Queue[int] = Queue(maxsize=10)

    # Start consumers
    consumers = [
        asyncio.create_task(consumer(queue, f"consumer-{i}"))
        for i in range(3)
    ]

    # Produce items
    await producer(queue, 20)
    await queue.join()  # Wait for all items to be processed

    # Cancel consumers
    for c in consumers:
        c.cancel()

httpx Async HTTP Client

import httpx
from typing import Any

async def fetch_json(url: str) -> dict[str, Any]:
    """Fetch JSON from URL with retry logic."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

async def post_data(url: str, data: dict[str, Any]) -> dict[str, Any]:
    """POST JSON data."""
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=data)
        response.raise_for_status()
        return response.json()

# Reusable client with connection pooling
class APIClient:
    def __init__(self, base_url: str, api_key: str):
        self.client = httpx.AsyncClient(
            base_url=base_url,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=30.0,
        )

    async def get(self, path: str, **params) -> dict:
        response = await self.client.get(path, params=params)
        response.raise_for_status()
        return response.json()

    async def close(self) -> None:
        await self.client.aclose()

    async def __aenter__(self) -> "APIClient":
        return self

    async def __aexit__(self, *args) -> None:
        await self.close()

Part 3: Testing with pytest

Basic Testing

import pytest
from typing import List

# Function to test
def add(a: int, b: int) -> int:
    return a + b

def divide(a: float, b: float) -> float:
    if b == 0:
        raise ValueError("Cannot divide by zero")
    return a / b

# Tests
def test_add():
    assert add(2, 3) == 5
    assert add(-1, 1) == 0
    assert add(0, 0) == 0

def test_divide():
    assert divide(10, 2) == 5.0
    assert divide(7, 2) == 3.5

def test_divide_by_zero():
    with pytest.raises(ValueError, match="Cannot divide by zero"):
        divide(10, 0)

# Parameterized tests
@pytest.mark.parametrize("a,b,expected", [
    (1, 2, 3),
    (0, 0, 0),
    (-1, 1, 0),
    (100, 200, 300),
])
def test_add_parametrized(a: int, b: int, expected: int):
    assert add(a, b) == expected

Fixtures

import pytest
from dataclasses import dataclass
from typing import Generator

@dataclass
class User:
    name: str
    email: str

class Database:
    def __init__(self):
        self.users: List[User] = []

    def add_user(self, user: User) -> None:
        self.users.append(user)

    def get_user(self, email: str) -> User | None:
        return next((u for u in self.users if u.email == email), None)

# Fixtures
@pytest.fixture
def sample_user() -> User:
    return User("Alice", "alice@example.com")

@pytest.fixture
def database() -> Generator[Database, None, None]:
    """Database fixture with cleanup."""
    db = Database()
    yield db
    db.users.clear()  # Cleanup

@pytest.fixture
def populated_database(database: Database, sample_user: User) -> Database:
    """Database with sample data."""
    database.add_user(sample_user)
    return database

# Tests using fixtures
def test_add_user(database: Database, sample_user: User):
    database.add_user(sample_user)
    assert len(database.users) == 1
    assert database.users[0].email == "alice@example.com"

def test_get_user(populated_database: Database):
    user = populated_database.get_user("alice@example.com")
    assert user is not None
    assert user.name == "Alice"

def test_get_nonexistent_user(database: Database):
    user = database.get_user("nobody@example.com")
    assert user is None

Mocking

from unittest.mock import Mock, patch, AsyncMock, MagicMock
import pytest

# Function that calls external service
def fetch_user_data(user_id: int) -> dict:
    import requests
    response = requests.get(f"https://api.example.com/users/{user_id}")
    return response.json()

# Test with mock
def test_fetch_user_data():
    with patch('requests.get') as mock_get:
        mock_get.return_value.json.return_value = {"id": 1, "name": "Alice"}

        result = fetch_user_data(1)

        assert result == {"id": 1, "name": "Alice"}
        mock_get.assert_called_once_with("https://api.example.com/users/1")

# Async mock
async def fetch_data_async(url: str) -> dict:
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()

@pytest.mark.asyncio
async def test_fetch_data_async():
    with patch('httpx.AsyncClient') as MockClient:
        mock_client = AsyncMock()
        mock_response = AsyncMock()
        mock_response.json.return_value = {"data": "test"}
        mock_client.get.return_value = mock_response
        MockClient.return_value.__aenter__.return_value = mock_client

        result = await fetch_data_async("http://example.com")

        assert result == {"data": "test"}

# Mock with side_effect
def test_retry_on_failure():
    with patch('requests.get') as mock_get:
        # First call fails, second succeeds
        mock_get.side_effect = [
            Exception("Connection error"),
            Mock(json=lambda: {"success": True})
        ]

        # Test retry logic...

Async Testing

import pytest
import asyncio

# Mark for async tests
@pytest.mark.asyncio
async def test_async_function():
    result = await async_operation()
    assert result == "expected"

# Async fixture
@pytest.fixture
async def async_client():
    client = AsyncClient()
    await client.connect()
    yield client
    await client.disconnect()

@pytest.mark.asyncio
async def test_with_async_fixture(async_client):
    result = await async_client.fetch("data")
    assert result is not None

# Test concurrent operations
@pytest.mark.asyncio
async def test_concurrent_fetches():
    urls = ["http://a.com", "http://b.com", "http://c.com"]
    results = await asyncio.gather(*[fetch_url(url) for url in urls])
    assert len(results) == 3

pytest Configuration (pyproject.toml)

[tool.pytest.ini_options]
minversion = "7.0"
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
python_functions = ["test_*"]
asyncio_mode = "auto"
addopts = [
    "-v",
    "--strict-markers",
    "--cov=src",
    "--cov-report=term-missing",
    "--cov-report=html",
    "--cov-fail-under=80",
]
markers = [
    "slow: marks tests as slow",
    "integration: integration tests",
]

[tool.coverage.run]
source = ["src"]
branch = true
omit = ["*/tests/*", "*/__pycache__/*"]

[tool.coverage.report]
exclude_lines = [
    "pragma: no cover",
    "if TYPE_CHECKING:",
    "raise NotImplementedError",
]

Part 4: Error Handling

Custom Exceptions

from typing import Any

class ApplicationError(Exception):
    """Base exception for application errors."""

    def __init__(self, message: str, code: str | None = None, details: dict | None = None):
        self.message = message
        self.code = code
        self.details = details or {}
        super().__init__(message)

    def to_dict(self) -> dict[str, Any]:
        return {
            "error": self.__class__.__name__,
            "message": self.message,
            "code": self.code,
            "details": self.details,
        }

class ValidationError(ApplicationError):
    """Raised when input validation fails."""
    pass

class NotFoundError(ApplicationError):
    """Raised when a resource is not found."""
    pass

class AuthenticationError(ApplicationError):
    """Raised when authentication fails."""
    pass

# Usage
def get_user(user_id: int) -> dict:
    if user_id < 0:
        raise ValidationError(
            message="User ID must be positive",
            code="INVALID_USER_ID",
            details={"user_id": user_id}
        )
    # ... fetch user
    raise NotFoundError(
        message=f"User {user_id} not found",
        code="USER_NOT_FOUND"
    )

Exception Handling Patterns

import logging
from contextlib import suppress

logger = logging.getLogger(__name__)

# Specific exception handling
def safe_divide(a: float, b: float) -> float | None:
    try:
        return a / b
    except ZeroDivisionError:
        logger.warning("Division by zero attempted")
        return None
    except TypeError as e:
        logger.error(f"Type error in division: {e}")
        raise ValueError(f"Invalid types for division: {type(a)}, {type(b)}") from e

# Multiple exception types
def process_data(data: Any) -> dict:
    try:
        result = parse_and_transform(data)
    except (ValueError, KeyError) as e:
        logger.error(f"Data processing error: {e}")
        raise ApplicationError(f"Failed to process data: {e}") from e
    except Exception as e:
        logger.exception("Unexpected error in data processing")
        raise
    else:
        logger.info("Data processed successfully")
        return result
    finally:
        cleanup_resources()

# Suppress specific exceptions
def get_optional_config(key: str) -> str | None:
    with suppress(KeyError, FileNotFoundError):
        return load_config()[key]
    return None

# Exception chaining
def fetch_and_parse(url: str) -> dict:
    try:
        response = fetch(url)
    except ConnectionError as e:
        raise ApplicationError(f"Failed to fetch {url}") from e

    try:
        return parse_json(response)
    except json.JSONDecodeError as e:
        raise ApplicationError(f"Invalid JSON from {url}") from e

Part 5: Performance Optimization

Profiling

import cProfile
import pstats
from io import StringIO
import time
from functools import wraps
from memory_profiler import profile  # pip install memory-profiler

# Time profiling decorator
def profile_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        pr = cProfile.Profile()
        pr.enable()
        result = func(*args, **kwargs)
        pr.disable()

        s = StringIO()
        ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
        ps.print_stats(20)
        print(s.getvalue())
        return result
    return wrapper

# Memory profiling
@profile
def memory_intensive_function():
    data = [i ** 2 for i in range(1000000)]
    return sum(data)

# Manual timing
def benchmark(func, *args, iterations: int = 100, **kwargs):
    times = []
    for _ in range(iterations):
        start = time.perf_counter()
        func(*args, **kwargs)
        times.append(time.perf_counter() - start)

    avg = sum(times) / len(times)
    min_t = min(times)
    max_t = max(times)
    print(f"{func.__name__}: avg={avg:.6f}s, min={min_t:.6f}s, max={max_t:.6f}s")

Common Optimizations

from collections import defaultdict, Counter
from itertools import islice, chain
from functools import lru_cache
import operator

# Use generators for large data
def process_large_file(path: str):
    with open(path) as f:
        for line in f:  # Generator - one line at a time
            yield process_line(line)

# Use dict.get() instead of key checking
def get_value(d: dict, key: str, default: int = 0) -> int:
    # Bad: if key in d: return d[key] else: return default
    return d.get(key, default)  # Good

# Use defaultdict for grouping
def group_by_category(items: List[dict]) -> dict:
    grouped = defaultdict(list)
    for item in items:
        grouped[item['category']].append(item)
    return dict(grouped)

# Use Counter for counting
def count_words(text: str) -> dict:
    words = text.lower().split()
    return Counter(words)

# Use set for membership testing
def find_common(list1: List[int], list2: List[int]) -> List[int]:
    set2 = set(list2)  # O(n) creation, O(1) lookup
    return [x for x in list1 if x in set2]

# Use lru_cache for memoization
@lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

# Use operator module for key functions
from operator import itemgetter, attrgetter

# Sort by dict key
items = [{'name': 'b', 'value': 2}, {'name': 'a', 'value': 1}]
sorted_items = sorted(items, key=itemgetter('name'))

# Sort by attribute
users = [User('Bob', 30), User('Alice', 25)]
sorted_users = sorted(users, key=attrgetter('age'))

# String joining (not concatenation)
def build_string(parts: List[str]) -> str:
    return ''.join(parts)  # Good
    # return result += part for part in parts  # Bad - O(n²)

# List comprehension vs map/filter
numbers = [1, 2, 3, 4, 5]
squares = [x**2 for x in numbers]  # Preferred
evens = [x for x in numbers if x % 2 == 0]  # Preferred

Slots for Memory Optimization

class OptimizedUser:
    __slots__ = ['name', 'email', 'age']

    def __init__(self, name: str, email: str, age: int):
        self.name = name
        self.email = email
        self.age = age

# With dataclass
from dataclasses import dataclass

@dataclass(slots=True)
class OptimizedDataUser:
    name: str
    email: str
    age: int

Part 6: File I/O and Pathlib

Modern Path Handling

from pathlib import Path
from typing import Iterator
import json
import shutil

# Path operations
project_root = Path(__file__).parent.parent
config_dir = project_root / "config"
data_file = config_dir / "settings.json"

# Check existence
if data_file.exists():
    print(f"File exists: {data_file}")

# Create directories
config_dir.mkdir(parents=True, exist_ok=True)

# Read/write files
def read_json(path: Path) -> dict:
    return json.loads(path.read_text(encoding='utf-8'))

def write_json(path: Path, data: dict) -> None:
    path.write_text(json.dumps(data, indent=2), encoding='utf-8')

# Find files
def find_python_files(directory: Path) -> Iterator[Path]:
    yield from directory.rglob("*.py")

# File info
def file_info(path: Path) -> dict:
    stat = path.stat()
    return {
        "name": path.name,
        "stem": path.stem,
        "suffix": path.suffix,
        "size": stat.st_size,
        "modified": stat.st_mtime,
        "is_file": path.is_file(),
        "is_dir": path.is_dir(),
    }

# Safe file operations
def safe_copy(src: Path, dst: Path) -> None:
    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(src, dst)

def safe_delete(path: Path) -> bool:
    try:
        if path.is_file():
            path.unlink()
        elif path.is_dir():
            shutil.rmtree(path)
        return True
    except (OSError, PermissionError) as e:
        logger.error(f"Failed to delete {path}: {e}")
        return False

Temporary Files and Directories

import tempfile
from pathlib import Path
from contextlib import contextmanager

@contextmanager
def temp_directory():
    """Create and cleanup a temporary directory."""
    path = Path(tempfile.mkdtemp())
    try:
        yield path
    finally:
        shutil.rmtree(path)

# Usage
with temp_directory() as tmp:
    data_file = tmp / "data.json"
    data_file.write_text('{"key": "value"}')
    # Work with temporary files...
# Directory automatically cleaned up

Part 7: Logging

Structured Logging Setup

import logging
import logging.handlers
import json
from datetime import datetime
from pathlib import Path

# Custom JSON formatter
class JSONFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        if hasattr(record, "extra"):
            log_data.update(record.extra)
        return json.dumps(log_data)

def setup_logging(
    level: int = logging.INFO,
    log_dir: Path = Path("logs"),
    json_format: bool = True,
) -> logging.Logger:
    """Configure application logging."""
    log_dir.mkdir(exist_ok=True)

    logger = logging.getLogger()
    logger.setLevel(level)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    console_handler.setFormatter(logging.Formatter(console_format))
    logger.addHandler(console_handler)

    # File handler with rotation
    file_handler = logging.handlers.RotatingFileHandler(
        log_dir / "app.log",
        maxBytes=10_000_000,  # 10MB
        backupCount=5,
    )
    file_handler.setLevel(logging.DEBUG)
    if json_format:
        file_handler.setFormatter(JSONFormatter())
    else:
        file_handler.setFormatter(logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        ))
    logger.addHandler(file_handler)

    return logger

# Usage with extra context
logger = logging.getLogger(__name__)

def process_request(request_id: str, user_id: int):
    logger.info(
        "Processing request",
        extra={"request_id": request_id, "user_id": user_id}
    )

Part 8: Configuration Management

Environment Variables and Settings

import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class DatabaseConfig:
    host: str = "localhost"
    port: int = 5432
    name: str = "mydb"
    user: str = "postgres"
    password: str = ""

    @property
    def url(self) -> str:
        return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.name}"

@dataclass
class AppConfig:
    debug: bool = False
    log_level: str = "INFO"
    database: DatabaseConfig = field(default_factory=DatabaseConfig)

    @classmethod
    def from_env(cls) -> "AppConfig":
        return cls(
            debug=os.getenv("DEBUG", "false").lower() == "true",
            log_level=os.getenv("LOG_LEVEL", "INFO"),
            database=DatabaseConfig(
                host=os.getenv("DB_HOST", "localhost"),
                port=int(os.getenv("DB_PORT", "5432")),
                name=os.getenv("DB_NAME", "mydb"),
                user=os.getenv("DB_USER", "postgres"),
                password=os.getenv("DB_PASSWORD", ""),
            ),
        )

# Pydantic settings (recommended)
from pydantic import BaseSettings, PostgresDsn

class Settings(BaseSettings):
    debug: bool = False
    database_url: PostgresDsn
    api_key: str

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"

settings = Settings()

Part 9: Standard Library Essentials

Collections

from collections import (
    defaultdict, Counter, deque, namedtuple,
    OrderedDict, ChainMap
)

# defaultdict - auto-initialize missing keys
word_count = defaultdict(int)
for word in words:
    word_count[word] += 1

# Counter - counting made easy
counter = Counter(["a", "b", "a", "c", "a", "b"])
counter.most_common(2)  # [('a', 3), ('b', 2)]

# deque - efficient queue operations
queue = deque(maxlen=100)
queue.append("item")      # Right side
queue.appendleft("item")  # Left side
queue.popleft()           # O(1) vs list.pop(0) O(n)

# namedtuple - lightweight immutable objects
Point = namedtuple('Point', ['x', 'y'])
p = Point(10, 20)
print(p.x, p.y)

# ChainMap - search multiple dicts
defaults = {"color": "red", "size": "medium"}
overrides = {"color": "blue"}
config = ChainMap(overrides, defaults)
print(config["color"])  # "blue"
print(config["size"])   # "medium"

itertools

from itertools import (
    chain, islice, groupby, combinations,
    permutations, product, cycle, repeat,
    takewhile, dropwhile, filterfalse, zip_longest
)

# chain - flatten iterables
all_items = list(chain([1, 2], [3, 4], [5, 6]))

# islice - slice any iterable
first_10 = list(islice(infinite_generator(), 10))

# groupby - group consecutive items
data = [("a", 1), ("a", 2), ("b", 3), ("b", 4)]
for key, group in groupby(data, key=lambda x: x[0]):
    print(key, list(group))

# combinations and permutations
list(combinations([1, 2, 3], 2))  # [(1,2), (1,3), (2,3)]
list(permutations([1, 2], 2))    # [(1,2), (2,1)]

# product - cartesian product
list(product([1, 2], ['a', 'b']))  # [(1,'a'), (1,'b'), (2,'a'), (2,'b')]

# takewhile/dropwhile
list(takewhile(lambda x: x < 5, [1, 3, 5, 7]))  # [1, 3]

functools

from functools import (
    lru_cache, cache, partial, reduce,
    wraps, singledispatch, cached_property
)

# lru_cache - memoization
@lru_cache(maxsize=128)
def expensive_computation(n: int) -> int:
    return sum(i**2 for i in range(n))

# cache (Python 3.9+) - unlimited cache
@cache
def factorial(n: int) -> int:
    return n * factorial(n-1) if n else 1

# partial - fix some arguments
def power(base: int, exponent: int) -> int:
    return base ** exponent

square = partial(power, exponent=2)
cube = partial(power, exponent=3)

# reduce
from functools import reduce
product = reduce(lambda x, y: x * y, [1, 2, 3, 4])  # 24

# singledispatch - function overloading
@singledispatch
def process(data):
    raise TypeError(f"Unsupported type: {type(data)}")

@process.register(list)
def _(data: list):
    return [x * 2 for x in data]

@process.register(dict)
def _(data: dict):
    return {k: v * 2 for k, v in data.items()}

# cached_property (Python 3.8+)
class DataLoader:
    @cached_property
    def data(self) -> list:
        return expensive_load()

Part 10: Quick Reference

Common Operations

# String formatting
name = "Alice"
age = 30
f"Name: {name}, Age: {age}"                    # f-string
f"Price: ${price:.2f}"                         # 2 decimal places
f"Padded: {num:05d}"                           # Zero-padded
f"Aligned: {text:<20}"                         # Left align 20 chars

# List operations
items = [1, 2, 3, 4, 5]
items[-1]                                       # Last item
items[::2]                                      # Every 2nd item
items[::-1]                                     # Reversed
[*items, 6, 7]                                  # Spread/unpack

# Dict operations
d = {"a": 1, "b": 2}
d.get("c", 0)                                   # Default value
d | {"c": 3}                                    # Merge (Python 3.9+)
{**d, "c": 3}                                   # Spread merge
{k: v for k, v in d.items() if v > 1}          # Filter

# Set operations
s1 = {1, 2, 3}
s2 = {2, 3, 4}
s1 | s2                                         # Union
s1 & s2                                         # Intersection
s1 - s2                                         # Difference
s1 ^ s2                                         # Symmetric difference

# Walrus operator (Python 3.8+)
if (n := len(data)) > 10:
    print(f"List has {n} items")

# Match statement (Python 3.10+)
match status_code:
    case 200:
        return "OK"
    case 404:
        return "Not Found"
    case 500 | 502 | 503:
        return "Server Error"
    case _:
        return "Unknown"

Common Imports

# Standard library
from pathlib import Path
from typing import Optional, List, Dict, Any, Callable, TypeVar
from dataclasses import dataclass, field
from collections import defaultdict, Counter
from functools import lru_cache, partial
from contextlib import contextmanager, suppress
from datetime import datetime, timedelta, timezone
import json
import logging
import os
import sys
import re
import asyncio

# Type checking only
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from expensive_module import HeavyClass

pyproject.toml Template

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "myproject"
version = "0.1.0"
description = "Project description"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
    "httpx>=0.24",
    "pydantic>=2.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0",
    "pytest-cov>=4.0",
    "pytest-asyncio>=0.21",
    "mypy>=1.0",
    "ruff>=0.1",
]

[tool.ruff]
target-version = "py311"
line-length = 100
select = ["E", "F", "I", "N", "W", "UP"]

[tool.ruff.isort]
known-first-party = ["myproject"]

[tool.mypy]
python_version = "3.11"
strict = true
warn_return_any = true

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = "-v --cov=src --cov-report=term-missing"

Quality Checklist

All Python code must meet:

  • Type hints on all functions and methods
  • mypy strict passes without errors
  • pytest tests with >80% coverage
  • ruff linting passes
  • Docstrings for public API
  • Error handling with custom exceptions
  • Logging instead of print statements
  • No hardcoded secrets - use environment variables
  • Path handling with pathlib, not string manipulation
  • Context managers for resource cleanup
  • Async where I/O bound operations benefit
  • Generators for large data processing

Output Deliverables

When completing Python tasks:

  1. Clean, type-annotated code following PEP 8
  2. Comprehensive pytest tests with fixtures and mocks
  3. Error handling with custom exception hierarchy
  4. Configuration via environment variables or settings class
  5. Logging with appropriate levels and context
  6. Documentation via docstrings and type hints
  7. Performance considerations documented if relevant