async-project-template.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
#!/usr/bin/env python3
"""
Async Python Project Template

Production-ready async application structure with:
- Proper session management
- Graceful shutdown
- Structured concurrency
- Error handling
- Logging
"""
  11. import asyncio
  12. import logging
  13. import signal
  14. from contextlib import asynccontextmanager
  15. from typing import Any
  16. import aiohttp
  17. # Configure logging
  18. logging.basicConfig(
  19. level=logging.INFO,
  20. format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  21. )
  22. logger = logging.getLogger(__name__)
  23. class AsyncApp:
  24. """Main application class with lifecycle management."""
  25. def __init__(self):
  26. self.session: aiohttp.ClientSession | None = None
  27. self.running = False
  28. self._tasks: set[asyncio.Task] = set()
  29. async def start(self):
  30. """Initialize resources."""
  31. logger.info("Starting application...")
  32. self.session = aiohttp.ClientSession(
  33. timeout=aiohttp.ClientTimeout(total=30),
  34. connector=aiohttp.TCPConnector(limit=100),
  35. )
  36. self.running = True
  37. logger.info("Application started")
  38. async def stop(self):
  39. """Cleanup resources."""
  40. logger.info("Stopping application...")
  41. self.running = False
  42. # Cancel background tasks
  43. for task in self._tasks:
  44. task.cancel()
  45. if self._tasks:
  46. await asyncio.gather(*self._tasks, return_exceptions=True)
  47. # Close session
  48. if self.session:
  49. await self.session.close()
  50. logger.info("Application stopped")
  51. def create_task(self, coro) -> asyncio.Task:
  52. """Create a tracked background task."""
  53. task = asyncio.create_task(coro)
  54. self._tasks.add(task)
  55. task.add_done_callback(self._tasks.discard)
  56. return task
  57. async def fetch(self, url: str) -> dict[str, Any] | None:
  58. """Fetch URL with error handling."""
  59. if not self.session:
  60. raise RuntimeError("App not started")
  61. try:
  62. async with self.session.get(url) as response:
  63. response.raise_for_status()
  64. return await response.json()
  65. except aiohttp.ClientError as e:
  66. logger.error(f"Request failed: {e}")
  67. return None
  68. async def fetch_many(
  69. self,
  70. urls: list[str],
  71. concurrency: int = 10
  72. ) -> list[dict[str, Any] | None]:
  73. """Fetch multiple URLs with bounded concurrency."""
  74. semaphore = asyncio.Semaphore(concurrency)
  75. async def bounded_fetch(url: str):
  76. async with semaphore:
  77. return await self.fetch(url)
  78. return await asyncio.gather(*[bounded_fetch(url) for url in urls])
  79. @asynccontextmanager
  80. async def create_app():
  81. """Context manager for app lifecycle."""
  82. app = AsyncApp()
  83. try:
  84. await app.start()
  85. yield app
  86. finally:
  87. await app.stop()
  88. async def main():
  89. """Main entry point."""
  90. # Setup signal handlers for graceful shutdown
  91. loop = asyncio.get_running_loop()
  92. stop_event = asyncio.Event()
  93. def signal_handler():
  94. logger.info("Received shutdown signal")
  95. stop_event.set()
  96. for sig in (signal.SIGTERM, signal.SIGINT):
  97. loop.add_signal_handler(sig, signal_handler)
  98. async with create_app() as app:
  99. # Example: Fetch some URLs
  100. urls = [
  101. "https://httpbin.org/json",
  102. "https://httpbin.org/uuid",
  103. ]
  104. results = await app.fetch_many(urls)
  105. for url, result in zip(urls, results):
  106. logger.info(f"{url}: {result}")
  107. # Keep running until shutdown signal
  108. # await stop_event.wait()
# Script entry point: run main() to completion when executed directly.
if __name__ == "__main__":
    asyncio.run(main())