
utilitystreams v1.0.0 · Downloads: 391

utilitystreams

Convenient Streams, simple and easy to use.

Usage

$ npm install utilitystreams

import { pipeline } from "node:stream/promises";
import { createWriteStream } from "node:fs";
import { DebounceStream } from "utilitystreams";

// ...

app.use("/stream-req", async (req, res) => {
  await pipeline(
    req,
    new DebounceStream({ waitMs: 300 }),
    createWriteStream("/file/path/to/save"),
  );
});

Documentation

All streams have test files; detailed usage examples can be found in them.

BufferStream

Collects the input data into an array.

Outputs the array of collected data when

  • the array length reaches the configured size, or
  • the configured waitMs has elapsed (only if the waitMs option is set).

import { pipeline } from "node:stream/promises";
import { BufferStream } from "utilitystreams";

await pipeline(
  process.stdin,
  new BufferStream({ size: 100, waitMs: 1000 }, { objectMode: true }),
  saveToDbStream,
);
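The size-triggered part of this behavior can be sketched with Node's built-in Transform alone. This is an illustrative stand-in, not the library's actual implementation, and it omits the waitMs timer:

```typescript
import { Readable, Transform } from "node:stream";

// Collect chunks and emit them as arrays of `size`; any remainder is
// flushed when the input ends.
function makeBufferStream(size: number): Transform {
  let buf: unknown[] = [];
  return new Transform({
    objectMode: true,
    transform(chunk, _enc, cb) {
      buf.push(chunk);
      if (buf.length >= size) {
        this.push(buf); // emit a full batch
        buf = [];
      }
      cb();
    },
    flush(cb) {
      if (buf.length > 0) this.push(buf); // emit the remainder on end
      cb();
    },
  });
}

const batches: unknown[][] = [];
const stream = Readable.from([1, 2, 3, 4, 5]).pipe(makeBufferStream(2));
for await (const batch of stream) {
  batches.push(batch as unknown[]);
}
// batches: [[1, 2], [3, 4], [5]]
```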

ReduceStream

Accumulates the input data into the acc value.

import { pipeline } from "node:stream/promises";
import { ReduceStream } from "utilitystreams";

await pipeline(
  ["a", "b", "c"], // any readable or iterable source
  new ReduceStream(
    { acc: "", f: (acc, cur) => `${acc} ${cur}` },
    { objectMode: true },
  ),
  process.stdout,
);

ToArrayStream

Collects the input data into an array.

import { createReadStream } from "node:fs";
import { pipeline } from "node:stream/promises";
import { ToArrayStream } from "utilitystreams";

const csvLines = [];
await pipeline(
  createReadStream("data.csv"),
  csvParser,
  new ToArrayStream({ target: csvLines }),
);
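The collect-into-a-target idea can be sketched with a plain Writable; this is illustrative only, not how the library implements ToArrayStream:

```typescript
import { Readable, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// A writable sink that appends every chunk to a caller-supplied array.
function collectInto(target: unknown[]): Writable {
  return new Writable({
    objectMode: true,
    write(chunk, _enc, cb) {
      target.push(chunk);
      cb();
    },
  });
}

const lines: string[] = [];
await pipeline(Readable.from(["a", "b", "c"]), collectInto(lines));
// lines: ["a", "b", "c"]
```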

DelayStream

Delays each chunk of input data by the set time.

  • The output data is in the same order as the input data.
  • A delayed chunk does not hold up the chunks that follow it.

import { pipeline } from "node:stream/promises";
import { DelayStream } from "utilitystreams";

await pipeline(
  process.stdin,
  new DelayStream({ waitMs: 3000 }),
  process.stdout,
);

DebounceStream

Outputs only the last chunk of input received within the set time period.

import { pipeline } from "node:stream/promises";
import { DebounceStream } from "utilitystreams";

await pipeline(
  process.stdin,
  new DebounceStream({ waitMs: 100 }),
  process.stdout,
);

ThrottleStream

After emitting a chunk, ignores further input data for the set time.

import { pipeline } from "node:stream/promises";
import { ThrottleStream } from "utilitystreams";

await pipeline(
  process.stdin,
  new ThrottleStream({ waitMs: 100 }),
  process.stdout,
);
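The contrast with DebounceStream is worth spelling out: a throttle emits the first chunk and then drops input for the window, while a debounce waits for quiet and emits the last. A minimal throttle sketch with Node built-ins (illustrative, not the library's implementation):

```typescript
import { Readable, Transform } from "node:stream";

// Emit a chunk only if at least waitMs has passed since the last emit;
// chunks arriving inside the quiet window are dropped.
function makeThrottleStream(waitMs: number): Transform {
  let lastEmit = -Infinity;
  return new Transform({
    objectMode: true,
    transform(chunk, _enc, cb) {
      const now = Date.now();
      if (now - lastEmit >= waitMs) {
        lastEmit = now;
        this.push(chunk);
      }
      cb();
    },
  });
}

const throttled: number[] = [];
const src = Readable.from([1, 2, 3]); // written back-to-back, within one window
for await (const chunk of src.pipe(makeThrottleStream(60_000))) {
  throttled.push(chunk as number);
}
// throttled: [1] — 2 and 3 arrived inside the quiet window
```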

MapStream

Makes output data from input data using the mapper function.

  • If the input data is a promise, it is resolved before being passed to the mapper function.
  • If the mapper's output is a promise, it is resolved before being pushed to the next stream.
  • No concurrency. If you need concurrent processing, batch the input data into collections manually.

import { createWriteStream } from "node:fs";
import { pipeline } from "node:stream/promises";
import { MapStream } from "utilitystreams";

await pipeline(
  process.stdin,
  new MapStream(
    {
      f: (message: string) => {
        const logObj = {
          timestamp: new Date().toISOString(),
          message: message,
        };
        return JSON.stringify(logObj);
      },
    },
    { objectMode: true },
  ),
  createWriteStream("/path/to/file.log"),
);

FilterStream

Passes through only the input data for which the predicate function returns true.

  • If the input data is a promise, it is resolved before being passed to the predicate function.
  • If the predicate's result is a promise, it is resolved before the data is pushed to the next stream.
  • No concurrency. If you need concurrent processing, batch the input data into collections manually.

import { createWriteStream } from "node:fs";
import { pipeline } from "node:stream/promises";
import { FilterStream } from "utilitystreams";

await pipeline(
  naturalNumbers,
  new FilterStream(
    {
      f: (num: number): boolean => {
        return num % 2 === 0;
      },
    },
    { objectMode: true },
  ),
  createWriteStream("/even-nums"),
);

takeStreamFactory

Creates a wrapped stream that yields at most n chunks from the source stream.

  • Supports a curried style:
    • takeStreamFactory({ n: 10 }, sourceStream) is equivalent to takeStreamFactory({ n: 10 })(sourceStream).
  • The source stream is closed automatically when the wrapped stream closes.
  • It returns an async generator, which is compatible with readable streams; if you need an actual stream, wrap it with Readable.from.

import { pipeline } from "node:stream/promises";
import { takeStreamFactory } from "utilitystreams";

await pipeline(
  takeStreamFactory({ n: 10 }, readableStream),
  // ... other streams
  process.stdout,
);
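The wrapper idea can be sketched as a plain async generator: it yields at most n items and then returns, and exiting the for-await loop closes the source's iterator (illustrative, not the library's code):

```typescript
import { Readable } from "node:stream";

// Yield at most n items, then stop; returning from the generator lets
// the for-await machinery close the underlying source iterator.
async function* take<T>(n: number, source: AsyncIterable<T>): AsyncGenerator<T> {
  let count = 0;
  for await (const item of source) {
    if (count >= n) return;
    count += 1;
    yield item;
  }
}

const taken: number[] = [];
for await (const item of take(3, Readable.from([1, 2, 3, 4, 5]))) {
  taken.push(item as number);
}
// taken: [1, 2, 3]
```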

TakeStream

Yields at most n chunks from the input data.

  • If the source readable stream is large or infinite, prepare some end logic or use takeStreamFactory instead.
    • It is very hard to end a stream pipeline in the middle.
    • For that reason, TakeStream provides a callback option for ending the source readable stream.
    • You must handle the error raised by the destroy call, or supply your own end logic.

import { pipeline } from "node:stream/promises";
import { TakeStream } from "utilitystreams";

await pipeline(
  readableStream,
  // ... other streams
  new TakeStream({ n: 10 }),
  process.stdout,
);

takeWhileStreamFactory

Creates a wrapped stream that yields data from the source stream while the predicate function returns true.

  • Supports a curried style:
    • takeWhileStreamFactory({ f: predicate }, sourceStream) is equivalent to takeWhileStreamFactory({ f: predicate })(sourceStream).
  • The source stream is closed automatically when the wrapped stream closes.
  • It returns an async generator, which is compatible with readable streams; if you need an actual stream, wrap it with Readable.from.

import { pipeline } from "node:stream/promises";
import { takeWhileStreamFactory } from "utilitystreams";

await pipeline(
  takeWhileStreamFactory({ f: predicate }, readableStream),
  // ... other streams
  process.stdout,
);

TakeWhileStream

Yields data while the predicate function returns true.

  • If the source readable stream is large or infinite, prepare some end logic or use takeWhileStreamFactory instead.
    • It is very hard to end a stream pipeline in the middle.
    • For that reason, TakeWhileStream provides a callback option for ending the source readable stream.
    • You must handle the error raised by the destroy call, or supply your own end logic.

import { pipeline } from "node:stream/promises";
import { TakeWhileStream } from "utilitystreams";

await pipeline(
  readableStream,
  // ... other streams
  new TakeWhileStream({ f: predicate }),
  process.stdout,
);

takeUntilStreamFactory

Creates a wrapped stream that yields data from the source stream until the predicate function returns true.

  • Supports a curried style:
    • takeUntilStreamFactory({ f: predicate }, sourceStream) is equivalent to takeUntilStreamFactory({ f: predicate })(sourceStream).
  • The source stream is closed automatically when the wrapped stream closes.
  • It returns an async generator, which is compatible with readable streams; if you need an actual stream, wrap it with Readable.from.

import { pipeline } from "node:stream/promises";
import { takeUntilStreamFactory } from "utilitystreams";

await pipeline(
  takeUntilStreamFactory({ f: predicate }, readableStream),
  // ... other streams
  process.stdout,
);

TakeUntilStream

Yields data until the predicate function returns true.

  • If the source readable stream is large or infinite, prepare some end logic or use takeUntilStreamFactory instead.
    • It is very hard to end a stream pipeline in the middle.
    • For that reason, TakeUntilStream provides a callback option for ending the source readable stream.
    • You must handle the error raised by the destroy call, or supply your own end logic.

import { pipeline } from "node:stream/promises";
import { TakeUntilStream } from "utilitystreams";

await pipeline(
  readableStream,
  // ... other streams
  new TakeUntilStream({ f: predicate }),
  process.stdout,
);

TapStream

Executes the consumer function with each chunk of input data, then passes the original chunk along unchanged.

  • If the input data is a promise, it is resolved before being passed to the consumer function.
  • If the consumer's result is a promise, it is resolved before the original data is pushed to the next stream.
  • No concurrency. If you need concurrent processing, batch the input data into collections manually.

import { pipeline } from "node:stream/promises";
import { TapStream } from "utilitystreams";

await pipeline(
  messages,
  new TapStream(
    {
      f: (message: string) => {
        log.info(message);
      },
    },
    { objectMode: true },
  ),
);