@kaibanjs/workflow

v0.1.6

A browser-compatible workflow engine for kaibanjs

A modern, type-safe workflow engine for building complex business processes with support for sequential execution, parallel processing, conditional branching, loops, and real-time streaming.

Features

  • Type-safe workflow definitions using Zod schemas for input/output validation
  • Multiple execution patterns: sequential, parallel, conditional, loops (do-while, do-until, foreach)
  • Real-time streaming with ReadableStream for live execution monitoring
  • Suspend and resume functionality for long-running workflows
  • State management with Zustand for reactive updates
  • Error handling with comprehensive error propagation
  • Data mapping between steps with flexible transformation options
  • Nested workflows support for complex compositions
  • Browser and Node.js compatible

Installation

npm install @kaibanjs/workflow

Quick Start

Basic Sequential Workflow

import { createStep, createWorkflow } from '@kaibanjs/workflow';
import { z } from 'zod';

// Create steps
const addStep = createStep({
  id: 'add',
  inputSchema: z.object({ a: z.number(), b: z.number() }),
  outputSchema: z.number(),
  execute: async ({ inputData }) => {
    const { a, b } = inputData as { a: number; b: number };
    return a + b;
  },
});

const multiplyStep = createStep({
  id: 'multiply',
  inputSchema: z.number(),
  outputSchema: z.number(),
  execute: async ({ inputData, getInitData }) => {
    const sum = inputData as number;
    const { a, b } = getInitData() as { a: number; b: number };
    return sum * a * b;
  },
});

// Create and configure workflow
const workflow = createWorkflow({
  id: 'math-workflow',
  inputSchema: z.object({ a: z.number(), b: z.number() }),
  outputSchema: z.number(),
});

workflow.then(addStep).then(multiplyStep);
workflow.commit();

// Execute workflow
const run = workflow.createRun();
const result = await run.start({ inputData: { a: 2, b: 3 } });

console.log(result); // { status: 'completed', result: 30 }
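
To see where the 30 comes from, the data flow can be traced with plain functions. This is only a sketch that mimics the two steps without the library: `add`, `multiply`, and `runMathWorkflow` are hypothetical names, and the real engine also validates each value against the Zod schemas.

```typescript
type MathInput = { a: number; b: number };

// Mirrors addStep: receives the workflow input directly.
function add(input: MathInput): number {
  return input.a + input.b;
}

// Mirrors multiplyStep: receives the previous step's output (the sum)
// plus the original workflow input, which getInitData() exposes in the real step.
function multiply(sum: number, init: MathInput): number {
  return sum * init.a * init.b;
}

// Hypothetical helper that chains the two functions like .then(addStep).then(multiplyStep).
function runMathWorkflow(input: MathInput): number {
  const sum = add(input);
  return multiply(sum, input);
}

console.log(runMathWorkflow({ a: 2, b: 3 })); // (2 + 3) * 2 * 3 = 30
```

The point to notice is that `multiplyStep` sees two different values: its `inputData` is the output of the previous step, while `getInitData()` returns the input the run was started with.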

Real-time Streaming

const run = workflow.createRun();
const { stream, getWorkflowState } = run.stream({ inputData: { a: 2, b: 3 } });

// Read from the stream
const reader = stream.getReader();
try {
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    console.log('Event:', value);
    // value contains: { type, payload, timestamp, runId, workflowId }
  }
} finally {
  reader.releaseLock();
}

// Get final result
const finalResult = await getWorkflowState();
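
If you only need the full list of events, the read loop above can be wrapped in a small helper. This is a sketch, not part of the package: `ReaderLike` and `collectEvents` are hypothetical names, and the helper assumes nothing beyond the `read()` and `releaseLock()` methods that a standard stream reader provides.

```typescript
// Minimal structural type for the part of a stream reader used here.
interface ReaderLike<T> {
  read(): Promise<{ done: boolean; value?: T }>;
  releaseLock(): void;
}

// Drain a reader into an array, releasing the lock even if reading fails.
async function collectEvents<T>(reader: ReaderLike<T>): Promise<T[]> {
  const events: T[] = [];
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      if (value !== undefined) events.push(value);
    }
  } finally {
    reader.releaseLock();
  }
  return events;
}
```

With the streaming run above, this would be called as `await collectEvents(stream.getReader())`. Collecting everything gives up the live-monitoring benefit, so it mostly suits tests and short workflows.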

State Monitoring

const run = workflow.createRun();

// Subscribe to events (each carries the step status and workflow state)
const unsubscribe = run.watch((event) => {
  console.log('Step status:', event.payload.stepStatus);
  console.log('Workflow state:', event.payload.workflowState);
});

// Execute workflow
await run.start({ inputData: { a: 2, b: 3 } });

// Clean up
unsubscribe();

Core Concepts

Steps

Steps are the basic building blocks of workflows. Each step has:

  • Input/Output schemas: Zod schemas for type validation
  • Execute function: Async function that performs the work
  • Optional suspend/resume schemas: For interactive workflows

const step = createStep({
  id: 'process-data',
  inputSchema: z.object({ data: z.string() }),
  outputSchema: z.object({ processed: z.string() }),
  execute: async ({ inputData }) => {
    return { processed: inputData.data.toUpperCase() };
  },
});

Workflows

Workflows orchestrate steps using various execution patterns:

const workflow = createWorkflow({
  id: 'my-workflow',
  inputSchema: z.object({ data: z.string() }),
  outputSchema: z.object({ result: z.string() }),
});

// Sequential execution
workflow.then(step1).then(step2);

// Parallel execution
workflow.parallel([step1, step2]).then(combineStep);

// Conditional branching
workflow.branch([
  [async ({ inputData }) => inputData.value > 10, highValueStep],
  [async () => true, defaultStep], // fallback
]);

// Loops
workflow.dowhile(loopStep, async ({ getStepResult }) => {
  const result = getStepResult(loopStep.id);
  return result.count < 5;
});

workflow.foreach(processStep, { concurrency: 3 });

workflow.commit(); // Finalize workflow definition
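
The two loop variants differ only in how the condition is read. The sketch below assumes the library follows the conventional meaning of the names (the README does not spell it out): the step always runs once, then `dowhile` repeats while the condition is true and `dountil` repeats until it becomes true. `doWhile` and `doUntil` here are hypothetical plain functions, not the library's methods.

```typescript
// Run `body` at least once, then repeat while `condition` returns true.
function doWhile(body: () => void, condition: () => boolean): void {
  do {
    body();
  } while (condition());
}

// Run `body` at least once, then repeat until `condition` returns true.
function doUntil(body: () => void, condition: () => boolean): void {
  do {
    body();
  } while (!condition());
}

let count = 0;
doWhile(() => { count += 1; }, () => count < 5);
console.log(count); // 5

let attempts = 0;
doUntil(() => { attempts += 1; }, () => attempts >= 3);
console.log(attempts); // 3
```

Under that assumption, the `dowhile` example above keeps running `loopStep` for as long as its latest result has `count < 5`.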

Runs

Runs represent individual executions of a workflow:

const run = workflow.createRun({ runId: 'unique-run-id' });

// Start execution
const result = await run.start({ inputData: { ... } });

// Resume suspended execution
const resumeResult = await run.resume({
  step: 'suspendable-step',
  resumeData: { continue: true, value: 5 },
});

Advanced Patterns

Data Mapping

Transform data between steps:

workflow
  .then(userStep)
  .map(async ({ getStepResult }) => {
    const userResult = getStepResult(userStep.id);
    return {
      profile: {
        id: userResult.user.id,
        name: userResult.user.name,
      },
    };
  })
  .then(profileStep);

Suspend and Resume

Create interactive workflows:

const suspendableStep = createStep({
  id: 'approval',
  inputSchema: z.object({ amount: z.number() }),
  outputSchema: z.object({ approved: z.boolean() }),
  resumeSchema: z.object({ approved: z.boolean() }),
  suspendSchema: z.object({ reason: z.string() }),
  execute: async ({ inputData, suspend, isResuming, resumeData }) => {
    if (isResuming) {
      return { approved: resumeData.approved };
    }

    if (inputData.amount > 1000) {
      await suspend({ reason: 'requires_approval' });
      return { approved: false };
    }

    return { approved: true };
  },
});
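
The branching inside the approval step is easier to reason about as a pure function. The sketch below restates the same three cases without the library; `ApprovalOutcome` and `decideApproval` are hypothetical names, and passing `resumeData` stands in for the `isResuming` flag.

```typescript
type ApprovalOutcome =
  | { kind: 'completed'; approved: boolean }
  | { kind: 'suspended'; reason: string };

// Hypothetical pure version of the approval step's branching.
function decideApproval(
  amount: number,
  resumeData?: { approved: boolean },
): ApprovalOutcome {
  // Second pass: the step is resuming, so the submitted decision wins.
  if (resumeData !== undefined) {
    return { kind: 'completed', approved: resumeData.approved };
  }
  // First pass: large amounts pause and wait for a decision.
  if (amount > 1000) {
    return { kind: 'suspended', reason: 'requires_approval' };
  }
  // Small amounts are approved automatically.
  return { kind: 'completed', approved: true };
}

console.log(decideApproval(500));                      // completed, approved: true
console.log(decideApproval(5000));                     // suspended, requires_approval
console.log(decideApproval(5000, { approved: true })); // completed, approved: true
```

Checking the resume case first matters: on the second pass the amount is still above 1000, so testing the amount first would suspend the step again.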

Nested Workflows

Compose complex workflows:

const nestedWorkflow = createWorkflow({
  id: 'nested',
  inputSchema: z.number(),
  outputSchema: z.number(),
});

nestedWorkflow.then(doubleStep);
nestedWorkflow.commit();

const mainWorkflow = createWorkflow({
  id: 'main',
  inputSchema: z.object({ a: z.number(), b: z.number() }),
  outputSchema: z.number(),
});

mainWorkflow.then(addStep).then(nestedWorkflow);
mainWorkflow.commit();

Parallel Execution with Concurrency Control

const workflow = createWorkflow({
  id: 'parallel-test',
  inputSchema: z.array(z.number()),
  outputSchema: z.array(z.number()),
});

workflow.foreach(processStep, { concurrency: 5 });
workflow.commit();
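
For intuition about what a `concurrency` limit means, here is a minimal promise-pool sketch in plain TypeScript. It is not the library's implementation (the README lists p-queue as its queuing dependency); `mapWithConcurrency` is a hypothetical helper that processes an array with at most `concurrency` calls in flight and returns results in input order.

```typescript
// Hypothetical helper: apply `worker` to every item with at most
// `concurrency` calls running at the same time.
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  concurrency: number,
  worker: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let next = 0;

  // Each lane claims the next unprocessed index until the list is exhausted.
  async function lane(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index], index);
    }
  }

  const laneCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}
```

Storing each result by index keeps the output aligned with the input array even when items finish out of order.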

API Reference

createStep

Creates a new workflow step.

createStep<TInput, TOutput>(config: {
  id: string;
  description?: string;
  inputSchema: z.ZodType<TInput>;
  outputSchema: z.ZodType<TOutput>;
  resumeSchema?: z.ZodType;
  suspendSchema?: z.ZodType;
  execute: (context: StepContext<TInput>) => Promise<TOutput>;
}): Step<TInput, TOutput>

createWorkflow

Creates a new workflow.

createWorkflow<TInput, TOutput>(config: {
  id: string;
  description?: string;
  inputSchema: z.ZodType<TInput>;
  outputSchema: z.ZodType<TOutput>;
  retryConfig?: {
    attempts?: number;
    delay?: number;
  };
}): Workflow<TInput, TOutput>

Workflow Methods

  • then(step): Add sequential step
  • parallel(steps[]): Add parallel steps
  • branch(conditions[]): Add conditional execution
  • dowhile(step, condition): Add do-while loop
  • dountil(step, condition): Add do-until loop
  • foreach(step, options): Add foreach loop
  • map(mapping): Transform data between steps
  • commit(): Finalize workflow definition
  • createRun(options?): Create a new run instance

Run Methods

  • start({ inputData, runtimeContext }): Start execution
  • stream({ inputData, runtimeContext }): Start streaming execution
  • resume({ step, resumeData, runtimeContext }): Resume suspended execution
  • watch(callback): Subscribe to events
  • subscribe(callback): Subscribe to state changes
  • getState(): Get current state

StepContext

The context object passed to step execution functions:

interface StepContext<TInput> {
  inputData: TInput;
  getStepResult: <T>(stepId: string) => T;
  getInitData: <T>() => T;
  runtimeContext?: RuntimeContext;
  isResuming?: boolean;
  resumeData?: any;
  suspend: (suspendPayload: any) => Promise<StepResult>;
}
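
One way to picture the data accessors is as lookups over values recorded during the run. The sketch below is an assumption for illustration, not the engine's internals: `MiniContext` and `makeContext` are hypothetical, only the three data fields are modelled, and throwing on a missing step result is a choice made for this sketch.

```typescript
// Hypothetical, simplified context: only the three data accessors.
interface MiniContext<TInput> {
  inputData: TInput;
  getStepResult: <T>(stepId: string) => T;
  getInitData: <T>() => T;
}

function makeContext<TInput>(
  inputData: TInput,
  initData: unknown,
  results: Map<string, unknown>,
): MiniContext<TInput> {
  return {
    inputData,
    // The caller chooses T, so the lookup is an unchecked cast,
    // much like the `as` casts in the Quick Start steps.
    getStepResult: <T>(stepId: string): T => {
      if (!results.has(stepId)) {
        throw new Error(`No result recorded for step "${stepId}"`);
      }
      return results.get(stepId) as T;
    },
    getInitData: <T>(): T => initData as T,
  };
}

const results = new Map<string, unknown>([['add', 5]]);
const ctx = makeContext(5, { a: 2, b: 3 }, results);
const { a, b } = ctx.getInitData<{ a: number; b: number }>();
console.log(ctx.getStepResult<number>('add') * a * b); // 5 * 2 * 3 = 30
```

Because the type parameter is supplied by the caller, neither accessor in this sketch can verify the shape at runtime, so it is worth keeping the requested type in line with the producing step's output schema.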

Error Handling

The result returned by a run carries a status field. Check it before reading the result:

const run = workflow.createRun();
const result = await run.start({ inputData: { ... } });

if (result.status === 'completed') {
  console.log('Success:', result.result);
} else if (result.status === 'failed') {
  console.error('Error:', result.error);
} else if (result.status === 'suspended') {
  console.log('Suspended:', result.suspended);
}
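
In TypeScript, the three statuses can be modelled as a discriminated union so the compiler flags an unhandled case. The `RunResult` type below is a hypothetical shape inferred from the fields used above, not a type exported by the package, and `describeResult` is an illustrative helper.

```typescript
// Hypothetical result shape inferred from the fields used in the example above.
type RunResult<T> =
  | { status: 'completed'; result: T }
  | { status: 'failed'; error: unknown }
  | { status: 'suspended'; suspended: unknown };

function describeResult<T>(result: RunResult<T>): string {
  switch (result.status) {
    case 'completed':
      return `Success: ${String(result.result)}`;
    case 'failed':
      return `Error: ${String(result.error)}`;
    case 'suspended':
      return 'Suspended: waiting for resume';
    default: {
      // Compile-time exhaustiveness check: this assignment stops
      // compiling if a new status is added and not handled above.
      const unreachable: never = result;
      return unreachable;
    }
  }
}

console.log(describeResult({ status: 'completed', result: 30 })); // Success: 30
```

Narrowing on `status` also means `result.result` is only reachable in the `completed` branch, which avoids reading a result from a failed or suspended run.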

State Management

The workflow engine uses Zustand for state management, providing:

  • Real-time state updates
  • Event streaming
  • Step result tracking
  • Execution path monitoring
  • Suspended state management

Browser Compatibility

This workflow engine is designed to work in both browser and Node.js environments, using:

  • zustand for state management
  • p-queue for execution queuing
  • zod for schema validation

Examples

See the tests in src/run.test.ts for comprehensive examples of:

  • Basic workflow execution
  • Parallel processing
  • Conditional branching
  • Loop execution (do-while, do-until, foreach)
  • Data mapping and transformation
  • Suspend and resume functionality
  • Error handling
  • State monitoring and observability
  • Streaming execution
  • Nested workflows
  • Complex workflow patterns

License

MIT