Observability for Multi-Agent Systems
Overview
A multi-agent system that you can't observe is a multi-agent system you can't debug, optimize, or trust in production. In a distributed system where five agents may be executing simultaneously, a single wrong LLM response can cascade into a completely wrong final output — and without observability, you'll have no idea which agent caused it, when it happened, or why.
Observability for multi-agent systems means being able to answer:
- What did each agent do, and in what order?
- What inputs did each agent receive, and what did it output?
- Where did the workflow diverge from expected behavior?
- Which agent call is responsible for the slowness I'm seeing in production?
The Three Pillars Applied to Multi-Agent Systems
| Pillar | Single-Agent Meaning | Multi-Agent Meaning |
|---|---|---|
| Logs | Structured log of LLM calls | Per-agent structured logs with shared correlation IDs |
| Metrics | Token count, latency, cost | Per-agent breakdowns, aggregated workflow metrics |
| Traces | Single request trace | Distributed trace spanning all agents in a workflow |
Correlation IDs: The Foundation of Multi-Agent Tracing
The single most important observability primitive is the correlation ID — a UUID that is generated once per user request and propagated through every agent call in the workflow.
With a correlation ID:
- You can filter your entire log history to see only the events from one specific workflow execution
- You can reconstruct the exact sequence of agent actions from a single search
- You can cross-reference agent logs, LLM provider logs, and tool execution logs for the same request
Without a correlation ID, you're searching for needles in haystacks.
Correlation ID Propagation
import contextvars
import json
import logging
import uuid
from datetime import datetime, timezone
from functools import wraps
from typing import Any, Optional
# Thread/async-safe correlation context. ContextVars carry the IDs implicitly
# through async tasks and threads, so nested calls never pass them explicitly.
_correlation_id: contextvars.ContextVar[str] = contextvars.ContextVar(
    "correlation_id", default=""
)
_agent_id: contextvars.ContextVar[str] = contextvars.ContextVar(
    "agent_id", default="unknown"
)
_span_id: contextvars.ContextVar[str] = contextvars.ContextVar(
    "span_id", default=""
)


def set_correlation_context(correlation_id: str, agent_id: str, span_id: Optional[str] = None) -> None:
    """Set the active correlation context for the current async context.

    Args:
        correlation_id: Workflow-wide ID propagated through every agent call.
        agent_id: Identifier of the agent doing the current work.
        span_id: Explicit span ID; a fresh 8-char ID is generated when omitted.
    """
    _correlation_id.set(correlation_id)
    _agent_id.set(agent_id)
    _span_id.set(span_id or str(uuid.uuid4())[:8])


def get_correlation_id() -> str:
    """Return the correlation ID for the current context ('' when unset)."""
    return _correlation_id.get()


def get_agent_id() -> str:
    """Return the agent ID for the current context ('unknown' when unset)."""
    return _agent_id.get()


def new_span_id() -> str:
    """Generate a new span ID for a child operation within the current agent."""
    return str(uuid.uuid4())[:8]
# --- Structured JSON logger ---
class AgentLogger:
    """
    Structured logger that automatically injects correlation context into every log entry.

    Outputs newline-delimited JSON for ingestion by Datadog, CloudWatch, Loki, etc.
    """

    def __init__(self, agent_id: str, workflow_id: Optional[str] = None):
        self.agent_id = agent_id
        self.workflow_id = workflow_id
        self._base_logger = logging.getLogger(f"agent.{agent_id}")

    def _build_entry(self, level: str, message: str, **extra: Any) -> dict:
        """Assemble one structured entry, injecting the ambient correlation context."""
        return {
            # tz-aware now(); utcnow() is deprecated since Python 3.12. The
            # replace() keeps the original "...Z" suffix format unchanged.
            "ts": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "level": level,
            "agent_id": self.agent_id,
            "correlation_id": get_correlation_id() or "no-correlation",
            "span_id": _span_id.get() or "no-span",
            "workflow_id": self.workflow_id,
            "message": message,
            **extra,
        }

    def info(self, message: str, **extra: Any) -> None:
        entry = self._build_entry("INFO", message, **extra)
        self._base_logger.info(json.dumps(entry))
        print(json.dumps(entry))  # for dev visibility

    def warning(self, message: str, **extra: Any) -> None:
        entry = self._build_entry("WARNING", message, **extra)
        self._base_logger.warning(json.dumps(entry))

    def error(self, message: str, **extra: Any) -> None:
        entry = self._build_entry("ERROR", message, **extra)
        self._base_logger.error(json.dumps(entry))

    def llm_call(
        self,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
        latency_ms: float,
        success: bool,
        **extra: Any,
    ) -> None:
        """Dedicated log entry for every LLM API call — essential for cost and latency tracking."""
        entry = self._build_entry(
            "INFO" if success else "ERROR",
            "llm_call",
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            estimated_cost_usd=self._estimate_cost(model, prompt_tokens, completion_tokens),
            latency_ms=round(latency_ms, 2),
            success=success,
            **extra,
        )
        # Route through the standard logger like info()/error(); previously this
        # only printed, so LLM-call records never reached the log pipeline.
        emit = self._base_logger.info if success else self._base_logger.error
        emit(json.dumps(entry))
        print(json.dumps(entry))

    def tool_call(
        self,
        tool_name: str,
        latency_ms: float,
        success: bool,
        result_summary: str = "",
        **extra: Any,
    ) -> None:
        """Dedicated log entry for every tool invocation."""
        entry = self._build_entry(
            "INFO" if success else "ERROR",
            "tool_call",
            tool_name=tool_name,
            latency_ms=round(latency_ms, 2),
            success=success,
            result_summary=result_summary[:200],  # truncate long results
            **extra,
        )
        # Same consistency fix as llm_call: log AND print.
        emit = self._base_logger.info if success else self._base_logger.error
        emit(json.dumps(entry))
        print(json.dumps(entry))

    @staticmethod
    def _estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
        """Rough USD cost for one call; unknown models fall back to $1/$3 per 1M."""
        # Approximate rates ($/1M tokens) — update as pricing changes
        rates = {
            "gpt-4o": (2.50, 10.00),
            "gpt-4o-mini": (0.15, 0.60),
            "claude-3-5-sonnet-20241022": (3.00, 15.00),
            "claude-3-haiku-20240307": (0.25, 1.25),
        }
        prompt_rate, completion_rate = rates.get(model, (1.0, 3.0))
        return round(
            (prompt_tokens * prompt_rate + completion_tokens * completion_rate) / 1_000_000, 6
        )
Distributed Tracing: Spans and Traces
A trace represents one complete workflow execution from user request to final output. A trace is composed of spans — individual units of work (one agent invocation, one LLM call, one tool use).
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class Span:
    """One unit of work inside a trace: an agent run, an LLM call, or a tool use."""

    trace_id: str  # = correlation_id of the workflow
    span_id: str
    parent_span_id: Optional[str]
    operation: str  # e.g., "researcher_agent", "llm_call", "web_search"
    agent_id: str
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    status: str = "in_progress"  # "success" | "error" | "timeout"
    attributes: dict = field(default_factory=dict)
    events: list[dict] = field(default_factory=list)

    @property
    def duration_ms(self) -> Optional[float]:
        """Elapsed milliseconds, or None while the span is still open."""
        if self.end_time is not None:
            return round(1000 * (self.end_time - self.start_time), 2)
        return None

    def add_event(self, name: str, **attrs: Any) -> None:
        """Append a timestamped point-in-time event to this span."""
        record = {"name": name, "ts": time.time(), **attrs}
        self.events.append(record)

    def set_attribute(self, key: str, value: Any) -> None:
        """Attach a key/value attribute to this span."""
        self.attributes[key] = value

    def finish(self, status: str = "success") -> None:
        """Close the span, stamping the end time and final status."""
        self.status = status
        self.end_time = time.time()
class Tracer:
    """Simple in-process distributed tracer. In production, export to Jaeger or OTLP."""

    def __init__(self):
        # Every span ever started, across all traces (unbounded; dev-only store).
        self._spans: list[Span] = []
        # Innermost open span for the current async context; used to link parents.
        self._active_span: contextvars.ContextVar[Optional[Span]] = contextvars.ContextVar(
            "active_span", default=None
        )

    @contextmanager
    def start_span(self, operation: str, agent_id: str, trace_id: Optional[str] = None):
        """Open a span as a child of the currently active span.

        The span is finished automatically: status "success" on normal exit,
        "error" (plus an error event) if the body raises.

        Args:
            operation: Name of the unit of work (e.g. "llm_call").
            agent_id: Agent performing the work.
            trace_id: Explicit trace ID; falls back to the ambient correlation
                ID, or a fresh UUID when neither is set.
        """
        parent = self._active_span.get()
        span = Span(
            trace_id=trace_id or get_correlation_id() or str(uuid.uuid4()),
            span_id=new_span_id(),
            parent_span_id=parent.span_id if parent else None,
            operation=operation,
            agent_id=agent_id,
        )
        self._spans.append(span)
        token = self._active_span.set(span)
        try:
            yield span
            span.finish("success")
        except Exception as e:
            span.add_event("error", error=str(e))
            span.finish("error")
            raise
        finally:
            # Restore the previous active span even when the body raised.
            self._active_span.reset(token)

    def get_trace(self, trace_id: str) -> list[Span]:
        """Return all spans recorded for *trace_id* (in insertion order)."""
        return [s for s in self._spans if s.trace_id == trace_id]

    def render_trace_tree(self, trace_id: str) -> str:
        """Render the trace as an indented tree for debugging."""
        spans = sorted(self.get_trace(trace_id), key=lambda s: s.start_time)
        # NOTE: an unused span_id -> span map was removed here.

        def _render(span: Span, depth: int = 0) -> list[str]:
            indent = "  " * depth
            status_icon = "✓" if span.status == "success" else "✗"
            lines = [f"{indent}{status_icon} [{span.agent_id}] {span.operation} ({span.duration_ms}ms)"]
            children = [s for s in spans if s.parent_span_id == span.span_id]
            for child in children:
                lines.extend(_render(child, depth + 1))
            return lines

        roots = [s for s in spans if s.parent_span_id is None]
        output = [f"Trace: {trace_id}"]
        for root in roots:
            output.extend(_render(root))
        return "\n".join(output)


# Global tracer instance (in production, use OpenTelemetry SDK)
tracer = Tracer()
Instrumenting an Agent
import time
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
class ObservableAgent:
    """An agent wrapper that automatically instruments all LLM and tool calls."""

    def __init__(self, agent_id: str, role: str, model: str = "gpt-4o-mini"):
        self.agent_id = agent_id
        self.role = role
        self.model = model
        self.llm = ChatOpenAI(model=model)
        self.logger = AgentLogger(agent_id)

    def run(self, task: str, correlation_id: str) -> str:
        """Execute *task*, emitting a span tree and structured logs along the way."""
        set_correlation_context(correlation_id, self.agent_id)
        with tracer.start_span(f"agent_{self.agent_id}", self.agent_id, trace_id=correlation_id) as span:
            span.set_attribute("task_preview", task[:100])
            span.set_attribute("model", self.model)
            self.logger.info("agent_started", task_preview=task[:100])

            sys_msg = SystemMessage(content=f"You are a {self.role}. Complete the assigned task.")
            started = time.time()
            try:
                with tracer.start_span("llm_call", self.agent_id) as llm_span:
                    response = self.llm.invoke([sys_msg, HumanMessage(content=task)])
                    elapsed_ms = (time.time() - started) * 1000
                    meta = response.usage_metadata or {}
                    # Dedicated per-call record: model, tokens, latency, cost.
                    self.logger.llm_call(
                        model=self.model,
                        prompt_tokens=meta.get("input_tokens", 0),
                        completion_tokens=meta.get("output_tokens", 0),
                        latency_ms=elapsed_ms,
                        success=True,
                    )
                    llm_span.set_attribute("tokens_used", meta.get("total_tokens", 0))
                answer = response.content
                self.logger.info("agent_completed", result_preview=answer[:100])
                span.set_attribute("output_preview", answer[:100])
                return answer
            except Exception as e:
                self.logger.error("agent_failed", error=str(e), error_type=type(e).__name__)
                raise
# --- Usage ---
# NOTE: top-level script — running this module performs a live LLM call.
# One correlation ID per user request, generated once and propagated everywhere.
correlation_id = str(uuid.uuid4())
agent = ObservableAgent("researcher-1", "research analyst")
result = agent.run("Summarize recent advances in RAG systems", correlation_id)
# Print the span tree recorded for this workflow execution.
print("\n" + tracer.render_trace_tree(correlation_id))
Visualizing Agent Interaction Flows
For complex workflows, a Mermaid sequence diagram can be auto-generated from trace data:
def trace_to_mermaid(tracer: Tracer, trace_id: str) -> str:
"""Generate a Mermaid sequence diagram from a trace."""
spans = sorted(tracer.get_trace(trace_id), key=lambda s: s.start_time)
lines = ["sequenceDiagram"]
lines.append(" participant User")
agents = list(dict.fromkeys(s.agent_id for s in spans))
for agent in agents:
lines.append(f" participant {agent}")
for span in spans:
status = "✓" if span.status == "success" else "✗"
if span.parent_span_id is None:
lines.append(f" User->>{span.agent_id}: {span.operation}")
else:
parent = next((s for s in spans if s.span_id == span.parent_span_id), None)
if parent and parent.agent_id != span.agent_id:
lines.append(f" {parent.agent_id}->>{span.agent_id}: {span.operation}")
return "\n".join(lines)
Note: For production observability, integrate with OpenTelemetry and export traces to Jaeger, Honeycomb, or Datadog. LangSmith (the LangChain observability platform) provides purpose-built tracing for LangChain/LangGraph workflows with minimal integration effort.
Debugging Deadlocks and Infinite Loops
Multi-agent deadlocks occur when agents are waiting on each other in a cycle:
Agent A waiting for Agent B's result
Agent B waiting for Agent C's result
Agent C waiting for Agent A's result ← deadlock
Detection Strategy
class LoopDetector:
    """Detects infinite loops and deadlocks in agent execution graphs."""

    def __init__(self, max_visits: int = 3):
        # Visit counters keyed by "<correlation_id>:<node_id>".
        self.visit_counts: dict[str, int] = {}
        self.max_visits = max_visits

    def record_visit(self, node_id: str, correlation_id: str) -> None:
        """Count one visit to *node_id*; raise once the per-workflow limit is reached."""
        key = f"{correlation_id}:{node_id}"
        count = self.visit_counts.get(key, 0) + 1
        self.visit_counts[key] = count
        if count >= self.max_visits:
            raise RuntimeError(
                f"Infinite loop detected: node '{node_id}' visited "
                f"{count} times in workflow '{correlation_id}'. "
                f"Full visit history: {self.visit_counts}"
            )

    def get_visit_count(self, node_id: str, correlation_id: str) -> int:
        """Return how many times *node_id* has been visited in the given workflow."""
        return self.visit_counts.get(f"{correlation_id}:{node_id}", 0)
Key Takeaways
- Correlation IDs are the single most important observability primitive — generate one per user request and propagate it everywhere
- Structured JSON logging enables powerful filtering and aggregation in log management platforms
- Distributed traces give you a timeline view of every agent's actions within a single workflow execution
- Log every LLM call with model, token counts, latency, and estimated cost — this data is critical for optimization
- Build deadlock and infinite loop detection into your observability layer, not just your runtime logic
- For production, use OpenTelemetry + LangSmith or a purpose-built LLM observability platform rather than rolling your own