# Multi-Process Applications
Level 3 parallel processing with Epochly for CPU-bound workloads.
## Overview
Epochly Level 3 provides true parallelism by bypassing Python's Global Interpreter Lock (GIL) through different mechanisms depending on your Python version.
### Parallelism Mechanisms
| Python Version | Mechanism | Description |
|---|---|---|
| 3.12+ | Sub-interpreters | Native Python sub-interpreters with isolated GILs |
| 3.9-3.11 | ProcessPool | Separate processes using multiprocessing.Pool |
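For intuition, the ProcessPool mechanism is conceptually similar to the standard library's `multiprocessing.Pool`: each worker runs in its own process with its own GIL. The sketch below illustrates the idea only; it is not Epochly's actual implementation:

```python
import multiprocessing


def square(x):
    return x ** 2


if __name__ == "__main__":
    # Four worker processes, each with an independent GIL, so
    # CPU-bound work runs in parallel across cores
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(square, range(1_000_000))
```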
Benefits:
- True parallel execution (no GIL contention)
- Automatic workload distribution
- Transparent to application code
- Falls back gracefully if unavailable
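To see which mechanism is active, or whether a fallback occurred, you can inspect runtime status. A minimal sketch: `epochly.get_status()` appears in the Debugging Tips section below, but the `'backend'` key here is a hypothetical illustration, not a documented field:

```python
import epochly

status = epochly.get_status()

# 'backend' is a hypothetical status key used for illustration
backend = status.get('backend', 'unknown')
print(f"Active parallel backend: {backend}")

if status.get('workers_active', 0) == 0:
    # Level 3 was unavailable; Epochly fell back to a lower level
    print("Running without parallel workers")
```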
## How It Works

Epochly automatically selects the appropriate parallelism mechanism:

```python
import epochly

@epochly.optimize(level=3)
def parallel_computation(data):
    """Automatically uses the best mechanism for the Python version"""
    return [x ** 2 for x in data]

# Python 3.12+:    uses sub-interpreters
# Python 3.9-3.11: uses ProcessPool
result = parallel_computation(range(1_000_000))
```
### Automatic Mechanism Selection

```
     +------------------+
     |  Check Python    |
     |    Version       |
     +------------------+
               |
               v
        Python 3.12+?
               |
          +----+----+
          |         |
         Yes        No
          |         |
          v         v
  Sub-interpreters  ProcessPool
```
## Worker Configuration

### Environment Variables

| Variable | Description | Default |
|---|---|---|
| `EPOCHLY_MAX_WORKERS` | Maximum worker processes/sub-interpreters | CPU count |
| `EPOCHLY_RESERVE_CORES` | Cores to reserve for the system | 1 |
| `EPOCHLY_PARALLEL_BACKEND` | Force a specific backend (`subinterpreters` or `processpool`) | Auto-detect |
### Configure via Environment

```bash
# Set maximum workers
export EPOCHLY_MAX_WORKERS=8

# Reserve 2 cores for other tasks
export EPOCHLY_RESERVE_CORES=2

# Force the ProcessPool backend
export EPOCHLY_PARALLEL_BACKEND=processpool
```
### Configure Programmatically

```python
import epochly

epochly.configure(
    enhancement_level=3,
    max_workers=8,
    reserve_cores=2,
    parallel_backend='auto'  # or 'subinterpreters' or 'processpool'
)
```
### Dynamic Worker Configuration

```python
import epochly
import os

# Leave 2 cores free for the system, but never drop below 4 workers
# (os.cpu_count() can return None, so fall back to 1)
cpu_count = os.cpu_count() or 1
workers = max(4, cpu_count - 2)

epochly.configure(
    enhancement_level=3,
    max_workers=workers
)

@epochly.optimize(level=3, workers=workers)
def process_data(items):
    return [item ** 2 for item in items]
```
## Basic Usage

### Simple Parallel Loop

```python
import epochly

@epochly.optimize(level=3)
def parallel_square(numbers):
    """Square numbers in parallel"""
    return [x ** 2 for x in numbers]

# Process 10 million numbers in parallel
result = parallel_square(range(10_000_000))
print(f"Processed {len(result)} numbers")
```
### CPU-Bound Computation

```python
import epochly
import math

@epochly.optimize(level=3)
def compute_primes(n):
    """Find prime numbers up to n"""
    def is_prime(num):
        if num < 2:
            return False
        for i in range(2, int(math.sqrt(num)) + 1):
            if num % i == 0:
                return False
        return True

    return [num for num in range(2, n) if is_prime(num)]

# Find primes in parallel
primes = compute_primes(100_000)
print(f"Found {len(primes)} primes")
```
### List Comprehension Optimization

```python
import epochly

@epochly.optimize(level=3)
def process_items(items):
    """Process items with an expensive computation"""
    return [
        item ** 3 + sum(range(item % 100))
        for item in items
    ]

# Automatically parallelized
data = range(1_000_000)
results = process_items(data)
```
## Data Processing Pipeline

### Multi-Stage Pipeline

```python
import epochly

@epochly.optimize(level=3)
def stage1_extract(files):
    """Stage 1: Extract data from files"""
    extracted = []
    for file in files:
        # Simulate file reading and parsing
        data = [i for i in range(10000)]
        extracted.append(data)
    return extracted

@epochly.optimize(level=3)
def stage2_transform(datasets):
    """Stage 2: Transform extracted data"""
    transformed = []
    for dataset in datasets:
        result = [x ** 2 + x for x in dataset]
        transformed.append(result)
    return transformed

@epochly.optimize(level=3)
def stage3_aggregate(datasets):
    """Stage 3: Aggregate results"""
    return sum(sum(dataset) for dataset in datasets)

# Execute the pipeline
files = ['file1', 'file2', 'file3', 'file4']
extracted = stage1_extract(files)
transformed = stage2_transform(extracted)
final_result = stage3_aggregate(transformed)
print(f"Pipeline result: {final_result}")
```
### Map-Reduce Pattern

```python
import epochly

@epochly.optimize(level=3)
def map_phase(data_chunks):
    """Map phase: process chunks in parallel"""
    return [sum(chunk) for chunk in data_chunks]

@epochly.optimize(level=2)
def reduce_phase(mapped_results):
    """Reduce phase: combine results"""
    return sum(mapped_results)

# Split the data into chunks
data = range(10_000_000)
chunk_size = 100_000
chunks = [list(data[i:i + chunk_size]) for i in range(0, len(data), chunk_size)]

# Execute map-reduce
mapped = map_phase(chunks)
final = reduce_phase(mapped)
print(f"Map-Reduce result: {final}")
```
## Shared State Patterns

### Safe Patterns for Multiprocessing

#### Pattern 1: Return Results (Recommended)

```python
import epochly

@epochly.optimize(level=3)
def safe_parallel_processing(items):
    """Return results; no shared state"""
    results = []
    for item in items:
        result = item ** 2
        results.append(result)
    return results

# Safe: no shared state
results = safe_parallel_processing(range(10000))
```
#### Pattern 2: Read-Only Shared Data

```python
import epochly

# Global read-only configuration
CONFIG = {'multiplier': 2, 'offset': 10}

@epochly.optimize(level=3)
def process_with_config(items):
    """Use read-only shared configuration"""
    return [
        item * CONFIG['multiplier'] + CONFIG['offset']
        for item in items
    ]

results = process_with_config(range(10000))
```
#### Pattern 3: Manager for Shared State (Advanced)

```python
import epochly
from multiprocessing import Manager

def parallel_with_shared_state():
    """Use a Manager for shared mutable state"""
    manager = Manager()
    shared_dict = manager.dict()
    shared_dict['counter'] = 0
    lock = manager.Lock()  # += on a manager dict is not atomic

    @epochly.optimize(level=3)
    def process_items(items, shared):
        results = []
        for item in items:
            result = item ** 2
            results.append(result)
        # Update the shared counter under the lock
        with lock:
            shared['counter'] += len(results)
        return results

    data = range(10000)
    results = process_items(data, shared_dict)
    print(f"Processed {shared_dict['counter']} items")
    return results
```
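For simple counters, a lighter-weight alternative is `multiprocessing.Value`, which carries its own lock. This sketch assumes workers inherit the module-level `counter` (as with a fork start method); spawn-based workers would need it passed through an initializer instead:

```python
from multiprocessing import Value

import epochly

# A shared integer with a built-in lock; increment under the lock
# to avoid lost updates
counter = Value('i', 0)


@epochly.optimize(level=3)
def process_items(items):
    results = [item ** 2 for item in items]
    with counter.get_lock():
        counter.value += len(results)
    return results
```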
### Avoiding Shared State Issues

```python
import epochly

# ❌ BAD: Mutable global state
counter = 0

@epochly.optimize(level=3)
def bad_pattern(items):
    global counter
    for item in items:
        counter += 1  # Race condition!
    return items

# ✅ GOOD: Return aggregated results
@epochly.optimize(level=3)
def good_pattern(items):
    local_count = len(items)
    return items, local_count

# Aggregate after parallel execution
results, count = good_pattern(range(10000))
total_count = count
```
## Memory Considerations

### Worker Memory Usage

Each worker process or sub-interpreter consumes memory, so size the worker pool against available RAM:

```python
import epochly

def estimate_memory_per_worker(data_size_mb):
    """Estimate the memory needed per worker"""
    base_overhead = 50                   # MB of base overhead per worker
    data_overhead = data_size_mb * 1.2   # 20% overhead on the data
    return base_overhead + data_overhead

# Configure based on available memory
available_memory_mb = 8192  # 8 GB
data_size_mb = 100

memory_per_worker = estimate_memory_per_worker(data_size_mb)
max_workers = int(available_memory_mb / memory_per_worker)

epochly.configure(
    enhancement_level=3,
    max_workers=min(max_workers, 8)
)
```
### Memory Limits

```bash
# Set the per-worker memory limit
export EPOCHLY_WORKER_MEMORY_LIMIT=512   # MB per worker

# Set the total memory limit
export EPOCHLY_TOTAL_MEMORY_LIMIT=4096   # MB total
```
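For intuition, a per-worker memory limit amounts to capping each worker process's address space. A minimal Unix-only sketch of the underlying mechanism (not Epochly's implementation), using the standard `resource` module:

```python
import resource


def limit_worker_memory(limit_mb):
    """Cap this process's address space.

    Allocations beyond the limit raise MemoryError instead of
    exhausting system RAM.
    """
    limit_bytes = limit_mb * 1024 * 1024
    resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, limit_bytes))
```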
### Batch Processing for Memory Efficiency

```python
import epochly

@epochly.optimize(level=3)
def memory_efficient_processing(data_chunks, batch_size=1000):
    """Process large data in batches"""
    all_results = []
    for i in range(0, len(data_chunks), batch_size):
        batch = data_chunks[i:i + batch_size]
        batch_results = [x ** 2 for x in batch]
        all_results.extend(batch_results)
    return all_results

# Process a large dataset in manageable batches
large_data = range(10_000_000)
results = memory_efficient_processing(large_data, batch_size=10000)
```
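If even the accumulated results list is too large to hold at once, a streaming variant keeps only one batch in memory at a time. A sketch of that pattern, with the consumption step left to the application:

```python
import epochly


def batches(items, batch_size):
    """Yield successive slices so only one batch is materialized at a time."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]


@epochly.optimize(level=3)
def process_batch(batch):
    return [x ** 2 for x in batch]


# Results are consumed batch-by-batch instead of accumulating
# all 10M results in a single list
for batch in batches(range(10_000_000), 10_000):
    partial = process_batch(batch)
    # ... consume or write out `partial` here ...
```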
## Integration with Existing Code

### Gradual Adoption

#### Step 1: Identify CPU-Bound Functions

```python
import time

def existing_slow_function(items):
    """Existing CPU-bound function"""
    results = []
    for item in items:
        # Expensive computation
        result = sum(range(item % 1000))
        results.append(result)
    return results

# Benchmark current performance
start = time.time()
results = existing_slow_function(range(100000))
baseline_time = time.time() - start
print(f"Baseline: {baseline_time:.2f}s")
```
#### Step 2: Add Decorator

```python
import time

import epochly

@epochly.optimize(level=3)
def optimized_function(items):
    """The same function with Level 3 optimization"""
    results = []
    for item in items:
        result = sum(range(item % 1000))
        results.append(result)
    return results

# Benchmark optimized performance
start = time.time()
results = optimized_function(range(100000))
optimized_time = time.time() - start

speedup = baseline_time / optimized_time
print(f"Optimized: {optimized_time:.2f}s ({speedup:.1f}x faster)")
```
#### Step 3: Gradually Expand

```python
import epochly

# Start with one function
@epochly.optimize(level=3)
def process_batch_1(items):
    return [x ** 2 for x in items]

# Expand to related functions
@epochly.optimize(level=3)
def process_batch_2(items):
    return [x ** 3 for x in items]

@epochly.optimize(level=3)
def process_batch_3(items):
    return [x * 2 for x in items]
```
## Debugging Tips

### Enable Debug Mode

```bash
# Enable debug logging
export EPOCHLY_DEBUG=1
export EPOCHLY_LOG_LEVEL=DEBUG
```

```python
import epochly

# Programmatic debug mode
epochly.configure(
    enhancement_level=3,
    debug=True,
    log_level='DEBUG'
)

@epochly.optimize(level=3)
def debug_function(items):
    return [x ** 2 for x in items]

# Debug output shows worker allocation and execution
results = debug_function(range(1000))
```
### Common Issues

#### Issue 1: Workers Not Starting

```python
import epochly

# Check worker status
status = epochly.get_status()
print(f"Workers active: {status.get('workers_active', 0)}")
print(f"Max workers: {status.get('max_workers', 0)}")

# If workers_active == 0, check that:
# 1. EPOCHLY_LEVEL=3 is set
# 2. The workload is large enough
# 3. There are no configuration errors
```
#### Issue 2: Performance Not Improving

```python
import epochly

# Workload too small: overhead dominates
@epochly.optimize(level=3)
def too_small(items):
    return [x ** 2 for x in items]

result = too_small(range(100))  # Only 100 items!

# Solution: increase the workload or lower the level
@epochly.optimize(level=2)  # Use JIT for small workloads
def appropriate_level(items):
    return [x ** 2 for x in items]
```
#### Issue 3: Pickling Errors

```python
import epochly

# ❌ BAD: Lambda functions can't be pickled
@epochly.optimize(level=3)
def bad_pickle(items):
    func = lambda x: x ** 2
    return [func(x) for x in items]

# ✅ GOOD: Use regular functions
def square(x):
    return x ** 2

@epochly.optimize(level=3)
def good_pickle(items):
    return [square(x) for x in items]
```
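When the callable needs parameters, `functools.partial` over a module-level function is also picklable, unlike a lambda:

```python
import functools

import epochly


def power(x, exponent):
    return x ** exponent


# partial objects pickle cleanly when built on a module-level function
cube = functools.partial(power, exponent=3)


@epochly.optimize(level=3)
def partial_pickle(items):
    return [cube(x) for x in items]
```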
## Performance Tuning

### Workload Threshold Configuration

```bash
# Minimum number of items for Level 3 activation
export EPOCHLY_PARALLEL_THRESHOLD=10000
# Only parallelize if the workload exceeds 10K items
```

```python
import epochly

epochly.configure(
    enhancement_level=3,
    parallel_threshold=10000  # Minimum items for parallelization
)

@epochly.optimize(level=3)
def auto_threshold(items):
    """Only parallelizes if len(items) > 10000"""
    return [x ** 2 for x in items]

# < 10K items: runs on a single thread
small_result = auto_threshold(range(5000))

# > 10K items: runs in parallel
large_result = auto_threshold(range(50000))
```
### Worker Count Optimization

```python
import time

import epochly

def benchmark_workers(data, worker_counts):
    """Find the optimal worker count"""
    results = {}
    for workers in worker_counts:
        epochly.configure(
            enhancement_level=3,
            max_workers=workers
        )

        @epochly.optimize(level=3)
        def process(items):
            return [x ** 2 for x in items]

        start = time.time()
        result = process(data)
        elapsed = time.time() - start

        results[workers] = elapsed
        print(f"Workers: {workers}, Time: {elapsed:.2f}s")
    return results

# Test different worker counts
data = range(1_000_000)
worker_counts = [2, 4, 8, 16]
results = benchmark_workers(data, worker_counts)

# Find the optimum
optimal = min(results, key=results.get)
print(f"Optimal worker count: {optimal}")
```
### Batch Size Tuning

```python
import epochly

@epochly.optimize(level=3, chunk_size=1000)
def tuned_batch_size(items):
    """Process with an optimized chunk size"""
    return [x ** 2 for x in items]

# Experiment with chunk sizes
chunk_sizes = [100, 1000, 10000]
for chunk_size in chunk_sizes:
    epochly.configure(
        enhancement_level=3,
        default_chunk_size=chunk_size
    )
    # Benchmark each configuration (see the sketch below)
```
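One possible completion of the benchmark loop above, timing each chunk size the same way the worker-count benchmark does:

```python
import time

import epochly

data = range(1_000_000)

for chunk_size in [100, 1_000, 10_000]:
    epochly.configure(
        enhancement_level=3,
        default_chunk_size=chunk_size
    )

    @epochly.optimize(level=3)
    def process(items):
        return [x ** 2 for x in items]

    # Time this configuration and report it
    start = time.time()
    process(data)
    print(f"chunk_size={chunk_size}: {time.time() - start:.2f}s")
```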