Resources > Best Practices

Chapter 12: Best Practices

Overview

This final chapter provides comprehensive best practices for building secure, efficient, and maintainable applications using the Polysystems Backend API. Follow these guidelines to optimize performance, minimize costs, and ensure reliability.

Table of Contents

  1. Security Best Practices
  2. Performance Optimization
  3. Cost Optimization
  4. Error Handling and Resilience
  5. Monitoring and Observability
  6. Development Workflow
  7. Production Deployment
  8. Maintenance and Operations

Security Best Practices

1. API Key Management

Never Hardcode API Keys

# ❌ BAD: Hardcoded API key
API_KEY = "ps_live_a1b2c3d4e5f6g7h8i9j0"
 
# ✅ GOOD: Environment variable
import os
API_KEY = os.getenv('PS_API_KEY')
 
# ✅ BETTER: Secrets manager
from aws_secretsmanager import get_secret
API_KEY = get_secret('prod/polysystems/api-key')

Rotate Keys Regularly

# Automated key rotation script
import schedule
from datetime import datetime, timedelta
 
def rotate_api_key():
    """Rotate the API key: issue a new key, publish it, then retire the old one.

    Intended to be run on a 90-day schedule. The steps run in order:
    generate a new token, store it under ``prod/polysystems/api-key``,
    push the configuration to services, schedule revocation of the previous
    key after a 24-hour delay, and record the rotation.
    """
    # Generate new key
    new_key = generate_new_access_token()
    
    # Update in secrets manager
    update_secret('prod/polysystems/api-key', new_key)
    
    # Deploy to all services
    deploy_configuration_update()
    
    # Wait 24 hours for propagation
    # NOTE(review): `old_key_id` is never assigned in this function, so this
    # line raises NameError unless a module-level `old_key_id` exists. The id
    # of the key being replaced should be captured before the secret is
    # overwritten above -- confirm how the current key id is obtained.
    schedule_key_revocation(old_key_id, delay_hours=24)
    
    log_rotation_event()
 
# Schedule rotation every 90 days
schedule.every(90).days.do(rotate_api_key)

Use Different Keys for Different Environments

# Development
PS_API_KEY=ps_test_dev_key_here
 
# Staging
PS_API_KEY=ps_live_staging_key_here
 
# Production
PS_API_KEY=ps_live_production_key_here

2. Token Storage Security

Server-Side Storage

# ✅ Store in secure backend
from cryptography.fernet import Fernet
 
class SecureTokenStorage:
    """Persist per-user tokens in the database, encrypted with a Fernet key."""

    def __init__(self, encryption_key):
        # A single cipher instance is reused for every encrypt/decrypt call.
        self.cipher = Fernet(encryption_key)

    def store_token(self, user_id, token):
        """Encrypt ``token`` and insert it into ``user_tokens`` for ``user_id``."""
        ciphertext = self.cipher.encrypt(token.encode())
        insert_sql = "INSERT INTO user_tokens (user_id, encrypted_token) VALUES (?, ?)"
        db.execute(insert_sql, (user_id, ciphertext))

    def retrieve_token(self, user_id):
        """Fetch the stored ciphertext for ``user_id`` and return the decrypted token."""
        select_sql = "SELECT encrypted_token FROM user_tokens WHERE user_id = ?"
        # NOTE(review): the query result is passed straight to decrypt(), so
        # this assumes db.query returns the ciphertext itself rather than a
        # row/cursor object -- confirm against the db helper.
        ciphertext = db.query(select_sql, (user_id,))
        plaintext = self.cipher.decrypt(ciphertext)
        return plaintext.decode()

Client-Side (When Necessary)

// ❌ BAD: localStorage for sensitive tokens
localStorage.setItem('api_key', apiKey);
 
// ✅ GOOD: Secure httpOnly cookies
res.cookie('api_key', apiKey, {
  httpOnly: true,
  secure: true,
  sameSite: 'strict',
  maxAge: 24 * 60 * 60 * 1000
});

3. Access Control

Implement Principle of Least Privilege

# Create tokens with minimal necessary permissions
def create_limited_token(purpose):
    """Create an access token whose spending limits match ``purpose``.

    Args:
        purpose: One of ``'development'``, ``'testing'`` or ``'production'``.

    Returns:
        The token returned by ``create_access_token``, after its spending
        limits have been applied.

    Raises:
        KeyError: If ``purpose`` is not a known profile. This is raised
            before any token is created.
    """
    limits = {
        'development': {
            'daily_limit': 1.00,
            'monthly_limit': 10.00,
            'per_request_limit': 0.10
        },
        'testing': {
            'daily_limit': 5.00,
            'monthly_limit': 50.00,
            'per_request_limit': 0.50
        },
        'production': {
            'daily_limit': 100.00,
            'monthly_limit': 2000.00,
            'per_request_limit': 5.00
        }
    }

    # Resolve the limits first: failing here on an unknown purpose means we
    # never leave behind a freshly created token with no spending limits.
    purpose_limits = limits[purpose]

    token = create_access_token(f"{purpose}_token")
    set_spending_limits(token['id'], **purpose_limits)
    return token

4. Request Validation

Validate All Inputs

from pydantic import BaseModel, validator
 
class ChatRequest(BaseModel):
    """Validated payload for a chat request.

    Fields default to 500 max tokens and a temperature of 0.7; each field
    has a validator that rejects out-of-range values with ``ValueError``.
    """

    messages: list
    max_tokens: int = 500
    temperature: float = 0.7

    @validator('messages')
    def validate_messages(cls, v):
        """Require between 1 and 50 messages."""
        if not v:
            raise ValueError('messages cannot be empty')
        message_count = len(v)
        if message_count > 50:
            raise ValueError('too many messages (max 50)')
        return v

    @validator('max_tokens')
    def validate_max_tokens(cls, v):
        """Require ``max_tokens`` to lie in the inclusive range 1..4000."""
        too_small = v < 1
        too_large = v > 4000
        if too_small or too_large:
            raise ValueError('max_tokens must be between 1 and 4000')
        return v

    @validator('temperature')
    def validate_temperature(cls, v):
        """Require ``temperature`` to lie in the inclusive range 0..2."""
        below_range = v < 0
        above_range = v > 2
        if below_range or above_range:
            raise ValueError('temperature must be between 0 and 2')
        return v
 
# Usage
def handle_chat_request(data):
    """Validate ``data`` as a ``ChatRequest`` and forward it to the API.

    Any ``ValueError`` raised while validating or calling the API is turned
    into a 400 error response carrying the error text.
    """
    try:
        validated = ChatRequest(**data)
        outcome = make_api_call(validated)
    except ValueError as exc:
        outcome = error_response(str(exc), 400)
    return outcome

5. Rate Limiting on Client Side

from threading import Lock
import time
 
class ClientRateLimiter:
    """Blocking client-side limiter that spaces calls at a fixed interval.

    Args:
        requests_per_second: Maximum sustained request rate; must be positive.

    Raises:
        ValueError: If ``requests_per_second`` is zero or negative.
    """

    def __init__(self, requests_per_second=10):
        if requests_per_second <= 0:
            raise ValueError('requests_per_second must be positive')
        self.rate = requests_per_second
        self.interval = 1.0 / requests_per_second
        # -inf guarantees the very first acquire() never sleeps, whatever
        # value the monotonic clock currently has.
        self.last_request = float('-inf')
        self.lock = Lock()

    def acquire(self):
        """Block until at least ``interval`` seconds have passed since the last call.

        The lock is held while sleeping on purpose: concurrent callers queue
        up behind it and are released one interval apart.
        """
        with self.lock:
            # Use the monotonic clock: wall-clock time can jump (NTP, DST,
            # manual changes) and would yield negative or oversized waits.
            remaining = self.interval - (time.monotonic() - self.last_request)
            if remaining > 0:
                time.sleep(remaining)

            self.last_request = time.monotonic()
 
# Usage
limiter = ClientRateLimiter(requests_per_second=5)
 
def make_request(data):
    """POST ``data`` to ``/api/endpoint`` after waiting for the rate limiter."""
    # Blocks until the limiter allows another request.
    limiter.acquire()
    response = api_client.post('/api/endpoint', data)
    return response

Performance Optimization

1. Implement Caching

Response Caching

from functools import lru_cache
from datetime import datetime, timedelta
import hashlib
 
class ResponseCache:
    """In-memory response cache keyed by endpoint + params, with a TTL.

    Args:
        ttl_seconds: How long an entry stays valid after it is stored.
    """

    def __init__(self, ttl_seconds=300):
        self.cache = {}
        self.ttl = ttl_seconds
        # Cache keys are opaque SHA-256 digests, so the endpoint each key
        # belongs to is tracked separately to make invalidate() work.
        self._endpoints = {}

    def _cache_key(self, endpoint, params):
        """Return the SHA-256 hex digest of ``"<endpoint>:<str(params)>"``."""
        key_str = f"{endpoint}:{str(params)}"
        return hashlib.sha256(key_str.encode()).hexdigest()

    def get(self, endpoint, params):
        """Return the cached response, or ``None`` on a miss or expired entry.

        Expired entries are evicted when they are encountered.
        """
        key = self._cache_key(endpoint, params)
        entry = self.cache.get(key)
        if entry is None:
            return None

        cached_data, cached_time = entry
        if datetime.now() - cached_time < timedelta(seconds=self.ttl):
            return cached_data

        # Stale: drop it so the cache does not accumulate dead entries.
        del self.cache[key]
        self._endpoints.pop(key, None)
        return None

    def set(self, endpoint, params, data):
        """Store ``data`` for ``(endpoint, params)`` stamped with the current time."""
        key = self._cache_key(endpoint, params)
        self.cache[key] = (data, datetime.now())
        self._endpoints[key] = endpoint

    def invalidate(self, endpoint=None):
        """Drop entries for ``endpoint``, or every entry when it is falsy.

        Matching is exact on the endpoint string passed to ``set``.
        """
        if endpoint:
            stale_keys = [
                key for key, owner in self._endpoints.items()
                if owner == endpoint
            ]
            for key in stale_keys:
                self.cache.pop(key, None)
                del self._endpoints[key]
        else:
            self.cache.clear()
            self._endpoints.clear()
 
# Usage
cache = ResponseCache(ttl_seconds=300)
 
def get_chat_response(messages):
    """Return a chat completion for ``messages``, served from cache when possible.

    On a miss the API is called and the response is cached under the chat
    endpoint before being returned.
    """
    endpoint = '/api/hub/agents/chat'

    # Compare against None explicitly: the cache signals a miss with None,
    # and a legitimately falsy cached response (e.g. {} or "") is still a hit.
    cached = cache.get(endpoint, messages)
    if cached is not None:
        return cached

    response = api_client.chat_completion(messages)
    cache.set(endpoint, messages, response)
    return response

Redis Caching

import redis
import json
 
class RedisCache:
    """Cache that stores JSON-encoded values in Redis with a fixed TTL."""

    def __init__(self, redis_url='redis://localhost:6379', ttl=300):
        self.ttl = ttl
        self.redis = redis.from_url(redis_url)

    def get(self, key):
        """Return the decoded value stored at ``key``, or ``None`` if nothing usable is stored."""
        raw = self.redis.get(key)
        if not raw:
            return None
        return json.loads(raw)

    def set(self, key, value):
        """JSON-encode ``value`` and store it at ``key`` with the configured TTL."""
        payload = json.dumps(value)
        self.redis.setex(key, self.ttl, payload)

    def invalidate(self, pattern='*'):
        """Delete every key yielded by ``scan_iter(pattern)``.

        With the default pattern ``'*'`` this removes every key the scan returns.
        """
        matching_keys = self.redis.scan_iter(pattern)
        for matching_key in matching_keys:
            self.redis.delete(matching_key)
 
# Usage
cache = RedisCache(ttl=300)
 
def get_with_cache(cache_key, fetch_func):
    """Return the value cached under ``cache_key``, fetching and caching it on a miss.

    Args:
        cache_key: Key to look up in the module-level ``cache``.
        fetch_func: Zero-argument callable invoked only on a cache miss.

    Returns:
        The cached value, or the fresh result of ``fetch_func()``.
    """
    # A miss is reported as None. Testing truthiness instead would refetch
    # (and pay for) legitimately falsy results such as 0, "", [] or {}.
    cached = cache.get(cache_key)
    if cached is not None:
        return cached

    result = fetch_func()
    cache.set(cache_key, result)
    return result

2. Connection Pooling

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
 
class OptimizedAPIClient:
    """HTTP client that reuses pooled connections and retries transient errors.

    Args:
        api_key: Sent as the ``X-API-Key`` header on every request.
        timeout: Default timeout in seconds applied to requests that do not
            pass their own ``timeout``. Without one, a stalled server can
            block the caller indefinitely.
    """

    def __init__(self, api_key, timeout=30):
        self.api_key = api_key
        self.timeout = timeout
        self.session = self._create_session()

    def _create_session(self):
        """Build a session with retry, connection pooling and default headers."""
        session = requests.Session()

        # Retry up to 3 times on rate limiting and server-side errors.
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )

        # One adapter (and therefore one pool configuration) serves both schemes.
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=20,
            pool_maxsize=20
        )

        session.mount("https://", adapter)
        session.mount("http://", adapter)

        session.headers.update({
            'X-API-Key': self.api_key,
            'Content-Type': 'application/json'
        })

        return session

    def request(self, method, url, **kwargs):
        """Send a request over the pooled session.

        ``kwargs`` are forwarded to ``Session.request``. An explicit
        ``timeout`` wins over the client default.
        """
        kwargs.setdefault('timeout', self.timeout)
        return self.session.request(method, url, **kwargs)

    def close(self):
        """Close the underlying session and release its pooled connections."""
        self.session.close()

3. Async/Parallel Processing

import asyncio
import aiohttp
 
async def process_batch_async(items, max_concurrent=10):
    """POST every item to the memory endpoint concurrently.

    Args:
        items: JSON-serialisable payloads, one request per item.
        max_concurrent: Maximum number of requests in flight at once.

    Returns:
        A ``(successful, failed)`` tuple: decoded JSON responses, and the
        exceptions raised by requests that failed.
    """
    if not items:
        return [], []

    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_item(session, item):
        async with semaphore:
            async with session.post(
                'https://api.polysystems.ai/api/hub/memory',
                headers={'X-API-Key': API_KEY},
                json=item
            ) as response:
                return await response.json()

    # One session for the whole batch: a session owns the connection pool,
    # so creating one per item would open a fresh connection for every request.
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(process_item(session, item) for item in items),
            return_exceptions=True,
        )

    # Split results into decoded responses and captured exceptions.
    successful = []
    failed = []
    for result in results:
        if isinstance(result, Exception):
            failed.append(result)
        else:
            successful.append(result)

    return successful, failed
 
# Usage
items = [{'key': f'item_{i}', 'value': f'data_{i}'} for i in range(100)]
successful, failed = asyncio.run(process_batch_async(items))
print(f"Processed: {len(successful)}, Failed: {len(failed)}")

4. Request Deduplication

import hashlib
from datetime import datetime, timedelta
 
class RequestDeduplicator:
    """Detect identical requests repeated within a sliding time window."""

    def __init__(self, window_seconds=60):
        self.window = timedelta(seconds=window_seconds)
        # Maps request hash -> time the request was last recorded.
        self.requests = {}

    def _hash_request(self, endpoint, data):
        """Return the SHA-256 hex digest of ``"<endpoint>:<str(data)>"``."""
        hasher = hashlib.sha256()
        hasher.update(f"{endpoint}:{str(data)}".encode())
        return hasher.hexdigest()

    def is_duplicate(self, endpoint, data):
        """Return True if the same request was recorded within the window.

        A non-duplicate request is recorded with the current time; a
        duplicate leaves the stored timestamp untouched.
        """
        digest = self._hash_request(endpoint, data)
        current = datetime.now()

        previous = self.requests.get(digest)
        if previous is not None and current - previous < self.window:
            return True

        self.requests[digest] = current
        return False

    def cleanup_old_requests(self):
        """Replace the record table with only the entries still inside the window."""
        current = datetime.now()
        still_fresh = {}
        for digest, seen_at in self.requests.items():
            if current - seen_at < self.window:
                still_fresh[digest] = seen_at
        self.requests = still_fresh
 
# Usage
deduplicator = RequestDeduplicator(window_seconds=60)
 
def make_request(endpoint, data):
    """POST ``data`` to ``endpoint`` unless the deduplicator flags it as a repeat.

    Returns ``None`` (after printing a notice) for a duplicate, otherwise
    the result of the POST.
    """
    if not deduplicator.is_duplicate(endpoint, data):
        return api_client.post(endpoint, data)

    print("Duplicate request detected, skipping")
    return None

Cost Optimization

1. Token Usage Optimization

def optimize_prompt(prompt, max_length=500):
    """Collapse whitespace in ``prompt`` and cut it to ``max_length`` characters.

    When truncation happens, ``"..."`` is appended after the cut, so the
    returned string is up to three characters longer than ``max_length``.
    """
    # split() with no arguments drops leading/trailing whitespace and
    # treats any run of whitespace as one separator.
    collapsed = ' '.join(prompt.split())

    if len(collapsed) <= max_length:
        return collapsed

    return collapsed[:max_length] + "..."
 
def calculate_estimated_cost(messages, model='gpt-4'):
    """Estimate the cost and token count of a request before sending it.

    Tokens are approximated as one per four characters of message content.

    Returns:
        An ``(estimated_cost, estimated_tokens)`` tuple.

    Raises:
        KeyError: If ``model`` has no pricing entry or a message lacks ``'content'``.
    """
    pricing = {
        'gpt-4': {'base': 0.0020, 'per_token': 0.00001},
        'gpt-3.5': {'base': 0.0010, 'per_token': 0.000005}
    }

    # Rough estimation: 4 chars ≈ 1 token
    char_count = 0
    for message in messages:
        char_count += len(message['content'])
    token_estimate = char_count // 4

    rate = pricing[model]
    cost_estimate = rate['base'] + (token_estimate * rate['per_token'])

    return cost_estimate, token_estimate
 
# Usage
messages = [{'role': 'user', 'content': 'Long prompt here...'}]
estimated_cost, tokens = calculate_estimated_cost(messages)
 
if estimated_cost > 0.10:
    print(f"Warning: This request will cost approximately ${estimated_cost:.4f}")
    # Optimize or confirm before proceeding

2. Smart Caching Strategy

class SmartCache:
    """Cache that keeps responses which are expensive or frequently requested.

    Attributes (all plain instance attributes):
        cache: key -> cached result.
        hit_count: key -> number of times the key was served from cache.
        access_count: key -> number of times the key was requested at all.
        cost_saved: running total of the ``cost`` of requests served from cache.
    """

    def __init__(self):
        self.cache = {}
        self.hit_count = {}
        self.access_count = {}
        self.cost_saved = 0

    def should_cache(self, request_type, cost):
        """Return True if a response for ``request_type`` is worth caching.

        A response is cached when it costs more than 0.01 or when the key has
        been requested more than 5 times. The frequency rule uses
        ``access_count``: ``hit_count`` only grows for keys that are already
        cached, so it can never make a cheap, uncached key eligible.
        """
        # Cache expensive requests
        if cost > 0.01:
            return True

        # Cache frequently accessed data
        return self.access_count.get(request_type, 0) > 5

    def get_or_fetch(self, key, fetch_func, cost):
        """Return the cached value for ``key``, or fetch it and maybe cache it.

        Args:
            key: Cache key, also used as the request type for frequency tracking.
            fetch_func: Zero-argument callable invoked on a miss.
            cost: Cost of one real request, used for caching decisions and savings.
        """
        self.access_count[key] = self.access_count.get(key, 0) + 1

        if key in self.cache:
            self.hit_count[key] = self.hit_count.get(key, 0) + 1
            self.cost_saved += cost
            return self.cache[key]

        result = fetch_func()

        if self.should_cache(key, cost):
            self.cache[key] = result

        return result

    def get_statistics(self):
        """Return total cache hits, accumulated cost saved, and number of cached keys."""
        return {
            'total_saves': sum(self.hit_count.values()),
            'cost_saved': self.cost_saved,
            'unique_cached': len(self.cache)
        }

3. Batch Operations

def batch_operations(items, batch_size=50):
    """Send ``items`` to the batch endpoint in chunks of ``batch_size``.

    Returns the concatenation of the ``'results'`` list from every batch
    response, in submission order.
    """
    collected = []

    for start in range(0, len(items), batch_size):
        # One API call carries the whole chunk.
        payload = {'items': items[start:start + batch_size]}
        reply = api_client.post('/api/hub/memory/batch', payload)
        collected += reply['results']

    return collected
 
# Instead of 1000 individual calls, make 20 batch calls
items = [{'key': f'k{i}', 'value': f'v{i}'} for i in range(1000)]
results = batch_operations(items, batch_size=50)
# Cost savings: ~80% reduction

Error Handling and Resilience

1. Circuit Breaker Pattern

from enum import Enum
from datetime import datetime, timedelta
 
class CircuitState(Enum):
    """States of the circuit breaker."""

    CLOSED = "closed"        # calls pass through normally
    OPEN = "open"            # calls are rejected until the timeout elapses
    HALF_OPEN = "half_open"  # trial calls are allowed to probe for recovery


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Stop calling a failing dependency until it has had time to recover.

    Args:
        failure_threshold: Consecutive failures that open the circuit.
        timeout_seconds: How long the circuit stays open before a trial call
            is allowed.
        success_threshold: Successful trial calls needed to close the circuit.
    """

    def __init__(self, failure_threshold=5, timeout_seconds=60, success_threshold=2):
        self.failure_threshold = failure_threshold
        self.timeout = timedelta(seconds=timeout_seconds)
        self.success_threshold = success_threshold

        self.failures = 0
        self.successes = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` under the circuit breaker.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: If the circuit is open and the timeout
                has not elapsed. It subclasses ``Exception`` and keeps the
                message ``"Circuit breaker is OPEN"``.
            Exception: Any exception raised by ``func`` is recorded as a
                failure and re-raised unchanged.
        """
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time > self.timeout:
                # Timeout elapsed: let trial calls through.
                self.state = CircuitState.HALF_OPEN
                self.successes = 0
            else:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        """Reset the failure streak and close the circuit after enough trial successes."""
        self.failures = 0

        if self.state == CircuitState.HALF_OPEN:
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.successes = 0

    def _on_failure(self):
        """Record a failure and open the circuit when warranted.

        Any failure during HALF_OPEN re-opens the circuit immediately. An
        earlier trial success resets ``failures`` to 0, so relying on the
        threshold alone would leave a still-broken dependency receiving
        traffic in HALF_OPEN.
        """
        self.failures += 1
        self.last_failure_time = datetime.now()

        if (
            self.state == CircuitState.HALF_OPEN
            or self.failures >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN

2. Graceful Degradation

class ResilientAPIClient:
    """API client that degrades gracefully: primary, fallback key, cache, default.

    Args:
        primary_key: API key for the primary client.
        fallback_key: Optional second key, tried when the primary is rate limited.
    """

    def __init__(self, primary_key, fallback_key=None):
        self.primary_client = APIClient(primary_key)
        self.fallback_client = APIClient(fallback_key) if fallback_key else None
        self.cache = ResponseCache()

    def request_with_fallbacks(self, endpoint, data):
        """POST ``data`` to ``endpoint``, degrading step by step on ``APIError``.

        Order of attempts: the primary client; on a 429, the fallback client
        if one is configured; then the last cached response for the same
        request; finally a static default.

        Returns:
            The live response on success, ``{'data': ..., 'from_cache': True}``
            when served from cache, or the default response for the endpoint.
        """
        try:
            response = self.primary_client.post(endpoint, data)
        except APIError as e:
            if e.status_code == 429 and self.fallback_client:  # Rate limit
                try:
                    response = self.fallback_client.post(endpoint, data)
                except Exception:
                    # Deliberate best-effort: a failing fallback just moves
                    # us on to the cache. Unlike a bare ``except`` this lets
                    # KeyboardInterrupt/SystemExit propagate.
                    pass
                else:
                    self.cache.set(endpoint, data, response)
                    return response

            # A miss is None; a falsy cached payload still counts as a hit.
            cached = self.cache.get(endpoint, data)
            if cached is not None:
                return {'data': cached, 'from_cache': True}

            return self._default_response(endpoint)

        # Remember successful responses, otherwise the cache fallback above
        # could never have anything to serve.
        self.cache.set(endpoint, data, response)
        return response

    def _default_response(self, endpoint):
        """Return the static placeholder for ``endpoint``, or a generic error dict."""
        defaults = {
            '/api/hub/agents/chat': {
                'message': 'Service temporarily unavailable'
            }
        }
        return defaults.get(endpoint, {'error': 'Service unavailable'})

Monitoring and Observability

1. Structured Logging

import logging
import json
from datetime import datetime
 
class StructuredLogger:
    """Emit one JSON object per log line through the standard ``logging`` module.

    Args:
        name: Name of the underlying logger.
    """

    def __init__(self, name):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # logging.getLogger returns the same logger object for the same name,
        # so attaching a handler unconditionally would print every line once
        # per StructuredLogger instance. Attach ours only once per logger.
        already_attached = any(
            getattr(existing, '_structured_json', False)
            for existing in self.logger.handlers
        )
        if not already_attached:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter('%(message)s'))
            handler._structured_json = True
            self.logger.addHandler(handler)

    @staticmethod
    def _timestamp():
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as ``datetime.utcnow().isoformat()`` without
        calling the deprecated ``utcnow``.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def log_api_request(self, endpoint, method, duration, status, cost=None):
        """Log one API request at INFO level.

        Args:
            endpoint: Request path.
            method: HTTP method.
            duration: Request duration, written as ``duration_ms``.
            status: HTTP status code.
            cost: Optional request cost, written as ``cost_usd``.
        """
        log_data = {
            'timestamp': self._timestamp(),
            'event_type': 'api_request',
            'endpoint': endpoint,
            'method': method,
            'duration_ms': duration,
            'status_code': status,
            'cost_usd': cost
        }
        self.logger.info(json.dumps(log_data))

    def log_error(self, error_type, message, context=None):
        """Log an error at ERROR level with an optional context mapping."""
        log_data = {
            'timestamp': self._timestamp(),
            'event_type': 'error',
            'error_type': error_type,
            'message': message,
            'context': context or {}
        }
        self.logger.error(json.dumps(log_data))
 
# Usage
logger = StructuredLogger('polysystems')
 
start_time = time.time()
try:
    response = api_client.post('/api/hub/agents/chat', data)
    duration = (time.time() - start_time) * 1000
    logger.log_api_request(
        '/api/hub/agents/chat',
        'POST',
        duration,
        200,
        cost=0.0025
    )
except APIError as e:
    logger.log_error(e.error_type, e.message, {'request_id': e.request_id})

2. Metrics Collection

from prometheus_client import Counter, Histogram, Gauge
 
# Define metrics
api_requests_total = Counter(
    'api_requests_total',
    'Total API requests',
    ['endpoint', 'method', 'status']
)
 
api_request_duration = Histogram(
    'api_request_duration_seconds',
    'API request duration',
    ['endpoint', 'method']
)
 
api_cost_total = Counter(
    'api_cost_usd_total',
    'Total API cost in USD',
    ['endpoint']
)
 
current_balance = Gauge(
    'account_balance_usd',
    'Current account balance'
)
 
# Usage
def make_monitored_request(endpoint, method, data):
    """Send a request while recording duration, status and cost metrics.

    Returns:
        The response from ``api_client.request``.

    Raises:
        APIError: Re-raised after the failure has been counted under its
            status code.
    """
    with api_request_duration.labels(endpoint, method).time():
        try:
            response = api_client.request(method, endpoint, data)
        except APIError as e:
            api_requests_total.labels(endpoint, method, str(e.status_code)).inc()
            raise

        # Label with the real status when the response exposes one, so that
        # 201/204 (or non-raising error responses) are not all counted as 200.
        status = str(getattr(response, 'status_code', 200))
        api_requests_total.labels(endpoint, method, status).inc()

        # Track cost reported by the API; a missing header counts as 0.
        cost = response.headers.get('X-Request-Cost', 0)
        api_cost_total.labels(endpoint).inc(float(cost))

        return response

3. Health Checks

def health_check():
    """Run every health probe and summarise the outcome.

    Returns a dict with ``'healthy'`` (True only if every probe passed),
    ``'checks'`` (the individual results) and a UTC ISO-8601 ``'timestamp'``.
    """
    checks = {}
    checks['api_connectivity'] = check_api_connectivity()
    checks['balance_sufficient'] = check_balance()
    checks['rate_limit_ok'] = check_rate_limits()
    checks['key_valid'] = check_key_validity()

    report = {'healthy': all(checks.values())}
    report['checks'] = checks
    report['timestamp'] = datetime.utcnow().isoformat()
    return report
 
def check_api_connectivity():
    """Return True if the health endpoint reports ``'healthy'`` within 5 seconds.

    Any ordinary error counts as unhealthy. ``Exception`` is caught rather
    than using a bare ``except`` so that ``KeyboardInterrupt`` and
    ``SystemExit`` still propagate.
    """
    try:
        response = api_client.get('/api/hub/health', timeout=5)
        return response.get('status') == 'healthy'
    except Exception:
        return False
 
def check_balance():
    """Return True if the account balance is above $1.00.

    Any ordinary error counts as a failed check. ``Exception`` is caught
    rather than using a bare ``except`` so that ``KeyboardInterrupt`` and
    ``SystemExit`` still propagate.
    """
    try:
        balance = api_client.get_balance(jwt_token)
        return balance['balance'] > 1.00  # At least $1
    except Exception:
        return False
 
def check_rate_limits():
    """Return True if more than 10% of the rate-limit quota remains.

    The quota is read from the ``X-RateLimit-Remaining`` and
    ``X-RateLimit-Limit`` response headers. If they are missing, the
    defaults (0 and 1) make the check fail. Any ordinary error counts as a
    failed check; ``Exception`` is caught rather than using a bare
    ``except`` so that ``KeyboardInterrupt`` and ``SystemExit`` still
    propagate.
    """
    try:
        response = api_client.get('/api/hub/health')
        remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
        limit = int(response.headers.get('X-RateLimit-Limit', 1))
        return remaining > (limit * 0.1)  # More than 10% remaining
    except Exception:
        return False

Development Workflow

1. Environment Separation

# .env.development
PS_API_URL=http://localhost:8080
PS_API_KEY=ps_test_dev_key
PS_LOG_LEVEL=DEBUG
PS_CACHE_ENABLED=false
 
# .env.staging
PS_API_URL=https://staging-api.polysystems.ai
PS_API_KEY=ps_live_staging_key
PS_LOG_LEVEL=INFO
PS_CACHE_ENABLED=true
 
# .env.production
PS_API_URL=https://api.polysystems.ai
PS_API_KEY=ps_live_production_key
PS_LOG_LEVEL=WARNING
PS_CACHE_ENABLED=true

2. Testing Strategy

import unittest
from unittest.mock import Mock, patch
 
class TestAPIClient(unittest.TestCase):
    """Unit tests for the client with ``requests.Session.request`` patched out."""

    def setUp(self):
        self.client = PolysystemsClient(api_key='test_key')

    @patch('requests.Session.request')
    def test_chat_completion_success(self, mock_request):
        """A 200 response body is returned to the caller."""
        fake_response = mock_request.return_value
        fake_response.status_code = 200
        fake_response.json.return_value = {'message': 'Test response'}

        conversation = [{'role': 'user', 'content': 'Test'}]
        result = self.client.chat_completion(conversation)

        self.assertEqual(result['message'], 'Test response')

    @patch('requests.Session.request')
    def test_rate_limit_handling(self, mock_request):
        """A 429 response surfaces as an APIError carrying that status code."""
        fake_response = mock_request.return_value
        fake_response.status_code = 429
        fake_response.json.return_value = {'error': 'Rate limit exceeded'}

        conversation = [{'role': 'user', 'content': 'Test'}]
        with self.assertRaises(APIError) as context:
            self.client.chat_completion(conversation)

        raised = context.exception
        self.assertEqual(raised.status_code, 429)
 
# Integration tests
class TestAPIIntegration(unittest.TestCase):
    """Integration tests that call the API using the ``PS_TEST_API_KEY`` key."""

    @classmethod
    def setUpClass(cls):
        test_key = os.getenv('PS_TEST_API_KEY')
        cls.client = PolysystemsClient(api_key=test_key)

    def test_end_to_end_chat(self):
        """Round-trip a chat completion; skipped unless RUN_INTEGRATION_TESTS is 'true'."""
        integration_enabled = os.getenv('RUN_INTEGRATION_TESTS') == 'true'
        if not integration_enabled:
            self.skipTest('Integration tests disabled')

        prompt = [{'role': 'user', 'content': 'Say "test"'}]
        result = self.client.chat_completion(prompt)

        reply_text = result['message'].lower()
        self.assertIn('test', reply_text)

3. Code Review Checklist

## API Integration Code Review Checklist
 
### Security
- [ ] No hardcoded API keys
- [ ] Proper error handling that doesn't leak sensitive data
- [ ] Input validation implemented
- [ ] Rate limiting on client side
- [ ] Proper authentication method used
 
### Performance
- [ ] Caching implemented where appropriate
- [ ] Connection pooling configured
- [ ] Async/parallel processing for batch operations
- [ ] Request deduplication in place
 
### Error Handling
- [ ] All API calls wrapped in try/except (or the language's equivalent)
- [ ] Retry logic implemented for transient failures
- [ ] Circuit breaker for repeated failures
- [ ] Graceful degradation strategies
 
### Monitoring
- [ ] Structured logging implemented
- [ ] Metrics collection in place
- [ ] Error tracking configured
- [ ] Health checks implemented
 
### Cost Management
- [ ] Spending limits configured
- [ ] Cost estimation before expensive operations
- [ ] Caching strategy to reduce API calls
- [ ] Token usage optimized
 
### Documentation
- [ ] Code comments for complex logic
- [ ] API usage examples provided
- [ ] Error handling documented
- [ ] Configuration options documented

Production Deployment

1. Deployment Checklist

## Production Deployment Checklist
 
### Pre-Deployment
- [ ] All tests passing (unit, integration, e2e)
- [ ] Security audit completed
- [ ] Performance testing completed
- [ ] Load testing passed
- [ ] Staging deployment successful
- [ ] Rollback plan prepared
 
### Configuration
- [ ] Production API keys configured in secrets manager
- [ ] Environment variables set correctly
- [ ] Spending limits configured appropriately
- [ ] Rate limiting configured
- [ ] Monitoring and alerting set up
 
### Infrastructure
- [ ] Load balancer configured
- [ ] Auto-scaling rules set
- [ ] Database connections pooled
- [ ] Cache layer configured (Redis)
- [ ] CDN configured for static assets
 
### Monitoring
- [ ] APM tools configured
- [ ] Error tracking enabled (Sentry, etc.)
- [ ] Log aggregation set up (ELK, CloudWatch)
- [ ] Metrics dashboard created
- [ ] Alerts configured for critical metrics
 
### Post-Deployment
- [ ] Smoke tests passed
- [ ] Health checks passing
- [ ] Metrics look normal
- [ ] No elevated error rates
- [ ] Team notified of deployment

2. Zero-Downtime Deployment

# Blue-Green Deployment Strategy
class DeploymentManager:
    """Blue-green deployment: release to the idle environment, then switch traffic."""

    def __init__(self):
        blue = {'active': True, 'version': '1.0.0'}
        green = {'active': False, 'version': '1.1.0'}
        self.environments = {'blue': blue, 'green': green}

    def deploy_new_version(self, version):
        """Deploy ``version`` to the inactive environment and make it live.

        If the health check on the new deployment fails, traffic is pointed
        back at the other environment and an exception is raised.
        """
        target = self.get_inactive_environment()
        self.deploy_to_environment(target, version)

        # Only switch traffic once the new deployment is proven healthy.
        healthy = self.health_check(target)
        if not healthy:
            self.rollback(target)
            raise Exception("Health checks failed")

        self.switch_traffic(target)

        # Flip the active flags to reflect where traffic now goes.
        self.environments[target]['active'] = True
        previous = self.get_other_environment(target)
        self.environments[previous]['active'] = False

    def rollback(self, environment):
        """Send traffic to the environment paired with ``environment``."""
        fallback = self.get_other_environment(environment)
        self.switch_traffic(fallback)

Maintenance and Operations

1. Regular Maintenance Tasks

# Daily tasks
def daily_maintenance():
    """Run daily maintenance tasks"""
    # Check and clean old cache entries
    cache.cleanup_old_entries()
    
    # Check spending and alert if needed
    check_spending_alerts()
    
    # Verify all keys