Production Hardening

Circuit Breakers and Fallback Chains

14m read

Circuit Breakers and Fallbacks: Building Resilient Agents

The Fragility Problem

Production agents are compositions of unreliable components. The web search API returns 503s during peak hours. The LLM provider experiences latency spikes. A tool hangs indefinitely waiting for a response that never comes. Your database connection pool is exhausted.

Naive agents fail catastrophically in these scenarios: they retry indefinitely (burning tokens and time), they deadlock (waiting for a hung tool), or they hallucinate tool outputs (making up results when tools fail). None of these is acceptable in production.

Circuit breakers and fallbacks are the two core reliability patterns that transform fragile agents into resilient systems that degrade gracefully.


Circuit Breakers: Preventing Cascading Failures

The circuit breaker pattern comes from electrical engineering. When a circuit is overloaded, the breaker trips and cuts power — preventing damage from propagating through the system. In software, a circuit breaker wraps a remote call and automatically stops calling a failing dependency.

Circuit Breaker States

              Circuit Breaker State Machine

              failures ≥ N              reset_timeout
               in window                   elapsed
  ┌──────────┐             ┌──────────┐               ┌───────────┐
  │  CLOSED  │ ──────────► │   OPEN   │ ────────────► │ HALF-OPEN │
  │ (healthy)│             │ (failed) │               │ (testing) │
  └──────────┘             └──────────┘               └───────────┘
        ▲                        ▲                       │    │
        │                        │      probe fails      │    │
        │                        └───────────────────────┼────┘
        │                probe succeeds                  │
        └────────────────────────────────────────────────┘

Circuit Breaker Implementation

import time
import asyncio
import functools
from dataclasses import dataclass, field
from typing import Callable, Any
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"       # Normal operation; calls pass through
    OPEN = "open"           # Circuit tripped; calls fail fast
    HALF_OPEN = "half_open" # Testing if service has recovered


@dataclass
class CircuitBreaker:
    """
    Thread-safe circuit breaker for protecting agent tool calls.

    Opens after `failure_threshold` failures within `window_seconds`.
    Moves to HALF_OPEN after `reset_timeout_seconds` and resets fully
    on the first successful probe call.
    """
    name: str
    failure_threshold: int = 5
    window_seconds: float = 60.0
    reset_timeout_seconds: float = 30.0

    _state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    _failure_count: int = field(default=0, init=False)
    _failure_times: list[float] = field(default_factory=list, init=False)
    _last_failure_time: float = field(default=0.0, init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)

    @property
    def state(self) -> CircuitState:
        return self._state

    def _prune_old_failures(self):
        """Remove failures older than the window."""
        cutoff = time.monotonic() - self.window_seconds
        self._failure_times = [t for t in self._failure_times if t > cutoff]
        self._failure_count = len(self._failure_times)

    def _should_allow_request(self) -> bool:
        """Determine if a request should be allowed through."""
        now = time.monotonic()
        if self._state == CircuitState.CLOSED:
            return True
        if self._state == CircuitState.OPEN:
            if now - self._last_failure_time >= self.reset_timeout_seconds:
                self._state = CircuitState.HALF_OPEN
                return True  # Allow probe request
            return False
        # HALF_OPEN: allow probe requests through. (A stricter variant
        # would permit only one in-flight probe at a time.)
        return True

    def record_success(self):
        """Record a successful call; reset if in HALF_OPEN."""
        if self._state == CircuitState.HALF_OPEN:
            self._state = CircuitState.CLOSED
            self._failure_count = 0
            self._failure_times = []

    def record_failure(self):
        """Record a failure; open circuit if threshold exceeded."""
        now = time.monotonic()
        self._failure_times.append(now)
        self._last_failure_time = now
        self._prune_old_failures()

        if self._state == CircuitState.HALF_OPEN:
            # Probe failed; re-open immediately
            self._state = CircuitState.OPEN
        elif self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN

    async def call(self, fn: Callable, *args, **kwargs) -> Any:
        """
        Execute `fn` through the circuit breaker.
        Raises CircuitOpenError if the circuit is open.
        """
        async with self._lock:
            if not self._should_allow_request():
                raise CircuitOpenError(
                    f"Circuit '{self.name}' is OPEN. "
                    f"Service unavailable. Retry in "
                    f"{self.reset_timeout_seconds:.0f}s."
                )

        try:
            if asyncio.iscoroutinefunction(fn):
                result = await fn(*args, **kwargs)
            else:
                result = fn(*args, **kwargs)
            async with self._lock:
                self.record_success()
            return result
        except Exception:
            async with self._lock:
                self.record_failure()
            raise


class CircuitOpenError(Exception):
    """Raised when a circuit breaker is open and blocks the call."""
    pass


# Decorator for easy application
def with_circuit_breaker(breaker: CircuitBreaker):
    """Decorator that wraps a function with a circuit breaker."""
    def decorator(fn: Callable):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            return await breaker.call(fn, *args, **kwargs)
        return wrapper
    return decorator

Using Circuit Breakers with Agent Tools

# Define circuit breakers per external dependency
search_breaker = CircuitBreaker(
    name="web_search",
    failure_threshold=3,
    window_seconds=30,
    reset_timeout_seconds=60,
)

db_breaker = CircuitBreaker(
    name="database",
    failure_threshold=5,
    window_seconds=60,
    reset_timeout_seconds=30,
)

# Wrap tool functions
@with_circuit_breaker(search_breaker)
async def protected_web_search(query: str) -> str:
    """Web search with circuit breaker protection."""
    return await tavily_client.search(query)

@with_circuit_breaker(db_breaker)
async def protected_db_query(sql: str) -> list[dict]:
    """Database query with circuit breaker protection."""
    return await db.fetch(sql)
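To make the state transitions concrete, here is a stripped-down, self-contained sketch of the same state machine. It omits the sliding failure window and the lock, and injects the clock so the transitions can be exercised without sleeping; all names (`MiniBreaker`, `allow`, `record`) are illustrative, not part of the implementation above.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class MiniBreaker:
    """Minimal breaker with an injectable clock (illustrative only)."""

    def __init__(self, threshold: int = 3, reset_timeout: float = 30.0,
                 clock=time.monotonic):
        self.threshold = threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.state = State.CLOSED
        self.failures = 0
        self.opened_at = 0.0

    def allow(self) -> bool:
        if self.state is State.OPEN:
            if self.clock() - self.opened_at >= self.reset_timeout:
                self.state = State.HALF_OPEN   # time to send a probe
                return True
            return False
        return True                            # CLOSED or HALF_OPEN

    def record(self, ok: bool) -> None:
        if ok:
            self.state, self.failures = State.CLOSED, 0
        else:
            self.failures += 1
            if self.state is State.HALF_OPEN or self.failures >= self.threshold:
                self.state, self.opened_at = State.OPEN, self.clock()


# Drive the state machine with a fake clock -- no sleeping required
now = [0.0]
b = MiniBreaker(threshold=3, reset_timeout=30.0, clock=lambda: now[0])

for _ in range(3):                 # three failures trip the breaker
    b.record(ok=False)
assert b.state is State.OPEN and not b.allow()

now[0] = 31.0                      # reset timeout elapses
assert b.allow() and b.state is State.HALF_OPEN

b.record(ok=True)                  # probe succeeds; circuit closes
assert b.state is State.CLOSED
```

Injecting the clock is the design choice worth copying: it makes breaker behavior unit-testable in milliseconds instead of real minutes.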

Retry Budgets

A retry budget caps total retries across all tool calls in a single agent run. Without a budget, an agent can exhaust both its token limit and external API quotas on repeated retries of a fundamentally broken step.

from dataclasses import dataclass, field


@dataclass
class RetryBudget:
    """
    Tracks and enforces a maximum number of retries per agent run.
    Budgets are per-run, not per-tool — they limit total retry spending.
    """
    total_budget: int = 10
    per_tool_budget: int = 3
    _used: dict[str, int] = field(default_factory=dict, init=False)
    _total_used: int = field(default=0, init=False)

    def can_retry(self, tool_name: str) -> bool:
        """Check if retrying this tool is within budget."""
        if self._total_used >= self.total_budget:
            return False
        return self._used.get(tool_name, 0) < self.per_tool_budget

    def consume(self, tool_name: str):
        """Record one retry for this tool."""
        self._used[tool_name] = self._used.get(tool_name, 0) + 1
        self._total_used += 1

    @property
    def remaining(self) -> int:
        return self.total_budget - self._total_used

    def summary(self) -> dict:
        return {
            "total_used": self._total_used,
            "total_budget": self.total_budget,
            "remaining": self.remaining,
            "per_tool": dict(self._used),
        }


async def execute_with_budget(
    tool_fn: Callable,
    tool_name: str,
    budget: RetryBudget,
    *args,
    base_delay: float = 1.0,
    **kwargs,
) -> Any:
    """Execute a tool call with retry budget enforcement and exponential backoff."""
    attempt = 0

    while True:
        try:
            return await tool_fn(*args, **kwargs)
        except CircuitOpenError:
            raise   # Circuit is open; retrying would only waste budget
        except Exception as exc:
            if not budget.can_retry(tool_name):
                raise RuntimeError(
                    f"Retry budget exhausted for '{tool_name}': {exc}"
                ) from exc
            budget.consume(tool_name)
            delay = base_delay * (2 ** attempt)   # Exponential backoff
            await asyncio.sleep(delay)
            attempt += 1
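The `base_delay * (2 ** attempt)` line grows without bound; production retry loops usually cap the delay and add jitter so many clients hitting the same failure don't retry in lockstep. A small standalone sketch of that refinement (the cap value and `backoff_delay` name are illustrative):

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  jitter: bool = False) -> float:
    """Exponential backoff: base * 2**attempt, capped, optionally jittered."""
    delay = min(cap, base * (2 ** attempt))
    if jitter:
        delay = random.uniform(0, delay)   # "full jitter" variant
    return delay


# The first attempts double; the cap then flattens the curve
schedule = [backoff_delay(a) for a in range(7)]
print(schedule)   # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```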

Timeout Policies

Hung tool calls are silent killers: the agent waits indefinitely, silently consuming the entire request's time budget.

import asyncio
from typing import Any, Awaitable, TypeVar

T = TypeVar("T")


async def with_timeout(
    coro: Awaitable[T],
    timeout_seconds: float,
    fallback_value: T | None = None,   # None means "no fallback configured"
    tool_name: str = "unknown",
) -> T:
    """
    Execute a coroutine with a timeout.
    On timeout, returns fallback_value if one was provided;
    otherwise raises TimeoutError.
    """
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # Log for observability
        print(f"[TIMEOUT] Tool '{tool_name}' exceeded {timeout_seconds}s limit")
        if fallback_value is not None:
            return fallback_value
        raise TimeoutError(
            f"Tool '{tool_name}' timed out after {timeout_seconds:.1f}s"
        ) from None


# Tool timeout configuration
TOOL_TIMEOUTS = {
    "web_search":       10.0,  # seconds
    "code_interpreter": 30.0,
    "database_query":    5.0,
    "file_read":         2.0,
    "llm_call":         60.0,
}
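To see the policy in action, here is a self-contained run of the same `asyncio.wait_for` pattern, using a stub coroutine that sleeps longer than its limit (`slow_tool`, `bounded`, and the timings are all illustrative):

```python
import asyncio


async def slow_tool() -> str:
    await asyncio.sleep(0.2)       # simulates a hung or slow call
    return "real result"


async def bounded(coro, limit: float, fallback: str) -> str:
    """Simplified stand-in for with_timeout above."""
    try:
        return await asyncio.wait_for(coro, timeout=limit)
    except asyncio.TimeoutError:
        return fallback


async def main() -> tuple[str, str]:
    degraded = await bounded(slow_tool(), 0.05, "[stale cache]")  # too slow
    fresh = await bounded(slow_tool(), 1.0, "[stale cache]")      # completes
    return degraded, fresh


degraded, fresh = asyncio.run(main())
print(degraded, fresh)   # [stale cache] real result
```

Note that `asyncio.wait_for` cancels the underlying coroutine on timeout, so the hung call does not keep running in the background.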

Fallback Chains

When a tool fails permanently (circuit open, budget exhausted, timeout), a fallback chain tries progressively simpler or more reliable alternatives.

from typing import Sequence


class FallbackChain:
    """
    Tries a sequence of functions in order, returning the first success.
    Enables graceful degradation when primary tools are unavailable.
    """

    def __init__(self, *fns: Callable, name: str = "fallback_chain"):
        self.fns = fns
        self.name = name

    async def __call__(self, *args, **kwargs) -> Any:
        errors = []
        for i, fn in enumerate(self.fns):
            try:
                if asyncio.iscoroutinefunction(fn):
                    result = await fn(*args, **kwargs)
                else:
                    result = fn(*args, **kwargs)
                if i > 0:
                    print(f"[{self.name}] Primary failed; used fallback #{i}")
                return result
            except Exception as exc:
                errors.append(f"Option {i+1} ({fn.__name__}): {exc}")
                continue

        raise RuntimeError(
            f"[{self.name}] All fallbacks failed:\n" + "\n".join(errors)
        )


# Example: web search with fallbacks
async def search_duckduckgo(query: str) -> str:
    """Primary: DuckDuckGo search."""
    return await ddg_client.search(query)

async def search_wikipedia(query: str) -> str:
    """Fallback 1: Wikipedia summary."""
    return await wiki_client.summary(query)

async def search_static_cache(query: str) -> str:
    """Fallback 2: Cached results for common queries."""
    return static_cache.get(query, "No cached result available.")

robust_search = FallbackChain(
    search_duckduckgo,
    search_wikipedia,
    search_static_cache,
    name="web_search",
)
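The chain's first-success semantics can be demonstrated end to end with stub backends. The snippet below inlines a minimal version of the loop inside `FallbackChain.__call__`; `primary`, `fallback`, and `first_success` are illustrative stand-ins, not real clients.

```python
import asyncio

calls: list[str] = []


async def primary(query: str) -> str:
    calls.append("primary")
    raise ConnectionError("search API returned 503")


async def fallback(query: str) -> str:
    calls.append("fallback")
    return f"wiki summary for {query!r}"


async def first_success(fns, *args):
    """Minimal inline version of FallbackChain.__call__."""
    errors = []
    for fn in fns:
        try:
            return await fn(*args)
        except Exception as exc:
            errors.append(f"{fn.__name__}: {exc}")
    raise RuntimeError("all fallbacks failed:\n" + "\n".join(errors))


result = asyncio.run(first_success([primary, fallback], "circuit breakers"))
print(result)   # wiki summary for 'circuit breakers'
print(calls)    # ['primary', 'fallback'], i.e. the primary was tried first
```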

Graceful Degradation at the Agent Level

Individual tool protection is not enough. The agent itself must handle the case where its tools are degraded and communicate that clearly:

DEGRADED_RESPONSE_TEMPLATE = """I'm currently unable to complete this request
fully because {reason}. Here is what I can tell you based on what I do know:

{partial_answer}

For the most accurate and up-to-date information, please try again later or
consult {alternative_resource}.
"""


async def agent_with_graceful_degradation(task: str, tools: dict) -> str:
    """
    Agent wrapper that catches budget exhaustion and circuit failures,
    then produces a partial answer rather than an error message.
    """
    budget = RetryBudget(total_budget=10, per_tool_budget=3)
    partial_results = []
    failures = []

    for step in plan_steps(task):
        tool_fn = tools.get(step.tool_name)
        if tool_fn is None:
            failures.append(f"Tool '{step.tool_name}' not available")
            continue
        try:
            result = await execute_with_budget(
                tool_fn, step.tool_name, budget, **step.tool_args
            )
            partial_results.append({"step": step.description, "result": result})
        except (RuntimeError, CircuitOpenError, TimeoutError) as exc:
            failures.append(str(exc))
            # Continue with remaining steps; don't abort entirely

    if failures and not partial_results:
        return DEGRADED_RESPONSE_TEMPLATE.format(
            reason="; ".join(failures),
            partial_answer="I was unable to gather any information.",
            alternative_resource="direct sources or try again later",
        )

    if failures:
        # Mix of successes and failures — synthesise partial answer
        return synthesise_partial_answer(partial_results, failures)

    return synthesise_full_answer(partial_results)
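The layers compose naturally around a single call. A compressed, self-contained sketch of per-attempt timeouts plus a fallback sequence (the stub tools and `guarded_call` are illustrative; a real version would also route each attempt through its circuit breaker and retry budget):

```python
import asyncio


async def flaky_primary(query: str) -> str:
    raise ConnectionError("upstream down")     # simulates a dead dependency


async def cached_fallback(query: str) -> str:
    return f"cached answer for {query!r}"


async def guarded_call(query: str, timeout: float = 0.5) -> str:
    """Per-attempt timeout, then fall through to the next option."""
    for fn in (flaky_primary, cached_fallback):
        try:
            return await asyncio.wait_for(fn(query), timeout=timeout)
        except Exception:                      # includes asyncio.TimeoutError
            continue
    raise RuntimeError("all options failed")


print(asyncio.run(guarded_call("weather in Paris")))
# cached answer for 'weather in Paris'
```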

Summary

Production-grade agents need three layers of protection: circuit breakers that stop hammering failing dependencies, retry budgets that cap total retries across a run, and timeout policies that prevent hung calls from blocking the agent indefinitely. Fallback chains provide graceful degradation when primary tools are unavailable. Together, these patterns transform an agent that fails catastrophically into one that degrades predictably and communicates its limitations clearly — which is far more useful in production.