npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@alessiofrittoli/stream-reader

v3.0.0

Published

Easly read pushed data from a Stream

Downloads

623

Readme

Stream Reader 📚

NPM Latest Version Coverage Status Socket Status NPM Monthly Downloads Dependencies

GitHub Sponsor

Easly read pushed data from a Stream

The StreamReader class is a utility for reading data from a ReadableStream in a structured and event-driven manner. It extends the EventEmitter class, providing events for stream lifecycle management and error handling.

Table of Contents


Getting started

Run the following command to start using stream-reader in your projects:

npm i @alessiofrittoli/stream-reader

or using pnpm

pnpm i @alessiofrittoli/stream-reader

API Reference

Importing the library

import { StreamReader } from '@alessiofrittoli/stream-reader'
import type { ... } from '@alessiofrittoli/stream-reader/types'

StreamReader Class API Reference

Constructor

The StreamReader class constructor accepts a ReadableStream argument. You can optionally pass types arguments I and O to define the type of the streamed data being read and the type of the transformed output chunk.

const reader  = new StreamReader<Buffer>( ... )
const reader2 = new StreamReader<Buffer, string>( ... )

type Input = Buffer;
type Output = Buffer;
const stream = new TransformStream<Input, Output>();
const reader = new StreamReader(stream.readable); // type of `StreamReader<Output, Output>`

Properties

Here are listed the StreamReader class instance accessible properties:

| Parameter | Type | Description | | --------- | -------------------------------- | ------------------------------------------------------ | | reader | ReadableStreamDefaultReader<T> | The reader obtained from the input ReadableStream<T> | | closed | boolean | Indicates whether the stream has been closed. |


| Parameter | Default | Description | | --------- | --------- | ---------------------------------------------------------------------------------------------------- | | I | unknown | The type of input data being read from the stream. | | O | I | The type of output data transformed after reading from the stream. Defaults to the same type of I. |


Methods
StreamReader.read()

The StreamReader.read() method read the on-demand pushed data from the given stream.

It internally uses the StreamReader.readChunks() method to read the received chunks.

It emits usefull events such as:

  • data - Emitted when a chunk is received from the stream and processed by the optional transform function.
  • close - Emitted when the stream is closed.
  • error - Emitted when an error occurs while reading.

Type: Promise<ReadChunks<O> | void>

A new Promise that resolves to an array of processed chunks if the given options.inMemory is set to true, void otherwise.


StreamReader.readChunks()

The StreamReader.readChunks() method read the on-demand pushed data from the given stream.

Type: AsyncGenerator<ReadChunk<I>>

An async iterable object for consuming chunks of data.


StreamReader.abort()

The StreamReader.abort() method it's pretty usefull when stream data reading is no longer needed, regardless of stream writer state.

This method will cancel the reader, release the lock, emit an 'abort' event, and remove data, close and abort event listeners.

It emits the abort event.


Static Methods
StreamReader.generatorToReadableStream()

The StreamReader.generatorToReadableStream() method is a utiliy function that converts a Generator or AsyncGenerator to a ReadableStream.

| Parameter | Type | Default | Description | | ----------- | -------------------- | -------------------------- | ------------------------------------------- | | generator | StreamGenerator<T> | StreamGenerator<unknown> | The Generator or AsyncGenerator to convert. |


| Parameter | Type | Default | Description | | --------- | ---- | --------- | ------------------------------------------ | | T | T | unknown | The type of data produced by the iterator. |


Listening Events

The StreamReader class extends the EventEmitter class, providing events for stream lifecycle management and error handling.

| Event | Arguments | Type | Description | | ------- | --------- | --------------- | -------------------------------------------------------------------------------------------------------- | | data | | | Emitted when a chunk of data is read from the stream and processed by the optional transform function. | | | chunk | ReadChunk<O> | The chunk of data read from the stream. | | close | | | Emitted when the stream is closed. | | | chunks | ReadChunks<O> | An array of chunks read from the stream before it was closed. | | error | | | Emitted when an error occurs during reading. | | | error | Error | The error that occurred during the reading process. | | abort | | | Emitted when the reading process is aborted. | | | reason | AbortError | An AbortError explaing the reason for aborting the operation. |


data event
const reader = new StreamReader( ... )
reader.on( 'data', chunk => {
  console.log( 'received chunk', chunk )
} )

close event
const reader = new StreamReader( ... )
reader.on( 'close', chunks => {
  console.log( 'chunks', chunks )
} )

error event
const reader = new StreamReader( ... )
reader.on( 'error', error => {
  console.error( error )
} )

abort event
const reader = new StreamReader( ... )
reader.on( 'abort', reason => {
  console.log( 'reading aborted', reason.message )
} )

Usage

In the following examples we reference streamData which is an async function that writes data and closes the stream once finished:

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

const defaultChunks = ["data 1", "data 2"];
const erroredChunks = ["data 1", new Error("Test Error"), "data 2"];

const streamData = async ({
  writer,
  chunks,
}: {
  writer: WritableStreamDefaultWriter<Buffer>;
  chunks?: (string | Error)[];
}) => {
  chunks ||= defaultChunks;
  for await (const chunk of chunks) {
    if (chunk instanceof Error) {
      throw chunk;
    }
    await writer.write(Buffer.from(chunk));
    await sleep(50);
  }
  await writer.close();
  writer.releaseLock();
};
const stream = new TransformStream<Buffer, Buffer>();
const writer = stream.writable.getWriter();
const reader = new StreamReader(stream.readable);

streamData({ writer });

const chunks = await reader.read();

const response = await fetch( ... )
let resourceSize = 0

if ( response.body ) {
    const reader  = new StreamReader( response.body )
    const decoder = new TextDecoder()
    reader.on( 'data', chunk => {
        const decoded = decoder.decode( chunk, { stream: true } )
        resourceSize += chunk.BYTES_PER_ELEMENT * chunk.length
    } )
    const chunks = await reader.read()
}

const stream = new TransformStream<Buffer, Buffer>();
const writer = stream.writable.getWriter();
const reader = new StreamReader<Buffer, string>(stream.readable, {
  transform(chunk) {
    return chunk.toString("base64url");
  },
});

streamData({ writer });

reader.on("data", (chunk) => {
  console.log(chunk); // chunk is now a base64url string
});
const chunks = await reader.read(); // `string[]`

const inMemory = false;
const stream = new TransformStream<Buffer, Buffer>();
const writer = stream.writable.getWriter();
const reader = new StreamReader(stream.readable, { inMemory });

streamData({ writer });

reader.on("data", (chunk) => {
  console.log(chunk);
});
const chunks = await reader.read(); // void

const stream = new TransformStream<Buffer, Buffer>();
const writer = stream.writable.getWriter();
const reader = new StreamReader(stream.readable);

streamData({ writer });

reader.read();

abortButton.addEventListener("click", () => {
  reader.abort("Reading no longer needed");
});

Error handling

When an error occurs while reading stream data (such as unexpected stream abort), the StreamReader uses an internal error function which handles the thrown Error.

By default, if no listener is attached to the error event, the StreamReader.read() method will re-throw the caught error.

In that case, you need to await and wrap the StreamReader.read() method call in a trycatch block like so:

try {
  const chunks = await reader.read();
} catch (err) {
  const error = err as Error;
  console.error("An error occured", error.message);
}

with error event listener:

reader.read();
reader.on("error", (error) => {
  console.error("An error occured", error.message);
});

Types API Reference

ReadChunk<T>

Represents a chunk of data that can be read, which can either be of type T or a promise that resolves to T.

  • Type Parameter:
    • T: The type of the data chunk. Defaults to unknown if not specified.

ReadChunks<T>

Represents an array of ReadChunk objects.

  • Type Parameter:
    • T: The type of data contained in each ReadChunk.

TransformChunk<I, O>

A function that transforms a chunk of data.

  • Type Parameters:

    • I: The type of the input chunk. Defaults to unknown.
    • O: The type of the output chunk. Defaults to I.
  • Parameters:

    • chunk: The chunk of data to be transformed.
  • Returns:
    A transformed chunk of data, which can be either a synchronous result or a promise that resolves to the result.


StreamReaderEvents<O>

Defines event types emitted by the StreamReader.

  • Type Parameter:

    • O: The type of data being read from the stream and eventually transformed before the event is emitted.
  • Event Types:

    • data: Emitted when a chunk of data is read.
      • Parameters: chunk (ReadChunk<O>)
    • close: Emitted when the stream is closed.
      • Parameters: chunks (ReadChunks<O>)
    • error: Emitted when an error occurs during reading.
      • Parameters: error (Error)
    • abort: Emitted when the reading process is aborted.
      • Parameters: reason (AbortError)

Listener<K, O>

A listener function for events emitted by the StreamReader.

  • Type Parameters:

    • K: The event type to listen for.
    • O: The type of data being read from the stream.
  • Parameters:

    • ...args: The arguments emitted with the event, based on the event type K.

OnDataEventListener<O>

Listener for the data event.

  • Type Parameter:

    • O: The type of data being read.
  • Parameters:

    • chunk (ReadChunk<O>): The chunk of data that was read.

OnCloseEventListener<O>

Listener for the close event.

  • Type Parameter:

    • O: The type of data being read.
  • Parameters:

    • chunks (ReadChunks<O>): An array of chunks read before the stream was closed.

OnCancelEventListener

Deprecated. Use OnAbortEventListener type instead.


OnAbortEventListener

Listener for the abort event.

  • Parameters:
    • reason (AbortError): Explains the reason for aborting the operation.

OnErrorEventListener

Listener for the error event.

  • Parameters:
    • error (Error): The error that occurred during reading.

StreamGenerator<T>

A generator that produces chunks of data asynchronously. It can be either a regular generator or an async generator.

  • Type Parameter:
    • T: The type of data produced by the generator.

Development

Install depenendencies

npm install

or using pnpm

pnpm i

Build the source code

Run the following command to test and build code for distribution.

pnpm build

ESLint

warnings / errors check.

pnpm lint

Jest

Run all the defined test suites by running the following:

# Run tests and watch file changes.
pnpm test:watch

# Run tests in a CI environment.
pnpm test:ci

Run tests with coverage.

An HTTP server is then started to serve coverage files from ./coverage folder.

⚠️ You may see a blank page the first time you run this command. Simply refresh the browser to see the updates.

test:coverage:serve

Contributing

Contributions are truly welcome!

Please refer to the Contributing Doc for more information on how to start contributing to this project.

Help keep this project up to date with GitHub Sponsor.

GitHub Sponsor


Security

If you believe you have found a security vulnerability, we encourage you to responsibly disclose this and NOT open a public issue. We will investigate all legitimate reports. Email [email protected] to disclose any security vulnerabilities.

Made with ☕