Epochly AI Inference: API Reference
Complete API reference for epochly.wrap(), InferenceProxy, cache tiers, micro-batching, safety gates, and serving integrations.
Top-Level API
epochly.wrap(model) -> InferenceProxy
Wrap a model instance for profiling and optimization.
Parameters:
- `model` -- A supported model instance (`torch.nn.Module`, `transformers.Pipeline`, `transformers.PreTrainedModel`, `onnxruntime.InferenceSession`)
Returns: InferenceProxy that delegates to the original model with profiling.
Raises: TypeError if model is not from a supported framework.
Usage:
```python
import epochly

model = MyModel().to("cuda").eval()
model = epochly.wrap(model)  # Wrap LAST
result = model(input_tensor)
```
Framework Adapters
InferenceAdapter Protocol
Protocol defining the common interface for all framework adapters.
Key Methods:
- `detect(obj) -> bool` -- Check if obj is a model from this framework
- `get_model_classes() -> List[type]` -- Base classes for isinstance detection
- `get_model_info(model) -> ModelInfo` -- Extract metadata
- `classify_model(model) -> ModelType` -- Classify workload type
- `wrap_forward(model) -> Any` -- Create proxy wrapper (returns InferenceProxy)
- `compile(model, config) -> Any` -- Compile model
- `run_inference(model, inputs) -> Any` -- Run inference
- `outputs_match(expected, actual, tolerance) -> bool` -- Compare outputs
- `hash_model(model) -> str` -- Hash model weights
- `capture_golden(model, n) -> List[GoldenOutput]` -- Capture golden outputs
InferenceProxy
Framework-agnostic proxy wrapping a model instance.
Properties:
- `unwrapped` -- Access the original model
- `model_id -> int` -- The id() of the wrapped model
- `call_count -> int` -- Number of inference calls routed through this proxy
Usage:
```python
proxy = epochly.wrap(model)
result = proxy(input)       # Profiled call
original = proxy.unwrapped  # Raw model access
```
Serving Adapters
EpochlyInferenceMiddleware
ASGI middleware for request-level observability and control.
Constructor Parameters:
- `app` -- ASGI application
- `max_concurrency: int = 64` -- Maximum concurrent requests
- `config: Any = None` -- Optional InferenceConfig
Properties:
- `request_count -> int` -- Total requests processed
- `avg_latency_ms -> float` -- Average latency in milliseconds
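The middleware's request accounting can be pictured with a minimal ASGI-style sketch. This is an illustrative standalone class, not the Epochly implementation: it shows only the semaphore-based concurrency cap and the running latency average behind `max_concurrency`, `request_count`, and `avg_latency_ms`.

```python
import asyncio
import time

class ConcurrencyLimitMiddleware:
    """Sketch: cap concurrent requests with a semaphore and track
    a running average latency. Not the Epochly implementation."""

    def __init__(self, app, max_concurrency=64):
        self.app = app
        self._sem = asyncio.Semaphore(max_concurrency)
        self.request_count = 0
        self._total_ms = 0.0

    @property
    def avg_latency_ms(self):
        return self._total_ms / self.request_count if self.request_count else 0.0

    async def __call__(self, scope, receive, send):
        async with self._sem:  # back-pressure: at most N requests in flight
            start = time.perf_counter()
            await self.app(scope, receive, send)
            self._total_ms += (time.perf_counter() - start) * 1000
            self.request_count += 1
```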
LLMCompanionAdapter
Control layer for vLLM/TGI deployments.
Constructor Parameters:
- `runtime_url: str` -- URL of the LLM runtime
- `config: Optional[LLMCompanionConfig] = None` -- Configuration
Methods:
- `async generate(prompt, **kwargs) -> str` -- Generate completion
- `get_stats() -> dict` -- Get companion metrics
LLMCompanionConfig
Configuration for LLM companion mode.
Fields:
- `max_concurrent_requests: int = 64`
- `cache_enabled: bool = True`
- `cache_max_size: int = 10_000`
- `default_timeout_seconds: float = 120.0`
- `api_path: str = "/v1/completions"`
EpochlyRayServeWrapper
Ray Serve deployment wrapper (v1 preview).
Constructor Parameters:
- `config: Optional[RayServeConfig] = None` -- Configuration (defaults to RayServeConfig())
Methods:
- `handle_request(model_fn, *args, priority=RequestPriority.NORMAL, **kwargs) -> Any`
- `get_stats() -> dict`
epochly_serve
Decorator for adding Epochly telemetry to inference functions.
Usage:
```python
@epochly_serve
def predict(model, data):
    return model(data)

@epochly_serve(config=RayServeConfig(enable_telemetry=True))
def predict(model, data):
    return model(data)
```
ABModelComparison
A/B testing for model variants. Routes a configurable percentage of traffic to a challenger model (B), compares outputs, and reports per-variant metrics. Supports split mode (one model per request) and shadow mode (both models run, only A's result returned).
Constructor Parameters:
- `model_a: Any` -- The production (champion) model callable
- `model_b: Any` -- The challenger model callable
- `adapter: InferenceAdapter` -- Framework adapter
- `traffic_split: float = 0.1` -- Fraction of traffic routed to model B (0.0 to 1.0)
- `shadow: bool = False` -- If True, both models run for every request; only model A's result is returned
Raises: ValueError if traffic_split is outside [0.0, 1.0].
Methods:
- `infer(input_data, **kwargs) -> Any` -- Route request to model A or B based on traffic split. In shadow mode, both models run and model A's result is returned. In split mode, exactly one model runs per request.
- `get_comparison_report() -> dict` -- Return comparison metrics between A and B. Returns a dictionary with keys: `model_a` (metrics dict), `model_b` (metrics dict), `traffic_split`, `total_requests`, `shadow`.
- `compute_significance(confidence_level=0.95) -> dict` -- Compute statistical significance of the latency difference using Welch's t-test. Uses `scipy.stats.ttest_ind` when available, falls back to a built-in manual implementation. Returns a dictionary with keys: `p_value`, `confidence_interval` (lower, upper), `significant` (bool), `sample_sizes`.
Usage:
```python
from epochly.inference.serving.ab_testing import ABModelComparison

ab = ABModelComparison(
    model_a=production_model,
    model_b=challenger_model,
    adapter=pytorch_adapter,
    traffic_split=0.1,
)
result = ab.infer(input_tensor)
report = ab.get_comparison_report()
sig = ab.compute_significance(confidence_level=0.95)
if sig["significant"]:
    print(f"Difference is significant (p={sig['p_value']:.4f})")
```
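The core arithmetic of the Welch's t-test fallback looks roughly like the sketch below. The function name is hypothetical, and a two-sided normal approximation stands in for the t-distribution CDF that scipy would provide; the t-statistic and Welch-Satterthwaite degrees of freedom follow the standard formulas.

```python
import math

def welch_t(a, b):
    """Welch's t-statistic, degrees of freedom, and an approximate
    two-sided p-value for two latency samples (standalone sketch)."""
    n1, n2 = len(a), len(b)
    m1, m2 = sum(a) / n1, sum(b) / n2
    v1 = sum((x - m1) ** 2 for x in a) / (n1 - 1)  # sample variances
    v2 = sum((x - m2) ** 2 for x in b) / (n2 - 1)
    se2 = v1 / n1 + v2 / n2
    t = (m1 - m2) / math.sqrt(se2)
    # Welch-Satterthwaite degrees of freedom
    df = se2 ** 2 / ((v1 / n1) ** 2 / (n1 - 1) + (v2 / n2) ** 2 / (n2 - 1))
    # Two-sided p-value via the normal approximation (scipy would use the
    # t-distribution; the approximation is close for moderate sample sizes)
    p = math.erfc(abs(t) / math.sqrt(2))
    return t, df, p
```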
MultiModelComparison
A/B/n testing for N models with weighted traffic distribution.
Constructor Parameters:
- `models: Dict[str, Any]` -- Mapping from model name to model callable
- `weights: Dict[str, float]` -- Mapping from model name to traffic weight (must sum to ~1.0)
- `adapter: InferenceAdapter` -- Framework adapter
Methods:
- `infer(input_data, **kwargs) -> Any` -- Route request based on weighted distribution
- `get_comparison_report() -> dict` -- Return comparison metrics across all models
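Weighted routing itself is a small idea; a standalone sketch (hypothetical helper, not the `MultiModelComparison` internals) is just a weighted random choice over the model names:

```python
import random

def route(models, weights, rng=random):
    """Pick one model per request according to traffic weights.
    Sketch only: real routing also records per-model metrics."""
    names = list(models)
    name = rng.choices(names, weights=[weights[n] for n in names], k=1)[0]
    return name, models[name]
```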
GradualRollout
Staged traffic ramping with health checks at each step.
Default steps: 1% -> 5% -> 10% -> 25% -> 50% -> 100%.
Constructor Parameters:
- `initial_pct: float` -- Starting traffic percentage
- `target_pct: float` -- Target traffic percentage
- `step_duration_sec: float` -- Minimum time at each step
- `steps: Optional[List[float]]` -- Custom step percentages
Properties:
- `current_pct -> float` -- Current traffic percentage
- `is_complete -> bool` -- Whether rollout reached target
Methods:
- `advance_step(health_ok: bool) -> bool` -- Attempt to advance to next step
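The staged-ramp contract can be sketched in a few lines. This is an illustrative standalone class, not `GradualRollout` itself: advancing requires a healthy check and the minimum dwell time at the current step, and a failed health check simply holds the current percentage.

```python
import time

class RolloutSketch:
    """Sketch of staged traffic ramping with health-gated advancement."""
    STEPS = [1.0, 5.0, 10.0, 25.0, 50.0, 100.0]  # default ramp

    def __init__(self, step_duration_sec=0.0):
        self.step_duration_sec = step_duration_sec
        self._idx = 0
        self._entered = time.monotonic()

    @property
    def current_pct(self):
        return self.STEPS[self._idx]

    @property
    def is_complete(self):
        return self._idx == len(self.STEPS) - 1

    def advance_step(self, health_ok):
        dwelled = time.monotonic() - self._entered >= self.step_duration_sec
        if health_ok and dwelled and not self.is_complete:
            self._idx += 1
            self._entered = time.monotonic()
            return True
        return False  # unhealthy, too soon, or already at target
```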
Model Registry
ModelRegistryClient
Version-tracked model loading from multiple backends.
Constructor Parameters:
- `backend: str` -- One of `"local"`, `"huggingface"`, or `"mlflow"`
- `on_version_change: Optional[Callable]` -- Callback on version hash change
Methods:
- `load(model_name, revision=None) -> Tuple[Any, str]` -- Load model and return (model, version_hash)
- `check_for_update(model_name) -> Optional[str]` -- Check for newer version
- `set_lineage(model_name, version, lineage) -> None` -- Store provenance metadata
- `get_lineage(model_name, version) -> Optional[ModelLineage]` -- Retrieve lineage
- `set_stage(model_name, version, stage) -> None` -- Set lifecycle stage
- `get_stage(model_name, version) -> Optional[ModelStage]` -- Get lifecycle stage
- `promote(model_name, version, approval_callback=None) -> bool` -- Promote to next stage
- `demote(model_name, version) -> bool` -- Demote to previous stage
- `rollback(model_name) -> Optional[str]` -- Rollback to previous production version
ModelStage
Lifecycle stage enum: DRAFT -> STAGING -> PRODUCTION -> ARCHIVED
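The promote/demote contract over this ordered enum can be sketched as follows. This is an assumption-laden illustration (hypothetical `Stage` and `promote` names, not the registry internals): stages are ordered, `ARCHIVED` is terminal, and an optional approval callback can veto the transition.

```python
from enum import IntEnum

class Stage(IntEnum):
    """Ordered lifecycle stages (sketch mirroring DRAFT -> ARCHIVED)."""
    DRAFT = 0
    STAGING = 1
    PRODUCTION = 2
    ARCHIVED = 3

def promote(stage, approval_callback=None):
    """Return (new_stage, succeeded). An approval callback, when given,
    can veto the promotion by returning False."""
    if stage is Stage.ARCHIVED:
        return stage, False  # terminal stage
    if approval_callback is not None and not approval_callback(stage):
        return stage, False  # vetoed
    return Stage(stage + 1), True
```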
ModelLineage
Provenance tracking dataclass.
Fields:
- `data_version: str`
- `code_hash: str`
- `hyperparameters: Dict[str, Any]`
- `training_metrics: Dict[str, float]`
- `parent_model: Optional[str]`
- `created_at: float`
Cache
InferenceCache
Multi-tier LRU cache for inference results (L1 in-memory).
Constructor Parameters:
- `config: Optional[CacheConfig]` -- Cache configuration
- `tenant_isolation: Optional[TenantIsolation]` -- Tenant isolation for key namespacing
Methods:
- `get(model_id, inputs) -> Optional[Any]`
- `put(model_id, inputs, outputs) -> None`
- `invalidate_model(model_id) -> None`
- `get_stats() -> dict`
L2Cache (SQLite)
SQLite WAL-mode persistent cache with AES-256-GCM encryption.
Methods:
- `get(key) -> Optional[bytes]`
- `put(key, value) -> None`
- `delete(key) -> None`
- `clear_prefix(prefix) -> None`
- `get_stats() -> dict`
- `close() -> None`
L3Cache (Redis)
Redis-backed distributed cache tier.
Methods:
- `get(key) -> Optional[bytes]`
- `put(key, value, ttl=None) -> None`
- `delete(key) -> None`
- `clear_all() -> None`
- `get_stats() -> dict`
Compilation
ModelCompiler
Background model compilation with safety gate.
Methods:
- `pre_compile_check(model, sample_input) -> Tuple[bool, Optional[str]]`
- `async compile_async(model, safety_orchestrator) -> Optional[Any]`
- `get_compiled(model) -> Optional[Any]`
TorchCompileSafetyMonitor
Safety monitoring for torch.compile operations.
Methods:
- `pre_compile_check(model, sample_input) -> PreCompileResult`
- `check_health(pre_compile_memory_mb, post_compile_memory_mb) -> HealthCheckResult`
- `check_output_validity(outputs) -> OutputValidityResult`
Configuration
InferenceConfig
Top-level inference configuration dataclass.
Fields:
- `enabled: bool = True` -- Enable/disable inference module
- `max_level: int = 2` -- Maximum enhancement level (0-4)
- `frameworks: List[str]` -- Supported framework names
- `batching: BatchingConfig` -- Batching configuration
- `compilation: CompilationConfig` -- Compilation configuration
- `cache: CacheSettings` -- Cache configuration
Class Methods:
- `from_dict(data) -> InferenceConfig` -- Create configuration from a dictionary
- `from_env() -> InferenceConfig` -- Create configuration from environment variables
Metrics
InferenceMetrics
Central metrics collector for request, model, cache, and GPU statistics.
Methods:
- `record_request(endpoint, total_ms, ...) -> None`
- `record_inference(model_id, latency_ms, batch_size=1, ...) -> None`
- `record_cache_hit() -> None` / `record_cache_miss() -> None`
- `get_all_metrics() -> dict`
- `get_model_stats(model_id) -> Optional[dict]`
- `get_cache_stats() -> dict`
CostEstimator
Inference cost estimation based on GPU time and pricing.
Constructor Parameters:
- `gpu_name: str` -- GPU identifier (e.g., `"A100_80GB"`, `"H100"`, `"T4"`). Falls back to $1.00/hr for unrecognized names.
- `custom_rate: Optional[float] = None` -- Override GPU hourly rate in USD. When provided (and > 0), overrides the lookup table.
Methods:
- `record_inference(baseline_gpu_seconds, optimized_gpu_seconds) -> None` -- Record a single inference for cost tracking
Properties:
- `gpu_seconds_per_request -> Tuple[float, float]` -- `(baseline, optimized)` GPU-seconds per request
- `cost_per_1k_requests -> Tuple[float, float]` -- `(baseline_cost, optimized_cost)` per 1000 requests in USD
- `observed_requests_per_hour -> float` -- Observed request rate based on actual traffic
- `projected_hourly_savings -> float` -- Projected hourly savings in USD
- `projected_monthly_savings -> float` -- Projected monthly savings in USD (730 hours)
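The cost arithmetic behind these properties is straightforward and worth seeing once. The helper below is a hypothetical sketch of the computation, not the `CostEstimator` code: cost per 1k requests is GPU-seconds per request times the per-second GPU rate times 1000, and projected savings scale that difference by observed traffic.

```python
def cost_per_1k(gpu_seconds_per_request, hourly_rate_usd):
    """Cost of 1000 requests given GPU-seconds/request and an hourly rate."""
    per_second = hourly_rate_usd / 3600.0
    return gpu_seconds_per_request * per_second * 1000.0

def monthly_savings(baseline_1k, optimized_1k, requests_per_hour):
    """Projected monthly savings, using the 730-hour month the docs cite."""
    savings_per_request = (baseline_1k - optimized_1k) / 1000.0
    return savings_per_request * requests_per_hour * 730.0

# Example: 50 ms baseline vs 20 ms optimized on a $4.00/hr GPU
baseline = cost_per_1k(0.050, 4.00)
optimized = cost_per_1k(0.020, 4.00)
```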
PrometheusExporter
Generates Prometheus exposition format text from InferenceMetrics.
Static Methods:
- `export(metrics, cost=None, latency_buckets=None, model_labels=None, exemplars=None) -> str`
Emitted Metrics:
| Metric Name | Type | Description |
|---|---|---|
| `epochly_inference_requests_total` | counter | Total HTTP requests processed |
| `epochly_inference_request_latency_avg_ms` | gauge | Average request latency in milliseconds |
| `epochly_inference_inferences_total` | counter | Total model inference calls |
| `epochly_inference_inferences_per_model` | counter | Per-model inference counts (labeled) |
| `epochly_inference_latency_seconds` | histogram | Inference latency distribution in seconds |
| `epochly_inference_cache_hits_total` | counter | Total cache hits |
| `epochly_inference_cache_misses_total` | counter | Total cache misses |
| `epochly_inference_cache_hit_rate` | gauge | Cache hit rate (0.0-1.0) |
| `epochly_inference_gpu_utilization` | gauge | Per-model GPU compute utilization (0.0-1.0) |
| `epochly_inference_gpu_memory_utilization` | gauge | Per-model GPU memory utilization (0.0-1.0) |
| `epochly_inference_cost_per_1k_requests_usd` | gauge | Cost per 1000 requests (baseline and optimized variants) |
| `epochly_inference_cost_savings_per_hour_usd` | gauge | Projected hourly savings |
| `epochly_inference_cost_savings_per_month_usd` | gauge | Projected monthly savings |
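For reference, each metric above is rendered in Prometheus exposition format as a `# HELP` line, a `# TYPE` line, and a sample line. A minimal sketch of that rendering (hypothetical helper, not `PrometheusExporter` itself):

```python
def render_metric(name, value, metric_type, help_text):
    """Render one metric in Prometheus text exposition format."""
    return (
        f"# HELP {name} {help_text}\n"
        f"# TYPE {name} {metric_type}\n"
        f"{name} {value}\n"
    )

text = render_metric(
    "epochly_inference_requests_total", 1234,
    "counter", "Total HTTP requests processed",
)
```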
InferenceOTelExporter
Maps InferenceMetrics to OpenTelemetry instrument names.
Methods:
- `export(metrics) -> Dict[str, Any]`
- `export_cost(cost_estimator) -> Dict[str, Any]`
- `export_safety(breakers=None, drift_monitor=None, golden_store=None) -> Dict[str, Any]`
Profiling
InferenceProfiler
Per-model inference profiler. Profiles the first N calls to establish baselines, then continues sampling periodically for drift detection.
Constructor Parameters:
- `warmup_target: int = 10`
- `sample_rate: float = 0.01`
Methods:
- `set_golden_callback(callback) -> None`
- `record_call(model_id, adapter, call_time, gpu_time, batch_size, model, inputs=None, outputs=None) -> None`
- `get_summary(model_id) -> Optional[ModelProfileSummary]`
- `is_warmup_complete(model_id) -> bool`
- `get_call_count(model_id) -> int` -- Get total call count for a model
- `get_all_summaries() -> Dict[int, ModelProfileSummary]`
Safety
SafetyOrchestrator
Top-level safety coordinator. Orchestrates golden output capture, canary validation, circuit breakers, drift monitoring, fallback chains, and hysteresis control.
ValidatorRegistry
Registry for custom workload validators. Built-in validators: embedding, classifier, reranker, generation, llm, encoder.
Methods:
- `register(workload_type, validator) -> None`
- `unregister(workload_type) -> bool`
- `get(workload_type) -> Optional[Any]`
- `list_registered() -> List[str]`
- `is_async(workload_type) -> bool`
- `on_alert(callback) -> None`
- `fire_alerts(report) -> None`
InputSchemaValidator
Validates input data against an InputSchema before inference.
Usage:
```python
from epochly.inference.safety.validator_registry import (
    InputSchemaValidator, InputSchema, FieldSpec,
)
import numpy as np

schema = InputSchema(fields={
    "input_ids": FieldSpec(dtype="int64", shape=(-1, 128), min_val=0, max_val=30522),
    "attention_mask": FieldSpec(dtype="int64", shape=(-1, 128), min_val=0, max_val=1),
})
validator = InputSchemaValidator(schema)
result = validator.validate({
    "input_ids": np.zeros((4, 128), dtype=np.int64),
    "attention_mask": np.ones((4, 128), dtype=np.int64),
})
if not result.valid:
    for err in result.errors:
        print(f"Validation error: {err}")
```
DriftMonitor
EWMA-based online drift detection for optimized inference outputs.
Methods:
- `should_sample() -> bool`
- `record_shadow_comparison(optimization_name, original_output, optimized_output, adapter, workload_type) -> Optional[str]`
- `refresh_reference(optimization_name) -> None`
- `get_ewma_scores(optimization_name) -> Optional[Dict[str, float]]`
- `get_diagnostics() -> dict`
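The EWMA idea behind the monitor can be shown in a few lines. This is a standalone sketch of exponentially weighted moving-average detection over a per-call divergence score, with hypothetical names and a fixed alarm threshold, not `DriftMonitor`'s internals.

```python
class EwmaDrift:
    """Sketch: EWMA over a divergence score; alarm above a threshold."""

    def __init__(self, alpha=0.1, threshold=0.05):
        self.alpha = alpha          # weight on the newest observation
        self.threshold = threshold
        self.ewma = 0.0

    def update(self, divergence):
        """Fold in one original-vs-optimized divergence; True => alarm."""
        self.ewma = self.alpha * divergence + (1 - self.alpha) * self.ewma
        return self.ewma > self.threshold
```

The smoothing means one noisy comparison cannot trip the alarm, but sustained divergence raises the EWMA above the threshold.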
HysteresisController
Anti-flapping controller for optimization state transitions.
Methods:
- `should_enable(optimization_name, canary_report) -> bool`
- `should_disable(optimization_name) -> bool`
- `record_transition(optimization_name) -> None`
- `get_diagnostics() -> dict`
CircuitBreaker
Per-optimization circuit breaker. Opens on repeated failures, transitions to half-open for probing, and closes on success.
FallbackChain
Graceful degradation chain. When an optimization fails, falls back through a configurable chain of alternatives.
GoldenStore
Storage for golden outputs captured during L0 profiling. Used as ground truth for canary validation.
Privacy & Security
SecurityValidator
Startup security validation.
Constructor Parameters:
- `max_input_size_bytes: int = 10_485_760` -- Maximum input size (10 MB)
Methods:
- `check_cache_isolation(cache) -> SecurityCheckResult`
- `check_privacy_controls(privacy_mode, encryption_key_present, redaction_patterns_configured) -> SecurityCheckResult`
- `check_safety_bypass_resistance(config) -> SecurityCheckResult`
- `check_input_size(input_data) -> SecurityCheckResult`
- `run_all_checks(cache, privacy_mode, encryption_key_present, redaction_patterns_configured, config) -> Dict[str, Any]`
InputRedactor
Regex-based PII/sensitive data redaction before storage. Redaction runs before any data is written to disk or retained in golden stores.
Constructor Parameters:
- `patterns: List[str]` -- List of regex patterns to match sensitive data
- `replacement: str = "[REDACTED]"` -- Replacement string for matched patterns
Methods:
- `redact(data) -> Union[str, bytes]` -- Redact sensitive patterns from input data. Handles both `str` and `bytes` inputs. Returns the same type as the input.
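A minimal sketch of this str/bytes-preserving behavior (an illustration, not the `InputRedactor` implementation, which would compile its patterns once at construction):

```python
import re

def redact(data, patterns, replacement="[REDACTED]"):
    """Apply each regex in order, returning the same type as the input."""
    is_bytes = isinstance(data, bytes)
    text = data.decode("utf-8", errors="replace") if is_bytes else data
    for pattern in patterns:
        text = re.sub(pattern, replacement, text)
    return text.encode("utf-8") if is_bytes else text
```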
TenantIsolation
Namespace cache keys by tenant/deployment ID. Prevents cross-tenant cache leakage by prefixing all cache keys with the tenant identifier.
Constructor Parameters:
- `tenant_id: str = "default"` -- Tenant identifier for key namespacing
Properties:
- `tenant_id -> str` -- The configured tenant identifier
Methods:
- `namespace_key(key) -> str` -- Prefix a cache key with the tenant namespace. Returns `"{tenant_id}:{key}"`.
- `from_env() -> TenantIsolation` -- Class method. Create from `EPOCHLY_TENANT_ID` environment variable (defaults to `"default"`).
AuditLogger
Append-only structured audit log for safety decisions. Records all safety gate decisions, cache accesses, and optimization state changes. Bounded via deque(maxlen) to prevent unbounded memory growth.
Constructor Parameters:
- `max_entries: int = 10_000` -- Maximum number of audit entries retained in memory
Methods:
- `log_event(operation, model_id, optimization_name, result, details=None) -> None` -- Log a safety or cache event
- `get_entries() -> List[dict]` -- Return a copy of all audit entries
- `get_entries_since(since_timestamp) -> List[dict]` -- Return entries since a given Unix timestamp
Properties:
- `count -> int` -- Number of entries in the audit log
PrivacyConfig
Privacy configuration dataclass.
Fields:
- `mode: str = "ephemeral"` -- Privacy mode: `"ephemeral"`, `"persisted_encrypted"`, or `"hashes_only"`
- `encrypt_at_rest: bool = True` -- Enable encryption for persistent storage
- `encryption_key_source: str = "env"` -- Key source: `"env"`, `"file"`, or `"hsm"`
- `redact_patterns: List[str] = []` -- Regex patterns for PII redaction
- `cache_tenant_isolation: bool = True` -- Enable tenant-scoped cache keys
- `audit_log_enabled: bool = True` -- Enable safety audit logging
- `audit_log_path: str = "~/.epochly/audit/"` -- Path for persistent audit logs
Batching
DynamicMicroBatcher
Async request micro-batching with keyed sub-queues.
Constructor Parameters:
- `model: Any` -- The model to run inference on
- `adapter: InferenceAdapter` -- Framework adapter
- `config: Optional[MicroBatcherConfig]` -- Configuration
Methods:
- `async start() -> None`
- `async stop() -> None`
- `async infer(input_data, priority=1, batch_key=None) -> Any`
MicroBatcherConfig
Fields:
- `max_batch_size: int = 32`
- `max_queue_depth: int = 1024`
- `max_wait_ms: float = 50.0`
- `gpu_headroom_pct: int = 20`
Context
RequestContext
Per-request context propagated via contextvars.
Fields:
- `request_id: str`
- `endpoint: str`
- `enqueue_time: float`
- `model_calls: int`
- `timings: Dict[str, float]`
- `trace_id: Optional[str]`
- `span_id: Optional[str]`
BatchKey
Structured key for micro-batching request compatibility.
Fields:
- `model_id: int`
- `endpoint: str`
- `input_shape_bucket: Tuple[int, ...]`
- `dtype: str`
- `max_new_tokens: Optional[int]`
- `temperature: Optional[float]`
- `top_p: Optional[float]`
- `pad_token_id: Optional[int]`