Error Handling and Rate Limits
Production LLM integrations fail in predictable ways. Understanding the failure modes and implementing proper retry logic, rate limiting, and fallback strategies is what separates a hobby project from a production system.
Common API Error Types
| Error | HTTP Status | Cause | Action |
|---|---|---|---|
| Rate limit exceeded | 429 | Too many requests per minute/day | Retry with exponential backoff |
| Authentication error | 401 | Invalid or expired API key | Fix credentials, no retry |
| Bad request | 400 | Invalid parameters, malformed input | Fix request, no retry |
| Context length exceeded | 400 | Input + max_tokens > context window | Truncate input or reduce max_tokens |
| Service unavailable | 503 | Provider outage or overload | Retry with backoff |
| Gateway timeout | 504 | Request took too long | Retry, possibly with smaller input |
| Content policy violation | 400 | Input or output violates usage policy | Review content, no automatic retry |
Exponential Backoff with Jitter
import asyncio
import random
import logging
from typing import TypeVar, Callable, Awaitable
T = TypeVar("T")

async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    retryable_status_codes: frozenset[int] = frozenset({429, 500, 502, 503, 504}),
) -> T:
    """
    Retry an async function with exponential backoff and full jitter.

    Full jitter formula: delay = random(0, min(max_delay, base * 2^attempt)).
    This avoids a thundering herd when many clients retry simultaneously.

    Args:
        func: Zero-argument coroutine factory; invoked once per attempt.
        max_retries: Retries AFTER the first attempt (up to max_retries + 1
            calls in total).
        base_delay: Initial backoff cap in seconds.
        max_delay: Upper bound on any single delay, in seconds.
        retryable_status_codes: HTTP statuses worth retrying; errors whose
            `status_code` attribute is anything else are re-raised at once.

    Returns:
        The result of the first successful func() call.

    Raises:
        RuntimeError: if every attempt failed with a retryable error; the
            last error is chained as __cause__.
        Exception: the original error, when it is non-retryable.
    """
    logger = logging.getLogger(__name__)
    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception as exc:
            last_error = exc
            # Errors with a status outside the retryable set (auth failures,
            # bad requests, ...) will never succeed on retry — fail fast.
            status_code = getattr(exc, "status_code", None)
            if status_code and status_code not in retryable_status_codes:
                logger.error("Non-retryable error %s: %s", status_code, exc)
                raise
            if attempt == max_retries:
                break
            # Full jitter: uniform over [0, min(max_delay, base * 2^attempt)].
            cap = min(max_delay, base_delay * (2 ** attempt))
            delay = random.uniform(0, cap)
            # Lazy %-args; note the total is max_retries + 1 attempts.
            logger.warning(
                "Attempt %d/%d failed. Retrying in %.2fs. Error: %s",
                attempt + 1, max_retries + 1, delay, exc,
            )
            await asyncio.sleep(delay)
    raise RuntimeError(f"All {max_retries + 1} attempts failed") from last_error
Handling Rate Limits Proactively
Instead of always hitting limits and retrying, implement a token bucket to stay within limits:
import time
from threading import Lock
class TokenBucketRateLimiter:
    """
    Token bucket algorithm for proactive rate limiting.

    Tracks two budgets — requests/minute and tokens/minute — allowing
    bursts up to capacity, then enforcing a steady refill rate. Thread-safe
    via an internal lock.

    Capacity is reserved at acquire() time even when the caller must wait,
    so a caller that sleeps for the returned duration and then proceeds
    stays within the limits. The bucket may go briefly negative (a "debt")
    that subsequent refills pay down.
    """

    def __init__(self, requests_per_minute: int, tokens_per_minute: int):
        self.rpm_limit = requests_per_minute
        self.tpm_limit = tokens_per_minute
        # Buckets start full so an initial burst is allowed.
        self._request_tokens: float = requests_per_minute
        self._token_tokens: float = tokens_per_minute
        self._last_refill = time.monotonic()
        self._lock = Lock()

    def _refill(self) -> None:
        # Continuous refill proportional to elapsed time, capped at capacity.
        now = time.monotonic()
        elapsed_minutes = (now - self._last_refill) / 60.0
        self._request_tokens = min(
            self.rpm_limit,
            self._request_tokens + elapsed_minutes * self.rpm_limit,
        )
        self._token_tokens = min(
            self.tpm_limit,
            self._token_tokens + elapsed_minutes * self.tpm_limit,
        )
        self._last_refill = now

    def acquire(self, estimated_tokens: int = 100) -> float:
        """
        Reserve one request plus `estimated_tokens` and return the seconds
        the caller should sleep before sending (0 to proceed immediately).

        The reservation happens even when a wait is returned — do NOT call
        acquire() a second time for the same request after sleeping.
        """
        with self._lock:
            self._refill()
            # Seconds until the deficit (if any) refills.
            request_wait = max(0, (1 - self._request_tokens) / self.rpm_limit * 60)
            token_wait = max(0, (estimated_tokens - self._token_tokens) / self.tpm_limit * 60)
            wait_time = max(request_wait, token_wait)
            # Always deduct: previously, waiting callers never consumed
            # capacity, so sleep-then-send traffic could exceed the limit.
            self._request_tokens -= 1
            self._token_tokens -= estimated_tokens
            return wait_time
# Usage
limiter = TokenBucketRateLimiter(requests_per_minute=60, tokens_per_minute=90_000)

async def rate_limited_call(prompt: str) -> str:
    """Acquire rate-limit capacity, then issue the completion with retries."""
    # Rough token estimate: ~4 characters per token for English text.
    delay = limiter.acquire(estimated_tokens=len(prompt) // 4)
    if delay > 0:
        await asyncio.sleep(delay)

    async def _send():
        return await async_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )

    return await retry_with_backoff(_send)
Context Length Error Handling
import tiktoken
def truncate_to_fit(
    messages: list[dict],
    model: str = "gpt-4o-mini",
    max_context: int = 128_000,
    max_output_tokens: int = 2_000,
    buffer_tokens: int = 100,
) -> list[dict]:
    """
    Truncate message history to fit within the context window.

    Preserves every system message plus the most recent non-system messages;
    drops the oldest non-system messages first. NOTE: if the system messages
    alone exceed the budget they are still returned unchanged — keeping
    system prompts within limits is the caller's responsibility.

    Args:
        messages: Chat messages shaped like {"role": ..., "content": ...}.
        model: Model name used to select the tiktoken encoding.
        max_context: Total context window size in tokens.
        max_output_tokens: Tokens reserved for the model's reply.
        buffer_tokens: Safety margin for per-message overhead drift.

    Returns:
        A new list fitting within max_context - max_output_tokens -
        buffer_tokens tokens (subject to the system-message caveat above).
    """
    enc = tiktoken.encoding_for_model(model)

    def count_one(msg: dict) -> int:
        # +4 approximates the chat-format framing tokens per message;
        # `or ""` guards against explicit {"content": None} entries.
        return len(enc.encode(msg.get("content") or "")) + 4

    available = max_context - max_output_tokens - buffer_tokens

    # Always keep system messages; candidates for removal are everything else.
    system_msgs = [m for m in messages if m["role"] == "system"]
    other_msgs = [m for m in messages if m["role"] != "system"]

    # Encode each message exactly once and subtract counts as we drop,
    # instead of re-encoding the whole history per iteration (O(n) vs O(n^2)).
    other_counts = [count_one(m) for m in other_msgs]
    total = sum(count_one(m) for m in system_msgs) + sum(other_counts)

    # Drop oldest non-system messages until the remainder fits.
    start = 0
    while total > available and start < len(other_msgs):
        total -= other_counts[start]
        start += 1

    return system_msgs + other_msgs[start:]
Fallback Chains
When your primary model fails, fall back to a cheaper or more available alternative:
import anthropic
from openai import APIError, AsyncOpenAI, OpenAI
# Async clients are required here: resilient_completion awaits
# .create(...) on both, which fails at runtime with the sync
# OpenAI()/Anthropic() clients.
primary_client = AsyncOpenAI()
fallback_client = anthropic.AsyncAnthropic()
async def resilient_completion(prompt: str) -> str:
    """
    Complete `prompt` with GPT-4o-mini, falling back to Claude Haiku.

    Args:
        prompt: User message sent as a single-turn conversation.

    Returns:
        The assistant's reply text from whichever provider succeeded.

    Raises:
        RuntimeError: if both the primary and the fallback provider fail;
            the fallback's error is chained as __cause__.
    """
    try:
        response = await primary_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            timeout=30.0,  # fail over quickly instead of hanging
        )
        return response.choices[0].message.content
    except (APIError, TimeoutError) as primary_error:
        # Lazy %-args defer formatting until the record is actually emitted.
        logging.warning("Primary model failed: %s. Using fallback.", primary_error)
        try:
            response = await fallback_client.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=1024,  # Anthropic requires an explicit output cap
                messages=[{"role": "user", "content": prompt}],
            )
            return response.content[0].text
        except Exception as fallback_error:
            logging.error("Fallback also failed: %s", fallback_error)
            raise RuntimeError("All AI providers unavailable") from fallback_error
Robust error handling is unglamorous but critical. The difference between an agent that crashes on the first API hiccup and one that handles errors gracefully is entirely in the implementation of these patterns.