stream-fitting
v1.0.0
Published
Branching pipelines
Maintainers
Readme
stream-fitting is an npm module for building branched stream pipelines.
Synopsis
In the following example, a Fitting is built to sort mixed incoming data by two outgoing streams: one for classified messages, another for the rest.
const {Fitting} = require ('stream-fitting')
// Basic Usage ================================================
const mixedIn = // ... a Readable source of messages
const classifiedOut = // ... some Writable for classified ones
const generalOut = // ... another Writable for the rest
const sorter = new Fitting ({objectMode: true,
write (o, _, callback) {
(o.isClassified ? classifiedOut : generalOut).write (o)
callback ()
}
})
.weld (classifiedOut) // observe for clog/drain, end on close
.weld (generalOut) // this one too
mixedIn.pipe (sorter)
// Lower Level ===============================================
const {CLOG, makeReportClogging} = require ('stream-fitting')
const myWritable = // ...some Writable
makeReportClogging (myWritable)
.on ('clog', () => {/* custom pause */})
.on ('drain', () => {/* custom resume */})
if (myWritable [CLOGGED]) {
// wait until drained, or even abort processing
}
else {
myWritable.write (moreData)
}
Rationale
In some applications, an input data stream is mapped to several output ones, to be processed in parallel. Then you need a Transform like object with the ability to feed multiple output streams.
One approach here is to implement a Writable with a custom _write () writing into two or more different Writables, some of which may happen to be PassThrough instances, for data receivers requiring Readable input.
But here comes the back pressure problem: to avoid memory leaks, you need to check the write () return value and stop the data processing until the 'drain' event lets you start it over. With standard pipelines, this is done automatically, but, alas, they are one dimensional, without any branching.
To cope with this issue, the 'stream-fitting' module lets the developer:
- extend any existing
Writableso it emits'clog'events in case whenwrite()returnsfalse(which complements standard'drain'); - install all necessary handlers to invoke
pause ()/resume ()automatically.
API
tl;dr jump to the Fitting class description.
The text below is organized to describe internals progressively.
makeReportClogging
This function takes a single Writable argument and returns it with the write () method overridden to emit the 'clog' event and maintain the [CLODDED] property (see below)
const {makeReportClogging} = require ('stream-fitting')
const myWritable = // ...some Writable
makeReportClogging (myWritable)
.on ('clog', /* custom pause */)
.on ('drain', /* custom resume */)'clog' event
This event precedes and mirrors the standard 'drain': it's emitted by the overridden (see above) write () just before returning false.
Note that, in any case, the callcack () is invoked prior to return, so 'clog' is emitted after calling callcack ().
[CLOGGED] property
This property, initially false, is
- set to
trueon'clog'and - reset back to
falseon'drain'.
const {makeReportClogging, CLOGGED} = require ('stream-fitting')
const myWritable = // ...some Writable
makeReportClogging (myWritable)
if (myWritable [CLOGGED]) {
// wait until drained, or even abort processing
}
else {
myWritable.write (moreData)
}No other event ('error', 'close', 'finish', etc.) is observed, so the false value here doesn't guarantee that write () is OK to call.
The switching is done by event handlers, so using methods like removeAllListeners () may break the logic here.
Fitting
This class extends the standard Writable and is presumed to be used in a similar manner, with write and finish using other Writables previously registered as its branches with the weld () method.
If its instance is subject to .pipe (), the source stream is pause ()d on each branch's 'clog' and resume ()d back when all of them are 'drain'ed.
const {Fitting} = require ('stream-fitting')
const mixedIn = // ... a Readable source of messages
const classifiedOut = // ... some Writable for classified ones
const generalOut = // ... another Writable for the rest
const sorter = new Fitting ({objectMode: true,
write (o, _, callback) {
(o.isClassified ? classifiedOut : generalOut).write (o)
callback ()
}
})
.weld (classifiedOut) // observe for clog/drain, end on close
.weld (generalOut) // this one too
mixedIn.pipe (sorter)weld
This method takes a single Writable argument, extends id with makeReportClogging (see above), adds to the internal collection of branches and installs necessary event handlers.
[CLOGGED] property
For a Fitting, this property is computed as the logical conjunction of all branches' [CLOGGED] values.
