npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

hole

v0.9.1

Published

Async friendly, stream-based task consuming utility in Node.js

Downloads

260

Readme

Hole Build Status

Async friendly, stream-based task consuming utility for Node.js.

Concept

After years, it has been more important to write less-state code for me and naturally it has become more data-driven/functional style. But that and async programming are not always a good match. On one hand when you process too many async tasks at once, await Promise.all(promises100000) for example, you'd end up with error messages such as process out of memory, socket hang up, ETIMEDOUT or ECONNRESET. On the other hand, it's not efficient at all when you process whole data one by one in sequence. Reactive Extensions might be a solution though, I didn't want to tune timer functions for the problem; all I want is just to set limit number of parallel execution and finish a task at the best speed.

Then Node Stream object mode with beautiful backpressuring mechanism appears. Object mode lets you flow JavaScript object in a stream with highWaterMark option, which decides limit of number of buffering objects. By the native backpressure implementation, a busy writable stream halts data flowing and takes time before requesting another to upstream. And thanks to parallel-stream, a transform branches out to consume buffer parallely as it keeps order of data.

Node Hole offers a fun, easy and efficient way of parallel data consuming by wrapping solid Node Stream implementation with async/promise friendly API.

Usage

To install hole in your project, run:

$ npm install -S hole

... then utilize it like below. Suppose there is a blog post API where GET /posts/:id to get post detail JSON and GET /posts?page=1 for list of post summaries. Let's say we want to store post details of the latest 3 pages into local DB.

await fromArray([1, 2, 3])                      // Start with pages array
    .pipe(page => getPosts(page))               // (parallel async) Get list of posts for a page
    .split()                                    // Split the array into pieces
    .pipe(post => getPostDetail(post.id), 3)    // (parallel async) Get a detail of post, with maximum parallel request of 3
    .pipe(detail => saveToLocalDB(detail))      // (parallel async) Save the detail to local DB
    .catch(err => console.log(err));            // On any error occurs in the middle, stop the whole process and catch the error in here
console.log('All done :)');

API

All methods are typed by TypeScript.

hole(object)

Typed as hole<T>(object: T): Hole<T>.

A function to start stream with a single JavaScript object of any kind.

Example:

import hole from 'hole';
//...
await hole(998)
    .pipe(n => n + 1)
    .pipe(n => n + 1)
    .pipe(console.log)  // 1000

fromArray(array)

Typed as fromArray<T>(Array<T>): Hole<T>.

A function to start stream with fixed multiple objects with an array.

Example:

import {fromArray} from 'hole';
...
await fromArray([1, 2, 3, 4, 5])
    .pipe(n => n * 10)
    .pipe(console.log); // 10
                        // 20
                        // 30
                        // 40
                        // 50

fromStream(readable)

A function to start stream with an native Node readable stream.

Example:

import {fromStream} from 'hole';
import fs from 'fs';
import csv2 from 'csv2';
...
  const nameColumnIndex = 3;
  await fromStream(fs.createReadableStream('./data.csv'))
      .pipe(csv2())
      .pipe(record => record[nameColumnIndex])
      .pipe(console.log); // James
                          // John
                          // Robert
                          // Michael
                          // ...

.pipe(processor, opts)

Typed as pipe<U>(T => U, PipeOption?): Hole<U>.

When the processor is a synchronous function, it simply gets a value from the previous and passes processed value to the next.

If it returns null or undefined, that means it filters out the data that is not used any more.

Example:

await fromArray([1, 2, 3, 4, 5])
  .pipe(n => {
    if (n > 2) return n;
  })
  .pipe(console.log); // 3
                      // 4
                      // 5

Note that a processor function will be called with a transform's this context: you also can use .push(data) as usual in transform() function.

Example:

await hole(5)
    .pipe(function decrementAndPush(n) {
      if (n <= 0) return;
      this.push(n);
      decrementAndPush.call(this, n - 1);
    })
    .pipe(console.log); // 5
                        // 4
                        // 3
                        // 2
                        // 1

When the processor returns a promise, it's similar but resolved value will be passed to the next.

pipe<U>(T => Promise<U>, PipeOption): Hole<U>

Example:

await fromArray([1, 2, 3, 4])
  .pipe(async page => {
    const posts = await getPosts(page);
    return posts.filter(post => post.author !== 'anonimous');
  }, 2) // Limit maxParallel to 2
  .split()
  .pipe(post => console.log(post.title))  // Lorem ipsum ...
                                          // Ut enim ad...
                                          // Duis aute irure...
                                          // Excepteur sint...

.pipe(transformer, opts)

Typed as pipe<U>(Writable, PipeOption): Hole<U>.

Also .pipe() accepts Node native Transformer where you can utilize csv2 etc.

Example

.split()

Typed as split(): Hole<$ElementType<T, number>>.

Previous value should be array. It splits the array so the next process can handle each piece of it.

Example

.concat(size)

Typed as concat(number): Hole<Array<T>>.

It concatenates sequential data to be specified size of array. This is useful when you post array data at once in the way that Elasticsearch Bulk API does.

Example:

await fromArray([1, 2, 3, 4, 5])
    .concat(2)
    .pipe(console.log); // [ 1, 2 ]
                        // [ 3, 4 ]
                        // [ 5 ]

.collect()

collect(): Promise<Array<T>>

It collects all data that the previous process returns and gives you an array. Note that, when number of data is enormous, the collected array may oppresse room of your memory.

Example:

const results = await fromArray([1, 2, 3, 4, 5])
    .pipe(n => n * 10)
    .collect();
console.log(results); // [10, 20, 30, 40, 50]

Hole

The core class that this package exports.

Hole extends LazyPromise that starts streaming when .then() or .catch() is called. Other extended functions are listed below.

PipeOption

type PipeOption =
  | {
      maxParallel: number,
      highWaterMark: number,
    }
  | number;

If it's a number, it is treated as a maxParallel value.

| Option property | Default value | Valid when first argument of .pipe() is | Meaning | | --------------------- | ------------- | --------- | ----------- | | maxParallel | 5 | Async function | Maximum number of parallel execution of process | | highWaterMark | 16 | Transformer, Sync and async function | Maximum number of buffer that will be consumed by processor. Read more |

License

MIT