npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@emmvish/stable-infra

v2.0.4

Published

A production-grade stable & flexible execution engine for resilient workflows, API integrations, and batch processing.

Readme

@emmvish/stable-infra

A stability-first production-grade TypeScript framework for resilient API integrations, batch processing, and orchestrating complex workflows with deterministic error handling, type safety, and comprehensive observability.

Table of Contents


Overview

@emmvish/stable-infra evolved from a focused library for resilient API calls to a comprehensive execution framework. Originally addressing API integration challenges via stableRequest, it expanded to include:

  1. Batch orchestration via stableApiGateway for processing groups of mixed requests/functions
  2. Phased workflows via stableWorkflow for array-based multi-phase execution with dynamic control flow
  3. Graph-based workflows via stableWorkflowGraph for DAG execution with higher parallelism
  4. Generic function execution via stableFunction, inheriting all resilience guards
  5. Queue based scheduling via StableScheduler, with option to preserve scheduler state and recover from saved state
  6. Transactional shared state via StableBuffer, a concurrency-safe buffer you can pass as commonBuffer or sharedBuffer
  7. Distributed coordination via DistributedCoordinator for multi-node locking, state, leader election, pub/sub, and 2PC transactions

All core modules support the same resilience stack: retries, jitter, circuit breaking, caching, rate/concurrency limits, config cascading, shared buffers, trial mode, comprehensive hooks, and metrics. This uniformity makes it trivial to compose requests and functions in any topology. Finally, Stable Runner executes jobs from config.


Core Concepts

Resilience as Default

Every execution—whether a single request, a pure function, or an entire workflow—inherits built-in resilience:

  • Retries with configurable backoff strategies (FIXED, LINEAR, EXPONENTIAL)
  • Jitter to prevent thundering herd
  • Circuit breaker to fail fast and protect downstream systems
  • Caching for idempotent read operations
  • Rate & concurrency limits to respect external constraints
  • Metrics guardrails to validate execution against thresholds with automatic anomaly detection

Type Safety

All examples in this guide use TypeScript generics for type-safe request/response data and function arguments/returns. Analyzers validate shapes at runtime; TypeScript ensures compile-time safety.

Config Cascading

Global defaults → group overrides → phase overrides → branch overrides → item overrides. Lower levels always win, preventing repetition while maintaining expressiveness.

Shared State

Workflows and gateways support sharedBuffer for passing computed state across phases/branches/items without global state.


Core Modules

stableRequest

Single API call with resilience, type-safe request and response types.

import { stableRequest, REQUEST_METHODS, VALID_REQUEST_PROTOCOLS } from '@emmvish/stable-infra';

interface GetUserRequest {
  // Empty for GET requests with no body
}

interface User {
  id: number;
  name: string;
}

const result = await stableRequest<GetUserRequest, User>({
  reqData: {
    method: REQUEST_METHODS.GET,
    protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
    hostname: 'api.example.com',
    path: '/users/1'
  },
  resReq: true,
  attempts: 3,
  wait: 500,
  jitter: 100,
  cache: { enabled: true, ttl: 5000 },
  rateLimit: { maxRequests: 10, windowMs: 1000 },
  maxConcurrentRequests: 5,
  responseAnalyzer: ({ data }) => {
    return typeof data === 'object' && data !== null && 'id' in data;
  },
  handleSuccessfulAttemptData: ({ successfulAttemptData }) => {
    console.log(`User loaded: ${successfulAttemptData.data.name}`);
  }
});

if (result.success) {
  console.log(result.data.name, result.metrics.totalAttempts);
} else {
  console.error(result.error);
}

Key responsibilities:

  • Execute a single HTTP request with automatic retry and backoff
  • Validate response shape via analyzer; retry if invalid
  • Cache successful responses with TTL
  • Apply rate and concurrency limits
  • Throw or gracefully suppress errors via finalErrorAnalyzer
  • Collect attempt metrics and infra dashboards (circuit breaker, cache, rate limiter state)

stableFunction

Generic async/sync function execution with identical resilience.

import { stableFunction, RETRY_STRATEGIES } from '@emmvish/stable-infra';

type ComputeArgs = [number, number];
type ComputeResult = number;

const multiply = (a: number, b: number) => a * b;

const result = await stableFunction<ComputeArgs, ComputeResult>({
  fn: multiply,
  args: [5, 3],
  returnResult: true,
  attempts: 2,
  wait: 100,
  retryStrategy: RETRY_STRATEGIES.EXPONENTIAL,
  responseAnalyzer: ({ data }) => data > 0,
  cache: { enabled: true, ttl: 10000 }
});

if (result.success) {
  console.log('Result:', result.data); // 15
}

Key responsibilities:

  • Execute any async or sync function with typed arguments and return
  • Support argument-based cache key generation
  • Retry on error or analyzer rejection
  • Enforce success criteria via analyzer
  • Optionally suppress exceptions

stableApiGateway

Batch orchestration of mixed requests and functions.

import {
  stableApiGateway,
  REQUEST_METHODS,
  VALID_REQUEST_PROTOCOLS,
  RequestOrFunction
} from '@emmvish/stable-infra';
import type { API_GATEWAY_ITEM } from '@emmvish/stable-infra';

// Define request types
interface ApiRequestData {
  filters?: Record<string, any>;
}

interface ApiResponse {
  id: number;
  value: string;
}

// Define function types
type TransformArgs = [ApiResponse[], number];
type TransformResult = {
  transformed: ApiResponse[];
  count: number;
};

type ValidateArgs = [TransformResult];
type ValidateResult = boolean;

const items: API_GATEWAY_ITEM<ApiRequestData, ApiResponse, TransformArgs | ValidateArgs, TransformResult | ValidateResult>[] = [
  {
    type: RequestOrFunction.REQUEST,
    request: {
      id: 'fetch-data',
      requestOptions: {
        reqData: {
          method: REQUEST_METHODS.GET,
          protocol: VALID_REQUEST_PROTOCOLS.HTTPS,
          hostname: 'api.example.com',
          path: '/data'
        },
        resReq: true,
        attempts: 3
      }
    }
  },
  {
    type: RequestOrFunction.FUNCTION,
    function: {
      id: 'transform-data',
      functionOptions: {
        fn: (data: ApiResponse[], threshold: number): TransformResult => ({
          transformed: data.filter(item => item.id > threshold),
          count: data.length
        }),
        args: [[], 10] as TransformArgs,
        returnResult: true,
        attempts: 2,
        cache: { enabled: true, ttl: 5000 }
      }
    }
  },
  {
    type: RequestOrFunction.FUNCTION,
    function: {
      id: 'validate-result',
      functionOptions: {
        fn: (result: TransformResult): ValidateResult => result.count > 0,
        args: [{ transformed: [], count: 0 }] as ValidateArgs,
        returnResult: true
      }
    }
  }
];

const responses = await stableApiGateway<ApiRequestData, ApiResponse>(items, {
  concurrentExecution: true,
  stopOnFirstError: false,
  sharedBuffer: {},
  commonAttempts: 2,
  commonWait: 300,
  maxConcurrentRequests: 3
});

// Access individual responses
responses.forEach((resp, i) => {
  console.log(`Item ${i}: success=${resp.success}`);
});

// Access aggregate metrics
console.log(`Success rate: ${responses.metrics.successRate.toFixed(2)}%`);
console.log(`Execution time: ${responses.metrics.executionTime}ms`);
console.log(`Throughput: ${responses.metrics.throughput.toFixed(2)} req/s`);
console.log(`Average duration: ${responses.metrics.averageRequestDuration.toFixed(2)}ms`);

Request/Function Racing

Enable racing to accept the first successful request or function and cancel others, useful for redundant API calls or failover scenarios.

const responses = await stableApiGateway(items, {
  concurrentExecution: true,
  enableRacing: true, // First successful item wins, others cancelled
  maxConcurrentRequests: 10
});

// responses contains only the winning result
// Losing items marked as cancelled with appropriate error

Key responsibilities:

  • Execute a batch of requests and functions concurrently or sequentially
  • Apply global, group-level, and item-level config overrides
  • Maintain shared buffer across items for state passing
  • Stop on first error or continue despite failures
  • Collect per-item and aggregate metrics (success rates, execution time, throughput)
  • Support request grouping with group-specific config
  • Track infrastructure metrics (circuit breaker, cache, rate limiter, concurrency)

stableWorkflow

Phased array-based workflows with sequential/concurrent phases, mixed items, and non-linear control flow.

You can start a workflow from a specific phase using startPhaseIndex (0-based). When starting inside a concurrent group (markConcurrentPhase), execution aligns to the group’s first phase.

import { stableWorkflow, PHASE_DECISION_ACTIONS, RequestOrFunction, REQUEST_METHODS } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE, API_GATEWAY_ITEM } from '@emmvish/stable-infra';

// Define types for requests
interface FetchRequestData {}
interface FetchResponse {
  users: Array<{ id: number; name: string }>;
  posts: Array<{ id: number; title: string }>;
}

// Define types for functions
type ProcessArgs = [FetchResponse];
type ProcessResult = {
  merged: Array<{ userId: number; userName: string; postTitle: string }>;
};

type AuditArgs = [ProcessResult, string];
type AuditResult = { logged: boolean; timestamp: string };

const phases: STABLE_WORKFLOW_PHASE<FetchRequestData, FetchResponse, ProcessArgs | AuditArgs, ProcessResult | AuditResult>[] = [
  {
    id: 'fetch-data',
    requests: [
      {
        id: 'get-users-posts',
        requestOptions: {
          reqData: {
            hostname: 'api.example.com',
            path: '/users-and-posts'
          },
          resReq: true,
          attempts: 3
        }
      }
    ]
  },
  {
    id: 'process-and-audit',
    markConcurrentPhase: true,
    items: [
      {
        type: RequestOrFunction.FUNCTION,
        function: {
          id: 'process-data',
          functionOptions: {
            fn: (data: FetchResponse): ProcessResult => ({
              merged: data.users.map((user, idx) => ({
                userId: user.id,
                userName: user.name,
                postTitle: data.posts[idx]?.title || 'No post'
              }))
            }),
            args: [{ users: [], posts: [] }] as ProcessArgs,
            returnResult: true
          }
        }
      },
      {
        type: RequestOrFunction.FUNCTION,
        function: {
          id: 'audit-processing',
          functionOptions: {
            fn: async (result: ProcessResult, auditId: string): Promise<AuditResult> => {
              console.log(`Audit ${auditId}:`, result);
              return { logged: true, timestamp: new Date().toISOString() };
            },
            args: [{ merged: [] }, 'audit-123'] as AuditArgs,
            returnResult: true
          }
        }
      }
    ],
    phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
      if (!phaseResult.success) {
        return { action: PHASE_DECISION_ACTIONS.TERMINATE };
      }
      return { action: PHASE_DECISION_ACTIONS.CONTINUE };
    }
  },
  {
    id: 'finalize',
    requests: [
      {
        id: 'store-result',
        requestOptions: {
          reqData: {
            hostname: 'api.example.com',
            path: '/store',
            method: REQUEST_METHODS.POST
          },
          resReq: false
        }
      }
    ]
  }
];

const result = await stableWorkflow(phases, {
  workflowId: 'data-pipeline',
  concurrentPhaseExecution: false, // Phases sequential
  enableNonLinearExecution: true,
  sharedBuffer: { userId: '123' },
  commonAttempts: 2,
  commonWait: 200,
  handlePhaseCompletion: ({ phaseResult, workflowId }) => {
    console.log(`Phase ${phaseResult.phaseId} complete in workflow ${workflowId}`);
  }
});

console.log(`Workflow succeeded: ${result.success}, phases: ${result.totalPhases}`);

Key responsibilities:

  • Execute phases sequentially or concurrently
  • Support mixed requests and functions per phase
  • Enable non-linear flow (CONTINUE, SKIP, REPLAY, JUMP, TERMINATE)
  • Maintain shared buffer across all phases
  • Apply phase-level and request-level config cascading
  • Support branching with parallel/sequential branches
  • Collect per-phase metrics and workflow aggregates

stableWorkflowGraph

DAG-based execution for higher parallelism and explicit phase dependencies.

import { stableWorkflowGraph, WorkflowGraphBuilder } from '@emmvish/stable-infra';

const graph = new WorkflowGraphBuilder()
  .addPhase('fetch-posts', {
    requests: [{
      id: 'get-posts',
      requestOptions: {
        reqData: { hostname: 'api.example.com', path: '/posts' },
        resReq: true
      }
    }]
  })
  .addPhase('fetch-users', {
    requests: [{
      id: 'get-users',
      requestOptions: {
        reqData: { hostname: 'api.example.com', path: '/users' },
        resReq: true
      }
    }]
  })
  .addParallelGroup('fetch-all', ['fetch-posts', 'fetch-users'])
  .addPhase('aggregate', {
    functions: [{
      id: 'combine',
      functionOptions: {
        fn: () => ({ posts: [], users: [] }),
        args: [],
        returnResult: true
      }
    }]
  })
  .addMergePoint('sync', ['fetch-all'])
  .connectSequence('fetch-all', 'sync', 'aggregate')
  .setEntryPoint('fetch-all')
  .build();

const result = await stableWorkflowGraph(graph, {
  workflowId: 'data-aggregation'
});

console.log(`Graph workflow success: ${result.success}`);

Key responsibilities:

  • Define phases as DAG nodes with explicit dependency edges
  • Execute independent phases in parallel automatically
  • Support parallel groups, merge points, and conditional routing
  • Validate graph structure (cycle detection, reachability, orphan detection)
  • Provide deterministic execution order
  • Offer higher parallelism than phased workflows for complex topologies

StableScheduler

Queue-based scheduler for cron/interval/timestamp execution with concurrency limits and recoverable state via custom persistence handlers.

Key responsibilities:

  • Enforce max-parallel job execution
  • Schedule jobs with cron, interval, or timestamp(s)
  • Persist and restore scheduler state via user-provided handlers

StableBuffer

Transactional, concurrency-safe shared state. It’s opt-in: pass a StableBuffer instance as commonBuffer or sharedBuffer to serialize updates across concurrent executions.

Key features:

  • Serialized transactions via FIFO queue
  • Snapshot reads with read()
  • Optional transaction timeouts
  • Optional transaction logging with logTransaction
import { StableBuffer } from '@emmvish/stable-infra';

const buffer = new StableBuffer({
  initialState: { counter: 0 },
  transactionTimeoutMs: 500,
  logTransaction: (log) => {
    // persist log.transactionId, log.activity, log.hookName, log.stateBefore, log.stateAfter
  }
});

await buffer.run(
  (state) => { state.counter += 1; },
  { activity: 'workflow-phase', hookName: 'phase-1', workflowId: 'wf-1' }
);

Replay utility (transaction logs → deterministic state replay):

import { replayStableBufferTransactions } from '@emmvish/stable-infra';

const replay = await replayStableBufferTransactions({
  logs, // StableBufferTransactionLog[]
  handlers: {
    'phase-1': (state) => { state.counter += 1; }
  },
  initialState: { counter: 0 }
});

console.log(replay.buffer.getState());

Distributed Infrastructure

Multi-node coordination via a shared backend (Redis, PostgreSQL, etcd, etc.). Nodes connect independently to the backend — no peer-to-peer discovery or IP exchange required. All coordination goes through the DistributedCoordinator, backed by a pluggable DistributedAdapter.

Key capabilities: distributed locking with fencing tokens, compare-and-swap (CAS), quorum-based leader election, pub/sub with delivery guarantees (at-most-once / at-least-once / exactly-once), two-phase commit transactions, distributed buffers, and distributed schedulers.

import { DistributedCoordinator, InMemoryDistributedAdapter } from '@emmvish/stable-infra';

const coordinator = new DistributedCoordinator({
  adapter: new InMemoryDistributedAdapter('node-1'), // Use Redis/Postgres adapter in production
  namespace: 'my-app',
  enableLeaderElection: true,
  leaderHeartbeatMs: 5000,
});

await coordinator.connect();

// Distributed lock with fencing token
const lock = await coordinator.acquireLock({ resource: 'order:123', ttlMs: 30000 });

// Leader election — nodes register via shared backend, no IP discovery needed
await coordinator.registerForElection('scheduler-leader');
await coordinator.campaignForLeader({ electionKey: 'scheduler-leader' });

// Shared state & pub/sub across nodes
await coordinator.setState('user:123', { balance: 100 });
await coordinator.publish('events', { type: 'ORDER_CREATED', orderId: 123 });

An InMemoryDistributedAdapter is included for testing and single-instance use. For production multi-node deployments, implement the DistributedAdapter interface with your backend of choice.

Distributed Buffer — Cross-Node StableBuffer

Wrap a StableBuffer with distributed sync so transactions on one node propagate to all others via pub/sub. Conflict resolution is configurable.

import {
  createDistributedStableBuffer,
  DistributedConflictResolution,
  withDistributedBufferLock
} from '@emmvish/stable-infra';

const { buffer, sync, refresh, disconnect } = await createDistributedStableBuffer({
  distributed: { adapter, namespace: 'my-app' },
  initialState: { counter: 0, items: [] },
  conflictResolution: DistributedConflictResolution.LAST_WRITE_WINS, // or MERGE, CUSTOM
  syncOnTransaction: true // auto-push state after each buffer.run()
});

// Use exactly like a local StableBuffer — sync happens automatically
await buffer.run(state => { state.counter += 1; });

// Acquire a distributed lock for critical sections
await withDistributedBufferLock({ buffer, coordinator }, async () => {
  await buffer.run(state => { state.counter -= 100; }); // exclusive cross-node access
}, { ttlMs: 10000 });

await refresh();    // Force pull latest state from remote
await disconnect();

Distributed Resilience — Shared Circuit Breaker, Rate Limiter, Cache

Every resilience component (CircuitBreaker, RateLimiter, ConcurrencyLimiter, CacheManager) supports a persistence interface. The distributed layer provides an implementation backed by coordinator.getState/setState, so component state (failure counts, open/closed status, rate windows) is synchronized across nodes.

Create them individually or as a bundle:

import { createDistributedInfrastructureBundle } from '@emmvish/stable-infra';

// One coordinator, all components share the same backend
const infra = await createDistributedInfrastructureBundle({
  distributed: { adapter, namespace: 'my-service' },
  circuitBreaker: {
    failureThresholdPercentage: 50,
    minimumRequests: 10,
    recoveryTimeoutMs: 30000
  },
  rateLimiter: { maxRequests: 1000, windowMs: 60000 },
  concurrencyLimiter: { limit: 50 },
  cacheManager: { enabled: true, ttl: 300000 }
});

// Plug directly into any core module via sharedInfrastructure
const scheduler = new StableScheduler({
  maxParallel: 10,
  sharedInfrastructure: {
    circuitBreaker: infra.circuitBreaker,   // shared state across nodes
    rateLimiter: infra.rateLimiter,
    concurrencyLimiter: infra.concurrencyLimiter,
    cacheManager: infra.cacheManager
  }
}, handler);

await infra.disconnect(); // cleanup

Standalone factories are also available: createDistributedCircuitBreaker(), createDistributedRateLimiter(), createDistributedConcurrencyLimiter(), createDistributedCacheManager().

Distributed Scheduler — Leader-Based Execution

Wrap StableScheduler so only the elected leader node processes jobs, with distributed state persistence and shared resilience infrastructure.

import { runAsDistributedScheduler } from '@emmvish/stable-infra';

const runner = await runAsDistributedScheduler({
  distributed: { adapter, namespace: 'workers' },
  scheduler: { maxParallel: 5 },
  circuitBreaker: { failureThresholdPercentage: 50, minimumRequests: 10, recoveryTimeoutMs: 30000 },
  rateLimiter: { maxRequests: 100, windowMs: 60000 },
  createScheduler: (config) => new StableScheduler(config, async (job) => {
    await processJob(job);
  })
});

await runner.start();    // Campaigns for leadership, starts scheduler when elected
runner.isLeader();       // Check current status
await runner.stop();     // Graceful shutdown, resigns leadership

Internally, scheduler state (job queue, execution history) is persisted via the coordinator. When the leader crashes or resigns, a new leader is elected (after the previous leader’s lease expires). The new leader automatically restores state from the coordinator and runs remaining jobs, so work resumes from where the previous leader left off.

Multi-Node Integration Example

Combining distributed buffer, shared circuit breaker, leader election, pub/sub, locking, and workflows:

import {
  createDistributedStableBuffer, createDistributedSchedulerConfig,
  DistributedCoordinator, DistributedConflictResolution,
  DistributedTransactionOperationType, withDistributedBufferLock,
  StableScheduler
} from '@emmvish/stable-infra';

// 1. Distributed buffer with custom conflict resolution
const { buffer: sharedBuffer } = await createDistributedStableBuffer({
  distributed: { adapter, namespace: 'orders' },
  initialState: { orderCount: 0, totalRevenue: 0, failedOrders: [] },
  conflictResolution: DistributedConflictResolution.CUSTOM,
  mergeStrategy: (local, remote) => ({
    orderCount: Math.max(local.orderCount, remote.orderCount),
    totalRevenue: Math.max(local.totalRevenue, remote.totalRevenue),
    failedOrders: [...new Set([...local.failedOrders, ...remote.failedOrders])]
  }),
  syncOnTransaction: true
});

// 2. Distributed scheduler with leader election + circuit breaker
const setup = await createDistributedSchedulerConfig({
  distributed: { adapter, namespace: 'orders' },
  scheduler: { maxParallel: 10 },
  enableLeaderElection: true,
  circuitBreaker: { failureThresholdPercentage: 40, minimumRequests: 5, recoveryTimeoutMs: 30000 }
});

// 3. Job handler — lock, process, update shared state atomically
const orderHandler = async (job) => {
  const lock = await coordinator.acquireLock({
    resource: `order:${job.orderId}:processing`, ttlMs: 60000
  });
  try {
    await processOrderWorkflow(job); // stableWorkflow under the hood

    // Update shared buffer with distributed lock
    await withDistributedBufferLock({ buffer: sharedBuffer, coordinator }, async () => {
      await sharedBuffer.run(state => {
        state.orderCount += 1;
        state.totalRevenue += job.total;
      });
    });

    // Publish completion for other services
    await coordinator.publish('order-events', { type: 'COMPLETED', orderId: job.orderId });
  } catch (error) {
    // Atomic error handling via 2PC transaction
    await coordinator.executeTransaction([
      { type: DistributedTransactionOperationType.SET, key: `order:${job.orderId}:status`, value: { status: 'FAILED' } },
      { type: DistributedTransactionOperationType.INCREMENT, key: 'metrics:failed-orders', delta: 1 }
    ]);
  } finally {
    await coordinator.releaseLock(lock.handle);
  }
};

// 4. Start — only the leader processes jobs
const scheduler = new StableScheduler({ ...setup.config, sharedBuffer }, orderHandler);
const isLeader = await setup.waitForLeadership(30000);
if (isLeader) scheduler.start();

Stable Runner

Config-driven runner that executes core module jobs from JSON/ESM configs and can use StableScheduler for scheduled jobs.


Resilience Mechanisms

Execution Timeouts

Set maximum execution time for functions to prevent indefinite hangs. Timeouts are enforced at multiple levels with proper inheritance.

You can also set a workflow/gateway-level maxTimeout to cap total execution time (applies to stableWorkflow, stableWorkflowGraph, and stableApiGateway).

Function-Level Timeout

Set timeout directly on a function:

import { stableFunction } from '@emmvish/stable-infra';

const result = await stableFunction({
  fn: async () => {
    // Long-running operation
    await processLargeDataset();
    return 'success';
  },
  args: [],
  returnResult: true,
  executionTimeout: 5000, // 5 seconds max
  attempts: 3,
});

if (!result.success && result.error?.includes('timeout')) {
  console.log('Function timed out');
}

Gateway-Level Timeout

Apply timeout to all functions in a gateway:

import { stableApiGateway, RequestOrFunction } from '@emmvish/stable-infra';

const results = await stableApiGateway(
  [
    {
      type: RequestOrFunction.FUNCTION,
      function: {
        id: 'task1',
        functionOptions: {
          fn: async () => await task1(),
          args: [],
          // No timeout specified - inherits from gateway
        },
      },
    },
    {
      type: RequestOrFunction.FUNCTION,
      function: {
        id: 'task2',
        functionOptions: {
          fn: async () => await task2(),
          args: [],
          executionTimeout: 10000, // Override gateway timeout
        },
      },
    },
  ],
  {
    commonExecutionTimeout: 3000, // Default 3s for all functions
  }
);

Request Group Timeout

Different timeouts for different groups:

const results = await stableApiGateway(
  [
    {
      type: RequestOrFunction.FUNCTION,
      function: {
        id: 'critical',
        groupId: 'criticalOps',
        functionOptions: { fn: criticalOp, args: [] },
      },
    },
    {
      type: RequestOrFunction.FUNCTION,
      function: {
        id: 'background',
        groupId: 'backgroundOps',
        functionOptions: { fn: backgroundOp, args: [] },
      },
    },
  ],
  {
    requestGroups: [
      {
        id: 'criticalOps',
        commonConfig: {
          commonExecutionTimeout: 1000, // Strict 1s timeout
        },
      },
      {
        id: 'backgroundOps',
        commonConfig: {
          commonExecutionTimeout: 30000, // Lenient 30s timeout
        },
      },
    ],
  }
);

Workflow Phase Timeout

Apply timeout at phase level in workflows:

import { stableWorkflow } from '@emmvish/stable-infra';

const result = await stableWorkflow(
  [
    {
      id: 'initialization',
      functions: [
        {
          id: 'init',
          functionOptions: {
            fn: initializeSystem,
            args: [],
          },
        },
      ],
      commonConfig: {
        commonExecutionTimeout: 5000, // 5s for initialization
      },
    },
    {
      id: 'processing',
      functions: [
        {
          id: 'process',
          functionOptions: {
            fn: processData,
            args: [],
          },
        },
      ],
      commonConfig: {
        commonExecutionTimeout: 30000, // 30s for processing
      },
    },
  ],
  {
    commonExecutionTimeout: 10000, // Default for phases without specific timeout
  }
);

Timeout Precedence

Timeouts follow the configuration cascade pattern:

Function > Group > Phase/Branch > Gateway

  • Function-level executionTimeout always wins
  • If not set, inherits from request group's commonExecutionTimeout
  • If not set, inherits from phase/branch's commonExecutionTimeout
  • If not set, inherits from gateway's commonExecutionTimeout
  • If not set, no timeout is applied

Timeout Behavior

  • Timeout applies to entire function execution including all retry attempts
  • When timeout is exceeded, function returns failed result with timeout error
  • Timeout does NOT stop execution mid-flight (no AbortController)
  • Metrics are still collected even when timeout occurs
  • Use with retries: timeout encompasses all attempts, not per-attempt
const result = await stableFunction({
  fn: slowFunction,
  args: [],
  attempts: 5,
  wait: 1000,
  executionTimeout: 3000, // Total time for all 5 attempts
});

// If each attempt takes 800ms:
// - Attempt 1: 800ms
// - Attempt 2: starts at 1800ms (after 1s wait)
// - Attempt 3: would start at 3600ms → TIMEOUT at 3000ms

Retry Strategies

When a request or function fails and is retryable, retry with configurable backoff.

FIXED Strategy

Constant wait between retries.

import { stableRequest, RETRY_STRATEGIES } from '@emmvish/stable-infra';

interface DataRequest {}
interface DataResponse { data: any; }

const result = await stableRequest<DataRequest, DataResponse>({
  reqData: { hostname: 'api.example.com', path: '/data' },
  resReq: true,
  attempts: 4,
  wait: 500,
  retryStrategy: RETRY_STRATEGIES.FIXED
  // Retries at: 500ms, 1000ms, 1500ms
});

LINEAR Strategy

Wait increases linearly with attempt number.

const result = await stableRequest<DataRequest, DataResponse>({
  reqData: { hostname: 'api.example.com', path: '/data' },
  resReq: true,
  attempts: 4,
  wait: 100,
  retryStrategy: RETRY_STRATEGIES.LINEAR
  // Retries at: 100ms, 200ms, 300ms (wait * attempt)
});

EXPONENTIAL Strategy

Wait increases exponentially; useful for heavily loaded services.

const result = await stableRequest<DataRequest, DataResponse>({
  reqData: { hostname: 'api.example.com', path: '/data' },
  resReq: true,
  attempts: 4,
  wait: 100,
  maxAllowedWait: 10000,
  retryStrategy: RETRY_STRATEGIES.EXPONENTIAL
  // Retries at: 100ms, 200ms, 400ms (wait * 2^(attempt-1))
  // Capped at maxAllowedWait
});

Jitter

Add random milliseconds to prevent synchronization.

const result = await stableRequest<DataRequest, DataResponse>({
  reqData: { hostname: 'api.example.com', path: '/data' },
  resReq: true,
  attempts: 3,
  wait: 500,
  jitter: 200, // Add 0-200ms randomness
  retryStrategy: RETRY_STRATEGIES.EXPONENTIAL
});

Perform All Attempts

Collect all outcomes instead of failing on first error.

const result = await stableRequest<DataRequest, DataResponse>({
  reqData: { hostname: 'api.example.com', path: '/data' },
  resReq: true,
  attempts: 3,
  performAllAttempts: true
  // All 3 attempts execute; check result.successfulAttempts
});

Circuit Breaker

Prevent cascading failures by failing fast when a dependency becomes unhealthy.

import { stableApiGateway, CircuitBreaker } from '@emmvish/stable-infra';

interface FlakyRequest {}
interface FlakyResponse { status: string; }

const breaker = new CircuitBreaker({
  failureThresholdPercentage: 50,
  minimumRequests: 10,
  recoveryTimeoutMs: 30000,
  successThresholdPercentage: 80,
  halfOpenMaxRequests: 5
});

const requests = [
  { id: 'req-1', requestOptions: { reqData: { path: '/flaky' }, resReq: true } },
  { id: 'req-2', requestOptions: { reqData: { path: '/flaky' }, resReq: true } }
];

const responses = await stableApiGateway<FlakyRequest, FlakyResponse>(requests, {
  circuitBreaker: breaker
});

// Circuit breaker states:
// CLOSED: Normal operation (accept all requests)
// OPEN: Too many failures; reject immediately
// HALF_OPEN: Testing recovery; allow limited requests

State Transitions:

  • CLOSED → OPEN: Failure rate exceeds threshold after minimum requests
  • OPEN → HALF_OPEN: Recovery timeout elapsed; attempt recovery
  • HALF_OPEN → CLOSED: Success rate exceeds recovery threshold
  • HALF_OPEN → OPEN: Success rate below recovery threshold; reopen

Caching

Cache responses to avoid redundant calls.

import { stableRequest, CacheManager } from '@emmvish/stable-infra';

interface UserRequest {}
interface UserResponse {
  id: number;
  name: string;
  email: string;
}

const cache = new CacheManager({
  enabled: true,
  ttl: 5000 // 5 seconds
});

// First call: cache miss, hits API
const result1 = await stableRequest<UserRequest, UserResponse>({
  reqData: { hostname: 'api.example.com', path: '/user/1' },
  resReq: true,
  cache
});

// Second call within 5s: cache hit, returns cached response
const result2 = await stableRequest<UserRequest, UserResponse>({
  reqData: { hostname: 'api.example.com', path: '/user/1' },
  resReq: true,
  cache
});

// Respects Cache-Control headers if enabled
const cache2 = new CacheManager({
  enabled: true,
  ttl: 60000,
  respectCacheControl: true // Uses max-age, no-cache, no-store
});

Function Caching:

Arguments become cache key; identical args hit cache.

import { stableFunction } from '@emmvish/stable-infra';

const expensive = (x: number) => x * x * x; // Cubic calculation

const result1 = await stableFunction({
  fn: expensive,
  args: [5],
  returnResult: true,
  cache: { enabled: true, ttl: 10000 }
});

const result2 = await stableFunction({
  fn: expensive,
  args: [5], // Same args → cache hit
  returnResult: true,
  cache: { enabled: true, ttl: 10000 }
});

Rate Limiting

Enforce max requests per time window.

import { stableApiGateway } from '@emmvish/stable-infra';

interface ItemRequest {}
interface ItemResponse {
  id: number;
  data: any;
}

const requests = Array.from({ length: 20 }, (_, i) => ({
  id: `req-${i}`,
  requestOptions: {
    reqData: { path: `/item/${i}` },
    resReq: true
  }
}));

const responses = await stableApiGateway<ItemRequest, ItemResponse>(requests, {
  concurrentExecution: true,
  rateLimit: {
    maxRequests: 5,
    windowMs: 1000 // 5 requests per second
  }
  // Requests queued until window allows; prevents overwhelming API
});

Concurrency Limiting

Limit concurrent in-flight requests.

import { stableApiGateway } from '@emmvish/stable-infra';

interface ItemRequest {}
interface ItemResponse {
  id: number;
  data: any;
}

const requests = Array.from({ length: 50 }, (_, i) => ({
  id: `req-${i}`,
  requestOptions: {
    reqData: { path: `/item/${i}` },
    resReq: true,
    attempts: 1
  }
}));

const responses = await stableApiGateway<ItemRequest, ItemResponse>(requests, {
  concurrentExecution: true,
  maxConcurrentRequests: 5 // Only 5 requests in-flight at a time
  // Others queued and executed as slots free
});

Workflow Patterns

Sequential & Concurrent Phases

Sequential (Default)

Each phase waits for the previous to complete.

import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';

const phases: STABLE_WORKFLOW_PHASE[] = [
  {
    id: 'phase-1',
    requests: [{ id: 'r1', requestOptions: { reqData: { path: '/p1' }, resReq: true } }]
  },
  {
    id: 'phase-2',
    requests: [{ id: 'r2', requestOptions: { reqData: { path: '/p2' }, resReq: true } }]
  },
  {
    id: 'phase-3',
    requests: [{ id: 'r3', requestOptions: { reqData: { path: '/p3' }, resReq: true } }]
  }
];

const result = await stableWorkflow(phases, {
  workflowId: 'sequential-phases',
  concurrentPhaseExecution: false // Phase-1 → Phase-2 → Phase-3
});

Concurrent Phases

Multiple phases run in parallel.

const phases: STABLE_WORKFLOW_PHASE[] = [
  {
    id: 'fetch-users',
    requests: [{ id: 'get-users', requestOptions: { reqData: { path: '/users' }, resReq: true } }]
  },
  {
    id: 'fetch-posts',
    requests: [{ id: 'get-posts', requestOptions: { reqData: { path: '/posts' }, resReq: true } }]
  },
  {
    id: 'fetch-comments',
    requests: [{ id: 'get-comments', requestOptions: { reqData: { path: '/comments' }, resReq: true } }]
  }
];

const result = await stableWorkflow(phases, {
  workflowId: 'parallel-phases',
  concurrentPhaseExecution: true // All 3 phases in parallel
});

Mixed Phases

Combine sequential and concurrent phases in one workflow.

const phases: STABLE_WORKFLOW_PHASE[] = [
  {
    id: 'init', // Sequential
    requests: [{ id: 'setup', requestOptions: { reqData: { path: '/init' }, resReq: true } }]
  },
  {
    id: 'fetch-a',
    markConcurrentPhase: true, // Concurrent with next
    requests: [{ id: 'data-a', requestOptions: { reqData: { path: '/a' }, resReq: true } }]
  },
  {
    id: 'fetch-b',
    markConcurrentPhase: true, // Concurrent with fetch-a
    requests: [{ id: 'data-b', requestOptions: { reqData: { path: '/b' }, resReq: true } }]
  },
  {
    id: 'finalize', // Sequential after fetch-a/b complete
    requests: [{ id: 'done', requestOptions: { reqData: { path: '/finalize' }, resReq: true } }]
  }
];

const result = await stableWorkflow(phases, {
  concurrentPhaseExecution: false // Respects markConcurrentPhase per phase
});

Non-Linear Workflows

Use decision hooks to dynamically control phase flow.

CONTINUE

Standard flow to next sequential phase.

const phases: STABLE_WORKFLOW_PHASE[] = [
  {
    id: 'check-status',
    requests: [{ id: 'api', requestOptions: { reqData: { path: '/status' }, resReq: true } }],
    phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
      return { action: PHASE_DECISION_ACTIONS.CONTINUE };
    }
  },
  {
    id: 'process', // Executes after check-status
    requests: [{ id: 'process-data', requestOptions: { reqData: { path: '/process' }, resReq: true } }]
  }
];

const result = await stableWorkflow(phases, {
  enableNonLinearExecution: true
});

SKIP

Skip the next phase; execute the one after.

const phases: STABLE_WORKFLOW_PHASE[] = [
  {
    id: 'phase-1',
    requests: [{ id: 'r1', requestOptions: { reqData: { path: '/p1' }, resReq: true } }],
    phaseDecisionHook: async () => ({
      action: PHASE_DECISION_ACTIONS.SKIP
    })
  },
  {
    id: 'phase-2', // Skipped
    requests: [{ id: 'r2', requestOptions: { reqData: { path: '/p2' }, resReq: true } }]
  },
  {
    id: 'phase-3', // Executes
    requests: [{ id: 'r3', requestOptions: { reqData: { path: '/p3' }, resReq: true } }]
  }
];

const result = await stableWorkflow(phases, {
  enableNonLinearExecution: true
});

// Execution: phase-1 → phase-3

JUMP

Jump to a specific phase by ID.

const phases: STABLE_WORKFLOW_PHASE[] = [
  {
    id: 'phase-1',
    requests: [{ id: 'r1', requestOptions: { reqData: { path: '/p1' }, resReq: true } }],
    phaseDecisionHook: async () => ({
      action: PHASE_DECISION_ACTIONS.JUMP,
      targetPhaseId: 'recovery'
    })
  },
  {
    id: 'phase-2', // Skipped
    requests: [{ id: 'r2', requestOptions: { reqData: { path: '/p2' }, resReq: true } }]
  },
  {
    id: 'recovery',
    requests: [{ id: 'recover', requestOptions: { reqData: { path: '/recovery' }, resReq: true } }]
  }
];

const result = await stableWorkflow(phases, {
  enableNonLinearExecution: true
});

// Execution: phase-1 → recovery

REPLAY

Re-execute current phase; useful for polling.

const phases: STABLE_WORKFLOW_PHASE[] = [
  {
    id: 'wait-for-job',
    allowReplay: true,
    maxReplayCount: 5,
    requests: [
      {
        id: 'check-job',
        requestOptions: { reqData: { path: '/job/status' }, resReq: true, attempts: 1 }
      }
    ],
    phaseDecisionHook: async ({ phaseResult, executionHistory }) => {
      const lastResponse = phaseResult.responses?.[0];
      if ((lastResponse as any)?.data?.status === 'pending' && executionHistory.length < 5) {
        return { action: PHASE_DECISION_ACTIONS.REPLAY };
      }
      return { action: PHASE_DECISION_ACTIONS.CONTINUE };
    }
  },
  {
    id: 'process-result',
    requests: [{ id: 'process', requestOptions: { reqData: { path: '/process' }, resReq: true } }]
  }
];

const result = await stableWorkflow(phases, {
  enableNonLinearExecution: true,
  maxWorkflowIterations: 100
});

// Polls up to 5 times before continuing

TERMINATE

Stop workflow early.

const phases: STABLE_WORKFLOW_PHASE[] = [
  {
    id: 'validate',
    requests: [{ id: 'validate-input', requestOptions: { reqData: { path: '/validate' }, resReq: true } }],
    phaseDecisionHook: async ({ phaseResult, sharedBuffer }) => {
      if (!phaseResult.success) {
        return { action: PHASE_DECISION_ACTIONS.TERMINATE };
      }
      return { action: PHASE_DECISION_ACTIONS.CONTINUE };
    }
  },
  {
    id: 'phase-2', // Won't execute if validation fails
    requests: [{ id: 'r2', requestOptions: { reqData: { path: '/p2' }, resReq: true } }]
  }
];

const result = await stableWorkflow(phases, {
  enableNonLinearExecution: true
});

console.log(result.terminatedEarly); // true if TERMINATE triggered

Branched Workflows

Execute multiple independent branches with shared state.

import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_BRANCH } from '@emmvish/stable-infra';

const branches: STABLE_WORKFLOW_BRANCH[] = [
  {
    id: 'branch-payment',
    phases: [
      {
        id: 'process-payment',
        requests: [
          {
            id: 'charge-card',
            requestOptions: {
              reqData: { path: '/payment/charge' },
              resReq: true
            }
          }
        ]
      }
    ]
  },
  {
    id: 'branch-notification',
    phases: [
      {
        id: 'send-email',
        requests: [
          {
            id: 'send',
            requestOptions: {
              reqData: { path: '/notify/email' },
              resReq: false
            }
          }
        ]
      }
    ]
  }
];

const result = await stableWorkflow([], {
  workflowId: 'checkout',
  enableBranchExecution: true,
  branches,
  sharedBuffer: { orderId: '12345' },
  markConcurrentBranch: true // Branches run in parallel
});

// Both branches access/modify sharedBuffer

Branch Racing

When multiple branches execute concurrently, enable racing to accept the first successful branch and cancel others.

const result = await stableWorkflow([], {
  workflowId: 'payment-racing',
  enableBranchExecution: true,
  enableBranchRacing: true, // First successful branch wins
  branches: [
    {
      id: 'payment-provider-a',
      phases: [/* ... */]
    },
    {
      id: 'payment-provider-b',
      phases: [/* ... */]
    }
  ],
  markConcurrentBranch: true
});

// Only winning branch's execution history recorded
// Losing branches marked as cancelled

Graph-based Workflow Patterns

Key responsibilities:

  • Define phases as DAG nodes with explicit dependency edges
  • Execute independent phases in parallel automatically
  • Support parallel groups, merge points, and conditional routing
  • Validate graph structure (cycle detection, reachability, orphan detection)
  • Provide deterministic execution order
  • Offer higher parallelism than phased workflows for complex topologies

Graph-Based Workflows with Mixed Items

For complex topologies with explicit dependencies, use DAG execution mixing requests and functions.

import { stableWorkflowGraph, WorkflowGraphBuilder, RequestOrFunction } from '@emmvish/stable-infra';
import type { API_GATEWAY_ITEM } from '@emmvish/stable-infra';

// Request types
interface PostsRequest {}
interface PostsResponse { posts: Array<{ id: number; title: string }> };

interface UsersRequest {}
interface UsersResponse { users: Array<{ id: number; name: string }> };

// Function types
type AggregateArgs = [PostsResponse, UsersResponse];
type AggregateResult = {
  combined: Array<{ userId: number; userName: string; postCount: number }>;
};

type AnalyzeArgs = [AggregateResult];
type AnalyzeResult = { totalPosts: number; activeUsers: number };

const graph = new WorkflowGraphBuilder<
  PostsRequest | UsersRequest,
  PostsResponse | UsersResponse,
  AggregateArgs | AnalyzeArgs,
  AggregateResult | AnalyzeResult
>()
  .addPhase('fetch-posts', {
    requests: [{
      id: 'get-posts',
      requestOptions: {
        reqData: { path: '/posts' },
        resReq: true
      }
    }]
  })
  .addPhase('fetch-users', {
    requests: [{
      id: 'get-users',
      requestOptions: {
        reqData: { path: '/users' },
        resReq: true
      }
    }]
  })
  .addParallelGroup('fetch-all', ['fetch-posts', 'fetch-users'])
  .addPhase('aggregate', {
    functions: [{
      id: 'combine-data',
      functionOptions: {
        fn: (posts: PostsResponse, users: UsersResponse): AggregateResult => ({
          combined: users.users.map(user => ({
            userId: user.id,
            userName: user.name,
            postCount: posts.posts.filter(p => p.id === user.id).length
          }))
        }),
        args: [{ posts: [] }, { users: [] }] as AggregateArgs,
        returnResult: true
      }
    }]
  })
  .addPhase('analyze', {
    functions: [{
      id: 'analyze-data',
      functionOptions: {
        fn: (aggregated: AggregateResult): AnalyzeResult => ({
          totalPosts: aggregated.combined.reduce((sum, u) => sum + u.postCount, 0),
          activeUsers: aggregated.combined.filter(u => u.postCount > 0).length
        }),
        args: [{ combined: [] }] as AnalyzeArgs,
        returnResult: true
      }
    }]
  })
  .addMergePoint('sync', ['fetch-all'])
  .connectSequence('fetch-all', 'sync', 'aggregate', 'analyze')
  .setEntryPoint('fetch-all')
  .build();

const result = await stableWorkflowGraph(graph, {
  workflowId: 'data-aggregation'
});

console.log(`Graph workflow success: ${result.success}`);

Parallel Phase Execution

Execute multiple phases concurrently within a group.

import { stableWorkflowGraph, WorkflowGraphBuilder } from '@emmvish/stable-infra';

const graph = new WorkflowGraphBuilder()
  .addPhase('fetch-users', {
    requests: [{
      id: 'users',
      requestOptions: { reqData: { path: '/users' }, resReq: true }
    }]
  })
  .addPhase('fetch-posts', {
    requests: [{
      id: 'posts',
      requestOptions: { reqData: { path: '/posts' }, resReq: true }
    }]
  })
  .addPhase('fetch-comments', {
    requests: [{
      id: 'comments',
      requestOptions: { reqData: { path: '/comments' }, resReq: true }
    }]
  })
  .addParallelGroup('data-fetch', ['fetch-users', 'fetch-posts', 'fetch-comments'])
  .setEntryPoint('data-fetch')
  .build();

const result = await stableWorkflowGraph(graph, {
  workflowId: 'data-aggregation'
});

// All 3 phases run concurrently

Merge Points

Synchronize multiple predecessor phases.

const graph = new WorkflowGraphBuilder()
  .addPhase('fetch-a', {
    requests: [{ id: 'a', requestOptions: { reqData: { path: '/a' }, resReq: true } }]
  })
  .addPhase('fetch-b', {
    requests: [{ id: 'b', requestOptions: { reqData: { path: '/b' }, resReq: true } }]
  })
  .addMergePoint('sync', ['fetch-a', 'fetch-b'])
  .addPhase('aggregate', {
    functions: [{
      id: 'combine',
      functionOptions: {
        fn: () => 'combined',
        args: [],
        returnResult: true
      }
    }]
  })
  .connectSequence('fetch-a', 'sync')
  .connectSequence('fetch-b', 'sync')
  .connectSequence('sync', 'aggregate')
  .setEntryPoint('fetch-a')
  .build();

const result = await stableWorkflowGraph(graph, {
  workflowId: 'parallel-sync'
});

// fetch-a and fetch-b run in parallel
// aggregate waits for both to complete

Linear Helper

Convenience function for sequential phase chains.

import { createLinearWorkflowGraph } from '@emmvish/stable-infra';

const phases = [
  {
    id: 'init',
    requests: [{ id: 'setup', requestOptions: { reqData: { path: '/init' }, resReq: true } }]
  },
  {
    id: 'process',
    requests: [{ id: 'do-work', requestOptions: { reqData: { path: '/work' }, resReq: true } }]
  },
  {
    id: 'finalize',
    requests: [{ id: 'cleanup', requestOptions: { reqData: { path: '/cleanup' }, resReq: true } }]
  }
];

const graph = createLinearWorkflowGraph(phases);

const result = await stableWorkflowGraph(graph, {
  workflowId: 'linear-workflow'
});

Branch Racing in Graphs

Enable branch racing in workflow graphs to accept the first successful branch node when multiple branches are executed in parallel.

import { stableWorkflowGraph, WorkflowGraphBuilder } from '@emmvish/stable-infra';

const branch1 = {
  id: 'provider-a',
  phases: [{ /* ... */ }]
};

const branch2 = {
  id: 'provider-b',
  phases: [{ /* ... */ }]
};

const graph = new WorkflowGraphBuilder()
  .addBranch('provider-a', branch1)
  .addBranch('provider-b', branch2)
  .addParallelGroup('race', ['provider-a', 'provider-b'])
  .setEntryPoint('race')
  .build();

const result = await stableWorkflowGraph(graph, {
  workflowId: 'provider-racing',
  enableBranchRacing: true // First successful branch wins
});

// Only winning branch's results recorded
// Losing branch marked as cancelled

Configuration & State

Config Cascading

Define defaults globally; override at group, phase, branch, or item level.

import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';

const phases: STABLE_WORKFLOW_PHASE[] = [
  {
    id: 'phase-1',
    attempts: 5, // Override global attempts for this phase
    wait: 1000,
    requests: [
      {
        id: 'req-1',
        requestOptions: {
          reqData: { path: '/data' },
          resReq: true,
          attempts: 2 // Override phase attempts for this item
        }
      }
    ]
  }
];

const result = await stableWorkflow(phases, {
  workflowId: 'cascade-demo',
  commonAttempts: 1, // Global default
  commonWait: 500,
  retryStrategy: 'LINEAR' // Global default
  // Final config per item: merge common → phase → request
});

Hierarchy: global → group → phase → branch → item. Lower levels override.

Shared & State Buffers

Pass mutable state across phases, branches, and items. For concurrency-safe shared state, pass a StableBuffer instance instead of a plain object.

Shared Buffer (Workflow/Gateway)

import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';

const phases: STABLE_WORKFLOW_PHASE[] = [
  {
    id: 'fetch',
    requests: [
      {
        id: 'user-data',
        requestOptions: {
          reqData: { path: '/users/1' },
          resReq: true,
          handleSuccessfulAttemptData: ({ successfulAttemptData, stableRequestOptions }) => {
            // Mutate shared buffer
            const sharedBuffer = (stableRequestOptions as any).sharedBuffer;
            sharedBuffer.userId = (successfulAttemptData.data as any).id;
          }
        }
      }
    ]
  },
  {
    id: 'use-shared-data',
    requests: [
      {
        id: 'dependent-call',
        requestOptions: {
          reqData: { path: '/user-posts' },
          resReq: true,
          preExecution: {
            preExecutionHook: async ({ stableRequestOptions, commonBuffer }) => {
              const sharedBuffer = (stableRequestOptions as any).sharedBuffer;
              console.log(`Using userId: ${sharedBuffer.userId}`);
            }
          }
        }
      }
    ]
  }
];

const result = await stableWorkflow(phases, {
  workflowId: 'shared-state-demo',
  sharedBuffer: {} // Mutable across phases
});

Common Buffer (Request Level)

import { stableRequest, PersistenceStage } from '@emmvish/stable-infra';

const commonBuffer = { transactionId: null };

const result = await stableRequest({
  reqData: { path: '/transaction/start' },
  resReq: true,
  commonBuffer,
  preExecution: {
    preExecutionHook: async ({ commonBuffer, stableRequestOptions }) => {
      // commonBuffer writable here
      commonBuffer.userId = '123';
    }
  },
  handleSuccessfulAttemptData: ({ successfulAttemptData }) => {
    // commonBuffer readable in handlers
    console.log(`Transaction for user ${commonBuffer.userId} done`);
  }
});

Hooks & Observability

Pre-Execution Hooks

Modify config or state before execution.

import { stableRequest } from '@emmvish/stable-infra';

interface SecureRequest {}
interface SecureResponse {
  data: any;
  token?: string;
}

const result = await stableRequest<SecureRequest, SecureResponse>({
  reqData: { path: '/secure-data' },
  resReq: true,
  preExecution: {
    preExecutionHook: async ({ inputParams, commonBuffer, stableRequestOptions }) => {
      // Dynamically fetch auth token
      const token = await getAuthToken();
      
      // Return partial config override
      return {
        reqData: {
          headers: { Authorization: `Bearer ${token}` }
        }
      };
    },
    preExecutionHookParams: { context: 'auth-fetch' },
    applyPreExecutionConfigOverride: true,
    continueOnPreExecutionHookFailure: false
  }
});

Analysis Hooks

Validate responses and errors.

Response Analyzer

import { stableRequest } from '@emmvish/stable-infra';

interface ResourceRequest {}
interface ApiResponse {
  id: number;
  status: 'active' | 'inactive';
}

const result = await stableRequest<ResourceRequest, ApiResponse>({
  reqData: { path: '/resource' },
  resReq: true,
  responseAnalyzer: ({ data, reqData, trialMode }) => {
    // Return true to accept, false to retry
    if (!data || typeof data !== 'object') return false;
    if (!('id' in data)) return false;
    if ((data as any).status !== 'active') return false;
    return true;
  }
});

Error Analyzer

Decide whether to suppress error gracefully.

import { stableRequest } from '@emmvish/stable-infra';

interface FeatureRequest {}
interface FeatureResponse {
  enabled: boolean;
  data?: any;
}

const result = await stableRequest<FeatureRequest, FeatureResponse>({
  reqData: { path: '/optional-feature' },
  resReq: true,
  finalErrorAnalyzer: ({ error, reqData, trialMode }) => {
    // Return true to suppress error and return failure result
    // Return false to throw error
    if (error.code === 'ECONNREFUSED') {
      console.warn('Service unavailable, continuing with fallback');
      return true; // Suppress, don't throw
    }
    return false; // Throw
  }
});

if (result.success) {
  console.log('Got data:', result.data);
} else {
  console.log('Service offline, but we continue');
}

Handler Hooks

Custom logging and processing.

Success Handler

import { stableRequest } from '@emmvish/stable-infra';

interface DataRequest {}
interface DataResponse {
  id: number;
  value: string;
}

const result = await stableRequest<DataRequest, DataResponse>({
  reqData: { path: '/data' },
  resReq: true,
  logAllSuccessfulAttempts: true,
  handleSuccessfulAttemptData: ({
    successfulAttemptData,
    reqData,
    maxSerializableChars,
    executionContext
  }) => {
    // Custom logging, metrics, state updates
    console.log(
      `Success in context ${executionContext.workflowId}`,
      `data:`,
      successfulAttemptData.data
    );
  }
});

Error Handler

const result = await stableRequest<DataRequest, DataResponse>({
  reqData: { path: '/data' },
  resReq: true,
  logAllErrors: true,
  handleErrors: ({ errorLog, reqData, executionContext }) => {
    // Custom error logging, alerting, retry logic
    console.error(
      `Error in ${executionContext.workflowId}:`,
      errorLog.errorMessage,
      `Retryable: ${errorLog.isRetryable}`
    );
  }
});

Phase Handlers (Workflow)

import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';

const phases: STABLE_WORKFLOW_PHASE[] = [
  {
    id: 'phase-1',
    requests: [{ id: 'r1', requestOptions: { reqData: { path: '/data' }, resReq: true } }]
  }
];

const result = await stableWorkflow(phases, {
  workflowId: 'wf-handlers',
  handlePhaseCompletion: ({ phaseResult, workflowId }) => {
    console.log(`Phase ${phaseResult.phaseId} complete in ${workflowId}`);
  },
  handlePhaseError: ({ phaseResult, error, workflowId }) => {
    console.error(`Phase ${phaseResult.phaseId} failed:`, error);
  },
  handlePhaseDecision: ({ decision, phaseResult }) => {
    console.log(`Phase decision: ${decision.action}`);
  }
});

Decision Hooks

Dynamically determine workflow flow.

import { stableWorkflow, PHASE_DECISION_ACTIONS } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';

const phases: STABLE_WORKFLOW_PHASE[] = [
  {
    id: 'fetch-data',
    requests: [{ id: 'api', requestOptions: { reqData: { path: '/data' }, resReq: true } }],
    phaseDecisionHook: async ({ phaseResult, sharedBuffer, executionHistory }) => {
      if (!phaseResult.success) {
        return { action: PHASE_DECISION_ACTIONS.TERMINATE };
      }
      if (phaseResult.responses[0].data?.needsRetry) {
        return { action: PHASE_DECISION_ACTIONS.REPLAY };
      }
      return { action: PHASE_DECISION_ACTIONS.CONTINUE };
    }
  }
];

const result = await stableWorkflow(phases, {
  enableNonLinearExecution: true
});

Metrics & Logging

Automatic metrics collection across all execution modes.

Request Metrics

import { stableRequest } from '@emmvish/stable-infra';

interface DataRequest {}
interface DataResponse { data: any; }

const result = await stableRequest<DataRequest, DataResponse>({
  reqData: { path: '/data' },
  resReq: true,
  attempts: 3
});

console.log(result.metrics); // {
//   totalAttempts: 2,
//   successfulAttempts: 1,
//   failedAttempts: 1,
//   totalExecutionTime: 450,
//   averageAttemptTime: 225,
//   infrastructureMetrics: {
//     circuitBreaker: { /* state, stats, config */ },
//     cache: { /* hits, misses, size */ },
//     rateLimiter: { /* limit, current rate */ },
//     concurrencyLimiter: { /* limit, in-flight */ }
//   },
//   validation: {
//     isValid: true,
//     anomalies: [],
//     validatedAt: '2026-01-20T...'
//   }
// }

API Gateway Metrics

import { stableApiGateway } from '@emmvish/stable-infra';
import type { API_GATEWAY_REQUEST } from '@emmvish/stable-infra';

interface ApiRequest {}
interface ApiResponse { data: any; }

const requests: API_GATEWAY_REQUEST<ApiRequest, ApiResponse>[] = [
  { id: 'req-1', requestOptions: { reqData: { path: '/data/1' }, resReq: true } },
  { id: 'req-2', requestOptions: { reqData: { path: '/data/2' }, resReq: true } },
  { id: 'req-3', requestOptions: { reqData: { path: '/data/3' }, resReq: true } }
];

const result = await stableApiGateway<ApiRequest, ApiResponse>(requests, {
  concurrentExecution: true,
  maxConcurrentRequests: 5
});

console.log(result.metrics); // {
//   totalRequests: 3,
//   successfulRequests: 3,
//   failedRequests: 0,
//   successRate: 100,
//   failureRate: 0,
//   executionTime: 450,              // Total execution time in ms
//   timestamp: '2026-01-20T...',     // ISO 8601 completion timestamp
//   throughput: 6.67,                // Requests per second
//   averageRequestDuration: 150,     // Average time per request in ms
//   requestGroups: [/* per-group stats */],
//   infrastructureMetrics: {
//     circuitBreaker: { /* state, stats, config */ },
//     cache: { /* hit rate, size, utilization */ },
//     rateLimiter: { /* throttle rate, queue length */ },
//     concurrencyLimiter: { /* utilization, queue */ }
//   },
//   validation: {
//     isValid: true,
//     anomalies: [],
//     validatedAt: '2026-01-20T...'
//   }
// }

Workflow Metrics

import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';

const phases: STABLE_WORKFLOW_PHASE[] = [
  { id: 'p1', requests: [{ id: 'r1', requestOptions: { reqData: { path: '/a' }, resReq: true } }] },
  { id: 'p2', requests: [{ id: 'r2', requestOptions: { reqData: { path: '/b' }, resReq: true } }] }
];

const result = await stableWorkflow(phases, {
  workflowId: 'wf-metrics'
});

console.log(result); // {
//   workflowId: 'wf-metrics',
//   success: true,
//   totalPhases: 2,
//   completedPhases: 2,
//   totalRequests: 2,
//   successfulRequests: 2,
//   failedRequests: 0,
//   workflowExecutionTime: 1200,
//   phases: [
//     { phaseId: 'p1', success: true, responses: [...], validation: {...}, ... },
//     { phaseId: 'p2', success: true, responses: [...], validation: {...}, ... }
//   ],
//   validation: {
//     isValid: true,
//     anomalies: [],
//     validatedAt: '2026-01-20T...'
//   }
// }

Structured Error Logs

const result = await stableRequest<DataRequest, DataResponse>({
  reqData: { path: '/flaky' },
  resReq: true,
  attempts: 3,
  logAllErrors: true,
  handleErrors: ({ errorLog }) => {
    console.log(errorLog); // {
    //   attempt: '1/3',
    //   type: 'NetworkError',
    //   error: 'ECONNREFUSED',
    //   isRetryable: true,
    //   timestamp: 1234567890
    // }
  }
});

if (result.errorLogs) {
  console.log(`${result.errorLogs.length} errors logged`);
}

Advanced Features

Trial Mode

Dry-run workflows without side effects; simulate failures.

import { stableWorkflow } from '@emmvish/stable-infra';
import type { STABLE_WORKFLOW_PHASE } from '@emmvish/stable-infra';

const phases: STABLE_WORKFLOW_PHASE[] = [
  {
    id: 'process',
    requests: [
      {
        id: 'api-call',
        requestOptions: {
          reqData: { path: '/payment/charge' },
          resReq: true,
          trialMode: {
            enabled: true,
            requestFailureProbability: 0.3 // 30% simulated failure rate
          }
        }
      }
    ]
  }
];

const result = await stableWorkflow(phases, {
  workflowId: 'payment-trial',
  trialMode: {
    enabled: true,
    functionFailureProbability: 0.2
  }
});

// Requests/functions execute but failures are simulated
// Real API calls happen; real side effects occur only if enabled
// Useful for testing retry logic, decision hooks, workflow topology

State Persistence

Persist state across retry attempts for distributed tracing.

The persistenceFunction receives a persistenceStage parameter (PersistenceStage.BEFORE_HOOK or PersistenceStage.AFTER_HOOK) to indicate when it is called.

import { stableRequest, PersistenceStage } from '@emmvish/stable-infra';

interface DataRequest {}
interface DataResponse { data: any; }

const result = await stableRequest<DataRequest, DataResponse>({
  reqData: { path: '/data' },
  resReq: true,
  attempts: 3,
  statePersistence: {
    persistenceFunction: async ({ executionContext, buffer, params, persistenceStage }) => {
      const key = `${executionContext.workflowId}:${executionContext.requestId}`;
      if (persistenceStage === PersistenceStage.BEFORE_HOOK || params?.operation === 'load') {
        // Load state for recovery
        return await loadFromDatabase(key);
      }
      // Save state to database or distributed cache
      await saveToDatabase({ key, state: buffer });
      return buffer;
    },
    persist