npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

exframe-workflow

v5.5.1

Published

A lightweight workflow engine that can be embedded into any node service

Downloads

575

Readme

exframe Workflow

A library for facilitating a distributed workflow as a service system

Features and Assumptions

  • RabbitMQ driven workflow engine over durable queues
  • Managed state tracked in MongoDB
  • Distributed execution of tasks
  • Full worklog tracking
  • Blocks or sub-workflows
  • Fully restartable at any point

Available Topologies

  • Workflows

Usage

const logger = require('exframe-logger').create();
const dbClient = require('exframe-db').init(options);
const cacheManager = require('exframe-cache-manager').create(options);
const { WorkflowManager } = require('exframe-workflow');

dbClient.connect();

const serviceName = 'test-service';
const workflowManager = WorkflowManager.default(logger, serviceName, cacheManager, dbClient);

function addOne(context, { inputValue }) { return { result: inputValue + 1 }; }

const myWorkflow = WorkflowManager.createWorkflow('my-workflow');
myWorkflow.task(addOne);

const context = {
  user: { userId: 'test-id', userName: 'test-user' },
  log: logger
};

app.post('/addOne', (request, response) => {
  const instance = await myWorkflow.start(context, { inputValue: response.body.inputValue });
  response.send(await instance.getWorkContext());
});

Summary

The WorkflowManager handles the instantiation of the WorkflowInstanceManager and the Workflow. The workflow is bound to the Workflow is bound to the WorkflowInstanceManager and when a workflow is started or resumed it will coordinate the WorkflowInstanceManager to create or load a WorkflowInstance. A WorkflowInstance will manage traversing through the tasks, joining on child workflow results, and keeping work context state.

WorkflowManager

Handles the instantiation of the WorkflowInstanceManager and the Workflow. It has been configured with a default implementation that wraps the exframe-cache-manager and exframe-db for logging, caching, and coordination. WorkflowInstanceManager will listen on a queue that's unique by the serviceName. The distributed coordination for starting workflows across multiple instances will rely on this.

static default(logger, serviceName, cacheManager, dbClient, mqClient, options?) -> WorkflowManager

|field|description| |-----|-----------| |logger|exframe-logger| |serviceName|string -- the unique name for the service to use for coordinating the start of new workflow instances| |cacheManager|exframe-cache-manager -- provides the redis backing for the WorkflowInstance cache and the expiry queue for task execution expiration| |dbClient|exframe-db -- provides the mongo backing for the WorkflowInstance cache and the WorkLog storage| |options|optional, object -- configures the WorkflowManager|

options

|field|description| |-----|-----------| |serverId|string -- a unique identifier for the server running the service, can help with logging| |timeoutDuration|integer, default - 30 -- the number of seconds a service will wait between trying to pull items off the queue| |maxWorkers|integer, default - 30 -- the number of workers for the worker pool, this governs the number of concurrent workflows that can execute at any given time| |overflowWorkers|integer, default - 15 -- the number of workers over the max number of workers that can execute, this is a bucket reserved for incoming requests or parent workflows that are resuming after all children have completed| |workTtl|integer, default - 600 -- the number of seconds that a workflow execution is locked from being taken over by another service| |keepaliveInterval|integer, default - 60 -- the number of seconds between refreshing the locks|

create(name, options?) -> Workflow

|field|description| |-----|-----------| |name|string -- specifies the name of the workflow, generally for logging purposes| |options?|optional, object -- configures the workflow execution|

options

|field|description| |-----|-----------| |parallel|boolean -- specifies whether the workflow will execute sequentially through the tasks or in parallel essentially vertically or horizontally|

Workflow

task

task(name|action, action|options, options?) -> void

Add a task to the workflow. For shortcut purposes, action can be a named function and used in lieu of entering the string name

workflow.task('test', (context, workContext) => {});
workflow.task(function test(context, workContext) {});

|field|description| |-----|-----------| |name|string -- the name of the task, must be unique |action|function -- (context, workContext, { instanceId, index }) -> Promise<Result> -- the action that will be executed when the task is executed by the WorkflowInstance. The instanceId is the id for the workflow and index is the item index from a sourceIterator (if there is one). The result will be merged with the workContext. See Result| |options|optional, object -- configures the task execution|

options

|field|description| |-----|-----------| |next|optional, string -- the name of the next task to execute. If not set, next will automatically be set by whatever the subsequently added task is. If null, next will not be automatically set. If the value is a task command that command will be operated on. See Commands| |pre|optional, function -- (context, workContext) -> Promise<Result> -- a function that can mutate the context to be given to the action, this is mostly useful for blocks| |post|optional, function -- (context, workContext, result) -> Promise<Result> -- a function that can process the result relative to the work context and can further modify the result before merge into the work context, this is mostly useful for blocks| |case|optional, function -- (context, workContext) -> Promise<string> -- a function that can return the name of a task that should be executed next based upon the current work context following the execution of the task. If the value is a task command that command will be operated on. See Commands| |catch|optional, function -- (context, workContext, error) -> Promise<string> -- a function that can return the name of a task that should be executed next given the error that was thrown by the task. If the value is a task command that command will be operated on. See Commands|

pre

workflow.task('test',
  (context, preWorkContext) => {}, {
  pre: (context, workContext) => {
    const preWorkContext = workContext;
    return preWorkContext;
  }
});

post

workflow.task('test',
  (context, workContext) => {
    const result = workContext;
    return result;
  }, {
  post: (context, workContext, result) => {
    const postResult = result;
    return postResult;
  }
});

case

workflow.task('test',
  (context, workContext) => {}, 
  {
    case: (context, workContext) => {
      switch (workContext.value) {
        case 'test': return 'test',
        default: return null;
      }
    }
  }
);

catch

workflow.task('test',
  (context, workContext) => {}, 
  {
    catch: (context, workContext, error) => {
      switch (error.status) {
        case 403: return 'authorize',
        default: return null;
      }
    }
  }
);

block

block(name, buildSubWorkflow, options?) -> void

Adds a block task to the workflow. A block task is a wrapper around another workflow that will be executed with a separate and distinct work context. Tthese can be added until the server runs out of resources.

workflow.block('test', w => {
  w.task('sub-task', (context, workContext) => {});
  w.block('sub-block', subW => {
    subW.task('sub-block-task', (context, workContext) => {});
  });
});

|field|description| |-----|-----------| |name|string -- the name of the task, must be unique| |buildSubWorkflow|function -- (Workflow) -> void -- will give a Workflow with an identical signature to the current Workflow but will not be directly callable by the WorkflowInstanceManager| |options|optional, object -- configures the block task execution|

options

|field|description| |-----|-----------| |sourceIterator|optional, function -- (context, workContext) -> asyncIterator -- the source for all the items to apply the action to. This can be an array but it would be best if it's some sort of async iterator. Even a stream can be used as an async iterator source. So, theoretically, and endpoint that can be parseable in a streaming fashion could be used as a source.| |parallel|boolean, default - false -- indicates whether the created workflow should run in parallel |...|see task options for the rest|

iterate

iterate(name, source, action, options?) -> void

Adds an iteration task to the workflow. An iteration task will execute the given action over each item returned by the source. This will be done over an asyncIterator so this can be optimized to be something that's not fully resident in memory but rather a cursor or stream of some sort.

workflow.iterate('test',
  /* source */
  (context, workContext) => axios({
    ...,
    responseType: 'stream'
  }).then(response => response.data),
  /* action */
  (context, workContext) => {}
);

|field|description| |-----|-----------| |name|string -- the name of the task, must be unique| |source|function -- (context, workContext) -> asyncIterator -- the source for all the items to apply the action to. This can be an array but it would be best if it's some sort of async iterator. Even a stream can be used as an async iterator source. So, theoretically, and endpoint that can be parseable in a streaming fashion could be used as a source.| |action|function -- (context, workContext) -> Promise<Result> -- the action that will be executed for each item returned by the source| |options|optional, object -- configures the iterate task execution|

options

|field|description| |-----|-----------| |parallel|boolean, default - false -- indicates whether each item should be executed in parallel. This should be used with caution, if there are too many tasks, it could overload the system. Subsequent versions could limit the max number of tasks executing across all the services| |...|see task options for the rest|

Result

The result object is what augments the work context. Fields in the result object will be applied to not only the root object but to the nested fields. So to assist there are pre-merge operations that can be applied to the fields to further augment the update.

Pre-Merge operations

If you have data that you want to either overwrite or delete on the workcontext, you can do so by using the $overwrite and $delete pre-merge operations. These operations will be preformed on the the workContext before the workItemContext is merged into the workContext and persisted.

###$ $delete operation

workflow.task(someTask, {
  post: (context, workContext, result) => ({
    ...result,
    unwantedProperty: { $delete: 1 }
  });
});

OR

workflow.task('someTask', {
  action: (context, workContext) => {
    return {
      unwantedProperty: { $delete: 1 }
    }
  });
});

$overwrite operation

workflow.task(someTask, {
  post: (context, workContext, result) => ({
    ...result,
    changingProperty: {
      $overwrite: {
        newData: 'x'
      }
    }
  };
});

OR

workflow.task('someTask', {
  action: (context, workContext) => {
    return {
      changingProperty: {
        $overwrite: {
          newData: ['x']
        }
      }
    }
  };
});

Commands

Task commands influence what the workflow will do. Inherantly, all next values are commands to go to the next tasks. Accordingly, commands will adjust what happens next following the execution of a task.

$pause

workflow.task('someTask', {
  action: (context, workContext) => ({ test: true })
  case: (context, workContext) => {
    if (test) return '$pause';
  }
});