npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@hardlydifficult/workflow-engine

v1.0.177

Published

A powerful workflow and pipeline execution engine with persistence, lifecycle management, and flexible execution patterns.

Downloads

1,311

Readme

@hardlydifficult/workflow-engine

A powerful workflow and pipeline execution engine with persistence, lifecycle management, and flexible execution patterns.

Installation

npm install @hardlydifficult/workflow-engine @hardlydifficult/state-tracker

Optionally, for pipeline support:

npm install @hardlydifficult/logger

Features

  • Pipeline Execution: Define linear sequences of steps with automatic state management and persistence
  • Gate Steps: Pause pipeline execution and resume later with optional data
  • Retry Logic: Automatic retry with recovery functions for failed steps
  • Lifecycle Hooks: Customizable hooks for step start/complete/failed events
  • Persistence: Automatic state saving and recovery across restarts
  • Cancellation: Graceful cancellation with AbortSignal support
  • Data Accumulation: Steps receive accumulated data from all previous steps
  • Status Tracking: Clear status strings (running:step, gate:step, completed, failed, cancelled)

Quick Start

import { Pipeline } from "@hardlydifficult/workflow-engine";
import { createLogger } from "@hardlydifficult/logger";

// Define steps
const steps = [
  {
    name: "create_plan",
    execute: async () => ({ plan: "create resources" }),
  },
  {
    name: "approve",
    gate: true, // Pause here until resume() is called
  },
  {
    name: "execute_plan",
    execute: async ({ data }) => {
      console.log(`Executing: ${data.plan}`);
      return { executed: true };
    },
  },
];

// Create pipeline
const pipeline = new Pipeline({
  key: "my-pipeline-123",
  steps,
  initialData: {},
  logger: createLogger({ name: "pipeline" }),
  stateDirectory: "./pipeline-state",
});

// Run and wait at gate
await pipeline.run();
console.log(pipeline.status); // "gate:approve"

// Resume with optional data
await pipeline.resume({ approved: true });

// Check completion
console.log(pipeline.status); // "completed"
console.log(pipeline.data);   // { plan: "...", executed: true, approved: true }

Step Types

Regular Steps

Execute immediately and return data to accumulate:

{
  name: "fetch_data",
  execute: async () => ({ data: "fetched" }),
}

Gate Steps

Pause execution until resume() is called:

{
  name: "manual_approval",
  gate: true,
}

Gate steps can also have execution logic:

{
  name: "pre_approval_check",
  gate: true,
  execute: async () => ({ check_passed: true }),
}

Retryable Steps

{
  name: "flaky_api_call",
  retries: 3,
  execute: async ({ data }) => {
    const result = await callApi();
    if (!result.success) throw new Error("API failed");
    return result;
  },
  recover: async ({ data }) => {
    // Cleanup between retries
    return { retry_count: (data.retry_count || 0) + 1 };
  },
}

General-Purpose Workflow Engine

A lower-level state machine with typed statuses, validated transitions, and persistent state.

import { WorkflowEngine } from "@hardlydifficult/workflow-engine";

type Status = "idle" | "running" | "completed" | "failed";
interface Data { count: number; result?: string; }

const engine = new WorkflowEngine<Status, Data>({
  key: "my-workflow",
  initialStatus: "idle",
  initialData: { count: 0 },
  transitions: {
    idle: ["running", "failed"],
    running: ["completed", "failed"],
    completed: [],
    failed: [],
  },
  stateDirectory: "/var/data",
  onTransition: (event) => console.log(`${event.from} -> ${event.to}`),
});

await engine.load();

await engine.transition("running", (draft) => {
  draft.count = 1;
});

await engine.transition("completed", (draft) => {
  draft.result = "done";
});

engine.isTerminal; // true

new WorkflowEngine<TStatus, TData>(options)

| Option | Type | Description | |--------|------|-------------| | key | string | Unique persistence key | | initialStatus | TStatus | Default status for new workflows | | initialData | TData | Default data for new workflows | | transitions | Record<TStatus, TStatus[]> | Allowed transitions per status | | stateDirectory | string? | Persistence directory | | autoSaveMs | number? | Auto-save interval (default 5000) | | onTransition | function? | Event callback |

Properties

| Property | Type | Description | |----------|------|-------------| | status | TStatus | Current status | | data | Readonly<TData> | Current data | | isLoaded | boolean | Whether load() has been called | | isPersistent | boolean | Whether disk storage is available | | isTerminal | boolean | Whether current status has no outgoing transitions |

Methods

| Method | Description | |--------|-------------| | load() | Load persisted state from disk. Safe to call multiple times. | | transition(to, updater?) | Change status, optionally mutate data. Validates transition, persists immediately. | | update(updater) | Mutate data without changing status. Persists immediately. | | save() | Force-save current state to disk. | | cursor(selector) | Create a DataCursor for safe nested data access with get(), find(), update(). | | canTransition(to) | Check if a transition is allowed from current status. | | allowedTransitions() | List statuses reachable from current status. |

cursor<TItem>(selector)

Creates a reusable cursor for safe navigation into nested engine data. Define the selector once, then use get(), find(), or update() without repeating navigation logic.

interface Data { items: Array<{ name: string; done: boolean }>; currentIndex?: number; }

const item = engine.cursor((d) => d.items[d.currentIndex ?? -1]);

item.get();       // returns item or throws "Cursor target not found"
item.find();      // returns item or undefined
await item.update((it) => { it.done = true; });            // persists, no-op if undefined
await item.update((it, d) => { d.currentIndex = undefined; }); // access parent data too

Updater Pattern

transition() and update() accept an updater callback that receives a structuredClone of the data. Mutate it directly — if the updater throws, nothing changes.

await engine.transition("running", (draft) => {
  draft.count += 1;
  draft.result = computeResult();
});

Lifecycle Hooks

const pipeline = new Pipeline({
  key: "my-pipeline",
  steps,
  hooks: {
    onStepStart: (name) => console.log(`Starting: ${name}`),
    onStepComplete: (name, data) => console.log(`Completed: ${name}`),
    onGateReached: (name) => console.log(`Reached gate: ${name}`),
    onComplete: () => console.log("Pipeline completed"),
    onFailed: (name, error) => console.error(`Failed at ${name}: ${error}`),
  },
});

Cancellation

const controller = new AbortController();

const pipeline = new Pipeline({
  key: "my-pipeline",
  steps,
  signal: controller.signal,
});

await pipeline.run();

// Cancel from elsewhere
controller.abort();

State Persistence

State is automatically saved to disk:

const pipeline = new Pipeline({
  key: "my-pipeline",
  steps,
  stateDirectory: "./state",
  autoSaveMs: 1000, // Save every second (default)
});

Pipeline API Reference

new Pipeline<TData>(options)

| Option | Type | Description | |--------|------|-------------| | key | string | Unique persistence key | | steps | StepDefinition<TData>[] | Ordered list of step definitions | | initialData | TData | Initial accumulated data | | logger | Logger | Logger instance (all lifecycle events logged automatically) | | stateDirectory | string? | Persistence directory | | autoSaveMs | number? | Auto-save interval (default 5000) | | hooks | PipelineHooks<TData>? | Lifecycle hooks for external integrations | | signal | AbortSignal? | Abort signal for cancellation |

Properties

| Property | Type | Description | |----------|------|-------------| | status | string | e.g. "running:create_plan", "gate:approve", "completed", "failed" | | data | Readonly<TData> | Accumulated output data | | steps | StepState[] | Per-step runtime state | | currentStep | string? | Name of current step, or undefined if terminal | | isTerminal | boolean | Whether pipeline is in a terminal state | | isWaitingAtGate | boolean | Whether pipeline is paused at a gate |

Methods

| Method | Description | |--------|-------------| | run() | Start or resume from crash. Loads persisted state, re-executes interrupted steps. | | resume(data?) | Continue past a gate, optionally merging partial data. | | cancel() | Transition to cancelled, abort signal fires. | | on(listener) | Subscribe to changes. Returns unsubscribe function. | | toSnapshot() | Return a read-only snapshot of { status, data, steps, isTerminal }. |

Step Definition

interface StepDefinition<TData> {
  name: string;
  execute?: (context: { data: TData }) => Promise<Partial<TData>>;
  gate?: boolean;
  retries?: number;
  recover?: (context: { data: TData }) => Promise<Partial<TData>>;
}

Hooks

All hooks are optional. Hook errors are swallowed to avoid breaking pipeline execution.

| Hook | Arguments | When | |------|-----------|------| | onStepStart | (name, data) | Before a step executes | | onStepComplete | (name, data) | After a step succeeds | | onStepFailed | (name, error, data) | When a step fails (after all retries) | | onGateReached | (name, data) | When a gate step pauses | | onComplete | (data) | When all steps finish | | onFailed | (name, error, data) | When pipeline enters failed state |

Status Values

  • running:step_name - Currently executing step
  • gate:gate_name - Waiting at a gate
  • completed - Successfully finished
  • failed - Failed execution
  • cancelled - Cancelled execution

Error Handling

try {
  await pipeline.run();
} catch (error) {
  if (error instanceof PipelineHasNoStepsError) {
    // Handle empty pipeline
  } else if (error instanceof DuplicatePipelineStepNameError) {
    // Handle duplicate step names
  } else if (error instanceof PipelineResumeError) {
    // Handle resume errors
    console.log(error.code); // "PIPELINE_NOT_AT_GATE"
  }
}

Crash Recovery

Pipeline state is persisted automatically. On restart, run() detects the interrupted state:

  • Mid-step: re-executes the step (steps should be idempotent)
  • At gate: stays at gate, waiting for resume()
  • Terminal: no-op

Migration

See the migration guide for upgrading from v0.x to v1.x.