push-buffer

An abstraction for handling buffering of asynchronous pull-based and push-based operations.

  • Request a value from the buffer, receive a Promise.
  • Depending on mode of operation, values are either acquired through an asynchronous callback (pull), or provided externally at a later point (push).
  • Values that come in when there are no outstanding requests are buffered, to be reconciled with later requests; ie. the abstraction matches both requests and results, and reconciles them in an order-preserving manner (see the sketch after this list).
  • Support for dividing values into 'lanes'; ie. independent request queues where values get assigned to specific lanes based on some predicate function, for implementing complex value distribution patterns.
  • Supports @promistream/NoValue to explicitly represent cases where no value is produced; this means you can send null and undefined to a queue just like any other value.
  • Values and errors can be sent to one lane, multiple lanes (multicast), or all lanes (broadcast).
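
A minimal sketch of that buffering behaviour, assuming a CommonJS-style default export named pushBuffer (the import shape is an assumption, not something this README specifies):

    const pushBuffer = require("push-buffer"); // assumed import shape

    const buffer = pushBuffer({ mode: "push" });

    // Values pushed while there are no outstanding requests are buffered...
    buffer.push("first");
    buffer.push("second");

    // ...and reconciled with later requests in the same order.
    buffer.request().then((value) => console.log(value)); // "first"
    buffer.request().then((value) => console.log(value)); // "second"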

Typical usecases

  • Implementing Promistreams, particularly forking streams.
  • Implementing asynchronous task queues and task distribution systems.
  • Splitting out asynchronously obtained values into different worker threads.
  • Converting push-based APIs (eg. EventEmitters) into pull-based APIs (asynchronous iterators, Promistreams, pull-streams, etc.); see the sketch below.
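
For the last of those, a rough sketch of wrapping an EventEmitter might look like this (the import shape is an assumption; the emitter is just a placeholder source):

    const EventEmitter = require("events");
    const pushBuffer = require("push-buffer"); // assumed import shape

    const source = new EventEmitter(); // stand-in for any push-based API
    const buffer = pushBuffer({ mode: "push" });

    // Push every emitted value and error into the buffer.
    source.on("data", (item) => buffer.push(item));
    source.on("error", (error) => buffer.pushError(error));

    // Consumers now read pull-style: each request resolves with the next value.
    buffer.request().then((item) => console.log("got:", item));

    source.emit("data", "hello"); // resolves the pending request above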

Caveats

  • The internal queues are unbounded, and so may use an arbitrary amount of memory. You should ensure that you are continuously requesting values from all lanes, so that they never become too full.
  • Likewise, there is nothing that prevents results from sitting in a lane forever and never actually being read, even when the process terminates; you must continuously drive reads to ensure you are not missing any values.
  • The buffer can currently only operate in either push or pull mode, because there are no obviously-correct semantics for a combined mode; if you still need a combined mode, please file an issue describing your exact usecase, so that I can better understand what the real-world requirements of such cases are.
  • While using this abstraction is much less error-prone than manually implementing asynchronous reconciliation, it is still somewhat difficult to get right, and very difficult to debug when you don't; if possible, prefer using higher-level off-the-shelf libraries for your usecase instead.

Lanes

This library supports a concept of 'lanes'. Each lane is an independent queue of values and errors; you make a request for a value to a specific lane, and values and errors are assigned to specific lanes, even though the source of values is shared among all of them. Additionally, it is possible to broadcast errors or values to all lanes.

If you just want to convert a push-based API to a pull-based API, you probably won't need lanes, and you can safely ignore all options related to it. However, if you are implementing some kind of value or task distribution mechanism, you will probably want to use lanes to ensure that the right values end up at the right place. It's up to you what to do with the lane assignments; they're just a list of zero-indexed queues internally, and you can map the lane indexes to some sort of other lookup table in your own code if needed.

If you are implementing a forking Promistream, the most useful lane configuration will most likely be: one lane per output stream, with error broadcasts enabled.
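
As an illustration, a two-lane setup where a predicate routes values might look roughly like this (the import shape and the even/odd routing rule are assumptions made up for the sketch):

    const pushBuffer = require("push-buffer"); // assumed import shape

    const buffer = pushBuffer({
        mode: "push",
        lanes: 2,
        // Made-up routing rule: even numbers to lane 0, odd numbers to lane 1.
        select: (value) => (value % 2 === 0 ? 0 : 1)
    });

    buffer.push(2); // assigned to lane 0
    buffer.push(3); // assigned to lane 1

    buffer.request(0).then((value) => console.log("lane 0:", value)); // 2
    buffer.request(1).then((value) => console.log("lane 1:", value)); // 3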

Permitted pullers

In normal pull-mode operation, the buffer will attempt a pull any time a value is requested from any lane. It is possible to restrict this, such that only some specific lanes may initiate a pull; other lanes can still request a value, but they will have to wait for one to appear as a side-effect of a permitted lane making a request. This is mostly useful when the broadcastValues option is enabled, causing all lanes to receive every value, to implement concepts such as 'leader mode' - where a specific lane determines the rate at which new values are requested, and other lanes follow its lead.

There are some things to be aware of when using this mode, though; when it is enabled, it becomes possible for a request to never be satisfied. Depending on how you are using push-buffer, this can cause your process to exit quietly in the middle of a task. To prevent this, you must ensure that there is always at least one permitted lane issuing requests until all the other lanes' requests are exhausted. This can be implemented by checking queue lengths for each lane, for example, or in some cases (such as Promistreams, with EndOfStream markers) the values or errors passed down can cause non-permitted lanes to stop attempting to read, allowing the permitted lane(s) to bottom out the queue.

In other words, only use this feature if you understand what it means for your asynchronous flow control. If you don't already know why you need it, then you probably don't, and you should leave it unused.
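
A sketch of that 'leader mode' pattern, with lane 0 driving the pulls and lane 1 passively receiving broadcast copies (same assumed import; the counter is a placeholder value source):

    const pushBuffer = require("push-buffer"); // assumed import shape

    let counter = 0;

    const buffer = pushBuffer({
        mode: "pull",
        lanes: 2,
        broadcastValues: true,       // every pulled value is delivered to all lanes
        permittedPullers: [0],       // only lane 0 may initiate upstream pulls
        pull: async () => counter++  // placeholder value source
    });

    // Lane 0 sets the pace; each of its requests triggers a new pull.
    buffer.request(0).then((value) => console.log("leader:", value));

    // Lane 1 only receives values as a side-effect of lane 0's pulls.
    buffer.request(1).then((value) => console.log("follower:", value));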

Example
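
A minimal end-to-end sketch in pull mode, assuming a CommonJS-style default export named pushBuffer (the import shape and the value source are assumptions):

    const pushBuffer = require("push-buffer"); // assumed import shape

    const lines = ["first", "second", "third"];

    const buffer = pushBuffer({
        mode: "pull",
        sequential: true,
        // Placeholder value source; replace with your own asynchronous acquisition logic.
        pull: async () => lines.shift()
    });

    (async () => {
        console.log(await buffer.request()); // "first"
        console.log(await buffer.request()); // "second"
        console.log(await buffer.request()); // "third"
    })();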

API

let buffer = pushBuffer(options)

Note that using lanes is optional; all options and functions default to lane 0, which is the only lane when no lane count is specified (and so it functions as if lanes didn't exist).

  • options: The settings for this pushBuffer.
    • mode: Default: "pull". The mode to operate in. One of "push", "pull".
    • lanes: Default: 1. The number of lanes to create. Each lane is an independent queue of values and errors, though broadcasts are possible.
    • pull: Required only in pull mode. An (async) callback that's called to acquire a new value/result, and which is expected to return a Promise. This callback is what provides your values in pull mode.
    • sequential: Default: false. Only used in pull mode. Whether to handle requests sequentially; that is, a second pull will not be started until the previous pull has completed, even if a second request is made.
    • permittedPullers: Default: every lane. Only used in pull mode. Which lanes are allowed to initiate upstream pulls through their reads, expressed as an array of (0-based) lane indexes. See the "Permitted pullers" section above for important information about how to use this option safely.
    • select: Default: 0. A callback that receives a value, and returns the index of the lane to assign it to (or an array of such indexes). The callback may return a Promise.
    • selectError: Default: 0. Like select, but it receives errors instead of values.
    • broadcastValues: Default: false. Whether to broadcast all values to all lanes by default. The select callback will not be called for broadcast values. In push mode, this can be overridden for individual values.
    • broadcastErrors: Default: true. Like broadcastValues, but for errors. The selectError callback will not be called for broadcast errors. In push mode, this can be overridden for individual errors.

Returns: A new pushBuffer.
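
Putting the options above together, a fully specified pull-mode configuration might look something like this; the routing rules and value source are made up for the sketch:

    const pushBuffer = require("push-buffer"); // assumed import shape

    let n = 0;

    const buffer = pushBuffer({
        mode: "pull",                  // "push" or "pull"
        lanes: 2,                      // two independent lanes: 0 and 1
        pull: async () => n++,         // placeholder value source (pull mode only)
        sequential: true,              // never start a second pull before the previous one finishes
        permittedPullers: [0],         // only lane 0 may initiate upstream pulls
        select: (value) => value % 2,  // made-up rule: evens to lane 0, odds to lane 1
        selectError: () => 0,          // send non-broadcast errors to lane 0
        broadcastValues: false,        // each value goes only to its selected lane
        broadcastErrors: true          // errors are broadcast to every lane
    });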

buffer.request(lane)

Requests the next value (for a given lane).

  • lane: Default: 0. The lane to request a value for.

Returns: a Promise that will eventually either resolve or reject, depending on the value/error acquired.
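
For instance, inside an async function and given a buffer created as above:

    try {
        const value = await buffer.request(1); // wait for the next value on lane 1
        console.log(value);
    } catch (error) {
        console.error("the acquired result was an error:", error);
    }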

buffer.push(value, broadcast)

Pushes a value (or a Promise) to the next pending request.

  • value: Required. The value to push.
  • broadcast: Default: the broadcastValues setting. Whether to broadcast this value to all lanes.
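
For instance, given a push-mode buffer created as above:

    buffer.push(42);                         // routed by the select callback (lane 0 by default)
    buffer.push(Promise.resolve("later"));   // Promises are fine; a rejection is handled as an error
    buffer.push("announcement", true);       // broadcast this value to all lanes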

buffer.pushError(error, broadcast)

Pushes an error to the next pending request. Note that if you wish to push a Promise that may fail (rather than an error you already have), you should use buffer.push instead; it will automatically be handled as an error if it ends up rejecting.

  • error: Required. The error to push.
  • broadcast: Default: the broadcastErrors setting. Whether to broadcast this error to all lanes.
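
For instance:

    // An error you already have in hand:
    buffer.pushError(new Error("source went away"));

    // A Promise that may still fail: use push instead; a rejection is handled as an error.
    buffer.push(Promise.reject(new Error("failed later")));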

buffer.countLane(lane)

Provides queue lengths for the given lane.

  • lane: Default: 0. The lane to retrieve queue lengths for.

Returns: an object with queue length properties:

  • values: The number of pending results that have not been reconciled with a request yet - note that this includes errors/rejections!
  • requests: The number of pending requests that have not been reconciled with a result yet.
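
For instance, inside an async function, this can be used to drain a lane of any results that are already sitting in its buffer (see the caveats above):

    const { values, requests } = buffer.countLane(1);
    console.log(`lane 1: ${values} unclaimed results, ${requests} unanswered requests`);

    // Keep reading while unclaimed results remain buffered on lane 1.
    while (buffer.countLane(1).values > 0) {
        console.log(await buffer.request(1));
    }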

buffer.tryDiscardValues(lane, count)

Attempts to remove up to count values from the start of the given lane's value queue. If the value queue is empty, no values are removed. If the value queue is smaller than the specified count, then the entire value queue is emptied (ie. as many as possible up to count). If the value queue is larger than the specified count, then exactly count values are removed.

Note that this doesn't affect the request queue; only values that are already buffered for the lane will be removed. This will typically only happen if you are either in push mode, or you are using broadcastValues or broadcastErrors, as those are the only conditions under which a value can end up being buffered without a corresponding request.

  • lane: The lane whose value queue to discard values from.
  • count: Required. The number of values to attempt to remove.
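
For instance, to drop whatever is currently buffered on lane 1 without reading it:

    const { values } = buffer.countLane(1);
    buffer.tryDiscardValues(1, values); // removes up to `values` buffered results; pending requests are untouched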

buffer.setPermittedPullers(permittedPullers)

Changes the set of permitted pullers, or (if null is passed), unsets it so that all lanes are permitted to initiate upstream pulls.

  • permittedPullers: Required. The list of permitted pullers, or null. Accepted values are identical to those in the initial permittedPullers option when creating a buffer.
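
For instance, to hand the 'leader' role from lane 0 to lane 1 at runtime, and later allow every lane to pull again:

    buffer.setPermittedPullers([1]);   // from now on, only lane 1 may initiate upstream pulls
    buffer.setPermittedPullers(null);  // back to the default: every lane may pull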