npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@datarster/workflow-engine

v0.0.21

Published

The Workflow Engine is a powerful tool designed to automate and manage complex workflows. It allows users to define, execute, and monitor workflows with ease.

Readme

Workflow Engine

Overview

@datarster/workflow-engine is a powerful and extensible workflow management library for Node.js. It enables the execution of workflows using a JSON-based workflow definition. This engine supports sequential, parallel, conditional, looping, switch-case, and event-based workflows, with optional rollback functionality in case of failure.

Features

  • Supports various workflow step types: task, decision, parallel, loop, wait, commit, execute_then_wait, and switch.
  • Event-driven execution with workflow resumption capabilities.
  • Integration with Redis for state management and workflow caching.
  • MongoDB logging for workflow execution history.
  • Rollback support for failed steps with custom rollback workers.
  • Dynamic worker registration with timeout support.
  • Switch/case logic for conditional execution.
  • Flexible configuration for Redis and MongoDB connections.

Installation

npm install @datarster/workflow-engine

Usage

Import and Initialize the Engine

import { WorkflowEngine } from "@datarster/workflow-engine";

const config = {
    redis: { host: "localhost", port: 6379 },
    useLogging: true,
    mongo: { uri: "mongodb://localhost:27017", database: "workflowDB", collection: "logs" }
};

const engine = new WorkflowEngine();
await engine.init(config);

Define a Workflow

const workflow = {
    name: "Order Processing",
    steps: [
        { 
            id: "1", 
            type: "task", 
            name: "Validate Order", 
            worker: "validateOrder",
            workerTimeout: 30000,
            rollback: {
                id: "1_rollback",
                name: "Rollback Validation",
                type: "task",
                worker: "validateOrderRollback"
            }
        },
        { id: "2", type: "task", name: "Process Payment", worker: "processPayment" },
        { id: "3", type: "task", name: "Ship Order", worker: "shipOrder" }
    ]
};

await engine.registerWorkflow("order_processing", workflow);

Register Workers

engine.registerWorker("validateOrder", async (data) => {
    console.log("Validating order", data);
    // Your validation logic here
});

engine.registerWorker("validateOrderRollback", async (data) => {
    console.log("Rolling back order validation", data);
    // Your rollback logic here
});

engine.registerWorker("processPayment", async (data) => {
    console.log("Processing payment", data);
    // Your payment logic here
});

engine.registerWorker("shipOrder", async (data) => {
    console.log("Shipping order", data);
    // Your shipping logic here
});

Execute the Workflow

const executionId = await engine.executeWorkflow({
    workflowId: "order_processing",
    inputData: { orderId: 12345 },
    callbackOptions: {
        onStepComplete: (step, result) => {
            console.log(`Step ${step.name} completed:`, result);
        },
        onError: (step, error) => {
            console.error(`Step ${step.name} failed:`, error);
        }
    }
});
console.log("Workflow execution started with ID:", executionId);

Resume Workflow with Event

// Resume a waiting workflow with an event
await engine.resumeWorkflow(executionId, "external_approval_received", {
    approved: true,
    approver: "[email protected]"
});

Fetch Workflow Logs

const log = await engine.fetchWorkflowLog(executionId);
console.log("Execution Log:", log);

Supported Workflow Step Types

| Step Type | Description | |------------|-------------| | task | Executes a registered worker function. | | decision | Evaluates a condition and executes a true/false branch. | | parallel | Runs multiple steps in parallel. | | loop | Repeats execution based on a condition. | | wait | Pauses execution until an event is received. | | execute_then_wait | Executes a task and then waits for an external event. | | switch | Evaluates a field value and executes matching case. | | commit | Marks a workflow step as a commit step. | | switch_case | Represents a case within a switch step (used internally in switch logic). |

Public API Methods

The following methods are available on the WorkflowEngine class:

| Method | Description | |--------|-------------| | init(config) | Initializes the engine with Redis and/or MongoDB configuration. | | registerWorkflow(id, workflow) | Registers a workflow definition with a unique ID. | | registerWorker(name, worker) | Registers a worker function by name. | | executeWorkflow(options) | Executes a registered workflow. Returns an execution ID. | | resumeWorkflow(executionId, eventName, inputData, options?) | Resumes a waiting workflow when an event is received. |

Note: The fetchWorkflowLog method is shown in usage examples, but is not implemented in the main class. For workflow logs, use your own MongoDB queries or extend the engine as needed.

WorkflowStep Interface (Advanced)

Each workflow step supports the following fields:

  • id (string): Unique identifier for the step.
  • type (string): Step type (see Supported Workflow Step Types).
  • name (string): Name of the step.
  • worker (string, optional): Name of the worker function to execute.
  • condition (string, optional): Condition for decision/loop steps.
  • true_branch / false_branch (array, optional): Steps for decision branches.
  • parallel_steps (array, optional): Steps to run in parallel.
  • switch_cases (array, optional): Cases for switch steps.
  • switch_case_steps (array, optional): Steps for a specific switch case.
  • event_name (string, optional): Event to wait for (wait/execute_then_wait).
  • rollback (object, optional): Step to execute for rollback.
  • status (string, optional): Status of the step.
  • createdAt / updatedAt (string, optional): Timestamps.
  • workerTimeout (number, optional): Timeout for the worker in ms.
  • field / value (any, optional): Used in switch/case logic.
  • default_case (object, optional): Default step for switch.

Advanced Workflow Examples

Switch/Case Workflow

const switchWorkflow = {
    name: "Order Status Processing",
    steps: [
        {
            id: "1",
            type: "switch",
            name: "Process Based on Status",
            switch_cases: [
                {
                    id: "case1",
                    field: "status",
                    value: "COMMITTED",
                    switch_case_steps: [
                        { id: "1.1", type: "task", name: "Handle Committed", worker: "handleCommitted" }
                    ]
                },
                {
                    id: "case2", 
                    field: "status",
                    value: "CANCELLED",
                    switch_case_steps: [
                        { id: "1.2", type: "task", name: "Handle Cancelled", worker: "handleCancelled" }
                    ]
                }
            ],
            default_case: {
                id: "default",
                type: "task",
                name: "Handle Default",
                worker: "handleDefault"
            }
        }
    ]
};

Parallel Execution

const parallelWorkflow = {
    name: "Parallel Processing",
    steps: [
        {
            id: "1",
            type: "parallel",
            name: "Process Multiple Tasks",
            parallel_steps: [
                { id: "1.1", type: "task", name: "Task A", worker: "workerA" },
                { id: "1.2", type: "task", name: "Task B", worker: "workerB" },
                { id: "1.3", type: "task", name: "Task C", worker: "workerC" }
            ]
        }
    ]
};

Event-Driven Workflow

const eventWorkflow = {
    name: "Event-Driven Process",
    steps: [
        { id: "1", type: "task", name: "Initial Task", worker: "initialWorker" },
        {
            id: "2",
            type: "wait",
            name: "Wait for Approval",
            event_name: "approval_received"
        },
        { id: "3", type: "task", name: "Final Task", worker: "finalWorker" }
    ]
};

Configuration Options

Redis Configuration

const redisConfig = {
    host: "localhost",
    port: 6379,
    // OR provide your own Redis client
    redisClient: existingRedisClient
};

MongoDB Configuration

const mongoConfig = {
    uri: "mongodb://localhost:27017",
    database: "workflowDB",
    collection: "logs",
    // OR provide your own MongoDB client
    mongoClient: existingMongoClient
};

Error Handling and Rollback

  • If a task fails, the workflow engine triggers rollback steps if they are defined.
  • Rollback steps are executed in reverse order (LIFO - Last In, First Out).
  • Errors during rollback steps are logged but do not halt the rollback process.
  • Each step can have its own rollback logic defined in the rollback property.

State Management

  • Redis: Used for caching workflow execution state and registered workflows.
  • MongoDB: Stores detailed workflow execution history and logs (optional).
  • In-Memory: Falls back to in-memory storage if Redis is not configured.

TypeScript Support

The library is written in TypeScript and provides full type definitions:

import { WorkflowEngine, WorkflowStepType, WorkflowDefinition } from "@datarster/workflow-engine";

Contributing

Contributions are welcome! Feel free to submit issues or pull requests on GitHub.

License

MIT