@emmvish/stable-infra
v2.0.4
Published
A production-grade stable & flexible execution engine for resilient workflows, API integrations, and batch processing.
Maintainers
Readme
@emmvish/stable-infra
A stability-first production-grade TypeScript framework for resilient API integrations, batch processing, and orchestrating complex workflows with deterministic error handling, type safety, and comprehensive observability.
Table of Contents
- Overview
- Core Concepts
- Core Modules
- Stable Runner
- Resilience Mechanisms
- Workflow Patterns
- Graph-based Workflow Patterns
- Configuration & State
- Hooks & Observability
- Advanced Features
- Best Practices
Overview
@emmvish/stable-infra evolved from a focused library for resilient API calls to a comprehensive execution framework. Originally addressing API integration challenges via stableRequest, it expanded to include:
- Batch orchestration via
stableApiGatewayfor processing groups of mixed requests/functions - Phased workflows via
stableWorkflowfor array-based multi-phase execution with dynamic control flow - Graph-based workflows via
stableWorkflowGraphfor DAG execution with higher parallelism - Generic function execution via
stableFunction, inheriting all resilience guards - Queue based scheduling via
StableScheduler, with option to preserve scheduler state and recover from saved state - Transactional shared state via
StableBuffer, a concurrency-safe buffer you can pass ascommonBufferorsharedBuffer - Distributed coordination via
DistributedCoordinatorfor multi-node locking, state, leader election, pub/sub, and 2PC transactions
All core modules support the same resilience stack: retries, jitter, circuit breaking, caching, rate/concurrency limits, config cascading, shared buffers, trial mode, comprehensive hooks, and metrics. This uniformity makes it trivial to compose requests and functions in any topology. Finally, Stable Runner executes jobs from config.
Core Concepts
Resilience as Default
Every execution—whether a single request, a pure function, or an entire workflow—inherits built-in resilience:
- Retries with configurable backoff strategies (FIXED, LINEAR, EXPONENTIAL)
- Jitter to prevent thundering herd
- Circuit breaker to fail fast and protect downstream systems
- Caching for idempotent read operations
- Rate & concurrency limits to respect external constraints
- Metrics guardrails to validate execution against thresholds with automatic anomaly detection
Type Safety
All examples in this guide use TypeScript generics for type-safe request/response data and function arguments/returns. Analyzers validate shapes at runtime; TypeScript ensures compile-time safety.
Config Cascading
Global defaults → group overrides → phase overrides → branch overrides → item overrides. Lower levels always win, preventing repetition while maintaining expressiveness.
Shared State
Workflows and gateways support sharedBuffer for passing computed state across phases/branches/items without global state.
Core Modules
stableRequest
Single API call with resilience, type-safe request and response types.
import { stableRequest, REQUEST_METHODS, VALID_REQUEST_PROTOCOLS } from '@emmvish/stable-infra';
interface GetUserRequest {
// Empty for GET requests with no body
}
interface User {
id: number;
name: string;
}
const result = await stableRequest<GetUserRequest, User>({
reqData: {
method: REQUEST_METHODS.GET,
protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
hostname: 'api.example.com',
path: '/users/1'
},
resReq: true,
attempts: 3,
wait: 500,
jitter: 100,
cache: { enabled: true, ttl: 5000 },
rateLimit: { maxRequests: 10, windowMs: 1000 },
maxConcurrentRequests: 5,
responseAnalyzer: ({ data }) => {
return typeof data === 'object' && data !== null && 'id' in data;
},
handleSuccessfulAttemptData: ({ successfulAttemptData }) => {
console.log(`User loaded: ${successfulAttemptData.data.name}`);
}
});
if (result.success) {
console.log(result.data.name, result.metrics.totalAttempts);
} else {
console.error(result.error);
}Key responsibilities:
- Execute a single HTTP request with automatic retry and backoff
- Validate response shape via analyzer; retry if invalid
- Cache successful responses with TTL
- Apply rate and concurrency limits
- Throw or gracefully suppress errors via finalErrorAnalyzer
- Collect attempt metrics and infra dashboards (circuit breaker, cache, rate limiter state)
stableFunction
Generic async/sync function execution with identical resilience.
import { stableFunction, RETRY_STRATEGIES } from '@emmvish/stable-infra';
type ComputeArgs = [number, number];
type ComputeResult = number;
const multiply = (a: number, b: number) => a * b;
const result = await stableFunction<ComputeArgs, ComputeResult>({
fn: multiply,
args: [5, 3],
returnResult: true,
attempts: 2,
wait: 100,
retryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
responseAnalyzer: ({ data }) => data > 0,
cache: { enabled: true, ttl: 10000 }
});
if (result.success) {
console.log('Result:', result.data); // 15
}Key responsibilities:
- Execute any async or sync function with typed arguments and return
- Support argument-based cache key generation
- Retry on error or analyzer rejection
- Enforce success criteria via analyzer
- Optionally suppress exceptions
stableApiGateway
Batch orchestration of mixed requests and functions.
import {
stableApiGateway,
REQUEST_METHODS,
VALID_REQUEST_PROTOCOLS,
RequestOrFunction
} from '@emmvish/stable-infra';
import type { API_GATEWAY_ITEM } from '@emmvish/stable-infra';
// Define request types
interface ApiRequestData {
filters?: Record<string, any>;
}
interface ApiResponse {
id: number;
value: string;
}
// Define function types
type TransformArgs = [ApiResponse[], number];
type TransformResult = {
transformed: ApiResponse[];
count: number;
};
type ValidateArgs = [TransformResult];
type ValidateResult = boolean;
const items: API_GATEWAY_ITEM<ApiRequestData, ApiResponse, TransformArgs | ValidateArgs, TransformResult | ValidateResult>[] = [
{
type: RequestOrFunction.REQUEST,
request: {
id: 'fetch-data',
requestOptions: {
reqData: {
method: REQUEST_METHODS.GET,
protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
hostname: 'api.example.com',
path: '/data'
},
resReq: true,
attempts: 3
}
}
},
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'transform-data',
functionOptions: {
fn: (data: ApiResponse[], threshold: number): TransformResult => ({
transformed: data.filter(item => item.id > threshold),
count: data.length
}),
args: [[], 10] as TransformArgs,
returnResult: true,
attempts: 2,
cache: { enabled: true, ttl: 5000 }
}
}
},
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'validate-result',
functionOptions: {
fn: (result: TransformResult): ValidateResult => result.count > 0,
args: [{ transformed: [], count: 0 }] as ValidateArgs,
returnResult: true
}
}
}
];
const responses = await stableApiGateway<ApiRequestData, ApiResponse>(items, {
concurrentExecution: true,
stopOnFirstError: false,
sharedBuffer: {},
commonAttempts: 2,
commonWait: 300,
maxConcurrentRequests: 3
});
// Access individual responses
responses.forEach((resp, i) => {
console.log(`Item ${i}: success=${resp.success}`);
});
// Access aggregate metrics
console.log(`Success rate: ${responses.metrics.successRate.toFixed(2)}%`);
console.log(`Execution time: ${responses.metrics.executionTime}ms`);
console.log(`Throughput: ${responses.metrics.throughput.toFixed(2)} req/s`);
console.log(`Average duration: ${responses.metrics.averageRequestDuration.toFixed(2)}ms`);Request/Function Racing
Enable racing to accept the first successful request or function and cancel others, useful for redundant API calls or failover scenarios.
const responses = await stableApiGateway(items, {
concurrentExecution: true,
enableRacing: true, // First successful item wins, others cancelled
maxConcurrentRequests: 10
});
// responses contains only the winning result
// Losing items marked as cancelled with appropriate errorKey responsibilities:
- Execute a batch of requests and functions concurrently or sequentially
- Apply global, group-level, and item-level config overrides
- Maintain shared buffer across items for state passing
- Stop on first error or continue despite failures
- Collect per-item and aggregate metrics (success rates, execution time, throughput)
- Support request grouping with group-specific config
- Track infrastructure metrics (circuit breaker, cache, rate limiter, concurrency)
stableWorkflow
Phased array-based workflows with sequential/concurrent phases, mixed items, and non-linear control flow.
You can start a workflow from a specific phase using startPhaseIndex (0-based). When starting inside a concurrent group (markConcurrentPhase), execution aligns to the group’s first phase.
import { stableWorkflow, PHASE_DECISION_ACTIONS, RequestOrFunction, REQUEST_METHODS } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE, API_GATEWAY_ITEM } from '@emmvish/stable-infra';
// Define types for requests
interface FetchRequestData {}
interface FetchResponse {
users: Array<{ id: number; name: string }>;
posts: Array<{ id: number; title: string }>;
}
// Define types for functions
type ProcessArgs = [FetchResponse];
type ProcessResult = {
merged: Array<{ userId: number; userName: string; postTitle: string }>;
};
type AuditArgs = [ProcessResult, string];
type AuditResult = { logged: boolean; timestamp: string };
const phases: STABLE_WORKFLOW_PHASE<FetchRequestData, FetchResponse, ProcessArgs | AuditArgs, ProcessResult | AuditResult>[] = [
{
id: 'fetch-data',
requests: [
{
id: 'get-users-posts',
requestOptions: {
reqData: {
hostname: 'api.example.com',
path: '/users-and-posts'
},
resReq: true,
attempts: 3
}
}
]
},
{
id: 'process-and-audit',
markConcurrentPhase: true,
items: [
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'process-data',
functionOptions: {
fn: (data: FetchResponse): ProcessResult => ({
merged: data.users.map((user, idx) => ({
userId: user.id,
userName: user.name,
postTitle: data.posts[idx]?.title || 'No post'
}))
}),
args: [{ users: [], posts: [] }] as ProcessArgs,
returnResult: true
}
}
},
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'audit-processing',
functionOptions: {
fn: async (result: ProcessResult, auditId: string): Promise<AuditResult> => {
console.log(`Audit ${auditId}:`, result);
return { logged: true, timestamp: new Date().toISOString() };
},
args: [{ merged: [] }, 'audit-123'] as AuditArgs,
returnResult: true
}
}
}
],
phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
if (!phaseResult.success) {
return { action: PHASE_DECISION_ACTIONS.TERMINATE };
}
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
},
{
id: 'finalize',
requests: [
{
id: 'store-result',
requestOptions: {
reqData: {
hostname: 'api.example.com',
path: '/store',
method: REQUEST_METHODS.POST
},
resReq: false
}
}
]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'data-pipeline',
concurrentPhaseExecution: false, // Phases sequential
enableNonLinearExecution: true,
sharedBuffer: { userId: '123' },
commonAttempts: 2,
commonWait: 200,
handlePhaseCompletion: ({ phaseResult, workflowId }) => {
console.log(`Phase ${phaseResult.phaseId} complete in workflow ${workflowId}`);
}
});
console.log(`Workflow succeeded: ${result.success}, phases: ${result.totalPhases}`);Key responsibilities:
- Execute phases sequentially or concurrently
- Support mixed requests and functions per phase
- Enable non-linear flow (CONTINUE, SKIP, REPLAY, JUMP, TERMINATE)
- Maintain shared buffer across all phases
- Apply phase-level and request-level config cascading
- Support branching with parallel/sequential branches
- Collect per-phase metrics and workflow aggregates
stableWorkflowGraph
DAG-based execution for higher parallelism and explicit phase dependencies.
import { stableWorkflowGraph, WorkflowGraphBuilder } from '@emmvish/stable-infra';
const graph = new WorkflowGraphBuilder()
.addPhase('fetch-posts', {
requests: [{
id: 'get-posts',
requestOptions: {
reqData: { hostname: 'api.example.com', path: '/posts' },
resReq: true
}
}]
})
.addPhase('fetch-users', {
requests: [{
id: 'get-users',
requestOptions: {
reqData: { hostname: 'api.example.com', path: '/users' },
resReq: true
}
}]
})
.addParallelGroup('fetch-all', ['fetch-posts', 'fetch-users'])
.addPhase('aggregate', {
functions: [{
id: 'combine',
functionOptions: {
fn: () => ({ posts: [], users: [] }),
args: [],
returnResult: true
}
}]
})
.addMergePoint('sync', ['fetch-all'])
.connectSequence('fetch-all', 'sync', 'aggregate')
.setEntryPoint('fetch-all')
.build();
const result = await stableWorkflowGraph(graph, {
workflowId: 'data-aggregation'
});
console.log(`Graph workflow success: ${result.success}`);Key responsibilities:
- Define phases as DAG nodes with explicit dependency edges
- Execute independent phases in parallel automatically
- Support parallel groups, merge points, and conditional routing
- Validate graph structure (cycle detection, reachability, orphan detection)
- Provide deterministic execution order
- Offer higher parallelism than phased workflows for complex topologies
StableScheduler
Queue-based scheduler for cron/interval/timestamp execution with concurrency limits and recoverable state via custom persistence handlers.
Key responsibilities:
- Enforce max-parallel job execution
- Schedule jobs with cron, interval, or timestamp(s)
- Persist and restore scheduler state via user-provided handlers
StableBuffer
Transactional, concurrency-safe shared state. It’s opt-in: pass a StableBuffer instance as commonBuffer or sharedBuffer to serialize updates across concurrent executions.
Key features:
- Serialized transactions via FIFO queue
- Snapshot reads with
read() - Optional transaction timeouts
- Optional transaction logging with
logTransaction
import { StableBuffer } from '@emmvish/stable-infra';
const buffer = new StableBuffer({
initialState: { counter: 0 },
transactionTimeoutMs: 500,
logTransaction: (log) => {
// persist log.transactionId, log.activity, log.hookName, log.stateBefore, log.stateAfter
}
});
await buffer.run(
(state) => { state.counter += 1; },
{ activity: 'workflow-phase', hookName: 'phase-1', workflowId: 'wf-1' }
);Replay utility (transaction logs → deterministic state replay):
import { replayStableBufferTransactions } from '@emmvish/stable-infra';
const replay = await replayStableBufferTransactions({
logs, // StableBufferTransactionLog[]
handlers: {
'phase-1': (state) => { state.counter += 1; }
},
initialState: { counter: 0 }
});
console.log(replay.buffer.getState());Distributed Infrastructure
Multi-node coordination via a shared backend (Redis, PostgreSQL, etcd, etc.). Nodes connect independently to the backend — no peer-to-peer discovery or IP exchange required. All coordination goes through the DistributedCoordinator, backed by a pluggable DistributedAdapter.
Key capabilities: distributed locking with fencing tokens, compare-and-swap (CAS), quorum-based leader election, pub/sub with delivery guarantees (at-most-once / at-least-once / exactly-once), two-phase commit transactions, distributed buffers, and distributed schedulers.
import { DistributedCoordinator, InMemoryDistributedAdapter } from '@emmvish/stable-infra';
const coordinator = new DistributedCoordinator({
adapter: new InMemoryDistributedAdapter('node-1'), // Use Redis/Postgres adapter in production
namespace: 'my-app',
enableLeaderElection: true,
leaderHeartbeatMs: 5000,
});
await coordinator.connect();
// Distributed lock with fencing token
const lock = await coordinator.acquireLock({ resource: 'order:123', ttlMs: 30000 });
// Leader election — nodes register via shared backend, no IP discovery needed
await coordinator.registerForElection('scheduler-leader');
await coordinator.campaignForLeader({ electionKey: 'scheduler-leader' });
// Shared state & pub/sub across nodes
await coordinator.setState('user:123', { balance: 100 });
await coordinator.publish('events', { type: 'ORDER_CREATED', orderId: 123 });An InMemoryDistributedAdapter is included for testing and single-instance use. For production multi-node deployments, implement the DistributedAdapter interface with your backend of choice.
Distributed Buffer — Cross-Node StableBuffer
Wrap a StableBuffer with distributed sync so transactions on one node propagate to all others via pub/sub. Conflict resolution is configurable.
import {
createDistributedStableBuffer,
DistributedConflictResolution,
withDistributedBufferLock
} from '@emmvish/stable-infra';
const { buffer, sync, refresh, disconnect } = await createDistributedStableBuffer({
distributed: { adapter, namespace: 'my-app' },
initialState: { counter: 0, items: [] },
conflictResolution: DistributedConflictResolution.LAST_WRITE_WINS, // or MERGE, CUSTOM
syncOnTransaction: true // auto-push state after each buffer.run()
});
// Use exactly like a local StableBuffer — sync happens automatically
await buffer.run(state => { state.counter += 1; });
// Acquire a distributed lock for critical sections
await withDistributedBufferLock({ buffer, coordinator }, async () => {
await buffer.run(state => { state.counter -= 100; }); // exclusive cross-node access
}, { ttlMs: 10000 });
await refresh(); // Force pull latest state from remote
await disconnect();Distributed Resilience — Shared Circuit Breaker, Rate Limiter, Cache
Every resilience component (CircuitBreaker, RateLimiter, ConcurrencyLimiter, CacheManager) supports a persistence interface. The distributed layer provides an implementation backed by coordinator.getState/setState, so component state (failure counts, open/closed status, rate windows) is synchronized across nodes.
Create them individually or as a bundle:
import { createDistributedInfrastructureBundle } from '@emmvish/stable-infra';
// One coordinator, all components share the same backend
const infra = await createDistributedInfrastructureBundle({
distributed: { adapter, namespace: 'my-service' },
circuitBreaker: {
failureThresholdPercentage: 50,
minimumRequests: 10,
recoveryTimeoutMs: 30000
},
rateLimiter: { maxRequests: 1000, windowMs: 60000 },
concurrencyLimiter: { limit: 50 },
cacheManager: { enabled: true, ttl: 300000 }
});
// Plug directly into any core module via sharedInfrastructure
const scheduler = new StableScheduler({
maxParallel: 10,
sharedInfrastructure: {
circuitBreaker: infra.circuitBreaker, // shared state across nodes
rateLimiter: infra.rateLimiter,
concurrencyLimiter: infra.concurrencyLimiter,
cacheManager: infra.cacheManager
}
}, handler);
await infra.disconnect(); // cleanupStandalone factories are also available: createDistributedCircuitBreaker(), createDistributedRateLimiter(), createDistributedConcurrencyLimiter(), createDistributedCacheManager().
Distributed Scheduler — Leader-Based Execution
Wrap StableScheduler so only the elected leader node processes jobs, with distributed state persistence and shared resilience infrastructure.
import { runAsDistributedScheduler } from '@emmvish/stable-infra';
const runner = await runAsDistributedScheduler({
distributed: { adapter, namespace: 'workers' },
scheduler: { maxParallel: 5 },
circuitBreaker: { failureThresholdPercentage: 50, minimumRequests: 10, recoveryTimeoutMs: 30000 },
rateLimiter: { maxRequests: 100, windowMs: 60000 },
createScheduler: (config) => new StableScheduler(config, async (job) => {
await processJob(job);
})
});
await runner.start(); // Campaigns for leadership, starts scheduler when elected
runner.isLeader(); // Check current status
await runner.stop(); // Graceful shutdown, resigns leadershipInternally, scheduler state (job queue, execution history) is persisted via the coordinator. When the leader crashes or resigns, a new leader is elected (after the previous leader’s lease expires). The new leader automatically restores state from the coordinator and runs remaining jobs, so work resumes from where the previous leader left off.
Multi-Node Integration Example
Combining distributed buffer, shared circuit breaker, leader election, pub/sub, locking, and workflows:
import {
createDistributedStableBuffer, createDistributedSchedulerConfig,
DistributedCoordinator, DistributedConflictResolution,
DistributedTransactionOperationType, withDistributedBufferLock,
StableScheduler
} from '@emmvish/stable-infra';
// 1. Distributed buffer with custom conflict resolution
const { buffer: sharedBuffer } = await createDistributedStableBuffer({
distributed: { adapter, namespace: 'orders' },
initialState: { orderCount: 0, totalRevenue: 0, failedOrders: [] },
conflictResolution: DistributedConflictResolution.CUSTOM,
mergeStrategy: (local, remote) => ({
orderCount: Math.max(local.orderCount, remote.orderCount),
totalRevenue: Math.max(local.totalRevenue, remote.totalRevenue),
failedOrders: [...new Set([...local.failedOrders, ...remote.failedOrders])]
}),
syncOnTransaction: true
});
// 2. Distributed scheduler with leader election + circuit breaker
const setup = await createDistributedSchedulerConfig({
distributed: { adapter, namespace: 'orders' },
scheduler: { maxParallel: 10 },
enableLeaderElection: true,
circuitBreaker: { failureThresholdPercentage: 40, minimumRequests: 5, recoveryTimeoutMs: 30000 }
});
// 3. Job handler — lock, process, update shared state atomically
const orderHandler = async (job) => {
const lock = await coordinator.acquireLock({
resource: `order:${job.orderId}:processing`, ttlMs: 60000
});
try {
await processOrderWorkflow(job); // stableWorkflow under the hood
// Update shared buffer with distributed lock
await withDistributedBufferLock({ buffer: sharedBuffer, coordinator }, async () => {
await sharedBuffer.run(state => {
state.orderCount += 1;
state.totalRevenue += job.total;
});
});
// Publish completion for other services
await coordinator.publish('order-events', { type: 'COMPLETED', orderId: job.orderId });
} catch (error) {
// Atomic error handling via 2PC transaction
await coordinator.executeTransaction([
{ type: DistributedTransactionOperationType.SET, key: `order:${job.orderId}:status`, value: { status: 'FAILED' } },
{ type: DistributedTransactionOperationType.INCREMENT, key: 'metrics:failed-orders', delta: 1 }
]);
} finally {
await coordinator.releaseLock(lock.handle);
}
};
// 4. Start — only the leader processes jobs
const scheduler = new StableScheduler({ ...setup.config, sharedBuffer }, orderHandler);
const isLeader = await setup.waitForLeadership(30000);
if (isLeader) scheduler.start();Stable Runner
Config-driven runner that executes core module jobs from JSON/ESM configs and can use StableScheduler for scheduled jobs.
Resilience Mechanisms
Execution Timeouts
Set maximum execution time for functions to prevent indefinite hangs. Timeouts are enforced at multiple levels with proper inheritance.
You can also set a workflow/gateway-level maxTimeout to cap total execution time (applies to stableWorkflow, stableWorkflowGraph, and stableApiGateway).
Function-Level Timeout
Set timeout directly on a function:
import { stableFunction } from '@emmvish/stable-infra';
const result = await stableFunction({
fn: async () => {
// Long-running operation
await processLargeDataset();
return 'success';
},
args: [],
returnResult: true,
executionTimeout: 5000, // 5 seconds max
attempts: 3,
});
if (!result.success && result.error?.includes('timeout')) {
console.log('Function timed out');
}Gateway-Level Timeout
Apply timeout to all functions in a gateway:
import { stableApiGateway, RequestOrFunction } from '@emmvish/stable-infra';
const results = await stableApiGateway(
[
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'task1',
functionOptions: {
fn: async () => await task1(),
args: [],
// No timeout specified - inherits from gateway
},
},
},
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'task2',
functionOptions: {
fn: async () => await task2(),
args: [],
executionTimeout: 10000, // Override gateway timeout
},
},
},
],
{
commonExecutionTimeout: 3000, // Default 3s for all functions
}
);Request Group Timeout
Different timeouts for different groups:
const results = await stableApiGateway(
[
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'critical',
groupId: 'criticalOps',
functionOptions: { fn: criticalOp, args: [] },
},
},
{
type: RequestOrFunction.FUNCTION,
function: {
id: 'background',
groupId: 'backgroundOps',
functionOptions: { fn: backgroundOp, args: [] },
},
},
],
{
requestGroups: [
{
id: 'criticalOps',
commonConfig: {
commonExecutionTimeout: 1000, // Strict 1s timeout
},
},
{
id: 'backgroundOps',
commonConfig: {
commonExecutionTimeout: 30000, // Lenient 30s timeout
},
},
],
}
);Workflow Phase Timeout
Apply timeout at phase level in workflows:
import { stableWorkflow } from '@emmvish/stable-infra';
const result = await stableWorkflow(
[
{
id: 'initialization',
functions: [
{
id: 'init',
functionOptions: {
fn: initializeSystem,
args: [],
},
},
],
commonConfig: {
commonExecutionTimeout: 5000, // 5s for initialization
},
},
{
id: 'processing',
functions: [
{
id: 'process',
functionOptions: {
fn: processData,
args: [],
},
},
],
commonConfig: {
commonExecutionTimeout: 30000, // 30s for processing
},
},
],
{
commonExecutionTimeout: 10000, // Default for phases without specific timeout
}
);Timeout Precedence
Timeouts follow the configuration cascade pattern:
Function > Group > Phase/Branch > Gateway
- Function-level
executionTimeoutalways wins - If not set, inherits from request group's
commonExecutionTimeout - If not set, inherits from phase/branch's
commonExecutionTimeout - If not set, inherits from gateway's
commonExecutionTimeout - If not set, no timeout is applied
Timeout Behavior
- Timeout applies to entire function execution including all retry attempts
- When timeout is exceeded, function returns failed result with timeout error
- Timeout does NOT stop execution mid-flight (no AbortController)
- Metrics are still collected even when timeout occurs
- Use with retries: timeout encompasses all attempts, not per-attempt
const result = await stableFunction({
fn: slowFunction,
args: [],
attempts: 5,
wait: 1000,
executionTimeout: 3000, // Total time for all 5 attempts
});
// If each attempt takes 800ms:
// - Attempt 1: 800ms
// - Attempt 2: starts at 1800ms (after 1s wait)
// - Attempt 3: would start at 3600ms → TIMEOUT at 3000msRetry Strategies
When a request or function fails and is retryable, retry with configurable backoff.
FIXED Strategy
Constant wait between retries.
import { stableRequest, RETRY_STRATEGIES } from '@emmvish/stable-infra';
interface DataRequest {}
interface DataResponse { data: any; }
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { hostname: 'api.example.com', path: '/data' },
resReq: true,
attempts: 4,
wait: 500,
retryStrategy: RETRY_STRATEGIES.FIXED
// Retries at: 500ms, 1000ms, 1500ms
});LINEAR Strategy
Wait increases linearly with attempt number.
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { hostname: 'api.example.com', path: '/data' },
resReq: true,
attempts: 4,
wait: 100,
retryStrategy: RETRY_STRATEGIES.LINEAR
// Retries at: 100ms, 200ms, 300ms (wait * attempt)
});EXPONENTIAL Strategy
Wait increases exponentially; useful for heavily loaded services.
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { hostname: 'api.example.com', path: '/data' },
resReq: true,
attempts: 4,
wait: 100,
maxAllowedWait: 10000,
retryStrategy: RETRY_STRATEGIES.EXPONENTIAL
// Retries at: 100ms, 200ms, 400ms (wait * 2^(attempt-1))
// Capped at maxAllowedWait
});Jitter
Add random milliseconds to prevent synchronization.
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { hostname: 'api.example.com', path: '/data' },
resReq: true,
attempts: 3,
wait: 500,
jitter: 200, // Add 0-200ms randomness
retryStrategy: RETRY_STRATEGIES.EXPONENTIAL
});Perform All Attempts
Collect all outcomes instead of failing on first error.
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { hostname: 'api.example.com', path: '/data' },
resReq: true,
attempts: 3,
performAllAttempts: true
// All 3 attempts execute; check result.successfulAttempts
});Circuit Breaker
Prevent cascading failures by failing fast when a dependency becomes unhealthy.
import { stableApiGateway, CircuitBreaker } from '@emmvish/stable-infra';
interface FlakyRequest {}
interface FlakyResponse { status: string; }
const breaker = new CircuitBreaker({
failureThresholdPercentage: 50,
minimumRequests: 10,
recoveryTimeoutMs: 30000,
successThresholdPercentage: 80,
halfOpenMaxRequests: 5
});
const requests = [
{ id: 'req-1', requestOptions: { reqData: { path: '/flaky' }, resReq: true } },
{ id: 'req-2', requestOptions: { reqData: { path: '/flaky' }, resReq: true } }
];
const responses = await stableApiGateway<FlakyRequest, FlakyResponse>(requests, {
circuitBreaker: breaker
});
// Circuit breaker states:
// CLOSED: Normal operation (accept all requests)
// OPEN: Too many failures; reject immediately
// HALF_OPEN: Testing recovery; allow limited requestsState Transitions:
- CLOSED → OPEN: Failure rate exceeds threshold after minimum requests
- OPEN → HALF_OPEN: Recovery timeout elapsed; attempt recovery
- HALF_OPEN → CLOSED: Success rate exceeds recovery threshold
- HALF_OPEN → OPEN: Success rate below recovery threshold; reopen
Caching
Cache responses to avoid redundant calls.
import { stableRequest, CacheManager } from '@emmvish/stable-infra';
interface UserRequest {}
interface UserResponse {
id: number;
name: string;
email: string;
}
const cache = new CacheManager({
enabled: true,
ttl: 5000 // 5 seconds
});
// First call: cache miss, hits API
const result1 = await stableRequest<UserRequest, UserResponse>({
reqData: { hostname: 'api.example.com', path: '/user/1' },
resReq: true,
cache
});
// Second call within 5s: cache hit, returns cached response
const result2 = await stableRequest<UserRequest, UserResponse>({
reqData: { hostname: 'api.example.com', path: '/user/1' },
resReq: true,
cache
});
// Respects Cache-Control headers if enabled
const cache2 = new CacheManager({
enabled: true,
ttl: 60000,
respectCacheControl: true // Uses max-age, no-cache, no-store
});Function Caching:
Arguments become cache key; identical args hit cache.
import { stableFunction } from '@emmvish/stable-infra';
const expensive = (x: number) => x * x * x; // Cubic calculation
const result1 = await stableFunction({
fn: expensive,
args: [5],
returnResult: true,
cache: { enabled: true, ttl: 10000 }
});
const result2 = await stableFunction({
fn: expensive,
args: [5], // Same args → cache hit
returnResult: true,
cache: { enabled: true, ttl: 10000 }
});Rate Limiting
Enforce max requests per time window.
import { stableApiGateway } from '@emmvish/stable-infra';
interface ItemRequest {}
interface ItemResponse {
id: number;
data: any;
}
const requests = Array.from({ length: 20 }, (_, i) => ({
id: `req-${i}`,
requestOptions: {
reqData: { path: `/item/${i}` },
resReq: true
}
}));
const responses = await stableApiGateway<ItemRequest, ItemResponse>(requests, {
concurrentExecution: true,
rateLimit: {
maxRequests: 5,
windowMs: 1000 // 5 requests per second
}
// Requests queued until window allows; prevents overwhelming API
});Concurrency Limiting
Limit concurrent in-flight requests.
import { stableApiGateway } from '@emmvish/stable-infra';
interface ItemRequest {}
interface ItemResponse {
id: number;
data: any;
}
const requests = Array.from({ length: 50 }, (_, i) => ({
id: `req-${i}`,
requestOptions: {
reqData: { path: `/item/${i}` },
resReq: true,
attempts: 1
}
}));
const responses = await stableApiGateway<ItemRequest, ItemResponse>(requests, {
concurrentExecution: true,
maxConcurrentRequests: 5 // Only 5 requests in-flight at a time
// Others queued and executed as slots free
});Workflow Patterns
Sequential & Concurrent Phases
Sequential (Default)
Each phase waits for the previous to complete.
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'phase-1',
requests: [{ id: 'r1', requestOptions: { reqData: { path: '/p1' }, resReq: true } }]
},
{
id: 'phase-2',
requests: [{ id: 'r2', requestOptions: { reqData: { path: '/p2' }, resReq: true } }]
},
{
id: 'phase-3',
requests: [{ id: 'r3', requestOptions: { reqData: { path: '/p3' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'sequential-phases',
concurrentPhaseExecution: false // Phase-1 → Phase-2 → Phase-3
});Concurrent Phases
Multiple phases run in parallel.
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'fetch-users',
requests: [{ id: 'get-users', requestOptions: { reqData: { path: '/users' }, resReq: true } }]
},
{
id: 'fetch-posts',
requests: [{ id: 'get-posts', requestOptions: { reqData: { path: '/posts' }, resReq: true } }]
},
{
id: 'fetch-comments',
requests: [{ id: 'get-comments', requestOptions: { reqData: { path: '/comments' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'parallel-phases',
concurrentPhaseExecution: true // All 3 phases in parallel
});Mixed Phases
Combine sequential and concurrent phases in one workflow.
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'init', // Sequential
requests: [{ id: 'setup', requestOptions: { reqData: { path: '/init' }, resReq: true } }]
},
{
id: 'fetch-a',
markConcurrentPhase: true, // Concurrent with next
requests: [{ id: 'data-a', requestOptions: { reqData: { path: '/a' }, resReq: true } }]
},
{
id: 'fetch-b',
markConcurrentPhase: true, // Concurrent with fetch-a
requests: [{ id: 'data-b', requestOptions: { reqData: { path: '/b' }, resReq: true } }]
},
{
id: 'finalize', // Sequential after fetch-a/b complete
requests: [{ id: 'done', requestOptions: { reqData: { path: '/finalize' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
concurrentPhaseExecution: false // Respects markConcurrentPhase per phase
});Non-Linear Workflows
Use decision hooks to dynamically control phase flow.
CONTINUE
Standard flow to next sequential phase.
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'check-status',
requests: [{ id: 'api', requestOptions: { reqData: { path: '/status' }, resReq: true } }],
phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
},
{
id: 'process', // Executes after check-status
requests: [{ id: 'process-data', requestOptions: { reqData: { path: '/process' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
enableNonLinearExecution: true
});SKIP
Skip the next phase; execute the one after.
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'phase-1',
requests: [{ id: 'r1', requestOptions: { reqData: { path: '/p1' }, resReq: true } }],
phaseDecisionHook: async () => ({
action: PHASE_DECISION_ACTIONS.SKIP
})
},
{
id: 'phase-2', // Skipped
requests: [{ id: 'r2', requestOptions: { reqData: { path: '/p2' }, resReq: true } }]
},
{
id: 'phase-3', // Executes
requests: [{ id: 'r3', requestOptions: { reqData: { path: '/p3' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
enableNonLinearExecution: true
});
// Execution: phase-1 → phase-3JUMP
Jump to a specific phase by ID.
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'phase-1',
requests: [{ id: 'r1', requestOptions: { reqData: { path: '/p1' }, resReq: true } }],
phaseDecisionHook: async () => ({
action: PHASE_DECISION_ACTIONS.JUMP,
targetPhaseId: 'recovery'
})
},
{
id: 'phase-2', // Skipped
requests: [{ id: 'r2', requestOptions: { reqData: { path: '/p2' }, resReq: true } }]
},
{
id: 'recovery',
requests: [{ id: 'recover', requestOptions: { reqData: { path: '/recovery' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
enableNonLinearExecution: true
});
// Execution: phase-1 → recoveryREPLAY
Re-execute current phase; useful for polling.
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'wait-for-job',
allowReplay: true,
maxReplayCount: 5,
requests: [
{
id: 'check-job',
requestOptions: { reqData: { path: '/job/status' }, resReq: true, attempts: 1 }
}
],
phaseDecisionHook: async ({ phaseResult, executionHistory }) => {
const lastResponse = phaseResult.responses?.[0];
if ((lastResponse as any)?.data?.status === 'pending' && executionHistory.length < 5) {
return { action: PHASE_DECISION_ACTIONS.REPLAY };
}
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
},
{
id: 'process-result',
requests: [{ id: 'process', requestOptions: { reqData: { path: '/process' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
enableNonLinearExecution: true,
maxWorkflowIterations: 100
});
// Polls up to 5 times before continuingTERMINATE
Stop workflow early.
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'validate',
requests: [{ id: 'validate-input', requestOptions: { reqData: { path: '/validate' }, resReq: true } }],
phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
if (!phaseResult.success) {
return { action: PHASE_DECISION_ACTIONS.TERMINATE };
}
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
},
{
id: 'phase-2', // Won't execute if validation fails
requests: [{ id: 'r2', requestOptions: { reqData: { path: '/p2' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
enableNonLinearExecution: true
});
console.log(result.terminatedEarly); // true if TERMINATE triggeredBranched Workflows
Execute multiple independent branches with shared state.
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_BRANCH } from '@emmvish/stable-infra';
const branches: STABLE_WORKFLOW_BRANCH[] = [
{
id: 'branch-payment',
phases: [
{
id: 'process-payment',
requests: [
{
id: 'charge-card',
requestOptions: {
reqData: { path: '/payment/charge' },
resReq: true
}
}
]
}
]
},
{
id: 'branch-notification',
phases: [
{
id: 'send-email',
requests: [
{
id: 'send',
requestOptions: {
reqData: { path: '/notify/email' },
resReq: false
}
}
]
}
]
}
];
const result = await stableWorkflow([], {
workflowId: 'checkout',
enableBranchExecution: true,
branches,
sharedBuffer: { orderId: '12345' },
markConcurrentBranch: true // Branches run in parallel
});
// Both branches access/modify sharedBufferBranch Racing
When multiple branches execute concurrently, enable racing to accept the first successful branch and cancel others.
const result = await stableWorkflow([], {
workflowId: 'payment-racing',
enableBranchExecution: true,
enableBranchRacing: true, // First successful branch wins
branches: [
{
id: 'payment-provider-a',
phases: [/* ... */]
},
{
id: 'payment-provider-b',
phases: [/* ... */]
}
],
markConcurrentBranch: true
});
// Only winning branch's execution history recorded
// Losing branches marked as cancelledGraph-based Workflow Patterns
Key responsibilities:
- Define phases as DAG nodes with explicit dependency edges
- Execute independent phases in parallel automatically
- Support parallel groups, merge points, and conditional routing
- Validate graph structure (cycle detection, reachability, orphan detection)
- Provide deterministic execution order
- Offer higher parallelism than phased workflows for complex topologies
Graph-Based Workflows with Mixed Items
For complex topologies with explicit dependencies, use DAG execution mixing requests and functions.
import { stableWorkflowGraph, WorkflowGraphBuilder, RequestOrFunction } from '@emmvish/stable-infra';
import type { API_GATEWAY_ITEM } from '@emmvish/stable-infra';
// Request types
interface PostsRequest {}
interface PostsResponse { posts: Array<{ id: number; title: string }> };
interface UsersRequest {}
interface UsersResponse { users: Array<{ id: number; name: string }> };
// Function types
type AggregateArgs = [PostsResponse, UsersResponse];
type AggregateResult = {
combined: Array<{ userId: number; userName: string; postCount: number }>;
};
type AnalyzeArgs = [AggregateResult];
type AnalyzeResult = { totalPosts: number; activeUsers: number };
const graph = new WorkflowGraphBuilder<
PostsRequest | UsersRequest,
PostsResponse | UsersResponse,
AggregateArgs | AnalyzeArgs,
AggregateResult | AnalyzeResult
>()
.addPhase('fetch-posts', {
requests: [{
id: 'get-posts',
requestOptions: {
reqData: { path: '/posts' },
resReq: true
}
}]
})
.addPhase('fetch-users', {
requests: [{
id: 'get-users',
requestOptions: {
reqData: { path: '/users' },
resReq: true
}
}]
})
.addParallelGroup('fetch-all', ['fetch-posts', 'fetch-users'])
.addPhase('aggregate', {
functions: [{
id: 'combine-data',
functionOptions: {
fn: (posts: PostsResponse, users: UsersResponse): AggregateResult => ({
combined: users.users.map(user => ({
userId: user.id,
userName: user.name,
postCount: posts.posts.filter(p => p.id === user.id).length
}))
}),
args: [{ posts: [] }, { users: [] }] as AggregateArgs,
returnResult: true
}
}]
})
.addPhase('analyze', {
functions: [{
id: 'analyze-data',
functionOptions: {
fn: (aggregated: AggregateResult): AnalyzeResult => ({
totalPosts: aggregated.combined.reduce((sum, u) => sum + u.postCount, 0),
activeUsers: aggregated.combined.filter(u => u.postCount > 0).length
}),
args: [{ combined: [] }] as AnalyzeArgs,
returnResult: true
}
}]
})
.addMergePoint('sync', ['fetch-all'])
.connectSequence('fetch-all', 'sync', 'aggregate', 'analyze')
.setEntryPoint('fetch-all')
.build();
const result = await stableWorkflowGraph(graph, {
workflowId: 'data-aggregation'
});
console.log(`Graph workflow success: ${result.success}`);Parallel Phase Execution
Execute multiple phases concurrently within a group.
import { stableWorkflowGraph, WorkflowGraphBuilder } from '@emmvish/stable-infra';
const graph = new WorkflowGraphBuilder()
.addPhase('fetch-users', {
requests: [{
id: 'users',
requestOptions: { reqData: { path: '/users' }, resReq: true }
}]
})
.addPhase('fetch-posts', {
requests: [{
id: 'posts',
requestOptions: { reqData: { path: '/posts' }, resReq: true }
}]
})
.addPhase('fetch-comments', {
requests: [{
id: 'comments',
requestOptions: { reqData: { path: '/comments' }, resReq: true }
}]
})
.addParallelGroup('data-fetch', ['fetch-users', 'fetch-posts', 'fetch-comments'])
.setEntryPoint('data-fetch')
.build();
const result = await stableWorkflowGraph(graph, {
workflowId: 'data-aggregation'
});
// All 3 phases run concurrentlyMerge Points
Synchronize multiple predecessor phases.
const graph = new WorkflowGraphBuilder()
.addPhase('fetch-a', {
requests: [{ id: 'a', requestOptions: { reqData: { path: '/a' }, resReq: true } }]
})
.addPhase('fetch-b', {
requests: [{ id: 'b', requestOptions: { reqData: { path: '/b' }, resReq: true } }]
})
.addMergePoint('sync', ['fetch-a', 'fetch-b'])
.addPhase('aggregate', {
functions: [{
id: 'combine',
functionOptions: {
fn: () => 'combined',
args: [],
returnResult: true
}
}]
})
.connectSequence('fetch-a', 'sync')
.connectSequence('fetch-b', 'sync')
.connectSequence('sync', 'aggregate')
.setEntryPoint('fetch-a')
.build();
const result = await stableWorkflowGraph(graph, {
workflowId: 'parallel-sync'
});
// fetch-a and fetch-b run in parallel
// aggregate waits for both to completeLinear Helper
Convenience function for sequential phase chains.
import { createLinearWorkflowGraph } from '@emmvish/stable-infra';
const phases = [
{
id: 'init',
requests: [{ id: 'setup', requestOptions: { reqData: { path: '/init' }, resReq: true } }]
},
{
id: 'process',
requests: [{ id: 'do-work', requestOptions: { reqData: { path: '/work' }, resReq: true } }]
},
{
id: 'finalize',
requests: [{ id: 'cleanup', requestOptions: { reqData: { path: '/cleanup' }, resReq: true } }]
}
];
const graph = createLinearWorkflowGraph(phases);
const result = await stableWorkflowGraph(graph, {
workflowId: 'linear-workflow'
});Branch Racing in Graphs
Enable branch racing in workflow graphs to accept the first successful branch node when multiple branches are executed in parallel.
import { stableWorkflowGraph, WorkflowGraphBuilder } from '@emmvish/stable-infra';
const branch1 = {
id: 'provider-a',
phases: [{ /* ... */ }]
};
const branch2 = {
id: 'provider-b',
phases: [{ /* ... */ }]
};
const graph = new WorkflowGraphBuilder()
.addBranch('provider-a', branch1)
.addBranch('provider-b', branch2)
.addParallelGroup('race', ['provider-a', 'provider-b'])
.setEntryPoint('race')
.build();
const result = await stableWorkflowGraph(graph, {
workflowId: 'provider-racing',
enableBranchRacing: true // First successful branch wins
});
// Only winning branch's results recorded
// Losing branch marked as cancelledConfiguration & State
Config Cascading
Define defaults globally; override at group, phase, branch, or item level.
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'phase-1',
attempts: 5, // Override global attempts for this phase
wait: 1000,
requests: [
{
id: 'req-1',
requestOptions: {
reqData: { path: '/data' },
resReq: true,
attempts: 2 // Override phase attempts for this item
}
}
]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'cascade-demo',
commonAttempts: 1, // Global default
commonWait: 500,
retryStrategy: 'LINEAR' // Global default
// Final config per item: merge common → phase → request
});Hierarchy: global → group → phase → branch → item. Lower levels override.
Shared & State Buffers
Pass mutable state across phases, branches, and items. For concurrency-safe shared state, pass a StableBuffer instance instead of a plain object.
Shared Buffer (Workflow/Gateway)
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'fetch',
requests: [
{
id: 'user-data',
requestOptions: {
reqData: { path: '/users/1' },
resReq: true,
handleSuccessfulAttemptData: ({ successfulAttemptData, stableRequestOptions }) => {
// Mutate shared buffer
const sharedBuffer = (stableRequestOptions as any).sharedBuffer;
sharedBuffer.userId = (successfulAttemptData.data as any).id;
}
}
}
]
},
{
id: 'use-shared-data',
requests: [
{
id: 'dependent-call',
requestOptions: {
reqData: { path: '/user-posts' },
resReq: true,
preExecution: {
preExecutionHook: async ({ stableRequestOptions, commonBuffer }) => {
const sharedBuffer = (stableRequestOptions as any).sharedBuffer;
console.log(`Using userId: ${sharedBuffer.userId}`);
}
}
}
}
]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'shared-state-demo',
sharedBuffer: {} // Mutable across phases
});Common Buffer (Request Level)
import { stableRequest, PersistenceStage } from '@emmvish/stable-infra';
const commonBuffer = { transactionId: null };
const result = await stableRequest({
reqData: { path: '/transaction/start' },
resReq: true,
commonBuffer,
preExecution: {
preExecutionHook: async ({ commonBuffer, stableRequestOptions }) => {
// commonBuffer writable here
commonBuffer.userId = '123';
}
},
handleSuccessfulAttemptData: ({ successfulAttemptData }) => {
// commonBuffer readable in handlers
console.log(`Transaction for user ${commonBuffer.userId} done`);
}
});Hooks & Observability
Pre-Execution Hooks
Modify config or state before execution.
import { stableRequest } from '@emmvish/stable-infra';
interface SecureRequest {}
interface SecureResponse {
data: any;
token?: string;
}
const result = await stableRequest<SecureRequest, SecureResponse>({
reqData: { path: '/secure-data' },
resReq: true,
preExecution: {
preExecutionHook: async ({ inputParams, commonBuffer, stableRequestOptions }) => {
// Dynamically fetch auth token
const token = await getAuthToken();
// Return partial config override
return {
reqData: {
headers: { Authorization: `Bearer ${token}` }
}
};
},
preExecutionHookParams: { context: 'auth-fetch' },
applyPreExecutionConfigOverride: true,
continueOnPreExecutionHookFailure: false
}
});Analysis Hooks
Validate responses and errors.
Response Analyzer
import { stableRequest } from '@emmvish/stable-infra';
interface ResourceRequest {}
interface ApiResponse {
id: number;
status: 'active' | 'inactive';
}
const result = await stableRequest<ResourceRequest, ApiResponse>({
reqData: { path: '/resource' },
resReq: true,
responseAnalyzer: ({ data, reqData, trialMode }) => {
// Return true to accept, false to retry
if (!data || typeof data !== 'object') return false;
if (!('id' in data)) return false;
if ((data as any).status !== 'active') return false;
return true;
}
});Error Analyzer
Decide whether to suppress error gracefully.
import { stableRequest } from '@emmvish/stable-infra';
interface FeatureRequest {}
interface FeatureResponse {
enabled: boolean;
data?: any;
}
const result = await stableRequest<FeatureRequest, FeatureResponse>({
reqData: { path: '/optional-feature' },
resReq: true,
finalErrorAnalyzer: ({ error, reqData, trialMode }) => {
// Return true to suppress error and return failure result
// Return false to throw error
if (error.code === 'ECONNREFUSED') {
console.warn('Service unavailable, continuing with fallback');
return true; // Suppress, don't throw
}
return false; // Throw
}
});
if (result.success) {
console.log('Got data:', result.data);
} else {
console.log('Service offline, but we continue');
}Handler Hooks
Custom logging and processing.
Success Handler
import { stableRequest } from '@emmvish/stable-infra';
interface DataRequest {}
interface DataResponse {
id: number;
value: string;
}
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { path: '/data' },
resReq: true,
logAllSuccessfulAttempts: true,
handleSuccessfulAttemptData: ({
successfulAttemptData,
reqData,
maxSerializableChars,
executionContext
}) => {
// Custom logging, metrics, state updates
console.log(
`Success in context ${executionContext.workflowId}`,
`data:`,
successfulAttemptData.data
);
}
});Error Handler
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { path: '/data' },
resReq: true,
logAllErrors: true,
handleErrors: ({ errorLog, reqData, executionContext }) => {
// Custom error logging, alerting, retry logic
console.error(
`Error in ${executionContext.workflowId}:`,
errorLog.errorMessage,
`Retryable: ${errorLog.isRetryable}`
);
}
});Phase Handlers (Workflow)
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'phase-1',
requests: [{ id: 'r1', requestOptions: { reqData: { path: '/data' }, resReq: true } }]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'wf-handlers',
handlePhaseCompletion: ({ phaseResult, workflowId }) => {
console.log(`Phase ${phaseResult.phaseId} complete in ${workflowId}`);
},
handlePhaseError: ({ phaseResult, error, workflowId }) => {
console.error(`Phase ${phaseResult.phaseId} failed:`, error);
},
handlePhaseDecision: ({ decision, phaseResult }) => {
console.log(`Phase decision: ${decision.action}`);
}
});Decision Hooks
Dynamically determine workflow flow.
import { stableWorkflow, PHASE_DECISION_ACTIONS } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'fetch-data',
requests: [{ id: 'api', requestOptions: { reqData: { path: '/data' }, resReq: true } }],
phaseDecisionHook: async ({ phaseResult, sharedBuffer, executionHistory }) => {
if (!phaseResult.success) {
return { action: PHASE_DECISION_ACTIONS.TERMINATE };
}
if (phaseResult.responses[0].data?.needsRetry) {
return { action: PHASE_DECISION_ACTIONS.REPLAY };
}
return { action: PHASE_DECISION_ACTIONS.CONTINUE };
}
}
];
const result = await stableWorkflow(phases, {
enableNonLinearExecution: true
});Metrics & Logging
Automatic metrics collection across all execution modes.
Request Metrics
import { stableRequest } from '@emmvish/stable-infra';
interface DataRequest {}
interface DataResponse { data: any; }
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { path: '/data' },
resReq: true,
attempts: 3
});
console.log(result.metrics); // {
// totalAttempts: 2,
// successfulAttempts: 1,
// failedAttempts: 1,
// totalExecutionTime: 450,
// averageAttemptTime: 225,
// infrastructureMetrics: {
// circuitBreaker: { /* state, stats, config */ },
// cache: { /* hits, misses, size */ },
// rateLimiter: { /* limit, current rate */ },
// concurrencyLimiter: { /* limit, in-flight */ }
// },
// validation: {
// isValid: true,
// anomalies: [],
// validatedAt: '2026-01-20T...'
// }
// }API Gateway Metrics
import { stableApiGateway } from '@emmvish/stable-infra';
import type { API_GATEWAY_REQUEST } from '@emmvish/stable-infra';
interface ApiRequest {}
interface ApiResponse { data: any; }
const requests: API_GATEWAY_REQUEST<ApiRequest, ApiResponse>[] = [
{ id: 'req-1', requestOptions: { reqData: { path: '/data/1' }, resReq: true } },
{ id: 'req-2', requestOptions: { reqData: { path: '/data/2' }, resReq: true } },
{ id: 'req-3', requestOptions: { reqData: { path: '/data/3' }, resReq: true } }
];
const result = await stableApiGateway<ApiRequest, ApiResponse>(requests, {
concurrentExecution: true,
maxConcurrentRequests: 5
});
console.log(result.metrics); // {
// totalRequests: 3,
// successfulRequests: 3,
// failedRequests: 0,
// successRate: 100,
// failureRate: 0,
// executionTime: 450, // Total execution time in ms
// timestamp: '2026-01-20T...', // ISO 8601 completion timestamp
// throughput: 6.67, // Requests per second
// averageRequestDuration: 150, // Average time per request in ms
// requestGroups: [/* per-group stats */],
// infrastructureMetrics: {
// circuitBreaker: { /* state, stats, config */ },
// cache: { /* hit rate, size, utilization */ },
// rateLimiter: { /* throttle rate, queue length */ },
// concurrencyLimiter: { /* utilization, queue */ }
// },
// validation: {
// isValid: true,
// anomalies: [],
// validatedAt: '2026-01-20T...'
// }
// }Workflow Metrics
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{ id: 'p1', requests: [{ id: 'r1', requestOptions: { reqData: { path: '/a' }, resReq: true } }] },
{ id: 'p2', requests: [{ id: 'r2', requestOptions: { reqData: { path: '/b' }, resReq: true } }] }
];
const result = await stableWorkflow(phases, {
workflowId: 'wf-metrics'
});
console.log(result); // {
// workflowId: 'wf-metrics',
// success: true,
// totalPhases: 2,
// completedPhases: 2,
// totalRequests: 2,
// successfulRequests: 2,
// failedRequests: 0,
// workflowExecutionTime: 1200,
// phases: [
// { phaseId: 'p1', success: true, responses: [...], validation: {...}, ... },
// { phaseId: 'p2', success: true, responses: [...], validation: {...}, ... }
// ],
// validation: {
// isValid: true,
// anomalies: [],
// validatedAt: '2026-01-20T...'
// }
// }Structured Error Logs
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { path: '/flaky' },
resReq: true,
attempts: 3,
logAllErrors: true,
handleErrors: ({ errorLog }) => {
console.log(errorLog); // {
// attempt: '1/3',
// type: 'NetworkError',
// error: 'ECONNREFUSED',
// isRetryable: true,
// timestamp: 1234567890
// }
}
});
if (result.errorLogs) {
console.log(`${result.errorLogs.length} errors logged`);
}Advanced Features
Trial Mode
Dry-run workflows without side effects; simulate failures.
import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';
const phases: STABLE_WORKFLOW_PHASE[] = [
{
id: 'process',
requests: [
{
id: 'api-call',
requestOptions: {
reqData: { path: '/payment/charge' },
resReq: true,
trialMode: {
enabled: true,
requestFailureProbability: 0.3 // 30% simulated failure rate
}
}
}
]
}
];
const result = await stableWorkflow(phases, {
workflowId: 'payment-trial',
trialMode: {
enabled: true,
functionFailureProbability: 0.2
}
});
// Requests/functions execute but failures are simulated
// Real API calls happen; real side effects occur only if enabled
// Useful for testing retry logic, decision hooks, workflow topologyState Persistence
Persist state across retry attempts for distributed tracing.
The persistenceFunction receives a persistenceStage parameter (PersistenceStage.BEFORE_HOOK or PersistenceStage.AFTER_HOOK) to indicate when it is called.
import { stableRequest, PersistenceStage } from '@emmvish/stable-infra';
interface DataRequest {}
interface DataResponse { data: any; }
const result = await stableRequest<DataRequest, DataResponse>({
reqData: { path: '/data' },
resReq: true,
attempts: 3,
statePersistence: {
persistenceFunction: async ({ executionContext, buffer, params, persistenceStage }) => {
const key = `${executionContext.workflowId}:${executionContext.requestId}`;
if (persistenceStage === PersistenceStage.BEFORE_HOOK || params?.operation === 'load') {
// Load state for recovery
return await loadFromDatabase(key);
}
// Save state to database or distributed cache
await saveToDatabase({ key, state: buffer });
return buffer;
},
persist