npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

loop-stream

v1.4.1

Published

Simple function for looping your Node.js stream with C-like for loop features: break and continue

Downloads

23

Readme

loop-stream

npm package Build Status Downloads Issues Code Coverage Commitizen Friendly Semantic Release

Simple function for looping your Node.js stream with C-like for loop features: break and continue

Why use this package?

If all you want to do is consume a Node.js stream in total, then probably the easiest way is async iteration.

However if you only want to consume it partly, there are some limitations. Using break will result in closing the reading stream. Imagine the user is writing to stdin of your program and you want to reply to their 'Hello" with 'Hi!' and then just leave the stream for consumption to the other part of your application.

for await (const chunk of process.stdin) {
    if(chunk.toString() === 'Hello\n') {
        console.log('Hi!');
        break;
    }
}

process.stdin.on('data', (chunk) => {
  // This will never run :(
});

instead you could just use loopStream:

await loopStream(process.stdin, (chunk) => {
      if(chunk.toString() === 'Hello\n') {
          console.log('Hi!');
          return { action: 'break' };
      }
  
      return { action: 'continue' };
  })

process.stdin.on('data', (chunk) => {
  // This will work as expected!
});

More examples

Parsing HTTP headers

There are some more use case's for this little function. Let's suppose you want to parse incoming HTTP headers and then read the body using different listeners.

async function readHttpHeaders(stream: Readable): Promise<Record<string, string>> {
    /* Assuming the structure of the request is
        Header1: Value1\r\n
        Header2: Value2\r\n
        \r\n
        [Body]
    */
    return loopStream(stream, '', (chunk, acc) => {
        acc += chunk;

        const headEndSeqIndex = acc.indexOf("\r\n\r\n");

        // if we didn't get to the end of headers section of HTTP request, just continue reading the headers
        if (headEndSeqIndex === -1) {
            return { action: "continue", acc };
        }

        const rawHeaders = acc.slice(0, headEndSeqIndex);
        // when we know we already got to the end of headers we have to make sure we didn't read a part of HTTP body
        const bodyBeginning = acc.slice(headEndSeqIndex + "\r\n\r\n".length);

        // if so, we want to return it as 'unconsumedData' so it can be unshifted into the original stream
        return {
            action: "break",
            acc: rawHeaders,
            unconsumedData: bodyBeginning
        };
    });
}

// Consume just the headers part of the HTTP request
const headers = await readHttpHeaders(request);

// and then we just read the remaining stream to get the body
let body = '';
for await (const bodyChunk of request)
  body += bodyChunk;

Install

npm install loop-stream

Reference

loopStream(stream, iter)

  • stream - Readable stream
  • iter - IterateWithoutState
  • Returns - Promise that resolves either when { action: 'break' } is returned or when the stream has ended.

IterateWithoutState

  • chunk - Chunk of stream data obtained using stream.read()
  • Returns - { action: 'continue'; } or { action: 'break'; unconsumedData?: any; };

Consumes chunk of data.

Example:

await loopStream(process.stdin, (chunk) => {
    if(chunk === 'Hi\n') {
        console.log('Hello');
        return { action: 'break' };
    }

    return { action: 'continue' };
});

loopStream(stream, acc, iter)

  • stream - Readable stream
  • initialAcc - initial value of the accumulator
  • iter - IterateWithState
  • Returns - Promise that resolves either when { action: 'break' } is returned or when the stream has ended.

It's a bit like arr.reduce() for streams. You can accumulate chunks from streams into some value the will be later returned from loopStream.

IterateWithState

  • chunk - Chunk of stream data obtained using stream.read()
  • acc - Accumulator returned from the previous iteration. In the first iteration it's initialAcc.
  • Returns - { action: 'continue'; acc: Accumulator; } or { action: 'break'; unconsumedData?: any; acc: Accumulator; };

Consumes chunk of data and accumulates processed stream.

Example:

const stream = new PassThrough({ objectMode: true });
[2, 1, 3, -1, 3].forEach(num => stream.write(num));

const sum = await loopStream(stream, 0, (chunk, acc: number) => {
    if(chunk === -1) {
        return { action: 'break', acc };
    }
    
    return { action: 'continue', acc: acc + chunk };
});

// sum: 6