Production Hardening

Cost Controls and Budget Management

15m read

Cost Controls in Production Agents

The Cost Explosion Problem

A single agent run might use 10,000 tokens. A production agent handling 10,000 requests per day consumes 100M tokens — at GPT-4o pricing, that's roughly $250/day. Add tool use, multi-step reasoning, and occasional runaway loops, and costs can escalate rapidly.
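The arithmetic behind that figure, as a quick sanity check (the $2.50-per-1M-token input rate is an assumption based on published GPT-4o pricing, which changes often):

```python
# Back-of-envelope daily cost estimate — rates are illustrative, not current.
requests_per_day = 10_000
tokens_per_request = 10_000
price_per_1m_tokens = 2.50   # USD, assumed input rate

daily_tokens = requests_per_day * tokens_per_request
daily_cost_usd = daily_tokens / 1_000_000 * price_per_1m_tokens
print(f"{daily_tokens:,} tokens/day -> ${daily_cost_usd:.0f}/day")
```

Note this counts input tokens only; output tokens, retries, and multi-step tool use push the real number higher.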

Cost management is not an afterthought — it is a core engineering discipline for production agent systems. This lesson covers the full stack of cost controls: token budgets, caching strategies, model routing, and spend monitoring.


Token Budget Management

A token budget caps the maximum tokens a single agent run may consume. Exceeding the budget triggers a graceful shutdown rather than an open-ended continuation.

Token Budget Manager

import tiktoken
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TokenBudget:
    """
    Tracks token consumption across a single agent run and enforces a hard cap.

    Tracks both prompt and completion tokens separately to allow
    fine-grained budget allocation (e.g., reserve 20% for the final answer).
    """
    total_budget: int                       # Max tokens for the entire run
    final_answer_reserve: int = 2000        # Tokens reserved for the synthesis step
    model: str = "gpt-4o"
    _prompt_tokens: int = field(default=0, init=False)
    _completion_tokens: int = field(default=0, init=False)

    def __post_init__(self):
        try:
            self._encoder = tiktoken.encoding_for_model(self.model)
        except KeyError:
            self._encoder = tiktoken.get_encoding("cl100k_base")

    @property
    def used(self) -> int:
        return self._prompt_tokens + self._completion_tokens

    @property
    def remaining(self) -> int:
        return self.total_budget - self.used

    @property
    def available_for_next_step(self) -> int:
        """Budget available for next tool call, after reserving for final answer."""
        return self.remaining - self.final_answer_reserve

    def count_tokens(self, text: str) -> int:
        """Count tokens in a string using the model's tokenizer."""
        return len(self._encoder.encode(text))

    def record_llm_call(self, prompt: str, completion: str):
        """Record token consumption from an LLM call."""
        self._prompt_tokens += self.count_tokens(prompt)
        self._completion_tokens += self.count_tokens(completion)

    def record_prompt_tokens(self, n: int):
        """Record raw prompt token count (from API response metadata)."""
        self._prompt_tokens += n

    def record_completion_tokens(self, n: int):
        """Record raw completion token count (from API response metadata)."""
        self._completion_tokens += n

    def can_continue(self) -> bool:
        """Return True if there is budget remaining for at least one more step."""
        return self.available_for_next_step > 500   # min viable step budget

    def check_prompt_fits(self, prompt: str) -> bool:
        """Check if a prompt fits within the remaining budget."""
        return self.count_tokens(prompt) <= self.available_for_next_step

    def truncate_to_budget(self, text: str, target_tokens: int) -> str:
        """Truncate text to fit within target_tokens."""
        tokens = self._encoder.encode(text)
        if len(tokens) <= target_tokens:
            return text
        return self._encoder.decode(tokens[:target_tokens]) + "... [truncated]"

    def summary(self) -> dict:
        return {
            "total_budget": self.total_budget,
            "used": self.used,
            "remaining": self.remaining,
            "available_for_next_step": self.available_for_next_step,
            "prompt_tokens": self._prompt_tokens,
            "completion_tokens": self._completion_tokens,
            "utilisation": self.used / self.total_budget,
        }

Integrating the Budget into the Agent Loop


MAX_STEPS = 20   # hard cap on agent iterations


async def budget_aware_agent_run(
    task: str,
    agent,
    budget: TokenBudget,
    tools: list,
) -> str:
    """
    Run an agent with hard token budget enforcement.
    Stops and synthesises a partial answer when the budget is near exhaustion.

    `build_context` and `synthesise_partial_answer` are assumed to be
    defined elsewhere in the codebase.
    """
    completed_steps = []

    for step_num in range(MAX_STEPS):
        if not budget.can_continue():
            # Budget exhausted — synthesise with what we have
            print(f"[Budget] Exhausted at step {step_num}. "
                  f"Used {budget.used}/{budget.total_budget} tokens.")
            return synthesise_partial_answer(task, completed_steps,
                                             reason="token_budget_exhausted")

        # Build next prompt, truncating context if needed
        context = build_context(completed_steps)
        if not budget.check_prompt_fits(task + context):
            context = budget.truncate_to_budget(
                context, budget.available_for_next_step - budget.count_tokens(task)
            )

        # Execute step
        response = await agent.step(task=task, context=context, tools=tools)
        budget.record_prompt_tokens(response.usage.prompt_tokens)
        budget.record_completion_tokens(response.usage.completion_tokens)

        if response.is_final:
            return response.answer

        completed_steps.append(response.step_result)

    return synthesise_partial_answer(task, completed_steps, reason="max_steps_reached")

Caching Strategies

Caching is the highest-ROI cost reduction technique. Tool call results and LLM responses are often repeated across users and sessions.
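Why the ROI is high: every cache hit replaces a full-price call with a near-free lookup, so expected savings scale directly with the hit rate. A minimal model (ignoring the small lookup cost):

```python
# Expected per-call cost with a cache in front: misses pay full price,
# hits are treated as free. Savings scale linearly with hit rate.
def effective_cost(cost_per_call_usd: float, hit_rate: float) -> float:
    return cost_per_call_usd * (1 - hit_rate)

print(f"{effective_cost(0.01, 0.35):.4f}")   # a 35% hit rate cuts spend 35%
```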

Exact Cache

import hashlib
import json
import time
from typing import Any


class ExactCache:
    """
    Deterministic cache for tool calls and LLM responses.
    Uses a content-addressed key: SHA-256 of the canonicalised inputs.
    Suitable for: deterministic tools (search, database), temperature=0 LLM calls.
    """

    def __init__(self, backend, ttl_seconds: int = 3600):
        self.backend = backend       # Redis, DynamoDB, or dict for testing
        self.ttl = ttl_seconds
        self.hits = 0
        self.misses = 0

    def _make_key(self, namespace: str, **kwargs) -> str:
        canonical = json.dumps(
            {"namespace": namespace, **kwargs}, sort_keys=True
        )
        digest = hashlib.sha256(canonical.encode()).hexdigest()[:16]
        return f"cache:{namespace}:{digest}"

    async def get(self, namespace: str, **kwargs) -> Any | None:
        key = self._make_key(namespace, **kwargs)
        value = await self.backend.get(key)
        if value is not None:
            self.hits += 1
            return json.loads(value)
        self.misses += 1
        return None

    async def set(self, namespace: str, value: Any, **kwargs):
        key = self._make_key(namespace, **kwargs)
        await self.backend.setex(key, self.ttl, json.dumps(value, default=str))

    async def get_or_compute(self, namespace: str, compute_fn, **kwargs) -> Any:
        """Cache-aside pattern: get from cache or compute and store."""
        cached = await self.get(namespace, **kwargs)
        if cached is not None:
            return cached
        result = await compute_fn(**kwargs)
        await self.set(namespace, result, **kwargs)
        return result

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0

Semantic Cache

For LLM calls, exact matching misses near-duplicate queries. A semantic cache uses embedding similarity to return cached answers for semantically equivalent inputs.

import time

import numpy as np
from langchain_openai import OpenAIEmbeddings


class SemanticCache:
    """
    Cache that returns hits for semantically similar queries.
    Uses cosine similarity on text embeddings to find near-duplicates.

    Suitable for: LLM calls, Q&A, search queries.
    Not suitable for: tool calls where exact inputs matter.
    """

    def __init__(
        self,
        similarity_threshold: float = 0.97,
        max_entries: int = 10_000,
    ):
        self.threshold = similarity_threshold
        self.max_entries = max_entries
        self.embedder = OpenAIEmbeddings(model="text-embedding-3-small")
        self._store: list[dict] = []   # {embedding, query, response, ts}
        self.hits = 0
        self.misses = 0

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        a_arr = np.array(a)
        b_arr = np.array(b)
        return float(
            np.dot(a_arr, b_arr) / (np.linalg.norm(a_arr) * np.linalg.norm(b_arr))
        )

    async def get(self, query: str) -> str | None:
        """Return a cached response if a sufficiently similar query exists."""
        if not self._store:
            self.misses += 1
            return None

        query_embedding = await self.embedder.aembed_query(query)
        best_sim = 0.0
        best_response = None

        for entry in self._store:
            sim = self._cosine_similarity(query_embedding, entry["embedding"])
            if sim > best_sim:
                best_sim = sim
                best_response = entry["response"]

        if best_sim >= self.threshold:
            self.hits += 1
            return best_response

        self.misses += 1
        return None

    async def set(self, query: str, response: str):
        """Store a query-response pair with its embedding."""
        embedding = await self.embedder.aembed_query(query)
        self._store.append({
            "embedding": embedding,
            "query": query,
            "response": response,
            "ts": time.time(),
        })
        # Evict oldest entries when over the limit (FIFO — entries are
        # appended in timestamp order, so no sort is needed)
        if len(self._store) > self.max_entries:
            self._store = self._store[-self.max_entries:]

    async def get_or_compute(self, query: str, compute_fn) -> str:
        """Semantic cache-aside: check cache, compute on miss, then store."""
        cached = await self.get(query)
        if cached is not None:
            return cached
        result = await compute_fn(query)
        await self.set(query, result)
        return result

Anthropic Prompt Caching (Built-In)

Anthropic's API offers native prompt caching — if the first N tokens of a prompt are identical to a recent request, they are served from an infrastructure-level cache, with cache reads billed at roughly 10% of the normal input-token price.

from anthropic import Anthropic

client = Anthropic()

# Mark large, stable sections with cache_control: {"type": "ephemeral"}
# Minimum cacheable prefix length is model-dependent (1024 tokens for
# most models, 2048 for Haiku-class models) — check the current docs
response = client.messages.create(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": LARGE_SYSTEM_PROMPT,       # ~5000 tokens, rarely changes
            "cache_control": {"type": "ephemeral"},
        }
    ],
    messages=[{"role": "user", "content": user_query}],
)

# Check cache usage in the response
print(f"Cache creation tokens:    {response.usage.cache_creation_input_tokens}")
print(f"Cache read tokens:        {response.usage.cache_read_input_tokens}")
print(f"Uncached prompt tokens:   {response.usage.input_tokens}")
# cache_read tokens cost 10% of normal; cache_creation costs 125%
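Given those multipliers (writes at 125%, reads at 10%), caching pays for itself almost immediately. A quick break-even sketch in normalised units, where 1.0 is the cost of sending the prompt uncached:

```python
# Cost of N requests sharing one cached prompt prefix, in units of one
# uncached send. Multipliers follow the pricing noted above.
WRITE_MULT = 1.25   # first request creates the cache entry
READ_MULT = 0.10    # subsequent requests within the cache TTL

def total_cost(n_requests: int, cached: bool) -> float:
    if not cached or n_requests == 0:
        return float(n_requests)
    return WRITE_MULT + (n_requests - 1) * READ_MULT

print(f"2 requests:   {total_cost(2, True):.2f} vs {total_cost(2, False):.2f} uncached")
print(f"100 requests: {total_cost(100, True):.2f} vs {total_cost(100, False):.2f} uncached")
```

Break-even arrives at the second request (1.35 vs 2.0); at 100 requests the cached prefix costs about a ninth of the uncached total. The catch is the cache TTL — the prefix must be re-requested within it, or you pay the write cost again.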

Model Routing

Not every task needs the most expensive model. Model routing directs simple queries to cheap, fast models and reserves expensive models for complex tasks.

import time
from dataclasses import dataclass
from typing import Callable


@dataclass
class ModelConfig:
    name: str
    cost_per_1k_input: float     # USD
    cost_per_1k_output: float    # USD
    context_window: int          # tokens
    avg_latency_ms: int


MODELS = {
    "cheap": ModelConfig(
        name="gpt-4o-mini",
        cost_per_1k_input=0.00015,
        cost_per_1k_output=0.0006,
        context_window=128_000,
        avg_latency_ms=500,
    ),
    "standard": ModelConfig(
        name="gpt-4o",
        cost_per_1k_input=0.0025,
        cost_per_1k_output=0.01,
        context_window=128_000,
        avg_latency_ms=1200,
    ),
    "powerful": ModelConfig(
        name="claude-opus-4-5",
        cost_per_1k_input=0.015,
        cost_per_1k_output=0.075,
        context_window=200_000,
        avg_latency_ms=3000,
    ),
}
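At those (illustrative) rates, the per-call spread between tiers is dramatic. For a typical 2,000-input / 500-output call:

```python
# Per-call cost at each tier for a 2,000-input / 500-output token call,
# using the same illustrative rates as the MODELS registry above.
RATES = {   # tier -> (cost_per_1k_input, cost_per_1k_output) in USD
    "cheap":    (0.00015, 0.0006),
    "standard": (0.0025, 0.01),
    "powerful": (0.015, 0.075),
}

for tier, (cost_in, cost_out) in RATES.items():
    cost = cost_in * 2000 / 1000 + cost_out * 500 / 1000
    print(f"{tier:9s} ${cost:.4f} per call")
# cheap $0.0006, standard $0.0100, powerful $0.0675 — over a 100x spread
```

That 100x gap is why routing even a modest share of traffic to the cheap tier dominates the savings table later in this lesson.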


class ModelRouter:
    """
    Routes requests to the most cost-effective model that can handle the task.
    Uses a classifier to estimate task difficulty, then selects accordingly.
    """

    def __init__(self, classifier_fn: Callable[[str], str]):
        self.classifier = classifier_fn   # Returns "simple", "medium", "complex"
        self.routing_map = {
            "simple":  MODELS["cheap"],
            "medium":  MODELS["standard"],
            "complex": MODELS["powerful"],
        }
        self._routing_log: list[dict] = []

    def route(self, task: str) -> ModelConfig:
        """Select the appropriate model for this task."""
        difficulty = self.classifier(task)
        model = self.routing_map.get(difficulty, MODELS["standard"])
        self._routing_log.append({
            "task_preview": task[:100],
            "difficulty": difficulty,
            "model_selected": model.name,
            "ts": time.time(),
        })
        return model

    def cost_saved_vs_always_powerful(self) -> float:
        """Estimate USD saved by routing vs. always using the powerful model."""
        powerful = MODELS["powerful"]
        total_saved = 0.0
        for entry in self._routing_log:
            selected = next(
                m for m in MODELS.values() if m.name == entry["model_selected"]
            )
            # Assume average 2000 input + 500 output tokens per call
            saved = (
                (powerful.cost_per_1k_input - selected.cost_per_1k_input) * 2
                + (powerful.cost_per_1k_output - selected.cost_per_1k_output) * 0.5
            )
            total_saved += saved
        return total_saved


# Simple rule-based classifier (replace with ML classifier for production)
def classify_task_difficulty(task: str) -> str:
    task_lower = task.lower()
    simple_signals = ["what is", "define", "list", "when was", "who is"]
    complex_signals = ["analyse", "compare", "write a", "debug", "explain why",
                       "multi-step", "research"]

    simple_count = sum(1 for s in simple_signals if s in task_lower)
    complex_count = sum(1 for s in complex_signals if s in task_lower)
    word_count = len(task.split())

    if word_count > 100 or complex_count >= 2:
        return "complex"
    if simple_count >= 1 and complex_count == 0 and word_count < 30:
        return "simple"
    return "medium"

Rate Limiting and Spend Monitoring

import asyncio
import time
from collections import deque


class RateLimiter:
    """
    Sliding-window rate limiter for API calls: tracks tokens and requests
    over the trailing 60 seconds.
    Prevents burst usage from triggering provider rate limits or surprise bills.
    """

    def __init__(self, max_tokens_per_minute: int, max_requests_per_minute: int):
        self.max_tokens_pm = max_tokens_per_minute
        self.max_requests_pm = max_requests_per_minute
        self._token_usage: deque[tuple[float, int]] = deque()  # (ts, count)
        self._request_times: deque[float] = deque()
        self._lock = asyncio.Lock()

    def _prune(self):
        cutoff = time.monotonic() - 60.0
        while self._token_usage and self._token_usage[0][0] < cutoff:
            self._token_usage.popleft()
        while self._request_times and self._request_times[0] < cutoff:
            self._request_times.popleft()

    def _tokens_used_last_minute(self) -> int:
        return sum(c for _, c in self._token_usage)

    async def acquire(self, estimated_tokens: int):
        """Wait until rate limits allow this request to proceed."""
        while True:
            async with self._lock:
                self._prune()
                tokens_used = self._tokens_used_last_minute()
                requests_used = len(self._request_times)

                if (tokens_used + estimated_tokens <= self.max_tokens_pm
                        and requests_used < self.max_requests_pm):
                    now = time.monotonic()
                    self._token_usage.append((now, estimated_tokens))
                    self._request_times.append(now)
                    return   # Proceed

            await asyncio.sleep(0.5)   # Back off; retry


class SpendMonitor:
    """
    Real-time spend tracker with alerting thresholds.
    Integrates with ModelConfig to compute accurate cost estimates.
    """

    def __init__(
        self,
        daily_budget_usd: float,
        alert_threshold: float = 0.8,   # Alert at 80% of budget
    ):
        self.daily_budget = daily_budget_usd
        self.alert_threshold = alert_threshold
        self._spend_today: float = 0.0
        self._call_log: list[dict] = []

    def record_call(self, model: ModelConfig, prompt_tokens: int, completion_tokens: int):
        """Record an LLM API call and its cost."""
        cost = (
            model.cost_per_1k_input * prompt_tokens / 1000
            + model.cost_per_1k_output * completion_tokens / 1000
        )
        self._spend_today += cost
        self._call_log.append({
            "model": model.name,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "cost_usd": cost,
            "ts": time.time(),
        })
        self._check_alerts(cost)
        return cost

    def _check_alerts(self, latest_cost: float):
        ratio = self._spend_today / self.daily_budget
        if ratio >= self.alert_threshold:
            # In production: send to PagerDuty / Slack / email
            print(f"[SPEND ALERT] ${self._spend_today:.2f} / ${self.daily_budget:.2f} "
                  f"({ratio:.0%}) — approaching daily budget limit")

    def report(self) -> dict:
        by_model: dict[str, float] = {}
        for call in self._call_log:
            by_model[call["model"]] = by_model.get(call["model"], 0) + call["cost_usd"]
        return {
            "spend_today_usd": self._spend_today,
            "daily_budget_usd": self.daily_budget,
            "utilisation": self._spend_today / self.daily_budget,
            "total_calls": len(self._call_log),
            "by_model": by_model,
        }

Cost vs. Quality Trade-offs

Strategy         | Cost Reduction | Quality Impact              | When to Use
-----------------|----------------|-----------------------------|---------------------------
Model routing    | 60–80%         | Low (if classifier is good) | Always
Exact caching    | 20–40%         | None                        | Deterministic tools
Semantic caching | 10–30%         | Very low                    | High query repetition
Prompt caching   | 10–20%         | None                        | Long stable system prompts
Token budgets    | Varies         | Medium (truncates context)  | Runaway loop prevention
Shorter context  | 20–50%         | Medium–High                 | After careful testing

Tip: Track cost per successful task completion, not just cost per call. An agent that costs $0.05 per call and succeeds 95% of the time can work out cheaper than one that costs $0.02 but succeeds 60% — failed tasks get retried at full price, and each failure usually carries extra handling costs (human review, downstream cleanup) on top.
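One way to make this concrete — a sketch that adds an assumed per-failure overhead ($0.10 for human review or cleanup; a made-up figure) on top of retry cost:

```python
# Expected cost per completed task under a simple geometric retry model,
# with a fixed per-failure overhead (assumed $0.10 for human review).
def cost_per_success(cost_per_call: float, success_rate: float,
                     failure_overhead: float = 0.10) -> float:
    expected_attempts = 1 / success_rate      # retries until one succeeds
    failures = expected_attempts - 1
    return cost_per_call * expected_attempts + failure_overhead * failures

cheap_flaky = cost_per_success(0.02, 0.60)
pricey_reliable = cost_per_success(0.05, 0.95)
print(f"${cheap_flaky:.3f} vs ${pricey_reliable:.3f} per completed task")
```

On retry cost alone the cheap model would still win; it is the per-failure overhead that tips the comparison, which is exactly why cost per successful completion is the metric worth tracking.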


Summary

Production cost control is a stack, not a single technique. Start with model routing (highest ROI, no quality loss), add exact caching for deterministic tools, semantic caching for repeated LLM queries, and native prompt caching for long system prompts. Layer on token budgets to prevent runaway loops, rate limiters to stay within provider limits, and a spend monitor to catch budget overruns before they become surprises. Instrument every LLM call with its token counts so you have visibility into where money is actually being spent.

Production Hardening — Check Your Understanding

3 questions · passing score 70%

  1. What does a circuit breaker pattern do in a production agent system?

  2. What information should be captured in an LLM trace for effective debugging?

  3. What is 'prompt caching' and how does it reduce costs?