BaseAgent: Circuit Breaker + tenacity Retry Wrapper

What? (Concept Overview)

BaseAgent is the abstract superclass that isolates two failure modes shared by every agent, runaway LLM cost and transient infrastructure errors, in one reusable execute_with_retry helper. The helper consults a SimpleCircuitBreaker before each call and retries failed calls with exponential backoff via the tenacity library. Every concrete agent (IntentClassifierAgent, HumanAgent, ComplianceCheckerAgent, …) inherits this discipline for free.

Project Context

Every LLM-driven agent in the FCA Support Agent subclasses BaseAgent from app/agents/base.py and calls self.execute_with_retry(self._call_llm, ...) instead of invoking the LLM client directly. The breaker thresholds (failure_threshold=settings.circuit_breaker_threshold, recovery_timeout=settings.circuit_breaker_recovery_timeout) come from Settings, so SREs can tune them through environment variables without code edits. The agent metadata (_get_description, _get_capabilities) doubles as the source for Langfuse root spans, and get_llm_callbacks() returns a list holding the Langfuse CallbackHandler (empty when observability is off) for chain/agent invocations.

How? (Quick Reference Blocks)

3.1 SimpleCircuitBreaker — Three-State Machine

# app/agents/base.py
import time


class SimpleCircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure_time = 0
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    def allow_request(self) -> bool:
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                return True   # probe: let a trial request through
            return False      # fail fast while open
        return True           # CLOSED or HALF_OPEN: allow

    def record_success(self):
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "OPEN"
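The state machine is easiest to follow by driving it by hand. The sketch below repeats the class from the excerpt so it runs on its own, uses a threshold of 2 to keep the trace short, and simulates the recovery window by back-dating last_failure_time instead of sleeping; the walk_through helper and its labels are illustrative only and not part of the project.

```python
import time


class SimpleCircuitBreaker:
    """Same logic as the excerpt above, repeated so the demo is self-contained."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure_time = 0
        self.state = "CLOSED"

    def allow_request(self) -> bool:
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                return True
            return False
        return True

    def record_success(self):
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "OPEN"


def walk_through() -> list:
    """Return (event, value) pairs seen while tripping and recovering the breaker."""
    breaker = SimpleCircuitBreaker(failure_threshold=2, recovery_timeout=60)
    seen = []

    breaker.record_failure()
    seen.append(("first failure", breaker.state))
    breaker.record_failure()
    seen.append(("second failure", breaker.state))
    seen.append(("allowed while open", breaker.allow_request()))

    # Simulate the recovery window elapsing by back-dating the last failure.
    breaker.last_failure_time -= 61
    seen.append(("allowed after timeout", breaker.allow_request()))
    seen.append(("state during probe", breaker.state))

    breaker.record_success()
    seen.append(("after successful probe", breaker.state))
    return seen


for event, value in walk_through():
    print(f"{event}: {value}")
```

The trace goes CLOSED, OPEN, a rejected request, an admitted probe in HALF_OPEN, and back to CLOSED once the probe reports success.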

3.2 execute_with_retry — Breaker + Tenacity Stack

# app/agents/base.py: BaseAgent.execute_with_retry
import logging
from typing import Callable

from tenacity import (
    AsyncRetrying,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
)


async def execute_with_retry(self, func: Callable, *args, **kwargs):
    # 1. FAST-FAIL if the circuit is OPEN
    if not self.circuit_breaker.allow_request():
        self.logger.warning(f"Circuit Breaker OPEN for {self.name}. Failing fast.")
        raise Exception("Service temporarily unavailable (Circuit Breaker Open)")

    try:
        # 2. Retry loop with exponential backoff: at most 3 attempts, so at most
        #    two waits (about 1s, then 2s); the 10s cap is never reached here
        async for attempt in AsyncRetrying(
            stop=stop_after_attempt(3),
            wait=wait_exponential(multiplier=1, min=1, max=10),
            retry=retry_if_exception_type(Exception),
            before_sleep=before_sleep_log(self.logger, logging.WARNING),
            reraise=True,
        ):
            with attempt:
                result = await func(*args, **kwargs)
                self.circuit_breaker.record_success()  # reset breaker
                return result
    except Exception as e:
        # Reached once per call, after all attempts have failed
        self.circuit_breaker.record_failure()  # keep breaker honest
        self.logger.error(f"Operation failed after retries: {e}")
        raise  # re-raise with the original traceback

3.3 Wiring BaseAgent Into a Concrete Agent

# app/agents/human_agent.py (__init__ excerpt)
from groq import AsyncGroq
from langfuse import observe

from app.agents.base import BaseAgent, AgentConfig
# Project-local imports (ConversationService, EscalationPriority,
# PriorityAnalysis) are not shown in this excerpt.


class HumanAgent(BaseAgent):
    def __init__(self, config=None, conversation_service=None, **kwargs):
        super().__init__(name="human_agent", config=config)
        self.client = AsyncGroq(api_key=self.config.api_key)
        self.conversation_service = conversation_service or ConversationService()

    @observe(as_type="generation", name="Groq-Priority-Assessment")
    async def _assess_priority(self, message: str) -> EscalationPriority:
        # NOTE: `prompt` is defined elsewhere in this method; its construction
        # is not part of the excerpt.
        async def _call_llm():
            return await self.client.chat.completions.create(
                model=self.config.model_name,
                messages=[
                    {"role": "system", "content": "You are a senior triage expert."},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.0,
                response_format={"type": "json_object"},
            )

        # ALWAYS go through execute_with_retry; never call Groq raw
        response = await self.execute_with_retry(_call_llm)
        return PriorityAnalysis.model_validate_json(
            response.choices[0].message.content
        ).priority

3.4 Langfuse Callback Plumbing

# app/agents/base.py: _setup_observability
from typing import Any, List

try:
    from langfuse.langchain import CallbackHandler as LangfuseCallbackHandler
except ImportError:  # Langfuse is optional; the None check below relies on this fallback
    LangfuseCallbackHandler = None


def _setup_observability(self):
    if not settings.is_observability_enabled:
        return None
    if LangfuseCallbackHandler is None:
        self.logger.warning("Langfuse not installed.")
        return None
    try:
        return LangfuseCallbackHandler()  # default keys from env
    except Exception as e:
        self.logger.error(f"Failed to initialize Langfuse: {e}")
        return None


def get_llm_callbacks(self) -> List[Any]:
    callbacks: List[Any] = []
    if self.trace_handler:
        callbacks.append(self.trace_handler)
    return callbacks

Why? (Parameter Breakdown)

  • Circuit breaker BEFORE the retry loop: Without the breaker, retry storms keep hammering a downed service and prolong the outage. With it, the breaker opens after 5 recorded failures (tunable) and that agent instance fails fast for recovery_timeout seconds. Because record_failure() runs only after the retries are exhausted, 5 recorded failures mean 5 failed execute_with_retry calls (up to 15 underlying attempts).
  • retry_if_exception_type(Exception) is the dangerous default: Retrying on EVERY exception masks logic bugs. Narrow it to retry_if_exception_type((httpx.TimeoutException, ConnectionError, ...)) for production. The current setting retries broadly because the LLM surface is too heterogeneous to enumerate.
  • wait_exponential(multiplier=1, min=1, max=10): The wait starts at about 1s and doubles (1s, 2s, 4s, 8s) up to the 10s cap. With stop_after_attempt(3) only the first two waits are ever used, so backoff adds roughly 3s in total. The worst-case latency of an agent call is therefore three call durations plus that backoff, which stays bounded while still absorbing transient API hiccups.
  • stop_after_attempt(3): Three attempts total: the initial call plus 2 retries. Beyond 3, the LLM is likely structurally broken; raising is better than spinning.
  • reraise=True: By default tenacity wraps the final failure in a RetryError once the attempts are exhausted. With reraise=True, callers see the original exception (for example the actual ConnectionError) instead.
  • record_success() inside the with attempt: block: A call that succeeds on any attempt, including a retry, resets the failure counter and closes the breaker. Without this, the counter would only ever grow and a HALF_OPEN breaker would never return to CLOSED.
  • state = "HALF_OPEN" in allow_request(): Lets a trial request through after the recovery timeout. If that request succeeds, the breaker moves back to CLOSED; if it fails, it goes back to OPEN for another full recovery_timeout. This is the “test the water” step. Note that, as written, allow_request() returns True for every caller while the state is HALF_OPEN, so concurrent requests can all pass until the first outcome is recorded.
  • Source failure_threshold/recovery_timeout from Settings: Production tuning must be driven by environment variables, not code changes. The fields circuit_breaker_threshold (int, default 5, constrained with ge=1, le=50) and circuit_breaker_recovery_timeout (int, default 60) are validated by Pydantic.
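The backoff arithmetic behind wait_exponential(multiplier=1, min=1, max=10) can be checked with a few lines of plain Python. This sketch assumes the formula used by recent tenacity releases, multiplier * 2 ** (attempt_number - 1) clamped to the [min, max] range; the function names are illustrative and do not come from tenacity or the project.

```python
from typing import List


def exponential_wait(attempt_number: int, multiplier: float = 1,
                     min_wait: float = 1, max_wait: float = 10) -> float:
    """Delay after the given failed attempt (assumed tenacity-style formula)."""
    raw = multiplier * 2 ** (attempt_number - 1)
    return max(min_wait, min(raw, max_wait))


def sleeps_between_attempts(max_attempts: int) -> List[float]:
    """A sleep happens only after a failed attempt that is not the last one."""
    return [exponential_wait(n) for n in range(1, max_attempts)]


schedule_3 = sleeps_between_attempts(3)   # the configuration used in this project
schedule_6 = sleeps_between_attempts(6)   # shows where the 10s cap starts to apply

print("3 attempts:", schedule_3, "total backoff:", sum(schedule_3))
print("6 attempts:", schedule_6, "total backoff:", sum(schedule_6))
```

With three attempts the cap never comes into play; it would only matter if the attempt budget were raised to five or more.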

Common Pitfalls

  1. Calling the LLM directly instead of through execute_with_retry. New agents often forget the wrapper. The fix is a code-review checklist item AND a unit test that asserts execute_with_retry was called (mock it via unittest.mock.patch).
  2. Retrying on Exception too broadly. Without narrowing to transient network exceptions, the agent burns all three attempts and their backoff on a Pydantic validation error that will never succeed. Fix: narrow the retry condition per agent to the LLM client’s known transient errors.
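The second pitfall can be illustrated without any third-party dependency. In the sketch below a plain loop stands in for tenacity, and the builtin TimeoutError and ConnectionError stand in for the LLM client's transient errors; both choices are assumptions made for the example. With tenacity, the equivalent narrowing would be passing the same tuple to retry_if_exception_type.

```python
import asyncio

# Assumption: these builtins stand in for the client's real transient errors.
TRANSIENT_ERRORS = (TimeoutError, ConnectionError)


async def call_with_narrow_retry(func, max_attempts: int = 3):
    """Retry only transient errors; anything else propagates immediately."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await func()
        except TRANSIENT_ERRORS:
            if attempt == max_attempts:
                raise
            await asyncio.sleep(0)  # placeholder for the real backoff


def make_func(errors, result="ok"):
    """Return (func, calls): func raises each queued error in turn, then returns result."""
    calls = []
    queue = list(errors)

    async def func():
        calls.append(1)
        if queue:
            raise queue.pop(0)
        return result

    return func, calls


async def demo():
    flaky, flaky_calls = make_func([ConnectionError("reset"), TimeoutError("slow")])
    outcome = await call_with_narrow_retry(flaky)

    buggy, buggy_calls = make_func([ValueError("response failed schema validation")])
    try:
        await call_with_narrow_retry(buggy)
        bug_surfaced = False
    except ValueError:
        bug_surfaced = True
    return outcome, len(flaky_calls), bug_surfaced, len(buggy_calls)


narrow_retry_result = asyncio.run(demo())
print(narrow_retry_result)
```

The flaky call recovers on its third attempt, while the validation error surfaces after a single call instead of consuming the whole retry budget.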

Real-World Interview Prep

Q1: Why three states (CLOSED / OPEN / HALF_OPEN) and not just two?

A: The HALF_OPEN state is the “test the water” probe. Without it, after the recovery timeout you either jump directly to CLOSED (allowing floods of requests) or stay OPEN (delaying recovery unnecessarily). HALF_OPEN lets exactly ONE request through; if it succeeds, transition CLOSED; if it fails, transition back to OPEN for another recovery timeout. This is the standard Michael Nygard pattern; production libraries like pybreaker and purgatory use the same three states.

Q2: How would you persist breaker state across pod restarts?

A: Use a Redis-backed shared store. Keep the breaker state (failures, last_failure_time, state) in a Redis hash keyed by agent name + pod name (or just agent name if you want cluster-wide breakers). On every state transition, write HSET breaker:agent_X failures 5 state OPEN last_failure_time 1735000000. In allow_request(), run HGETALL breaker:agent_X first to refresh the state. Cluster-wide breakers let every pod stop calling a failing provider as soon as the shared counter trips, but one pod with a purely local fault can then open the breaker for the whole fleet, and every check pays a Redis round trip plus contention on the shared key. Per-pod breakers are cheaper and contain local faults, but each pod has to discover an outage on its own.
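A minimal sketch of the save and restore steps, under stated assumptions: FakeHashStore is a hypothetical in-memory stand-in that mimics only the hset(name, mapping=...) and hgetall(name) calls of a Redis client, and BreakerState, save_breaker, and load_breaker are illustrative names rather than project code.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class BreakerState:
    failures: int = 0
    last_failure_time: float = 0.0
    state: str = "CLOSED"


class FakeHashStore:
    """In-memory stand-in for a Redis client (hypothetical; hset/hgetall only)."""

    def __init__(self) -> None:
        self._hashes: Dict[str, Dict[str, str]] = {}

    def hset(self, name: str, mapping: Dict[str, object]) -> None:
        # Redis stores hash values as strings, so mimic that here.
        fields = {key: str(value) for key, value in mapping.items()}
        self._hashes.setdefault(name, {}).update(fields)

    def hgetall(self, name: str) -> Dict[str, str]:
        return dict(self._hashes.get(name, {}))


def save_breaker(store, agent_name: str, breaker: BreakerState) -> None:
    """Write the full breaker state on a state transition."""
    store.hset(f"breaker:{agent_name}", mapping={
        "failures": breaker.failures,
        "last_failure_time": breaker.last_failure_time,
        "state": breaker.state,
    })


def load_breaker(store, agent_name: str) -> BreakerState:
    """Read the state back, converting the string fields to their types."""
    fields = store.hgetall(f"breaker:{agent_name}")
    if not fields:
        return BreakerState()  # no history yet: start CLOSED
    return BreakerState(
        failures=int(fields["failures"]),
        last_failure_time=float(fields["last_failure_time"]),
        state=fields["state"],
    )


store = FakeHashStore()
save_breaker(store, "agent_X", BreakerState(5, 1735000000.0, "OPEN"))
restored = load_breaker(store, "agent_X")    # what a restarted pod would see
untouched = load_breaker(store, "agent_Y")   # an agent with no stored history
print(restored, untouched)
```

Against a real Redis the read-modify-write sequence is not atomic, so a cluster-wide breaker would likely need an atomic increment or a transaction around the failure counter; that part is left out of this sketch.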

Q3: How do you test a circuit breaker in CI?

A: Three layers. (1) Unit test: mock self._call_llm to raise httpx.TimeoutException on every attempt and call execute_with_retry until 5 failures are recorded (each call exhausts its 3 attempts before record_failure() runs) → assert circuit_breaker.state == "OPEN" and that the next call raises "Service temporarily unavailable" without ever calling _call_llm. (2) Edge case: mock one timeout then one success → assert the breaker stays CLOSED. (3) Recovery test: open the breaker as in (1), advance the clock past recovery_timeout, then provide a successful mock → assert the breaker transitions HALF_OPEN → CLOSED. Control time with freezegun or unittest.mock.patch("time.time") rather than a real time.sleep(recovery_timeout + 1), which would stall CI for the full recovery window.
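Layers (1) and (3) can be sketched with the standard library alone. StubAgent below is a hypothetical stand-in for BaseAgent that keeps the same breaker flow but replaces tenacity with a plain loop and no backoff, and the builtin TimeoutError stands in for httpx.TimeoutException; the clock is controlled by patching time.time.

```python
import asyncio
import time
from unittest.mock import AsyncMock, patch


class SimpleCircuitBreaker:
    """Same logic as the excerpt in 3.1, repeated so the test runs on its own."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure_time = 0
        self.state = "CLOSED"

    def allow_request(self) -> bool:
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                return True
            return False
        return True

    def record_success(self):
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "OPEN"


class StubAgent:
    """Hypothetical stand-in for BaseAgent: same breaker flow, plain retry loop."""

    def __init__(self, threshold: int = 5, recovery_timeout: int = 60):
        self.circuit_breaker = SimpleCircuitBreaker(threshold, recovery_timeout)

    async def execute_with_retry(self, func, attempts: int = 3):
        if not self.circuit_breaker.allow_request():
            raise Exception("Service temporarily unavailable (Circuit Breaker Open)")
        for attempt in range(1, attempts + 1):
            try:
                result = await func()
            except Exception:
                if attempt == attempts:
                    self.circuit_breaker.record_failure()  # once per exhausted call
                    raise
            else:
                self.circuit_breaker.record_success()
                return result


async def run_checks() -> dict:
    agent = StubAgent(threshold=5, recovery_timeout=60)
    states_seen_by_probe = []

    def probe_body():
        states_seen_by_probe.append(agent.circuit_breaker.state)
        return "ok"

    with patch("time.time", return_value=1000.0) as clock:
        failing = AsyncMock(side_effect=TimeoutError("upstream timeout"))
        for _ in range(5):
            try:
                await agent.execute_with_retry(failing)
            except TimeoutError:
                pass
        state_after_failures = agent.circuit_breaker.state

        probe = AsyncMock(side_effect=probe_body)
        try:
            await agent.execute_with_retry(probe)
            fast_fail = None
        except Exception as exc:
            fast_fail = str(exc)
        awaits_while_open = probe.await_count

        clock.return_value = 1061.0  # jump past recovery_timeout without sleeping
        recovered = await agent.execute_with_retry(probe)

    return {
        "state_after_failures": state_after_failures,
        "attempts_made": failing.await_count,
        "fast_fail": fast_fail,
        "awaits_while_open": awaits_while_open,
        "states_seen_by_probe": states_seen_by_probe,
        "recovered": recovered,
        "final_state": agent.circuit_breaker.state,
    }


report = asyncio.run(run_checks())
print(report)
```

In a real suite each check would be its own pytest test against the actual BaseAgent, with tenacity's waits disabled or patched so the retries do not slow the run.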

Top-to-Bottom Code Walkthrough (app/agents/base.py)

app/agents/base.py is the DNA every agent inherits. It bundles three reliability and observability tools (the circuit breaker, the tenacity retry, and Langfuse tracing) so that each subclass contains only its LLM prompt logic.

Imports

  • from abc import ABC, abstractmethod: declares the class as abstract; you cannot instantiate BaseAgent directly, only its concrete subclasses.
  • from langfuse import observe: the @observe() decorator that opens a Langfuse trace.
  • from tenacity import retry, stop_after_attempt, wait_exponential: the resilience trio. retry is the decorator, stop_after_attempt caps the number of tries, and wait_exponential sets the backoff between tries.
  • from app.config import settings: reads circuit_breaker_threshold and circuit_breaker_recovery_timeout from .env.

SimpleCircuitBreaker

A tiny three-state machine that lives in memory inside BaseAgent:

  • state = "CLOSED" (healthy), "OPEN" (blocked), "HALF_OPEN" (testing recovery).
  • record_failure() bumps failures and stamps last_failure_time = time.time(); once failures reaches failure_threshold it flips the state to "OPEN".
  • allow_request() returns False while state == "OPEN" UNLESS time.time() - last_failure_time > recovery_timeout, in which case it flips to "HALF_OPEN" and lets a trial request through. That request either resets the breaker (closes it) or trips it again.
  • Why this matters: when Groq returns 503, every user message would otherwise hammer the API. The breaker gives it breathing room.

BaseAgent class

  • self.circuit_breaker = SimpleCircuitBreaker(...): owned per agent instance.
  • self.provider = provider: the LLM client (Groq, OpenAI, etc.), injected so subclasses can swap.
  • @observe(name="...") on execute(): opens a Langfuse span before the circuit-breaker check.
  • @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10)) wraps the call. Why exponential: each successive wait doubles, so a struggling service gets progressively more room to recover. Backoff alone does not prevent a thundering herd, because ten workers that fail together also retry together; adding jitter (for example tenacity's wait_random_exponential) is what spreads them out.
  • def _execute(self, state) is an @abstractmethod: intent_classifier, account_agent, etc. must override it with their prompt logic.

Subclass lifecycle (e.g. IntentClassifier)

class IntentClassifier(BaseAgent):
    def _execute(self, state):
        intent = self.provider.classify(state.message)
        return {"intent": intent, "confidence": 0.95}

execute() (inherited) -> checks circuit -> retries on transient failure -> calls _execute -> records success/failure -> returns dict.
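The inheritance contract in that flow can be shown in isolation. The sketch below is a hypothetical, stripped-down base class with no breaker, retry, or tracing, and it passes state as a plain dict rather than the project's state object; it only demonstrates that the abstract base cannot be instantiated and that execute() delegates to the subclass's _execute.

```python
from abc import ABC, abstractmethod


class SketchBaseAgent(ABC):
    """Hypothetical, stripped-down BaseAgent: no breaker, retry, or tracing."""

    def execute(self, state: dict) -> dict:
        # The real execute() would check the breaker and retry around this call.
        return self._execute(state)

    @abstractmethod
    def _execute(self, state: dict) -> dict:
        """Subclasses put their prompt logic here."""


class KeywordIntentClassifier(SketchBaseAgent):
    def _execute(self, state: dict) -> dict:
        # Toy keyword rule standing in for an LLM call.
        intent = "greeting" if "hello" in state["message"].lower() else "other"
        return {"intent": intent}


try:
    SketchBaseAgent()
    base_is_instantiable = True
except TypeError:
    # ABC refuses to instantiate a class with unimplemented abstract methods.
    base_is_instantiable = False

result = KeywordIntentClassifier().execute({"message": "Hello there"})
print(base_is_instantiable, result)
```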

For the full line-by-line walkthrough of each specialist agent subclass (IntentClassifier, AccountAgent, ProductRecommender, GeneralAgent), see Specialist Agent Deep Dives & LangGraph Flow.

Common Pitfalls

Sharing one circuit-breaker across instances defeats the goal of per-agent isolation. Each agent owns its own state.

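Using time.sleep() to wait before a retry blocks the event loop and stalls every other coroutine in the process. Let tenacity do the waiting: wait_exponential() only computes the delay, and AsyncRetrying awaits it with a non-blocking async sleep, so other tasks keep running.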
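The difference is visible in the order in which two concurrent tasks run. This standard-library sketch (all function names are illustrative) runs a waiting task alongside a second task, once with time.sleep and once with asyncio.sleep.

```python
import asyncio
import time


async def blocking_wait(log: list) -> None:
    log.append("wait:start")
    time.sleep(0.05)            # blocks the whole event loop
    log.append("wait:end")


async def cooperative_wait(log: list) -> None:
    log.append("wait:start")
    await asyncio.sleep(0.05)   # yields control to other tasks while waiting
    log.append("wait:end")


async def other_task(log: list) -> None:
    log.append("other:ran")


async def run(waiter) -> list:
    log: list = []
    await asyncio.gather(waiter(log), other_task(log))
    return log


blocking_order = asyncio.run(run(blocking_wait))
cooperative_order = asyncio.run(run(cooperative_wait))
print("time.sleep   :", blocking_order)
print("asyncio.sleep:", cooperative_order)
```

With time.sleep the second task cannot start until the wait has finished; with asyncio.sleep it runs during the wait.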

Marking is_compliant via a regex short-circuit and skipping the LLM can hide subtle violations. Pair the fast regex pass with a periodic LLM audit of a sample of the short-circuited messages.

Real-World Interview Prep

Q1: What’s the trade-off between a circuit-breaker and unbounded retries?

A: Retries hide failures (a papered-over outage looks healthy). A breaker surfaces them (the failure count makes the outage visible) and protects downstream (no thundering herd). Pair them: retries handle transient blips; the breaker handles sustained outages.

Q2: Why half-open and not closed directly after recovery_timeout?

A: Half-open is a probe state. It lets ONE call through to test whether the downstream service has recovered. Closing immediately on a timer would release the full request load onto a service whose recovery is unconfirmed; if it is still down, all of those calls fail and the breaker simply trips again. Half-open limits the risk to a single test attempt.
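One way to enforce a strict single probe is to reject every caller that arrives while the state is already HALF_OPEN. SingleProbeBreaker below is a hypothetical variant written for illustration, not the project's SimpleCircuitBreaker, and it assumes single-threaded asyncio use, where allow_request() cannot be interleaved because it never awaits.

```python
import time


class SingleProbeBreaker:
    """Hypothetical breaker variant that admits exactly one probe at a time."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure_time = 0.0
        self.state = "CLOSED"

    def allow_request(self) -> bool:
        if self.state == "CLOSED":
            return True
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"  # this caller becomes the probe
                return True
            return False
        # HALF_OPEN: a probe is already in flight, so everyone else fails fast.
        return False

    def record_success(self) -> None:
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self) -> None:
        self.failures += 1
        self.last_failure_time = time.time()
        # A failed probe reopens the breaker regardless of the counter.
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"


breaker = SingleProbeBreaker(failure_threshold=1, recovery_timeout=60)
breaker.record_failure()           # trips the breaker
breaker.last_failure_time -= 61    # pretend the recovery window has passed

# Three callers arrive before the probe has reported back.
decisions = [breaker.allow_request() for _ in range(3)]
print(decisions, breaker.state)
```

The trade-off is that a probe which never reports back (for example a cancelled task) leaves this variant stuck in HALF_OPEN, so a production version would likely add a probe timeout.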

Q3: What should allow_request() do on the first ever call?

A: Allow it. A fresh breaker starts with state == "CLOSED" and failures == 0, which is correct and harmless because there is no failure history yet. Lazy initialisation is fine: there is no need to probe the downstream service eagerly at startup, since the first real request doubles as the probe.
