@datarster/workflow-engine
v0.0.21
The Workflow Engine is a powerful tool designed to automate and manage complex workflows. It allows users to define, execute, and monitor workflows with ease.
Workflow Engine
Overview
@datarster/workflow-engine is an extensible workflow management library for Node.js. It executes workflows described by a JSON-based definition and supports sequential, parallel, conditional, looping, switch-case, and event-based workflows, with optional rollback when a step fails.
Features
- Supports various workflow step types: task, decision, parallel, loop, wait, commit, execute_then_wait, and switch.
- Event-driven execution with workflow resumption capabilities.
- Integration with Redis for state management and workflow caching.
- MongoDB logging for workflow execution history.
- Rollback support for failed steps with custom rollback workers.
- Dynamic worker registration with timeout support.
- Switch/case logic for conditional execution.
- Flexible configuration for Redis and MongoDB connections.
Installation
npm install @datarster/workflow-engine
Usage
Import and Initialize the Engine
import { WorkflowEngine } from "@datarster/workflow-engine";
const config = {
  redis: { host: "localhost", port: 6379 },
  useLogging: true,
  mongo: { uri: "mongodb://localhost:27017", database: "workflowDB", collection: "logs" }
};
const engine = new WorkflowEngine();
await engine.init(config);
Define a Workflow
const workflow = {
  name: "Order Processing",
  steps: [
    {
      id: "1",
      type: "task",
      name: "Validate Order",
      worker: "validateOrder",
      workerTimeout: 30000,
      rollback: {
        id: "1_rollback",
        name: "Rollback Validation",
        type: "task",
        worker: "validateOrderRollback"
      }
    },
    { id: "2", type: "task", name: "Process Payment", worker: "processPayment" },
    { id: "3", type: "task", name: "Ship Order", worker: "shipOrder" }
  ]
};
await engine.registerWorkflow("order_processing", workflow);
Register Workers
engine.registerWorker("validateOrder", async (data) => {
  console.log("Validating order", data);
  // Your validation logic here
});
engine.registerWorker("validateOrderRollback", async (data) => {
  console.log("Rolling back order validation", data);
  // Your rollback logic here
});
engine.registerWorker("processPayment", async (data) => {
  console.log("Processing payment", data);
  // Your payment logic here
});
engine.registerWorker("shipOrder", async (data) => {
  console.log("Shipping order", data);
  // Your shipping logic here
});
Execute the Workflow
const executionId = await engine.executeWorkflow({
  workflowId: "order_processing",
  inputData: { orderId: 12345 },
  callbackOptions: {
    onStepComplete: (step, result) => {
      console.log(`Step ${step.name} completed:`, result);
    },
    onError: (step, error) => {
      console.error(`Step ${step.name} failed:`, error);
    }
  }
});
console.log("Workflow execution started with ID:", executionId);
Resume Workflow with Event
// Resume a waiting workflow with an event
await engine.resumeWorkflow(executionId, "external_approval_received", {
  approved: true,
  approver: "[email protected]"
});
Fetch Workflow Logs
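// Note: fetchWorkflowLog is not implemented in the main class; see the note under Public API Methods.
const log = await engine.fetchWorkflowLog(executionId);
console.log("Execution Log:", log);
Supported Workflow Step Types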
| Step Type | Description |
|------------|-------------|
| task | Executes a registered worker function. |
| decision | Evaluates a condition and executes a true/false branch. |
| parallel | Runs multiple steps in parallel. |
| loop | Repeats execution based on a condition. |
| wait | Pauses execution until an event is received. |
| execute_then_wait | Executes a task and then waits for an external event. |
| switch | Evaluates a field value and executes the matching case. |
| commit | Marks a workflow step as a commit step. |
| switch_case | Represents a case within a switch step (used internally in switch logic). |
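To illustrate how the branching fields fit together, here is a minimal sketch of a decision step. The field names (condition, true_branch, false_branch) come from the WorkflowStep field list in this README. The condition expression syntax, the worker names, and the local Step type are assumptions made for this example, so check the engine's source for what a condition string may actually contain.

```typescript
// Local, simplified step shape for this example only (not the library's exported type).
type Step = {
  id: string;
  type: string;
  name: string;
  worker?: string;
  condition?: string;
  true_branch?: Step[];
  false_branch?: Step[];
};

// A decision step: the condition is evaluated and one of the two branches runs.
// ASSUMPTION: the expression syntax below is illustrative; the README does not specify it.
const checkStock: Step = {
  id: "1",
  type: "decision",
  name: "Check Stock",
  condition: "inStock === true",
  true_branch: [
    // hypothetical worker name
    { id: "1.1", type: "task", name: "Reserve Items", worker: "reserveItems" }
  ],
  false_branch: [
    // hypothetical worker name
    { id: "1.2", type: "task", name: "Notify Backorder", worker: "notifyBackorder" }
  ]
};

const decisionWorkflow = { name: "Stock Check", steps: [checkStock] };
```

A definition like this would presumably be passed to registerWorkflow in the same way as the Order Processing example, with matching workers registered under the names used in the branches.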
Public API Methods
The following methods are available on the WorkflowEngine class:
| Method | Description |
|--------|-------------|
| init(config) | Initializes the engine with Redis and/or MongoDB configuration. |
| registerWorkflow(id, workflow) | Registers a workflow definition with a unique ID. |
| registerWorker(name, worker) | Registers a worker function by name. |
| executeWorkflow(options) | Executes a registered workflow. Returns an execution ID. |
| resumeWorkflow(executionId, eventName, inputData, options?) | Resumes a waiting workflow when an event is received. |
Note: The fetchWorkflowLog method is shown in the usage examples but is not implemented in the main class. For workflow logs, use your own MongoDB queries or extend the engine as needed.
WorkflowStep Interface (Advanced)
Each workflow step supports the following fields:
- id (string): Unique identifier for the step.
- type (string): Step type (see Supported Workflow Step Types).
- name (string): Name of the step.
- worker (string, optional): Name of the worker function to execute.
- condition (string, optional): Condition for decision/loop steps.
- true_branch / false_branch (array, optional): Steps for decision branches.
- parallel_steps (array, optional): Steps to run in parallel.
- switch_cases (array, optional): Cases for switch steps.
- switch_case_steps (array, optional): Steps for a specific switch case.
- event_name (string, optional): Event to wait for (wait/execute_then_wait).
- rollback (object, optional): Step to execute for rollback.
- status (string, optional): Status of the step.
- createdAt / updatedAt (string, optional): Timestamps.
- workerTimeout (number, optional): Timeout for the worker in ms.
- field / value (any, optional): Used in switch/case logic.
- default_case (object, optional): Default step for switch.
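The field list above can be written down as a TypeScript shape, which makes it easier to see which fields are required. This is a sketch inferred from this README only: the names StepType and WorkflowStepSketch are made up for the example, and the exact types and optionality in the library's own definitions may differ.

```typescript
// Step types listed in the Supported Workflow Step Types table.
type StepType =
  | "task" | "decision" | "parallel" | "loop" | "wait"
  | "execute_then_wait" | "switch" | "commit" | "switch_case";

// ASSUMPTION: inferred from the README's field list, not copied from the library.
interface WorkflowStepSketch {
  id: string;
  type: StepType;
  name: string;
  worker?: string;
  condition?: string;
  true_branch?: WorkflowStepSketch[];
  false_branch?: WorkflowStepSketch[];
  parallel_steps?: WorkflowStepSketch[];
  // The README's switch example omits type/name on cases, hence Partial here.
  switch_cases?: Partial<WorkflowStepSketch>[];
  switch_case_steps?: WorkflowStepSketch[];
  event_name?: string;
  rollback?: WorkflowStepSketch;
  status?: string;
  createdAt?: string;
  updatedAt?: string;
  workerTimeout?: number; // milliseconds
  field?: unknown;
  value?: unknown;
  default_case?: WorkflowStepSketch;
}

// The wait step from the event-driven example, checked against the sketch.
const waitStep: WorkflowStepSketch = {
  id: "2",
  type: "wait",
  name: "Wait for Approval",
  event_name: "approval_received"
};
```

For real projects, prefer the types the package exports (see TypeScript Support) over a hand-written shape like this one.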
Advanced Workflow Examples
Switch/Case Workflow
const switchWorkflow = {
  name: "Order Status Processing",
  steps: [
    {
      id: "1",
      type: "switch",
      name: "Process Based on Status",
      switch_cases: [
        {
          id: "case1",
          field: "status",
          value: "COMMITTED",
          switch_case_steps: [
            { id: "1.1", type: "task", name: "Handle Committed", worker: "handleCommitted" }
          ]
        },
        {
          id: "case2",
          field: "status",
          value: "CANCELLED",
          switch_case_steps: [
            { id: "1.2", type: "task", name: "Handle Cancelled", worker: "handleCancelled" }
          ]
        }
      ],
      default_case: {
        id: "default",
        type: "task",
        name: "Handle Default",
        worker: "handleDefault"
      }
    }
  ]
};
Parallel Execution
const parallelWorkflow = {
  name: "Parallel Processing",
  steps: [
    {
      id: "1",
      type: "parallel",
      name: "Process Multiple Tasks",
      parallel_steps: [
        { id: "1.1", type: "task", name: "Task A", worker: "workerA" },
        { id: "1.2", type: "task", name: "Task B", worker: "workerB" },
        { id: "1.3", type: "task", name: "Task C", worker: "workerC" }
      ]
    }
  ]
};
Event-Driven Workflow
const eventWorkflow = {
  name: "Event-Driven Process",
  steps: [
    { id: "1", type: "task", name: "Initial Task", worker: "initialWorker" },
    {
      id: "2",
      type: "wait",
      name: "Wait for Approval",
      event_name: "approval_received"
    },
    { id: "3", type: "task", name: "Final Task", worker: "finalWorker" }
  ]
};
Configuration Options
Redis Configuration
const redisConfig = {
  host: "localhost",
  port: 6379,
  // OR provide your own Redis client
  redisClient: existingRedisClient
};
MongoDB Configuration
const mongoConfig = {
  uri: "mongodb://localhost:27017",
  database: "workflowDB",
  collection: "logs",
  // OR provide your own MongoDB client
  mongoClient: existingMongoClient
};
Error Handling and Rollback
- If a task fails, the workflow engine triggers rollback steps if they are defined.
- Rollback steps are executed in reverse order (LIFO - Last In, First Out).
- Errors during rollback steps are logged but do not halt the rollback process.
- Each step can have its own rollback logic defined in the rollback property.
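The rollback rules above (reverse order, errors logged without halting) can be sketched in a few lines. This is not the engine's implementation: it is synchronous for brevity while the engine's workers are async, the names SketchStep and runWithRollback are made up, and it assumes that only steps which completed successfully are rolled back.

```typescript
// ASSUMPTION: illustrative types and names, not part of the library.
interface SketchStep {
  name: string;
  run: () => void;
  rollback?: () => void;
}

function runWithRollback(steps: SketchStep[], log: string[] = []): string[] {
  const completed: SketchStep[] = []; // stack of steps that finished successfully
  try {
    for (const step of steps) {
      step.run();
      log.push(`done:${step.name}`);
      completed.push(step);
    }
  } catch (err) {
    log.push(`failed:${(err as Error).message}`);
    // Undo completed steps in reverse order (LIFO).
    while (completed.length > 0) {
      const step = completed.pop()!;
      if (!step.rollback) continue;
      try {
        step.rollback();
        log.push(`rolled_back:${step.name}`);
      } catch {
        // A failing rollback is recorded, and the remaining rollbacks still run.
        log.push(`rollback_error:${step.name}`);
      }
    }
  }
  return log;
}

const result = runWithRollback([
  { name: "validate", run: () => {}, rollback: () => {} },
  { name: "pay", run: () => {}, rollback: () => { throw new Error("refund failed"); } },
  { name: "ship", run: () => { throw new Error("carrier down"); } }
]);
// result: ["done:validate", "done:pay", "failed:carrier down",
//          "rollback_error:pay", "rolled_back:validate"]
```

In the usage example, the pay rollback throws, yet the validate rollback still runs afterwards, which is the behavior the third bullet describes.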
State Management
- Redis: Used for caching workflow execution state and registered workflows.
- MongoDB: Stores detailed workflow execution history and logs (optional).
- In-Memory: Falls back to in-memory storage if Redis is not configured.
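The in-memory fallback described above can be pictured as a small store interface with two interchangeable backends. The sketch below is an assumption about the general pattern, not the engine's code: StateStore, InMemoryStore, and chooseStore are hypothetical names, and the interface is synchronous for brevity, whereas a real Redis client is asynchronous.

```typescript
// ASSUMPTION: hypothetical interface and names for illustration only.
interface StateStore {
  get(key: string): string | undefined;
  set(key: string, value: string): void;
}

// Map-backed store; its contents are lost when the process exits.
class InMemoryStore implements StateStore {
  private data = new Map<string, string>();

  get(key: string): string | undefined {
    return this.data.get(key);
  }

  set(key: string, value: string): void {
    this.data.set(key, value);
  }
}

// Use the Redis-backed store when one is supplied, otherwise fall back to memory.
function chooseStore(redisStore?: StateStore): StateStore {
  return redisStore ?? new InMemoryStore();
}

const store = chooseStore(); // no Redis configured in this example
store.set("execution:abc", JSON.stringify({ currentStep: "2", status: "waiting" }));
```

Because in-memory state does not survive a restart, a workflow paused on a wait step can only be resumed within the same process unless Redis is configured.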
TypeScript Support
The library is written in TypeScript and provides full type definitions:
import { WorkflowEngine, WorkflowStepType, WorkflowDefinition } from "@datarster/workflow-engine";
Contributing
Contributions are welcome! Feel free to submit issues or pull requests on GitHub.
License
MIT
