Async Repository Pattern with SQLAlchemy 2.0 (BaseRepository[T])
What? (Concept Overview)
The async repository pattern isolates DB-level CRUD into a generic class parameterised on the model type, so every domain model inherits the same create / get_by_id / get_by_filters / update / delete surface without copy-pasting SQLAlchemy boilerplate. Each repository instance works against exactly ONE AsyncSession, whose I/O methods are coroutines, which is why every data-access method in BaseRepository is async.
Project Context
The FCA Support Agent has 7+ per-resource repositories (customer, account, transaction, conversation, message, product, faq), all subclassing BaseRepository[ResourceModel]. The base class delegates session management to the proxy pattern set up in BaseService, so callers never pass an AsyncSession explicitly. This keeps each repository testable with a single in-memory SQLite engine and predictable across environments.
How? (Quick Reference Blocks)
3.1 The Generic BaseRepository[T]
    # app/repositories/base.py
    from typing import Generic, Type, TypeVar

    from sqlalchemy import select, delete, update
    from sqlalchemy.ext.asyncio import AsyncSession

    T = TypeVar("T")


    class BaseRepository(Generic[T]):
        model: Type[T]

        def __init__(self, session: AsyncSession) -> None:
            self.session = session

        async def create(self, data: dict) -> T:
            instance = self.model(**data)
            self.session.add(instance)
            await self.session.flush()
            await self.session.refresh(instance)
            return instance

        async def get_by_id(self, id_: int | str) -> T | None:
            return await self.session.get(self.model, id_)

        async def get_all(self, limit: int = 100, offset: int = 0) -> list[T]:
            stmt = select(self.model).limit(limit).offset(offset)
            result = await self.session.execute(stmt)
            return list(result.scalars().all())

        async def get_by_filters(
            self, filters: dict, limit: int = 100
        ) -> list[T]:
            stmt = select(self.model)
            for field, value in filters.items():
                stmt = stmt.where(getattr(self.model, field) == value)
            stmt = stmt.limit(limit)
            result = await self.session.execute(stmt)
            return list(result.scalars().all())

        async def update(self, id_: int | str, data: dict) -> T | None:
            stmt = update(self.model).where(
                self.model.id == id_
            ).values(**data).returning(self.model)
            result = await self.session.execute(stmt)
            return result.scalar_one_or_none()

        async def delete(self, id_: int | str) -> bool:
            stmt = delete(self.model).where(self.model.id == id_)
            result = await self.session.execute(stmt)
            return result.rowcount > 0

3.2 Subclassing for a Domain Model
    # app/repositories/customer.py (illustrative)
    from app.models.customer import Customer
    from app.repositories.base import BaseRepository


    class CustomerRepository(BaseRepository[Customer]):
        model = Customer

        # domain-specific addition: does NOT belong in the base class
        async def find_by_email(self, email: str) -> Customer | None:
            results = await self.get_by_filters({"email": email}, limit=1)
            return results[0] if results else None

Why? (Parameter Breakdown)
- `Generic[T]` with `model: Type[T]`: Single source of truth. The subclass sets `model = Customer` and every method then operates on `Customer`-shaped objects. Mypy and other static-analysis tools can verify that `get_by_id(...)` on a `CustomerRepository` returns `Customer | None` rather than an untyped object.
- `session.flush()` + `refresh()` after `add()`: `flush()` sends the INSERT to the DB (so the auto-increment PK is assigned) without committing. `refresh()` re-fetches the row so server-default columns (timestamps, generated UUIDs) populate the Python object before `create()` returns.
- `update(...).returning(self.model)`: Single round-trip UPDATE+SELECT. Saves a `get_by_id` round-trip after every `update` and hands back the post-update row as one object. This relies on the backend supporting RETURNING (PostgreSQL does; SQLite needs 3.35 or newer).
- `delete(...).rowcount > 0`: Reports whether the delete actually matched anything. Without it, deleting a non-existent row returns `None` (ambiguous) and the caller cannot distinguish "missing" from "deleted".
- `get_by_filters` with `limit`: The default limit caps accidentally unbounded result sets when a caller forgets to constrain the query. The default of 100 is a guardrail, not a feature.
- Domain methods like `find_by_email` live on the subclass, not `BaseRepository`: Keeps the base generic and re-usable across model types. Reserve `BaseRepository` for truly cross-model CRUD only.
Common Pitfalls
- Forgetting `await self.session.flush()` in `create`. Without it, the autoincrement PK isn’t populated until commit time, so any downstream caller that needs the new ID will see `None`. The pattern (add → flush → refresh → return) is the SQLAlchemy 2.0 idiom for returning a fully populated row.
- Using `commit()` inside the repository. Repositories should NEVER commit. The owning service (`BaseService`) owns the transaction boundary. Calling `commit()` from inside a repository method makes it impossible to compose several repository calls into one atomic unit (work that is already committed cannot be rolled back when a later step fails) and makes tests brittle.
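The ownership rule can be checked without a database. The sketch below uses a fake session built from `unittest.mock`; `NoteRepository`, `NoteService`, and `create_pair` are hypothetical names invented for the illustration, and the real `BaseService` may structure its commit and rollback handling differently.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


class NoteRepository:
    """Hypothetical repository: adds and flushes, but never commits."""

    def __init__(self, session) -> None:
        self.session = session

    async def create(self, data: dict) -> dict:
        if not data.get("body"):
            raise ValueError("body is required")
        self.session.add(data)
        await self.session.flush()
        return data


class NoteService:
    """Hypothetical service: the only layer that commits or rolls back."""

    def __init__(self, session) -> None:
        self.session = session
        self.repo = NoteRepository(session)

    async def create_pair(self, first: dict, second: dict) -> None:
        try:
            await self.repo.create(first)
            await self.repo.create(second)
            await self.session.commit()
        except Exception:
            # The first insert was only flushed, so it can still be undone.
            await self.session.rollback()
            raise


def make_fake_session() -> AsyncMock:
    session = AsyncMock()
    session.add = MagicMock()  # add() is a plain method on AsyncSession
    return session


async def run_failing_pair() -> AsyncMock:
    session = make_fake_session()
    service = NoteService(session)
    try:
        await service.create_pair({"body": "ok"}, {"body": ""})
    except ValueError:
        pass
    return session


failed_session = asyncio.run(run_failing_pair())
print(failed_session.commit.await_count, failed_session.rollback.await_count)
```

Had the repository committed inside `create`, the first row would already be permanent by the time the second call failed, and the rollback would have nothing left to undo.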
Real-World Interview Prep
Q1: Why is the async repository pattern strictly preferable to a sync one when paired with FastAPI?
A: FastAPI runs async def request handlers on an event loop. A sync DB call inside such a handler blocks the loop for every other in-flight request, and moving the handler to plain def only shifts the problem to a bounded threadpool that enough concurrent slow queries will exhaust. Async repositories let you await all the way down the handler chain, so long-lived responses such as SSE streams do not pin a thread each. SQLAlchemy 2.0’s async API sits on top of an async driver such as asyncpg or psycopg 3 in async mode, so a pending query waits on the network without occupying a Python thread.
Q2: How would you add soft-delete semantics (e.g., is_deleted=True) without bloating BaseRepository?
A: Override only the delete method in subclasses that need it; the base stays generic. For cross-cutting soft-delete, introduce a SoftDeleteMixin that overrides delete, get_all, and get_by_filters to filter on is_deleted.is_(False). Avoid embedding the flag in BaseRepository because most resources don’t need it and adding it forces every model to carry the column.
Q3: What’s the safe way to update many rows atomically (e.g., bulk-deactivate 10k users) using this pattern?
A: Don’t loop await self.update(id, {...}) — that’s 10k individual UPDATEs and 10k awaits. Use a single UPDATE ... WHERE id IN (...) via sqlalchemy.update(self.model).where(self.model.id.in_(ids)).values(**data), then await session.execute(stmt). Wrap it in a service-layer method (bulk_deactivate) that owns the transaction. The repository’s update(id, data) is per-row by design; bulk operations belong on the service layer or in a dedicated bulk_* method on the repository that explicitly opts into batched semantics.
Top-to-Bottom Code Walkthrough (app/repositories/base.py)
The async repository pattern keeps the SQL where it belongs (one file per table) while letting services own the transaction lifecycle.
BaseRepository(Generic[T])
- `def __init__(self, session: AsyncSession)`: the session is injected so the unit-of-work boundary is owned by the service, not the repo. The model is not a constructor argument; subclasses declare it via class-attribute assignment (`model = Customer`).
- `async def create(data: dict) -> T`: builds `self.model(**data)`, calls `self.session.add(obj)`, then awaits `self.session.flush()` and `self.session.refresh(obj)` before returning the object. `flush` (vs `commit`) sends the INSERT to the DB but doesn’t end the transaction. The service decides when to commit.
- `async def get_by_id(id_)`: `await self.session.get(self.model, id_)`. SQLAlchemy consults the identity map first; if the object is already loaded in this session, no query is issued.
- `async def get_all(limit: int = 100, offset: int = 0)`: executes `select(self.model).limit(limit).offset(offset)` and returns `list(result.scalars().all())`. `.scalars()` yields the first column of each row (here the ORM entity) instead of `Row` tuples. Queries that join-load collections additionally need `.unique()` before `.all()` to collapse the duplicated parent rows.
- `async def update(id_, data)`: issues a single `UPDATE ... WHERE id = :id` with `RETURNING` and returns the updated entity, or `None` when no row matched. The statement runs immediately inside the open transaction and becomes permanent only when the service commits.
- `async def delete(id_)`: issues `DELETE ... WHERE id = :id` and returns `result.rowcount > 0`, so the caller learns whether a row was actually removed.
- `async def count()`: `select(func.count()).select_from(self.model)`, which delegates to a single `SELECT count(*)`.
Subclass example (app/repositories/customer.py)
    from sqlalchemy import select

    from app.models.customer import Customer
    from app.repositories.base import BaseRepository


    class CustomerRepository(BaseRepository[Customer]):
        model = Customer  # declared once

        async def get_by_email(self, email: str) -> Customer | None:
            result = await self.session.execute(
                select(Customer).where(Customer.email == email)
            )
            return result.scalar_one_or_none()

        async def get_by_customer_id(self, customer_id: str) -> Customer | None:  # external ID
            result = await self.session.execute(
                select(Customer).where(Customer.customer_id == customer_id)
            )
            return result.scalar_one_or_none()

Domain queries live in the subclass; CRUD lives in the base. New repositories never need to re-implement CRUD.
How services use repositories
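    class AccountService(BaseService):
        async def create_account(self, data):
            # The repo borrows the service's session; it never opens or commits its own.
            repo = AccountRepository(self.session)
            account = await repo.create(data)
            await self.commit()  # the service owns the transaction boundary
            return account

Constructing the repo around the service’s session ties its work to the service’s transaction: nothing is permanent until the service commits. BaseRepository defines no async context-manager methods and takes only the session, so the repo is instantiated directly rather than entered with async with.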
Common Pitfalls
Calling await self.session.commit() inside a repo method breaks the unit-of-work. Services need the option to roll back. Never commit from a repo.
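Calling result.scalars().all() without .unique() after a joined eager load of a collection raises InvalidRequestError, because the parent rows are duplicated once per child. Use result.scalars().unique().all() in those queries.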
Returning an unconsumed result object instead of a list ties the caller to the session’s lifetime. Always materialise with .all() before returning.
Real-World Interview Prep
Q1: Why Generic[T] instead of inheritance with abstract model?
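A: It tells mypy and the IDE that await self.get_by_id(...) returns T | None for the concrete T. Without generics, every helper returns a loosely typed base model (or Any) and callers must cast to Customer themselves. With generics the type is exact, and because type parameters are erased at runtime there is no performance cost.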
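The typing benefit can be seen in isolation with a deliberately simplified stand-in. The sketch below is synchronous and in-memory so that it runs with the standard library only; `InMemoryRepository` is a hypothetical class that mirrors the shape of `BaseRepository[T]` and is not the project's code.

```python
from dataclasses import dataclass
from typing import Generic, Optional, Type, TypeVar

T = TypeVar("T")


class InMemoryRepository(Generic[T]):
    """Hypothetical stand-in: same Generic[T] + class-level model idea."""

    model: Type[T]

    def __init__(self) -> None:
        self._rows: dict[int, T] = {}

    def create(self, id_: int, **fields) -> T:
        obj = self.model(**fields)
        self._rows[id_] = obj
        return obj

    def get_by_id(self, id_: int) -> Optional[T]:
        return self._rows.get(id_)


@dataclass
class Customer:
    email: str


class CustomerRepository(InMemoryRepository[Customer]):
    model = Customer  # binds T to Customer for the type checker and at runtime


repo = CustomerRepository()
repo.create(1, email="a@example.com")
found = repo.get_by_id(1)    # a type checker infers Optional[Customer]
missing = repo.get_by_id(2)
print(found, missing)
```

A type checker will flag `found.email` unless the `None` case is handled first, which is exactly the kind of mistake the generic parameter is meant to surface.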
Q2: How do you mock a repo in a service test?
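A: Replace self.repo with MagicMock(spec=CustomerRepository). Because the methods on the spec are async def, the mock exposes them as AsyncMock attributes automatically, so setting repo.create.return_value = ... is enough for await repo.create(...) to return that value. Do not assign an AsyncMock as the return value: the call would then hand back a mock object rather than the awaited result. The service runs with no DB at all.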
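A minimal sketch of that mocking approach, using only the standard library. The `CustomerRepository` stub, the `CustomerService` class, and its `register` method are hypothetical stand-ins with the same async surface as the real classes.

```python
import asyncio
from unittest.mock import MagicMock


class CustomerRepository:
    """Stand-in with the same async surface as the real repository."""

    async def create(self, data: dict) -> dict:
        raise RuntimeError("would hit the database")


class CustomerService:  # hypothetical service under test
    def __init__(self, repo: CustomerRepository) -> None:
        self.repo = repo

    async def register(self, email: str) -> dict:
        # Business rule under test: emails are stored lower-cased.
        return await self.repo.create({"email": email.lower()})


# spec= makes the mock reject unknown attributes and turns every
# async def method on the class into an AsyncMock.
repo = MagicMock(spec=CustomerRepository)
repo.create.return_value = {"id": 1, "email": "a@example.com"}

service = CustomerService(repo)
created = asyncio.run(service.register("A@Example.com"))

repo.create.assert_awaited_once_with({"email": "a@example.com"})
print(created)
```

The assertion on the awaited arguments checks the service's own logic (lower-casing the email), which is the part a service test should cover once the repository is out of the picture.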
Q3: When would you bypass the repo and write raw SQL?
A: Window functions, CTEs, lateral joins, and bulk INSERT-from-SELECT are awkward in ORM. Use session.execute(text("...")) for these — keep them in a raw_queries.py next to the repo so they’re documented and inspectable.
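A raw query of the kind mentioned in this answer might look like the sketch below. The `transactions` table, its columns, and the `RANKED_TRANSACTIONS` constant are assumptions made for illustration; the point is that `text()` keeps values as bound parameters instead of interpolating them into the SQL string.

```python
from sqlalchemy import text

# Hypothetical query for a raw_queries.py module: rank each account's
# transactions by amount using a window function.
RANKED_TRANSACTIONS = text(
    """
    SELECT id,
           account_id,
           amount,
           ROW_NUMBER() OVER (
               PARTITION BY account_id ORDER BY amount DESC
           ) AS rank_in_account
    FROM transactions
    WHERE account_id = :account_id
    """
)

# Bind the parameter up front and inspect what would be sent to the driver.
stmt = RANKED_TRANSACTIONS.bindparams(account_id=42)
compiled = stmt.compile()
print(compiled.params)

# With an AsyncSession the parameters can also be passed at execution time:
#     result = await session.execute(RANKED_TRANSACTIONS, {"account_id": 42})
#     rows = result.mappings().all()
```

Keeping the statement as a module-level constant makes it easy to review and to reuse from a repository method without rebuilding the string on every call.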