Communication and Coordination

Inter-Agent Communication Protocols

Inter-Agent Messaging

Overview

As multi-agent systems grow beyond two or three agents, the question of how agents communicate becomes as important as what they say. Ad-hoc string passing between agents doesn't scale — it breaks under concurrent execution, makes debugging nearly impossible, and creates subtle incompatibilities when agents evolve independently.

This lesson covers the principles and practical implementation of structured inter-agent messaging: message formats, communication topologies, synchronous vs. asynchronous patterns, and message broker architectures.


Why Structured Messaging Matters

Consider two agents communicating via raw strings:

# Fragile — no schema, no version, no correlation
result = agent_a.run("analyze this data")
agent_b.run(f"now summarize: {result}")

Problems:

  • No error envelope: If agent A fails, agent B receives a partial string with no way to know it's invalid
  • No correlation: Can't trace which request produced which response in concurrent scenarios
  • No schema evolution: If agent A adds a new field, agent B silently ignores it
  • No routing metadata: The message has no sender, priority, or target information

Structured messaging solves all of these.
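To make the first failure mode concrete, here is a minimal sketch of how an explicit error envelope turns a silent bad string into a checkable condition. Plain dicts stand in for the full schema defined below, and the `handle_result` helper is purely illustrative:

```python
# Minimal sketch: an explicit error envelope makes failure machine-checkable.
# Plain dicts stand in for the full message schema defined later in this lesson.

def handle_result(envelope: dict) -> str:
    """Refuse to consume downstream output unless it is a valid response."""
    if envelope["message_type"] == "error":
        # The consumer can branch on the failure instead of summarizing garbage
        raise RuntimeError(f"Upstream failed: {envelope['payload']['error_message']}")
    return envelope["payload"]["text"]

ok = {"message_type": "response", "payload": {"text": "analysis complete"}}
bad = {"message_type": "error", "payload": {"error_message": "tool timeout"}}
```

With raw strings, the error case would have flowed straight into the next prompt; here it surfaces as an exception the caller must handle.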


Message Schema Design

A well-designed inter-agent message is a self-describing envelope:

from pydantic import BaseModel, Field
from typing import Any, Optional, Literal
from datetime import datetime, timezone
import uuid

class AgentMessage(BaseModel):
    # Identity
    message_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: str  # links request → response, spans entire workflow
    causation_id: Optional[str] = None  # the message_id that caused this message

    # Routing
    sender_id: str       # agent that sent this message
    recipient_id: str    # target agent (or "*" for broadcast)
    topic: str           # logical channel, e.g. "research.request"

    # Content
    message_type: Literal["request", "response", "event", "error"]
    payload: dict[str, Any]

    # Metadata (timezone-aware; datetime.utcnow is deprecated)
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    priority: int = Field(default=5, ge=1, le=10)  # 1=low, 10=critical
    ttl_seconds: Optional[int] = 300  # message expiry

    # Error handling
    retry_count: int = 0
    max_retries: int = 3

class ErrorPayload(BaseModel):
    error_code: str
    error_message: str
    recoverable: bool
    context: dict[str, Any] = {}

# Example messages
research_request = AgentMessage(
    correlation_id="workflow-abc-123",
    sender_id="supervisor-agent",
    recipient_id="researcher-agent",
    topic="research.request",
    message_type="request",
    payload={
        "query": "What are the latest developments in transformer architecture?",
        "max_sources": 10,
        "recency_days": 30,
    },
)

research_response = AgentMessage(
    correlation_id="workflow-abc-123",
    causation_id=research_request.message_id,
    sender_id="researcher-agent",
    recipient_id="supervisor-agent",
    topic="research.response",
    message_type="response",
    payload={
        "findings": [...],
        "source_count": 8,
        "confidence": 0.87,
    },
)

Tip: The correlation_id is your single most important debugging tool. Always propagate it through the entire workflow — every agent in a chain should forward the original correlation ID unchanged. This lets you reconstruct the full execution trace from logs.
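One way to make that propagation automatic is a small factory that derives every follow-up message from its parent. This is an illustrative sketch using plain dicts; the `child_envelope` helper is hypothetical, not part of the schema above:

```python
import uuid

def child_envelope(parent: dict, sender_id: str, recipient_id: str,
                   topic: str, message_type: str, payload: dict) -> dict:
    """Derive a follow-up message: fresh message_id, inherited correlation_id,
    and causation_id pointing back at the message that triggered it."""
    return {
        "message_id": str(uuid.uuid4()),
        "correlation_id": parent["correlation_id"],  # forwarded unchanged
        "causation_id": parent["message_id"],        # causal link to the parent
        "sender_id": sender_id,
        "recipient_id": recipient_id,
        "topic": topic,
        "message_type": message_type,
        "payload": payload,
    }

request = {"message_id": "m-1", "correlation_id": "workflow-abc-123"}
response = child_envelope(request, "researcher-agent", "supervisor-agent",
                          "research.response", "response", {"findings": []})
```

Because agents can only build messages through the factory, no handler ever has the chance to drop or overwrite the correlation ID.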


Communication Topologies

Request-Response (Synchronous)

Agent A sends a message and blocks until agent B replies. Simplest to implement, but creates coupling and blocks throughput.

Agent A ──── request ────► Agent B
Agent A ◄─── response ──── Agent B

Best for: Short operations, when agent A genuinely can't proceed without the result.

Fire-and-Forget (Async Event)

Agent A sends a message and immediately continues. Agent B processes in background.

Agent A ──── event ────► Message Queue ──── event ────► Agent B

Best for: Logging, notifications, non-critical side effects.

Publish-Subscribe

Agents publish messages to topics. Multiple agents can subscribe to the same topic — none need to know about each other directly.

Researcher ──── publishes ────► topic: "research.completed"
                                    ├──► Analyst Agent (subscribed)
                                    ├──► Logger Agent (subscribed)
                                    └──► Notifier Agent (subscribed)

Best for: Fan-out scenarios, event-driven pipelines, decoupled systems.

Request-Reply via Queue (Async Request-Response)

Agent A sends a request with a reply_to address. Agent B responds to that address asynchronously.

Agent A ──► queue:requests ──► Agent B
Agent A ◄── queue:agent-a-replies ◄── Agent B

Best for: Production systems requiring scalability and fault tolerance.
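The reply_to mechanism can be sketched in-process with asyncio queues before reaching for a real broker. The `worker` and `demo` names below are illustrative; each requester owns a private reply queue that it passes along with the request:

```python
import asyncio

async def worker(requests: asyncio.Queue) -> None:
    """Agent B: serve requests, replying to each message's reply_to queue."""
    while True:
        msg = await requests.get()
        if msg is None:                      # shutdown sentinel
            break
        await msg["reply_to"].put({
            "correlation_id": msg["correlation_id"],
            "payload": {"echo": msg["payload"]},
        })

async def demo() -> dict:
    requests: asyncio.Queue = asyncio.Queue()
    replies: asyncio.Queue = asyncio.Queue()   # Agent A's private reply address
    server = asyncio.create_task(worker(requests))

    # Agent A sends the request, then awaits the reply without blocking others
    await requests.put({"correlation_id": "wf-1", "payload": "hello",
                        "reply_to": replies})
    reply = await replies.get()

    await requests.put(None)                   # stop the worker
    await server
    return reply

reply = asyncio.run(demo())
```

In a distributed deployment the reply_to field would name a durable queue or topic rather than an in-memory object, but the flow is the same.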


Implementing a Message Broker

The following implements an in-process async message broker that supports pub-sub, request-response, and dead-letter handling:

import asyncio
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Callable, Awaitable, Optional
import uuid
from datetime import datetime, timedelta

MessageHandler = Callable[[AgentMessage], Awaitable[Optional[AgentMessage]]]

@dataclass
class Subscription:
    subscriber_id: str
    topic_pattern: str  # supports "*" wildcard
    handler: MessageHandler

class MessageBroker:
    """
    In-process async message broker for inter-agent communication.
    Supports pub-sub, request-response, and dead-letter queuing.
    """

    def __init__(self, dead_letter_ttl_seconds: int = 3600):
        self._subscriptions: dict[str, list[Subscription]] = defaultdict(list)
        self._pending_replies: dict[str, asyncio.Future] = {}
        self._dead_letter_queue: list[AgentMessage] = []
        self._dead_letter_ttl = dead_letter_ttl_seconds
        self._lock = asyncio.Lock()

    def subscribe(self, topic_pattern: str, subscriber_id: str, handler: MessageHandler) -> str:
        """Register an agent handler for messages matching topic_pattern."""
        sub = Subscription(
            subscriber_id=subscriber_id,
            topic_pattern=topic_pattern,
            handler=handler,
        )
        self._subscriptions[topic_pattern].append(sub)
        print(f"[Broker] {subscriber_id} subscribed to '{topic_pattern}'")
        return sub.subscriber_id

    def _matches(self, pattern: str, topic: str) -> bool:
        if pattern == "*":
            return True
        if pattern.endswith(".*"):
            prefix = pattern[:-2]
            return topic.startswith(prefix)
        return pattern == topic

    async def publish(self, message: AgentMessage) -> None:
        """Publish a message to all matching subscribers."""
        handlers = []
        for pattern, subs in self._subscriptions.items():
            if self._matches(pattern, message.topic):
                handlers.extend(subs)

        if not handlers:
            print(f"[Broker] No subscribers for topic '{message.topic}' — routing to DLQ")
            self._dead_letter_queue.append(message)
            return

        # Fan out to all matching subscribers concurrently
        results = await asyncio.gather(
            *[sub.handler(message) for sub in handlers],
            return_exceptions=True,
        )

        # Handle failures: if handler returned an exception, add to DLQ
        for sub, result in zip(handlers, results):
            if isinstance(result, Exception):
                print(f"[Broker] Handler {sub.subscriber_id} failed: {result}")
                if message.retry_count < message.max_retries:
                    retried = message.model_copy(update={"retry_count": message.retry_count + 1})
                    asyncio.create_task(self._retry_with_backoff(retried, sub))
                else:
                    self._dead_letter_queue.append(message)

    async def _retry_with_backoff(self, message: AgentMessage, sub: Subscription) -> None:
        delay = 2 ** message.retry_count  # exponential backoff
        await asyncio.sleep(delay)
        try:
            await sub.handler(message)
        except Exception as e:
            print(f"[Broker] Retry {message.retry_count} failed for {sub.subscriber_id}: {e}")
            self._dead_letter_queue.append(message)

    async def request(
        self,
        message: AgentMessage,
        timeout_seconds: float = 30.0,
    ) -> AgentMessage:
        """
        Send a request and await a response with a timeout.
        The response must be published to topic '{original_topic}.reply.{message_id}'.
        """
        reply_topic = f"{message.topic}.reply.{message.message_id}"
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending_replies[reply_topic] = future

        # Subscribe to the reply topic
        async def capture_reply(reply: AgentMessage) -> None:
            if not future.done():
                future.set_result(reply)

        self.subscribe(reply_topic, f"reply-listener-{message.message_id}", capture_reply)
        await self.publish(message)

        try:
            response = await asyncio.wait_for(future, timeout=timeout_seconds)
            return response
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"No response for message {message.message_id} on topic '{message.topic}' "
                f"after {timeout_seconds}s"
            )
        finally:
            # Clean up the one-shot reply subscription and the pending future,
            # otherwise every request leaks a dead subscription
            self._subscriptions.pop(reply_topic, None)
            self._pending_replies.pop(reply_topic, None)

    def drain_dead_letters(self) -> list[AgentMessage]:
        """Retrieve and clear the dead letter queue for inspection or reprocessing."""
        dlq = list(self._dead_letter_queue)
        self._dead_letter_queue.clear()
        return dlq

# --- Example: Wiring agents through the broker ---

broker = MessageBroker()

async def researcher_handler(msg: AgentMessage) -> Optional[AgentMessage]:
    """Simulated researcher agent."""
    print(f"[Researcher] Received: {msg.payload['query']}")
    await asyncio.sleep(0.1)  # simulate work

    # Respond to the reply topic
    reply = AgentMessage(
        correlation_id=msg.correlation_id,
        causation_id=msg.message_id,
        sender_id="researcher-agent",
        recipient_id=msg.sender_id,
        topic=f"{msg.topic}.reply.{msg.message_id}",
        message_type="response",
        payload={"findings": ["Finding A", "Finding B"], "confidence": 0.9},
    )
    await broker.publish(reply)
    return reply

async def analyst_handler(msg: AgentMessage) -> None:
    """Simulated analyst agent — subscribes to all research completions."""
    print(f"[Analyst] Processing research findings: {msg.payload}")

# Subscribe agents to topics. Replies are published to
# "research.request.reply.{message_id}", so the analyst uses a wildcard
# pattern to observe every completed research reply.
broker.subscribe("research.request", "researcher-agent", researcher_handler)
broker.subscribe("research.request.reply.*", "analyst-agent", analyst_handler)

# Supervisor sends a research request
async def run_example():
    request = AgentMessage(
        correlation_id=str(uuid.uuid4()),
        sender_id="supervisor-agent",
        recipient_id="researcher-agent",
        topic="research.request",
        message_type="request",
        payload={"query": "Latest advances in mixture-of-experts models"},
    )

    try:
        response = await broker.request(request, timeout_seconds=5.0)
        print(f"\n[Supervisor] Response received: {response.payload}")
    except TimeoutError as e:
        print(f"[Supervisor] Request timed out: {e}")

asyncio.run(run_example())

Redis Streams for Production Messaging

For multi-process agent deployments, use Redis Streams as a persistent, distributed message bus:

import redis.asyncio as redis

async def publish_to_redis(stream: str, message: AgentMessage, r: redis.Redis) -> str:
    """Publish a message to a Redis Stream."""
    payload = {
        "data": message.model_dump_json(),
        "correlation_id": message.correlation_id,
        "message_type": message.message_type,
    }
    msg_id = await r.xadd(stream, payload)
    # xadd returns bytes unless the client was created with decode_responses=True
    return msg_id.decode() if isinstance(msg_id, bytes) else msg_id

async def consume_from_redis(
    stream: str,
    consumer_group: str,
    consumer_name: str,
    r: redis.Redis,
    handler: MessageHandler,
) -> None:
    """Consume messages from a Redis Stream as part of a consumer group."""
    # Create consumer group if it doesn't exist
    try:
        await r.xgroup_create(stream, consumer_group, id="0", mkstream=True)
    except redis.ResponseError:
        pass  # group already exists

    while True:
        messages = await r.xreadgroup(
            consumer_group, consumer_name, {stream: ">"}, count=10, block=1000
        )
        for _, stream_messages in messages:
            for msg_id, fields in stream_messages:
                msg = AgentMessage.model_validate_json(fields[b"data"])
                try:
                    await handler(msg)
                    await r.xack(stream, consumer_group, msg_id)
                except Exception as e:
                    print(f"[Redis Consumer] Failed processing {msg_id}: {e}")
                    # Message stays unacknowledged — can be reclaimed after PEL timeout

Message Design Checklist

  • Unique message identity: message_id (UUID)
  • Workflow correlation: correlation_id propagated end-to-end
  • Causal tracing: causation_id set to the parent's message_id
  • Failure handling: retry_count, max_retries, dead-letter queue
  • Priority routing: priority field (1–10)
  • Message expiry: ttl_seconds checked before processing
  • Schema versioning: add a schema_version: str field
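Note that the expiry item is not enforced by the broker above; a consumer-side check might look like the following sketch (the is_expired helper is hypothetical, and assumes timezone-aware timestamps):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def is_expired(sent_at: datetime, ttl_seconds: Optional[int],
               now: Optional[datetime] = None) -> bool:
    """TTL check a consumer runs before handling a message.
    ttl_seconds=None means the message never expires."""
    if ttl_seconds is None:
        return False
    now = now or datetime.now(timezone.utc)
    return now - sent_at > timedelta(seconds=ttl_seconds)

now = datetime.now(timezone.utc)
stale = is_expired(now - timedelta(seconds=400), 300, now=now)
fresh = is_expired(now - timedelta(seconds=10), 300, now=now)
```

Running the check at the consumer (rather than the broker) keeps expiry semantics consistent even when messages sit in a persistent queue across process restarts.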

Key Takeaways

  • Always use structured message schemas — raw string passing is a maintenance trap that breaks at scale
  • The correlation_id is the backbone of distributed tracing in multi-agent systems; never let it get dropped
  • Choose your topology based on coupling needs: pub-sub for fan-out, request-response for dependencies, fire-and-forget for side effects
  • Implement dead-letter queues from the start — they are your safety net for silent failures
  • For multi-process deployments, Redis Streams or a message queue (RabbitMQ, Kafka) provides persistence, consumer groups, and horizontal scaling