Async pytest Testing with pytest-asyncio + Self-Healing DB Fixture
What? (Concept Overview)
The FCA test suite uses pytest-asyncio to test the real async stack (no mocks) against a real Postgres + Redis. A session-scoped enterprise_database_setup fixture in tests/conftest.py runs CREATE EXTENSION IF NOT EXISTS vector; and Base.metadata.create_all, then self-heals if the DB is empty by shelling out to the seeder (app.seed_database). Tests can run without mocks because (a) the DB is real and seeded, and (b) pytest.mark.asyncio tells the runner to await each async def test on an event loop.
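The detect-then-seed logic can be sketched without Postgres. The block below is a minimal stand-in that uses the stdlib sqlite3 module instead of the async engine; ensure_seeded and the two seed rows are hypothetical names chosen for illustration, not part of the FCA code.

```python
import sqlite3


def ensure_seeded(conn: sqlite3.Connection) -> bool:
    """Create the table if needed, then seed only when it is empty.

    Returns True when seeding ran, False when data was already present.
    """
    # Idempotent schema step, analogous to Base.metadata.create_all
    conn.execute(
        "CREATE TABLE IF NOT EXISTS customers (id INTEGER PRIMARY KEY, name TEXT)"
    )
    (count,) = conn.execute("SELECT COUNT(*) FROM customers").fetchone()
    if count > 0:
        return False
    # Hypothetical seed rows; the real suite shells out to app.seed_database
    conn.executemany(
        "INSERT INTO customers (name) VALUES (?)", [("Ada",), ("Grace",)]
    )
    conn.commit()
    return True


conn = sqlite3.connect(":memory:")
first_run = ensure_seeded(conn)   # empty DB: seeding runs
second_run = ensure_seeded(conn)  # already seeded: nothing to do
row_count = conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]
```

Calling the helper twice shows why the pattern is safe to run on every session: the second call finds data and does nothing.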
Project Context
Existing tests (tests/test_compliance_checker.py, tests/test_human_agent.py, tests/test_full_workflow1.py) mix pure-Python unit tests (compliance) with integration tests that hit live Postgres (human agent, full workflow). The conftest ensures the integration tests always have data. The fixture pattern is reusable for any modules that need an async engine.
How? (Quick Reference Blocks)
3.1 The conftest.py Self-Healing Fixture
# tests/conftest.py
import pytest
import subprocess
import sys
import os
from sqlalchemy import text
from app.database import engine, Base
from dotenv import load_dotenv
import pytest_asyncio
import redis.asyncio as redis
from sqlalchemy.ext.asyncio import AsyncSession

load_dotenv()
os.environ["REDIS_URL"] = "redis://localhost:6379/1"


# Async fixtures use pytest_asyncio.fixture so the plugin awaits them
@pytest_asyncio.fixture(scope="session", autouse=True)
async def enterprise_database_setup():
    """Bootstraps the test DB and self-heals if empty."""
    async with engine.begin() as conn:
        # 1. Enable pgvector
        await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
        # 2. Build tables safely (idempotent)
        await conn.run_sync(Base.metadata.create_all)
        # 3. Detect empty state
        result = await conn.execute(text("SELECT COUNT(*) FROM customers"))
        customer_count = result.scalar()

    # The setup transaction has committed here, so the seeder
    # subprocess can see the newly created tables.
    if customer_count == 0:
        print("WARNING: Database is empty! Self-healing...")
        subprocess.run([sys.executable, "-m", "app.seed_database"],
                       env=os.environ.copy(), check=True)
        if os.path.exists("ingest.py"):
            subprocess.run([sys.executable, "ingest.py"],
                           env=os.environ.copy(), check=True)
        print("Database successfully self-healed. Starting tests...")

    yield  # tests run here

3.2 Async Engine Fixture (Per-Test, NullPool)
# tests/test_human_agent.py
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.pool import NullPool
from app.config import settings


@pytest_asyncio.fixture
async def human_agent():
    # Fresh engine per test; NullPool opens a new connection on every checkout
    test_engine = create_async_engine(
        settings.database_url, poolclass=NullPool, echo=False
    )
    session = AsyncSession(bind=test_engine, expire_on_commit=False)
    try:
        from app.services import ConversationService
        conv_svc = ConversationService(db=session)
        from app.agents.human_agent import HumanAgent
        from app.agents.base import AgentConfig
        agent = HumanAgent(
            config=AgentConfig(),
            conversation_service=conv_svc,
        )
        yield agent
    finally:
        await session.close()
        await test_engine.dispose()  # critical for NullPool lifecycle

3.3 Async Test Method (Compliance)
# tests/test_compliance_checker.py
import pytest
from app.agents.compliance_checker import ComplianceCheckerAgent
from app.agents.base import AgentConfig


@pytest.fixture
def compliance_agent():
    return ComplianceCheckerAgent(config=AgentConfig())


@pytest.mark.asyncio
async def test_short_circuit_prohibited_word(compliance_agent):
    response = await compliance_agent.process({
        "content": "Put your money in our new bond! It is completely risk-free."
    })
    assert response.metadata["is_compliant"] is False
    assert any("risk-free" in issue.lower()
               for issue in response.metadata["issues"])


@pytest.mark.asyncio
async def test_deterministic_disclaimer_injection(compliance_agent):
    response = await compliance_agent.process(
        {"content": "If you are struggling with debt on your credit card payments..."},
        context={"product_type": "credit"},
    )
    disclaimers = response.metadata["required_disclaimers"]
    assert any("Representative APR" in d for d in disclaimers)
    assert any("MoneyHelper or StepChange" in d for d in disclaimers)

3.4 End-to-End Workflow Test (Real DB + Real LLM)
# tests/test_full_workflow1.py
import pytest
import pytest_asyncio
from app.coordinator.agent_coordinator import AgentCoordinator


@pytest_asyncio.fixture
async def coordinator():
    return AgentCoordinator()


@pytest.mark.asyncio
async def test_workflow_complaint_escalation(coordinator):
    """Path: classify (complaint) -> human_agent -> PAUSE (Checkpointer)"""
    result = await coordinator.process_message(
        message="I am very unhappy with the recent changes to my account fees "
                "and wish to file a complaint.",
        customer_id=1,
        conversation_id=104,
    )
    # Native LangGraph interrupt should freeze and return paused
    assert result["status"] == "paused"
    assert result["agent"] == "system"
    assert "escalation_id" in result.get("metadata", {})

Why? (Parameter Breakdown)
- scope="session", autouse=True: Runs ONCE per pytest session. Without autouse=True, you must put the fixture name in every test signature. With it, every test gets the seeded DB without ceremony.
- async with engine.begin() for setup: A single transaction ensures CREATE EXTENSION and create_all succeed atomically. Without engine.begin(), you'd get a half-initialised DB on failure.
- poolclass=NullPool in per-test fixtures: Each test gets a fresh engine opening fresh connections. The default pool would re-use connections across test event loops, eventually producing "loop is closed" errors. Match the production app/database.py test-mode setting.
- env=os.environ.copy(): Passes DATABASE_URL/REDIS_URL (including the REDIS_URL override set in conftest.py) to the seeder subprocess. subprocess.run already inherits the parent environment when env is omitted, so the explicit copy mainly documents the intent and gives you a dict to adjust per call.
- check=True on subprocess: Raises CalledProcessError if the seed fails; the test suite fails FAST. Without it, the self-heal silently fails and tests run against an empty DB.
- pytest.mark.asyncio decorator: Marks an async def test so that pytest-asyncio awaits it. Without it (and without asyncio_mode = auto), the coroutine is never awaited; depending on the pytest version, the test is skipped with a warning or failed with a message that async def functions are not natively supported.
- expire_on_commit=False: AsyncSession(bind=engine, expire_on_commit=False) keeps ORM attributes readable after commit. With the default (True), attributes are expired at commit and the next read triggers a lazy reload, which under asyncio fails unless it happens inside an awaited call.
- engine.dispose() in finally: Closes any connections the engine still holds when the test ends. Without it, connections can stay open on the Postgres side, and later tests may start failing with connection-limit errors.
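The environment behaviour described above can be checked with a short stdlib-only sketch. The child process here is a one-line stand-in for the seeder, and the second Redis URL is a made-up override used only to show what the explicit copy allows.

```python
import os
import subprocess
import sys

# Mutating os.environ before the call is what makes the value visible to the child.
os.environ["REDIS_URL"] = "redis://localhost:6379/1"

# One-line stand-in for the seeder: it just prints the variable it sees.
child_code = "import os; print(os.environ['REDIS_URL'])"

# Default (env omitted): the child inherits the parent's current environment.
inherited = subprocess.run(
    [sys.executable, "-c", child_code],
    capture_output=True, text=True, check=True,
).stdout.strip()

# Explicit copy: same starting point, but the dict can be adjusted per call
# without touching the parent's environment.
child_env = os.environ.copy()
child_env["REDIS_URL"] = "redis://localhost:6379/2"  # hypothetical override
overridden = subprocess.run(
    [sys.executable, "-c", child_code],
    env=child_env, capture_output=True, text=True, check=True,
).stdout.strip()
```

The parent's own REDIS_URL is unchanged after the second call, which is the practical reason to pass a copy rather than edit os.environ again.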
Common Pitfalls
- Using scope="function" (the default) for the DB-setup fixture: re-runs the extension, table, and empty-DB checks before every test, which is slow. Session scope is faster, but tests then run against shared state, so order-dependent failures are more likely. Compromise: session-scoped for seeding + function-scoped for transactions.
- Mocking AsyncSession: except for pure-unit logic, mocks miss subtle async bugs (event-loop misuse, connection-pool close order). Hit the real DB; speed matters far less than correctness in CI.
Real-World Interview Prep
Q1: Why use subprocess.run for self-healing instead of calling seed_all() directly?
A: Two reasons. (1) Interpreter isolation: the seeder runs in a fresh Python interpreter with its own event loop and module state, and env=os.environ.copy() passes along whatever test-specific env vars were set (e.g., REDIS_URL=redis://localhost:6379/1). Calling seed_all() directly would see the same environment, but it would run inside the test process and share its event loop and any module-level state such as the engine. (2) Crash isolation: if the seed crashes, only the subprocess dies, not the test process. With check=True the failure surfaces as CalledProcessError, so the session stops with a clean setup error instead of running tests against an empty DB.
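Crash isolation with check=True can be illustrated as follows. This is a sketch in which run_seeder is a hypothetical wrapper and the "seeder" is an inline script, not app.seed_database.

```python
import subprocess
import sys


def run_seeder(code: str) -> str:
    """Run a stand-in seeder in a fresh interpreter and report the outcome."""
    try:
        subprocess.run(
            [sys.executable, "-c", code],
            check=True, capture_output=True, text=True,
        )
    except subprocess.CalledProcessError as exc:
        # The child died, but the parent process is still alive and can report it.
        return f"seed failed with exit code {exc.returncode}"
    return "seed ok"


ok_outcome = run_seeder("print('seeded')")
failed_outcome = run_seeder("import sys; sys.exit(3)")
```

In the real fixture the exception is not caught, so pytest reports it as a fixture setup error for the session.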
Q2: How would you speed up a CI test suite that takes 30 minutes?
A: Three-pronged. (1) Parallel pytest: pytest -n auto (pytest-xdist) spins up worker processes; ideal for read-heavy integration tests on the SAME DB instance. (2) DB-per-test: for higher isolation, allocate a per-test schema or per-test Postgres DB; each test runs against an empty, seeded DB and drops its schema on teardown. (3) Snapshot fast-restart: take a base image of the DB once, then copy/restore it per test instead of reseeding. For the FCA stack, (1) is a 4-line change; (2) and (3) are ops investments worth making once CI exceeds 10 minutes.
Q3: Why use NullPool and not NullPool+pool_recycle?
A: NullPool opens and closes a connection per checkout; pool_recycle is irrelevant because there’s no pool to recycle. NullPool is paired with pool_pre_ping=False (also irrelevant — there’s no pool). The combination is the standard pytest dance to get pure “connect → use → close” semantics. If you wanted a per-test pool (rare), use pool_size=1, max_overflow=0, pool_pre_ping=False instead — but the lifecycle hygiene costs outweigh the speed benefit.
Top-to-Bottom Code Walkthrough (tests/conftest.py + tests/test_*.py)
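The conftest.py defines fixtures that flow into every test that requests them (and into all tests, for autouse fixtures). The “self-healing” pattern means the session fixture reseeds an empty database before the run, and per-test fixtures undo their own changes even when something fails mid-run.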
tests/conftest.py
import asyncio  # used by the event_loop fixture
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from app.main import app
from app.database import AsyncSessionLocal

Why pytest_asyncio and not just pytest? pytest does not natively await async def test_* functions. The pytest-asyncio plugin supplies the event loop and awaits async tests and fixtures on it.
Event-loop fixture
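@pytest.fixture(scope="session")
def event_loop():
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()

Session scope means ONE event loop for all tests in the session. Per-test loops cause issues with pooled DB connections: a connection opened on test A's loop cannot be used by test B once that loop is closed. Recent pytest-asyncio releases deprecate overriding event_loop in favour of loop-scope settings, so check the behaviour of the version you pin.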
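The failure mode behind per-test loops can be reproduced with asyncio alone. The sketch below involves no database; it only shows that anything still bound to a closed loop raises the familiar error.

```python
import asyncio

# "Test A" runs on its own loop, which is closed when the test ends.
loop_a = asyncio.new_event_loop()
loop_a.run_until_complete(asyncio.sleep(0))
loop_a.close()

# Anything that still holds a reference to loop_a (for example a pooled
# connection created during test A) fails as soon as it schedules work on it.
try:
    loop_a.call_soon(lambda: None)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)
```

A pooled connection created during one test behaves like the call_soon call here when a later test tries to reuse it.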
async_client fixture — async HTTPX fixture
@pytest_asyncio.fixture
async def async_client():
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        yield client

ASGITransport lets you call FastAPI endpoints in-process without binding port 8000. await client.post("/api/v1/...") goes through the full ASGI request/response cycle of the FastAPI app inside the test process, with no network socket involved.
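What "in-process" means can be sketched by driving a tiny ASGI app by hand. This is a simplified illustration of the idea, not httpx's implementation; app and call_in_process are hypothetical names, and the scope dict carries only the fields this toy app needs.

```python
import asyncio


async def app(scope, receive, send):
    """Minimal ASGI app that answers every HTTP request with 'pong'."""
    assert scope["type"] == "http"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({"type": "http.response.body", "body": b"pong"})


async def call_in_process(asgi_app, method: str, path: str):
    """Call the app directly with a hand-built scope; no socket is opened."""
    scope = {
        "type": "http",
        "asgi": {"version": "3.0"},
        "http_version": "1.1",
        "method": method,
        "path": path,
        "query_string": b"",
        "headers": [],
    }
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await asgi_app(scope, receive, send)
    status = next(m["status"] for m in sent if m["type"] == "http.response.start")
    body = b"".join(
        m.get("body", b"") for m in sent if m["type"] == "http.response.body"
    )
    return status, body


status, body = asyncio.run(call_in_process(app, "GET", "/ping"))
```

The request never leaves the process: the "client" is just a coroutine that calls the app and collects the messages it sends back.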
DB fixtures
@pytest_asyncio.fixture
async def db_session():
    async with AsyncSessionLocal() as session:
        yield session
        await session.rollback()  # always rollback, never commit

The rollback-instead-of-commit pattern isolates each test from the others. Tests can mutate freely; nothing persists as long as the code under test does not commit on its own.
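The rollback pattern can be demonstrated with the stdlib sqlite3 module standing in for AsyncSession. This is a synchronous sketch under that assumption; rolled_back_session is a hypothetical helper that mirrors the fixture's shape.

```python
import sqlite3
from contextlib import contextmanager

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT)")
conn.commit()


@contextmanager
def rolled_back_session(connection):
    """Hand out the connection, then discard whatever the 'test' wrote."""
    try:
        yield connection
    finally:
        connection.rollback()  # always rollback, never commit


with rolled_back_session(conn) as session:
    session.execute("INSERT INTO customers (name) VALUES ('temp')")
    seen_inside = session.execute("SELECT COUNT(*) FROM customers").fetchone()[0]

seen_after = conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]
```

The row is visible inside the block and gone afterwards, which is the isolation each test relies on.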
Auth fixtures
@pytest_asyncio.fixture
async def auth_headers(db_session):
    # create_test_user / create_test_token: helpers not shown in this walkthrough
    user = await create_test_user(db_session)
    token = create_test_token(user)
    return {"Authorization": f"Bearer {token}"}

Bundles token generation with the DB fixture: every test that depends on auth_headers gets a fresh user + token.
Self-healing seed
The “self-healing” idea extends to teardown: when a test fails mid-run, the cleanup code after a fixture's yield must still run to undo whatever partial state was created. pytest runs that teardown for every fixture whose setup completed, which avoids test pollution where one failed test destabilises the next 50.
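Teardown that survives a failing test body can be sketched with an async context manager. This is an illustration of the try/finally shape only; resource_fixture and failing_test are hypothetical names, and pytest is not involved.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def resource_fixture():
    events.append("setup")
    try:
        yield "resource"
    finally:
        # Runs whether the body succeeded or raised.
        events.append("teardown")


async def failing_test():
    async with resource_fixture():
        raise AssertionError("test body failed")


try:
    asyncio.run(failing_test())
except AssertionError:
    events.append("failure reported")
```

The recorded order shows the cleanup running before the failure is reported, so the next test starts from a clean state.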
Markers
@pytest.mark.integration
async def test_full_workflow(async_client, db_session): ...

@pytest.mark.unit
def test_pure_function(): ...

Get fast feedback with pytest -m unit (no DB), or exercise the full stack with pytest -m integration.
Pytest config (from pyproject.toml)
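addopts = "-v --strict-markers --strict-config --cov=app --cov-report=term-missing"

--strict-markers requires every @pytest.mark.X to be registered (for example under the markers option in the same config section), so a typo in a marker name causes an immediate error instead of silently selecting nothing.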
--cov=app automatically measures coverage of the app/ directory.
Common Pitfalls
- Using pytest.fixture for an async fixture: in pytest-asyncio's strict mode the test receives an un-awaited coroutine or async generator instead of the value. Use @pytest_asyncio.fixture for async setup; plain @pytest.fixture only works for async fixtures when asyncio_mode = auto.
- Sharing a DB row across tests without rollback: the second test sees dirty state. Always rollback in fixture cleanup.
- Forgetting event_loop scope: per-test event loops cause RuntimeError: Event loop is closed errors that look like the test framework is broken.
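The first pitfall can be seen without pytest. The sketch below shows that calling an async generator function yields an unconsumed generator object, and that something has to drive it to obtain the value; resolve_like_a_plugin is a simplified, hypothetical illustration and not pytest-asyncio's actual code.

```python
import asyncio
import inspect


async def db_session():
    """Shaped like an async fixture: yields a value, cleanup would follow."""
    yield "session-object"


# What a test receives if nothing awaits the fixture: a generator, not the value.
raw_value = db_session()
is_unconsumed_generator = inspect.isasyncgen(raw_value)


async def resolve_like_a_plugin():
    """Roughly what an async-aware fixture runner has to do."""
    gen = db_session()
    value = await gen.__anext__()  # run setup up to the yield
    await gen.aclose()             # run teardown
    return value


resolved_value = asyncio.run(resolve_like_a_plugin())
```

A test that tries to use raw_value as a session fails with attribute errors that point at the test, not at the fixture decorator, which is why this mistake is hard to spot.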
Real-World Interview Prep
Q1: Why pytest-asyncio plugin at all?
A: Without the plugin, each async test has to be wrapped in asyncio.run boilerplate. The plugin awaits async def test_* functions on the configured loop, either when they carry @pytest.mark.asyncio or automatically when asyncio_mode = auto. Saves ~3 lines per test and a lot of cognitive overhead.
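For comparison, this is roughly what the boilerplate looks like without the plugin. It is a stdlib-only sketch; fetch_status is a hypothetical coroutine standing in for the code under test.

```python
import asyncio


async def fetch_status():
    """Hypothetical coroutine under test."""
    await asyncio.sleep(0)
    return {"status": "paused"}


def test_status_without_plugin():
    # A synchronous test has to start and tear down its own event loop.
    result = asyncio.run(fetch_status())
    assert result["status"] == "paused"


test_status_without_plugin()
```

Each such test also creates and closes its own loop, so nothing loop-bound can be shared between tests, which is the other cost the plugin removes.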
Q2: How do you test endpoints that pause a LangGraph (human-in-the-loop)?
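A: For route-level tests, mock AgentCoordinator to return a fake paused state without actually running the graph. Build the mock with AsyncMock(spec=...) so it only exposes attributes the real coordinator has and process_message stays awaitable. The pause itself is covered by the end-to-end test in 3.4, which asserts status == "paused" against the real coordinator.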
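A minimal sketch of such a mock, using only unittest.mock. CoordinatorInterface is a hypothetical stand-in for the real AgentCoordinator, handle_request stands in for a route handler, and the escalation id is a made-up value.

```python
import asyncio
from unittest.mock import AsyncMock


class CoordinatorInterface:
    """Hypothetical stand-in for AgentCoordinator's public surface."""

    async def process_message(self, message, customer_id, conversation_id):
        raise NotImplementedError


# spec= restricts the mock to attributes that exist on the real interface.
fake_coordinator = AsyncMock(spec=CoordinatorInterface)
fake_coordinator.process_message.return_value = {
    "status": "paused",
    "agent": "system",
    "metadata": {"escalation_id": "esc-123"},  # made-up value
}


async def handle_request():
    """Stand-in for a route handler that delegates to the coordinator."""
    return await fake_coordinator.process_message(
        message="I wish to file a complaint.",
        customer_id=1,
        conversation_id=104,
    )


result = asyncio.run(handle_request())
fake_coordinator.process_message.assert_awaited_once()
has_unknown_attr = hasattr(fake_coordinator, "not_a_real_method")
```

Because of spec, a typo in a method name on the mock raises AttributeError instead of quietly returning another mock.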
Q3: What if tests are flaky on CI but pass locally?
A: Three suspects. (1) Connection pool has stale connections: use NullPool for tests. (2) Event loop scope is wrong: use one session-scoped loop. (3) Test ordering dependency: fixtures with shared state leak between tests. To surface it, install pytest-randomly, which shuffles test order by default and prints the seed so a failing order can be replayed.
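The third suspect can be shown in miniature. The sketch below uses two hypothetical tests that share a module-level dict and a tiny runner that executes them in a given order; it is an illustration, not a replacement for a shuffling plugin.

```python
shared_cache = {}


def test_writes_cache():
    shared_cache["customer"] = 1


def test_reads_cache():
    # Hidden dependency: only passes if test_writes_cache ran first.
    assert "customer" in shared_cache


def run_in_order(tests):
    """Run tests against a fresh cache and collect the names that fail."""
    shared_cache.clear()
    failures = []
    for test in tests:
        try:
            test()
        except AssertionError:
            failures.append(test.__name__)
    return failures


declared_order = run_in_order([test_writes_cache, test_reads_cache])
reversed_order = run_in_order([test_reads_cache, test_writes_cache])
```

The suite passes in file order and fails when reversed, which is exactly the kind of dependency that a different collection order on CI exposes.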