Circuit Breakers and Fallbacks: Building Resilient Agents
The Fragility Problem
Production agents are compositions of unreliable components. The web search API returns 503s during peak hours. The LLM provider experiences latency spikes. A tool hangs indefinitely waiting for a response that never comes. Your database connection pool is exhausted.
Naive agents fail catastrophically in these scenarios: they retry indefinitely (burning tokens and time), they deadlock (waiting for a hung tool), or they hallucinate tool outputs (making up results when tools fail). None of these is acceptable in production.
Circuit breakers and fallbacks, backed by retry budgets and timeouts, are the core reliability patterns that turn fragile agents into resilient systems that degrade gracefully.
Circuit Breakers: Preventing Cascading Failures
The circuit breaker pattern comes from electrical engineering. When a circuit is overloaded, the breaker trips and cuts power, preventing damage from propagating through the system. In software, a circuit breaker wraps a remote call, counts its failures, and stops calling the dependency once failures cross a threshold. Calls then fail fast until the dependency has had time to recover.
Circuit Breaker States
┌─────────────────────────────────────────────────────────────────────────────┐
│                        Circuit Breaker State Machine                        │
│                                                                             │
│  ┌──────────┐   failures ≥ N   ┌──────────┐  reset_timeout   ┌───────────┐  │
│  │  CLOSED  │ ───────────────► │   OPEN   │ ───────────────► │ HALF-OPEN │  │
│  │ (healthy)│                  │ (failed) │                  │ (testing) │  │
│  └──────────┘                  └──────────┘                  └───────────┘  │
│                                                                             │
│  HALF-OPEN ── probe succeeds ──► CLOSED                                     │
│  HALF-OPEN ── probe fails ─────► OPEN                                       │
│                                                                             │
│  CLOSED:    calls pass through normally                                     │
│  OPEN:      calls fail fast                                                 │
│  HALF-OPEN: one probe call is let through                                   │
└─────────────────────────────────────────────────────────────────────────────┘
Circuit Breaker Implementation
import asyncio
import functools
import inspect
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation; calls pass through
    OPEN = "open"            # Circuit tripped; calls fail fast
    HALF_OPEN = "half_open"  # Testing if service has recovered


class CircuitOpenError(Exception):
    """Raised when a circuit breaker is open and blocks the call."""


@dataclass
class CircuitBreaker:
    """
    Circuit breaker for protecting agent tool calls.

    State changes are guarded by an asyncio.Lock, so one breaker can be
    shared by coroutines on the same event loop. It is not thread-safe.

    Opens after `failure_threshold` failures within `window_seconds`.
    Moves to HALF_OPEN after `reset_timeout_seconds` and resets fully
    on the first successful probe call.
    """
    name: str
    failure_threshold: int = 5
    window_seconds: float = 60.0
    reset_timeout_seconds: float = 30.0

    _state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    _failure_count: int = field(default=0, init=False)
    _failure_times: list[float] = field(default_factory=list, init=False)
    _last_failure_time: float = field(default=0.0, init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)

    @property
    def state(self) -> CircuitState:
        return self._state

    def _prune_old_failures(self) -> None:
        """Remove failures older than the window."""
        cutoff = time.monotonic() - self.window_seconds
        self._failure_times = [t for t in self._failure_times if t > cutoff]
        self._failure_count = len(self._failure_times)

    def _should_allow_request(self) -> bool:
        """Determine if a request should be allowed through."""
        now = time.monotonic()
        if self._state == CircuitState.CLOSED:
            return True
        if self._state == CircuitState.OPEN:
            if now - self._last_failure_time >= self.reset_timeout_seconds:
                self._state = CircuitState.HALF_OPEN
                return True  # Allow probe request
            return False
        # HALF_OPEN: calls are let through as probes until one of them
        # settles the state. This simple version does not limit the
        # number of concurrent probes.
        return True

    def record_success(self) -> None:
        """Record a successful call; reset if in HALF_OPEN."""
        if self._state == CircuitState.HALF_OPEN:
            self._state = CircuitState.CLOSED
            self._failure_count = 0
            self._failure_times = []

    def record_failure(self) -> None:
        """Record a failure; open the circuit if the threshold is reached."""
        now = time.monotonic()
        self._failure_times.append(now)
        self._last_failure_time = now
        self._prune_old_failures()
        if self._state == CircuitState.HALF_OPEN:
            # Probe failed; re-open immediately
            self._state = CircuitState.OPEN
        elif self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN

    async def call(self, fn: Callable, *args, **kwargs) -> Any:
        """
        Execute `fn` through the circuit breaker.
        Raises CircuitOpenError if the circuit is open.

        `fn` may be sync or async. A sync `fn` runs on the event loop
        thread, so it should not block for long.
        """
        async with self._lock:
            if not self._should_allow_request():
                elapsed = time.monotonic() - self._last_failure_time
                retry_in = max(0.0, self.reset_timeout_seconds - elapsed)
                raise CircuitOpenError(
                    f"Circuit '{self.name}' is OPEN. "
                    f"Service unavailable. Retry in {retry_in:.0f}s."
                )
        try:
            result = fn(*args, **kwargs)
            if inspect.isawaitable(result):
                result = await result
        except Exception:
            async with self._lock:
                self.record_failure()
            raise
        async with self._lock:
            self.record_success()
        return result


# Decorator for easy application
def with_circuit_breaker(breaker: CircuitBreaker):
    """Decorator that wraps a function with a circuit breaker."""
    def decorator(fn: Callable):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            return await breaker.call(fn, *args, **kwargs)
        return wrapper
    return decorator
Using Circuit Breakers with Agent Tools
# Define circuit breakers per external dependency
search_breaker = CircuitBreaker(
    name="web_search",
    failure_threshold=3,
    window_seconds=30,
    reset_timeout_seconds=60,
)

db_breaker = CircuitBreaker(
    name="database",
    failure_threshold=5,
    window_seconds=60,
    reset_timeout_seconds=30,
)


# Wrap tool functions.
# `tavily_client` and `db` are assumed to be async clients created elsewhere.
@with_circuit_breaker(search_breaker)
async def protected_web_search(query: str) -> str:
    """Web search with circuit breaker protection."""
    return await tavily_client.search(query)


@with_circuit_breaker(db_breaker)
async def protected_db_query(sql: str) -> list[dict]:
    """Database query with circuit breaker protection."""
    return await db.fetch(sql)
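The transitions in the state diagram are easier to check against a stripped-down model. The sketch below is a synchronous, single-threaded stand-in for the `CircuitBreaker` above. `MiniBreaker` and its injected fake clock are hypothetical names introduced only so the reset timeout can be stepped by hand; it illustrates the state machine and is not a replacement for the async implementation.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class MiniBreaker:
    """Hypothetical, simplified breaker used only to trace state changes."""
    failure_threshold: int
    reset_timeout_seconds: float
    clock: Callable[[], float]  # injected so the example controls time
    state: str = "closed"
    failures: int = 0
    opened_at: float = 0.0

    def allow(self) -> bool:
        """Return True if a call may go through right now."""
        if self.state == "open":
            if self.clock() - self.opened_at < self.reset_timeout_seconds:
                return False          # still cooling down: fail fast
            self.state = "half_open"  # timeout elapsed: allow a probe
        return True

    def on_failure(self) -> None:
        self.failures += 1
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = self.clock()

    def on_success(self) -> None:
        if self.state == "half_open":
            self.state = "closed"
            self.failures = 0


now = [0.0]  # fake clock, advanced manually
breaker = MiniBreaker(failure_threshold=3, reset_timeout_seconds=60.0,
                      clock=lambda: now[0])
trace = []

for _ in range(3):             # three failures trip the breaker
    breaker.on_failure()
trace.append(breaker.state)    # open

trace.append(breaker.allow())  # rejected while the circuit is open

now[0] = 61.0                  # step past the reset timeout
trace.append(breaker.allow())  # probe is allowed
trace.append(breaker.state)    # half_open

breaker.on_success()           # probe succeeds
trace.append(breaker.state)    # closed again

print(trace)  # ['open', False, True, 'half_open', 'closed']
```

Injecting the clock is a testing convenience: the same trick lets you unit-test a real breaker without sleeping for the full reset timeout.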
Retry Budgets
A retry budget caps total retries across all tool calls in a single agent run. Without a budget, an agent can exhaust both its token limit and external API quotas on repeated retries of a fundamentally broken step.
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class RetryBudget:
    """
    Tracks and enforces retry limits for a single agent run: a total
    across all tools (`total_budget`) and a cap per tool (`per_tool_budget`).
    """
    total_budget: int = 10
    per_tool_budget: int = 3

    _used: dict[str, int] = field(default_factory=dict, init=False)
    _total_used: int = field(default=0, init=False)

    def can_retry(self, tool_name: str) -> bool:
        """Check if retrying this tool is within budget."""
        if self._total_used >= self.total_budget:
            return False
        return self._used.get(tool_name, 0) < self.per_tool_budget

    def consume(self, tool_name: str) -> None:
        """Record one retry for this tool."""
        self._used[tool_name] = self._used.get(tool_name, 0) + 1
        self._total_used += 1

    @property
    def remaining(self) -> int:
        return self.total_budget - self._total_used

    def summary(self) -> dict:
        return {
            "total_used": self._total_used,
            "total_budget": self.total_budget,
            "remaining": self.remaining,
            "per_tool": dict(self._used),
        }


async def execute_with_budget(
    tool_fn: Callable[..., Awaitable[Any]],
    tool_name: str,
    budget: RetryBudget,
    *args,
    base_delay: float = 1.0,
    **kwargs,
) -> Any:
    """Execute a tool call with retry budget enforcement and exponential backoff."""
    attempt = 0
    while True:
        try:
            return await tool_fn(*args, **kwargs)
        except CircuitOpenError:  # Defined with CircuitBreaker above
            raise  # Circuit is open; don't retry
        except Exception as exc:
            if not budget.can_retry(tool_name):
                raise RuntimeError(
                    f"Retry budget exhausted for '{tool_name}': {exc}"
                ) from exc
            budget.consume(tool_name)
            delay = base_delay * (2 ** attempt)  # Exponential backoff
            await asyncio.sleep(delay)
            attempt += 1
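The backoff in `execute_with_budget` doubles without an upper bound, and every caller waits exactly the same amount. A common refinement, shown here as an assumption rather than something the code above does, is to cap the delay and add random jitter so that many agents retrying at once do not hit the dependency in lockstep. The helper name `backoff_delay` and its parameters are hypothetical.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """
    Full-jitter exponential backoff: pick a random delay between 0 and
    min(max_delay, base_delay * 2**attempt).
    """
    rng = rng or random.Random()
    ceiling = min(max_delay, base_delay * (2 ** attempt))
    return rng.uniform(0.0, ceiling)


# Upper bounds for the first few attempts with the default settings
ceilings = [min(30.0, 1.0 * 2 ** a) for a in range(7)]
print(ceilings)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

To try it, the `delay = base_delay * (2 ** attempt)` line could be replaced with `delay = backoff_delay(attempt, base_delay)`. Passing a seeded `random.Random` makes the delays reproducible in tests.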
Timeout Policies
Hung tool calls are silent killers. Without a timeout, the agent waits indefinitely and a single stuck tool consumes the time budget of the entire request.
import asyncio
import logging
from typing import Any, Awaitable

logger = logging.getLogger(__name__)

_NO_FALLBACK = object()  # Sentinel so that None can be a valid fallback value


async def with_timeout(
    coro: Awaitable[Any],
    timeout_seconds: float,
    fallback_value: Any = _NO_FALLBACK,
    tool_name: str = "unknown",
) -> Any:
    """
    Execute a coroutine with a timeout.
    On timeout, returns fallback_value if one was supplied; otherwise
    raises TimeoutError.
    """
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # Log for observability
        logger.warning("Tool '%s' exceeded %ss limit", tool_name, timeout_seconds)
        if fallback_value is not _NO_FALLBACK:
            return fallback_value
        raise TimeoutError(
            f"Tool '{tool_name}' timed out after {timeout_seconds:.1f}s"
        ) from None


# Tool timeout configuration
TOOL_TIMEOUTS = {
    "web_search": 10.0,  # seconds
    "code_interpreter": 30.0,
    "database_query": 5.0,
    "file_read": 2.0,
    "llm_call": 60.0,
}
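One way to apply a table like `TOOL_TIMEOUTS` is to look up the limit by tool name and use a default for tools that are not listed. The sketch below is self-contained: `run_tool_with_timeout`, `DEMO_TIMEOUTS`, `DEFAULT_TIMEOUT`, and the two stub tools are hypothetical names used for illustration, and the limits are tiny so the example finishes quickly.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Stand-in for TOOL_TIMEOUTS with very short, illustrative limits (seconds)
DEMO_TIMEOUTS = {"fast_tool": 0.5, "slow_tool": 0.05}
DEFAULT_TIMEOUT = 1.0  # assumed default for tools missing from the table


async def run_tool_with_timeout(
    tool_name: str, tool_fn: Callable[..., Awaitable[Any]], *args, **kwargs
) -> Any:
    """Await tool_fn, cancelling it if it exceeds the limit for tool_name."""
    limit = DEMO_TIMEOUTS.get(tool_name, DEFAULT_TIMEOUT)
    try:
        return await asyncio.wait_for(tool_fn(*args, **kwargs), timeout=limit)
    except asyncio.TimeoutError:
        raise TimeoutError(f"Tool '{tool_name}' timed out after {limit}s") from None


async def fast_tool(x: int) -> int:
    return x * 2


async def slow_tool() -> str:
    await asyncio.sleep(1.0)  # simulates a hung call
    return "never returned in time"


async def demo() -> list:
    outcomes = [await run_tool_with_timeout("fast_tool", fast_tool, 21)]
    try:
        await run_tool_with_timeout("slow_tool", slow_tool)
    except TimeoutError as exc:
        outcomes.append(str(exc))
    return outcomes


print(asyncio.run(demo()))
```

Keeping the limits in one table makes them easy to review and tune per dependency, instead of scattering magic numbers across call sites.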
Fallback Chains
When a tool is unavailable for the current request (circuit open, retry budget exhausted, or timeout), a fallback chain tries progressively simpler or more reliable alternatives.
import inspect
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


class FallbackChain:
    """
    Tries a sequence of functions in order, returning the first success.
    Enables graceful degradation when primary tools are unavailable.
    """

    def __init__(self, *fns: Callable, name: str = "fallback_chain"):
        self.fns = fns
        self.name = name

    async def __call__(self, *args, **kwargs) -> Any:
        errors = []
        for i, fn in enumerate(self.fns):
            try:
                result = fn(*args, **kwargs)
                if inspect.isawaitable(result):
                    result = await result
                if i > 0:
                    logger.warning("[%s] Primary failed; used fallback #%d", self.name, i)
                return result
            except Exception as exc:
                # Not every callable has __name__ (e.g. functools.partial)
                fn_name = getattr(fn, "__name__", repr(fn))
                errors.append(f"Option {i + 1} ({fn_name}): {exc}")
        raise RuntimeError(
            f"[{self.name}] All fallbacks failed:\n" + "\n".join(errors)
        )


# Example: web search with fallbacks.
# `ddg_client`, `wiki_client`, and `static_cache` are assumed to exist elsewhere.
async def search_duckduckgo(query: str) -> str:
    """Primary: DuckDuckGo search."""
    return await ddg_client.search(query)


async def search_wikipedia(query: str) -> str:
    """Fallback 1: Wikipedia summary."""
    return await wiki_client.summary(query)


async def search_static_cache(query: str) -> str:
    """Fallback 2: Cached results for common queries."""
    return static_cache.get(query, "No cached result available.")


robust_search = FallbackChain(
    search_duckduckgo,
    search_wikipedia,
    search_static_cache,
    name="web_search",
)
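To see the ordering behaviour without any real clients, the following sketch runs the same try-in-order idea against stub functions. `try_in_order` and the three stubs are hypothetical stand-ins written for this example; unlike `FallbackChain`, the helper also returns the name of the function that answered, which is one possible way to let the agent know it is working from a degraded source.

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence


async def try_in_order(
    fns: Sequence[Callable[..., Awaitable[Any]]], *args, **kwargs
) -> tuple[Any, str]:
    """Return (result, name of the function that produced it)."""
    errors = []
    for fn in fns:
        try:
            return await fn(*args, **kwargs), fn.__name__
        except Exception as exc:
            errors.append(f"{fn.__name__}: {exc}")
    raise RuntimeError("All fallbacks failed: " + "; ".join(errors))


# Stub tools standing in for real clients
async def primary_search(query: str) -> str:
    raise ConnectionError("503 Service Unavailable")


async def encyclopedia_lookup(query: str) -> str:
    return f"Summary for '{query}'"


async def cached_lookup(query: str) -> str:
    return "stale cached answer"


result, source = asyncio.run(
    try_in_order(
        [primary_search, encyclopedia_lookup, cached_lookup],
        "circuit breaker pattern",
    )
)
print(source, "->", result)
# encyclopedia_lookup -> Summary for 'circuit breaker pattern'
```

The third stub is never called here because the second one succeeds, which is the point of ordering the chain from best to most degraded.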
Graceful Degradation at the Agent Level
Individual tool protection is not enough. The agent itself must handle the case where its tools are degraded and communicate that clearly:
DEGRADED_RESPONSE_TEMPLATE = """I'm currently unable to complete this request
fully because {reason}. Here is what I can tell you based on what I do know:
{partial_answer}
For the most accurate and up-to-date information, please try again later or
consult {alternative_resource}.
"""


async def agent_with_graceful_degradation(task: str, tools: dict) -> str:
    """
    Agent wrapper that catches budget exhaustion and circuit failures,
    then produces a partial answer rather than an error message.

    `plan_steps`, `synthesise_partial_answer`, and `synthesise_full_answer`
    are assumed to be defined elsewhere in the agent.
    """
    budget = RetryBudget(total_budget=10, per_tool_budget=3)
    partial_results = []
    failures = []

    for step in plan_steps(task):
        tool_fn = tools.get(step.tool_name)
        if tool_fn is None:
            failures.append(f"Tool '{step.tool_name}' not available")
            continue
        try:
            result = await execute_with_budget(
                tool_fn, step.tool_name, budget, **step.tool_args
            )
            partial_results.append({"step": step.description, "result": result})
        except (RuntimeError, CircuitOpenError, TimeoutError) as exc:
            failures.append(str(exc))
            # Continue with remaining steps; don't abort entirely

    if failures and not partial_results:
        return DEGRADED_RESPONSE_TEMPLATE.format(
            reason="; ".join(failures),
            partial_answer="I was unable to gather any information.",
            alternative_resource="the original sources directly",
        )
    if failures:
        # Mix of successes and failures: synthesise a partial answer
        return synthesise_partial_answer(partial_results, failures)
    return synthesise_full_answer(partial_results)
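`synthesise_partial_answer` is referenced above but not defined in this section. In a real agent it would probably pass the partial results to the LLM. The sketch below is a purely hypothetical, template-based stand-in that shows the property that matters: the answer lists what succeeded and states plainly what could not be completed. The sample step and failure message are made-up illustration data.

```python
def synthesise_partial_answer(partial_results: list[dict], failures: list[str]) -> str:
    """Hypothetical stand-in: format successes, then disclose failures."""
    lines = ["Here is what I was able to find:"]
    for item in partial_results:
        lines.append(f"- {item['step']}: {item['result']}")
    lines.append("")
    lines.append("I could not complete the following steps:")
    for reason in failures:
        lines.append(f"- {reason}")
    return "\n".join(lines)


answer = synthesise_partial_answer(
    [{"step": "Fetch order status", "result": "shipped"}],
    ["Circuit 'web_search' is OPEN. Service unavailable."],
)
print(answer)
```

Surfacing the failure reasons alongside the results gives the user a way to judge how much to trust the answer, and it removes the temptation for the model to fill the gaps with invented tool output.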
Summary
Production-grade agents need three layers of protection: circuit breakers that stop hammering failing dependencies, retry budgets that cap total retries across a run, and timeout policies that prevent hung calls from blocking the agent indefinitely. On top of these, fallback chains provide graceful degradation when primary tools are unavailable. Together, these patterns transform an agent that fails catastrophically into one that degrades predictably and communicates its limitations clearly, which is far more useful in production.