
Benchmarking Guide

Systematically measure Epochly performance improvements.

Using benchmark_context

benchmark_context is a simple context manager that measures and reports the execution time of the enclosed block.

import epochly
import numpy as np

def compute_intensive_task(n):
    """CPU-intensive task"""
    arr = np.random.rand(n)
    return np.sum(arr ** 2 + np.sin(arr) * np.cos(arr))

# Benchmark with context manager
with epochly.benchmark_context("compute_task"):
    result = compute_intensive_task(1_000_000)

# Output:
# [Benchmark] compute_task completed in 0.123s

Nested Benchmarks

import epochly
import numpy as np

@epochly.optimize(level=2)
def process_data(data):
    with epochly.benchmark_context("data_processing"):
        # Phase 1
        with epochly.benchmark_context("phase1_transform"):
            transformed = data ** 2
        # Phase 2
        with epochly.benchmark_context("phase2_aggregate"):
            result = np.sum(transformed)
    return result

data = np.random.rand(10_000_000)
result = process_data(data)

# Output:
# [Benchmark] phase1_transform completed in 0.045s
# [Benchmark] phase2_aggregate completed in 0.012s
# [Benchmark] data_processing completed in 0.057s

CLI Benchmarking

Use the epochly CLI with the --benchmark flag to benchmark whole scripts.

# Run with benchmarking enabled
epochly --benchmark script.py
# With specific level
epochly --benchmark --level 3 script.py
# Save benchmark results
epochly --benchmark --output benchmark_results.json script.py
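
The saved JSON can then be inspected programmatically. The exact schema depends on your Epochly version, so the sketch below assumes only that the file is valid JSON and pretty-prints whatever it contains:

import json

# Load and pretty-print the saved results; no assumptions about the schema
with open('benchmark_results.json') as f:
    results = json.load(f)

print(json.dumps(results, indent=2))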

Example Script for CLI Benchmarking

# benchmark_example.py
import epochly
import numpy as np

@epochly.optimize(level=2)
def matrix_operations(n=5000):
    A = np.random.rand(n, n)
    B = np.random.rand(n, n)
    return np.dot(A, B)

result = matrix_operations()
print("Computation complete")

Run the script through the CLI:

epochly --benchmark benchmark_example.py
# Output includes timing and optimization metrics

Baseline Comparison

Use epochly_disabled_context() to measure baseline performance without optimization.

import epochly
import numpy as np
import time

def compute_task(n):
    arr = np.random.rand(n)
    return np.sum(arr ** 2)

# Measure baseline (no optimization)
with epochly.epochly_disabled_context():
    start = time.perf_counter()
    baseline_result = compute_task(10_000_000)
    baseline_time = time.perf_counter() - start

# Measure with optimization
@epochly.optimize(level=2)
def optimized_compute(n):
    arr = np.random.rand(n)
    return np.sum(arr ** 2)

start = time.perf_counter()
optimized_result = optimized_compute(10_000_000)
optimized_time = time.perf_counter() - start

# Compare
speedup = baseline_time / optimized_time
print(f"Baseline: {baseline_time:.3f}s")
print(f"Optimized: {optimized_time:.3f}s")
print(f"Speedup: {speedup:.2f}x")

Multi-Level Comparison

Compare all enhancement levels to find the optimal configuration.

import epochly
import numpy as np
import time

def workload(n=1_000_000):
    """Test workload"""
    arr = np.random.rand(n)
    return np.sum(arr ** 2 + np.sin(arr))

def benchmark_levels():
    """Compare all enhancement levels"""
    levels = [0, 1, 2, 3]
    results = {}
    for level in levels:
        with epochly.optimize_context(level=level):
            times = []
            # Warmup run
            workload()
            # Measure 5 runs
            for _ in range(5):
                start = time.perf_counter()
                workload()
                elapsed = time.perf_counter() - start
                times.append(elapsed)
        results[f"Level {level}"] = {
            'mean': np.mean(times),
            'min': np.min(times),
            'max': np.max(times)
        }
    # Print results
    print("\nLevel Comparison:")
    print(f"{'Level':<10} {'Mean (s)':<12} {'Min (s)':<12} {'Max (s)':<12}")
    print("-" * 50)
    for level_name, stats in results.items():
        print(f"{level_name:<10} {stats['mean']:>10.3f} {stats['min']:>10.3f} {stats['max']:>10.3f}")

benchmark_levels()

Warmup Handling

JIT-compiled functions need warmup runs before measurement; otherwise the first call's compilation cost inflates your timings.

import epochly
import numpy as np
import time

@epochly.optimize(level=2)
def jit_function(n):
    """Function that benefits from JIT compilation"""
    arr = np.random.rand(n)
    result = 0
    for i in range(len(arr)):
        result += arr[i] ** 2
    return result

def benchmark_with_warmup(func, n, warmup_runs=3, measure_runs=10):
    """Benchmark with proper warmup"""
    # Warmup phase (JIT compilation happens here)
    print(f"Warmup: {warmup_runs} runs...")
    for i in range(warmup_runs):
        func(n)
    # Measurement phase
    print(f"Measuring: {measure_runs} runs...")
    times = []
    for i in range(measure_runs):
        start = time.perf_counter()
        func(n)
        elapsed = time.perf_counter() - start
        times.append(elapsed)
    return times

# Benchmark with warmup
times = benchmark_with_warmup(jit_function, 100_000)
print("\nResults after warmup:")
print(f"Mean: {np.mean(times):.3f}s")
print(f"Std: {np.std(times):.3f}s")
print(f"Min: {np.min(times):.3f}s")
print(f"Max: {np.max(times):.3f}s")

Statistical Benchmarking

Measure performance with statistical analysis.

import epochly
import numpy as np
import time
import statistics

def statistical_benchmark(func, *args, runs=20):
    """Benchmark with statistical analysis"""
    times = []
    # Warmup
    for _ in range(3):
        func(*args)
    # Measurement
    for _ in range(runs):
        start = time.perf_counter()
        func(*args)
        elapsed = time.perf_counter() - start
        times.append(elapsed)
    # Calculate statistics
    return {
        'mean': statistics.mean(times),
        'median': statistics.median(times),
        'stdev': statistics.stdev(times),
        'min': min(times),
        'max': max(times),
        'samples': len(times)
    }

@epochly.optimize(level=2)
def optimized_task(n):
    arr = np.random.rand(n)
    return np.sum(arr ** 2 + np.sin(arr) * np.cos(arr))

# Run statistical benchmark
stats = statistical_benchmark(optimized_task, 1_000_000, runs=20)
print("Statistical Benchmark Results:")
print(f"Mean: {stats['mean']:.4f}s")
print(f"Median: {stats['median']:.4f}s")
print(f"Stdev: {stats['stdev']:.4f}s")
print(f"Min: {stats['min']:.4f}s")
print(f"Max: {stats['max']:.4f}s")
print(f"Runs: {stats['samples']}")

Data Size Scaling

Measure performance across different data sizes to understand scalability.

import epochly
import numpy as np
import time

@epochly.optimize(level=2)
def process_array(arr):
    """Process array with computation"""
    return np.sum(arr ** 2 + np.sin(arr) * np.cos(arr))

def benchmark_scaling():
    """Benchmark across different data sizes"""
    sizes = [1_000, 10_000, 100_000, 1_000_000, 10_000_000]
    results = []
    for size in sizes:
        arr = np.random.rand(size)
        # Warmup
        process_array(arr)
        # Measure
        times = []
        for _ in range(5):
            start = time.perf_counter()
            process_array(arr)
            elapsed = time.perf_counter() - start
            times.append(elapsed)
        mean_time = np.mean(times)
        throughput = size / mean_time  # elements per second
        results.append({
            'size': size,
            'time': mean_time,
            'throughput': throughput
        })
    # Print results
    print("\nData Size Scaling:")
    print(f"{'Size':<15} {'Time (s)':<15} {'Throughput (elem/s)':<20}")
    print("-" * 50)
    for r in results:
        print(f"{r['size']:<15,} {r['time']:<15.4f} {r['throughput']:<20,.0f}")
    return results

results = benchmark_scaling()
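
With the per-size measurements captured, a quick log-log fit gives a rough estimate of the empirical scaling exponent; a minimal sketch using NumPy's polyfit on the results list returned above:

# Fit log(time) = a * log(size) + b; the slope `a` approximates the scaling exponent
log_sizes = np.log([r['size'] for r in results])
log_times = np.log([r['time'] for r in results])
exponent, _ = np.polyfit(log_sizes, log_times, 1)
print(f"Empirical scaling: time grows roughly as n^{exponent:.2f}")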

Overhead Analysis

Measure Epochly overhead with tiny functions.

import epochly
import time

def tiny_function():
    """Very small function to measure overhead"""
    return 1 + 1

@epochly.optimize(level=2)
def tiny_optimized():
    """Same function with optimization"""
    return 1 + 1

def measure_overhead(runs=10000):
    """Measure optimization overhead"""
    # Baseline
    start = time.perf_counter()
    for _ in range(runs):
        tiny_function()
    baseline_time = time.perf_counter() - start
    # With optimization (after warmup)
    tiny_optimized()  # Warmup
    start = time.perf_counter()
    for _ in range(runs):
        tiny_optimized()
    optimized_time = time.perf_counter() - start
    overhead = (optimized_time - baseline_time) / runs * 1_000_000  # microseconds
    print(f"Overhead Analysis ({runs:,} calls):")
    print(f"Baseline total: {baseline_time:.4f}s")
    print(f"Optimized total: {optimized_time:.4f}s")
    print(f"Overhead per call: {overhead:.2f}µs")
    if overhead < 0:
        print("✓ No measurable overhead")
    else:
        print(f"ℹ Overhead: {overhead:.2f}µs per call")

measure_overhead()
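
For functions this small, the standard-library timeit module is another option; it handles loop repetition for you and reduces measurement noise. A sketch using the two functions defined above:

import timeit

# Average per-call time in microseconds over many iterations
calls = 100_000
baseline_us = timeit.timeit(tiny_function, number=calls) / calls * 1e6
optimized_us = timeit.timeit(tiny_optimized, number=calls) / calls * 1e6
print(f"Baseline:  {baseline_us:.2f}µs per call")
print(f"Optimized: {optimized_us:.2f}µs per call")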

Memory Benchmarking

Track memory usage during execution.

import epochly
import numpy as np
import tracemalloc

def memory_benchmark(func, *args):
    """Benchmark memory usage"""
    # Start memory tracking
    tracemalloc.start()
    # Get baseline
    baseline = tracemalloc.get_traced_memory()
    # Run function
    result = func(*args)
    # Get peak memory
    current, peak = tracemalloc.get_traced_memory()
    # Stop tracking
    tracemalloc.stop()
    return {
        'baseline': baseline[0] / 1024 / 1024,  # MB
        'current': current / 1024 / 1024,
        'peak': peak / 1024 / 1024,
        'allocated': (peak - baseline[0]) / 1024 / 1024
    }

@epochly.optimize(level=2)
def memory_intensive_task(n):
    """Create large arrays"""
    arr1 = np.random.rand(n)
    arr2 = np.random.rand(n)
    result = arr1 ** 2 + arr2 ** 2
    return np.sum(result)

# Benchmark memory
mem_stats = memory_benchmark(memory_intensive_task, 10_000_000)
print("Memory Usage:")
print(f"Baseline: {mem_stats['baseline']:.2f} MB")
print(f"Current: {mem_stats['current']:.2f} MB")
print(f"Peak: {mem_stats['peak']:.2f} MB")
print(f"Allocated: {mem_stats['allocated']:.2f} MB")

Benchmark Report Generation

Generate formatted benchmark reports.

import epochly
import numpy as np
import time
from datetime import datetime

def generate_benchmark_report(name, benchmarks):
    """Generate formatted benchmark report"""
    report = []
    report.append("=" * 70)
    report.append(f"BENCHMARK REPORT: {name}")
    report.append(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    report.append("=" * 70)
    report.append("")
    # Table header
    report.append(f"{'Benchmark':<30} {'Time (s)':<12} {'Speedup':<10}")
    report.append("-" * 70)
    # Calculate baseline
    baseline_time = benchmarks[0]['time'] if benchmarks else 1.0
    # Table rows
    for bench in benchmarks:
        speedup = baseline_time / bench['time']
        report.append(
            f"{bench['name']:<30} {bench['time']:>10.4f} {speedup:>8.2f}x"
        )
    report.append("=" * 70)
    return "\n".join(report)

# Example benchmarks
def run_benchmark_suite():
    """Run complete benchmark suite"""
    benchmarks = []

    def workload(n):
        arr = np.random.rand(n)
        return np.sum(arr ** 2 + np.sin(arr))

    n = 5_000_000
    # Baseline (Level 0)
    with epochly.optimize_context(level=0):
        workload(n)  # Warmup
        start = time.perf_counter()
        workload(n)
        elapsed = time.perf_counter() - start
    benchmarks.append({'name': 'Baseline (Level 0)', 'time': elapsed})
    # Level 1
    with epochly.optimize_context(level=1):
        workload(n)  # Warmup
        start = time.perf_counter()
        workload(n)
        elapsed = time.perf_counter() - start
    benchmarks.append({'name': 'Level 1 (Threading)', 'time': elapsed})
    # Level 2
    with epochly.optimize_context(level=2):
        workload(n)  # Warmup
        start = time.perf_counter()
        workload(n)
        elapsed = time.perf_counter() - start
    benchmarks.append({'name': 'Level 2 (JIT)', 'time': elapsed})
    # Level 3
    with epochly.optimize_context(level=3):
        workload(n)  # Warmup
        start = time.perf_counter()
        workload(n)
        elapsed = time.perf_counter() - start
    benchmarks.append({'name': 'Level 3 (Multicore)', 'time': elapsed})
    # Generate report
    report = generate_benchmark_report("NumPy Operations", benchmarks)
    print(report)
    # Save to file
    with open('benchmark_report.txt', 'w') as f:
        f.write(report)
    print("\n✓ Report saved to benchmark_report.txt")

run_benchmark_suite()

Best Practices

1. Always Include Warmup

# ❌ BAD: No warmup
@epochly.optimize(level=2)
def jit_func(x):
    return x ** 2

times = []
for _ in range(10):
    start = time.perf_counter()
    jit_func(100)
    times.append(time.perf_counter() - start)
# First run is slower due to JIT compilation

# ✅ GOOD: With warmup
for _ in range(3):  # 3 warmup runs
    jit_func(100)

times = []
for _ in range(10):
    start = time.perf_counter()
    jit_func(100)
    times.append(time.perf_counter() - start)
# All runs are fast

2. Run Multiple Iterations

# ❌ BAD: Single measurement
# (compute() stands in for the function under test)
start = time.perf_counter()
result = compute()
elapsed = time.perf_counter() - start
# Unreliable, affected by noise

# ✅ GOOD: Multiple measurements
times = []
for _ in range(20):
    start = time.perf_counter()
    result = compute()
    times.append(time.perf_counter() - start)
mean_time = statistics.mean(times)
std_time = statistics.stdev(times)
# More reliable with statistical analysis

3. Use Consistent Environment

import os
import epochly

# Set consistent configuration
os.environ['EPOCHLY_LEVEL'] = '2'
os.environ['EPOCHLY_MAX_WORKERS'] = '8'

epochly.configure(
    enhancement_level=2,
    max_workers=8
)

# Run benchmarks with consistent settings
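
Pinning down the input data helps too: with a fixed seed, every run sees identical data, so timing differences reflect configuration changes rather than data variation. A minimal sketch (the seed value is arbitrary):

import numpy as np

# Arbitrary fixed seed so every benchmark run uses identical input data
rng = np.random.default_rng(42)
test_data = rng.random(1_000_000)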

4. Test with Representative Data

# ❌ BAD: Trivial test data
test_data = np.array([1, 2, 3])

# ✅ GOOD: Production-like data
test_data = np.random.rand(1_000_000)  # Similar to production size

5. Use Statistical Analysis

import time
import statistics

def robust_benchmark(func, runs=20):
    """Benchmark with outlier detection"""
    times = []
    # Warmup
    for _ in range(3):
        func()
    # Measure
    for _ in range(runs):
        start = time.perf_counter()
        func()
        times.append(time.perf_counter() - start)
    # Calculate statistics
    mean = statistics.mean(times)
    stdev = statistics.stdev(times)
    # Remove outliers (> 2 standard deviations)
    filtered_times = [t for t in times if abs(t - mean) <= 2 * stdev]
    return {
        'mean': statistics.mean(filtered_times),
        'median': statistics.median(filtered_times),
        'stdev': statistics.stdev(filtered_times),
        'samples': len(filtered_times),
        'outliers_removed': len(times) - len(filtered_times)
    }
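
A possible invocation, with the function under test wrapped in a lambda so it takes no arguments (the workload here is purely illustrative):

# Illustrative usage: any zero-argument callable can be passed in
stats = robust_benchmark(lambda: sum(i * i for i in range(100_000)))
print(f"Mean: {stats['mean']:.4f}s ({stats['outliers_removed']} outliers removed)")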

6. Benchmark in Isolation

# ❌ BAD: Other processes running
# Run benchmarks while other apps are active

# ✅ GOOD: Dedicated benchmarking
# Close other applications
# Run on a dedicated machine or container
# Disable background tasks

import psutil

def check_system_load():
    """Check if system is idle enough for benchmarking"""
    cpu_percent = psutil.cpu_percent(interval=1)
    if cpu_percent > 20:
        print(f"⚠ Warning: CPU usage is {cpu_percent}%")
        print("Consider closing other applications")
    else:
        print(f"✓ System ready for benchmarking (CPU: {cpu_percent}%)")

check_system_load()

7. Document Benchmark Configuration

import os
import platform
from datetime import datetime

import epochly

def benchmark_metadata():
    """Capture benchmark environment details"""
    metadata = {
        'timestamp': datetime.now().isoformat(),
        'python_version': platform.python_version(),
        'platform': platform.platform(),
        'processor': platform.processor(),
        'cpu_count': os.cpu_count(),
        'epochly_version': epochly.__version__,
        'epochly_level': epochly.get_level(),
        'max_workers': epochly.get_config().get('max_workers')
    }
    print("Benchmark Environment:")
    for key, value in metadata.items():
        print(f"  {key}: {value}")
    return metadata

# Run before benchmarks
metadata = benchmark_metadata()
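
Storing this metadata next to the benchmark results keeps the numbers interpretable later. A minimal sketch (the file name is just an example):

import json

# Persist the environment snapshot alongside the results
# (file name is an example; match it to your report layout)
with open('benchmark_metadata.json', 'w') as f:
    json.dump(metadata, f, indent=2, default=str)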