# Failure Modes in Multi-Agent Systems

## Overview
Multi-agent systems introduce a class of failures that don't exist in single-agent applications. When agents interact, the system becomes more than the sum of its parts — but so do the failure modes. A single misbehaving agent can trigger cascading failures across the entire system, cause resource exhaustion, or produce subtly wrong outputs that are harder to detect than an outright error.
This lesson catalogs the most critical failure modes, explains how to recognize them, and provides concrete mitigation strategies and code implementations.
## Taxonomy of Multi-Agent Failures
```
Multi-Agent Failures
├── Structural Failures
│   ├── Cascading failures (agent failure propagates downstream)
│   ├── Deadlocks (agents block each other indefinitely)
│   └── Infinite delegation loops (circular task routing)
│
├── Behavioral Failures
│   ├── Conflicting agent actions (agents work against each other)
│   ├── Prompt injection across agents (one agent corrupts another's input)
│   └── Hallucination amplification (errors compound through agent chain)
│
└── Resource Failures
    ├── Token budget exhaustion (conversation grows unbounded)
    ├── Cost explosion (runaway LLM calls)
    └── Tool call storms (concurrent agents overloading external APIs)
```
## Failure Mode 1: Cascading Failures
In a sequential pipeline (A → B → C), if agent B fails, agent C has no input. Without proper error handling, C either fails silently, uses stale data, or produces completely wrong output.
**How to recognize it:** Downstream agents report "no data to process" or produce outputs based on empty/default inputs.
### Mitigation: Explicit Error Propagation
```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class AgentStatus(Enum):
    SUCCESS = "success"
    PARTIAL = "partial"    # some results but degraded
    FAILED = "failed"
    SKIPPED = "skipped"    # intentionally not run


@dataclass
class AgentResult:
    agent_id: str
    status: AgentStatus
    payload: Optional[Any] = None
    error: Optional[str] = None
    error_code: Optional[str] = None
    fallback_used: bool = False

    @property
    def is_usable(self) -> bool:
        return self.status in (AgentStatus.SUCCESS, AgentStatus.PARTIAL)


def safe_agent_call(agent_fn, input_data: Any, agent_id: str) -> AgentResult:
    """Wrapper that converts any agent call into a structured result."""
    try:
        result = agent_fn(input_data)
        return AgentResult(agent_id=agent_id, status=AgentStatus.SUCCESS, payload=result)
    except ValueError as e:
        # Input validation failure — don't retry
        return AgentResult(
            agent_id=agent_id,
            status=AgentStatus.FAILED,
            error=str(e),
            error_code="INVALID_INPUT",
        )
    except TimeoutError as e:
        return AgentResult(
            agent_id=agent_id,
            status=AgentStatus.FAILED,
            error=str(e),
            error_code="TIMEOUT",
        )
    except Exception as e:
        return AgentResult(
            agent_id=agent_id,
            status=AgentStatus.FAILED,
            error=str(e),
            error_code="UNEXPECTED_ERROR",
        )


# researcher_agent, analyst_agent, writer_agent, and load_fallback_research
# are assumed to be defined elsewhere in the application.
def run_pipeline_with_fallbacks(task: str) -> dict[str, AgentResult]:
    results: dict[str, AgentResult] = {}

    # Stage 1: Research
    results["research"] = safe_agent_call(researcher_agent, task, "researcher")

    # Stage 2: Analysis — gracefully handles upstream failure
    if results["research"].is_usable:
        results["analysis"] = safe_agent_call(
            analyst_agent, results["research"].payload, "analyst"
        )
    else:
        # Use cached/fallback data instead of failing the whole pipeline
        fallback_data = load_fallback_research(task)
        results["analysis"] = AgentResult(
            agent_id="analyst",
            status=AgentStatus.PARTIAL,
            payload=safe_agent_call(analyst_agent, fallback_data, "analyst").payload,
            fallback_used=True,
        )

    # Stage 3: Writing — always runs, reports upstream degradation
    upstream_ok = all(r.is_usable for r in results.values())
    write_input = {
        "research": results["research"].payload or "Research unavailable.",
        "analysis": results["analysis"].payload or "Analysis unavailable.",
        "degraded_mode": not upstream_ok,
    }
    results["writing"] = safe_agent_call(writer_agent, write_input, "writer")
    return results
```
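The core of the pattern fits in a few lines. Here is a condensed, self-contained sketch (the `flaky_summarizer` agent and its error message are invented for the demo): the wrapper converts an exception into data, so the downstream stage makes an explicit choice instead of crashing or silently consuming `None`.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Result:
    status: str                    # "success" or "failed"
    payload: Optional[Any] = None
    error: Optional[str] = None


def safe_call(agent_fn, data: Any) -> Result:
    try:
        return Result(status="success", payload=agent_fn(data))
    except Exception as e:
        return Result(status="failed", error=str(e))


def flaky_summarizer(text: str) -> str:
    raise ValueError("model returned malformed JSON")  # simulated agent failure


upstream = safe_call(flaky_summarizer, "quarterly report")

# Downstream sees a structured failure instead of crashing on missing input:
if upstream.status == "success":
    downstream_input = upstream.payload
else:
    downstream_input = "Summary unavailable (upstream failure)."

print(downstream_input)
```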
## Failure Mode 2: Deadlocks
Deadlocks occur when agents are waiting for each other in a cycle. In async systems, this often manifests as tasks that simply never complete — no error, no output, just silence.
**How to recognize it:** Workflow hangs indefinitely. No error in logs. Agent status remains "waiting" or "in_progress" forever.
### Mitigation: Timeouts and Dead Letter Queues
```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def with_timeout(
    coro: Awaitable[T],
    timeout_seconds: float,
    agent_id: str,
    fallback: Optional[T] = None,
) -> T:
    """
    Run a coroutine with a hard timeout. Returns fallback value if timeout occurs.
    Logs the timeout event for alerting.
    """
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        print(f"[TIMEOUT] Agent '{agent_id}' exceeded {timeout_seconds}s deadline.")
        # In production: send alert, increment timeout_count metric
        if fallback is not None:
            return fallback
        raise TimeoutError(
            f"Agent '{agent_id}' timed out after {timeout_seconds}s. "
            f"Task has been moved to dead letter queue."
        )


class DeadLetterQueue:
    """Stores failed tasks for manual review, retry, or alternative handling."""

    def __init__(self):
        self._queue: list[dict] = []

    def enqueue(
        self,
        task: Any,
        agent_id: str,
        failure_reason: str,
        correlation_id: str,
        retry_count: int = 0,
    ) -> None:
        entry = {
            "enqueued_at": time.time(),
            "task": task,
            "agent_id": agent_id,
            "failure_reason": failure_reason,
            "correlation_id": correlation_id,
            "retry_count": retry_count,
        }
        self._queue.append(entry)
        print(f"[DLQ] Task for '{agent_id}' added to dead letter queue. Reason: {failure_reason}")

    def drain_retriable(self, max_retries: int = 3) -> list[dict]:
        """Return tasks that can still be retried and remove them from the queue."""
        retriable = [t for t in self._queue if t["retry_count"] < max_retries]
        self._queue = [t for t in self._queue if t["retry_count"] >= max_retries]
        return retriable

    def get_failed(self) -> list[dict]:
        """Tasks that exceeded max retries — require human review."""
        return list(self._queue)


dlq = DeadLetterQueue()


async def agent_with_timeout_and_dlq(
    agent_fn: Callable,
    task: Any,
    agent_id: str,
    correlation_id: str,
    timeout: float = 30.0,
    max_retries: int = 3,
    retry_count: int = 0,
) -> Optional[Any]:
    try:
        return await with_timeout(
            asyncio.to_thread(agent_fn, task),
            timeout_seconds=timeout,
            agent_id=agent_id,
        )
    except Exception as e:  # TimeoutError is a subclass of Exception
        if retry_count < max_retries:
            wait = 2 ** retry_count  # exponential backoff
            print(f"[RETRY] Agent '{agent_id}' failed. Retrying in {wait}s (attempt {retry_count + 1}/{max_retries})")
            await asyncio.sleep(wait)
            return await agent_with_timeout_and_dlq(
                agent_fn, task, agent_id, correlation_id,
                timeout, max_retries, retry_count + 1,
            )
        dlq.enqueue(task, agent_id, str(e), correlation_id, retry_count)
        return None
```
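The timeout mechanics can be seen in isolation with a toy coroutine. In this sketch the "slow agent" is just a long sleep, and the fallback value is an assumption for the demo: `asyncio.wait_for` cancels the hung task and the workflow keeps moving instead of waiting forever.

```python
import asyncio


async def slow_agent() -> str:
    await asyncio.sleep(10)  # simulates an agent that never responds in time
    return "real answer"


async def call_with_deadline(timeout: float, fallback: str) -> str:
    try:
        return await asyncio.wait_for(slow_agent(), timeout=timeout)
    except asyncio.TimeoutError:
        # The hung task is cancelled; the caller gets a degraded-but-usable value.
        return fallback


result = asyncio.run(call_with_deadline(timeout=0.05, fallback="cached answer"))
print(result)
```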
## Failure Mode 3: Infinite Delegation Loops
In hierarchical systems, a supervisor may route a task to a worker, which fails and returns the task, which the supervisor re-routes to the same worker, indefinitely.
**How to recognize it:** Log shows the same agent pair repeatedly. Task never completes. Token costs spike.
### Mitigation: Delegation Depth Tracking
```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class DelegationContext:
    task_id: str
    correlation_id: str
    depth: int = 0
    visited_agents: list[str] = field(default_factory=list)
    max_depth: int = 5
    max_visits_per_agent: int = 2

    def can_delegate_to(self, agent_id: str) -> tuple[bool, str]:
        if self.depth >= self.max_depth:
            return False, f"Max delegation depth ({self.max_depth}) reached"
        visit_count = self.visited_agents.count(agent_id)
        if visit_count >= self.max_visits_per_agent:
            return False, f"Agent '{agent_id}' already visited {visit_count} times"
        return True, ""

    def record_delegation(self, agent_id: str) -> "DelegationContext":
        return DelegationContext(
            task_id=self.task_id,
            correlation_id=self.correlation_id,
            depth=self.depth + 1,
            visited_agents=self.visited_agents + [agent_id],
            max_depth=self.max_depth,
            max_visits_per_agent=self.max_visits_per_agent,
        )


class SafeSupervisor:
    def __init__(self, workers: dict):
        self._workers = workers  # agent_id -> worker exposing run(task, ctx)

    def delegate(self, task: str, agent_id: str, ctx: DelegationContext) -> Optional[str]:
        can_delegate, reason = ctx.can_delegate_to(agent_id)
        if not can_delegate:
            print(f"[SafeSupervisor] Delegation blocked: {reason}")
            return None  # or raise to trigger fallback
        new_ctx = ctx.record_delegation(agent_id)
        return self._workers[agent_id].run(task, new_ctx)
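To see the guard in action without a full supervisor, a minimal stand-in (`LoopGuard` is a name invented for this sketch) shows the same visit-counting logic cutting off a loop after two delegations to the same agent:

```python
class LoopGuard:
    """Minimal stand-in for DelegationContext: blocks repeat visits."""

    def __init__(self, max_visits: int = 2):
        self.visits: list[str] = []
        self.max_visits = max_visits

    def try_visit(self, agent_id: str) -> bool:
        if self.visits.count(agent_id) >= self.max_visits:
            return False  # loop detected: refuse the delegation
        self.visits.append(agent_id)
        return True


guard = LoopGuard(max_visits=2)
outcomes = [guard.try_visit("worker_a") for _ in range(4)]
print(outcomes)  # third and fourth delegations to worker_a are blocked
```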
## Failure Mode 4: Conflicting Agent Actions
In concurrent systems, two agents may take actions that contradict each other — one agent writes a file while another deletes it, or two agents both claim the same resource.
### Mitigation: Optimistic Locking and Claim Tokens
```python
import uuid
from typing import Optional

import redis


class ResourceLock:
    """Distributed lock for agent resource claims using Redis."""

    def __init__(self, redis_client: redis.Redis, lock_ttl_seconds: int = 30):
        # Create the client with decode_responses=True so GET returns str, not bytes.
        self.r = redis_client
        self.ttl = lock_ttl_seconds

    def acquire(self, resource_id: str, agent_id: str) -> Optional[str]:
        """
        Attempt to claim a resource. Returns lock_token if successful, None if already claimed.
        agent_id is stored so you know WHO holds the lock.
        """
        lock_key = f"lock:{resource_id}"
        lock_token = str(uuid.uuid4())
        value = f"{agent_id}:{lock_token}"
        acquired = self.r.set(lock_key, value, nx=True, ex=self.ttl)
        if acquired:
            print(f"[Lock] Agent '{agent_id}' acquired lock on '{resource_id}'")
            return lock_token
        holder = self.r.get(lock_key)
        print(f"[Lock] Agent '{agent_id}' could not acquire '{resource_id}' — held by: {holder}")
        return None

    def release(self, resource_id: str, agent_id: str, lock_token: str) -> bool:
        """Release a lock. Only the holder can release it.

        Note: this get-then-delete is not atomic; in production, run the check
        and delete as a single Lua script so the lock cannot expire in between.
        """
        lock_key = f"lock:{resource_id}"
        current = self.r.get(lock_key)
        if current == f"{agent_id}:{lock_token}":
            self.r.delete(lock_key)
            return True
        return False  # lock expired or stolen
```
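The Redis version needs a running server, but the claim-token protocol itself can be illustrated with an in-memory dict (`InMemoryLock` is a stand-in invented for this sketch, not part of the redis library). The second agent's claim fails until the first agent releases with its own token:

```python
import uuid
from typing import Optional


class InMemoryLock:
    """Single-process illustration of the claim-token protocol."""

    def __init__(self):
        self._locks: dict[str, str] = {}  # resource_id -> "agent_id:token"

    def acquire(self, resource_id: str, agent_id: str) -> Optional[str]:
        if resource_id in self._locks:
            return None  # already claimed by another agent
        token = str(uuid.uuid4())
        self._locks[resource_id] = f"{agent_id}:{token}"
        return token

    def release(self, resource_id: str, agent_id: str, token: str) -> bool:
        if self._locks.get(resource_id) == f"{agent_id}:{token}":
            del self._locks[resource_id]
            return True
        return False  # wrong holder or wrong token


lock = InMemoryLock()
token_a = lock.acquire("report.md", "writer_agent")    # claimed
token_b = lock.acquire("report.md", "editor_agent")    # blocked: returns None
released = lock.release("report.md", "writer_agent", token_a)
```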
## Failure Mode 5: Hallucination Amplification
Each agent in a chain adds its own potential for hallucination. If Agent A hallucinates a fact, Agent B reasons from that false premise, and Agent C presents the compounded error with confidence.
**How to recognize it:** Final output contains fabricated claims that trace back to an early agent's incorrect statement.
### Mitigation: Confidence Scoring and Verification Gates
```python
from typing import Any

from pydantic import BaseModel


class VerifiedOutput(BaseModel):
    content: Any
    confidence: float      # 0.0 - 1.0, self-reported
    claims: list[str]      # explicit list of factual claims made
    sources: list[str]     # sources cited for each claim
    verified: bool = False


CONFIDENCE_THRESHOLD = 0.75


def verification_gate(output: VerifiedOutput, verifier_agent_fn) -> VerifiedOutput:
    """
    Block low-confidence outputs and route to a verification agent before passing downstream.
    """
    if output.confidence >= CONFIDENCE_THRESHOLD:
        return output

    print(f"[VerificationGate] Low confidence ({output.confidence:.2f}). Routing to verifier.")
    verification_result = verifier_agent_fn(output)
    return VerifiedOutput(
        content=verification_result.get("verified_content", output.content),
        confidence=verification_result.get("revised_confidence", output.confidence),
        claims=output.claims,
        sources=output.sources,
        verified=True,
    )
```
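A gate like this is easiest to understand end to end. The following dependency-free sketch swaps the pydantic model for a plain dataclass and uses a stub verifier with canned return values (both are assumptions for the demo, not part of any library): low-confidence output gets rewritten and flagged, high-confidence output passes through untouched.

```python
from dataclasses import dataclass
from typing import Any

THRESHOLD = 0.75


@dataclass
class Output:
    content: Any
    confidence: float
    verified: bool = False


def stub_verifier(output: Output) -> dict:
    # A real verifier agent would re-check each claim against sources.
    return {"verified_content": "corrected summary", "revised_confidence": 0.9}


def gate(output: Output, verifier) -> Output:
    if output.confidence >= THRESHOLD:
        return output  # confident enough: pass through unchanged
    result = verifier(output)
    return Output(
        content=result["verified_content"],
        confidence=result["revised_confidence"],
        verified=True,
    )


checked = gate(Output(content="dubious summary", confidence=0.4), stub_verifier)
```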
## Circuit Breakers for Agent Dependencies
A circuit breaker prevents repeated calls to a failing agent, giving it time to recover:
```python
import time
from enum import Enum
from typing import Any, Callable, Optional


class CircuitState(Enum):
    CLOSED = "closed"          # normal operation
    OPEN = "open"              # blocking calls, agent is failing
    HALF_OPEN = "half_open"    # testing if agent has recovered


class CircuitBreaker:
    """Per-agent circuit breaker to prevent cascading failures from a failing dependency."""

    def __init__(
        self,
        agent_id: str,
        failure_threshold: int = 3,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2,
    ):
        self.agent_id = agent_id
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: Optional[float] = None

    def call(self, fn: Callable, *args, **kwargs) -> Any:
        if self._state == CircuitState.OPEN:
            if time.time() - self._last_failure_time >= self.recovery_timeout:
                print(f"[CircuitBreaker:{self.agent_id}] Entering HALF_OPEN state.")
                self._state = CircuitState.HALF_OPEN
            else:
                raise RuntimeError(
                    f"Circuit OPEN for agent '{self.agent_id}'. "
                    f"Retry after {self.recovery_timeout}s."
                )
        try:
            result = fn(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self) -> None:
        self._failure_count = 0
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self.success_threshold:
                print(f"[CircuitBreaker:{self.agent_id}] CLOSED — agent recovered.")
                self._state = CircuitState.CLOSED
                self._success_count = 0

    def _on_failure(self) -> None:
        self._failure_count += 1
        self._last_failure_time = time.time()
        self._success_count = 0
        if self._failure_count >= self.failure_threshold:
            print(f"[CircuitBreaker:{self.agent_id}] OPEN — too many failures ({self._failure_count}).")
            self._state = CircuitState.OPEN


# Usage
breakers: dict[str, CircuitBreaker] = {
    "researcher": CircuitBreaker("researcher"),
    "analyst": CircuitBreaker("analyst"),
}


def resilient_call(agent_id: str, fn: Callable, *args) -> Optional[Any]:
    try:
        return breakers[agent_id].call(fn, *args)
    except RuntimeError as e:
        print(f"[Resilience] {e}")
        return None  # fail gracefully, trigger fallback
```
## Failure Mode Reference Card
| Failure Mode | Detection Signal | Primary Mitigation | Secondary Mitigation |
|---|---|---|---|
| Cascading failure | Downstream agents receive null/empty input | Explicit AgentResult with status | Fallback data sources |
| Deadlock | Workflow never completes, no error | Per-agent timeouts | Dead letter queue |
| Infinite loop | Same agent visited repeatedly in logs | Delegation depth tracking | Max iteration limit |
| Conflicting actions | State corruption, resource conflicts | Distributed locks (Redis) | Optimistic concurrency |
| Hallucination amplification | False claims compound across chain | Confidence gates | Independent verification |
| Cost explosion | Token/API cost spikes unexpectedly | Per-workflow budget limits | Circuit breakers |
| Tool call storms | External API rate limit errors | Request queuing and throttling | Concurrent call limits |
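The budget-limit rows in the table have no code above. One possible shape for a per-workflow budget guard (class name, limits, and prices here are illustrative assumptions, not a standard API) is a cumulative tracker that every agent call must charge before proceeding:

```python
class BudgetExceeded(RuntimeError):
    pass


class WorkflowBudget:
    """Tracks cumulative spend across all agents in one workflow run."""

    def __init__(self, max_usd: float, max_tokens: int):
        self.max_usd = max_usd
        self.max_tokens = max_tokens
        self.spent_usd = 0.0
        self.spent_tokens = 0

    def charge(self, usd: float, tokens: int) -> None:
        self.spent_usd += usd
        self.spent_tokens += tokens
        if self.spent_usd > self.max_usd or self.spent_tokens > self.max_tokens:
            # Halt the workflow rather than let a runaway loop keep spending.
            raise BudgetExceeded(
                f"Budget exhausted: ${self.spent_usd:.2f} spent, "
                f"{self.spent_tokens} tokens used"
            )


budget = WorkflowBudget(max_usd=1.00, max_tokens=50_000)
budget.charge(usd=0.40, tokens=20_000)       # within budget
try:
    budget.charge(usd=0.80, tokens=20_000)   # pushes spend past $1.00
    exceeded = False
except BudgetExceeded:
    exceeded = True
```

For tool call storms, the same choke-point idea applies: route all concurrent tool calls through a shared `asyncio.Semaphore` or a rate limiter so agents collectively stay under the external API's limits.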
## Key Takeaways
- Multi-agent failures are qualitatively different from single-agent failures — they compound, amplify, and can be silent
- Every agent boundary is a potential cascade point — wrap every inter-agent call with structured error handling
- Timeouts are mandatory, not optional — every agent must have a hard deadline regardless of what it's doing
- Circuit breakers prevent an unhealthy agent from taking down the entire system
- Hallucination amplification is the most insidious failure — implement confidence scoring and verification gates on outputs that cross agent boundaries
- Dead letter queues are your audit trail for failures — always capture failed tasks with full context for later analysis