Per-Resource Repository Specialization (CustomerRepository, FAQRepository, etc.)
What? (Concept Overview)
Each per-domain repository subclasses BaseRepository[T] and adds its own domain-specific finder methods (find_by_email, find_by_status, search_by_keyword, find_recent_transactions). The specialization has two roles: it moves domain-specific query logic out of route handlers and services, AND it keeps BaseRepository clean and reusable across all 7 domain models.
Project Context
The FCA app has 7 specialised repositories under app/repositories/: customer.py, account.py, transaction.py, conversation.py, message.py, product.py, faq.py. Each adds 2-6 domain finders on top of the cross-model CRUD from BaseRepository. Below: the most representative specialisations.
How? (Quick Reference Blocks)
3.1 Customer Repository — Email and Business-ID Lookups
# app/repositories/customer.py
from typing import List, Optional

from sqlalchemy import select

from app.models.customer import Customer
from app.repositories.base import BaseRepository


class CustomerRepository(BaseRepository[Customer]):
    model = Customer

    async def find_by_email(self, email: str) -> Optional[Customer]:
        return await self.session.scalar(
            select(Customer).where(Customer.email == email).limit(1)
        )

    async def find_by_business_id(self, business_id: str) -> Optional[Customer]:
        return await self.session.scalar(
            select(Customer).where(Customer.customer_id == business_id).limit(1)
        )

    async def find_vip_customers(self, limit: int = 50) -> List[Customer]:
        return list((await self.session.scalars(
            select(Customer).where(Customer.is_vip == True).limit(limit)
        )).all())

3.2 FAQ Repository — Keyword Search
# app/repositories/faq.py
from typing import List

from sqlalchemy import select, or_

from app.models.faq import FAQ
from app.repositories.base import BaseRepository


class FAQRepository(BaseRepository[FAQ]):
    model = FAQ

    async def search_by_keyword(self, keyword: str, limit: int = 10) -> List[FAQ]:
        # Wrap the keyword in wildcards for a substring match.
        like = f"%{keyword}%"
        return list((await self.session.scalars(
            select(FAQ).where(
                or_(FAQ.question.ilike(like), FAQ.keywords.ilike(like))
            ).limit(limit)
        )).all())

3.3 Conversation Repository — Index Lookups
# app/repositories/conversation.py
from typing import List, Optional

from sqlalchemy import select

from app.models.conversation import Conversation, ConversationStatus
from app.repositories.base import BaseRepository


class ConversationRepository(BaseRepository[Conversation]):
    model = Conversation

    async def find_by_customer(self, customer_id: int, limit: int = 50) -> List[Conversation]:
        # Most recently updated conversations first.
        return list((await self.session.scalars(
            select(Conversation)
            .where(Conversation.customer_id == customer_id)
            .order_by(Conversation.updated_at.desc())
            .limit(limit)
        )).all())

    async def find_escalated(self, limit: int = 100) -> List[Conversation]:
        # Oldest escalations first, so the longest-waiting ones surface at the top.
        return list((await self.session.scalars(
            select(Conversation)
            .where(Conversation.status == ConversationStatus.ESCALATED)
            .order_by(Conversation.updated_at.asc())
            .limit(limit)
        )).all())

    async def find_by_ticket_id(self, ticket_id: str) -> Optional[Conversation]:
        return await self.session.scalar(
            select(Conversation).where(Conversation.escalation_id == ticket_id).limit(1)
        )

3.4 Account Repository — Customer-Scoped Lookups
# app/repositories/account.py
from typing import List, Optional

from sqlalchemy import select

from app.models.account import Account
from app.repositories.base import BaseRepository


class AccountRepository(BaseRepository[Account]):
    model = Account

    async def find_by_account_number(self, account_number: str) -> Optional[Account]:
        return await self.session.scalar(
            select(Account).where(Account.account_number == account_number).limit(1)
        )

    async def list_by_customer(self, customer_id: int) -> List[Account]:
        # Newest accounts first; no limit is applied here.
        return list((await self.session.scalars(
            select(Account)
            .where(Account.customer_id == customer_id)
            .order_by(Account.created_at.desc())
        )).all())

Why? (Parameter Breakdown)
- .limit(1) on single-row queries: Even on a column that is meant to be unique, a missing constraint or malformed data could let a query match several rows. .limit(1) plus session.scalar() is robust: one row or None. The cost is negligible, since Postgres stops at the first matching row.
- order_by(... desc()).limit(limit) for “latest N” queries: Without a suitable index, Postgres has to sort every matching row before it can apply the LIMIT. If this query is hot, add a composite (customer_id, updated_at DESC) index so the planner can read rows in index order and stop after N.
- ilike for case-insensitive FAQ search: In Postgres, LIKE is case-sensitive; ILIKE (a Postgres extension) is case-insensitive. On backends without ILIKE, SQLAlchemy renders ilike() as lower(col) LIKE lower(pattern). Reserve case-sensitive matching (LIKE or plain equality, e.g. category = 'account') for stored categorical tokens, and use ILIKE for freeform text (question.ilike('%loan%')).
- or_(FAQ.question.ilike(...), FAQ.keywords.ilike(...)): Searches both the natural-language question and the comma-separated keywords column. This gives better recall than a single-field search on a sparse corpus.
- select(...).where(...).limit(N) consumed through session.scalars(...).all(): scalars() unwraps each one-column row into the model instance; .all() collects the instances into a list. For very large result sets, streaming (session.stream() or session.stream_scalars() on AsyncSession) is the right call, but it is rarely needed at this scale or in tests.
- is_vip == True literal: It works, but linters flag it (pycodestyle E712). The cleaner pattern is Customer.is_vip.is_(True). SQLAlchemy renders the boolean literal per dialect in both cases (true on Postgres, 1 on backends without a native boolean type). The two forms do not compile to the same SQL (is_vip = true versus is_vip IS true on Postgres), but they select the same rows in a WHERE clause; is_(True) is the safer default.
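To make the ilike and is_(True) points concrete, the sketch below compiles a few statements to SQL text without connecting to any database. The customers and faqs tables here are hypothetical Core stand-ins, not the project's models, and the render helper is an illustrative name.

```python
from sqlalchemy import Boolean, Column, Integer, MetaData, String, Table, select
from sqlalchemy.dialects import postgresql

# Hypothetical stand-in tables; the real models live under app/models/.
metadata = MetaData()
customers = Table(
    "customers", metadata,
    Column("id", Integer, primary_key=True),
    Column("is_vip", Boolean),
)
faqs = Table(
    "faqs", metadata,
    Column("id", Integer, primary_key=True),
    Column("question", String),
)


def render(stmt, dialect=None) -> str:
    """Compile a statement to SQL text; no database connection is needed."""
    return str(stmt.compile(dialect=dialect))


pg = postgresql.dialect()

# Same filter written two ways, compiled for Postgres.
eq_sql = render(select(customers.c.id).where(customers.c.is_vip == True), pg)  # noqa: E712
is_sql = render(select(customers.c.id).where(customers.c.is_vip.is_(True)), pg)

# ilike() compiled for Postgres and for the generic default dialect.
ilike_pg = render(select(faqs.c.id).where(faqs.c.question.ilike("%loan%")), pg)
ilike_generic = render(select(faqs.c.id).where(faqs.c.question.ilike("%loan%")))

for sql in (eq_sql, is_sql, ilike_pg, ilike_generic):
    print(sql, end="\n\n")
```

Printing the four strings side by side shows an = comparison versus an IS comparison for the boolean filter, and ILIKE on Postgres versus a lower(...) LIKE lower(...) fallback on the generic dialect.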
Common Pitfalls
- Calling .all() directly on session.execute(select(...)) instead of going through .scalars().all(): This returns Row tuples (Customer(...),) instead of model instances, and downstream code breaks when it accesses row.name. Always use .scalars() for one-model queries.
- Forgetting .order_by() on “latest N” queries: Without an explicit ordering, Postgres guarantees no row order at all. Rows often come back in insertion order, but that can change after updates or a VACUUM, so users see inconsistent pagination. Always order.
Real-World Interview Prep
Q1: How would you paginate the find_by_customer query with a cursor instead of LIMIT/OFFSET?
A: Cursor pagination uses WHERE (updated_at, id) < (last_updated, last_id) ORDER BY updated_at DESC, id DESC LIMIT N. The composite (updated_at, id) cursor avoids OFFSET's cost, which grows linearly with the offset because the skipped rows are still read and discarded; deep pages on large tables can slow to seconds per query. For the FCA scale (≤ 1000 conversations per customer) LIMIT/OFFSET is fine. Switch to a cursor at > 100k rows per customer. One asymmetry to note: cursors are stable under writes (no rows “skipped”), whereas OFFSET can skip or duplicate rows if data shifts between requests.
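The cursor predicate can be tried out with nothing but the standard library. The sketch below swaps in sqlite3 for Postgres and SQLAlchemy so that it is self-contained; the conversations table and the fetch_page helper are hypothetical. Two rows share a timestamp on purpose, to show why id is needed as a tie-breaker.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE conversations ("
    " id INTEGER PRIMARY KEY, customer_id INTEGER, updated_at TEXT)"
)
# Seven rows for customer 1; ids 3 and 4 share the same updated_at value.
stamps = ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-03",
          "2024-01-04", "2024-01-05", "2024-01-06"]
conn.executemany(
    "INSERT INTO conversations (customer_id, updated_at) VALUES (1, ?)",
    [(s,) for s in stamps],
)


def fetch_page(customer_id, page_size, cursor=None):
    """Return (rows, next_cursor); cursor is (updated_at, id) of the last row seen."""
    sql = "SELECT id, updated_at FROM conversations WHERE customer_id = ?"
    params = [customer_id]
    if cursor is not None:
        # Row-value comparison: strictly "before" the last row in sort order.
        sql += " AND (updated_at, id) < (?, ?)"
        params.extend(cursor)
    sql += " ORDER BY updated_at DESC, id DESC LIMIT ?"
    params.append(page_size)
    rows = conn.execute(sql, params).fetchall()
    # A short page means there is nothing left to fetch.
    next_cursor = (rows[-1][1], rows[-1][0]) if len(rows) == page_size else None
    return rows, next_cursor


seen = []
cursor = None
while True:
    rows, cursor = fetch_page(1, page_size=3, cursor=cursor)
    seen.extend(row[0] for row in rows)
    if cursor is None:
        break
```

Every id is visited exactly once, newest first, including the two rows with identical timestamps. In SQLAlchemy, the tuple_() construct can express the same row-value comparison.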
Q2: How do you test these repository methods?
A: Three layers. (1) Unit: seed 10 customers via the seed fixture, call find_by_email("known@example.com"), assert that exactly one matching Customer comes back. (2) Edge: call find_by_email("nonexistent@example.com"), assert None (not an exception). (3) Limit: seed more than 50 conversations for customer 1, call ConversationRepository.find_by_customer(1) with the default limit, assert at most 50 rows. With pytest-asyncio, write async def test_...(): against a test engine configured with NullPool to avoid connection-pool bleed between tests.
Q3: Why does find_by_email use select(...).where(...).limit(1) instead of session.get(Customer, email)?
A: session.get(Model, primary_key) works only for primary-key lookups. email is a unique-indexed column but not the primary key (id is). select(...).where(...) is the universal way to look up by non-PK columns. Bonus: session.get is cache-aware (it checks the identity map first and can skip the query entirely) and returns None rather than raising when no row exists. A select() always hits the database, although the rows it returns are still reconciled with the identity map, so an already-loaded object comes back as the same Python instance. For single-row indexed lookups both are roughly equivalent; .where() is more flexible because you can stack conditions or chain .order_by().
Top-to-Bottom Code Walkthrough (app/repositories/customer.py + app/repositories/base.py)
The “per-resource” pattern gives every domain table its own thin subclass with only the queries it needs. The base never knows business jargon; the subclass never knows about reflection or commit logic.
BaseRepository(Generic[T]) (app/repositories/base.py)
- def __init__(self, session: AsyncSession, model: Type[T]): the constructor takes the session (lifecycle owned by the service) and the model class; subclasses pin the model by declaring model = Customer.
- create(data: dict) -> T: builds the row with self.model(**data), then self.session.add(obj); await self.session.flush(); return obj. Note that add() is synchronous and is not awaited. Why flush and not commit: callers (services) own the transaction. Flush sends the INSERT to the DB but doesn't end the transaction.
- get_by_id(id) uses await self.session.get(self.model, id); the identity-map fast path doesn't issue a query if the object is already in the session.
- get_all(skip, limit) runs select(self.model).offset(skip).limit(limit).
- update(id, data) does obj = await self.get_by_id(id); for k, v in data.items(): setattr(obj, k, v); return obj.
- delete(id) does obj = await self.get_by_id(id); await self.session.delete(obj).
- count() executes select(func.count()).select_from(self.model) and returns the scalar result.
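Put together, the description above could look roughly like the following. This is a sketch reconstructed from the prose, not the project's actual base.py: the default values for skip and limit, the None guards in update and delete, and the extra flush calls in those two methods are assumptions.

```python
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T")


class BaseRepository(Generic[T]):
    def __init__(self, session: AsyncSession, model: Type[T]) -> None:
        self.session = session
        self.model = model

    async def create(self, data: Dict[str, Any]) -> T:
        obj = self.model(**data)
        self.session.add(obj)       # add() is synchronous, so it is not awaited
        await self.session.flush()  # emit the INSERT; the caller decides when to commit
        return obj

    async def get_by_id(self, id: Any) -> Optional[T]:
        return await self.session.get(self.model, id)

    async def get_all(self, skip: int = 0, limit: int = 100) -> List[T]:
        result = await self.session.scalars(
            select(self.model).offset(skip).limit(limit)
        )
        return list(result.all())

    async def update(self, id: Any, data: Dict[str, Any]) -> Optional[T]:
        obj = await self.get_by_id(id)
        if obj is None:             # assumption: missing rows yield None
            return None
        for key, value in data.items():
            setattr(obj, key, value)
        await self.session.flush()  # assumption: push the UPDATE eagerly
        return obj

    async def delete(self, id: Any) -> bool:
        obj = await self.get_by_id(id)
        if obj is None:             # assumption: report a missing row as False
            return False
        await self.session.delete(obj)
        await self.session.flush()
        return True

    async def count(self) -> int:
        total = await self.session.scalar(
            select(func.count()).select_from(self.model)
        )
        return total or 0
```

No method calls commit(), which keeps the transaction boundary with whoever owns the session.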
CustomerRepository(BaseRepository[Customer]) (app/repositories/customer.py)
- model = Customer: this is all you need to inherit CRUD.
- async def find_by_email(self, email): select(Customer).where(Customer.email == email). Used on login.
- async def find_by_business_id(self, business_id): filters on Customer.customer_id (the external “CUST-000001” string), which is distinct from the internal id PK.
- async def find_vip_customers(self, limit=50): select(Customer).where(Customer.is_vip == True), used by the priority-classifier to skip the queue for VIPs.
How a service uses a repo (e.g. app/services/customer.py)
async with CustomerService() as svc:
    customer = await svc.repo.create(data)
    await svc.commit()

The service owns the commit boundary: the repo never commits. If the service decides halfway through that it wants to roll back, the repo's pending changes go with it.
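The rollback behaviour is worth seeing once. The sketch below illustrates the same division of labour with the standard-library sqlite3 module, synchronously, so that it runs anywhere; the CustomerRepo and CustomerService classes and the register_pair method are hypothetical and far simpler than the async classes in the project.

```python
import sqlite3


class CustomerRepo:
    """Hypothetical repo: issues SQL but never commits."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn

    def create(self, email: str) -> int:
        cur = self.conn.execute(
            "INSERT INTO customers (email) VALUES (?)", (email,)
        )
        return cur.lastrowid

    def count(self) -> int:
        return self.conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]


class CustomerService:
    """Hypothetical service: owns the commit/rollback boundary."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.repo = CustomerRepo(conn)

    def register_pair(self, first: str, second: str) -> None:
        try:
            self.repo.create(first)
            self.repo.create(second)  # raises IntegrityError on a duplicate email
            self.conn.commit()
        except sqlite3.IntegrityError:
            self.conn.rollback()      # discards the first insert as well
            raise


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, email TEXT UNIQUE)")
conn.commit()
service = CustomerService(conn)

service.register_pair("a@example.com", "b@example.com")  # both rows committed
try:
    service.register_pair("c@example.com", "a@example.com")  # second email is a duplicate
except sqlite3.IntegrityError:
    pass

total = service.repo.count()
```

Because the repo left the first insert of the failed pair uncommitted, the service's rollback removes it and only the two rows from the successful call remain.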
Common Pitfalls
- Calling commit() inside a repo method breaks the unit-of-work. The service can't roll back what the repo already committed.
- Calling session.execute(select(Customer)) and then .all() directly returns a list of Row objects, not Customer instances. Use .scalars().all() to unwrap them; .unique() is only required when the query joined-eager-loads a collection.
- Putting business logic in a repo (if customer.balance < 0: raise ...) makes it impossible to unit-test that rule without a DB. Repos hold CRUD and queries only; business rules go in services.
Real-World Interview Prep
Q1: Why a separate repositories/ layer if services already have DB access?
A: Services own transactions; repos own queries. With the split you can mock a repo in a service test (no DB needed) but still test the transaction logic. Without the split every test needs a real DB.
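A service test with a mocked repo might look like the sketch below. The CustomerService here is a hypothetical variant that takes its repo and session by injection (the project's service is constructed differently), and the register method is an illustrative business rule, not one taken from the codebase.

```python
import asyncio
from unittest.mock import AsyncMock


class CustomerService:
    """Hypothetical service with an injected repo and session."""

    def __init__(self, repo, session) -> None:
        self.repo = repo
        self.session = session

    async def register(self, email: str):
        # Business rule lives in the service, not in the repo.
        if await self.repo.find_by_email(email) is not None:
            raise ValueError("email already registered")
        customer = await self.repo.create({"email": email})
        await self.session.commit()
        return customer


async def run_checks():
    # Happy path: the email is unknown, so the service creates and commits.
    repo, session = AsyncMock(), AsyncMock()
    repo.find_by_email.return_value = None
    repo.create.return_value = {"id": 1, "email": "new@example.com"}
    created = await CustomerService(repo, session).register("new@example.com")

    # Duplicate path: the repo reports an existing row, so nothing is committed.
    dup_repo, dup_session = AsyncMock(), AsyncMock()
    dup_repo.find_by_email.return_value = {"id": 1}
    try:
        await CustomerService(dup_repo, dup_session).register("new@example.com")
        rejected = False
    except ValueError:
        rejected = True

    return created, session.commit.await_count, rejected, dup_session.commit.await_count


created, commits, rejected, dup_commits = asyncio.run(run_checks())
```

No database or engine is involved; the test exercises only the service's decision about when to commit.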
Q2: What is the Generic[T] syntax (PEP 484 generics) doing in BaseRepository[Customer]?
A: Telling mypy/IDE that this repo returns Customer instances. await self.get_by_id(5) on AccountRepository returns Account, not Customer. Catches type errors at edit time.
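The effect of the type parameter can be seen with a dict-backed stand-in. InMemoryRepository and the two dataclasses below are hypothetical and exist only to show how T is bound by each subclass.

```python
from dataclasses import dataclass
from typing import Dict, Generic, Optional, Type, TypeVar

T = TypeVar("T")


@dataclass
class Customer:
    id: int
    email: str


@dataclass
class Account:
    id: int
    account_number: str


class InMemoryRepository(Generic[T]):
    """Hypothetical stand-in for BaseRepository[T], backed by a dict."""

    def __init__(self, model: Type[T]) -> None:
        self.model = model
        self._rows: Dict[int, T] = {}

    def add(self, obj: T) -> T:
        self._rows[obj.id] = obj  # type: ignore[attr-defined]
        return obj

    def get_by_id(self, id: int) -> Optional[T]:
        return self._rows.get(id)


class CustomerRepository(InMemoryRepository[Customer]):
    pass


class AccountRepository(InMemoryRepository[Account]):
    pass


customers = CustomerRepository(Customer)
accounts = AccountRepository(Account)
customers.add(Customer(id=5, email="a@example.com"))
accounts.add(Account(id=5, account_number="ACC-1"))

customer = customers.get_by_id(5)  # a type checker infers Optional[Customer]
account = accounts.get_by_id(5)    # a type checker infers Optional[Account]
```

At runtime the generic parameter changes nothing; the benefit is that a type checker would reject, for example, reading account.email.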
Q3: When would you NOT use the generic repository pattern?
A: When the queries are deeply pathological (window functions, lateral joins, custom CTEs). At that point a raw text(...) query in the service is more readable than a generic abstraction trying to express column lists.
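As an illustration of the raw text(...) escape hatch, the sketch below runs a window-function query against in-memory SQLite through a synchronous engine. The transactions table, its columns, and the LATEST_PER_ACCOUNT name are hypothetical, and it assumes a SQLite build with window-function support (3.25 or newer).

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")  # in-memory database

# Latest N transactions per account, ranked with a window function.
LATEST_PER_ACCOUNT = text("""
    SELECT account_id, amount
    FROM (
        SELECT account_id, amount,
               ROW_NUMBER() OVER (
                   PARTITION BY account_id ORDER BY created_at DESC
               ) AS rn
        FROM transactions
    ) AS ranked
    WHERE rn <= :per_account
    ORDER BY account_id
""")

with engine.begin() as conn:
    conn.execute(text(
        "CREATE TABLE transactions ("
        " id INTEGER PRIMARY KEY, account_id INTEGER,"
        " amount INTEGER, created_at TEXT)"
    ))
    conn.execute(
        text(
            "INSERT INTO transactions (account_id, amount, created_at)"
            " VALUES (:a, :amt, :ts)"
        ),
        [
            {"a": 1, "amt": 10, "ts": "2024-01-01"},
            {"a": 1, "amt": 20, "ts": "2024-01-02"},
            {"a": 2, "amt": 30, "ts": "2024-01-01"},
        ],
    )
    # Bound parameters keep the raw SQL safe from injection.
    latest = [
        tuple(row)
        for row in conn.execute(LATEST_PER_ACCOUNT, {"per_account": 1})
    ]
```

Expressing the same ranking through a generic repository method would need extra parameters for partitioning and ordering, which is the readability cost the answer above refers to.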