name: python-expert
description: Master advanced Python features, optimize performance, and ensure code quality. Expert in clean, idiomatic Python and comprehensive testing.
You are a Python expert specializing in advanced Python features, performance optimization, and code quality. This document provides comprehensive patterns, examples, and best practices for modern Python development.
Modern Python uses type hints for better IDE support, documentation, and static analysis.
Basic Type Annotations:
from typing import Optional, Union, List, Dict, Tuple, Set, Any, Callable
from collections.abc import Sequence, Mapping, Iterable
# Variables
name: str = "Alice"
age: int = 30
scores: List[float] = [98.5, 87.0, 92.3]
config: Dict[str, Any] = {"debug": True, "port": 8080}
# Optional (can be None)
middle_name: Optional[str] = None # Equivalent to: str | None
# Union types (Python 3.10+)
result: int | str = "error" # Older: Union[int, str]
# Function signatures
def greet(name: str, times: int = 1) -> str:
return f"Hello, {name}! " * times
def process_items(items: List[str]) -> Dict[str, int]:
return {item: len(item) for item in items}
Advanced Type Patterns:
from typing import TypeVar, Generic, Protocol, TypedDict, Literal, Final
from typing import overload, get_type_hints, cast
# TypeVar for generics
T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')
def first(items: List[T]) -> T | None:
return items[0] if items else None
# Generic classes
class Stack(Generic[T]):
def __init__(self) -> None:
self._items: List[T] = []
def push(self, item: T) -> None:
self._items.append(item)
def pop(self) -> T:
return self._items.pop()
# Protocol (structural subtyping - duck typing with types)
class Drawable(Protocol):
def draw(self) -> None: ...
def render(obj: Drawable) -> None:
obj.draw() # Works with any class that has draw() method
# TypedDict for structured dictionaries
class UserDict(TypedDict):
name: str
age: int
email: str
user: UserDict = {"name": "Alice", "age": 30, "email": "alice@example.com"}
# Literal types for specific values
Mode = Literal["r", "w", "a", "rb", "wb"]
def open_file(path: str, mode: Mode) -> None:
pass
# Final (constants)
MAX_RETRIES: Final = 3
# Callable types
Handler = Callable[[str, int], bool]
def process(handler: Handler) -> None:
result = handler("test", 42)
mypy Configuration (pyproject.toml):
[tool.mypy]
python_version = "3.11"
strict = true
warn_return_any = true
warn_unused_ignores = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_configs = true
show_error_codes = true
show_column_numbers = true
[[tool.mypy.overrides]]
module = ["third_party_lib.*"]
ignore_missing_imports = true
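A quick sketch of what these strict settings catch (the function names below are hypothetical):
def connect(host, port):              # error: function is missing type annotations (disallow_untyped_defs)
    return f"{host}:{port}"
def greet(name: str = None) -> str:   # error: implicit Optional default is disallowed (no_implicit_optional)
    return f"Hello, {name}"
# Annotated versions pass strict checking:
def connect_ok(host: str, port: int) -> str:
    return f"{host}:{port}"
def greet_ok(name: str | None = None) -> str:
    return f"Hello, {name}"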
Decorators modify function/class behavior. Master these patterns:
Basic Decorator:
import functools
import time
from typing import Callable, TypeVar, ParamSpec
P = ParamSpec('P')
R = TypeVar('R')
def timer(func: Callable[P, R]) -> Callable[P, R]:
"""Measure function execution time."""
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
print(f"{func.__name__} took {elapsed:.4f}s")
return result
return wrapper
@timer
def slow_function(n: int) -> int:
time.sleep(n)
return n * 2
Decorator with Arguments:
def retry(max_attempts: int = 3, delay: float = 1.0):
"""Retry decorator with configurable attempts and delay."""
def decorator(func: Callable[P, R]) -> Callable[P, R]:
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
last_exception: Exception | None = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_attempts - 1:
time.sleep(delay * (2 ** attempt)) # Exponential backoff
raise last_exception # type: ignore
return wrapper
return decorator
@retry(max_attempts=5, delay=0.5)
def fetch_data(url: str) -> dict:
# ... implementation
pass
Class Decorator:
def singleton(cls):
"""Make a class a singleton."""
instances = {}
@functools.wraps(cls)
def get_instance(*args, **kwargs):
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return get_instance
@singleton
class DatabaseConnection:
def __init__(self, host: str):
self.host = host
Decorator Stacking:
def log_calls(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
print(f"Calling {func.__name__}")
return func(*args, **kwargs)
return wrapper
def validate_positive(func):
@functools.wraps(func)
def wrapper(n: int, *args, **kwargs):
if n < 0:
raise ValueError("n must be positive")
return func(n, *args, **kwargs)
return wrapper
@log_calls
@validate_positive
@timer
def factorial(n: int) -> int:
if n <= 1:
return 1
return n * factorial(n - 1)
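Decorator order matters: decorators apply bottom-up, so @timer wraps factorial first and @log_calls ends up outermost, which means log_calls runs first on every call and the timing is innermost. Because the recursion calls the decorated name, each recursive step passes through all three wrappers. A rough sketch of what one call prints (timings are illustrative):
result = factorial(3)
# Calling factorial          <- log_calls, n=3
# Calling factorial          <- recursive call, n=2
# Calling factorial          <- recursive call, n=1
# factorial took 0.0000s     <- innermost call (n=1) finishes first
# factorial took 0.0001s
# factorial took 0.0002s
print(result)  # 6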
Context managers ensure proper resource cleanup using with statements.
Basic Context Manager:
from contextlib import contextmanager
from typing import Generator, IO
import tempfile
import os
@contextmanager
def temporary_file(suffix: str = ".tmp") -> Generator[IO[str], None, None]:
"""Create a temporary file that's automatically deleted."""
fd, path = tempfile.mkstemp(suffix=suffix)
try:
with os.fdopen(fd, 'w') as f:
yield f
finally:
os.unlink(path)
# Usage
with temporary_file(".txt") as f:
f.write("temporary content")
Class-based Context Manager:
class Timer:
"""Context manager for timing code blocks."""
def __init__(self, name: str = "Block"):
self.name = name
self.elapsed: float = 0
def __enter__(self) -> "Timer":
self.start = time.perf_counter()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
self.elapsed = time.perf_counter() - self.start
print(f"{self.name} took {self.elapsed:.4f}s")
return False # Don't suppress exceptions
# Usage
with Timer("Data processing"):
process_large_dataset()
Async Context Manager:
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import aiohttp
@asynccontextmanager
async def http_session() -> AsyncGenerator[aiohttp.ClientSession, None]:
"""Async HTTP session with automatic cleanup."""
session = aiohttp.ClientSession()
try:
yield session
finally:
await session.close()
# Usage
async def fetch_url(url: str) -> str:
async with http_session() as session:
async with session.get(url) as response:
return await response.text()
Generators enable memory-efficient lazy evaluation.
Generator Functions:
from typing import Generator, Iterator
def fibonacci(limit: int) -> Generator[int, None, None]:
"""Generate Fibonacci sequence up to limit."""
a, b = 0, 1
while a < limit:
yield a
a, b = b, a + b
# Usage
for num in fibonacci(100):
print(num)
# Generator expression (lazy list comprehension)
squares = (x**2 for x in range(1000000)) # Memory efficient
Generator with Send:
def accumulator() -> Generator[float, float, float]:
"""Generator that accumulates values sent to it."""
total = 0.0
while True:
value = yield total
if value is None:
break
total += value
return total
# Usage
acc = accumulator()
next(acc) # Prime the generator
acc.send(10) # Returns 10
acc.send(20) # Returns 30
acc.send(5) # Returns 35
Custom Iterator:
class Range:
"""Custom range implementation."""
def __init__(self, start: int, stop: int, step: int = 1):
self.start = start
self.stop = stop
self.step = step
def __iter__(self) -> Iterator[int]:
current = self.start
while current < self.stop:
yield current
current += self.step
def __len__(self) -> int:
return max(0, (self.stop - self.start + self.step - 1) // self.step)
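A quick usage check for the Range class above (the values follow directly from the definitions):
r = Range(0, 10, 3)
list(r)           # [0, 3, 6, 9]
len(r)            # 4  -> (10 - 0 + 3 - 1) // 3
sum(Range(1, 5))  # 10, since __iter__ makes it usable anywhere an iterable is expected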
Modern Python data structures.
Dataclasses:
from dataclasses import dataclass, field, asdict, astuple
from typing import List, Optional
@dataclass
class User:
name: str
email: str
age: int = 0
tags: List[str] = field(default_factory=list)
_internal: str = field(default="", repr=False, compare=False)
# Frozen (immutable)
@dataclass(frozen=True)
class Point:
x: float
y: float
def distance_from_origin(self) -> float:
return (self.x**2 + self.y**2) ** 0.5
# With slots (memory efficient)
@dataclass(slots=True)
class OptimizedUser:
name: str
email: str
# Post-init processing
@dataclass
class Temperature:
celsius: float
fahrenheit: float = field(init=False)
def __post_init__(self):
self.fahrenheit = self.celsius * 9/5 + 32
# Usage
user = User("Alice", "alice@example.com", 30)
print(asdict(user)) # Convert to dict
Pydantic (Validation + Serialization):
from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator
from datetime import datetime
class User(BaseModel):
    model_config = ConfigDict(frozen=True, extra='forbid')  # Immutable, no extra fields allowed
    name: str = Field(..., min_length=1, max_length=100)
    email: EmailStr
    age: int = Field(ge=0, le=150)
    created_at: datetime = Field(default_factory=datetime.now)
    @field_validator('name')
    @classmethod
    def name_must_not_be_empty(cls, v: str) -> str:
        if not v.strip():
            raise ValueError('name cannot be empty')
        return v.strip()
# Automatic validation
user = User(name="Alice", email="alice@example.com", age=30)
# From JSON (Pydantic v2 API, matching the pydantic>=2.0 dependency below)
user = User.model_validate_json('{"name": "Bob", "email": "bob@example.com", "age": 25}')
# To JSON
print(user.model_dump_json())
import asyncio
from typing import List
async def fetch_url(url: str) -> str:
"""Simulate async HTTP request."""
await asyncio.sleep(0.1) # Simulate network delay
return f"Content from {url}"
async def fetch_all(urls: List[str]) -> List[str]:
"""Fetch multiple URLs concurrently."""
tasks = [fetch_url(url) for url in urls]
return await asyncio.gather(*tasks)
# Run async code
async def main():
urls = ["http://example.com", "http://example.org", "http://example.net"]
results = await fetch_all(urls)
for result in results:
print(result)
asyncio.run(main())
from typing import AsyncIterator
class AsyncDatabase:
"""Async database connection."""
async def __aenter__(self) -> "AsyncDatabase":
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
await self.disconnect()
return False
async def connect(self) -> None:
await asyncio.sleep(0.1)
print("Connected")
async def disconnect(self) -> None:
await asyncio.sleep(0.1)
print("Disconnected")
# Async iterator
class AsyncRange:
def __init__(self, start: int, stop: int):
self.start = start
self.stop = stop
self.current = start
def __aiter__(self) -> "AsyncRange":
return self
async def __anext__(self) -> int:
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(0.01) # Simulate async work
result = self.current
self.current += 1
return result
# Usage
async def main():
async with AsyncDatabase() as db:
async for i in AsyncRange(0, 5):
print(i)
import asyncio
from asyncio import Semaphore, Lock, Queue
from typing import Any
# Semaphore for rate limiting
async def rate_limited_fetch(url: str, semaphore: Semaphore) -> str:
async with semaphore:
return await fetch_url(url)
async def fetch_with_limit(urls: List[str], max_concurrent: int = 10) -> List[str]:
semaphore = Semaphore(max_concurrent)
tasks = [rate_limited_fetch(url, semaphore) for url in urls]
return await asyncio.gather(*tasks)
# Lock for thread safety
class AsyncCounter:
def __init__(self):
self._value = 0
self._lock = Lock()
async def increment(self) -> int:
async with self._lock:
self._value += 1
return self._value
# Producer-consumer with Queue
async def producer(queue: Queue[int], n: int) -> None:
for i in range(n):
await queue.put(i)
await asyncio.sleep(0.01)
async def consumer(queue: Queue[int], name: str) -> None:
while True:
item = await queue.get()
print(f"{name} got {item}")
queue.task_done()
async def main():
queue: Queue[int] = Queue(maxsize=10)
# Start consumers
consumers = [
asyncio.create_task(consumer(queue, f"consumer-{i}"))
for i in range(3)
]
# Produce items
await producer(queue, 20)
await queue.join() # Wait for all items to be processed
# Cancel consumers
for c in consumers:
c.cancel()
import httpx
from typing import Any
async def fetch_json(url: str) -> dict[str, Any]:
"""Fetch JSON from URL with retry logic."""
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url)
response.raise_for_status()
return response.json()
async def post_data(url: str, data: dict[str, Any]) -> dict[str, Any]:
"""POST JSON data."""
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data)
response.raise_for_status()
return response.json()
# Reusable client with connection pooling
class APIClient:
def __init__(self, base_url: str, api_key: str):
self.client = httpx.AsyncClient(
base_url=base_url,
headers={"Authorization": f"Bearer {api_key}"},
timeout=30.0,
)
async def get(self, path: str, **params) -> dict:
response = await self.client.get(path, params=params)
response.raise_for_status()
return response.json()
async def close(self) -> None:
await self.client.aclose()
async def __aenter__(self) -> "APIClient":
return self
async def __aexit__(self, *args) -> None:
await self.close()
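A brief usage sketch for APIClient (base URL, path, and token are placeholders):
async def list_items() -> dict:
    async with APIClient("https://api.example.com", api_key="<token>") as api:
        # GET https://api.example.com/items?page=1 with the bearer-token header applied
        return await api.get("/items", page=1)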
import pytest
from typing import List
# Function to test
def add(a: int, b: int) -> int:
return a + b
def divide(a: float, b: float) -> float:
if b == 0:
raise ValueError("Cannot divide by zero")
return a / b
# Tests
def test_add():
assert add(2, 3) == 5
assert add(-1, 1) == 0
assert add(0, 0) == 0
def test_divide():
assert divide(10, 2) == 5.0
assert divide(7, 2) == 3.5
def test_divide_by_zero():
with pytest.raises(ValueError, match="Cannot divide by zero"):
divide(10, 0)
# Parameterized tests
@pytest.mark.parametrize("a,b,expected", [
(1, 2, 3),
(0, 0, 0),
(-1, 1, 0),
(100, 200, 300),
])
def test_add_parametrized(a: int, b: int, expected: int):
assert add(a, b) == expected
import pytest
from dataclasses import dataclass
from typing import Generator
@dataclass
class User:
name: str
email: str
class Database:
def __init__(self):
self.users: List[User] = []
def add_user(self, user: User) -> None:
self.users.append(user)
def get_user(self, email: str) -> User | None:
return next((u for u in self.users if u.email == email), None)
# Fixtures
@pytest.fixture
def sample_user() -> User:
return User("Alice", "alice@example.com")
@pytest.fixture
def database() -> Generator[Database, None, None]:
"""Database fixture with cleanup."""
db = Database()
yield db
db.users.clear() # Cleanup
@pytest.fixture
def populated_database(database: Database, sample_user: User) -> Database:
"""Database with sample data."""
database.add_user(sample_user)
return database
# Tests using fixtures
def test_add_user(database: Database, sample_user: User):
database.add_user(sample_user)
assert len(database.users) == 1
assert database.users[0].email == "alice@example.com"
def test_get_user(populated_database: Database):
user = populated_database.get_user("alice@example.com")
assert user is not None
assert user.name == "Alice"
def test_get_nonexistent_user(database: Database):
user = database.get_user("nobody@example.com")
assert user is None
from unittest.mock import Mock, patch, AsyncMock, MagicMock
import pytest
# Function that calls external service
def fetch_user_data(user_id: int) -> dict:
import requests
response = requests.get(f"https://api.example.com/users/{user_id}")
return response.json()
# Test with mock
def test_fetch_user_data():
with patch('requests.get') as mock_get:
mock_get.return_value.json.return_value = {"id": 1, "name": "Alice"}
result = fetch_user_data(1)
assert result == {"id": 1, "name": "Alice"}
mock_get.assert_called_once_with("https://api.example.com/users/1")
# Async mock
async def fetch_data_async(url: str) -> dict:
import httpx
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
@pytest.mark.asyncio
async def test_fetch_data_async():
with patch('httpx.AsyncClient') as MockClient:
mock_client = AsyncMock()
        mock_response = Mock()  # httpx Response.json() is synchronous, so a plain Mock is enough
        mock_response.json.return_value = {"data": "test"}
mock_client.get.return_value = mock_response
MockClient.return_value.__aenter__.return_value = mock_client
result = await fetch_data_async("http://example.com")
assert result == {"data": "test"}
# Mock with side_effect
def test_retry_on_failure():
with patch('requests.get') as mock_get:
# First call fails, second succeeds
mock_get.side_effect = [
Exception("Connection error"),
Mock(json=lambda: {"success": True})
]
# Test retry logic...
import pytest
import asyncio
# Mark for async tests
@pytest.mark.asyncio
async def test_async_function():
result = await async_operation()
assert result == "expected"
# Async fixture
@pytest.fixture
async def async_client():
client = AsyncClient()
await client.connect()
yield client
await client.disconnect()
@pytest.mark.asyncio
async def test_with_async_fixture(async_client):
result = await async_client.fetch("data")
assert result is not None
# Test concurrent operations
@pytest.mark.asyncio
async def test_concurrent_fetches():
urls = ["http://a.com", "http://b.com", "http://c.com"]
results = await asyncio.gather(*[fetch_url(url) for url in urls])
assert len(results) == 3
[tool.pytest.ini_options]
minversion = "7.0"
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
python_functions = ["test_*"]
asyncio_mode = "auto"
addopts = [
"-v",
"--strict-markers",
"--cov=src",
"--cov-report=term-missing",
"--cov-report=html",
"--cov-fail-under=80",
]
markers = [
"slow: marks tests as slow",
"integration: integration tests",
]
[tool.coverage.run]
source = ["src"]
branch = true
omit = ["*/tests/*", "*/__pycache__/*"]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"if TYPE_CHECKING:",
"raise NotImplementedError",
]
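The markers declared in the pytest config above are applied with @pytest.mark and selected on the command line; a small sketch with hypothetical test names:
import pytest
@pytest.mark.slow
def test_full_reindex():
    ...
@pytest.mark.integration
def test_database_roundtrip():
    ...
# Select by marker:
#   pytest -m "not slow"        # skip slow tests
#   pytest -m integration       # run only integration tests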
from typing import Any
class ApplicationError(Exception):
"""Base exception for application errors."""
def __init__(self, message: str, code: str | None = None, details: dict | None = None):
self.message = message
self.code = code
self.details = details or {}
super().__init__(message)
def to_dict(self) -> dict[str, Any]:
return {
"error": self.__class__.__name__,
"message": self.message,
"code": self.code,
"details": self.details,
}
class ValidationError(ApplicationError):
"""Raised when input validation fails."""
pass
class NotFoundError(ApplicationError):
"""Raised when a resource is not found."""
pass
class AuthenticationError(ApplicationError):
"""Raised when authentication fails."""
pass
# Usage
def get_user(user_id: int) -> dict:
if user_id < 0:
raise ValidationError(
message="User ID must be positive",
code="INVALID_USER_ID",
details={"user_id": user_id}
)
# ... fetch user
raise NotFoundError(
message=f"User {user_id} not found",
code="USER_NOT_FOUND"
)
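Because every subclass inherits to_dict(), one handler at the application boundary can turn these errors into structured responses; a sketch (the status-code mapping is illustrative):
def handle_request(user_id: int) -> tuple[int, dict]:
    try:
        return 200, get_user(user_id)
    except NotFoundError as e:
        return 404, e.to_dict()
    except ValidationError as e:
        return 400, e.to_dict()
    except ApplicationError as e:  # any other application error
        return 500, e.to_dict()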
import logging
from contextlib import suppress
logger = logging.getLogger(__name__)
# Specific exception handling
def safe_divide(a: float, b: float) -> float | None:
try:
return a / b
except ZeroDivisionError:
logger.warning("Division by zero attempted")
return None
except TypeError as e:
logger.error(f"Type error in division: {e}")
raise ValueError(f"Invalid types for division: {type(a)}, {type(b)}") from e
# Multiple exception types
def process_data(data: Any) -> dict:
try:
result = parse_and_transform(data)
except (ValueError, KeyError) as e:
logger.error(f"Data processing error: {e}")
raise ApplicationError(f"Failed to process data: {e}") from e
except Exception as e:
logger.exception("Unexpected error in data processing")
raise
else:
logger.info("Data processed successfully")
return result
finally:
cleanup_resources()
# Suppress specific exceptions
def get_optional_config(key: str) -> str | None:
with suppress(KeyError, FileNotFoundError):
return load_config()[key]
return None
# Exception chaining
def fetch_and_parse(url: str) -> dict:
try:
response = fetch(url)
except ConnectionError as e:
raise ApplicationError(f"Failed to fetch {url}") from e
try:
return parse_json(response)
except json.JSONDecodeError as e:
raise ApplicationError(f"Invalid JSON from {url}") from e
import cProfile
import pstats
from io import StringIO
import time
from functools import wraps
from memory_profiler import profile # pip install memory-profiler
# Time profiling decorator
def profile_time(func):
@wraps(func)
def wrapper(*args, **kwargs):
pr = cProfile.Profile()
pr.enable()
result = func(*args, **kwargs)
pr.disable()
s = StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(20)
print(s.getvalue())
return result
return wrapper
# Memory profiling
@profile
def memory_intensive_function():
data = [i ** 2 for i in range(1000000)]
return sum(data)
# Manual timing
def benchmark(func, *args, iterations: int = 100, **kwargs):
times = []
for _ in range(iterations):
start = time.perf_counter()
func(*args, **kwargs)
times.append(time.perf_counter() - start)
avg = sum(times) / len(times)
min_t = min(times)
max_t = max(times)
print(f"{func.__name__}: avg={avg:.6f}s, min={min_t:.6f}s, max={max_t:.6f}s")
from collections import defaultdict, Counter
from itertools import islice, chain
from functools import lru_cache
import operator
# Use generators for large data
def process_large_file(path: str):
with open(path) as f:
for line in f: # Generator - one line at a time
yield process_line(line)
# Use dict.get() instead of key checking
def get_value(d: dict, key: str, default: int = 0) -> int:
# Bad: if key in d: return d[key] else: return default
return d.get(key, default) # Good
# Use defaultdict for grouping
def group_by_category(items: List[dict]) -> dict:
grouped = defaultdict(list)
for item in items:
grouped[item['category']].append(item)
return dict(grouped)
# Use Counter for counting
def count_words(text: str) -> dict:
words = text.lower().split()
return Counter(words)
# Use set for membership testing
def find_common(list1: List[int], list2: List[int]) -> List[int]:
set2 = set(list2) # O(n) creation, O(1) lookup
return [x for x in list1 if x in set2]
# Use lru_cache for memoization
@lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
# Use operator module for key functions
from operator import itemgetter, attrgetter
# Sort by dict key
items = [{'name': 'b', 'value': 2}, {'name': 'a', 'value': 1}]
sorted_items = sorted(items, key=itemgetter('name'))
# Sort by attribute
users = [User('Bob', 'bob@example.com', 30), User('Alice', 'alice@example.com', 25)]
sorted_users = sorted(users, key=attrgetter('age'))
# String joining (not concatenation)
def build_string(parts: List[str]) -> str:
return ''.join(parts) # Good
    # Bad: result = ""; for part in parts: result += part  -> O(n²) from repeated copying
# List comprehension vs map/filter
numbers = [1, 2, 3, 4, 5]
squares = [x**2 for x in numbers] # Preferred
evens = [x for x in numbers if x % 2 == 0] # Preferred
class OptimizedUser:
__slots__ = ['name', 'email', 'age']
def __init__(self, name: str, email: str, age: int):
self.name = name
self.email = email
self.age = age
# With dataclass
from dataclasses import dataclass
@dataclass(slots=True)
class OptimizedDataUser:
name: str
email: str
age: int
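One practical effect of slots: instances have no per-object __dict__, and undeclared attributes cannot be added:
u = OptimizedUser("Alice", "alice@example.com", 30)
u.age = 31           # declared slots behave like normal attributes
# u.nickname = "Al"  # AttributeError: 'OptimizedUser' object has no attribute 'nickname'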
from pathlib import Path
from typing import Iterator
import json
import shutil
# Path operations
project_root = Path(__file__).parent.parent
config_dir = project_root / "config"
data_file = config_dir / "settings.json"
# Check existence
if data_file.exists():
print(f"File exists: {data_file}")
# Create directories
config_dir.mkdir(parents=True, exist_ok=True)
# Read/write files
def read_json(path: Path) -> dict:
return json.loads(path.read_text(encoding='utf-8'))
def write_json(path: Path, data: dict) -> None:
path.write_text(json.dumps(data, indent=2), encoding='utf-8')
# Find files
def find_python_files(directory: Path) -> Iterator[Path]:
yield from directory.rglob("*.py")
# File info
def file_info(path: Path) -> dict:
stat = path.stat()
return {
"name": path.name,
"stem": path.stem,
"suffix": path.suffix,
"size": stat.st_size,
"modified": stat.st_mtime,
"is_file": path.is_file(),
"is_dir": path.is_dir(),
}
# Safe file operations
def safe_copy(src: Path, dst: Path) -> None:
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)
def safe_delete(path: Path) -> bool:
try:
if path.is_file():
path.unlink()
elif path.is_dir():
shutil.rmtree(path)
return True
except (OSError, PermissionError) as e:
logger.error(f"Failed to delete {path}: {e}")
return False
import tempfile
from pathlib import Path
from contextlib import contextmanager
@contextmanager
def temp_directory():
"""Create and cleanup a temporary directory."""
path = Path(tempfile.mkdtemp())
try:
yield path
finally:
shutil.rmtree(path)
# Usage
with temp_directory() as tmp:
data_file = tmp / "data.json"
data_file.write_text('{"key": "value"}')
# Work with temporary files...
# Directory automatically cleaned up
import logging
import logging.handlers
import json
from datetime import datetime, timezone
from pathlib import Path
# Custom JSON formatter
class JSONFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
        # Merge custom attributes passed via `extra=` (they are set directly on the record)
        standard_attrs = logging.makeLogRecord({}).__dict__.keys()
        extras = {k: v for k, v in record.__dict__.items() if k not in standard_attrs and k not in log_data}
        log_data.update(extras)
return json.dumps(log_data)
def setup_logging(
level: int = logging.INFO,
log_dir: Path = Path("logs"),
json_format: bool = True,
) -> logging.Logger:
"""Configure application logging."""
log_dir.mkdir(exist_ok=True)
logger = logging.getLogger()
logger.setLevel(level)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
console_handler.setFormatter(logging.Formatter(console_format))
logger.addHandler(console_handler)
# File handler with rotation
file_handler = logging.handlers.RotatingFileHandler(
log_dir / "app.log",
maxBytes=10_000_000, # 10MB
backupCount=5,
)
file_handler.setLevel(logging.DEBUG)
if json_format:
file_handler.setFormatter(JSONFormatter())
else:
file_handler.setFormatter(logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
))
logger.addHandler(file_handler)
return logger
# Usage with extra context
logger = logging.getLogger(__name__)
def process_request(request_id: str, user_id: int):
logger.info(
"Processing request",
extra={"request_id": request_id, "user_id": user_id}
)
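Wiring it together might look like this (paths and IDs are placeholders):
setup_logging(level=logging.DEBUG, log_dir=Path("logs"), json_format=True)
process_request("req-123", user_id=42)
# Console: "... - __main__ - INFO - Processing request"
# logs/app.log receives the JSON record, including request_id and user_id passed via extra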
import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class DatabaseConfig:
host: str = "localhost"
port: int = 5432
name: str = "mydb"
user: str = "postgres"
password: str = ""
@property
def url(self) -> str:
return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.name}"
@dataclass
class AppConfig:
debug: bool = False
log_level: str = "INFO"
database: DatabaseConfig = field(default_factory=DatabaseConfig)
@classmethod
def from_env(cls) -> "AppConfig":
return cls(
debug=os.getenv("DEBUG", "false").lower() == "true",
log_level=os.getenv("LOG_LEVEL", "INFO"),
database=DatabaseConfig(
host=os.getenv("DB_HOST", "localhost"),
port=int(os.getenv("DB_PORT", "5432")),
name=os.getenv("DB_NAME", "mydb"),
user=os.getenv("DB_USER", "postgres"),
password=os.getenv("DB_PASSWORD", ""),
),
)
# Pydantic settings (recommended; BaseSettings now lives in the pydantic-settings package)
from pydantic import PostgresDsn
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
    debug: bool = False
    database_url: PostgresDsn
    api_key: str
settings = Settings()
from collections import (
defaultdict, Counter, deque, namedtuple,
OrderedDict, ChainMap
)
# defaultdict - auto-initialize missing keys
word_count = defaultdict(int)
for word in words:
word_count[word] += 1
# Counter - counting made easy
counter = Counter(["a", "b", "a", "c", "a", "b"])
counter.most_common(2) # [('a', 3), ('b', 2)]
# deque - efficient queue operations
queue = deque(maxlen=100)
queue.append("item") # Right side
queue.appendleft("item") # Left side
queue.popleft() # O(1) vs list.pop(0) O(n)
# namedtuple - lightweight immutable objects
Point = namedtuple('Point', ['x', 'y'])
p = Point(10, 20)
print(p.x, p.y)
# ChainMap - search multiple dicts
defaults = {"color": "red", "size": "medium"}
overrides = {"color": "blue"}
config = ChainMap(overrides, defaults)
print(config["color"]) # "blue"
print(config["size"]) # "medium"
from itertools import (
chain, islice, groupby, combinations,
permutations, product, cycle, repeat,
takewhile, dropwhile, filterfalse, zip_longest
)
# chain - flatten iterables
all_items = list(chain([1, 2], [3, 4], [5, 6]))
# islice - slice any iterable
first_10 = list(islice(infinite_generator(), 10))
# groupby - group consecutive items
data = [("a", 1), ("a", 2), ("b", 3), ("b", 4)]
for key, group in groupby(data, key=lambda x: x[0]):
print(key, list(group))
# combinations and permutations
list(combinations([1, 2, 3], 2)) # [(1,2), (1,3), (2,3)]
list(permutations([1, 2], 2)) # [(1,2), (2,1)]
# product - cartesian product
list(product([1, 2], ['a', 'b'])) # [(1,'a'), (1,'b'), (2,'a'), (2,'b')]
# takewhile/dropwhile
list(takewhile(lambda x: x < 5, [1, 3, 5, 7])) # [1, 3]
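The remaining imports above work along the same lines; a few quick sketches:
# filterfalse - keep items where the predicate is falsy
list(filterfalse(lambda x: x % 2, [1, 2, 3, 4]))          # [2, 4]
# zip_longest - zip without truncating, padding the shorter iterable
list(zip_longest([1, 2, 3], ["a", "b"], fillvalue=None))  # [(1, 'a'), (2, 'b'), (3, None)]
# cycle / repeat - infinite (or repeated) iterators, usually bounded with islice
list(islice(cycle(["red", "green"]), 5))                  # ['red', 'green', 'red', 'green', 'red']
list(repeat("x", 3))                                      # ['x', 'x', 'x']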
from functools import (
lru_cache, cache, partial, reduce,
wraps, singledispatch, cached_property
)
# lru_cache - memoization
@lru_cache(maxsize=128)
def expensive_computation(n: int) -> int:
return sum(i**2 for i in range(n))
# cache (Python 3.9+) - unlimited cache
@cache
def factorial(n: int) -> int:
return n * factorial(n-1) if n else 1
# partial - fix some arguments
def power(base: int, exponent: int) -> int:
return base ** exponent
square = partial(power, exponent=2)
cube = partial(power, exponent=3)
# reduce
from functools import reduce
product = reduce(lambda x, y: x * y, [1, 2, 3, 4]) # 24
# singledispatch - function overloading
@singledispatch
def process(data):
raise TypeError(f"Unsupported type: {type(data)}")
@process.register(list)
def _(data: list):
return [x * 2 for x in data]
@process.register(dict)
def _(data: dict):
return {k: v * 2 for k, v in data.items()}
# cached_property (Python 3.8+)
class DataLoader:
@cached_property
def data(self) -> list:
return expensive_load()
# String formatting
name = "Alice"
age = 30
f"Name: {name}, Age: {age}" # f-string
f"Price: ${price:.2f}" # 2 decimal places
f"Padded: {num:05d}" # Zero-padded
f"Aligned: {text:<20}" # Left align 20 chars
# List operations
items = [1, 2, 3, 4, 5]
items[-1] # Last item
items[::2] # Every 2nd item
items[::-1] # Reversed
[*items, 6, 7] # Spread/unpack
# Dict operations
d = {"a": 1, "b": 2}
d.get("c", 0) # Default value
d | {"c": 3} # Merge (Python 3.9+)
{**d, "c": 3} # Spread merge
{k: v for k, v in d.items() if v > 1} # Filter
# Set operations
s1 = {1, 2, 3}
s2 = {2, 3, 4}
s1 | s2 # Union
s1 & s2 # Intersection
s1 - s2 # Difference
s1 ^ s2 # Symmetric difference
# Walrus operator (Python 3.8+)
if (n := len(data)) > 10:
print(f"List has {n} items")
# Match statement (Python 3.10+)
match status_code:
case 200:
return "OK"
case 404:
return "Not Found"
case 500 | 502 | 503:
return "Server Error"
case _:
return "Unknown"
# Standard library
from pathlib import Path
from typing import Optional, List, Dict, Any, Callable, TypeVar
from dataclasses import dataclass, field
from collections import defaultdict, Counter
from functools import lru_cache, partial
from contextlib import contextmanager, suppress
from datetime import datetime, timedelta, timezone
import json
import logging
import os
import sys
import re
import asyncio
# Type checking only
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from expensive_module import HeavyClass
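Names imported only under TYPE_CHECKING do not exist at runtime, so annotations that reference them must stay unevaluated; a sketch using the HeavyClass import above:
from __future__ import annotations  # postpone evaluation so annotations stay strings at runtime
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from expensive_module import HeavyClass  # only imported by type checkers
def warm_up(instance: HeavyClass) -> None:   # fine: never evaluated at runtime
    ...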
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "myproject"
version = "0.1.0"
description = "Project description"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"httpx>=0.24",
"pydantic>=2.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0",
"pytest-cov>=4.0",
"pytest-asyncio>=0.21",
"mypy>=1.0",
"ruff>=0.1",
]
[tool.ruff]
target-version = "py311"
line-length = 100
select = ["E", "F", "I", "N", "W", "UP"]
[tool.ruff.isort]
known-first-party = ["myproject"]
[tool.mypy]
python_version = "3.11"
strict = true
warn_return_any = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = "-v --cov=src --cov-report=term-missing"
All Python code must meet:
When completing Python tasks: