You’re running a FastAPI application in production, traffic is picking up, and suddenly your server starts throwing errors. Requests hang for 30 seconds, then fail with a cryptic message: TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00. Your database is fine, your server has plenty of resources, but somehow you’ve run out of database connections.

Welcome to connection pool exhaustion—one of the most frustrating issues in FastAPI database integration. It typically shows up under load, making it hard to catch during development. Let’s diagnose what’s happening and fix it properly.

Symptom Description

When your connection pool is exhausted, you’ll see these telltale signs:

Error Messages:

sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached,
connection timed out, timeout 30.00

# Or this variant:
TimeoutError: QueuePool limit of size X overflow Y reached

Performance Symptoms:

  • Requests that used to be fast suddenly take 30+ seconds
  • Server handles a few requests fine, then starts hanging
  • Database shows few active queries, but FastAPI can’t connect
  • Error rate spikes during traffic bursts
  • Restarting the server temporarily fixes the issue

Monitoring Clues:

  • Active database connections stay at the pool limit (default: 5)
  • Application doesn’t release connections after requests complete
  • Memory usage might increase over time
  • Some endpoints work fine while others hang

If you’re seeing any of these symptoms, especially that “QueuePool limit exceeded” message, you’ve got a connection leak.

What’s Actually Happening?

SQLAlchemy uses a connection pool to efficiently manage database connections. Instead of opening a new connection for every query (expensive), it maintains a pool of reusable connections. Here’s the default configuration:

# Default SQLAlchemy connection pool
pool_size = 5          # Maximum persistent connections
max_overflow = 10      # Additional temporary connections allowed
pool_timeout = 30      # Seconds to wait before giving up

So by default, SQLAlchemy allows 5 persistent + 10 overflow = 15 total concurrent connections. When your app tries to use the 16th connection, it waits 30 seconds for one to become available. If none free up, you get the timeout error.

The issue isn’t that you’re hitting the limit—it’s that connections aren’t being returned to the pool after use. They’re leaking.
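You can reproduce this failure mode in isolation. The sketch below is illustrative, not production code: it uses a throwaway in-memory SQLite database and deliberately tiny, non-default pool numbers so the timeout fires in a fraction of a second instead of 30.

```python
# Minimal reproduction: leak every connection from a deliberately tiny pool.
# The SQLite URL and shrunken pool numbers are demo choices, not defaults.
from sqlalchemy import create_engine
from sqlalchemy.exc import TimeoutError as SATimeoutError
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "sqlite://",
    poolclass=QueuePool,
    pool_size=2,       # tiny pool for the demo
    max_overflow=1,    # 2 + 1 = 3 total connections
    pool_timeout=0.2,  # fail fast instead of waiting 30 seconds
)

# "Leak" all three connections by never closing them
leaked = [engine.connect() for _ in range(3)]

pool_exhausted = False
try:
    engine.connect()  # 4th checkout has nothing left to hand out
except SATimeoutError as exc:
    pool_exhausted = True
    print(f"Pool exhausted: {exc}")
finally:
    for conn in leaked:
        conn.close()  # return everything so the engine can be reused
```

Swap the tiny numbers for the defaults (5, 10, 30) and this is exactly what a leaking endpoint does to your app, just in slow motion.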

Diagnostic Steps

Before we fix anything, let’s confirm this is actually a connection pool problem and find where connections are leaking.

Step 1: Enable SQLAlchemy Connection Pool Logging

Add detailed logging to see exactly what’s happening with your connections:

import logging

# Enable SQLAlchemy pool logging
logging.basicConfig()
logging.getLogger('sqlalchemy.pool').setLevel(logging.DEBUG)
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

When you run your app, you’ll see output like:

Pool checked out connection <PoolProxiedConnection at 0x7f8b1c3d4e50>
Pool checked in connection <PoolProxiedConnection at 0x7f8b1c3d4e50>
Pool disposed. Pool size: 5 Connections in pool: 5 Current Overflow: 0

If you see lots of “checked out” but few “checked in”, you’ve got a leak.

Step 2: Monitor Pool Status in Real-Time

Add an endpoint to inspect your connection pool:

from fastapi import FastAPI
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:pass@localhost/db")

app = FastAPI()

@app.get("/debug/pool")
def pool_status():
    pool = engine.pool
    return {
        "pool_size": pool.size(),
        "checked_in": pool.checkedin(),
        "checked_out": pool.checkedout(),
        "overflow": pool.overflow(),
        "max_overflow": pool._max_overflow,
    }

Hit this endpoint during normal operation. If checked_out keeps growing and checked_in stays low, connections aren’t being returned.

Step 3: Check for Unclosed Sessions

The most common cause—sessions that never get closed. Look through your code for patterns like this:

# RED FLAG: Session created but never explicitly closed
@app.get("/users")
def get_users():
    db = SessionLocal()
    users = db.query(User).all()
    return users  # Session leaked! Never closed!

Step 4: Load Test to Reproduce

Connection leaks often only appear under concurrent load:

# Install hey (HTTP load testing tool)
# On Mac: brew install hey
# On Linux: go install github.com/rakyll/hey@latest

# Send 100 requests with 10 concurrent workers
hey -n 100 -c 10 http://localhost:8000/api/users

# Watch for increasing response times and eventual failures

If the first few requests succeed but later ones timeout, you’re leaking connections.

Cause #1: Not Closing Database Sessions

The number one culprit—creating database sessions without closing them.

The Problem:

from fastapi import FastAPI
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("postgresql://user:pass@localhost/db")
SessionLocal = sessionmaker(bind=engine)

app = FastAPI()

@app.get("/users")
def get_users():
    db = SessionLocal()  # Connection checked out from pool

    users = db.query(User).all()

    return users
    # Session never closed! Connection stays checked out forever!

Every request checks out a connection and never explicitly returns it. (CPython’s garbage collector may eventually reclaim an abandoned session’s connection, but that’s nondeterministic and can’t keep up under load.) After 15 concurrent requests with the default pool settings, the pool is exhausted.

The Fix: Use Try-Finally

from fastapi import FastAPI

app = FastAPI()

@app.get("/users")
def get_users():
    db = SessionLocal()
    try:
        users = db.query(User).all()
        return users
    finally:
        db.close()  # ALWAYS close, even if query fails

This ensures the connection is returned to the pool even if an exception occurs.

Better Fix: Use Context Manager

from fastapi import FastAPI

app = FastAPI()

@app.get("/users")
def get_users():
    with SessionLocal() as db:  # Session is a context manager in SQLAlchemy 1.4+
        users = db.query(User).all()
        return users
    # Session automatically closed when context exits

Context managers are cleaner and harder to mess up.

Best Fix: Use FastAPI Dependencies

from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session

app = FastAPI()

# Dependency that manages session lifecycle
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.get("/users")
def get_users(db: Session = Depends(get_db)):
    users = db.query(User).all()
    return users
    # FastAPI automatically closes session after response

This is the recommended pattern. FastAPI handles the session lifecycle for you—no way to forget to close it.

Cause #2: Exceptions Before Session Close

Even if you’re closing sessions, exceptions can cause leaks if not handled properly.

The Problem:

from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/users/{user_id}")
def get_user(user_id: int):
    db = SessionLocal()

    user = db.query(User).filter(User.id == user_id).first()

    if not user:
        # Exception raised BEFORE closing session!
        raise HTTPException(404, "User not found")

    db.close()  # Never reached when user not found
    return user

When you raise an exception, the code after it doesn’t execute. The session never closes, connection leaks.

The Fix: Always Use Try-Finally

from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/users/{user_id}")
def get_user(user_id: int):
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.id == user_id).first()

        if not user:
            raise HTTPException(404, "User not found")

        return user
    finally:
        db.close()  # Closes even when exception is raised

Or better yet, use the dependency pattern which handles this automatically.

Cause #3: Background Tasks With Sessions

Background tasks can keep sessions open longer than expected.

The Problem:

import time

from fastapi import FastAPI, BackgroundTasks
from sqlalchemy.orm import Session

app = FastAPI()

def send_email(user_email: str, db: Session):
    # This runs AFTER the response is sent
    # But the session is still open!
    template = db.query(EmailTemplate).first()

    # Simulate slow email sending
    time.sleep(5)

    # Session not closed for 5+ seconds after request completes
    db.close()

@app.post("/register")
def register_user(user_data: dict, background_tasks: BackgroundTasks):
    db = SessionLocal()

    # Create user
    user = User(**user_data)
    db.add(user)
    db.commit()

    # Background task keeps session alive
    background_tasks.add_task(send_email, user.email, db)

    return {"message": "User created"}
    # Response sent, but db session still open in background task!

The session stays checked out for the entire duration of the background task, potentially minutes.

The Fix: Create New Sessions in Background Tasks

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def send_email(user_email: str):
    # Create a NEW session inside the background task
    db = SessionLocal()
    try:
        template = db.query(EmailTemplate).first()
        # Send email...
    finally:
        db.close()  # Close when done

@app.post("/register")
def register_user(user_data: dict, background_tasks: BackgroundTasks):
    db = SessionLocal()
    try:
        user = User(**user_data)
        db.add(user)
        db.commit()
        db.refresh(user)

        # Pass only the data, not the session
        background_tasks.add_task(send_email, user.email)

        return {"message": "User created"}
    finally:
        db.close()  # Close immediately after request

Don’t pass database sessions to background tasks. Create new sessions inside them instead.

Cause #4: Connection Pool Too Small

Sometimes you’re not leaking connections—you just don’t have enough for your traffic.

The Problem:

from sqlalchemy import create_engine

# Default pool: 5 persistent + 10 overflow = 15 total
engine = create_engine("postgresql://user:pass@localhost/db")

# Running with 4 Uvicorn workers = 4 * 15 = 60 max connections
# But your database allows 100 connections
# You have room to increase the pool!

If you’re properly managing sessions but still hitting pool limits under load, you might just need a bigger pool.

The Fix: Increase Pool Size

from sqlalchemy import create_engine

# Increase pool for production traffic
engine = create_engine(
    "postgresql://user:pass@localhost/db",
    pool_size=20,        # Increased from 5
    max_overflow=30,     # Increased from 10
    pool_timeout=60,     # Increased from 30
    pool_pre_ping=True,  # Test connections before using
)

Warning: Don’t just blindly increase pool size. Each worker process creates its own pool, so the total connections = workers * (pool_size + max_overflow). Make sure your database can handle it:

# Example calculation:
# 4 Uvicorn workers
# pool_size = 20
# max_overflow = 30
# Total possible connections = 4 * (20 + 30) = 200

# Make sure your PostgreSQL max_connections > 200!

Check your database’s connection limit:

-- PostgreSQL
SHOW max_connections;

-- MySQL
SHOW VARIABLES LIKE 'max_connections';

Better Fix: Use NullPool for Serverless

If you’re running on serverless platforms (AWS Lambda, Google Cloud Functions), use NullPool which doesn’t maintain persistent connections:

from sqlalchemy.pool import NullPool
from sqlalchemy import create_engine

# No connection pooling - creates new connection each time
engine = create_engine(
    "postgresql://user:pass@localhost/db",
    poolclass=NullPool
)

This prevents connections from staying open between function invocations.

Cause #5: Async Database with Sync Code

Mixing async and sync database code can cause connection leaks.

The Problem:

from fastapi import FastAPI
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Sync engine and session
engine = create_engine("postgresql://user:pass@localhost/db")
SessionLocal = sessionmaker(bind=engine)

app = FastAPI()

@app.get("/users")
async def get_users():
    # Async route with sync database code - BAD!
    db = SessionLocal()

    # This blocks the event loop AND might not close properly
    users = db.query(User).all()

    db.close()
    return users

When you run sync database code inside an async route, the query blocks the event loop thread, stalling every other request in flight. Worse, if the handler raises or the request is cancelled before db.close() runs, the session leaks.

The Fix: Use All-Async or All-Sync

Option 1: Make Everything Async

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from fastapi import FastAPI, Depends

# Async engine
async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=30,
)

# SQLAlchemy 2.0 also provides async_sessionmaker for this purpose
AsyncSessionLocal = sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

app = FastAPI()

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users")
async def get_users(db: AsyncSession = Depends(get_db)):
    from sqlalchemy import select

    result = await db.execute(select(User))
    users = result.scalars().all()
    return users

Option 2: Make Route Handlers Sync

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from fastapi import FastAPI, Depends

# Sync engine
engine = create_engine("postgresql://user:pass@localhost/db")
SessionLocal = sessionmaker(bind=engine)

app = FastAPI()

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Changed to regular 'def' - FastAPI runs in thread pool
@app.get("/users")
def get_users(db: Session = Depends(get_db)):
    users = db.query(User).all()
    return users

Pick one approach and stick with it throughout your app.

Cause #6: Long-Running Transactions

Transactions that run for a long time keep connections checked out.

The Problem:

import time

from fastapi import FastAPI

app = FastAPI()

@app.post("/process-bulk")
def process_bulk_data(items: list[dict]):
    db = SessionLocal()
    try:
        # Transaction starts
        for item in items:  # Could be thousands of items!
            # Each iteration queries database
            existing = db.query(Item).filter_by(name=item['name']).first()

            if existing:
                existing.quantity += item['quantity']
            else:
                db.add(Item(**item))

            # Simulate slow processing
            time.sleep(0.1)

        db.commit()  # Transaction finally completes after minutes
        return {"processed": len(items)}
    finally:
        db.close()

    # Connection was checked out for the entire processing time!

If processing 1000 items takes 100 seconds, that connection is unavailable for 100 seconds. With concurrent requests, you quickly exhaust the pool.

The Fix: Batch Processing with Periodic Commits

from fastapi import FastAPI

app = FastAPI()

@app.post("/process-bulk")
def process_bulk_data(items: list[dict]):
    db = SessionLocal()
    try:
        batch_size = 100

        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]

            for item in batch:
                existing = db.query(Item).filter_by(name=item['name']).first()

                if existing:
                    existing.quantity += item['quantity']
                else:
                    db.add(Item(**item))

            # Commit after each batch
            db.commit()

        return {"processed": len(items)}
    finally:
        db.close()

Or even better, use background tasks for bulk operations:

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def process_items_background(items: list[dict]):
    db = SessionLocal()
    try:
        # Process in background with own session
        for item in items:
            # ... processing logic ...
            pass
        db.commit()
    finally:
        db.close()

@app.post("/process-bulk")
def process_bulk_data(items: list[dict], background_tasks: BackgroundTasks):
    # Queue the work
    background_tasks.add_task(process_items_background, items)

    # Return immediately
    return {"message": "Processing started", "items": len(items)}

Still Not Working? Advanced Debugging

Check for Implicit Transactions

A SQLAlchemy session implicitly begins a transaction on its first query, and that transaction (along with its connection) stays open until you commit, roll back, or close the session:

from fastapi import FastAPI

app = FastAPI()

@app.get("/users")
def get_users():
    db = SessionLocal()
    try:
        # SELECT queries usually don't need commit
        users = db.query(User).all()

        # But if you did any updates, you MUST commit or rollback
        # db.commit()  # Or db.rollback()

        return users
    finally:
        db.close()

If you mix reads and writes, always commit or rollback before closing:

@app.post("/increment-counter")
def increment_counter(counter_id: int):
    db = SessionLocal()
    try:
        counter = db.query(Counter).get(counter_id)
        counter.value += 1

        # MUST commit the transaction
        db.commit()

        return {"value": counter.value}
    except Exception as e:
        # MUST rollback on error
        db.rollback()
        raise
    finally:
        db.close()

Monitor with Prometheus Metrics

Add metrics to track pool usage:

from prometheus_client import Gauge
from sqlalchemy import create_engine
from fastapi import FastAPI

engine = create_engine("postgresql://user:pass@localhost/db")

# Define metrics
pool_size_gauge = Gauge('db_pool_size', 'Database pool size')
pool_checked_out = Gauge('db_pool_checked_out', 'Connections checked out')
pool_overflow = Gauge('db_pool_overflow', 'Pool overflow connections')

app = FastAPI()

@app.middleware("http")
async def track_pool_metrics(request, call_next):
    # Update metrics before request
    pool = engine.pool
    pool_size_gauge.set(pool.size())
    pool_checked_out.set(pool.checkedout())
    pool_overflow.set(pool.overflow())

    response = await call_next(request)

    # Update metrics after request
    pool_checked_out.set(pool.checkedout())

    return response

Then monitor these metrics in Grafana or your monitoring tool.

Use Connection Pool Events

SQLAlchemy lets you hook into pool events to detect leaks:

from sqlalchemy import event, create_engine
import logging

logger = logging.getLogger(__name__)

engine = create_engine("postgresql://user:pass@localhost/db")

@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    logger.info(f"Connection opened: {id(dbapi_conn)}")

@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    logger.info(f"Connection checked out: {id(dbapi_conn)}")

@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_conn, connection_record):
    logger.info(f"Connection checked in: {id(dbapi_conn)}")

# Now you can trace exactly when connections are checked out and in

If you see connections checked out but never checked in, you’ve found your leak.
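The same events can drive a simple leak counter. This is a hedged sketch: the in-memory SQLite engine and the `counts` dictionary are illustrative stand-ins for whatever engine and metrics store your app actually uses.

```python
# Sketch of a leak counter built on pool events, against a throwaway engine.
from sqlalchemy import create_engine, event, text
from sqlalchemy.pool import QueuePool

engine = create_engine("sqlite://", poolclass=QueuePool)

counts = {"checkout": 0, "checkin": 0}

@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
    counts["checkout"] += 1

@event.listens_for(engine, "checkin")
def on_checkin(dbapi_conn, connection_record):
    counts["checkin"] += 1

# Balanced usage: the connection is returned when the block exits
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))

print(counts)  # a persistent checkout/checkin gap points at a leak
```

In a real app you would log or export these counters; a gap between checkouts and checkins that never closes is the signature of a leak.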

Production Configuration Best Practices

Here’s a solid production configuration:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from fastapi import FastAPI, Depends
import os

# Environment-specific settings
DATABASE_URL = os.getenv("DATABASE_URL")
IS_PRODUCTION = os.getenv("ENVIRONMENT") == "production"

# Configure engine based on environment
if IS_PRODUCTION:
    engine = create_engine(
        DATABASE_URL,
        pool_size=20,           # Persistent connections
        max_overflow=30,        # Additional connections allowed
        pool_timeout=60,        # Wait time before timeout
        pool_recycle=3600,      # Recycle connections after 1 hour
        pool_pre_ping=True,     # Verify connection health before use
        echo=False,             # Disable query logging in production
    )
else:
    # Development settings
    engine = create_engine(
        DATABASE_URL,
        pool_size=5,
        max_overflow=10,
        pool_pre_ping=True,
        echo=True,              # Enable query logging for debugging
    )

SessionLocal = sessionmaker(bind=engine)

app = FastAPI()

# Standard dependency for session management
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Use in all route handlers
@app.get("/users")
def get_users(db = Depends(get_db)):
    return db.query(User).all()

Key Configuration Options Explained:

  • pool_size: Max persistent connections in the pool (per worker)
  • max_overflow: Additional temporary connections allowed
  • pool_timeout: Seconds to wait for a connection before raising error
  • pool_recycle: Recycle connections after X seconds (prevents stale connections)
  • pool_pre_ping: Test connection health before using (catches broken connections)

Calculate Total Connections:

Total Connections = workers * (pool_size + max_overflow)

Example:
- 4 Uvicorn workers
- pool_size = 20
- max_overflow = 30

Total = 4 * (20 + 30) = 200 connections

Make sure your database max_connections > 200!
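The arithmetic above is simple enough to encode next to your engine configuration. A trivial helper (the function name is made up for illustration) makes the bound explicit:

```python
# Hypothetical helper mirroring the calculation above.
def total_possible_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Each worker process owns its own pool, so the upper bound multiplies."""
    return workers * (pool_size + max_overflow)

# The worked example: 4 Uvicorn workers, pool_size=20, max_overflow=30
total = total_possible_connections(workers=4, pool_size=20, max_overflow=30)
print(total)  # 200 -- keep this below the database's max_connections
```

Running this at startup and comparing against your database’s configured limit turns a silent misconfiguration into an immediate, obvious failure.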

Testing Your Fix

Here’s how to verify you’ve actually fixed the leak:

Test 1: Sequential Requests

# test_sequential.py
import requests
import time

BASE_URL = "http://localhost:8000"

# Send 100 sequential requests
for i in range(100):
    response = requests.get(f"{BASE_URL}/users")
    print(f"Request {i}: {response.status_code}")
    time.sleep(0.1)

print("All requests completed!")
# If this completes without errors, you're not leaking on single requests

Test 2: Concurrent Requests

# test_concurrent.py
import asyncio
import httpx

async def make_request(client, i):
    response = await client.get("http://localhost:8000/users")
    print(f"Request {i}: {response.status_code}")
    return response

async def main():
    async with httpx.AsyncClient() as client:
        # Send 50 concurrent requests
        tasks = [make_request(client, i) for i in range(50)]
        responses = await asyncio.gather(*tasks)

        print(f"Completed {len(responses)} requests")

asyncio.run(main())
# If this completes without timeouts, your pool is sized correctly

Test 3: Monitor Pool Status

# While running load tests, monitor pool status
import requests
import time

for i in range(10):
    response = requests.get("http://localhost:8000/debug/pool")
    print(f"Time {i}: {response.json()}")
    time.sleep(2)

# Watch for:
# - checked_out should go up and down (not stay high)
# - checked_in should recover after requests complete
# - overflow should be temporary, not permanent

Common Mistakes Checklist

  • [ ] Not using try/finally or context managers when creating sessions
  • [ ] Raising exceptions before closing sessions
  • [ ] Passing sessions to background tasks
  • [ ] Using sync database code in async route handlers
  • [ ] Pool size too small for traffic volume
  • [ ] Not setting pool_pre_ping=True (causes stale connection issues)
  • [ ] Long-running transactions holding connections
  • [ ] Total connections (workers × pool_size) exceeds database limit

Summary

Connection pool exhaustion in FastAPI happens when database connections are checked out but never returned to the pool. The “QueuePool limit exceeded” error means you’ve run out of available connections.

Most Common Causes:

  1. Not closing database sessions (use dependencies or context managers)
  2. Exceptions raised before session close (use try/finally)
  3. Background tasks keeping sessions open (create new sessions in tasks)
  4. Pool size too small for traffic (increase pool_size and max_overflow)
  5. Mixing async/sync database code (pick one approach)
  6. Long-running transactions (use batching or background tasks)

Best Practices:

  • Always use FastAPI dependencies for session management
  • Use context managers or try/finally for manual session handling
  • Configure pool size based on workers and traffic
  • Enable pool_pre_ping=True in production
  • Monitor pool status with metrics or debug endpoints
  • Test under concurrent load before deploying

The dependency pattern (Depends(get_db)) is the most reliable way to prevent connection leaks in FastAPI. It automatically handles session lifecycle and works correctly even when exceptions are raised.

For related FastAPI troubleshooting, check out our FastAPI Async/Sync Blocking Guide to learn about event loop issues that can compound connection pool problems.

Debugging Database Issues? Use Debugly’s trace formatter to quickly parse and analyze SQLAlchemy tracebacks with clear, formatted output.