npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

async-pipeline

v0.3.3

Published

Async pipeline with transition constraints and time tracking

Downloads

17

Readme

async-pipeline

async-pipeline allows you to build a complex conditional flow from loosely coupled components.

Build Status

npm install async-pipeline

Imagine you have set of processing operations that might be applied in different order based on input and intermediate results.

Processing graph

Of course you can build it as Promise chain(s) but it will look hard to read at least. And over time, after more additions, you have a good chance it to turn into unreadable code mess.

Another much easier approach would be to build the flow based on PubSub, and let processing functions listen and emit messages with payloads. This would let you split the flow into easily tested atomic stages doing one thing at a time, not coupled to other components. Plus, you can have multiple functions listening to the same events and build parallel secondary flows like logging and tracking aside from the main logic.

async-pipeline is helping to build such message hub by adding few generic features on top of EventEmitter:

  • optionally apply transition constraint map
  • catch errors and route them to one place
  • track and log all transitions
  • provide flow control events: @end, @error
  • track execution time in milliseconds using process.hrtime

Example

new Pipeline({
  transitions: {
    'start': ['stage-1:progress', 'stage-1:done'],
    'stage-1:done': ['stage-2:done']
  }
})
  .on('@error', console.error.bind('ERROR:'))
  .on('@end', dump => {
    console.log('END:', inspect(dump, {depth: 10}))
  })
  .on('start', function (count) {
    let i = 0
    while (i++ < count) setTimeout(this.emit, i * 100, 'stage-1:progress', i)
    setTimeout(this.emit, (count + 1) * 100, 'stage-1:done')
  })
  .on('stage-1:done', function () {
    setTimeout(this.emit, 200, 'stage-2:done', 'hello', 'there')
  })
  .on('stage-2:done', function(...args) {
    console.log('after stage-2', ...args)
    this.end()
  })
  .on('stage-1:progress', console.log.bind(console, 'stage1'))
  .start('start', 5)

the code would output following:

stage1 1
stage1 2
stage1 3
stage1 4
stage1 5
after stage-2 hello there
END: [ { event: 'start',
    payload: [ 5 ],
    routes:
     [ { event: 'stage-1:progress',
         payload: [ 1 ],
         routes: [],
         time: 107 },
       { event: 'stage-1:progress',
         payload: [ 2 ],
         routes: [],
         time: 200 },
       { event: 'stage-1:progress',
         payload: [ 3 ],
         routes: [],
         time: 305 },
       { event: 'stage-1:progress',
         payload: [ 4 ],
         routes: [],
         time: 402 },
       { event: 'stage-1:progress',
         payload: [ 5 ],
         routes: [],
         time: 503 },
       { event: 'stage-1:done',
         payload: [],
         routes:
          [ { event: 'stage-2:done',
              payload: [ 'hello', 'there' ],
              routes: [],
              time: 806 } ],
         time: 604 } ],
    time: 0 } ]

API

Call new Pipeline(options) or just Pipeline(options) to get a pipeline instance. Where options are:

  • debug - optional function to print internal logs, easiest way to get debugging output is to pass {debug: console.log}
  • contextAPItrue by default, expose API controls to the handlers through this, otherwise pass as a first argument
  • transitions - optional mapping defining allowed transitions and entry points. Example:
{
  'from-event-0': ['to-event-1', 'to-event-2'],
  'from-event-2': ['to-event-3']
}

Pipeline methods

  • this.start('event', ...payload) - start the flow with event event and optional payload. This will throw if options.constraints is set and has no event key

  • this.context(dataObject) - defines data object available to all handlers through this.context()

Internal events and their payloads

  • @all (event, ...payload) - catches all events, emitted right before normal event. Useful mainly for testing
  • @end (trace) - fired once any handler calls this.end(). Also automatically triggered on unexpected errors
  • @error (error, trace) - fired if any handler has thrown or explicitly called this.end(error). This will cause an uncaught exception bubbling to the process level if no handler registered

Handler context methods

Each handler except ones for internal events has access to following methods exposed though call context. Keep in mind that handlers should be defined as function() {}, arrow functions would forcefully override the context

  • this.end([error]) - end the flow and void all eventual events. If error passed that it's considered an emergency shutdown. In either case if will end up triggering @end event

  • this.emit('event', ...payload) - cast an event with arbitrary payload. This will throw if emitted event is violating options.constraints

  • this.context([dataObject]) - getter/setter for context data kept shared for all handler. This should be used wisely though to keep handlers uncoupled

  • this.safe(fn) - decorator for async calls that may throw within handler. Following would throw to the top level unless wrapped into safe():

  pipeline.on('event', () => {
    const later = this.safe(() => { throw new Error() })
    setTimeout(later, 0)
  })

Error handling

Errors thrown from within event handlers will be caught and routed as @error event payload. However those would bubble up to the top if no handler defined

Errors thrown from within internal events (@end, @error) are always bypassing @error and propagate to the top to avoid recursive failures. Same for the failures in start() and on() calls

Alternative EventEmitter

If you Node's EventEmitter doesn't work for you for whatever reason then you can inject the alternative

Pipeline = Pipeline.di({EventEmitter: EventEmitterClass})