
Specialist Agent Deep Dives & LangGraph Flow

This page goes beyond the pattern pages and walks through the actual four specialist agents in app/agents/ plus the full LangGraph graph in app/workflows/message_workflow.py and the stateless coordinator in app/coordinator/agent_coordinator.py. Read this once and you’ll understand ~80% of the project’s behavior.


1. The Big Picture — How a Message Flows

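Before diving into each agent, see how they fit together. The graph has nine LangGraph nodes; a typical customer message passes through guardrail, classify, one specialist, compliance, and end, in that order, with the human nodes used only when escalation or review is needed: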

Customer message ──▶ guardrail ──▶ classify ──▶ specialist ──▶ compliance ──▶ END
                        ↘                                         ↗
                               human (if needed)

  guardrail    entry point, backed by the security service
  classify     intent classification
  specialist   one of product, account_agent, general_agent
  compliance   compliance checker; passes to end (OK) or hands over to the human agent

The graph is declarative: every node is a BaseAgent.run() call. Failures at any node route back to the appropriate sibling (or to the human-agent on hard failures).


2. Specialist Agent Deep Dives

2.1 IntentClassifier (app/agents/intent_classifier.py)

The first specialist to run. It decides which subsequent agent will handle the message.

Imports & schema

  • from pydantic import BaseModel, Field (plus Literal from typing): the IntentClassification model is the contract.
  • The IntentClassification schema:
        class IntentClassification(BaseModel):
            intent: Literal[
                "product_acquisition", "account_data", "knowledge_inquiry",
                "complaint", "general_inquiry",
            ]
            confidence: float = Field(ge=0.0, le=1.0)
            sentiment: Literal["positive", "neutral", "negative", "frustrated"]
            explanation: str = Field(min_length=10)
  • Pydantic validates the LLM response before the graph moves on. A malformed response raises ValidationError, which would otherwise dead-letter the conversation; the agent catches it and falls back to "general_inquiry".

The INTENTS configuration dictionary

    INTENTS = {
        "product_acquisition": {
            "description": "User wants to buy a new product...",
            "examples": ["I'd like a mortgage", "Can I get a credit card?", ...],
            "routing": "product",
        },
        "account_data": {
            "description": "User asking about their existing accounts...",
            "examples": ["What's my balance?", "Show last 5 transactions"],
            "routing": "account",
        },
        "knowledge_inquiry": {
            "description": "General questions about FCA / banking...",
            "examples": ["What is FSCS?", "How does APR work?"],
            "routing": "general",
        },
        "complaint": {
            "description": "User expressing dissatisfaction...",
            "examples": ["I'm angry", "This is unfair"],
            "routing": "human",  # immediately escalates!
        },
        "general_inquiry": {
            "description": "Catch-all",
            "examples": [],
            "routing": "general",
        },
    }

Why examples matter: the LLM is few-shot prompted with these classifications. Without examples, an 8B model invents intents. With them, accuracy jumps 30-40%.
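To make the few-shot idea concrete, here is a minimal sketch of how the examples might be rendered into the prompt. The helper name build_fewshot_block and the trimmed INTENTS copy are illustrative assumptions, not the project's actual code.

```python
from typing import Dict, List

# Trimmed, illustrative copy of the INTENTS config (assumed to share its shape).
INTENTS: Dict[str, dict] = {
    "account_data": {
        "description": "User asking about their existing accounts",
        "examples": ["What's my balance?", "Show last 5 transactions"],
        "routing": "account",
    },
    "general_inquiry": {"description": "Catch-all", "examples": [], "routing": "general"},
}


def build_fewshot_block(intents: Dict[str, dict]) -> str:
    """Render each intent's description and examples as prompt text."""
    lines: List[str] = []
    for name, spec in intents.items():
        lines.append(f"Intent: {name} - {spec['description']}")
        for example in spec["examples"]:
            # One labelled example per line gives the model a pattern to copy.
            lines.append(f'  "{example}" -> {name}')
    return "\n".join(lines)
```

An intent with an empty examples list still appears with its description, but contributes no labelled examples, which is exactly the weak spot the paragraph above warns about.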

The UK-financial domain split rule

The system prompt enforces a strict distinction:

  • account_data — “what’s my balance”, “show transactions” → read-only.
  • product_acquisition — “I want a mortgage”, “apply for loan” → write/create.

Without this rule, customers asking “what’s my mortgage balance?” get routed to product_recommender which tries to sell them a new mortgage. Awkward.

process() method (line-by-line)

  1. Validate / shape input: if not state.message: raise ValueError.
  2. Truncate history: state.history[-10:] to avoid prompt bloat. Older context goes.
  3. Build the prompt: dynamically inserts INTENTS definitions, examples, recent history. The Pydantic schema is literally appended:
    Return JSON matching this schema: { "intent": "...", "confidence": 0.0-1.0, ... }
  4. Call the LLM:
    response = await self.client.chat.completions.create(
        model=self.config.model,  # "llama-3.1-8b-instant"
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.0,  # CRITICAL: deterministic routing
        response_format={"type": "json_object"},
    )
    • temperature=0.0 — routes MUST be the same for the same input. A non-zero temp would mean compliance test cases pass sometimes and fail other times.
    • response_format={"type": "json_object"} — Groq/OpenAI JSON mode forces structured output. Far more reliable than asking the LLM to “respond in JSON” via prompt.
  5. Validate: result = IntentClassification.model_validate_json(response.choices[0].message.content).
  6. Set state: state.intent = result.intent; state.confidence = result.confidence; state.routing = INTENTS[result.intent]["routing"].
  7. Langfuse tracking: @observe(name="intent_classifier.classify") opens a span — token usage, latency, and the actual prompt/response are recorded.

Safe degradation

    except Exception as e:
        logger.warning(f"Intent classification degraded: {e}")
        return WorkflowState(
            intent="general_inquiry",
            confidence=0.0,
            routing="general",
            error=str(e),
        )

The “general” fallback is critical: when the LLM is down, you want the most general agent to handle the message — not a blank error. Customers still get a courteous answer (probably FAQ-based) instead of an outage.
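The validate-or-degrade behaviour can be sketched without any third-party packages. This is a stdlib-only stand-in for the Pydantic check, assuming the same five intents; Classification and parse_or_fallback are hypothetical names used only for illustration.

```python
import json
from dataclasses import dataclass
from typing import Optional

ALLOWED_INTENTS = {
    "product_acquisition", "account_data", "knowledge_inquiry",
    "complaint", "general_inquiry",
}


@dataclass
class Classification:
    intent: str
    confidence: float
    error: Optional[str] = None


def parse_or_fallback(raw: str) -> Classification:
    """Parse the LLM's JSON reply; degrade to general_inquiry on any problem."""
    try:
        data = json.loads(raw)
        intent = data["intent"]
        confidence = float(data["confidence"])
        if intent not in ALLOWED_INTENTS:
            raise ValueError(f"unknown intent: {intent!r}")
        if not 0.0 <= confidence <= 1.0:
            raise ValueError(f"confidence out of range: {confidence}")
        return Classification(intent=intent, confidence=confidence)
    except (ValueError, KeyError, TypeError) as exc:
        # json.JSONDecodeError subclasses ValueError, so bad JSON lands here too.
        return Classification("general_inquiry", 0.0, error=str(exc))
```

Recording the error on the fallback result, rather than swallowing it, keeps the degradation visible in logs and traces.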

Common Pitfalls

  • Forgetting temperature=0.0 makes routes fork across runs. Tests become flaky, behavior drifts.
  • Empty examples arrays for vital intents — the LLM hallucinates routes for unclear messages.
  • Storing INTENTS as a hard-coded constant rather than loading it from a config file: when Compliance wants to add a new intent type, you have to redeploy.

Real-World Interview Prep

Q1: Why not use a fine-tuned classifier instead of an LLM? A: A fine-tuned classifier (e.g., a small BERT) is 10-100x faster, costs nothing per call, and is more deterministic. For high-volume production classifiers, do it. Trade-off: requires labelled training data, can’t handle new intents without retraining, and isn’t few-shot extensible. For 5 well-defined intents, an LLM is the right starting point; for 50 intents at 1000 QPS, switch to fine-tuned.

Q2: Why temperature=0.0 and not 0.1? A: Routes determine which downstream agent and which downstream compliance check runs. Even a 0.1 variation can route “I want a mortgage” to product_acquisition vs general_inquiry. Routing decisions must be reproducible. Generation decisions (creative text) are fine at 0.7.

Q3: What happens if state.confidence < 0.5? A: The graph can be configured not to trust low-confidence classifications: route all low-confidence messages to general_agent (the catch-all) instead of specialists, even if the LLM picked a specific intent. This drift-to-generalist pattern prevents specialist hallucinations. In this project, route_intent applies the rule with a cutoff of 0.6, so a 0.5 classification falls back to the generalist.


2.2 AccountAgent (app/agents/account_agent.py)

The read-only data accessor. Furnishes balance, transactions, statements, and account details.

Init dependencies

    def __init__(
        self,
        config: Optional[AgentConfig] = None,
        account_service: Optional[AccountService] = None,
        customer_service: Optional[CustomerService] = None,
        transaction_service: Optional[TransactionService] = None,
    ):
        super().__init__(name="account_agent", config=config)
        # Fail fast at construction time rather than on the first query.
        if not all([account_service, customer_service, transaction_service]):
            raise ValueError("AccountAgent requires DB-backed services.")

Three services required: Account (metadata), Customer (linked customer details), Transaction (history). Each wraps a Postgres query — composing three gives a complete answer in one roundtrip-equivalent.

_determine_query_type (internal routing)

A second classifier, but coarser:

  • balance — “how much do I have?”
  • transactions — “show recent activity”
  • statement — “send me a statement”
  • details — “what’s my sort code / IBAN?”
  • general — anything else, fallback.

Uses the same AsyncGroq LLM as the IntentClassifier but with a smaller, scoped prompt.

_fetch_real_data(query_type, customer_id) — the data layer

    async def _fetch_real_data(self, query_type, customer_id):
        if query_type == "balance":
            accounts = await self.account_service.get_accounts_by_customer(customer_id)
            return {"total": sum(a.balance for a in accounts), "accounts": [...]}
        if query_type == "transactions":
            accounts = await self.account_service.get_accounts_by_customer(customer_id)
            all_txns = []
            for acc in accounts:
                txns = await self.transaction_service.get_recent(acc.id, limit=10)
                all_txns.extend(txns)
            return {"transactions": all_txns[:20]}
        if query_type == "statement":
            # generate or return stored statement
            return {"statement_url": ...}
        ...

Privacy-aware: account numbers are masked ("****5678") before being passed to the LLM. The LLM never sees the full account number.
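A minimal sketch of the masking step, assuming account numbers arrive as strings that may contain separators. The function name mask_account_number is hypothetical; the project's own helper may differ.

```python
def mask_account_number(account_number: str, visible: int = 4) -> str:
    """Return the number with everything but the last `visible` digits masked."""
    digits = "".join(ch for ch in account_number if ch.isdigit())
    if len(digits) <= visible:
        # Too short to reveal anything safely: mask every digit.
        return "*" * len(digits)
    return "****" + digits[-visible:]
```

For example, mask_account_number("12345678") returns "****5678", matching the format described above. Masking happens before the data dict is built, so the prompt never contains the full number.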

_generate_conversational_response (the LLM call)

Constructs a system prompt: “You are a polite UK bank assistant. Be concise. Use £. Do not give financial advice.” Appends the data dict, sends to LLM, returns the response. The LLM’s task is purely textual presentation — no logic, no arithmetic, just formatting.

Common Pitfalls

  • Passing full account numbers to the LLM — PCI/regulatory issue. Always mask before the LLM call.
  • Forgetting the transaction_service.get_recent limit — unbounded queries fetch millions of rows. Always limit=N with a sane N (10-20).
  • Generating arithmetic in the LLM — “what’s the total of my transactions” is a math question; LLMs hallucinate sums. Compute the sum in _fetch_real_data and pass it as a known value.
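The last pitfall can be illustrated with a small sketch: totals are computed in code and handed to the LLM as a finished value. The helper summarise_transactions is a hypothetical name, and it assumes amounts arrive as decimal strings.

```python
from decimal import Decimal
from typing import Iterable


def summarise_transactions(amounts: Iterable[str]) -> dict:
    """Total the amounts in code so the LLM only formats a known value."""
    values = [Decimal(a) for a in amounts]
    # Decimal avoids the binary rounding surprises of float for currency.
    total = sum(values, Decimal("0"))
    return {"count": len(values), "total": f"£{total:.2f}"}
```

The returned dict can be appended to the prompt as-is; the LLM's job is then to phrase "£30.35 across 3 transactions" politely, not to add anything up.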

Real-World Interview Prep

Q1: Why split into _fetch_real_data and _generate_conversational_response? A: Separation of concerns. Fetching data is deterministic (Postgres query); generating text is non-deterministic (LLM). They have different observability needs, different failure modes, different testing strategies. Mixing them — having the LLM make the DB query — makes debugging impossible.

Q2: How do you test this agent without posting to a real LLM? A: Mock AsyncGroq and the services. AsyncMock(spec=AsyncGroq) returns canned responses. Service mocks return canned DB rows. The agent code path is exercised; LLM is not.
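A sketch of that testing approach using only unittest.mock. TinyAgent is a hypothetical stand-in for the real agent, and the mock is built without spec=AsyncGroq so the example runs without the groq package installed; in the real test suite the spec would tighten the mock.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock


class TinyAgent:
    """Hypothetical stand-in for an agent that formats data via an LLM client."""

    def __init__(self, client, account_service):
        self.client = client
        self.account_service = account_service

    async def answer(self, customer_id: int) -> str:
        accounts = await self.account_service.get_accounts_by_customer(customer_id)
        total = sum(a.balance for a in accounts)
        response = await self.client.chat.completions.create(
            model="test-model",
            messages=[{"role": "user", "content": f"Total balance: {total}"}],
        )
        return response.choices[0].message.content


def make_mocks():
    """Build a canned LLM client and a canned account service."""
    client = AsyncMock()
    client.chat.completions.create.return_value = SimpleNamespace(
        choices=[SimpleNamespace(message=SimpleNamespace(content="You have £150."))]
    )
    service = AsyncMock()
    service.get_accounts_by_customer.return_value = [
        SimpleNamespace(balance=100),
        SimpleNamespace(balance=50),
    ]
    return client, service
```

Asserting on the arguments passed to the mocked create call is often more valuable than asserting on the canned reply, because it checks what the agent actually sent to the LLM.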

Q3: What happens if the customer has no accounts? A: _fetch_real_data returns {"total": 0, "accounts": [], "_warning": "no_accounts"}. The LLM’s system prompt includes the rule “if the user has no accounts, politely report it and offer to help open one”. Without the explicit warning key, the LLM might fabricate account balances.


2.3 ProductRecommender (app/agents/product_recommender.py)

The sales agent. Lists financial products matching the customer’s profile. Compliance-strict.

Two Pydantic schemas

    class ProductCategoryExtraction(BaseModel):
        categories: List[Literal["loan", "savings", "credit", "current", "mortgage"]]
        reasoning: str


    class RecommendationResult(BaseModel):
        recommendations: List[dict]  # {product_name, why_recommended, eligibility_met}
        disclaimer: str

Why two schemas, not one: category extraction is a separate LLM call — the system first classifies, then queries the DB by category, then requests the LLM to rank. Splitting helps with retry granularity and per-step observability.

The flow

  1. _determine_category(state.message) → ProductCategoryExtraction. The LLM picks categories from a controlled vocabulary.
  2. DB lookup: products = await self.product_service.find_by_category(categories, customer_profile=customer).
  3. _generate_recommendations(state, products) → RecommendationResult. The system prompt includes:
    • The candidate products (as a JSON list).
    • Customer’s profile (income, credit_score, is_vip — NEVER PII like SSN).
    • Compliance rules: “No ‘guaranteed’. No ‘risk-free’. No specific rate promises.”
  4. Post-validation: Pydantic validates the response. Falls back to a generic “we couldn’t recommend” if it fails.

_clean_json — robust JSON parsing

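LLMs sometimes wrap JSON in markdown fences or include preamble text. _clean_json strips the fences so that only the JSON remains; the version shown here handles fences only, not free-text preamble:

    def _clean_json(self, raw: str) -> str:
        raw = raw.strip()
        if raw.startswith("```"):
            # Drop the opening fence line (e.g. ```json); tolerate a missing newline.
            first_newline = raw.find("\n")
            raw = raw[first_newline + 1:] if first_newline != -1 else raw[3:]
            # Drop the closing fence if present.
            if raw.rstrip().endswith("```"):
                raw = raw.rstrip()[:-3]
        return raw.strip()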
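For replies that carry preamble or trailing chatter as well as fences, one possible approach is to locate the first opening brace and let the standard library decode a single JSON value from there. This is a sketch under that assumption; extract_json_object is a hypothetical helper, not part of the project.

```python
import json


def extract_json_object(raw: str) -> dict:
    """Parse the first JSON object found in an LLM reply."""
    start = raw.find("{")
    if start == -1:
        raise ValueError("no JSON object found")
    # raw_decode parses one JSON value and ignores whatever follows it,
    # so closing fences and trailing prose are tolerated.
    obj, _end = json.JSONDecoder().raw_decode(raw[start:])
    return obj
```

The limitation is a preamble that itself contains a brace before the real payload; in that case decoding fails with json.JSONDecodeError (a ValueError subclass) and the caller should fall back as it would for any malformed reply.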

Compliance baked into the prompt

The system prompt template:

“You are a UK-regulated bank product recommender. Never use the words ‘guaranteed’, ‘risk-free’, ‘100% sure’, or make specific rate promises that depend on future market conditions. Always include a ‘your home may be repossessed’ if mortgage-related.”

This is prompt-level compliance, NOT a post-hoc filter. The agent is designed to never produce bad output in the first place. (See ComplianceChecker Hybrid Short-Circuit for the post-hoc gate.)
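For contrast with the prompt-level rules, here is a minimal sketch of what a deterministic phrase scan could look like as the cheap first stage of a post-hoc gate. The phrase list is taken from the prompt above; find_forbidden_phrases is a hypothetical name and does not describe the actual ComplianceChecker.

```python
import re
from typing import List

# Phrases taken from the prompt rules above; a real list would be longer.
FORBIDDEN_PHRASES = ["guaranteed", "risk-free", "100% sure"]


def find_forbidden_phrases(text: str) -> List[str]:
    """Return the forbidden phrases present in text, case-insensitively."""
    hits = []
    for phrase in FORBIDDEN_PHRASES:
        # Lookarounds stop "guaranteed" from matching inside a longer word.
        pattern = r"(?<!\w)" + re.escape(phrase) + r"(?!\w)"
        if re.search(pattern, text, flags=re.IGNORECASE):
            hits.append(phrase)
    return hits
```

A scan like this cannot tell a sales promise from a quoted regulation, which is why a hit is better treated as a trigger for review than as an automatic block.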

Common Pitfalls

  • Returning products without checking eligibility_met — recommending a mortgage to a customer without sufficient income violates FCA rules. The schema enforces this field.
  • Caching recommendations across sessions — a recommendation from last week might not suit today. Always fresh per conversation.
  • Missing the disclaimer field in the response — FCA requires the disclaimer text visible to the user.

Real-World Interview Prep

Q1: Why model outputs as Pydantic schemas rather than freeform text? A: Reliability. A Pydantic schema is a contract. If the LLM produces malformed JSON, the Pydantic validator raises — you catch it, log it, fall back. Freeform text has no such safety net. The schema also serves as documentation of what the agent should output.

Q2: How is this different from pulling a “Top 5” list from the DB and dumping it? A: Personalisation. The DB has 50 products; only ~3 match the customer’s profile. The LLM ranks/elaborates them with explainability (“why this mortgage suits you”) that a pure DB query can’t produce. The DB is the source of truth; the LLM is the explainer.

Q3: What about FCA-regulated “no advice” rules? A: The agent is information-only, not “advice”. A regulated financial advisor must be FCA-qualified and certified. The agent never says “you should buy X”; it says “X has features A, B, C; based on your profile C matches your needs; consult a qualified advisor for personalised advice”.


2.4 GeneralAgent (app/agents/general_agent.py)

The catch-all with a tiered cache pattern. Handles FAQs and RAG-backed Q&A.

The three tiers

Customer question
    │
    ▼
┌───────┐  hit  ┌───────────────────────┐
│ Tier 0│──────▶│ Redis cache           │
│ (Rds) │       │ (TTL 1-24h)           │
└───┬───┘       └───────────────────────┘
    │ miss
    ▼
┌───────┐  hit  ┌───────────────────────┐
│ Tier 1│──────▶│ FAQ Postgres DB       │
│ (FAQ) │       │ (keyword match)       │
└───┬───┘       └───────────────────────┘
    │ miss
    ▼
┌───────┐       ┌───────────────────────┐
│ Tier 2│──────▶│ RAG pgvector + LLM    │
│ (RAG) │       │ (slow, expensive)     │
└───────┘       └───────────────────────┘

Why three tiers: each tier is faster and cheaper than the next. Tier 0 answers in ~1ms; Tier 1 in ~50ms; Tier 2 in ~1-3 seconds. ~80% of questions hit Tier 0 or 1.

_lookup_faq_db(question) — Tier 1

    async def _lookup_faq_db(self, question):
        faq_match = await self.faq_service.search_by_keywords(question)
        if faq_match and faq_match.confidence > 0.8:
            return faq_match.answer
        return None  # miss: fall through to Tier 2

search_by_keywords does a fuzzy keyword match against FAQ.keywords column. The threshold (0.8) prevents low-confidence matches from leaking through as authoritative answers.

After a Tier 1 hit: await self.cache_service.set(key, answer, ttl=86400). 24h TTL for FAQs because they’re stable.

_lookup_rag_db(question) — Tier 2

    async def _lookup_rag_db(self, question, top_k=6):
        chunks = await self.rag_service.query(question, top_k=top_k)
        return chunks

Returns up to 6 chunks — chosen carefully: more chunks = more context, but also more noise; fewer = faster but possibly missing detail. 6 is the sweet spot.

After a Tier 2 success: cache with 1h TTL (vs 24h for FAQs). RAG outputs are less stable — embeddings can be re-indexed.
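The three tiers and their write-back TTLs can be sketched end to end with in-memory stand-ins. TieredAnswerer and its lookup callables are hypothetical; a plain dict replaces Redis here, so the TTL is recorded but not enforced.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional, Tuple

Lookup = Callable[[str], Awaitable[Optional[str]]]


class TieredAnswerer:
    """Try cache, then FAQ, then RAG; write hits back to the cache."""

    def __init__(self, faq_lookup: Lookup, rag_lookup: Lookup):
        # key -> (answer, ttl_seconds); a dict stands in for Redis.
        self.cache: Dict[str, Tuple[str, int]] = {}
        self.faq_lookup = faq_lookup
        self.rag_lookup = rag_lookup

    async def answer(self, question: str) -> Tuple[str, str]:
        key = " ".join(question.lower().split())  # crude key normalisation
        if key in self.cache:
            return self.cache[key][0], "cache"
        faq = await self.faq_lookup(question)
        if faq is not None:
            self.cache[key] = (faq, 86400)  # FAQs are stable: 24h
            return faq, "faq"
        rag = await self.rag_lookup(question)
        if rag is not None:
            self.cache[key] = (rag, 3600)  # RAG answers: 1h
            return rag, "rag"
        return "I couldn't find a relevant document. Could you rephrase?", "none"
```

Returning the tier name alongside the answer makes the hit rate per tier easy to measure, which is how a claim like "~80% of questions hit Tier 0 or 1" would be verified in practice.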

_generate_llm_response(question, chunks) — LLM with citations

    prompt = f"""Answer the question using ONLY the context below.

    Question: {question}

    Context:
    {chr(10).join(f'- [{c.source}] {c.content}' for c in chunks)}

    At the end, include 'Sources: <citations>' listing each source filename.
    """

The [source] prefix per chunk is mandatory — the LLM is told to cite. The post-processing extracts citations into the response metadata so the UI can render “Source: FCA_faqs.pdf” badges.
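The citation post-processing might look like the following sketch, which assumes the LLM followed the instruction and ended its reply with a single 'Sources:' line of comma-separated filenames. split_sources is a hypothetical helper name.

```python
import re
from typing import List, Tuple


def split_sources(response: str) -> Tuple[str, List[str]]:
    """Separate the answer body from a trailing 'Sources: a, b' line."""
    match = re.search(r"^Sources:\s*(.*)$", response, flags=re.MULTILINE)
    if match is None:
        # The model ignored the instruction: return the text with no citations.
        return response.strip(), []
    sources = [s.strip() for s in match.group(1).split(",") if s.strip()]
    body = response[: match.start()].strip()
    return body, sources
```

The body goes into the visible message and the list into response metadata, so the UI can render one badge per source.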

Token echo prevention

The system prompt’s rules include: “Do not output placeholders like [NAME], [EMAIL], [PHONE]. If you see them, paraphrase over them.” This prevents the LLM from echoing the placeholders that Presidio’s redaction produces.

Safe default list init

Real bug: in earlier versions, the agent used history = [] as a class attribute. All instances shared the same list — one user’s history bled into another’s. The fix uses local-init patterns:

    def __init__(...):
        self.history = []  # instance attribute, not shared
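The bug is easy to reproduce in isolation. The two classes below are illustrative stand-ins, not the project's agents.

```python
class SharedHistoryAgent:
    history = []  # BUG: one list object shared by every instance

    def remember(self, message: str) -> None:
        self.history.append(message)


class PerInstanceHistoryAgent:
    def __init__(self) -> None:
        self.history = []  # a fresh list per instance

    def remember(self, message: str) -> None:
        self.history.append(message)
```

With SharedHistoryAgent, a message remembered through one instance shows up in every other instance's history, because append mutates the single class-level list. With PerInstanceHistoryAgent, each instance starts with its own empty list.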

Common Pitfalls

  • Caching with no TTL — Redis slowly fills until OOM. Always set TTL.
  • Returning multiple FAQ answers for one question — concatenates uselessly. Pick the highest-confidence match.
  • Forgetting top_k=6 default — without it, you might RAG 50 chunks and waste 8000 input tokens.

Real-World Interview Prep

Q1: Why cache Tier 2 (RAG) responses at all? A: RAG results are expensive (~1-3 seconds + LLM cost). If 5 users ask the same question in an hour, only 1 hits Tier 2. 80% traffic reduction on the most expensive path. The trade-off: stale answers if the underlying documents change — keep TTL ≤1h to bound staleness.

Q2: When would you use a vector DB other than pgvector for RAG? A: When (a) your corpus exceeds 10M chunks (pgvector’s index gets slow), (b) you need advanced filter expressions (Qdrant’s faceted search), or (c) you need horizontal scaling without Postgres expertise. For FCA’s typical scope (<1M chunks), pgvector is the right choice — keeps data in the same transactional DB.

Q3: How would you handle “ambiguous” Tier 1 / Tier 2 matches? A: Track the distance score from pgvector. If Tier 2’s best match has distance > 0.4 (meaning low similarity), refuse to answer: “I couldn’t find a relevant document. Could you rephrase?” Don’t fabricate answers. This is the right calibration between hallucination and silence.
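A sketch of the refusal rule, assuming each retrieved chunk carries a distance score where smaller means more similar. Chunk and usable_chunks are hypothetical names; the 0.4 threshold comes from the answer above and would need tuning per embedding model.

```python
from dataclasses import dataclass
from typing import List, Optional

MAX_DISTANCE = 0.4  # threshold from the answer above; tune per embedding model


@dataclass
class Chunk:
    source: str
    content: str
    distance: float  # smaller means more similar


def usable_chunks(chunks: List[Chunk]) -> Optional[List[Chunk]]:
    """Return chunks within the threshold, or None to signal a refusal."""
    if not chunks or min(c.distance for c in chunks) > MAX_DISTANCE:
        return None
    return [c for c in chunks if c.distance <= MAX_DISTANCE]
```

A None result means the agent should reply with the "could you rephrase?" message and skip the LLM call entirely, which also saves the cost of a generation that would likely be fabricated.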


3. The LangGraph Flow (app/workflows/message_workflow.py)

3.1 Graph assembly at a glance

    from langgraph.graph import StateGraph, END
    from app.schemas.common import WorkflowState

    graph = StateGraph(WorkflowState)

    graph.add_node("guardrail", guardrail_node)      # security check
    graph.add_node("classify", classify_node)        # IntentClassifier
    graph.add_node("account", account_node)          # AccountAgent
    graph.add_node("general", general_node)          # GeneralAgent
    graph.add_node("product", product_node)          # ProductRecommender
    graph.add_node("compliance", compliance_node)    # ComplianceChecker
    graph.add_node("human", human_node)              # HumanAgent
    graph.add_node("human_approval", human_approval_node)
    graph.add_node("end", end_node)                  # END-formatting

    graph.set_entry_point("guardrail")

    graph.add_conditional_edges("guardrail", route_guardrail, {
        "safe": "classify",
        "unsafe": "human",  # jailbreak detected → escalate
    })
    graph.add_conditional_edges("classify", route_intent, {
        "account_data": "account",
        "product_acquisition": "product",
        "knowledge_inquiry": "general",
        "complaint": "human",
        "general_inquiry": "general",
    })

    graph.add_edge("account", "compliance")
    graph.add_edge("general", "compliance")
    graph.add_edge("product", "compliance")

    graph.add_conditional_edges("compliance", route_compliance, {
        "OK": "end",
        "FAIL": "human_approval",  # pause for human review
        "NEEDS_HUMAN": "human_approval",
    })

    # Escalations also pause for a reviewer (the node table in 3.2 lists this edge).
    graph.add_edge("human", "human_approval")
    graph.add_edge("human_approval", "end")
    graph.add_edge("end", END)

Compiled with:

    compiled = graph.compile(
        checkpointer=async_postgres_saver,
        interrupt_before=["human_approval"],
    )

3.2 The nine nodes explained

| Node | Purpose | Goes to next |
| --- | --- | --- |
| guardrail | First line: detect jailbreak / PII injection. Uses SecurityService.check_prompt_injection. | classify if safe, human if unsafe |
| classify | IntentClassifier classifies intent + confidence + sentiment + explanation | Conditional edge routes by intent |
| account | Fetch balance / transactions / statements / details | compliance |
| general | Tiered cache → FAQ → RAG | compliance |
| product | ProductRecommender with compliance-strict prompt | compliance |
| compliance | ComplianceChecker scans for forbidden phrases + LLM judgement | Conditional: OK → end, FAIL → human_approval |
| human | Emergency escalation (jailbreak detected, complaint detected) | Direct to human_approval |
| human_approval | Pause. Wait for human reviewer. | After approval → end |
| end | Format final response | END |

3.3 Conditional edge functions

Each conditional edge is a pure Python function (sync or async) returning the name of the next node.

route_guardrail(state) — security gate

    def route_guardrail(state: WorkflowState) -> str:
        # Return a key of the conditional-edge map: "unsafe" → human, "safe" → classify.
        if state.security_status == "blocked":
            return "unsafe"  # escalation path
        if state.security_status == "warn":
            return "safe"    # proceed with caution tag in metadata
        return "safe"

blocked = clear jailbreak/injection → human. warn = suspicious but maybe legit → still classify with a metadata flag.

route_intent(state) — the supervisor

Already covered in Multi-Agent Supervisor Routing. Briefly:

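    def route_intent(state) -> str:
        # Return a key of the conditional-edge map (an intent name); the map sends
        # each intent to the node named by INTENTS[intent]["routing"].
        if state.confidence < 0.6:
            return "general_inquiry"  # drift-to-generalist at low confidence
        return state.intent if state.intent in INTENTS else "general_inquiry"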

route_compliance(state) — FCA gate

    def route_compliance(state) -> str:
        # Return a key of the conditional-edge map: "OK", "NEEDS_HUMAN" or "FAIL".
        result = state.compliance_check
        if result and result.is_compliant:
            return "OK"
        if state.requires_human_review:
            return "NEEDS_HUMAN"
        return "FAIL"  # default: any non-compliance → human_approval

3.4 The interrupt_before pattern

    compiled = graph.compile(
        checkpointer=async_postgres_saver,
        interrupt_before=["human_approval"],
    )

What it does: when the graph reaches a route TO human_approval, it freezes. The customer’s conversation thread is held open. A human reviewer can call coordinator.approve_intervention(...) to resume — see LangGraph Human-in-the-Loop.

3.5 Common Pitfalls

  • Forgetting checkpointer=... at compile time — no resume possible. Mid-conversation pauses can’t be restored.
  • Conditional edges returning unregistered node names: KeyError at compile time or runtime, depending on LangGraph version.
  • Putting non-serialisable objects in WorkflowState (e.g., asyncio.Queue) — checkpoint round-trip breaks.
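The second pitfall can be caught in a unit test before the graph ever runs. The sketch below assumes routers return keys of the mapping passed to add_conditional_edges, and uses plain dicts in place of WorkflowState; unmapped_routes is a hypothetical helper.

```python
from typing import Callable, Dict, Iterable


def unmapped_routes(
    router: Callable[[dict], str],
    edge_map: Dict[str, str],
    sample_states: Iterable[dict],
) -> set:
    """Return router outputs that are not keys of the conditional-edge map."""
    return {router(state) for state in sample_states} - set(edge_map)


def route_guardrail(state: dict) -> str:
    # Returns map keys ("safe" / "unsafe"), not node names.
    return "unsafe" if state.get("security_status") == "blocked" else "safe"
```

Feeding the router one sample state per branch and asserting that the result is an empty set turns a runtime KeyError into a failing test.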

3.6 Real-World Interview Prep

Q1: Why is compliance between every agent and END? A: Compliance is a hard gate. Every LLM response risks producing a forbidden phrase. By making compliance the ONLY node that routes to END, you centralise the check. If a new agent type is added later, it MUST go through compliance.

Q2: Why not route compliance failures directly to a hard error instead of human_approval? A: Compliance failures aren’t always unambiguous. “guaranteed” might be in a quoted FCA regulation. A human-approval node lets a reviewer decide: “yes, block this” OR “actually, this is fine, let it through”. Saves false positives.

Q3: How do you pause the graph mid-flight vs interrupt_before=["X"]? A: interrupt_before only fires before the listed node. For mid-node pauses, call LangGraph's interrupt(...) inside the node function and resume by invoking the graph with Command(resume=...). Unlike an in-process asyncio.Event (await pause_event.wait()), the interrupt is persisted by the checkpointer, so the pause survives a pod restart.


4. The Coordinator (app/coordinator/agent_coordinator.py)

A stateless wrapper around the compiled graph. Every request opens its own DB session, runs the graph once, and commits the message log.

4.1 Why stateless matters

The coordinator holds no in-process state between requests. Multi-instance horizontal scaling is automatic — load balancer can route any request to any pod. Conversation continuity comes from the checkpointer (Postgres), not the process.

4.2 The process_message method

    async def process_message(
        self,
        message: str,
        customer_id: int,
        conversation_id: int | None = None,
    ) -> WorkflowState:
        # Open DB session (Unit-of-Work)
        async with AsyncSessionLocal() as session:
            # Sanitize input
            cleaned = await self.security_service.sanitize_input(message)

            # Build state
            state = WorkflowState(
                conversation_id=conversation_id or 0,
                customer_id=customer_id,
                message=cleaned["safe_text"],
                history=await self.message_service.get_recent(conversation_id, limit=20),
            )

            # Resolve graph
            config = {"configurable": {"thread_id": str(conversation_id)}}

            # Run to completion
            output = await self.compiled_graph.ainvoke(state, config=config)

            # Persist messages atomically
            await self.message_service.add_message(
                conversation_id, role="CUSTOMER", content=state.message
            )
            await self.message_service.add_message(
                conversation_id,
                role="AGENT",
                content=output.final_response.content,
                metadata=output.final_response.metadata,
            )
            await session.commit()
            return output

4.3 stream_message (SSE)

Wraps process_message but yields events to an asyncio.Queue for the SSE endpoint:

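    async def stream_message(self, message, customer_id, conversation_id) -> AsyncIterator[dict]:
        # Build the initial state as process_message does (sanitisation omitted here).
        state = WorkflowState(
            conversation_id=conversation_id, customer_id=customer_id, message=message
        )
        config = {"configurable": {"thread_id": str(conversation_id)}}
        # stream_mode="updates" yields one {node_name: partial_state} dict per node.
        async for event in self.compiled_graph.astream(
            state, config=config, stream_mode="updates"
        ):
            # Each node emit = one SSE event
            for node_name, partial_state in event.items():
                yield {"type": "status", "step": node_name}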

4.4 30-second deadlock recovery

The checkpointer setup has a known issue under concurrent startup:

    try:
        await asyncio.wait_for(self.checkpointer.setup(), timeout=30.0)
    except asyncio.TimeoutError:
        # Retry once
        await self.checkpointer.setup()

Without this guard, a startup deadlock could hang the entire pod.
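One thing to note about the guard as written is that the retry itself has no timeout. A possible variant, sketched here with a hypothetical helper setup_with_retry, applies the same timeout to every attempt and gives up after a fixed number of tries.

```python
import asyncio
from typing import Awaitable, Callable


async def setup_with_retry(
    setup: Callable[[], Awaitable[None]],
    timeout: float = 30.0,
    attempts: int = 2,
) -> int:
    """Run setup() under a timeout; return the attempt number that succeeded."""
    for attempt in range(1, attempts + 1):
        try:
            await asyncio.wait_for(setup(), timeout=timeout)
            return attempt
        except asyncio.TimeoutError:
            # Re-raise on the final attempt so startup fails loudly.
            if attempt == attempts:
                raise
    raise RuntimeError("attempts must be at least 1")
```

Failing loudly after the last attempt lets the orchestrator restart the pod instead of leaving it hung, which is the outcome the guard is meant to prevent.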

4.5 Common Pitfalls

  • Calling process_message without conversation_id for an ongoing conversation — no thread continuity; graph runs from scratch.
  • Not committing the DB session on exception — orphaned customer-side messages mid-graph. Always try: commit() except: rollback().
  • Sharing an AsyncPostgresSaver instance across workers: the saver holds a live database connection (or pool), which cannot be reused across processes. Create one saver per worker process.
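The commit-or-rollback rule from the second pitfall can be sketched with a fake session. FakeSession and run_in_unit_of_work are hypothetical names; a real SQLAlchemy AsyncSession exposes awaitable commit() and rollback() methods in the same way.

```python
import asyncio


class FakeSession:
    """Stand-in for an async DB session that records what was called."""

    def __init__(self) -> None:
        self.calls = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")


async def run_in_unit_of_work(session, work):
    """Commit if work() succeeds; roll back and re-raise if it fails."""
    try:
        result = await work()
        await session.commit()
        return result
    except Exception:
        await session.rollback()
        raise
```

Re-raising after the rollback matters: the caller still needs to see the failure so it can return an error or trigger the safe-degradation path.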

4.6 Real-World Interview Prep

Q1: Why use Postgres for state instead of Redis? A: Conversation history (the Message table) is already in Postgres. Keeping checkpoints in the same DB simplifies backup/restore and transactional consistency. Redis is great for ephemeral state but adds an extra dependency.

Q2: How do you test a process_message call? A: Use a real Postgres test container. Mock the LLM (AsyncMock), but don’t mock the checkpointer — its serialization is hard to fake. Tests run against actual checkpoint-SQL paths.

Q3: What’s the cost of a “pause at human_approval” round trip? A: Postgres checkpoint write on pause (5-10ms). Resume reads checkpoint + replays up to the pause. Resuming is more expensive (50-100ms depending on graph size). When a customer opens a UI, they wait for the resume. UX-wise, show a “Reviewer is checking” badge.


This page connects the dots. Map of related pages:

| Concept | Related page |
| --- | --- |
| Specialist agents (this page) | Multi-Agent Supervisor Routing |
| Agent backbone (run/retry/circuit) | BaseAgent (Circuit Breaker + Tenacity) |
| Compliance gate (post-LLM scan) | ComplianceChecker Hybrid Short-Circuit |
| Postgres checkpoints | LangGraph Checkpointing |
| Human pause/resume | LangGraph Human-in-the-Loop |
| Shared memory schema | WorkflowState Pydantic Schema |
| LLM observability | Langfuse LLM Tracing (@observe) |
| RBAC on inbound calls | FastAPI Depends() for Auth/RBAC |
| Cache TTLs | Redis Cache Service & Normalized Keys |
| Security pre-flight (Presidio/Lakera) | SecurityService Sanitization Pipeline |
| Authenticated message endpoint | Message API Route (IDOR + Scopes) |
| SSE transport layer | FastAPI SSE Streaming |
| Frontend SSE consumption | Streamlit Chat UI (Session State + SSE) |

Real-World Interview Prep

Q1: How does this multi-agent architecture handle the tension between specialisation and generality?

A: The architecture uses a two-level design. The IntentClassifier (a light LLM call with temperature=0.0) chooses which specialist agent to route to, and the specialists themselves are narrow — AccountAgent only reads data, ProductRecommender only recommends. The GeneralAgent acts as the safety net: low-confidence classifications, ambiguous queries, and unexpected inputs all land there. This keeps specialists pure, the generalist catches everything else, and the system never produces a blank error. The drift-to-generalist pattern (any classification below 0.6 confidence routes to GeneralAgent) is the key safety valve.

Q2: Why have separate specialist nodes in the LangGraph instead of one big LLM call with all capabilities?

A: Four reasons. (1) Cost — routing a simple balance check through IntentClassifier → AccountAgent costs ~200 tokens; a single monolithic prompt costs 2000+ tokens with irrelevant context. (2) Observability — each node gets its own Langfuse span; if AccountAgent fails, you know exactly where. (3) Compliance — the ComplianceChecker sits between every specialist and the END node, so every response is checked equally. A monolithic call makes it harder to interpose compliance checks. (4) Testing — each specialist is independently testable with mocked services and mocked LLMs, instead of needing the full graph.

Q3: How would you add a new specialist agent (e.g. a “FraudAlertAgent”)?

A: Five steps. (1) Subclass BaseAgent in app/agents/fraud_alert_agent.py — inherits circuit-breaking and retries for free. (2) Add "fraud_alert" to the INTENTS config in IntentClassifier with routing "fraud". (3) Add graph.add_node("fraud", fraud_node) in message_workflow.py and a conditional edge from route_intent. (4) Add graph.add_edge("fraud", "compliance") so all fraud outputs pass through ComplianceChecker. (5) Write unit tests for the agent, integration tests for routing, and a LangGraph trace test. Total effort is ~100 lines of code plus tests.
