# Context Manager Patterns
Common patterns for using Epochly context managers to scope optimization.
## Available Context Managers
| Context Manager | Purpose | Default Level | Use Case |
|---|---|---|---|
| `optimize_context(level)` | General-purpose optimization | Specified | Any optimization need |
| `monitoring_context()` | Performance monitoring only | 0 | Baseline measurements |
| `jit_context()` | JIT compilation | 2 | Numerical operations |
| `threading_context()` | Threading optimization | 1 | I/O-bound operations |
| `full_optimize_context()` | Multi-core parallelism | 3 | CPU-intensive parallel work |
| `benchmark_context(name)` | Benchmarking with reporting | Current | Performance testing |
| `epochly_disabled_context()` | Disable optimization | 0 (disabled) | Baseline comparison |
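Per the Default Level column, the named shortcuts act as level presets of the general-purpose manager; a minimal sketch of the equivalences (Pattern 3 below states the level-2 case explicitly, the others follow from the table):

```python
import epochly

# Equivalences implied by the table above (Pattern 3 states the level-2 case):
# the named shortcuts are level presets of optimize_context().
with epochly.optimize_context(level=1):   # ~ threading_context()
    ...
with epochly.optimize_context(level=2):   # ~ jit_context()
    ...
with epochly.optimize_context(level=3):   # ~ full_optimize_context()
    ...
```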
## Pattern 1: Basic Scoped Optimization
Use `optimize_context()` to optimize specific code blocks:
```python
import epochly
import numpy as np

# Normal code execution
data = np.random.rand(1_000_000)
print("Data prepared")

# Optimize only this block
with epochly.optimize_context(level=2):
    # JIT compilation applied here
    result = np.sum(data ** 2 + np.sin(data))
    print(f"Result: {result}")

# Back to normal execution
print("Processing complete")
```
Expected output:
```
Data prepared
Result: 500234.567
Processing complete
```
## Pattern 2: Monitoring Context
Use `monitoring_context()` to track performance metrics:
```python
import epochly

def expensive_operation(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Monitor performance without optimization
with epochly.monitoring_context() as metrics:
    result = expensive_operation(10_000_000)
    print(f"Result: {result}")

# Access metrics after context
print(f"Duration: {metrics['duration']:.3f}s")
print(f"Peak memory: {metrics['peak_memory_mb']:.2f} MB")
print(f"CPU percent: {metrics['cpu_percent']:.1f}%")
```
Expected output:
```
Result: 333333283333335000000
Duration: 2.145s
Peak memory: 45.23 MB
CPU percent: 98.5%
```
## Pattern 3: JIT Context
Use `jit_context()` as a shorthand for JIT compilation:
```python
import epochly
import numpy as np

def numerical_computation(arr):
    """Numerical loop that benefits from JIT"""
    total = 0.0
    for i in range(len(arr)):
        total += arr[i] ** 2 + np.sin(arr[i]) * np.cos(arr[i])
    return total

data = np.random.rand(1_000_000)

# JIT context (equivalent to optimize_context(level=2))
with epochly.jit_context():
    result = numerical_computation(data)
    print(f"Result: {result}")
```
Expected output:
```
Result: 749234.567
```
Note: The first call compiles the function; subsequent calls are much faster.
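A minimal sketch of how you might observe the warmup cost, reusing `numerical_computation` and `data` from the example above (timings will vary by machine):

```python
import time

with epochly.jit_context():
    start = time.perf_counter()
    numerical_computation(data)   # first call: pays the compilation cost
    first = time.perf_counter() - start

    start = time.perf_counter()
    numerical_computation(data)   # subsequent call: runs compiled code
    second = time.perf_counter() - start

print(f"First call:      {first:.3f}s (includes compilation)")
print(f"Subsequent call: {second:.3f}s")
```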
## Pattern 4: Threading Context
Use `threading_context()` for concurrent I/O operations:
```python
import epochly
import requests

urls = [
    'https://api.example.com/data1',
    'https://api.example.com/data2',
    'https://api.example.com/data3',
    'https://api.example.com/data4',
]

# Threading context for I/O operations
with epochly.threading_context(max_workers=4):
    results = []
    for url in urls:
        response = requests.get(url)
        results.append(response.json())

print(f"Fetched {len(results)} URLs")
```
Expected output:
```
Fetched 4 URLs
```
Speedup: Typically 3–4x over sequential fetching with four workers.
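To verify the speedup against your own endpoints, a minimal sketch that times the same loop with optimization disabled and then inside the threading context; `fetch_all` is a hypothetical helper wrapping the loop above:

```python
import time

def fetch_all(urls):
    # Hypothetical helper: same sequential-looking loop as above
    return [requests.get(url).json() for url in urls]

# Baseline with optimization disabled
with epochly.epochly_disabled_context():
    start = time.perf_counter()
    fetch_all(urls)
    sequential = time.perf_counter() - start

# Same loop inside the threading context
with epochly.threading_context(max_workers=4):
    start = time.perf_counter()
    fetch_all(urls)
    threaded = time.perf_counter() - start

print(f"Sequential: {sequential:.2f}s, threaded: {threaded:.2f}s "
      f"({sequential / threaded:.1f}x speedup)")
```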
## Pattern 5: Full Optimization Context
Use `full_optimize_context()` for multi-core parallelism:
```python
import epochly
import numpy as np

def process_chunk(chunk):
    """Process a data chunk"""
    return np.sum(chunk ** 2 + np.sin(chunk))

# Create large dataset
data = [np.random.rand(100_000) for _ in range(100)]

# Full optimization (multi-core parallelism)
with epochly.full_optimize_context():
    results = [process_chunk(chunk) for chunk in data]
    total = sum(results)

print(f"Total: {total}")
```
Expected output:
```
Total: 7498234.567
```
Speedup: Scales with core count, though short of linear (8 cores ≈ 4–6x, 16 cores ≈ 8–12x).
## Pattern 6: Benchmarking Context
Use `benchmark_context()` to measure and report performance:
```python
import epochly
import numpy as np

def compute_operation(n):
    arr = np.random.rand(n)
    return np.sum(arr ** 2)

# Benchmark with automatic reporting
with epochly.benchmark_context("compute_operation") as results:
    result = compute_operation(5_000_000)
    print(f"Result: {result}")

# Access benchmark results
print(f"Duration: {results['duration']:.3f}s")
```
Expected output:
```
Result: 1666234.567
[Benchmark] compute_operation completed in 0.145s
Duration: 0.145s
```
## Pattern 7: Disable Optimization
Use `epochly_disabled_context()` to measure baseline performance:
```python
import epochly
import time

def compute_task(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Measure baseline (no optimization)
with epochly.epochly_disabled_context():
    start = time.perf_counter()
    baseline_result = compute_task(1_000_000)
    baseline_time = time.perf_counter() - start

# Measure with optimization
with epochly.jit_context():
    start = time.perf_counter()
    optimized_result = compute_task(1_000_000)
    optimized_time = time.perf_counter() - start

# Compare
speedup = baseline_time / optimized_time
print(f"Baseline: {baseline_time:.3f}s")
print(f"Optimized: {optimized_time:.3f}s")
print(f"Speedup: {speedup:.2f}x")
```
Expected output:
```
Baseline: 2.145s
Optimized: 0.125s
Speedup: 17.16x
```
## Pattern 8: Nested Contexts
Inner contexts override outer contexts:
```python
import epochly

# (load_multiple_files, process_data, save_results, and file_paths
#  are placeholders for your own I/O and compute helpers)

# Outer context: Threading for I/O
with epochly.threading_context():
    print("Using threading optimization")

    # Load data with threading
    data = load_multiple_files(file_paths)

    # Inner context: JIT for computation
    with epochly.jit_context():
        print("Using JIT optimization")
        result = process_data(data)

    # Back to threading
    print("Using threading optimization again")
    save_results(result)
```
Expected output:
```
Using threading optimization
Using JIT optimization
Using threading optimization again
```
Rule: The innermost context takes precedence.
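A minimal sketch verifying the rule with `epochly.get_status()` (used again in Pattern 9), assuming the default levels from the table above: threading is 1, JIT is 2.

```python
import epochly

with epochly.threading_context():
    print(epochly.get_status()['enhancement_level'])      # 1 (outer)
    with epochly.jit_context():
        print(epochly.get_status()['enhancement_level'])  # 2 (inner wins)
    print(epochly.get_status()['enhancement_level'])      # 1 (outer restored)
```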
## Pattern 9: Error Handling
Contexts handle exceptions gracefully and restore state:
```python
import epochly

def risky_operation(data):
    if len(data) == 0:
        raise ValueError("Empty data")
    return sum(data)

try:
    with epochly.optimize_context(level=3):
        # Exception occurs inside context
        result = risky_operation([])
except ValueError as e:
    print(f"Error caught: {e}")

# Optimization level is automatically restored
status = epochly.get_status()
print(f"Level restored to: {status['enhancement_level']}")
Expected output:
```
Error caught: Empty data
Level restored to: 2
```
Guarantee: Context managers always clean up, even on exceptions.
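Conceptually, each context behaves like a try/finally that restores the previous level. A sketch of the equivalent pattern, built with `contextlib` and the `configure()`/`get_status()` calls shown elsewhere in this page; this is illustrative only, not Epochly's actual implementation:

```python
from contextlib import contextmanager

import epochly

# Conceptual sketch (hypothetical helper, not Epochly internals): save the
# current level, apply the new one, and restore on exit even on exceptions.
@contextmanager
def scoped_level(level):
    previous = epochly.get_status()['enhancement_level']
    epochly.configure(level=level)         # configure() appears in Best Practice 5
    try:
        yield
    finally:
        epochly.configure(level=previous)  # always runs, even on error
```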
## Pattern 10: Comparing Levels
Benchmark all optimization levels:
```python
import epochly
import time
import numpy as np

def workload(n):
    arr = np.random.rand(n)
    return np.sum(arr ** 2 + np.sin(arr))

# Compare all levels
results = {}
for level in [0, 1, 2, 3]:
    with epochly.optimize_context(level=level):
        # Warmup
        workload(1_000_000)

        # Measure
        start = time.perf_counter()
        result = workload(1_000_000)
        elapsed = time.perf_counter() - start
        results[level] = elapsed

# Print comparison
print("Level Comparison:")
baseline = results[0]
for level, elapsed in results.items():
    speedup = baseline / elapsed
    print(f"Level {level}: {elapsed:.3f}s (speedup: {speedup:.2f}x)")
```
Expected output:
```
Level Comparison:
Level 0: 2.145s (speedup: 1.00x)
Level 1: 1.987s (speedup: 1.08x)
Level 2: 0.145s (speedup: 14.79x)
Level 3: 0.312s (speedup: 6.88x)
```
Note that Level 2 (JIT) beats Level 3 here: for this single-array workload, the overhead of multi-core parallelism outweighs its benefit, which is exactly why measuring matters.
## Pattern 11: Conditional Optimization
Choose a context based on runtime conditions:
```python
import epochly
import numpy as np

def smart_process(data):
    """Adapt optimization to data size"""
    data_size = len(data)

    # Small data: No optimization needed
    if data_size < 10_000:
        return np.sum(data ** 2)

    # Medium data: JIT helps
    elif data_size < 1_000_000:
        with epochly.jit_context():
            return np.sum(data ** 2)

    # Large data: Multi-core helps
    else:
        with epochly.full_optimize_context():
            return np.sum(data ** 2)

# Test with different sizes
small = np.random.rand(1_000)
medium = np.random.rand(100_000)
large = np.random.rand(10_000_000)

print(f"Small: {smart_process(small)}")
print(f"Medium: {smart_process(medium)}")
print(f"Large: {smart_process(large)}")
```
Expected output:
```
Small: 333.234
Medium: 33323.456
Large: 3333234.567
```
## Pattern 12: Resource Cleanup
Context managers ensure proper resource cleanup:
```python
import epochly

# Resources are properly cleaned up even with exceptions
# (parallel_computation and data are placeholders for your own workload)
def process_with_cleanup():
    with epochly.full_optimize_context():
        # Worker threads are created
        result = parallel_computation(data)

        # If exception occurs, workers are still cleaned up
        if result < 0:
            raise ValueError("Invalid result")

        return result
    # Workers are automatically cleaned up here

try:
    result = process_with_cleanup()
except ValueError:
    print("Error handled, resources cleaned up")
```
Expected output:
```
Error handled, resources cleaned up
```
Guarantee: Worker threads and resources are released even if exceptions occur.
## Pattern 13: Mixed Workload Pipeline
Use different contexts for different pipeline stages:
```python
import epochly
import pandas as pd

def data_pipeline(file_paths):
    """Multi-stage pipeline with appropriate optimization"""

    # Stage 1: Load files (I/O-bound)
    with epochly.threading_context():
        print("Loading files...")
        dfs = [pd.read_csv(path) for path in file_paths]
        df = pd.concat(dfs, ignore_index=True)

    # Stage 2: Transform data (CPU-bound)
    with epochly.jit_context():
        print("Transforming data...")
        df['feature1'] = df['value'] ** 2
        df['feature2'] = df['value'].apply(lambda x: x ** 2)

    # Stage 3: Aggregate (parallel)
    with epochly.full_optimize_context():
        print("Aggregating...")
        result = df.groupby('category')['value'].sum()

    return result

files = ['data1.csv', 'data2.csv', 'data3.csv']
result = data_pipeline(files)
```
Expected output:
```
Loading files...
Transforming data...
Aggregating...
```
## Best Practices
### 1. Use for Isolated Operations
Context managers are perfect for scoping optimization to specific operations:
```python
import epochly

# (clean_data, transform_data, and save_data are placeholder helpers)

# ✅ GOOD: Scoped optimization
def process_data(data):
    # Normal processing
    cleaned = clean_data(data)

    # Optimize expensive part only
    with epochly.jit_context():
        transformed = transform_data(cleaned)

    # Normal processing
    return save_data(transformed)

# ❌ BAD: Over-optimization
@epochly.optimize(level=3)
def process_data(data):
    # Everything is optimized, even simple operations
    cleaned = clean_data(data)
    transformed = transform_data(cleaned)
    return save_data(transformed)
```
### 2. Benchmark Before Deciding
Always measure before committing to a level:
```python
import epochly

# Test different approaches (my_function and data stand in for your workload)
levels_to_test = [1, 2, 3]
for level in levels_to_test:
    with epochly.benchmark_context(f"Level {level}"):
        with epochly.optimize_context(level=level):
            result = my_function(data)

# Choose the fastest
```
### 3. Disable for Baselines
Use `epochly_disabled_context()` to establish baselines:
```python
import epochly

# Measure baseline first
with epochly.epochly_disabled_context():
    baseline_time = measure_performance()

# Then measure optimized
with epochly.optimize_context(level=2):
    optimized_time = measure_performance()

speedup = baseline_time / optimized_time
```
### 4. Match Level to Workload
Choose the appropriate context for your workload type (a small dispatch sketch follows the table):
| Workload Type | Context Manager | Reason |
|---|---|---|
| I/O operations | `threading_context()` | Concurrent I/O access |
| Numerical loops | `jit_context()` | JIT compilation benefits |
| Parallel processing | `full_optimize_context()` | Multi-core utilization |
| Baseline measurement | `monitoring_context()` | No optimization overhead |
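As a sketch, the table can be folded into a dispatch helper; `CONTEXT_FOR_WORKLOAD`, `run_with_context`, and the workload keys are hypothetical names for illustration:

```python
import epochly

# Hypothetical dispatch table encoding the mapping above.
CONTEXT_FOR_WORKLOAD = {
    'io':       epochly.threading_context,
    'numeric':  epochly.jit_context,
    'parallel': epochly.full_optimize_context,
    'baseline': epochly.monitoring_context,
}

def run_with_context(workload_type, fn, *args, **kwargs):
    with CONTEXT_FOR_WORKLOAD[workload_type]():
        return fn(*args, **kwargs)
```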
### 5. Prefer Contexts Over Global Config
Contexts are more explicit and maintainable than global configuration:
```python
import epochly

# ✅ GOOD: Explicit scoping
def load_and_process():
    with epochly.threading_context():
        data = load_files()
    with epochly.jit_context():
        return process(data)

# ❌ LESS CLEAR: Global configuration
epochly.configure(level=1)
data = load_files()
epochly.configure(level=2)
result = process(data)
```