Cost Controls in Production Agents
The Cost Explosion Problem
A single agent run might use 10,000 tokens. A production agent handling 10,000 requests per day consumes 100M tokens — at GPT-4o input-token pricing (about $2.50 per 1M tokens), that's roughly $250/day. Add tool use, multi-step reasoning, and occasional runaway loops, and costs can escalate rapidly.
Cost management is not an afterthought — it is a core engineering discipline for production agent systems. This lesson covers the full stack of cost controls: token budgets, caching strategies, model routing, and spend monitoring.
Token Budget Management
A token budget caps the maximum tokens a single agent run may consume. Exceeding the budget triggers a graceful shutdown rather than an open-ended continuation.
Token Budget Manager
import tiktoken
from dataclasses import dataclass, field
from typing import Any
@dataclass
class TokenBudget:
    """
    Tracks token consumption across a single agent run and enforces a hard cap.

    Prompt and completion tokens are tracked separately to allow fine-grained
    budget allocation (e.g., reserve 20% for the final answer).
    """

    total_budget: int                 # Max tokens for the entire run
    final_answer_reserve: int = 2000  # Tokens reserved for the synthesis step
    model: str = "gpt-4o"
    min_step_budget: int = 500        # Smallest budget worth attempting another step with
    _prompt_tokens: int = field(default=0, init=False)
    _completion_tokens: int = field(default=0, init=False)

    def __post_init__(self):
        # Fall back to cl100k_base for model names tiktoken doesn't recognise.
        try:
            self._encoder = tiktoken.encoding_for_model(self.model)
        except KeyError:
            self._encoder = tiktoken.get_encoding("cl100k_base")

    @property
    def used(self) -> int:
        """Total tokens consumed so far (prompt + completion)."""
        return self._prompt_tokens + self._completion_tokens

    @property
    def remaining(self) -> int:
        """Tokens left before the hard cap; may go negative on overshoot."""
        return self.total_budget - self.used

    @property
    def available_for_next_step(self) -> int:
        """Budget available for next tool call, after reserving for final answer."""
        return self.remaining - self.final_answer_reserve

    def count_tokens(self, text: str) -> int:
        """Count tokens in a string using the model's tokenizer."""
        return len(self._encoder.encode(text))

    def record_llm_call(self, prompt: str, completion: str):
        """Record token consumption from an LLM call by re-tokenising locally."""
        self._prompt_tokens += self.count_tokens(prompt)
        self._completion_tokens += self.count_tokens(completion)

    def record_prompt_tokens(self, n: int):
        """Record raw prompt token count (from API response metadata)."""
        self._prompt_tokens += n

    def record_completion_tokens(self, n: int):
        """Record raw completion token count (from API response metadata)."""
        self._completion_tokens += n

    def can_continue(self) -> bool:
        """Return True if there is budget remaining for at least one more step."""
        return self.available_for_next_step > self.min_step_budget

    def check_prompt_fits(self, prompt: str) -> bool:
        """Check if a prompt fits within the remaining budget."""
        return self.count_tokens(prompt) <= self.available_for_next_step

    def truncate_to_budget(self, text: str, target_tokens: int) -> str:
        """Truncate text to fit within target_tokens (a target <= 0 keeps nothing)."""
        # Clamp: a negative target would slice from the *end* of the token
        # list (tokens[:-n]), silently keeping most of the text.
        target_tokens = max(0, target_tokens)
        tokens = self._encoder.encode(text)
        if len(tokens) <= target_tokens:
            return text
        return self._encoder.decode(tokens[:target_tokens]) + "... [truncated]"

    def summary(self) -> dict:
        """Snapshot of budget state for logging and telemetry."""
        return {
            "total_budget": self.total_budget,
            "used": self.used,
            "remaining": self.remaining,
            "available_for_next_step": self.available_for_next_step,
            "prompt_tokens": self._prompt_tokens,
            "completion_tokens": self._completion_tokens,
            # Guard division so a zero budget doesn't crash telemetry.
            "utilisation": self.used / self.total_budget if self.total_budget else 0.0,
        }
Integrating Budget into Agent Loop
async def budget_aware_agent_run(
    task: str,
    agent,
    budget: TokenBudget,
    tools: list,
) -> str:
    """
    Run an agent with hard token budget enforcement.

    Stops and synthesises a partial answer when the budget is near
    exhaustion or the step limit is reached.

    Args:
        task: The user task to complete.
        agent: Agent exposing an async ``step(task, context, tools)`` method.
        budget: TokenBudget tracking this run's consumption.
        tools: Tools made available to the agent on each step.

    Returns:
        The agent's final answer, or a synthesised partial answer when the
        run is cut short.
    """
    completed_steps = []
    for step_num in range(MAX_STEPS):
        if not budget.can_continue():
            # Budget exhausted — synthesise with what we have
            print(f"[Budget] Exhausted at step {step_num}. "
                  f"Used {budget.used}/{budget.total_budget} tokens.")
            return synthesise_partial_answer(task, completed_steps,
                                             reason="token_budget_exhausted")

        # Build next prompt, truncating context if needed
        context = build_context(completed_steps)
        if not budget.check_prompt_fits(task + context):
            # Clamp to zero: if the task alone exceeds the remaining budget,
            # a negative target would make truncate_to_budget slice from the
            # wrong end of the token list and keep almost everything.
            context = budget.truncate_to_budget(
                context,
                max(0, budget.available_for_next_step - budget.count_tokens(task)),
            )

        # Execute step — prefer the provider's usage metadata over local counts
        response = await agent.step(task=task, context=context, tools=tools)
        budget.record_prompt_tokens(response.usage.prompt_tokens)
        budget.record_completion_tokens(response.usage.completion_tokens)

        if response.is_final:
            return response.answer
        completed_steps.append(response.step_result)

    return synthesise_partial_answer(task, completed_steps, reason="max_steps_reached")
Caching Strategies
Caching is among the highest-ROI cost reduction techniques. Tool call results and LLM responses are often repeated across users and sessions.
Exact Cache
import hashlib
import json
import time
from typing import Any
class ExactCache:
"""
Deterministic cache for tool calls and LLM responses.
Uses a content-addressed key: SHA-256 of the canonicalised inputs.
Suitable for: deterministic tools (search, database), temperature=0 LLM calls.
"""
def __init__(self, backend, ttl_seconds: int = 3600):
self.backend = backend # Redis, DynamoDB, or dict for testing
self.ttl = ttl_seconds
self.hits = 0
self.misses = 0
def _make_key(self, namespace: str, **kwargs) -> str:
canonical = json.dumps(
{"namespace": namespace, **kwargs}, sort_keys=True
)
digest = hashlib.sha256(canonical.encode()).hexdigest()[:16]
return f"cache:{namespace}:{digest}"
async def get(self, namespace: str, **kwargs) -> Any | None:
key = self._make_key(namespace, **kwargs)
value = await self.backend.get(key)
if value is not None:
self.hits += 1
return json.loads(value)
self.misses += 1
return None
async def set(self, namespace: str, value: Any, **kwargs):
key = self._make_key(namespace, **kwargs)
await self.backend.setex(key, self.ttl, json.dumps(value, default=str))
async def get_or_compute(self, namespace: str, compute_fn, **kwargs) -> Any:
"""Cache-aside pattern: get from cache or compute and store."""
cached = await self.get(namespace, **kwargs)
if cached is not None:
return cached
result = await compute_fn(**kwargs)
await self.set(namespace, result, **kwargs)
return result
@property
def hit_rate(self) -> float:
total = self.hits + self.misses
return self.hits / total if total > 0 else 0.0
Semantic Cache
For LLM calls, exact matching misses near-duplicate queries. A semantic cache uses embedding similarity to return cached answers for semantically equivalent inputs.
import numpy as np
from langchain_openai import OpenAIEmbeddings
class SemanticCache:
    """
    Cache that returns hits for semantically similar queries.

    Uses cosine similarity on text embeddings to find near-duplicates.

    Suitable for: LLM calls, Q&A, search queries.
    Not suitable for: tool calls where exact inputs matter.

    Eviction is FIFO by insertion time — a cache hit does not refresh an
    entry's timestamp, so this is not true LRU.
    """

    def __init__(
        self,
        similarity_threshold: float = 0.97,
        max_entries: int = 10_000,
    ):
        self.threshold = similarity_threshold
        self.max_entries = max_entries
        self.embedder = OpenAIEmbeddings(model="text-embedding-3-small")
        self._store: list[dict] = []  # {embedding, query, response, ts}
        self.hits = 0
        self.misses = 0

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """Cosine similarity between two embedding vectors."""
        a_arr = np.array(a)
        b_arr = np.array(b)
        return float(
            np.dot(a_arr, b_arr) / (np.linalg.norm(a_arr) * np.linalg.norm(b_arr))
        )

    async def get(self, query: str) -> str | None:
        """Return a cached response if a sufficiently similar query exists."""
        if not self._store:
            self.misses += 1
            return None
        query_embedding = await self.embedder.aembed_query(query)
        # Linear scan for the single best match; accept only above threshold.
        best_sim = 0.0
        best_response = None
        for entry in self._store:
            sim = self._cosine_similarity(query_embedding, entry["embedding"])
            if sim > best_sim:
                best_sim = sim
                best_response = entry["response"]
        if best_sim >= self.threshold:
            self.hits += 1
            return best_response
        self.misses += 1
        return None

    async def set(self, query: str, response: str):
        """Store a query-response pair with its embedding."""
        embedding = await self.embedder.aembed_query(query)
        self._store.append({
            "embedding": embedding,
            "query": query,
            "response": response,
            "ts": time.time(),
        })
        # Entries are appended in timestamp order, so the list is already
        # sorted by "ts" — the previous O(n log n) sort was redundant.
        # Evict oldest entries from the front (FIFO; hits don't refresh ts).
        if len(self._store) > self.max_entries:
            del self._store[: len(self._store) - self.max_entries]
Anthropic Prompt Caching (Built-In)
Anthropic's API offers native prompt caching — if the leading prefix of a prompt matches a recently cached prefix, those tokens are served from an infrastructure-level cache, with cache reads billed at roughly 10% of the normal input-token rate (cache writes carry a one-time premium).
from anthropic import Anthropic

client = Anthropic()

# Mark large, stable prompt sections with cache_control: {"type": "ephemeral"}
# so the provider caches them across requests.
# NOTE(review): the minimum cacheable prefix length is model-dependent
# (e.g. 1024 tokens on Sonnet/Opus-class models, 2048 on Haiku-class) —
# confirm against the current Anthropic prompt-caching documentation.
response = client.messages.create(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": LARGE_SYSTEM_PROMPT,  # ~5000 tokens, rarely changes
            "cache_control": {"type": "ephemeral"},
        }
    ],
    messages=[{"role": "user", "content": user_query}],
)

# Check cache usage in the response:
# - cache_read tokens are billed at ~10% of the normal input rate
# - cache_creation tokens are billed at ~125% (one-time write premium)
print(f"Cache creation tokens: {response.usage.cache_creation_input_tokens}")
print(f"Cache read tokens: {response.usage.cache_read_input_tokens}")
print(f"Uncached prompt tokens: {response.usage.input_tokens}")
Model Routing
Not every task needs the most expensive model. Model routing directs simple queries to cheap, fast models and reserves expensive models for complex tasks.
from dataclasses import dataclass
from typing import Callable
@dataclass
class ModelConfig:
    """Static metadata for one routable model: pricing, context size, latency."""

    name: str                  # Provider model identifier
    cost_per_1k_input: float   # USD per 1K prompt tokens
    cost_per_1k_output: float  # USD per 1K completion tokens
    context_window: int        # tokens
    avg_latency_ms: int        # Typical end-to-end latency, in milliseconds
# Routing tiers keyed by capability level. Prices are illustrative snapshots —
# provider pricing changes frequently; in production, load these from
# configuration rather than hard-coding them.
MODELS = {
    "cheap": ModelConfig(
        name="gpt-4o-mini",
        cost_per_1k_input=0.00015,
        cost_per_1k_output=0.0006,
        context_window=128_000,
        avg_latency_ms=500,
    ),
    "standard": ModelConfig(
        name="gpt-4o",
        cost_per_1k_input=0.0025,
        cost_per_1k_output=0.01,
        context_window=128_000,
        avg_latency_ms=1200,
    ),
    "powerful": ModelConfig(
        name="claude-opus-4-5",
        cost_per_1k_input=0.015,
        cost_per_1k_output=0.075,
        context_window=200_000,
        avg_latency_ms=3000,
    ),
}
class ModelRouter:
    """
    Routes requests to the most cost-effective model that can handle the task.

    A pluggable classifier estimates task difficulty ("simple", "medium",
    "complex"); each label maps to a tier in MODELS. Unknown labels fall
    back to the standard tier.
    """

    def __init__(self, classifier_fn: Callable[[str], str]):
        # classifier_fn returns "simple", "medium", or "complex"
        self.classifier = classifier_fn
        self.routing_map = {
            "simple": MODELS["cheap"],
            "medium": MODELS["standard"],
            "complex": MODELS["powerful"],
        }
        self._routing_log: list[dict] = []

    def route(self, task: str) -> ModelConfig:
        """Select the appropriate model for this task and log the decision."""
        label = self.classifier(task)
        choice = self.routing_map.get(label, MODELS["standard"])
        decision = {
            "task_preview": task[:100],
            "difficulty": label,
            "model_selected": choice.name,
            "ts": time.time(),
        }
        self._routing_log.append(decision)
        return choice

    def cost_saved_vs_always_powerful(self) -> float:
        """Estimate USD saved by routing vs. always using the powerful model."""
        powerful = MODELS["powerful"]
        total_saved = 0.0
        for logged in self._routing_log:
            chosen = next(
                m for m in MODELS.values() if m.name == logged["model_selected"]
            )
            # Assume an average call of 2000 input + 500 output tokens:
            # 2 x the per-1K input rate plus 0.5 x the per-1K output rate.
            input_delta = powerful.cost_per_1k_input - chosen.cost_per_1k_input
            output_delta = powerful.cost_per_1k_output - chosen.cost_per_1k_output
            total_saved += input_delta * 2 + output_delta * 0.5
        return total_saved
# Simple rule-based classifier (replace with ML classifier for production)
def classify_task_difficulty(task: str) -> str:
    """
    Classify a task as "simple", "medium", or "complex" via keyword and
    length heuristics.

    Rules (checked in order):
      - complex: over 100 words, or at least two complex signals
      - simple: at least one simple signal, no complex signals, under 30 words
      - medium: everything else

    Args:
        task: Raw user task text.

    Returns:
        One of "simple", "medium", "complex".
    """
    task_lower = task.lower()
    simple_signals = ["what is", "define", "list", "when was", "who is"]
    # Include both British and American spellings so e.g. "analyze" routes
    # to the powerful tier just like "analyse".
    complex_signals = ["analyse", "analyze", "compare", "write a", "debug",
                       "explain why", "multi-step", "research"]
    simple_count = sum(1 for s in simple_signals if s in task_lower)
    complex_count = sum(1 for s in complex_signals if s in task_lower)
    word_count = len(task.split())
    if word_count > 100 or complex_count >= 2:
        return "complex"
    if simple_count >= 1 and complex_count == 0 and word_count < 30:
        return "simple"
    return "medium"
Rate Limiting and Spend Monitoring
import asyncio
from collections import deque
class RateLimiter:
    """
    Sliding-window rate limiter for API calls.

    Keeps a rolling 60-second log of request timestamps and token counts,
    and blocks new calls that would exceed either per-minute cap.
    (NOTE(review): previously described as a "token bucket", but there is
    no refill rate or bucket capacity here — this is a sliding-window log.)

    Prevents burst usage from triggering provider rate limits or surprise bills.
    """
    def __init__(self, max_tokens_per_minute: int, max_requests_per_minute: int):
        self.max_tokens_pm = max_tokens_per_minute
        self.max_requests_pm = max_requests_per_minute
        self._token_usage: deque[tuple[float, int]] = deque()  # (ts, count)
        self._request_times: deque[float] = deque()
        # Serialises check-and-record so two concurrent acquirers can't both
        # pass the same headroom check.
        self._lock = asyncio.Lock()
    def _prune(self):
        # Drop entries older than 60s so window sums reflect only the last minute.
        cutoff = time.monotonic() - 60.0
        while self._token_usage and self._token_usage[0][0] < cutoff:
            self._token_usage.popleft()
        while self._request_times and self._request_times[0] < cutoff:
            self._request_times.popleft()
    def _tokens_used_last_minute(self) -> int:
        # O(n) sum over the pruned window.
        return sum(c for _, c in self._token_usage)
    async def acquire(self, estimated_tokens: int):
        """Wait until rate limits allow this request to proceed.

        Polls in 0.5s intervals until both the token and request windows
        have headroom, then records this request's usage atomically.
        """
        while True:
            async with self._lock:
                self._prune()
                tokens_used = self._tokens_used_last_minute()
                requests_used = len(self._request_times)
                if (tokens_used + estimated_tokens <= self.max_tokens_pm
                    and requests_used < self.max_requests_pm):
                    now = time.monotonic()
                    self._token_usage.append((now, estimated_tokens))
                    self._request_times.append(now)
                    return  # Proceed
            # Lock is released while sleeping so other coroutines can try.
            await asyncio.sleep(0.5)  # Back off; retry
class SpendMonitor:
    """
    Real-time spend tracker with alerting thresholds.

    Each recorded call is costed from its ModelConfig pricing; once the
    day's spend crosses ``alert_threshold`` of the daily budget, every
    subsequent call emits an alert.
    """

    def __init__(
        self,
        daily_budget_usd: float,
        alert_threshold: float = 0.8,  # Alert at 80% of budget
    ):
        self.daily_budget = daily_budget_usd
        self.alert_threshold = alert_threshold
        self._spend_today: float = 0.0
        self._call_log: list[dict] = []

    def record_call(self, model: ModelConfig, prompt_tokens: int, completion_tokens: int):
        """Record an LLM API call; return its estimated cost in USD."""
        input_cost = model.cost_per_1k_input * prompt_tokens / 1000
        output_cost = model.cost_per_1k_output * completion_tokens / 1000
        cost = input_cost + output_cost
        self._spend_today += cost
        self._call_log.append({
            "model": model.name,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "cost_usd": cost,
            "ts": time.time(),
        })
        self._check_alerts(cost)
        return cost

    def _check_alerts(self, latest_cost: float):
        """Emit an alert once spend crosses the configured budget fraction."""
        ratio = self._spend_today / self.daily_budget
        if ratio < self.alert_threshold:
            return
        # In production: send to PagerDuty / Slack / email
        print(f"[SPEND ALERT] ${self._spend_today:.2f} / ${self.daily_budget:.2f} "
              f"({ratio:.0%}) — approaching daily budget limit")

    def report(self) -> dict:
        """Summarise today's spend, overall and broken down by model."""
        per_model: dict[str, float] = {}
        for entry in self._call_log:
            label = entry["model"]
            per_model[label] = per_model.get(label, 0) + entry["cost_usd"]
        return {
            "spend_today_usd": self._spend_today,
            "daily_budget_usd": self.daily_budget,
            "utilisation": self._spend_today / self.daily_budget,
            "total_calls": len(self._call_log),
            "by_model": per_model,
        }
Cost vs. Quality Trade-offs
| Strategy | Cost Reduction | Quality Impact | When to Use |
|---|---|---|---|
| Model routing | 60–80% | Low (if classifier is good) | Always |
| Exact caching | 20–40% | None | Deterministic tools |
| Semantic caching | 10–30% | Very low | High query repetition |
| Prompt caching | 10–20% | None | Long stable system prompts |
| Token budgets | Varies | Medium (truncates context) | Runaway loop prevention |
| Shorter context | 20–50% | Medium–High | After careful testing |
Tip: Track cost per successful task completion, not just cost per call. An agent that costs $0.05 per call but succeeds 95% of the time (≈$0.053 per success) is cheaper than one that costs $0.04 per call but succeeds 60% (≈$0.067 per success) — because failed tasks often get retried at full cost.
Summary
Production cost control is a stack, not a single technique. Start with model routing (highest ROI, no quality loss), add exact caching for deterministic tools, semantic caching for repeated LLM queries, and native prompt caching for long system prompts. Layer on token budgets to prevent runaway loops, rate limiters to stay within provider limits, and a spend monitor to catch budget overruns before they become surprises. Instrument every LLM call with its token counts so you have visibility into where money is actually being spent.