FastAPI Message API Route with IDOR Check + Scope Enforcement
What? (Concept Overview)
The /api/v1/messages/process endpoint is the conversational entry-point for authenticated customer traffic. It (a) protects against IDOR by matching the JWT subject against the requested customer_id, (b) enforces a read:accounts scope via Security(...), and (c) projects the coordinator’s heterogeneous response into a typed MessageResponse Pydantic model. The route is the canonical example of “auth + scope + IDOR + response shaping” as a single decorator stack.
Project Context
In full_project_context_updated.txt -> app/api/routes/messages.py, the message router delegates processing to AgentCoordinator.process_message() (which renders the LangGraph workflow) and converts the dictionary result into a typed Pydantic MessageResponse with extra: "ignore" so model additions don’t break older clients. Critical companion endpoints: GET /conversations/{id}/history, GET /customers/{id}/conversations, POST /escalations/{id}/resolve.
How? (Quick Reference Blocks)
3.1 The Main Process-Message Route
# app/api/routes/messages.py
from fastapi import APIRouter, HTTPException, Security
from pydantic import BaseModel
from typing import Optional, Dict, Any
import logging
from app.coordinator.agent_coordinator import AgentCoordinator
from app.api.deps import get_current_active_user
from app.models.customer import Customer
router = APIRouter(prefix="/api/v1", tags=["messages"])
coordinator = AgentCoordinator()
logger = logging.getLogger(__name__)
class MessageRequest(BaseModel):
message: str
customer_id: int
conversation_id: int
class MessageResponse(BaseModel):
response: str
status: str = "success"
conversation_id: int
agent: Optional[str] = "system"
intent: Optional[str] = None
confidence: float = 0.0
metadata: Optional[Dict[str, Any]] = {}
turn_count: int = 0
model_config = {"extra": "ignore"} # forward-compat: ignore unknown fields
@router.post("/messages/process")
async def process_message(
request: MessageRequest,
current_user: Customer = Security(
get_current_active_user, scopes=["read:accounts"]
),
) -> MessageResponse:
# IDOR CHECK: refuse if auth subject != requested customer
if request.customer_id != current_user.id:
logger.critical(
f"ID Mismatch: Token={current_user.id}, Request={request.customer_id}"
)
raise HTTPException(
status_code=403,
detail=f"Access Denied: You cannot access customer "
f"{request.customer_id}'s data.",
)
logger.info(f"Processing message for customer {request.customer_id}")
response = await coordinator.process_message(
message=request.message,
customer_id=request.customer_id,
conversation_id=request.conversation_id,
)
return MessageResponse(
response=response["response"],
conversation_id=response["conversation_id"],
agent=response.get("agent", "system"),
intent=response.get("intent"),
confidence=response.get("confidence", 0.0),
turn_count=response.get("turn_count", 0),
status=response.get("status", "success"),
metadata={
"escalated": response.get("escalated", False),
"escalation_id": response.get("escalation_id"),
**response.get("metadata", {}),
},
)3.2 History-by-Conversation Lookup
# app/api/routes/messages.py
@router.get("/conversations/{conversation_id}/history",
response_model=ConversationHistory)
async def get_conversation_history(conversation_id: int, limit: int = 1000):
try:
history = await coordinator.get_db_conversation_history(
conversation_id, limit
)
if history is None:
raise HTTPException(status_code=404, detail="Conversation not found")
return ConversationHistory(conversation_id=conversation_id, history=history)
except Exception as e:
logger.error(f"History retrieval error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Error retrieving history")3.3 Admin Escalation Resolution
# app/api/routes/messages.py — operator endpoint
@router.post("/escalations/{conversation_id}/resolve")
async def resolve_escalation(
conversation_id: int, resolution_notes: str,
) -> Dict[str, Any]:
try:
resolved = coordinator.resolve_escalation(conversation_id, resolution_notes)
if not resolved:
raise HTTPException(status_code=404, detail="Conversation not escalated")
return {"status": "resolved", "conversation_id": conversation_id}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error resolving escalation: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Error resolving escalation")3.4 Stats + Coordinator Info
# app/api/routes/messages.py
@router.get("/statistics", response_model=ConversationStats)
async def get_statistics() -> ConversationStats:
try:
return ConversationStats(**coordinator.get_statistics())
except Exception as e:
logger.error(f"Error retrieving statistics: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Error retrieving statistics")Why? (Parameter Breakdown
Security(get_current_active_user, scopes=["read:accounts"])overDepends(...)—Security()is the documented FastAPI primitive for attaching scopes to OpenAPI’ssecurityblock; downstream tooling (Swagger Authorize button) automatically renders the scope requirement.Depends()is for non-security wiring.- IDOR check BEFORE coordinator call — If the customer ID doesn’t match the JWT subject, reject before running any expensive workflow. Without the check, one user could enumerate every other user’s data.
logger.critical()on IDOR attempt — IDs in the critical log level feed SIEM alerts; standardinfowould not. Every IDOR attempt is a potential intrusion.Status code 403for IDOR vs404for missing conversation — 403 says “you cannot access THIS resource you’re trying” (info-leak); 404 says “this resource doesn’t exist” (less info-leak). Some teams prefer 404 always. Choose based onsecurity.privacypolicy.extra: "ignore"on Pydantic config — Lets the server add new fields without breaking older clients (they just ignore them). The opposite —"allow"— keeps unknown fields inmetadata;"forbid"would 422 on unknown fields and break forward-compat.- Coordinator returns a dict but the route returns a typed MessageResponse — Decouples the internal workflow contract from the API contract. Internal can change without breaking clients; clients get explicit field types via OpenAPI.
- Logs structured as f-string, then
exc_info=Trueonly on 5xx — 4xx errors are normal (bad input); adding traceback floods logs. Keepexc_info=Truefor 500 paths only.
Common Pitfalls
- Forgetting the IDOR check. New routes often just trust the JWT without re-checking IDs — a token issued for customer 1 can request customer 2’s data. Add a code-review checklist item AND a unit test that asserts mismatch raises 403.
- Returning the full internal dict as JSON. Exposes internal field names (
agent_metadata,escalation_id) directly to clients. Wrap in Pydantic for explicit schema control.
Real-World Interview Prep
Q1: How would you rate-limit the message endpoint to prevent LLM-cost abuse?
A: Three layers. (1) App-layer — slowapi or custom middleware: await rate_limit_bucket.consume(user_id, weight=1). (2) WSGI/ASGI — nginx limit_req with burst token. (3) Semantic — for high-frequency identical inputs, cache the response (see cache-service-redis-strategy page). The Settings block already declares RATE_LIMIT_CALLS=10 and RATE_LIMIT_PERIOD=60; wiring them through slowapi is straightforward. Pair with a Prometheus counter (rate_limit_hits_total{user=X}) to surface abusive consumers.
Q2: How do you test the IDOR check?
A: Two layers. (1) Unit: provide a current_user.id=1 and request customer_id=2 → assert 403 + log critical. (2) Integration: with httpx.AsyncClient(transport=ASGITransport(app)), send a request with a JWT for user 1 but customer_id=2 body → assert 403. Add a fuzz test that randomises IDs to confirm no path unintentionally trusts request body over token.
Q3: Why use Security(dependency, scopes=[...]) for messages but Depends(dependency) for system endpoints like /health?
A: Two reasons. (1) OpenAPI clarity — Swagger renders the security scheme next to the route, so QA knows which routes require a token. (2) RBAC enforcement — without scopes=, get_current_active_user still passes a valid token, but doesn’t reject tokens missing the required scope. With scopes=["read:accounts"], a token with no scope is rejected at the dependency level, before the route handler runs. Health checks must publicly reachable; they use Depends with no security.
Top-to-Bottom Code Walkthrough (app/api/routes/messages.py + app/api/routes/auth.py)
This page walks through the IDOR + RBAC pattern as it appears in the message-processing endpoint. The route layer is the last line of defense against a logged-in user accessing another user’s data.
Endpoint signature
@router.post("/messages/process", response_model=MessageResponse)
async def process_message(
message: ProcessMessageBody,
current_user: Customer = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
):Three dependencies, all enforced before the body runs:
message: ProcessMessageBody— fails fast on malformed JSON.current_user: Customer = Depends(get_current_active_user)— runs the JWT verify, scope check, and DB lookup; raises 401 on bad token, 403 on missing scope.db: AsyncSession = Depends(get_db)— opens a session, commits on success, rolls back on error, closes on exit.
IDOR check
if message.customer_id != current_user.customer_id and "admin" not in current_user.scopes:
raise HTTPException(403, detail="Cannot send messages on behalf of another customer.")What this protects against: Insecure Direct Object Reference (IDOR). Without this, a logged-in attacker could send messages / read history attached to any customer_id by simply putting it in the JSON body.
Why customer_id not id: the internal primary key (Customer.id) is what current_user.id is. The external customer_id (“CUST-000001”) is the public identifier — but to defend against a body that references the internal id, we explicitly compare the right field. Both id and customer_id are checked in production.
Persistence
async with MessageService() as svc:
response = await coordinator.process_message(
message.content, current_user.customer_id, message.conversation_id
)
await svc.add_message(conversation_id=message.conversation_id, role="CUSTOMER", content=message.content)The MessageService block uses the unit-of-work pattern — both the LLM call’s message and the customer’s own message are written in one transaction.
Fast-path optimisations
- The
conversation_id = 0case means “start a new conversation”. The route creates one and writes a single Message row for the user’s prompt. - Returning
MessageResponse(**response)(a Pydantic model) is automatically serialised by FastAPI.
Common Pitfalls
Forgetting the IDOR check on read paths (GET /api/v1/customers/123/conversations) is the most common API bug in 2024. Every read endpoint must validate customer.id == current_user.id or scope-check.
Comparing message.customer_id (a string “CUST-001”) to current_user.customer_id (a string) — make sure both fields are the same type; the Auth library is strict here.
Using current_user.id for the body’s customer_id when the schema says customer_id is the external string — mismatch raises 403 for legitimate requests.
Real-World Interview Prep
Q1: Why is the IDOR check necessary when JWTs already authenticate the user?
A: JWTs say who you are, not what you can do in a given endpoint. A user can be JWT-valid but lack permissions to query another user’s data — IDOR is the bug where the route forgot to check ownership.
Q2: Where else besides this endpoint should you check ownership?
A: Every endpoint that accepts a customer_id, conversation_id, account_id, or order_id in the URL or body. Public-list endpoints that return all records to all users are IDOR by default — filter by current_user.id in the query.
Q3: How do you scale this to hundreds of endpoints without copy-paste?
A: Two-step dependency. Make a get_owned_resource(customer_id, current_user) dependency that always fetches and validates ownership server-side, returning the resource object or raising 404. Endpoints just resource = Depends(get_owned_resource).