Shared State Management in Multi-Agent Systems
Overview
When multiple agents work on a shared goal, they inevitably need to share information: intermediate results, discovered facts, task status, accumulated context. Shared state is the memory that binds a multi-agent system together. Getting it right is one of the hardest engineering challenges in multi-agent design — get it wrong and you'll face race conditions, conflicting writes, and stale data leading to incorrect agent decisions.
The Shared State Problem
Consider three agents working in parallel on a research task:
- Agent A discovers "Company X was acquired in 2023"
- Agent B is simultaneously looking up "Company X's current leadership"
- Agent C is writing a summary that references "Company X"
If Agent C reads state before Agent A's discovery is written, the summary will be incorrect. If Agents A and B both try to update the same company_profile object simultaneously, one update may overwrite the other.
These are the core challenges:
- Race conditions — concurrent reads and writes producing inconsistent state
- Stale reads — an agent acting on outdated information
- Conflicting writes — two agents updating the same field with different values
- State explosion — unbounded state growth as agents accumulate data
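The conflicting-write case is easier to see when one unlucky interleaving is replayed by hand. The sketch below is illustrative only: the `store` dict and the field names `acquired_in` and `ceo` are assumptions, and the two "agents" are sequential steps rather than real threads, so the outcome is deterministic.

```python
import copy

# Hypothetical shared state; the key and field names are illustrative only.
store = {"company_profile": {"name": "Company X"}}

# Both agents read before either one writes (the unlucky interleaving).
snapshot_a = copy.deepcopy(store["company_profile"])
snapshot_b = copy.deepcopy(store["company_profile"])

# Each agent updates its own private copy.
snapshot_a["acquired_in"] = 2023
snapshot_b["ceo"] = "unknown"

# Each agent writes its whole copy back; the second write replaces the first.
store["company_profile"] = snapshot_a
store["company_profile"] = snapshot_b

lost_update = "acquired_in" not in store["company_profile"]
print(store["company_profile"])
print("Agent A's update lost:", lost_update)
```

Neither agent did anything wrong in isolation. The loss comes from writing back a whole object that was read before the other agent's write landed, which is the situation the write strategies in the rest of this section are meant to handle.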
The Blackboard Pattern
The blackboard is one of the oldest patterns for shared state in multi-agent systems. It originated in AI research of the 1970s (the Hearsay-II speech recognition system) and remains relevant today.
Concept
A central shared workspace — the "blackboard" — stores all intermediate results. Agents:
- Read from the blackboard to understand current state
- Write their results back to the blackboard
- Subscribe to changes on the blackboard to know when to act
No agent communicates directly with another — all coordination happens through the blackboard.
┌─────────────────────────────────────┐
│             BLACKBOARD              │
│  ┌─────────────────────────────┐    │
│  │ research_findings: [...]    │    │
│  │ analysis_results: {...}     │    │
│  │ task_status: {              │    │
│  │   research: "complete"      │    │
│  │   analysis: "in_progress"   │    │
│  │ }                           │    │
│  │ errors: []                  │    │
│  └─────────────────────────────┘    │
└──────────┬───────────────┬──────────┘
           │               │
    ┌──────▼──────┐ ┌──────▼──────┐
    │ Researcher  │ │   Analyst   │
    │    Agent    │ │    Agent    │
    └─────────────┘ └─────────────┘
Implementing the Blackboard Pattern
import hashlib
import json
import threading
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Optional


class WriteStrategy(Enum):
    OVERWRITE = "overwrite"              # last writer wins
    MERGE = "merge"                      # shallow-merge top-level dict keys
    APPEND = "append"                    # append to lists
    OPTIMISTIC_LOCK = "optimistic_lock"  # fail if version mismatch


@dataclass
class BlackboardEntry:
    key: str
    value: Any
    written_by: str  # agent that wrote this value
    # Timezone-aware UTC timestamp (datetime.utcnow is deprecated since Python 3.12)
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    version: int = 1
    checksum: str = ""

    def __post_init__(self):
        self.checksum = self._compute_checksum()

    def _compute_checksum(self) -> str:
        # Short content fingerprint for change detection, not for security
        content = json.dumps(self.value, sort_keys=True, default=str)
        return hashlib.md5(content.encode()).hexdigest()[:8]


ChangeListener = Callable[[str, BlackboardEntry], None]
class Blackboard:
    """
    Thread-safe shared state store for multi-agent coordination.
    Implements optimistic locking, merge strategies, and change notifications.
    """

    def __init__(self):
        self._store: dict[str, BlackboardEntry] = {}
        self._listeners: dict[str, list[ChangeListener]] = {}  # key pattern → listeners
        self._lock = threading.RLock()  # reentrant for nested reads

    # --- WRITE ---
    def write(
        self,
        key: str,
        value: Any,
        agent_id: str,
        strategy: WriteStrategy = WriteStrategy.OVERWRITE,
        expected_version: Optional[int] = None,
    ) -> BlackboardEntry:
        """
        Write a value to the blackboard with the specified conflict resolution strategy.
        Raises ValueError on optimistic lock version mismatch.
        """
        with self._lock:
            existing = self._store.get(key)

            # Optimistic locking: reject writes where version doesn't match expectation.
            # A missing key counts as version 0; expected_version=None skips the check.
            if strategy == WriteStrategy.OPTIMISTIC_LOCK:
                current_version = existing.version if existing else 0
                if expected_version is not None and current_version != expected_version:
                    raise ValueError(
                        f"Optimistic lock failed for '{key}': "
                        f"expected version {expected_version}, got {current_version}"
                    )

            new_version = (existing.version + 1) if existing else 1

            # Apply merge strategy; anything that does not qualify falls back to overwrite
            if (
                strategy == WriteStrategy.MERGE
                and existing
                and isinstance(existing.value, dict)
                and isinstance(value, dict)
            ):
                # Shallow merge: incoming top-level keys replace existing ones
                final_value = {**existing.value, **value}
            elif strategy == WriteStrategy.APPEND and existing and isinstance(existing.value, list):
                final_value = existing.value + (value if isinstance(value, list) else [value])
            else:
                final_value = value

            entry = BlackboardEntry(
                key=key,
                value=final_value,
                written_by=agent_id,
                version=new_version,
            )
            self._store[key] = entry

        # Notify listeners outside the lock so a slow listener cannot block other writers
        self._notify(key, entry)
        return entry

    # --- READ ---
    def read(self, key: str, default: Any = None) -> Any:
        """Read a value. Returns default if key not found."""
        with self._lock:
            entry = self._store.get(key)
            return entry.value if entry else default

    def read_entry(self, key: str) -> Optional[BlackboardEntry]:
        """Read the full entry with metadata (version, author, timestamp)."""
        with self._lock:
            return self._store.get(key)

    def snapshot(self) -> dict[str, Any]:
        """Return a point-in-time snapshot of all values."""
        with self._lock:
            return {k: v.value for k, v in self._store.items()}

    # --- CHANGE NOTIFICATIONS ---
    def subscribe(self, key_pattern: str, listener: ChangeListener) -> None:
        """Subscribe to changes on a key or key prefix (e.g., 'research.*')."""
        with self._lock:
            if key_pattern not in self._listeners:
                self._listeners[key_pattern] = []
            self._listeners[key_pattern].append(listener)

    def _notify(self, key: str, entry: BlackboardEntry) -> None:
        # Copy under the lock so a concurrent subscribe() cannot mutate the dict mid-iteration
        with self._lock:
            registered = [(p, list(ls)) for p, ls in self._listeners.items()]
        for pattern, listeners in registered:
            # Exact match, or prefix match for patterns ending in "*"
            if pattern == key or (pattern.endswith("*") and key.startswith(pattern[:-1])):
                for listener in listeners:
                    try:
                        listener(key, entry)
                    except Exception as e:
                        print(f"[Blackboard] Listener error for '{key}': {e}")

    # --- UTILITIES ---
    def keys(self) -> list[str]:
        with self._lock:
            return list(self._store.keys())

    def delete(self, key: str, agent_id: str) -> bool:
        # agent_id is accepted for symmetry with write(); it is not recorded here,
        # and listeners are not notified of deletions
        with self._lock:
            if key in self._store:
                del self._store[key]
                return True
            return False
# --- Agent using the blackboard ---
class ResearchAgent:
    def __init__(self, agent_id: str, blackboard: Blackboard):
        self.agent_id = agent_id
        self.blackboard = blackboard

    def run(self, query: str) -> None:
        print(f"[{self.agent_id}] Starting research: {query}")

        # Read existing findings to avoid duplication
        # (unused in this simulation; a real agent would filter against it)
        existing = self.blackboard.read("research.findings", default=[])

        # Simulate research (replace with actual LLM call)
        new_findings = [
            {"claim": f"Finding about '{query}'", "confidence": 0.85, "source": "web"},
        ]

        # Append to existing findings (safe for concurrent agents)
        self.blackboard.write(
            key="research.findings",
            value=new_findings,
            agent_id=self.agent_id,
            strategy=WriteStrategy.APPEND,
        )

        # Update task status with optimistic lock to prevent conflicts
        self.run_status_update()
        print(f"[{self.agent_id}] Research complete.")

    def run_status_update(self, max_attempts: int = 3) -> None:
        """Update status under an optimistic lock, re-reading and retrying on conflict."""
        for _ in range(max_attempts):
            status_entry = self.blackboard.read_entry("task.status")
            current_version = status_entry.version if status_entry else 0
            current_status = status_entry.value if status_entry else {}
            try:
                self.blackboard.write(
                    key="task.status",
                    value={**current_status, self.agent_id: "complete"},
                    agent_id=self.agent_id,
                    strategy=WriteStrategy.OPTIMISTIC_LOCK,
                    expected_version=current_version,
                )
                return
            except ValueError:
                # Another agent wrote status concurrently; re-read and retry
                continue
        raise RuntimeError(
            f"[{self.agent_id}] Could not update task.status after {max_attempts} attempts"
        )
# --- Usage ---
board = Blackboard()


# Subscribe a monitor to all research updates
def on_research_update(key: str, entry: BlackboardEntry):
    print(f"[Monitor] '{key}' updated by {entry.written_by} (v{entry.version})")


board.subscribe("research.*", on_research_update)
board.subscribe("task.*", on_research_update)

# Run agents sequentially for the demo. Threads can share this in-memory board;
# separate processes need an external store (see the Redis section).
agent_a = ResearchAgent("researcher-a", board)
agent_b = ResearchAgent("researcher-b", board)
agent_a.run("transformer architecture")
agent_b.run("mixture of experts")

print("\nFinal blackboard state:")
for key, value in board.snapshot().items():
    print(f"  {key}: {value}")
Tip: The APPEND strategy is your safest default for lists that multiple agents contribute to concurrently. Reserve OPTIMISTIC_LOCK for status fields and counters where only one agent should win.
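An optimistic lock only helps if the losing agent re-reads and tries again. The following is a minimal sketch of that read, modify, compare-and-set loop. `VersionedCell`, `compare_and_set`, and `update_with_retry` are hypothetical names introduced for this example and are not part of the Blackboard class above.

```python
import threading


class VersionedCell:
    """Hypothetical single-value store with a version counter."""

    def __init__(self, value):
        self._value = value
        self._version = 0
        self._lock = threading.Lock()

    def read(self):
        with self._lock:
            return self._value, self._version

    def compare_and_set(self, new_value, expected_version) -> bool:
        # The write succeeds only if nobody else wrote since our read.
        with self._lock:
            if self._version != expected_version:
                return False
            self._value = new_value
            self._version += 1
            return True


def update_with_retry(cell, update_fn, max_attempts=10) -> bool:
    """Re-read and re-apply update_fn until the versioned write succeeds."""
    for _ in range(max_attempts):
        value, version = cell.read()
        if cell.compare_and_set(update_fn(value), version):
            return True
    return False


cell = VersionedCell({})
results = []


def mark_complete(agent_id):
    ok = update_with_retry(cell, lambda status: {**status, agent_id: "complete"})
    results.append(ok)


threads = [threading.Thread(target=mark_complete, args=(f"agent-{i}",)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

final_status, final_version = cell.read()
print(final_status, final_version)
```

Every failed attempt means some other agent's write succeeded, so with four agents a single agent can lose at most three times. The retry bound exists to turn unexpected contention into a visible failure instead of an endless loop.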
Event Sourcing for Agent Actions
Instead of storing current state directly, event sourcing records every action that changed state. Current state is derived by replaying the event log.
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass
class AgentEvent:
    event_id: str
    correlation_id: str
    agent_id: str
    event_type: str  # "task_started", "finding_added", "task_completed", etc.
    payload: dict[str, Any]
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        # Stamp the event at creation time unless the caller supplied a timestamp
        if self.timestamp is None:
            self.timestamp = datetime.now(timezone.utc)


class EventLog:
    """Append-only event log. The source of truth for all agent state."""

    def __init__(self):
        self._events: list[AgentEvent] = []

    def append(self, event: AgentEvent) -> None:
        self._events.append(event)

    def replay(self, correlation_id: Optional[str] = None) -> dict[str, Any]:
        """Replay events to reconstruct current state."""
        state = {"findings": [], "task_status": {}, "errors": []}
        for event in self._events:
            # Optionally restrict the replay to a single workflow
            if correlation_id and event.correlation_id != correlation_id:
                continue
            if event.event_type == "finding_added":
                state["findings"].append(event.payload)
            elif event.event_type == "task_status_updated":
                state["task_status"][event.agent_id] = event.payload["status"]
            elif event.event_type == "error_recorded":
                state["errors"].append(event.payload)
            # Other event types are kept in the log but do not affect this projection
        return state

    def get_agent_timeline(self, agent_id: str) -> list[AgentEvent]:
        """Reconstruct everything a specific agent did."""
        return [e for e in self._events if e.agent_id == agent_id]
Benefits of event sourcing for multi-agent systems:
- Complete audit trail of every agent action
- Ability to replay and debug any workflow execution
- Time-travel debugging: reconstruct state at any point in the past
- Natural conflict resolution: the event log is the truth, not derived state
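Time-travel debugging follows directly from replay: stop replaying at an earlier point and you get the state as it was then. The sketch below assumes a simplified event with an integer sequence number `seq` in place of a timestamp. `Event` and `replay_until` are hypothetical names for this example, and the log contents are made up.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Event:
    seq: int  # position in the log; hypothetical stand-in for a timestamp
    agent_id: str
    event_type: str
    payload: dict[str, Any]


def replay_until(events: list[Event], up_to_seq: int) -> dict[str, Any]:
    """Rebuild state from all events with seq <= up_to_seq (log is in seq order)."""
    state: dict[str, Any] = {"findings": [], "task_status": {}}
    for event in events:
        if event.seq > up_to_seq:
            break
        if event.event_type == "finding_added":
            state["findings"].append(event.payload)
        elif event.event_type == "task_status_updated":
            state["task_status"][event.agent_id] = event.payload["status"]
    return state


log = [
    Event(1, "researcher-a", "task_status_updated", {"status": "in_progress"}),
    Event(2, "researcher-a", "finding_added", {"claim": "Company X was acquired in 2023"}),
    Event(3, "researcher-a", "task_status_updated", {"status": "complete"}),
]

state_at_2 = replay_until(log, 2)  # what another agent would have seen mid-run
state_now = replay_until(log, 3)   # current state
print(state_at_2["task_status"], state_now["task_status"])
```

Comparing `state_at_2` with `state_now` shows what an agent could and could not have known when it made a decision, which is often the question when investigating a stale read.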
Distributed State with Redis
For multi-process deployments, use Redis for shared state:
import json
from datetime import datetime, timezone
from typing import Any

import redis


class RedisBlackboard:
    """Production blackboard backed by Redis with atomic operations."""

    def __init__(self, redis_url: str = "redis://localhost:6379", namespace: str = "agent"):
        self.r = redis.from_url(redis_url, decode_responses=True)
        self.ns = namespace

    def _key(self, key: str) -> str:
        # Prefix every key so several systems can share one Redis instance
        return f"{self.ns}:{key}"

    def write(self, key: str, value: Any, agent_id: str, ex: int = 3600) -> None:
        """Write with TTL (seconds). Overwrites existing value."""
        entry = {
            "value": value,
            "written_by": agent_id,
            "ts": datetime.now(timezone.utc).isoformat(),
        }
        self.r.set(self._key(key), json.dumps(entry), ex=ex)

    def read(self, key: str, default: Any = None) -> Any:
        raw = self.r.get(self._key(key))
        if raw is None:
            return default
        return json.loads(raw)["value"]

    def append_to_list(self, key: str, value: Any, agent_id: str) -> int:
        """Atomically append to a Redis list. Returns new list length."""
        # Note: RPUSH sets no TTL, so lists written here do not expire on their own
        serialized = json.dumps({"value": value, "written_by": agent_id})
        return self.r.rpush(self._key(key), serialized)

    def read_list(self, key: str) -> list[Any]:
        items = self.r.lrange(self._key(key), 0, -1)
        return [json.loads(item)["value"] for item in items]

    def increment(self, key: str, amount: int = 1) -> int:
        """Atomic counter increment; safe for concurrent agents."""
        return self.r.incrby(self._key(key), amount)

    def set_if_not_exists(self, key: str, value: Any, ex: int = 300) -> bool:
        """Claim a lock or resource. Returns True if claimed, False if already taken."""
        entry = json.dumps({"value": value})
        return bool(self.r.set(self._key(key), entry, ex=ex, nx=True))
Conflict Resolution Strategies Summary
| Strategy | Behavior | Use When |
|---|---|---|
| Last Write Wins | Newer value overwrites older | Non-critical metadata, values where a lost update is acceptable (not counters, which need atomic increments) |
| Append | Values accumulate in a list | Findings, log entries, collected data |
| Merge | Dict keys are merged | Profile updates, partial state updates |
| Optimistic Lock | Write fails on version mismatch | Status fields, workflow state transitions |
| Pessimistic Lock | Exclusive write access | Critical operations, financial data |
| Event Sourcing | All writes are events, state derived | Full audit required, complex state |
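The pessimistic lock row has no counterpart in the code above, so here is a minimal single-process sketch of the idea: an agent takes exclusive ownership of a key for the whole read-modify-write. `KeyLocks`, `exclusive`, and the `ledger` dict are hypothetical names for this example. Across processes the same role would have to be played by an external lock.

```python
import threading
from collections import defaultdict
from contextlib import contextmanager


class KeyLocks:
    """Hypothetical helper: one mutex per blackboard key."""

    def __init__(self):
        self._guard = threading.Lock()  # protects the lock table itself
        self._locks = defaultdict(threading.Lock)

    @contextmanager
    def exclusive(self, key: str):
        with self._guard:
            lock = self._locks[key]  # created on first use
        with lock:                   # blocks until no other agent holds the key
            yield


locks = KeyLocks()
ledger = {"balance": 0}


def deposit(amount: int, times: int) -> None:
    for _ in range(times):
        # The read and the write happen under the same lock, so no update is lost.
        with locks.exclusive("balance"):
            ledger["balance"] = ledger["balance"] + amount


threads = [threading.Thread(target=deposit, args=(1, 1000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(ledger["balance"])
```

The cost is that agents wait for each other, which is why the table reserves this strategy for operations where a failed or retried write is not acceptable.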
Key Takeaways
- The blackboard pattern is the foundation of multi-agent shared state — proven in AI research for over 40 years
- Choose your write strategy explicitly: APPEND for lists, MERGE for dicts, OPTIMISTIC_LOCK for status
- Event sourcing trades storage for debuggability and is highly recommended for production multi-agent systems
- Redis provides atomic operations (INCR, RPUSH, SET NX) that make multi-process shared state safe without custom locking
- Always set TTLs on blackboard entries to prevent unbounded state growth