npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

flowmesh

v3.9.13

Published

Type-safe workflow engine for TypeScript with declarative approach and NestJS integration

Readme

FlowMesh

npm version License: MIT Test Coverage TypeScript

A type-safe workflow engine for TypeScript with declarative approach and NestJS integration.

Features

  • Full type safety with TypeScript
  • Declarative workflow definitions using decorators
  • Multiple transition types (automatic, explicit, conditional, dynamic)
  • Lifecycle hooks for workflows and states
  • Automatic persistence with adapter pattern
  • Concurrency control (sequential, parallel, throttle modes)
  • Suspend/resume workflows
  • NestJS integration with dependency injection
  • Plugin system for extensibility
  • Flexible error handling with custom error handlers
  • Workflow graph generation for visualization and analysis

Installation

npm install flowmesh reflect-metadata

Peer dependencies:

  • reflect-metadata (required)
  • @nestjs/common, @nestjs/core (optional, for NestJS integration)

Quick Start

import { Workflow, State, WorkflowEngine, StateRegistry, WorkflowContext, StateActions } from 'flowmesh';

// Define states enum
enum OrderState {
  CREATED = 'CREATED',
  INVENTORY_CHECK = 'INVENTORY_CHECK',
  COMPLETED = 'COMPLETED',
}

// Define data and outputs interfaces
interface OrderData {
  orderId: string;
  items: string[];
}

interface OrderOutputs {
  [OrderState.CREATED]: { orderId: string };
  [OrderState.INVENTORY_CHECK]: { available: boolean };
}

// Define workflow
@Workflow({
  name: 'OrderWorkflow',
  states: OrderState,
  initialState: OrderState.CREATED,
})
export class OrderWorkflow {}

// Define state handlers
@State(OrderState.CREATED)
export class CreatedState {
  execute(
    ctx: WorkflowContext<OrderData, OrderOutputs>,
    actions: StateActions<OrderData, OrderOutputs, OrderState.CREATED>
  ) {
    actions.next({ output: { orderId: ctx.data.orderId } });
  }
}

@State(OrderState.INVENTORY_CHECK)
export class InventoryCheckState {
  execute(
    ctx: WorkflowContext<OrderData, OrderOutputs>,
    actions: StateActions<OrderData, OrderOutputs, OrderState.INVENTORY_CHECK>
  ) {
    const available = ctx.data.items.length > 0;
    actions.next({ output: { available } });
  }
}

@State(OrderState.COMPLETED)
export class CompletedState {
  execute(
    ctx: WorkflowContext<OrderData, OrderOutputs>,
    actions: StateActions<OrderData, OrderOutputs, OrderState.COMPLETED>
  ) {
    console.log('Order completed:', ctx.data.orderId);
    actions.complete({ output: {} });
  }
}

// Execute workflow
const engine = new WorkflowEngine();
StateRegistry.autoRegister([CreatedState, InventoryCheckState, CompletedState]);

const result = await engine.execute(OrderWorkflow, {
  data: { orderId: 'ORD-001', items: ['item1', 'item2'] }
});

console.log(result.status); // 'completed'
console.log(result.outputs[OrderState.CREATED]?.orderId); // 'ORD-001'

Table of Contents

Core Concepts

Workflow

A workflow is a class decorated with @Workflow that defines:

  • Workflow name
  • States enum
  • Initial state
  • Transitions configuration
  • Concurrency settings

State

A state is a class decorated with @State that implements business logic:

  • Receives typed context with data and outputs
  • Has access to actions (next, goto, suspend, updateData)
  • Can define lifecycle hooks

Context

WorkflowContext<TData, TOutputs> provides:

  • executionId: Unique workflow execution identifier
  • groupId: Group identifier for concurrency control
  • currentState: Current state value
  • data: Workflow input data
  • outputs: Type-safe map of previous state outputs
  • history: Array of state transitions
  • metadata: Custom metadata object

State Actions

StateActions<TData, TOutputs, TCurrentState> provides:

  • next(options): Move to next state with output
  • goto(state, options): Move to specific state
  • suspend(options): Suspend workflow execution
  • complete(options): Explicitly complete workflow with final state context

Workflow Definition

Basic Workflow

@Workflow({
  name: 'OrderWorkflow',
  states: OrderState,
  initialState: OrderState.CREATED,
})
export class OrderWorkflow {}

With Explicit Transitions

@Workflow({
  name: 'OrderWorkflow',
  states: OrderState,
  initialState: OrderState.CREATED,
  transitions: [
    { from: [OrderState.CREATED], to: OrderState.PAYMENT },
    { from: [OrderState.PAYMENT], to: OrderState.COMPLETED },
  ],
})
export class OrderWorkflow {}

With Conditional Transitions

@Workflow({
  name: 'OrderWorkflow',
  states: OrderState,
  initialState: OrderState.CREATED,
  conditionalTransitions: [
    {
      from: OrderState.INVENTORY_CHECK,
      conditions: [
        {
          condition: (ctx) => ctx.data.amount > 1000,
          to: OrderState.MANAGER_APPROVAL
        },
        {
          condition: (ctx) => ctx.data.inStock,
          to: OrderState.PAYMENT
        },
      ],
      default: OrderState.OUT_OF_STOCK,
    },
  ],
})
export class OrderWorkflow {}

With Concurrency Control

@Workflow({
  name: 'OrderWorkflow',
  states: OrderState,
  initialState: OrderState.CREATED,
  concurrency: {
    mode: ConcurrencyMode.SEQUENTIAL,
    groupBy: 'orderId', // or function: (data) => data.orderId
  },
})
export class OrderWorkflow {}

State Handlers

Basic State

@State(OrderState.CREATED)
export class CreatedState implements IState<OrderData, OrderOutputs, OrderState.CREATED> {
  execute(
    ctx: WorkflowContext<OrderData, OrderOutputs>,
    actions: StateActions<OrderData, OrderOutputs, OrderState.CREATED>
  ) {
    // Business logic
    actions.next({ output: { orderId: ctx.data.orderId } });
  }
}

State with Lifecycle Hooks

@State(OrderState.PAYMENT)
export class PaymentState implements IState {
  @OnStateStart()
  onStart(ctx: WorkflowContext, outputs: OrderOutputs) {
    console.log('Starting payment processing');
  }

  execute(ctx: WorkflowContext, actions: StateActions) {
    // Process payment
    actions.next({ output: { transactionId: 'tx-123' } });
  }

  @OnStateSuccess()
  onSuccess(ctx: WorkflowContext, output: any, outputs: OrderOutputs) {
    console.log('Payment processed successfully');
  }

  @OnStateFailure()
  onFailure(ctx: WorkflowContext, error: Error, outputs: OrderOutputs) {
    console.error('Payment failed:', error);
  }

  @OnStateFinish()
  onFinish(ctx: WorkflowContext, outputs: OrderOutputs) {
    console.log('Payment state finished');
  }
}

State with Dynamic Transitions

@State(OrderState.INVENTORY_CHECK)
export class InventoryCheckState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    const { available, quantity } = this.checkInventory(ctx.data.items);

    if (!available) {
      actions.goto(OrderState.OUT_OF_STOCK, {
        output: { available: false }
      });
    } else if (quantity < 10) {
      actions.goto(OrderState.LOW_STOCK_WARNING, {
        output: { quantity }
      });
    } else {
      actions.next({ output: { available: true, quantity } });
    }
  }
}

State with Suspend

@State(OrderState.AWAITING_APPROVAL)
export class AwaitingApprovalState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    if (ctx.data.approved) {
      actions.next({ output: { approvedAt: new Date() } });
    } else {
      actions.suspend({
        waitingFor: 'manager_approval',
        output: { pendingAt: new Date() }
      });
    }
  }
}

State with Complete

Use complete() to explicitly end workflow execution with full control over final state context:

@State(OrderState.PROCESSING)
export class ProcessingState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    const result = this.processOrder(ctx.data);

    // Conditionally complete workflow early
    if (result.skipRemaining) {
      actions.complete({
        data: { completedEarly: true },
        output: { result: 'skipped', reason: result.reason }
      });
      return;
    }

    // Otherwise continue to next state
    actions.next({ output: { result: 'processed' } });
  }
}

When to use complete():

  • Early workflow termination with success status
  • Conditional completion based on business logic
  • Explicit control over final workflow state
  • Bypassing remaining states when they're not needed

Note: complete() follows "last action wins" pattern - if another action is called after it, the last action takes effect.

State with Error Handling

@State(OrderState.VALIDATE)
export class ValidateState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    if (!ctx.data.orderId) {
      throw new Error('Order ID is required');
    }

    if (ctx.data.items.length === 0) {
      throw new Error('Order must contain at least one item');
    }

    actions.next({ output: { validated: true } });
  }
}

Shared State Classes

One state class can handle multiple states using array syntax. This is useful when you have the same logic that needs to execute at different points in the workflow.

enum ProcessingState {
  // First pass
  VALIDATE = 'VALIDATE',
  TRANSFORM = 'TRANSFORM',
  CHECK = 'CHECK',

  // Retry pass (after queue)
  VALIDATE_RETRY = 'VALIDATE_RETRY',
  TRANSFORM_RETRY = 'TRANSFORM_RETRY',
  CHECK_RETRY = 'CHECK_RETRY',

  QUEUE = 'QUEUE',
  COMPLETE = 'COMPLETE',
}

// One class handles both VALIDATE and VALIDATE_RETRY
@State([ProcessingState.VALIDATE, ProcessingState.VALIDATE_RETRY])
export class ValidateState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    // Use ctx.currentState to determine which pass we're in
    const attemptNumber = ctx.currentState === ProcessingState.VALIDATE ? 1 : 2;

    const isValid = this.validate(ctx.data);

    actions.next({
      output: {
        isValid,
        attemptNumber,
        validatedAt: new Date(),
      },
    });
  }

  private validate(data: any): boolean {
    // Shared validation logic
    return data.value > 0;
  }
}

@State([ProcessingState.TRANSFORM, ProcessingState.TRANSFORM_RETRY])
export class TransformState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    // Same transformation logic for both states
    const transformed = this.transform(ctx.data);
    actions.next({ output: { transformed } });
  }

  private transform(data: any): any {
    return { ...data, processed: true };
  }
}

@Workflow({
  name: 'ProcessingWorkflow',
  states: ProcessingState,
  initialState: ProcessingState.VALIDATE,
  transitions: [
    // First pass
    { from: [ProcessingState.VALIDATE], to: ProcessingState.TRANSFORM },
    { from: [ProcessingState.TRANSFORM], to: ProcessingState.CHECK },
    { from: [ProcessingState.CHECK], to: ProcessingState.QUEUE },

    // Retry pass - same classes, different states
    { from: [ProcessingState.QUEUE], to: ProcessingState.VALIDATE_RETRY },
    { from: [ProcessingState.VALIDATE_RETRY], to: ProcessingState.TRANSFORM_RETRY },
    { from: [ProcessingState.TRANSFORM_RETRY], to: ProcessingState.CHECK_RETRY },
    { from: [ProcessingState.CHECK_RETRY], to: ProcessingState.COMPLETE },
  ],
})
class ProcessingWorkflow {}

Benefits:

  • ✅ No code duplication - same logic reused for multiple states
  • ✅ Separate outputs - each state gets its own output entry
  • ✅ Clear workflow graph - states are explicit in transitions
  • context.currentState tells you which state is executing

Use cases:

  • Retry workflows that go through the same steps twice
  • Multi-pass validation with same logic
  • Shared processing logic at different workflow stages

State Behavior Decorators

@Timeout

Set execution timeout for a state. If the state execution exceeds the timeout, an error will be thrown.

@State(OrderState.PAYMENT)
@Timeout(30000) // 30 seconds
export class PaymentState implements IState {
  async execute(ctx: WorkflowContext, actions: StateActions) {
    // If payment processing takes more than 30 seconds, timeout error is thrown
    await this.processPayment(ctx.data);
    actions.next({ output: { paid: true } });
  }
}

@Retry

Configure automatic retry logic with exponential backoff for failed state executions.

@State(OrderState.API_CALL)
@Retry({
  maxAttempts: 3,
  strategy: 'exponential',
  initialDelay: 1000,    // 1 second
  maxDelay: 10000,       // 10 seconds
  multiplier: 2,         // Delay doubles each retry
})
export class ApiCallState implements IState {
  async execute(ctx: WorkflowContext, actions: StateActions) {
    // Will retry up to 3 times with exponential backoff if it fails
    const response = await this.externalApiCall(ctx.data);
    actions.next({ output: { response } });
  }
}

Retry strategies:

  • exponential: Delay = initialDelay * (multiplier ^ attemptNumber), capped at maxDelay
  • linear: Delay = initialDelay * attemptNumber, capped at maxDelay
  • fixed: Delay = initialDelay for all attempts

Example timing with exponential strategy:

Attempt 1: fails immediately
Attempt 2: waits 1000ms (1s)
Attempt 3: waits 2000ms (2s)
Final attempt: waits 4000ms (4s)

@UnlockAfter

Release concurrency lock after this state completes. Useful for long-running workflows that should allow other executions to start.

@State(OrderState.PAYMENT)
@UnlockAfter()
export class PaymentState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    // Process payment
    // After this state completes, the hard lock is released
    // allowing other workflow executions to start
    actions.next({ output: { paid: true } });
  }
}

@State(OrderState.SEND_EMAIL)
export class SendEmailState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    // This state runs without holding the hard lock
    // Other workflows can now execute in parallel
    this.emailService.send(ctx.data.email, 'Order Confirmation');
    actions.next({ output: { emailSent: true } });
  }
}

Use cases for @UnlockAfter:

  • Release lock after critical operations (payment, inventory reservation)
  • Allow parallel processing of non-critical operations (emails, notifications)
  • Improve throughput in sequential concurrency mode
  • Enable throttle mode with maxConcurrentAfterUnlock

Combining Decorators

You can combine multiple behavior decorators on the same state:

@State(OrderState.EXTERNAL_API)
@Timeout(60000)
@Retry({
  maxAttempts: 3,
  strategy: 'exponential',
  initialDelay: 2000,
  maxDelay: 10000,
  multiplier: 2,
})
@UnlockAfter()
export class ExternalApiState implements IState {
  async execute(ctx: WorkflowContext, actions: StateActions) {
    // Will timeout after 60 seconds
    // Will retry up to 3 times with exponential backoff
    // Will release lock after completion
    const result = await this.callExternalApi(ctx.data);
    actions.next({ output: { result } });
  }
}

Transitions

FlowMesh supports four types of transitions, evaluated in this order:

1. Dynamic Transitions (goto)

Highest priority. Programmatically determine next state in state handler.

@State(OrderState.RISK_CHECK)
export class RiskCheckState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    const riskScore = this.calculateRisk(ctx.data);

    if (riskScore > 80) {
      actions.goto(OrderState.MANUAL_REVIEW);
    } else if (riskScore > 50) {
      actions.goto(OrderState.ADDITIONAL_VERIFICATION);
    } else {
      actions.goto(OrderState.APPROVED);
    }
  }
}

2. Conditional Transitions

Evaluated based on conditions at runtime.

@Workflow({
  conditionalTransitions: [
    {
      from: OrderState.REVIEW,
      conditions: [
        { condition: (ctx) => ctx.data.amount > 10000, to: OrderState.DIRECTOR_APPROVAL },
        { condition: (ctx) => ctx.data.amount > 1000, to: OrderState.MANAGER_APPROVAL },
      ],
      default: OrderState.AUTO_APPROVED,
    },
  ],
})

Conditions are evaluated in order. First matching condition determines next state.

Virtual Outputs for Skipped States

When using conditional transitions that skip states, you can define virtual outputs for those skipped states. This is useful when you want to standardize output data regardless of which path was taken through the workflow.

@Workflow({
  conditionalTransitions: [
    {
      from: OrderState.START,
      conditions: [
        {
          condition: (ctx) => ctx.data.amount > 1000,
          to: OrderState.COMPLETED,
          // Virtual outputs for skipped states
          virtualOutputs: {
            [OrderState.HIGH_VALUE]: { priority: 'high', approved: true },
            [OrderState.MEDIUM_VALUE]: { priority: 'medium', approved: false },
            [OrderState.LOW_VALUE]: { priority: 'low', approved: false },
          },
        },
        {
          condition: (ctx) => ctx.data.amount > 100,
          to: OrderState.COMPLETED,
          virtualOutputs: {
            [OrderState.HIGH_VALUE]: { priority: 'high', approved: false },
            [OrderState.MEDIUM_VALUE]: { priority: 'medium', approved: true },
            [OrderState.LOW_VALUE]: { priority: 'low', approved: false },
          },
        },
      ],
      default: OrderState.COMPLETED,
      defaultVirtualOutputs: {
        [OrderState.HIGH_VALUE]: { priority: 'high', approved: false },
        [OrderState.MEDIUM_VALUE]: { priority: 'medium', approved: false },
        [OrderState.LOW_VALUE]: { priority: 'low', approved: true },
      },
    },
  ],
})

Benefits:

  • Access consistent output structure in final states without checking which path was taken
  • Avoid conditional logic in states like "did we go through HIGH_VALUE or MEDIUM_VALUE?"
  • Simplify state logic by having predictable output data

Dynamic Virtual Outputs:

Virtual outputs can also be functions that receive the workflow context:

conditionalTransitions: [
  {
    from: OrderState.START,
    conditions: [
      {
        condition: (ctx) => ctx.data.amount > 1000,
        to: OrderState.COMPLETED,
        virtualOutputs: {
          [OrderState.PROCESSING]: (ctx) => ({
            processedAt: new Date(),
            amount: ctx.data.amount,
            status: 'high-value',
          }),
        },
      },
    ],
  },
]

Example Usage in State:

@State(OrderState.COMPLETED)
export class CompletedState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    // Access virtual outputs without checking the path
    const highValue = ctx.outputs[OrderState.HIGH_VALUE];
    const mediumValue = ctx.outputs[OrderState.MEDIUM_VALUE];
    const lowValue = ctx.outputs[OrderState.LOW_VALUE];

    // Find which priority was set
    const priority = highValue?.priority || mediumValue?.priority || lowValue?.priority;

    actions.next({
      output: {
        completedAt: new Date(),
        priority
      }
    });
  }
}

3. Explicit Transitions

Defined in workflow configuration.

@Workflow({
  transitions: [
    { from: [OrderState.CREATED, OrderState.UPDATED], to: OrderState.VALIDATION },
    { from: [OrderState.VALIDATION], to: OrderState.PROCESSING },
    { from: [OrderState.PROCESSING], to: OrderState.COMPLETED },
  ],
})

4. Automatic Transitions

Lowest priority. Follows enum value order.

enum OrderState {
  CREATED = 'CREATED',        // → VALIDATION
  VALIDATION = 'VALIDATION',  // → PROCESSING
  PROCESSING = 'PROCESSING',  // → COMPLETED
  COMPLETED = 'COMPLETED',
}

Transition Evaluation Priority

FlowMesh evaluates transitions in the following order:

  1. Dynamic (goto) - Highest priority, determined in state execute() method
  2. Conditional Transitions - Evaluated with conditions and default fallback
  3. Explicit Transitions - Configured in workflow decorator (with or without inline conditions)
  4. Automatic - Lowest priority, follows enum order if nothing else matches
Priority Order:
Dynamic (goto) > Conditional > Explicit > Automatic

Transition Comparison

| Method | Flexibility | Complexity | Use Case | |--------|------------|-----------|----------| | Automatic | ⭐ | ⭐ | Simple linear workflows | | Explicit | ⭐⭐ | ⭐⭐ | Clearly defined flow | | Conditional | ⭐⭐⭐⭐ | ⭐⭐⭐ | Complex logic with fallback | | Dynamic (goto) | ⭐⭐⭐⭐⭐ | ⭐⭐ | Highly dynamic runtime logic | | Explicit + Condition | ⭐⭐⭐ | ⭐⭐ | Simple conditional paths | | Multiple From States | ⭐⭐⭐ | ⭐⭐ | Converging to single point | | Combined | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | Maximum flexibility |

Multiple From States

Multiple states can transition to the same target state:

@Workflow({
  transitions: [
    {
      from: [OrderState.CREATED, OrderState.OUT_OF_STOCK, OrderState.RETRY],
      to: OrderState.INVENTORY_CHECK,
    },
    {
      from: [OrderState.PAYMENT, OrderState.SHIPPING],
      to: OrderState.COMPLETED,
    },
  ],
})

Result:

  • CREATED → INVENTORY_CHECK
  • OUT_OF_STOCK → INVENTORY_CHECK (retry check)
  • RETRY → INVENTORY_CHECK (retry check)
  • PAYMENT → COMPLETED (skip shipping)
  • SHIPPING → COMPLETED (normal path)

Combining Transition Types

You can combine different transition types for maximum flexibility:

@Workflow({
  transitions: [
    { from: [OrderState.CREATED], to: OrderState.INVENTORY_CHECK },
    { from: [OrderState.OUT_OF_STOCK], to: OrderState.CANCELLED },
  ],
  conditionalTransitions: [
    {
      from: OrderState.INVENTORY_CHECK,
      conditions: [
        { condition: (ctx) => ctx.data.available, to: OrderState.PAYMENT }
      ],
      default: OrderState.OUT_OF_STOCK,
    },
  ],
})

Transitions Best Practices

DO:

  • Use automatic transitions for simple linear flows
  • Combine different approaches for complex workflows
  • Add default in conditionalTransitions for edge cases
  • Use goto() for highly dynamic runtime logic
  • Document complex transition logic

DON'T:

  • Don't create overly complex conditions inline - extract to methods
  • Don't forget default in conditionalTransitions
  • Don't confuse priority order: conditional transitions are checked BEFORE explicit transitions

Lifecycle Hooks

Workflow Lifecycle

@Workflow({
  name: 'OrderWorkflow',
  states: OrderState,
  initialState: OrderState.CREATED,
})
export class OrderWorkflow {
  @OnWorkflowStart()
  onStart(ctx: WorkflowContext) {
    console.log('Workflow started');
  }

  @OnWorkflowComplete()
  onComplete(ctx: WorkflowContext) {
    console.log('Workflow completed');
  }

  @OnWorkflowError()
  onError(ctx: WorkflowContext, error: Error) {
    console.error('Workflow error:', error);
  }

  @BeforeState()
  beforeState(ctx: WorkflowContext, stateName: string) {
    console.log('Before state:', stateName);
  }

  @AfterState()
  afterState(ctx: WorkflowContext, stateName: string) {
    console.log('After state:', stateName);
  }
}

State Lifecycle

@State(OrderState.PAYMENT)
export class PaymentState implements IState {
  @OnStateStart()
  onStart(ctx: WorkflowContext, outputs: OrderOutputs) {
    // Called before execute()
  }

  execute(ctx: WorkflowContext, actions: StateActions) {
    // Main state logic
  }

  @OnStateSuccess()
  onSuccess(ctx: WorkflowContext, output: any, outputs: OrderOutputs) {
    // Called after successful execute()
  }

  @OnStateFailure()
  onFailure(ctx: WorkflowContext, error: Error, outputs: OrderOutputs) {
    // Called if execute() throws error
  }

  @OnStateFinish()
  onFinish(ctx: WorkflowContext, outputs: OrderOutputs) {
    // Always called after execute() completes (success or failure)
  }
}

Complete Lifecycle Execution Order:

@OnWorkflowStart
  → @BeforeState
    → @OnStateStart
      → execute()
    → @OnStateSuccess (or @OnStateFailure if error)
    → @OnStateFinish
  → @AfterState
  → (repeat for next state)
@OnWorkflowComplete (or @OnWorkflowError if workflow fails)

Success path:

OnStateStart → execute → OnStateSuccess → OnStateFinish

Failure path:

OnStateStart → execute (throws) → OnStateFailure → OnStateFinish

Important: Hook errors are swallowed and logged as warnings. They do not stop workflow execution.

Error Transformation in @OnStateFailure

The @OnStateFailure hook can transform errors by returning or throwing a different error. This allows you to:

  • Convert technical errors to user-friendly messages
  • Enrich errors with context from the workflow
  • Wrap errors with additional debugging information
  • Transform errors before they reach the workflow error handler

Basic Error Override (Return):

@State('PROCESSING')
class ProcessingState implements IState {
  execute(ctx, actions) {
    throw new Error('Database timeout');
  }

  @OnStateFailure()
  onFailure(ctx: WorkflowContext, error: Error): Error {
    // Transform technical error to user-friendly error
    return new Error('Service temporarily unavailable. Please try again.');
  }
}

Error Override (Throw):

@State('VALIDATION')
class ValidationState implements IState {
  @OnStateFailure()
  onFailure(ctx: WorkflowContext, error: Error): void {
    // Throwing a new error also overrides the original
    throw new ValidationError(`Validation failed: ${error.message}`);
  }
}

Conditional Transformation:

class RetryableError extends Error {
  constructor(message: string, public readonly originalError: Error) {
    super(message);
  }
}

@State('API_CALL')
class ApiCallState implements IState {
  @OnStateFailure()
  onFailure(ctx: WorkflowContext, error: Error): Error | void {
    // Only transform specific error types
    if (error.message.includes('timeout') || error.message.includes('503')) {
      return new RetryableError('Temporary service error', error);
    }
    // Return void or nothing to keep original error
  }
}

Enriching Errors with Context:

class ContextualError extends Error {
  constructor(
    message: string,
    public readonly context: { userId: string; orderId: string },
    public readonly cause: Error
  ) {
    super(message);
  }
}

@State('ORDER_PROCESSING')
class OrderProcessingState implements IState {
  @OnStateFailure()
  onFailure(ctx: WorkflowContext, error: Error): Error {
    // Add workflow context to error for better debugging
    return new ContextualError(
      `Order processing failed: ${error.message}`,
      {
        userId: ctx.data.userId,
        orderId: ctx.data.orderId,
      },
      error
    );
  }
}

Async Error Transformation:

@State('EXTERNAL_API')
class ExternalApiState implements IState {
  @OnStateFailure()
  async onFailure(ctx: WorkflowContext, error: Error): Promise<Error> {
    // Can perform async operations (e.g., lookup error codes)
    const errorDetails = await this.lookupErrorCode(error);
    return new Error(`External API error: ${errorDetails.userMessage}`);
  }

  private async lookupErrorCode(error: Error) {
    // Fetch additional error information
    return { userMessage: 'Service unavailable' };
  }
}

Working with Previous State Outputs:

@State('PAYMENT')
class PaymentState implements IState {
  @OnStateFailure()
  onFailure(ctx: WorkflowContext, error: Error): Error {
    // Access outputs from previous states
    const orderOutput = ctx.outputs['ORDER_VALIDATION'];
    return new Error(
      `Payment failed for order ${orderOutput.orderId}: ${error.message}`
    );
  }
}

Note: If @OnStateFailure returns void, null, or undefined, the original error is preserved. The transformed error is passed to the workflow-level error handler for further processing.

Error Handling

FlowMesh provides a flexible error handling system that allows you to control how errors are processed at different phases of workflow execution. This is essential for building resilient workflows in distributed systems.

Overview

Error handlers give you fine-grained control over error behavior:

  • Gracefully handle distributed lock failures
  • Continue execution despite non-critical errors
  • Exit workflows without marking them as failed
  • Custom error logging and monitoring
  • Conditional error recovery based on error type

Error Handler Configuration

Configure an error handler in your workflow decorator:

import { ErrorHandler, ErrorContext, ErrorHandlingDecision } from 'flowmesh';

class CustomErrorHandler implements ErrorHandler {
  handle(context: ErrorContext): ErrorHandlingDecision {
    const { error, phase, workflowContext } = context;

    // Log error
    console.error(`Error in ${phase}:`, error.message);

    // Decide how to handle based on phase and error type
    if (phase === 'lock_acquisition') {
      // Another workflow is processing this group, exit gracefully
      return ErrorHandlingDecision.EXIT;
    }

    if (error.message.includes('Temporary')) {
      // Continue despite temporary errors
      return ErrorHandlingDecision.CONTINUE;
    }

    // Default: mark as failed and persist
    return ErrorHandlingDecision.FAIL;
  }
}

@Workflow({
  name: 'OrderWorkflow',
  states: OrderState,
  initialState: OrderState.CREATED,
  errorHandler: new CustomErrorHandler(),
})
export class OrderWorkflow {}

Error Phases

Errors can occur at different phases of workflow execution:

1. lock_acquisition

Triggered when workflow cannot acquire a distributed lock (Sequential mode).

Use case: Handle concurrent execution attempts in distributed systems.

handle(context: ErrorContext): ErrorHandlingDecision {
  if (context.phase === 'lock_acquisition') {
    // Another node is processing this workflow, exit silently
    this.logger.info('Lock held by another instance, exiting');
    return ErrorHandlingDecision.EXIT;
  }
}

2. workflow_start

Triggered when @OnWorkflowStart() hook throws an error.

Use case: Handle initialization failures.

handle(context: ErrorContext): ErrorHandlingDecision {
  if (context.phase === 'workflow_start') {
    if (context.error.message.includes('Config not loaded')) {
      // Non-critical initialization error, continue anyway
      return ErrorHandlingDecision.CONTINUE;
    }
  }
}

3. before_state

Triggered when @BeforeState() hook throws an error.

Use case: Skip beforeState hook but continue with state execution.

handle(context: ErrorContext): ErrorHandlingDecision {
  if (context.phase === 'before_state') {
    // Logging failed but state execution can continue
    this.logger.warn('beforeState logging failed, continuing');
    return ErrorHandlingDecision.CONTINUE;
  }
}

4. state_execute

Triggered when state's execute() method throws an error.

Use case: Handle business logic failures.

Note: CONTINUE is not supported for state_execute (treated as EXIT with warning).

handle(context: ErrorContext): ErrorHandlingDecision {
  if (context.phase === 'state_execute') {
    // State execution failed, decide whether to persist failure
    if (context.error.message.includes('Validation')) {
      // Validation errors should not be persisted
      return ErrorHandlingDecision.FAIL_NO_PERSIST;
    }
    return ErrorHandlingDecision.FAIL;
  }
}

5. after_state

Triggered when @AfterState() hook throws an error.

Use case: Skip afterState hook but continue with transition to next state.

handle(context: ErrorContext): ErrorHandlingDecision {
  if (context.phase === 'after_state') {
    // Analytics tracking failed, but workflow should continue
    return ErrorHandlingDecision.CONTINUE;
  }
}

6. workflow_complete

Triggered when @OnWorkflowComplete() hook throws an error.

Use case: Handle cleanup failures without failing the workflow.

handle(context: ErrorContext): ErrorHandlingDecision {
  if (context.phase === 'workflow_complete') {
    // Logging or cleanup failed, but workflow is already done
    return ErrorHandlingDecision.CONTINUE;
  }
}

Error Handling Decisions

The error handler can return one of these decisions:

CONTINUE

Skip the failed hook and continue workflow execution. Only supported for lifecycle hooks (workflow_start, before_state, after_state).

Behavior:

  • workflow_start: Skip onStart hook, begin state execution
  • before_state: Skip beforeState hook, execute current state
  • after_state: Skip afterState hook, transition to next state
  • state_execute: Not supported (treated as EXIT with warning)
  • lock_acquisition: Return execution without workflow execution
return ErrorHandlingDecision.CONTINUE;

Use cases:

  • Non-critical logging/monitoring failures
  • Optional analytics tracking errors
  • Non-essential notifications

EXIT

Stop workflow execution gracefully without marking as failed. Execution status remains RUNNING.

Behavior:

  • No onError hooks are called
  • Execution is not persisted as failed
  • Workflow simply stops executing
  • Execution object is returned to caller
return ErrorHandlingDecision.EXIT;

Use cases:

  • Distributed lock conflicts (another instance processing)
  • Graceful shutdown scenarios
  • Business rule violations that aren't errors
  • Rate limiting or throttling

Example:

class DistributedErrorHandler implements ErrorHandler {
  handle(context: ErrorContext): ErrorHandlingDecision {
    if (context.phase === 'lock_acquisition') {
      // Another workflow instance is handling this, exit gracefully
      this.metricsService.increment('workflow.lock_conflict');
      return ErrorHandlingDecision.EXIT;
    }

    if (context.error.message.includes('RATE_LIMIT_EXCEEDED')) {
      // Rate limited, exit without failing
      return ErrorHandlingDecision.EXIT;
    }

    return ErrorHandlingDecision.FAIL;
  }
}

FAIL

Mark workflow as failed, persist failed status, call onError hooks, and throw error.

Behavior:

  • Execution status set to FAILED
  • Failed execution persisted to database
  • @OnWorkflowError() hook is called
  • Plugin onError() hooks are called
  • Error is re-thrown to caller
return ErrorHandlingDecision.FAIL;

Use cases:

  • Critical business logic failures
  • Data integrity violations
  • Payment processing errors
  • Unrecoverable errors that need investigation

Example:

class BusinessErrorHandler implements ErrorHandler {
  handle(context: ErrorContext): ErrorHandlingDecision {
    const error = context.error;

    // Always fail on critical business errors
    if (error.message.includes('Payment')) {
      this.alertService.sendAlert('Payment failure', {
        executionId: context.workflowContext.executionId,
        error: error.message
      });
      return ErrorHandlingDecision.FAIL;
    }

    // Fail on data integrity issues
    if (error.message.includes('Constraint')) {
      return ErrorHandlingDecision.FAIL;
    }

    return ErrorHandlingDecision.CONTINUE;
  }
}

FAIL_NO_PERSIST

Call onError hooks and throw error WITHOUT persisting failed status to database.

Behavior:

  • Execution status remains unchanged (not set to FAILED)
  • No persistence update for failure
  • @OnWorkflowError() hook is called
  • Plugin onError() hooks are called
  • Error is re-thrown to caller
return ErrorHandlingDecision.FAIL_NO_PERSIST;

Use cases:

  • Validation errors that should be reported but not stored
  • Test/development environments
  • Temporary errors you don't want in metrics
  • When you want to fail fast without database writes

TRANSITION_TO

Transition to a different state to handle the error. Only supported for state_execute phase.

Behavior:

  • Transition to specified target state
  • Current failed state's transition marked as error_recovery
  • Optional output can be set for the failed state
  • Workflow continues from the target state
  • Transition is validated before execution
return {
  decision: ErrorHandlingDecision.TRANSITION_TO,
  targetState: 'ERROR_RECOVERY',
  output: { reason: 'Payment failed, rolling back' }
};

Use cases:

  • Rollback operations after critical errors
  • Error recovery flows
  • Compensation transactions
  • Fallback to alternative processing paths

Example: Conversion Rollback

class ConversionError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConversionError';
  }
}

class WithdrawalErrorHandler implements ErrorHandler {
  handle(context: ErrorContext): ErrorHandlingResult {
    const { error, phase, workflowContext } = context;

    // Handle conversion errors by transitioning to rollback state
    if (error instanceof ConversionError && phase === 'state_execute') {
      return {
        decision: ErrorHandlingDecision.TRANSITION_TO,
        targetState: 'ROLLING_BACK_CONVERSION',
        output: {
          reason: error.message,
          fromState: String(workflowContext.currentState),
          timestamp: new Date(),
        }
      };
    }

    return ErrorHandlingDecision.FAIL;
  }
}

@Workflow({
  name: 'WithdrawalWorkflow',
  states: WithdrawalState,
  initialState: WithdrawalState.CREATED,
  errorHandler: new WithdrawalErrorHandler(),
})
export class WithdrawalWorkflow {}

// Any state can throw ConversionError to trigger rollback
@State(WithdrawalState.VALIDATING)
export class ValidatingState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    if (ctx.data.shouldRollbackConversion) {
      throw new ConversionError('Validation failed, rollback needed');
    }

    actions.next({ output: { validated: true } });
  }
}

@State(WithdrawalState.ROLLING_BACK_CONVERSION)
export class RollingBackConversionState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    // Access error context from outputs
    const errorInfo = ctx.outputs[ctx.currentState];
    console.log('Rolling back:', errorInfo.reason);

    // Perform rollback logic

    actions.next({
      output: {
        rolledBack: true,
        originalError: errorInfo.reason
      }
    });
  }
}

History Tracking:

When TRANSITION_TO is used, the workflow history tracks both the error and recovery:

const result = await engine.execute(WithdrawalWorkflow, {
  data: { shouldRollbackConversion: true }
});

// History shows error recovery
console.log(result.history);
// [
//   { from: 'CREATED', to: 'VALIDATING', status: 'success' },
//   { from: 'VALIDATING', to: 'VALIDATING', status: 'error_recovery' },
//   { from: 'VALIDATING', to: 'ROLLING_BACK_CONVERSION', status: 'success' },
//   { from: 'ROLLING_BACK_CONVERSION', to: 'COMPLETED', status: 'success' }
// ]

Important Notes:

  • TRANSITION_TO only works in state_execute phase
  • Target state must be a valid transition (validated with canTransition())
  • Invalid transitions are treated as EXIT with a warning
  • Output is set on the failed state, not the target state

Context Transform Callback:

For advanced scenarios where you need to modify workflow context during error recovery, use onContextTransform:

return {
  decision: ErrorHandlingDecision.TRANSITION_TO,
  targetState: 'ROLLBACK',
  output: {
    failureReason: error.message  // Set on failed state
  },
  onContextTransform: (ctx) => {
    // Modify target state output
    ctx.outputs['ROLLBACK'] = {
      rollbackType: 'SIMPLE',
      originalState: ctx.currentState
    };

    // Modify workflow data
    ctx.data.rollbackInitiated = true;

    // Set outputs for multiple states
    ctx.outputs['SOME_OTHER_STATE'] = { ... };
  }
};

Benefits:

  • Set output directly on target state (not just failed state)
  • Modify workflow data during error recovery
  • Set outputs for multiple states in one go
  • Full type safety with WorkflowContext<TData, TOutputs>

Example: Rollback with Context Transform

class RollbackErrorHandler implements ErrorHandler<OrderData, OrderOutputs> {
  handle(context: ErrorContext<OrderData, OrderOutputs>) {
    if (context.error instanceof PaymentFailedError) {
      return {
        decision: ErrorHandlingDecision.TRANSITION_TO,
        targetState: OrderState.ROLLBACK,
        onContextTransform: (ctx) => {
          // Prepare rollback data for target state
          ctx.outputs[OrderState.ROLLBACK] = {
            rollbackType: 'PAYMENT_FAILURE',
            originalAmount: ctx.data.amount,
            failedAt: new Date(),
          };

          // Mark in data for downstream states
          ctx.data.rollbackReason = 'payment_failed';
        }
      };
    }
    return ErrorHandlingDecision.FAIL;
  }
}

STOP_RETRY

Stop retry attempts immediately and proceed with the current error handling. Only applicable when used with @Retry decorator.

Behavior:

  • Stops any remaining retry attempts
  • Error is handled according to default behavior (FAIL)
  • Useful when you want to fail fast on specific errors
return ErrorHandlingDecision.STOP_RETRY;

Use cases:

  • Validation errors that won't succeed on retry
  • Authentication failures
  • Business rule violations
  • Rate limiting or quota exceeded errors

Error Context

The error handler receives an ErrorContext with full information:

interface ErrorContext {
  error: Error;               // The original error
  phase: ErrorPhase;          // Which phase threw the error
  workflowContext: WorkflowContext;  // Full workflow context
}

Access workflow data to make informed decisions:

class SmartErrorHandler implements ErrorHandler {
  handle(context: ErrorContext): ErrorHandlingDecision {
    const { error, phase, workflowContext } = context;

    // Access workflow data
    const isTestOrder = workflowContext.data.orderId?.startsWith('TEST-');

    if (isTestOrder) {
      // Don't persist failures for test orders
      return ErrorHandlingDecision.FAIL_NO_PERSIST;
    }

    // Access current state
    if (workflowContext.currentState === 'PAYMENT') {
      // Payment failures always get persisted
      return ErrorHandlingDecision.FAIL;
    }

    // Access outputs from previous states
    const paymentCompleted = workflowContext.outputs['PAYMENT']?.completed;
    if (paymentCompleted && phase === 'after_state') {
      // Payment succeeded, continue despite afterState error
      return ErrorHandlingDecision.CONTINUE;
    }

    return ErrorHandlingDecision.FAIL;
  }
}

Practical Examples

Example 1: Distributed Lock Handling

Handle Prisma error P2002 when another workflow instance acquires the lock:

class DistributedLockHandler implements ErrorHandler {
  constructor(private readonly logger: LoggerService) {}

  handle(context: ErrorContext): ErrorHandlingDecision {
    const { error, phase } = context;

    if (phase === 'lock_acquisition') {
      this.logger.info('Lock held by another instance', {
        executionId: context.workflowContext.executionId,
        groupId: context.workflowContext.groupId,
      });

      // Exit gracefully, another instance is processing
      return ErrorHandlingDecision.EXIT;
    }

    // Check for database lock errors (Prisma P2002, etc.)
    if (error.message.includes('P2002') || error.message.includes('unique constraint')) {
      this.logger.warn('Database lock conflict, exiting');
      return ErrorHandlingDecision.EXIT;
    }

    return ErrorHandlingDecision.FAIL;
  }
}

@Workflow({
  name: 'OrderWorkflow',
  states: OrderState,
  initialState: OrderState.CREATED,
  concurrency: {
    mode: ConcurrencyMode.SEQUENTIAL,
    groupBy: 'orderId',
  },
  errorHandler: new DistributedLockHandler(loggerService),
})
export class OrderWorkflow {}

Example 2: Retry on Specific Errors

Continue execution on temporary/retryable errors:

class RetryableErrorHandler implements ErrorHandler {
  private readonly retryableErrors = [
    'ETIMEDOUT',
    'ECONNRESET',
    'ECONNREFUSED',
    'NetworkError',
    'TemporaryFailure'
  ];

  handle(context: ErrorContext): ErrorHandlingDecision {
    const { error, phase } = context;

    // Only apply retry logic to state execution
    if (phase === 'state_execute') {
      // Check if error is retryable
      const isRetryable = this.retryableErrors.some(
        msg => error.message.includes(msg)
      );

      if (isRetryable) {
        // Use @Retry decorator on state for automatic retries
        // Error handler can control retry behavior on each attempt
        return ErrorHandlingDecision.FAIL;  // Allows @Retry to work
      }

      return ErrorHandlingDecision.FAIL;
    }

    // For lifecycle hooks, can use CONTINUE
    if (phase === 'before_state' || phase === 'after_state') {
      return ErrorHandlingDecision.CONTINUE;
    }

    return ErrorHandlingDecision.FAIL;
  }
}

Example 3: Conditional Error Routing

Route errors to different handling based on error type:

class ConditionalErrorHandler implements ErrorHandler {
  constructor(
    private readonly logger: LoggerService,
    private readonly alertService: AlertService,
    private readonly metricsService: MetricsService
  ) {}

  handle(context: ErrorContext): ErrorHandlingDecision {
    const { error, phase, workflowContext } = context;

    // Log all errors
    this.logger.error(`Workflow error in ${phase}`, {
      error: error.message,
      executionId: workflowContext.executionId,
      state: workflowContext.currentState,
    });

    // Increment error metrics
    this.metricsService.increment(`workflow.error.${phase}`);

    // Validation errors - fail without persisting
    if (error.name === 'ValidationError') {
      return ErrorHandlingDecision.FAIL_NO_PERSIST;
    }

    // Critical errors - alert and fail
    if (error.message.includes('CRITICAL') || error.message.includes('FATAL')) {
      this.alertService.sendCriticalAlert({
        workflow: workflowContext.data.workflowName,
        executionId: workflowContext.executionId,
        error: error.message,
      });
      return ErrorHandlingDecision.FAIL;
    }

    // Lock conflicts - exit gracefully
    if (phase === 'lock_acquisition') {
      return ErrorHandlingDecision.EXIT;
    }

    // Non-critical hook errors - continue
    if (phase === 'before_state' || phase === 'after_state') {
      if (error.message.includes('Logging') || error.message.includes('Analytics')) {
        return ErrorHandlingDecision.CONTINUE;
      }
    }

    // Default: fail and persist
    return ErrorHandlingDecision.FAIL;
  }
}

Example 4: Environment-Specific Handling

Different behavior for development vs production:

class EnvironmentAwareErrorHandler implements ErrorHandler {
  constructor(
    private readonly env: 'development' | 'production',
    private readonly logger: LoggerService
  ) {}

  handle(context: ErrorContext): ErrorHandlingDecision {
    const { error, phase, workflowContext } = context;

    if (this.env === 'development') {
      // In development, log everything and fail without persisting
      console.error(`[DEV] Error in ${phase}:`, error);
      console.error('Context:', workflowContext);
      return ErrorHandlingDecision.FAIL_NO_PERSIST;
    }

    // Production behavior
    if (phase === 'lock_acquisition') {
      // Silently exit on lock conflicts
      return ErrorHandlingDecision.EXIT;
    }

    if (phase === 'state_execute') {
      // Log state execution errors with full context
      this.logger.error('State execution failed', {
        executionId: workflowContext.executionId,
        state: workflowContext.currentState,
        error: error.message,
        stack: error.stack,
      });
      return ErrorHandlingDecision.FAIL;
    }

    // Continue on hook errors in production
    if (phase === 'before_state' || phase === 'after_state') {
      this.logger.warn(`${phase} hook failed, continuing`, {
        error: error.message,
      });
      return ErrorHandlingDecision.CONTINUE;
    }

    return ErrorHandlingDecision.FAIL;
  }
}

Integration with Monitoring

Integrate error handler with monitoring systems:

class MonitoredErrorHandler implements ErrorHandler {
  constructor(
    private readonly sentry: SentryService,
    private readonly datadog: DatadogService,
    private readonly pagerduty: PagerDutyService
  ) {}

  handle(context: ErrorContext): ErrorHandlingDecision {
    const { error, phase, workflowContext } = context;

    // Send to Sentry
    this.sentry.captureException(error, {
      tags: {
        workflow: workflowContext.data.workflowName,
        phase,
        executionId: workflowContext.executionId,
      },
      extra: {
        state: workflowContext.currentState,
        data: workflowContext.data,
      },
    });

    // Send metrics to Datadog
    this.datadog.increment('workflow.error', {
      workflow: workflowContext.data.workflowName,
      phase,
      state: workflowContext.currentState,
    });

    // Page on critical errors
    if (error.message.includes('Payment') || error.message.includes('CRITICAL')) {
      this.pagerduty.trigger({
        severity: 'critical',
        summary: `Workflow ${workflowContext.data.workflowName} failed`,
        details: {
          error: error.message,
          executionId: workflowContext.executionId,
          phase,
        },
      });
      return ErrorHandlingDecision.FAIL;
    }

    // Exit gracefully on lock conflicts
    if (phase === 'lock_acquisition') {
      return ErrorHandlingDecision.EXIT;
    }

    return ErrorHandlingDecision.FAIL;
  }
}

Default Behavior

Without an error handler, FlowMesh uses default FAIL behavior:

  • All errors result in FAIL decision
  • Execution marked as failed and persisted
  • onError hooks are called
  • Error is thrown to caller
// No error handler configured
@Workflow({
  name: 'OrderWorkflow',
  states: OrderState,
  initialState: OrderState.CREATED,
})
export class OrderWorkflow {}

// Equivalent to:
@Workflow({
  name: 'OrderWorkflow',
  states: OrderState,
  initialState: OrderState.CREATED,
  errorHandler: {
    handle: () => ErrorHandlingDecision.FAIL
  },
})
export class OrderWorkflow {}

Best Practices

DO:

  • Use EXIT for distributed lock conflicts in sequential mode
  • Use CONTINUE for non-critical monitoring/logging failures
  • Use FAIL for business-critical errors that need investigation
  • Use FAIL_NO_PERSIST for validation errors in development
  • Log all errors regardless of decision
  • Integrate with monitoring/alerting systems
  • Test error handling with different error scenarios

DON'T:

  • Don't use CONTINUE for state_execute errors (not supported)
  • Don't exit silently without logging in production
  • Don't persist validation errors in test environments
  • Don't ignore critical errors like payment failures
  • Don't make error handler itself throw errors (fallback to FAIL)

Error Handler Safety:

  • If error handler throws, FlowMesh falls back to FAIL decision
  • Error handlers should be defensive and never throw
  • Always have a default case that returns a decision
class SafeErrorHandler implements ErrorHandler {
  handle(context: ErrorContext): ErrorHandlingDecision {
    try {
      // Your error handling logic
      return this.handleError(context);
    } catch (handlerError) {
      // Handler itself failed, log and use default
      console.error('Error handler failed:', handlerError);
      return ErrorHandlingDecision.FAIL;
    }
  }

  private handleError(context: ErrorContext): ErrorHandlingDecision {
    // Implementation
    return ErrorHandlingDecision.FAIL;
  }
}

Concurrency Control

FlowMesh provides three concurrency modes for controlling parallel workflow executions.

Sequential Mode

Only one workflow execution per group at a time. Subsequent executions wait for lock.

@Workflow({
  concurrency: {
    mode: ConcurrencyMode.SEQUENTIAL,
    groupBy: 'orderId',
  },
})
export class OrderWorkflow {}

Use cases:

  • Order processing (prevent duplicate orders)
  • Payment transactions
  • Critical state updates

Parallel Mode

No concurrency restrictions. All executions run simultaneously.

@Workflow({
  concurrency: {
    mode: ConcurrencyMode.PARALLEL,
  },
})
export class AnalyticsWorkflow {}

Use cases:

  • Independent analytics
  • Notifications
  • Read-only operations

Throttle Mode

Limit concurrent executions after initial lock release.

@Workflow({
  concurrency: {
    mode: ConcurrencyMode.THROTTLE,
    groupBy: 'userId',
    maxConcurrentAfterUnlock: 3,
  },
})
export class ApiRequestWorkflow {}

Requires @UnlockAfter() decorator on at least one state:

@State(ApiState.VALIDATE)
@UnlockAfter()
export class ValidateState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    // After this state, hard lock is released
    // Soft lock allows up to maxConcurrentAfterUnlock executions
    actions.next({ output: { validated: true } });
  }
}

Use cases:

  • API rate limiting
  • Resource pool management
  • Batch processing with limits

Partial Unlock

Release hard lock early to allow other operations while workflow continues.

@State(OrderState.PAYMENT)
@UnlockAfter()
export class PaymentState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    // Process payment
    // After this state, lock is released
    actions.next({ output: { paid: true } });
  }
}

@State(OrderState.SEND_EMAIL)
export class SendEmailState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    // This runs without holding the hard lock
    // Other workflows can start
    actions.next({ output: { emailSent: true } });
  }
}

Suspend and Resume

Workflows can be suspended to wait for external events and resumed later.

How Resume Works

Important: When you resume a suspended workflow with default (RETRY) strategy:

  1. The suspended state's execute() method runs again with updated data
  2. The options.data is merged with existing execution data
  3. Your state logic must check the updated data to decide whether to continue or suspend again

This allows the state to re-evaluate its condition with fresh data.

Other strategies (SKIP, GOTO) bypass the suspended state and move to the next or target state directly.

Suspend Workflow

When a state needs to wait for external data (webhook, user approval, etc.), use actions.suspend():

@State(OrderState.AWAITING_PAYMENT)
export class AwaitingPaymentState implements IState {
  execute(ctx: WorkflowContext, actions: StateActions) {
    // Check if payment data is available (e.g., set via resume)
    if (!ctx.data.paymentReceived) {
      // Still waiting - suspend the workflow
      actions.suspend({
        waitingFor: 'payment_webhook',
        output: { pendingAt: new Date() }
      });
    } else {
      // Payment received via resume - continue to next state
      actions.next({
        output: {
          paymentId: ctx.data.paymentId,
          processedAt: new Date()
        }
      });
    }
  }
}

Key points:

  • State checks ctx.data to decide: suspend or continue
  • When resumed, the same state executes again with updated data
  • State can suspend multiple times if condition isn't met

Resume Workflow

There are two ways to provide data when resuming:

Option 1: Pass data in options (Recommended)

// Resume with updated data (default RETRY strategy)
const result = await engine.resume(OrderWorkflow, executionId, {
  data: {
    paymentReceived: true,
    paymentId: 'pay_123'
  }
});

// The AWAITING_PAYMENT state will execute again
// This time ctx.data.paymentReceived = true, so it continues

Option 2: Update Persistence First

// Update data in database
await persistence.update(executionId, {
  data: {
    ...existingData,
    paymentReceived: true,
    paymentId: 'pay_123'
  }
});

// Resume without passing data (uses data from DB)
const result = await engine.resume(OrderWorkflow, executionId);

Both approaches work, but Option 1 is more atomic and recommended.

Complete Example

// 1. Initial execution - workflow suspends
const execution = await engine.execute(OrderWorkflow, {
  data: {
    orderId: 'ORD-001',
    paymentReceived: false
  }
});

console.log(execution.status);        // 'suspended'
console.log(execution.currentState);  // 'AWAITING_PAYMENT'

// 2. External event occurs (webhook received)
// Resume with updated data
const resumed = await engine.resume(OrderWorkflow, execution.id, {
  data: {
    paymentReceived: true,
    paymentId: 'pay_xyz123'
  }
});

console.log(resumed.status);  // 'completed' (if no more states)
console.log(resumed.outputs[OrderState.AWAITING_PAYMENT]?.paymentId);  // 'pay_xyz123'

Resume Strategies

FlowMesh supports different strategies for resuming suspended workflows:

RETRY Strategy (Default)

Re-executes the suspended state with updated data. This is the default behavior.

import { ResumeStrategy } from 'flowmesh';

// Explicit RETRY strategy
const resumed = await engine.resume(OrderWorkflow, executionId, {
  strategy: ResumeStrategy.RETRY,
  data: { paymentReceived: true }
});

// Or omit strategy (defaults to RETRY)
const resumed = await engine.resume(OrderWorkflow, executionId, {
  data: { paymentReceived: true }
});

Use case: When the suspended state needs to re-evaluate its condition with new data.

SKIP Strategy

Skips the suspended state and moves to the next state in the workflow.

const resumed = await engine.resume(OrderWorkflow, executionId, {
  strategy: ResumeStrategy.SKIP,
  data: { skipReason: 'Manual override' }
});

Use case: When you want to bypass the suspended state (e.g., manual intervention, error recovery).

Important:

  • The output for the suspended state remains as it was when suspended
  • A transition is added to history showing the skip (duration: 0)
  • If there's no next state, the workflow completes

GOTO Strategy

Jumps to a specific state, bypassing the normal flow.

const resumed = await engine.resume(OrderWorkflow, executionId, {
  strategy: ResumeStrategy.GOTO,
  targetState: OrderState.SHIPPING,
  data: { fastTrack: true }
});

Use case: When you need explicit control over which state to execute next (e.g., error recovery, workflow correction).

Important:

  • targetState is required when using GOTO strategy
  • The target state must be registered in the StateRegistry
  • A transition is added to history from suspended state to target state

Strategy Comparison

| Strategy | Suspended State Re-executes? | Next State | Use Case | |----------|------------------------------|------------|----------| | RETRY (default) | ✅ Yes | Determined by state logic | Normal resume with updated data | | SKIP | ❌ No | Next in workflow sequence | Bypass suspended state | | GOTO | ❌ No | Explicit target state | Jump to specific state |

Check Workflow Status

const execution = await engine.getExecution(executionId);

if (execution.status === WorkflowStatus.SUSPENDED) {
  console.log('Waiting for:', execution.suspension?.waitingFor);
  console.log('Suspended at state:', execution.currentState);
}

Common Pattern: Webhook Handler

// Webhook endpoint
app.post('/webhook/payment/:executionId', async (req, res) => {
  const { executionId } = req.params;
  const webhookData = req.body;

  // Resume workflow with webhook data
  await engine.resume(OrderWorkflow, executionId, {
    data: {
      paymentReceived: true,
      paymentId: webhookData.paymentId,
      paymentStatus: webhookData.status
    }
  });

  res.sendStatus(200);
});

Adapters

FlowMesh uses adapter pattern for external dependencies.

Persistence Adapter

Store and retrieve workflow executions.

interface PersistenceAdapter {
  save(execution: WorkflowExecution): Promise<void>;
  load(executionId: string): Promise<WorkflowExecution | null>;
  update(executionId: string, updates: Partial<WorkflowExecution>): Promise<void>;
  find(filter: ExecutionFilter): Promise<WorkflowExecution[]>;
}

interface ExecutionFilter {
  status?: WorkflowStatus | WorkflowStatus[];
  groupId?: string;
  workflowName?: string;
  currentState?: string;
}

Built-in adapter:

import { InMemoryPersistenceAdapter } from 'flowmesh';

const engine = new WorkflowEngine({
  persistence: new InMemoryPersistenceAdapter()
});

Custom adapter example:

class PostgresPersistenceAdapter implements IPersistenceAdapter {
  constructor(private readonly pool: Pool) {}

  async save(execution: WorkflowExecution): Promise<void> {
    await this.pool.query(
      'INSERT INTO workflow_executions (id, workflow_name, data, status) VALUES ($1, $2, $3, $4)',
      [execution.id, execution.workflowName, execution.data, execution.status]
    );
  }

  async load(executionId: string): Promise<WorkflowExecution | null> {
    const result = await this.pool.query(
      'SELECT * FROM workflow_executions WHERE id = $1',
      [executionId]
    );
    return result.rows[0] || null;
  }

  // ... implement update and find
}

Lock Adapter

Manage distributed locks for concurrency control.

interface ILockAdapter {
  acquire(key: string, executionId: string, ttl?: number): Promise<boolean>;
  release(key: string): Promise<void>;
  extend(key: string, ttl: number): Promise<boolean>;
  isLocked(key: string): Promise<boolean>;
}

Built-in adapter:

import { InMemoryLockAdapter } from 'flowmesh';

const engine = new WorkflowEngine({
  lockAdapter: new InMemoryLockAdapter()
});

Redis adapter example:

import Redis from 'ioredis';

class RedisLockAdapter implements ILockAdapter {
  constructor(private readonly redis: Redis) {}

  async acquire(key: string, executionId: string, ttl = 60000): Promise<boolean> {
    const result = await this.redis.set(key, executionId, 'PX', ttl, 'NX');
    return result === 'OK';
  }

  async release(key: string): Promise<void> {
    await this.redis.del(key);
  }

  async extend(key: string, ttl: number): Promise<boolean> {
    const result = await this.redis.pexpire(key, ttl);
    return result === 1;
  }

  async isLocked(key: string): Promise<boolean> {
    const value = await this.redis.get(key);
    return value !== null;
  }
}

Logger Adapter

Custom logging implementation.

interface LoggerAdapter {
  log(message: string, context?: unknown): void;
  error(message: string, error?: Error, context?: unknown): void;
  warn(message: string, context?: unknown): void;
  debug(message: string, context?: unknown): void;
}

NestJS Integration

FlowMesh integrates seamlessly with NestJS, providing fully automatic dependency injection for workflows and states with zero boilerplate.

Module Setup

Simply import FlowMeshModule - everything else is automatic:

import { Module } from '@nestjs/common';
import { FlowMeshModule } from 'flowmesh';

@Module({
  imports: [FlowMeshModule],  // Global by default
})
export class AppModule {}

That's it! FlowMeshModule automatically:

  • Discovers all @State decorated classes
  • Registers them with StateRegistry
  • Sets up dependency injection for all workflows
  • Configures adapters from @WorkflowConfig

Creating Executable Workflows

Workflows extend ExecutableWorkflow and use @WorkflowConfig for per-workflow configuration:

import { Injectable } from '@nestjs/common';
import { Workflow, WorkflowConfig, ExecutableWorkflow } from 'flowmesh';

@Workflow({
  name: 'OrderProcessing',
  states: OrderState,
  initialState: OrderState.CREATED,
  transitions: [
    { from: [OrderState.CREATED], to: OrderState.PAYMENT },
    { from: [OrderState.PAYMENT], to: OrderState.COMPLETE },
  ],
})
@WorkflowConfig({
  persistenc