Custom Optimization
Fine-tuning Epochly optimization for specific workloads and use cases.
Worker Configuration
Control the number of parallel workers with the EPOCHLY_MAX_WORKERS environment variable.
```bash
# Set maximum number of workers
export EPOCHLY_MAX_WORKERS=8

# Use all available cores
export EPOCHLY_MAX_WORKERS=-1

# Single-threaded execution
export EPOCHLY_MAX_WORKERS=1
```
Programmatic Configuration
```python
import epochly

# Configure workers at runtime
epochly.configure(max_workers=8)

# Query current configuration
config = epochly.get_config()
print(f"Max workers: {config['max_workers']}")
```
Dynamic Worker Adjustment
```python
import epochly
import os

def auto_configure_workers():
    """Automatically configure workers based on CPU count"""
    cpu_count = os.cpu_count() or 1  # os.cpu_count() can return None
    if cpu_count <= 4:
        workers = cpu_count
    elif cpu_count <= 16:
        workers = cpu_count - 2
    else:
        workers = max(16, cpu_count - 4)
    epochly.configure(max_workers=workers)
    print(f"Configured {workers} workers for {cpu_count} CPUs")

auto_configure_workers()
```
Core Reservation
Reserve CPU cores for system processes to prevent oversubscription.
Environment Variables
```bash
# Reserve specific number of cores
export EPOCHLY_RESERVE_CORES=2

# Use percentage of cores
export EPOCHLY_MAX_CORES_PERCENT=75  # Use 75% of available cores
```
Configuration Examples
```python
import epochly

# Reserve 2 cores for system tasks
epochly.configure(
    enhancement_level=3,
    reserve_cores=2
)

# Use 80% of available cores
epochly.configure(
    enhancement_level=3,
    max_cores_percent=80
)
```
Core Reservation Strategy
| System Type | Cores | Reserve | Workers | Reason |
|---|---|---|---|---|
| Desktop | 8 | 1-2 | 6-7 | Keep UI responsive |
| Workstation | 16 | 2-4 | 12-14 | Balance work and system |
| Server | 64 | 4-8 | 56-60 | Maximize throughput |
| Container | Variable | 1 | N-1 | Leave headroom |
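The table can be folded into a small helper. The sketch below is a minimal, hypothetical example (the helper name and tier boundaries are ours, not part of Epochly); it prefers `os.sched_getaffinity` where available so container CPU quotas are respected, and passes the reservation to `epochly.configure()` as shown above.

```python
import os
import epochly

def reserve_by_system_size():
    """Hypothetical helper applying the reservation table above."""
    # sched_getaffinity respects container CPU limits on Linux;
    # fall back to cpu_count() on other platforms
    try:
        cores = len(os.sched_getaffinity(0))
    except AttributeError:
        cores = os.cpu_count() or 1

    # Tier boundaries follow the table: desktop / workstation / server
    if cores <= 8:
        reserve = 1   # keep the UI responsive
    elif cores <= 16:
        reserve = 2   # balance work and system
    else:
        reserve = 4   # maximize throughput, keep some headroom

    epochly.configure(enhancement_level=3, reserve_cores=reserve)
    print(f"{cores} cores detected, reserving {reserve}")

reserve_by_system_size()
```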
JIT Backend Selection
Choose the JIT compiler backend based on your Python version and requirements.
Available Backends
```bash
# Use Numba (default for Python 3.9-3.12)
export EPOCHLY_JIT_BACKEND=numba

# Use native JIT (Python 3.13+ only)
export EPOCHLY_JIT_BACKEND=native

# Use Pyston (if installed)
export EPOCHLY_JIT_BACKEND=pyston
```
Backend Comparison
| Backend | Python Version | Compilation Speed | Runtime Speed | Memory |
|---|---|---|---|---|
| Numba | 3.9+ | Slow (first run) | Very Fast | Medium |
| Native | 3.13+ | Fast | Fast | Low |
| Pyston | 3.8+ | Fast | Fast | Medium |
Programmatic Backend Selection
```python
import epochly
import sys

def select_jit_backend():
    """Select optimal JIT backend"""
    python_version = sys.version_info
    if python_version >= (3, 13):
        # Use native JIT on Python 3.13+
        epochly.configure(jit_backend='native')
        print("Using native JIT backend")
    else:
        # Fall back to Numba
        epochly.configure(jit_backend='numba')
        print("Using Numba backend")

select_jit_backend()
```
JIT Threshold Tuning
Configure when hot paths trigger JIT compilation.
```bash
# Set hot path threshold (number of executions)
export EPOCHLY_JIT_HOT_PATH_THRESHOLD=100

# Lower threshold for aggressive compilation
export EPOCHLY_JIT_HOT_PATH_THRESHOLD=10

# Higher threshold for conservative compilation
export EPOCHLY_JIT_HOT_PATH_THRESHOLD=1000
```
Threshold Guidelines
```python
import epochly

# Aggressive: compile frequently executed code quickly
epochly.configure(
    enhancement_level=2,
    jit_hot_path_threshold=10  # Compile after 10 calls
)

# Balanced: default behavior
epochly.configure(
    enhancement_level=2,
    jit_hot_path_threshold=100
)

# Conservative: only compile very hot paths
epochly.configure(
    enhancement_level=2,
    jit_hot_path_threshold=1000
)
```
Workload-Based Threshold Selection
```python
import epochly

def configure_for_workload(workload_type):
    """Configure JIT threshold based on workload"""
    thresholds = {
        'short_lived': 10,    # Quick scripts
        'interactive': 50,    # User-facing apps
        'batch': 100,         # Data processing
        'long_running': 1000  # Services/daemons
    }
    threshold = thresholds.get(workload_type, 100)
    epochly.configure(
        enhancement_level=2,
        jit_hot_path_threshold=threshold
    )
    print(f"JIT threshold set to {threshold} for {workload_type} workload")

# Configure for batch processing
configure_for_workload('batch')
```
Memory Pool Configuration
Configure shared memory and memory limits.
```bash
# Set shared memory pool size (MB)
export EPOCHLY_MEMORY_SHARED_SIZE=1024

# Set total memory limit (GB)
export EPOCHLY_MEMORY_LIMIT_GB=8

# Set worker memory limit (MB per worker)
export EPOCHLY_WORKER_MEMORY_LIMIT=512
```
Memory Configuration
```python
import epochly

# Configure memory pools
epochly.configure(
    enhancement_level=3,
    memory_shared_size=1024,  # 1GB shared pool
    memory_limit_gb=8,        # 8GB total limit
    worker_memory_limit=512   # 512MB per worker
)
```
Calculate Memory Requirements
```python
import epochly

def calculate_memory_config(data_size_gb, num_workers):
    """Calculate optimal memory configuration"""
    # Estimate memory per worker (MB), with 50% overhead
    worker_memory = (data_size_gb * 1024) / num_workers * 1.5

    # Shared memory for inter-worker communication: 20% of worker memory
    shared_memory = min(1024, worker_memory * 0.2)

    # Total memory limit (MB)
    total_memory = worker_memory * num_workers + shared_memory

    config = {
        'worker_memory_limit': int(worker_memory),
        'memory_shared_size': int(shared_memory),
        'memory_limit_gb': int(total_memory / 1024)
    }

    print("Memory Configuration:")
    print(f"  Worker memory: {config['worker_memory_limit']} MB")
    print(f"  Shared memory: {config['memory_shared_size']} MB")
    print(f"  Total limit: {config['memory_limit_gb']} GB")

    return config

# Configure for 10GB dataset with 8 workers
config = calculate_memory_config(10, 8)
epochly.configure(**config)
```
GPU Configuration
Fine-tune GPU memory and workload thresholds.
```bash
# Set GPU workload threshold (minimum array size for GPU)
export EPOCHLY_GPU_WORKLOAD_THRESHOLD=1000000

# Set GPU memory limit (MB)
export EPOCHLY_GPU_MEMORY_LIMIT=4096

# Control visible GPUs
export CUDA_VISIBLE_DEVICES=0,1  # Use GPUs 0 and 1
```
GPU Threshold Configuration
```python
import epochly
import numpy as np

# Configure GPU thresholds
epochly.configure(
    enhancement_level=4,
    gpu_workload_threshold=1_000_000,  # Use GPU for arrays >= 1M elements
    gpu_memory_limit=4096              # Limit to 4GB
)

@epochly.optimize(level=4)
def smart_gpu_compute(arr):
    """Automatically uses GPU for large arrays"""
    # GPU used if len(arr) >= 1M
    return arr ** 2 + np.sin(arr)
```
Multi-GPU Configuration
```python
import epochly
import os

def configure_multi_gpu():
    """Configure for multi-GPU setup"""
    # Select specific GPUs
    os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3'

    # Configure memory per GPU
    epochly.configure(
        enhancement_level=4,
        gpu_memory_limit=8192,  # 8GB per GPU
        max_workers=4           # One worker per GPU
    )
    print("Configured 4 GPUs with 8GB each")

configure_multi_gpu()
```
Conditional Optimization
Choose the optimization level based on data characteristics.
```python
import epochly
import numpy as np

def smart_compute(data):
    """Adapt optimization level to data size"""
    data_size = len(data)

    # Small data: no optimization needed
    if data_size < 10_000:
        with epochly.optimize_context(level=0):
            return np.sum(data ** 2)
    # Medium data: use JIT
    elif data_size < 1_000_000:
        with epochly.optimize_context(level=2):
            return np.sum(data ** 2)
    # Large data: use multicore
    else:
        with epochly.optimize_context(level=3):
            return np.sum(data ** 2)

# Test with different sizes
small = np.random.rand(1_000)
medium = np.random.rand(100_000)
large = np.random.rand(10_000_000)

print(f"Small result: {smart_compute(small)}")
print(f"Medium result: {smart_compute(medium)}")
print(f"Large result: {smart_compute(large)}")
```
Threshold-Based Optimization
```python
import epochly
import numpy as np

class AdaptiveOptimizer:
    """Automatically select optimization level"""

    def __init__(self):
        self.thresholds = {
            'small': 10_000,
            'medium': 1_000_000,
            'large': 10_000_000
        }

    def get_level(self, data_size):
        """Get optimization level for data size"""
        if data_size < self.thresholds['small']:
            return 0  # No optimization
        elif data_size < self.thresholds['medium']:
            return 2  # JIT
        elif data_size < self.thresholds['large']:
            return 3  # Multicore
        else:
            return 4  # GPU (if available)

    def optimize(self, func, data):
        """Execute with optimal level"""
        level = self.get_level(len(data))
        with epochly.optimize_context(level=level):
            return func(data)

# Use adaptive optimizer
optimizer = AdaptiveOptimizer()

def compute(data):
    return np.sum(data ** 2)

data = np.random.rand(5_000_000)
result = optimizer.optimize(compute, data)
```
Mixed Level Operations
Use different optimization levels for different pipeline stages.
```python
import epochly
import numpy as np

def data_pipeline(file_paths):
    """Pipeline with mixed optimization levels"""
    # Stage 1: I/O - use threading (Level 1)
    with epochly.optimize_context(level=1):
        raw_data = []
        for path in file_paths:
            data = np.load(path)
            raw_data.append(data)

    # Stage 2: Transform - use JIT (Level 2)
    with epochly.optimize_context(level=2):
        transformed = []
        for data in raw_data:
            result = data ** 2 + np.sin(data) * np.cos(data)
            transformed.append(result)

    # Stage 3: Aggregate - use multicore (Level 3)
    with epochly.optimize_context(level=3):
        aggregated = np.sum([np.sum(t) for t in transformed])

    return aggregated

# Process multiple files
files = [f'data_{i}.npy' for i in range(10)]
result = data_pipeline(files)
print(f"Pipeline result: {result}")
```
Complex Pipeline Example
```python
import epochly
import numpy as np

class OptimizedPipeline:
    """Multi-stage pipeline with per-stage optimization"""

    def __init__(self):
        self.stage_levels = {
            'load': 1,       # I/O bound
            'clean': 2,      # CPU bound, benefits from JIT
            'transform': 2,  # CPU bound, benefits from JIT
            'aggregate': 3,  # Parallel aggregation
            'save': 1        # I/O bound
        }

    def run(self, data):
        """Execute pipeline with optimized stages"""
        result = data

        # Load stage
        with epochly.optimize_context(level=self.stage_levels['load']):
            print("Loading data...")
            # I/O operations here

        # Clean stage
        with epochly.optimize_context(level=self.stage_levels['clean']):
            print("Cleaning data...")
            result = self._clean(result)

        # Transform stage
        with epochly.optimize_context(level=self.stage_levels['transform']):
            print("Transforming data...")
            result = self._transform(result)

        # Aggregate stage
        with epochly.optimize_context(level=self.stage_levels['aggregate']):
            print("Aggregating results...")
            result = self._aggregate(result)

        # Save stage
        with epochly.optimize_context(level=self.stage_levels['save']):
            print("Saving results...")
            # I/O operations here

        return result

    def _clean(self, data):
        # Remove outliers, handle missing values
        return data[data > 0]

    def _transform(self, data):
        # Apply transformations
        return data ** 2 + np.sin(data)

    def _aggregate(self, data):
        # Aggregate results
        return np.sum(data)

# Run pipeline
pipeline = OptimizedPipeline()
data = np.random.randn(10_000_000)
result = pipeline.run(data)
```
Disabling for Specific Code
Temporarily disable optimization for specific code sections.
```python
import epochly
import numpy as np

@epochly.optimize(level=3)
def mixed_optimization(data):
    """Function with selectively disabled optimization"""
    # Optimized section
    result1 = np.sum(data ** 2)

    # Disable optimization for problematic code
    with epochly.epochly_disabled_context():
        # This code runs without optimization.
        # Useful for debugging or incompatible operations.
        result2 = some_library_function(data)  # placeholder for an incompatible call

    # Resume optimization
    result3 = np.sum(data * 2)

    return result1 + result2 + result3
```
Selective Disabling
```python
import epochly
import numpy as np

@epochly.optimize(level=3)
def process_with_exceptions(data_list):
    """Process most data optimized, some unoptimized"""
    results = []
    for data in data_list:
        # Check if data needs special handling
        if needs_special_handling(data):
            # Disable optimization for special cases
            with epochly.epochly_disabled_context():
                result = special_process(data)
        else:
            # Use optimization for normal cases
            result = normal_process(data)
        results.append(result)
    return results

def needs_special_handling(data):
    # Check for edge cases
    return len(data) < 100 or np.any(np.isnan(data))
```
Profile-Guided Optimization
Automatically detect the best optimization level through profiling.
```python
import epochly
import time
import numpy as np

def profile_workload(func, data, runs=3):
    """Profile function to find best optimization level"""
    levels = [0, 1, 2, 3]
    results = {}

    print("Profiling workload...")
    for level in levels:
        times = []
        with epochly.optimize_context(level=level):
            # Warmup
            func(data)
            # Measure
            for _ in range(runs):
                start = time.perf_counter()
                func(data)
                elapsed = time.perf_counter() - start
                times.append(elapsed)

        avg_time = np.mean(times)
        results[level] = avg_time
        print(f"  Level {level}: {avg_time:.4f}s")

    # Find best level
    best_level = min(results, key=results.get)
    speedup = results[0] / results[best_level]
    print(f"\nBest level: {best_level} ({speedup:.2f}x speedup)")
    return best_level

# Define workload
def compute_workload(data):
    return np.sum(data ** 2 + np.sin(data) * np.cos(data))

# Profile and get best level
data = np.random.rand(1_000_000)
best_level = profile_workload(compute_workload, data)

# Use best level
with epochly.optimize_context(level=best_level):
    result = compute_workload(data)
```
Auto-Tuning System
```python
import epochly
import time
import numpy as np

class AutoTuner:
    """Automatically tune optimization settings"""

    def __init__(self, func):
        self.func = func
        self.best_level = None
        self.best_time = float('inf')

    def tune(self, sample_data, runs=5):
        """Find optimal level for function"""
        print(f"Auto-tuning {self.func.__name__}...")

        for level in [0, 1, 2, 3]:
            with epochly.optimize_context(level=level):
                # Warmup
                self.func(sample_data)
                # Measure
                start = time.perf_counter()
                for _ in range(runs):
                    self.func(sample_data)
                elapsed = time.perf_counter() - start

            avg_time = elapsed / runs
            if avg_time < self.best_time:
                self.best_time = avg_time
                self.best_level = level
            print(f"  Level {level}: {avg_time:.4f}s")

        print(f"Optimal level: {self.best_level}")
        return self.best_level

    def __call__(self, data):
        """Call function with optimal level"""
        if self.best_level is None:
            raise ValueError("Must call tune() first")
        with epochly.optimize_context(level=self.best_level):
            return self.func(data)

# Use auto-tuner
def my_function(data):
    return np.sum(data ** 2)

tuner = AutoTuner(my_function)
sample = np.random.rand(100_000)
tuner.tune(sample)

# Now use with optimal settings
data = np.random.rand(10_000_000)
result = tuner(data)
```
Batch Size Tuning
Process data in optimal batch sizes for best performance.
```python
import epochly
import time
import numpy as np

@epochly.optimize(level=3)
def process_in_batches(data, batch_size=100_000):
    """Process large dataset in optimized batches"""
    results = []
    for i in range(0, len(data), batch_size):
        batch = data[i:i+batch_size]
        result = np.sum(batch ** 2 + np.sin(batch))
        results.append(result)
    return sum(results)

# Test different batch sizes
data = np.random.rand(10_000_000)
batch_sizes = [10_000, 50_000, 100_000, 500_000]

for batch_size in batch_sizes:
    start = time.perf_counter()
    result = process_in_batches(data, batch_size)
    elapsed = time.perf_counter() - start
    print(f"Batch size {batch_size:>8,}: {elapsed:.3f}s")
```
Automatic Batch Size Selection
```python
import epochly
import numpy as np

def calculate_optimal_batch_size(data_size, workers, memory_per_worker_mb=512):
    """Calculate optimal batch size"""
    # Estimate memory per element (in bytes)
    element_size = 8  # float64

    # Memory per worker in bytes
    worker_memory = memory_per_worker_mb * 1024 * 1024

    # Elements per batch, with a 50% safety margin
    elements_per_batch = worker_memory // element_size // 2

    # Ensure minimum batch size
    batch_size = max(1000, min(elements_per_batch, data_size // workers))
    return batch_size

@epochly.optimize(level=3)
def smart_batch_processing(data, workers=8):
    """Process with automatically calculated batch size"""
    batch_size = calculate_optimal_batch_size(len(data), workers)
    print(f"Using batch size: {batch_size:,}")

    results = []
    for i in range(0, len(data), batch_size):
        batch = data[i:i+batch_size]
        result = np.sum(batch ** 2)
        results.append(result)
    return sum(results)

data = np.random.rand(50_000_000)
result = smart_batch_processing(data)
```
Logging Configuration
Configure detailed logging for debugging and monitoring.
```bash
# Enable debug logging
export EPOCHLY_LOG_LEVEL=DEBUG

# Set log file
export EPOCHLY_LOG_FILE=/var/log/epochly.log

# Log to console and file
export EPOCHLY_LOG_CONSOLE=true
```
Programmatic Logging
```python
import epochly
import numpy as np

# Configure logging
epochly.configure_logging(
    level='DEBUG',
    log_file='epochly_debug.log',
    console=True
)

# Get Epochly logger
logger = epochly.get_logger()

@epochly.optimize(level=3)
def logged_function(data):
    logger.info(f"Processing {len(data)} elements")
    result = np.sum(data ** 2)
    logger.info(f"Result: {result}")
    return result

data = np.random.rand(1_000_000)
result = logged_function(data)
```
Log Level Guidelines
| Level | Use Case | Output |
|---|---|---|
| DEBUG | Development, troubleshooting | All messages including internal details |
| INFO | Production monitoring | Important events and milestones |
| WARNING | Unexpected behavior | Warnings and potential issues |
| ERROR | Error handling | Errors and exceptions |
| CRITICAL | Near-silent operation | Only critical failures |
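One convenient pattern is to key the table's guidance off a deployment-environment variable. The snippet below is a sketch under that assumption; `APP_ENV` and the environment-to-level mapping are illustrative conventions, not part of Epochly.

```python
import os
import epochly

# Hypothetical mapping from deployment environment to log level,
# following the guidelines table above; APP_ENV is an assumed convention
LEVELS = {
    'development': 'DEBUG',
    'staging': 'INFO',
    'production': 'WARNING',
}

env = os.environ.get('APP_ENV', 'development')
epochly.configure_logging(
    level=LEVELS.get(env, 'INFO'),
    console=True
)
```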
Best Practices
1. Profile First
```python
import epochly

# Always profile before optimizing
def workflow():
    # Profile baseline (compute_baseline() stands in for your workload)
    with epochly.benchmark_context("baseline"):
        baseline_result = compute_baseline()

    # Profile optimized
    @epochly.optimize(level=2)
    def optimized_compute():
        return compute_baseline()

    with epochly.benchmark_context("optimized"):
        optimized_result = optimized_compute()
```
2. Start Conservative
```python
import epochly

# Start with Level 2 (JIT) for most workloads
epochly.configure(enhancement_level=2)

# Only move to Level 3 if profiling shows a benefit.
# Only use Level 4 if you have a GPU and large arrays.
```
3. Match Workload
```python
import epochly
import numpy as np

# I/O-bound: Level 1 (threading)
@epochly.optimize(level=1)
def read_files(paths):
    return [open(p).read() for p in paths]

# CPU-bound, medium data: Level 2 (JIT)
@epochly.optimize(level=2)
def compute_stats(data):
    return np.mean(data), np.std(data)

# CPU-bound, large data: Level 3 (multicore)
@epochly.optimize(level=3)
def parallel_processing(big_data):
    # process() and chunks() stand in for your own helpers
    return [process(chunk) for chunk in chunks(big_data)]

# Massive arrays: Level 4 (GPU)
@epochly.optimize(level=4)
def gpu_compute(huge_array):
    return huge_array ** 2 + np.sin(huge_array)
```
4. Tune Incrementally
```python
import epochly

# Start with defaults
epochly.configure(enhancement_level=2)

# Measure baseline (measure_performance() stands in for your own benchmark)
baseline_time = measure_performance()

# Tune one parameter at a time
epochly.configure(
    enhancement_level=2,
    jit_hot_path_threshold=50  # Adjust threshold
)
tuned_time = measure_performance()

if tuned_time < baseline_time * 0.9:  # 10% improvement
    print("Tuning improved performance")
else:
    print("Revert to defaults")
```
5. Monitor Overhead
```python
import epochly
import time

def check_overhead():
    """Ensure optimization overhead is acceptable"""
    def tiny_function():
        return sum(range(100))

    # Measure without optimization
    start = time.perf_counter()
    for _ in range(1000):
        tiny_function()
    unoptimized_time = time.perf_counter() - start

    # Measure with optimization
    @epochly.optimize(level=2)
    def optimized_tiny():
        return sum(range(100))

    # Warmup
    optimized_tiny()

    start = time.perf_counter()
    for _ in range(1000):
        optimized_tiny()
    optimized_time = time.perf_counter() - start

    overhead_pct = (optimized_time - unoptimized_time) / unoptimized_time * 100

    if overhead_pct > 10:
        print(f"Warning: {overhead_pct:.1f}% overhead detected")
        print("Consider using optimization only for larger workloads")
    else:
        print(f"Overhead acceptable: {overhead_pct:.1f}%")

check_overhead()
```