Core Agent Logic

Agent State Management

13m read

Managing Agent State

State is what separates a stateless LLM call from a true agent. Without state, every prompt is a fresh start — the agent cannot remember what it has tried, cannot track where it is in a multi-step plan, and cannot recover from failures. This lesson covers the full spectrum of state management: conversation history, working memory, explicit state machines, checkpointing, and recovery.


What Counts as "State"?

Agent state is any information that must persist across at least two loop iterations. In practice this means:

State CategoryWhat It ContainsTypical Lifetime
Conversation historyUser and assistant turns as they happenedEntire session
Working memoryIntermediate results, current plan, scratchpad notesCurrent task only
Phase / control flowWhich step of the agent's plan is executingCurrent task only
Tool call logWhich tools were called, with what arguments, and what they returnedDiagnostic and recovery
Long-term memorySynthesised knowledge from past sessionsAcross sessions

The first four categories are covered in this lesson. Long-term memory with vector databases is covered in the Tools and Memory module.


Designing State with Python Dataclasses

Python dataclasses are the right primitive for agent state. They are typed — catching bugs at development time rather than at 2 a.m. in production — easily serialisable to JSON for checkpointing, and work well with immutable transition patterns.

Core State Structures

from __future__ import annotations

import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional


class Role(str, Enum):
    """Speaker role in a conversation turn."""
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"
    SYSTEM = "system"


class AgentPhase(str, Enum):
    """
    High-level phases of the agent lifecycle.

    IDLE        → waiting for a user request
    PLANNING    → decomposing the goal into sub-steps
    EXECUTING   → running tools and gathering information
    REFLECTING  → evaluating intermediate results, deciding to continue or stop
    DONE        → final answer has been produced
    ERROR       → unrecoverable failure, needs human intervention
    """
    IDLE = "idle"
    PLANNING = "planning"
    EXECUTING = "executing"
    REFLECTING = "reflecting"
    DONE = "done"
    ERROR = "error"


@dataclass
class Message:
    """A single turn in the conversation history."""
    role: Role
    content: str
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        return {
            "role": self.role.value,
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
        }


@dataclass
class ToolCall:
    """Record of a single tool invocation and its result."""
    call_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    tool_name: str = ""
    arguments: dict[str, Any] = field(default_factory=dict)
    result: Optional[str] = None
    error: Optional[str] = None
    duration_ms: Optional[float] = None
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    @property
    def succeeded(self) -> bool:
        return self.error is None

    def to_observation(self) -> str:
        """Render as the 'Observation:' string shown to the LLM."""
        if self.error:
            return f"[Tool Error] {self.tool_name} failed: {self.error}"
        return str(self.result)


@dataclass
class WorkingMemory:
    """
    Short-term scratchpad for the current task.

    Stores the current plan, intermediate findings, and key-value notes
    the agent wants to carry between reasoning steps. This is distinct
    from the conversation history — it is the agent's private notepad,
    not the user-visible dialogue.
    """
    current_goal: str = ""
    plan_steps: list[str] = field(default_factory=list)
    completed_steps: list[str] = field(default_factory=list)
    notes: dict[str, str] = field(default_factory=dict)
    tool_calls: list[ToolCall] = field(default_factory=list)

    def note(self, key: str, value: str) -> None:
        """Store a named observation for use in later reasoning steps."""
        self.notes[key] = value

    def record_tool_call(self, call: ToolCall) -> None:
        self.tool_calls.append(call)

    def pending_steps(self) -> list[str]:
        return [s for s in self.plan_steps if s not in self.completed_steps]

The Root AgentState

@dataclass
class AgentState:
    """
    Complete, serialisable state of the agent at any point in time.

    This is the single source of truth passed between the Orchestrator,
    Planner, and Dispatcher. All components read from and return new
    instances of this object rather than mutating it in place, making
    state transitions explicit and debuggable.
    """
    session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    phase: AgentPhase = AgentPhase.IDLE
    history: list[Message] = field(default_factory=list)
    memory: WorkingMemory = field(default_factory=WorkingMemory)
    iteration: int = 0
    final_answer: Optional[str] = None
    error_message: Optional[str] = None
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    @classmethod
    def from_user_input(cls, user_input: str) -> "AgentState":
        """Create an initial state from a user's message."""
        state = cls(phase=AgentPhase.PLANNING)
        state.history.append(Message(role=Role.USER, content=user_input))
        state.memory.current_goal = user_input
        return state

    # ------------------------------------------------------------------
    # Transition methods — return a new state, never mutate in place
    # ------------------------------------------------------------------

    def _bump(self) -> "AgentState":
        """Return a shallow copy with updated_at and incremented iteration."""
        import copy
        new = copy.copy(self)
        new.iteration = self.iteration + 1
        new.updated_at = datetime.now(timezone.utc)
        return new

    def to_executing(self) -> "AgentState":
        """Transition: PLANNING → EXECUTING."""
        assert self.phase == AgentPhase.PLANNING, (
            f"Cannot move to EXECUTING from {self.phase}"
        )
        new = self._bump()
        new.phase = AgentPhase.EXECUTING
        return new

    def to_reflecting(self) -> "AgentState":
        """Transition: EXECUTING → REFLECTING."""
        assert self.phase == AgentPhase.EXECUTING, (
            f"Cannot move to REFLECTING from {self.phase}"
        )
        new = self._bump()
        new.phase = AgentPhase.REFLECTING
        return new

    def to_done(self, answer: str) -> "AgentState":
        """Transition to DONE with a final answer."""
        new = self._bump()
        new.phase = AgentPhase.DONE
        new.final_answer = answer
        new.history.append(Message(role=Role.ASSISTANT, content=answer))
        return new

    def to_error(self, message: str) -> "AgentState":
        """Transition to ERROR with a diagnostic message."""
        new = self._bump()
        new.phase = AgentPhase.ERROR
        new.error_message = message
        return new

    def record_observation(self, thought: str, call: ToolCall) -> "AgentState":
        """Record a completed tool call and its observation."""
        new = self._bump()
        new.memory.record_tool_call(call)
        new.history.append(Message(
            role=Role.TOOL,
            content=call.to_observation(),
            metadata={"tool_name": call.tool_name, "call_id": call.call_id},
        ))
        return new

The Agent State Machine

An explicit state machine prevents your agent from taking nonsensical actions — like trying to "execute" when it is already in DONE, or jumping back to PLANNING from ERROR. Formalising the allowed transitions makes your code dramatically easier to reason about:

                    ┌─────────┐
         input      │  IDLE   │
         ──────────▶│         │
                    └────┬────┘
                         │ from_user_input()
                         ▼
                    ┌─────────┐
                    │PLANNING │◀─────────────────┐
                    └────┬────┘                  │
                         │ to_executing()        │ (re-plan on reflection)
                         ▼                       │
                    ┌─────────┐                  │
                    │EXECUTING│──────────────────▶│
                    └────┬────┘     to_reflecting()
                         │
                    ┌────▼─────┐
                    │REFLECTING│
                    └────┬─────┘
                         │ to_done() or to_error()
                    ┌────┴──────────┐
                    ▼               ▼
               ┌──────┐        ┌───────┐
               │ DONE │        │ ERROR │
               └──────┘        └───────┘

Design Principle: Never allow arbitrary phase transitions. Use explicit to_X() methods that assert the current phase. This turns impossible states into immediate AssertionError exceptions rather than silent, hard-to-diagnose bugs.


Managing Conversation History

The conversation history is a list of Message objects appended in order. For the LLM, it is rendered as the messages array in the chat API call:

from typing import Any


def history_to_messages(state: AgentState) -> list[dict[str, Any]]:
    """
    Convert AgentState history to the format expected by the OpenAI chat API.

    Handles the special case of TOOL messages, which the OpenAI API expects
    to carry a tool_call_id reference linking them to their originating call.
    """
    messages = []
    for msg in state.history:
        if msg.role == Role.SYSTEM:
            messages.append({"role": "system", "content": msg.content})
        elif msg.role == Role.USER:
            messages.append({"role": "user", "content": msg.content})
        elif msg.role == Role.ASSISTANT:
            messages.append({"role": "assistant", "content": msg.content})
        elif msg.role == Role.TOOL:
            messages.append({
                "role": "tool",
                "tool_call_id": msg.metadata.get("call_id", "unknown"),
                "content": msg.content,
            })
    return messages

Truncating History to Fit the Context Window

Long conversations will exceed the LLM's context window. A simple but effective strategy keeps the system prompt and the most recent N messages:

def truncate_history(
    messages: list[dict],
    max_tokens: int = 100_000,
    tokens_per_message_estimate: int = 250,
) -> list[dict]:
    """
    Truncate the message list to fit within max_tokens.

    Always preserves system messages and trims the oldest non-system
    messages first. Uses a rough token estimate; for production, use
    tiktoken for precise counting.
    """
    if not messages:
        return messages

    system_messages = [m for m in messages if m["role"] == "system"]
    non_system = [m for m in messages if m["role"] != "system"]

    estimated_total = len(messages) * tokens_per_message_estimate
    while estimated_total > max_tokens and len(non_system) > 1:
        non_system.pop(0)  # Drop the oldest non-system message
        estimated_total = (
            (len(system_messages) + len(non_system)) * tokens_per_message_estimate
        )

    return system_messages + non_system

Tip: Always keep at least the last 4–6 turns regardless of token count. Cutting off too aggressively causes the model to lose track of the user's original intent.


Checkpointing and Recovery

Checkpointing saves the agent's state to durable storage so that a crash, timeout, or redeployment does not lose all progress. This is essential for long-running tasks such as research jobs, code generation, or multi-hour data pipelines.

Serialising State to JSON

import json
from pathlib import Path
from dataclasses import asdict


def save_checkpoint(state: AgentState, directory: str | Path) -> Path:
    """
    Serialise AgentState to a JSON file in the checkpoint directory.

    File is named by session_id so concurrent agents do not overwrite
    each other. Returns the path of the written file.
    """
    path = Path(directory) / f"{state.session_id}.json"
    path.parent.mkdir(parents=True, exist_ok=True)

    payload = asdict(state)

    def _convert(obj: Any) -> Any:
        if isinstance(obj, (AgentPhase, Role)):
            return obj.value
        if isinstance(obj, datetime):
            return obj.isoformat()
        return obj

    def _walk(obj: Any) -> Any:
        if isinstance(obj, dict):
            return {k: _walk(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [_walk(item) for item in obj]
        return _convert(obj)

    path.write_text(json.dumps(_walk(payload), indent=2))
    return path


def load_checkpoint(session_id: str, directory: str | Path) -> AgentState:
    """
    Deserialise an AgentState from its checkpoint file.

    Raises FileNotFoundError if the session_id has no checkpoint.
    """
    path = Path(directory) / f"{session_id}.json"
    payload = json.loads(path.read_text())

    state = AgentState(
        session_id=payload["session_id"],
        phase=AgentPhase(payload["phase"]),
        iteration=payload["iteration"],
        final_answer=payload.get("final_answer"),
        error_message=payload.get("error_message"),
    )

    state.history = [
        Message(
            role=Role(m["role"]),
            content=m["content"],
            timestamp=datetime.fromisoformat(m["timestamp"]),
            metadata=m.get("metadata", {}),
        )
        for m in payload.get("history", [])
    ]

    mem_data = payload.get("memory", {})
    state.memory = WorkingMemory(
        current_goal=mem_data.get("current_goal", ""),
        plan_steps=mem_data.get("plan_steps", []),
        completed_steps=mem_data.get("completed_steps", []),
        notes=mem_data.get("notes", {}),
    )

    return state

Checkpoint-Aware Orchestrator

from pathlib import Path


class ResumableOrchestrator:
    """
    Orchestrator that saves a checkpoint after every iteration.
    On startup, checks for an existing checkpoint and resumes from it.
    """

    CHECKPOINT_DIR = Path("/tmp/agent_checkpoints")

    def run(self, user_input: str, session_id: str | None = None) -> str:
        """Run or resume an agent session."""
        if session_id:
            try:
                state = load_checkpoint(session_id, self.CHECKPOINT_DIR)
                print(f"[INFO] Resuming session {session_id} at iteration {state.iteration}")
            except FileNotFoundError:
                print(f"[INFO] No checkpoint for {session_id}, starting fresh")
                state = AgentState.from_user_input(user_input)
        else:
            state = AgentState.from_user_input(user_input)

        for _ in range(20):
            state = self._step(state)
            save_checkpoint(state, self.CHECKPOINT_DIR)

            if state.phase in (AgentPhase.DONE, AgentPhase.ERROR):
                break

        return state.final_answer or state.error_message or "No result produced."

    def _step(self, state: AgentState) -> AgentState:
        """Execute one reasoning step. Implement in subclasses."""
        raise NotImplementedError

Key Takeaways

  • Model state explicitly using typed dataclasses — they are serialisable, IDE-friendly, and force clarity about your data model from day one.
  • Separate conversation history (user-visible dialogue) from working memory (the agent's private scratchpad).
  • Use an explicit state machine with to_X() transition methods to prevent impossible phase combinations.
  • Truncate conversation history proactively to stay within the context window; always preserve recent turns.
  • Checkpoint after every iteration for long-running tasks — recovery from a checkpoint is much cheaper than restarting from scratch.

Further Reading