Multi-Process Applications

Level 3 parallel processing with Epochly for CPU-bound workloads.

Overview

Epochly Level 3 provides true parallelism by bypassing Python's Global Interpreter Lock (GIL) through different mechanisms depending on your Python version.
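
To see why bypassing the GIL matters, compare threads and processes on a CPU-bound task using only the standard library. This is an illustration independent of Epochly (the cpu_bound function is just an example workload):

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound(n):
    # Pure-Python arithmetic: holds the GIL for the whole call
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    work = [2_000_000] * 8
    for pool_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.time()
        with pool_cls(max_workers=4) as pool:
            list(pool.map(cpu_bound, work))
        print(f"{pool_cls.__name__}: {time.time() - start:.2f}s")
    # Threads serialize on the GIL; processes run truly in parallel.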

Parallelism Mechanisms

Python Version   Mechanism          Description
3.12+            Sub-interpreters   Native Python sub-interpreters with isolated GILs
3.9-3.11         ProcessPool        Separate processes using multiprocessing.Pool

Benefits:

  • True parallel execution (no GIL contention)
  • Automatic workload distribution
  • Transparent to application code
  • Falls back gracefully if unavailable

How It Works

Epochly automatically selects the appropriate parallelism mechanism:

import epochly

@epochly.optimize(level=3)
def parallel_computation(data):
    """Automatically uses the best mechanism for the running Python version"""
    return [x ** 2 for x in data]

# Python 3.12+: Uses sub-interpreters
# Python 3.9-3.11: Uses ProcessPool
result = parallel_computation(range(1_000_000))

Automatic Mechanism Selection

        +----------------+
        |  Check Python  |
        |    Version     |
        +----------------+
                |
                v
         Python 3.12+?
                |
        +-------+-------+
        |               |
       Yes              No
        |               |
        v               v
 Sub-interpreters   ProcessPool
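
Epochly performs this selection internally. As a rough standard-library sketch of the same decision (the select_backend helper is illustrative, not part of Epochly's API):

import sys
import multiprocessing

def square(x):
    return x * x

def select_backend():
    """Illustrative version check mirroring the selection logic."""
    if sys.version_info >= (3, 12):
        return 'subinterpreters'  # isolated-GIL sub-interpreters
    return 'processpool'          # multiprocessing.Pool of worker processes

if __name__ == '__main__':
    backend = select_backend()
    print(f"Selected backend: {backend}")
    # The ProcessPool fallback path, shown explicitly:
    with multiprocessing.Pool() as pool:
        print(pool.map(square, range(8)))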

Worker Configuration

Environment Variables

Variable                   Description                                                 Default
EPOCHLY_MAX_WORKERS        Maximum worker processes/sub-interpreters                  CPU count
EPOCHLY_RESERVE_CORES      Cores to reserve for the system                             1
EPOCHLY_PARALLEL_BACKEND   Force a specific backend (subinterpreters or processpool)  Auto-detect

Configure via Environment

# Set maximum workers
export EPOCHLY_MAX_WORKERS=8
# Reserve 2 cores for other tasks
export EPOCHLY_RESERVE_CORES=2
# Force ProcessPool backend
export EPOCHLY_PARALLEL_BACKEND=processpool

Configure Programmatically

import epochly

epochly.configure(
    enhancement_level=3,
    max_workers=8,
    reserve_cores=2,
    parallel_backend='auto'  # or 'subinterpreters' or 'processpool'
)

Dynamic Worker Configuration

import epochly
import os

# Calculate workers based on the machine
cpu_count = os.cpu_count()
workers = max(4, cpu_count - 2)  # At least 4 workers; otherwise leave 2 cores free

epochly.configure(
    enhancement_level=3,
    max_workers=workers
)

@epochly.optimize(level=3, workers=workers)
def process_data(items):
    return [item ** 2 for item in items]

Basic Usage

Simple Parallel Loop

import epochly

@epochly.optimize(level=3)
def parallel_square(numbers):
    """Square numbers in parallel"""
    return [x ** 2 for x in numbers]

# Process 10 million numbers in parallel
result = parallel_square(range(10_000_000))
print(f"Processed {len(result)} numbers")

CPU-Bound Computation

import epochly
import math

@epochly.optimize(level=3)
def compute_primes(n):
    """Find prime numbers up to n"""
    def is_prime(num):
        if num < 2:
            return False
        for i in range(2, int(math.sqrt(num)) + 1):
            if num % i == 0:
                return False
        return True
    return [num for num in range(2, n) if is_prime(num)]

# Find primes in parallel
primes = compute_primes(100_000)
print(f"Found {len(primes)} primes")

List Comprehension Optimization

import epochly

@epochly.optimize(level=3)
def process_items(items):
    """Process items with expensive computation"""
    return [
        item ** 3 + sum(range(item % 100))
        for item in items
    ]

# Automatically parallelized
data = range(1_000_000)
results = process_items(data)

Data Processing Pipeline

Multi-Stage Pipeline

import epochly

@epochly.optimize(level=3)
def stage1_extract(files):
    """Stage 1: Extract data from files"""
    extracted = []
    for file in files:
        # Simulate file reading and parsing
        data = [i for i in range(10000)]
        extracted.append(data)
    return extracted

@epochly.optimize(level=3)
def stage2_transform(datasets):
    """Stage 2: Transform extracted data"""
    transformed = []
    for dataset in datasets:
        result = [x ** 2 + x for x in dataset]
        transformed.append(result)
    return transformed

@epochly.optimize(level=3)
def stage3_aggregate(datasets):
    """Stage 3: Aggregate results"""
    return sum(sum(dataset) for dataset in datasets)

# Execute pipeline
files = ['file1', 'file2', 'file3', 'file4']
extracted = stage1_extract(files)
transformed = stage2_transform(extracted)
final_result = stage3_aggregate(transformed)
print(f"Pipeline result: {final_result}")

Map-Reduce Pattern

import epochly

@epochly.optimize(level=3)
def map_phase(data_chunks):
    """Map phase: Process chunks in parallel"""
    return [sum(chunk) for chunk in data_chunks]

@epochly.optimize(level=2)
def reduce_phase(mapped_results):
    """Reduce phase: Combine results"""
    return sum(mapped_results)

# Split data into chunks (slicing a range yields a range)
data = range(10_000_000)
chunk_size = 100_000
chunks = [list(data[i:i+chunk_size]) for i in range(0, len(data), chunk_size)]

# Execute map-reduce
mapped = map_phase(chunks)
final = reduce_phase(mapped)
print(f"Map-Reduce result: {final}")

Shared State Patterns

Safe Patterns for Multiprocessing

Pattern 1: Return Results (Recommended)

import epochly

@epochly.optimize(level=3)
def safe_parallel_processing(items):
    """Return results, no shared state"""
    results = []
    for item in items:
        result = item ** 2
        results.append(result)
    return results

# Safe: No shared state
results = safe_parallel_processing(range(10000))

Pattern 2: Read-Only Shared Data

import epochly

# Global read-only configuration
# (each worker gets its own copy; writes inside a worker are not visible elsewhere)
CONFIG = {'multiplier': 2, 'offset': 10}

@epochly.optimize(level=3)
def process_with_config(items):
    """Use read-only shared configuration"""
    return [
        item * CONFIG['multiplier'] + CONFIG['offset']
        for item in items
    ]

results = process_with_config(range(10000))

Pattern 3: Manager for Shared State (Advanced)

import epochly
from multiprocessing import Manager

def parallel_with_shared_state():
    """Use Manager for shared mutable state"""
    manager = Manager()
    shared_dict = manager.dict()
    shared_dict['counter'] = 0

    @epochly.optimize(level=3)
    def process_items(items, shared):
        results = []
        for item in items:
            result = item ** 2
            results.append(result)
        # Update shared counter (note: += on a Manager dict is a
        # read-then-write, so exact counts need an explicit lock)
        shared['counter'] += len(results)
        return results

    data = range(10000)
    results = process_items(data, shared_dict)
    print(f"Processed {shared_dict['counter']} items")
    return results
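
If exact counts matter, guard the update with a Manager lock so the read-modify-write is atomic across workers. A minimal sketch of that variant (the function names are illustrative; manager.Lock() is standard multiprocessing):

import epochly
from multiprocessing import Manager

def parallel_with_locked_counter():
    """Shared counter guarded by a Manager lock"""
    manager = Manager()
    shared_dict = manager.dict()
    shared_dict['counter'] = 0
    lock = manager.Lock()  # proxy lock usable across workers

    @epochly.optimize(level=3)
    def process_items(items, shared, lock):
        results = [item ** 2 for item in items]
        with lock:  # makes the increment atomic
            shared['counter'] += len(results)
        return results

    return process_items(range(10000), shared_dict, lock)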

Avoiding Shared State Issues

import epochly

# ❌ BAD: Mutable global state
counter = 0

@epochly.optimize(level=3)
def bad_pattern(items):
    global counter
    for item in items:
        counter += 1  # Race condition!
    return items

# ✅ GOOD: Return aggregated results
@epochly.optimize(level=3)
def good_pattern(items):
    local_count = len(items)
    return items, local_count

# Aggregate after parallel execution
results, count = good_pattern(range(10000))
total_count = count

Memory Considerations

Worker Memory Usage

Each worker process or sub-interpreter consumes memory:

import epochly

def estimate_memory_per_worker(data_size_mb):
    """Estimate memory needed per worker"""
    base_overhead = 50                  # MB base overhead per worker
    data_overhead = data_size_mb * 1.2  # 20% overhead
    return base_overhead + data_overhead

# Configure based on available memory
available_memory_mb = 8192  # 8GB
data_size_mb = 100
memory_per_worker = estimate_memory_per_worker(data_size_mb)
max_workers = int(available_memory_mb / memory_per_worker)

epochly.configure(
    enhancement_level=3,
    max_workers=min(max_workers, 8)
)
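
Rather than hard-coding available_memory_mb, you can query the machine. A sketch using the standard library on Unix (the helper name is illustrative; psutil is a common cross-platform alternative):

import os

def physical_memory_mb():
    """Total physical memory in MB via sysconf (Unix only)."""
    page_size = os.sysconf('SC_PAGE_SIZE')    # bytes per page
    page_count = os.sysconf('SC_PHYS_PAGES')  # total pages
    return (page_size * page_count) // (1024 * 1024)

print(f"Physical memory: {physical_memory_mb()} MB")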

Memory Limits

# Set worker memory limit
export EPOCHLY_WORKER_MEMORY_LIMIT=512 # MB per worker
# Set total memory limit
export EPOCHLY_TOTAL_MEMORY_LIMIT=4096 # MB total
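
Epochly enforces these limits in its workers. For intuition, this is what a per-worker address-space cap looks like with the standard-library resource module (Unix only; an illustration of the mechanism, not Epochly's internals):

import resource

def cap_address_space(limit_mb):
    """Cap this process's virtual memory; allocations beyond it raise MemoryError."""
    limit_bytes = limit_mb * 1024 * 1024
    resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, limit_bytes))

cap_address_space(512)  # e.g. mirroring EPOCHLY_WORKER_MEMORY_LIMIT=512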

Batch Processing for Memory Efficiency

import epochly

@epochly.optimize(level=3)
def memory_efficient_processing(data, batch_size=1000):
    """Process large data in batches"""
    all_results = []
    for i in range(0, len(data), batch_size):
        batch = data[i:i+batch_size]
        batch_results = [x ** 2 for x in batch]
        all_results.extend(batch_results)
    return all_results

# Process large dataset in manageable batches
large_data = range(10_000_000)
results = memory_efficient_processing(large_data, batch_size=10000)

Integration with Existing Code

Gradual Adoption

Step 1: Identify CPU-Bound Functions

import time

def existing_slow_function(items):
    """Existing CPU-bound function"""
    results = []
    for item in items:
        # Expensive computation
        result = sum(range(item % 1000))
        results.append(result)
    return results

# Benchmark current performance
start = time.time()
results = existing_slow_function(range(100000))
baseline_time = time.time() - start
print(f"Baseline: {baseline_time:.2f}s")

Step 2: Add Decorator

import epochly

@epochly.optimize(level=3)
def optimized_function(items):
    """Same function with Level 3 optimization"""
    results = []
    for item in items:
        result = sum(range(item % 1000))
        results.append(result)
    return results

# Benchmark optimized performance
start = time.time()
results = optimized_function(range(100000))
optimized_time = time.time() - start
speedup = baseline_time / optimized_time
print(f"Optimized: {optimized_time:.2f}s ({speedup:.1f}x faster)")

Step 3: Gradually Expand

import epochly

# Start with one function
@epochly.optimize(level=3)
def process_batch_1(items):
    return [x ** 2 for x in items]

# Expand to related functions
@epochly.optimize(level=3)
def process_batch_2(items):
    return [x ** 3 for x in items]

@epochly.optimize(level=3)
def process_batch_3(items):
    return [x * 2 for x in items]

Debugging Tips

Enable Debug Mode

# Enable debug logging
export EPOCHLY_DEBUG=1
export EPOCHLY_LOG_LEVEL=DEBUG

import epochly

# Programmatic debug mode
epochly.configure(
    enhancement_level=3,
    debug=True,
    log_level='DEBUG'
)

@epochly.optimize(level=3)
def debug_function(items):
    return [x ** 2 for x in items]

# Debug output shows worker allocation and execution
results = debug_function(range(1000))

Common Issues

Issue 1: Workers Not Starting

import epochly

# Check worker status
status = epochly.get_status()
print(f"Workers active: {status.get('workers_active', 0)}")
print(f"Max workers: {status.get('max_workers', 0)}")

# If workers_active = 0, check:
# 1. EPOCHLY_LEVEL=3 is set
# 2. Workload is large enough
# 3. No configuration errors

Issue 2: Performance Not Improving

import epochly

# Too small a workload - overhead dominates
@epochly.optimize(level=3)
def too_small(items):
    return [x ** 2 for x in items]

result = too_small(range(100))  # Only 100 items!

# Solution: Increase the workload or lower the level
@epochly.optimize(level=2)  # Use JIT for small workloads
def appropriate_level(items):
    return [x ** 2 for x in items]

Issue 3: Pickling Errors

import epochly

# ❌ BAD: Lambda functions can't be pickled
@epochly.optimize(level=3)
def bad_pickle(items):
    func = lambda x: x ** 2
    return [func(x) for x in items]

# ✅ GOOD: Use module-level functions
def square(x):
    return x ** 2

@epochly.optimize(level=3)
def good_pickle(items):
    return [square(x) for x in items]
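
Pickle can only serialize functions importable by name, which is why module-level functions work. To parameterize such a function without a lambda, functools.partial over a module-level function also pickles cleanly. A brief sketch (the scale helper is illustrative):

import functools
import epochly

def scale(factor, x):
    return factor * x

double = functools.partial(scale, 2)  # picklable: wraps a module-level function

@epochly.optimize(level=3)
def scaled(items):
    return [double(x) for x in items]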

Performance Tuning

Workload Threshold Configuration

# Minimum items for Level 3 activation
export EPOCHLY_PARALLEL_THRESHOLD=10000
# Only parallelize if workload > 10K items

import epochly

epochly.configure(
    enhancement_level=3,
    parallel_threshold=10000  # Min items for parallelization
)

@epochly.optimize(level=3)
def auto_threshold(items):
    """Only parallelizes if len(items) > 10000"""
    return [x ** 2 for x in items]

# < 10K items: Runs on a single thread
small_result = auto_threshold(range(5000))
# > 10K items: Runs in parallel
large_result = auto_threshold(range(50000))

Worker Count Optimization

import epochly
import time

def benchmark_workers(data, worker_counts):
    """Find optimal worker count"""
    results = {}
    for workers in worker_counts:
        epochly.configure(
            enhancement_level=3,
            max_workers=workers
        )

        @epochly.optimize(level=3)
        def process(items):
            return [x ** 2 for x in items]

        start = time.time()
        result = process(data)
        elapsed = time.time() - start
        results[workers] = elapsed
        print(f"Workers: {workers}, Time: {elapsed:.2f}s")
    return results

# Test different worker counts
data = range(1_000_000)
worker_counts = [2, 4, 8, 16]
results = benchmark_workers(data, worker_counts)

# Find the optimal count
optimal = min(results, key=results.get)
print(f"Optimal worker count: {optimal}")

Batch Size Tuning

import epochly
import time

@epochly.optimize(level=3, chunk_size=1000)
def tuned_batch_size(items):
    """Process with an explicit chunk size"""
    return [x ** 2 for x in items]

# Experiment with chunk sizes
chunk_sizes = [100, 1000, 10000]
for chunk_size in chunk_sizes:
    epochly.configure(
        enhancement_level=3,
        default_chunk_size=chunk_size
    )

    @epochly.optimize(level=3)  # uses the configured default chunk size
    def process(items):
        return [x ** 2 for x in items]

    # Benchmark each configuration
    start = time.time()
    process(range(1_000_000))
    print(f"Chunk size: {chunk_size}, Time: {time.time() - start:.2f}s")