
HumanAgent: Escalation Priority with Hybrid Fast-Path + LLM

What? (Concept Overview)

HumanAgent is the agent responsible for tier-zero escalations: when a customer issue cannot be auto-resolved, it’s classified by priority (URGENT, HIGH, MEDIUM, LOW), routed to a specialist team, and persisted as an EscalationTicket with full SLA metadata. The classification uses a hybrid pattern — a fast keyword heuristic catches emergencies without LLM latency, and an LLM-based semantic assessment handles everything else.

Project Context

The fca-support-agent’s HumanAgent (app/agents/human_agent.py) runs after compliance_checker, or after the routing function flags a low-confidence intent. Its _assess_priority method is decorated with @observe(as_type="generation", name="Groq-Priority-Assessment"), so each assessment, including the LLM call, is recorded as a Langfuse generation span with model parameters and usage details attached. The method first checks for hard-rule keywords (fraud, stolen, unauthorized, security breach) and falls back to the LLM only when that check does not return URGENT.

How? (Quick Reference Blocks)

3.1 Priority Enum and Pydantic Schema

```python
# app/agents/human_agent.py
from enum import Enum

from pydantic import BaseModel, Field


class EscalationPriority(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"


class PriorityAnalysis(BaseModel):
    """Strict schema for LLM Priority Assessment. Pydantic v2."""

    priority: EscalationPriority = Field(
        description="The semantic priority level of the customer's issue."
    )
    reasoning: str = Field(
        description="A brief explanation of why this priority was chosen."
    )


class EscalationTicket(BaseModel):
    id: str
    customer_id: int
    conversation_id: int
    issue: str
    priority: str
    status: str = "open"
    assigned_to: str
    estimated_response: str
    saved: bool
    created_at: str
```

3.2 Hybrid Fast-Path + LLM Semantic Assessment

```python
# app/agents/human_agent.py: HumanAgent._assess_priority
from langfuse import get_client, observe


@observe(as_type="generation", name="Groq-Priority-Assessment")
async def _assess_priority(self, message: str) -> EscalationPriority:
    # 1. Hybrid fast path: catch blatant emergencies without an LLM call
    message_lower = message.lower()
    if any(
        kw in message_lower
        for kw in ["fraud", "stolen", "unauthorized", "security breach"]
    ):
        # Defensive: any "not " / "no " substring ("not fraud", "no stolen")
        # skips the fast path and defers to the LLM below
        if "not " not in message_lower and "no " not in message_lower:
            return EscalationPriority.URGENT

    # 2. LLM semantic assessment
    langfuse = get_client()
    langfuse.update_current_generation(
        model=self.config.model_name,
        model_parameters={"temperature": 0.0},
    )

    prompt = f"""
Analyze the following customer message to determine its escalation priority.

Customer Message: "{message}"

Priority Levels:
- URGENT: Fraud, stolen cards, security breaches, locked out of accounts.
- HIGH: Formal complaints, unacceptable service, denied transactions, system errors.
- MEDIUM: Standard support requests, account changes, document requests.
- LOW: General inquiries, non-urgent questions.

You MUST respond with a single valid JSON object. Do NOT wrap it in a list or array.
It must contain exactly these keys: "priority" and "reasoning".

Example Output:
{{
    "priority": "medium",
    "reasoning": "The customer is asking for help with a standard account update."
}}
"""

    try:
        async def _call_llm():
            return await self.client.chat.completions.create(
                model=self.config.model_name,
                messages=[
                    {"role": "system", "content": "You are a senior support triage expert."},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.0,
                response_format={"type": "json_object"},
            )

        response = await self.execute_with_retry(_call_llm)

        if hasattr(response, "usage") and response.usage:
            langfuse.update_current_generation(
                usage_details={
                    "prompt_tokens": response.usage.prompt_tokens,
                    "completion_tokens": response.usage.completion_tokens,
                    "total_tokens": response.usage.total_tokens,
                }
            )

        # Parses and validates in one step; raises ValidationError on bad output
        analysis = PriorityAnalysis.model_validate_json(
            response.choices[0].message.content
        )
        return analysis.priority
    except Exception as e:
        self.logger.error(f"LLM Priority Parsing Error: {e}")
        # Safe fallback: HIGH, ensuring human attention quickly if AI fails
        return EscalationPriority.HIGH
```
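The control flow above can be exercised without Groq or Langfuse. The sketch below is a stdlib-only approximation, not the agent's code: assess_priority, fake_llm_ok and fake_llm_prose are hypothetical stand-ins for _assess_priority and the retried Groq call, and a plain json.loads plus an enum lookup stands in for PriorityAnalysis.model_validate_json.

```python
import asyncio
import json
from enum import Enum
from typing import Awaitable, Callable


class EscalationPriority(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"


FAST_PATH_KEYWORDS = ["fraud", "stolen", "unauthorized", "security breach"]
LLM_CALLS: list = []  # records every message that reached the (fake) LLM


async def assess_priority(
    message: str, llm_call: Callable[[str], Awaitable[str]]
) -> EscalationPriority:
    """Hybrid triage: keyword fast path first, then a JSON-returning LLM."""
    lowered = message.lower()
    # Same keyword list and crude negation guard as _assess_priority.
    if any(kw in lowered for kw in FAST_PATH_KEYWORDS):
        if "not " not in lowered and "no " not in lowered:
            return EscalationPriority.URGENT
    try:
        data = json.loads(await llm_call(message))
        # ValueError: invalid JSON or a value outside the enum;
        # KeyError / TypeError: missing key or not a JSON object.
        return EscalationPriority(data["priority"])
    except (ValueError, KeyError, TypeError):
        # Fail soft to HIGH so a human still looks at it quickly.
        return EscalationPriority.HIGH


async def fake_llm_ok(message: str) -> str:
    """Hypothetical stand-in for a well-behaved JSON-mode response."""
    LLM_CALLS.append(message)
    return '{"priority": "medium", "reasoning": "Standard account update."}'


async def fake_llm_prose(message: str) -> str:
    """Hypothetical stand-in for a model that ignored JSON mode."""
    LLM_CALLS.append(message)
    return "Sure! I'd rate this HIGH priority."
```

Passing the LLM call in as a coroutine keeps both the fast path and the HIGH fallback testable without network access: asyncio.run(assess_priority("My card was stolen", fake_llm_prose)) returns URGENT without touching the fake LLM, while the same prose reply for a routine message falls back to HIGH.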

3.3 Generating an EscalationTicket

```python
# app/agents/human_agent.py: HumanAgent._create_escalation and helpers
from datetime import datetime, timezone
from typing import Optional


async def _create_escalation(
    self,
    customer_id: int,
    conversation_id: int,
    issue: str,
    priority: EscalationPriority,
    context: Optional[dict] = None,
) -> EscalationTicket:
    # Aware UTC time: datetime.utcnow() is naive, and .timestamp() on a naive
    # datetime assumes local time, which skews the ID on non-UTC hosts
    now = datetime.now(timezone.utc)
    ticket_id = f"ESC-{customer_id}-{int(now.timestamp())}"
    assigned_group = self._assign_specialist(priority)
    saved_status = False

    conversation_service = (
        context.get("conversation_service") if context else None
    ) or self.conversation_service

    if conversation_service:
        # Raise a hard error if the DB handle is missing; never silently swallow
        if hasattr(conversation_service, "db") and conversation_service.db is not None:
            await conversation_service.escalate_conversation(
                conversation_id,
                reason=issue,
                priority=priority.value,
                assigned_group=assigned_group,
                ticket_id=ticket_id,
            )
            saved_status = True
        else:
            raise ConnectionError("Database connection is null in ConversationService")

    return EscalationTicket(
        id=ticket_id,
        customer_id=customer_id,
        conversation_id=conversation_id,
        issue=issue,
        priority=priority.value,
        status="open",
        created_at=now.isoformat(),
        assigned_to=assigned_group,
        estimated_response=self._estimate_response_time(priority),
        saved=saved_status,
    )


def _estimate_response_time(self, priority: EscalationPriority) -> str:
    return {
        EscalationPriority.URGENT: "Within 15 minutes",
        EscalationPriority.HIGH: "Within 1 hour",
        EscalationPriority.MEDIUM: "Within 4 hours",
        EscalationPriority.LOW: "Within 24 hours",
    }.get(priority, "Within 24 hours")


def _assign_specialist(self, priority: EscalationPriority) -> str:
    return {
        EscalationPriority.URGENT: "Security & Fraud Team",
        EscalationPriority.HIGH: "Senior Support Team",
        EscalationPriority.MEDIUM: "Support Specialists",
        EscalationPriority.LOW: "Support Team",
    }.get(priority, "Support Team")
```
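The persistence branch has three possible outcomes: a saved ticket, an unsaved in-memory stub, or a hard error. A minimal sketch of just that branch, assuming a hypothetical StubConversationService in place of the real ConversationService and a hypothetical persist_escalation helper that mirrors only the saved_status logic of _create_escalation:

```python
import asyncio
from typing import Any, List, Optional, Tuple


class StubConversationService:
    """Hypothetical stand-in for ConversationService; records calls in memory."""

    def __init__(self, db: Optional[Any]) -> None:
        self.db = db
        self.escalations: List[Tuple[int, dict]] = []

    async def escalate_conversation(self, conversation_id: int, **fields: Any) -> None:
        self.escalations.append((conversation_id, fields))


async def persist_escalation(
    service: Optional[StubConversationService], conversation_id: int, ticket_id: str
) -> bool:
    """Mirror of the saved_status branch in _create_escalation; returns `saved`."""
    if not service:
        return False  # nothing wired in: the ticket is an unsaved in-memory stub
    if getattr(service, "db", None) is None:
        # Same hard failure as the original: never pretend the write happened
        raise ConnectionError("Database connection is null in ConversationService")
    await service.escalate_conversation(conversation_id, ticket_id=ticket_id)
    return True


# Usage: the three outcomes an operator can end up seeing.
healthy = StubConversationService(db=object())
saved = asyncio.run(persist_escalation(healthy, 42, "ESC-7-1714555815"))
unsaved = asyncio.run(persist_escalation(None, 42, "ESC-7-1714555815"))
try:
    asyncio.run(persist_escalation(StubConversationService(db=None), 42, "ESC-7-1714555815"))
    db_error = ""
except ConnectionError as exc:
    db_error = str(exc)
```

Only the first outcome yields saved=True; the second is the transient stub that the saved flag exists to expose, and the third surfaces as an exception instead of a ticket.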

Why? (Parameter Breakdown)

  • temperature=0.0 for triage — Same intent must classify the same way across customers. Higher temperature produces creative-but-inconsistent priorities.
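  • JSON mode + one-shot example + Pydantic model_validate_json: response_format={"type": "json_object"} and the example in the prompt steer the model toward the right shape, and model_validate_json parses and validates in one step, so there is no separate json.loads pass or partial-result handling. Malformed JSON, a wrapping list, or a priority outside the enum raises a ValidationError, which the except block turns into the HIGH fallback. Faster and safer than regex-parsing raw strings.
  • Negation-aware fast path ("not " and "no " substrings): a naive keyword match fires on "I do NOT think there's fraud"; without negation handling, the agent returns URGENT and freezes the customer’s account. The guard is deliberately crude: "not " or "no " anywhere in the message, even inside words such as "cannot " or "casino ", skips the fast path. That is cheap, because a skipped message is still classified by the LLM rather than downgraded. Keep the negation check defensive but simple (not full NLP).
  • Fail-soft to HIGH on parse error: falling back to MEDIUM or LOW would under-classify any real emergency the fast path missed, while URGENT would send every LLM failure to the Security & Fraud Team. HIGH over-classifies most messages but still puts a human on them within the hour. Over-classification is the correct trade-off (operators downgrade, never upgrade); missing an emergency is the worse failure mode.
  • Persist EscalationTicket with its metadata: saved=True/False is a real field, not a debug print. Operators see it in the queue UI and can tell whether the ticket actually reached the DB or is a transient in-memory stub.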
  • @observe(as_type="generation", ...) in Langfuse — Marks the span as a “generation” type, which surfaces in Langfuse with model parameters and token usage as first-class fields. Without as_type="generation", the span is generic and token cost doesn’t show up in Cost Analysis.
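The negation guard is easiest to judge on concrete inputs. This sketch copies the fast-path condition from _assess_priority into a hypothetical fast_path_urgent function and runs it on a few invented sample messages:

```python
FAST_PATH_KEYWORDS = ["fraud", "stolen", "unauthorized", "security breach"]


def fast_path_urgent(message: str) -> bool:
    """Copy of the fast-path condition in _assess_priority.

    True: escalate as URGENT without calling the LLM.
    False: fall through to the LLM (the message is not downgraded).
    """
    lowered = message.lower()
    has_keyword = any(kw in lowered for kw in FAST_PATH_KEYWORDS)
    has_negation = "not " in lowered or "no " in lowered
    return has_keyword and not has_negation


# Invented sample messages mapped to the result the check actually produces.
cases = {
    "My card was stolen last night": True,
    "I do NOT think there's fraud, just checking": False,
    "My card was stolen and I cannot log in": False,  # "cannot " contains "not "
    "Unauthorized payment at a casino yesterday": False,  # "casino " contains "no "
    "Security breach? Someone changed my password": True,
}
results = {msg: fast_path_urgent(msg) for msg in cases}
```

The two substring misfires cost only an LLM call in this design: both messages still reach the LLM, whose URGENT definition covers stolen cards and fraud.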

Common Pitfalls

  1. Letting the LLM return free-text priorities ("high priority", "very urgent"). The enum-typed Pydantic schema forces a discrete set; otherwise downstream code branches on string comparison which breaks every time the LLM rewrites a synonym.
  2. Throwing away the user’s message when the DB fails. The current code raises a ConnectionError when the ConversationService has no database handle (db is None) rather than returning a ticket that looks saved, and the response still includes an emergency-call fallback "0800-123-4567", so a failed write is never lost silently. Don’t trap-and-discard; trap-and-recover-with-fallback.
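A stdlib-only sketch of why the enum-typed field matters, approximating what PriorityAnalysis enforces. parse_priority and try_parse are hypothetical names; Pydantic would raise a single ValidationError where this sketch raises ValueError or KeyError.

```python
import json
from enum import Enum


class EscalationPriority(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"


def parse_priority(raw_json: str) -> EscalationPriority:
    """Strict parse: only the four enum values are accepted (case-sensitive)."""
    payload = json.loads(raw_json)
    if not isinstance(payload, dict):
        raise ValueError("expected a JSON object, not a list or scalar")
    return EscalationPriority(payload["priority"])  # ValueError on synonyms


def try_parse(raw_json: str) -> str:
    """Return the accepted value, or the name of the exception that rejected it."""
    try:
        return parse_priority(raw_json).value
    except (ValueError, KeyError) as exc:
        return f"rejected: {type(exc).__name__}"
```

Synonyms such as "high priority" or "very urgent" are rejected at the boundary instead of leaking into downstream string comparisons.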

Real-World Interview Prep

Q1: How do you prevent the keyword fast-path from over-escalating negated mentions (“not stolen”, “no fraud”)?

A: Three layers. (1) A substring check for not, no or without preceding the keyword within a sliding window (5-7 tokens before). (2) Switch the fast path to a small fine-tuned model that understands negation. (3) Move the fast path to a separate rule engine with proper tokenisation (for example spaCy, whose English dependency parses label negation as neg); substring matching is brittle. The current code uses a cruder form of (1): it looks for "not " or "no " anywhere in the message, with no window, and defers to the LLM on any hit. For production-grade safety combine it with (3), because mismatches are still possible ("I should have not stolen" is ambiguous). Most teams eventually replace the fast path entirely with a tiny classification model (~10MB) tuned on historical tickets.
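Option (1) with an actual token window can be sketched as follows. This is an illustration under stated assumptions, not the agent's code: the regex tokeniser, the default window of 5 tokens and the function names are hypothetical, and contractions such as "wasn't" are not treated as negations.

```python
import re

NEGATION_TOKENS = {"not", "no", "without"}
FAST_PATH_KEYWORDS = ["fraud", "stolen", "unauthorized", "security breach"]


def negated_keyword_hits(message: str, window: int = 5) -> list:
    """Return (keyword, negated) for every keyword occurrence in the message."""
    tokens = re.findall(r"[a-z']+", message.lower())
    hits = []
    for keyword in FAST_PATH_KEYWORDS:
        kw_tokens = keyword.split()  # multi-word keywords match as token runs
        size = len(kw_tokens)
        for start in range(len(tokens) - size + 1):
            if tokens[start:start + size] == kw_tokens:
                preceding = tokens[max(0, start - window):start]
                hits.append((keyword, any(t in NEGATION_TOKENS for t in preceding)))
    return hits


def fast_path_urgent(message: str, window: int = 5) -> bool:
    """URGENT only if at least one keyword occurrence is not negated nearby."""
    return any(not negated for _, negated in negated_keyword_hits(message, window))
```

Unlike the substring guard, "cannot" no longer disables the fast path, and a negated mention ("No fraud here") does not mask a separate, un-negated one ("but my card was stolen").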

Q2: Why persist the ticket as a Pydantic BaseModel and not a SQLAlchemy Ticket ORM row directly?

A: The Pydantic EscalationTicket is the service-layer contract, not the storage model. The DB has its own ORM model (or SQL construct) with foreign keys, indexing, partitioning, audit columns — those are storage concerns. By keeping the two separate: (1) the agent’s return type is stable across DB schema migrations, (2) the Pydantic model can be reused as a JSON API response shape, (3) tests can use the same Pydantic model without mocking SQLAlchemy. The translation happens in the service layer: ticket = EscalationTicket(**row_to_dict(db_row)).
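A minimal sketch of the row-to-contract translation, using sqlite3 and a dataclass so it runs with the standard library alone. The dataclass stands in for the Pydantic EscalationTicket (trimmed to a few fields), row_to_dict is the hypothetical helper named above, and the row_version and updated_by columns are invented storage-only columns.

```python
import sqlite3
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class EscalationTicket:
    """Trimmed stdlib stand-in for the Pydantic service-layer contract."""

    id: str
    customer_id: int
    conversation_id: int
    issue: str
    priority: str
    status: str
    assigned_to: str


def row_to_dict(row: sqlite3.Row) -> dict:
    """Hypothetical helper: keep only the columns the contract exposes."""
    wanted = {f.name for f in fields(EscalationTicket)}
    return {key: row[key] for key in row.keys() if key in wanted}


# Storage model with storage-only columns the agent's return type never sees.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    """CREATE TABLE escalations (
        id TEXT PRIMARY KEY, customer_id INTEGER, conversation_id INTEGER,
        issue TEXT, priority TEXT, status TEXT, assigned_to TEXT,
        row_version INTEGER, updated_by TEXT
    )"""
)
conn.execute(
    "INSERT INTO escalations VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
    ("ESC-7-1714555815", 7, 99, "Card stolen", "urgent", "open",
     "Security & Fraud Team", 3, "ops_console"),
)
db_row = conn.execute(
    "SELECT * FROM escalations WHERE id = ?", ("ESC-7-1714555815",)
).fetchone()
ticket = EscalationTicket(**row_to_dict(db_row))
```

Adding another storage column later changes the table and row_to_dict's input, but not the shape of ticket.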

Q3: How would you measure the priority classifier’s accuracy without a labelled ground-truth set?

A: Two pragmatic approaches. (1) Expert review sample: pull 200 random transcripts weekly, have a compliance officer label priorities, and compare them to the classifier. Cohen’s κ > 0.7 is acceptable; below that, the prompt needs tuning. (2) Operational proxies: correlate classifier output with downstream SLA performance. If URGENT tickets are picked up within 15 minutes (matching the SLA), the priority was probably right; if they sit for 4 hours, operators did not treat them as urgent and the classifier likely over-classified. The categories where classifier output and operator handling diverge most show where to tune the prompt or retrain. Track via a Grafana panel: priority_confusion_matrix over weekly samples, with a heartbeat alert when the matrix drifts.
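For approach (1), Cohen's κ is (p_o − p_e) / (1 − p_e), where p_o is the observed agreement and p_e the agreement expected by chance from each rater's label frequencies. A small sketch; the ten sample labels are invented for illustration.

```python
from collections import Counter
from typing import Sequence


def cohens_kappa(rater_a: Sequence[str], rater_b: Sequence[str]) -> float:
    """Cohen's kappa for two raters labelling the same items."""
    if len(rater_a) != len(rater_b) or not rater_a:
        raise ValueError("need two non-empty label sequences of equal length")
    n = len(rater_a)
    p_o = sum(a == b for a, b in zip(rater_a, rater_b)) / n  # observed agreement
    count_a, count_b = Counter(rater_a), Counter(rater_b)
    # Chance agreement: both raters independently pick the same label.
    p_e = sum(count_a[label] * count_b[label] for label in count_a) / (n * n)
    if p_e == 1.0:
        return float("nan")  # both raters used one identical label: undefined
    return (p_o - p_e) / (1 - p_e)


# Invented weekly sample: compliance officer labels vs classifier output.
officer = ["urgent", "high", "medium", "medium", "low",
           "high", "medium", "low", "urgent", "medium"]
classifier = ["urgent", "high", "medium", "high", "low",
              "high", "medium", "medium", "urgent", "medium"]
kappa = cohens_kappa(officer, classifier)
```

Here p_o = 0.8 and p_e = 0.28, so κ = 0.52 / 0.72 ≈ 0.72: raw agreement looks high, but once chance agreement is removed the sample only just clears the 0.7 bar.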

Top-to-Bottom Code Walkthrough (app/agents/human_agent.py)

The HumanAgent decides whether to hand the conversation off to a human. Its job is twofold: (1) decide priority (URGENT/HIGH/MEDIUM/LOW) so the right team picks it up first, and (2) generate a friendly, on-brand message for the customer.

Imports & init

  • from app.agents.base import BaseAgent — inherits the circuit breaker, observability, and retry logic.
  • self.specialist_queues = {"URGENT": "fraud_team", "HIGH": "compliance_team", "MEDIUM": "support_team", "LOW": "general_team"} — maps priority to a queue name. The persistence layer translates queue names to actual ticketing-system integration IDs.

_execute(self, state)

  1. Hybrid priority assessment:
    • Keyword fast path: scan state.message.lower() for the hard-rule terms ("fraud", "stolen", "unauthorized", "security breach"). If one matches and neither "not " nor "no " appears in the message, skip the LLM and assign URGENT directly.
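    • LLM semantic fallback: prompt = f"Classify urgency of: {state.message}" (abridged; the full prompt is in 3.2). Use JSON mode so the response is {"priority": "high", "reasoning": "..."} rather than prose; the value must be lower-case to validate against EscalationPriority.
  2. Ticket generation: ticket_id = f"ESC-{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6]}". The composite format is human-greppable and time-sortable, and practically unique: two IDs collide only if they share both the same second and the same 24 random bits.
  3. Specialist assignment: assigned_team = self.specialist_queues[priority.name]. The dict keys are upper-case member names ("HIGH"), while a str-enum member looks up by its value ("high"), so indexing with priority itself raises KeyError. The team is stored in state.metadata["escalation"].
  4. Friendly customer response: generated via the LLM with strict guidance: “Do not promise a time. Acknowledge the issue’s importance. Provide ticket reference. Encourage live-chat if urgent. NO financial advice.” The prompt is hardened to keep time commitments and financial advice out of the reply, which is the kind of response the FCA (the UK regulator) expects.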
  5. Return WorkflowState updates:
    return { "agent_response": final_msg, "agent_metadata": {"escalated": True, "priority": priority, "ticket_id": ticket_id, "team": assigned_team}, "escalation_required": True, }
    The orchestrator routes this to a terminal “wait_for_human” state that holds the conversation thread open until a human approves or modifies the response.
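Step 3's lookup depends on how priority is represented. A minimal sketch, assuming priority is the EscalationPriority str-enum from 3.1 and the specialist_queues dict from the init section:

```python
from enum import Enum


class EscalationPriority(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"


specialist_queues = {
    "URGENT": "fraud_team",
    "HIGH": "compliance_team",
    "MEDIUM": "support_team",
    "LOW": "general_team",
}

priority = EscalationPriority.HIGH

# The str-mixin member hashes and compares as its value "high",
# so it misses the upper-case keys (indexing with [] would raise KeyError).
lookup_by_member = specialist_queues.get(priority)

# The member name ("HIGH") matches the keys.
assigned_team = specialist_queues[priority.name]

# Alternative: key the mapping by the enum members themselves.
queues_by_member = {
    EscalationPriority[name]: team for name, team in specialist_queues.items()
}
```

Keying by the members removes the name/value mismatch entirely, and the dict then also accepts the lower-case strings the LLM returns.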

Common Pitfalls

Promising “all fraud cases will be resolved in 24 hours” in the response — that’s an FCA-regulated performance claim. The LLM prompt must explicitly forbid specific SLAs.

Forgetting to write the escalation row to Message when the graph pauses. The MessageService must be called inside _execute so the conversation history survives the pause.

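Relying on a truncated uuid.uuid4() for uniqueness. uuid4().hex[:6] keeps only 24 random bits; the timestamp prefix on the ticket ID narrows any collision to IDs minted in the same second (1 in 16,777,216 per pair), which makes collisions rare but does not strictly guarantee uniqueness in tight loops.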
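The two ticket-ID formats on this page behave differently under load. A sketch comparing them; the function names are hypothetical, and the f-strings follow 3.3 and step 2 of the walkthrough.

```python
from datetime import datetime, timezone
from uuid import uuid4


def ticket_id_section_3_3(customer_id: int, now: datetime) -> str:
    """3.3 format: customer id plus whole-second Unix time."""
    return f"ESC-{customer_id}-{int(now.timestamp())}"


def ticket_id_walkthrough(now: datetime) -> str:
    """Walkthrough format: UTC second plus 6 hex chars (24 random bits) of a uuid4."""
    return f"ESC-{now.strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6]}"


first = datetime(2024, 5, 1, 9, 30, 15, tzinfo=timezone.utc)
second = first.replace(microsecond=900_000)  # e.g. a retry 0.9 s later

# Same customer, same second: the 3.3 format yields identical IDs.
collides = ticket_id_section_3_3(7, first) == ticket_id_section_3_3(7, second)

# The walkthrough format differs unless the 24 random bits also match.
walkthrough_id = ticket_id_walkthrough(first)
```

For two walkthrough-format IDs minted in the same second, the chance of a match is 1 in 2^24; using the full uuid4().hex (122 random bits) makes collisions negligible at the cost of a longer reference.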

Real-World Interview Prep

Q1: Why hybrid (regex + LLM) instead of pure LLM classification?

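A: Speed, cost and resilience. “fraud” can be matched in microseconds. Only ~5% of messages contain explicit priority keywords, so the fast path skips the LLM for that ~5% while the other ~95% still need it; the cost saving is modest. The real gain is that those emergencies are flagged URGENT with no LLM latency, and still are when the LLM is slow or down (the LLM path would only fail soft to HIGH).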

Q2: What happens if the LLM classifier returns malformed JSON?

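A: The entire LLM path sits inside try/except Exception. Pydantic’s model_validate_json raises a ValidationError (not json.JSONDecodeError) for malformed JSON, a wrapping list, or an unknown priority value, and API errors that survive execute_with_retry are caught the same way; the agent logs the error and falls back to EscalationPriority.HIGH (conservative default). Defaulting lower is risky: LOW or MEDIUM for a fraud case is far worse than HIGH for a routine request.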

Q3: How would you audit this agent’s priority decisions?

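A: Filter Langfuse for the Groq-Priority-Assessment generation. Keep the reasoning field next to each priority: the snippet in 3.2 returns only analysis.priority, so the reasoning has to be passed through explicitly, for example onto the WorkflowState. Then build a nightly job: scan the last 24h of assessments plus their outcome (agent_resolved vs human_resolved) and flag mismatches where humans downgraded the LLM’s priority.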
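The nightly downgrade check could look like the sketch below. The Assessment record, its field names and the sample batch are hypothetical; in practice the rows would come from Langfuse exports joined with ticket outcomes.

```python
from dataclasses import dataclass
from typing import List

PRIORITY_RANK = {"low": 0, "medium": 1, "high": 2, "urgent": 3}


@dataclass
class Assessment:
    """Hypothetical row: one LLM assessment joined with the ticket outcome."""

    ticket_id: str
    llm_priority: str    # what Groq-Priority-Assessment returned
    final_priority: str  # priority after human review
    outcome: str         # "agent_resolved" or "human_resolved", kept for the report


def flag_downgrades(assessments: List[Assessment]) -> List[str]:
    """Ticket IDs where a human lowered the LLM's priority, largest drop first."""
    def drop(a: Assessment) -> int:
        return PRIORITY_RANK[a.llm_priority] - PRIORITY_RANK[a.final_priority]

    downgraded = [a for a in assessments if drop(a) > 0]
    downgraded.sort(key=drop, reverse=True)
    return [a.ticket_id for a in downgraded]


batch = [
    Assessment("ESC-1", "urgent", "low", "human_resolved"),
    Assessment("ESC-2", "high", "high", "human_resolved"),
    Assessment("ESC-3", "high", "medium", "agent_resolved"),
    Assessment("ESC-4", "medium", "high", "human_resolved"),  # upgrade: not flagged
]
flagged = flag_downgrades(batch)
```

Sorting by the size of the drop puts the most expensive over-classifications (URGENT tickets that were really LOW) at the top of the review queue.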
