Epochly Examples: Intermediate: Custom Optimization

Advanced optimization patterns: custom level selection, workload-specific tuning, and optimization hints.

Fine-tuning Epochly optimization for specific workloads and use cases.

Worker Configuration

Control the number of parallel workers with EPOCHLY_MAX_WORKERS.

# Set maximum number of workers
export EPOCHLY_MAX_WORKERS=8
# Use all available cores
export EPOCHLY_MAX_WORKERS=-1
# Single-threaded execution
export EPOCHLY_MAX_WORKERS=1
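The three settings above imply a simple rule: a positive value is a worker cap, and -1 means every available core. The sketch below only illustrates that rule in plain Python. `resolve_worker_count` is a hypothetical helper and not part of Epochly, and the handling of unset or invalid values is an assumption.

```python
import os

def resolve_worker_count(raw_value, cpu_count=None):
    """Hypothetical helper: interpret an EPOCHLY_MAX_WORKERS-style value."""
    # os.cpu_count() may return None, so fall back to a single core
    cores = cpu_count or os.cpu_count() or 1
    try:
        requested = int(raw_value)
    except (TypeError, ValueError):
        return cores  # assumption: unset or invalid means "use all cores"
    if requested == -1:
        return cores  # -1 means all available cores
    return max(1, requested)  # assumption: never go below one worker

print(resolve_worker_count(os.environ.get("EPOCHLY_MAX_WORKERS")))
```

Running this before launching a job is a quick way to see which worker count a given shell environment would request.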

Programmatic Configuration

import epochly
# Configure workers at runtime
epochly.configure(max_workers=8)
# Query current configuration
config = epochly.get_config()
print(f"Max workers: {config['max_workers']}")

Dynamic Worker Adjustment

import epochly
import os

def auto_configure_workers():
    """Automatically configure workers based on CPU count"""
    # os.cpu_count() can return None when the count cannot be determined
    cpu_count = os.cpu_count() or 1
    if cpu_count <= 4:
        workers = cpu_count
    elif cpu_count <= 16:
        workers = cpu_count - 2
    else:
        workers = max(16, cpu_count - 4)
    epochly.configure(max_workers=workers)
    print(f"Configured {workers} workers for {cpu_count} CPUs")

auto_configure_workers()
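To see what the heuristic in `auto_configure_workers` produces on different machines, the same branching can be evaluated without calling Epochly. `workers_for` is a hypothetical name used only for this illustration.

```python
def workers_for(cpu_count):
    """Same branching as auto_configure_workers, without calling Epochly."""
    if cpu_count <= 4:
        return cpu_count          # small machines: use every core
    if cpu_count <= 16:
        return cpu_count - 2      # mid-size machines: leave two cores free
    return max(16, cpu_count - 4) # large machines: leave four, but never drop below 16

for cpus in (2, 4, 8, 16, 17, 20, 32, 64):
    print(f"{cpus:>3} CPUs -> {workers_for(cpus)} workers")
```

Note the effect of the `max(16, ...)` floor: machines with 17 to 20 CPUs all get 16 workers, so a 17-CPU machine leaves only one core free. Adjust the floor if that is too aggressive for your system.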

Core Reservation

Reserve CPU cores for system processes to prevent oversubscription.

Environment Variables

# Reserve specific number of cores
export EPOCHLY_RESERVE_CORES=2
# Use percentage of cores
export EPOCHLY_MAX_CORES_PERCENT=75 # Use 75% of available cores

Configuration Examples

import epochly
# Limit workers to leave cores for system tasks
epochly.configure(
    enhancement_level=3,
    max_workers=6  # On 8-core system, reserves 2 for system
)

Core Reservation Strategy

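| System Type | Cores    | Reserve | Workers | Reason                  |
|-------------|----------|---------|---------|-------------------------|
| Desktop     | 8        | 1-2     | 6-7     | Keep UI responsive      |
| Workstation | 16       | 2-4     | 12-14   | Balance work and system |
| Server      | 64       | 4-8     | 56-60   | Maximize throughput     |
| Container   | Variable | 1       | N-1     | Leave headroom          |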
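The worker counts in the strategy table follow from simple arithmetic on the core count. The sketch below reproduces that arithmetic for both the fixed-reserve and the percentage approach. Both function names are hypothetical, and rounding down with a minimum of one worker is an assumption rather than documented Epochly behavior.

```python
import math

def workers_after_reserve(total_cores, reserve_cores):
    """Workers left after holding back a fixed number of cores."""
    return max(1, total_cores - reserve_cores)

def workers_from_percent(total_cores, percent):
    """Workers when only a percentage of cores may be used (rounded down)."""
    return max(1, math.floor(total_cores * percent / 100))

# Fixed reserve, matching the table rows
for name, cores, reserve in [("Desktop", 8, 2), ("Workstation", 16, 4), ("Server", 64, 8)]:
    print(f"{name}: {workers_after_reserve(cores, reserve)} workers")

# Percentage cap, matching the 75% example
print(f"75% of 8 cores: {workers_from_percent(8, 75)} workers")
```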

JIT Backend Selection

Epochly selects a JIT compiler backend automatically. The comparison below shows how the backends differ by Python version, compilation cost, and memory use.

Enabling JIT

# Enable JIT compilation (Epochly auto-selects the best backend)
export EPOCHLY_JIT_ENABLED=true
export EPOCHLY_LEVEL=2

Backend Comparison

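| Backend | Python Version | Compilation Speed | Runtime Speed | Memory |
|---------|----------------|-------------------|---------------|--------|
| Numba   | 3.9+           | Slow (first run)  | Very Fast     | Medium |
| Native  | 3.13+          | Fast              | Fast          | Low    |
| Pyston  | 3.8+           | Fast              | Fast          | Medium |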

Programmatic Backend Selection

import epochly

def configure_jit():
    """Configure JIT compilation"""
    # Enable JIT compilation
    epochly.configure(jit_enabled=True)
    print("JIT compilation enabled")
    # Epochly automatically selects the best backend
    status = epochly.get_status()
    print(f"JIT available: {status.get('jit_available', False)}")

configure_jit()

JIT Threshold Tuning

Configure JIT compilation behavior.

# Enable/disable JIT via environment
export EPOCHLY_LEVEL=2 # Level 2 enables JIT

JIT Configuration

import epochly
# Enable JIT compilation
epochly.configure(
    enhancement_level=2,
    jit_enabled=True
)

Workload-Based Configuration

import epochly

def configure_for_workload(workload_type):
    """Configure enhancement level based on workload"""
    levels = {
        'short_lived': 1,   # Threading for quick scripts
        'interactive': 2,   # JIT for user-facing apps
        'batch': 3,         # Multicore for data processing
        'long_running': 2   # JIT for services/daemons
    }
    modes = {
        'short_lived': 'conservative',
        'interactive': 'balanced',
        'batch': 'aggressive',
        'long_running': 'balanced'
    }
    # Unknown workload types fall back to level 2 in balanced mode
    level = levels.get(workload_type, 2)
    mode = modes.get(workload_type, 'balanced')
    epochly.configure(
        enhancement_level=level,
        mode=mode
    )
    print(f"Configured level {level} ({mode}) for {workload_type} workload")

# Configure for batch processing
configure_for_workload('batch')

Memory Pool Configuration

Configure shared memory and memory limits.

# Set shared memory pool size (MB)
export EPOCHLY_MEMORY_SHARED_SIZE=1024
# Set total memory limit (MB)
export EPOCHLY_MEMORY_LIMIT=8192

Memory Configuration

import epochly
# Configure memory limit
epochly.configure(
    enhancement_level=3,
    memory_limit=8192  # 8GB total limit
)

Calculate Memory Requirements

import epochly

def calculate_memory_config(data_size_gb, num_workers):
    """Calculate optimal memory configuration"""
    # Estimate memory per worker in MB
    worker_memory = (data_size_gb * 1024) / num_workers * 1.5  # 50% overhead
    # Shared memory for inter-worker communication
    shared_memory = min(1024, worker_memory * 0.2)  # 20% of worker memory, capped at 1024 MB
    # Total memory limit in MB
    total_memory = worker_memory * num_workers + shared_memory
    config = {
        'worker_memory_limit': int(worker_memory),
        'memory_shared_size': int(shared_memory),
        'memory_limit_gb': int(total_memory / 1024)
    }
    print("Memory Configuration:")
    print(f"  Worker memory: {config['worker_memory_limit']} MB")
    print(f"  Shared memory: {config['memory_shared_size']} MB")
    print(f"  Total limit: {config['memory_limit_gb']} GB")
    return config

# Configure for 10GB dataset with 8 workers
config = calculate_memory_config(10, 8)
epochly.configure(**config)
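Working through the 10 GB, 8-worker case by hand makes the formula easier to check. The sketch below repeats only the arithmetic from `calculate_memory_config`, with no Epochly calls. `estimate_memory_mb` is a hypothetical name.

```python
import math

def estimate_memory_mb(data_size_gb, num_workers):
    """Repeat the arithmetic from calculate_memory_config, returning MB values."""
    worker_mb = (data_size_gb * 1024) / num_workers * 1.5  # 50% overhead
    shared_mb = min(1024, worker_mb * 0.2)                 # 20%, capped at 1024 MB
    total_mb = worker_mb * num_workers + shared_mb
    return worker_mb, shared_mb, total_mb

worker_mb, shared_mb, total_mb = estimate_memory_mb(10, 8)
print(f"Per worker: {worker_mb:.0f} MB")          # 10 * 1024 / 8 * 1.5
print(f"Shared:     {shared_mb:.0f} MB")          # 20% of the per-worker figure
print(f"Total:      {total_mb:.0f} MB = {total_mb / 1024:.3f} GB")
print(f"Truncated:  {int(total_mb / 1024)} GB, rounded up: {math.ceil(total_mb / 1024)} GB")
```

The total comes to 15,744 MB, or 15.375 GB. Because the original uses `int()`, the reported limit is truncated to 15 GB, slightly below the estimate. Rounding up with `math.ceil` may be the safer choice if the limit is enforced strictly.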

GPU Configuration

Fine-tune GPU memory and workload thresholds.

# Set memory limit (MB)
export EPOCHLY_MEMORY_LIMIT=4096
# Control visible GPUs
export CUDA_VISIBLE_DEVICES=0,1 # Use GPUs 0 and 1

GPU Threshold Configuration

import epochly
import numpy as np

# Configure GPU thresholds
epochly.configure(
    enhancement_level=4,
    gpu_enabled=True,
    memory_limit=4096  # Limit to 4GB
)

@epochly.optimize(level=4)
def smart_gpu_compute(arr):
    """Automatically uses GPU for large arrays"""
    # GPU used if len(arr) >= 1M
    return arr ** 2 + np.sin(arr)

Multi-GPU Configuration

import epochly
import os

def configure_multi_gpu():
    """Configure for multi-GPU setup"""
    # Select specific GPUs
    os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3'
    # Configure memory per GPU
    epochly.configure(
        enhancement_level=4,
        memory_limit=8192,  # 8GB per GPU
        max_workers=4       # One worker per GPU
    )
    print("Configured 4 GPUs with 8GB each")

configure_multi_gpu()
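`CUDA_VISIBLE_DEVICES` is read when the CUDA runtime initializes, so it needs to be set before any GPU library in the process touches the device. The sketch below parses the variable so the worker count can be derived from it instead of being hard-coded. `visible_gpu_ids` is a hypothetical helper, not an Epochly function.

```python
import os

def visible_gpu_ids(env=os.environ):
    """Return the device IDs listed in CUDA_VISIBLE_DEVICES, or None if unset."""
    value = env.get("CUDA_VISIBLE_DEVICES")
    if value is None:
        return None  # unset: no restriction is applied
    # Keep IDs as strings, since entries may be indices or device UUIDs
    return [item.strip() for item in value.split(",") if item.strip()]

ids = visible_gpu_ids({"CUDA_VISIBLE_DEVICES": "0,1,2,3"})
print(f"Visible GPUs: {ids}, one worker each -> max_workers={len(ids)}")
```

An empty string yields an empty list, which matches the CUDA convention that an empty value hides all devices.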

Conditional Optimization

Choose optimization level based on data characteristics.

import epochly
import numpy as np

def smart_compute(data):
    """Adapt optimization level to data size"""
    data_size = len(data)
    # Small data: No optimization needed
    if data_size < 10_000:
        with epochly.optimize_context(level=0):
            return np.sum(data ** 2)
    # Medium data: Use JIT
    elif data_size < 1_000_000:
        with epochly.optimize_context(level=2):
            return np.sum(data ** 2)
    # Large data: Use multicore
    else:
        with epochly.optimize_context(level=3):
            return np.sum(data ** 2)

# Test with different sizes
small = np.random.rand(1_000)
medium = np.random.rand(100_000)
large = np.random.rand(10_000_000)
print(f"Small result: {smart_compute(small)}")
print(f"Medium result: {smart_compute(medium)}")
print(f"Large result: {smart_compute(large)}")

Threshold-Based Optimization

import epochly
import numpy as np

class AdaptiveOptimizer:
    """Automatically select optimization level"""

    def __init__(self):
        self.thresholds = {
            'small': 10_000,
            'medium': 1_000_000,
            'large': 10_000_000
        }

    def get_level(self, data_size):
        """Get optimization level for data size"""
        if data_size < self.thresholds['small']:
            return 0  # No optimization
        elif data_size < self.thresholds['medium']:
            return 2  # JIT
        elif data_size < self.thresholds['large']:
            return 3  # Multicore
        else:
            return 4  # GPU (if available)

    def optimize(self, func, data):
        """Execute with optimal level"""
        level = self.get_level(len(data))
        with epochly.optimize_context(level=level):
            return func(data)

# Use adaptive optimizer
optimizer = AdaptiveOptimizer()

def compute(data):
    return np.sum(data ** 2)

data = np.random.rand(5_000_000)
result = optimizer.optimize(compute, data)
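The threshold lookup in `get_level` can also be written as a table lookup, which keeps the boundaries and levels in one place when more tiers are added. This is an alternative sketch using the standard library `bisect` module, not how Epochly itself chooses levels. `level_for` is a hypothetical name.

```python
from bisect import bisect_right

THRESHOLDS = [10_000, 1_000_000, 10_000_000]  # same boundaries as the class above
LEVELS = [0, 2, 3, 4]                         # none, JIT, multicore, GPU

def level_for(data_size):
    """Map a data size to a level; a size equal to a boundary moves up a tier."""
    return LEVELS[bisect_right(THRESHOLDS, data_size)]

for size in (9_999, 10_000, 5_000_000, 10_000_000):
    print(f"{size:>12,} elements -> level {level_for(size)}")
```

`bisect_right` places a size equal to a threshold in the higher tier, which matches the strict `<` comparisons in `get_level`.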

Mixed Level Operations

Use different optimization levels for different pipeline stages.

import epochly
import numpy as np

def data_pipeline(file_paths):
    """Pipeline with mixed optimization levels"""
    # Stage 1: I/O - Use threading (Level 1)
    with epochly.optimize_context(level=1):
        raw_data = []
        for path in file_paths:
            data = np.load(path)
            raw_data.append(data)
    # Stage 2: Transform - Use JIT (Level 2)
    with epochly.optimize_context(level=2):
        transformed = []
        for data in raw_data:
            result = data ** 2 + np.sin(data) * np.cos(data)
            transformed.append(result)
    # Stage 3: Aggregate - Use multicore (Level 3)
    with epochly.optimize_context(level=3):
        aggregated = np.sum([np.sum(t) for t in transformed])
    return aggregated

# Process multiple files
files = [f'data_{i}.npy' for i in range(10)]
result = data_pipeline(files)
print(f"Pipeline result: {result}")

Complex Pipeline Example

import epochly
import numpy as np

class OptimizedPipeline:
    """Multi-stage pipeline with per-stage optimization"""

    def __init__(self):
        self.stage_levels = {
            'load': 1,       # I/O bound
            'clean': 2,      # CPU bound, benefits from JIT
            'transform': 2,  # CPU bound, benefits from JIT
            'aggregate': 3,  # Parallel aggregation
            'save': 1        # I/O bound
        }

    def run(self, data):
        """Execute pipeline with optimized stages"""
        result = data
        # Load stage
        with epochly.optimize_context(level=self.stage_levels['load']):
            print("Loading data...")
            # I/O operations here
        # Clean stage
        with epochly.optimize_context(level=self.stage_levels['clean']):
            print("Cleaning data...")
            result = self._clean(result)
        # Transform stage
        with epochly.optimize_context(level=self.stage_levels['transform']):
            print("Transforming data...")
            result = self._transform(result)
        # Aggregate stage
        with epochly.optimize_context(level=self.stage_levels['aggregate']):
            print("Aggregating results...")
            result = self._aggregate(result)
        # Save stage
        with epochly.optimize_context(level=self.stage_levels['save']):
            print("Saving results...")
            # I/O operations here
        return result

    def _clean(self, data):
        # Remove outliers, handle missing values
        return data[data > 0]

    def _transform(self, data):
        # Apply transformations
        return data ** 2 + np.sin(data)

    def _aggregate(self, data):
        # Aggregate results
        return np.sum(data)

# Run pipeline
pipeline = OptimizedPipeline()
data = np.random.randn(10_000_000)
result = pipeline.run(data)

Disabling for Specific Code

Temporarily disable optimization for specific code sections.

import epochly
import numpy as np

@epochly.optimize(level=3)
def mixed_optimization(data):
    """Function with selectively disabled optimization"""
    # Optimized section
    result1 = np.sum(data ** 2)
    # Disable optimization for problematic code
    with epochly.epochly_disabled_context():
        # This code runs without optimization
        # Useful for debugging or incompatible operations
        # some_library_function is a placeholder for your own call
        result2 = some_library_function(data)
    # Resume optimization
    result3 = np.sum(data * 2)
    return result1 + result2 + result3

Selective Disabling

import epochly
import numpy as np

def needs_special_handling(data):
    # Check for edge cases
    return len(data) < 100 or np.any(np.isnan(data))

@epochly.optimize(level=3)
def process_with_exceptions(data_list):
    """Process most data optimized, some unoptimized"""
    # special_process and normal_process are placeholders for your own functions
    results = []
    for data in data_list:
        # Check if data needs special handling
        if needs_special_handling(data):
            # Disable optimization for special cases
            with epochly.epochly_disabled_context():
                result = special_process(data)
        else:
            # Use optimization for normal cases
            result = normal_process(data)
        results.append(result)
    return results

Profile-Guided Optimization

Automatically detect the best optimization level through profiling.

import epochly
import time
import numpy as np

def profile_workload(func, data, runs=3):
    """Profile function to find best optimization level"""
    levels = [0, 1, 2, 3]
    results = {}
    print("Profiling workload...")
    for level in levels:
        times = []
        with epochly.optimize_context(level=level):
            # Warmup
            func(data)
            # Measure
            for _ in range(runs):
                start = time.perf_counter()
                func(data)
                elapsed = time.perf_counter() - start
                times.append(elapsed)
        avg_time = np.mean(times)
        results[level] = avg_time
        print(f"  Level {level}: {avg_time:.4f}s")
    # Find best level
    best_level = min(results, key=results.get)
    speedup = results[0] / results[best_level]
    print(f"\nBest level: {best_level} ({speedup:.2f}x speedup)")
    return best_level

# Define workload
def compute_workload(data):
    return np.sum(data ** 2 + np.sin(data) * np.cos(data))

# Profile and get best level
data = np.random.rand(1_000_000)
best_level = profile_workload(compute_workload, data)

# Use best level
with epochly.optimize_context(level=best_level):
    result = compute_workload(data)
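With only three runs per level, a single slow run can distort the mean and change which level wins. One possible refinement is to compare medians instead. The sketch below shows the difference on invented timings (not measurements), using only the standard library. `pick_level` is a hypothetical helper.

```python
from statistics import mean, median

def pick_level(timings, summarize=median):
    """Return the level with the lowest summarized time.

    timings maps level -> list of measured seconds.
    """
    summary = {level: summarize(samples) for level, samples in timings.items()}
    return min(summary, key=summary.get)

# Invented timings for illustration: level 0 has one slow outlier run
sample = {0: [1.0, 1.1, 5.0], 2: [1.9, 2.0, 2.1]}

print(f"Best by median: level {pick_level(sample)}")
print(f"Best by mean:   level {pick_level(sample, mean)}")
```

Here the median prefers level 0 because two of its three runs were fastest, while the mean is pulled up by the single 5.0 s run and prefers level 2.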

Auto-Tuning System

import epochly
import time
import numpy as np

class AutoTuner:
    """Automatically tune optimization settings"""

    def __init__(self, func):
        self.func = func
        self.best_level = None
        self.best_time = float('inf')

    def tune(self, sample_data, runs=5):
        """Find optimal level for function"""
        print(f"Auto-tuning {self.func.__name__}...")
        for level in [0, 1, 2, 3]:
            with epochly.optimize_context(level=level):
                # Warmup
                self.func(sample_data)
                # Measure
                start = time.perf_counter()
                for _ in range(runs):
                    self.func(sample_data)
                elapsed = time.perf_counter() - start
            avg_time = elapsed / runs
            if avg_time < self.best_time:
                self.best_time = avg_time
                self.best_level = level
            print(f"  Level {level}: {avg_time:.4f}s")
        print(f"Optimal level: {self.best_level}")
        return self.best_level

    def __call__(self, data):
        """Call function with optimal level"""
        if self.best_level is None:
            raise ValueError("Must call tune() first")
        with epochly.optimize_context(level=self.best_level):
            return self.func(data)

# Use auto-tuner
def my_function(data):
    return np.sum(data ** 2)

tuner = AutoTuner(my_function)
sample = np.random.rand(100_000)
tuner.tune(sample)

# Now use with optimal settings
data = np.random.rand(10_000_000)
result = tuner(data)

Batch Size Tuning

Batch size affects throughput, so time several candidate sizes on your own data before settling on one.

import epochly
import time
import numpy as np

@epochly.optimize(level=3)
def process_in_batches(data, batch_size=100_000):
    """Process large dataset in optimized batches"""
    results = []
    for i in range(0, len(data), batch_size):
        batch = data[i:i+batch_size]
        result = np.sum(batch ** 2 + np.sin(batch))
        results.append(result)
    return sum(results)

# Test different batch sizes
data = np.random.rand(10_000_000)
batch_sizes = [10_000, 50_000, 100_000, 500_000]
for batch_size in batch_sizes:
    start = time.perf_counter()
    result = process_in_batches(data, batch_size)
    elapsed = time.perf_counter() - start
    print(f"Batch size {batch_size:>8,}: {elapsed:.3f}s")

Automatic Batch Size Selection

import epochly
import numpy as np

def calculate_optimal_batch_size(data_size, workers, memory_per_worker_mb=512):
    """Calculate optimal batch size"""
    # Estimate memory per element (in bytes)
    element_size = 8  # float64
    # Memory per worker in bytes
    worker_memory = memory_per_worker_mb * 1024 * 1024
    # Elements per batch
    elements_per_batch = worker_memory // element_size // 2  # 50% safety margin
    # Ensure minimum batch size
    batch_size = max(1000, min(elements_per_batch, data_size // workers))
    return batch_size

@epochly.optimize(level=3)
def smart_batch_processing(data, workers=8):
    """Process with automatically calculated batch size"""
    batch_size = calculate_optimal_batch_size(len(data), workers)
    print(f"Using batch size: {batch_size:,}")
    results = []
    for i in range(0, len(data), batch_size):
        batch = data[i:i+batch_size]
        result = np.sum(batch ** 2)
        results.append(result)
    return sum(results)

data = np.random.rand(50_000_000)
result = smart_batch_processing(data)
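The batch size formula has three regimes: an even split across workers, a memory cap, and a 1,000-element floor. The sketch below copies `calculate_optimal_batch_size` without the Epochly decorator and evaluates one input for each regime, so the arithmetic can be checked by hand.

```python
def calculate_optimal_batch_size(data_size, workers, memory_per_worker_mb=512):
    """Same arithmetic as the example above, with no Epochly dependency."""
    element_size = 8                                          # bytes per float64
    worker_memory = memory_per_worker_mb * 1024 * 1024        # bytes per worker
    elements_per_batch = worker_memory // element_size // 2   # 50% safety margin
    return max(1000, min(elements_per_batch, data_size // workers))

# Even split wins: 50M / 8 workers is below the memory cap
print(f"{calculate_optimal_batch_size(50_000_000, 8):,}")
# Memory cap wins: 512 MB / 8 bytes / 2 elements per batch
print(f"{calculate_optimal_batch_size(1_000_000_000, 8):,}")
# Floor wins: tiny inputs still get batches of 1,000
print(f"{calculate_optimal_batch_size(4_000, 8):,}")
```

For the 50-million-element array in the example, the even split gives 6,250,000 elements per batch, so the loop runs exactly 8 times.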

Logging Configuration

Configure detailed logging for debugging and monitoring.

# Enable debug logging
export EPOCHLY_LOG_LEVEL=DEBUG
# Set log file
export EPOCHLY_LOG_FILE=/var/log/epochly.log

Programmatic Logging

import epochly
import logging
import numpy as np

# Configure logging via standard library
logger = logging.getLogger('epochly')
logger.setLevel(logging.DEBUG)

# Configure Epochly log level
epochly.configure(log_level='DEBUG')

@epochly.optimize(level=3)
def logged_function(data):
    logger.info(f"Processing {len(data)} elements")
    result = np.sum(data ** 2)
    logger.info(f"Result: {result}")
    return result

data = np.random.rand(1_000_000)
result = logged_function(data)

Log Level Guidelines

| Level    | Use Case                     | Output                                  |
|----------|------------------------------|-----------------------------------------|
| DEBUG    | Development, troubleshooting | All messages including internal details |
| INFO     | Production monitoring        | Important events and milestones         |
| WARNING  | Unexpected behavior          | Warnings and potential issues           |
| ERROR    | Error handling               | Errors and exceptions                   |
| CRITICAL | Silent operation             | Only critical failures                  |
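The levels in the table are thresholds: a logger set to a given level emits that level and everything more severe. The sketch below demonstrates this with the standard library `logging` module alone. It assumes that Epochly's `epochly` logger follows the same semantics, which the `logging.getLogger('epochly')` usage suggests but this page does not state. `ListHandler` and `capture_at` are hypothetical names.

```python
import logging

class ListHandler(logging.Handler):
    """Collect emitted level names in a list instead of writing them out."""
    def __init__(self):
        super().__init__()
        self.level_names = []

    def emit(self, record):
        self.level_names.append(record.levelname)

def capture_at(level_name):
    """Log one message per level and return the levels that got through."""
    logger = logging.getLogger(f"demo.{level_name}")
    logger.setLevel(getattr(logging, level_name))
    logger.propagate = False  # keep demo output away from the root logger
    handler = ListHandler()
    logger.addHandler(handler)
    for method in ("debug", "info", "warning", "error", "critical"):
        getattr(logger, method)(f"{method} message")
    logger.removeHandler(handler)
    return handler.level_names

for name in ("DEBUG", "WARNING", "CRITICAL"):
    print(f"{name:>8}: {capture_at(name)}")
```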

Best Practices

1. Profile First

import epochly

# Always profile before optimizing
# compute_baseline is a placeholder for your own workload
def workflow():
    # Profile baseline
    with epochly.benchmark_context("baseline"):
        baseline_result = compute_baseline()

    # Profile optimized
    @epochly.optimize(level=2)
    def optimized_compute():
        return compute_baseline()

    with epochly.benchmark_context("optimized"):
        optimized_result = optimized_compute()

2. Start Conservative

# Start with Level 2 (JIT) for most workloads
epochly.configure(enhancement_level=2)
# Only move to Level 3 if profiling shows benefit
# Only use Level 4 if you have GPU and large arrays

3. Match Workload

import epochly
import numpy as np

# I/O-bound: Level 1 (Threading)
@epochly.optimize(level=1)
def read_files(paths):
    contents = []
    for p in paths:
        # Use a context manager so each file is closed after reading
        with open(p) as f:
            contents.append(f.read())
    return contents

# CPU-bound, medium data: Level 2 (JIT)
@epochly.optimize(level=2)
def compute_stats(data):
    return np.mean(data), np.std(data)

# CPU-bound, large data: Level 3 (Multicore)
# process and chunks are placeholders for your own functions
@epochly.optimize(level=3)
def parallel_processing(big_data):
    return [process(chunk) for chunk in chunks(big_data)]

# Massive arrays: Level 4 (GPU)
@epochly.optimize(level=4)
def gpu_compute(huge_array):
    return huge_array ** 2 + np.sin(huge_array)

4. Tune Incrementally

import epochly

# Start with defaults
epochly.configure(enhancement_level=2)

# Measure baseline (measure_performance is a placeholder for your own benchmark)
baseline_time = measure_performance()

# Tune one parameter at a time
epochly.configure(
    enhancement_level=2,
    jit_enabled=True
)
tuned_time = measure_performance()

if tuned_time < baseline_time * 0.9:  # At least 10% improvement
    print("Tuning improved performance")
else:
    print("Revert to defaults")

5. Monitor Overhead

import epochly
import time

def check_overhead():
    """Ensure optimization overhead is acceptable"""
    def tiny_function():
        return sum(range(100))

    # Measure without optimization
    start = time.perf_counter()
    for _ in range(1000):
        tiny_function()
    unoptimized_time = time.perf_counter() - start

    # Measure with optimization
    @epochly.optimize(level=2)
    def optimized_tiny():
        return sum(range(100))

    # Warmup
    optimized_tiny()
    start = time.perf_counter()
    for _ in range(1000):
        optimized_tiny()
    optimized_time = time.perf_counter() - start

    overhead_pct = (optimized_time - unoptimized_time) / unoptimized_time * 100
    if overhead_pct > 10:
        print(f"Warning: {overhead_pct:.1f}% overhead detected")
        print("Consider using optimization only for larger workloads")
    else:
        print(f"Overhead acceptable: {overhead_pct:.1f}%")

check_overhead()
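A single timed loop of a tiny function is noisy, so the overhead percentage can swing between runs. One way to steady it is to repeat the measurement with the standard library `timeit` module and keep the fastest repetition. The sketch below does that. `overhead_percent` and `best_time` are hypothetical helpers, and `wrapped_tiny` is a plain pass-through wrapper standing in for an optimized function.

```python
import timeit

def overhead_percent(baseline_seconds, candidate_seconds):
    """Relative slowdown of candidate versus baseline, in percent."""
    if baseline_seconds <= 0:
        raise ValueError("baseline must be positive")
    return (candidate_seconds - baseline_seconds) / baseline_seconds * 100

def best_time(func, number=1000, repeat=5):
    """Fastest of several repetitions, each timing `number` calls."""
    return min(timeit.repeat(func, number=number, repeat=repeat))

def tiny_function():
    return sum(range(100))

def wrapped_tiny():
    # Stand-in for a decorated function: one extra call layer
    return tiny_function()

baseline = best_time(tiny_function)
candidate = best_time(wrapped_tiny)
print(f"Overhead: {overhead_percent(baseline, candidate):.1f}%")
```

Taking the minimum filters out repetitions that were slowed by unrelated system activity, which matters most when the measured function runs for only microseconds.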