stream-capacitor

Throttles streams based on customizable throughput.

Purpose

The idea is to throttle a sequence of operations (i.e., Node.js streams) based on their throughput.

Where other implementations express their thresholds in bytes, this package uses an abstract number of items. This makes it suitable for use with object streams, where the raw payload size is either unknown or a less useful metric than a simple counter.

When you create a stream capacitor, it gives you access to two ports: the input port and the output port. When something flows through the input port, that increments the capacitor's queue size. When something flows through the output port, that decrements it again. Hence, you pipe the input port into the streams you want to throttle, and you pipe those back into the output port.

Here is a visualization of a typical setup:

Diagram

Note that the transforms in the input and output sections are entirely optional.

Usage

This snippet uses pumpify to pipe a list of streams together. You can also use the streams' native .pipe() method if you prefer.

import StreamCapacitor from 'stream-capacitor'
import pumpify from 'pumpify'

// When at least 10,000 items are waiting on the output side, pause input
const highWaterMark = 10000
// When the queue size drops below 3,000 again, resume input
const lowWaterMark = 3000
// Options are passed to the internal streams
const options = {objectMode: true}

// Create a new capacitor
const cap = new StreamCapacitor(highWaterMark, lowWaterMark, options)

// Put it all together
const flow = pumpify.obj(
  someReadable,         // Produce input
  inputTransform,       // Unthrottled input transform
  cap.input,            // Throttle every stream that follows
  transform1,           // Throttled transforms
  transform2,           // ...
  transform3,           // ...
  cap.output,           // End throttling
  outputTransform,      // Unthrottled output transform
  someWritable          // Consume output
)

Events

A capacitor is an EventEmitter that can emit two events:

  • close when throttling starts, and
  • open when throttling stops.
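
For example, here is a minimal sketch of listening for these events, assuming a cap instance created as in the Usage section above:

// Log when the capacitor starts and stops throttling
cap.on('close', () => {
  console.log('high water mark reached; input is paused')
})
cap.on('open', () => {
  console.log('queue dropped below the low water mark; input resumed')
})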

Custom Counting

By default, each item that flows through the input port will add one to the queue size, and each item going through the output port will subtract one. If you want to alter this behavior, you can use the count option. Its value is a function that maps an item to the number of items it represents.

For example, to parse log lines into data objects and then batch them into equally-sized arrays using batch-stream, you could write something like:

import StreamCapacitor from 'stream-capacitor'
import BatchStream from 'batch-stream'
import pumpify from 'pumpify'

const customCounter = (chunk, encoding) => {
  if (Array.isArray(chunk)) {
    // The output port receives arrays containing multiple items
    return chunk.length
  } else {
    // The input port receives one item at a time
    return 1
  }
}

const highWaterMark = 10000
const lowWaterMark = 3000
const options = {
  objectMode: true,
  count: customCounter  // See above
}
const cap = new StreamCapacitor(highWaterMark, lowWaterMark, options)
const flow = pumpify.obj(
  createLogLineReader(),
  cap.input,            // Adds one per log string
  createLogLineToDataObjectTransformer(),
  new BatchStream({size}),
  cap.output,           // Subtracts batch array length per batch
  createDataObjectWriter()
)

Adding and Removing Items

A throttled transform may need to add or remove items. For example, you might have a transform that splits each input item into multiple output items, or one that drops invalid input objects.

In this case, you can set the delta property on the capacitor to the number of additional items that you're expecting on the output side. By default, it is zero, meaning the capacitor expects one output item for each input item.
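
As a rough sketch, and assuming delta is interpreted as the number of extra output items expected per input item (reusing cap, pumpify, and the someReadable/someWritable placeholders from the earlier examples), a transform that emits every item twice could be accounted for like this:

import { Transform } from 'stream'

// Hypothetical transform that pushes every item twice,
// so the output side sees one extra item per input item
const duplicate = new Transform({
  objectMode: true,
  transform (chunk, encoding, callback) {
    this.push(chunk)
    this.push(chunk)
    callback()
  }
})

// Assumption: one additional output item is expected for each input item
cap.delta = 1

const flow = pumpify.obj(
  someReadable,
  cap.input,
  duplicate,            // Doubles the item count between the ports
  cap.output,
  someWritable
)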

Maintainer

Tim De Pauw

License

MIT