BaseAgent: Circuit Breaker + tenacity Retry Wrapper
What? (Concept Overview)
BaseAgent is the abstract superclass that handles two failure modes shared by all agents, runaway LLM cost and transient infrastructure errors, in one reusable execute_with_retry helper. The helper consults a SimpleCircuitBreaker before each call and retries with exponential backoff via the tenacity library; as configured it retries on any Exception, not only transient ones. Every concrete agent (IntentClassifierAgent, HumanAgent, ComplianceCheckerAgent, …) inherits this behaviour for free.
Project Context
Every LLM-driven agent in the FCA Support Agent subclasses BaseAgent from app/agents/base.py and calls self.execute_with_retry(self._call_llm, ...) rather than invoking the LLM client directly. The breaker thresholds (failure_threshold=settings.circuit_breaker_threshold, recovery_timeout=settings.circuit_breaker_recovery_timeout) come from Settings, so SREs can tune them through environment variables without code edits. The agent metadata (_get_description, _get_capabilities) also feeds the Langfuse root span, and get_llm_callbacks() returns a list holding the Langfuse CallbackHandler (empty when observability is off) for chain/agent invocations.
How? (Quick Reference Blocks)
3.1 SimpleCircuitBreaker: Three-State Machine

    # app/agents/base.py
    import time

    class SimpleCircuitBreaker:
        def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
            self.failure_threshold = failure_threshold
            self.recovery_timeout = recovery_timeout  # seconds
            self.failures = 0
            self.last_failure_time = 0.0
            self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

        def allow_request(self) -> bool:
            if self.state == "OPEN":
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    self.state = "HALF_OPEN"
                    return True   # probe: recovery timeout elapsed, allow a trial request
                return False      # fail fast while open
            return True           # CLOSED or HALF_OPEN: allow

        def record_success(self) -> None:
            self.failures = 0
            self.state = "CLOSED"

        def record_failure(self) -> None:
            self.failures += 1
            self.last_failure_time = time.time()
            if self.failures >= self.failure_threshold:
                self.state = "OPEN"

3.2 execute_with_retry: Breaker + Tenacity Stack
    # app/agents/base.py: BaseAgent.execute_with_retry (method excerpt)
    import logging
    from typing import Callable

    from tenacity import (
        AsyncRetrying,
        stop_after_attempt,
        wait_exponential,
        retry_if_exception_type,
        before_sleep_log,
    )

    async def execute_with_retry(self, func: Callable, *args, **kwargs):
        # 1. FAST-FAIL if the circuit is OPEN
        if not self.circuit_breaker.allow_request():
            self.logger.warning(f"Circuit Breaker OPEN for {self.name}. Failing fast.")
            raise Exception("Service temporarily unavailable (Circuit Breaker Open)")
        try:
            # 2. Retry loop: at most 3 attempts with exponential backoff between them
            #    (so only two waits, each clamped to the 1s..10s range)
            async for attempt in AsyncRetrying(
                stop=stop_after_attempt(3),
                wait=wait_exponential(multiplier=1, min=1, max=10),
                retry=retry_if_exception_type(Exception),
                before_sleep=before_sleep_log(self.logger, logging.WARNING),
                reraise=True,
            ):
                with attempt:
                    result = await func(*args, **kwargs)
                    self.circuit_breaker.record_success()  # reset breaker
                    return result
        except Exception as e:
            # Reached only after every attempt failed: counts as ONE breaker failure
            self.circuit_breaker.record_failure()
            self.logger.error(f"Operation failed after retries: {e}")
            raise  # re-raise the original exception with its traceback

3.3 Wiring BaseAgent Into a Concrete Agent
    # app/agents/human_agent.py: __init__ and _assess_priority excerpt
    # (project-local imports such as ConversationService are elided here)
    from groq import AsyncGroq
    from langfuse import observe

    from app.agents.base import BaseAgent, AgentConfig

    class HumanAgent(BaseAgent):
        def __init__(self, config=None, conversation_service=None, **kwargs):
            super().__init__(name="human_agent", config=config)
            self.client = AsyncGroq(api_key=self.config.api_key)
            self.conversation_service = conversation_service or ConversationService()

        @observe(as_type="generation", name="Groq-Priority-Assessment")
        async def _assess_priority(self, message: str) -> EscalationPriority:
            # `prompt` is built from `message`; its construction is elided in this excerpt
            async def _call_llm():
                return await self.client.chat.completions.create(
                    model=self.config.model_name,
                    messages=[
                        {"role": "system", "content": "You are a senior triage expert."},
                        {"role": "user", "content": prompt},
                    ],
                    temperature=0.0,
                    response_format={"type": "json_object"},
                )

            # ALWAYS go through execute_with_retry; never call Groq raw
            response = await self.execute_with_retry(_call_llm)
            return PriorityAnalysis.model_validate_json(
                response.choices[0].message.content
            ).priority

3.4 Langfuse Callback Plumbing
    # app/agents/base.py: _setup_observability
    from typing import Any, List

    try:
        from langfuse.langchain import CallbackHandler as LangfuseCallbackHandler
    except ImportError:
        # Optional dependency. This guard is assumed: it is what makes the
        # "is None" check below meaningful.
        LangfuseCallbackHandler = None

    def _setup_observability(self):
        if not settings.is_observability_enabled:
            return None
        if LangfuseCallbackHandler is None:
            self.logger.warning("Langfuse not installed.")
            return None
        try:
            return LangfuseCallbackHandler()  # default keys from env
        except Exception as e:
            self.logger.error(f"Failed to initialize Langfuse: {e}")
            return None

    def get_llm_callbacks(self) -> List[Any]:
        callbacks: List[Any] = []
        if self.trace_handler:
            callbacks.append(self.trace_handler)
        return callbacks

Why? (Parameter Breakdown)
- Circuit breaker BEFORE the retry loop: Without the breaker, retry storms keep hammering a downed service and prolong the outage. With it, the breaker opens after 5 recorded failures (tunable) and that agent instance fails fast for recovery_timeout seconds. record_failure() runs once per exhausted execute_with_retry call, so one recorded failure stands for up to three failed attempts.
- retry_if_exception_type(Exception) is the danger default: Retrying on EVERY exception masks logic bugs. Narrow it to retry_if_exception_type((httpx.TimeoutException, ConnectionError, ...)) for production. The current setting retries broadly because the LLM surface is too heterogeneous to enumerate.
- wait_exponential(multiplier=1, min=1, max=10): The wait doubles after each failed attempt (1s, 2s, 4s, 8s, then capped at 10s). With a budget of 3 attempts only the first two waits ever happen, so backoff adds about 3s in recent tenacity releases. Worst-case latency is therefore three per-attempt timeouts plus that backoff, which stays within the ~25s per-call budget only if each attempt times out within about 7s. That is still enough to absorb transient API hiccups.
- stop_after_attempt(3): Three attempts total: initial + 2 retries. If the call still fails after 3 attempts, the failure is probably not transient; raising is better than spinning.
- reraise=True: By default tenacity raises its own RetryError (wrapping the last attempt) once the attempts are exhausted. With reraise=True the caller sees the original exception, for example the actual ConnectionError.
- record_success() inside the "with attempt" block: A call that succeeds on any attempt, including a retry, resets the failure count and closes the breaker. Without this, a successful HALF_OPEN probe would never move the breaker back to CLOSED, and stale failures would keep counting toward the threshold.
- state = "HALF_OPEN" in allow_request(): Lets a trial request through after the recovery timeout. If that request succeeds, the breaker moves back to CLOSED; if it fails, it returns to OPEN for another full recovery_timeout. This is the “test the water” step. As written, allow_request() also returns True for any other call that arrives while the state is HALF_OPEN, so concurrent callers are not strictly limited to one probe.
- Source failure_threshold / recovery_timeout from Settings: Production tuning must be env-var driven, not code-driven. The fields circuit_breaker_threshold (int, default 5, constrained with ge=1, le=50) and circuit_breaker_recovery_timeout (int, default 60) are validated by Pydantic.
Common Pitfalls
- Calling the LLM directly instead of through execute_with_retry. New agents often forget the wrapper. The fix is a code-review checklist item AND a unit test that asserts execute_with_retry was called (mock it via unittest.mock.patch).
- Retrying on Exception too broadly. Without narrowing to transient network exceptions, the agent burns its full retry budget (three attempts plus backoff) on an error that can never succeed, such as a Pydantic validation error raised inside the wrapped call. Fix: define the retry condition per agent from the LLM client's known transient errors.
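The unit test mentioned in the first pitfall might be sketched as follows. ToyAgent and its answer() method are hypothetical stand-ins for a real BaseAgent subclass; only unittest.mock from the standard library is assumed.

```python
import asyncio
from unittest.mock import AsyncMock, patch


class ToyAgent:
    """Hypothetical stand-in for a BaseAgent subclass."""

    async def execute_with_retry(self, func, *args, **kwargs):
        # The real wrapper adds the breaker check and the retry loop.
        return await func(*args, **kwargs)

    async def _call_llm(self):
        return "raw answer"

    async def answer(self):
        # Correct usage: go through the wrapper, never call _call_llm directly.
        return await self.execute_with_retry(self._call_llm)


def check_answer_goes_through_wrapper():
    agent = ToyAgent()
    with patch.object(ToyAgent, "execute_with_retry", new_callable=AsyncMock) as wrapper:
        wrapper.return_value = "mocked answer"
        result = asyncio.run(agent.answer())
    # Fails with AssertionError if answer() bypassed the wrapper.
    wrapper.assert_awaited_once()
    return result


wrapper_result = check_answer_goes_through_wrapper()
```

If a future change makes answer() call self._call_llm() directly, assert_awaited_once() raises and the test fails, which is the regression the checklist item is meant to catch.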
Real-World Interview Prep
Q1: Why three states (CLOSED / OPEN / HALF_OPEN) and not just two?
A: The HALF_OPEN state is the “test the water” probe. Without it, after the recovery timeout you either jump directly to CLOSED (allowing floods of requests) or stay OPEN (delaying recovery unnecessarily). HALF_OPEN lets exactly ONE request through; if it succeeds, transition CLOSED; if it fails, transition back to OPEN for another recovery timeout. This is the standard Michael Nygard pattern; production libraries like pybreaker and purgatory use the same three states.
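To make the probe behaviour concrete, the sketch below re-declares the breaker from section 3.1 and adds a hypothetical SingleProbeBreaker subclass. It is an illustration under stated assumptions, not the project's code: the simple breaker lets every caller through while it is HALF_OPEN, so limiting the probe to one request needs an extra flag such as the probe_in_flight attribute assumed here. Neither class is thread-safe.

```python
import time


class SimpleCircuitBreaker:
    """Condensed copy of the breaker from section 3.1."""

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure_time = 0.0
        self.state = "CLOSED"

    def allow_request(self):
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                return True
            return False
        return True

    def record_success(self):
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "OPEN"


class SingleProbeBreaker(SimpleCircuitBreaker):
    """Hypothetical variant: only one caller may probe while HALF_OPEN."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.probe_in_flight = False

    def allow_request(self):
        if self.state == "HALF_OPEN" and self.probe_in_flight:
            return False  # a probe is already running; everyone else fails fast
        allowed = super().allow_request()
        if allowed and self.state == "HALF_OPEN":
            self.probe_in_flight = True
        return allowed

    def record_success(self):
        self.probe_in_flight = False
        super().record_success()

    def record_failure(self):
        self.probe_in_flight = False
        super().record_failure()


def open_then_expire(breaker):
    """Trip the breaker, then pretend the recovery timeout has already passed."""
    for _ in range(breaker.failure_threshold):
        breaker.record_failure()
    breaker.last_failure_time -= breaker.recovery_timeout + 1
    return breaker


simple = open_then_expire(SimpleCircuitBreaker(failure_threshold=2, recovery_timeout=30))
simple_probes = [simple.allow_request(), simple.allow_request()]

single = open_then_expire(SingleProbeBreaker(failure_threshold=2, recovery_timeout=30))
single_probes = [single.allow_request(), single.allow_request()]

single.record_failure()  # the probe failed: back to OPEN for a full timeout
state_after_failed_probe = single.state
```

Here simple_probes shows both callers being admitted during HALF_OPEN, while single_probes shows the second caller being rejected until the probe reports back.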
Q2: How would you persist breaker state across pod restarts?
A: Use a Redis-backed shared store. Keep the breaker fields (failures, last_failure_time, state) in a Redis hash keyed by agent name, or by agent name plus pod name if each pod should keep its own breaker. On every state transition, write HSET breaker:agent_X failures 5 state OPEN last_failure_time 1735000000. At the start of allow_request(), run HGETALL breaker:agent_X to refresh the local state. The key choice is a trade-off. A cluster-wide key lets every pod fail fast as soon as the fleet has seen enough failures, but a single misbehaving pod can then open the breaker for everyone, and every call pays a Redis round trip with possible write contention. Per-pod keys isolate a broken instance and are cheaper, but pods cannot share what they have learned, and the state survives a restart only if the pod name is stable (as with a StatefulSet).
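A shared-store breaker along these lines could be sketched as below. Everything here is hypothetical: SharedCircuitBreaker and InMemoryHashStore are illustrative names, and the in-memory store only imitates the two hash calls used. The assumption is that a redis-py client created with decode_responses=True could be passed in its place, since it offers hset(name, mapping=...) and hgetall(name). The read-modify-write in record_failure is not atomic, so a production version would need a transaction or a Lua script.

```python
import time


class InMemoryHashStore:
    """Hypothetical test double exposing the two hash calls the breaker needs."""

    def __init__(self):
        self._data = {}

    def hset(self, name, mapping):
        # Redis stores strings, so values are stringified here as well.
        self._data.setdefault(name, {}).update({k: str(v) for k, v in mapping.items()})

    def hgetall(self, name):
        return dict(self._data.get(name, {}))


class SharedCircuitBreaker:
    """Breaker whose state lives in a shared hash instead of instance attributes."""

    def __init__(self, store, key, failure_threshold=5, recovery_timeout=60):
        self.store = store
        self.key = key
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout

    def _load(self):
        raw = self.store.hgetall(self.key)
        return {
            "failures": int(raw.get("failures", 0)),
            "last_failure_time": float(raw.get("last_failure_time", 0.0)),
            "state": raw.get("state", "CLOSED"),
        }

    def _save(self, **fields):
        self.store.hset(self.key, mapping=fields)

    def allow_request(self):
        current = self._load()
        if current["state"] == "OPEN":
            if time.time() - current["last_failure_time"] > self.recovery_timeout:
                self._save(state="HALF_OPEN")
                return True
            return False
        return True

    def record_success(self):
        self._save(failures=0, state="CLOSED")

    def record_failure(self):
        current = self._load()
        failures = current["failures"] + 1
        state = "OPEN" if failures >= self.failure_threshold else current["state"]
        self._save(failures=failures, last_failure_time=time.time(), state=state)


store = InMemoryHashStore()
pod_a = SharedCircuitBreaker(store, "breaker:human_agent", failure_threshold=2)
pod_a.record_failure()
pod_a.record_failure()

# A "restarted" pod builds a fresh object but reads the same key.
pod_b = SharedCircuitBreaker(store, "breaker:human_agent", failure_threshold=2)
pod_b_allowed = pod_b.allow_request()
```

Because pod_b reads the hash written by pod_a, it fails fast immediately instead of starting from a clean CLOSED state.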
Q3: How do you test a circuit breaker in CI?
A: Three layers. (1) Unit test: mock self._call_llm to always raise httpx.TimeoutException and call execute_with_retry failure_threshold times (5 by default; each call makes 3 attempts but records only one breaker failure) → assert circuit_breaker.state == "OPEN" and that the next call raises "Service temporarily unavailable" without ever calling _call_llm. (2) Edge case: mock one timeout then one success → assert the breaker stays CLOSED. (3) Recovery test: open the breaker, advance the clock past recovery_timeout, then provide a successful mock → assert the breaker transitions HALF_OPEN → CLOSED. Control time with freezegun or unittest.mock.patch("time.time") rather than a real time.sleep(recovery_timeout + 1), and patch out the backoff wait so the suite stays fast.
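The three layers can be sketched with the standard library alone. This is an illustration under assumptions: guarded_call is a hypothetical single-attempt stand-in for execute_with_retry (no tenacity, no backoff), so one failed call equals one recorded failure, and the built-in TimeoutError replaces httpx.TimeoutException.

```python
import asyncio
import time
from unittest.mock import AsyncMock, patch


class SimpleCircuitBreaker:
    """Condensed copy of the breaker from section 3.1."""

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure_time = 0.0
        self.state = "CLOSED"

    def allow_request(self):
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                return True
            return False
        return True

    def record_success(self):
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "OPEN"


async def guarded_call(breaker, func):
    """Hypothetical single-attempt stand-in for execute_with_retry."""
    if not breaker.allow_request():
        raise RuntimeError("Service temporarily unavailable (Circuit Breaker Open)")
    try:
        result = await func()
    except Exception:
        breaker.record_failure()
        raise
    breaker.record_success()
    return result


async def run_layers():
    # Layer 1: five failed calls open the breaker, then the next call fails fast.
    breaker = SimpleCircuitBreaker(failure_threshold=5, recovery_timeout=60)
    llm = AsyncMock(side_effect=TimeoutError("upstream timeout"))
    with patch("time.time", return_value=1000.0):
        for _ in range(5):
            try:
                await guarded_call(breaker, llm)
            except TimeoutError:
                pass
        opened = breaker.state
        try:
            await guarded_call(breaker, llm)
        except RuntimeError:
            pass
        calls_while_open = llm.await_count  # unchanged by the fail-fast call

    # Layer 2: one timeout followed by one success leaves the breaker CLOSED.
    flaky_breaker = SimpleCircuitBreaker()
    flaky = AsyncMock(side_effect=[TimeoutError("blip"), "ok"])
    try:
        await guarded_call(flaky_breaker, flaky)
    except TimeoutError:
        pass
    await guarded_call(flaky_breaker, flaky)

    # Layer 3: move the clock past recovery_timeout, then let the probe succeed.
    llm.side_effect = None
    llm.return_value = "ok"
    with patch("time.time", return_value=1000.0 + 61):
        probe_allowed = breaker.allow_request()
        half_open = breaker.state
        result = await guarded_call(breaker, llm)
    return (opened, calls_while_open, flaky_breaker.state,
            probe_allowed, half_open, result, breaker.state)


layers = asyncio.run(run_layers())
```

Patching time.time keeps the recovery test instant; with the real wrapper, the backoff wait would also need to be patched or shortened.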
Top-to-Bottom Code Walkthrough (app/agents/base.py)
app/agents/base.py is the DNA every agent inherits. It bundles three reliability+observability tools so each subclass is just the LLM prompt logic.
Imports
- from abc import ABC, abstractmethod: declares the class as abstract; you cannot instantiate BaseAgent directly, only its subclasses.
- from langfuse import observe: the @observe() decorator that opens a Langfuse trace.
- from tenacity import retry, stop_after_attempt, wait_exponential: the resilience trio (retry the call, stop after N tries, wait with backoff between tries).
- from app.config import settings: reads circuit_breaker_threshold and circuit_breaker_recovery_timeout from .env.
SimpleCircuitBreaker
A tiny three-state machine that lives in memory inside BaseAgent:
- state is "CLOSED" (healthy), "OPEN" (blocked), or "HALF_OPEN" (testing recovery).
- record_failure() bumps failures and stamps last_failure_time = time.time(); once failures reaches failure_threshold it flips the state to "OPEN".
- allow_request() returns False while state == "OPEN" UNLESS time.time() - last_failure_time > recovery_timeout, in which case it flips to "HALF_OPEN" and lets a trial request through. That request either resets the breaker (closes it) or trips it again.
- Why this matters: when Groq returns 503, every user message would otherwise hammer the API. The breaker gives it breathing room.
BaseAgent class
- self.circuit_breaker = SimpleCircuitBreaker(...): owned per agent instance.
- self.provider = provider: the LLM client (Groq, OpenAI, etc.), injected so subclasses can swap it.
- @observe(name="...") on execute(): opens a Langfuse span before the circuit-breaker check.
- @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10)) wraps the call. Why exponential: each retry waits longer than the last, which eases pressure on a struggling API. Backoff alone does not prevent a thundering herd, though: ten workers that fail together still retry together. Desynchronizing them needs jitter, for example tenacity's wait_random_exponential.
- def _execute(self, state) is @abstractmethod: intent_classifier, account_agent, etc. must override it with their prompt logic.
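The backoff arithmetic behind these settings can be checked with a few lines of plain Python. This is a sketch that mirrors how recent tenacity releases appear to compute wait_exponential (multiplier * 2^(n-1), clamped to the min/max range); it does not call tenacity, older releases may be offset by one step, and exponential_delays and jittered_delays are hypothetical helper names.

```python
import random


def exponential_delays(attempts, multiplier=1.0, min_wait=1.0, max_wait=10.0):
    """Wait after failed attempt n (n = 1..attempts): multiplier * 2**(n-1), clamped."""
    return [
        max(min_wait, min(multiplier * 2 ** (n - 1), max_wait))
        for n in range(1, attempts + 1)
    ]


def jittered_delays(attempts, multiplier=1.0, max_wait=10.0, rng=random):
    """Full jitter: draw each wait uniformly between 0 and the exponential cap."""
    caps = exponential_delays(attempts, multiplier, min_wait=0.0, max_wait=max_wait)
    return [rng.uniform(0.0, cap) for cap in caps]


schedule = exponential_delays(5)                  # five failures in a row
waits_for_three_attempts = exponential_delays(2)  # 3 attempts => only 2 waits
caps = exponential_delays(5, min_wait=0.0)
jittered = jittered_delays(5, rng=random.Random(0))
```

With stop_after_attempt(3) only the first two entries of the schedule are ever used, so the total backoff is about 3s; the jittered variant spreads workers across each window instead of lining them up on the same instants.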
Subclass lifecycle (e.g. IntentClassifier)
    class IntentClassifier(BaseAgent):
        def _execute(self, state):
            intent = self.provider.classify(state.message)
            return {"intent": intent, "confidence": 0.95}

execute() (inherited) -> checks circuit -> retries on transient failure -> calls _execute -> records success/failure -> returns dict.
For the full line-by-line walkthrough of each specialist agent subclass (IntentClassifier, AccountAgent, ProductRecommender, GeneralAgent), see Specialist Agent Deep Dives & LangGraph Flow.
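The inherited lifecycle is a template method, which the following miniature tries to show. It is a deliberately simplified, synchronous sketch: ToyBaseAgent and ToyIntentClassifier are hypothetical, the "circuit" is just a failure counter with no recovery timeout, and backoff between attempts is omitted.

```python
from abc import ABC, abstractmethod


class ToyBaseAgent(ABC):
    """Hypothetical, synchronous miniature of the BaseAgent lifecycle."""

    max_attempts = 3

    def __init__(self, failure_threshold=5):
        self.failures = 0
        self.failure_threshold = failure_threshold

    def execute(self, state):
        if self.failures >= self.failure_threshold:      # 1. check circuit
            raise RuntimeError("circuit open")
        for attempt in range(1, self.max_attempts + 1):  # 2. retry loop
            try:
                result = self._execute(state)            # 3. subclass logic
            except ConnectionError:
                if attempt == self.max_attempts:
                    self.failures += 1                   # 4a. record failure
                    raise
            else:
                self.failures = 0                        # 4b. record success
                return result                            # 5. return dict

    @abstractmethod
    def _execute(self, state):
        """Subclasses put their prompt logic here."""


class ToyIntentClassifier(ToyBaseAgent):
    def __init__(self):
        super().__init__()
        self.calls = 0

    def _execute(self, state):
        self.calls += 1
        if self.calls == 1:
            raise ConnectionError("simulated transient error")
        intent = "billing" if "invoice" in state["message"] else "general"
        return {"intent": intent}


agent = ToyIntentClassifier()
outcome = agent.execute({"message": "where is my invoice?"})

try:
    ToyBaseAgent()  # abstract: cannot be instantiated directly
    abstract_instantiable = True
except TypeError:
    abstract_instantiable = False
```

The subclass contains only its own logic; the check, retry, and bookkeeping steps all live in the inherited execute().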
Common Pitfalls
- Sharing one circuit breaker across instances defeats the goal of per-agent isolation. Each agent owns its own state.
- Using time.sleep() to wait before a retry blocks the event loop and stalls every other coroutine. Let tenacity do the waiting: wait_exponential() only computes the delay, and AsyncRetrying sleeps without blocking the loop (asyncio.sleep by default), so other tasks keep running.
- Short-circuiting to is_compliant on the fast regex pass and skipping the LLM can hide subtle violations. Pair the fast regex pass with a periodic post-LLM audit sample.
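The event-loop pitfall is easy to observe directly. In this sketch (all function names are hypothetical), a background ticker stands in for other requests sharing the loop, and we count how often it gets to run during a blocking wait versus a cooperative one.

```python
import asyncio
import time


async def ticker(ticks):
    """Stands in for other work sharing the event loop."""
    while True:
        await asyncio.sleep(0.005)
        ticks.append(time.monotonic())


async def ticks_during(wait):
    """Count how often the ticker ran while `wait` was in progress."""
    ticks = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)  # give the ticker a chance to start
    before = len(ticks)
    await wait()
    count = len(ticks) - before
    task.cancel()
    return count


async def blocking_backoff():
    time.sleep(0.05)  # blocks the whole event loop


async def cooperative_backoff():
    await asyncio.sleep(0.05)  # yields to other tasks while waiting


async def main():
    blocked = await ticks_during(blocking_backoff)
    cooperative = await ticks_during(cooperative_backoff)
    return blocked, cooperative


blocked_ticks, cooperative_ticks = asyncio.run(main())
```

During the blocking wait the ticker never runs, whereas during the cooperative wait it runs several times; the exact count depends on scheduling.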
Real-World Interview Prep
Q1: What’s the trade-off between a circuit-breaker and unbounded retries?
A: Retries hide failures (a papered-over outage looks healthy). A breaker surfaces them (the failure count makes the outage visible) and protects downstream (no thundering herd). Pair them: retries handle transient blips; the breaker handles sustained outages.
Q2: Why half-open and not closed directly after recovery_timeout?
A: Half-open is a probe state. It lets a trial call through to test whether the downstream has recovered. Closing immediately on a timer would release every waiting caller at once against a service whose recovery is unconfirmed; if it is still failing, all of those calls fail and the breaker trips again. Half-open confines that risk to a test attempt before normal traffic resumes.
Q3: What should allow_request() do on the first ever call?
A: Allow it. A new breaker starts with state == "CLOSED" and failures == 0, so there is no failure history to act on and letting the request through is both correct and harmless. Lazy initialisation is fine; eager probing would need a recovery time (can_recover_at) that does not exist before the first failure.