You’re running a FastAPI application in production, traffic is picking up, and suddenly your server starts throwing errors. Requests hang for 30 seconds, then fail with a cryptic message: TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00. Your database is fine, your server has plenty of resources, but somehow you’ve run out of database connections.
Welcome to connection pool exhaustion—one of the most frustrating issues in FastAPI database integration. It typically shows up under load, making it hard to catch during development. Let’s diagnose what’s happening and fix it properly.
Symptom Description
When your connection pool is exhausted, you’ll see these telltale signs:
Error Messages:
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached,
connection timed out, timeout 30.00
# Or this variant:
TimeoutError: QueuePool limit of size X overflow Y reached
Performance Symptoms:
- Requests that used to be fast suddenly take 30+ seconds
- Server handles a few requests fine, then starts hanging
- Database shows few active queries, but FastAPI can’t connect
- Error rate spikes during traffic bursts
- Restarting the server temporarily fixes the issue
Monitoring Clues:
- Active database connections stay pinned at the pool limit (default: 5 persistent + 10 overflow = 15)
- Application doesn’t release connections after requests complete
- Memory usage might increase over time
- Some endpoints work fine while others hang
If you’re seeing any of these symptoms, especially that “QueuePool limit exceeded” message, you’ve got a connection leak.
What’s Actually Happening?
SQLAlchemy uses a connection pool to efficiently manage database connections. Instead of opening a new connection for every query (expensive), it maintains a pool of reusable connections. Here’s the default configuration:
# Default SQLAlchemy connection pool
pool_size = 5 # Maximum persistent connections
max_overflow = 10 # Additional temporary connections allowed
pool_timeout = 30 # Seconds to wait before giving up
So by default, SQLAlchemy allows 5 persistent + 10 overflow = 15 total concurrent connections. When your app tries to use the 16th connection, it waits 30 seconds for one to become available. If none free up, you get the timeout error.
The issue isn’t that you’re hitting the limit—it’s that connections aren’t being returned to the pool after use. They’re leaking.
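You can reproduce the failure in a few lines. This sketch uses SQLite and a deliberately tiny pool (illustrative numbers, not the Postgres defaults discussed above) to show connections being checked out and never returned:

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Tiny pool so exhaustion is immediate: 2 persistent + 1 overflow = 3 total
engine = create_engine(
    "sqlite://",
    poolclass=QueuePool,
    pool_size=2,
    max_overflow=1,
    pool_timeout=1,  # fail fast instead of the default 30s
)

# "Leak" three connections: checked out, never closed
leaked = [engine.connect() for _ in range(3)]
print(engine.pool.checkedout())  # 3 -- the pool is fully exhausted

try:
    engine.connect()  # the 4th checkout waits pool_timeout, then gives up
except Exception as exc:
    caught = type(exc).__name__
print(caught)  # TimeoutError

for conn in leaked:  # returning the connections "fixes" the leak
    conn.close()
print(engine.pool.checkedout())  # 0
```

Swap the URL for your real database and the same mechanics apply; only the numbers change.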
Diagnostic Steps
Before we fix anything, let’s confirm this is actually a connection pool problem and find where connections are leaking.
Step 1: Enable SQLAlchemy Connection Pool Logging
Add detailed logging to see exactly what’s happening with your connections:
import logging
# Enable SQLAlchemy pool logging
logging.basicConfig()
logging.getLogger('sqlalchemy.pool').setLevel(logging.DEBUG)
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
When you run your app, you’ll see output like:
Pool checked out connection <PoolProxiedConnection at 0x7f8b1c3d4e50>
Pool checked in connection <PoolProxiedConnection at 0x7f8b1c3d4e50>
Pool disposed. Pool size: 5 Connections in pool: 5 Current Overflow: 0
If you see lots of “checked out” but few “checked in”, you’ve got a leak.
Step 2: Monitor Pool Status in Real-Time
Add an endpoint to inspect your connection pool:
from fastapi import FastAPI
from sqlalchemy import create_engine
engine = create_engine("postgresql://user:pass@localhost/db")
app = FastAPI()
@app.get("/debug/pool")
def pool_status():
    pool = engine.pool
    return {
        "pool_size": pool.size(),
        "checked_in": pool.checkedin(),
        "checked_out": pool.checkedout(),
        "overflow": pool.overflow(),
        "max_overflow": pool._max_overflow,  # private attribute, but handy for debugging
    }
Hit this endpoint during normal operation. If checked_out keeps growing and checked_in stays low, connections aren’t being returned.
Step 3: Check for Unclosed Sessions
The most common cause—sessions that never get closed. Look through your code for patterns like this:
# RED FLAG: Session created but never explicitly closed
@app.get("/users")
def get_users():
    db = SessionLocal()
    users = db.query(User).all()
    return users  # Session leaked! Never closed!
Step 4: Load Test to Reproduce
Connection leaks often only appear under concurrent load:
# Install hey (HTTP load testing tool)
# On Mac: brew install hey
# On Linux: go install github.com/rakyll/hey@latest
# Send 100 requests with 10 concurrent workers
hey -n 100 -c 10 http://localhost:8000/api/users
# Watch for increasing response times and eventual failures
If the first few requests succeed but later ones timeout, you’re leaking connections.
Cause #1: Not Closing Database Sessions
The number one culprit—creating database sessions without closing them.
The Problem:
from fastapi import FastAPI
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine("postgresql://user:pass@localhost/db")
SessionLocal = sessionmaker(bind=engine)
app = FastAPI()
@app.get("/users")
def get_users():
    db = SessionLocal()  # Connection checked out from pool
    users = db.query(User).all()
    return users
    # Session never closed! Connection stays checked out forever!
Every request checks out a connection but never returns it. After 15 requests (with default pool settings), the pool is exhausted.
The Fix: Use Try-Finally
from fastapi import FastAPI
app = FastAPI()
@app.get("/users")
def get_users():
    db = SessionLocal()
    try:
        users = db.query(User).all()
        return users
    finally:
        db.close()  # ALWAYS close, even if the query fails
This ensures the connection is returned to the pool even if an exception occurs.
Better Fix: Use Context Manager
from fastapi import FastAPI
app = FastAPI()
@app.get("/users")
def get_users():
    # Session automatically closed when the context exits
    with SessionLocal() as db:
        users = db.query(User).all()
        return users
Context managers are cleaner and harder to mess up.
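The context-manager pattern can go one step further: `sessionmaker.begin()` opens a session and a transaction together, committing on success and rolling back on error. A self-contained sketch against SQLite (the `User` model here is illustrative, not the article's schema):

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class User(Base):  # minimal illustrative model
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)

# begin(): session + transaction in one 'with'; commits on success,
# rolls back on exception, and always closes the session
with SessionLocal.begin() as db:
    db.add(User(name="ada"))

# Plain context manager: closes the session but does NOT auto-commit
with SessionLocal() as db:
    print([u.name for u in db.query(User).all()])  # ['ada']
```

`begin()` removes one more thing to forget (the commit), at the cost of one transaction per block.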
Best Fix: Use FastAPI Dependencies
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
app = FastAPI()
# Dependency that manages session lifecycle
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# FastAPI runs get_db's finally block after the response is sent
@app.get("/users")
def get_users(db: Session = Depends(get_db)):
    users = db.query(User).all()
    return users
This is the recommended pattern. FastAPI handles the session lifecycle for you—no way to forget to close it.
Cause #2: Exceptions Before Session Close
Even if you’re closing sessions, exceptions can cause leaks if not handled properly.
The Problem:
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.get("/users/{user_id}")
def get_user(user_id: int):
    db = SessionLocal()
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        # Exception raised BEFORE closing the session!
        raise HTTPException(404, "User not found")
    db.close()  # Never reached when the user is not found
    return user
When you raise an exception, the code after it doesn’t execute: the session never closes and the connection leaks.
The Fix: Always Use Try-Finally
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.get("/users/{user_id}")
def get_user(user_id: int):
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.id == user_id).first()
        if not user:
            raise HTTPException(404, "User not found")
        return user
    finally:
        db.close()  # Closes even when an exception is raised
Or better yet, use the dependency pattern which handles this automatically.
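To see why the dependency is exception-safe, you can drive the `get_db` generator by hand, the same way FastAPI does around a request. A sketch with SQLite (no FastAPI required; the simulated error stands in for an `HTTPException`):

```python
from contextlib import suppress

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool

engine = create_engine("sqlite://", poolclass=QueuePool, pool_size=2)
SessionLocal = sessionmaker(bind=engine)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

gen = get_db()
db = next(gen)                   # FastAPI resolves the dependency
db.execute(text("SELECT 1"))     # the route does some work...
print(engine.pool.checkedout())  # 1 -- a connection is checked out
with suppress(RuntimeError):     # ...and blows up mid-handler
    raise RuntimeError("simulated HTTPException")
gen.close()                      # FastAPI's teardown runs the finally block
print(engine.pool.checkedout())  # 0 -- connection returned despite the error
```

The `finally` block runs no matter how the request ends, which is exactly what manual session handling keeps getting wrong.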
Cause #3: Background Tasks With Sessions
Background tasks can keep sessions open longer than expected.
The Problem:
import time

from fastapi import FastAPI, BackgroundTasks
from sqlalchemy.orm import Session

app = FastAPI()

def send_email(user_email: str, db: Session):
    # This runs AFTER the response is sent,
    # but the session is still open!
    template = db.query(EmailTemplate).first()
    # Simulate slow email sending
    time.sleep(5)
    # Session not closed for 5+ seconds after the request completes
    db.close()

@app.post("/register")
def register_user(user_data: dict, background_tasks: BackgroundTasks):
    db = SessionLocal()
    # Create the user
    user = User(**user_data)
    db.add(user)
    db.commit()
    # Background task keeps the session alive
    background_tasks.add_task(send_email, user.email, db)
    return {"message": "User created"}
    # Response sent, but the db session is still open in the background task!
The session stays checked out for the entire duration of the background task, potentially minutes.
The Fix: Create New Sessions in Background Tasks
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def send_email(user_email: str):
    # Create a NEW session inside the background task
    db = SessionLocal()
    try:
        template = db.query(EmailTemplate).first()
        # Send the email...
    finally:
        db.close()  # Close when done

@app.post("/register")
def register_user(user_data: dict, background_tasks: BackgroundTasks):
    db = SessionLocal()
    try:
        user = User(**user_data)
        db.add(user)
        db.commit()
        db.refresh(user)
        # Pass only the data, not the session
        background_tasks.add_task(send_email, user.email)
        return {"message": "User created"}
    finally:
        db.close()  # Close immediately after the request
Don’t pass database sessions to background tasks. Create new sessions inside them instead.
Cause #4: Connection Pool Too Small
Sometimes you’re not leaking connections—you just don’t have enough for your traffic.
The Problem:
from sqlalchemy import create_engine
# Default pool: 5 persistent + 10 overflow = 15 total
engine = create_engine("postgresql://user:pass@localhost/db")
# Running with 4 Uvicorn workers = 4 * 15 = 60 max connections
# But your database allows 100 connections
# You have room to increase the pool!
If you’re properly managing sessions but still hitting pool limits under load, you might just need a bigger pool.
The Fix: Increase Pool Size
from sqlalchemy import create_engine
# Increase pool for production traffic
engine = create_engine(
    "postgresql://user:pass@localhost/db",
    pool_size=20,        # Increased from 5
    max_overflow=30,     # Increased from 10
    pool_timeout=60,     # Increased from 30
    pool_pre_ping=True,  # Test connections before using
)
Warning: Don’t just blindly increase pool size. Each worker process creates its own pool, so the total connections = workers * (pool_size + max_overflow). Make sure your database can handle it:
# Example calculation:
# 4 Uvicorn workers
# pool_size = 20
# max_overflow = 30
# Total possible connections = 4 * (20 + 30) = 200
# Make sure your PostgreSQL max_connections > 200!
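That arithmetic is worth wrapping in a quick helper you can keep next to your deployment config (the numbers below are the example values from this section, not recommendations):

```python
def total_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Worst-case simultaneous connections the app can open."""
    return workers * (pool_size + max_overflow)

limit = total_connections(workers=4, pool_size=20, max_overflow=30)
print(limit)  # 200

# Leave headroom below max_connections: Postgres reserves slots for
# superusers, and migrations or cron jobs need connections of their own.
# (300 here is an assumed database limit for illustration.)
assert limit < 300, "raise the database's max_connections or shrink the pool"
```

Run this as part of a startup check so a config change that oversubscribes the database fails loudly instead of at 3 a.m.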
Check your database’s connection limit:
-- PostgreSQL
SHOW max_connections;
-- MySQL
SHOW VARIABLES LIKE 'max_connections';
Better Fix: Use NullPool for Serverless
If you’re running on serverless platforms (AWS Lambda, Google Cloud Functions), use NullPool which doesn’t maintain persistent connections:
from sqlalchemy.pool import NullPool
from sqlalchemy import create_engine
# No connection pooling - creates new connection each time
engine = create_engine(
    "postgresql://user:pass@localhost/db",
    poolclass=NullPool,
)
This prevents connections from staying open between function invocations.
Cause #5: Async Database with Sync Code
Mixing async and sync database code can cause connection leaks.
The Problem:
from fastapi import FastAPI
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Sync engine and session
engine = create_engine("postgresql://user:pass@localhost/db")
SessionLocal = sessionmaker(bind=engine)

app = FastAPI()

@app.get("/users")
async def get_users():
    # Async route with sync database code - BAD!
    db = SessionLocal()
    # This blocks the event loop AND might not close properly
    users = db.query(User).all()
    db.close()
    return users
When you use sync database code in async routes, you can run into connection management issues.
The Fix: Use All-Async or All-Sync
Option 1: Make Everything Async
from fastapi import FastAPI, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Async engine
async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=30,
)

AsyncSessionLocal = sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

app = FastAPI()

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users")
async def get_users(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User))
    users = result.scalars().all()
    return users
Option 2: Make Route Handlers Sync
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from fastapi import FastAPI, Depends
# Sync engine
engine = create_engine("postgresql://user:pass@localhost/db")
SessionLocal = sessionmaker(bind=engine)
app = FastAPI()
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Changed to a regular 'def' - FastAPI runs it in a thread pool
@app.get("/users")
def get_users(db: Session = Depends(get_db)):
    users = db.query(User).all()
    return users
Pick one approach and stick with it throughout your app.
Cause #6: Long-Running Transactions
Transactions that run for a long time keep connections checked out.
The Problem:
import time

from fastapi import FastAPI

app = FastAPI()

@app.post("/process-bulk")
def process_bulk_data(items: list[dict]):
    db = SessionLocal()
    try:
        # Transaction starts
        for item in items:  # Could be thousands of items!
            # Each iteration queries the database
            existing = db.query(Item).filter_by(name=item['name']).first()
            if existing:
                existing.quantity += item['quantity']
            else:
                db.add(Item(**item))
            # Simulate slow processing
            time.sleep(0.1)
        db.commit()  # Transaction finally completes after minutes
        return {"processed": len(items)}
    finally:
        db.close()
        # The connection was checked out for the entire processing time!
If processing 1000 items takes 100 seconds, that connection is unavailable for 100 seconds. With concurrent requests, you quickly exhaust the pool.
The Fix: Batch Processing with Periodic Commits
from fastapi import FastAPI
app = FastAPI()
@app.post("/process-bulk")
def process_bulk_data(items: list[dict]):
    db = SessionLocal()
    try:
        batch_size = 100
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            for item in batch:
                existing = db.query(Item).filter_by(name=item['name']).first()
                if existing:
                    existing.quantity += item['quantity']
                else:
                    db.add(Item(**item))
            # Commit after each batch - releases the connection between batches
            db.commit()
        return {"processed": len(items)}
    finally:
        db.close()
Or even better, use background tasks for bulk operations:
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def process_items_background(items: list[dict]):
    # Process in the background with its own session
    db = SessionLocal()
    try:
        for item in items:
            # ... processing logic ...
            pass
        db.commit()
    finally:
        db.close()

@app.post("/process-bulk")
def process_bulk_data(items: list[dict], background_tasks: BackgroundTasks):
    # Queue the work
    background_tasks.add_task(process_items_background, items)
    # Return immediately
    return {"message": "Processing started", "items": len(items)}
Still Not Working? Advanced Debugging
Check for Implicit Transactions
SQLAlchemy sessions begin a transaction implicitly on first use; any writes must be explicitly committed or rolled back before the session closes:
from fastapi import FastAPI
app = FastAPI()
@app.get("/users")
def get_users():
    db = SessionLocal()
    try:
        # SELECT queries usually don't need a commit
        users = db.query(User).all()
        # But if you did any updates, you MUST commit or rollback
        # db.commit()  # Or db.rollback()
        return users
    finally:
        db.close()
If you mix reads and writes, always commit or rollback before closing:
@app.post("/increment-counter")
def increment_counter(counter_id: int):
    db = SessionLocal()
    try:
        counter = db.get(Counter, counter_id)  # Session.get (Query.get is legacy)
        counter.value += 1
        # MUST commit the transaction
        db.commit()
        return {"value": counter.value}
    except Exception:
        # MUST rollback on error
        db.rollback()
        raise
    finally:
        db.close()
Monitor with Prometheus Metrics
Add metrics to track pool usage:
from prometheus_client import Gauge
from fastapi import FastAPI
# Define metrics
pool_size_gauge = Gauge('db_pool_size', 'Database pool size')
pool_checked_out = Gauge('db_pool_checked_out', 'Connections checked out')
pool_overflow = Gauge('db_pool_overflow', 'Pool overflow connections')

app = FastAPI()

@app.middleware("http")
async def track_pool_metrics(request, call_next):
    # Update metrics before the request (uses the engine defined earlier)
    pool = engine.pool
    pool_size_gauge.set(pool.size())
    pool_checked_out.set(pool.checkedout())
    pool_overflow.set(pool.overflow())
    response = await call_next(request)
    # Update metrics after the request
    pool_checked_out.set(pool.checkedout())
    return response
Then monitor these metrics in Grafana or your monitoring tool.
Use Connection Pool Events
SQLAlchemy lets you hook into pool events to detect leaks:
from sqlalchemy import event, create_engine
import logging
logger = logging.getLogger(__name__)
engine = create_engine("postgresql://user:pass@localhost/db")
@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    logger.info(f"Connection opened: {id(dbapi_conn)}")

@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    logger.info(f"Connection checked out: {id(dbapi_conn)}")

@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_conn, connection_record):
    logger.info(f"Connection checked in: {id(dbapi_conn)}")

# Now you can trace exactly when connections are checked out and in
If you see connections checked out but never checked in, you’ve found your leak.
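You can extend these listeners into a rough leak detector by timestamping each checkout and reporting connections held longer than a threshold. A sketch under stated assumptions (SQLite stands in for the real database; the helper names are mine):

```python
import time

from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool

engine = create_engine("sqlite://", poolclass=QueuePool, pool_size=2)
checked_out_at = {}  # dbapi connection id -> checkout timestamp

@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
    checked_out_at[id(dbapi_conn)] = time.monotonic()

@event.listens_for(engine, "checkin")
def on_checkin(dbapi_conn, connection_record):
    checked_out_at.pop(id(dbapi_conn), None)

def held_longer_than(seconds: float) -> list:
    """IDs of connections that have been checked out for too long."""
    now = time.monotonic()
    return [cid for cid, t in checked_out_at.items() if now - t > seconds]

conn = engine.connect()
time.sleep(0.05)
print(len(held_longer_than(0.01)))  # 1 -- a suspiciously long checkout
conn.close()
print(len(held_longer_than(0.01)))  # 0 -- checked back in
```

In production you would call something like `held_longer_than(30)` from a periodic task or a `/debug` endpoint and log the offenders.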
Production Configuration Best Practices
Here’s a solid production configuration:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from fastapi import FastAPI, Depends
import os

# Environment-specific settings
DATABASE_URL = os.getenv("DATABASE_URL")
IS_PRODUCTION = os.getenv("ENVIRONMENT") == "production"

# Configure the engine based on environment
if IS_PRODUCTION:
    engine = create_engine(
        DATABASE_URL,
        pool_size=20,        # Persistent connections
        max_overflow=30,     # Additional connections allowed
        pool_timeout=60,     # Wait time before timeout
        pool_recycle=3600,   # Recycle connections after 1 hour
        pool_pre_ping=True,  # Verify connection health before use
        echo=False,          # Disable query logging in production
    )
else:
    # Development settings
    engine = create_engine(
        DATABASE_URL,
        pool_size=5,
        max_overflow=10,
        pool_pre_ping=True,
        echo=True,  # Enable query logging for debugging
    )

SessionLocal = sessionmaker(bind=engine)
app = FastAPI()

# Standard dependency for session management
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Use in all route handlers
@app.get("/users")
def get_users(db: Session = Depends(get_db)):
    return db.query(User).all()
Key Configuration Options Explained:
- pool_size: Max persistent connections in the pool (per worker)
- max_overflow: Additional temporary connections allowed
- pool_timeout: Seconds to wait for a connection before raising error
- pool_recycle: Recycle connections after X seconds (prevents stale connections)
- pool_pre_ping: Test connection health before using (catches broken connections)
Calculate Total Connections:
Total Connections = workers * (pool_size + max_overflow)
Example:
- 4 Uvicorn workers
- pool_size = 20
- max_overflow = 30
Total = 4 * (20 + 30) = 200 connections
Make sure your database max_connections > 200!
Testing Your Fix
Here’s how to verify you’ve actually fixed the leak:
Test 1: Sequential Requests
# test_sequential.py
import requests
import time
BASE_URL = "http://localhost:8000"
# Send 100 sequential requests
for i in range(100):
    response = requests.get(f"{BASE_URL}/users")
    print(f"Request {i}: {response.status_code}")
    time.sleep(0.1)

print("All requests completed!")
# If this completes without errors, you're not leaking on single requests
Test 2: Concurrent Requests
# test_concurrent.py
import asyncio
import httpx
async def make_request(client, i):
    response = await client.get("http://localhost:8000/users")
    print(f"Request {i}: {response.status_code}")
    return response

async def main():
    async with httpx.AsyncClient() as client:
        # Send 50 concurrent requests
        tasks = [make_request(client, i) for i in range(50)]
        responses = await asyncio.gather(*tasks)
        print(f"Completed {len(responses)} requests")

asyncio.run(main())
# If this completes without timeouts, your pool is sized correctly
Test 3: Monitor Pool Status
# While running load tests, monitor pool status
import requests
import time
for i in range(10):
    response = requests.get("http://localhost:8000/debug/pool")
    print(f"Time {i}: {response.json()}")
    time.sleep(2)
# Watch for:
# - checked_out should go up and down (not stay high)
# - checked_in should recover after requests complete
# - overflow should be temporary, not permanent
Common Mistakes Checklist
- [ ] Not using try/finally or context managers when creating sessions
- [ ] Raising exceptions before closing sessions
- [ ] Passing sessions to background tasks
- [ ] Using sync database code in async route handlers
- [ ] Pool size too small for traffic volume
- [ ] Not setting pool_pre_ping=True (causes stale connection issues)
- [ ] Long-running transactions holding connections
- [ ] Total connections (workers × (pool_size + max_overflow)) exceeds database limit
Summary
Connection pool exhaustion in FastAPI happens when database connections are checked out but never returned to the pool. The “QueuePool limit exceeded” error means you’ve run out of available connections.
Most Common Causes:
- Not closing database sessions (use dependencies or context managers)
- Exceptions raised before session close (use try/finally)
- Background tasks keeping sessions open (create new sessions in tasks)
- Pool size too small for traffic (increase pool_size and max_overflow)
- Mixing async/sync database code (pick one approach)
- Long-running transactions (use batching or background tasks)
Best Practices:
- Always use FastAPI dependencies for session management
- Use context managers or try/finally for manual session handling
- Configure pool size based on workers and traffic
- Enable pool_pre_ping=True in production
- Monitor pool status with metrics or debug endpoints
- Test under concurrent load before deploying
The dependency pattern (Depends(get_db)) is the most reliable way to prevent connection leaks in FastAPI. It automatically handles session lifecycle and works correctly even when exceptions are raised.
For related FastAPI troubleshooting, check out our FastAPI Async/Sync Blocking Guide to learn about event loop issues that can compound connection pool problems.
Debugging Database Issues? Use Debugly’s trace formatter to quickly parse and analyze SQLAlchemy tracebacks with clear, formatted output.