| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- #!/usr/bin/env python3
- """
- Async Python Project Template
- Production-ready async application structure with:
- - Proper session management
- - Graceful shutdown
- - Structured concurrency
- - Error handling
- - Logging
- """
- import asyncio
- import logging
- import signal
- from contextlib import asynccontextmanager
- from typing import Any
- import aiohttp
- # Configure logging
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
- )
- logger = logging.getLogger(__name__)
- class AsyncApp:
- """Main application class with lifecycle management."""
- def __init__(self):
- self.session: aiohttp.ClientSession | None = None
- self.running = False
- self._tasks: set[asyncio.Task] = set()
- async def start(self):
- """Initialize resources."""
- logger.info("Starting application...")
- self.session = aiohttp.ClientSession(
- timeout=aiohttp.ClientTimeout(total=30),
- connector=aiohttp.TCPConnector(limit=100),
- )
- self.running = True
- logger.info("Application started")
- async def stop(self):
- """Cleanup resources."""
- logger.info("Stopping application...")
- self.running = False
- # Cancel background tasks
- for task in self._tasks:
- task.cancel()
- if self._tasks:
- await asyncio.gather(*self._tasks, return_exceptions=True)
- # Close session
- if self.session:
- await self.session.close()
- logger.info("Application stopped")
- def create_task(self, coro) -> asyncio.Task:
- """Create a tracked background task."""
- task = asyncio.create_task(coro)
- self._tasks.add(task)
- task.add_done_callback(self._tasks.discard)
- return task
- async def fetch(self, url: str) -> dict[str, Any] | None:
- """Fetch URL with error handling."""
- if not self.session:
- raise RuntimeError("App not started")
- try:
- async with self.session.get(url) as response:
- response.raise_for_status()
- return await response.json()
- except aiohttp.ClientError as e:
- logger.error(f"Request failed: {e}")
- return None
- async def fetch_many(
- self,
- urls: list[str],
- concurrency: int = 10
- ) -> list[dict[str, Any] | None]:
- """Fetch multiple URLs with bounded concurrency."""
- semaphore = asyncio.Semaphore(concurrency)
- async def bounded_fetch(url: str):
- async with semaphore:
- return await self.fetch(url)
- return await asyncio.gather(*[bounded_fetch(url) for url in urls])
- @asynccontextmanager
- async def create_app():
- """Context manager for app lifecycle."""
- app = AsyncApp()
- try:
- await app.start()
- yield app
- finally:
- await app.stop()
- async def main():
- """Main entry point."""
- # Setup signal handlers for graceful shutdown
- loop = asyncio.get_running_loop()
- stop_event = asyncio.Event()
- def signal_handler():
- logger.info("Received shutdown signal")
- stop_event.set()
- for sig in (signal.SIGTERM, signal.SIGINT):
- loop.add_signal_handler(sig, signal_handler)
- async with create_app() as app:
- # Example: Fetch some URLs
- urls = [
- "https://httpbin.org/json",
- "https://httpbin.org/uuid",
- ]
- results = await app.fetch_many(urls)
- for url, result in zip(urls, results):
- logger.info(f"{url}: {result}")
- # Keep running until shutdown signal
- # await stop_event.wait()
- if __name__ == "__main__":
- asyncio.run(main())
|