SQLAlchemy Async Pooling

What

An environment-aware create_async_engine configuration that swaps the real connection pool for NullPool in the test environment, so that no DB connection outlives the pytest event loop that opened it.

Project Context

In full_project_context_updated.txt -> app/database.py, the engine is built once at import time with engine_kwargs that are conditionally mutated by settings.environment. The test branch uses poolclass=NullPool and disables echo to keep AI-load test runs quiet. Production/dev use a normal pool with pool_recycle=3600 (reclaim stale connections) and pool_pre_ping=True (auto-reconnect dropped links).

How

Environment-aware engine construction

    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
    from sqlalchemy.pool import NullPool

    engine_kwargs = {
        "echo": settings.database_echo,
        "future": True,  # SQLAlchemy 2.0 style
    }

    if settings.environment == "test":
        engine_kwargs.update({
            "poolclass": NullPool,  # no pooling, close after each checkout
            "echo": False,          # silence log flood during AI tests
        })
    else:
        engine_kwargs.update({
            "pool_size": settings.database_pool_size,
            "max_overflow": settings.database_max_overflow,
            "pool_recycle": 3600,    # recycle connections hourly
            "pool_pre_ping": True,   # auto-reconnect dropped DB links
        })

    engine = create_async_engine(settings.database_url, **engine_kwargs)

    AsyncSessionLocal = async_sessionmaker(
        engine,
        class_=AsyncSession,
        expire_on_commit=False,
        autocommit=False,
        autoflush=False,
    )

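  • NullPool opens a fresh connection on every checkout and closes it as soon as it is released, so nothing is kept for reuse. This is essential in tests because a function-scoped pytest event loop is closed when its test finishes and the next test runs on a new loop; a pooled connection still bound to the old loop fails with RuntimeError: Event loop is closed.
  • pool_recycle=3600 replaces, at checkout, any connection that has been open for more than an hour. This guards against load balancers, proxies, or database settings that silently drop long-lived connections, provided their timeout is longer than the recycle interval.
  • pool_pre_ping=True tests each connection with a lightweight ping (typically equivalent to SELECT 1) on every checkout. The cost is one extra round trip per checkout, usually small next to the query itself, and it protects against stale connections behind proxies and Kubernetes service meshes.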
  • async_sessionmaker produces a session factory; configuring expire_on_commit=False lets callers read attributes after a commit without re-fetching from the DB.
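One way to make the environment branch unit-testable is to factor it into a pure function. The sketch below is a simplified illustration, not the project's code: PoolSettings and build_engine_kwargs are hypothetical names, and the field defaults are placeholders standing in for the real settings object.

```python
from dataclasses import dataclass
from typing import Any

from sqlalchemy.pool import NullPool


@dataclass(frozen=True)
class PoolSettings:
    """Hypothetical stand-in for the project's settings object."""

    environment: str = "development"
    database_echo: bool = False
    database_pool_size: int = 5
    database_max_overflow: int = 10


def build_engine_kwargs(settings: PoolSettings) -> dict[str, Any]:
    """Return keyword arguments for create_async_engine, chosen by environment."""
    if settings.environment == "test":
        # No pooling: each checkout opens a connection, each release closes it.
        return {"poolclass": NullPool, "echo": False}
    return {
        "echo": settings.database_echo,
        "pool_size": settings.database_pool_size,
        "max_overflow": settings.database_max_overflow,
        "pool_recycle": 3600,   # replace connections older than one hour
        "pool_pre_ping": True,  # test each connection on checkout
    }


test_kwargs = build_engine_kwargs(PoolSettings(environment="test"))
prod_kwargs = build_engine_kwargs(PoolSettings(environment="production"))
```

Keeping the two branches mutually exclusive is deliberate: pool_size and max_overflow are sizing options for a queue-based pool and do not apply to NullPool.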

Top-to-Bottom Code Walkthrough (app/database.py)

The module is short and that’s intentional — it owns three objects (engine, session factory, declarative base) and four lifecycle hooks (get_db, init_db, close_db, check_db_connection). Total scope: connect to Postgres, hand sessions to request handlers, run DDL on startup, shut down cleanly.

The imports are precise. create_async_engine, AsyncSession, and async_sessionmaker come from sqlalchemy.ext.asyncio, the SQLAlchemy 2.0 async API; never mix them with the synchronous engine = create_engine(...) pattern in an async codebase. declarative_base from sqlalchemy.orm is the legacy ORM API; every model file declares class Foo(Base): ... against it. text from sqlalchemy is the function that wraps a raw SQL string in an executable object. SQLAlchemy 2.0 rejects a bare string such as session.execute("SELECT 1") with an exception (Connection.execute raises ObjectNotExecutableError, and Session.execute raises ArgumentError asking for text()). AsyncGenerator from typing is the return annotation for an async def that uses yield. It documents the dependency for readers and type checkers; FastAPI itself recognizes a yield-based dependency by inspecting the function, not by reading the annotation. NullPool from sqlalchemy.pool is imported unconditionally but only used when environment == "test".

logger = logging.getLogger(__name__) follows the standard pattern; __name__ evaluates to "app.database", so log lines look like app.database INFO ....

The engine_kwargs = {"echo": ..., "future": True} block is the base configuration passed to create_async_engine. "echo": settings.database_echo turns on SQL statement logging when debugging: every statement the engine executes is then logged through the sqlalchemy.engine logger, which writes to standard output by default. "future": True selected the 2.0-style API in SQLAlchemy 1.4; in 2.0 it is the only supported value, so the flag is a harmless no-op kept for explicitness. The base kwargs are common to every environment.

The if settings.environment == "test": branch diverges from production for one reason: NullPool. In tests, the event loop that owns the engine is the pytest-asyncio loop, which is recreated between tests when configured for a fresh loop per test. Pooled connections would survive across loop boundaries and then fail to be re-used — producing the dreaded RuntimeError: Event loop is closed. NullPool says “open and close one connection per checkout”; slower at production scale but deterministic in tests. echo is also dropped to False because AI tests fire hundreds of queries per test and the SQL echo would flood logs.
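The failure mode can be reproduced without a database. The sketch below uses only asyncio: LoopBoundConnection is a hypothetical stand-in for a driver connection that remembers the loop it was opened on, and shared_pool plays the role of a module-level pool that survives between two tests, each run on its own loop.

```python
import asyncio
from typing import List


class LoopBoundConnection:
    """Hypothetical connection that schedules its I/O on the loop that created it."""

    def __init__(self) -> None:
        self._loop = asyncio.get_running_loop()

    def ping(self) -> None:
        # Scheduling work on a closed loop raises RuntimeError.
        self._loop.call_soon(lambda: None)


shared_pool: List[LoopBoundConnection] = []  # outlives any single event loop


async def first_test() -> None:
    conn = LoopBoundConnection()
    conn.ping()               # fine: the loop is running
    shared_pool.append(conn)  # "returned to the pool" instead of closed


async def second_test() -> None:
    conn = shared_pool.pop()  # reused, but bound to the previous loop
    conn.ping()


asyncio.run(first_test())     # asyncio.run closes its loop on exit

try:
    asyncio.run(second_test())
    error_message = ""
except RuntimeError as exc:
    error_message = str(exc)
```

With NullPool there is no shared_pool equivalent: the connection is closed when it is released, so nothing bound to the first loop is left for the second test to pick up.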

The else: branch turns on the production configuration. pool_size sets how many connections the pool keeps open persistently (settings.database_pool_size, default 5, bounded 1..20). max_overflow allows that many additional connections under spike load (default 10, bounded 0..50), so a single process can hold at most pool_size + max_overflow connections; overflow connections are closed when they are returned rather than kept. pool_recycle=3600 replaces any connection older than an hour the next time it is checked out, so connections do not go stale behind a load balancer that times out long-lived TCP sessions. pool_pre_ping=True makes SQLAlchemy send a lightweight ping on checkout to detect a stale or dead connection and replace it transparently; without this, the first request after a DB restart would return a 500 to the calling client.
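Because the engine is a module-level object, each worker process gets its own pool, and the database sees the sum of all of them. The arithmetic can be sketched as follows; the function names and the worker count of 4 are assumptions for illustration, while 5 and 10 are the defaults quoted above.

```python
def max_connections_per_process(pool_size: int, max_overflow: int) -> int:
    """Upper bound on connections one engine can hold at once."""
    return pool_size + max_overflow


def total_connection_budget(pool_size: int, max_overflow: int, worker_processes: int) -> int:
    """Upper bound across all worker processes, each with its own engine."""
    return max_connections_per_process(pool_size, max_overflow) * worker_processes


per_process = max_connections_per_process(pool_size=5, max_overflow=10)
budget = total_connection_budget(pool_size=5, max_overflow=10, worker_processes=4)
```

Here per_process is 15 and budget is 60. That figure is worth comparing with the server's max_connections setting (100 by default in Postgres) before raising either pool parameter.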

engine = create_async_engine(settings.database_url, **engine_kwargs) is the actual engine. It’s a module-level singleton so every AsyncSession (and every FastAPI request) shares the same connection pool. The DBAPI is dictated by the URL scheme: +asyncpg here resolves to the asyncpg driver; if you wrote postgresql+psycopg://..., you’d be on psycopg3.
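How the URL scheme selects the driver can be inspected with SQLAlchemy's URL helpers, which parse the string without importing any driver. The host, user, and database name below are placeholders, not the project's real settings.

```python
from sqlalchemy.engine import make_url

# Placeholder URL; parsing it does not open a connection or import asyncpg.
asyncpg_url = make_url("postgresql+asyncpg://app@localhost:5432/appdb")

backend = asyncpg_url.get_backend_name()  # the database dialect
driver = asyncpg_url.get_driver_name()    # the DBAPI chosen by the "+..." suffix

# URL objects are immutable; set() returns a modified copy.
psycopg_url = asyncpg_url.set(drivername="postgresql+psycopg")
```

Only the drivername changes in the copy; host, port, and database carry over unchanged.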

AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False, autocommit=False, autoflush=False) configures the session factory. Each option matters. class_=AsyncSession makes explicit that the factory produces AsyncSession objects rather than the synchronous Session (it is also the default for async_sessionmaker). expire_on_commit=False means a freshly committed ORM object keeps its loaded attributes. With the default of True, every attribute is expired at commit and the next access needs a refresh from the database; under asyncio that implicit I/O cannot run inside a plain attribute access and typically surfaces as a MissingGreenlet error, which breaks patterns like model_validate(orm_obj) after a commit. autocommit=False is the safe default: nothing is committed until an explicit commit(). autoflush=False means pending changes are not flushed automatically before each query; they reach the database only when you call flush() or commit(). This matters when a function receives a session and runs several queries, because what each query can see is then predictable.

Base = declarative_base() creates the declarative base class every model inherits from. SQLAlchemy also offers the modern DeclarativeBase class-based API; this codebase uses the legacy function-based API for consistency.

async def get_db() -> AsyncGenerator[AsyncSession, None] is the FastAPI dependency function. Every router that declares db: AsyncSession = Depends(get_db) calls into this. The body uses async with AsyncSessionLocal() as session: to guarantee the session closes even on exception. Inside, it yields the session so FastAPI can inject it into the handler. After the handler returns, the function resumes: await session.commit() commits if no exception escaped; the except Exception: branch calls await session.rollback() to discard partial writes; and the finally: block calls await session.close(), which is defensive because the async with already closes the session. This three-step teardown (commit/rollback/close) is the canonical async-session lifecycle used by every route in the app.
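The commit/rollback/close ordering can be checked in isolation with a stand-in session. In this sketch FakeSession and run_handler are hypothetical, get_db takes the session as a parameter purely so it can be observed, and re-raising after the rollback is an assumption about the real function. run_handler drives the generator roughly the way a framework would: advance to the yield, then either resume normally or throw the handler's exception in.

```python
import asyncio
from typing import AsyncGenerator, List


class FakeSession:
    """Hypothetical stand-in that records which lifecycle methods were called."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


async def get_db(session: FakeSession) -> AsyncGenerator[FakeSession, None]:
    try:
        yield session
        await session.commit()    # reached only if the handler succeeded
    except Exception:
        await session.rollback()  # discard partial writes
        raise                     # assumption: the error is propagated
    finally:
        await session.close()     # always runs


async def run_handler(fail: bool) -> List[str]:
    session = FakeSession()
    gen = get_db(session)
    await gen.__anext__()         # run the dependency up to its yield
    try:
        if fail:
            await gen.athrow(ValueError("handler failed"))
        else:
            await gen.__anext__() # resume after the yield
    except (StopAsyncIteration, ValueError):
        pass                      # generator finished or re-raised the error
    return session.calls


success_calls = asyncio.run(run_handler(fail=False))
failure_calls = asyncio.run(run_handler(fail=True))
```

On success the recorded calls are commit then close; on failure they are rollback then close, and commit is never reached.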

async def init_db() runs DDL. async with engine.begin() as conn: opens a transaction that lives for the entire block and commits on exit. That matters because Postgres DDL is transactional: if any statement fails, the whole block rolls back instead of leaving half-created tables. Inside, await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector;")) enables pgvector, which powers RAG. Order matters: the extension comes FIRST so that subsequent tables with vector(...) columns can be created. Then await conn.run_sync(Base.metadata.create_all) runs the synchronous metadata create against the async connection. By default create_all checks which tables already exist and emits CREATE TABLE only for the missing ones, so it is safe to run on every startup. run_sync is the bridge between SQLAlchemy’s synchronous API and the async connection; it runs the function on the same thread through SQLAlchemy’s greenlet adapter, not on a worker thread.
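The idempotence of create_all can be observed by recording the SQL it emits. The sketch below is an illustration under stated assumptions: it uses a synchronous in-memory SQLite engine instead of the project's async Postgres engine so that it runs without a server, and the Item model is hypothetical.

```python
from sqlalchemy import Column, Integer, create_engine, event
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Item(Base):
    """Hypothetical model used only for this demonstration."""

    __tablename__ = "items"
    id = Column(Integer, primary_key=True)


engine = create_engine("sqlite://")  # in-memory database
statements = []


@event.listens_for(engine, "before_cursor_execute")
def record_statement(conn, cursor, statement, parameters, context, executemany):
    # Capture every SQL string the engine sends to the driver.
    statements.append(statement)


Base.metadata.create_all(engine)  # first run: table is missing
first_run = [s for s in statements if "CREATE TABLE" in s]

statements.clear()
Base.metadata.create_all(engine)  # second run: table already exists
second_run = [s for s in statements if "CREATE TABLE" in s]
```

The first run emits a single CREATE TABLE for items, and the second run emits none, because create_all inspects the database before creating anything.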

async def close_db() is the shutdown counterpart. await engine.dispose() drains the pool, closes open connections, and frees TCP sockets cleanly. Called from the FastAPI lifespan shutdown.

async def check_db_connection() is the liveness probe used by the /health endpoint. It opens a one-shot session, runs SELECT 1, returns True on success or False on any exception (logged at ERROR level). Crucially it never raises — the caller treats False as unhealthy and responds 503, but the probing process itself doesn’t crash.
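The never-raise contract can be sketched with an injected probe. This is not the project's function: the real check_db_connection takes no arguments and runs SELECT 1 itself, whereas the probe parameter, health_status_code, and both probe functions here are hypothetical, added so the behavior can be exercised without a database.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger("app.database")


async def check_db_connection(probe: Callable[[], Awaitable[None]]) -> bool:
    """Return True if the probe succeeds, False on any exception; never raise."""
    try:
        await probe()
        return True
    except Exception as exc:
        logger.error("Database health check failed: %s", exc)
        return False


def health_status_code(db_ok: bool) -> int:
    """Map the probe result to the HTTP status a /health endpoint would return."""
    return 200 if db_ok else 503


async def healthy_probe() -> None:
    return None  # stands in for a successful SELECT 1


async def broken_probe() -> None:
    raise ConnectionRefusedError("database unreachable")


healthy = asyncio.run(check_db_connection(healthy_probe))
unhealthy = asyncio.run(check_db_connection(broken_probe))
```

The failing probe yields False and a 503 from health_status_code, and the exception stays inside the check.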

The if __name__ == "__main__": block at the bottom is a manual smoke-test entry point: python -m app.database connects, runs check_db_connection(), prints the result, and closes. Useful to confirm .env and pool config before the rest of the app comes up.

Common Pitfalls

Defaulting to NullPool in production removes pool reuse and your API server will open a TCP + auth handshake on every request. Restrict NullPool to environment == "test".

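expire_on_commit=True and autoflush=True are the defaults, so forgetting to override them changes behavior quietly. With expire_on_commit=True, reading attributes after a commit, as in Pydantic model_validate(orm_obj), requires a refresh that an async session cannot perform implicitly. Set both options explicitly.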

Real-World Interview Prep

Q1: When should you use NullPool vs the default pool in production?

A: NullPool opens a connection on every checkout and closes it on release. Use it when (a) you run on a serverless runtime (Lambda, Cloud Run Jobs) where a process handles little work before it is frozen or exits, so a per-process pool is rarely reused; (b) you’re in a test environment where you MUST guarantee that no connection outlives the pytest event loop that opened it (otherwise tests hit “Event loop is closed” errors); (c) an external pooler such as PgBouncer already pools server connections, so a second pool inside the application adds little. Note that a distant database (another region or VPC) is an argument for pooling, not against it: connection setup costs several round trips, and reuse avoids paying them on every request. Use the default pool when (a) you have a long-running web/worker process making many sequential queries, (b) connection setup cost (TCP, TLS, authentication) is significant compared with query time, and (c) your connection budget is small (10-30 connections total).

Q2: How do you debug “connection timeout” errors in production?

A: Triangulate three sources. (1) Postgres side: SELECT pid, state, query_start, wait_event FROM pg_stat_activity WHERE datname='X' AND state='idle in transaction' ORDER BY query_start DESC LIMIT 20; idle-in-transaction rows are the tell that the app is holding connections without finishing its transactions. (2) App side: enable SQL echo temporarily (settings.database_echo = True) and look for long-running statements. (3) Pool side: expose engine.pool.size(), engine.pool.checkedout(), and engine.pool.overflow() from a debug endpoint. If checkedout stays at pool_size + max_overflow for longer than pool_timeout (30 seconds by default), new checkouts time out and you have a pool-exhaustion bug. The fix is usually one of: shorter transactions, less aggressive eager-loading, OR raising pool_size to match your max_concurrent_requests.
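The pool-side check can be reduced to a small helper for a debug endpoint. In this sketch pool_snapshot, PoolLike, and FakePool are hypothetical; size() and checkedout() are the method names the queue-based pool exposes, and max_overflow is passed in from settings because the snapshot needs the configured ceiling.

```python
from dataclasses import dataclass
from typing import Any, Dict, Protocol


class PoolLike(Protocol):
    """The two pool methods this helper relies on."""

    def size(self) -> int: ...
    def checkedout(self) -> int: ...


@dataclass
class FakePool:
    """Hypothetical stand-in so the helper can run without an engine."""

    pool_size: int
    in_use: int

    def size(self) -> int:
        return self.pool_size

    def checkedout(self) -> int:
        return self.in_use


def pool_snapshot(pool: PoolLike, max_overflow: int) -> Dict[str, Any]:
    """Summarize pool usage; saturated means no further checkout can succeed."""
    capacity = pool.size() + max_overflow
    checked_out = pool.checkedout()
    return {
        "capacity": capacity,
        "checked_out": checked_out,
        "saturated": checked_out >= capacity,
    }


busy = pool_snapshot(FakePool(pool_size=5, in_use=5), max_overflow=10)
exhausted = pool_snapshot(FakePool(pool_size=5, in_use=15), max_overflow=10)
```

A pool with all 5 persistent connections in use is busy but not saturated, since 10 overflow slots remain; only at 15 checked out does the next request have to wait.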

Q3: Why asyncpg over psycopg3 for FastAPI? When would you switch?

A: asyncpg is often faster in benchmarks (it speaks the binary protocol and was written for asyncio from the start), and it has first-class SQLAlchemy support. psycopg3 is the successor to psycopg2 from the psycopg team; one driver provides both sync and async modes. Consider switching to psycopg3 when (a) you want a single driver for the async app and for synchronous tooling such as Alembic migrations that currently rely on psycopg2, (b) you depend on psycopg-specific APIs, for example its COPY interface (asyncpg has its own COPY helpers, so COPY alone is not a reason to switch), or (c) you need server-side cursors for large analytical result sets. Stay on asyncpg for OLTP web traffic. At the SQLAlchemy 2.0 layer the swap is largely a URL change from +asyncpg to +psycopg, but driver-specific connect arguments and behaviors, such as prepared-statement caching, differ and should be retested.
