stream-fitting

v1.0.0

Branching pipelines

stream-fitting is an npm module for building branched stream pipelines.

Synopsis

In the following example, a Fitting is built to sort mixed incoming data into two outgoing streams: one for classified messages, another for the rest.

const {Fitting} = require ('stream-fitting')

// Basic Usage ================================================

const mixedIn =       // ... a Readable source of messages

const classifiedOut = // ... some Writable for classified ones
const generalOut =    // ... another Writable for the rest

const sorter = new Fitting ({objectMode: true,
  write (o, _, callback) {
    (o.isClassified ? classifiedOut : generalOut).write (o)
    callback ()
  }
})
.weld (classifiedOut)  // observe for clog/drain, end on close
.weld (generalOut)     // this one too

mixedIn.pipe (sorter)

// Lower Level ===============================================

const {CLOGGED, makeReportClogging} = require ('stream-fitting')

const myWritable = // ...some Writable

makeReportClogging (myWritable)
  .on ('clog',  () => {/* custom pause */})
  .on ('drain', () => {/* custom resume */})

if (myWritable [CLOGGED]) {
  // wait until drained, or even abort processing
}
else {
  myWritable.write (moreData)
}

Rationale

In some applications, one input data stream is mapped to several output streams that are processed in parallel. This calls for a Transform-like object able to feed multiple output streams.

One approach here is to implement a Writable with a custom _write () writing into two or more different Writables, some of which may happen to be PassThrough instances, for data receivers requiring Readable input.

But here comes the back pressure problem: to avoid unbounded memory growth, you need to check the write () return value and stop processing data until the 'drain' event lets you resume. Standard pipelines do this automatically but, alas, they are one-dimensional, without any branching.

To cope with this issue, the 'stream-fitting' module lets the developer:

  • extend any existing Writable so it emits a 'clog' event whenever write () returns false (which complements the standard 'drain');
  • install all necessary handlers to invoke pause () / resume () automatically.

API

tl;dr: jump to the Fitting class description.

The text below is organized to describe internals progressively.

makeReportClogging

This function takes a single Writable argument and returns it with the write () method overridden to emit the 'clog' event and maintain the [CLOGGED] property (see below).

const {makeReportClogging} = require ('stream-fitting')
const myWritable = // ...some Writable
makeReportClogging (myWritable)
  .on ('clog',  () => {/* custom pause  */})
  .on ('drain', () => {/* custom resume */})

'clog' event

This event precedes and mirrors the standard 'drain': it's emitted by the overridden (see above) write () just before returning false.

Note that, in any case, the callback () is invoked prior to return, so 'clog' is emitted after calling callback ().

[CLOGGED] property

This property, initially false, is

  • set to true on 'clog' and
  • reset back to false on 'drain'.

const {makeReportClogging, CLOGGED} = require ('stream-fitting')
const myWritable = // ...some Writable

makeReportClogging (myWritable)

if (myWritable [CLOGGED]) {
  // wait until drained, or even abort processing
}
else {
  myWritable.write (moreData)
}

No other event ('error', 'close', 'finish', etc.) is observed, so the false value here doesn't guarantee that write () is OK to call.

The switching is done by event handlers, so using methods like removeAllListeners () may break the logic here.

Fitting

This class extends the standard Writable and is meant to be used in a similar manner, with its write and finish logic operating on other Writables previously registered as its branches with the weld () method.

If an instance is the destination of a .pipe () call, the source stream is pause ()d on each branch's 'clog' and resume ()d once all of them are 'drain'ed.

const {Fitting} = require ('stream-fitting')
const mixedIn =       // ... a Readable source of messages

const classifiedOut = // ... some Writable for classified ones
const generalOut =    // ... another Writable for the rest

const sorter = new Fitting ({objectMode: true,
  write (o, _, callback) {
    (o.isClassified ? classifiedOut : generalOut).write (o)
    callback ()
  }
})
.weld (classifiedOut)  // observe for clog/drain, end on close
.weld (generalOut)     // this one too

mixedIn.pipe (sorter)

weld

This method takes a single Writable argument, extends it with makeReportClogging (see above), adds it to the internal collection of branches and installs the necessary event handlers.

[CLOGGED] property

For a Fitting, this property is computed as the logical conjunction of all branches' [CLOGGED] values.