Skip to Content
BackendFastAPI Message API Route with IDOR Check + Scope Enforcement

FastAPI Message API Route with IDOR Check + Scope Enforcement

What? (Concept Overview)

The /api/v1/messages/process endpoint is the conversational entry-point for authenticated customer traffic. It (a) protects against IDOR by matching the JWT subject against the requested customer_id, (b) enforces a read:accounts scope via Security(...), and (c) projects the coordinator’s heterogeneous response into a typed MessageResponse Pydantic model. The route is the canonical example of “auth + scope + IDOR + response shaping” as a single decorator stack.

Project Context

In full_project_context_updated.txt -> app/api/routes/messages.py, the message router delegates processing to AgentCoordinator.process_message() (which renders the LangGraph workflow) and converts the dictionary result into a typed Pydantic MessageResponse with extra: "ignore" so model additions don’t break older clients. Critical companion endpoints: GET /conversations/{id}/history, GET /customers/{id}/conversations, POST /escalations/{id}/resolve.

How? (Quick Reference Blocks)

3.1 The Main Process-Message Route

# app/api/routes/messages.py from fastapi import APIRouter, HTTPException, Security from pydantic import BaseModel from typing import Optional, Dict, Any import logging from app.coordinator.agent_coordinator import AgentCoordinator from app.api.deps import get_current_active_user from app.models.customer import Customer router = APIRouter(prefix="/api/v1", tags=["messages"]) coordinator = AgentCoordinator() logger = logging.getLogger(__name__) class MessageRequest(BaseModel): message: str customer_id: int conversation_id: int class MessageResponse(BaseModel): response: str status: str = "success" conversation_id: int agent: Optional[str] = "system" intent: Optional[str] = None confidence: float = 0.0 metadata: Optional[Dict[str, Any]] = {} turn_count: int = 0 model_config = {"extra": "ignore"} # forward-compat: ignore unknown fields @router.post("/messages/process") async def process_message( request: MessageRequest, current_user: Customer = Security( get_current_active_user, scopes=["read:accounts"] ), ) -> MessageResponse: # IDOR CHECK: refuse if auth subject != requested customer if request.customer_id != current_user.id: logger.critical( f"ID Mismatch: Token={current_user.id}, Request={request.customer_id}" ) raise HTTPException( status_code=403, detail=f"Access Denied: You cannot access customer " f"{request.customer_id}'s data.", ) logger.info(f"Processing message for customer {request.customer_id}") response = await coordinator.process_message( message=request.message, customer_id=request.customer_id, conversation_id=request.conversation_id, ) return MessageResponse( response=response["response"], conversation_id=response["conversation_id"], agent=response.get("agent", "system"), intent=response.get("intent"), confidence=response.get("confidence", 0.0), turn_count=response.get("turn_count", 0), status=response.get("status", "success"), metadata={ "escalated": response.get("escalated", False), "escalation_id": response.get("escalation_id"), **response.get("metadata", {}), }, )

3.2 History-by-Conversation Lookup

# app/api/routes/messages.py @router.get("/conversations/{conversation_id}/history", response_model=ConversationHistory) async def get_conversation_history(conversation_id: int, limit: int = 1000): try: history = await coordinator.get_db_conversation_history( conversation_id, limit ) if history is None: raise HTTPException(status_code=404, detail="Conversation not found") return ConversationHistory(conversation_id=conversation_id, history=history) except Exception as e: logger.error(f"History retrieval error: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Error retrieving history")

3.3 Admin Escalation Resolution

# app/api/routes/messages.py — operator endpoint @router.post("/escalations/{conversation_id}/resolve") async def resolve_escalation( conversation_id: int, resolution_notes: str, ) -> Dict[str, Any]: try: resolved = coordinator.resolve_escalation(conversation_id, resolution_notes) if not resolved: raise HTTPException(status_code=404, detail="Conversation not escalated") return {"status": "resolved", "conversation_id": conversation_id} except HTTPException: raise except Exception as e: logger.error(f"Error resolving escalation: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Error resolving escalation")

3.4 Stats + Coordinator Info

# app/api/routes/messages.py @router.get("/statistics", response_model=ConversationStats) async def get_statistics() -> ConversationStats: try: return ConversationStats(**coordinator.get_statistics()) except Exception as e: logger.error(f"Error retrieving statistics: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Error retrieving statistics")

Why? (Parameter Breakdown

  • Security(get_current_active_user, scopes=["read:accounts"]) over Depends(...)Security() is the documented FastAPI primitive for attaching scopes to OpenAPI’s security block; downstream tooling (Swagger Authorize button) automatically renders the scope requirement. Depends() is for non-security wiring.
  • IDOR check BEFORE coordinator call — If the customer ID doesn’t match the JWT subject, reject before running any expensive workflow. Without the check, one user could enumerate every other user’s data.
  • logger.critical() on IDOR attempt — IDs in the critical log level feed SIEM alerts; standard info would not. Every IDOR attempt is a potential intrusion.
  • Status code 403 for IDOR vs 404 for missing conversation — 403 says “you cannot access THIS resource you’re trying” (info-leak); 404 says “this resource doesn’t exist” (less info-leak). Some teams prefer 404 always. Choose based on security.privacy policy.
  • extra: "ignore" on Pydantic config — Lets the server add new fields without breaking older clients (they just ignore them). The opposite — "allow" — keeps unknown fields in metadata; "forbid" would 422 on unknown fields and break forward-compat.
  • Coordinator returns a dict but the route returns a typed MessageResponse — Decouples the internal workflow contract from the API contract. Internal can change without breaking clients; clients get explicit field types via OpenAPI.
  • Logs structured as f-string, then exc_info=True only on 5xx — 4xx errors are normal (bad input); adding traceback floods logs. Keep exc_info=True for 500 paths only.

Common Pitfalls

  1. Forgetting the IDOR check. New routes often just trust the JWT without re-checking IDs — a token issued for customer 1 can request customer 2’s data. Add a code-review checklist item AND a unit test that asserts mismatch raises 403.
  2. Returning the full internal dict as JSON. Exposes internal field names (agent_metadata, escalation_id) directly to clients. Wrap in Pydantic for explicit schema control.

Real-World Interview Prep

Q1: How would you rate-limit the message endpoint to prevent LLM-cost abuse?

A: Three layers. (1) App-layerslowapi or custom middleware: await rate_limit_bucket.consume(user_id, weight=1). (2) WSGI/ASGI — nginx limit_req with burst token. (3) Semantic — for high-frequency identical inputs, cache the response (see cache-service-redis-strategy page). The Settings block already declares RATE_LIMIT_CALLS=10 and RATE_LIMIT_PERIOD=60; wiring them through slowapi is straightforward. Pair with a Prometheus counter (rate_limit_hits_total{user=X}) to surface abusive consumers.

Q2: How do you test the IDOR check?

A: Two layers. (1) Unit: provide a current_user.id=1 and request customer_id=2 → assert 403 + log critical. (2) Integration: with httpx.AsyncClient(transport=ASGITransport(app)), send a request with a JWT for user 1 but customer_id=2 body → assert 403. Add a fuzz test that randomises IDs to confirm no path unintentionally trusts request body over token.

Q3: Why use Security(dependency, scopes=[...]) for messages but Depends(dependency) for system endpoints like /health?

A: Two reasons. (1) OpenAPI clarity — Swagger renders the security scheme next to the route, so QA knows which routes require a token. (2) RBAC enforcement — without scopes=, get_current_active_user still passes a valid token, but doesn’t reject tokens missing the required scope. With scopes=["read:accounts"], a token with no scope is rejected at the dependency level, before the route handler runs. Health checks must publicly reachable; they use Depends with no security.

Top-to-Bottom Code Walkthrough (app/api/routes/messages.py + app/api/routes/auth.py)

This page walks through the IDOR + RBAC pattern as it appears in the message-processing endpoint. The route layer is the last line of defense against a logged-in user accessing another user’s data.

Endpoint signature

@router.post("/messages/process", response_model=MessageResponse) async def process_message( message: ProcessMessageBody, current_user: Customer = Depends(get_current_active_user), db: AsyncSession = Depends(get_db), ):

Three dependencies, all enforced before the body runs:

  1. message: ProcessMessageBody — fails fast on malformed JSON.
  2. current_user: Customer = Depends(get_current_active_user) — runs the JWT verify, scope check, and DB lookup; raises 401 on bad token, 403 on missing scope.
  3. db: AsyncSession = Depends(get_db) — opens a session, commits on success, rolls back on error, closes on exit.

IDOR check

if message.customer_id != current_user.customer_id and "admin" not in current_user.scopes: raise HTTPException(403, detail="Cannot send messages on behalf of another customer.")

What this protects against: Insecure Direct Object Reference (IDOR). Without this, a logged-in attacker could send messages / read history attached to any customer_id by simply putting it in the JSON body.

Why customer_id not id: the internal primary key (Customer.id) is what current_user.id is. The external customer_id (“CUST-000001”) is the public identifier — but to defend against a body that references the internal id, we explicitly compare the right field. Both id and customer_id are checked in production.

Persistence

async with MessageService() as svc: response = await coordinator.process_message( message.content, current_user.customer_id, message.conversation_id ) await svc.add_message(conversation_id=message.conversation_id, role="CUSTOMER", content=message.content)

The MessageService block uses the unit-of-work pattern — both the LLM call’s message and the customer’s own message are written in one transaction.

Fast-path optimisations

  • The conversation_id = 0 case means “start a new conversation”. The route creates one and writes a single Message row for the user’s prompt.
  • Returning MessageResponse(**response) (a Pydantic model) is automatically serialised by FastAPI.

Common Pitfalls

Forgetting the IDOR check on read paths (GET /api/v1/customers/123/conversations) is the most common API bug in 2024. Every read endpoint must validate customer.id == current_user.id or scope-check.

Comparing message.customer_id (a string “CUST-001”) to current_user.customer_id (a string) — make sure both fields are the same type; the Auth library is strict here.

Using current_user.id for the body’s customer_id when the schema says customer_id is the external string — mismatch raises 403 for legitimate requests.

Real-World Interview Prep

Q1: Why is the IDOR check necessary when JWTs already authenticate the user?

A: JWTs say who you are, not what you can do in a given endpoint. A user can be JWT-valid but lack permissions to query another user’s data — IDOR is the bug where the route forgot to check ownership.

Q2: Where else besides this endpoint should you check ownership?

A: Every endpoint that accepts a customer_id, conversation_id, account_id, or order_id in the URL or body. Public-list endpoints that return all records to all users are IDOR by default — filter by current_user.id in the query.

Q3: How do you scale this to hundreds of endpoints without copy-paste?

A: Two-step dependency. Make a get_owned_resource(customer_id, current_user) dependency that always fetches and validates ownership server-side, returning the resource object or raising 404. Endpoints just resource = Depends(get_owned_resource).

Last updated on