Communication and Coordination

Shared State and Consistency

14m read

Shared State Management in Multi-Agent Systems

Overview

When multiple agents work on a shared goal, they inevitably need to share information: intermediate results, discovered facts, task status, accumulated context. Shared state is the memory that binds a multi-agent system together. Getting it right is one of the hardest engineering challenges in multi-agent design — get it wrong and you'll face race conditions, conflicting writes, and stale data leading to incorrect agent decisions.


The Shared State Problem

Consider three agents working in parallel on a research task:

  • Agent A discovers "Company X was acquired in 2023"
  • Agent B is simultaneously looking up "Company X's current leadership"
  • Agent C is writing a summary that references "Company X"

If Agent C reads state before Agent A's discovery is written, the summary will be incorrect. If Agents A and B both try to update the same company_profile object simultaneously, one update may overwrite the other.

These are the core challenges:

  1. Race conditions — concurrent reads and writes producing inconsistent state
  2. Stale reads — an agent acting on outdated information
  3. Conflicting writes — two agents updating the same field with different values
  4. State explosion — unbounded state growth as agents accumulate data

The Blackboard Pattern

The blackboard is the oldest and most proven pattern for shared state in multi-agent systems. It comes from AI research of the 1970s (the Hearsay-II speech recognition system) and remains highly relevant today.

Concept

A central shared workspace — the "blackboard" — stores all intermediate results. Agents:

  • Read from the blackboard to understand current state
  • Write their results back to the blackboard
  • Subscribe to changes on the blackboard to know when to act

No agent communicates directly with another — all coordination happens through the blackboard.

┌─────────────────────────────────────┐
│           BLACKBOARD                │
│  ┌─────────────────────────────┐   │
│  │ research_findings: [...]    │   │
│  │ analysis_results: {...}     │   │
│  │ task_status: {              │   │
│  │   research: "complete"      │   │
│  │   analysis: "in_progress"   │   │
│  │ }                           │   │
│  │ errors: []                  │   │
│  └─────────────────────────────┘   │
└──────────┬──────────────┬──────────┘
           │              │
    ┌──────▼──────┐ ┌──────▼──────┐
    │  Researcher │ │   Analyst   │
    │    Agent    │ │    Agent    │
    └─────────────┘ └─────────────┘

Implementing the Blackboard Pattern

import asyncio
import json
import hashlib
from datetime import datetime
from typing import Any, Callable, Optional
from dataclasses import dataclass, field
from enum import Enum
import threading

class WriteStrategy(Enum):
    OVERWRITE = "overwrite"       # last writer wins
    MERGE = "merge"               # deep merge dicts
    APPEND = "append"             # append to lists
    OPTIMISTIC_LOCK = "optimistic_lock"  # fail if version mismatch

@dataclass
class BlackboardEntry:
    key: str
    value: Any
    written_by: str           # agent that wrote this value
    timestamp: datetime = field(default_factory=datetime.utcnow)
    version: int = 1
    checksum: str = ""

    def __post_init__(self):
        self.checksum = self._compute_checksum()

    def _compute_checksum(self) -> str:
        content = json.dumps(self.value, sort_keys=True, default=str)
        return hashlib.md5(content.encode()).hexdigest()[:8]

ChangeListener = Callable[[str, BlackboardEntry], None]

class Blackboard:
    """
    Thread-safe shared state store for multi-agent coordination.
    Implements optimistic locking, merge strategies, and change notifications.
    """

    def __init__(self):
        self._store: dict[str, BlackboardEntry] = {}
        self._listeners: dict[str, list[ChangeListener]] = {}  # key → listeners
        self._lock = threading.RLock()  # reentrant for nested reads

    # --- WRITE ---

    def write(
        self,
        key: str,
        value: Any,
        agent_id: str,
        strategy: WriteStrategy = WriteStrategy.OVERWRITE,
        expected_version: Optional[int] = None,
    ) -> BlackboardEntry:
        """
        Write a value to the blackboard with the specified conflict resolution strategy.
        Raises ValueError on optimistic lock version mismatch.
        """
        with self._lock:
            existing = self._store.get(key)

            # Optimistic locking: reject writes where version doesn't match expectation
            if strategy == WriteStrategy.OPTIMISTIC_LOCK:
                current_version = existing.version if existing else 0
                if expected_version is not None and current_version != expected_version:
                    raise ValueError(
                        f"Optimistic lock failed for '{key}': "
                        f"expected version {expected_version}, got {current_version}"
                    )

            new_version = (existing.version + 1) if existing else 1

            # Apply merge strategy
            if strategy == WriteStrategy.MERGE and existing and isinstance(existing.value, dict):
                merged = {**existing.value, **value}
                final_value = merged
            elif strategy == WriteStrategy.APPEND and existing and isinstance(existing.value, list):
                final_value = existing.value + (value if isinstance(value, list) else [value])
            else:
                final_value = value

            entry = BlackboardEntry(
                key=key,
                value=final_value,
                written_by=agent_id,
                version=new_version,
            )
            self._store[key] = entry

            # Notify listeners
            self._notify(key, entry)
            return entry

    # --- READ ---

    def read(self, key: str, default: Any = None) -> Any:
        """Read a value. Returns default if key not found."""
        with self._lock:
            entry = self._store.get(key)
            return entry.value if entry else default

    def read_entry(self, key: str) -> Optional[BlackboardEntry]:
        """Read the full entry with metadata (version, author, timestamp)."""
        with self._lock:
            return self._store.get(key)

    def snapshot(self) -> dict[str, Any]:
        """Return a point-in-time snapshot of all values."""
        with self._lock:
            return {k: v.value for k, v in self._store.items()}

    # --- CHANGE NOTIFICATIONS ---

    def subscribe(self, key_pattern: str, listener: ChangeListener) -> None:
        """Subscribe to changes on a key or key prefix (e.g., 'research.*')."""
        with self._lock:
            if key_pattern not in self._listeners:
                self._listeners[key_pattern] = []
            self._listeners[key_pattern].append(listener)

    def _notify(self, key: str, entry: BlackboardEntry) -> None:
        for pattern, listeners in self._listeners.items():
            if pattern == key or (pattern.endswith("*") and key.startswith(pattern[:-1])):
                for listener in listeners:
                    try:
                        listener(key, entry)
                    except Exception as e:
                        print(f"[Blackboard] Listener error for '{key}': {e}")

    # --- UTILITIES ---

    def keys(self) -> list[str]:
        with self._lock:
            return list(self._store.keys())

    def delete(self, key: str, agent_id: str) -> bool:
        with self._lock:
            if key in self._store:
                del self._store[key]
                return True
            return False

# --- Agent using the blackboard ---

class ResearchAgent:
    def __init__(self, agent_id: str, blackboard: Blackboard):
        self.agent_id = agent_id
        self.blackboard = blackboard

    def run(self, query: str) -> None:
        print(f"[{self.agent_id}] Starting research: {query}")

        # Read existing findings to avoid duplication
        existing = self.blackboard.read("research.findings", default=[])

        # Simulate research (replace with actual LLM call)
        new_findings = [
            {"claim": f"Finding about '{query}'", "confidence": 0.85, "source": "web"},
        ]

        # Append to existing findings (safe for concurrent agents)
        self.blackboard.write(
            key="research.findings",
            value=new_findings,
            agent_id=self.agent_id,
            strategy=WriteStrategy.APPEND,
        )

        # Update task status with optimistic lock to prevent conflicts
        status_entry = self.blackboard.read_entry("task.status")
        current_version = status_entry.version if status_entry else 0
        current_status = status_entry.value if status_entry else {}

        try:
            self.blackboard.write(
                key="task.status",
                value={**current_status, self.agent_id: "complete"},
                agent_id=self.agent_id,
                strategy=WriteStrategy.OPTIMISTIC_LOCK,
                expected_version=current_version,
            )
        except ValueError:
            # Another agent wrote status concurrently — retry
            self.run_status_update()

        print(f"[{self.agent_id}] Research complete.")

    def run_status_update(self):
        """Retry status update after optimistic lock failure."""
        status_entry = self.blackboard.read_entry("task.status")
        current_version = status_entry.version if status_entry else 0
        current_status = status_entry.value if status_entry else {}
        self.blackboard.write(
            key="task.status",
            value={**current_status, self.agent_id: "complete"},
            agent_id=self.agent_id,
            strategy=WriteStrategy.OPTIMISTIC_LOCK,
            expected_version=current_version,
        )

# --- Usage ---
board = Blackboard()

# Subscribe a monitor to all research updates
def on_research_update(key: str, entry: BlackboardEntry):
    print(f"[Monitor] '{key}' updated by {entry.written_by} (v{entry.version})")

board.subscribe("research.*", on_research_update)
board.subscribe("task.*", on_research_update)

# Run agents (in production, these would be separate threads/processes)
agent_a = ResearchAgent("researcher-a", board)
agent_b = ResearchAgent("researcher-b", board)
agent_a.run("transformer architecture")
agent_b.run("mixture of experts")

print("\nFinal blackboard state:")
for key, value in board.snapshot().items():
    print(f"  {key}: {value}")

Tip: The APPEND strategy is your safest default for lists that multiple agents contribute to concurrently. Reserve OPTIMISTIC_LOCK for status fields and counters where only one agent should win.


Event Sourcing for Agent Actions

Instead of storing current state directly, event sourcing records every action that changed state. Current state is derived by replaying the event log.

from dataclasses import dataclass
from datetime import datetime
from typing import Any
import json

@dataclass
class AgentEvent:
    event_id: str
    correlation_id: str
    agent_id: str
    event_type: str        # "task_started", "finding_added", "task_completed", etc.
    payload: dict[str, Any]
    timestamp: datetime = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.utcnow()

class EventLog:
    """Append-only event log. The source of truth for all agent state."""

    def __init__(self):
        self._events: list[AgentEvent] = []

    def append(self, event: AgentEvent) -> None:
        self._events.append(event)

    def replay(self, correlation_id: str = None) -> dict[str, Any]:
        """Replay events to reconstruct current state."""
        state = {"findings": [], "task_status": {}, "errors": []}

        for event in self._events:
            if correlation_id and event.correlation_id != correlation_id:
                continue

            if event.event_type == "finding_added":
                state["findings"].append(event.payload)
            elif event.event_type == "task_status_updated":
                state["task_status"][event.agent_id] = event.payload["status"]
            elif event.event_type == "error_recorded":
                state["errors"].append(event.payload)

        return state

    def get_agent_timeline(self, agent_id: str) -> list[AgentEvent]:
        """Reconstruct everything a specific agent did."""
        return [e for e in self._events if e.agent_id == agent_id]

Benefits of event sourcing for multi-agent systems:

  • Complete audit trail of every agent action
  • Ability to replay and debug any workflow execution
  • Time-travel debugging: reconstruct state at any point in the past
  • Natural conflict resolution: the event log is the truth, not derived state

Distributed State with Redis

For multi-process deployments, use Redis for shared state:

import redis
import json
from typing import Any, Optional

class RedisBlackboard:
    """Production blackboard backed by Redis with atomic operations."""

    def __init__(self, redis_url: str = "redis://localhost:6379", namespace: str = "agent"):
        self.r = redis.from_url(redis_url, decode_responses=True)
        self.ns = namespace

    def _key(self, key: str) -> str:
        return f"{self.ns}:{key}"

    def write(self, key: str, value: Any, agent_id: str, ex: int = 3600) -> None:
        """Write with TTL (seconds). Overwrites existing value."""
        entry = {"value": value, "written_by": agent_id, "ts": datetime.utcnow().isoformat()}
        self.r.set(self._key(key), json.dumps(entry), ex=ex)

    def read(self, key: str, default: Any = None) -> Any:
        raw = self.r.get(self._key(key))
        if raw is None:
            return default
        return json.loads(raw)["value"]

    def append_to_list(self, key: str, value: Any, agent_id: str) -> int:
        """Atomically append to a Redis list. Returns new list length."""
        serialized = json.dumps({"value": value, "written_by": agent_id})
        return self.r.rpush(self._key(key), serialized)

    def read_list(self, key: str) -> list[Any]:
        items = self.r.lrange(self._key(key), 0, -1)
        return [json.loads(item)["value"] for item in items]

    def increment(self, key: str, amount: int = 1) -> int:
        """Atomic counter increment — safe for concurrent agents."""
        return self.r.incrby(self._key(key), amount)

    def set_if_not_exists(self, key: str, value: Any, ex: int = 300) -> bool:
        """Claim a lock or resource. Returns True if claimed, False if already taken."""
        entry = json.dumps({"value": value})
        return bool(self.r.set(self._key(key), entry, ex=ex, nx=True))

Conflict Resolution Strategies Summary

StrategyBehaviorUse When
Last Write WinsNewer value overwrites olderSimple counters, non-critical metadata
AppendValues accumulate in a listFindings, log entries, collected data
MergeDict keys are mergedProfile updates, partial state updates
Optimistic LockWrite fails on version mismatchStatus fields, workflow state transitions
Pessimistic LockExclusive write accessCritical operations, financial data
Event SourcingAll writes are events, state derivedFull audit required, complex state

Key Takeaways

  • The blackboard pattern is the foundation of multi-agent shared state — proven in AI research for over 40 years
  • Choose your write strategy explicitly: APPEND for lists, MERGE for dicts, OPTIMISTIC_LOCK for status
  • Event sourcing trades storage for debuggability — highly recommended for production multi-agent systems
  • Redis provides atomic operations (INCR, RPUSH, SET NX) that make multi-process shared state safe without custom locking
  • Always set TTLs on blackboard entries to prevent unbounded state growth