Async Repository Pattern with SQLAlchemy 2.0 (BaseRepository[T])

What? (Concept Overview)

The async repository pattern isolates database-level CRUD in a generic class parameterised on the model type, so every domain model inherits the same create / get_by_id / get_by_filters / update / delete surface without copy-pasting SQLAlchemy boilerplate. Each repository instance holds a single AsyncSession that is injected into it. Because that session's I/O methods (execute, get, flush, refresh) are coroutines, every BaseRepository method that talks to the database is async.

Project Context

The FCA Support Agent has 7+ per-resource repositories (customer, account, transaction, conversation, message, product, faq), all subclassing BaseRepository[ResourceModel]. Session management lives in BaseService (through its proxy pattern): the service holds the AsyncSession and hands it to each repository, so code that calls the service never passes an AsyncSession explicitly. This keeps each repository testable with a single in-memory SQLite engine and predictable across environments.

How? (Quick Reference Blocks)

3.1 The Generic BaseRepository[T]

```python
# app/repositories/base.py
from typing import Generic, Type, TypeVar

from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T")


class BaseRepository(Generic[T]):
    model: Type[T]  # set by each subclass, e.g. model = Customer

    def __init__(self, session: AsyncSession) -> None:
        # Injected by the service, which owns the transaction.
        self.session = session

    async def create(self, data: dict) -> T:
        instance = self.model(**data)
        self.session.add(instance)
        await self.session.flush()  # send the INSERT; do not commit
        await self.session.refresh(instance)  # load server-generated columns
        return instance

    async def get_by_id(self, id_: int | str) -> T | None:
        return await self.session.get(self.model, id_)

    async def get_all(self, limit: int = 100, offset: int = 0) -> list[T]:
        stmt = select(self.model).limit(limit).offset(offset)
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def get_by_filters(self, filters: dict, limit: int = 100) -> list[T]:
        stmt = select(self.model)
        for field, value in filters.items():
            # AttributeError if `field` is not an attribute of the model
            stmt = stmt.where(getattr(self.model, field) == value)
        stmt = stmt.limit(limit)
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def update(self, id_: int | str, data: dict) -> T | None:
        # Assumes every model has an `id` primary-key column.
        stmt = (
            update(self.model)
            .where(self.model.id == id_)
            .values(**data)
            .returning(self.model)
        )
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()

    async def delete(self, id_: int | str) -> bool:
        # Inside a method body, `delete` is the sqlalchemy import, not this method.
        stmt = delete(self.model).where(self.model.id == id_)
        result = await self.session.execute(stmt)
        return result.rowcount > 0
```

3.2 Subclassing for a Domain Model

```python
# app/repositories/customer.py (illustrative)
from app.models.customer import Customer
from app.repositories.base import BaseRepository


class CustomerRepository(BaseRepository[Customer]):
    model = Customer

    # Domain-specific addition: does NOT belong in the base class.
    async def find_by_email(self, email: str) -> Customer | None:
        results = await self.get_by_filters({"email": email}, limit=1)
        return results[0] if results else None
```

Why? (Parameter Breakdown)

  • Generic[T] with model: Type[T]: single source of truth. The subclass declares BaseRepository[Customer] and sets model = Customer, so at runtime every method operates on Customer objects, and mypy or an IDE infers, for example, that get_by_id returns Customer | None and create returns Customer.
  • session.flush() + refresh() after add(): flush() sends the INSERT to the database (so the autoincrement PK is assigned) without committing. refresh() re-loads the row so server-generated columns (timestamps, generated UUIDs) are populated on the Python object before create returns. With AsyncSession this matters, because reading an unloaded attribute later would require implicit I/O, which the async session does not allow.
  • update(...).returning(self.model): UPDATE ... RETURNING applies the change and returns the updated row in one round trip, so no follow-up get_by_id is needed. It requires a backend that supports UPDATE ... RETURNING, such as PostgreSQL or SQLite 3.35+ (MySQL does not).
  • delete(...).rowcount > 0: reports whether the DELETE actually matched a row. Without this check the method has nothing meaningful to return, and the caller cannot distinguish “row was missing” from “row was deleted”.
  • get_by_filters with limit: the default limit=100 caps how many rows are loaded when a caller passes an empty or overly broad filter dict. It limits rows returned, not rows scanned (the database may still scan the table to find them), so treat it as a guardrail, not a feature.
  • Domain methods like find_by_email live on the subclass, not on BaseRepository: this keeps the base generic and reusable across model types. Reserve BaseRepository for CRUD that genuinely applies to every model.

Common Pitfalls

  1. Forgetting await self.session.flush() in create. Without it, the autoincrement PK isn’t assigned until the next flush (at commit time, or via autoflush before a later query), so any downstream caller that needs the new ID sees None. The add → flush → refresh → return sequence is the usual SQLAlchemy 2.0 idiom for returning a fully populated row without committing.
  2. Using commit() inside the repository. Repositories should NEVER commit; the owning service (BaseService) owns the transaction boundary. Once a repository method commits, a later failure in the same service call can no longer roll back the earlier writes, so multi-repository operations stop being atomic, and tests that roll back a per-test transaction become brittle.

Real-World Interview Prep

Q1: Why is the async repository pattern strictly preferable to a sync one when paired with FastAPI?

A: FastAPI runs async def handlers on a single event loop. A synchronous DB call inside an async def handler blocks that loop for its whole duration, stalling every other request. Plain def handlers are pushed to a threadpool instead (AnyIO’s default limit is 40 threads), so with, say, 100 concurrent slow queries most requests queue for a free thread. Async repositories let you await all the way down the handler chain, and streaming responses such as SSE run on the same loop. SQLAlchemy 2.0’s async API runs on asyncio drivers such as asyncpg or psycopg (3) in async mode, so waiting on the network does not occupy a Python thread.
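A toy model of the blocking difference, using only the standard library: time.sleep() stands in for a synchronous driver call made inside an async def handler, and asyncio.sleep() stands in for an awaited async-driver call. The function names are hypothetical; this illustrates event-loop blocking and is not a benchmark of real drivers.

```python
import asyncio
import time


async def sync_driver_query(delay: float) -> None:
    time.sleep(delay)  # blocking call: the event loop can run nothing else meanwhile


async def async_driver_query(delay: float) -> None:
    await asyncio.sleep(delay)  # awaited "I/O": the loop serves other tasks meanwhile


async def serve(query, requests: int, delay: float) -> float:
    """Run `requests` concurrent handlers and return the wall-clock time taken."""
    start = time.perf_counter()
    await asyncio.gather(*(query(delay) for _ in range(requests)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(serve(sync_driver_query, requests=5, delay=0.05))
non_blocking_elapsed = asyncio.run(serve(async_driver_query, requests=5, delay=0.05))
print(f"blocking: {blocking_elapsed:.2f}s, non-blocking: {non_blocking_elapsed:.2f}s")
```

With five concurrent 50 ms "queries", the blocking version takes roughly the sum of the delays, while the awaiting version takes roughly one delay.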

Q2: How would you add soft-delete semantics (e.g., is_deleted=True) without bloating BaseRepository?

A: If only one resource needs it, override delete (and the read methods, so they hide flagged rows) in that one subclass; the base stays generic. For cross-cutting soft-delete, introduce a SoftDeleteMixin that overrides delete, get_all, and get_by_filters: delete sets is_deleted=True instead of issuing a DELETE, and the reads filter on is_deleted.is_(False). Avoid embedding the flag in BaseRepository, because most resources don’t need it and doing so forces every model to carry the column.

Q3: What’s the safe way to update many rows atomically (e.g., bulk-deactivate 10k users) using this pattern?

A: Don’t loop await self.update(id, {...}): that’s 10k individual UPDATE statements and 10k round trips. Issue a single UPDATE ... WHERE id IN (...) with sqlalchemy.update(self.model).where(self.model.id.in_(ids)).values(**data), then await session.execute(stmt). Wrap it in a service-layer method (bulk_deactivate) that owns the transaction, so the whole batch commits or rolls back together. The repository’s update(id, data) is per-row by design; bulk operations belong in the service layer or in a dedicated bulk_* repository method that explicitly opts into batched semantics.

Top-to-Bottom Code Walkthrough (app/repositories/base.py)

The async repository pattern keeps the SQL where it belongs (one file per table) while letting services own the transaction lifecycle.

BaseRepository(Generic[T])

  • def __init__(self, session: AsyncSession): the session is injected so the unit-of-work boundary is owned by the service, not the repository. The model is not a constructor argument; each subclass declares it as a class attribute (model = Customer).

  • async def create(data: dict) -> T:

    ```python
    obj = self.model(**data)
    self.session.add(obj)
    await self.session.flush()
    return obj
    ```

    flush (vs commit) sends the INSERT to the DB but doesn’t end the transaction. The service decides when to commit.

  • async def get_by_id(id): await self.session.get(self.model, id). get() checks the session’s identity map first: if an object with that primary key is already loaded in this session, it is returned without issuing a query.

  • async def get_all(skip: int = 0, limit: int = 100):

    ```python
    result = await self.session.execute(
        select(self.model).offset(skip).limit(limit)
    )
    return result.scalars().unique().all()
    ```

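    .scalars() yields the first column of each row, which for select(self.model) is the mapped object itself rather than a one-element Row. .unique() de-duplicates those objects; it is required when the statement joined-eager-loads a collection (joinedload), because the JOIN repeats each parent row once per child.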

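  • async def update(id, data): loads the object with get(), sets each field with setattr, and returns it. No UPDATE is sent immediately: the session marks the object dirty and emits the UPDATE at the next flush (at commit, or earlier via autoflush before another query). This differs from the single-statement UPDATE ... RETURNING version in section 3.1.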

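  • async def delete(id): loads the object, then calls await self.session.delete(obj) (on AsyncSession, delete() is a coroutine and must be awaited). The DELETE statement itself is emitted at the next flush.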

  • async def count(): executes select(func.count()).select_from(self.model), a single SELECT count(*) against the model’s table, e.g. via await self.session.scalar(stmt), which returns the integer directly.

Subclass example (app/repositories/customer.py)

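```python
from sqlalchemy import select

from app.models.customer import Customer
from app.repositories.base import BaseRepository


class CustomerRepository(BaseRepository[Customer]):
    model = Customer  # declared once

    async def get_by_email(self, email):
        # scalar_one_or_none() raises MultipleResultsFound if emails are not unique
        result = await self.session.execute(
            select(Customer).where(Customer.email == email)
        )
        return result.scalar_one_or_none()

    async def get_by_customer_id(self, customer_id):  # external ID, not the PK
        result = await self.session.execute(
            select(Customer).where(Customer.customer_id == customer_id)
        )
        return result.scalar_one_or_none()
```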

Domain queries live in the subclass; CRUD lives in the base. New repositories never need to re-implement CRUD.

How services use repositories

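```python
class AccountService(BaseService):
    async def create_account(self, data):
        # The repository borrows the service's session; it is not a context manager.
        repo = AccountRepository(self.session)
        account = await repo.create(data)  # flush only
        await self.commit()  # the service ends the unit of work
        return account
```

The repository needs no lifecycle of its own: it borrows the service’s session, and only the service decides when to commit or roll back (explicitly, as above, or by wrapping the work in async with self.session.begin():).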

Common Pitfalls

Calling await self.session.commit() inside a repo method breaks the unit-of-work. Services need the option to roll back. Never commit from a repo.

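When a SELECT joined-eager-loads a collection (e.g. .options(joinedload(Customer.accounts))), returning result.scalars().all() without .unique() raises InvalidRequestError in SQLAlchemy 2.0, because the JOIN repeats the parent row once per child. Call .unique() before .all().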

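Don’t return the Result or ScalarResult object itself: a result can be consumed only once, callers should not depend on SQLAlchemy result types, and an AsyncResult from session.stream() keeps its connection busy until fully consumed. Materialise with .all() (or list(...)) before returning.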

Real-World Interview Prep

Q1: Why Generic[T] instead of inheritance with abstract model?

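A: The type parameter tells mypy and the IDE that CustomerRepository.get_by_id(...) returns Customer | None (T | None in the base). Without generics, every inherited helper is typed against the base model class (or Any), and callers must cast to Customer themselves. Runtime behaviour and speed are identical: the type parameter is only metadata for the checker.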
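A standard-library-only sketch of what the type parameter buys. The in-memory BaseRepository and the Customer dataclass are hypothetical stand-ins with no SQLAlchemy involved; the point is that BaseRepository[Customer] exists for the type checker, while model = Customer is what the code uses at runtime.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Generic, TypeVar, get_args

T = TypeVar("T")


class BaseRepository(Generic[T]):
    """Hypothetical in-memory stand-in for the real base class."""

    model: type  # runtime value, set by each subclass

    def __init__(self) -> None:
        self._rows: dict[int, T] = {}

    def add(self, id_: int, obj: T) -> None:
        self._rows[id_] = obj

    def get_by_id(self, id_: int) -> T | None:
        return self._rows.get(id_)


@dataclass
class Customer:
    email: str


class CustomerRepository(BaseRepository[Customer]):
    model = Customer


repo = CustomerRepository()
repo.add(1, Customer("a@example.com"))
found = repo.get_by_id(1)  # a type checker infers Customer | None here

# At runtime the type argument is only recorded on the class definition.
type_args = get_args(CustomerRepository.__orig_bases__[0])
```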

Q2: How do you mock a repo in a service test?

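A: Replace self.repo with MagicMock(spec=CustomerRepository). Since Python 3.8, a mock spec’d from a class turns its async def methods into AsyncMock attributes automatically, so set the awaited value directly, e.g. repo.create.return_value = customer. Wrapping it in another AsyncMock would make await repo.create(...) return the inner mock instead of the customer. The service then runs with no database at all.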
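A sketch of such a service test using only unittest.mock. CustomerService, its register method, and the stub CustomerRepository are hypothetical; the stub only needs async def methods so that MagicMock(spec=...) turns them into AsyncMock attributes.

```python
import asyncio
from unittest.mock import MagicMock


class CustomerRepository:
    """Hypothetical stub: only the async method signatures matter for spec=."""

    async def find_by_email(self, email: str):
        raise NotImplementedError

    async def create(self, data: dict):
        raise NotImplementedError


class CustomerService:
    """Hypothetical service under test."""

    def __init__(self, repo: CustomerRepository) -> None:
        self.repo = repo

    async def register(self, email: str):
        if await self.repo.find_by_email(email) is not None:
            raise ValueError("email already registered")
        return await self.repo.create({"email": email})


repo = MagicMock(spec=CustomerRepository)
# Both attributes are AsyncMock because the spec'd methods are async def.
repo.find_by_email.return_value = None  # awaited value: no existing customer
repo.create.return_value = {"id": 1, "email": "a@example.com"}

service = CustomerService(repo)
created = asyncio.run(service.register("a@example.com"))
repo.create.assert_awaited_once_with({"email": "a@example.com"})
```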

Q3: When would you bypass the repo and write raw SQL?

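A: Window functions, CTEs, lateral joins, and bulk INSERT ... SELECT can all be expressed in SQLAlchemy Core (over(), cte(), lateral(), insert().from_select()), but when the ORM form reads worse than the SQL itself, use session.execute(text("...")) with bound parameters. Keep these queries in a raw_queries.py next to the repository so they’re documented and inspectable.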
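A sketch of the raw-SQL escape hatch: a window-function query (a running balance per account) as a text() construct with a bound :account_id parameter. The transactions table, its columns, and the RUNNING_BALANCE name are hypothetical. A synchronous Session on in-memory SQLite (3.25+ for window functions) keeps it runnable without an async driver; with AsyncSession the call is the same execute() behind await.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# Hypothetical entry for raw_queries.py
RUNNING_BALANCE = text(
    """
    SELECT id, amount,
           SUM(amount) OVER (ORDER BY id) AS running_balance
    FROM transactions
    WHERE account_id = :account_id
    ORDER BY id
    """
)

engine = create_engine("sqlite://")
with engine.begin() as conn:  # throwaway schema and data for the sketch
    conn.execute(
        text("CREATE TABLE transactions (id INTEGER PRIMARY KEY, account_id INTEGER, amount INTEGER)")
    )
    conn.execute(
        text("INSERT INTO transactions (id, account_id, amount) VALUES (:id, :account_id, :amount)"),
        [
            {"id": 1, "account_id": 1, "amount": 100},
            {"id": 2, "account_id": 1, "amount": -30},
            {"id": 3, "account_id": 2, "amount": 50},
            {"id": 4, "account_id": 1, "amount": 20},
        ],
    )

with Session(engine) as session:
    # :account_id is sent as a bound parameter, never formatted into the SQL string
    rows = [tuple(row) for row in session.execute(RUNNING_BALANCE, {"account_id": 1})]
```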
