Production Multi-Agent Systems

Failure Modes and Resilience Patterns


Failure Modes in Multi-Agent Systems

Overview

Multi-agent systems introduce a class of failures that don't exist in single-agent applications. When agents interact, the system becomes more than the sum of its parts — but so do the failure modes. A single misbehaving agent can cascade failures across the entire system, cause resource exhaustion, or produce subtly wrong outputs that are harder to detect than an outright error.

This lesson catalogs the most critical failure modes, explains how to recognize them, and provides concrete mitigation strategies and code implementations.


Taxonomy of Multi-Agent Failures

Multi-Agent Failures
├── Structural Failures
│   ├── Cascading failures (agent failure propagates downstream)
│   ├── Deadlocks (agents block each other indefinitely)
│   └── Infinite delegation loops (circular task routing)
│
├── Behavioral Failures
│   ├── Conflicting agent actions (agents work against each other)
│   ├── Prompt injection across agents (one agent corrupts another's input)
│   └── Hallucination amplification (errors compound through agent chain)
│
└── Resource Failures
    ├── Token budget exhaustion (conversation grows unbounded)
    ├── Cost explosion (runaway LLM calls)
    └── Tool call storms (concurrent agents overloading external APIs)

Failure Mode 1: Cascading Failures

In a sequential pipeline (A → B → C), if agent B fails, agent C has no input. Without proper error handling, C either fails silently, uses stale data, or produces completely wrong output.

How to recognize it: Downstream agents report "no data to process" or produce outputs based on empty/default inputs.

Mitigation: Explicit Error Propagation

from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum

class AgentStatus(Enum):
    SUCCESS = "success"
    PARTIAL = "partial"     # some results but degraded
    FAILED = "failed"
    SKIPPED = "skipped"     # intentionally not run

@dataclass
class AgentResult:
    agent_id: str
    status: AgentStatus
    payload: Optional[Any] = None
    error: Optional[str] = None
    error_code: Optional[str] = None
    fallback_used: bool = False

    @property
    def is_usable(self) -> bool:
        return self.status in (AgentStatus.SUCCESS, AgentStatus.PARTIAL)

def safe_agent_call(agent_fn, input_data: Any, agent_id: str) -> AgentResult:
    """Wrapper that converts any agent call into a structured result."""
    try:
        result = agent_fn(input_data)
        return AgentResult(agent_id=agent_id, status=AgentStatus.SUCCESS, payload=result)
    except ValueError as e:
        # Input validation failure — don't retry
        return AgentResult(
            agent_id=agent_id,
            status=AgentStatus.FAILED,
            error=str(e),
            error_code="INVALID_INPUT",
        )
    except TimeoutError as e:
        return AgentResult(
            agent_id=agent_id,
            status=AgentStatus.FAILED,
            error=str(e),
            error_code="TIMEOUT",
        )
    except Exception as e:
        return AgentResult(
            agent_id=agent_id,
            status=AgentStatus.FAILED,
            error=str(e),
            error_code="UNEXPECTED_ERROR",
        )

def run_pipeline_with_fallbacks(task: str) -> dict[str, AgentResult]:
    # researcher_agent, analyst_agent, writer_agent, and load_fallback_research
    # are assumed to be defined elsewhere in your application.
    results: dict[str, AgentResult] = {}

    # Stage 1: Research
    results["research"] = safe_agent_call(researcher_agent, task, "researcher")

    # Stage 2: Analysis — gracefully handles upstream failure
    if results["research"].is_usable:
        results["analysis"] = safe_agent_call(
            analyst_agent, results["research"].payload, "analyst"
        )
    else:
        # Use cached/fallback data instead of failing the whole pipeline
        fallback_data = load_fallback_research(task)
        fallback_result = safe_agent_call(analyst_agent, fallback_data, "analyst")
        results["analysis"] = AgentResult(
            agent_id="analyst",
            # PARTIAL only if the fallback run actually produced output
            status=AgentStatus.PARTIAL if fallback_result.is_usable else AgentStatus.FAILED,
            payload=fallback_result.payload,
            error=fallback_result.error,
            fallback_used=True,
        )

    # Stage 3: Writing — always runs, reports upstream degradation
    upstream_ok = all(r.is_usable for r in results.values())
    write_input = {
        "research": results["research"].payload or "Research unavailable.",
        "analysis": results["analysis"].payload or "Analysis unavailable.",
        "degraded_mode": not upstream_ok,
    }
    results["writing"] = safe_agent_call(writer_agent, write_input, "writer")

    return results
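To see the wrapper in action without the full pipeline, here is a condensed, self-contained sketch of the same pattern. The stub agent `broken_researcher` is hypothetical, included only to simulate an upstream failure:

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional

class AgentStatus(Enum):
    SUCCESS = "success"
    FAILED = "failed"

@dataclass
class AgentResult:
    agent_id: str
    status: AgentStatus
    payload: Optional[Any] = None
    error: Optional[str] = None

    @property
    def is_usable(self) -> bool:
        return self.status is AgentStatus.SUCCESS

def safe_agent_call(agent_fn, input_data: Any, agent_id: str) -> AgentResult:
    try:
        return AgentResult(agent_id, AgentStatus.SUCCESS, agent_fn(input_data))
    except Exception as e:
        return AgentResult(agent_id, AgentStatus.FAILED, error=str(e))

def broken_researcher(task: str) -> str:
    raise ValueError("search backend unavailable")  # simulated upstream failure

research = safe_agent_call(broken_researcher, "quarterly report", "researcher")
# Downstream code branches on status instead of crashing on an exception:
analysis_input = research.payload if research.is_usable else "Research unavailable."
```

The key point is that the failure is now data, not control flow: the downstream stage inspects `status` and degrades deliberately instead of receiving an unhandled exception.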

Failure Mode 2: Deadlocks

Deadlocks occur when agents are waiting for each other in a cycle. In async systems, this often manifests as tasks that simply never complete — no error, no output, just silence.

How to recognize it: Workflow hangs indefinitely. No error in logs. Agent status remains "waiting" or "in_progress" forever.

Mitigation: Timeouts and Dead Letter Queues

import asyncio
import time
from typing import Any, Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")

async def with_timeout(
    coro: Awaitable[T],
    timeout_seconds: float,
    agent_id: str,
    fallback: Optional[T] = None,
) -> T:
    """
    Run a coroutine with a hard timeout. Returns fallback value if timeout occurs.
    Logs the timeout event for alerting.
    """
    try:
        result = await asyncio.wait_for(coro, timeout=timeout_seconds)
        return result
    except asyncio.TimeoutError:
        print(f"[TIMEOUT] Agent '{agent_id}' exceeded {timeout_seconds}s deadline.")
        # In production: send alert, increment timeout_count metric
        if fallback is not None:
            return fallback
        # Note: enqueueing the task to the dead letter queue is the caller's job.
        raise TimeoutError(f"Agent '{agent_id}' timed out after {timeout_seconds}s.")

class DeadLetterQueue:
    """Stores failed tasks for manual review, retry, or alternative handling."""

    def __init__(self):
        self._queue: list[dict] = []

    def enqueue(
        self,
        task: Any,
        agent_id: str,
        failure_reason: str,
        correlation_id: str,
        retry_count: int = 0,
    ) -> None:
        entry = {
            "enqueued_at": time.time(),
            "task": task,
            "agent_id": agent_id,
            "failure_reason": failure_reason,
            "correlation_id": correlation_id,
            "retry_count": retry_count,
        }
        self._queue.append(entry)
        print(f"[DLQ] Task for '{agent_id}' added to dead letter queue. Reason: {failure_reason}")

    def drain_retriable(self, max_retries: int = 3) -> list[dict]:
        """Return tasks that can still be retried and remove them from the queue."""
        retriable = [t for t in self._queue if t["retry_count"] < max_retries]
        self._queue = [t for t in self._queue if t["retry_count"] >= max_retries]
        return retriable

    def get_failed(self) -> list[dict]:
        """Tasks still in the queue; after drain_retriable(), these are the
        tasks that exceeded max retries and require human review."""
        return list(self._queue)

dlq = DeadLetterQueue()

async def agent_with_timeout_and_dlq(
    agent_fn: Callable,
    task: Any,
    agent_id: str,
    correlation_id: str,
    timeout: float = 30.0,
    max_retries: int = 3,
    retry_count: int = 0,
) -> Optional[Any]:
    try:
        return await with_timeout(
            asyncio.to_thread(agent_fn, task),
            timeout_seconds=timeout,
            agent_id=agent_id,
        )
    except Exception as e:  # TimeoutError is a subclass of Exception, so one handler suffices
        if retry_count < max_retries:
            wait = 2 ** retry_count  # exponential backoff
            print(f"[RETRY] Agent '{agent_id}' failed. Retrying in {wait}s (attempt {retry_count + 1}/{max_retries})")
            await asyncio.sleep(wait)
            return await agent_with_timeout_and_dlq(
                agent_fn, task, agent_id, correlation_id,
                timeout, max_retries, retry_count + 1
            )
        else:
            dlq.enqueue(task, agent_id, str(e), correlation_id, retry_count)
            return None

Failure Mode 3: Infinite Delegation Loops

In hierarchical systems, a supervisor may route a task to a worker, which fails and returns the task, which the supervisor re-routes to the same worker, indefinitely.

How to recognize it: Log shows the same agent pair repeatedly. Task never completes. Token costs spike.

Mitigation: Delegation Depth Tracking

@dataclass
class DelegationContext:
    task_id: str
    correlation_id: str
    depth: int = 0
    visited_agents: list[str] = field(default_factory=list)
    max_depth: int = 5
    max_visits_per_agent: int = 2

    def can_delegate_to(self, agent_id: str) -> tuple[bool, str]:
        if self.depth >= self.max_depth:
            return False, f"Max delegation depth ({self.max_depth}) reached"

        visit_count = self.visited_agents.count(agent_id)
        if visit_count >= self.max_visits_per_agent:
            return False, f"Agent '{agent_id}' already visited {visit_count} times"

        return True, ""

    def record_delegation(self, agent_id: str) -> "DelegationContext":
        return DelegationContext(
            task_id=self.task_id,
            correlation_id=self.correlation_id,
            depth=self.depth + 1,
            visited_agents=self.visited_agents + [agent_id],
            max_depth=self.max_depth,
            max_visits_per_agent=self.max_visits_per_agent,
        )

class SafeSupervisor:
    def __init__(self, workers: dict[str, Any]):
        self._workers = workers  # agent_id -> worker exposing run(task, ctx)

    def delegate(self, task: str, agent_id: str, ctx: DelegationContext) -> Optional[str]:
        can_delegate, reason = ctx.can_delegate_to(agent_id)
        if not can_delegate:
            print(f"[SafeSupervisor] Delegation blocked: {reason}")
            return None  # or raise to trigger fallback

        new_ctx = ctx.record_delegation(agent_id)
        return self._workers[agent_id].run(task, new_ctx)
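A condensed, self-contained version of the context (with a hypothetical `MiniDelegationContext` and a "writer" worker) shows how the visit limit cuts off a supervisor that keeps bouncing the same task to the same agent:

```python
from dataclasses import dataclass, field

@dataclass
class MiniDelegationContext:
    depth: int = 0
    visited: list = field(default_factory=list)
    max_depth: int = 5
    max_visits: int = 2

    def can_delegate_to(self, agent_id: str) -> bool:
        return (self.depth < self.max_depth
                and self.visited.count(agent_id) < self.max_visits)

    def record(self, agent_id: str) -> "MiniDelegationContext":
        # Immutable update: each delegation returns a new context
        return MiniDelegationContext(self.depth + 1, self.visited + [agent_id],
                                     self.max_depth, self.max_visits)

ctx = MiniDelegationContext()
attempts = 0
# A buggy supervisor that re-routes the same failing task to the same worker:
while ctx.can_delegate_to("writer"):
    ctx = ctx.record("writer")
    attempts += 1
# The loop stops after max_visits delegations, instead of running forever.
```

Note that the limit is per agent: after "writer" is exhausted, the context would still permit delegating to a different worker, so the supervisor can try an alternative rather than aborting outright.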

Failure Mode 4: Conflicting Agent Actions

In concurrent systems, two agents may take actions that contradict each other — one agent writes a file while another deletes it, or two agents both claim the same resource.

Mitigation: Optimistic Locking and Claim Tokens

import redis
import uuid

class ResourceLock:
    """Distributed lock for agent resource claims using Redis."""

    def __init__(self, redis_client: redis.Redis, lock_ttl_seconds: int = 30):
        # Assumes the client was created with decode_responses=True,
        # so GET returns str rather than bytes.
        self.r = redis_client
        self.ttl = lock_ttl_seconds

    def acquire(self, resource_id: str, agent_id: str) -> Optional[str]:
        """
        Attempt to claim a resource. Returns lock_token if successful, None if already claimed.
        agent_id is stored so you know WHO holds the lock.
        """
        lock_key = f"lock:{resource_id}"
        lock_token = str(uuid.uuid4())
        value = f"{agent_id}:{lock_token}"

        acquired = self.r.set(lock_key, value, nx=True, ex=self.ttl)
        if acquired:
            print(f"[Lock] Agent '{agent_id}' acquired lock on '{resource_id}'")
            return lock_token
        else:
            holder = self.r.get(lock_key)
            print(f"[Lock] Agent '{agent_id}' could not acquire '{resource_id}' — held by: {holder}")
            return None

    def release(self, resource_id: str, agent_id: str, lock_token: str) -> bool:
        """Release a lock. Only the holder can release it.

        Note: this check-then-delete is not atomic; in production, use a Lua
        script so the compare and the delete happen in one Redis operation.
        """
        lock_key = f"lock:{resource_id}"
        current = self.r.get(lock_key)
        if current == f"{agent_id}:{lock_token}":
            self.r.delete(lock_key)
            return True
        return False  # lock expired or was re-acquired by another agent
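The claim-token protocol can be exercised without a Redis server. The `InMemoryResourceLock` below is a hypothetical stand-in (no TTL) that follows the same acquire/release contract, useful for unit tests:

```python
import uuid

class InMemoryResourceLock:
    """Same claim-token protocol as the Redis version, minus TTL expiry."""

    def __init__(self):
        self._locks: dict[str, str] = {}  # resource_id -> "agent_id:token"

    def acquire(self, resource_id: str, agent_id: str):
        if resource_id in self._locks:
            return None  # already claimed by another agent
        token = str(uuid.uuid4())
        self._locks[resource_id] = f"{agent_id}:{token}"
        return token

    def release(self, resource_id: str, agent_id: str, token: str) -> bool:
        # Only the holder (matching agent_id AND token) may release
        if self._locks.get(resource_id) == f"{agent_id}:{token}":
            del self._locks[resource_id]
            return True
        return False

lock = InMemoryResourceLock()
t1 = lock.acquire("report.md", "writer")   # claimed
t2 = lock.acquire("report.md", "editor")   # blocked: returns None
```

The second agent's failed acquire is the conflict being prevented: "editor" must wait (or work on something else) until "writer" releases the resource.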

Failure Mode 5: Hallucination Amplification

Each agent in a chain adds its own potential for hallucination. If Agent A hallucinates a fact, Agent B reasons from that false premise, and Agent C presents the compounded error with confidence.

How to recognize it: Final output contains fabricated claims that trace back to an early agent's incorrect statement.

Mitigation: Confidence Scoring and Verification Gates

from pydantic import BaseModel
from typing import Any

class VerifiedOutput(BaseModel):
    content: Any
    confidence: float        # 0.0 - 1.0, self-reported
    claims: list[str]        # explicit list of factual claims made
    sources: list[str]       # sources cited for each claim
    verified: bool = False

CONFIDENCE_THRESHOLD = 0.75

def verification_gate(output: VerifiedOutput, verifier_agent_fn) -> VerifiedOutput:
    """
    Block low-confidence outputs and route to a verification agent before passing downstream.
    """
    if output.confidence >= CONFIDENCE_THRESHOLD:
        return output

    print(f"[VerificationGate] Low confidence ({output.confidence:.2f}). Routing to verifier.")
    verification_result = verifier_agent_fn(output)

    return VerifiedOutput(
        content=verification_result.get("verified_content", output.content),
        confidence=verification_result.get("revised_confidence", output.confidence),
        claims=output.claims,
        sources=output.sources,
        verified=True,
    )

Circuit Breakers for Agent Dependencies

A circuit breaker prevents repeated calls to a failing agent, giving it time to recover:

import time
from enum import Enum
from typing import Any, Callable, Optional

class CircuitState(Enum):
    CLOSED = "closed"       # normal operation
    OPEN = "open"           # blocking calls, agent is failing
    HALF_OPEN = "half_open" # testing if agent has recovered

class CircuitBreaker:
    """Per-agent circuit breaker to prevent cascading failures from a failing dependency."""

    def __init__(
        self,
        agent_id: str,
        failure_threshold: int = 3,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2,
    ):
        self.agent_id = agent_id
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold

        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: Optional[float] = None

    def call(self, fn: Callable, *args, **kwargs) -> Any:
        if self._state == CircuitState.OPEN:
            if time.time() - self._last_failure_time >= self.recovery_timeout:
                print(f"[CircuitBreaker:{self.agent_id}] Entering HALF_OPEN state.")
                self._state = CircuitState.HALF_OPEN
            else:
                raise RuntimeError(
                    f"Circuit OPEN for agent '{self.agent_id}'. "
                    f"Retry after {self.recovery_timeout}s."
                )

        try:
            result = fn(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self) -> None:
        self._failure_count = 0
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self.success_threshold:
                print(f"[CircuitBreaker:{self.agent_id}] CLOSED — agent recovered.")
                self._state = CircuitState.CLOSED
                self._success_count = 0

    def _on_failure(self) -> None:
        self._failure_count += 1
        self._last_failure_time = time.time()
        self._success_count = 0

        if self._failure_count >= self.failure_threshold:
            print(f"[CircuitBreaker:{self.agent_id}] OPEN — too many failures ({self._failure_count}).")
            self._state = CircuitState.OPEN

# Usage
breakers: dict[str, CircuitBreaker] = {
    "researcher": CircuitBreaker("researcher"),
    "analyst": CircuitBreaker("analyst"),
}

def resilient_call(agent_id: str, fn: Callable, *args) -> Optional[Any]:
    try:
        return breakers[agent_id].call(fn, *args)
    except RuntimeError as e:
        print(f"[Resilience] {e}")
        return None  # fail gracefully, trigger fallback
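To make the trip behavior concrete, here is a stripped-down, self-contained breaker (a hypothetical `MiniBreaker`, not the class above) driven by a deliberately failing agent function:

```python
import time

class MiniBreaker:
    def __init__(self, threshold: int = 3, recovery: float = 60.0):
        self.threshold, self.recovery = threshold, recovery
        self.failures, self.opened_at = 0, None

    def call(self, fn):
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.recovery:
                raise RuntimeError("circuit open")  # fast-fail, fn never runs
            self.opened_at = None  # recovery window passed: allow a probe call
        try:
            result = fn()
            self.failures = 0
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.time()  # trip the breaker
            raise

breaker = MiniBreaker()

def flaky():
    raise ConnectionError("agent down")

for _ in range(3):  # three consecutive failures trip the breaker
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass

try:
    breaker.call(flaky)
    state = "closed"
except RuntimeError:
    state = "open"  # fourth call fails fast without invoking the agent
except ConnectionError:
    state = "closed"
```

Notice the difference between the third and fourth calls: the third actually invokes the failing agent, while the fourth is rejected by the breaker itself, sparing the struggling agent (and your token budget) further load.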

Failure Mode Reference Card

| Failure Mode | Detection Signal | Primary Mitigation | Secondary Mitigation |
| --- | --- | --- | --- |
| Cascading failure | Downstream agents receive null/empty input | Explicit AgentResult with status | Fallback data sources |
| Deadlock | Workflow never completes, no error | Per-agent timeouts | Dead letter queue |
| Infinite loop | Same agent visited repeatedly in logs | Delegation depth tracking | Max iteration limit |
| Conflicting actions | State corruption, resource conflicts | Distributed locks (Redis) | Optimistic concurrency |
| Hallucination amplification | False claims compound across chain | Confidence gates | Independent verification |
| Cost explosion | Token/API cost spikes unexpectedly | Per-workflow budget limits | Circuit breakers |
| Tool call storms | External API rate limit errors | Request queuing and throttling | Concurrent call limits |
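The cost-explosion row mentions per-workflow budget limits, which this lesson has not shown in code. A minimal sketch of the idea, with a hypothetical `TokenBudget` guard that every agent call must charge before spending tokens:

```python
class TokenBudget:
    """Hypothetical per-workflow budget guard; token counts are illustrative."""

    def __init__(self, max_tokens: int):
        self.max_tokens = max_tokens
        self.used = 0

    def charge(self, tokens: int) -> None:
        # Reject the charge BEFORE spending, so the workflow stops
        # at the boundary rather than after the overrun.
        if self.used + tokens > self.max_tokens:
            raise RuntimeError(
                f"Budget exceeded: {self.used + tokens}/{self.max_tokens} tokens"
            )
        self.used += tokens

budget = TokenBudget(max_tokens=10_000)
budget.charge(4_000)   # researcher call
budget.charge(4_000)   # analyst call
try:
    budget.charge(4_000)  # writer call would blow the budget
    tripped = False
except RuntimeError:
    tripped = True
```

In practice the charge would come from the provider's reported usage per call; the important property is that the limit is enforced per workflow, so one runaway delegation loop cannot consume the budget of every other task in the system.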

Key Takeaways

  • Multi-agent failures are qualitatively different from single-agent failures — they compound, amplify, and can be silent
  • Every agent boundary is a potential cascade point — wrap every inter-agent call with structured error handling
  • Timeouts are mandatory, not optional — every agent must have a hard deadline regardless of what it's doing
  • Circuit breakers prevent an unhealthy agent from taking down the entire system
  • Hallucination amplification is the most insidious failure — implement confidence scoring and verification gates on outputs that cross agent boundaries
  • Dead letter queues are your audit trail for failures — always capture failed tasks with full context for later analysis