strict-stream
v1.0.26
Published
strict-stream - strictly / strongly typed stream
Maintainers
Readme
strict-stream
strict-stream is a tiny and lightweight library that helps manage strictly/strongly typed streams using AsyncIterable<Type> as the core principle to enable strict data pipelines with useful behavior.
It ensures that the data flowing through a stream conforms to a specific data type or structure, which helps catch errors early on, reduce bugs, and make code more reliable and easier to maintain.
Why Iterable<T> and AsyncIterable<T> Matter
In JavaScript and TypeScript, Iterable<T> and AsyncIterable<T> are two important interfaces that allow you to work with sequences of values.
An Iterable<T> is an object that can be iterated over using a for...of loop or the Spread operator, while an AsyncIterable<T> represents a sequence of values that are produced asynchronously, such as through a network request or database query.
Using these interfaces has several advantages:
Type safety: By using
Iterable<T>andAsyncIterable<T>, you can ensure that the data you're working with is strongly typed and conforms to a specific schema. This helps catch errors early in the development process and makes your code more robust and reliable.Composability: Because
Iterable<T>andAsyncIterable<T>are composable, you can easily create complex data pipelines that process, transform, and filter data in a modular way. This makes it easier to reason about your code and maintain it over time.Performance:
Iterable<T>andAsyncIterable<T>are highly optimized for performance, allowing you to process data streams with high throughput up to millions of records per second. This makes them an ideal choice for working with large datasets or real-time data streams.
For more information on Iterable<T> and AsyncIterable<T>, check out the following links:
Installation
To install strict-stream, you can use your preferred package manager:
npm install strict-stream
or
yarn add strict-stream
Usage
Here's a simple example that demonstrates how to use strict-stream:
import {of} from 'strict-stream';
import {filter} from 'strict-stream/filter';
async function* generateData() {
yield {name: 'Alice', age: 30};
yield {name: 'Bob', age: 40};
yield {name: 'Charlie', age: 50};
}
async function example() {
// AsyncIterable<{name: string, age: number}>
const stream = of(generateData())
.pipe(
filter(({age}) => age > 30)
);
for await (const data of stream) {
console.log(`Name: ${data.name}, Age: ${data.age}`);
}
// Name: Bob, Age: 40
// Name: Charlie, Age: 50
}
await example();- This code demonstrates how to use the
ofandfilterfunctions from the library to create a typed stream and filter the data. - First, the
generateDatafunction is anasync generatorfunction that yields objects with anameandageproperty. - Next, the of function is used to create a
typed streamfrom the generator functiongenerateData. The resulting stream is anAsyncIterableof objects with anameandageproperty. - The
pipemethod is then used to apply afilterto thestream, keeping only the objects where theageproperty isgreaterthan30. - Finally, the resulting stream is iterated over using a
for-await-ofloop. The output shows only the objects whereageisgreaterthan30.
IDE hints
It gives you these IDE hints with strong types and guides you to follow types.
There is a function getUsers() that gives you AsyncIterable<{name: string, age: number}> and all pipeline operations are type-safe.

A quick look at transformations
import {sequence} from "strict-stream/sequence";
import {map} from "strict-stream/map";
import {from} from "strict-stream/from";
import {filter} from "strict-stream/filter";
async function example() {
const usersStream =
from(
// gives AsyncIterable<number>
// sequence 0,1,2,3,4;
sequence(5)
)
.pipe(
// takes only 0, 2, 4
filter((id) => id % 2 === 0)
)
.pipe(
// maps to {type: string, id: number, name: string}
map((id) => ({
type: 'User',
id,
name: `User ${id}`
}))
)
// inferred type
// AsyncIterable<{type: string, id: number, name: string}>
for await (const user of usersStream) {
console.log(user)
}
// { type: 'User', id: 0, name: 'User 0' }
// { type: 'User', id: 2, name: 'User 2' }
// { type: 'User', id: 4, name: 'User 4' }
}- There is a
sequencefunction that generates a sequence of numbers 0,1,2,3,4. - This sequence is filtered using the
filterfunction to include only the even numbers (i.e., 0, 2, and 4). - And the resulting sequence is mapped using the
mapfunction to convert each number into auser objectwith atype,nameandid. - Finally
userStreamsis a strictly typedAsyncIterable<{type: string, id: number, name: string}>
How to make a stream with AsyncGenerator?
An example of how to create a stream / AsyncIterable with a generator:
async function* generateData() {
yield {name: 'Alice', age: 30};
yield {name: 'Bob', age: 40};
yield {name: 'Charlie', age: 50};
}
async function example() {
const stream = generateData();
for await (const data of stream) {
console.log(`Name: ${data.name}, Age: ${data.age}`);
}
// Name: Alice, Age: 30
// Name: Bob, Age: 40
// Name: Charlie, Age: 50
}
await example();- In this example,
generateDatais a generator function that yields three objects withnameandageproperties. - The
examplefunction creates a stream from the generator by simply calling it and assigns it to thestreamvariable. - Then, it iterates over the stream using a
for await...ofloop and logs thenameandageproperties of each object.
How to make a stream with reader?
reader function which creates an async iterable stream from a reader function.
The reader function is called every time a new value is requested from the stream and should return the value or DONE if there are no more values.
import { reader } from 'strict-stream/reader';
async function example() {
const array = [1, 2, 3];
const stream = reader<number>(() => {
const value = array.shift();
return value === undefined ? reader.DONE : value;
});
for await (const number of stream) {
console.log(number);
}
// Output: 1
// Output: 2
// Output: 3
}
await example();- In this example, the reader function is called with a generator function that pops a value from the array on each call.
- When there are no more values, it returns
reader.DONE, which signals to the stream that there are no more values to yield. - Finally, the for
await...ofloop is used to consume the values from thestream.
How to transform a stream?
import {reader} from 'strict-stream/reader';
import {map} from 'strict-stream/map';
async function example() {
const array = [1, 2, 3];
const stream = reader<number>(async () => {
const value = array.shift();
return value !== undefined ? value : reader.DONE;
});
const transformedStream = of(stream)
.pipe(
map((value) => value * 2)
);
for await (const value of transformedStream) {
console.log(value);
}
// Output: 2, 4, 6
}
await example();- In this example, the function passed to reader returns the next value in the array each time it is called, using
array.shift(). - If there are no more values in the array, it returns the special
reader.DONEvalue to indicate that the stream is complete. - The
offunction is then used to create acomposablestream from theAsyncIterablereturned byreader. - This stream has a
pipemethod that can be used to apply aseries of transformationsto thestream. - The
mapoperator is used totransformthe stream by multiplying each value by2. - The
mapoperator takes a function that is applied toeach value in the stream, and returns anew stream with the transformed values. - Finally, the transformed stream is iterated over using a for
await...ofloop. - In this case, the output will be
2, 4, 6which are the values of the original array multiplied by2.
API
of<Type>(inputStream: StrictStream<Type>): StrictStreamOf<Type>
ofis a factory function that creates a new instance of a composable stream by wrapping anAsyncIterable- The resulting stream can be composed with other stream functions using the
pipemethod.
An example:
import {of} from "strict-stream";
import {map} from "strict-stream/map";
async function* generateIds() {
yield 1
yield 2
yield 3
}
async function example() {
const stream = of(generateIds())
.pipe(
map(async (id) => ({id, name: `User ${id}`}))
);
for await (const data of stream) {
console.log(`Id: ${data.id}, Name: ${data.name}`);
}
// Id: 1, Name: User 1
// Id: 2, Name: User 2
// Id: 3, Name: User 3
}
await example();- The code above is a simple to create and
transformstreams of data. - The
generateIdsfunction is a generator that yields three numbers (1, 2, and 3) in sequence. - The
offunction is used to create a stream from the generator by passinggenerateIdsas its argument. - The
pipemethod is used to apply a transformation to the stream. - In this case, the
mapfunction is used to transform each item in the stream. - The
mapfunction takes a callback that is called with each item in the stream, and returns a new value for that item. - In this case, the callback takes an
idvalue and returns an object with two properties:idandname. - Finally, the transformed stream is consumed with a
for-await-ofloop, which iterates through each item in the stream and logs its id and name properties to the console. - The output will be
Id: 1, Name: User 1,Id: 2, Name: User 2, andId: 3, Name: User 3.
An example (advanced, custom mapper):
import {of, StrictStreamMapper} from "strict-stream";
async function* generateIds() {
yield 1
yield 2
yield 3
}
async function example() {
// my first stream mapper; maps inputStream to mappedStream;
function myMap<Input, Output>(mapper: (input: Input) => Promise<Output>): StrictStreamMapper<Input, Output> {
// receives inputStream
return (inputStream) => {
return (
async function* () {
// reads input stream
for await (const record of inputStream) {
// map values
yield await mapper(record)
}
}
)()
};
}
const stream = of(generateIds())
.pipe(
myMap(async (id) => ({id, name: `User ${id}`}))
);
for await (const data of stream) {
console.log(`Id: ${data.id}, Name: ${data.name}`);
}
// Id: 1, Name: User 1
// Id: 2, Name: User 2
// Id: 3, Name: User 3
}
await example();ofcreates a new stream instance from thegenerateIdsasync generator.- the resulting stream is composed with the
myMapfunction that transforms eachidinto an object withidandnameproperties. - finally, the transformed stream is iterated using a
for await...ofloop.
from<Input>(streamLike: StrictStreamLike<Input>): StrictStreamOf<Input>
The from function is used to convert any iterable object, whether synchronous or asynchronous, to a StrictStream.
It takes a single argument of type StrictStreamLike<Input>, which can be either an Iterable or an AsyncIterable;
The from function returns a StrictStream object of type StrictStreamOf<Input>, which has a pipe method that can be used to transform the stream.
StrictStreamLike<Type> type means AsyncIterable<Type> | Iterable<Type> | Type[]
An example
import {from} from "strict-stream/from";
import {map} from "strict-stream/map";
async function* generateIds() {
yield 1
yield 2
yield 3
}
async function example() {
const streamLike1: Iterable<number> = [1, 2, 3];
const streamLike2: AsyncIterable<number> = generateIds(); // is equivalent
// could consume `streamLike1` or `streamLike2`
const stream = from(streamLike1)
.pipe(
map(async (id) => ({id, name: `User ${id}`}))
);
for await (const data of stream) {
console.log(`Id: ${data.id}, Name: ${data.name}`);
}
// Id: 1, Name: User 1
// Id: 2, Name: User 2
// Id: 3, Name: User 3
}
await example();
- The example demonstrates how to use the
fromfunction to turn an iterable into acomposable stream. - An asynchronous generator function called
generateIdsis defined that yields the numbers1, 2, and 3. streamLike1is defined as an array containing the numbers1, 2, and 3.streamLike2is defined as anasync iterablethat is equivalent togenerateIds.- The
fromfunction is then used to create a stream fromstreamLike1. - This stream is then piped through a
mapfunction that maps each number to an object containing anidand anamefield. - Finally, the resulting stream is consumed using a
for awaitloop
tap<Input>(fn: (input: Input) => Promised<any>): StrictStreamMapper<Input, Input>
tap is a utility function that allows you to perform side-effects on each element of a stream without modifying the stream itself.
It works by taking a callback function as an argument, which is invoked for each element of the stream,
but then simply returns the original element, so that it can be passed on to the next step in the pipeline unchanged.
An example:
import {of} from "strict-stream";
import {tap} from "strict-stream/tap";
async function example() {
async function* generateIds() {
yield 1
yield 2
yield 3
}
const transformedStream = of(generateIds())
.pipe(
tap((value) => console.log(value))
);
for await (const value of transformedStream) {
/* empty */
}
// 1
// 2
// 3
}
await example();- In this example, we start with an asynchronous generator that yields three numbers: 1, 2, and 3.
- We then use the
offunction to wrap this generator in a composable stream, and then use thepipemethod to apply thetapfunction to the stream. - The
tapfunction simply logs each element of the stream to the console. - Finally, we iterate over the transformed stream using a
for-await-ofloop, which triggers the evaluation of the stream and executes theside-effectsof thetapfunction. - However, since tap returns each element unchanged, the loop does not actually output anything to the console.
- The output of the example, therefore, is simply the values 1, 2, and 3, printed to the console by the tap function.
run<Type, Default = undefined>(stream: StrictStream<Type>, defaultValue?: Default): Promise<Type | Default>
- Consumes the given
AsyncIterable, iterating over its values, and returns a Promise that resolves to thelast valueof thestream. - If the
streamis empty, the function returns adefault value, which is optional and defaults toundefined.
An example
import {of, run} from "strict-stream";
import {tap} from "strict-stream/tap";
async function example() {
async function* generateIds() {
yield 1
yield 2
yield 3
}
const stream = of(generateIds())
.pipe(
tap((value) => console.log(value))
);
await run(stream)
// Output
// 1
// 2
// 3
}
await example();- It then creates a stream by calling of with
generateIdsas its argument. It then pipes this stream through a tap operation which logs each value emitted by the stream to the console. - Finally, it calls the run function to execute the stream. The run function returns a Promise that resolves when the stream has completed.
- In this case, it logs the numbers 1, 2, and 3 to the console.
sequence(length: number): StrictStream<number>
An example
import {of, run} from "strict-stream";
import {tap} from "strict-stream/tap";
import {sequence} from "strict-stream/sequence";
async function example() {
const sequenceStream = of(sequence(3))
.pipe(
tap((value) => console.log(value))
);
await run(sequenceStream)
// 0
// 1
// 2
}
await example();- The code is an example of how to use the
sequencefunction to generate a stream of numbers with agiven length - And then use the of and
pipefunctions totransformthestreamby appending atapfunction thatlogs each valuein thestreamto theconsole. - Specifically, the
sequencefunction generates astream of numbersfrom0 up to the given length. - The of function is then used to create a new stream from the output of the
sequencefunction, and thepipemethod is called to add thetapfunction as a transform to the stream. - Finally, the
runfunction is called to consume thestreamandlog each valueto the console.
When the example function is called
- It creates a new stream using
of(sequence(3)), which generates a stream of numbers from0 to 2. - The
pipemethod is then used to append atapfunction thatlogs each valuein thestreamto the console. - Finally, the
runfunction is called to consume the stream and log each value to the console. - The output is: 1, 2, 3
map<Input, Output>(mapper: (input: Input) => Promised<Output>): StrictStreamMapper<Input, Output>
The map function is a higher-order function that takes a function mapper as input and returns another function that applies the mapper function to every element in a stream.
The mapper function transforms each element of the stream and returns a new output element.
An example
import {of, run} from "strict-stream";
import {tap} from "strict-stream/tap";
import {sequence} from "strict-stream/sequence";
import {map} from "strict-stream/map";
async function example() {
const sequenceStream = of(sequence(3))
.pipe(
map((id) => id * 2)
)
.pipe(
tap((value) => console.log(value))
);
await run(sequenceStream)
// 0
// 2
// 4
}
await example();- In the example function, the
offunction is used to create a new stream from thesequence generatorfunction that generates a sequence of numbers from0 to 2. - This stream is then
pipedthrough themapfunction, which multiplies each number in the stream by 2. - The resulting stream is then piped through the
tapfunction, whichlogs each elementin the stream to the console. - Finally, the
runfunction is called to consume the stream and output its elements. - The output of the example function will log the numbers
0, 2, and 4to the console, which are the result of multiplying the original numbers generated by sequence by2.
filter<Input>(condition: (input: Input) => Promised<boolean | undefined | null>): StrictStreamMapper<Input, Input>
The filter function is a higher-order function that takes a condition function as its input and returns a function that can be used as a stream mapper.
The condition function is applied to each item in the stream, and only those items for which the condition function returns a truthy value are included in the output stream.
An example
import {of, run} from "strict-stream";
import {tap} from "strict-stream/tap";
import {sequence} from "strict-stream/sequence";
import {filter} from "strict-stream/filter";
async function example() {
const stream = of(sequence(3))
.pipe(
filter((id) => id > 0)
)
.pipe(
tap((value) => console.log(value))
);
await run(stream)
// 1
// 2
}
await example();- In the example function, the
offunction is used to create a stream from thesequencegenerator that yields three numbers (0, 1, and 2). - This stream is then piped to a
filtermapper that only allows numbersgreater than 0to pass through. - The resulting stream is then piped to a
tapmapper thatlogs each itemto the console. - Finally, the
runfunction is used to execute the stream and log the output to the console. - The output of this code will be the numbers
1 and 2, since those are the only numbers in the original sequence that meet the filter conditiongreater than 0.
reduce<Input, Accumulator>(reducer: (accumulator: Accumulator, input: Input) => Promised<Accumulator>, initial: Accumulator): StrictStreamMapper<Input, Accumulator>
The reduce function is a higher-order function that takes a reducer function and an initial value as input, and returns a new function that can be used to transform a stream of values.
The reducer function takes an accumulator value and an input value, and returns a new accumulator value.
The reduce function applies the reducer function to each value in the stream, accumulating the results into a final value that is emitted by the resulting stream.
An example
import {of, run} from "strict-stream";
import {sequence} from "strict-stream/sequence";
import {reduce} from "strict-stream/reduce";
async function example() {
const stream = of(sequence(5))
.pipe(
reduce(({counter}) => ({counter: counter + 1}), {counter: 0})
);
const result = await run(stream);
console.log(result)
// { counter: 5 }
}
await example();- In the example, the reduce function is used to count the number of values in a stream.
- The stream is created using the
sequencefunction, which generates a stream of numbers from0 to 4. - The
reducefunction takes an object with acounterproperty as theinitial value, and a reducer function that increments the counter property for each input value. - The
resulting streamemits asingle objectwith the final value of the counter property, which is5in this case. - The
runfunction is used to execute the stream and log the final result.
batch<Input>(size: number): StrictStreamMapper<Input, Input[]>
batchis a function that returns amapperfunction that takes aninput streamand emits an array of inputs that are processed inbatches of a given size.- And when the batch reaches the desired size it emits the batch downstream.
An example
import {of, run} from "strict-stream";
import {sequence} from "strict-stream/sequence";
import {batch} from "strict-stream/batch";
async function example() {
const stream = of(sequence(3))
.pipe(
batch(2)
)
.pipe(
tap((value) => console.log(value))
);
await run(stream)
// Output
// [ 0, 1 ]
// [ 2 ]
}
await example();- The example code creates a
sequencestream of 3 numbers - And pipes it through the
batchfunction with abatch size of 2. - The resulting stream emits two arrays,
- The first with the values
[0, 1]and the second with the value[2]. - The
tapfunction is used to log each emitted value to the console.
flat<Type>(): StrictStreamMapper<Type | StrictStreamLike<Type>, Type>
- The
flatfunction is astream transformerthat flattens the first level of stream or an array (Iterable). - If the input stream contains arrays or nested streams
- the
flatfunction will iterate over each element in the array or nested stream and emit it as a separate item in the output stream.
An example
import {run} from "strict-stream";
import {from} from "strict-stream/from";
import {flat} from "strict-stream/flat";
import {tap} from "strict-stream/tap";
async function example() {
const stream = from(
[
[1, 2],
[3, 4],
5
]
)
.pipe(
flat()
)
.pipe(
tap((value) => console.log(value))
);
await run(stream)
// 1
// 2
// 3
// 4
// 5
}
await example();- In the example code, the
fromfunction is used to create a stream from an array that contains nested arrays and a single value. - The
flatfunction is then used toflattenthe first level of stream so that each element in the nested arrays is emitted as a separate item in the output stream. - Finally, the
tapfunction is used to log each item. - When the
examplefunction is run, the output stream contains each element in the nested arrays and the single value, emitted as separate items in the stream.
flatMap<Input, Output>(mapper: (input: Input) => Promised<Output | StrictStreamLike<Output>>): StrictStreamMapper<Input, Output>
flatMapis a function thatmaps each elementof a stream to another stream andthen flattens the first level of resulting streamof streams into a single stream.- It takes a
mapperfunction thatmaps the input element. - The resulting
streamis thenflatmapped, meaning that it is flattened so that all elements are emitted in a single stream.
An example
import {run} from "strict-stream";
import {from} from "strict-stream/from";
import {flatMap} from "strict-stream/flatMap";
async function example() {
type User = {
id: number;
name: string;
orders: Order[];
};
type Order = {
id: number;
product: string;
price: number;
};
const users: User[] = [
{
id: 1,
name: "Alice",
orders: [
{id: 101, product: "Widget A", price: 10.0},
{id: 102, product: "Widget B", price: 20.0},
],
},
{
id: 2,
name: "Bob",
orders: [
{id: 201, product: "Widget C", price: 30.0},
{id: 202, product: "Widget D", price: 40.0},
{id: 203, product: "Widget E", price: 50.0},
],
},
];
async function fetchStreamOfUsers(): Promise<StrictStreamOf<User>> {
return from(users);
}
// StrictStreamOf<{userId: number, orderId: number}
const stream = (await fetchStreamOfUsers())
.pipe(
flatMap(async (user) => {
return from(user.orders)
.pipe(
map(
async (order) => {
return {
userId: user.id,
orderId: order.id,
price: order.price
}
})
)
})
)
.pipe(
tap((value) => console.log(value))
);
await run(stream)
// { userId: 1, orderId: 101, price: 10 }
// { userId: 1, orderId: 102, price: 20 }
// { userId: 2, orderId: 201, price: 30 }
// { userId: 2, orderId: 202, price: 40 }
// { userId: 2, orderId: 203, price: 50 }
}
await example();- In the provided example,
flatMapis used toflattenthe orders of theusers. - A stream of
usersis created using thefromfunction. - The
flatMapfunction is then called on this stream, mapping each user to a stream of orders using thefromfunction again. - The resulting
stream of ordersis thenmapped to an objectwith theuserId,orderId, andpriceusing themapfunction. - Finally, the resulting stream of objects is logged using the
tapfunction. - When the stream is run using the
runfunction, it logs each object in the stream, which contains theuserId,orderId, andpricefor each order.
pipe<In, Out>(mapper: StrictStreamMapper<In, Out>): StrictStreamPlumber<In, Out>
- The
pipefunction is used to createcomposable behaviorforStrictStreams. - It takes a
StrictStreamMapperas an input, which is a function that transforms aStrictStreamof one type to aStrictStreamof another type. pipethen returns aStrictStreamPlumber, which is a function that takes aStrictStreamof the original input type and returns aStrictStreamof the final output type.pipealso has apipemethod on the returned function, which allows for easycomposition of multipleStrictStreamMappers.
An example
import {run, pipe} from "strict-stream";
import {from} from "strict-stream/from";
import {map} from "strict-stream/map";
async function example() {
// composable behavior
const addFive = pipe(
map((input: number) => input + 4)
)
.pipe(
map(async (input) => input + 1)
)
// High order function to manage / compose part of the pipe
function multiple(x: number) {
return pipe(
map(async (value: number) => value * x)
);
}
const stream = from([1, 2, 3])
.pipe(
addFive
)
.pipe(multiple(2))
.pipe(tap((value) => console.log(value)))
await run(stream)
// 12
// 14
// 16
}
await example();- In the
examplefunction, we create two separateStrictStreamMappers usingpipe. - We then use the
multiplefunction to create anotherStrictStreamMapperthatmultiplies the input value by agiven number. - We then
composethese three mappers usingpipeand use the resultingStrictStreamPlumberto create astream of numbers. - Finally, we
runthestreamand log each value as it is processed. - The output will be
12, 14, 16.
An example of flatMap implementation
There is a composition of map and flat functions.
export function flatMap<Input, Output>(mapper: (input: Input) => Promised<Output | StrictStreamLike<Output>>): StrictStreamMapper<Input, Output> {
return pipe(
map(mapper)
).pipe(
flat()
);
}- The
flatMapfunction is implemented using thepipefunction, whichcomposesa set ofStrictStreamMapperfunctions into a singleStrictStreamMapper. - In the implementation of
flatMap, themapfunction is first applied to themapperargument - Resulting in a new
StrictStreamMapperthat transforms the input values using themapperfunction. - This transformation may result in an output value or a
StrictStreamLikeobject that contains a set of output values. - The resulting
StrictStreamMapperis then piped into theflatfunction, which flattens anyStrictStreamLikeobjects into a stream of individual output values.
scaleSync<Input, Output>(size: number, mapper: (input: Input) => Promised<Output>): StrictStreamMapper<Input, Output>
- Basically the
mapfunction with desiredconcurrencyto process records. That keeps the ordering of output stream unchanged. - The
scaleSyncfunction takes two arguments, the first one is anumberwhich represents the concurrency, and the second one is a mapper function that maps theinputto theoutput.
An example
import {run, of} from "strict-stream";
import {scaleSync} from "strict-stream/scaleSync";
async function fetchUserById(id: number) {
// some logic to fetch the use
return {
id,
userName: `User ${id}`
};
}
async function getUserIds() {
return sequence(3);
}
async function example() {
const usersStream = of(await getUserIds())
.pipe(
// run's the async queries concurrently, keeps the ordering of output stream unchanged
scaleSync(5, async (id) => fetchUserById(id))
)
.pipe(
tap((value) => console.log(value))
);
await run(usersStream)
// { id: 0, userName: 'User 0' }
// { id: 1, userName: 'User 1' }
// { id: 2, userName: 'User 2' }
}
await example();- In the example, the
scaleSyncfunction is used tofetch userdetails for a given set of user ids. - The
fetchUserByIdfunction fetches the user detailsasynchronouslyfor a givenuser id, and thegetUserIdsfunctiongenerates a streamof user ids. - The
usersStreamis created with concurrency of 5, and executing thefetchUserByIdfunction for each id. - The resulting
user detailsare logged to the console using thetapfunction.
concatenate<T>(...streams: StrictStream<any>[]): StrictStream<T>
concatenateis a function that concatenates multiple streams into a single stream- ensuring that the records are read sequentially one by one, and maintains the ordering of the output stream unchanged.
- The implementation of the function is done using rest parameters to allow for an
arbitrary number of streams to be concatenated
An example
import {run, of} from "strict-stream";
import {concatenate} from "strict-stream/concatenate";
import {from} from "strict-stream/from";
import {tap} from "strict-stream/tap";
async function* generateIds() {
yield 10
yield 20
yield 30
}
async function example() {
const streamLike1: Iterable<number> = [1, 2, 3];
const streamLike2: AsyncIterable<number> = generateIds(); // is equivalent
const stream = from(
concatenate(
from(streamLike1),
from(streamLike2),
)
).pipe(
tap((value) => console.log(value))
);
await run(stream)
// 1
// 2
// 3
// 10
// 20
// 30
}
await example();- In the provided example, two stream-likes, one iterable and one async iterable, are concatenated using
concatenate. - The resulting stream is then converted into a strict stream using the
fromfunction - And a
tapoperation is performed on it to log each record. - Finally, the stream is run using the
runfunction, which is a utility function to consume and execute the stream. - The output shows that the resulting stream contains all the records from both input streams in the correct order.
interval(ms: number, startImmediate = false): IInterval
intervalis a function that creates astreamthat emits a sequence of integers at regular intervals.- It takes two parameters: the
durationof the interval inmilliseconds, and aboolean flagindicating whether the stream should start emittingimmediatelyor after one interval has elapsed. - The function returns a
StrictStreamobject with an additional methodstopthat can be used to stop the interval stream.
An example
import {run, of} from "strict-stream";
import {tap} from "strict-stream/tap";
import {map} from "strict-stream/map";
import {interval} from "strict-stream/interval";
async function example() {
// every 300ms
const source = interval(300);
let counter = 0;
const stream = of(source)
.pipe(
map(() => {
counter++
if (counter > 3) {
// stops the interval stream
source.stop()
}
return counter;
})
)
.pipe(
tap((value) => console.log(value))
)
await run(stream)
// 1
// 2
// 3
// 4
}
await example();- This example creates an
interval streamthat emits every300ms - And uses the
mapoperator to increment a counter and stop the stream after4 emissions. - The
tapoperator is used to log the emitted values to the console.
Node.JS integration
nodeReadable<Output>(readable: Readable): StrictStreamOf<Output>
Turns readable to StrictStreamOf
import {nodeReadable} from "strict-stream/nodeReadable";
import {Readable} from "stream";
const readable = Readable.from('Hello Stream');
const stream = nodeReadable<string>(readable)
.pipe(map((chunk) => `${chunk} + OK`))nodeWritable<Type>(writable: Writable, encoding: BufferEncoding = 'utf-8'): StrictStreamMapper<Type, Type>
Integrates writable stream
import {from} from "strict-stream/from";
import {nodeWritable} from "strict-stream/nodeWritable";
import {Readable, Writable} from "stream";
const written: { chunk: any }[] = []
const myWritable = new Writable({
write(chunk, encoding: BufferEncoding, callback) {
written.push({chunk})
callback()
},
});
const buffer = Buffer.from([100, 101, 102]);
const stream = from([buffer])
.pipe(nodeWritable(myWritable));nodeTransform<Input, Output>(transform: Transform, options: ReadableOptions = {}): StrictStreamMapper<Input, Output>
Integrates transform stream
import {from} from "strict-stream/from";
import {nodeTransform} from "strict-stream/nodeTransform";
import {Readable, Transform} from "stream";
const myTransform = new Transform({
transform(chunk: any, encoding, callback) {
callback(null, `${chunk} + OK`)
},
});
const stream = from(Readable.from('Hello'))
.pipe(nodeTransform(myTransform));Beta API
Beta Transformations
scale<Input, Output>(max: number, mapper: (input: Input) => Promised<Output>): StrictStreamMapper<Input, Output>
Maps the stream with max concurrently. Does not guarantee the ordering of stream items for sure. See scaleSync for the ordered stream.
const out = of(sequence(4))
.pipe(
scale(10, async (value) => {
return value
})
);batchTimed<Input>(size: number, maxTimeout: Milliseconds): StrictStreamMapper<Input, Input[]>
Emit batches by size or maxTimeout; Useful in the infinity streams to handle batches.
// batch by timeout
const stream = of(sequence(5))
.pipe(tap(() => {
return delay(100)
}))
.pipe(batchTimed(2, 10));// batch by timeout
const stream = of(sequence(5))
.pipe(tap(() => {
return delay(100)
}))
.pipe(batchTimed(2, 10));// batch by size
const stream = of(sequence(5))
.pipe(tap(() => {
return delay(10)
}))
.pipe(batchTimed(2, 500));buffer<Input>(size: number): StrictStreamMapper<Input, Input>
Simply adds a bit of buffer to have more room for reader / upstream.
const out = of(sequence(4))
.pipe(
buffer(3)
);Beta Source Operations
merge<Type>(...streams: StrictStream<any>[]): StrictStream<Type>
Merge streams concurrently. Does not guarantee the ordering. See concatenate for ordered streams.
const usersV1Stream = from([{type: 'userV1', name: 'User Name'}])
.pipe(tap(() => delay(100)));
const usersV2Stream = from([{type: 'userV2', firstName: 'User', lastName: 'Name'}]);
const usersStream = merge(usersV1Stream, usersV2Stream);objectReader<T extends object | object[]>(read: () => Promised<T | null | undefined | boolean | number>): StrictStream<T>
Simplifies reading source of objects;
const array = [{id: 1}, {id: 2}]
const stream = objectReader(() => array.shift());Utilities
toArray<T>(input: StrictStream<T>): Promise<T[]>
Not recommended for production usage. Could lead to RAM consumption.
const stream = from([1, 2, 3]);
const outputs = await toArray(stream);
expect(outputs).toEqual([1, 2, 3])License
strict-stream is licensed under the MIT License.
