Chapter 9: Rate Limiting
Overview
Rate limiting controls the number of API requests you can make within a specific time window. This chapter explains how rate limiting works, how to handle rate limit errors, and strategies for staying within limits.
What is Rate Limiting?
Rate limiting prevents abuse and ensures fair resource allocation by restricting the number of requests per time period. It protects the API infrastructure and ensures quality of service for all users.
Why Rate Limits Exist
- Infrastructure Protection: Prevent server overload
- Fair Usage: Ensure resources are available to all users
- Cost Control: Limit computational expenses
- Security: Mitigate DDoS and abuse attempts
- Quality of Service: Maintain consistent performance
Rate Limit Structure
Rate Limits Applied Per:
├── Access Token (API Key)
├── User Account
├── IP Address (in extreme cases)
└── Endpoint (different limits for different routes)

Rate Limit Tiers
Default Limits
| Tier | Requests/Second | Requests/Minute | Requests/Hour | Requests/Day |
|---|---|---|---|---|
| Free | 1 | 10 | 100 | 1,000 |
| Basic | 5 | 100 | 1,000 | 10,000 |
| Pro | 20 | 1,000 | 10,000 | 100,000 |
| Enterprise | Custom | Custom | Custom | Custom |
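Note that the windows interact: sustained throughput is capped by the tightest window, not the per-second figure. A quick sketch of this arithmetic, using the tier values from the table above (the helper function is illustrative):

```python
# The tightest window caps sustained throughput. For the Pro tier,
# 100,000 req/day works out to roughly 1.16 req/sec sustained, even
# though bursts of 20 req/sec are allowed.

def sustained_rate(limits):
    """Binding (lowest) requests-per-second rate across all windows.

    `limits` maps window length in seconds to the request cap for that window.
    """
    return min(cap / seconds for seconds, cap in limits.items())

pro = {1: 20, 60: 1_000, 3_600: 10_000, 86_400: 100_000}
print(f"Pro sustained rate: {sustained_rate(pro):.2f} req/sec")  # → 1.16
```

In other words, the per-second limit governs bursts, while the daily cap governs what you can do around the clock.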
Per-Endpoint Limits
Different endpoints have different rate limits based on computational cost:
| Endpoint Category | Rate Multiplier | Example Limit (Pro) |
|---|---|---|
| Health Checks | No limit | Unlimited |
| Read Operations | 1.0x | 20 req/sec |
| Write Operations | 0.5x | 10 req/sec |
| LLM Generation | 0.2x | 4 req/sec |
| Heavy Processing | 0.1x | 2 req/sec |
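The multipliers apply to your tier's base per-second rate. A minimal sketch (the category keys are illustrative labels for the table rows above):

```python
# Effective per-endpoint limit = tier base rate * category multiplier,
# matching the table above (Pro base rate of 20 req/sec).

MULTIPLIERS = {
    'read': 1.0,
    'write': 0.5,
    'llm_generation': 0.2,
    'heavy_processing': 0.1,
}

def endpoint_limit(base_rate, category):
    """Effective req/sec for an endpoint category at a given base rate."""
    return base_rate * MULTIPLIERS[category]

print(endpoint_limit(20, 'llm_generation'))  # → 4.0
```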
Rate Limit Headers
Every API response includes rate limit information:
HTTP/1.1 200 OK
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 999
X-RateLimit-Reset: 1705327200
X-RateLimit-Window: 3600

Header Descriptions
- X-RateLimit-Limit: Total requests allowed in window
- X-RateLimit-Remaining: Requests remaining in current window
- X-RateLimit-Reset: Unix timestamp when limit resets
- X-RateLimit-Window: Window duration in seconds
Reading Headers in Code
import requests
from datetime import datetime

response = requests.get(
    'https://api.polysystems.ai/api/hub/health',
    headers={'X-API-Key': api_key}
)

# Parse rate limit headers
limit = int(response.headers.get('X-RateLimit-Limit', 0))
remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
reset = int(response.headers.get('X-RateLimit-Reset', 0))

print(f"Rate Limit: {remaining}/{limit} remaining")
print(f"Resets at: {datetime.fromtimestamp(reset)}")

# Check if close to limit
if remaining < limit * 0.1:  # Less than 10% remaining
    print("⚠️ Warning: Approaching rate limit")

Rate Limit Exceeded Error
When you exceed the rate limit:
HTTP/1.1 429 Too Many Requests
Retry-After: 60
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1705327200

Response Body:
{
  "error": "Rate limit exceeded",
  "message": "You have exceeded the rate limit. Please retry after 60 seconds.",
  "retry_after": 60,
  "limit": 1000,
  "window": "hour"
}

Handling Rate Limits
Strategy 1: Exponential Backoff
import time
import requests

def api_call_with_backoff(url, headers, data, max_retries=5):
    """Make API call with exponential backoff"""
    for attempt in range(max_retries):
        response = requests.post(url, headers=headers, json=data)

        if response.status_code == 200:
            return response.json()
        elif response.status_code == 429:
            # Rate limit exceeded: wait at least Retry-After,
            # backing off exponentially on repeated failures
            retry_after = int(response.headers.get('Retry-After', 60))
            if attempt < max_retries - 1:
                wait_time = max(retry_after, 2 ** attempt)
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise Exception("Max retries exceeded")
        else:
            response.raise_for_status()

    raise Exception("Request failed after retries")

# Usage
result = api_call_with_backoff(
    'https://api.polysystems.ai/api/hub/agents/chat',
    headers={'X-API-Key': api_key},
    data={'messages': [{'role': 'user', 'content': 'Hello'}]}
)

Strategy 2: Rate Limit Aware Queue
import time
import requests

class RateLimitedQueue:
    def __init__(self, requests_per_second=10):
        self.rate = requests_per_second
        self.interval = 1.0 / requests_per_second
        self.last_request = None

    def wait_if_needed(self):
        """Wait if necessary to stay within rate limit"""
        if self.last_request is not None:
            elapsed = time.time() - self.last_request
            if elapsed < self.interval:
                time.sleep(self.interval - elapsed)
        self.last_request = time.time()

    def make_request(self, url, headers, data):
        """Make request with rate limiting"""
        self.wait_if_needed()
        return requests.post(url, headers=headers, json=data)

# Usage
queue = RateLimitedQueue(requests_per_second=5)

for i in range(20):
    response = queue.make_request(
        'https://api.polysystems.ai/api/hub/memory',
        headers={'X-API-Key': api_key},
        data={'key': f'item_{i}', 'value': f'data_{i}'}
    )
    print(f"Request {i+1} completed")

Strategy 3: Token Bucket Algorithm
import time
import requests
from threading import Lock

class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate          # tokens per second
        self.capacity = capacity  # bucket size
        self.tokens = capacity
        self.last_update = time.time()
        self.lock = Lock()

    def consume(self, tokens=1):
        """Try to consume tokens, return True if successful"""
        with self.lock:
            now = time.time()
            # Refill tokens based on time elapsed
            elapsed = now - self.last_update
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate
            )
            self.last_update = now

            # Try to consume
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_for_tokens(self, tokens=1):
        """Wait until tokens are available"""
        while not self.consume(tokens):
            time.sleep(0.1)

# Usage
bucket = TokenBucket(rate=10, capacity=100)  # 10 req/sec, burst of 100

def make_api_call(data):
    bucket.wait_for_tokens()  # Wait if no tokens available
    return requests.post(
        'https://api.polysystems.ai/api/hub/agents/chat',
        headers={'X-API-Key': api_key},
        json=data
    )

# Make many requests - automatically rate limited
for i in range(1000):
    result = make_api_call({'messages': [{'role': 'user', 'content': f'Query {i}'}]})
    print(f"Completed request {i+1}")

Monitoring Rate Limits
Track Usage Across Requests
import time
import requests

class RateLimitMonitor:
    def __init__(self):
        self.limits = {}

    def update_from_headers(self, headers):
        """Update limits from response headers"""
        self.limits = {
            'limit': int(headers.get('X-RateLimit-Limit', 0)),
            'remaining': int(headers.get('X-RateLimit-Remaining', 0)),
            'reset': int(headers.get('X-RateLimit-Reset', 0)),
            'updated_at': time.time()
        }

    def get_usage_percentage(self):
        """Get percentage of rate limit used"""
        if not self.limits or self.limits['limit'] == 0:
            return 0
        used = self.limits['limit'] - self.limits['remaining']
        return (used / self.limits['limit']) * 100

    def time_until_reset(self):
        """Get seconds until rate limit resets"""
        if not self.limits:
            return 0
        return max(0, self.limits['reset'] - time.time())

    def should_wait(self, threshold=0.9):
        """Check if should wait before making more requests"""
        usage = self.get_usage_percentage()
        return usage > (threshold * 100)

# Usage
monitor = RateLimitMonitor()

response = requests.get(
    'https://api.polysystems.ai/api/hub/agents',
    headers={'X-API-Key': api_key}
)
monitor.update_from_headers(response.headers)

print(f"Rate limit usage: {monitor.get_usage_percentage():.1f}%")
print(f"Resets in: {monitor.time_until_reset():.0f} seconds")

if monitor.should_wait(threshold=0.9):
    print("Approaching rate limit, slowing down requests")
    time.sleep(5)

Distributed Rate Limiting
Using Redis for Multi-Instance Coordination
import redis
import time

class DistributedRateLimiter:
    def __init__(self, redis_client, key_prefix, limit, window):
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.limit = limit
        self.window = window  # in seconds

    def is_allowed(self, identifier):
        """Check if request is allowed (sliding window over a sorted set)"""
        key = f"{self.key_prefix}:{identifier}"
        now = time.time()
        window_start = now - self.window

        # Remove entries older than the window
        self.redis.zremrangebyscore(key, 0, window_start)

        # Count requests in current window
        request_count = self.redis.zcard(key)

        if request_count < self.limit:
            # Record current request
            # Note: this check-then-add is not atomic; under heavy
            # concurrency a Lua script or MULTI/EXEC would be safer
            self.redis.zadd(key, {str(now): now})
            self.redis.expire(key, self.window)
            return True
        return False

    def get_remaining(self, identifier):
        """Get remaining requests in window"""
        key = f"{self.key_prefix}:{identifier}"
        now = time.time()
        window_start = now - self.window

        self.redis.zremrangebyscore(key, 0, window_start)
        request_count = self.redis.zcard(key)
        return max(0, self.limit - request_count)

# Usage
redis_client = redis.Redis(host='localhost', port=6379, db=0)
limiter = DistributedRateLimiter(
    redis_client,
    key_prefix='api_rate_limit',
    limit=100,
    window=3600  # 100 requests per hour
)

# Check before making API call
user_id = 'user-123'
if limiter.is_allowed(user_id):
    response = make_api_call()
else:
    print(f"Rate limit exceeded. Remaining: {limiter.get_remaining(user_id)}")

Best Practices
1. Respect Retry-After Header
import requests
import time

def make_request_with_retry(url, headers, data):
    response = requests.post(url, headers=headers, json=data)

    if response.status_code == 429:
        retry_after = int(response.headers.get('Retry-After', 60))
        print(f"Rate limited. Waiting {retry_after} seconds...")
        time.sleep(retry_after)
        # Retry once after waiting
        response = requests.post(url, headers=headers, json=data)

    return response

2. Implement Request Queuing
import time
import requests
from queue import Queue
from threading import Thread

class RequestQueue:
    def __init__(self, rate_limit=10):
        self.queue = Queue()
        self.rate_limit = rate_limit
        self.interval = 1.0 / rate_limit
        self.worker = Thread(target=self._process_queue, daemon=True)
        self.worker.start()

    def _process_queue(self):
        """Process requests from queue at the configured rate"""
        while True:
            # Queue.get() blocks until a request is available
            request_func, args, kwargs = self.queue.get()
            request_func(*args, **kwargs)
            time.sleep(self.interval)

    def enqueue(self, func, *args, **kwargs):
        """Add request to queue"""
        self.queue.put((func, args, kwargs))

# Usage
queue = RequestQueue(rate_limit=5)

def make_api_call(data):
    response = requests.post(
        'https://api.polysystems.ai/api/hub/memory',
        headers={'X-API-Key': api_key},
        json=data
    )
    print(f"Completed: {data}")

# Queue many requests
for i in range(100):
    queue.enqueue(make_api_call, {'key': f'item_{i}', 'value': f'data_{i}'})

3. Use Batch Endpoints
# ❌ Bad: Many individual requests
for item in items:
    response = requests.post(
        'https://api.polysystems.ai/api/hub/memory',
        headers={'X-API-Key': api_key},
        json={'key': item['key'], 'value': item['value']}
    )
# Uses 100 requests for 100 items

# ✅ Good: Batch request
response = requests.post(
    'https://api.polysystems.ai/api/hub/memory/batch',
    headers={'X-API-Key': api_key},
    json={'items': items}
)
# Uses 1 request for 100 items

4. Cache Aggressively
import requests
from datetime import datetime, timedelta

class CachedAPIClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.cache = {}
        self.cache_ttl = 300  # 5 minutes

    def _cache_key(self, endpoint, params):
        return f"{endpoint}:{hash(str(params))}"

    def get(self, endpoint, params=None):
        """Get with caching"""
        cache_key = self._cache_key(endpoint, params)

        # Check cache
        if cache_key in self.cache:
            cached_data, cached_time = self.cache[cache_key]
            if datetime.now() - cached_time < timedelta(seconds=self.cache_ttl):
                return cached_data  # Return cached, no API call

        # Make API call
        response = requests.get(
            f'https://api.polysystems.ai{endpoint}',
            headers={'X-API-Key': self.api_key},
            params=params
        )
        data = response.json()

        # Update cache
        self.cache[cache_key] = (data, datetime.now())
        return data

# Usage - repeated calls use cache
client = CachedAPIClient(api_key)

# First call: Makes API request
data1 = client.get('/api/hub/agents')  # API call

# Second call within TTL: Returns cached data
data2 = client.get('/api/hub/agents')  # No API call
# Saves rate limit quota

5. Monitor and Alert
import time
import requests
import schedule  # third-party: pip install schedule

def monitor_rate_limits():
    """Monitor rate limit usage and alert (assumes a send_alert helper)"""
    response = requests.get(
        'https://api.polysystems.ai/api/hub/health',
        headers={'X-API-Key': api_key}
    )

    limit = int(response.headers.get('X-RateLimit-Limit', 0))
    remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
    usage_percent = ((limit - remaining) / limit) * 100 if limit > 0 else 0

    # Alert thresholds
    if usage_percent > 90:
        send_alert(f"CRITICAL: Rate limit at {usage_percent:.1f}%")
    elif usage_percent > 75:
        send_alert(f"WARNING: Rate limit at {usage_percent:.1f}%")

    return {
        'limit': limit,
        'remaining': remaining,
        'usage_percent': usage_percent
    }

# Run periodically
schedule.every(5).minutes.do(monitor_rate_limits)
while True:
    schedule.run_pending()
    time.sleep(1)

Increasing Rate Limits
Upgrade Your Tier
Contact sales to upgrade your rate limit tier:
curl -X POST https://api.polysystems.ai/api/support/upgrade-request \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "requested_tier": "pro",
    "reason": "Production application scaling",
    "estimated_requests_per_day": 50000
  }'

Request Custom Limits
For enterprise customers:
- Custom rate limits per endpoint
- Dedicated infrastructure
- SLA guarantees
- Priority support
Contact: enterprise@polysystems.ai
Troubleshooting
Problem: Constantly hitting rate limits
Solutions:
- Implement caching
- Use batch endpoints
- Optimize request frequency
- Upgrade tier
- Distribute load across multiple tokens
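For the last point, one simple pattern is rotating requests round-robin across a pool of tokens so that no single token's quota is exhausted first. A minimal sketch (assumes you legitimately hold several API keys; the key strings are placeholders):

```python
from itertools import cycle

class KeyPool:
    """Round-robin over a pool of API keys to spread rate-limit usage."""

    def __init__(self, api_keys):
        self._keys = cycle(api_keys)

    def next_headers(self):
        """Headers for the next request, using the next key in rotation."""
        return {'X-API-Key': next(self._keys)}

pool = KeyPool(['key-a', 'key-b', 'key-c'])
print(pool.next_headers())  # → {'X-API-Key': 'key-a'}
```

Each request draws the next key from the cycle, so load splits evenly across the pool.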
Problem: Rate limit headers not showing
Check:
- Endpoint supports rate limiting (health checks may not)
- Using correct authentication
- API version supports headers
Problem: Different limits than expected
Possible causes:
- Different limits for different endpoints
- Account tier changed
- Temporary rate limit reduction
- IP-based limiting in effect
Check current limits:
curl -X GET https://api.polysystems.ai/api/account/limits \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

Summary
In this chapter, you learned:
- ✅ How rate limiting works and why it exists
- ✅ Rate limit tiers and per-endpoint limits
- ✅ Reading and interpreting rate limit headers
- ✅ Handling 429 Too Many Requests errors
- ✅ Implementing exponential backoff
- ✅ Rate limit aware queuing strategies
- ✅ Distributed rate limiting with Redis
- ✅ Best practices for staying within limits
- ✅ Monitoring and alerting on rate limit usage
- ✅ How to request higher limits
Next Steps
- Chapter 10: Error Handling - Handle all error types
- Chapter 11: Code Examples - Complete integration examples
- Chapter 12: Best Practices - Optimization and security