FastAPI’s BackgroundTasks is one of the framework’s most convenient features — fire off a side job (send an email, log an event, update a cache) without making the client wait. But it’s also one of the sneakiest sources of silent bugs in production.
TL;DR: Why Your Background Task Isn't Working
Most Common Causes:
- Exceptions in background tasks are silently swallowed — no log output by default
- The SQLAlchemy `db` session from your route is already closed when the task runs
- Sync blocking code inside an async background task freezes the event loop
- Using `Depends()` directly inside a background task function — it doesn't work there
Quick diagnostic: Add an explicit try/except with logging inside your task. If you see the error, pick the fix for that specific cause below.
Why This Happens
FastAPI runs background tasks after the HTTP response has been sent. That’s by design — it’s what makes them “background.” But it means a few things shift under your feet:
- The request context is gone. Any dependency (like a database session) injected into your route handler has been cleaned up.
- Exceptions that bubble up from a background task don’t get forwarded to the client (the client already received a 200). By default, they just disappear.
- The event loop is still shared with your live request handlers, so a blocking call inside a background task can slow down all active requests.
Understanding these three constraints unlocks the fix for almost every background task problem you’ll encounter.
Solutions by Scenario
Scenario 1: The Task Runs But Nothing Happens (Silent Exceptions)
This is the most common scenario. Your route returns 200 OK, the background task is queued, but the intended side effect (email sent, row inserted, file written) never happens. No error in the logs. No traceback. Just silence.
Why it happens: the exception is raised after the response has already been sent, so it can never reach the client, and it bypasses your route-level error handling entirely. Unless you've set up logging inside the task itself (or watch the server's raw error output closely), the failure goes nowhere you'll notice.
❌ Before — exception is swallowed silently:
```python
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def send_welcome_email(user_email: str):
    # This raises an exception but you'll never know
    result = some_smtp_client.send(to=user_email)  # AttributeError if client is None

@app.post("/register")
async def register_user(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_welcome_email, email)
    return {"message": "registered"}
```
✅ After — exceptions are caught and logged:
```python
from fastapi import FastAPI, BackgroundTasks
import logging

logger = logging.getLogger(__name__)

app = FastAPI()

def send_welcome_email(user_email: str):
    try:
        result = some_smtp_client.send(to=user_email)
        logger.info(f"Email sent to {user_email}")
    except Exception as exc:
        # Log the full traceback so you can debug it
        logger.exception(f"Failed to send email to {user_email}: {exc}")
        # Optionally re-raise if you want to trigger a retry mechanism
        raise

@app.post("/register")
async def register_user(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_welcome_email, email)
    return {"message": "registered"}
```
The rule is simple: always wrap your background task body in a try/except that logs the full exception. Use logger.exception() instead of logger.error() — the former includes the full traceback, which you’ll need for debugging.
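If many tasks need this pattern, you can factor it into a small decorator. This is a sketch, not a FastAPI feature — the name `log_task_errors` and the simulated failure are illustrative:

```python
import functools
import logging

logger = logging.getLogger(__name__)

def log_task_errors(task):
    """Decorator: log the full traceback of any exception a task raises."""
    @functools.wraps(task)
    def wrapper(*args, **kwargs):
        try:
            return task(*args, **kwargs)
        except Exception:
            logger.exception("Background task %s failed", task.__name__)
            raise  # re-raise so a retry layer (if any) can still see it
    return wrapper

@log_task_errors
def send_welcome_email(user_email: str):
    # simulate the failure mode from the example above
    raise RuntimeError(f"SMTP client not configured for {user_email}")
```

`background_tasks.add_task(send_welcome_email, email)` then works unchanged, and every decorated task is guaranteed to leave a traceback in your logs.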
You can also use Debugly’s trace formatter to parse those tracebacks if they’re messy or deeply nested.
Scenario 2: Database Session Closed (The SQLAlchemy Trap)
If your app uses SQLAlchemy with FastAPI’s dependency injection pattern, you’ve probably written something like this:
```python
from fastapi import BackgroundTasks, Depends
from sqlalchemy.orm import Session

from .database import get_db

@app.post("/order")
async def create_order(
    order_data: OrderCreate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    order = Order(**order_data.dict())
    db.add(order)
    db.commit()
    # Pass `db` into the background task — this is the trap
    background_tasks.add_task(send_order_confirmation, order.id, db)
    return {"order_id": order.id}
This looks reasonable. The problem? By the time send_order_confirmation runs, the get_db dependency has already cleaned up and closed the session. You’re passing a closed (or garbage-collected) session to your task.
The error you’ll typically see — if you have logging set up — looks like:
```
sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back
due to a previous exception during flush.
# or
sqlalchemy.orm.exc.DetachedInstanceError: Instance <Order at 0x...> is not bound
to a Session; attribute refresh operation cannot proceed
```
✅ Fix — create a fresh session inside the background task:
```python
from fastapi import BackgroundTasks, Depends
from sqlalchemy.orm import Session
import logging

from .database import SessionLocal, get_db  # SessionLocal is your sessionmaker

logger = logging.getLogger(__name__)

def send_order_confirmation(order_id: int):
    # Open a brand-new session, independent of the request
    db: Session = SessionLocal()
    try:
        order = db.query(Order).filter(Order.id == order_id).first()
        if order:
            email_client.send_order_email(order.user_email, order.id)
            order.email_sent = True
            db.commit()
    except Exception as exc:
        db.rollback()
        logger.exception(f"Background task failed for order {order_id}: {exc}")
        raise
    finally:
        db.close()  # Always close the session you opened

@app.post("/order")
async def create_order(
    order_data: OrderCreate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    order = Order(**order_data.dict())
    db.add(order)
    db.commit()
    db.refresh(order)
    # Pass only the ID, not the session
    background_tasks.add_task(send_order_confirmation, order.id)
    return {"order_id": order.id}
```
Key rule: pass IDs (or serializable data), not ORM objects or sessions, into your background tasks. The task should open its own session, do its work, and close it in a finally block.
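The open/commit/rollback/close dance repeats in every task, so it can be factored into a small context manager. This is a sketch: `task_session` is an illustrative name, and `session_factory` stands in for your `SessionLocal`:

```python
from contextlib import contextmanager

@contextmanager
def task_session(session_factory):
    """Yield a fresh session for one background task, then clean up."""
    session = session_factory()
    try:
        yield session
        session.commit()    # commit only if the task body succeeded
    except Exception:
        session.rollback()  # undo partial work on any error
        raise
    finally:
        session.close()     # always release the connection

# A task body then shrinks to:
# def send_order_confirmation(order_id: int):
#     with task_session(SessionLocal) as db:
#         ...
```

The task keeps its own session lifecycle, and the error-handling policy lives in one place instead of being copy-pasted into every task.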
Scenario 3: Blocking Code in an Async Context
FastAPI routes and background tasks can be either async def or plain def. But when you mix them wrong, you block the event loop for every concurrent request.
Here’s the trap: if your background task is async def and does blocking I/O (like a requests HTTP call or a synchronous SQLAlchemy query with a sync driver), it freezes the event loop until it finishes.
❌ Blocking the event loop:
```python
import requests  # sync HTTP library

async def fetch_and_store_data(url: str):
    # requests.get() is blocking — it freezes the event loop
    response = requests.get(url, timeout=10)
    data = response.json()
    store_in_db(data)  # also blocking if using sync SQLAlchemy

@app.post("/sync-data")
async def trigger_sync(background_tasks: BackgroundTasks):
    background_tasks.add_task(fetch_and_store_data, "https://api.example.com/data")
    return {"status": "queued"}
```
Option A — Make the task a plain def function (FastAPI runs it in a thread pool):
```python
import requests

def fetch_and_store_data(url: str):
    # Plain def = FastAPI uses run_in_executor under the hood
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()
        store_in_db(data)
        logger.info(f"Synced data from {url}")
    except Exception as exc:
        logger.exception(f"Sync failed for {url}: {exc}")
        raise
```
Option B — Use async-native libraries in an async def task:
```python
import httpx  # async HTTP client

async def fetch_and_store_data(url: str):
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=10)
            response.raise_for_status()
            data = response.json()
            await store_in_db_async(data)  # needs async SQLAlchemy
    except Exception as exc:
        logger.exception(f"Sync failed for {url}: {exc}")
        raise
```
Which to choose? If your task does mostly I/O (HTTP calls, database queries), Option B with async libraries is more scalable. If you’re dealing with CPU-heavy work or a legacy codebase using sync libraries, Option A (plain def) is safer and simpler.
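There is also a middle ground: keep the task `async def` but push the one blocking call onto a worker thread with `asyncio.to_thread` (Python 3.9+). A self-contained sketch — `blocking_fetch` is a stand-in for a `requests.get()` call or a sync database query:

```python
import asyncio
import time

def blocking_fetch(url: str) -> str:
    """Stand-in for requests.get() or a sync SQLAlchemy query."""
    time.sleep(0.1)  # simulates blocking I/O
    return f"payload from {url}"

async def fetch_and_store(url: str) -> str:
    # The event loop stays free while a worker thread waits on the I/O
    data = await asyncio.to_thread(blocking_fetch, url)
    return data
```

This is handy when a task is mostly async but has to call one legacy sync function it can't avoid.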
For more detail on the async/sync mixing problem, see the post on FastAPI async/sync blocking issues.
Scenario 4: Trying to Use Depends() in a Background Task
FastAPI’s dependency injection is a route-level feature. You can’t use Depends() inside a background task function directly — it only works in route handler signatures.
❌ This doesn’t work:
```python
from fastapi import BackgroundTasks, Depends
from sqlalchemy.orm import Session

from .database import get_db

# Depends() is NOT resolved here — db will be the Depends object itself
def process_data(data: dict, db: Session = Depends(get_db)):
    db.add(DataModel(**data))
    db.commit()

@app.post("/process")
async def trigger_processing(
    data: dict,
    background_tasks: BackgroundTasks,
):
    background_tasks.add_task(process_data, data)
    return {"status": "queued"}
```
The db parameter receives the Depends(get_db) wrapper object, not an actual session. You’ll get an AttributeError (or worse, a silent failure if it reaches the “silent exceptions” scenario from Scenario 1).
✅ Fix — resolve dependencies in the route, then pass the values explicitly:
```python
from fastapi import BackgroundTasks, Depends
from sqlalchemy.orm import Session
import logging

from .database import SessionLocal, get_db

logger = logging.getLogger(__name__)

def process_data(data: dict):
    db: Session = SessionLocal()
    try:
        db.add(DataModel(**data))
        db.commit()
    except Exception as exc:
        db.rollback()
        logger.exception(f"process_data failed: {exc}")
        raise
    finally:
        db.close()

@app.post("/process")
async def trigger_processing(
    data: dict,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),  # Resolved here in the route
):
    # Use db in your route logic if needed...
    background_tasks.add_task(process_data, data)
    return {"status": "queued"}
```
The background task creates its own session internally (Scenario 2 pattern). The route handler’s db is available for route-level logic, but it’s never passed to the task.
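If you ever need a yield-based dependency's setup and cleanup outside the DI system, you can drive the generator by hand, which mirrors what FastAPI does internally for yield dependencies. A self-contained sketch — the dict-based `get_db` here is a stand-in for a real session dependency:

```python
def get_db():
    """A typical yield-based dependency (stand-in for the real one)."""
    db = {"open": True}        # pretend session
    try:
        yield db
    finally:
        db["open"] = False     # cleanup that Depends() would normally run

def run_with_db(task_body):
    """Drive a yield-based dependency by hand, outside the DI system."""
    gen = get_db()
    db = next(gen)             # enter the dependency: get the yielded value
    try:
        return task_body(db)
    finally:
        gen.close()            # exit: triggers the dependency's finally block
```

For most tasks, opening a session directly (as above) is simpler; this pattern is worth it only when the dependency's cleanup logic is nontrivial and you don't want to duplicate it.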
Prevention Tips
A few habits that save hours of debugging:
1. Always use structured logging in tasks. A bare print() won’t appear in most production log collectors. Use Python’s logging module with logger.exception() for full tracebacks.
2. Keep background tasks short. BackgroundTasks is designed for lightweight work that finishes within a few seconds: sending an email, updating a cache key, firing a webhook. For anything longer (video processing, bulk data imports, large API polls), use a proper job queue like Celery with Redis, or RQ. Don’t fight the tool.
3. Pass primitive data, not objects. IDs, strings, and small dicts are safe to pass. SQLAlchemy model instances, open file handles, and database sessions are not.
4. Test background tasks explicitly. FastAPI’s TestClient runs background tasks synchronously, so they do execute in tests. But write a dedicated test that verifies the task’s side effect (the database row, the email call), not just the HTTP response.
```python
from fastapi.testclient import TestClient
from unittest.mock import patch

def test_background_email_is_called():
    with patch("myapp.tasks.email_client.send") as mock_send:
        with TestClient(app) as client:
            # `email` is a query parameter in the register route above
            response = client.post("/register", params={"email": "test@example.com"})
            assert response.status_code == 200
            mock_send.assert_called_once_with(to="test@example.com")
```
5. Graduate to a real task queue for complex flows. If you find yourself passing lots of context into tasks or wiring up retry logic by hand, that’s a signal to move to a proper job queue rather than patching BackgroundTasks further.
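For tip 1, a minimal logging setup that makes `logger.exception()` output visible. This is a sketch — adjust the format and handlers to whatever your log collector expects:

```python
import logging

# Configure the root logger once, at application startup
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)

logger = logging.getLogger("myapp.tasks")
logger.info("background task logging configured")
```

With this in place, `logger.exception(...)` inside a task prints a timestamped message followed by the full traceback.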
Still Not Working?
A few less common but real edge cases:
The task is registered but never runs in tests: this can come up when you drive the app with httpx.AsyncClient and a raw ASGI transport instead of TestClient. Make sure the client is used inside an async with block so the full request cycle — including the post-response background step and any lifespan context — is actually awaited; with TestClient this happens automatically.
The task runs twice: If you’re running multiple Uvicorn workers (e.g., uvicorn app:app --workers 4), each worker is an independent process. A background task queued in worker 1 runs only in worker 1. If you need cross-worker coordination, you need an external queue.
Memory grows over time: Background tasks that hold references to large objects (images, dataframes, full API responses) can cause memory pressure because Python’s garbage collector doesn’t always release them promptly. Pass only the minimum data needed (IDs, small configs).
Summary Checklist
Before shipping code with BackgroundTasks:
- [ ] Wrap the task body in `try/except` with `logger.exception()`
- [ ] Never pass SQLAlchemy sessions or ORM objects into tasks — pass IDs instead
- [ ] Tasks that do blocking I/O should be plain `def`, not `async def` (or use async-native libraries)
- [ ] Don’t use `Depends()` in background task signatures — create sessions internally
- [ ] Write a test that checks the task’s side effect, not just the HTTP status
- [ ] If the task takes more than a few seconds, use Celery or RQ instead
Background task failures are notoriously hard to debug because there’s no client-visible error. Start with logging, then work through each scenario above until you find the culprit. Use Debugly’s trace formatter to parse the Python traceback once you do surface it — it makes navigating deep exception chains much faster.
For related issues with database sessions in FastAPI, the post on SQLAlchemy connection pool exhaustion covers the pool-level complement to the session management patterns here.