Deployment Strategies for AI Agents

Deploying an AI agent to production is more complex than deploying a standard web service. Agent requests can run for 30–120 seconds, consume variable amounts of expensive API budget, and produce outputs that need monitoring and cost attribution. This final lesson covers the deployment patterns — REST wrapping, async task queues, scaling, monitoring, and cost management — that allow teams to ship agents with confidence.


Choosing Your Deployment Pattern

Not all agents should be deployed the same way. The right pattern depends on expected latency and interactivity:

| Pattern | Best For | Latency | Complexity |
|---|---|---|---|
| Synchronous REST | Short tasks (< 15s), interactive chat | Low | Low |
| Async task queue | Long-running tasks (15s–10min), batch jobs | High | Medium |
| Streaming REST | Chat UIs that want live token output | Low (perceived) | Medium |
| Serverless functions | Sporadic, bursty workloads | Variable (cold start) | Low |
| Background worker | Scheduled tasks, ETL pipelines | Offline | Low |

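Rule of thumb: If your agent consistently takes more than 15 seconds, use an async task queue. Many load balancers, reverse proxies and HTTP clients enforce default timeouts of roughly 30 to 60 seconds, and mobile connections are often dropped sooner, so long synchronous requests tend to fail unpredictably.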
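
The table and the rule of thumb can be folded into a small helper for design reviews. This is only a sketch: the function name choose_pattern, its parameters, and the order in which the checks run are assumptions made for illustration, and the thresholds simply mirror the table above.

```python
def choose_pattern(
    expected_seconds: float,
    needs_live_tokens: bool = False,
    scheduled: bool = False,
    bursty: bool = False,
) -> str:
    """Suggest a deployment pattern using the table's rough thresholds."""
    if scheduled:
        # Nothing is waiting on the result, so run it offline.
        return "background worker"
    if expected_seconds > 15:
        # Long runs outlive typical HTTP timeouts, so hand them to a queue.
        return "async task queue"
    if needs_live_tokens:
        return "streaming REST"
    if bursty:
        return "serverless functions"
    return "synchronous REST"


print(choose_pattern(expected_seconds=4))
print(choose_pattern(expected_seconds=90))
```

Treat the output as a starting point for discussion; real workloads often mix patterns, for example a synchronous endpoint for chat plus a queue for report generation.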


Pattern 1: Synchronous REST API with FastAPI

For agents that complete in under 15 seconds, a synchronous FastAPI endpoint is the simplest production-ready deployment.

# agent/main.py
from __future__ import annotations

import logging
import time
from contextlib import asynccontextmanager
from typing import Optional

from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

from agent.factory import create_production_agent

logger = logging.getLogger(__name__)


# ─────────────────────────────────────────────────
# Request / Response Models
# ─────────────────────────────────────────────────

class RunRequest(BaseModel):
    """Request body for a synchronous agent run."""
    message: str = Field(..., min_length=1, max_length=10_000)
    session_id: Optional[str] = Field(default=None)
    user_id: str = Field(...)
    stream: bool = Field(default=False)


class RunResponse(BaseModel):
    """Response body for a completed agent run."""
    content: str
    session_id: str
    iterations: int
    tool_calls_made: list[str]
    latency_ms: float
    success: bool


# ─────────────────────────────────────────────────
# Application Lifecycle
# ─────────────────────────────────────────────────

agent_instance = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise the agent on startup and clean up on shutdown."""
    global agent_instance
    logger.info("[INFO][lifespan] Initialising agent...")
    agent_instance = create_production_agent()
    logger.info("[INFO][lifespan] Agent ready.")
    yield
    logger.info("[INFO][lifespan] Shutting down agent...")
    agent_instance = None


app = FastAPI(
    title="AI Agent API",
    version="1.0.0",
    lifespan=lifespan,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],
    allow_methods=["POST", "GET"],
    allow_headers=["Authorization", "Content-Type"],
)


# ─────────────────────────────────────────────────
# Authentication
# ─────────────────────────────────────────────────

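async def verify_api_key(x_api_key: str = Header(...)) -> str:
    """Validate the API key from the X-Api-Key request header."""
    import secrets

    from agent.config import settings
    # Constant-time comparison avoids leaking key contents through response timing
    if not secrets.compare_digest(x_api_key.encode(), settings.api_key.encode()):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return x_api_key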


# ─────────────────────────────────────────────────
# Endpoints
# ─────────────────────────────────────────────────

@app.post("/run", response_model=RunResponse, dependencies=[Depends(verify_api_key)])
async def run_agent(request: RunRequest):
    """
    Execute the agent synchronously and return the final answer.

    Suitable for tasks expected to complete within 15 seconds.
    For longer tasks, use the /tasks endpoint with async processing.
    """
    if agent_instance is None:
        raise HTTPException(status_code=503, detail="Agent not yet initialised")

    start = time.monotonic()

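    import uuid

    from fastapi.concurrency import run_in_threadpool

    from agent.state import AgentRequest
    agent_request = AgentRequest(
        user_id=request.user_id,
        # uuid4 avoids collisions between sessions created in the same second
        session_id=request.session_id or f"sess_{uuid.uuid4().hex}",
        message=request.message,
    )

    try:
        # handle() is a blocking call, so run it in the threadpool
        # to keep the event loop free for other requests
        response = await run_in_threadpool(agent_instance.handle, agent_request)
    except Exception as exc:
        logger.error("[ERROR][run_agent] Unhandled exception: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Internal agent error") from exc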

    latency_ms = (time.monotonic() - start) * 1000

    logger.info(
        "[INFO][run_agent] session=%s iterations=%d latency=%.1fms success=%s",
        agent_request.session_id,
        response.iterations,
        latency_ms,
        response.success,
    )

    return RunResponse(
        content=response.content,
        session_id=agent_request.session_id,
        iterations=response.iterations,
        tool_calls_made=response.tool_calls_made,
        latency_ms=round(latency_ms, 1),
        success=response.success,
    )


@app.get("/health")
async def health():
    """Liveness and readiness check used by load balancers and Docker."""
    return {
        "status": "healthy",
        "agent_ready": agent_instance is not None,
    }

Pattern 2: Async Task Queue with Celery and Redis

For tasks that take more than 15 seconds, decouple the HTTP request from the agent execution using a task queue. The client submits a job and polls for the result — or you push a webhook when the task completes.

Client → POST /tasks → FastAPI → Celery queue → Worker process → Result store
                  ↓                                                      ↑
             task_id                                                      │
                  └─────────── GET /tasks/{task_id} ────────────────────┘
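
On the client side, the polling half of this flow might look like the sketch below. It assumes a caller-supplied fetch_status callable (hypothetical) that wraps GET /tasks/{task_id} and returns the parsed JSON body, and it relies on the status values this lesson uses ("queued", "running", "completed", "failed"). The delays and the timeout are illustrative defaults, not recommendations.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"completed", "failed"}


def poll_task(
    fetch_status: Callable[[], dict],
    initial_delay: float = 1.0,
    max_delay: float = 10.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch_status until the task is terminal or the timeout would be exceeded."""
    delay = initial_delay
    waited = 0.0
    while True:
        status = fetch_status()
        if status.get("status") in TERMINAL_STATUSES:
            return status
        if waited + delay > timeout:
            raise TimeoutError(
                f"Task still {status.get('status')!r} after {waited:.0f}s"
            )
        sleep(delay)
        waited += delay
        # Double the wait each round, up to max_delay, to avoid hammering the API.
        delay = min(delay * 2, max_delay)
```

Passing sleep as a parameter keeps the function easy to test without real waiting. A webhook removes the need for polling entirely, at the cost of the client having to expose an endpoint.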

Celery Task Definition

# agent/tasks.py
from celery import Celery
from celery.utils.log import get_task_logger
import time

from agent.config import settings
from agent.factory import create_production_agent
from agent.state import AgentRequest

logger = get_task_logger(__name__)

# Configure Celery with Redis as both broker and result backend
celery_app = Celery(
    "agent_tasks",
    broker=settings.redis_url,
    backend=settings.redis_url,
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_soft_time_limit=300,   # 5 minutes: raises SoftTimeLimitExceeded inside the task
    task_time_limit=360,        # 6 minutes: the worker process is killed
    task_acks_late=True,        # Acknowledge only after the task completes
    task_track_started=True,    # Report STARTED; without this, running tasks stay PENDING
    worker_prefetch_multiplier=1,  # Reserve one task at a time per worker process
)


@celery_app.task(
    name="agent.run",
    bind=True,
    max_retries=2,
    default_retry_delay=5,
)
def run_agent_task(
    self,
    user_id: str,
    session_id: str,
    message: str,
) -> dict:
    """
    Execute the agent as an async Celery task.

    Stores the result in Redis via Celery's result backend.
    The FastAPI layer polls this result by task_id.
    """
    start = time.monotonic()
    logger.info(
        "[INFO][run_agent_task] Starting task_id=%s user=%s",
        self.request.id, user_id,
    )

    try:
        agent = create_production_agent()
        request = AgentRequest(
            user_id=user_id,
            session_id=session_id,
            message=message,
        )
        response = agent.handle(request)
        latency_ms = (time.monotonic() - start) * 1000

        logger.info(
            "[INFO][run_agent_task] Completed task_id=%s in %.1fms",
            self.request.id, latency_ms,
        )

        return {
            "content": response.content,
            "success": response.success,
            "iterations": response.iterations,
            "tool_calls_made": response.tool_calls_made,
            "latency_ms": round(latency_ms, 1),
        }

    except Exception as exc:
        logger.error(
            "[ERROR][run_agent_task] task_id=%s failed: %s",
            self.request.id, exc, exc_info=True,
        )
        raise self.retry(exc=exc)
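
The task above retries after a fixed default_retry_delay. When failures come from provider rate limits or brief outages, a growing, randomised delay is often gentler on the upstream API. The helper below is a hypothetical sketch (retry_countdown and its defaults are not part of the lesson's code); it only computes the delay, which could then be passed to Celery through the countdown argument of self.retry.

```python
import random
from typing import Optional


def retry_countdown(
    retries: int,
    base: float = 5.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before the next attempt: exponential growth with full jitter."""
    rng = rng or random.Random()
    # The ceiling doubles with every retry but never exceeds the cap.
    ceiling = min(cap, base * (2 ** retries))
    # Full jitter: pick a random point below the ceiling so retries spread out.
    return rng.uniform(0, ceiling)


# Inside the task's except block, this could replace the fixed delay:
#     raise self.retry(exc=exc, countdown=retry_countdown(self.request.retries))
```

Keep in mind that every retry repeats the agent's LLM calls, so a low max_retries remains a sensible cost guard.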

FastAPI Endpoints for Async Tasks

# agent/main.py — additional async task endpoints

from celery.result import AsyncResult

from agent.tasks import celery_app, run_agent_task


class TaskCreateResponse(BaseModel):
    task_id: str
    status: str = "queued"


class TaskStatusResponse(BaseModel):
    task_id: str
    status: str                       # queued | running | completed | failed
    result: Optional[dict] = None
    error: Optional[str] = None


@app.post("/tasks", response_model=TaskCreateResponse, dependencies=[Depends(verify_api_key)])
async def create_task(request: RunRequest):
    """
    Enqueue an agent run as an async background task.

    Returns a task_id immediately. Poll GET /tasks/{task_id} for the result.
    """
    import uuid
    session_id = request.session_id or str(uuid.uuid4())

    task = run_agent_task.delay(
        user_id=request.user_id,
        session_id=session_id,
        message=request.message,
    )

    logger.info("[INFO][create_task] Enqueued task_id=%s", task.id)
    return TaskCreateResponse(task_id=task.id)


@app.get("/tasks/{task_id}", response_model=TaskStatusResponse)
async def get_task_status(task_id: str, _: str = Depends(verify_api_key)):
    """
    Poll for the status and result of an async agent task.

    Status values:
    - queued:    task is waiting in the queue
    - running:   task is actively being processed by a worker
    - completed: task finished successfully; result is in the response
    - failed:    task raised an unhandled exception; error is in the response
    """
    result = AsyncResult(task_id, app=celery_app)

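    # Celery also reports PENDING for task IDs it has never seen,
    # so an unknown ID is indistinguishable from a queued one here.
    if result.state == "PENDING":
        return TaskStatusResponse(task_id=task_id, status="queued")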
    if result.state == "STARTED":
        return TaskStatusResponse(task_id=task_id, status="running")
    if result.state == "SUCCESS":
        return TaskStatusResponse(
            task_id=task_id,
            status="completed",
            result=result.result,
        )
    if result.state == "FAILURE":
        return TaskStatusResponse(
            task_id=task_id,
            status="failed",
            error=str(result.result),
        )

    return TaskStatusResponse(task_id=task_id, status=result.state.lower())

Scaling Considerations

Horizontal Scaling

Both the FastAPI layer and the Celery workers can be scaled horizontally without coordination:

# docker-compose.yml — scaled deployment
services:
  agent_api:
    build: .
    command: uvicorn agent.main:app --host 0.0.0.0 --port 8000 --workers 4
    deploy:
      replicas: 2      # Two API containers behind a load balancer

  agent_worker:
    build: .
    command: celery -A agent.tasks.celery_app worker --loglevel=info --concurrency=2
    deploy:
      replicas: 4      # Four worker containers, each processing 2 tasks at once

  redis:
    image: redis:7-alpine

Note: Each Celery worker makes real LLM API calls. Four replicas × a concurrency of two = up to eight agent runs in flight at once, each issuing its own LLM calls. At GPT-4o prices, this can get expensive quickly. Always set replica counts and concurrency based on your LLM tier's rate limits.
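
The arithmetic behind that warning can be made explicit. The sketch below assumes each worker slot issues LLM calls back to back and that you know your provider's requests-per-minute limit and an average call duration; both function names and the example numbers are illustrative assumptions.

```python
import math


def peak_concurrent_runs(replicas: int, concurrency: int) -> int:
    """Upper bound on agent runs executing at the same moment."""
    return replicas * concurrency


def max_slots_for_rate_limit(requests_per_minute: int, avg_call_seconds: float) -> int:
    """
    Largest number of worker slots that stays within a provider rate limit,
    assuming every slot issues LLM calls back to back.
    """
    calls_per_slot_per_minute = 60.0 / avg_call_seconds
    return math.floor(requests_per_minute / calls_per_slot_per_minute)


# Values from the compose file above: 4 worker replicas, concurrency 2.
slots = peak_concurrent_runs(replicas=4, concurrency=2)
# Hypothetical provider tier: 60 requests per minute, 4 s per LLM call.
allowed = max_slots_for_rate_limit(requests_per_minute=60, avg_call_seconds=4.0)
print(f"configured slots: {slots}, slots the rate limit allows: {allowed}")
```

With these example numbers the compose file configures 8 slots while the assumed limit supports only 4, so either the replica count or the concurrency would need to come down.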

Rate Limiting

Protect against accidental or malicious overuse with per-user rate limits:

# agent/middleware.py
from fastapi import Request, HTTPException
import time
from collections import defaultdict


class RateLimiter:
    """
    Simple in-memory rate limiter using a sliding window.

    For production, replace the in-memory dict with a Redis counter
    so limits are shared across all API container replicas.
    """

    def __init__(self, max_requests: int, window_seconds: int) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._windows: dict[str, list[float]] = defaultdict(list)

    def check(self, key: str) -> None:
        """Raise HTTPException if the key has exceeded its rate limit."""
        now = time.monotonic()
        window_start = now - self.window_seconds

        # Purge timestamps outside the current window
        self._windows[key] = [t for t in self._windows[key] if t > window_start]

        if len(self._windows[key]) >= self.max_requests:
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded: max {self.max_requests} requests per {self.window_seconds}s",
            )

        self._windows[key].append(now)


rate_limiter = RateLimiter(max_requests=10, window_seconds=60)
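
The docstring above suggests moving the counters to Redis so that all replicas share them. One way to sketch that is shown below. Note that it uses a fixed window rather than the sliding window of the in-memory version, because a fixed window needs only the INCR and EXPIRE commands. The class names, the key format, and the in-memory stand-in are assumptions for illustration; in a real deployment the store would be a redis-py client, which provides incr and expire methods.

```python
import time
from typing import Callable


class SharedRateLimiter:
    """Fixed-window limiter whose counters live in a shared store such as Redis."""

    def __init__(
        self,
        store,
        max_requests: int,
        window_seconds: int,
        clock: Callable[[], float] = time.time,
    ) -> None:
        self.store = store
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock

    def allow(self, key: str) -> bool:
        """Return True if this request fits in the current window."""
        window = int(self._clock() // self.window_seconds)
        name = f"ratelimit:{key}:{window}"  # Hypothetical key format
        count = self.store.incr(name)
        if count == 1:
            # First hit in this window: let the counter expire with the window.
            self.store.expire(name, self.window_seconds)
        return count <= self.max_requests


class InMemoryStore:
    """Stand-in for a Redis client so the sketch runs without a server."""

    def __init__(self) -> None:
        self.counts: dict = {}

    def incr(self, name: str) -> int:
        self.counts[name] = self.counts.get(name, 0) + 1
        return self.counts[name]

    def expire(self, name: str, seconds: int) -> bool:
        return True  # Expiry is not simulated in this stand-in


limiter = SharedRateLimiter(
    InMemoryStore(), max_requests=2, window_seconds=60, clock=lambda: 120.0
)
print([limiter.allow("user_1") for _ in range(3)])
```

A fixed window can admit up to twice the limit across a window boundary; if that matters, a sliding-window variant (for example, one based on sorted sets) is the usual refinement.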

Monitoring and Logging in Production

Structured Logging

Use structured JSON logs so your log aggregation platform (Datadog, Grafana Loki, CloudWatch) can index and query them:

# agent/logging_config.py
import logging
import json
from datetime import datetime, timezone


class JSONFormatter(logging.Formatter):
    """Emit log records as single-line JSON objects."""

    def format(self, record: logging.LogRecord) -> str:
        log_obj = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_obj["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_obj)


def configure_logging(level: str = "INFO") -> None:
    """Configure root logger with JSON output."""
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    logging.basicConfig(level=level, handlers=[handler], force=True)
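
Agent logs become far easier to query when every line carries request identifiers. The variant below is a sketch that copies a few optional fields from the log record into the JSON payload. The field names session_id, user_id and task_id are assumptions; they are supplied per call through the standard extra argument of the logging module.

```python
import json
import logging
from datetime import datetime, timezone

# Hypothetical context fields; adjust to whatever identifiers your agent uses.
CONTEXT_FIELDS = ("session_id", "user_id", "task_id")


class ContextJSONFormatter(logging.Formatter):
    """Emit log records as single-line JSON, including optional context fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            # record.created is the time the event was logged, not formatted.
            "timestamp": datetime.fromtimestamp(record.created, timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for field in CONTEXT_FIELDS:
            value = getattr(record, field, None)
            if value is not None:
                payload[field] = value
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


handler = logging.StreamHandler()
handler.setFormatter(ContextJSONFormatter())
demo_logger = logging.getLogger("agent.demo")
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.INFO)
demo_logger.info("run finished", extra={"session_id": "sess_42"})
```

Fields that are not passed in extra are simply left out, so existing log calls keep working unchanged.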

Metrics to Track

For every agent run, emit these metrics to your observability platform:

| Metric | Type | Description |
|---|---|---|
| agent.run.latency_ms | Histogram | End-to-end request latency |
| agent.run.iterations | Histogram | Number of reasoning loop iterations |
| agent.run.success_rate | Counter | Runs counted by outcome (success or failure); the ratio is derived at query time |
| agent.llm.tokens_used | Counter | Tokens consumed per run (input + output) |
| agent.llm.cost_usd | Counter | Estimated cost per run |
| agent.tool.calls_total | Counter | Tool calls, labelled by tool name |
| agent.tool.error_rate | Counter | Tool errors, labelled by tool name and error type; the rate is derived at query time |
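
How these metrics are emitted depends on your observability client, so the sketch below stays client-agnostic: it turns the figures from one run into a list of metric points that could be handed to whichever library you use. The Metric type, the function name, and the tag keys are assumptions for illustration.

```python
from typing import List, NamedTuple


class Metric(NamedTuple):
    name: str
    kind: str      # "histogram" or "counter"
    value: float
    tags: dict


def metrics_for_run(
    latency_ms: float,
    iterations: int,
    success: bool,
    tokens_used: int,
    cost_usd: float,
    tool_calls: List[str],
) -> List[Metric]:
    """Build the metric points for a single agent run."""
    outcome = "success" if success else "failure"
    metrics = [
        Metric("agent.run.latency_ms", "histogram", latency_ms, {}),
        Metric("agent.run.iterations", "histogram", float(iterations), {}),
        # One count per run, tagged by outcome; the ratio is computed at query time.
        Metric("agent.run.success_rate", "counter", 1.0, {"outcome": outcome}),
        Metric("agent.llm.tokens_used", "counter", float(tokens_used), {}),
        Metric("agent.llm.cost_usd", "counter", cost_usd, {}),
    ]
    for tool in tool_calls:
        metrics.append(Metric("agent.tool.calls_total", "counter", 1.0, {"tool": tool}))
    return metrics


points = metrics_for_run(1200.0, 3, True, 4200, 0.031, ["search", "search", "calculator"])
for point in points:
    print(point.name, point.kind, point.value, point.tags)
```

Keeping metric construction separate from the emitting client also makes it straightforward to unit test.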

Cost Management for LLM API Calls

LLM API costs can spiral quickly in production. These strategies keep them under control:

Prompt Caching

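Both OpenAI and Anthropic offer prompt caching for repeated prompt prefixes. OpenAI applies it automatically to sufficiently long prompts, while Anthropic requires an explicit cache_control marker. In both cases, keep the static system prompt at the very start of the request and make sure it is long enough to qualify (roughly 1,000 tokens or more, depending on the model):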

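# The system prompt is identical on every request, which makes it cacheable
SYSTEM_PROMPT = """You are a helpful research assistant...
[include enough context to make caching cost-effective]
"""

user_message = "Summarise the latest findings on battery recycling."  # Placeholder input

# For Anthropic Claude, mark the system block as cacheable and pass it through
# the top-level system parameter of the Messages API, not as a user turn
system_blocks = [
    {
        "type": "text",
        "text": SYSTEM_PROMPT,
        "cache_control": {"type": "ephemeral"},  # Cache everything up to this block
    }
]

messages = [
    {"role": "user", "content": user_message},
]

# Both values are then passed to the client, for example:
# client.messages.create(model=..., max_tokens=..., system=system_blocks, messages=messages)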
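
To see whether caching is worth the effort for a given agent, it helps to work through the input-cost arithmetic. The sketch below assumes a pricing model in which the first call pays a surcharge to write the cached prefix and later calls read it at a discount. The multipliers (1.25 for writes, 0.1 for reads), the price, and the token counts are illustrative assumptions, so check your provider's current pricing before relying on them. It also assumes every later call hits the cache.

```python
def input_cost_usd(
    prefix_tokens: int,
    dynamic_tokens: int,
    calls: int,
    price_per_1k: float,
    cached: bool = False,
    read_multiplier: float = 0.1,
    write_multiplier: float = 1.25,
) -> float:
    """Input cost for a series of calls that share the same static prefix."""
    if calls <= 0:
        return 0.0
    if not cached:
        billable = (prefix_tokens + dynamic_tokens) * calls
    else:
        # The first call writes the prefix to the cache at a surcharge.
        first = prefix_tokens * write_multiplier + dynamic_tokens
        # Every later call reads the prefix at the discounted rate.
        later = (prefix_tokens * read_multiplier + dynamic_tokens) * (calls - 1)
        billable = first + later
    return billable / 1000 * price_per_1k


plain = input_cost_usd(4000, 1000, calls=10, price_per_1k=0.003)
with_cache = input_cost_usd(4000, 1000, calls=10, price_per_1k=0.003, cached=True)
print(f"uncached ${plain:.4f}, cached ${with_cache:.4f}, saving {1 - with_cache / plain:.0%}")
```

With these example numbers the input cost falls from about $0.15 to about $0.056. The saving applies to input tokens only; output tokens are billed as usual, so the effect on the total bill is smaller.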

Per-Run Budget Caps

Set a hard token budget per agent run to prevent runaway costs:

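from agent.state import AgentResponse  # Assumed to live alongside AgentRequest


class BudgetedOrchestrator:
    """Orchestrator that stops the reasoning loop when the token budget is exhausted."""

    def __init__(self, llm, memory, tools, max_tokens_per_run: int = 50_000, max_iterations: int = 20):
        self.llm = llm
        self.memory = memory
        self.tools = tools
        self.max_tokens_per_run = max_tokens_per_run
        self.max_iterations = max_iterations

    # _build_messages, _build_success_response and _handle_tool_calls are omitted for brevity

    def handle(self, request):
        tokens_used = 0  # Local to the run, so concurrent runs never share a counter
        for iteration in range(self.max_iterations):
            response = self.llm.complete(self._build_messages())
            tokens_used += response.get("usage", {}).get("total_tokens", 0)

            # Return a finished answer even if this last call crossed the budget
            if response.get("finish_reason") == "stop":
                return self._build_success_response(response, iteration)

            if tokens_used >= self.max_tokens_per_run:
                return AgentResponse(
                    content="I was unable to complete this task within the token budget.",
                    success=False,
                    error=f"Token budget of {self.max_tokens_per_run} exceeded",
                    iterations=iteration + 1,
                    tool_calls_made=[],
                )

            self._handle_tool_calls(response)

        # The loop ended without a final answer; fail explicitly instead of returning None
        return AgentResponse(
            content="I was unable to complete this task within the iteration limit.",
            success=False,
            error=f"Iteration limit of {self.max_iterations} reached",
            iterations=self.max_iterations,
            tool_calls_made=[],
        )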

Cost Estimation Before Running

For user-facing agents, estimate the likely cost before starting an expensive run:

def estimate_run_cost(
    message: str,
    model: str = "gpt-4o",
    estimated_iterations: int = 5,
) -> float:
    """
    Rough cost estimate in USD for a single agent run.

    Uses rough token estimates. Treat the result as a lower bound: each LLM
    call resends the growing conversation history, so actual cost is higher
    when tool outputs are large or the task requires many iterations.
    """
    # Rough prices per 1K tokens (check provider pricing pages for current rates)
    pricing = {
        "gpt-4o": {"input": 0.0025, "output": 0.01},
        "gpt-4o-mini": {"input": 0.000150, "output": 0.000600},
        "claude-3-5-sonnet": {"input": 0.003, "output": 0.015},
    }
    rates = pricing.get(model, pricing["gpt-4o"])

    # Estimate: 500 tokens input + 300 tokens output per iteration
    input_tokens = 500 * estimated_iterations
    output_tokens = 300 * estimated_iterations
    # The user message is resent on every LLM call, so count it once per iteration
    prompt_tokens = len(message.split()) * 1.3 * estimated_iterations  # Rough tokenisation estimate

    total_input = (input_tokens + prompt_tokens) / 1000
    total_output = output_tokens / 1000

    return (total_input * rates["input"]) + (total_output * rates["output"])

Deployment Architecture Overview

                     ┌──────────────┐
                     │   Client     │
                     │ (Web / API)  │
                     └──────┬───────┘
                            │ HTTPS
                     ┌──────▼────────┐
                     │ Load Balancer │
                     │  (nginx /     │
                     │   ALB)        │
                     └──────┬────────┘
                  ┌─────────┴──────────┐
         ┌────────▼────────┐   ┌───────▼────────┐
         │  FastAPI API    │   │  FastAPI API   │
         │  Replica 1      │   │  Replica 2     │
         └────────┬────────┘   └───────┬────────┘
                  └─────────┬──────────┘
                            │
                   ┌────────▼────────┐
                   │  Redis (Broker  │
                   │  + Result Store)│
                   └────────┬────────┘
           ┌────────────────┼─────────────────┐
  ┌────────▼───────┐ ┌──────▼──────┐ ┌────────▼───────┐
  │ Celery Worker 1│ │  Worker 2   │ │  Worker 3      │
  │ (Agent runs)   │ │ (Agent runs)│ │ (Agent runs)   │
  └───────┬────────┘ └──────┬──────┘ └────────┬───────┘
          └──────────────────┼─────────────────┘
                             │
                    ┌────────▼────────┐
                    │   PostgreSQL    │
                    │  (Long-term     │
                    │   memory)       │
                    └─────────────────┘

Key Takeaways

  • Use synchronous REST for short tasks (< 15s) and async task queues for long-running jobs.
  • FastAPI + Celery + Redis is a widely used Python stack for async agent deployment.
  • Always implement rate limiting — both per user and globally — to control costs.
  • Use structured JSON logging so your observability platform can index and alert on agent behaviour.
  • Track token usage and cost per run from day one — surprises at billing time are avoidable.
  • Set per-run token budgets to prevent runaway costs from malfunctioning agents.
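  • Prompt caching discounts the repeated prefix of each request (at the time of writing, roughly 50% on OpenAI and up to 90% on Anthropic for cached input tokens), which can cut input costs substantially for agents with large, repeated system prompts.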

Deployment and Testing — Check Your Understanding

3 questions · passing score 70%

  1. Why is testing AI agents harder than testing traditional software?

  2. What is the primary benefit of containerizing an agent with Docker?

  3. What deployment pattern minimizes risk when releasing a new version of a production agent?
