Epochly Examples: Basic: Context Manager Patterns

Common patterns for using Epochly context managers to scope optimization to specific code blocks, including nested contexts and conditional activation.

Available Context Managers

| Context Manager | Purpose | Default Level | Use Case |
|---|---|---|---|
| optimize_context(level) | General-purpose optimization | Specified | Any optimization need |
| monitoring_context() | Performance monitoring only | 0 | Baseline measurements |
| jit_context() | JIT compilation | 2 | Numerical operations |
| threading_context() | Threading optimization | 1 | I/O-bound operations |
| full_optimize_context() | Multi-core parallelism | 3 | CPU-intensive parallel work |
| benchmark_context(name) | Benchmarking with reporting | Current | Performance testing |
| epochly_disabled_context() | Disable optimization | 0 (disabled) | Baseline comparison |

Pattern 1: Basic Scoped Optimization

Use optimize_context() to optimize specific code blocks:

import epochly
import numpy as np

# Normal code execution
data = np.random.rand(1_000_000)
print("Data prepared")

# Optimize only this block
with epochly.optimize_context(level=2):
    # JIT compilation applied here
    result = np.sum(data ** 2 + np.sin(data))
    print(f"Result: {result}")

# Back to normal execution
print("Processing complete")

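Expected output (the input is random, so the exact result varies between runs; for uniform data of this size it should be close to 793,000):

Data prepared
Result: 793012.345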
Processing complete

Pattern 2: Monitoring Context

Use monitoring_context() to track performance metrics:

import epochly

def expensive_operation(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Monitor performance without optimization
with epochly.monitoring_context() as metrics:
    result = expensive_operation(10_000_000)
    print(f"Result: {result}")

# Access metrics after context
print(f"Duration: {metrics['duration']:.3f}s")
print(f"Peak memory: {metrics['peak_memory_mb']:.2f} MB")
print(f"CPU percent: {metrics['cpu_percent']:.1f}%")

Expected output (timing, memory, and CPU figures vary by machine):

Result: 333333283333335000000
Duration: 2.145s
Peak memory: 45.23 MB
CPU percent: 98.5%
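
The metrics are only complete once the with block exits, which is why the example reads them afterwards. The following is a minimal standard-library sketch of that pattern, not Epochly's implementation: simple_monitor is a hypothetical name, and it records only the duration and the peak traced memory (via tracemalloc), not CPU usage.

```python
import time
import tracemalloc
from contextlib import contextmanager


@contextmanager
def simple_monitor():
    """Hypothetical stand-in for a monitoring context (stdlib only)."""
    metrics = {}  # filled in when the block exits
    tracemalloc.start()
    start = time.perf_counter()
    try:
        yield metrics
    finally:
        # Runs on normal exit and on exceptions alike
        metrics["duration"] = time.perf_counter() - start
        _, peak = tracemalloc.get_traced_memory()
        metrics["peak_memory_mb"] = peak / (1024 * 1024)
        tracemalloc.stop()


with simple_monitor() as metrics:
    squares = [i * i for i in range(100_000)]

# The dictionary is populated only after the block has finished
print(f"Duration: {metrics['duration']:.3f}s")
print(f"Peak memory: {metrics['peak_memory_mb']:.2f} MB")
```

Yielding a dictionary that is filled in the finally clause is one common way to hand results back from a context manager; whether Epochly does the same internally is not stated here.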

Pattern 3: JIT Context

Use jit_context() as a shorthand for JIT compilation:

import epochly
import numpy as np

def numerical_computation(arr):
    """Numerical loop that benefits from JIT"""
    total = 0.0
    for i in range(len(arr)):
        total += arr[i] ** 2 + np.sin(arr[i]) * np.cos(arr[i])
    return total

data = np.random.rand(1_000_000)

# JIT context (equivalent to optimize_context(level=2))
with epochly.jit_context():
    result = numerical_computation(data)
    print(f"Result: {result}")

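Expected output (the input is random, so the exact result varies between runs; for uniform data of this size it should be close to 687,000):

Result: 687412.345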

Note: The first call compiles the function; subsequent calls are much faster.
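
To see the compilation cost separately from steady-state speed, it helps to time each call on its own. The helper below is a hedged sketch using only the standard library; time_each_call and sum_of_squares are hypothetical names, and in real use the calls would run inside epochly.jit_context().

```python
import time


def time_each_call(func, *args, repeats=3):
    """Return the wall-clock duration of each of `repeats` calls, in order."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        func(*args)
        durations.append(time.perf_counter() - start)
    return durations


def sum_of_squares(n):
    return sum(i * i for i in range(n))


durations = time_each_call(sum_of_squares, 100_000, repeats=3)
print(f"First call: {durations[0]:.4f}s")
print(f"Later calls (best): {min(durations[1:]):.4f}s")
```

Under a JIT, the first entry would be expected to include compile time, so comparisons are usually fairer when based on the later entries.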

Pattern 4: Threading Context

Use threading_context() for concurrent I/O operations:

import epochly
import requests

urls = [
    'https://api.example.com/data1',
    'https://api.example.com/data2',
    'https://api.example.com/data3',
    'https://api.example.com/data4',
]

# Threading context for I/O operations
with epochly.threading_context(max_workers=4):
    results = []
    for url in urls:
        response = requests.get(url, timeout=10)  # avoid hanging on a stalled connection
        response.raise_for_status()  # fail fast on HTTP errors
        results.append(response.json())

print(f"Fetched {len(results)} URLs")

Expected output:

Fetched 4 URLs

Speedup: roughly 3-4x faster than sequential fetching for these four URLs, depending on network latency.
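
For comparison, the same kind of concurrent fetch can be written with an explicit standard-library thread pool. This is a sketch under stated assumptions, not a description of how Epochly schedules work: fake_fetch is a hypothetical stand-in that sleeps to mimic network latency, so the example runs without network access.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_fetch(url):
    """Hypothetical stand-in for requests.get(): sleeps to mimic latency."""
    time.sleep(0.05)
    return {"url": url, "ok": True}


urls = [f"https://api.example.com/data{i}" for i in range(1, 5)]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    # map() runs the calls concurrently and preserves input order
    results = list(pool.map(fake_fetch, urls))
elapsed = time.perf_counter() - start

print(f"Fetched {len(results)} URLs in {elapsed:.2f}s")
```

Because each call spends its time waiting rather than computing, four workers can overlap the four waits, which is where a speedup of up to about 4x comes from.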

Pattern 5: Full Optimization Context

Use full_optimize_context() for multi-core parallelism:

import epochly
import numpy as np

def process_chunk(chunk):
    """Process a data chunk"""
    return np.sum(chunk ** 2 + np.sin(chunk))

# Create large dataset
data = [np.random.rand(100_000) for _ in range(100)]

# Full optimization (multi-core parallelism)
with epochly.full_optimize_context():
    results = [process_chunk(chunk) for chunk in data]
    total = sum(results)
    print(f"Total: {total}")

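Expected output (the input is random, so the exact result varies between runs; for uniform data of this size it should be close to 7,930,000):

Total: 7930234.567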

Speedup: Scales with the number of cores, though sub-linearly (8 cores ≈ 4–6x, 16 cores ≈ 8–12x).
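
The quoted speedups sit below the core count, which is consistent with Amdahl's law: only the parallelizable fraction of the runtime scales with cores. The sketch below evaluates that formula; the parallel fractions are assumed values chosen for illustration, not Epochly measurements.

```python
def amdahl_speedup(parallel_fraction, cores):
    """Ideal speedup when `parallel_fraction` of the runtime scales with cores."""
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / cores)


for cores in (8, 16):
    for p in (0.90, 0.95):
        print(f"{cores} cores, {p:.0%} parallel: {amdahl_speedup(p, cores):.2f}x")
```

With 90 to 95% of the work parallelized, 8 cores come out at about 4.7x to 5.9x, in line with the range quoted above; reaching 12x on 16 cores would need a parallel fraction near 98%.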

Pattern 6: Benchmarking Context

Use benchmark_context() to measure and report performance:

import epochly
import numpy as np

def compute_operation(n):
    arr = np.random.rand(n)
    return np.sum(arr ** 2)

# Benchmark with automatic reporting
with epochly.benchmark_context("compute_operation") as results:
    result = compute_operation(5_000_000)
    print(f"Result: {result}")

# Access benchmark results
print(f"Duration: {results['duration']:.3f}s")

Expected output (the result varies with the random input, and timings vary by machine):

Result: 1666234.567
[Benchmark] compute_operation completed in 0.145s
Duration: 0.145s

Pattern 7: Disable Optimization

Use epochly_disabled_context() to measure baseline performance:

import epochly
import time

def compute_task(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Measure baseline (no optimization)
with epochly.epochly_disabled_context():
    start = time.perf_counter()
    baseline_result = compute_task(1_000_000)
    baseline_time = time.perf_counter() - start

# Measure with optimization
with epochly.jit_context():
    compute_task(1_000_000)  # warm-up: the first call includes JIT compilation
    start = time.perf_counter()
    optimized_result = compute_task(1_000_000)
    optimized_time = time.perf_counter() - start

# Compare
speedup = baseline_time / optimized_time
print(f"Baseline: {baseline_time:.3f}s")
print(f"Optimized: {optimized_time:.3f}s")
print(f"Speedup: {speedup:.2f}x")

Expected output (timings vary by machine):

Baseline: 2.145s
Optimized: 0.125s
Speedup: 17.16x

Pattern 8: Nested Contexts

Inner contexts override outer contexts:

import epochly

# load_multiple_files(), process_data(), save_results() and file_paths
# are placeholders for your own code.

# Outer context: Threading for I/O
with epochly.threading_context():
    print("Using threading optimization")
    # Load data with threading
    data = load_multiple_files(file_paths)

    # Inner context: JIT for computation
    with epochly.jit_context():
        print("Using JIT optimization")
        result = process_data(data)

    # Back to threading
    print("Using threading optimization again")
    save_results(result)

Expected output:

Using threading optimization
Using JIT optimization
Using threading optimization again

Rule: The innermost context takes precedence.

Pattern 9: Error Handling

Contexts handle exceptions gracefully and restore state:

import epochly

def risky_operation(data):
    if len(data) == 0:
        raise ValueError("Empty data")
    return sum(data)

try:
    with epochly.optimize_context(level=3):
        # Exception occurs inside context
        result = risky_operation([])
except ValueError as e:
    print(f"Error caught: {e}")

# Optimization level is automatically restored
status = epochly.get_status()
print(f"Level restored to: {status['enhancement_level']}")

Expected output (assuming the level was 2 before entering the context):

Error caught: Empty data
Level restored to: 2

Guarantee: Context managers always clean up, even on exceptions.

Pattern 10: Comparing Levels

Benchmark all optimization levels:

import epochly
import time
import numpy as np

def workload(n):
    arr = np.random.rand(n)
    return np.sum(arr ** 2 + np.sin(arr))

# Compare all levels
results = {}
for level in [0, 1, 2, 3]:
    with epochly.optimize_context(level=level):
        # Warmup
        workload(1_000_000)
        # Measure
        start = time.perf_counter()
        result = workload(1_000_000)
        elapsed = time.perf_counter() - start
        results[level] = elapsed

# Print comparison
print("Level Comparison:")
baseline = results[0]
for level, elapsed in results.items():
    speedup = baseline / elapsed
    print(f"Level {level}: {elapsed:.3f}s (speedup: {speedup:.2f}x)")

Expected output (timings vary by machine):

Level Comparison:
Level 0: 2.145s (speedup: 1.00x)
Level 1: 1.987s (speedup: 1.08x)
Level 2: 0.145s (speedup: 14.79x)
Level 3: 0.312s (speedup: 6.88x)

Pattern 11: Conditional Optimization

Choose context based on runtime conditions:

import epochly
import numpy as np

def smart_process(data):
    """Adapt optimization to data size"""
    data_size = len(data)
    # Small data: No optimization needed
    if data_size < 10_000:
        return np.sum(data ** 2)
    # Medium data: JIT helps
    elif data_size < 1_000_000:
        with epochly.jit_context():
            return np.sum(data ** 2)
    # Large data: Multi-core helps
    else:
        with epochly.full_optimize_context():
            return np.sum(data ** 2)

# Test with different sizes
small = np.random.rand(1_000)
medium = np.random.rand(100_000)
large = np.random.rand(10_000_000)
print(f"Small: {smart_process(small)}")
print(f"Medium: {smart_process(medium)}")
print(f"Large: {smart_process(large)}")

Expected output:

Small: 333.234
Medium: 33323.456
Large: 3333234.567
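
The three branches above repeat the same computation. One possible refactoring, sketched here with the standard library only, selects the context first and keeps a single copy of the work, using contextlib.nullcontext as the do-nothing case. pick_context and its parameters are hypothetical names; the context factories are passed in so the sketch runs without Epochly, and in real code they would be epochly.jit_context and epochly.full_optimize_context.

```python
from contextlib import nullcontext


def pick_context(size, medium_ctx=nullcontext, large_ctx=nullcontext):
    """Return a context manager suited to `size` (same thresholds as above)."""
    if size < 10_000:
        return nullcontext()   # small data: run as-is
    if size < 1_000_000:
        return medium_ctx()    # e.g. epochly.jit_context
    return large_ctx()         # e.g. epochly.full_optimize_context


def sum_of_squares(values):
    with pick_context(len(values)):
        return sum(v * v for v in values)


print(sum_of_squares([1, 2, 3]))  # 14
```

Keeping the thresholds in one function also makes them easier to tune after benchmarking.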

Pattern 12: Resource Cleanup

Context managers ensure proper resource cleanup:

import epochly

# parallel_computation() and data are placeholders for your own code.

# Resources are properly cleaned up even with exceptions
def process_with_cleanup():
    with epochly.full_optimize_context():
        # Worker threads are created
        result = parallel_computation(data)
        # If exception occurs, workers are still cleaned up
        if result < 0:
            raise ValueError("Invalid result")
        return result
    # Workers are automatically cleaned up here

try:
    result = process_with_cleanup()
except ValueError:
    print("Error handled, resources cleaned up")

Expected output (when the computation returns a negative result):

Error handled, resources cleaned up

Guarantee: Worker threads and resources are released even if exceptions occur.
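
The same kind of guarantee can be observed with a standard-library thread pool, which shuts its workers down when its with block exits, including on an exception. This is an illustration of the general pattern, not of Epochly's worker management; square is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


pool = ThreadPoolExecutor(max_workers=2)
try:
    with pool:
        result = pool.submit(square, 7).result()
        raise ValueError("Invalid result")  # simulate a failure inside the block
except ValueError:
    print("Error handled")

# The pool was shut down on exit, so new work is rejected
try:
    pool.submit(square, 1)
    pool_is_closed = False
except RuntimeError:
    pool_is_closed = True

print(f"Pool closed: {pool_is_closed}")  # Pool closed: True
```

Checking that the resource refuses new work after the block is a simple way to confirm that cleanup actually ran.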

Pattern 13: Mixed Workload Pipeline

Use different contexts for different pipeline stages:

import epochly
import pandas as pd

def data_pipeline(file_paths):
    """Multi-stage pipeline with appropriate optimization"""
    # Stage 1: Load files (I/O-bound)
    with epochly.threading_context():
        print("Loading files...")
        dfs = [pd.read_csv(path) for path in file_paths]
        df = pd.concat(dfs, ignore_index=True)

    # Stage 2: Transform data (CPU-bound)
    with epochly.jit_context():
        print("Transforming data...")
        df['feature1'] = df['value'] ** 2
        df['feature2'] = df['value'].apply(lambda x: x ** 2)

    # Stage 3: Aggregate (parallel)
    with epochly.full_optimize_context():
        print("Aggregating...")
        result = df.groupby('category')['value'].sum()

    return result

files = ['data1.csv', 'data2.csv', 'data3.csv']
result = data_pipeline(files)

Expected output:

Loading files...
Transforming data...
Aggregating...

Best Practices

1. Use for Isolated Operations

Context managers are perfect for scoping optimization to specific operations:

# ✅ GOOD: Scoped optimization
def process_data(data):
    # Normal processing
    cleaned = clean_data(data)
    # Optimize expensive part only
    with epochly.jit_context():
        transformed = transform_data(cleaned)
    # Normal processing
    return save_data(transformed)

# ❌ BAD: Over-optimization
@epochly.optimize(level=3)
def process_data(data):
    # Everything is optimized, even simple operations
    cleaned = clean_data(data)
    transformed = transform_data(cleaned)
    return save_data(transformed)

2. Benchmark Before Deciding

Always measure before committing to a level:

import epochly

# Test different approaches
levels_to_test = [1, 2, 3]
for level in levels_to_test:
    with epochly.benchmark_context(f"Level {level}"):
        with epochly.optimize_context(level=level):
            result = my_function(data)

# Choose the fastest
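
The final step is left as a comment above. Assuming the measured durations have been collected in a dictionary keyed by level, one way to pick the winner is shown below; the timings variable is a hypothetical name, and its values reuse the sample figures from Pattern 10 rather than new measurements.

```python
# Timings in seconds keyed by level (sample figures from Pattern 10)
timings = {1: 1.987, 2: 0.145, 3: 0.312}

# min() over the keys, ranked by each level's measured time
best_level = min(timings, key=timings.get)

print(f"Fastest level: {best_level} ({timings[best_level]:.3f}s)")
# Fastest level: 2 (0.145s)
```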

3. Disable for Baselines

Use epochly_disabled_context() to establish baselines:

# Measure baseline first
with epochly.epochly_disabled_context():
    baseline_time = measure_performance()

# Then measure optimized
with epochly.optimize_context(level=2):
    optimized_time = measure_performance()

speedup = baseline_time / optimized_time

4. Match Level to Workload

Choose the appropriate context for your workload type:

| Workload Type | Context Manager | Reason |
|---|---|---|
| I/O operations | threading_context() | Concurrent I/O access |
| Numerical loops | jit_context() | JIT compilation benefits |
| Parallel processing | full_optimize_context() | Multi-core utilization |
| Baseline measurement | monitoring_context() | No optimization overhead |

5. Prefer Contexts Over Global Config

Contexts are more explicit and maintainable:

# ✅ GOOD: Explicit scoping
def load_and_process():
    with epochly.threading_context():
        data = load_files()
    with epochly.jit_context():
        return process(data)

# ❌ LESS CLEAR: Global configuration
epochly.configure(enhancement_level=1)
data = load_files()
epochly.configure(enhancement_level=2)
result = process(data)