open-rpc-flow

v1.4.4

A standard way to define JSON-RPC flows with goal-oriented steps

Flow Execution Engine

A flexible, type-safe execution engine for JSON-RPC-based workflows. It lets you define complex flows of operations (requests, transformations, conditions, and loops) with full support for data dependencies, execution optimization, and error handling.

Features

  • 🔄 JSON-RPC Request Handling: Execute JSON-RPC 2.0 requests with automatic request ID management and error handling
  • 🔀 Flow Control: Support for conditional execution and loops with proper variable scoping
  • 🔄 Data Transformation: Transform data between steps using map, filter, reduce, and other operations
  • 📊 Expression Evaluation: Dynamic expression evaluation with support for template literals and object paths
  • 🔗 Dependency Resolution: Automatic handling of data dependencies between steps
  • 🎯 Type Safety: Written in TypeScript with comprehensive type definitions
  • 🔍 Error Handling: Detailed error reporting, validation, and graceful error recovery
  • 🌍 Context Management: Global context available to all steps with proper scoping
  • 📦 Batch Processing: Support for processing data in configurable batch sizes

Examples

1. Team Member Processing

Process team members with nested operations and dynamic notifications. See the full example here:

src/examples/03-nested-loops.json


2. Data Pipeline with Error Recovery

Process data with validation, transformation, and error handling. See the full example here:

src/examples/05-complex-data-pipeline.json


3. API Data Aggregation

Aggregate data from multiple API endpoints:

const apiAggregationFlow: Flow = {
  name: 'api-aggregation',
  description: 'Aggregate data from multiple APIs',
  steps: [
    {
      name: 'fetchUsers',
      request: {
        method: 'users.list',
        params: { status: 'active' },
      },
    },
    {
      name: 'fetchUserDetails',
      loop: {
        over: '${fetchUsers.result}',
        as: 'user',
        step: {
          name: 'userDetails',
          transform: {
            input: '${user}',
            operations: [
              {
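                type: 'map',
                // Plain string, not a JS template literal, so the engine (not JavaScript) resolves ${...}.
                // `profile` and `activity` are assumed to be steps defined elsewhere in the flow.
                using: '{ ...user, profile: ${profile.result}, recentActivity: ${activity.result} }',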
              },
            ],
          },
        },
      },
    },
  ],
};

Installation

npm install @open-rpc/flow

Quick Start

Here's a simple example of defining and executing a flow:

import { FlowExecutor, Flow } from '@open-rpc/flow';

// Define your JSON-RPC handler
const jsonRpcHandler = async (request) => {
  // Implement your JSON-RPC handling logic
  return { result: 'Success' };
};

// Define a flow with data processing and error handling
const flow: Flow = {
  name: 'Data Processing Flow',
  description: 'Process and transform data with error handling',
  context: {
    minValue: 10,
  },
  steps: [
    {
      name: 'getData',
      request: {
        method: 'data.fetch',
        params: { source: 'api' },
      },
    },
    {
      name: 'validateData',
      condition: {
        if: '${getData.result.length > 0}',
        then: {
          name: 'processData',
          transform: {
            input: '${getData.result}',
            operations: [
              {
                type: 'filter',
                using: '${item.value > context.minValue}',
              },
              {
                type: 'map',
                using: '{ ...item, processed: true }',
              },
            ],
          },
        },
        else: {
          name: 'handleError',
          request: {
            method: 'error.log',
            params: { message: 'No data found' },
          },
        },
      },
    },
  ],
};

// Execute the flow
const executor = new FlowExecutor(flow, jsonRpcHandler);
const results = await executor.execute();

Flow Definition

A flow consists of a series of steps that can include:

Request Steps

Execute JSON-RPC requests with error handling:

{
  name: 'getUser',
  request: {
    method: 'user.get',
    params: { id: 1 }
  }
}
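
As a rough illustration of what "automatic request ID management" can mean for a request step, the sketch below turns a step definition into a JSON-RPC 2.0 request object with an incrementing `id`. The names `RequestStepSketch`, `JsonRpcRequest`, and `createRequestBuilder` are hypothetical and exist only for this example; the engine's internals may differ.

```typescript
// Hypothetical types for illustration; not the library's actual definitions.
interface RequestStepSketch {
  name: string;
  request: { method: string; params?: Record<string, unknown> };
}

// Shape of a JSON-RPC 2.0 request object.
interface JsonRpcRequest {
  jsonrpc: '2.0';
  id: number;
  method: string;
  params?: Record<string, unknown>;
}

// Returns a builder that assigns a fresh, incrementing id to each request.
function createRequestBuilder(startId = 1): (step: RequestStepSketch) => JsonRpcRequest {
  let nextId = startId;
  return (step) => {
    const request: JsonRpcRequest = {
      jsonrpc: '2.0',
      id: nextId++,
      method: step.request.method,
    };
    // Only include params when the step defines them.
    if (step.request.params !== undefined) {
      request.params = step.request.params;
    }
    return request;
  };
}

const buildRequest = createRequestBuilder();
const first = buildRequest({ name: 'getUser', request: { method: 'user.get', params: { id: 1 } } });
const second = buildRequest({ name: 'listUsers', request: { method: 'users.list' } });
```

Here `first` carries `id: 1` and the step's params, while `second` carries `id: 2` and no `params` field.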

Transform Steps

Transform data using operations like map, filter, reduce:

{
  name: 'processUsers',
  transform: {
    input: '${getUser.result}',
    operations: [
      {
        type: 'filter',
        using: '${item.active === true}',
      },
      {
        type: 'map',
        using: '{ id: item.id, name: item.name }',
      },
      {
        type: 'reduce',
        using: '[...acc, item.id]',
        initial: [],
      }
    ]
  }
}
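
To make the data flow concrete, here is roughly what that operation chain computes, written with plain array methods. This assumes the `filter`, `map`, and `reduce` operations behave like their `Array.prototype` counterparts, with `item` as the current element and `acc` as the accumulator; the sample `users` array and the `UserSketch` type are made up for this example.

```typescript
// Hypothetical input shape, used only for this illustration.
interface UserSketch {
  id: number;
  name: string;
  active: boolean;
}

const users: UserSketch[] = [
  { id: 1, name: 'Ada', active: true },
  { id: 2, name: 'Bob', active: false },
  { id: 3, name: 'Cy', active: true },
];

const activeIds: number[] = users
  // filter: keep items where `item.active === true`
  .filter((item) => item.active === true)
  // map: project each item to `{ id, name }`
  .map((item) => ({ id: item.id, name: item.name }))
  // reduce: collect ids into an array, starting from `initial: []`
  .reduce<number[]>((acc, item) => [...acc, item.id], []);

// activeIds is [1, 3]
```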

Conditional Steps

Execute steps based on conditions with error handling:

{
  name: 'validateUser',
  condition: {
    if: '${getUser.error}',
    then: {
      name: 'handleError',
      request: {
        method: 'error.log',
        params: { message: '${getUser.error.message}' }
      }
    },
    else: {
      name: 'processUser',
      transform: {
        input: '${getUser.result}',
        operations: [
          {
            type: 'map',
            using: '{ ...item, validated: true }'
          }
        ]
      }
    }
  }
}

Loop Steps

Iterate over collections with batch processing:

{
  name: 'processItems',
  loop: {
    over: '${getItems.result}',
    as: 'item',
    maxIterations: 100,
    step: {
      name: 'processItem',
      request: {
        method: 'item.process',
        params: {
          id: '${item.id}',
          batchIndex: '${metadata.current.index}'
        }
      }
    }
  }
}

Expression Evaluation

The engine supports dynamic expressions using the ${...} syntax:

  • Simple references: ${stepName}
  • Property access: ${stepName.property}
  • Array access: ${stepName[0]}
  • Nested properties: ${stepName.nested.property}
  • Template literals: `Value: ${stepName.value}`
  • Comparisons: ${value > 10}
  • Object literals: { id: ${item.id}, name: ${item.name} }
  • Error handling: ${stepName.error.message}
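
The reference forms in the list above can be pictured with a small resolver. This is a minimal sketch, not the engine's evaluator: it handles only path references (property and array access) and string interpolation, not comparisons or object literals, and the helper names `parsePath`, `lookup`, and `resolve` are hypothetical.

```typescript
type Scope = Record<string, unknown>;

// Splits "step.items[0].name" into ["step", "items", "0", "name"].
function parsePath(path: string): string[] {
  return path
    .replace(/\[(\d+)\]/g, '.$1')
    .split('.')
    .filter((segment) => segment.length > 0);
}

// Walks the scope along the path; returns undefined if any segment is missing.
function lookup(scope: Scope, path: string): unknown {
  let current: unknown = scope;
  for (const key of parsePath(path)) {
    if (current === null || typeof current !== 'object') return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// A string that is exactly one ${...} yields the raw value (object, array, number, ...).
// Any other string has each ${...} replaced by the stringified value.
function resolve(template: string, scope: Scope): unknown {
  const whole = /^\$\{([^}]+)\}$/.exec(template);
  const inner = whole?.[1];
  if (inner !== undefined) return lookup(scope, inner.trim());
  return template.replace(/\$\{([^}]+)\}/g, (_match, path: string) =>
    String(lookup(scope, path.trim())),
  );
}

// Sample scope standing in for the results of earlier steps.
const scope: Scope = {
  getUser: { result: { name: 'Ada', tags: ['admin', 'dev'] } },
};

const secondTag = resolve('${getUser.result.tags[1]}', scope); // 'dev'
const greeting = resolve('Hello, ${getUser.result.name}!', scope); // 'Hello, Ada!'
const missing = resolve('${noSuchStep.value}', scope); // undefined
```

Returning the raw value for a whole-string reference is a design choice in this sketch: it lets a step pass arrays and objects to the next step without stringifying them.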

Error Handling

Flow provides built-in error handling capabilities including automatic retries and circuit breaker patterns for request steps.

Error Handling with Retry

Enable automatic retries for failed request steps:

const executor = new FlowExecutor(flow, jsonRpcHandler, {
  // Enable retry policy
  enableRetries: true,
  // Configure retry policy (or use DEFAULT_RETRY_POLICY)
  retryPolicy: {
    maxAttempts: 3, // Retry up to 3 times
    backoff: {
      initial: 100, // 100ms initial delay
      multiplier: 2, // Exponential backoff
      maxDelay: 5000, // Maximum 5s delay
    },
    // ErrorCode is assumed here to be imported from '@open-rpc/flow'
    retryableErrors: [ErrorCode.NETWORK_ERROR, ErrorCode.TIMEOUT_ERROR],
  },
});
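
To see what these backoff settings imply, the sketch below computes the wait before each retry, assuming the common formula `initial * multiplier^(attempt - 1)` capped at `maxDelay`. That formula is an assumption; the library may compute delays differently (for example, by adding jitter). `BackoffSketch` and `backoffDelay` are hypothetical names.

```typescript
// Mirrors the `backoff` fields from the retry policy above.
interface BackoffSketch {
  initial: number; // first delay in ms
  multiplier: number; // growth factor per attempt
  maxDelay: number; // upper bound in ms
}

// Delay before retry number `attempt` (1-based), capped at maxDelay.
function backoffDelay(backoff: BackoffSketch, attempt: number): number {
  const raw = backoff.initial * Math.pow(backoff.multiplier, attempt - 1);
  return Math.min(raw, backoff.maxDelay);
}

const policy: BackoffSketch = { initial: 100, multiplier: 2, maxDelay: 5000 };
const delays = [1, 2, 3, 4, 5, 6, 7].map((attempt) => backoffDelay(policy, attempt));
// delays: 100, 200, 400, 800, 1600, 3200, 5000 (the seventh value is capped)
```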

Error Events

Listen for error events during flow execution:

const executor = new FlowExecutor(flow, jsonRpcHandler, {
  eventOptions: {
    emitFlowEvents: true,
    emitStepEvents: true,
  },
});

// Listen for flow-level errors
executor.events.on('flow:error', (event) => {
  console.error(`Flow error in ${event.flowName}:`, event.error);
  console.log(`Execution time before error: ${event.duration}ms`);
});

// Listen for step-level errors
executor.events.on('step:error', (event) => {
  console.error(`Step error in ${event.stepName}:`, event.error);
});

Timeout Configuration

Flow provides multi-level timeout configuration to control execution time at various scopes:

Step-Level Timeout

Set a timeout for a specific step:

const flow = {
  name: 'MyFlow',
  steps: [
    {
      name: 'longRunningStep',
      timeout: 5000, // 5 second timeout for this step
      request: {
        method: 'slowOperation',
        params: {},
      },
    },
  ],
};

Flow-Level Timeouts

Configure timeouts for all steps of a certain type within a flow:

const flow = {
  name: 'MyFlow',
  timeouts: {
    global: 30000, // 30s default for all steps
    request: 10000, // 10s for request steps
    transform: 5000, // 5s for transform steps
    condition: 2000, // 2s for condition steps
    loop: 60000, // 60s for loop steps
    expression: 1000, // 1s for expression evaluation
  },
  steps: [
    /* ... */
  ],
};

Executor-Level Timeouts

Set default timeouts when creating the executor:

const executor = new FlowExecutor(flow, jsonRpcHandler, {
  timeouts: {
    global: 30000, // 30s default
    request: 10000, // 10s for requests
    transform: 5000, // 5s for transformations
  },
});

Timeout resolution follows this precedence order:

  1. Step-level timeout (step.timeout)
  2. Flow-level type-specific timeout (flow.timeouts[stepType])
  3. Flow-level global timeout (flow.timeouts.global)
  4. Executor-level type-specific timeout
  5. Default timeout for the step type

All timeouts must be:

  • At least 50ms
  • No more than 1 hour (3,600,000ms)
  • A positive integer
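
The precedence order and validation rules above can be sketched as a single lookup. This is an illustration only: `resolveTimeout`, `validateTimeout`, and the values in `DEFAULTS` are hypothetical and not taken from the library.

```typescript
type StepType = 'request' | 'transform' | 'condition' | 'loop';
type TimeoutMap = Partial<Record<StepType | 'global', number>>;

const MIN_TIMEOUT_MS = 50;
const MAX_TIMEOUT_MS = 3600000; // 1 hour

// Hypothetical per-type defaults, chosen only for this sketch.
const DEFAULTS: Record<StepType, number> = {
  request: 30000,
  transform: 10000,
  condition: 5000,
  loop: 60000,
};

// Enforces the three rules: integer, at least 50ms, at most 1 hour.
function validateTimeout(ms: number): number {
  if (!Number.isInteger(ms) || ms < MIN_TIMEOUT_MS || ms > MAX_TIMEOUT_MS) {
    throw new RangeError(`Invalid timeout: ${ms}`);
  }
  return ms;
}

// Applies the documented precedence: step, flow type-specific, flow global,
// executor type-specific, then the default for the step type.
function resolveTimeout(
  stepType: StepType,
  stepTimeout: number | undefined,
  flowTimeouts: TimeoutMap,
  executorTimeouts: TimeoutMap,
): number {
  const resolved =
    stepTimeout ??
    flowTimeouts[stepType] ??
    flowTimeouts.global ??
    executorTimeouts[stepType] ??
    DEFAULTS[stepType];
  return validateTimeout(resolved);
}

// The step-level value wins over everything else.
const stepWins = resolveTimeout('request', 5000, { request: 10000 }, { request: 1000 });
// With no step or type-specific flow value, the flow-level global applies.
const flowGlobal = resolveTimeout('loop', undefined, { global: 30000 }, { loop: 1000 });
// With nothing set on the flow, the executor's type-specific value applies.
const executorType = resolveTimeout('transform', undefined, {}, { transform: 5000 });
```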

Event Emitter Interface

The flow executor exposes a FlowExecutorEvents instance built on Node's EventEmitter. It emits strongly typed events during execution so you can monitor progress, log information or integrate with external systems in real time. All event names are available through the FlowEventType enum.

Using the Event Emitter

import { FlowExecutor, FlowEventType } from '@open-rpc/flow';

// Create a flow executor with event options
const executor = new FlowExecutor(flow, jsonRpcHandler, {
  eventOptions: {
    emitFlowEvents: true,
    emitStepEvents: true,
    includeResults: true,
  },
});

// Listen for flow start events
executor.events.on(FlowEventType.FLOW_START, (event) => {
  console.log(`Flow started: ${event.flowName}`);
  console.log(`Steps to execute: ${event.orderedSteps.join(', ')}`);
});

// Listen for step completion events
executor.events.on(FlowEventType.STEP_COMPLETE, (event) => {
  console.log(`Step completed: ${event.stepName} in ${event.duration}ms`);
  console.log('Result:', event.result);
});

// Execute the flow and receive streamed updates
const results = await executor.execute();

See src/examples/event-emitter-example.ts for a full working example.

Available Events

| Event Type          | Description                                        |
| ------------------- | -------------------------------------------------- |
| flow:start          | Emitted when flow execution begins                 |
| flow:complete       | Emitted when flow execution completes successfully |
| flow:error          | Emitted when flow execution fails                  |
| step:start          | Emitted when a step execution begins               |
| step:complete       | Emitted when a step execution completes            |
| step:error          | Emitted when a step execution fails                |
| step:skip           | Emitted when a step is skipped                     |
| dependency:resolved | Emitted when dependencies are resolved             |

Event Payloads

Each emitted event carries a typed payload. Below is a quick reference of the most useful fields:

| Event         | Key fields                           |
| ------------- | ------------------------------------ |
| flow:start    | flowName, orderedSteps               |
| flow:complete | flowName, results, duration          |
| flow:error    | flowName, error, duration            |
| step:start    | stepName, stepType, context?         |
| step:complete | stepName, stepType, result, duration |
| step:error    | stepName, stepType, error, duration  |
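
One way to picture "strongly typed events" is a map from event name to payload shape, so that a listener's parameter type follows from the event it subscribes to. The sketch below is a standalone illustration, not the library's `FlowExecutorEvents` class: `EventPayloads` and `TypedEmitterSketch` are hypothetical, and the field types are guesses based on the field names in the table.

```typescript
// Payload shapes for three of the events; field types are assumptions.
interface EventPayloads {
  'flow:start': { flowName: string; orderedSteps: string[] };
  'step:complete': { stepName: string; stepType: string; result: unknown; duration: number };
  'step:error': { stepName: string; stepType: string; error: Error; duration: number };
}

class TypedEmitterSketch {
  private listeners = new Map<string, Array<(event: unknown) => void>>();

  // The listener's parameter type is derived from the event name K.
  on<K extends keyof EventPayloads>(type: K, listener: (event: EventPayloads[K]) => void): void {
    const list = this.listeners.get(type) ?? [];
    list.push(listener as (event: unknown) => void);
    this.listeners.set(type, list);
  }

  // The payload must match the shape registered for the event name K.
  emit<K extends keyof EventPayloads>(type: K, event: EventPayloads[K]): void {
    for (const listener of this.listeners.get(type) ?? []) {
      listener(event);
    }
  }
}

const emitter = new TypedEmitterSketch();
const log: string[] = [];

// `event` is inferred as the 'step:complete' payload, so `duration` is a number here.
emitter.on('step:complete', (event) => {
  log.push(`${event.stepName} took ${event.duration}ms`);
});

emitter.emit('step:complete', { stepName: 'getData', stepType: 'request', result: [], duration: 12 });
// log now holds one entry: 'getData took 12ms'
```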

Configuration Options

You can configure the event emitter behavior when creating the flow executor:

const executor = new FlowExecutor(flow, jsonRpcHandler, {
  eventOptions: {
    // Whether to emit flow-level events
    emitFlowEvents: true,
    // Whether to emit step-level events
    emitStepEvents: true,
    // Whether to emit dependency resolution events
    emitDependencyEvents: false,
    // Whether to include result details in events
    includeResults: true,
    // Whether to include context details in events
    includeContext: false,
  },
});

You can also update the event options after creation:

executor.updateEventOptions({
  emitStepEvents: false,
  includeResults: false,
});

Type Safety

The engine is written in TypeScript and provides comprehensive type definitions:

interface Flow {
  name: string;
  description?: string;
  context?: Record<string, any>;
  steps: Step[];
}

type Step = RequestStep | TransformStep | ConditionStep | LoopStep;

interface RequestStep {
  name: string;
  request: {
    method: string;
    params?: Record<string, any>;
  };
}

// More type definitions available in the source

Contributing

Contributions are welcome! Please read our Contributing Guide for details on our code of conduct and the process for submitting pull requests.

Development

This project requires Node.js 22.15.0 as specified in the .node-version file. The CI pipeline tests against Node 18.x, 20.x, 21.x, and 22.x.

After cloning the repository, install dependencies and run the build and tests:

npm install
npm run build
npm test

License

This project is licensed under the MIT License. See the LICENSE file for details.