API Integration

Error Handling and Rate Limits

Production LLM integrations fail in predictable ways. Understanding the failure modes and implementing proper retry logic, rate limiting, and fallback strategies is what separates a hobby project from a production system.

Common API Error Types

| Error | HTTP Status | Cause | Action |
|---|---|---|---|
| Rate limit exceeded | 429 | Too many requests per minute/day | Retry with exponential backoff |
| Authentication error | 401 | Invalid or expired API key | Fix credentials, no retry |
| Bad request | 400 | Invalid parameters, malformed input | Fix request, no retry |
| Context length exceeded | 400 | Input + max_tokens > context window | Truncate input or reduce max_tokens |
| Service unavailable | 503 | Provider outage or overload | Retry with backoff |
| Gateway timeout | 504 | Request took too long | Retry, possibly with smaller input |
| Content policy violation | 400 | Input or output violates usage policy | Review content, no automatic retry |
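
The "Action" column above can be encoded as a small predicate that the retry logic below consults. A minimal sketch (the status sets are assumptions drawn directly from the table):

```python
# Statuses the table marks as retryable: rate limits, outages, timeouts.
RETRYABLE_STATUSES = {429, 500, 502, 503, 504}

def should_retry(status_code: int) -> bool:
    """True when the table above recommends retrying this HTTP status."""
    return status_code in RETRYABLE_STATUSES
```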

Exponential Backoff with Jitter

import asyncio
import random
import logging
from typing import TypeVar, Callable, Awaitable

T = TypeVar("T")

async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    retryable_status_codes: frozenset[int] = frozenset({429, 500, 502, 503, 504}),
) -> T:
    """
    Retry an async function with exponential backoff and full jitter.
    
    Full jitter formula: delay = random(0, min(max_delay, base * 2^attempt))
    This avoids thundering herd when many clients retry simultaneously.
    """
    logger = logging.getLogger(__name__)
    last_error = None

    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception as exc:
            last_error = exc
            
            # Check if error is retryable
            status_code = getattr(exc, "status_code", None)
            if status_code and status_code not in retryable_status_codes:
                logger.error(f"Non-retryable error {status_code}: {exc}")
                raise
            
            if attempt == max_retries:
                break
            
            # Calculate backoff with full jitter
            cap = min(max_delay, base_delay * (2 ** attempt))
            delay = random.uniform(0, cap)
            
            logger.warning(
                f"Attempt {attempt + 1}/{max_retries} failed. "
                f"Retrying in {delay:.2f}s. Error: {exc}"
            )
            await asyncio.sleep(delay)

    raise RuntimeError(f"All {max_retries + 1} attempts exhausted") from last_error
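
To build intuition for the full-jitter formula, note that the per-attempt delay ceilings double each attempt until they hit the cap, and the actual delay is a uniform draw below that ceiling. A small sketch with the defaults above (base_delay=1.0, max_delay=60.0):

```python
import random

def jitter_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Full jitter: a uniform draw from [0, min(cap, base * 2**attempt)]."""
    ceiling = min(cap, base * (2 ** attempt))
    return random.uniform(0, ceiling)

# Ceilings for attempts 0..6: doubling, then clamped at max_delay.
caps = [min(60.0, 1.0 * 2 ** a) for a in range(7)]
# caps == [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

Because every client draws uniformly rather than sleeping exactly the ceiling, simultaneous retries spread out instead of re-colliding.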

Handling Rate Limits Proactively

Instead of always hitting limits and retrying, implement a token bucket to stay within limits:

import time
from threading import Lock

class TokenBucketRateLimiter:
    """
    Token bucket algorithm for proactive rate limiting.
    
    Allows bursts up to capacity, then enforces a steady rate.
    """
    def __init__(self, requests_per_minute: int, tokens_per_minute: int):
        self.rpm_limit = requests_per_minute
        self.tpm_limit = tokens_per_minute
        self._request_tokens = requests_per_minute
        self._token_tokens = tokens_per_minute
        self._last_refill = time.monotonic()
        self._lock = Lock()

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed_minutes = (now - self._last_refill) / 60.0
        self._request_tokens = min(
            self.rpm_limit,
            self._request_tokens + elapsed_minutes * self.rpm_limit
        )
        self._token_tokens = min(
            self.tpm_limit,
            self._token_tokens + elapsed_minutes * self.tpm_limit
        )
        self._last_refill = now

    def acquire(self, estimated_tokens: int = 100) -> float:
        """Returns wait time in seconds. 0 if request can proceed immediately."""
        with self._lock:
            self._refill()
            
            request_wait = max(0, (1 - self._request_tokens) / self.rpm_limit * 60)
            token_wait = max(0, (estimated_tokens - self._token_tokens) / self.tpm_limit * 60)
            wait_time = max(request_wait, token_wait)
            
            if wait_time == 0:
                self._request_tokens -= 1
                self._token_tokens -= estimated_tokens
            
            return wait_time

# Usage
limiter = TokenBucketRateLimiter(requests_per_minute=60, tokens_per_minute=90_000)

from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def rate_limited_call(prompt: str) -> str:
    wait = limiter.acquire(estimated_tokens=len(prompt) // 4)  # rough ~4 chars/token estimate
    if wait > 0:
        await asyncio.sleep(wait)
    
    return await retry_with_backoff(
        lambda: async_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
    )
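
To see the refill math in action, here is a simplified single-limit bucket with an explicit clock argument instead of `time.monotonic()` (a sketch mirroring the class above, not the class itself), which makes the burst-then-refill behavior easy to verify:

```python
class SimpleBucket:
    """Single-limit token bucket; `now` is passed in for deterministic testing."""
    def __init__(self, per_minute: int):
        self.limit = per_minute
        self.tokens = float(per_minute)  # start full: bursts up to capacity allowed
        self.last = 0.0

    def try_acquire(self, now: float) -> bool:
        # Same refill formula as TokenBucketRateLimiter._refill.
        elapsed_minutes = (now - self.last) / 60.0
        self.tokens = min(self.limit, self.tokens + elapsed_minutes * self.limit)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = SimpleBucket(per_minute=60)
burst = sum(bucket.try_acquire(now=0.0) for _ in range(70))  # first 60 succeed, last 10 rejected
later = bucket.try_acquire(now=1.0)  # one second refills exactly one token at 60/min
```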

Context Length Error Handling

import tiktoken

def truncate_to_fit(
    messages: list[dict],
    model: str = "gpt-4o-mini",
    max_context: int = 128_000,
    max_output_tokens: int = 2_000,
    buffer_tokens: int = 100,
) -> list[dict]:
    """
    Truncate message history to fit within the context window.
    
    Preserves: system message + most recent messages.
    Removes: oldest non-system messages first.
    """
    enc = tiktoken.encoding_for_model(model)
    
    def count_tokens(msgs: list[dict]) -> int:
        return sum(len(enc.encode(m.get("content", ""))) + 4 for m in msgs)
    
    available = max_context - max_output_tokens - buffer_tokens
    
    # Always keep system message
    system_msgs = [m for m in messages if m["role"] == "system"]
    other_msgs = [m for m in messages if m["role"] != "system"]
    
    # Remove oldest non-system messages until we fit
    while count_tokens(system_msgs + other_msgs) > available and other_msgs:
        other_msgs.pop(0)
    
    return system_msgs + other_msgs
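
To sanity-check the drop-oldest strategy without downloading tiktoken's encoding files, the same loop can be exercised with the rough 4-characters-per-token heuristic (an approximation for illustration, not the real tokenizer):

```python
def truncate_to_fit_approx(messages: list[dict], available_tokens: int) -> list[dict]:
    """Keep system messages, drop oldest others; chars/4 stands in for tiktoken."""
    def count(msgs: list[dict]) -> int:
        return sum(len(m.get("content", "")) // 4 + 4 for m in msgs)

    system_msgs = [m for m in messages if m["role"] == "system"]
    other_msgs = [m for m in messages if m["role"] != "system"]
    while count(system_msgs + other_msgs) > available_tokens and other_msgs:
        other_msgs.pop(0)
    return system_msgs + other_msgs

history = [
    {"role": "system", "content": "You are terse."},
    {"role": "user", "content": "x" * 400},  # ~100 tokens under the heuristic
    {"role": "user", "content": "y" * 400},  # ~100 tokens
]
trimmed = truncate_to_fit_approx(history, available_tokens=150)
# The oldest user message is dropped; the system message survives.
```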

Fallback Chains

When your primary model fails, fall back to a cheaper or more available alternative:

import logging

from openai import AsyncOpenAI, APIError
import anthropic

primary_client = AsyncOpenAI()
fallback_client = anthropic.AsyncAnthropic()

async def resilient_completion(prompt: str) -> str:
    """Try GPT-4o-mini first, fall back to Claude Haiku on failure."""
    try:
        response = await primary_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            timeout=30.0,
        )
        return response.choices[0].message.content
    except (APIError, TimeoutError) as primary_error:
        logging.warning(f"Primary model failed: {primary_error}. Using fallback.")
        try:
            response = await fallback_client.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}],
            )
            return response.content[0].text
        except Exception as fallback_error:
            logging.error(f"Fallback also failed: {fallback_error}")
            raise RuntimeError("All AI providers unavailable") from fallback_error
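
The two-provider pattern generalizes to a chain of any length: try each provider in order and return the first success. A minimal sketch (the provider names and stand-in coroutines are placeholders, not real clients):

```python
import asyncio

async def first_success(providers: list[tuple[str, callable]]) -> str:
    """Try each (name, async callable) in order; return the first result."""
    errors = []
    for name, call in providers:
        try:
            return await call()
        except Exception as exc:
            errors.append(f"{name}: {exc}")
    raise RuntimeError("All providers failed: " + "; ".join(errors))

# Demo with stand-in coroutines:
async def flaky():
    raise ConnectionError("primary down")

async def stable():
    return "ok"

result = asyncio.run(first_success([("primary", flaky), ("backup", stable)]))
# result == "ok" — the chain skipped the failing provider.
```

Keeping the per-provider errors in the final exception message makes "everything is down" incidents much easier to diagnose than a bare failure.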

Robust error handling is unglamorous but critical. The difference between an agent that crashes on the first API hiccup and one that handles errors gracefully is entirely in the implementation of these patterns.

API Integration — Check Your Understanding

3 questions · passing score 70%

  1. What HTTP status code does the OpenAI API return when you exceed your rate limit?

  2. What is exponential backoff in the context of API error handling?

  3. Which API parameter controls the randomness/creativity of LLM outputs?
