npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@multiplier-labs/stepflow

v0.3.4

Published

A durable, type-safe workflow orchestration engine for Node.js with SQLite persistence, scheduling, and real-time events

Readme

Stepflow

A durable, type-safe workflow orchestration engine for Node.js with SQLite or PostgreSQL persistence, scheduling, and real-time events.

Features

  • Durable Execution: Steps are persisted to SQLite or PostgreSQL, allowing workflows to resume after crashes
  • Type-Safe: Full TypeScript support with inferred types for inputs, outputs, and step results
  • Step Orchestration: Sequential step execution with automatic retry and timeout support
  • Scheduling: Cron-based scheduling and workflow completion triggers
  • Real-Time Events: Socket.IO and webhook adapters for event streaming
  • Concurrency Control: Limit concurrent workflow runs with priority queues
  • Multiple Storage Backends: Choose SQLite for simplicity or PostgreSQL for distributed deployments
  • Planning System: Rule-based recipe selection and dynamic plan generation

Installation

This package is published to GitHub Packages. See docs/github-packages-setup.md for authentication setup.

npm install @multiplier-labs/stepflow

Quick Start

import Database from 'better-sqlite3';
import { WorkflowEngine, SQLiteStorageAdapter } from '@multiplier-labs/stepflow';

// Create engine with SQLite storage
const db = new Database('./workflows.db');
const engine = new WorkflowEngine({
  storage: new SQLiteStorageAdapter({ db }),
});

// Register a workflow
engine.registerWorkflow({
  kind: 'order.process',
  name: 'Process Order',
  steps: [
    {
      key: 'validateOrder',
      name: 'Validate Order',
      handler: async (ctx) => {
        return { valid: true, total: 99.99 };
      },
    },
    {
      key: 'processPayment',
      name: 'Process Payment',
      handler: async (ctx) => {
        const { total } = ctx.results.validateOrder;
        return { transactionId: 'txn-123', amount: total };
      },
    },
    {
      key: 'sendConfirmation',
      name: 'Send Confirmation',
      handler: async (ctx) => {
        return { emailSent: true };
      },
    },
  ],
});

// Start a workflow run
const runId = await engine.startRun({
  kind: 'order.process',
  input: { orderId: 'order-123', items: ['item-1', 'item-2'] },
});

// Wait for completion
const result = await engine.waitForRun(runId);
console.log(result.status); // 'succeeded'

Core Concepts

Workflows

A workflow is a series of steps that execute sequentially. Each step's result is persisted, allowing the workflow to resume from where it left off if interrupted.

engine.registerWorkflow({
  kind: 'my.workflow',
  name: 'My Workflow',
  steps: [
    {
      key: 'step1',
      name: 'Step 1',
      handler: async (ctx) => {
        return { data: 'step1 result' };
      },
    },
    {
      key: 'step2',
      name: 'Step 2',
      handler: async (ctx) => {
        // Access previous step results
        const step1Result = ctx.results.step1;
        return { combined: step1Result.data + ' plus step2' };
      },
    },
  ],
});

Step Options

Steps support retry logic and timeouts:

engine.registerWorkflow({
  kind: 'resilient.workflow',
  name: 'Resilient Workflow',
  steps: [
    {
      key: 'fetchData',
      name: 'Fetch Data',
      handler: async (ctx) => {
        const response = await fetch(ctx.input.url);
        return response.json();
      },
      onError: 'retry',
      maxRetries: 3,
      retryDelay: 1000,
      timeout: 30000,
    },
  ],
});

Workflow Timeouts

Set a maximum duration for entire workflow runs via engine settings:

const engine = new WorkflowEngine({
  storage,
  settings: {
    defaultTimeout: 60000, // 1 minute default timeout for all runs
  },
});

Concurrency Control

Limit concurrent workflow executions with priority queues:

const engine = new WorkflowEngine({
  storage,
  settings: {
    maxConcurrency: 5, // Max 5 concurrent runs
  },
});

// Higher priority runs execute first
await engine.startRun({
  kind: 'urgent.workflow',
  input: { data: 'urgent' },
  priority: 10,
});

await engine.startRun({
  kind: 'normal.workflow',
  input: { data: 'normal' },
  priority: 1,
});

Lifecycle

The engine supports explicit initialization and shutdown:

const engine = new WorkflowEngine({
  storage: new PostgresStorageAdapter({
    connectionString: 'postgresql://localhost:5432/myapp',
  }),
});

// Initialize storage (required for PostgreSQL, creates tables)
await engine.initialize();

// ... use the engine ...

// Graceful shutdown (cancels active runs, closes connections)
await engine.shutdown();

Storage

In-Memory Storage

For testing and development:

import { MemoryStorageAdapter } from '@multiplier-labs/stepflow';

const storage = new MemoryStorageAdapter();

SQLite Storage

For single-process deployments:

import Database from 'better-sqlite3';
import { SQLiteStorageAdapter } from '@multiplier-labs/stepflow';

const db = new Database('./workflows.db'); // File path or ':memory:'
const storage = new SQLiteStorageAdapter({ db });

PostgreSQL Storage

For production use with distributed workers:

import { PostgresStorageAdapter } from '@multiplier-labs/stepflow';

// Option 1: Connection string
const storage = new PostgresStorageAdapter({
  connectionString: 'postgresql://user:pass@localhost:5432/myapp',
  schema: 'myapp', // Optional, defaults to 'public'
});

// Option 2: Share existing connection pool
import pg from 'pg';

const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });
const storage = new PostgresStorageAdapter({ pool });

Note: pg and kysely are optional peer dependencies. They are loaded dynamically at runtime only when PostgresStorageAdapter is initialized, so users of other storage backends are not affected.

PostgreSQL storage provides:

  • Atomic dequeue: Safe concurrent processing with FOR UPDATE SKIP LOCKED
  • Connection pooling: Share database connections with your application
  • Schema isolation: Custom schema names for multi-tenant deployments
  • Stale workflow cleanup: Automatic timeout handling

Event Transports

Memory Events

Simple in-memory event emitter:

import { MemoryEventTransport } from '@multiplier-labs/stepflow';

const events = new MemoryEventTransport();
const engine = new WorkflowEngine({ storage, events });

// Subscribe to events
events.subscribeAll((event) => {
  console.log(`Event: ${event.eventType} for run ${event.runId}`);
});

Socket.IO Events

Real-time events via Socket.IO:

import { Server } from 'socket.io';
import { SocketIOEventTransport } from '@multiplier-labs/stepflow';

const io = new Server(httpServer);
const events = new SocketIOEventTransport({ io });

const engine = new WorkflowEngine({ storage, events });

// Set up client handlers with authorization
io.on('connection', (socket) => {
  events.setupClientHandlers(socket, async (runId, socket) => {
    return /* authorization check */;
  });
});

// Client-side:
// socket.emit('workflow:subscribe', runId);
// socket.on('workflow:event', (event) => console.log(event));

Webhook Events

POST events to HTTP endpoints:

import { WebhookEventTransport } from '@multiplier-labs/stepflow';

const events = new WebhookEventTransport({
  endpoints: [
    {
      id: 'slack',
      url: 'https://hooks.slack.com/...',
      eventTypes: ['run.completed', 'run.failed'],
    },
    {
      id: 'analytics',
      url: 'https://api.analytics.com/events',
      secret: 'webhook-secret', // HMAC-SHA256 signing
    },
  ],
});

const engine = new WorkflowEngine({ storage, events });

Scheduling

Cron Scheduler

Schedule workflows using cron expressions:

import Database from 'better-sqlite3';
import { CronScheduler, SQLiteSchedulePersistence, SQLiteStorageAdapter } from '@multiplier-labs/stepflow';

const db = new Database('./workflows.db');
const storage = new SQLiteStorageAdapter({ db });
const engine = new WorkflowEngine({ storage });

const schedulePersistence = new SQLiteSchedulePersistence({ db });
const scheduler = new CronScheduler({
  engine,
  persistence: schedulePersistence,
});

// Schedule a workflow to run every hour
await scheduler.addSchedule({
  id: 'hourly-cleanup',
  workflowKind: 'cleanup.workflow',
  triggerType: 'cron',
  cronExpression: '0 * * * *', // Every hour
  timezone: 'America/New_York',
  input: { dryRun: false },
});

// Start the scheduler
await scheduler.start();

Workflow Completion Triggers

Trigger workflows when other workflows complete:

await scheduler.addSchedule({
  id: 'post-order-notification',
  workflowKind: 'notification.send',
  triggerType: 'workflow_completed',
  triggerOnWorkflowKind: 'order.process',
  triggerOnStatus: ['succeeded', 'failed'],
  input: { template: 'order-status' },
});

Event Types

The engine emits the following event types:

  • run.created - Workflow run created
  • run.queued - Run queued (when at max concurrency)
  • run.dequeued - Run dequeued and starting
  • run.started - Run execution started
  • run.resumed - Run resumed after interruption
  • run.completed - Run completed successfully
  • run.failed - Run failed with error
  • run.canceled - Run was canceled
  • run.timeout - Run exceeded timeout
  • step.started - Step execution started
  • step.completed - Step completed successfully
  • step.failed - Step failed (may retry)
  • step.skipped - Step skipped (already completed)
  • step.retry - Step retrying after failure

Planning System

Stepflow includes a rule-based planning system for dynamic workflow generation:

import {
  RuleBasedPlanner,
  MemoryRecipeRegistry,
  MemoryStepHandlerRegistry,
  createRegistry,
} from '@multiplier-labs/stepflow';

// Create registries for recipes and step handlers
const { recipes, handlers } = createRegistry();

// Register recipes and handlers, then create plans
const planner = new RuleBasedPlanner({ recipeRegistry: recipes, handlerRegistry: handlers });
const plan = await planner.plan('process-order', { orderId: '123' });

See the API Reference for full documentation on recipes, step handlers, and planner configuration.

API Reference

For the complete API reference including all classes, interfaces, and configuration options, see docs/stepflow-api-reference.md.

Security

This project takes security seriously. The repository enforces:

  • Branch protection on main with required PR reviews and status checks
  • CODEOWNERS requiring designated reviewer approval for security-sensitive paths
  • Pinned GitHub Actions to full commit SHAs to prevent supply-chain attacks
  • Automated dependency scanning via Dependabot with weekly update PRs
  • npm audit in CI on every pull request and weekly scheduled runs
  • Secret scanning and push protection to prevent credential leaks

For vulnerability reporting, see SECURITY.md.

Contributing

  1. Fork the repository and create a feature branch from main
  2. Ensure your changes pass all checks: npm run typecheck && npm run build && npm test
  3. Open a pull request — CI will run automatically
  4. A CODEOWNERS reviewer will be assigned based on the files changed

Changelog

See CHANGELOG.md for a history of notable changes.

License

MIT