
Per-Resource Repository Specialization (CustomerRepository, FAQRepository, etc.)

What? (Concept Overview)

Each per-domain repository subclasses BaseRepository[T] and adds its own domain-specific finder methods (find_by_email, find_by_status, search_by_keyword, find_recent_transactions). The specialization has two roles: it moves domain-specific query logic out of route handlers and services, AND it keeps BaseRepository clean and reusable across all 7 domain models.

Project Context

The FCA app has 7 specialised repositories under app/repositories/: customer.py, account.py, transaction.py, conversation.py, message.py, product.py, faq.py. Each adds 2-6 domain finders on top of the cross-model CRUD from BaseRepository. Below: the most representative specialisations.

How? (Quick Reference Blocks)

3.1 Customer Repository — Email and Business-ID Lookups

```python
# app/repositories/customer.py
from typing import List, Optional

from sqlalchemy import select

from app.models.customer import Customer
from app.repositories.base import BaseRepository


class CustomerRepository(BaseRepository[Customer]):
    model = Customer

    async def find_by_email(self, email: str) -> Optional[Customer]:
        return await self.session.scalar(
            select(Customer).where(Customer.email == email).limit(1)
        )

    async def find_by_business_id(self, business_id: str) -> Optional[Customer]:
        return await self.session.scalar(
            select(Customer).where(Customer.customer_id == business_id).limit(1)
        )

    async def find_vip_customers(self, limit: int = 50) -> List[Customer]:
        return list((await self.session.scalars(
            select(Customer).where(Customer.is_vip == True).limit(limit)
        )).all())
```

3.2 FAQ Repository: Keyword Search

```python
# app/repositories/faq.py
from typing import List

from sqlalchemy import select, or_

from app.models.faq import FAQ
from app.repositories.base import BaseRepository


class FAQRepository(BaseRepository[FAQ]):
    model = FAQ

    async def search_by_keyword(self, keyword: str, limit: int = 10) -> List[FAQ]:
        like = f"%{keyword}%"
        return list((await self.session.scalars(
            select(FAQ).where(
                or_(FAQ.question.ilike(like), FAQ.keywords.ilike(like))
            ).limit(limit)
        )).all())
```

3.3 Conversation Repository — Index Lookups

```python
# app/repositories/conversation.py
from typing import List, Optional

from sqlalchemy import select

from app.models.conversation import Conversation, ConversationStatus
from app.repositories.base import BaseRepository


class ConversationRepository(BaseRepository[Conversation]):
    model = Conversation

    async def find_by_customer(self, customer_id: int, limit: int = 50) -> List[Conversation]:
        return list((await self.session.scalars(
            select(Conversation)
            .where(Conversation.customer_id == customer_id)
            .order_by(Conversation.updated_at.desc())
            .limit(limit)
        )).all())

    async def find_escalated(self, limit: int = 100) -> List[Conversation]:
        return list((await self.session.scalars(
            select(Conversation)
            .where(Conversation.status == ConversationStatus.ESCALATED)
            .order_by(Conversation.updated_at.asc())
            .limit(limit)
        )).all())

    async def find_by_ticket_id(self, ticket_id: str) -> Optional[Conversation]:
        return await self.session.scalar(
            select(Conversation).where(Conversation.escalation_id == ticket_id).limit(1)
        )
```

3.4 Account Repository — Customer-Scoped Lookups

```python
# app/repositories/account.py
from typing import List, Optional

from sqlalchemy import select

from app.models.account import Account
from app.repositories.base import BaseRepository


class AccountRepository(BaseRepository[Account]):
    model = Account

    async def find_by_account_number(self, account_number: str) -> Optional[Account]:
        return await self.session.scalar(
            select(Account).where(Account.account_number == account_number).limit(1)
        )

    async def list_by_customer(self, customer_id: int) -> List[Account]:
        return list((await self.session.scalars(
            select(Account)
            .where(Account.customer_id == customer_id)
            .order_by(Account.created_at.desc())
        )).all())
```

Why? (Parameter Breakdown)

  • .limit(1) on single-row queries: The lookup columns here (email, customer_id, account_number) are expected to be unique, but a missing constraint or malformed data could still leave duplicates. session.scalar() returns the first column of the first row, or None when nothing matches, so adding .limit(1) guarantees the database sends back at most one row. The cost is negligible (Postgres stops at the first match).
  • order_by(... desc()).limit(limit) for “latest N” queries: Without a supporting index, Postgres has to sort every matching row before it can apply the limit. If this query is hot, add a composite (customer_id, updated_at DESC) index; the planner can then read the first N entries in index order and stop.
  • ilike for case-insensitive FAQ search: LIKE is case-sensitive in Postgres; ILIKE (a Postgres extension) is case-insensitive. Use LIKE only for column-stored categorical tokens (category='account'), ILIKE for freeform text (question.ilike('%loan%')).
  • or_(FAQ.question.ilike(...), FAQ.keywords.ilike(...)): Search both the natural-language question AND the comma-separated keywords column. Better recall than single-field search for sparse corpora.
  • select(...).where(...).limit(N) followed by .scalars().all(): .scalars() unwraps each single-column row into its model instance; .all() materialises the whole result as a list in memory. For very large result sets, session.stream() (chunked) is the right call, but it is rarely needed for bounded queries like these.
  • is_vip == True literal: Slightly verbose, and linters flag it (flake8 E712). The cleaner pattern is Customer.is_vip.is_(True). The two do not compile to the same SQL: == True renders as is_vip = true, while is_(True) renders as is_vip IS true, with SQLAlchemy rendering the boolean constant to suit the backend. Inside a WHERE clause both select the same rows; is_(True) is the safer default.
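The "latest N" bullet can be tried out without a Postgres instance. The sketch below is a stand-in that uses the standard-library sqlite3 module rather than the project's SQLAlchemy and Postgres stack; the simplified table layout, the index name ix_conv_customer_updated, and the helper latest_conversations are assumptions made for illustration only.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Build an in-memory table shaped like a simplified conversations table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE conversations ("
        " id INTEGER PRIMARY KEY,"
        " customer_id INTEGER NOT NULL,"
        " updated_at TEXT NOT NULL)"
    )
    # Composite index: the WHERE column first, then the ORDER BY column.
    conn.execute(
        "CREATE INDEX ix_conv_customer_updated"
        " ON conversations (customer_id, updated_at DESC)"
    )
    # Odd ids belong to customer 1, even ids to customer 2.
    rows = [
        (i, 1 if i % 2 else 2, f"2024-01-{i:02d}T00:00:00")
        for i in range(1, 21)
    ]
    conn.executemany("INSERT INTO conversations VALUES (?, ?, ?)", rows)
    return conn


def latest_conversations(conn, customer_id, limit=5):
    """Return the newest `limit` (id, updated_at) pairs for one customer."""
    return conn.execute(
        "SELECT id, updated_at FROM conversations"
        " WHERE customer_id = ?"
        " ORDER BY updated_at DESC"
        " LIMIT ?",
        (customer_id, limit),
    ).fetchall()


conn = make_db()
latest = latest_conversations(conn, customer_id=1, limit=3)
print(latest)
```

The same query shape is what find_by_customer emits through SQLAlchemy; only the driver differs in this sketch.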

Common Pitfalls

  1. Calling .all() on session.execute(select(...)) instead of going through .scalars().all(): This returns Row tuples such as (Customer(...),) instead of model instances; downstream code breaks when it accesses row.name. Always use .scalars() for one-model queries.
  2. Forgetting .order_by() on “latest N” queries: Without an ordering, Postgres returns rows in whatever order the plan produces. That often looks like insertion order, but nothing guarantees it, especially after updates or VACUUM, so users see inconsistent pagination. Always order.

Real-World Interview Prep

Q1: How would you paginate the find_by_customer query with a cursor instead of LIMIT/OFFSET?

A: Cursor (keyset) pagination uses WHERE (updated_at, id) < (last_updated, last_id) ORDER BY updated_at DESC, id DESC LIMIT N. The composite (updated_at, id) cursor avoids OFFSET’s cost, which grows linearly with the offset because the database must read and discard every skipped row; deep pages on large tables can slow to seconds per query. For the FCA scale (≤ 1000 conversations per customer) LIMIT/OFFSET is fine. Switch to a cursor at > 100k rows per customer. Asymmetry: cursors are stable under writes (no rows “skipped”); OFFSET can skip or duplicate rows if data shifts between requests.
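The cursor query from Q1 can be sketched end to end. This is a minimal illustration using the standard-library sqlite3 module (an assumption; the project itself runs SQLAlchemy against Postgres), and the fetch_page helper and the seeded rows are hypothetical. Pairs of rows deliberately share a timestamp so that the id tie-breaker in the cursor matters.

```python
import sqlite3
from typing import List, Optional, Tuple

Row = Tuple[int, str]  # (id, updated_at)


def fetch_page(
    conn: sqlite3.Connection,
    customer_id: int,
    page_size: int,
    cursor: Optional[Tuple[str, int]] = None,
) -> List[Row]:
    """Return one page, newest first; `cursor` is (updated_at, id) of the last row seen."""
    sql = "SELECT id, updated_at FROM conversations WHERE customer_id = ?"
    params: list = [customer_id]
    if cursor is not None:
        # Row-value comparison: strictly older than the last row of the previous page.
        sql += " AND (updated_at, id) < (?, ?)"
        params.extend(cursor)
    sql += " ORDER BY updated_at DESC, id DESC LIMIT ?"
    params.append(page_size)
    return conn.execute(sql, params).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE conversations ("
    " id INTEGER PRIMARY KEY, customer_id INTEGER, updated_at TEXT)"
)
# 25 rows for customer 1; ids 1 and 2 share a day, ids 3 and 4 share the next, and so on.
conn.executemany(
    "INSERT INTO conversations VALUES (?, 1, ?)",
    [(i, f"2024-01-{(i + 1) // 2:02d}") for i in range(1, 26)],
)

pages = []
cursor = None
while True:
    page = fetch_page(conn, customer_id=1, page_size=10, cursor=cursor)
    if not page:
        break
    pages.append(page)
    last_id, last_updated = page[-1]
    cursor = (last_updated, last_id)
```

Each request costs the same regardless of how deep the client has paged, because the WHERE clause jumps straight to the cursor position instead of counting past skipped rows.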

Q2: How do you test these repository methods?

A: Three layers. (1) Unit: seed 10 customers via the seed fixture, call find_by_email("known@example.com"), assert that a single Customer with that email comes back. (2) Edge: call find_by_email("nonexistent@example.com"), assert None (not an exception). (3) Limit: seed more than 50 conversations for one customer, call find_by_customer(1) with the default limit, assert that at most 50 rows come back, newest first. With pytest-asyncio, write async def test_...(): against the test engine with NullPool to avoid connection-pool bleed.

Q3: Why does find_by_email use select(...).where(...).limit(1) instead of session.get(Customer, email)?

A: session.get(Model, primary_key) works ONLY for primary-key lookups. email is a unique-indexed column but NOT the primary key (id is). The pattern select(...).where(...) is the universal way to look up by non-PK columns. Bonus: session.get is cache-aware: it checks the identity map first and can skip the query entirely. The select().where() path always emits SQL, although the returned row is still resolved to the instance already held in the identity map. Both return None rather than raising when nothing matches. For single-row indexed lookups the two are roughly equivalent; .where() is more flexible because you can stack conditions or chain .order_by().

Top-to-Bottom Code Walkthrough (app/repositories/customer.py + app/repositories/base.py)

The “per-resource” pattern gives every domain table its own thin subclass with only the queries it needs. The base never knows business jargon; the subclass never knows about reflection or commit logic.

BaseRepository(Generic[T]) (app/repositories/base.py)

  • def __init__(self, session: AsyncSession, model: Type[T]): the constructor takes the session (lifecycle owned by the service) and the model class. In the listings above, each subclass pins the model with a class attribute (model = Customer).
  • create(data: dict) -> T: builds the row with self.model(**data); self.session.add(obj); await self.session.flush(); return obj. add() is synchronous on AsyncSession, so only flush() is awaited. Why flush not commit: callers (services) own the transaction. Flush sends the INSERT to the DB but doesn’t end the transaction.
  • get_by_id(id): uses await self.session.get(self.model, id); the identity-map fast path doesn’t issue a query if the object is already in the session. Returns None when no row matches.
  • get_all(skip, limit): does select(self.model).offset(skip).limit(limit).
  • update(id, data): does obj = await self.get_by_id(id); for k, v in data.items(): setattr(obj, k, v); return obj.
  • delete(id): does obj = await self.get_by_id(id); await self.session.delete(obj). Like update, it assumes get_by_id found a row, so callers should handle the None case for a missing id.
  • count(): executes select(func.count()).select_from(self.model) and returns the scalar result.
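The create, get_by_id, update, and delete bullets above can be sketched as follows. This is not the project's base.py: to run without a database it swaps AsyncSession for a hypothetical in-memory FakeSession that mimics only the four session methods used, it sets the model as a class attribute, and it leaves out get_all and count because they need real SQL. The None checks in update and delete are an addition for safety, not something the source describes.

```python
import asyncio
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar

T = TypeVar("T")


class FakeSession:
    """Hypothetical in-memory stand-in exposing the AsyncSession methods used below."""

    def __init__(self) -> None:
        self.pending: List[Any] = []
        self.calls: List[str] = []
        self._next_id = 1

    def add(self, obj: Any) -> None:  # synchronous, like AsyncSession.add
        self.calls.append("add")
        self.pending.append(obj)

    async def flush(self) -> None:  # awaited, like AsyncSession.flush
        self.calls.append("flush")
        for obj in self.pending:
            if getattr(obj, "id", None) is None:
                obj.id = self._next_id
                self._next_id += 1

    async def get(self, model: Type[Any], pk: int) -> Optional[Any]:
        self.calls.append("get")
        return next(
            (o for o in self.pending if isinstance(o, model) and o.id == pk), None
        )

    async def delete(self, obj: Any) -> None:
        self.calls.append("delete")
        self.pending.remove(obj)


class BaseRepository(Generic[T]):
    model: Type[T]  # set by each subclass

    def __init__(self, session: FakeSession) -> None:
        self.session = session

    async def create(self, data: Dict[str, Any]) -> T:
        obj = self.model(**data)
        self.session.add(obj)       # no await: add() only registers the object
        await self.session.flush()  # pushes pending changes, transaction stays open
        return obj

    async def get_by_id(self, id: int) -> Optional[T]:
        return await self.session.get(self.model, id)

    async def update(self, id: int, data: Dict[str, Any]) -> Optional[T]:
        obj = await self.get_by_id(id)
        if obj is None:
            return None
        for key, value in data.items():
            setattr(obj, key, value)
        return obj

    async def delete(self, id: int) -> bool:
        obj = await self.get_by_id(id)
        if obj is None:
            return False
        await self.session.delete(obj)
        return True


class Customer:
    def __init__(self, email: str, is_vip: bool = False) -> None:
        self.id: Optional[int] = None
        self.email = email
        self.is_vip = is_vip


class CustomerRepository(BaseRepository[Customer]):
    model = Customer


async def demo() -> FakeSession:
    session = FakeSession()
    repo = CustomerRepository(session)
    created = await repo.create({"email": "a@example.com"})
    await repo.update(created.id, {"is_vip": True})
    return session


session = asyncio.run(demo())
```

Note that the recorded calls never include a commit: the repository only adds, flushes, and reads, which is what leaves the transaction boundary to the service.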

CustomerRepository(BaseRepository[Customer]) (app/repositories/customer.py)

  • model = Customer: this is all you need to inherit CRUD.
  • async def find_by_email(self, email): select(Customer).where(Customer.email == email).limit(1). Used on login.
  • async def find_by_business_id(self, business_id): filters on Customer.customer_id (the external “CUST-000001” string), which is distinct from the internal id PK.
  • async def find_vip_customers(self, limit=50): select(Customer).where(Customer.is_vip == True).limit(limit); used by the priority-classifier to skip the queue for VIPs.

How a service uses a repo (e.g. app/services/customer.py)

```python
async with CustomerService() as svc:
    customer = await svc.repo.create(data)
    await svc.commit()
```

The service owns the commit boundary — the repo never commits. If the service decides halfway through that it wants to roll back, the repo’s pending changes go with it.
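How the service enforces that boundary is not shown in the source, so the following is only one plausible shape. It assumes CustomerService is an async context manager that rolls back on any exception, injects the session through the constructor (the original calls CustomerService() with no arguments), and uses a hypothetical RecordingSession in place of AsyncSession so the call order can be inspected.

```python
import asyncio
from typing import List


class RecordingSession:
    """Hypothetical stand-in for AsyncSession that only records transaction calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def flush(self) -> None:
        self.calls.append("flush")

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


class CustomerRepo:
    """Repository side: flushes, never commits."""

    def __init__(self, session: RecordingSession) -> None:
        self.session = session

    async def create(self, data: dict) -> dict:
        await self.session.flush()
        return dict(data)


class CustomerService:
    """Service side: owns the session and therefore the commit boundary."""

    def __init__(self, session: RecordingSession) -> None:
        self.session = session
        self.repo = CustomerRepo(session)

    async def __aenter__(self) -> "CustomerService":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        if exc_type is not None:
            # Anything the repo flushed but the service did not commit is undone.
            await self.session.rollback()
        await self.session.close()
        return False  # do not swallow the exception

    async def commit(self) -> None:
        await self.session.commit()


async def happy_path() -> List[str]:
    session = RecordingSession()
    async with CustomerService(session) as svc:
        await svc.repo.create({"email": "a@example.com"})
        await svc.commit()
    return session.calls


async def failing_path() -> List[str]:
    session = RecordingSession()
    try:
        async with CustomerService(session) as svc:
            await svc.repo.create({"email": "b@example.com"})
            raise ValueError("business rule failed before commit")
    except ValueError:
        pass
    return session.calls


ok_calls = asyncio.run(happy_path())
failed_calls = asyncio.run(failing_path())
```

In the failing path the flush still happened, but because no commit followed it, the rollback discards the pending insert.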

Common Pitfalls

Calling commit() inside a repo method breaks the unit-of-work. The service can’t roll back what the repo already committed.

Calling .all() directly on session.execute(select(Customer)) returns a list of Row objects, not Customer instances. Use .scalars().all() to unwrap the rows; add .unique() before .scalars() only when the query joined-eager-loads a collection.

Putting business logic in a repo (if customer.balance < 0: raise ...) means the rule cannot be unit-tested without a DB. Repos hold queries and CRUD only; business rules go in services.

Real-World Interview Prep

Q1: Why a separate repositories/ layer if services already have DB access?

A: Services own transactions; repos own queries. With the split you can mock a repo in a service test (no DB needed) but still test the transaction logic. Without the split every test needs a real DB.
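A service test with a mocked repository might look like the sketch below. The register method, the DuplicateEmailError exception, and the constructor that takes the repo and session as arguments are all hypothetical; only find_by_email and create come from the repositories described above. unittest.mock.AsyncMock stands in for both the repo and the session, so no database is involved.

```python
import asyncio
from unittest.mock import AsyncMock


class DuplicateEmailError(Exception):
    """Hypothetical business-rule error raised by the service."""


class CustomerService:
    """Hypothetical service: the business rule lives here, queries go to the repo."""

    def __init__(self, repo, session) -> None:
        self.repo = repo
        self.session = session

    async def register(self, email: str):
        if await self.repo.find_by_email(email) is not None:
            raise DuplicateEmailError(email)
        customer = await self.repo.create({"email": email})
        await self.session.commit()  # the service, not the repo, commits
        return customer


async def register_new_customer():
    repo = AsyncMock()
    repo.find_by_email.return_value = None
    repo.create.return_value = {"email": "new@example.com"}
    session = AsyncMock()
    result = await CustomerService(repo, session).register("new@example.com")
    return result, repo, session


async def register_duplicate():
    repo = AsyncMock()
    repo.find_by_email.return_value = {"email": "taken@example.com"}
    session = AsyncMock()
    try:
        await CustomerService(repo, session).register("taken@example.com")
    except DuplicateEmailError:
        return "rejected", repo, session
    return "accepted", repo, session


result, repo, session = asyncio.run(register_new_customer())
outcome, dup_repo, dup_session = asyncio.run(register_duplicate())
```

Both the happy path and the rejection path exercise the transaction logic (commit or no commit) while the query layer stays mocked.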

Q2: What is the Generic[T] (PEP 484) syntax doing in BaseRepository[Customer]?

A: Telling mypy/IDE that this repo returns Customer instances. await self.get_by_id(5) on AccountRepository returns Account, not Customer. Catches type errors at edit time.
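The effect of the type parameter can be seen in a small standalone sketch. InMemoryRepository is a hypothetical dict-backed stand-in for BaseRepository, and the plain Customer and Account classes replace the ORM models; the account number is made up for the example.

```python
from typing import Dict, Generic, Optional, Type, TypeVar, get_args

T = TypeVar("T")


class Customer:
    def __init__(self, id: int, email: str) -> None:
        self.id = id
        self.email = email


class Account:
    def __init__(self, id: int, account_number: str) -> None:
        self.id = id
        self.account_number = account_number


class InMemoryRepository(Generic[T]):
    """Hypothetical stand-in for BaseRepository, backed by a dict instead of a session."""

    model: Type[T]

    def __init__(self) -> None:
        self._rows: Dict[int, T] = {}

    def add(self, obj: T) -> None:
        self._rows[obj.id] = obj  # type: ignore[attr-defined]

    def get_by_id(self, id: int) -> Optional[T]:
        return self._rows.get(id)


class CustomerRepository(InMemoryRepository[Customer]):
    model = Customer


class AccountRepository(InMemoryRepository[Account]):
    model = Account


accounts = AccountRepository()
accounts.add(Account(5, "ACC-0005"))
account = accounts.get_by_id(5)  # a type checker infers Optional[Account]

# accounts.add(Customer(1, "a@example.com")) would be rejected by a type
# checker, because T is bound to Account for this repository.

# The type argument is also recorded on the subclass and can be read at runtime.
bound_type = get_args(AccountRepository.__orig_bases__[0])[0]
```

Nothing is enforced at runtime; the benefit is that the editor and the type checker know which model each repository hands back.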

Q3: When would you NOT use the generic repository pattern?

A: When the queries are highly specialised (window functions, lateral joins, custom CTEs). At that point a raw text(...) query in the service is more readable than forcing the query through a generic abstraction.
