@elsium-ai/workflows

v0.6.0

Multi-step workflow pipelines and DAG execution for ElsiumAI

Install

npm install @elsium-ai/workflows @elsium-ai/core

What's Inside

| Category | Export | Kind |
| --- | --- | --- |
| Types | StepConfig | interface |
| | StepContext | interface |
| | StepResult | interface |
| | StepStatus | type alias |
| | RetryConfig | interface |
| | WorkflowConfig | interface |
| | WorkflowResult | interface |
| | WorkflowStatus | type alias |
| | WorkflowRunOptions | interface |
| Steps | step | function |
| | executeStep | function |
| Workflow | defineWorkflow | function |
| | defineParallelWorkflow | function |
| | defineBranchWorkflow | function |
| | Workflow | interface |
| | ParallelWorkflowConfig | interface |
| | BranchConfig | interface |


Types

StepStatus

Union type representing the possible states of a step during execution.

type StepStatus = 'pending' | 'running' | 'completed' | 'failed' | 'skipped'

StepConfig<TInput, TOutput>

Configuration object that defines a single step's behavior, including its handler, optional input validation, retry policy, conditional execution, fallback logic, and timeout.

interface StepConfig<TInput = unknown, TOutput = unknown> {
  name: string
  input?: z.ZodType<TInput>
  handler: (input: TInput, context: StepContext) => Promise<TOutput>
  retry?: RetryConfig
  condition?: (input: TInput, context: StepContext) => boolean
  fallback?: (error: Error, input: TInput) => Promise<TOutput>
  timeoutMs?: number
}

| Field | Type | Description |
| --- | --- | --- |
| name | string | Unique identifier for the step within a workflow. |
| input | z.ZodType<TInput> | Optional Zod schema used to validate the step's input before the handler runs. |
| handler | (input: TInput, context: StepContext) => Promise<TOutput> | Async function that performs the step's work. |
| retry | RetryConfig | Optional retry policy applied when the handler throws. |
| condition | (input: TInput, context: StepContext) => boolean | Optional guard; when it returns false the step is skipped. |
| fallback | (error: Error, input: TInput) => Promise<TOutput> | Optional async function invoked when all retries are exhausted. |
| timeoutMs | number | Optional per-step timeout in milliseconds. |

StepContext

Runtime context passed to every step handler and condition function.

interface StepContext {
  workflowName: string
  stepIndex: number
  previousOutputs: Record<string, unknown>
  signal?: AbortSignal
}

| Field | Type | Description |
| --- | --- | --- |
| workflowName | string | Name of the workflow that owns this step. |
| stepIndex | number | Zero-based position of the step within the workflow. |
| previousOutputs | Record<string, unknown> | Map of step name to output for all previously completed steps. |
| signal | AbortSignal | Optional abort signal forwarded from WorkflowRunOptions. |

StepResult<T>

Outcome returned after a step finishes (or is skipped/fails).

interface StepResult<T = unknown> {
  name: string
  status: StepStatus
  data?: T
  error?: string
  durationMs: number
  retryCount: number
}

| Field | Type | Description |
| --- | --- | --- |
| name | string | Name of the step that produced this result. |
| status | StepStatus | Final status of the step. |
| data | T | Output data, present when status is 'completed'. |
| error | string | Error message, present when status is 'failed'. |
| durationMs | number | Wall-clock time spent on the step in milliseconds. |
| retryCount | number | Number of retries that were attempted before the final outcome. |

RetryConfig

Per-step retry policy with exponential backoff and jitter.

interface RetryConfig {
  maxRetries: number
  baseDelayMs?: number
  maxDelayMs?: number
  shouldRetry?: (error: Error) => boolean
}

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| maxRetries | number | -- | Maximum number of retry attempts. |
| baseDelayMs | number | 1000 | Base delay for exponential backoff in milliseconds. |
| maxDelayMs | number | 30000 | Upper bound for the computed delay in milliseconds. |
| shouldRetry | (error: Error) => boolean | -- | Optional predicate; when omitted, all errors except non-retryable ElsiumError instances are retried. |

Backoff formula: min(baseDelayMs * 2^(attempt-1), maxDelayMs) * random(0.5, 1.0).
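
The formula above can be sketched as a small function. This only illustrates the arithmetic and is not the package's source: `computeBackoffDelay` and its injectable `random` parameter are hypothetical names, `attempt` is assumed to be 1-based, and the defaults are the ones listed in the RetryConfig table.

```typescript
// Hypothetical helper that illustrates the documented backoff formula:
// min(baseDelayMs * 2^(attempt-1), maxDelayMs) * random(0.5, 1.0)
function computeBackoffDelay(
  attempt: number, // assumed to be 1-based
  baseDelayMs = 1000, // documented default
  maxDelayMs = 30000, // documented default
  random: () => number = Math.random, // injectable so the jitter can be pinned
): number {
  const exponential = baseDelayMs * 2 ** (attempt - 1)
  const capped = Math.min(exponential, maxDelayMs)
  // Map random() onto a jitter factor between 0.5 and 1.0.
  const jitter = 0.5 + random() * 0.5
  return capped * jitter
}

// With the jitter pinned to 1.0, delays double until they reach the cap:
// 1000, 2000, 4000, 8000, 16000, 30000
const noJitter = () => 1
for (let attempt = 1; attempt <= 6; attempt++) {
  console.log(attempt, computeBackoffDelay(attempt, 1000, 30000, noJitter))
}
```

Because of the jitter factor, a real delay for a given attempt falls anywhere between half of the capped value and the capped value itself.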

WorkflowStatus

Union type representing the possible states of a workflow.

type WorkflowStatus = 'pending' | 'running' | 'completed' | 'failed'

WorkflowConfig

Configuration for a sequential workflow created via defineWorkflow.

interface WorkflowConfig {
  name: string
  steps: StepConfig[]
  onStepComplete?: (result: StepResult) => void | Promise<void>
  onStepError?: (error: Error, stepName: string) => void | Promise<void>
  onComplete?: (result: WorkflowResult) => void | Promise<void>
}

| Field | Type | Description |
| --- | --- | --- |
| name | string | Identifier for the workflow. |
| steps | StepConfig[] | Ordered list of steps to execute sequentially. |
| onStepComplete | (result: StepResult) => void \| Promise<void> | Optional callback fired after each step completes. |
| onStepError | (error: Error, stepName: string) => void \| Promise<void> | Optional callback fired when a step fails. |
| onComplete | (result: WorkflowResult) => void \| Promise<void> | Optional callback fired when the workflow finishes (success or failure). |

WorkflowResult

Final output returned by workflow.run().

interface WorkflowResult {
  name: string
  status: WorkflowStatus
  steps: StepResult[]
  totalDurationMs: number
  outputs: Record<string, unknown>
}

| Field | Type | Description |
| --- | --- | --- |
| name | string | Name of the workflow. |
| status | WorkflowStatus | Overall status of the workflow. |
| steps | StepResult[] | Results for each step in execution order. |
| totalDurationMs | number | Total wall-clock time for the entire workflow in milliseconds. |
| outputs | Record<string, unknown> | Map of step name to output for every completed step. |
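
As an illustration of how these fields fit together, here is a minimal sketch that turns a WorkflowResult into a one-line log message. The type declarations are local copies of the shapes documented in this README so that the snippet runs without the package; `summarizeResult` and the sample data are hypothetical.

```typescript
// Local copies of the documented result shapes (not imported from the package).
type StepStatus = 'pending' | 'running' | 'completed' | 'failed' | 'skipped'
type WorkflowStatus = 'pending' | 'running' | 'completed' | 'failed'

interface StepResult {
  name: string
  status: StepStatus
  data?: unknown
  error?: string
  durationMs: number
  retryCount: number
}

interface WorkflowResult {
  name: string
  status: WorkflowStatus
  steps: StepResult[]
  totalDurationMs: number
  outputs: Record<string, unknown>
}

// Hypothetical helper: builds a one-line summary suitable for logging.
function summarizeResult(result: WorkflowResult): string {
  const failed = result.steps.filter((s) => s.status === 'failed')
  const retries = result.steps.reduce((sum, s) => sum + s.retryCount, 0)
  const head = `${result.name}: ${result.status} in ${result.totalDurationMs}ms, retries=${retries}`
  if (failed.length === 0) return head
  const details = failed.map((s) => `${s.name} (${s.error ?? 'unknown error'})`).join(', ')
  return `${head}; failed: ${details}`
}

// Hand-written sample data, not real output from the package.
const sample: WorkflowResult = {
  name: 'etl-pipeline',
  status: 'failed',
  steps: [
    { name: 'extract', status: 'completed', data: [1, 2], durationMs: 12, retryCount: 1 },
    { name: 'transform', status: 'failed', error: 'bad row', durationMs: 3, retryCount: 0 },
  ],
  totalDurationMs: 15,
  outputs: { extract: [1, 2] },
}

console.log(summarizeResult(sample))
// etl-pipeline: failed in 15ms, retries=1; failed: transform (bad row)
```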

WorkflowRunOptions

Options passed to workflow.run().

interface WorkflowRunOptions {
  signal?: AbortSignal
}

| Field | Type | Description |
| --- | --- | --- |
| signal | AbortSignal | Optional abort signal; forwarded to each step's StepContext. |
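
A handler only benefits from the signal if it checks it. The sketch below shows one way a handler's internal wait could honour an AbortSignal; `sleepWithSignal` is a hypothetical helper, not part of this package, and it uses only the standard AbortController and AbortSignal APIs.

```typescript
// Hypothetical helper: a delay that rejects as soon as the signal aborts.
function sleepWithSignal(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise((resolve, reject) => {
    if (signal?.aborted) {
      reject(new Error('aborted'))
      return
    }
    const onAbort = () => {
      clearTimeout(timer) // stop the pending timer so nothing lingers
      reject(new Error('aborted'))
    }
    const timer = setTimeout(() => {
      signal?.removeEventListener('abort', onAbort)
      resolve()
    }, ms)
    signal?.addEventListener('abort', onAbort, { once: true })
  })
}

// Aborting the controller rejects the pending delay immediately.
const controller = new AbortController()
const pending = sleepWithSignal(60_000, controller.signal)
controller.abort()
pending.catch((error: Error) => console.log(error.message)) // aborted
```

In a workflow, the same controller's signal would be passed as `workflow.run(input, { signal: controller.signal })` and read inside a handler as `context.signal`.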


Steps

step

Factory function that creates a StepConfig by combining a name with the rest of the configuration, providing a concise shorthand for inline step definitions.

function step<TInput, TOutput>(
  name: string,
  config: Omit<StepConfig<TInput, TOutput>, 'name'>,
): StepConfig<TInput, TOutput>

| Parameter | Type | Description |
| --- | --- | --- |
| name | string | Unique name for the step. |
| config | Omit<StepConfig<TInput, TOutput>, 'name'> | Step configuration without the name field. |

Returns: StepConfig<TInput, TOutput>

import { step } from '@elsium-ai/workflows'
import { z } from 'zod'

const fetchPage = step('fetch-page', {
  input: z.object({ url: z.string().url() }),
  handler: async (input) => {
    const res = await fetch(input.url)
    return res.text()
  },
  retry: { maxRetries: 3, baseDelayMs: 500 },
  timeoutMs: 10_000,
})

executeStep

Runs a single step to completion, handling input validation, condition checks, retries with exponential backoff, timeout enforcement, and fallback execution.

function executeStep<TInput, TOutput>(
  stepConfig: StepConfig<TInput, TOutput>,
  rawInput: unknown,
  context: StepContext,
): Promise<StepResult<TOutput>>

| Parameter | Type | Description |
| --- | --- | --- |
| stepConfig | StepConfig<TInput, TOutput> | The step definition to execute. |
| rawInput | unknown | Raw input value; validated against stepConfig.input if a schema is provided. |
| context | StepContext | Runtime context for the step. |

Returns: Promise<StepResult<TOutput>>

The execution order is:

  1. Validate rawInput against the Zod schema (if provided). Return 'failed' on validation error.
  2. Evaluate the condition guard (if provided). Return 'skipped' when false.
  3. Run the handler, retrying on failure up to retry.maxRetries times with backoff.
  4. On final failure, invoke fallback (if provided). If the fallback also fails, return 'failed'.

import { step, executeStep } from '@elsium-ai/workflows'
import type { StepContext } from '@elsium-ai/workflows'

const myStep = step('greet', {
  handler: async (input: { name: string }) => `Hello, ${input.name}!`,
})

const context: StepContext = {
  workflowName: 'demo',
  stepIndex: 0,
  previousOutputs: {},
}

const result = await executeStep(myStep, { name: 'World' }, context)
console.log(result.data) // "Hello, World!"
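
The four-stage execution order described for executeStep can be modelled in a few lines. This is a simplified local sketch under stated assumptions, not the package's implementation: `runStepSketch` and the `Sketch*` types are hypothetical, a plain `parse` function stands in for the Zod schema, backoff delays and timeouts are left out, and reporting 'completed' after a successful fallback is an assumption.

```typescript
// Simplified local model of the documented execution order.
type SketchStatus = 'completed' | 'failed' | 'skipped'

interface SketchStep<I, O> {
  name: string
  parse?: (raw: unknown) => I // stands in for the Zod schema; throws on bad input
  condition?: (input: I) => boolean
  handler: (input: I) => Promise<O>
  maxRetries?: number
  fallback?: (error: Error, input: I) => Promise<O>
}

interface SketchResult<O> {
  name: string
  status: SketchStatus
  data?: O
  error?: string
  retryCount: number
}

async function runStepSketch<I, O>(step: SketchStep<I, O>, raw: unknown): Promise<SketchResult<O>> {
  const toError = (e: unknown) => (e instanceof Error ? e : new Error(String(e)))
  const fail = (e: unknown, retryCount: number): SketchResult<O> => ({
    name: step.name,
    status: 'failed',
    error: toError(e).message,
    retryCount,
  })

  // 1. Validate the raw input; a validation error fails the step immediately.
  let input: I
  try {
    input = step.parse ? step.parse(raw) : (raw as I)
  } catch (e) {
    return fail(e, 0)
  }

  // 2. Evaluate the guard; when it returns false the handler never runs.
  if (step.condition && !step.condition(input)) {
    return { name: step.name, status: 'skipped', retryCount: 0 }
  }

  // 3. Run the handler: one initial attempt plus up to maxRetries retries.
  const maxRetries = step.maxRetries ?? 0
  let lastError = new Error('handler did not run')
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const data = await step.handler(input)
      return { name: step.name, status: 'completed', data, retryCount: attempt }
    } catch (e) {
      lastError = toError(e)
    }
  }

  // 4. Every attempt failed: try the fallback, if one is configured.
  if (step.fallback) {
    try {
      const data = await step.fallback(lastError, input)
      return { name: step.name, status: 'completed', data, retryCount: maxRetries }
    } catch (e) {
      return fail(e, maxRetries)
    }
  }
  return fail(lastError, maxRetries)
}

runStepSketch(
  { name: 'greet', handler: async (input: { name: string }) => `Hello, ${input.name}!` },
  { name: 'World' },
).then((result) => console.log(result.status, result.data)) // completed Hello, World!
```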

Workflow

Workflow

Interface implemented by all workflow variants (sequential, parallel, and branch).

interface Workflow {
  readonly name: string
  run(input: unknown, options?: WorkflowRunOptions): Promise<WorkflowResult>
}

| Member | Type | Description |
| --- | --- | --- |
| name | string (readonly) | The workflow identifier. |
| run | (input: unknown, options?: WorkflowRunOptions) => Promise<WorkflowResult> | Executes the workflow with the given input and returns the aggregated result. |

defineWorkflow

Creates a sequential workflow that executes steps one after another, piping each step's output as the next step's input.

function defineWorkflow(config: WorkflowConfig): Workflow

| Parameter | Type | Description |
| --- | --- | --- |
| config | WorkflowConfig | Workflow configuration including steps and lifecycle callbacks. |

Returns: Workflow

When a step completes, its output is stored in outputs under the step's name and becomes the input for the next step. If any step fails, the workflow short-circuits and returns with status: 'failed'.

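import { defineWorkflow, step } from '@elsium-ai/workflows'

// extractData, transformData, loadData, RawData, and TransformedData are
// placeholders for application code; they are not exported by this package.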

const pipeline = defineWorkflow({
  name: 'etl-pipeline',
  steps: [
    step('extract', {
      handler: async (input: { source: string }) => {
        return await extractData(input.source)
      },
    }),
    step('transform', {
      handler: async (rawData: RawData) => {
        return transformData(rawData)
      },
    }),
    step('load', {
      handler: async (transformed: TransformedData) => {
        await loadData(transformed)
        return { loaded: true }
      },
    }),
  ],
  onStepComplete: (result) => {
    console.log(`Step "${result.name}" finished in ${result.durationMs}ms`)
  },
  onComplete: (result) => {
    console.log(`Workflow "${result.name}" ${result.status}`)
  },
})

const result = await pipeline.run({ source: 'database' })
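
The piping and short-circuit behaviour of a sequential workflow can be modelled without the package. The sketch below is a local illustration only: `runSequentialSketch`, `PipeStep`, and `PipeResult` are hypothetical names, and retries, callbacks, and timing are omitted.

```typescript
// Local model of sequential piping; not the package's implementation.
type PipeStep = { name: string; handler: (input: unknown) => Promise<unknown> }

interface PipeResult {
  status: 'completed' | 'failed'
  outputs: Record<string, unknown>
  failedStep?: string
}

async function runSequentialSketch(steps: PipeStep[], input: unknown): Promise<PipeResult> {
  const outputs: Record<string, unknown> = {}
  let current = input
  for (const s of steps) {
    try {
      // The previous step's output becomes this step's input.
      current = await s.handler(current)
      outputs[s.name] = current
    } catch {
      // Short-circuit: the remaining steps never run.
      return { status: 'failed', outputs, failedStep: s.name }
    }
  }
  return { status: 'completed', outputs }
}

runSequentialSketch(
  [
    { name: 'double', handler: async (n) => (n as number) * 2 },
    { name: 'label', handler: async (n) => `value=${n}` },
  ],
  21,
).then((r) => console.log(r.outputs)) // { double: 42, label: 'value=42' }
```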

ParallelWorkflowConfig

Configuration for a parallel workflow created via defineParallelWorkflow.

interface ParallelWorkflowConfig {
  name: string
  steps: StepConfig[]
  onComplete?: (result: WorkflowResult) => void | Promise<void>
}

| Field | Type | Description |
| --- | --- | --- |
| name | string | Identifier for the parallel workflow. |
| steps | StepConfig[] | Steps to execute concurrently; all receive the same input. |
| onComplete | (result: WorkflowResult) => void \| Promise<void> | Optional callback fired when all steps have settled. |

defineParallelWorkflow

Creates a parallel workflow that executes all steps concurrently using Promise.all, where every step receives the same input.

function defineParallelWorkflow(config: ParallelWorkflowConfig): Workflow

| Parameter | Type | Description |
| --- | --- | --- |
| config | ParallelWorkflowConfig | Parallel workflow configuration. |

Returns: Workflow

Each step's output is stored in outputs under its name. The workflow status is 'failed' if any step fails, 'completed' otherwise.

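import { defineParallelWorkflow, step } from '@elsium-ai/workflows'

// fetchSocialProfile, fetchActivityLog, and fetchPreferences are placeholders
// for application code; they are not exported by this package.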

const enrichment = defineParallelWorkflow({
  name: 'enrich-profile',
  steps: [
    step('fetch-social', {
      handler: async (input: { userId: string }) => {
        return await fetchSocialProfile(input.userId)
      },
    }),
    step('fetch-activity', {
      handler: async (input: { userId: string }) => {
        return await fetchActivityLog(input.userId)
      },
    }),
    step('fetch-preferences', {
      handler: async (input: { userId: string }) => {
        return await fetchPreferences(input.userId)
      },
    }),
  ],
})

const result = await enrichment.run({ userId: 'u_123' })
// result.outputs['fetch-social'], result.outputs['fetch-activity'], etc.
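
The fan-out behaviour can be sketched locally as follows. This is an illustration under assumptions rather than the package's code: `runParallelSketch`, `ParStep`, and `ParResult` are hypothetical names. Each handler is wrapped so that it resolves to a result object instead of rejecting, which is what lets Promise.all wait for every step even when one of them fails.

```typescript
// Local model of a parallel fan-out; not the package's implementation.
type ParStep = { name: string; handler: (input: unknown) => Promise<unknown> }

type ParResult =
  | { name: string; status: 'completed'; data: unknown }
  | { name: string; status: 'failed'; error: string }

async function runParallelSketch(steps: ParStep[], input: unknown) {
  const results = await Promise.all(
    steps.map(async (s): Promise<ParResult> => {
      try {
        // Every step receives the same input.
        return { name: s.name, status: 'completed', data: await s.handler(input) }
      } catch (e) {
        const message = e instanceof Error ? e.message : String(e)
        return { name: s.name, status: 'failed', error: message }
      }
    }),
  )

  // Only completed steps contribute to outputs, keyed by step name.
  const outputs: Record<string, unknown> = {}
  for (const r of results) {
    if (r.status === 'completed') outputs[r.name] = r.data
  }

  // A single failed step marks the whole workflow as failed.
  const status = results.some((r) => r.status === 'failed') ? 'failed' : 'completed'
  return { status, steps: results, outputs }
}

runParallelSketch(
  [
    { name: 'upper', handler: async (s) => String(s).toUpperCase() },
    { name: 'length', handler: async (s) => String(s).length },
  ],
  'abc',
).then((r) => console.log(r.status, r.outputs)) // completed { upper: 'ABC', length: 3 }
```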

BranchConfig

Defines a single branch in a branching workflow, pairing a condition with the workflow to execute when the condition is met.

interface BranchConfig {
  condition: (input: unknown) => boolean
  workflow: Workflow
}

| Field | Type | Description |
| --- | --- | --- |
| condition | (input: unknown) => boolean | Predicate evaluated against the workflow input. |
| workflow | Workflow | Workflow to run when condition returns true. |

defineBranchWorkflow

Creates a branching workflow that evaluates conditions in order and delegates to the first matching branch's workflow, with an optional fallback.

function defineBranchWorkflow(
  name: string,
  branches: BranchConfig[],
  fallback?: Workflow,
): Workflow

| Parameter | Type | Description |
| --- | --- | --- |
| name | string | Identifier for the branch workflow. |
| branches | BranchConfig[] | Ordered list of condition/workflow pairs; the first match wins. |
| fallback | Workflow | Optional workflow to run when no branch condition matches. |

Returns: Workflow

If no branch matches and no fallback is provided, the workflow returns immediately with status: 'completed', an empty steps array, and empty outputs.

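import { defineBranchWorkflow, defineWorkflow, step } from '@elsium-ai/workflows'

// analyzeText and analyzeImage are placeholders for application code;
// they are not exported by this package.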

const textWorkflow = defineWorkflow({
  name: 'process-text',
  steps: [
    step('analyze-text', {
      handler: async (input: { content: string }) => {
        return await analyzeText(input.content)
      },
    }),
  ],
})

const imageWorkflow = defineWorkflow({
  name: 'process-image',
  steps: [
    step('analyze-image', {
      handler: async (input: { content: string }) => {
        return await analyzeImage(input.content)
      },
    }),
  ],
})

const router = defineBranchWorkflow(
  'content-router',
  [
    { condition: (input: any) => input.type === 'text', workflow: textWorkflow },
    { condition: (input: any) => input.type === 'image', workflow: imageWorkflow },
  ],
)

const result = await router.run({ type: 'text', content: 'Hello world' })
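
Because BranchConfig conditions receive `unknown`, a type guard is one way to write them without falling back to `any`. The sketch below is illustrative only: the `Content` shape, `isContent`, and `pickBranch` are hypothetical, and `pickBranch` models only the first-match-wins lookup by returning a label instead of running a workflow.

```typescript
// Hypothetical input shape used by the routing conditions.
type Content = { type: 'text' | 'image'; content: string }

// Type guard: checks the shape at runtime and narrows `unknown` for the compiler.
function isContent(input: unknown): input is Content {
  if (typeof input !== 'object' || input === null) return false
  const candidate = input as Record<string, unknown>
  return (
    (candidate.type === 'text' || candidate.type === 'image') &&
    typeof candidate.content === 'string'
  )
}

// Conditions with the documented (input: unknown) => boolean signature.
const isText = (input: unknown): boolean => isContent(input) && input.type === 'text'
const isImage = (input: unknown): boolean => isContent(input) && input.type === 'image'

// Local model of first-match-wins routing; returns the chosen label or undefined.
function pickBranch(
  branches: { condition: (input: unknown) => boolean; label: string }[],
  input: unknown,
): string | undefined {
  return branches.find((b) => b.condition(input))?.label
}

const branches = [
  { condition: isText, label: 'process-text' },
  { condition: isImage, label: 'process-image' },
]

console.log(pickBranch(branches, { type: 'text', content: 'Hello world' })) // process-text
console.log(pickBranch(branches, { type: 'video' })) // undefined
```

Predicates such as `isText` and `isImage` have the signature that BranchConfig documents for `condition`, so they could be used there in place of the `(input: any) => ...` lambdas.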

Part of ElsiumAI

This package is the workflow layer of the ElsiumAI framework. See the full documentation for guides and examples.

License

MIT