npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@understand-ai/async-transformer

v1.1.0

Published

concurrent processing of iterators

Downloads

333

Readme

Async Transformers

lint/build/test

We find node.js streams are hard to use and implementing them correctly yourself adds a lot of boilerplate.

Once you have implemented a fully-compliant stream interface you will also find that actually executing parts of the streamed-processing-chain in parallel, e.g. io-bound tasks like network requests, is not supported out-of-the-box by node.js

Instead, async generators are a lot easier to reason about and write even when consuming native streams.

Enter async-transformers

Async-Transformers is a tiny no-frills no-dependencies ts-first implementation of a buffered asynchronous generator.

  • It will queue up numberOfParallelExecutions promises from a user-provided stream/generator up to the specified maximum number.

Overview of async transformer functionality

The method asyncBufferedTransformer() was inspired by rust futures buffered().

Usage

npm add @understand-ai/async-transformer

Here's an example that downloads all status code images from http.cat, but only 7 at a time to be a good internet citizen.

You can run this using npx ts-node examples/fetch-http-cats.ts

(Note: this library does not depend on nodejs (and has zero dependencies), just this example)

import { PromiseWrapper, asyncBufferedTransformer } from "../dist";
import fetch from "node-fetch";

async function* streamAllHttpCats(): AsyncIterable<
  PromiseWrapper<{
    status: number;
    responseStatus: number;
    body: ArrayBuffer | undefined;
  }>
> {
  for (let status = 100; status < 600; status += 1) {
    // Note the wrapping into an object with the `promise` property
    yield {
      promise: (async () => {
        console.log(`Fetching http cat for status ${status}`);
        const response = await fetch(`https://http.cat/${status}`);
        return {
          status,
          responseStatus: response.status,
          body: response.ok ? await response.arrayBuffer() : undefined,
        };
      })(),
    };
  }
}

const main = async () => {
  // if numberOfParallelExecutions === 0 || numberOfParallelExecutions === 1 we just serially execute
  const numberOfParallelExecutions = 7;
  for await (const { status, responseStatus, body } of asyncBufferedTransformer(
    streamAllHttpCats(),
    { numberOfParallelExecutions }
  )) {
    if (body) {
      console.log(`Status ${status} has body of length ${body.byteLength}`);
    } else {
      console.log(`Status ${status} failed with status code ${responseStatus}`);
    }
  }
};

main().catch(console.log);

Example output:

# npx ts-node examples/fetch-http-cats.ts
Fetching http cat for status 100
Fetching http cat for status 101
Fetching http cat for status 102
Fetching http cat for status 103
Fetching http cat for status 104
Fetching http cat for status 105
Fetching http cat for status 106
Status 100 has body of length 38059
Fetching http cat for status 107
Status 101 has body of length 37527
Fetching http cat for status 108
Status 102 has body of length 45702
Fetching http cat for status 109
Status 103 has body of length 27995
Fetching http cat for status 110
Status 104 failed with status code 404
Fetching http cat for status 111
Status 105 failed with status code 404

We also provide the convenience functions drainStream and collectAll to easily collect all requests

//will resolve once all elements have been processed or reject the first time there is an error in any processed chunk
await drainStream(asyncBufferedTransformer(yourAsyncGenerator(inputStream), {
    noOfParallelExecutions
}))

//will resolve with all outputs in-order
const results = await collectAll(asyncBufferedTransformer(yourAsyncGenerator(inputStream), {
    noOfParallelExecutions
}))