Custom Optimization

Fine-tuning Epochly optimization for specific workloads and use cases.

Worker Configuration

Control the number of parallel workers with EPOCHLY_MAX_WORKERS.

# Set maximum number of workers
export EPOCHLY_MAX_WORKERS=8
# Use all available cores
export EPOCHLY_MAX_WORKERS=-1
# Single-threaded execution
export EPOCHLY_MAX_WORKERS=1

Programmatic Configuration

import epochly
# Configure workers at runtime
epochly.configure(max_workers=8)
# Query current configuration
config = epochly.get_config()
print(f"Max workers: {config['max_workers']}")

Dynamic Worker Adjustment

import epochly
import os
def auto_configure_workers():
"""Automatically configure workers based on CPU count"""
cpu_count = os.cpu_count()
if cpu_count <= 4:
workers = cpu_count
elif cpu_count <= 16:
workers = cpu_count - 2
else:
workers = max(16, cpu_count - 4)
epochly.configure(max_workers=workers)
print(f"Configured {workers} workers for {cpu_count} CPUs")
auto_configure_workers()
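
Note that os.cpu_count() reports the machine's total core count even when the process is restricted to fewer CPUs (common in containers). On platforms that expose a scheduling affinity mask, a small sketch like this yields the count actually available:

import os
# Prefer the scheduling affinity mask where available (Linux);
# fall back to os.cpu_count() on other platforms.
try:
    available = len(os.sched_getaffinity(0))
except AttributeError:
    available = os.cpu_count() or 1
print(f"CPUs available to this process: {available}")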

Core Reservation

Reserve CPU cores for system processes to prevent oversubscription.

Environment Variables

# Reserve specific number of cores
export EPOCHLY_RESERVE_CORES=2
# Use percentage of cores
export EPOCHLY_MAX_CORES_PERCENT=75 # Use 75% of available cores

Configuration Examples

import epochly
# Reserve 2 cores for system tasks
epochly.configure(
enhancement_level=3,
reserve_cores=2
)
# Use 80% of available cores
epochly.configure(
enhancement_level=3,
max_cores_percent=80
)

Core Reservation Strategy

| System Type | Cores | Reserve | Workers | Reason |
| --- | --- | --- | --- | --- |
| Desktop | 8 | 1-2 | 6-7 | Keep UI responsive |
| Workstation | 16 | 2-4 | 12-14 | Balance work and system |
| Server | 64 | 4-8 | 56-60 | Maximize throughput |
| Container | Variable | 1 | N-1 | Leave headroom |
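
As a rough sketch, the heuristics in this table could be applied at startup; the system_type labels below are illustrative, not an Epochly API:

import os
import epochly

def apply_reservation(system_type):
    """Map the strategy table above onto reserve_cores (illustrative sketch)."""
    reserve_by_type = {'desktop': 2, 'workstation': 4, 'server': 8, 'container': 1}
    cores = os.cpu_count() or 1
    reserve = min(reserve_by_type.get(system_type, 2), max(cores - 1, 0))
    epochly.configure(enhancement_level=3, reserve_cores=reserve)

apply_reservation('workstation')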

JIT Backend Selection

Choose the JIT compiler backend based on your Python version and requirements.

Available Backends

# Use Numba (default for Python 3.9-3.12)
export EPOCHLY_JIT_BACKEND=numba
# Use native JIT (Python 3.13+ only)
export EPOCHLY_JIT_BACKEND=native
# Use Pyston (if installed)
export EPOCHLY_JIT_BACKEND=pyston

Backend Comparison

| Backend | Python Version | Compilation Speed | Runtime Speed | Memory |
| --- | --- | --- | --- | --- |
| Numba | 3.9+ | Slow (first run) | Very Fast | Medium |
| Native | 3.13+ | Fast | Fast | Low |
| Pyston | 3.8+ | Fast | Fast | Medium |

Programmatic Backend Selection

import epochly
import sys
def select_jit_backend():
"""Select optimal JIT backend"""
python_version = sys.version_info
if python_version >= (3, 13):
# Use native JIT on Python 3.13+
epochly.configure(jit_backend='native')
print("Using native JIT backend")
else:
# Fall back to Numba
epochly.configure(jit_backend='numba')
print("Using Numba backend")
select_jit_backend()

JIT Threshold Tuning

Configure when hot paths trigger JIT compilation.

# Set hot path threshold (number of executions)
export EPOCHLY_JIT_HOT_PATH_THRESHOLD=100
# Lower threshold for aggressive compilation
export EPOCHLY_JIT_HOT_PATH_THRESHOLD=10
# Higher threshold for conservative compilation
export EPOCHLY_JIT_HOT_PATH_THRESHOLD=1000

Threshold Guidelines

import epochly
# Aggressive: Compile frequently executed code quickly
epochly.configure(
enhancement_level=2,
jit_hot_path_threshold=10 # Compile after 10 calls
)
# Balanced: Default behavior
epochly.configure(
enhancement_level=2,
jit_hot_path_threshold=100
)
# Conservative: Only compile very hot paths
epochly.configure(
enhancement_level=2,
jit_hot_path_threshold=1000
)

Workload-Based Threshold Selection

import epochly
def configure_for_workload(workload_type):
"""Configure JIT threshold based on workload"""
thresholds = {
'short_lived': 10, # Quick scripts
'interactive': 50, # User-facing apps
'batch': 100, # Data processing
'long_running': 1000 # Services/daemons
}
threshold = thresholds.get(workload_type, 100)
epochly.configure(
enhancement_level=2,
jit_hot_path_threshold=threshold
)
print(f"JIT threshold set to {threshold} for {workload_type} workload")
# Configure for batch processing
configure_for_workload('batch')

Memory Pool Configuration

Configure shared memory and memory limits.

# Set shared memory pool size (MB)
export EPOCHLY_MEMORY_SHARED_SIZE=1024
# Set total memory limit (GB)
export EPOCHLY_MEMORY_LIMIT_GB=8
# Set worker memory limit (MB per worker)
export EPOCHLY_WORKER_MEMORY_LIMIT=512

Memory Configuration

import epochly
# Configure memory pools
epochly.configure(
enhancement_level=3,
memory_shared_size=1024, # 1GB shared pool
memory_limit_gb=8, # 8GB total limit
worker_memory_limit=512 # 512MB per worker
)

Calculate Memory Requirements

import epochly
import math
def calculate_memory_config(data_size_gb, num_workers):
"""Calculate optimal memory configuration"""
# Estimate memory per worker
worker_memory = (data_size_gb * 1024) / num_workers * 1.5 # 50% overhead
# Shared memory for inter-worker communication
shared_memory = min(1024, worker_memory * 0.2) # 20% of worker memory
# Total memory limit
total_memory = worker_memory * num_workers + shared_memory
config = {
'worker_memory_limit': int(worker_memory),
'memory_shared_size': int(shared_memory),
        'memory_limit_gb': math.ceil(total_memory / 1024)  # round up so the limit covers the estimate
}
print(f"Memory Configuration:")
print(f" Worker memory: {config['worker_memory_limit']} MB")
print(f" Shared memory: {config['memory_shared_size']} MB")
print(f" Total limit: {config['memory_limit_gb']} GB")
return config
# Configure for 10GB dataset with 8 workers
config = calculate_memory_config(10, 8)
epochly.configure(**config)

GPU Configuration

Fine-tune GPU memory and workload thresholds.

# Set GPU workload threshold (minimum array size for GPU)
export EPOCHLY_GPU_WORKLOAD_THRESHOLD=1000000
# Set GPU memory limit (MB)
export EPOCHLY_GPU_MEMORY_LIMIT=4096
# Control visible GPUs
export CUDA_VISIBLE_DEVICES=0,1 # Use GPUs 0 and 1

GPU Threshold Configuration

import epochly
import numpy as np
# Configure GPU thresholds
epochly.configure(
enhancement_level=4,
gpu_workload_threshold=1_000_000, # Use GPU for arrays >= 1M elements
gpu_memory_limit=4096 # Limit to 4GB
)
@epochly.optimize(level=4)
def smart_gpu_compute(arr):
"""Automatically uses GPU for large arrays"""
# GPU used if len(arr) >= 1M
return arr ** 2 + np.sin(arr)

Multi-GPU Configuration

import epochly
import os
def configure_multi_gpu():
"""Configure for multi-GPU setup"""
    # Select specific GPUs (must be set before the CUDA runtime is initialized)
    os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3'
# Configure memory per GPU
epochly.configure(
enhancement_level=4,
gpu_memory_limit=8192, # 8GB per GPU
max_workers=4 # One worker per GPU
)
print("Configured 4 GPUs with 8GB each")
configure_multi_gpu()

Conditional Optimization

Choose optimization level based on data characteristics.

import epochly
import numpy as np
def smart_compute(data):
"""Adapt optimization level to data size"""
data_size = len(data)
# Small data: No optimization needed
if data_size < 10_000:
with epochly.optimize_context(level=0):
return np.sum(data ** 2)
# Medium data: Use JIT
elif data_size < 1_000_000:
with epochly.optimize_context(level=2):
return np.sum(data ** 2)
# Large data: Use multicore
else:
with epochly.optimize_context(level=3):
return np.sum(data ** 2)
# Test with different sizes
small = np.random.rand(1_000)
medium = np.random.rand(100_000)
large = np.random.rand(10_000_000)
print(f"Small result: {smart_compute(small)}")
print(f"Medium result: {smart_compute(medium)}")
print(f"Large result: {smart_compute(large)}")

Threshold-Based Optimization

import epochly
import numpy as np
class AdaptiveOptimizer:
"""Automatically select optimization level"""
def __init__(self):
self.thresholds = {
'small': 10_000,
'medium': 1_000_000,
'large': 10_000_000
}
def get_level(self, data_size):
"""Get optimization level for data size"""
if data_size < self.thresholds['small']:
return 0 # No optimization
elif data_size < self.thresholds['medium']:
return 2 # JIT
elif data_size < self.thresholds['large']:
return 3 # Multicore
else:
return 4 # GPU (if available)
def optimize(self, func, data):
"""Execute with optimal level"""
level = self.get_level(len(data))
with epochly.optimize_context(level=level):
return func(data)
# Use adaptive optimizer
optimizer = AdaptiveOptimizer()
def compute(data):
return np.sum(data ** 2)
data = np.random.rand(5_000_000)
result = optimizer.optimize(compute, data)

Mixed Level Operations

Use different optimization levels for different pipeline stages.

import epochly
import numpy as np
def data_pipeline(file_paths):
"""Pipeline with mixed optimization levels"""
# Stage 1: I/O - Use threading (Level 1)
with epochly.optimize_context(level=1):
raw_data = []
for path in file_paths:
data = np.load(path)
raw_data.append(data)
# Stage 2: Transform - Use JIT (Level 2)
with epochly.optimize_context(level=2):
transformed = []
for data in raw_data:
result = data ** 2 + np.sin(data) * np.cos(data)
transformed.append(result)
# Stage 3: Aggregate - Use multicore (Level 3)
with epochly.optimize_context(level=3):
aggregated = np.sum([np.sum(t) for t in transformed])
return aggregated
# Process multiple files
files = [f'data_{i}.npy' for i in range(10)]
result = data_pipeline(files)
print(f"Pipeline result: {result}")

Complex Pipeline Example

import epochly
import numpy as np
class OptimizedPipeline:
"""Multi-stage pipeline with per-stage optimization"""
def __init__(self):
self.stage_levels = {
'load': 1, # I/O bound
'clean': 2, # CPU bound, benefits from JIT
'transform': 2, # CPU bound, benefits from JIT
'aggregate': 3, # Parallel aggregation
'save': 1 # I/O bound
}
def run(self, data):
"""Execute pipeline with optimized stages"""
result = data
# Load stage
with epochly.optimize_context(level=self.stage_levels['load']):
print("Loading data...")
# I/O operations here
# Clean stage
with epochly.optimize_context(level=self.stage_levels['clean']):
print("Cleaning data...")
result = self._clean(result)
# Transform stage
with epochly.optimize_context(level=self.stage_levels['transform']):
print("Transforming data...")
result = self._transform(result)
# Aggregate stage
with epochly.optimize_context(level=self.stage_levels['aggregate']):
print("Aggregating results...")
result = self._aggregate(result)
# Save stage
with epochly.optimize_context(level=self.stage_levels['save']):
print("Saving results...")
# I/O operations here
return result
def _clean(self, data):
# Remove outliers, handle missing values
return data[data > 0]
def _transform(self, data):
# Apply transformations
return data ** 2 + np.sin(data)
def _aggregate(self, data):
# Aggregate results
return np.sum(data)
# Run pipeline
pipeline = OptimizedPipeline()
data = np.random.randn(10_000_000)
result = pipeline.run(data)

Disabling for Specific Code

Temporarily disable optimization for specific code sections.

import epochly
import numpy as np
@epochly.optimize(level=3)
def mixed_optimization(data):
"""Function with selectively disabled optimization"""
# Optimized section
result1 = np.sum(data ** 2)
# Disable optimization for problematic code
with epochly.epochly_disabled_context():
# This code runs without optimization
# Useful for debugging or incompatible operations
result2 = some_library_function(data)
# Resume optimization
result3 = np.sum(data * 2)
return result1 + result2 + result3

Selective Disabling

import epochly
import numpy as np
@epochly.optimize(level=3)
def process_with_exceptions(data_list):
"""Process most data optimized, some unoptimized"""
results = []
for data in data_list:
# Check if data needs special handling
if needs_special_handling(data):
# Disable optimization for special cases
with epochly.epochly_disabled_context():
result = special_process(data)
else:
# Use optimization for normal cases
result = normal_process(data)
results.append(result)
return results
def needs_special_handling(data):
# Check for edge cases
return len(data) < 100 or np.any(np.isnan(data))
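
special_process and normal_process above are placeholders; minimal stubs that make the example self-contained might look like:

import numpy as np

def special_process(data):
    # Placeholder: tolerate NaNs and tiny inputs on the unoptimized path
    return float(np.nansum(data))

def normal_process(data):
    # Placeholder: the common computation on the optimized path
    return float(np.sum(data ** 2))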

Profile-Guided Optimization

Automatically detect the best optimization level through profiling.

import epochly
import time
import numpy as np
def profile_workload(func, data, runs=3):
"""Profile function to find best optimization level"""
levels = [0, 1, 2, 3]
results = {}
print("Profiling workload...")
for level in levels:
times = []
with epochly.optimize_context(level=level):
# Warmup
func(data)
# Measure
for _ in range(runs):
start = time.perf_counter()
func(data)
elapsed = time.perf_counter() - start
times.append(elapsed)
avg_time = np.mean(times)
results[level] = avg_time
print(f" Level {level}: {avg_time:.4f}s")
# Find best level
best_level = min(results, key=results.get)
speedup = results[0] / results[best_level]
print(f"\nBest level: {best_level} ({speedup:.2f}x speedup)")
return best_level
# Define workload
def compute_workload(data):
return np.sum(data ** 2 + np.sin(data) * np.cos(data))
# Profile and get best level
data = np.random.rand(1_000_000)
best_level = profile_workload(compute_workload, data)
# Use best level
with epochly.optimize_context(level=best_level):
result = compute_workload(data)

Auto-Tuning System

import epochly
import time
import numpy as np
class AutoTuner:
"""Automatically tune optimization settings"""
def __init__(self, func):
self.func = func
self.best_level = None
self.best_time = float('inf')
def tune(self, sample_data, runs=5):
"""Find optimal level for function"""
print(f"Auto-tuning {self.func.__name__}...")
for level in [0, 1, 2, 3]:
with epochly.optimize_context(level=level):
# Warmup
self.func(sample_data)
# Measure
start = time.perf_counter()
for _ in range(runs):
self.func(sample_data)
elapsed = time.perf_counter() - start
avg_time = elapsed / runs
if avg_time < self.best_time:
self.best_time = avg_time
self.best_level = level
print(f" Level {level}: {avg_time:.4f}s")
print(f"Optimal level: {self.best_level}")
return self.best_level
def __call__(self, data):
"""Call function with optimal level"""
if self.best_level is None:
raise ValueError("Must call tune() first")
with epochly.optimize_context(level=self.best_level):
return self.func(data)
# Use auto-tuner
def my_function(data):
return np.sum(data ** 2)
tuner = AutoTuner(my_function)
sample = np.random.rand(100_000)
tuner.tune(sample)
# Now use with optimal settings
data = np.random.rand(10_000_000)
result = tuner(data)

Batch Size Tuning

Process data in optimal batch sizes for best performance.

import epochly
import numpy as np
import time
@epochly.optimize(level=3)
def process_in_batches(data, batch_size=100_000):
"""Process large dataset in optimized batches"""
results = []
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
result = np.sum(batch ** 2 + np.sin(batch))
results.append(result)
return sum(results)
# Test different batch sizes
data = np.random.rand(10_000_000)
batch_sizes = [10_000, 50_000, 100_000, 500_000]
for batch_size in batch_sizes:
start = time.perf_counter()
result = process_in_batches(data, batch_size)
elapsed = time.perf_counter() - start
print(f"Batch size {batch_size:>8,}: {elapsed:.3f}s")

Automatic Batch Size Selection

import epochly
import numpy as np
def calculate_optimal_batch_size(data_size, workers, memory_per_worker_mb=512):
"""Calculate optimal batch size"""
# Estimate memory per element (in bytes)
element_size = 8 # float64
# Memory per worker in bytes
worker_memory = memory_per_worker_mb * 1024 * 1024
# Elements per batch
elements_per_batch = worker_memory // element_size // 2 # 50% safety margin
# Ensure minimum batch size
batch_size = max(1000, min(elements_per_batch, data_size // workers))
return batch_size
@epochly.optimize(level=3)
def smart_batch_processing(data, workers=8):
"""Process with automatically calculated batch size"""
batch_size = calculate_optimal_batch_size(len(data), workers)
print(f"Using batch size: {batch_size:,}")
results = []
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
result = np.sum(batch ** 2)
results.append(result)
return sum(results)
data = np.random.rand(50_000_000)
result = smart_batch_processing(data)

Logging Configuration

Configure detailed logging for debugging and monitoring.

# Enable debug logging
export EPOCHLY_LOG_LEVEL=DEBUG
# Set log file
export EPOCHLY_LOG_FILE=/var/log/epochly.log
# Log to console and file
export EPOCHLY_LOG_CONSOLE=true

Programmatic Logging

import epochly
import numpy as np
# Configure logging
epochly.configure_logging(
level='DEBUG',
log_file='epochly_debug.log',
console=True
)
# Get Epochly logger
logger = epochly.get_logger()
@epochly.optimize(level=3)
def logged_function(data):
logger.info(f"Processing {len(data)} elements")
result = np.sum(data ** 2)
logger.info(f"Result: {result}")
return result
data = np.random.rand(1_000_000)
result = logged_function(data)

Log Level Guidelines

| Level | Use Case | Output |
| --- | --- | --- |
| DEBUG | Development, troubleshooting | All messages including internal details |
| INFO | Production monitoring | Important events and milestones |
| WARNING | Unexpected behavior | Warnings and potential issues |
| ERROR | Error handling | Errors and exceptions |
| CRITICAL | Silent operation | Only critical failures |
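
A common pattern is to derive the log level from the deployment environment. A minimal sketch, assuming an APP_ENV variable of your own (Epochly does not read it):

import os
import epochly

env = os.environ.get('APP_ENV', 'production')
epochly.configure_logging(
    level='DEBUG' if env == 'development' else 'INFO',
    console=True
)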

Best Practices

1. Profile First

import epochly
# Always profile before optimizing
def workflow():
# Profile baseline
with epochly.benchmark_context("baseline"):
baseline_result = compute_baseline()
# Profile optimized
@epochly.optimize(level=2)
def optimized_compute():
return compute_baseline()
with epochly.benchmark_context("optimized"):
optimized_result = optimized_compute()
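
compute_baseline above is a placeholder for your real workload; any representative computation works, for example:

import numpy as np

def compute_baseline():
    # Placeholder workload; substitute the computation you actually care about
    data = np.random.rand(1_000_000)
    return np.sum(data ** 2)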

2. Start Conservative

import epochly

# Start with Level 2 (JIT) for most workloads
epochly.configure(enhancement_level=2)
# Only move to Level 3 if profiling shows benefit
# Only use Level 4 if you have GPU and large arrays
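
A minimal sketch of that escalation policy, using a CuPy import as a stand-in GPU check (an assumption; Epochly may provide its own GPU detection):

import epochly

def pick_level(data_len):
    """Conservative escalation: JIT by default, multicore for large data,
    GPU only when a GPU stack is importable."""
    try:
        import cupy  # noqa: F401  # stand-in GPU availability check
        has_gpu = True
    except ImportError:
        has_gpu = False
    if data_len >= 10_000_000 and has_gpu:
        return 4
    if data_len >= 1_000_000:
        return 3
    return 2

epochly.configure(enhancement_level=pick_level(5_000_000))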

3. Match Workload

import epochly
import numpy as np

# I/O-bound: Level 1 (Threading)
@epochly.optimize(level=1)
def read_files(paths):
return [open(p).read() for p in paths]
# CPU-bound, medium data: Level 2 (JIT)
@epochly.optimize(level=2)
def compute_stats(data):
return np.mean(data), np.std(data)
# CPU-bound, large data: Level 3 (Multicore)
@epochly.optimize(level=3)
def parallel_processing(big_data):
return [process(chunk) for chunk in chunks(big_data)]
# Massive arrays: Level 4 (GPU)
@epochly.optimize(level=4)
def gpu_compute(huge_array):
return huge_array ** 2 + np.sin(huge_array)
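
process and chunks in the Level 3 example are placeholders; minimal versions could be:

import numpy as np

def chunks(data, size=1_000_000):
    # Yield fixed-size slices of the input
    for i in range(0, len(data), size):
        yield data[i:i + size]

def process(chunk):
    # Placeholder per-chunk computation
    return float(np.sum(chunk ** 2))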

4. Tune Incrementally

import epochly
# Start with defaults
epochly.configure(enhancement_level=2)
# Measure baseline
baseline_time = measure_performance()
# Tune one parameter at a time
epochly.configure(
enhancement_level=2,
jit_hot_path_threshold=50 # Adjust threshold
)
tuned_time = measure_performance()
if tuned_time < baseline_time * 0.9: # 10% improvement
print("Tuning improved performance")
else:
print("Revert to defaults")

5. Monitor Overhead

import epochly
import time
def check_overhead():
"""Ensure optimization overhead is acceptable"""
def tiny_function():
return sum(range(100))
# Measure without optimization
start = time.perf_counter()
for _ in range(1000):
tiny_function()
unoptimized_time = time.perf_counter() - start
# Measure with optimization
@epochly.optimize(level=2)
def optimized_tiny():
return sum(range(100))
# Warmup
optimized_tiny()
start = time.perf_counter()
for _ in range(1000):
optimized_tiny()
optimized_time = time.perf_counter() - start
overhead_pct = (optimized_time - unoptimized_time) / unoptimized_time * 100
if overhead_pct > 10:
print(f"Warning: {overhead_pct:.1f}% overhead detected")
print("Consider using optimization only for larger workloads")
else:
print(f"Overhead acceptable: {overhead_pct:.1f}%")
check_overhead()