API FeaturesRate Limiting

Chapter 9: Rate Limiting

Overview

Rate limiting controls the number of API requests you can make within a specific time window. This chapter explains how rate limiting works, how to handle rate limit errors, and strategies for staying within limits.

What is Rate Limiting?

Rate limiting prevents abuse and ensures fair resource allocation by restricting the number of requests per time period. It protects the API infrastructure and ensures quality of service for all users.

Why Rate Limits Exist

  • Infrastructure Protection: Prevent server overload
  • Fair Usage: Ensure resources available to all users
  • Cost Control: Limit computational expenses
  • Security: Mitigate DDoS and abuse attempts
  • Quality of Service: Maintain consistent performance

Rate Limit Structure

Rate Limits Applied Per:
├── Access Token (API Key)
├── User Account
├── IP Address (in extreme cases)
└── Endpoint (different limits for different routes)

Rate Limit Tiers

Default Limits

TierRequests/SecondRequests/MinuteRequests/HourRequests/Day
Free1101001,000
Basic51001,00010,000
Pro201,00010,000100,000
EnterpriseCustomCustomCustomCustom

Per-Endpoint Limits

Different endpoints have different rate limits based on computational cost:

Endpoint CategoryRate MultiplierExample Limit (Pro)
Health ChecksNo limitUnlimited
Read Operations1.0x20 req/sec
Write Operations0.5x10 req/sec
LLM Generation0.2x4 req/sec
Heavy Processing0.1x2 req/sec

Rate Limit Headers

Every API response includes rate limit information:

HTTP/1.1 200 OK
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 999
X-RateLimit-Reset: 1705327200
X-RateLimit-Window: 3600

Header Descriptions

  • X-RateLimit-Limit: Total requests allowed in window
  • X-RateLimit-Remaining: Requests remaining in current window
  • X-RateLimit-Reset: Unix timestamp when limit resets
  • X-RateLimit-Window: Window duration in seconds

Reading Headers in Code

import requests
from datetime import datetime
 
response = requests.get(
    'https://api.polysystems.ai/api/hub/health',
    headers={'X-API-Key': api_key}
)
 
# Parse rate limit headers
limit = int(response.headers.get('X-RateLimit-Limit', 0))
remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
reset = int(response.headers.get('X-RateLimit-Reset', 0))
 
print(f"Rate Limit: {remaining}/{limit} remaining")
print(f"Resets at: {datetime.fromtimestamp(reset)}")
 
# Check if close to limit
if remaining < limit * 0.1:  # Less than 10% remaining
    print("⚠️  Warning: Approaching rate limit")

Rate Limit Exceeded Error

When you exceed the rate limit:

HTTP/1.1 429 Too Many Requests
Retry-After: 60
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1705327200

Response Body:

{
  "error": "Rate limit exceeded",
  "message": "You have exceeded the rate limit. Please retry after 60 seconds.",
  "retry_after": 60,
  "limit": 1000,
  "window": "hour"
}

Handling Rate Limits

Strategy 1: Exponential Backoff

import time
import requests
 
def api_call_with_backoff(url, headers, data, max_retries=5):
    """Make API call with exponential backoff"""
    for attempt in range(max_retries):
        response = requests.post(url, headers=headers, json=data)
        
        if response.status_code == 200:
            return response.json()
        
        elif response.status_code == 429:
            # Rate limit exceeded
            retry_after = int(response.headers.get('Retry-After', 60))
            
            if attempt < max_retries - 1:
                wait_time = min(retry_after, 2 ** attempt)
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise Exception("Max retries exceeded")
        
        else:
            response.raise_for_status()
    
    raise Exception("Request failed after retries")
 
# Usage
result = api_call_with_backoff(
    'https://api.polysystems.ai/api/hub/agents/chat',
    headers={'X-API-Key': api_key},
    data={'messages': [{'role': 'user', 'content': 'Hello'}]}
)

Strategy 2: Rate Limit Aware Queue

import time
from collections import deque
from datetime import datetime, timedelta
 
class RateLimitedQueue:
    def __init__(self, requests_per_second=10):
        self.rate = requests_per_second
        self.interval = 1.0 / requests_per_second
        self.last_request = None
    
    def wait_if_needed(self):
        """Wait if necessary to stay within rate limit"""
        if self.last_request is not None:
            elapsed = time.time() - self.last_request
            if elapsed < self.interval:
                time.sleep(self.interval - elapsed)
        
        self.last_request = time.time()
    
    def make_request(self, url, headers, data):
        """Make request with rate limiting"""
        self.wait_if_needed()
        return requests.post(url, headers=headers, json=data)
 
# Usage
queue = RateLimitedQueue(requests_per_second=5)
 
for i in range(20):
    response = queue.make_request(
        'https://api.polysystems.ai/api/hub/memory',
        headers={'X-API-Key': api_key},
        data={'key': f'item_{i}', 'value': f'data_{i}'}
    )
    print(f"Request {i+1} completed")

Strategy 3: Token Bucket Algorithm

import time
from threading import Lock
 
class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate  # tokens per second
        self.capacity = capacity  # bucket size
        self.tokens = capacity
        self.last_update = time.time()
        self.lock = Lock()
    
    def consume(self, tokens=1):
        """Try to consume tokens, return True if successful"""
        with self.lock:
            now = time.time()
            
            # Add tokens based on time elapsed
            elapsed = now - self.last_update
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate
            )
            self.last_update = now
            
            # Try to consume
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            
            return False
    
    def wait_for_tokens(self, tokens=1):
        """Wait until tokens are available"""
        while not self.consume(tokens):
            time.sleep(0.1)
 
# Usage
bucket = TokenBucket(rate=10, capacity=100)  # 10 req/sec, burst of 100
 
def make_api_call(data):
    bucket.wait_for_tokens()  # Wait if no tokens available
    return requests.post(
        'https://api.polysystems.ai/api/hub/agents/chat',
        headers={'X-API-Key': api_key},
        json=data
    )
 
# Make many requests - automatically rate limited
for i in range(1000):
    result = make_api_call({'messages': [{'role': 'user', 'content': f'Query {i}'}]})
    print(f"Completed request {i+1}")

Monitoring Rate Limits

Track Usage Across Requests

class RateLimitMonitor:
    def __init__(self):
        self.limits = {}
    
    def update_from_headers(self, headers):
        """Update limits from response headers"""
        self.limits = {
            'limit': int(headers.get('X-RateLimit-Limit', 0)),
            'remaining': int(headers.get('X-RateLimit-Remaining', 0)),
            'reset': int(headers.get('X-RateLimit-Reset', 0)),
            'updated_at': time.time()
        }
    
    def get_usage_percentage(self):
        """Get percentage of rate limit used"""
        if not self.limits or self.limits['limit'] == 0:
            return 0
        
        used = self.limits['limit'] - self.limits['remaining']
        return (used / self.limits['limit']) * 100
    
    def time_until_reset(self):
        """Get seconds until rate limit resets"""
        if not self.limits:
            return 0
        
        return max(0, self.limits['reset'] - time.time())
    
    def should_wait(self, threshold=0.9):
        """Check if should wait before making more requests"""
        usage = self.get_usage_percentage()
        return usage > (threshold * 100)
 
# Usage
monitor = RateLimitMonitor()
 
response = requests.get(
    'https://api.polysystems.ai/api/hub/agents',
    headers={'X-API-Key': api_key}
)
 
monitor.update_from_headers(response.headers)
 
print(f"Rate limit usage: {monitor.get_usage_percentage():.1f}%")
print(f"Resets in: {monitor.time_until_reset():.0f} seconds")
 
if monitor.should_wait(threshold=0.9):
    print("Approaching rate limit, slowing down requests")
    time.sleep(5)

Distributed Rate Limiting

Using Redis for Multi-Instance Coordination

import redis
import time
 
class DistributedRateLimiter:
    def __init__(self, redis_client, key_prefix, limit, window):
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.limit = limit
        self.window = window  # in seconds
    
    def is_allowed(self, identifier):
        """Check if request is allowed"""
        key = f"{self.key_prefix}:{identifier}"
        now = time.time()
        window_start = now - self.window
        
        # Remove old entries
        self.redis.zremrangebyscore(key, 0, window_start)
        
        # Count requests in current window
        request_count = self.redis.zcard(key)
        
        if request_count < self.limit:
            # Add current request
            self.redis.zadd(key, {str(now): now})
            self.redis.expire(key, self.window)
            return True
        
        return False
    
    def get_remaining(self, identifier):
        """Get remaining requests in window"""
        key = f"{self.key_prefix}:{identifier}"
        now = time.time()
        window_start = now - self.window
        
        self.redis.zremrangebyscore(key, 0, window_start)
        request_count = self.redis.zcard(key)
        
        return max(0, self.limit - request_count)
 
# Usage
redis_client = redis.Redis(host='localhost', port=6379, db=0)
limiter = DistributedRateLimiter(
    redis_client,
    key_prefix='api_rate_limit',
    limit=100,
    window=3600  # 100 requests per hour
)
 
# Check before making API call
user_id = 'user-123'
if limiter.is_allowed(user_id):
    response = make_api_call()
else:
    print(f"Rate limit exceeded. Remaining: {limiter.get_remaining(user_id)}")

Best Practices

1. Respect Retry-After Header

import requests
import time
 
def make_request_with_retry(url, headers, data):
    response = requests.post(url, headers=headers, json=data)
    
    if response.status_code == 429:
        retry_after = int(response.headers.get('Retry-After', 60))
        print(f"Rate limited. Waiting {retry_after} seconds...")
        time.sleep(retry_after)
        
        # Retry once after waiting
        response = requests.post(url, headers=headers, json=data)
    
    return response

2. Implement Request Queuing

from queue import Queue
from threading import Thread
import time
 
class RequestQueue:
    def __init__(self, rate_limit=10):
        self.queue = Queue()
        self.rate_limit = rate_limit
        self.interval = 1.0 / rate_limit
        self.worker = Thread(target=self._process_queue, daemon=True)
        self.worker.start()
    
    def _process_queue(self):
        """Process requests from queue"""
        while True:
            if not self.queue.empty():
                request_func, args, kwargs = self.queue.get()
                request_func(*args, **kwargs)
                time.sleep(self.interval)
            else:
                time.sleep(0.1)
    
    def enqueue(self, func, *args, **kwargs):
        """Add request to queue"""
        self.queue.put((func, args, kwargs))
 
# Usage
queue = RequestQueue(rate_limit=5)
 
def make_api_call(data):
    response = requests.post(
        'https://api.polysystems.ai/api/hub/memory',
        headers={'X-API-Key': api_key},
        json=data
    )
    print(f"Completed: {data}")
 
# Queue many requests
for i in range(100):
    queue.enqueue(make_api_call, {'key': f'item_{i}', 'value': f'data_{i}'})

3. Use Batch Endpoints

# ❌ Bad: Many individual requests
for item in items:
    response = requests.post(
        'https://api.polysystems.ai/api/hub/memory',
        headers={'X-API-Key': api_key},
        json={'key': item['key'], 'value': item['value']}
    )
# Uses 100 requests for 100 items
 
# ✅ Good: Batch request
response = requests.post(
    'https://api.polysystems.ai/api/hub/memory/batch',
    headers={'X-API-Key': api_key},
    json={'items': items}
)
# Uses 1 request for 100 items

4. Cache Aggressively

from functools import lru_cache
from datetime import datetime, timedelta
 
class CachedAPIClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.cache = {}
        self.cache_ttl = 300  # 5 minutes
    
    def _cache_key(self, endpoint, params):
        return f"{endpoint}:{hash(str(params))}"
    
    def get(self, endpoint, params=None):
        """Get with caching"""
        cache_key = self._cache_key(endpoint, params)
        
        # Check cache
        if cache_key in self.cache:
            cached_data, cached_time = self.cache[cache_key]
            if datetime.now() - cached_time < timedelta(seconds=self.cache_ttl):
                return cached_data  # Return cached, no API call
        
        # Make API call
        response = requests.get(
            f'https://api.polysystems.ai{endpoint}',
            headers={'X-API-Key': self.api_key},
            params=params
        )
        
        data = response.json()
        
        # Update cache
        self.cache[cache_key] = (data, datetime.now())
        
        return data
 
# Usage - repeated calls use cache
client = CachedAPIClient(api_key)
 
# First call: Makes API request
data1 = client.get('/api/hub/agents')  # API call
 
# Second call within TTL: Returns cached data
data2 = client.get('/api/hub/agents')  # No API call
 
# Saves rate limit quota

5. Monitor and Alert

def monitor_rate_limits():
    """Monitor rate limit usage and alert"""
    response = requests.get(
        'https://api.polysystems.ai/api/hub/health',
        headers={'X-API-Key': api_key}
    )
    
    limit = int(response.headers.get('X-RateLimit-Limit', 0))
    remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
    
    usage_percent = ((limit - remaining) / limit) * 100 if limit > 0 else 0
    
    # Alert thresholds
    if usage_percent > 90:
        send_alert(f"CRITICAL: Rate limit at {usage_percent:.1f}%")
    elif usage_percent > 75:
        send_alert(f"WARNING: Rate limit at {usage_percent:.1f}%")
    
    return {
        'limit': limit,
        'remaining': remaining,
        'usage_percent': usage_percent
    }
 
# Run periodically
import schedule
schedule.every(5).minutes.do(monitor_rate_limits)

Increasing Rate Limits

Upgrade Your Tier

Contact sales to upgrade your rate limit tier:

curl -X POST https://api.polysystems.ai/api/support/upgrade-request \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "requested_tier": "pro",
    "reason": "Production application scaling",
    "estimated_requests_per_day": 50000
  }'

Request Custom Limits

For enterprise customers:

  • Custom rate limits per endpoint
  • Dedicated infrastructure
  • SLA guarantees
  • Priority support

Contact: enterprise@polysystems.ai

Troubleshooting

Problem: Constantly hitting rate limits

Solutions:

  1. Implement caching
  2. Use batch endpoints
  3. Optimize request frequency
  4. Upgrade tier
  5. Distribute load across multiple tokens

Problem: Rate limit headers not showing

Check:

  • Endpoint supports rate limiting (health checks may not)
  • Using correct authentication
  • API version supports headers

Problem: Different limits than expected

Possible causes:

  • Different limits for different endpoints
  • Account tier changed
  • Temporary rate limit reduction
  • IP-based limiting in effect

Check current limits:

curl -X GET https://api.polysystems.ai/api/account/limits \
  -H "Authorization: Bearer YOUR_JWT_TOKEN"

Summary

In this chapter, you learned:

  • ✅ How rate limiting works and why it exists
  • ✅ Rate limit tiers and per-endpoint limits
  • ✅ Reading and interpreting rate limit headers
  • ✅ Handling 429 Too Many Requests errors
  • ✅ Implementing exponential backoff
  • ✅ Rate limit aware queuing strategies
  • ✅ Distributed rate limiting with Redis
  • ✅ Best practices for staying within limits
  • ✅ Monitoring and alerting on rate limit usage
  • ✅ How to request higher limits

Next Steps