
Performance Guide

This guide covers performance optimization, monitoring, and best practices for FinOps Optimizer.

📋 Performance Overview

FinOps Optimizer is designed for high-performance cost analysis and optimization across multiple cloud providers. This guide covers caching, parallel processing, memory optimization, and monitoring.

⚡ Performance Features

1. Intelligent Caching

Cache Configuration

from finops.performance import PerformanceOptimizer

# Initialize performance optimizer
perf_optimizer = PerformanceOptimizer()

# Configure caching
cache_config = {
    'enabled': True,
    'ttl': 3600,  # 1 hour
    'max_size': 1000,
    'cleanup_interval': 300  # 5 minutes
}

Cache Implementation

import functools
import time
from collections import OrderedDict

class Cache:
    def __init__(self, max_size=1000, ttl=3600):
        self.max_size = max_size
        self.ttl = ttl
        self.cache = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get(self, key):
        if key in self.cache:
            value, timestamp, entry_ttl = self.cache[key]
            if time.time() - timestamp < entry_ttl:
                # Move to end (LRU)
                self.cache.move_to_end(key)
                self.hits += 1
                return value
            # Expired entry
            del self.cache[key]
        self.misses += 1
        return None

    def set(self, key, value, ttl=None):
        if len(self.cache) >= self.max_size:
            # Evict the least recently used entry
            self.cache.popitem(last=False)

        self.cache[key] = (value, time.time(), ttl or self.ttl)

    def clear(self):
        self.cache.clear()

    def _calculate_hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total else 0.0

    def get_stats(self):
        return {
            'size': len(self.cache),
            'max_size': self.max_size,
            'hit_rate': self._calculate_hit_rate()
        }

Caching Decorators

# Shared cache instance used by the caching decorators below
cache = Cache()

def cache_result(ttl=300):
    """Cache function results with a per-call TTL."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build a cache key from the function name and its arguments
            cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"

            # Check cache
            cached_result = cache.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Execute the function and cache its result
            result = func(*args, **kwargs)
            cache.set(cache_key, result, ttl=ttl)

            return result
        return wrapper
    return decorator

# Usage example
@cache_result(ttl=3600)
def analyze_costs(provider, start_date, end_date):
    # Expensive cost analysis operation
    return cost_data

2. Parallel Processing

Multi-threading Implementation

import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

class ParallelProcessor:
    def __init__(self, max_workers=4):
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def process_providers(self, providers, operation):
        """Process multiple cloud providers in parallel."""
        futures = {}

        # Submit tasks
        for provider in providers:
            future = self.executor.submit(operation, provider)
            futures[future] = provider

        # Collect results
        results = {}
        for future in as_completed(futures):
            provider = futures[future]
            try:
                results[provider] = future.result()
            except Exception as e:
                results[provider] = {'error': str(e)}

        return results

    def batch_process(self, items, processor, batch_size=100):
        """Process items in batches, submitting all batches before collecting results."""
        futures = []

        # Submit every batch so they run concurrently
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            futures.append(self.executor.submit(processor, batch))

        # Collect results in submission order
        results = []
        for future in futures:
            results.extend(future.result())

        return results

Parallel Cost Analysis

def parallel_cost_analysis(providers, start_date, end_date):
    """Analyze costs across providers in parallel."""
    processor = ParallelProcessor(max_workers=4)

    def analyze_provider(provider):
        return provider.analyze_costs(start_date, end_date)

    results = processor.process_providers(providers, analyze_provider)

    # Aggregate results, skipping providers that failed with an error
    total_cost = sum(
        result['total_cost']
        for result in results.values()
        if 'total_cost' in result
    )

    return {
        'total_cost': total_cost,
        'provider_results': results
    }

3. Memory Optimization

Memory Management

import gc
import logging
import os
import time

import psutil

logger = logging.getLogger(__name__)

class MemoryOptimizer:
    def __init__(self, max_memory_usage=0.8):
        self.max_memory_usage = max_memory_usage
        self.process = psutil.Process(os.getpid())

    def check_memory_usage(self):
        """Check current memory usage."""
        memory_info = self.process.memory_info()
        memory_percent = self.process.memory_percent()

        return {
            'rss': memory_info.rss,  # Resident Set Size
            'vms': memory_info.vms,  # Virtual Memory Size
            'percent': memory_percent,
            'available': psutil.virtual_memory().available
        }

    def optimize_memory(self):
        """Optimize memory usage."""
        memory_usage = self.check_memory_usage()

        if memory_usage['percent'] > self.max_memory_usage * 100:
            # Force garbage collection
            gc.collect()

            # Clear caches
            cache.clear()

            # Log memory optimization
            logger.info(f"Memory optimization performed: {memory_usage['percent']:.1f}%")

    def monitor_memory(self, interval=60):
        """Monitor memory usage continuously."""
        while True:
            memory_usage = self.check_memory_usage()

            if memory_usage['percent'] > self.max_memory_usage * 100:
                self.optimize_memory()

            time.sleep(interval)

Data Structure Optimization

class OptimizedDataStructure:
    def __init__(self):
        self.data = {}
        self.indexes = {}

    def add_cost_data(self, provider, date, cost_data):
        """Add cost data with optimized storage."""
        # Use tuple for date (immutable, more efficient)
        date_key = (date.year, date.month, date.day)

        if provider not in self.data:
            self.data[provider] = {}

        self.data[provider][date_key] = cost_data

        # Update indexes for fast lookup
        if provider not in self.indexes:
            self.indexes[provider] = set()

        self.indexes[provider].add(date_key)

    def get_cost_data(self, provider, start_date, end_date):
        """Get cost data efficiently."""
        if provider not in self.data:
            return {}

        result = {}
        start_key = (start_date.year, start_date.month, start_date.day)
        end_key = (end_date.year, end_date.month, end_date.day)

        for date_key in self.indexes[provider]:
            if start_key <= date_key <= end_key:
                result[date_key] = self.data[provider][date_key]

        return result
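
A short usage sketch of this structure; the provider name, dates, and cost values are illustrative:

from datetime import date

store = OptimizedDataStructure()
store.add_cost_data('aws', date(2024, 1, 1), {'total_cost': 120.50})
store.add_cost_data('aws', date(2024, 1, 2), {'total_cost': 98.75})

# Fetch everything recorded for January 2024
january = store.get_cost_data('aws', date(2024, 1, 1), date(2024, 1, 31))
print(january)  # keyed by (year, month, day) tuples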

4. Batch Processing

Batch Cost Analysis

class BatchProcessor:
    def __init__(self, batch_size=100):
        self.batch_size = batch_size

    def process_cost_data(self, cost_data):
        """Process cost data in batches."""
        results = []

        for i in range(0, len(cost_data), self.batch_size):
            batch = cost_data[i:i + self.batch_size]
            batch_result = self.process_batch(batch)
            results.extend(batch_result)

        return results

    def process_batch(self, batch):
        """Process a single batch."""
        # Process batch efficiently
        processed_items = []

        for item in batch:
            # Apply optimizations
            optimized_item = self.optimize_cost_item(item)
            processed_items.append(optimized_item)

        return processed_items

    def optimize_cost_item(self, item):
        """Optimize individual cost item."""
        # Apply optimization logic
        if item['utilization'] < 0.3:
            item['recommendation'] = 'downsize'
        elif item['utilization'] > 0.8:
            item['recommendation'] = 'upsize'
        else:
            item['recommendation'] = 'optimal'

        return item
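
As a rough usage sketch, here is a small list of resource records processed in batches (the resource IDs and utilization figures are made up):

items = [
    {'resource_id': 'i-001', 'utilization': 0.25},
    {'resource_id': 'i-002', 'utilization': 0.55},
    {'resource_id': 'i-003', 'utilization': 0.90},
]

processor = BatchProcessor(batch_size=2)
for item in processor.process_cost_data(items):
    print(item['resource_id'], item['recommendation'])
# i-001 downsize
# i-002 optimal
# i-003 upsize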

📊 Performance Monitoring

1. Performance Metrics

Metrics Collection

import time
from collections import defaultdict

class PerformanceMonitor:
    def __init__(self):
        self.metrics = defaultdict(list)
        self.start_times = {}

    def start_timer(self, operation):
        """Start timing an operation."""
        self.start_times[operation] = time.time()

    def end_timer(self, operation):
        """End timing an operation."""
        if operation in self.start_times:
            duration = time.time() - self.start_times[operation]
            self.metrics[operation].append(duration)
            del self.start_times[operation]

    def get_metrics(self):
        """Get performance metrics."""
        metrics = {}

        for operation, durations in self.metrics.items():
            if durations:
                metrics[operation] = {
                    'count': len(durations),
                    'average': sum(durations) / len(durations),
                    'min': min(durations),
                    'max': max(durations),
                    'total': sum(durations)
                }

        return metrics

    def get_cache_stats(self):
        """Get cache performance statistics."""
        return cache.get_stats()

# Usage
monitor = PerformanceMonitor()

def analyze_costs():
    monitor.start_timer('cost_analysis')
    # Perform cost analysis
    result = perform_cost_analysis()
    monitor.end_timer('cost_analysis')
    return result

Real-time Monitoring

class RealTimeMonitor:
    def __init__(self):
        self.metrics = {}
        self.alerts = []

    def update_metric(self, name, value):
        """Update a metric value."""
        self.metrics[name] = {
            'value': value,
            'timestamp': time.time()
        }

        # Check for alerts
        self.check_alerts(name, value)

    def check_alerts(self, name, value):
        """Check if a metric value crosses its alert threshold."""
        # Alert when the metric exceeds its limit, except cache_hit_rate,
        # which alerts when it drops below the target.
        alert_thresholds = {
            'response_time': 5.0,   # seconds
            'memory_usage': 0.8,    # fraction of total memory
            'error_rate': 0.05,     # fraction of requests
            'cache_hit_rate': 0.7   # minimum acceptable hit rate
        }

        if name in alert_thresholds:
            threshold = alert_thresholds[name]
            breached = value < threshold if name == 'cache_hit_rate' else value > threshold
            if breached:
                self.alerts.append({
                    'metric': name,
                    'value': value,
                    'threshold': threshold,
                    'timestamp': time.time()
                })

    def get_status(self):
        """Get current system status."""
        return {
            'metrics': self.metrics,
            'alerts': self.alerts,
            'status': 'healthy' if not self.alerts else 'warning'
        }

2. Performance Dashboard

Dashboard Metrics

def get_performance_dashboard(monitor, memory_optimizer, realtime_monitor):
    """Build dashboard data from long-lived monitor instances."""
    metrics = monitor.get_metrics()

    return {
        'response_times': {
            'cost_analysis': metrics.get('cost_analysis', {}),
            'optimization': metrics.get('optimization', {}),
            'forecasting': metrics.get('forecasting', {})
        },
        'cache_performance': monitor.get_cache_stats(),
        'memory_usage': memory_optimizer.check_memory_usage(),
        'system_status': realtime_monitor.get_status()
    }

🔧 Performance Configuration

1. Configuration Settings

performance:
  # Caching
  cache:
    enabled: true
    ttl: 3600  # Time to live in seconds
    max_size: 1000  # Maximum cache entries
    cleanup_interval: 300  # Cleanup interval in seconds

  # Parallel processing
  parallel:
    max_workers: 4
    timeout: 300  # Timeout in seconds
    batch_size: 100

  # Memory optimization
  memory:
    max_usage: 0.8  # Maximum memory usage (80%)
    cleanup_threshold: 0.7  # Cleanup threshold (70%)
    gc_interval: 600  # Garbage collection interval

  # Batch processing
  batch_processing:
    enabled: true
    batch_size: 100
    max_concurrent_batches: 4
    timeout: 300

    # Batch types
    types:
      cost_analysis:
        batch_size: 50
        timeout: 180
      rightsizing:
        batch_size: 25
        timeout: 240
      forecasting:
        batch_size: 100
        timeout: 300
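
A minimal sketch of wiring these settings into the components defined earlier, assuming the configuration lives in a config.yaml file and is parsed with PyYAML (both the file name and the use of PyYAML are assumptions):

import yaml

with open('config.yaml') as f:
    config = yaml.safe_load(f)

perf = config['performance']

# Build the cache and parallel processor from the configured values
cache = Cache(
    max_size=perf['cache']['max_size'],
    ttl=perf['cache']['ttl']
)
processor = ParallelProcessor(max_workers=perf['parallel']['max_workers'])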

2. Environment-specific Settings

environments:
  development:
    performance:
      cache:
        enabled: false
        ttl: 60
      parallel:
        max_workers: 2
      memory:
        max_usage: 0.6

  staging:
    performance:
      cache:
        enabled: true
        ttl: 1800
      parallel:
        max_workers: 4
      memory:
        max_usage: 0.7

  production:
    performance:
      cache:
        enabled: true
        ttl: 3600
      parallel:
        max_workers: 8
      memory:
        max_usage: 0.8
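
One way to select the right block at startup is an environment variable; the FINOPS_ENV name below is an assumption, not an official setting:

import os
import yaml

with open('config.yaml') as f:
    config = yaml.safe_load(f)

env = os.environ.get('FINOPS_ENV', 'development')
perf = config['environments'][env]['performance']

print(perf['parallel']['max_workers'])  # e.g. 2 in development, 8 in production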

📈 Performance Optimization

1. Database Optimization

Query Optimization

def optimize_database_queries():
    """Optimize database queries for performance."""
    optimizations = {
        'indexes': [
            'CREATE INDEX idx_cost_date ON costs(date)',
            'CREATE INDEX idx_cost_provider ON costs(provider)',
            'CREATE INDEX idx_cost_service ON costs(service)'
        ],
        'partitions': [
            'PARTITION BY RANGE (date)',
            'PARTITION BY LIST (provider)'
        ],
        'compression': [
            'COMPRESS COST_DATA',
            'COMPRESS AUDIT_LOGS'
        ]
    }

    return optimizations
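
A sketch of applying the index statements through a DB-API cursor; connection setup and transaction handling are assumed to happen elsewhere:

def apply_index_optimizations(cursor):
    """Apply the recommended index statements using an open DB-API cursor."""
    for statement in optimize_database_queries()['indexes']:
        cursor.execute(statement)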

Connection Pooling

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Configure connection pool
engine = create_engine(
    'postgresql://user:pass@localhost/finops',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
    pool_recycle=3600
)
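
Connections checked out from the pool are returned automatically when the connection context exits. A small usage sketch (the costs table matches the index examples above):

from sqlalchemy import text

def total_cost_for_provider(provider):
    """Run an aggregate query on a pooled connection."""
    with engine.connect() as conn:
        result = conn.execute(
            text("SELECT SUM(cost) FROM costs WHERE provider = :provider"),
            {"provider": provider}
        )
        return result.scalar()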

2. API Optimization

Response Caching

from flask import request, jsonify
from functools import wraps

def cache_response(ttl=300):
    """Cache JSON-serializable API responses."""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Create a cache key from the request path and query string
            cache_key = f"{request.path}:{request.query_string.decode()}"

            # Check cache
            cached_response = cache.get(cache_key)
            if cached_response is not None:
                return jsonify(cached_response)

            # Execute the view and cache its (JSON-serializable) result
            response = f(*args, **kwargs)
            cache.set(cache_key, response, ttl=ttl)

            return jsonify(response)
        return decorated_function
    return decorator

@app.route('/api/costs')
@cache_response(ttl=3600)
def get_costs():
    # Expensive cost analysis
    return cost_data

Pagination

def paginate_results(results, page=1, per_page=50):
    """Implement pagination for large result sets."""
    total = len(results)
    start = (page - 1) * per_page
    end = start + per_page

    paginated_results = results[start:end]

    return {
        'data': paginated_results,
        'pagination': {
            'page': page,
            'per_page': per_page,
            'total': total,
            'pages': (total + per_page - 1) // per_page
        }
    }
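
For example, requesting page 2 of a 120-item result set:

items = list(range(120))
page_two = paginate_results(items, page=2, per_page=50)

print(len(page_two['data']))   # 50
print(page_two['pagination'])  # {'page': 2, 'per_page': 50, 'total': 120, 'pages': 3}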

3. Memory Optimization

Lazy Loading

class LazyCostAnalyzer:
    def __init__(self, provider):
        self.provider = provider
        self._cost_data = None

    @property
    def cost_data(self):
        """Lazy load cost data."""
        if self._cost_data is None:
            self._cost_data = self.provider.get_cost_data()
        return self._cost_data

    def analyze(self):
        """Analyze costs efficiently."""
        return self.cost_data

Data Compression

import gzip
import json

def compress_data(data):
    """Compress data for storage."""
    json_data = json.dumps(data)
    compressed = gzip.compress(json_data.encode('utf-8'))
    return compressed

def decompress_data(compressed_data):
    """Decompress data."""
    decompressed = gzip.decompress(compressed_data)
    return json.loads(decompressed.decode('utf-8'))
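
A quick round-trip check with an illustrative payload:

sample = {'provider': 'aws', 'costs': [{'date': '2024-01-01', 'cost': 120.5}] * 100}

compressed = compress_data(sample)
restored = decompress_data(compressed)

assert restored == sample
print(f"raw: {len(json.dumps(sample))} bytes, compressed: {len(compressed)} bytes")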

🚀 Performance Testing

1. Load Testing

import asyncio
import aiohttp
import time

async def fetch(session, url):
    """Issue one GET request and return its status code."""
    async with session.get(url) as response:
        await response.read()
        return response.status

async def load_test(url='http://localhost:5000/api/costs', num_requests=100):
    """Perform a simple load test against the costs endpoint."""
    async with aiohttp.ClientSession() as session:
        # Fire all requests concurrently
        tasks = [fetch(session, url) for _ in range(num_requests)]

        start_time = time.time()
        statuses = await asyncio.gather(*tasks)
        end_time = time.time()

        # Calculate metrics
        successful = sum(1 for status in statuses if status == 200)
        total_time = end_time - start_time

        return {
            'total_requests': len(statuses),
            'successful_requests': successful,
            'total_time': total_time,
            'requests_per_second': len(statuses) / total_time
        }
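
The coroutine can be driven with asyncio.run; the target URL defaults to the local API endpoint used above:

if __name__ == '__main__':
    results = asyncio.run(load_test())
    print(f"{results['successful_requests']}/{results['total_requests']} requests succeeded "
          f"at {results['requests_per_second']:.1f} req/s")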

2. Performance Benchmarks

def run_performance_benchmarks():
    """Run performance benchmarks."""
    benchmarks = {
        'cost_analysis': benchmark_cost_analysis,
        'optimization': benchmark_optimization,
        'forecasting': benchmark_forecasting,
        'caching': benchmark_caching
    }

    results = {}
    for name, benchmark in benchmarks.items():
        results[name] = benchmark()

    return results

def benchmark_cost_analysis():
    """Benchmark cost analysis performance."""
    process = psutil.Process()
    start_time = time.time()

    # Perform cost analysis
    result = analyze_costs()

    end_time = time.time()

    return {
        'duration': end_time - start_time,
        'memory_usage': process.memory_info().rss,       # bytes
        'cpu_usage': process.cpu_percent(interval=None)  # percent since last call
    }

📊 Performance Metrics

1. Key Performance Indicators

class PerformanceKPIs:
    def __init__(self):
        self.kpis = {}

    def update_kpi(self, name, value):
        """Update KPI value."""
        self.kpis[name] = {
            'value': value,
            'timestamp': time.time()
        }

    def get_kpis(self):
        """Get all KPIs."""
        return {
            'response_time': {
                'current': self.kpis.get('response_time', {}).get('value', 0),
                'target': 2.0,  # seconds
                'status': 'good' if self.kpis.get('response_time', {}).get('value', 0) < 2.0 else 'poor'
            },
            'cache_hit_rate': {
                'current': self.kpis.get('cache_hit_rate', {}).get('value', 0),
                'target': 0.8,  # 80%
                'status': 'good' if self.kpis.get('cache_hit_rate', {}).get('value', 0) > 0.8 else 'poor'
            },
            'memory_usage': {
                'current': self.kpis.get('memory_usage', {}).get('value', 0),
                'target': 0.8,  # 80%
                'status': 'good' if self.kpis.get('memory_usage', {}).get('value', 0) < 0.8 else 'poor'
            },
            'error_rate': {
                'current': self.kpis.get('error_rate', {}).get('value', 0),
                'target': 0.01,  # 1%
                'status': 'good' if self.kpis.get('error_rate', {}).get('value', 0) < 0.01 else 'poor'
            }
        }

2. Performance Alerts

def check_performance_alerts(kpi_tracker):
    """Check for performance alerts against a shared PerformanceKPIs instance."""
    kpis = kpi_tracker.get_kpis()
    alerts = []

    for kpi_name, kpi_data in kpis.items():
        if kpi_data['status'] == 'poor':
            alerts.append({
                'kpi': kpi_name,
                'current_value': kpi_data['current'],
                'target_value': kpi_data['target'],
                'severity': 'high' if kpi_name in ['error_rate', 'response_time'] else 'medium'
            })

    return alerts

🔧 Performance Troubleshooting

1. Common Performance Issues

High Response Times

def diagnose_high_response_times():
    """Diagnose high response times."""
    diagnostics = {
        'database_queries': check_database_performance(),
        'cache_efficiency': check_cache_performance(),
        'memory_usage': check_memory_usage(),
        'cpu_usage': check_cpu_usage(),
        'network_latency': check_network_latency()
    }

    return diagnostics

Memory Leaks

def detect_memory_leaks():
    """Detect memory leaks."""
    import gc
    import sys

    # Force garbage collection
    gc.collect()

    # Get object counts
    object_counts = {}
    for obj in gc.get_objects():
        obj_type = type(obj).__name__
        object_counts[obj_type] = object_counts.get(obj_type, 0) + 1

    # Find potential leaks
    potential_leaks = []
    for obj_type, count in object_counts.items():
        if count > 1000:  # Threshold for potential leak
            potential_leaks.append({
                'type': obj_type,
                'count': count
            })

    return potential_leaks
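
A single snapshot only shows which object types are numerous; comparing two snapshots taken before and after exercising a suspect code path is usually more telling. A minimal sketch using the same counting approach (the run_cost_analysis workload named in the comments is hypothetical):

import gc
from collections import Counter

def count_objects():
    """Snapshot live objects by type."""
    gc.collect()
    return Counter(type(obj).__name__ for obj in gc.get_objects())

def diff_object_counts(before, after, min_growth=100):
    """Return object types whose count grew by at least min_growth between snapshots."""
    return {
        obj_type: after[obj_type] - before.get(obj_type, 0)
        for obj_type in after
        if after[obj_type] - before.get(obj_type, 0) >= min_growth
    }

# Usage: snapshot, exercise the suspect code path, snapshot again, then diff
# before = count_objects()
# run_cost_analysis()  # hypothetical workload
# after = count_objects()
# print(diff_object_counts(before, after))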

2. Performance Optimization Tips

Code Optimization

# Use list comprehensions instead of loops
# Good
costs = [item['cost'] for item in cost_data if item['cost'] > 0]

# Bad
costs = []
for item in cost_data:
    if item['cost'] > 0:
        costs.append(item['cost'])

# Use generators for large datasets
def cost_generator(cost_data):
    for item in cost_data:
        yield item['cost']

# Use sets for fast lookups
providers = set(['aws', 'azure', 'gcp'])
if provider in providers:
    # Fast lookup
    pass

Database Optimization

# Use bulk operations
def bulk_insert_costs(cursor, cost_data):
    """Bulk insert cost data using an open DB-API cursor."""
    # Prepare bulk insert
    values = [(item['date'], item['cost'], item['provider'])
              for item in cost_data]

    # Execute bulk insert
    cursor.executemany(
        "INSERT INTO costs (date, cost, provider) VALUES (%s, %s, %s)",
        values
    )

Need help with performance? Check our Troubleshooting Guide or open an issue.