Deployment Strategies for AI Agents
Deploying an AI agent to production is more complex than deploying a standard web service. Agent requests can run for 30–120 seconds, consume variable and often substantial LLM API spend, and produce outputs that need monitoring and cost attribution. This final lesson covers the deployment patterns — REST wrapping, async task queues, scaling, monitoring, and cost management — that let teams ship agents with confidence.
Choosing Your Deployment Pattern
Not all agents should be deployed the same way. The right pattern depends on expected latency and interactivity:
| Pattern | Best For | Latency | Complexity |
|---|---|---|---|
| Synchronous REST | Short tasks (< 15s), interactive chat | Low | Low |
| Async task queue | Long-running tasks (15s–10min), batch jobs | High | Medium |
| Streaming REST | Chat UIs that want live token output | Low (perceived) | Medium |
| Serverless functions | Sporadic, bursty workloads | Variable (cold start) | Low |
| Background worker | Scheduled tasks, ETL pipelines | Offline | Low |
Rule of thumb: If your agent consistently takes more than 15 seconds, use an async task queue. Browser clients will time out on synchronous requests longer than ~30 seconds, and mobile clients even sooner.
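The rule of thumb can be sketched as a tiny routing helper. This is illustrative only — the pattern names and the 15-second threshold mirror the table above, not any real codebase:

```python
def choose_pattern(expected_seconds: float, interactive: bool) -> str:
    """Pick a deployment pattern from expected latency and interactivity."""
    if expected_seconds <= 15:
        # Short tasks can be served in the request/response cycle;
        # interactive chat UIs benefit from streaming tokens as they arrive.
        return "streaming REST" if interactive else "synchronous REST"
    # Anything longer risks client timeouts — decouple via a task queue.
    return "async task queue"
```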
Pattern 1: Synchronous REST API with FastAPI
For agents that complete in under 15 seconds, a synchronous FastAPI endpoint is the simplest production-ready deployment.
```python
# agent/main.py
from __future__ import annotations

import logging
import time
from contextlib import asynccontextmanager
from typing import Optional

from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

from agent.factory import create_production_agent

logger = logging.getLogger(__name__)

# ─────────────────────────────────────────────────
# Request / Response Models
# ─────────────────────────────────────────────────

class RunRequest(BaseModel):
    """Request body for a synchronous agent run."""
    message: str = Field(..., min_length=1, max_length=10_000)
    session_id: Optional[str] = Field(default=None)
    user_id: str = Field(...)
    stream: bool = Field(default=False)

class RunResponse(BaseModel):
    """Response body for a completed agent run."""
    content: str
    session_id: str
    iterations: int
    tool_calls_made: list[str]
    latency_ms: float
    success: bool

# ─────────────────────────────────────────────────
# Application Lifecycle
# ─────────────────────────────────────────────────

agent_instance = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise the agent on startup and clean up on shutdown."""
    global agent_instance
    logger.info("[INFO][lifespan] Initialising agent...")
    agent_instance = create_production_agent()
    logger.info("[INFO][lifespan] Agent ready.")
    yield
    logger.info("[INFO][lifespan] Shutting down agent...")
    agent_instance = None

app = FastAPI(
    title="AI Agent API",
    version="1.0.0",
    lifespan=lifespan,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],
    allow_methods=["POST", "GET"],
    allow_headers=["Authorization", "Content-Type"],
)

# ─────────────────────────────────────────────────
# Authentication
# ─────────────────────────────────────────────────

async def verify_api_key(x_api_key: str = Header(...)) -> str:
    """Validate the API key from the X-Api-Key request header."""
    from agent.config import settings
    if x_api_key != settings.api_key:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return x_api_key

# ─────────────────────────────────────────────────
# Endpoints
# ─────────────────────────────────────────────────

@app.post("/run", response_model=RunResponse, dependencies=[Depends(verify_api_key)])
async def run_agent(request: RunRequest):
    """
    Execute the agent synchronously and return the final answer.

    Suitable for tasks expected to complete within 15 seconds.
    For longer tasks, use the /tasks endpoint with async processing.
    """
    if agent_instance is None:
        raise HTTPException(status_code=503, detail="Agent not yet initialised")

    start = time.monotonic()
    from agent.state import AgentRequest
    agent_request = AgentRequest(
        user_id=request.user_id,
        session_id=request.session_id or f"sess_{int(time.time())}",
        message=request.message,
    )

    try:
        # agent.handle() is a blocking call — run it in a thread pool so it
        # does not stall the event loop for other requests.
        response = await run_in_threadpool(agent_instance.handle, agent_request)
    except Exception as exc:
        logger.error("[ERROR][run_agent] Unhandled exception: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Internal agent error")

    latency_ms = (time.monotonic() - start) * 1000
    logger.info(
        "[INFO][run_agent] session=%s iterations=%d latency=%.1fms success=%s",
        agent_request.session_id,
        response.iterations,
        latency_ms,
        response.success,
    )
    return RunResponse(
        content=response.content,
        session_id=agent_request.session_id,
        iterations=response.iterations,
        tool_calls_made=response.tool_calls_made,
        latency_ms=round(latency_ms, 1),
        success=response.success,
    )

@app.get("/health")
async def health():
    """Liveness and readiness check used by load balancers and Docker."""
    return {
        "status": "healthy",
        "agent_ready": agent_instance is not None,
    }
```
Pattern 2: Async Task Queue with Celery and Redis
For tasks that take more than 15 seconds, decouple the HTTP request from the agent execution using a task queue. The client submits a job and polls for the result — or you push a webhook when the task completes.
```
Client → POST /tasks → FastAPI → Celery queue → Worker process → Result store
Client ← task_id (returned immediately by FastAPI)
Client → GET /tasks/{task_id} → FastAPI ← result (read from Result store)
```
Celery Task Definition
```python
# agent/tasks.py
import time

from celery import Celery
from celery.utils.log import get_task_logger

from agent.config import settings
from agent.factory import create_production_agent
from agent.state import AgentRequest

logger = get_task_logger(__name__)

# Configure Celery with Redis as both broker and result backend
celery_app = Celery(
    "agent_tasks",
    broker=settings.redis_url,
    backend=settings.redis_url,
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_soft_time_limit=300,      # 5 minutes — raises SoftTimeLimitExceeded
    task_time_limit=360,           # 6 minutes — hard kill (SIGKILL)
    task_acks_late=True,           # Acknowledge only after the task completes
    task_track_started=True,       # Report STARTED so the API can show "running"
    worker_prefetch_multiplier=1,  # Process one task at a time per worker
)

@celery_app.task(
    name="agent.run",
    bind=True,
    max_retries=2,
    default_retry_delay=5,
)
def run_agent_task(
    self,
    user_id: str,
    session_id: str,
    message: str,
) -> dict:
    """
    Execute the agent as an async Celery task.

    Stores the result in Redis via Celery's result backend.
    The FastAPI layer polls this result by task_id.
    """
    start = time.monotonic()
    logger.info(
        "[INFO][run_agent_task] Starting task_id=%s user=%s",
        self.request.id, user_id,
    )
    try:
        agent = create_production_agent()
        request = AgentRequest(
            user_id=user_id,
            session_id=session_id,
            message=message,
        )
        response = agent.handle(request)
        latency_ms = (time.monotonic() - start) * 1000
        logger.info(
            "[INFO][run_agent_task] Completed task_id=%s in %.1fms",
            self.request.id, latency_ms,
        )
        return {
            "content": response.content,
            "success": response.success,
            "iterations": response.iterations,
            "tool_calls_made": response.tool_calls_made,
            "latency_ms": round(latency_ms, 1),
        }
    except Exception as exc:
        logger.error(
            "[ERROR][run_agent_task] task_id=%s failed: %s",
            self.request.id, exc, exc_info=True,
        )
        raise self.retry(exc=exc)
```
FastAPI Endpoints for Async Tasks
```python
# agent/main.py — additional async task endpoints
from celery.result import AsyncResult

from agent.tasks import celery_app, run_agent_task

class TaskCreateResponse(BaseModel):
    task_id: str
    status: str = "queued"

class TaskStatusResponse(BaseModel):
    task_id: str
    status: str  # queued | running | completed | failed
    result: Optional[dict] = None
    error: Optional[str] = None

@app.post("/tasks", response_model=TaskCreateResponse, dependencies=[Depends(verify_api_key)])
async def create_task(request: RunRequest):
    """
    Enqueue an agent run as an async background task.

    Returns a task_id immediately. Poll GET /tasks/{task_id} for the result.
    """
    import uuid
    session_id = request.session_id or str(uuid.uuid4())
    task = run_agent_task.delay(
        user_id=request.user_id,
        session_id=session_id,
        message=request.message,
    )
    logger.info("[INFO][create_task] Enqueued task_id=%s", task.id)
    return TaskCreateResponse(task_id=task.id)

@app.get("/tasks/{task_id}", response_model=TaskStatusResponse)
async def get_task_status(task_id: str, _: str = Depends(verify_api_key)):
    """
    Poll for the status and result of an async agent task.

    Status values:
    - queued: task is waiting in the queue
    - running: task is actively being processed by a worker
    - completed: task finished successfully; result is in the response
    - failed: task raised an unhandled exception; error is in the response
    """
    result = AsyncResult(task_id, app=celery_app)
    if result.state == "PENDING":
        return TaskStatusResponse(task_id=task_id, status="queued")
    if result.state == "STARTED":
        return TaskStatusResponse(task_id=task_id, status="running")
    if result.state == "SUCCESS":
        return TaskStatusResponse(
            task_id=task_id,
            status="completed",
            result=result.result,
        )
    if result.state == "FAILURE":
        return TaskStatusResponse(
            task_id=task_id,
            status="failed",
            error=str(result.result),
        )
    return TaskStatusResponse(task_id=task_id, status=result.state.lower())
```
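On the client side, polling usually pairs with exponential backoff so idle tasks are not hammered. A minimal sketch — `fetch_status` is a stand-in for whatever performs the HTTP GET to `/tasks/{task_id}`:

```python
import time
from typing import Callable

def poll_task(
    fetch_status: Callable[[], dict],
    timeout_s: float = 300,
    interval_s: float = 1.0,
    max_interval_s: float = 10.0,
) -> dict:
    """Poll fetch_status() with exponential backoff until the task settles."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = fetch_status()  # e.g. GET /tasks/{task_id}
        if status["status"] in ("completed", "failed"):
            return status
        time.sleep(interval_s)
        interval_s = min(interval_s * 2, max_interval_s)  # Back off between polls
    raise TimeoutError("Task did not settle within the timeout")
```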
Scaling Considerations
Horizontal Scaling
Both the FastAPI layer and the Celery workers can be scaled horizontally without coordination:
```yaml
# docker-compose.yml — scaled deployment
services:
  agent_api:
    build: .
    command: uvicorn agent.main:app --host 0.0.0.0 --port 8000 --workers 4
    deploy:
      replicas: 2   # Two API containers behind a load balancer

  agent_worker:
    build: .
    command: celery -A agent.tasks.celery_app worker --loglevel=info --concurrency=2
    deploy:
      replicas: 4   # Four worker containers, each processing 2 tasks at once

  redis:
    image: redis:7-alpine
```
Note: Each Celery worker makes real LLM API calls. Four worker replicas × concurrency 2 = eight simultaneous LLM calls. At GPT-4o prices, this can be expensive. Always set concurrency limits based on your LLM tier's rate limits.
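To turn that warning into a number, you can bound worker concurrency from the provider's requests-per-minute limit. A back-of-the-envelope helper — every parameter here is an assumed, illustrative input, not a real tier:

```python
def max_safe_concurrency(
    rpm_limit: int,              # Provider requests-per-minute limit for your tier
    calls_per_run: int,          # LLM calls per agent run (iterations, summaries, ...)
    avg_run_seconds: float,      # Average wall-clock time of one run
    safety_factor: float = 0.8,  # Leave headroom for retries and bursts
) -> int:
    """Upper bound on simultaneous agent runs that stays under the RPM limit."""
    runs_per_minute_per_slot = 60 / avg_run_seconds
    rpm_per_slot = runs_per_minute_per_slot * calls_per_run
    return max(1, int((rpm_limit * safety_factor) / rpm_per_slot))
```

For example, a 500 RPM tier with 5 LLM calls per run and 60-second runs gives a ceiling of 80 concurrent runs with 20% headroom.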
Rate Limiting
Protect against accidental or malicious overuse with per-user rate limits:
```python
# agent/middleware.py
import time
from collections import defaultdict

from fastapi import HTTPException

class RateLimiter:
    """
    Simple in-memory rate limiter using a sliding window.

    For production, replace the in-memory dict with a Redis counter
    so limits are shared across all API container replicas.
    """

    def __init__(self, max_requests: int, window_seconds: int) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._windows: dict[str, list[float]] = defaultdict(list)

    def check(self, key: str) -> None:
        """Raise HTTPException if the key has exceeded its rate limit."""
        now = time.monotonic()
        window_start = now - self.window_seconds
        # Purge timestamps outside the current window
        self._windows[key] = [t for t in self._windows[key] if t > window_start]
        if len(self._windows[key]) >= self.max_requests:
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded: max {self.max_requests} requests per {self.window_seconds}s",
            )
        self._windows[key].append(now)

rate_limiter = RateLimiter(max_requests=10, window_seconds=60)
```
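To see the sliding window in action, here is a miniature, self-contained stand-in for the class above. It raises a plain ValueError instead of HTTPException so it runs outside FastAPI; the window logic is identical:

```python
import time
from collections import defaultdict

class SlidingWindowDemo:
    """Miniature stand-in for RateLimiter, runnable outside FastAPI."""

    def __init__(self, max_requests: int, window_seconds: float) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._windows: dict[str, list[float]] = defaultdict(list)

    def check(self, key: str) -> None:
        now = time.monotonic()
        # Keep only timestamps still inside the window
        self._windows[key] = [t for t in self._windows[key]
                              if t > now - self.window_seconds]
        if len(self._windows[key]) >= self.max_requests:
            raise ValueError("rate limited")   # 429 in the FastAPI version
        self._windows[key].append(now)

limiter = SlidingWindowDemo(max_requests=2, window_seconds=60)
limiter.check("alice")      # 1st request in the window — allowed
limiter.check("alice")      # 2nd request — allowed
try:
    limiter.check("alice")  # 3rd request inside the window — rejected
    rejected = False
except ValueError:
    rejected = True
limiter.check("bob")        # Separate key has its own window — allowed
```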
Monitoring and Logging in Production
Structured Logging
Use structured JSON logs so your log aggregation platform (Datadog, Grafana Loki, CloudWatch) can index and query them:
```python
# agent/logging_config.py
import json
import logging
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    """Emit log records as single-line JSON objects."""

    def format(self, record: logging.LogRecord) -> str:
        log_obj = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_obj["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_obj)

def configure_logging(level: str = "INFO") -> None:
    """Configure the root logger with JSON output."""
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    logging.basicConfig(level=level, handlers=[handler], force=True)
```
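To illustrate what such a formatter emits, this self-contained snippet logs into a buffer using a compact three-field variant of JSONFormatter (simplified here so the snippet runs on its own) and parses the line back:

```python
import io
import json
import logging
from datetime import datetime, timezone

class _JSONFormatter(logging.Formatter):
    """Compact variant of JSONFormatter for demonstration."""
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
        })

buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(_JSONFormatter())
demo_logger = logging.getLogger("demo")
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.INFO)

demo_logger.info("agent run complete: session=%s", "sess_42")
parsed = json.loads(buffer.getvalue().strip())  # Each line is a queryable JSON object
```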
Metrics to Track
For every agent run, emit these metrics to your observability platform:
| Metric | Type | Description |
|---|---|---|
| `agent.run.latency_ms` | Histogram | End-to-end request latency |
| `agent.run.iterations` | Histogram | Number of reasoning loop iterations |
| `agent.run.success_rate` | Counter | Ratio of successful to failed runs |
| `agent.llm.tokens_used` | Counter | Tokens consumed per run (input + output) |
| `agent.llm.cost_usd` | Counter | Estimated cost per run |
| `agent.tool.calls_total` | Counter | Tool calls, labelled by tool name |
| `agent.tool.error_rate` | Counter | Tool errors, labelled by tool name and error type |
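A sketch of how one run's response could be fanned out into these metric names. The in-memory `MetricsSink` is a hypothetical stand-in for a real client such as statsd, Prometheus, or the Datadog agent:

```python
from collections import defaultdict

class MetricsSink:
    """In-memory stand-in for a metrics client (statsd, Prometheus, Datadog)."""

    def __init__(self) -> None:
        self.counters: dict[str, float] = defaultdict(float)
        self.histograms: dict[str, list[float]] = defaultdict(list)

    def incr(self, name: str, value: float = 1, **labels) -> None:
        # Fold labels into the key; real clients handle tags natively
        key = name + "".join(f"[{k}={v}]" for k, v in sorted(labels.items()))
        self.counters[key] += value

    def observe(self, name: str, value: float) -> None:
        self.histograms[name].append(value)

def record_run_metrics(sink: MetricsSink, response: dict) -> None:
    """Emit the per-run metrics from the table above for one agent response."""
    sink.observe("agent.run.latency_ms", response["latency_ms"])
    sink.observe("agent.run.iterations", response["iterations"])
    sink.incr("agent.run.success_rate", value=1 if response["success"] else 0)
    for tool in response["tool_calls_made"]:
        sink.incr("agent.tool.calls_total", tool=tool)
```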
Cost Management for LLM API Calls
LLM API costs can spiral quickly in production. These strategies keep them under control:
Prompt Caching
Both OpenAI and Anthropic offer prompt caching for repeated prompt prefixes such as system prompts. Keep your system prompt at the start of the request and make it large enough to qualify (both providers require a minimum prefix of roughly 1,024 tokens):
```python
# The system prompt is sent on every request — cache it
SYSTEM_PROMPT = """You are a helpful research assistant...
[include enough context to make caching cost-effective]
"""

# For Anthropic Claude, enable caching explicitly by marking the block.
# The system prompt belongs in the top-level `system` parameter (not the
# message list) so the cached prefix stays byte-identical across requests.
import anthropic

client = anthropic.Anthropic()
response = client.messages.create(
    model="claude-3-5-sonnet-latest",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": SYSTEM_PROMPT,
            "cache_control": {"type": "ephemeral"},  # Cache this block
        }
    ],
    messages=[{"role": "user", "content": user_message}],
)
```
Per-Run Budget Caps
Set a hard token budget per agent run to prevent runaway costs:
```python
class BudgetedOrchestrator:
    """Orchestrator that stops the reasoning loop when the token budget is exhausted."""

    MAX_ITERATIONS = 20

    def __init__(self, llm, memory, tools, max_tokens_per_run: int = 50_000):
        self.llm = llm
        self.memory = memory
        self.tools = tools
        self.max_tokens_per_run = max_tokens_per_run
        self._tokens_used = 0

    def handle(self, request):
        self._tokens_used = 0
        for iteration in range(self.MAX_ITERATIONS):
            response = self.llm.complete(self._build_messages())
            self._tokens_used += response.get("usage", {}).get("total_tokens", 0)
            if self._tokens_used >= self.max_tokens_per_run:
                return AgentResponse(
                    content="I was unable to complete this task within the token budget.",
                    success=False,
                    error=f"Token budget of {self.max_tokens_per_run} exceeded",
                    iterations=iteration + 1,
                    tool_calls_made=[],
                )
            if response.get("finish_reason") == "stop":
                return self._build_success_response(response, iteration)
            self._handle_tool_calls(response)
        # Fallthrough: iteration cap reached without a final answer
        return AgentResponse(
            content="I was unable to complete this task within the iteration limit.",
            success=False,
            error=f"Iteration limit of {self.MAX_ITERATIONS} reached",
            iterations=self.MAX_ITERATIONS,
            tool_calls_made=[],
        )
```
Cost Estimation Before Running
For user-facing agents, estimate the likely cost before starting an expensive run:
```python
def estimate_run_cost(
    message: str,
    model: str = "gpt-4o",
    estimated_iterations: int = 5,
) -> float:
    """
    Rough cost estimate in USD for a single agent run.

    Uses conservative token estimates. Actual cost may be higher if
    tool outputs are large or the task requires many iterations.
    """
    # Rough prices per 1K tokens (check provider pricing pages for current rates)
    pricing = {
        "gpt-4o": {"input": 0.0025, "output": 0.01},
        "gpt-4o-mini": {"input": 0.000150, "output": 0.000600},
        "claude-3-5-sonnet": {"input": 0.003, "output": 0.015},
    }
    rates = pricing.get(model, pricing["gpt-4o"])

    # Estimate: 500 tokens input + 300 tokens output per iteration
    input_tokens = 500 * estimated_iterations
    output_tokens = 300 * estimated_iterations
    prompt_tokens = len(message.split()) * 1.3  # Rough tokenisation estimate

    total_input = (input_tokens + prompt_tokens) / 1000
    total_output = output_tokens / 1000
    return (total_input * rates["input"]) + (total_output * rates["output"])
```
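Plugging the defaults into the same arithmetic: a 100-word message on gpt-4o with 5 estimated iterations works out to roughly two cents per run:

```python
# Same arithmetic as estimate_run_cost, with the numbers written out:
input_tokens = 5 * 500 + 100 * 1.3   # per-iteration input + prompt estimate = 2630.0
output_tokens = 5 * 300              # per-iteration output estimate = 1500
cost = (input_tokens / 1000) * 0.0025 + (output_tokens / 1000) * 0.01
# ≈ 0.0066 + 0.0150 ≈ $0.022 per run, i.e. roughly $22 per 1,000 runs
```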
Deployment Architecture Overview
```
                      ┌─────────────────┐
                      │     Client      │
                      │   (Web / API)   │
                      └────────┬────────┘
                               │ HTTPS
                      ┌────────▼────────┐
                      │  Load Balancer  │
                      │  (nginx / ALB)  │
                      └────────┬────────┘
                    ┌──────────┴──────────┐
           ┌────────▼────────┐   ┌────────▼────────┐
           │   FastAPI API   │   │   FastAPI API   │
           │    Replica 1    │   │    Replica 2    │
           └────────┬────────┘   └────────┬────────┘
                    └──────────┬──────────┘
                               │
                      ┌────────▼────────┐
                      │  Redis (Broker  │
                      │ + Result Store) │
                      └────────┬────────┘
          ┌────────────────────┼────────────────────┐
 ┌────────▼────────┐  ┌────────▼────────┐  ┌────────▼────────┐
 │ Celery Worker 1 │  │ Celery Worker 2 │  │ Celery Worker 3 │
 │  (Agent runs)   │  │  (Agent runs)   │  │  (Agent runs)   │
 └────────┬────────┘  └────────┬────────┘  └────────┬────────┘
          └────────────────────┼────────────────────┘
                               │
                      ┌────────▼────────┐
                      │   PostgreSQL    │
                      │   (Long-term    │
                      │     memory)     │
                      └─────────────────┘
```
Key Takeaways
- Use synchronous REST for short tasks (< 15s) and async task queues for long-running jobs.
- FastAPI + Celery + Redis is the standard Python stack for async agent deployment.
- Always implement rate limiting — both per user and globally — to control costs.
- Use structured JSON logging so your observability platform can index and alert on agent behaviour.
- Track token usage and cost per run from day one — surprises at billing time are avoidable.
- Set per-run token budgets to prevent runaway costs from malfunctioning agents.
- Prompt caching can reduce costs by 60–90% for agents with large, repeated system prompts.
Further Reading
- FastAPI documentation — the standard Python async web framework
- Celery documentation — distributed task queue
- OpenAI prompt caching guide — official caching documentation
- LangSmith — production monitoring and evaluation for LangChain agents