@promistream/fork-select
v0.1.1
General-purpose forking stream
A general-purpose forking Promistream. Lets you fork a pipeline into an arbitrary number of downstream forks, and divide values among them based on an arbitrary predicate function. Can be used directly, or as a basis for other types of forking streams.
Stream characteristics:
- Promistream version: 0
- Stream type: Fork
- Supports parallelization: Yes (order-preserving)
- Buffering: Unbounded; one internal buffer per fork, and a fork's buffer only fills as needed to unblock reads from other forks
- Fork distribution strategy: User-specified
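The buffering behaviour described above can be pictured with a small standalone model. This is only a sketch of one reading of the characteristics list, not the library's implementation: `createForkModel` and `readFrom` are hypothetical names, the model is synchronous, and it ignores parallelization and error handling entirely.

```javascript
"use strict";

// Conceptual model only: NOT the library's implementation or API.
// `createForkModel` and `readFrom` are hypothetical names for this sketch.
function createForkModel(sourceValues, forkCount, select) {
	const source = sourceValues[Symbol.iterator]();
	// One unbounded buffer per fork.
	const buffers = Array.from({ length: forkCount }, () => []);

	return {
		buffers,
		// Read the next value for fork `forkIndex`, or undefined at end of source.
		readFrom(forkIndex) {
			// Pull from the source only while the requested fork has nothing buffered.
			while (buffers[forkIndex].length === 0) {
				const { value, done } = source.next();
				if (done) {
					return undefined;
				}
				// Values selected for other forks wait in those forks' buffers.
				buffers[select(value)].push(value);
			}
			return buffers[forkIndex].shift();
		}
	};
}

const model = createForkModel([ 0, 0, 0, 1, 1, 2 ], 2, (value) => value % 2);

// Fork B (index 1) reads first; the three leading 0s must be buffered for fork A.
const firstB = model.readFrom(1);
console.log(firstB, model.buffers[0]); // 1 [ 0, 0, 0 ]
```

In this model a buffer grows only when another fork has to read past values it does not own, which is why a slow or idle fork can accumulate an unbounded backlog.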
Example
A runnable version of this example is included in the package as example.js.

"use strict";

const pipe = require("@promistream/pipe");
const debug = require("@promistream/debug");
const fromIterable = require("@promistream/from-iterable");
const forkSelect = require("@promistream/fork-select");
const collect = require("@promistream/collect");

(async () => {
	let [ a, b ] = await pipe([
		fromIterable([ 0, 0, 0, 1, 1, 2 ]),
		debug("source"),
		forkSelect(2, (value) => {
			return value % 2;
		})
	]).read();

	setTimeout(async () => {
		try {
			let bResults = await pipe([
				b,
				// debug("pipeline B"),
				collect()
			]).read();

			console.log({ bResults }); // { bResults: [ 1, 1 ] }
		} catch (error) {
			console.error("error from B", error);
		}
	}, 1000);

	try {
		let aResults = await pipe([
			a,
			// debug("pipeline A"),
			collect()
		]).read();

		console.log({ aResults }); // { aResults: [ 0, 0, 0, 2 ] }
	} catch (error) {
		console.error("error from A", error);
	}
})();

API
forkSelect(forkCount, select)
Creates a new fork-select stream. Note that you can only control which fork the values are sent to; errors (including EndOfStream and Aborted markers) are always broadcast to all forks.
- forkCount: Required. The number of forks to create.
- select: Required. The predicate callback that determines which fork a given value should be sent to. Receives the value as its argument, and is expected to return (a Promise of) the index of the fork to send it to. An array of fork indexes may also be returned to send the value to multiple forks.
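To illustrate the return forms described for select, here is a sketch of a few predicates. The predicate names and the `preview` helper are hypothetical and exist only for this example. `preview` is a standalone, synchronous simulation that groups values by the selected fork(s); it does not use or reproduce the library, and it assumes fork indexes are zero-based as in the example above.

```javascript
"use strict";

// Hypothetical predicates; each would be passed as the second argument,
// e.g. forkSelect(3, byRemainder).

// Single index: send each value to exactly one fork.
const byRemainder = (value) => value % 3;

// Array of indexes: send multiples of 3 to every fork, others to fork 1 only.
const broadcastMultiples = (value) => (value % 3 === 0) ? [ 0, 1, 2 ] : [ 1 ];

// Round-robin: ignore the value and cycle through the forks in order.
function roundRobin(forkCount) {
	let next = 0;
	return () => {
		const index = next;
		next = (next + 1) % forkCount;
		return index;
	};
}

// Promise of an index: useful when the decision needs an asynchronous lookup.
// (Not exercised by the synchronous preview below.)
const asyncByRemainder = async (value) => value % 3;

// Standalone simulation (not the library): group values by the fork(s) a
// synchronous predicate selects, to preview how they would be divided.
function preview(values, forkCount, select) {
	const forks = Array.from({ length: forkCount }, () => []);
	for (const value of values) {
		const selection = select(value);
		const indexes = Array.isArray(selection) ? selection : [ selection ];
		for (const index of indexes) {
			forks[index].push(value);
		}
	}
	return forks;
}

console.log(preview([ 1, 2, 3, 4, 5, 6 ], 3, byRemainder));
// [ [ 3, 6 ], [ 1, 4 ], [ 2, 5 ] ]
console.log(preview([ 1, 2, 3, 4, 5, 6 ], 3, roundRobin(3)));
// [ [ 1, 4 ], [ 2, 5 ], [ 3, 6 ] ]
```

A stateful predicate such as `roundRobin` is the kind of building block that could serve as a basis for other forking streams, though how it interacts with parallelized reads is not covered here.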
