FastAPI Background Tasks
Async background processing patterns.
Built-in BackgroundTasks
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
async def send_email(email: str, message: str):
"""Background task - runs after response sent."""
# Simulate email sending
await asyncio.sleep(2)
print(f"Email sent to {email}: {message}")
@app.post("/signup")
async def signup(
email: str,
background_tasks: BackgroundTasks,
):
# Create user synchronously
user = create_user(email)
# Queue background task
background_tasks.add_task(send_email, email, "Welcome!")
# Response sent immediately
return {"message": "User created"}
# Multiple tasks
@app.post("/order")
async def create_order(
order: OrderCreate,
background_tasks: BackgroundTasks,
):
db_order = save_order(order)
# Queue multiple tasks
background_tasks.add_task(send_confirmation, order.email)
background_tasks.add_task(update_inventory, order.items)
background_tasks.add_task(notify_warehouse, db_order.id)
return {"order_id": db_order.id}
Dependency Injection with Background Tasks
from fastapi import Depends, BackgroundTasks
from typing import Annotated
async def audit_log(action: str, user_id: int):
"""Log user actions."""
await db.execute(
"INSERT INTO audit_log (action, user_id) VALUES ($1, $2)",
action, user_id
)
def get_auditor(background_tasks: BackgroundTasks):
"""Factory for audit logging."""
def log(action: str, user_id: int):
background_tasks.add_task(audit_log, action, user_id)
return log
Auditor = Annotated[Callable, Depends(get_auditor)]
@app.delete("/users/{user_id}")
async def delete_user(
user_id: int,
current_user: CurrentUser,
auditor: Auditor,
):
await db.delete_user(user_id)
auditor("user_deleted", current_user.id)
return {"deleted": user_id}
Longer Tasks with Celery
# tasks.py
from celery import Celery
celery_app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0",
)
@celery_app.task
def process_video(video_id: int):
"""Long-running task - handled by Celery worker."""
video = get_video(video_id)
processed = transcode(video)
save_processed(processed)
return {"status": "completed", "video_id": video_id}
# api.py
from fastapi import FastAPI
from tasks import process_video
app = FastAPI()
@app.post("/videos/{video_id}/process")
async def start_processing(video_id: int):
# Queue task in Celery
task = process_video.delay(video_id)
return {
"task_id": task.id,
"status": "queued",
}
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
task = process_video.AsyncResult(task_id)
return {
"task_id": task_id,
"status": task.status,
"result": task.result if task.ready() else None,
}
Periodic Tasks with APScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import FastAPI
from contextlib import asynccontextmanager
scheduler = AsyncIOScheduler()
async def cleanup_expired_sessions():
"""Run daily at midnight."""
await db.execute("DELETE FROM sessions WHERE expires < NOW()")
async def send_daily_report():
"""Run daily at 9 AM."""
report = await generate_report()
await send_email("admin@example.com", report)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup - configure scheduler
scheduler.add_job(
cleanup_expired_sessions,
CronTrigger(hour=0, minute=0),
id="cleanup_sessions",
)
scheduler.add_job(
send_daily_report,
CronTrigger(hour=9, minute=0),
id="daily_report",
)
scheduler.start()
yield
# Shutdown
scheduler.shutdown()
app = FastAPI(lifespan=lifespan)
# Manual trigger endpoint (for testing)
@app.post("/admin/trigger/{job_id}")
async def trigger_job(job_id: str, admin: AdminUser):
job = scheduler.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
job.modify(next_run_time=datetime.now())
return {"message": f"Job {job_id} triggered"}
Task Queues with Redis
import redis.asyncio as redis
import json
from uuid import uuid4
class TaskQueue:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.queue_name = "task_queue"
async def enqueue(self, task_type: str, payload: dict) -> str:
"""Add task to queue."""
task_id = str(uuid4())
task = {
"id": task_id,
"type": task_type,
"payload": payload,
"status": "pending",
}
await self.redis.rpush(self.queue_name, json.dumps(task))
await self.redis.set(f"task:{task_id}", json.dumps(task))
return task_id
async def get_status(self, task_id: str) -> dict | None:
"""Get task status."""
data = await self.redis.get(f"task:{task_id}")
return json.loads(data) if data else None
# Worker (separate process)
async def worker(queue: TaskQueue):
"""Process tasks from queue."""
while True:
task_data = await queue.redis.blpop(queue.queue_name, timeout=1)
if not task_data:
continue
task = json.loads(task_data[1])
task["status"] = "processing"
await queue.redis.set(f"task:{task['id']}", json.dumps(task))
try:
result = await process_task(task)
task["status"] = "completed"
task["result"] = result
except Exception as e:
task["status"] = "failed"
task["error"] = str(e)
await queue.redis.set(f"task:{task['id']}", json.dumps(task))
# API endpoints
queue = TaskQueue("redis://localhost:6379")
@app.post("/tasks")
async def create_task(task_type: str, payload: dict):
task_id = await queue.enqueue(task_type, payload)
return {"task_id": task_id}
@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
task = await queue.get_status(task_id)
if not task:
raise HTTPException(status_code=404)
return task
Async Task Manager
import asyncio
from contextlib import asynccontextmanager
from typing import Callable, Awaitable
class TaskManager:
"""Manage long-running async tasks."""
def __init__(self):
self._tasks: dict[str, asyncio.Task] = {}
self._results: dict[str, Any] = {}
async def start(self, task_id: str, coro: Awaitable):
"""Start a named task."""
if task_id in self._tasks:
raise ValueError(f"Task {task_id} already running")
async def wrapper():
try:
result = await coro
self._results[task_id] = {"status": "completed", "result": result}
except Exception as e:
self._results[task_id] = {"status": "failed", "error": str(e)}
finally:
self._tasks.pop(task_id, None)
self._tasks[task_id] = asyncio.create_task(wrapper())
return task_id
def get_status(self, task_id: str) -> dict:
if task_id in self._tasks:
return {"status": "running"}
return self._results.get(task_id, {"status": "not_found"})
async def cancel(self, task_id: str) -> bool:
task = self._tasks.get(task_id)
if task:
task.cancel()
return True
return False
# Global task manager
task_manager = TaskManager()
@app.post("/process")
async def start_processing(data: ProcessRequest):
task_id = str(uuid4())
await task_manager.start(task_id, heavy_processing(data))
return {"task_id": task_id}
@app.get("/process/{task_id}")
async def get_processing_status(task_id: str):
return task_manager.get_status(task_id)
Quick Reference
| Method |
Use Case |
Runs Where |
BackgroundTasks |
Quick async tasks |
Same process |
| Celery |
Heavy processing |
Worker process |
| APScheduler |
Periodic jobs |
Same process |
| Redis queue |
Distributed tasks |
Worker process |
asyncio.Task |
In-memory async |
Same process |
| Pattern |
Best For |
| BackgroundTasks |
Email, webhooks, logging |
| Celery |
Video processing, ML, reports |
| APScheduler |
Cleanup, reports, sync |
| Redis queue |
Scalable task distribution |