Context Manager Patterns

Common patterns for using Epochly context managers to scope optimization.

Available Context Managers

| Context Manager | Purpose | Default Level | Use Case |
|---|---|---|---|
| `optimize_context(level)` | General-purpose optimization | As specified | Any optimization need |
| `monitoring_context()` | Performance monitoring only | 0 | Baseline measurements |
| `jit_context()` | JIT compilation | 2 | Numerical operations |
| `threading_context()` | Threading optimization | 1 | I/O-bound operations |
| `full_optimize_context()` | Multi-core parallelism | 3 | CPU-intensive parallel work |
| `benchmark_context(name)` | Benchmarking with reporting | Current | Performance testing |
| `epochly_disabled_context()` | Disable optimization | 0 (disabled) | Baseline comparison |

Pattern 1: Basic Scoped Optimization

Use optimize_context() to optimize specific code blocks:

```python
import epochly
import numpy as np

# Normal code execution
data = np.random.rand(1_000_000)
print("Data prepared")

# Optimize only this block
with epochly.optimize_context(level=2):
    # JIT compilation applied here
    result = np.sum(data ** 2 + np.sin(data))
    print(f"Result: {result}")

# Back to normal execution
print("Processing complete")
```

Expected output:

```
Data prepared
Result: 793031.567
Processing complete
```

Pattern 2: Monitoring Context

Use monitoring_context() to track performance metrics:

```python
import epochly

def expensive_operation(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Monitor performance without optimization
with epochly.monitoring_context() as metrics:
    result = expensive_operation(10_000_000)
    print(f"Result: {result}")

# Access metrics after the context exits
print(f"Duration: {metrics['duration']:.3f}s")
print(f"Peak memory: {metrics['peak_memory_mb']:.2f} MB")
print(f"CPU percent: {metrics['cpu_percent']:.1f}%")
```

Expected output:

```
Result: 333333283333335000000
Duration: 2.145s
Peak memory: 45.23 MB
CPU percent: 98.5%
```

Pattern 3: JIT Context

Use jit_context() as a shorthand for JIT compilation:

```python
import epochly
import numpy as np

def numerical_computation(arr):
    """Numerical loop that benefits from JIT"""
    total = 0.0
    for i in range(len(arr)):
        total += arr[i] ** 2 + np.sin(arr[i]) * np.cos(arr[i])
    return total

data = np.random.rand(1_000_000)

# JIT context (equivalent to optimize_context(level=2))
with epochly.jit_context():
    result = numerical_computation(data)
    print(f"Result: {result}")
```

Expected output:

```
Result: 687370.567
```

Note: The first call compiles the function; subsequent calls are much faster.
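
To see the warmup cost directly, time the first and second calls separately. A minimal sketch, reusing `numerical_computation` and `data` from the example above:

```python
import time

with epochly.jit_context():
    # First call: pays the one-time JIT compilation cost
    start = time.perf_counter()
    numerical_computation(data)
    first_call = time.perf_counter() - start

    # Second call: reuses the already-compiled code
    start = time.perf_counter()
    numerical_computation(data)
    second_call = time.perf_counter() - start

print(f"First call:  {first_call:.3f}s (includes compilation)")
print(f"Second call: {second_call:.3f}s")
```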

Pattern 4: Threading Context

Use threading_context() for concurrent I/O operations:

```python
import epochly
import requests

urls = [
    'https://api.example.com/data1',
    'https://api.example.com/data2',
    'https://api.example.com/data3',
    'https://api.example.com/data4',
]

# Threading context for I/O operations
with epochly.threading_context(max_workers=4):
    results = []
    for url in urls:
        response = requests.get(url)
        results.append(response.json())

print(f"Fetched {len(results)} URLs")
```

Expected output:

```
Fetched 4 URLs
```

Speedup: 3-4x faster than sequential fetching.
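
To verify the speedup against your own endpoints, time the same loop with optimization disabled and enabled. A minimal sketch, reusing `urls` and `requests` from the example above:

```python
import time

def fetch_all(urls):
    return [requests.get(url).json() for url in urls]

# Sequential baseline (optimization disabled)
with epochly.epochly_disabled_context():
    start = time.perf_counter()
    fetch_all(urls)
    sequential_time = time.perf_counter() - start

# Same loop under the threading context
with epochly.threading_context(max_workers=4):
    start = time.perf_counter()
    fetch_all(urls)
    threaded_time = time.perf_counter() - start

print(f"Speedup: {sequential_time / threaded_time:.2f}x")
```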

Pattern 5: Full Optimization Context

Use full_optimize_context() for multi-core parallelism:

```python
import epochly
import numpy as np

def process_chunk(chunk):
    """Process a data chunk"""
    return np.sum(chunk ** 2 + np.sin(chunk))

# Create large dataset
data = [np.random.rand(100_000) for _ in range(100)]

# Full optimization (multi-core parallelism)
with epochly.full_optimize_context():
    results = [process_chunk(chunk) for chunk in data]
    total = sum(results)

print(f"Total: {total}")
```

Expected output:

```
Total: 7930312.567
```

Speedup: scales with core count, though sublinearly in practice (8 cores ≈ 4–6x, 16 cores ≈ 8–12x).

Pattern 6: Benchmarking Context

Use benchmark_context() to measure and report performance:

```python
import epochly
import numpy as np

def compute_operation(n):
    arr = np.random.rand(n)
    return np.sum(arr ** 2)

# Benchmark with automatic reporting
with epochly.benchmark_context("compute_operation") as results:
    result = compute_operation(5_000_000)
    print(f"Result: {result}")

# Access benchmark results
print(f"Duration: {results['duration']:.3f}s")
```

Expected output:

```
Result: 1666234.567
[Benchmark] compute_operation completed in 0.145s
Duration: 0.145s
```

Pattern 7: Disable Optimization

Use epochly_disabled_context() to measure baseline performance:

```python
import epochly
import time

def compute_task(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Measure baseline (no optimization)
with epochly.epochly_disabled_context():
    start = time.perf_counter()
    baseline_result = compute_task(1_000_000)
    baseline_time = time.perf_counter() - start

# Measure with optimization
with epochly.jit_context():
    start = time.perf_counter()
    optimized_result = compute_task(1_000_000)
    optimized_time = time.perf_counter() - start

# Compare
speedup = baseline_time / optimized_time
print(f"Baseline: {baseline_time:.3f}s")
print(f"Optimized: {optimized_time:.3f}s")
print(f"Speedup: {speedup:.2f}x")
```

Expected output:

```
Baseline: 2.145s
Optimized: 0.125s
Speedup: 17.16x
```

Pattern 8: Nested Contexts

Inner contexts override outer contexts:

```python
import epochly

# load_multiple_files, process_data, and save_results are user-defined
# helpers; file_paths is a list of input paths.

# Outer context: Threading for I/O
with epochly.threading_context():
    print("Using threading optimization")
    # Load data with threading
    data = load_multiple_files(file_paths)

    # Inner context: JIT for computation
    with epochly.jit_context():
        print("Using JIT optimization")
        result = process_data(data)

    # Back to threading
    print("Using threading optimization again")
    save_results(result)
```

Expected output:

```
Using threading optimization
Using JIT optimization
Using threading optimization again
```

Rule: The innermost context takes precedence.
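
You can check which level is active at any point with `epochly.get_status()` (also used in Pattern 9 below). A minimal sketch, assuming the status dict's `enhancement_level` key reflects the currently active level:

```python
import epochly

with epochly.threading_context():  # level 1
    print(epochly.get_status()['enhancement_level'])  # expected: 1

    with epochly.jit_context():  # level 2 takes precedence
        print(epochly.get_status()['enhancement_level'])  # expected: 2

    # Inner context has exited; the outer level is active again
    print(epochly.get_status()['enhancement_level'])  # expected: 1
```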

Pattern 9: Error Handling

Contexts handle exceptions gracefully and restore state:

```python
import epochly

def risky_operation(data):
    if len(data) == 0:
        raise ValueError("Empty data")
    return sum(data)

try:
    with epochly.optimize_context(level=3):
        # Exception occurs inside context
        result = risky_operation([])
except ValueError as e:
    print(f"Error caught: {e}")

# Optimization level is automatically restored
status = epochly.get_status()
print(f"Level restored to: {status['enhancement_level']}")
```

Expected output (assuming the optimization level was 2 before the context):

```
Error caught: Empty data
Level restored to: 2
```

Guarantee: Context managers always clean up, even on exceptions.
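
Why the guarantee holds: a scoping context manager saves the previous state on entry and restores it in a `finally` block, so cleanup runs whether the body returns or raises. A minimal sketch of that shape (not Epochly's actual implementation; `get_level` and `set_level` are hypothetical stand-ins for its internals):

```python
from contextlib import contextmanager

# Hypothetical stand-ins for Epochly's internal level state
_level = 0
def get_level():
    return _level
def set_level(level):
    global _level
    _level = level

@contextmanager
def scoped_level(level):
    previous = get_level()
    set_level(level)
    try:
        yield
    finally:
        # Runs on normal exit AND on exception
        set_level(previous)
```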

Pattern 10: Comparing Levels

Benchmark all optimization levels:

```python
import epochly
import time
import numpy as np

def workload(n):
    arr = np.random.rand(n)
    return np.sum(arr ** 2 + np.sin(arr))

# Compare all levels
results = {}
for level in [0, 1, 2, 3]:
    with epochly.optimize_context(level=level):
        # Warmup
        workload(1_000_000)
        # Measure
        start = time.perf_counter()
        result = workload(1_000_000)
        elapsed = time.perf_counter() - start
    results[level] = elapsed

# Print comparison
print("Level Comparison:")
baseline = results[0]
for level, elapsed in results.items():
    speedup = baseline / elapsed
    print(f"Level {level}: {elapsed:.3f}s (speedup: {speedup:.2f}x)")
```

Expected output:

```
Level Comparison:
Level 0: 2.145s (speedup: 1.00x)
Level 1: 1.987s (speedup: 1.08x)
Level 2: 0.145s (speedup: 14.79x)
Level 3: 0.312s (speedup: 6.88x)
```

Pattern 11: Conditional Optimization

Choose context based on runtime conditions:

```python
import epochly
import numpy as np

def smart_process(data):
    """Adapt optimization to data size"""
    data_size = len(data)

    # Small data: No optimization needed
    if data_size < 10_000:
        return np.sum(data ** 2)
    # Medium data: JIT helps
    elif data_size < 1_000_000:
        with epochly.jit_context():
            return np.sum(data ** 2)
    # Large data: Multi-core helps
    else:
        with epochly.full_optimize_context():
            return np.sum(data ** 2)

# Test with different sizes
small = np.random.rand(1_000)
medium = np.random.rand(100_000)
large = np.random.rand(10_000_000)
print(f"Small: {smart_process(small)}")
print(f"Medium: {smart_process(medium)}")
print(f"Large: {smart_process(large)}")
```

Expected output:

```
Small: 333.234
Medium: 33323.456
Large: 3333234.567
```

Pattern 12: Resource Cleanup

Context managers ensure proper resource cleanup:

```python
import epochly

# parallel_computation is a user-defined function; data is its input

def process_with_cleanup():
    with epochly.full_optimize_context():
        # Worker threads are created
        result = parallel_computation(data)
        # If an exception occurs, workers are still cleaned up
        if result < 0:
            raise ValueError("Invalid result")
        return result
    # Workers are automatically cleaned up here

# Resources are properly cleaned up even with exceptions
try:
    result = process_with_cleanup()
except ValueError:
    print("Error handled, resources cleaned up")
```

Expected output:

```
Error handled, resources cleaned up
```

Guarantee: Worker threads and resources are released even if exceptions occur.

Pattern 13: Mixed Workload Pipeline

Use different contexts for different pipeline stages:

```python
import epochly
import pandas as pd

def data_pipeline(file_paths):
    """Multi-stage pipeline with appropriate optimization"""
    # Stage 1: Load files (I/O-bound)
    with epochly.threading_context():
        print("Loading files...")
        dfs = [pd.read_csv(path) for path in file_paths]
        df = pd.concat(dfs, ignore_index=True)

    # Stage 2: Transform data (CPU-bound)
    with epochly.jit_context():
        print("Transforming data...")
        df['feature1'] = df['value'] ** 2
        df['feature2'] = df['value'].apply(lambda x: x ** 2)

    # Stage 3: Aggregate (parallel)
    with epochly.full_optimize_context():
        print("Aggregating...")
        result = df.groupby('category')['value'].sum()

    return result

# The CSV files are assumed to contain 'value' and 'category' columns
files = ['data1.csv', 'data2.csv', 'data3.csv']
result = data_pipeline(files)
```

Expected output:

```
Loading files...
Transforming data...
Aggregating...
```

Best Practices

1. Use for Isolated Operations

Context managers are perfect for scoping optimization to specific operations:

```python
import epochly

# clean_data, transform_data, and save_data are user-defined helpers

# ✅ GOOD: Scoped optimization
def process_data(data):
    # Normal processing
    cleaned = clean_data(data)

    # Optimize expensive part only
    with epochly.jit_context():
        transformed = transform_data(cleaned)

    # Normal processing
    return save_data(transformed)

# ❌ BAD: Over-optimization
@epochly.optimize(level=3)
def process_data(data):
    # Everything is optimized, even simple operations
    cleaned = clean_data(data)
    transformed = transform_data(cleaned)
    return save_data(transformed)
```

2. Benchmark Before Deciding

Always measure before committing to a level:

```python
import epochly

# my_function and data stand in for your own workload

# Test different approaches
levels_to_test = [1, 2, 3]
for level in levels_to_test:
    with epochly.benchmark_context(f"Level {level}"):
        with epochly.optimize_context(level=level):
            result = my_function(data)
# Choose the fastest
```

3. Disable for Baselines

Use epochly_disabled_context() to establish baselines:

```python
import epochly

# measure_performance is a user-defined timing helper

# Measure baseline first
with epochly.epochly_disabled_context():
    baseline_time = measure_performance()

# Then measure optimized
with epochly.optimize_context(level=2):
    optimized_time = measure_performance()

speedup = baseline_time / optimized_time
```

4. Match Level to Workload

Choose the appropriate context for your workload type:

| Workload Type | Context Manager | Reason |
|---|---|---|
| I/O operations | `threading_context()` | Concurrent I/O access |
| Numerical loops | `jit_context()` | JIT compilation benefits |
| Parallel processing | `full_optimize_context()` | Multi-core utilization |
| Baseline measurement | `monitoring_context()` | No optimization overhead |
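
When the workload type is known at runtime, this table can be written directly as a dispatch mapping. A minimal sketch; the `CONTEXTS` dict, `run_scoped` helper, and workload-type keys are illustrative names, not part of the Epochly API:

```python
import epochly

# Hypothetical dispatch table mirroring the guidance above
CONTEXTS = {
    'io': epochly.threading_context,
    'numerical': epochly.jit_context,
    'parallel': epochly.full_optimize_context,
    'baseline': epochly.monitoring_context,
}

def run_scoped(workload_type, func, *args, **kwargs):
    """Run func under the context recommended for its workload type."""
    with CONTEXTS[workload_type]():
        return func(*args, **kwargs)
```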

5. Prefer Contexts Over Global Config

Contexts are more explicit and maintainable:

```python
import epochly

# load_files and process are user-defined helpers

# ✅ GOOD: Explicit scoping
def load_and_process():
    with epochly.threading_context():
        data = load_files()
    with epochly.jit_context():
        return process(data)

# ❌ LESS CLEAR: Global configuration
epochly.configure(level=1)
data = load_files()
epochly.configure(level=2)
result = process(data)
```