
stream-operators

v1.0.3 · Downloads: 11

Readme

stream-operators

A library for creating Readable, Writable and Transform streams, combining two major tools of the async world, streams and promises, with a declarative programming approach.

Node.js native streams are a very powerful tool for building high-performance applications. This library is designed to harness that power for reading, transforming and writing streams in object mode. The API is declarative and inspired by RxJS. Another design decision is combining streams with promises for better async handling: most helpers can return a promise instead of a plain value.

Installation

$ npm install stream-operators
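
To get a feel for the declarative, promise-friendly style described above, here is a minimal sketch chaining a few of the helpers documented below; the async lookupUser function is hypothetical and only for illustration.

const { from, map, filter, write } = require("stream-operators");

// hypothetical async lookup, used only for illustration
const lookupUser = (id) => Promise.resolve({ id, active: id % 2 === 1 });

from([1, 2, 3, 4])
    .pipe(map(lookupUser))               // map may return a promise; the resolved value is pushed
    .pipe(filter((user) => user.active)) // filter may resolve to a boolean as well
    .pipe(write((user) => console.log(user.id)))

// console: 1 3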

Read stream creators

counter

counter: (limit: number = Infinity) => Readable

Creates a read stream that counts from zero up to limit; the default limit is Infinity.

sample
const { counter } = require("stream-operators");

counter(5)
    .pipe(process.stdout)

// console: 0 1 2 3 4

from

from: (list: any[]) => Readable

Creates a read stream from a list and pushes its items in sequence.

sample

const { from } = require("stream-operators");

from([1, 2, 3, 4])
    .pipe(process.stdout)

// console: 1 2 3 4 

read

read: (readFn: (times: number, size: number) => any) => Readable

Creates a read stream from a read function. Each time the consumer sends a read signal, the read function is called and its return value is pushed to the stream. The return value can also be a promise, in which case the resolved value is pushed.

sample
read((page) => fetchProducts(page).then(list => list.map(p => p.name).join("\n")))
    .pipe(process.stdout)

// console: apple\n orange\n banana

Write stream creators

write

write: (writeFn: (chunk: any) => any | Promise<any>) => Writable

Creates a write stream from a write function. Each time a chunk arrives, the function is called, and when it finishes (or, if it returns a promise, when that promise resolves) a read signal is sent back up to the source stream.

sample
counter(5)
    .pipe(write(console.log))

// console: 0 1 2 3 4

readFileLineStream
    .pipe(write((line) => writeToDb(mapToProduct(line))))    
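
Because the read signal is only sent after the write function's promise resolves (as described above), a slow writer naturally throttles the source. A minimal sketch, with the 100 ms delay standing in for real persistence work:

const { counter, write } = require("stream-operators");

counter(5)
    .pipe(write((n) => new Promise((resolve) => setTimeout(() => {
        console.log("saved", n); // the next chunk is only requested after this resolves
        resolve();
    }, 100))))

// console (roughly one line every 100 ms): saved 0 ... saved 4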

fork

fork: (...branches: ((stream: Stream) => void | Writable)[]) => Writable

Forks N read streams from a main read stream. If a branch callback returns a write stream, the fork's finish event fires after all branches' finish events.

sample
req
   .pipe(fork(
       (stream) => stream.pipe(res), // echo request to response
       (stream) => stream.pipe(write(console.log)), // log request to console
   ))
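
Per the note above, when each branch callback returns its write stream, fork's finish event should fire only after every branch has finished. A hedged sketch, where saveToFile and saveToBackup are hypothetical Writable streams:

const forked = req
    .pipe(fork(
        (stream) => stream.pipe(saveToFile),   // hypothetical Writable
        (stream) => stream.pipe(saveToBackup), // hypothetical Writable
    ));

forked.on("finish", () => {
    // reached only after both branches have finished
    res.end("stored");
});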

Transform stream creators

map

map: (map: (item: any) => any | Promise<any>) => Transform

Creates a transform stream from a map function. For each chunk, the result of the map function (or the resolved value, if it returns a promise) is pushed to the stream.

sample
const logger = write(console.log);

counter(5)
    .pipe(map((item) => item * 2))
    .pipe(logger)

// console: 0 2 4 6 8

readProductCsvFile
    .pipe(map(mapToJson))
    .pipe(startWith("["))
    .pipe(endWith("]"))
    .pipe(res)

// response [ {id:1, name:"apple"}, {id:2, name:"orange"}, ... ]
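
Since map accepts a promise-returning function, async work slots directly into the chain. A small sketch with a hypothetical fetchPrice helper:

const { from, map, write } = require("stream-operators");

// hypothetical async price lookup
const fetchPrice = (sku) => Promise.resolve({ sku, price: 9.99 });

from(["a-1", "b-2"])
    .pipe(map(fetchPrice))                             // resolved objects are pushed downstream
    .pipe(map(({ sku, price }) => `${sku}: ${price}`))
    .pipe(write(console.log))

// console: a-1: 9.99  b-2: 9.99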

filter

filter: (filter: (item: any) => boolean | Promise<boolean>) => Transform

Creates a transform stream from a filter function. For each chunk, if the filter result is truthy (or resolves to a truthy value, in the case of a promise), the chunk is pushed to the stream.

sample
const logger = write(console.log);
const isEven = (item) => item % 2 === 0;

counter(5)
    .pipe(filter(isEven))
    .pipe(logger)

// console: 0 2 4
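
The filter predicate can also return a promise, so an async check works the same way. A hedged sketch with a hypothetical existsInDb lookup:

const { from, filter, write } = require("stream-operators");

// hypothetical async existence check
const existsInDb = (id) => Promise.resolve(id !== 2);

from([1, 2, 3])
    .pipe(filter(existsInDb)) // chunks whose result resolves truthy are kept
    .pipe(write(console.log))

// console: 1 3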

delay

delay: (time: number) => Transform

Creates a transform stream that delays each chunk of the input stream. The time unit is milliseconds (ms).

sample
const logger = write(console.log);
const isEven = (item) => item % 2 === 0;

counter(5)
    .pipe(delay(1000))
    .pipe(filter(isEven))
    .pipe(logger)

// console: 0 2 4

buffer

buffer: (bufferSize: number = Infinity) => Transform

Creates a transform stream that buffers N chunks and pushes the buffer to the output stream. The default bufferSize is Infinity, so by default the output stream pushes a single chunk once the input stream ends.

sample
const logger = write(console.log);

counter(5)
    .pipe(buffer(2))
    .pipe(logger)

// console: [0,1] [2,3] [4]

// writing JSON stream data to the DB: every 1000 items are inserted with one bulk insert query for better performance
readEnormousJson
    .pipe(map(mapToStandardProduct))
    .pipe(buffer(1000))
    .pipe(writeProductListToDB)

startWith

startWith: (data: any) => Transform

Creates a transform stream that pushes the given value as the first chunk.

sample
const logger = write(console.log);

counter(5)
    .pipe(startWith("start"))
    .pipe(logger)

// console: start 0 1 2 3 4

// read the product list from the DB, convert it to CSV (the first CSV line is "id,name,price") and send the stream as the response
readProductFromDb
    .pipe(map(productToCSVLine))
    .pipe(startWith("id,name,price\n"))
    .pipe(res)

endWith

endWith: (data: any) => Transform

Creates a transform stream that pushes the given value as the last chunk.

sample
const logger = write(console.log);

counter(5)
    .pipe(startWith("start"))
    .pipe(endWith("end"))
    .pipe(logger)

// console: start 0 1 2 3 4 end

readProductCsvFile
    .pipe(map(mapToJson))
    .pipe(startWith("["))
    .pipe(endWith("]"))
    .pipe(res)

// response [ {id:1, name:"apple"}, {id:2, name:"orange"}, ... ]

extract

extract: () => Transform

Creates a transform stream that pushes each item of a chunk (if the chunk is an array or has a forEach function) to the output stream.

sample

const logger = write(console.log);

from([[1,2,3],[4,5,6]])
    .pipe(extract())
    .pipe(logger)

// console: 1 2 3 4 5 6

// read 1000 records per DB query and push each array to the stream; extract() unpacks the list so the rest of the chain works with single items instead of arrays
read((page) => readProductFromDB({ limit: 1000, page }))
    .pipe(extract())
    .pipe(toJsonStream)
    .pipe(res)

reduce

reduce: (reducer: (acc: any, item: any) => any, reset?: any | ((times: number) => any), bufferSize: number = Infinity) => Transform

Creates a transform stream that reduces N chunks into one chunk. N (bufferSize) defaults to Infinity, so by default all chunks are reduced into one. The initial accumulator is the reset value, or the output of reset if it is a function; the reset function receives the chunk index (times) and returns the reset value.

sample
const logger = write(console.log);

counter(5)
    .pipe(reduce((acc, item) => acc + item, 0))
    .pipe(logger)

// console: 10

counter(5)
    .pipe(reduce((acc, item) => acc + item, 0, 3))
    .pipe(logger)

// console: 3 7

// implementing the buffer transform stream with the reduce helper
const buffer = (size = Infinity) => reduce((list, chunk) => [...list, chunk], [], size)
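
If the reset value is given as a function, the description above says it receives the chunk index (times) and returns the initial accumulator for that window; a hedged sketch of using that to label each reduced window:

const { counter, reduce, write } = require("stream-operators");

counter(6)
    .pipe(reduce(
        (acc, n) => `${acc} ${n}`,
        (times) => `window starting at ${times}:`, // assumption: times is the index described above
        3,
    ))
    .pipe(write(console.log))

// console (assuming the reset semantics above): two labelled windows, "0 1 2" and "3 4 5"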

inspect

inspect: () => Transform

Creates a transform stream that logs each chunk to the console without changing the stream. It is useful for development and debugging.

sample

counter(5)
   .pipe(inspect())
   .pipe(write(() => void 0)) 

// console: 0\n 1\n 2\n 3\n 4\n