Performance Guide¶
This guide covers performance optimization, monitoring, and best practices for FinOps Optimizer.
📋 Performance Overview¶
FinOps Optimizer is designed for high-performance cost analysis and optimization across multiple cloud providers. This guide covers caching, parallel processing, memory optimization, and monitoring.
⚡ Performance Features¶
1. Intelligent Caching¶
Cache Configuration¶
```python
from finops.performance import PerformanceOptimizer

# Initialize performance optimizer
perf_optimizer = PerformanceOptimizer()

# Configure caching
cache_config = {
    'enabled': True,
    'ttl': 3600,             # 1 hour
    'max_size': 1000,
    'cleanup_interval': 300  # 5 minutes
}
```
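A minimal sketch of applying this configuration; the `configure` method name is an assumption here, so check the `PerformanceOptimizer` API shipped with your version:

```python
# Hypothetical wiring; configure() is an assumed method name
perf_optimizer.configure(cache=cache_config)
```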
Cache Implementation¶
```python
import functools
import time
from collections import OrderedDict

class Cache:
    def __init__(self, max_size=1000, ttl=3600):
        self.max_size = max_size
        self.ttl = ttl
        self.cache = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get(self, key):
        if key in self.cache:
            value, timestamp = self.cache[key]
            if time.time() - timestamp < self.ttl:
                # Move to end (LRU)
                self.cache.move_to_end(key)
                self.hits += 1
                return value
            else:
                # Expired
                del self.cache[key]
        self.misses += 1
        return None

    def set(self, key, value):
        if len(self.cache) >= self.max_size:
            # Remove oldest item
            self.cache.popitem(last=False)
        self.cache[key] = (value, time.time())

    def clear(self):
        self.cache.clear()

    def get_stats(self):
        return {
            'size': len(self.cache),
            'max_size': self.max_size,
            'hit_rate': self._calculate_hit_rate()
        }

    def _calculate_hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total else 0.0
```
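A quick standalone usage example of the cache:

```python
demo_cache = Cache(max_size=100, ttl=600)

demo_cache.set('aws:2024-01', {'total_cost': 1234.56})
print(demo_cache.get('aws:2024-01'))   # {'total_cost': 1234.56}
print(demo_cache.get('missing-key'))   # None (counted as a miss)
print(demo_cache.get_stats())          # {'size': 1, 'max_size': 100, 'hit_rate': 0.5}
```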
Caching Decorators¶
```python
# Shared cache used by the decorator below
cache = Cache(max_size=1000, ttl=3600)

def cache_result(ttl=300):
    """Cache function results.

    Note: entries currently expire according to the shared cache's TTL;
    the ttl argument is kept for future per-entry expiry.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Create cache key from the function name and arguments
            cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"

            # Check cache
            cached_result = cache.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Execute function and cache the result
            result = func(*args, **kwargs)
            cache.set(cache_key, result)
            return result
        return wrapper
    return decorator

# Usage example
@cache_result(ttl=3600)
def analyze_costs(provider, start_date, end_date):
    cost_data = ...  # Expensive cost analysis operation
    return cost_data
```
2. Parallel Processing¶
Multi-threading Implementation¶
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

class ParallelProcessor:
    def __init__(self, max_workers=4):
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def process_providers(self, providers, operation):
        """Process multiple cloud providers in parallel."""
        # Submit one task per provider
        futures = {self.executor.submit(operation, provider): provider
                   for provider in providers}

        # Collect results as they complete
        results = {}
        for future in as_completed(futures):
            provider = futures[future]
            try:
                results[provider] = future.result()
            except Exception as e:
                results[provider] = {'error': str(e)}
        return results

    def batch_process(self, items, processor, batch_size=100):
        """Process items in batches, running batches concurrently."""
        # Submit all batches first so they actually run in parallel
        futures = [
            self.executor.submit(processor, items[i:i + batch_size])
            for i in range(0, len(items), batch_size)
        ]

        # Gather results in submission order
        results = []
        for future in futures:
            results.extend(future.result())
        return results
```
Parallel Cost Analysis¶
```python
def parallel_cost_analysis(providers, start_date, end_date):
    """Analyze costs across providers in parallel."""
    processor = ParallelProcessor(max_workers=4)

    def analyze_provider(provider):
        return provider.analyze_costs(start_date, end_date)

    results = processor.process_providers(providers, analyze_provider)

    # Aggregate results, skipping providers that returned an error
    total_cost = sum(result['total_cost'] for result in results.values()
                     if 'total_cost' in result)
    return {
        'total_cost': total_cost,
        'provider_results': results
    }
```
3. Memory Optimization¶
Memory Management¶
```python
import gc
import logging
import os
import time

import psutil

logger = logging.getLogger(__name__)

class MemoryOptimizer:
    def __init__(self, max_memory_usage=0.8):
        self.max_memory_usage = max_memory_usage
        self.process = psutil.Process(os.getpid())

    def check_memory_usage(self):
        """Check current memory usage."""
        memory_info = self.process.memory_info()
        memory_percent = self.process.memory_percent()
        return {
            'rss': memory_info.rss,  # Resident Set Size
            'vms': memory_info.vms,  # Virtual Memory Size
            'percent': memory_percent,
            'available': psutil.virtual_memory().available
        }

    def optimize_memory(self):
        """Optimize memory usage."""
        memory_usage = self.check_memory_usage()
        if memory_usage['percent'] > self.max_memory_usage * 100:
            # Force garbage collection and drop cached entries
            gc.collect()
            cache.clear()  # shared cache from the caching section
            logger.info("Memory optimization performed: %.1f%%", memory_usage['percent'])

    def monitor_memory(self, interval=60):
        """Monitor memory usage continuously."""
        while True:
            memory_usage = self.check_memory_usage()
            if memory_usage['percent'] > self.max_memory_usage * 100:
                self.optimize_memory()
            time.sleep(interval)
```
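Because `monitor_memory` loops forever, a common pattern is to run it on a daemon thread so it does not block the main workload; a minimal sketch:

```python
import threading

memory_optimizer = MemoryOptimizer(max_memory_usage=0.8)

# Daemon thread: exits automatically when the main process exits
monitor_thread = threading.Thread(
    target=memory_optimizer.monitor_memory,
    kwargs={'interval': 60},
    daemon=True
)
monitor_thread.start()
```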
Data Structure Optimization¶
```python
class OptimizedDataStructure:
    def __init__(self):
        self.data = {}
        self.indexes = {}

    def add_cost_data(self, provider, date, cost_data):
        """Add cost data with optimized storage."""
        # Use a tuple for the date (immutable, hashable, compact)
        date_key = (date.year, date.month, date.day)
        if provider not in self.data:
            self.data[provider] = {}
        self.data[provider][date_key] = cost_data

        # Update indexes for fast lookup
        if provider not in self.indexes:
            self.indexes[provider] = set()
        self.indexes[provider].add(date_key)

    def get_cost_data(self, provider, start_date, end_date):
        """Get cost data efficiently."""
        if provider not in self.data:
            return {}

        result = {}
        start_key = (start_date.year, start_date.month, start_date.day)
        end_key = (end_date.year, end_date.month, end_date.day)
        for date_key in self.indexes[provider]:
            if start_key <= date_key <= end_key:
                result[date_key] = self.data[provider][date_key]
        return result
```
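For example, with `datetime.date` objects (the cost values are illustrative):

```python
from datetime import date

store = OptimizedDataStructure()
store.add_cost_data('aws', date(2024, 1, 15), {'total_cost': 420.0})
store.add_cost_data('aws', date(2024, 2, 1), {'total_cost': 380.0})

# Returns only the January entry, keyed by (year, month, day) tuples
january = store.get_cost_data('aws', date(2024, 1, 1), date(2024, 1, 31))
print(january)  # {(2024, 1, 15): {'total_cost': 420.0}}
```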
4. Batch Processing¶
Batch Cost Analysis¶
```python
class BatchProcessor:
    def __init__(self, batch_size=100):
        self.batch_size = batch_size

    def process_cost_data(self, cost_data):
        """Process cost data in batches."""
        results = []
        for i in range(0, len(cost_data), self.batch_size):
            batch = cost_data[i:i + self.batch_size]
            results.extend(self.process_batch(batch))
        return results

    def process_batch(self, batch):
        """Process a single batch."""
        return [self.optimize_cost_item(item) for item in batch]

    def optimize_cost_item(self, item):
        """Attach a recommendation to an individual cost item."""
        if item['utilization'] < 0.3:
            item['recommendation'] = 'downsize'
        elif item['utilization'] > 0.8:
            item['recommendation'] = 'upsize'
        else:
            item['recommendation'] = 'optimal'
        return item
```
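Usage with a list of cost items; the `utilization` field is what `optimize_cost_item` inspects, and the `resource_id` values are illustrative:

```python
processor = BatchProcessor(batch_size=100)

cost_items = [
    {'resource_id': 'i-001', 'utilization': 0.15},
    {'resource_id': 'i-002', 'utilization': 0.92},
    {'resource_id': 'i-003', 'utilization': 0.55},
]

for item in processor.process_cost_data(cost_items):
    print(item['resource_id'], item['recommendation'])
# i-001 downsize, i-002 upsize, i-003 optimal
```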
📊 Performance Monitoring¶
1. Performance Metrics¶
Metrics Collection¶
```python
import functools
import time
from collections import defaultdict

class PerformanceMonitor:
    def __init__(self):
        self.metrics = defaultdict(list)
        self.start_times = {}

    def start_timer(self, operation):
        """Start timing an operation."""
        self.start_times[operation] = time.time()

    def end_timer(self, operation):
        """End timing an operation."""
        if operation in self.start_times:
            duration = time.time() - self.start_times[operation]
            self.metrics[operation].append(duration)
            del self.start_times[operation]

    def timer(self, operation):
        """Decorator that times every call to the wrapped function."""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                self.start_timer(operation)
                try:
                    return func(*args, **kwargs)
                finally:
                    self.end_timer(operation)
            return wrapper
        return decorator

    def get_metrics(self):
        """Get performance metrics."""
        metrics = {}
        for operation, durations in self.metrics.items():
            if durations:
                metrics[operation] = {
                    'count': len(durations),
                    'average': sum(durations) / len(durations),
                    'min': min(durations),
                    'max': max(durations),
                    'total': sum(durations)
                }
        return metrics

    def get_cache_stats(self):
        """Get cache performance statistics."""
        return cache.get_stats()

# Usage
monitor = PerformanceMonitor()

@monitor.timer('cost_analysis')
def analyze_costs():
    # Perform cost analysis; timing is handled by the decorator
    return perform_cost_analysis()
```
Real-time Monitoring¶
```python
import time

class RealTimeMonitor:
    # Alert when these metrics exceed their thresholds
    UPPER_THRESHOLDS = {
        'response_time': 5.0,   # seconds
        'memory_usage': 0.8,    # fraction of total memory
        'error_rate': 0.05      # fraction of requests
    }
    # Alert when these metrics fall below their thresholds
    LOWER_THRESHOLDS = {
        'cache_hit_rate': 0.7   # fraction of lookups served from cache
    }

    def __init__(self):
        self.metrics = {}
        self.alerts = []

    def update_metric(self, name, value):
        """Update a metric value."""
        self.metrics[name] = {
            'value': value,
            'timestamp': time.time()
        }
        # Check for alerts
        self.check_alerts(name, value)

    def check_alerts(self, name, value):
        """Check if a metric triggers an alert."""
        threshold = None
        if name in self.UPPER_THRESHOLDS and value > self.UPPER_THRESHOLDS[name]:
            threshold = self.UPPER_THRESHOLDS[name]
        elif name in self.LOWER_THRESHOLDS and value < self.LOWER_THRESHOLDS[name]:
            threshold = self.LOWER_THRESHOLDS[name]
        if threshold is not None:
            self.alerts.append({
                'metric': name,
                'value': value,
                'threshold': threshold,
                'timestamp': time.time()
            })

    def get_status(self):
        """Get current system status."""
        return {
            'metrics': self.metrics,
            'alerts': self.alerts,
            'status': 'healthy' if not self.alerts else 'warning'
        }
```
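Feeding metrics into the monitor from application code, for example:

```python
realtime_monitor = RealTimeMonitor()

realtime_monitor.update_metric('response_time', 1.2)    # below 5.0, no alert
realtime_monitor.update_metric('cache_hit_rate', 0.55)  # below 0.7, raises an alert

status = realtime_monitor.get_status()
print(status['status'])  # 'warning'
```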
2. Performance Dashboard¶
Dashboard Metrics¶
```python
def get_performance_dashboard(monitor, realtime_monitor):
    """Build dashboard data from the long-lived monitor instances.

    Passing the shared instances in matters: a freshly created
    PerformanceMonitor has no recorded timings and would always
    report an empty dashboard.
    """
    metrics = monitor.get_metrics()
    return {
        'response_times': {
            'cost_analysis': metrics.get('cost_analysis', {}),
            'optimization': metrics.get('optimization', {}),
            'forecasting': metrics.get('forecasting', {})
        },
        'cache_performance': monitor.get_cache_stats(),
        'memory_usage': MemoryOptimizer().check_memory_usage(),
        'system_status': realtime_monitor.get_status()
    }
```
🔧 Performance Configuration¶
1. Configuration Settings¶
```yaml
performance:
  # Caching
  cache:
    enabled: true
    ttl: 3600               # Time to live in seconds
    max_size: 1000          # Maximum cache entries
    cleanup_interval: 300   # Cleanup interval in seconds

  # Parallel processing
  parallel:
    max_workers: 4
    timeout: 300            # Timeout in seconds
    batch_size: 100

  # Memory optimization
  memory:
    max_usage: 0.8          # Maximum memory usage (80%)
    cleanup_threshold: 0.7  # Cleanup threshold (70%)
    gc_interval: 600        # Garbage collection interval in seconds

  # Batch processing
  batch_processing:
    enabled: true
    batch_size: 100
    max_concurrent_batches: 4
    timeout: 300

    # Batch types
    types:
      cost_analysis:
        batch_size: 50
        timeout: 180
      rightsizing:
        batch_size: 25
        timeout: 240
      forecasting:
        batch_size: 100
        timeout: 300
```
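A minimal sketch of loading these settings at startup with PyYAML, assuming the file is saved as `config.yaml` (adjust the path to wherever your FinOps Optimizer configuration lives):

```python
import yaml  # PyYAML

with open('config.yaml') as f:
    config = yaml.safe_load(f)

perf = config['performance']
shared_cache = Cache(
    max_size=perf['cache']['max_size'],
    ttl=perf['cache']['ttl']
)
processor = ParallelProcessor(max_workers=perf['parallel']['max_workers'])
```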
2. Environment-specific Settings¶
```yaml
environments:
  development:
    performance:
      cache:
        enabled: false
        ttl: 60
      parallel:
        max_workers: 2
      memory:
        max_usage: 0.6

  staging:
    performance:
      cache:
        enabled: true
        ttl: 1800
      parallel:
        max_workers: 4
      memory:
        max_usage: 0.7

  production:
    performance:
      cache:
        enabled: true
        ttl: 3600
      parallel:
        max_workers: 8
      memory:
        max_usage: 0.8
```
📈 Performance Optimization¶
1. Database Optimization¶
Query Optimization¶
```python
def optimize_database_queries():
    """Return example database-level optimizations for cost data."""
    optimizations = {
        'indexes': [
            'CREATE INDEX idx_cost_date ON costs(date)',
            'CREATE INDEX idx_cost_provider ON costs(provider)',
            'CREATE INDEX idx_cost_service ON costs(service)'
        ],
        'partitions': [
            'PARTITION BY RANGE (date)',
            'PARTITION BY LIST (provider)'
        ],
        'compression': [
            'COMPRESS COST_DATA',
            'COMPRESS AUDIT_LOGS'
        ]
    }
    return optimizations
```
Connection Pooling¶
```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Configure connection pool
engine = create_engine(
    'postgresql://user:pass@localhost/finops',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
    pool_recycle=3600
)
```
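Once the pooled engine exists, connections are borrowed and returned automatically; the `costs` table below is assumed to exist in your schema:

```python
from sqlalchemy import text

with engine.connect() as conn:
    # Connection is taken from the pool and returned to it on exit
    result = conn.execute(text("SELECT provider, SUM(cost) FROM costs GROUP BY provider"))
    for provider, total in result:
        print(provider, total)
```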
2. API Optimization¶
Response Caching¶
```python
from flask import request, jsonify
from functools import wraps

def cache_response(ttl=300):
    """Cache API responses (uses the shared cache defined earlier)."""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Create cache key from the request path and query string
            cache_key = f"{request.path}:{request.query_string.decode()}"

            # Check cache
            cached_response = cache.get(cache_key)
            if cached_response is not None:
                return jsonify(cached_response)

            # Execute the view and cache its (JSON-serializable) result
            response = f(*args, **kwargs)
            cache.set(cache_key, response)
            return jsonify(response)
        return decorated_function
    return decorator

@app.route('/api/costs')
@cache_response(ttl=3600)
def get_costs():
    cost_data = ...  # Expensive cost analysis
    return cost_data
```
Pagination¶
```python
def paginate_results(results, page=1, per_page=50):
    """Implement pagination for large result sets."""
    total = len(results)
    start = (page - 1) * per_page
    end = start + per_page
    paginated_results = results[start:end]
    return {
        'data': paginated_results,
        'pagination': {
            'page': page,
            'per_page': per_page,
            'total': total,
            'pages': (total + per_page - 1) // per_page
        }
    }
```
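For example, serving page 2 of a large result list:

```python
results = [{'id': i, 'cost': i * 1.5} for i in range(230)]

page_2 = paginate_results(results, page=2, per_page=50)
print(len(page_2['data']))            # 50
print(page_2['pagination']['pages'])  # 5 (230 items / 50 per page, rounded up)
```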
3. Memory Optimization¶
Lazy Loading¶
```python
class LazyCostAnalyzer:
    def __init__(self, provider):
        self.provider = provider
        self._cost_data = None

    @property
    def cost_data(self):
        """Lazy load cost data on first access."""
        if self._cost_data is None:
            self._cost_data = self.provider.get_cost_data()
        return self._cost_data

    def analyze(self):
        """Analyze costs without loading data until needed."""
        return self.cost_data
```
Data Compression¶
```python
import gzip
import json

def compress_data(data):
    """Compress data for storage."""
    json_data = json.dumps(data)
    return gzip.compress(json_data.encode('utf-8'))

def decompress_data(compressed_data):
    """Decompress data."""
    decompressed = gzip.decompress(compressed_data)
    return json.loads(decompressed.decode('utf-8'))
```
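Round-trip example with an illustrative cost snapshot:

```python
cost_snapshot = {'provider': 'aws', 'total_cost': 1234.56, 'currency': 'USD'}

compressed = compress_data(cost_snapshot)
print(len(compressed), 'bytes compressed')

restored = decompress_data(compressed)
assert restored == cost_snapshot
```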
🚀 Performance Testing¶
1. Load Testing¶
```python
import asyncio
import time

import aiohttp

async def load_test(url='http://localhost:5000/api/costs', num_requests=100):
    """Perform a simple load test against the API."""
    async def fetch(session):
        # Issue one request and return its status code
        async with session.get(url) as response:
            await response.read()
            return response.status

    async with aiohttp.ClientSession() as session:
        # Create concurrent requests
        tasks = [asyncio.create_task(fetch(session)) for _ in range(num_requests)]

        # Execute all requests and time them
        start_time = time.time()
        statuses = await asyncio.gather(*tasks)
        total_time = time.time() - start_time

    successful = sum(1 for status in statuses if status == 200)
    return {
        'total_requests': len(statuses),
        'successful_requests': successful,
        'total_time': total_time,
        'requests_per_second': len(statuses) / total_time
    }
```
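Run it against a locally running API server (the URL and request count default to the values above):

```python
if __name__ == '__main__':
    results = asyncio.run(load_test())
    print(f"{results['requests_per_second']:.1f} requests/second, "
          f"{results['successful_requests']}/{results['total_requests']} successful")
```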
2. Performance Benchmarks¶
```python
import time

def run_performance_benchmarks():
    """Run performance benchmarks."""
    # benchmark_optimization, benchmark_forecasting and benchmark_caching
    # follow the same pattern as benchmark_cost_analysis below
    benchmarks = {
        'cost_analysis': benchmark_cost_analysis,
        'optimization': benchmark_optimization,
        'forecasting': benchmark_forecasting,
        'caching': benchmark_caching
    }
    results = {}
    for name, benchmark in benchmarks.items():
        results[name] = benchmark()
    return results

def benchmark_cost_analysis():
    """Benchmark cost analysis performance."""
    start_time = time.time()

    # Perform cost analysis
    result = analyze_costs()

    end_time = time.time()
    return {
        'duration': end_time - start_time,
        'memory_usage': get_memory_usage(),
        'cpu_usage': get_cpu_usage()
    }
```
📊 Performance Metrics¶
1. Key Performance Indicators¶
```python
import time

class PerformanceKPIs:
    # Target value and direction: 'below' means lower is better
    TARGETS = {
        'response_time': (2.0, 'below'),    # seconds
        'cache_hit_rate': (0.8, 'above'),   # fraction of lookups
        'memory_usage': (0.8, 'below'),     # fraction of total memory
        'error_rate': (0.01, 'below')       # fraction of requests
    }

    def __init__(self):
        self.kpis = {}

    def update_kpi(self, name, value):
        """Update KPI value."""
        self.kpis[name] = {
            'value': value,
            'timestamp': time.time()
        }

    def get_kpis(self):
        """Get all KPIs with their targets and current status."""
        report = {}
        for name, (target, direction) in self.TARGETS.items():
            current = self.kpis.get(name, {}).get('value', 0)
            good = current < target if direction == 'below' else current > target
            report[name] = {
                'current': current,
                'target': target,
                'status': 'good' if good else 'poor'
            }
        return report
```
2. Performance Alerts¶
```python
def check_performance_alerts(kpi_tracker):
    """Check a populated PerformanceKPIs instance for alerts."""
    kpis = kpi_tracker.get_kpis()
    alerts = []
    for kpi_name, kpi_data in kpis.items():
        if kpi_data['status'] == 'poor':
            alerts.append({
                'kpi': kpi_name,
                'current_value': kpi_data['current'],
                'target_value': kpi_data['target'],
                'severity': 'high' if kpi_name in ['error_rate', 'response_time'] else 'medium'
            })
    return alerts
```
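Putting the two together: record measurements on a shared `PerformanceKPIs` instance, then check it for alerts (the sample values are illustrative):

```python
kpi_tracker = PerformanceKPIs()

kpi_tracker.update_kpi('response_time', 3.4)    # above the 2.0 s target
kpi_tracker.update_kpi('cache_hit_rate', 0.85)  # above the 0.8 target, healthy

for alert in check_performance_alerts(kpi_tracker):
    print(alert['kpi'], alert['severity'])  # response_time high
```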
🔧 Performance Troubleshooting¶
1. Common Performance Issues¶
High Response Times¶
```python
def diagnose_high_response_times():
    """Diagnose high response times."""
    # Each check_* helper below is a placeholder for a concrete diagnostic
    diagnostics = {
        'database_queries': check_database_performance(),
        'cache_efficiency': check_cache_performance(),
        'memory_usage': check_memory_usage(),
        'cpu_usage': check_cpu_usage(),
        'network_latency': check_network_latency()
    }
    return diagnostics
```
Memory Leaks¶
```python
import gc

def detect_memory_leaks():
    """Detect potential memory leaks."""
    # Force garbage collection
    gc.collect()

    # Count live objects by type
    object_counts = {}
    for obj in gc.get_objects():
        obj_type = type(obj).__name__
        object_counts[obj_type] = object_counts.get(obj_type, 0) + 1

    # Flag types with unusually many instances
    potential_leaks = []
    for obj_type, count in object_counts.items():
        if count > 1000:  # Threshold for potential leak
            potential_leaks.append({
                'type': obj_type,
                'count': count
            })
    return potential_leaks
```
2. Performance Optimization Tips¶
Code Optimization¶
```python
# Use list comprehensions instead of explicit loops
# Good
costs = [item['cost'] for item in cost_data if item['cost'] > 0]

# Bad
costs = []
for item in cost_data:
    if item['cost'] > 0:
        costs.append(item['cost'])

# Use generators for large datasets
def cost_generator(cost_data):
    for item in cost_data:
        yield item['cost']

# Use sets for fast lookups
providers = {'aws', 'azure', 'gcp'}
if provider in providers:
    # Fast O(1) membership test
    pass
```
Database Optimization¶
```python
# Use bulk operations
def bulk_insert_costs(cost_data):
    """Bulk insert cost data."""
    # Prepare rows for the bulk insert
    values = [(item['date'], item['cost'], item['provider'])
              for item in cost_data]

    # Execute bulk insert (cursor is an open DB-API cursor)
    cursor.executemany(
        "INSERT INTO costs (date, cost, provider) VALUES (%s, %s, %s)",
        values
    )
```
Need help with performance? Check our Troubleshooting Guide or open an issue.