stream-join
v2.0.0
Published
Join values from multiple object-mode Readable streams into a single Readable, with proper backpressure handling.
Maintainers
Readme
stream-join 
stream-join is a toolkit of N→1 stream combinators — functions that take an array of streams and return a single stream, with proper backpressure handling. Two flavors share a single algorithm per component: Node Streams (Readable) via the default entry, Web Streams (ReadableStream) via the stream-join/web subpath. Four primitives cover the useful control-flow shapes:
zip— symmetric advance: one value per non-ended stream per round, combined viajoinItemsselect— asymmetric advance: a user-definedpickchooses one slot per round from a bufferrace— emit-as-ready: whichever stream's data resolves first winsconcat— sequential drain: stream 0, then 1, …, then N-1
Plus a small set of helpers under stream-join/utils/ for composing common merge patterns (k-way merge of sorted streams, priority-queue merge, drift-tolerant merge).
stream-join is a lightweight micro-package (ESM, Node 22+) built on stream-chain and nano-binary-search — its only runtime dependencies. It is distributed under New BSD license.
Installation
npm i stream-joinQuick start
zip is the default export — taking values from each stream in lockstep:
import zip from 'stream-join';
import {Readable} from 'node:stream';
const s1 = Readable.from([1, 2, 3]);
const s2 = Readable.from(['a', 'b', 'c']);
zip([s1, s2]).on('data', data => console.log(data));
// [1, 'a']
// [2, 'b']
// [3, 'c']For other patterns, import the corresponding component:
import select from 'stream-join/select.js';
import race from 'stream-join/race.js';
import concat from 'stream-join/concat.js';
import mergeSorted from 'stream-join/utils/merge-sorted.js';For Web Streams, swap 'stream-join' for 'stream-join/web':
import zip from 'stream-join/web';
import {zip, select, race, concat} from 'stream-join/web';
import select from 'stream-join/web/select.js';
import mergeSorted from 'stream-join/web/utils/merge-sorted.js';The Web entry expects ReadableStream inputs and returns a ReadableStream output. The algorithm, options surface, and helpers are identical to the Node side — only the I/O type changes. The Web tree pulls in no node:stream code, so it stays bundleable for browsers and edge runtimes.
import zip from 'stream-join/web';
const fromArray = arr => {
let i = 0;
return new ReadableStream({
pull(c) {
i < arr.length ? c.enqueue(arr[i++]) : c.close();
}
});
};
const out = zip([fromArray([1, 2, 3]), fromArray(['a', 'b', 'c'])]);
const reader = out.getReader();
for (;;) {
const {value, done} = await reader.read();
if (done) break;
console.log(value);
}
// [1, 'a']
// [2, 'b']
// [3, 'c']The four primitives
zip(streams, options?) — symmetric N-round combine
zip<T = readonly (unknown | null)[]>(
streams: Readable[],
options?: {
joinItems?: (sink: {push(v: T): void}, items: readonly (unknown | null)[]) => void | Promise<void>;
skipEvents?: boolean; // legacy no-op
// …plus any ReadableOptions
}
): Readable;Parameters.
streams— non-empty array of object-modeReadablestreams. ThrowsTypeErrorif missing or empty.options.joinItems(sink, items)— optional combine callback called once per round with the per-stream values (in positional order;nullfor ended streams). Callsink.push(value)0 or more times to emit. May beasync. Default:(sink, items) => sink.push(items).options.skipEvents— legacy 1.x option, accepted as no-op in 2.x.
Returns an object-mode Readable that emits the combined values; ends when every input stream has ended; propagates input-stream 'error' events with the original value preserved.
import zip from 'stream-join';
const s1 = Readable.from([1, 2, 3, 4]);
const s2 = Readable.from(['a', 'b']);
zip([s1, s2]).on('data', data => console.log(data));
// [1, 'a']
// [2, 'b']
// [3, null] // s2 has ended
// [4, null]Custom output via joinItems:
zip([s1, s2], {
joinItems(sink, items) {
items.forEach(item => {
if (item !== null) sink.push(item);
});
}
}).on('data', data => console.log(data));
// 1, 'a', 2, 'b', 3select(streams, options) — buffered pick-one
select(
streams: Readable[],
options: {
pick: (items: readonly Slot<T>[]) => number; // required
insert?: (items: Slot<T>[], newSlot: Slot<T>, lastPos?: number) => void;
remove?: (items: Slot<T>[], lastPos: number) => void;
windowSize?: number; // default 1
// …plus any ReadableOptions
}
): Readable;
interface Slot<T> { item: T; index: number; }Parameters.
streams— non-empty array of object-modeReadablestreams. ThrowsTypeErrorif missing or empty.options.pick(items)— required. Returns the index initemsof the slot to emit and refill. Stop signal: any return value outside[0, items.length)(negative,NaN,undefined,null, ±Infinity, non-integer, ≥ length) ends the merge.options.insert(items, newSlot, lastPos?)— optional. Mutatesitemsin place.lastPos === undefinedduring initial fill (length MAY grow);lastPosdefined during steady-state refill (length MUST stay unchanged). Default: replace atlastPos(orpushwhen undefined).options.remove(items, lastPos)— optional. Called when the source stream ofitems[lastPos]has exhausted; must decreaseitems.lengthby 1. Default:items.splice(lastPos, 1).options.windowSize— optional positive integer; per-stream buffer depth. Default1. Larger values tolerate local disorder in input streams.
Returns an object-mode Readable that emits one value per round; output element type is the union of the input streams' value types; ends when every stream has exhausted or pick returns a stop signal; propagates input-stream 'error' events with the original value preserved.
import select from 'stream-join/select.js';
import pickMin from 'stream-join/utils/pick-min.js';
// Priority-queue merge: emit the smallest available value each round
select([Readable.from([1, 4, 7]), Readable.from([2, 5, 8]), Readable.from([3, 6, 9])], {
pick: pickMin((a, b) => a < b)
}).on('data', x => console.log(x));
// 1, 2, 3, 4, 5, 6, 7, 8, 9race(streams, options?) — emit-as-ready
race(streams: Readable[], options?: ReadableOptions): Readable;Parameters.
streams— non-empty array of object-modeReadablestreams. ThrowsTypeErrorif missing or empty.options— optional. StandardReadableOptions(the output is forced toobjectMode: true).
Returns an object-mode Readable that emits values in event-loop-arrival order from across all input streams; output element type is the union of the input streams' value types; ends when every stream has ended; propagates input-stream 'error' events with the original value preserved.
Output order is non-deterministic — it reflects how the input streams' data events interleave in the event loop.
import race from 'stream-join/race.js';
race([logStreamA, logStreamB, logStreamC]).on('data', event => process(event));concat(streams, options?) — sequential drain
concat(streams: Readable[], options?: ReadableOptions): Readable;Parameters.
streams— non-empty array of object-modeReadablestreams. Drained left-to-right. ThrowsTypeErrorif missing or empty.options— optional. StandardReadableOptions(the output is forced toobjectMode: true).
Returns an object-mode Readable that emits stream 0's values, then stream 1's, …, then stream N-1's; output element type is the union of the input streams' value types; ends when every stream has ended; propagates input-stream 'error' events with the original value preserved. Pullers are created lazily, one stream at a time, so later streams don't pre-buffer.
import concat from 'stream-join/concat.js';
concat([part1, part2, part3]).on('data', chunk => collect(chunk));Helpers
import pickFirst from 'stream-join/utils/pick-first.js';
import pickMin from 'stream-join/utils/pick-min.js';
import sortedInsert from 'stream-join/utils/sorted-insert.js';
import mergeSorted from 'stream-join/utils/merge-sorted.js';pickFirst() → 0
Constant-time picker. Takes no arguments; always returns 0. Pair with sortedInsert for k-way merge of sorted streams.
pickMin(lessFn) → (items) => number
Parameters. lessFn(a, b) — comparator on item values; returns true if a should come before b.
Returns a picker function suitable for select's pick option. Takes items: readonly Slot<T>[] and returns the index of the slot whose item is smallest per lessFn. Ties resolve to the first occurrence. O(items.length) per call, no allocations.
sortedInsert(lessFn) → (items, newSlot, lastPos?) => void
Parameters. lessFn(a, b) — comparator on item values; returns true if a should come before b.
Returns an insert callback suitable for select's insert option. Takes items: Slot<T>[], newSlot: Slot<T>, and lastPos?: number; mutates items in place to maintain sorted order. Built on nano-binary-search. Smart-replace optimization: when the new slot belongs at the same position as the just-removed one, replaces in place (one assignment, no splice).
mergeSorted(streams, lessFn, options?) → Readable
Parameters.
streams— non-empty array of object-modeReadablestreams (each sorted perlessFn, or locally disordered withinwindowSize).lessFn(a, b)— comparator on item values; returnstrueifashould come beforeb.options— optional.ReadableOptionspluswindowSize?(default1; larger values tolerate local disorder).
Returns an object-mode Readable emitting the merged sequence in sorted order. Umbrella for select + pickFirst + sortedInsert(lessFn). Equivalent to:
select(streams, {...options, pick: pickFirst, insert: sortedInsert(lessFn)});lessFn(a, b) always compares item values (not slots) across all helpers; helpers unwrap slot.item internally so the same comparator is reusable across helpers.
K-way merge of sorted streams
import mergeSorted from 'stream-join/utils/merge-sorted.js';
mergeSorted([sortedStream1, sortedStream2, sortedStream3], (a, b) => a.timestamp < b.timestamp).on(
'data',
x => console.log(x)
);Drift-tolerant merge
mergeSorted([s1, s2], (a, b) => a < b, {windowSize: 4});Composition with stream-chain
Every main component returns a plain Readable, so it slots naturally as the first item in a stream-chain pipeline:
import chain from 'stream-chain';
import zip from 'stream-join';
chain([
zip([Readable.from([1, 2, 3]), Readable.from([10, 20, 30])]),
([a, b]) => a + b,
x => x * 2
]).on('data', x => console.log(x));
// 22, 44, 66Errors
Errors from any input stream are propagated to the output with the original error value preserved. On the Node side this is the output Readable's 'error' event; on the Web side it surfaces via the ReadableStream controller's error() signal (visible to readers as a rejected read()). The runtime pullers from stream-chain v4 are non-destructive and preserve raw error values — no AbortError wrapping.
What this package is not for
- Sorting — that's
stream-sorting's job (forthcoming). - Key-based SQL-style joins —
mergeJoinwill live instream-sorting(forthcoming); requires sorted-by-key inputs. - Set operations (union / intersection / difference) on sorted streams — same,
stream-sorting. - 1→N operations — that's
stream-fork.
stream-join's primitives don't know about sortedness, keys, or anything domain-specific. They just combine N streams into 1.
Documentation
Per-component reference and worked examples live in the wiki:
Release History
- 2.0.0 ESM, new functions:
zip,select,race,concat. Support for Web Streams. - 1.0.1 Technical release, no need to upgrade.
- 1.0.0 The initial release.
The full release notes are in the wiki: Release notes.
