# Chapter 12: Best Practices

## Overview

This final chapter provides comprehensive best practices for building secure, efficient, and maintainable applications using the Polysystems Backend API. Follow these guidelines to optimize performance, minimize costs, and ensure reliability.

## Table of Contents
- Security Best Practices
- Performance Optimization
- Cost Optimization
- Error Handling and Resilience
- Monitoring and Observability
- Development Workflow
- Production Deployment
- Maintenance and Operations
## Security Best Practices

### 1. API Key Management

**Never Hardcode API Keys**

```python
# ❌ BAD: Hardcoded API key
API_KEY = "ps_live_a1b2c3d4e5f6g7h8i9j0"

# ✅ GOOD: Environment variable
import os
API_KEY = os.getenv('PS_API_KEY')

# ✅ BETTER: Secrets manager (shown here with AWS Secrets Manager via boto3)
import boto3

secrets = boto3.client('secretsmanager')
API_KEY = secrets.get_secret_value(SecretId='prod/polysystems/api-key')['SecretString']
```

**Rotate Keys Regularly**
```python
# Automated key rotation script. generate_new_access_token, update_secret,
# deploy_configuration_update, schedule_key_revocation, log_rotation_event,
# and get_current_key_id are placeholders for your own infrastructure helpers.
import schedule

def rotate_api_key():
    """Rotate the API key every 90 days"""
    old_key_id = get_current_key_id()  # placeholder: look up the key being replaced

    # Generate new key
    new_key = generate_new_access_token()

    # Update in secrets manager
    update_secret('prod/polysystems/api-key', new_key)

    # Deploy to all services
    deploy_configuration_update()

    # Revoke the old key after 24 hours, allowing time for propagation
    schedule_key_revocation(old_key_id, delay_hours=24)

    log_rotation_event()

# Schedule rotation every 90 days
# (requires a loop elsewhere that calls schedule.run_pending())
schedule.every(90).days.do(rotate_api_key)
```

**Use Different Keys for Different Environments**
```bash
# Development
PS_API_KEY=ps_test_dev_key_here

# Staging
PS_API_KEY=ps_live_staging_key_here

# Production
PS_API_KEY=ps_live_production_key_here
```

### 2. Token Storage Security

**Server-Side Storage**
```python
# ✅ Store tokens encrypted in a secure backend
# (db is a placeholder for your database handle)
from cryptography.fernet import Fernet

class SecureTokenStorage:
    def __init__(self, encryption_key):
        self.cipher = Fernet(encryption_key)

    def store_token(self, user_id, token):
        """Store encrypted token"""
        encrypted = self.cipher.encrypt(token.encode())
        db.execute(
            "INSERT INTO user_tokens (user_id, encrypted_token) VALUES (?, ?)",
            (user_id, encrypted)
        )

    def retrieve_token(self, user_id):
        """Retrieve and decrypt token"""
        encrypted = db.query(
            "SELECT encrypted_token FROM user_tokens WHERE user_id = ?",
            (user_id,)
        )
        return self.cipher.decrypt(encrypted).decode()
```

**Client-Side (When Necessary)**
```javascript
// ❌ BAD: localStorage for sensitive tokens
localStorage.setItem('api_key', apiKey);

// ✅ GOOD: Secure httpOnly cookies
res.cookie('api_key', apiKey, {
  httpOnly: true,
  secure: true,
  sameSite: 'strict',
  maxAge: 24 * 60 * 60 * 1000  // 24 hours
});
```

### 3. Access Control

**Implement Principle of Least Privilege**
```python
# Create tokens with the minimal permissions each purpose requires
# (create_access_token and set_spending_limits are placeholders for your own helpers)
def create_limited_token(purpose):
    """Create token with specific limits based on purpose"""
    limits = {
        'development': {
            'daily_limit': 1.00,
            'monthly_limit': 10.00,
            'per_request_limit': 0.10
        },
        'testing': {
            'daily_limit': 5.00,
            'monthly_limit': 50.00,
            'per_request_limit': 0.50
        },
        'production': {
            'daily_limit': 100.00,
            'monthly_limit': 2000.00,
            'per_request_limit': 5.00
        }
    }
    token = create_access_token(f"{purpose}_token")
    set_spending_limits(token['id'], **limits[purpose])
    return token
```
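A quick usage sketch (`create_limited_token` is the helper defined above; the `id` field follows its placeholder return shape):

```python
# Hypothetical usage: issue a tightly capped development token
dev_token = create_limited_token('development')
print(f"Created token {dev_token['id']} with a $1.00 daily limit")
```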
### 4. Request Validation

**Validate All Inputs**

```python
from pydantic import BaseModel, validator

class ChatRequest(BaseModel):
    messages: list
    max_tokens: int = 500
    temperature: float = 0.7

    @validator('messages')
    def validate_messages(cls, v):
        if not v:
            raise ValueError('messages cannot be empty')
        if len(v) > 50:
            raise ValueError('too many messages (max 50)')
        return v

    @validator('max_tokens')
    def validate_max_tokens(cls, v):
        if v < 1 or v > 4000:
            raise ValueError('max_tokens must be between 1 and 4000')
        return v

    @validator('temperature')
    def validate_temperature(cls, v):
        if v < 0 or v > 2:
            raise ValueError('temperature must be between 0 and 2')
        return v

# Usage
def handle_chat_request(data):
    try:
        request = ChatRequest(**data)
        return make_api_call(request)
    except ValueError as e:
        return error_response(str(e), 400)
```

### 5. Rate Limiting on Client Side
```python
from threading import Lock
import time

class ClientRateLimiter:
    def __init__(self, requests_per_second=10):
        self.rate = requests_per_second
        self.interval = 1.0 / requests_per_second
        self.last_request = 0
        self.lock = Lock()

    def acquire(self):
        """Wait for rate limit token"""
        with self.lock:
            now = time.time()
            time_since_last = now - self.last_request
            if time_since_last < self.interval:
                time.sleep(self.interval - time_since_last)
            self.last_request = time.time()

# Usage
limiter = ClientRateLimiter(requests_per_second=5)

def make_request(data):
    limiter.acquire()  # Enforces rate limit
    return api_client.post('/api/endpoint', data)
```

## Performance Optimization

### 1. Implement Caching

**Response Caching**
```python
from datetime import datetime, timedelta
import hashlib

class ResponseCache:
    def __init__(self, ttl_seconds=300):
        self.cache = {}
        self.key_endpoints = {}  # cache key -> endpoint, so we can invalidate per endpoint
        self.ttl = ttl_seconds

    def _cache_key(self, endpoint, params):
        """Generate cache key"""
        key_str = f"{endpoint}:{str(params)}"
        return hashlib.sha256(key_str.encode()).hexdigest()

    def get(self, endpoint, params):
        """Get cached response if present and not expired"""
        key = self._cache_key(endpoint, params)
        if key in self.cache:
            cached_data, cached_time = self.cache[key]
            if datetime.now() - cached_time < timedelta(seconds=self.ttl):
                return cached_data
        return None

    def set(self, endpoint, params, data):
        """Cache response"""
        key = self._cache_key(endpoint, params)
        self.cache[key] = (data, datetime.now())
        self.key_endpoints[key] = endpoint

    def invalidate(self, endpoint=None):
        """Invalidate all entries, or only those for one endpoint"""
        if endpoint:
            # Keys are hashes, so look up the stored endpoint for each entry
            keys_to_remove = [k for k, ep in self.key_endpoints.items() if ep == endpoint]
            for key in keys_to_remove:
                del self.cache[key]
                del self.key_endpoints[key]
        else:
            self.cache.clear()
            self.key_endpoints.clear()

# Usage
cache = ResponseCache(ttl_seconds=300)

def get_chat_response(messages):
    # Check cache
    cached = cache.get('/api/hub/agents/chat', messages)
    if cached:
        return cached

    # Make API call
    response = api_client.chat_completion(messages)

    # Cache response
    cache.set('/api/hub/agents/chat', messages, response)
    return response
```

**Redis Caching**
```python
import redis
import json

class RedisCache:
    def __init__(self, redis_url='redis://localhost:6379', ttl=300):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    def get(self, key):
        """Get cached value"""
        value = self.redis.get(key)
        return json.loads(value) if value else None

    def set(self, key, value):
        """Cache value with TTL"""
        self.redis.setex(
            key,
            self.ttl,
            json.dumps(value)
        )

    def invalidate(self, pattern='*'):
        """Invalidate cache by key pattern"""
        for key in self.redis.scan_iter(pattern):
            self.redis.delete(key)

# Usage
cache = RedisCache(ttl=300)

def get_with_cache(cache_key, fetch_func):
    # Try cache first
    cached = cache.get(cache_key)
    if cached:
        return cached

    # Fetch from API
    result = fetch_func()

    # Cache result
    cache.set(cache_key, result)
    return result
```

### 2. Connection Pooling
```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class OptimizedAPIClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.session = self._create_session()

    def _create_session(self):
        """Create session with connection pooling"""
        session = requests.Session()

        # Configure retry strategy
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )

        # Configure adapter with connection pool
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=20,
            pool_maxsize=20
        )
        session.mount("https://", adapter)
        session.mount("http://", adapter)

        # Set default headers
        session.headers.update({
            'X-API-Key': self.api_key,
            'Content-Type': 'application/json'
        })
        return session

    def request(self, method, url, **kwargs):
        """Make request using pooled connection"""
        return self.session.request(method, url, **kwargs)
```
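A brief usage sketch; the health endpoint URL is illustrative:

```python
import os

# Reusing one client keeps TCP/TLS connections pooled across requests,
# and the Retry policy above transparently retries 429/5xx responses
client = OptimizedAPIClient(api_key=os.getenv('PS_API_KEY'))
response = client.request('GET', 'https://api.polysystems.ai/api/hub/health')
```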
### 3. Async/Parallel Processing

```python
import asyncio
import aiohttp

async def process_batch_async(items, max_concurrent=10):
    """Process items in parallel with a concurrency limit"""
    semaphore = asyncio.Semaphore(max_concurrent)

    # Share one session across all requests instead of opening one per item
    async with aiohttp.ClientSession() as session:
        async def process_item(item):
            async with semaphore:
                async with session.post(
                    'https://api.polysystems.ai/api/hub/memory',
                    headers={'X-API-Key': API_KEY},
                    json=item
                ) as response:
                    return await response.json()

        # Process all items concurrently
        tasks = [process_item(item) for item in items]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    # Separate successes from failures
    successful = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    return successful, failed

# Usage
items = [{'key': f'item_{i}', 'value': f'data_{i}'} for i in range(100)]
successful, failed = asyncio.run(process_batch_async(items))
print(f"Processed: {len(successful)}, Failed: {len(failed)}")
```

### 4. Request Deduplication
```python
import hashlib
from datetime import datetime, timedelta

class RequestDeduplicator:
    def __init__(self, window_seconds=60):
        self.requests = {}
        self.window = timedelta(seconds=window_seconds)

    def _hash_request(self, endpoint, data):
        """Generate request hash"""
        request_str = f"{endpoint}:{str(data)}"
        return hashlib.sha256(request_str.encode()).hexdigest()

    def is_duplicate(self, endpoint, data):
        """Check if an identical request was seen within the window"""
        request_hash = self._hash_request(endpoint, data)
        now = datetime.now()
        if request_hash in self.requests:
            last_time = self.requests[request_hash]
            if now - last_time < self.window:
                return True
        self.requests[request_hash] = now
        return False

    def cleanup_old_requests(self):
        """Remove request records older than the window"""
        now = datetime.now()
        self.requests = {
            k: v for k, v in self.requests.items()
            if now - v < self.window
        }

# Usage
deduplicator = RequestDeduplicator(window_seconds=60)

def make_request(endpoint, data):
    if deduplicator.is_duplicate(endpoint, data):
        print("Duplicate request detected, skipping")
        return None
    return api_client.post(endpoint, data)
```

## Cost Optimization

### 1. Token Usage Optimization
```python
def optimize_prompt(prompt, max_length=500):
    """Optimize prompt to reduce token usage"""
    # Collapse unnecessary whitespace
    prompt = ' '.join(prompt.split())

    # Truncate if too long
    if len(prompt) > max_length:
        prompt = prompt[:max_length] + "..."
    return prompt

def calculate_estimated_cost(messages, model='gpt-4'):
    """Estimate cost before making a request"""
    # Rough estimation: 4 chars ≈ 1 token
    total_chars = sum(len(msg['content']) for msg in messages)
    estimated_tokens = total_chars // 4

    pricing = {
        'gpt-4': {'base': 0.0020, 'per_token': 0.00001},
        'gpt-3.5': {'base': 0.0010, 'per_token': 0.000005}
    }
    price = pricing[model]
    estimated_cost = price['base'] + (estimated_tokens * price['per_token'])
    return estimated_cost, estimated_tokens

# Usage
messages = [{'role': 'user', 'content': 'Long prompt here...'}]
estimated_cost, tokens = calculate_estimated_cost(messages)

if estimated_cost > 0.10:
    print(f"Warning: This request will cost approximately ${estimated_cost:.4f}")
    # Optimize or confirm before proceeding
```

### 2. Smart Caching Strategy
```python
class SmartCache:
    def __init__(self):
        self.cache = {}
        self.hit_count = {}     # cache hits per key
        self.access_count = {}  # total lookups per key, cached or not
        self.cost_saved = 0

    def should_cache(self, key, cost):
        """Decide whether a response is worth caching"""
        # Cache expensive requests
        if cost > 0.01:
            return True
        # Cache frequently requested data
        if self.access_count.get(key, 0) > 5:
            return True
        return False

    def get_or_fetch(self, key, fetch_func, cost):
        """Get from cache or fetch"""
        self.access_count[key] = self.access_count.get(key, 0) + 1
        if key in self.cache:
            self.hit_count[key] = self.hit_count.get(key, 0) + 1
            self.cost_saved += cost
            return self.cache[key]

        result = fetch_func()
        if self.should_cache(key, cost):
            self.cache[key] = result
        return result

    def get_statistics(self):
        """Get cache statistics"""
        return {
            'total_saves': sum(self.hit_count.values()),
            'cost_saved': self.cost_saved,
            'unique_cached': len(self.cache)
        }
```
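A usage sketch (the cache key and cost figure are illustrative; `report_prompt` is a placeholder message list):

```python
cache = SmartCache()

# Hypothetical fetch function standing in for a real API call
def fetch_daily_report():
    return api_client.post('/api/hub/agents/chat', {'messages': report_prompt})

# First call fetches and caches (cost exceeds the $0.01 threshold);
# repeat calls are served from cache and recorded as savings
report = cache.get_or_fetch('daily_report', fetch_daily_report, cost=0.05)
report = cache.get_or_fetch('daily_report', fetch_daily_report, cost=0.05)
print(cache.get_statistics())  # {'total_saves': 1, 'cost_saved': 0.05, 'unique_cached': 1}
```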
### 3. Batch Operations

```python
def batch_operations(items, batch_size=50):
    """Process items in batches to reduce API calls"""
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        # Single API call for the whole batch
        response = api_client.post('/api/hub/memory/batch', {
            'items': batch
        })
        results.extend(response['results'])
    return results

# Instead of 1000 individual calls, make 20 batch calls
items = [{'key': f'k{i}', 'value': f'v{i}'} for i in range(1000)]
results = batch_operations(items, batch_size=50)
# Cost savings: ~80% reduction
```

## Error Handling and Resilience
1. Circuit Breaker Pattern
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout_seconds=60, success_threshold=2):
self.failure_threshold = failure_threshold
self.timeout = timedelta(seconds=timeout_seconds)
self.success_threshold = success_threshold
self.failures = 0
self.successes = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
def call(self, func, *args, **kwargs):
"""Execute function with circuit breaker"""
if self.state == CircuitState.OPEN:
if datetime.now() - self.last_failure_time > self.timeout:
self.state = CircuitState.HALF_OPEN
self.successes = 0
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
"""Handle successful call"""
self.failures = 0
if self.state == CircuitState.HALF_OPEN:
self.successes += 1
if self.successes >= self.success_threshold:
self.state = CircuitState.CLOSED
def _on_failure(self):
"""Handle failed call"""
self.failures += 1
self.last_failure_time = datetime.now()
if self.failures >= self.failure_threshold:
self.state = CircuitState.OPEN2. Graceful Degradation
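A minimal usage sketch for the breaker (the endpoint and payload are illustrative):

```python
breaker = CircuitBreaker(failure_threshold=5, timeout_seconds=60)

def safe_chat(data):
    try:
        # Route API traffic through the breaker: after 5 consecutive failures
        # it raises immediately for 60 seconds instead of calling the API
        return breaker.call(api_client.post, '/api/hub/agents/chat', data)
    except Exception as e:
        return {'error': str(e)}
```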
### 2. Graceful Degradation

```python
class ResilientAPIClient:
    def __init__(self, primary_key, fallback_key=None):
        self.primary_client = APIClient(primary_key)
        self.fallback_client = APIClient(fallback_key) if fallback_key else None
        self.cache = ResponseCache()

    def request_with_fallbacks(self, endpoint, data):
        """Try the primary API, then the fallback, then the cache"""
        # Try primary API
        try:
            return self.primary_client.post(endpoint, data)
        except APIError as e:
            if e.status_code == 429:  # Rate limited
                # Try fallback
                if self.fallback_client:
                    try:
                        return self.fallback_client.post(endpoint, data)
                    except APIError:
                        pass

            # Try cache
            cached = self.cache.get(endpoint, data)
            if cached:
                return {'data': cached, 'from_cache': True}

            # Return default response
            return self._default_response(endpoint)

    def _default_response(self, endpoint):
        """Return a sensible default"""
        defaults = {
            '/api/hub/agents/chat': {
                'message': 'Service temporarily unavailable'
            }
        }
        return defaults.get(endpoint, {'error': 'Service unavailable'})
```
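A usage sketch, assuming an optional second key is provisioned for fallback (`PS_FALLBACK_API_KEY` is a hypothetical variable name):

```python
import os

client = ResilientAPIClient(
    primary_key=os.getenv('PS_API_KEY'),
    fallback_key=os.getenv('PS_FALLBACK_API_KEY')  # hypothetical second key
)

reply = client.request_with_fallbacks('/api/hub/agents/chat', {'messages': messages})
if reply.get('from_cache'):
    print('Served a cached response while the API is unavailable')
```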
1. Structured Logging
import logging
import json
from datetime import datetime
class StructuredLogger:
def __init__(self, name):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
def log_api_request(self, endpoint, method, duration, status, cost=None):
"""Log API request with structured data"""
log_data = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': 'api_request',
'endpoint': endpoint,
'method': method,
'duration_ms': duration,
'status_code': status,
'cost_usd': cost
}
self.logger.info(json.dumps(log_data))
def log_error(self, error_type, message, context=None):
"""Log error with context"""
log_data = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': 'error',
'error_type': error_type,
'message': message,
'context': context or {}
}
self.logger.error(json.dumps(log_data))
# Usage
logger = StructuredLogger('polysystems')
start_time = time.time()
try:
response = api_client.post('/api/hub/agents/chat', data)
duration = (time.time() - start_time) * 1000
logger.log_api_request(
'/api/hub/agents/chat',
'POST',
duration,
200,
cost=0.0025
)
except APIError as e:
logger.log_error(e.error_type, e.message, {'request_id': e.request_id})2. Metrics Collection
```python
from prometheus_client import Counter, Histogram, Gauge

# Define metrics
api_requests_total = Counter(
    'api_requests_total',
    'Total API requests',
    ['endpoint', 'method', 'status']
)

api_request_duration = Histogram(
    'api_request_duration_seconds',
    'API request duration',
    ['endpoint', 'method']
)

api_cost_total = Counter(
    'api_cost_usd_total',
    'Total API cost in USD',
    ['endpoint']
)

current_balance = Gauge(
    'account_balance_usd',
    'Current account balance'
)

# Usage
def make_monitored_request(endpoint, method, data):
    with api_request_duration.labels(endpoint, method).time():
        try:
            response = api_client.request(method, endpoint, data)
            api_requests_total.labels(endpoint, method, '200').inc()

            # Track per-request cost from the response header
            cost = response.headers.get('X-Request-Cost', 0)
            api_cost_total.labels(endpoint).inc(float(cost))
            return response
        except APIError as e:
            api_requests_total.labels(endpoint, method, str(e.status_code)).inc()
            raise
```

### 3. Health Checks
```python
from datetime import datetime

def health_check():
    """Comprehensive health check"""
    checks = {
        'api_connectivity': check_api_connectivity(),
        'balance_sufficient': check_balance(),
        'rate_limit_ok': check_rate_limits(),
        'key_valid': check_key_validity()  # see the sketch after this block
    }
    all_healthy = all(checks.values())
    return {
        'healthy': all_healthy,
        'checks': checks,
        'timestamp': datetime.utcnow().isoformat()
    }

def check_api_connectivity():
    try:
        response = api_client.get('/api/hub/health', timeout=5)
        return response.get('status') == 'healthy'
    except Exception:
        return False

def check_balance():
    try:
        # jwt_token is assumed to come from your authentication flow
        balance = api_client.get_balance(jwt_token)
        return balance['balance'] > 1.00  # At least $1 remaining
    except Exception:
        return False

def check_rate_limits():
    try:
        response = api_client.get('/api/hub/health')
        remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
        limit = int(response.headers.get('X-RateLimit-Limit', 1))
        return remaining > (limit * 0.1)  # More than 10% remaining
    except Exception:
        return False
```
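`check_key_validity` is referenced above but not defined; a minimal sketch, assuming the client raises `APIError` with a `status_code` attribute as in the earlier examples:

```python
def check_key_validity():
    """Treat an authentication failure as an invalid key (sketch)"""
    try:
        api_client.get('/api/hub/health')
        return True
    except APIError as e:
        # 401/403 indicate a bad or revoked key; other errors are not the key's fault
        return e.status_code not in (401, 403)
    except Exception:
        return False
```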
## Development Workflow

### 1. Environment Separation

```bash
# .env.development
PS_API_URL=http://localhost:8080
PS_API_KEY=ps_test_dev_key
PS_LOG_LEVEL=DEBUG
PS_CACHE_ENABLED=false

# .env.staging
PS_API_URL=https://staging-api.polysystems.ai
PS_API_KEY=ps_live_staging_key
PS_LOG_LEVEL=INFO
PS_CACHE_ENABLED=true

# .env.production
PS_API_URL=https://api.polysystems.ai
PS_API_KEY=ps_live_production_key
PS_LOG_LEVEL=WARNING
PS_CACHE_ENABLED=true
```
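One way to load the matching file at startup; a sketch using python-dotenv, with `APP_ENV` as an assumed selector variable:

```python
import os
from dotenv import load_dotenv  # pip install python-dotenv

# APP_ENV (assumed convention) picks which .env file to load
env = os.getenv('APP_ENV', 'development')
load_dotenv(f'.env.{env}')

API_URL = os.environ['PS_API_URL']
API_KEY = os.environ['PS_API_KEY']
```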
### 2. Testing Strategy

```python
import os
import unittest
from unittest.mock import patch

class TestAPIClient(unittest.TestCase):
    def setUp(self):
        self.client = PolysystemsClient(api_key='test_key')

    @patch('requests.Session.request')
    def test_chat_completion_success(self, mock_request):
        """Test successful chat completion"""
        mock_request.return_value.status_code = 200
        mock_request.return_value.json.return_value = {
            'message': 'Test response'
        }
        result = self.client.chat_completion([
            {'role': 'user', 'content': 'Test'}
        ])
        self.assertEqual(result['message'], 'Test response')

    @patch('requests.Session.request')
    def test_rate_limit_handling(self, mock_request):
        """Test rate limit error handling"""
        mock_request.return_value.status_code = 429
        mock_request.return_value.json.return_value = {
            'error': 'Rate limit exceeded'
        }
        with self.assertRaises(APIError) as context:
            self.client.chat_completion([
                {'role': 'user', 'content': 'Test'}
            ])
        self.assertEqual(context.exception.status_code, 429)

# Integration tests
class TestAPIIntegration(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.client = PolysystemsClient(
            api_key=os.getenv('PS_TEST_API_KEY')
        )

    def test_end_to_end_chat(self):
        """Test a real API call"""
        if os.getenv('RUN_INTEGRATION_TESTS') != 'true':
            self.skipTest('Integration tests disabled')
        result = self.client.chat_completion([
            {'role': 'user', 'content': 'Say "test"'}
        ])
        self.assertIn('test', result['message'].lower())
```

### 3. Code Review Checklist
```markdown
## API Integration Code Review Checklist

### Security
- [ ] No hardcoded API keys
- [ ] Proper error handling that doesn't leak sensitive data
- [ ] Input validation implemented
- [ ] Rate limiting on client side
- [ ] Proper authentication method used

### Performance
- [ ] Caching implemented where appropriate
- [ ] Connection pooling configured
- [ ] Async/parallel processing for batch operations
- [ ] Request deduplication in place

### Error Handling
- [ ] All API calls wrapped in try-catch
- [ ] Retry logic implemented for transient failures
- [ ] Circuit breaker for repeated failures
- [ ] Graceful degradation strategies

### Monitoring
- [ ] Structured logging implemented
- [ ] Metrics collection in place
- [ ] Error tracking configured
- [ ] Health checks implemented

### Cost Management
- [ ] Spending limits configured
- [ ] Cost estimation before expensive operations
- [ ] Caching strategy to reduce API calls
- [ ] Token usage optimized

### Documentation
- [ ] Code comments for complex logic
- [ ] API usage examples provided
- [ ] Error handling documented
- [ ] Configuration options documented
```

## Production Deployment

### 1. Deployment Checklist
```markdown
## Production Deployment Checklist

### Pre-Deployment
- [ ] All tests passing (unit, integration, e2e)
- [ ] Security audit completed
- [ ] Performance testing completed
- [ ] Load testing passed
- [ ] Staging deployment successful
- [ ] Rollback plan prepared

### Configuration
- [ ] Production API keys configured in secrets manager
- [ ] Environment variables set correctly
- [ ] Spending limits configured appropriately
- [ ] Rate limiting configured
- [ ] Monitoring and alerting set up

### Infrastructure
- [ ] Load balancer configured
- [ ] Auto-scaling rules set
- [ ] Database connections pooled
- [ ] Cache layer configured (Redis)
- [ ] CDN configured for static assets

### Monitoring
- [ ] APM tools configured
- [ ] Error tracking enabled (Sentry, etc.)
- [ ] Log aggregation set up (ELK, CloudWatch)
- [ ] Metrics dashboard created
- [ ] Alerts configured for critical metrics

### Post-Deployment
- [ ] Smoke tests passed
- [ ] Health checks passing
- [ ] Metrics look normal
- [ ] No elevated error rates
- [ ] Team notified of deployment
```

### 2. Zero-Downtime Deployment
```python
# Blue-green deployment strategy (deploy_to_environment, switch_traffic,
# health_check, get_inactive_environment, and get_other_environment are
# placeholders for your own deployment tooling)
class DeploymentManager:
    def __init__(self):
        self.environments = {
            'blue': {'active': True, 'version': '1.0.0'},
            'green': {'active': False, 'version': '1.1.0'}
        }

    def deploy_new_version(self, version):
        """Deploy new version with zero downtime"""
        # Deploy to the inactive environment
        inactive = self.get_inactive_environment()
        self.deploy_to_environment(inactive, version)

        # Run health checks before taking traffic
        if not self.health_check(inactive):
            self.rollback(inactive)
            raise Exception("Health checks failed")

        # Switch traffic
        self.switch_traffic(inactive)

        # Mark the new environment as active
        self.environments[inactive]['active'] = True
        old_active = self.get_other_environment(inactive)
        self.environments[old_active]['active'] = False

    def rollback(self, environment):
        """Roll back to the previous version"""
        # Switch traffic back to the old environment
        old_active = self.get_other_environment(environment)
        self.switch_traffic(old_active)
```
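A usage sketch under the same placeholder assumptions (`notify_oncall` is hypothetical):

```python
manager = DeploymentManager()
try:
    manager.deploy_new_version('1.1.0')
except Exception:
    # rollback() has already switched traffic back; escalate to a human
    notify_oncall('Deployment of 1.1.0 failed health checks')  # hypothetical helper
```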
1. Regular Maintenance Tasks
# Daily tasks
def daily_maintenance():
"""Run daily maintenance tasks"""
# Check and clean old cache entries
cache.cleanup_old_entries()
# Check spending and alert if needed
check_spending_alerts()
# Verify all keys