Inter-Agent Messaging
Overview
As multi-agent systems grow beyond two or three agents, the question of how agents communicate becomes as important as what they say. Ad-hoc string passing between agents doesn't scale — it breaks under concurrent execution, makes debugging nearly impossible, and creates subtle incompatibilities when agents evolve independently.
This lesson covers the principles and practical implementation of structured inter-agent messaging: message formats, communication topologies, synchronous vs. asynchronous patterns, and message broker architectures.
Why Structured Messaging Matters
Consider two agents communicating via raw strings:
# Fragile — no schema, no version, no correlation
result = agent_a.run("analyze this data")
agent_b.run(f"now summarize: {result}")
Problems:
- No error envelope: If agent A fails, agent B receives a partial string with no way to know it's invalid
- No correlation: Can't trace which request produced which response in concurrent scenarios
- No schema evolution: There is no version marker, so when agent A changes its output format, agent B misparses it silently
- No routing metadata: The message has no sender, priority, or target information
Structured messaging solves all of these.
Message Schema Design
A well-designed inter-agent message is a self-describing envelope:
from pydantic import BaseModel, Field
from typing import Any, Optional, Literal
from datetime import datetime, timezone
import uuid

class AgentMessage(BaseModel):
    # Identity
    message_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: str  # links request → response, spans entire workflow
    causation_id: Optional[str] = None  # the message_id that caused this message

    # Routing
    sender_id: str  # agent that sent this message
    recipient_id: str  # target agent (or "*" for broadcast)
    topic: str  # logical channel, e.g. "research.request"

    # Content
    message_type: Literal["request", "response", "event", "error"]
    payload: dict[str, Any]

    # Metadata
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    priority: int = Field(default=5, ge=1, le=10)  # 1=low, 10=critical
    ttl_seconds: Optional[int] = 300  # message expiry

    # Error handling
    retry_count: int = 0
    max_retries: int = 3

class ErrorPayload(BaseModel):
    error_code: str
    error_message: str
    recoverable: bool
    context: dict[str, Any] = Field(default_factory=dict)
# Example messages
research_request = AgentMessage(
    correlation_id="workflow-abc-123",
    sender_id="supervisor-agent",
    recipient_id="researcher-agent",
    topic="research.request",
    message_type="request",
    payload={
        "query": "What are the latest developments in transformer architecture?",
        "max_sources": 10,
        "recency_days": 30,
    },
)

research_response = AgentMessage(
    correlation_id="workflow-abc-123",
    causation_id=research_request.message_id,
    sender_id="researcher-agent",
    recipient_id="supervisor-agent",
    topic="research.response",
    message_type="response",
    payload={
        "findings": [...],
        "source_count": 8,
        "confidence": 0.87,
    },
)
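An error travels in the same envelope: the `ErrorPayload` becomes the `payload` of a message with `message_type="error"`. A sketch, with both models pared down to the fields this example exercises (the error code and topic names are illustrative):

```python
from pydantic import BaseModel, Field
from typing import Any, Literal, Optional
import uuid

class ErrorPayload(BaseModel):
    error_code: str
    error_message: str
    recoverable: bool
    context: dict[str, Any] = Field(default_factory=dict)

class AgentMessage(BaseModel):
    # Pared down to the fields this example needs
    message_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: str
    causation_id: Optional[str] = None
    sender_id: str
    recipient_id: str
    topic: str
    message_type: Literal["request", "response", "event", "error"]
    payload: dict[str, Any]

failure = AgentMessage(
    correlation_id="workflow-abc-123",
    causation_id="<the request's message_id>",
    sender_id="researcher-agent",
    recipient_id="supervisor-agent",
    topic="research.error",
    message_type="error",
    payload=ErrorPayload(
        error_code="SOURCE_FETCH_TIMEOUT",
        error_message="7 of 10 sources timed out",
        recoverable=True,  # the supervisor may choose to retry with fewer sources
        context={"timed_out": 7},
    ).model_dump(),
)
```

Because the error is still an `AgentMessage`, it keeps its correlation and causation IDs, so a failed branch of a workflow remains traceable.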
Tip: The `correlation_id` is your single most important debugging tool. Always propagate it through the entire workflow: every agent in a chain should forward the original correlation ID unchanged. This lets you reconstruct the full execution trace from logs.
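One way to make propagation hard to forget is to derive every follow-up message from its parent through a single constructor. A minimal sketch, using a pared-down stand-in for the message model (`make_child` is an illustrative helper, not part of any library):

```python
from dataclasses import dataclass, field
from typing import Any, Optional
import uuid

@dataclass
class Msg:
    # Pared-down stand-in for the AgentMessage fields involved in tracing
    correlation_id: str
    sender_id: str
    topic: str
    payload: dict[str, Any] = field(default_factory=dict)
    causation_id: Optional[str] = None
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))

def make_child(parent: Msg, sender_id: str, topic: str, payload: dict[str, Any]) -> Msg:
    """Derive a follow-up message from its parent, keeping the trace intact."""
    return Msg(
        correlation_id=parent.correlation_id,  # forwarded unchanged, end-to-end
        sender_id=sender_id,
        topic=topic,
        payload=payload,
        causation_id=parent.message_id,  # direct parent, for causal tracing
    )

root = Msg(correlation_id="workflow-1", sender_id="supervisor", topic="research.request")
child = make_child(root, "researcher", "research.response", {"ok": True})
grandchild = make_child(child, "analyst", "analysis.done", {})
```

Every message in the chain now shares `correlation_id="workflow-1"`, while the `causation_id` chain records which message triggered which.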
Communication Topologies
Request-Response (Synchronous)
Agent A sends a message and blocks until agent B replies. Simplest to implement, but creates coupling and blocks throughput.
Agent A ──── request ────► Agent B
Agent A ◄─── response ──── Agent B
Best for: Short operations, when agent A genuinely can't proceed without the result.
Fire-and-Forget (Async Event)
Agent A sends a message and immediately continues. Agent B processes in background.
Agent A ──── event ────► Message Queue ──── event ────► Agent B
Best for: Logging, notifications, non-critical side effects.
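In asyncio terms, fire-and-forget is a queue plus a background consumer task; the producer enqueues and immediately moves on, never awaiting a result. A minimal sketch, independent of the broker implemented later in this lesson:

```python
import asyncio

async def main() -> list[str]:
    queue: asyncio.Queue[str] = asyncio.Queue()
    log: list[str] = []

    async def consumer() -> None:
        # Agent B: processes events in the background
        while True:
            event = await queue.get()
            log.append(f"handled:{event}")
            queue.task_done()

    task = asyncio.create_task(consumer())

    # Agent A: emits events and continues without waiting for processing
    for event in ("step-1", "step-2", "step-3"):
        queue.put_nowait(event)

    await queue.join()  # demo only: wait until the consumer drains the queue
    task.cancel()
    return log

print(asyncio.run(main()))
```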
Publish-Subscribe
Agents publish messages to topics. Multiple agents can subscribe to the same topic — none need to know about each other directly.
Researcher ──── publishes ────► topic: "research.completed"
├──► Analyst Agent (subscribed)
├──► Logger Agent (subscribed)
└──► Notifier Agent (subscribed)
Best for: Fan-out scenarios, event-driven pipelines, decoupled systems.
Request-Reply via Queue (Async Request-Response)
Agent A sends a request with a reply_to address. Agent B responds to that address asynchronously.
Agent A ──► queue:requests ──► Agent B
Agent A ◄── queue:agent-a-replies ◄── Agent B
Best for: Production systems requiring scalability and fault tolerance.
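The pattern hinges on the requester owning a private reply queue and advertising its address inside the message. A sketch with plain asyncio queues standing in for the broker (the `reply_to` field name is illustrative, matching the description above):

```python
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class Request:
    payload: Any
    reply_to: asyncio.Queue  # address where the response should be sent

async def worker(requests: asyncio.Queue) -> None:
    # Agent B: pull a request, reply to whatever address it carries
    req: Request = await requests.get()
    await req.reply_to.put(f"done:{req.payload}")

async def main() -> str:
    requests: asyncio.Queue = asyncio.Queue()
    my_replies: asyncio.Queue = asyncio.Queue()  # Agent A's private reply queue

    worker_task = asyncio.create_task(worker(requests))
    await requests.put(Request(payload="analyze", reply_to=my_replies))

    # Agent A is free to do other work here; it awaits the reply only when needed
    reply = await asyncio.wait_for(my_replies.get(), timeout=5.0)
    await worker_task
    return reply

print(asyncio.run(main()))
```

Because the reply address travels with the message, Agent B needs no compile-time knowledge of Agent A, and many requesters can share one request queue.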
Implementing a Message Broker
The following implements an in-process async message broker that supports pub-sub, request-response, and dead-letter handling:
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Awaitable, Optional
import uuid

MessageHandler = Callable[[AgentMessage], Awaitable[Optional[AgentMessage]]]

@dataclass
class Subscription:
    subscriber_id: str
    topic_pattern: str  # supports "*" wildcard
    handler: MessageHandler

class MessageBroker:
    """
    In-process async message broker for inter-agent communication.
    Supports pub-sub, request-response, and dead-letter queuing.
    """

    def __init__(self, dead_letter_ttl_seconds: int = 3600):
        self._subscriptions: dict[str, list[Subscription]] = defaultdict(list)
        self._pending_replies: dict[str, asyncio.Future] = {}
        self._dead_letter_queue: list[AgentMessage] = []
        self._dead_letter_ttl = dead_letter_ttl_seconds

    def subscribe(self, topic_pattern: str, subscriber_id: str, handler: MessageHandler) -> str:
        """Register an agent handler for messages matching topic_pattern."""
        sub = Subscription(
            subscriber_id=subscriber_id,
            topic_pattern=topic_pattern,
            handler=handler,
        )
        self._subscriptions[topic_pattern].append(sub)
        print(f"[Broker] {subscriber_id} subscribed to '{topic_pattern}'")
        return sub.subscriber_id

    def _matches(self, pattern: str, topic: str) -> bool:
        if pattern == "*":
            return True
        if pattern.endswith(".*"):
            prefix = pattern[:-2]
            return topic.startswith(prefix)
        return pattern == topic
    async def publish(self, message: AgentMessage) -> None:
        """Publish a message to all matching subscribers."""
        handlers = []
        for pattern, subs in self._subscriptions.items():
            if self._matches(pattern, message.topic):
                handlers.extend(subs)

        if not handlers:
            print(f"[Broker] No subscribers for topic '{message.topic}' — routing to DLQ")
            self._dead_letter_queue.append(message)
            return

        # Fan out to all matching subscribers concurrently
        results = await asyncio.gather(
            *[sub.handler(message) for sub in handlers],
            return_exceptions=True,
        )

        # Handle failures: if a handler raised, retry with backoff or dead-letter
        for sub, result in zip(handlers, results):
            if isinstance(result, Exception):
                print(f"[Broker] Handler {sub.subscriber_id} failed: {result}")
                if message.retry_count < message.max_retries:
                    retried = message.model_copy(update={"retry_count": message.retry_count + 1})
                    asyncio.create_task(self._retry_with_backoff(retried, sub))
                else:
                    self._dead_letter_queue.append(message)

    async def _retry_with_backoff(self, message: AgentMessage, sub: Subscription) -> None:
        delay = 2 ** message.retry_count  # exponential backoff: 2s, 4s, 8s, ...
        await asyncio.sleep(delay)
        try:
            await sub.handler(message)
        except Exception as e:
            print(f"[Broker] Retry {message.retry_count} failed for {sub.subscriber_id}: {e}")
            if message.retry_count < message.max_retries:
                retried = message.model_copy(update={"retry_count": message.retry_count + 1})
                asyncio.create_task(self._retry_with_backoff(retried, sub))
            else:
                self._dead_letter_queue.append(message)
    async def request(
        self,
        message: AgentMessage,
        timeout_seconds: float = 30.0,
    ) -> AgentMessage:
        """
        Send a request and await a response with a timeout.
        The response must be published to topic '{original_topic}.reply.{message_id}'.
        """
        reply_topic = f"{message.topic}.reply.{message.message_id}"
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending_replies[reply_topic] = future

        # Subscribe to the reply topic
        async def capture_reply(reply: AgentMessage) -> None:
            if not future.done():
                future.set_result(reply)

        self.subscribe(reply_topic, f"reply-listener-{message.message_id}", capture_reply)
        await self.publish(message)
        try:
            return await asyncio.wait_for(future, timeout=timeout_seconds)
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"No response for message {message.message_id} on topic '{message.topic}' "
                f"after {timeout_seconds}s"
            )
        finally:
            # Clean up so one-shot reply listeners don't accumulate
            self._pending_replies.pop(reply_topic, None)
            self._subscriptions.pop(reply_topic, None)

    def drain_dead_letters(self) -> list[AgentMessage]:
        """Retrieve and clear the dead letter queue for inspection or reprocessing."""
        dlq = list(self._dead_letter_queue)
        self._dead_letter_queue.clear()
        return dlq
# --- Example: Wiring agents through the broker ---

broker = MessageBroker()

async def researcher_handler(msg: AgentMessage) -> Optional[AgentMessage]:
    """Simulated researcher agent."""
    print(f"[Researcher] Received: {msg.payload['query']}")
    await asyncio.sleep(0.1)  # simulate work

    # Respond on the reply topic
    reply = AgentMessage(
        correlation_id=msg.correlation_id,
        causation_id=msg.message_id,
        sender_id="researcher-agent",
        recipient_id=msg.sender_id,
        topic=f"{msg.topic}.reply.{msg.message_id}",
        message_type="response",
        payload={"findings": ["Finding A", "Finding B"], "confidence": 0.9},
    )
    await broker.publish(reply)
    return reply

async def analyst_handler(msg: AgentMessage) -> None:
    """Simulated analyst agent — subscribes to all research completions."""
    print(f"[Analyst] Processing research findings: {msg.payload}")

# Subscribe agents to topics
broker.subscribe("research.request", "researcher-agent", researcher_handler)
broker.subscribe("research.response", "analyst-agent", analyst_handler)

# Supervisor sends a research request
async def run_example():
    request = AgentMessage(
        correlation_id=str(uuid.uuid4()),
        sender_id="supervisor-agent",
        recipient_id="researcher-agent",
        topic="research.request",
        message_type="request",
        payload={"query": "Latest advances in mixture-of-experts models"},
    )
    try:
        response = await broker.request(request, timeout_seconds=5.0)
        print(f"\n[Supervisor] Response received: {response.payload}")
    except TimeoutError as e:
        print(f"[Supervisor] Request timed out: {e}")

asyncio.run(run_example())
Redis Streams for Production Messaging
For multi-process agent deployments, use Redis Streams as a persistent, distributed message bus:
import redis.asyncio as redis

async def publish_to_redis(stream: str, message: AgentMessage, r: redis.Redis) -> str:
    """Publish a message to a Redis Stream."""
    payload = {
        "data": message.model_dump_json(),
        "correlation_id": message.correlation_id,
        "message_type": message.message_type,
    }
    msg_id = await r.xadd(stream, payload)
    return msg_id

async def consume_from_redis(
    stream: str,
    consumer_group: str,
    consumer_name: str,
    r: redis.Redis,
    handler: MessageHandler,
) -> None:
    """Consume messages from a Redis Stream as part of a consumer group."""
    # Create the consumer group if it doesn't exist
    try:
        await r.xgroup_create(stream, consumer_group, id="0", mkstream=True)
    except redis.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise  # only swallow "group already exists"

    while True:
        messages = await r.xreadgroup(
            consumer_group, consumer_name, {stream: ">"}, count=10, block=1000
        )
        for _, stream_messages in messages:
            for msg_id, fields in stream_messages:
                msg = AgentMessage.model_validate_json(fields[b"data"])
                try:
                    await handler(msg)
                    await r.xack(stream, consumer_group, msg_id)
                except Exception as e:
                    print(f"[Redis Consumer] Failed processing {msg_id}: {e}")
                    # Message stays unacknowledged — can be reclaimed after PEL timeout
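The consumer trusts that whatever `publish_to_redis` wrote into the `data` field validates back into an `AgentMessage`. That serialize/deserialize round trip can be checked without a live Redis server; a sketch, using a model pared down to the fields the round trip exercises:

```python
from pydantic import BaseModel
from typing import Any

class AgentMessage(BaseModel):
    # Pared down to the fields exercised by the round trip
    message_id: str
    correlation_id: str
    topic: str
    payload: dict[str, Any]

original = AgentMessage(
    message_id="m-1",
    correlation_id="workflow-1",
    topic="research.request",
    payload={"query": "MoE models", "max_sources": 5},
)

# What xadd would store ...
wire = original.model_dump_json().encode()  # Redis returns field values as bytes
# ... and what the consumer reconstructs (model_validate_json accepts bytes)
restored = AgentMessage.model_validate_json(wire)
```

Running this check in CI before deploying a schema change is a cheap way to catch producer/consumer drift.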
Message Design Checklist
| Requirement | Implementation |
|---|---|
| Unique message identity | message_id (UUID) |
| Workflow correlation | correlation_id propagated end-to-end |
| Causal tracing | causation_id = parent's message_id |
| Failure handling | retry_count, max_retries, dead-letter queue |
| Priority routing | priority field (1–10) |
| Message expiry | ttl_seconds checked before processing |
| Schema versioning | Add schema_version: str field |
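The TTL check listed above is a cheap guard each consumer can run before handling a message. A sketch, with a stand-in for the expiry-related fields of `AgentMessage` (`is_expired` is an illustrative helper, not part of the broker shown earlier):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

@dataclass
class Envelope:
    # Stand-in for AgentMessage's expiry-related fields
    timestamp: datetime
    ttl_seconds: Optional[int] = 300

def is_expired(msg: Envelope, now: Optional[datetime] = None) -> bool:
    """True if the message outlived its TTL; ttl_seconds=None means it never expires."""
    if msg.ttl_seconds is None:
        return False
    now = now or datetime.now(timezone.utc)
    return now - msg.timestamp > timedelta(seconds=msg.ttl_seconds)

sent = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
msg = Envelope(timestamp=sent, ttl_seconds=300)
print(is_expired(msg, now=sent + timedelta(seconds=60)))   # still within its TTL
print(is_expired(msg, now=sent + timedelta(seconds=301)))  # past its TTL: drop or DLQ it
```

Dropping expired messages (or routing them to the dead-letter queue) prevents agents from acting on stale requests after a long outage or retry storm.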
Key Takeaways
- Always use structured message schemas — raw string passing is a maintenance trap that breaks at scale
- The `correlation_id` is the backbone of distributed tracing in multi-agent systems; never let it get dropped
- Choose your topology based on coupling needs: pub-sub for fan-out, request-response for dependencies, fire-and-forget for side effects
- Implement dead-letter queues from the start — they are your safety net for silent failures
- For multi-process deployments, Redis Streams or a message queue (RabbitMQ, Kafka) provides persistence, consumer groups, and horizontal scaling