async-rivers

v1.1.0

Asynchronous processing using an intuitive river analogy.

Overview

async-rivers is a small queue/channel style library where producers and consumers are decoupled.

Instead of calling handlers directly, you send boats into a river and let the current handle the rest. Each boat accepted onto the river is processed in first-in-first-out order. Nobody likes it when other people jump the line!

Each boat flows through this pipeline, in order:

  1. sink rules try to sink it.
  2. hail and hope observers react.
  3. dock rules can stop further travel.
  4. fork rules can route it to another river.

Boats can be any JavaScript value.

import { riverside } from "async-rivers";

const river = riverside();

/* Register a function to hail incoming boats */
river.hail((boat) => {
  console.log(`Look Mama! Another boat called '${boat.name}'!`);
});

/* Place boats on the river and let them sail! */

river.dump({ name: "Endurance" });

river
  .sail({ name: "Knot on Call" })
  .then((res) => console.log(res ? "Here we go!" : "Something went wrong..."));

// > Look Mama! Another boat called 'Endurance'!
// > Here we go!
// > Look Mama! Another boat called 'Knot on Call'!
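
The stage order listed above can be modeled without the library. The sketch below is a toy model of the ordering only, not how async-rivers is implemented; `travel` and the shape of `rules` are hypothetical names used for illustration.

```javascript
/* Toy model of the stage order only (sink, observers, dock, fork).
 * `travel` and `rules` are hypothetical; async-rivers is not used here. */
function travel(boat, rules) {
  const log = [];
  // 1. Sink rules run first: a sunk boat reaches no observer.
  if (rules.sink(boat)) return [...log, "sunk"];
  // 2. hail/hope observers react to every boat that stays afloat.
  log.push("observed");
  // 3. A docked boat stops here and never reaches the forks.
  if (rules.dock(boat)) return [...log, "docked"];
  // 4. Otherwise the boat may be routed to another river.
  log.push(rules.fork(boat) ? "forked" : "done");
  return log;
}

const rules = {
  sink: (boat) => boat.contraband === true,
  dock: (boat) => boat.destination === "local",
  fork: (boat) => boat.priority === "high",
};

console.log(travel({ contraband: true }, rules)); // [ 'sunk' ]
console.log(travel({ destination: "local" }, rules)); // [ 'observed', 'docked' ]
console.log(travel({ priority: "high" }, rules)); // [ 'observed', 'forked' ]
```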

Rationale

What better way to represent information flow than... rivers?

Direct async calls create tight coupling between producers and consumers.

Information should flow inside and between systems.

Producers of information should not have to know who will consume it.

Consumers of information should be ignorant of how the data was produced.

Consuming information doesn't destroy it; multiple consumers can see it.

A buffer between producers and consumers allows processing variability.

Installing

npm install async-rivers

Creating

riverside(options?)

Create a new river to interact with.

Options:

  • buffer: Custom buffer object.
  • bufferSize: Shorthand for fixedBuffer(bufferSize). Default is 0.
  • ramp: Alias for bufferSize (fixed buffer main queue size). If both are provided, ramp is used.
  • parking: Shorthand for fixedBuffer(..., { maxPending: parking }).
  • onOverflow: Shorthand for fixedBuffer(..., { onOverflow }). Used with parking.
  • capacity: Maximum number of boats processed simultaneously. Default is 10.
  • fork: One predefined junction (see river.fork).
  • forks: Multiple predefined junctions.
  • onError: (error, context). Function to handle errors raised while processing boats. Receives a context with { boat?, handler?, process?, stage } props. Defaults to console.error.
  • ...rest: Copied directly onto the returned river object (name, metadata, etc.).

bufferSize: 0 (or ramp: 0) in default fixedBuffer means no queue size limit. If buffer is provided, bufferSize/ramp/parking/onOverflow are ignored.

import { riverside } from "async-rivers";

const high = riverside({ name: "High Priority Canal" });
const low = riverside({ name: "Low Priority Canal" });

const intake = riverside({
  name: "Intake River",
  bufferSize: 3,
  forks: [
    [{ river: high, when: (boat) => boat.priority === "high" }],
    [{ river: low }],
  ],
});
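
The precedence between the buffer-related options can be restated as a small function. This is only a sketch of the rules described above: `describeBuffer` is a hypothetical helper, it does not call async-rivers, and the returned object shape is made up for the illustration.

```javascript
/* Hypothetical helper restating the documented precedence rules.
 * It only reports which option wins; it builds no real buffer. */
function describeBuffer({ buffer, bufferSize = 0, ramp, parking, onOverflow } = {}) {
  // A custom buffer makes every shorthand option irrelevant.
  if (buffer !== undefined) return { kind: "custom" };
  // `ramp` is an alias for `bufferSize` and wins when both are given.
  const size = ramp !== undefined ? ramp : bufferSize;
  return {
    kind: "fixed",
    size, // 0 means no queue size limit
    maxPending: parking,
    onOverflow,
  };
}

console.log(describeBuffer({ bufferSize: 3, ramp: 5 }).size); // 5
console.log(describeBuffer().size); // 0
console.log(describeBuffer({ buffer: {}, ramp: 5 }).kind); // custom
```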

Capacity And Backpressure Model

When using riverside with a fixedBuffer (or the shorthand options), three quantities and one policy shape throughput and pressure:

  • capacity: boats already on the river = currently being processed (in-flight concurrency).
  • ramp: boats lined up at the ramp = queued in the buffer waiting for a processing slot.
  • parking: boats lined up in the parking lot = waiting to enter the buffer queue once the ramp is full.
  • onOverflow: what happens when parking is full ("drop", "slide", or custom function).

This means pressure is applied in layers: active processing, queued, then pending overflow.
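
One way to picture the layers is a function that says where a newly arrived boat ends up waiting. This is a simplified sketch under stated assumptions: `whereDoesItGo` is a hypothetical name, the counts are passed in by hand, and the real library may schedule boats differently in detail.

```javascript
/* Toy model of the layered pressure: river, then ramp, then parking, then overflow.
 * Assumes ramp 0 means "no queue limit" and an undefined parking means
 * "no pending limit", as the options above describe. */
function whereDoesItGo({ inFlight, queued, pending }, { capacity, ramp, parking }) {
  // A free processing slot and nobody ahead in line: straight onto the river.
  if (inFlight < capacity && queued === 0) return "river";
  // Otherwise wait at the ramp while it has room.
  if (ramp === 0 || queued < ramp) return "ramp";
  // Ramp full: wait in the parking lot while it has room.
  if (parking === undefined || pending < parking) return "parking";
  // Everything is full: the onOverflow policy decides.
  return "overflow";
}

const limits = { capacity: 2, ramp: 3, parking: 1 };
console.log(whereDoesItGo({ inFlight: 1, queued: 0, pending: 0 }, limits)); // river
console.log(whereDoesItGo({ inFlight: 2, queued: 1, pending: 0 }, limits)); // ramp
console.log(whereDoesItGo({ inFlight: 2, queued: 3, pending: 0 }, limits)); // parking
console.log(whereDoesItGo({ inFlight: 2, queued: 3, pending: 1 }, limits)); // overflow
```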

Navigating

Boats can be anything you want: canoe, yacht, cruise ship...

Boats can be any JavaScript value (primitive, object, array, error, function...).

river.sail(boat, { signal })

Take a boat to the river and wait your turn to set sail.

Queue a boat and wait for acceptance on the river.

Returns a promise that resolves once the boat has either been placed on the river (among the boats being processed) or been turned away.

Resolved value can be:

  • true: Boat accepted; it is afloat and will get processed
  • false: Boat NOT accepted (buffer overflow policy, signal aborted...)
  • null: Boat NOT accepted; river is sealed

const departed = await river.sail("This is a boat too");

console.log(departed);
// > true
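
Both false and null are falsy, so a plain truthiness check cannot tell a rejected boat from a sealed river. The helper below is a minimal sketch of a strict comparison on the three documented values; `describeDeparture` is a hypothetical name, not part of async-rivers.

```javascript
/* Hypothetical helper: maps the three documented sail() results to labels.
 * Uses strict equality because false and null are both falsy. */
function describeDeparture(departed) {
  if (departed === true) return "accepted";
  if (departed === null) return "sealed";
  return "rejected";
}

console.log(describeDeparture(true)); // accepted
console.log(describeDeparture(false)); // rejected
console.log(describeDeparture(null)); // sealed
```

With a river at hand, this would be used as `describeDeparture(await river.sail(boat))`.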

An AbortSignal (obtained from an AbortController) can be passed in the options to cancel a boat that is still waiting to sail.

const spouseController = new AbortController();

/* Let's say the river already has a lot of boats pending.
 * Boat owner still gets in line and waits... */
river
  .sail({ id: "B" }, { signal: spouseController.signal })
  .then((departed) => {
    /* This won't happen just now, it depends on the wait. */
    console.log(departed ? "Let's sail!" : "Let's go home...");
  });

/* Time passes...
 * Spouse sends the signal to abort and come back home */
spouseController.abort();

// > "Let's go home..."

river.dump(boat)

Take a boat to the river and rudely dump it in line. Surely, someone else will eventually place it on the river!

Fire-and-forget enqueue. No status is returned.

This method does not provide backpressure to the caller.

When you dump, you do not wait for acceptance, scheduling, or processing outcome. The caller keeps moving immediately, and any overflow/close behavior is handled internally by the configured buffer policy.

Use river.dump when best-effort delivery is fine. Use river.sail when you need flow control and an explicit accepted/rejected signal.

/* Told you boats could be anything! */
river.dump(42);

Reacting

You can get to the side of the river and react to the passing boats.

Multiple hail and hope handlers run concurrently for the same boat.

river.hail(callback, options)

You can hail the boats you like, wave at them.

Register a listener for passing boats.

Handler signature:

  • fn(boat, { remainingCount })

Options:

  • when(boat): run only when true.
  • unless(boat): skip when true.
  • map(boat): transform boat before handler. Must be synchronous.
  • once: auto-remove after first successful match.
  • timeout: auto-remove after N ms.
  • onTimeout(timeout): callback if timeout removes it.

Returns { quit() }.

/* Inspector hails all boats needing inspection. */
const inspector = river.hail(
  (boat) => {
    planInspection(boat.id);
  },
  {
    when: (boat) => boat.needsInspection,

    /* Inspector stays by the river for two hours only. */
    timeout: 2 * 3600 * 1000,
  }
);

/* Inspector must leave prematurely... */
inspector.quit();
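
How when, unless, map and once combine can be sketched with a plain wrapper. This is a toy model, not the library's code: `makeListener` is a hypothetical name, and it assumes that the predicates see the original boat while only the handler receives the mapped value.

```javascript
/* Toy model of the hail filtering options; async-rivers is not used here.
 * Assumption: when/unless receive the original boat, map runs just before the handler. */
function makeListener(handler, { when, unless, map, once } = {}) {
  let active = true;
  return (boat) => {
    if (!active) return false; // already removed by `once`
    if (when && !when(boat)) return false; // run only when true
    if (unless && unless(boat)) return false; // skip when true
    handler(map ? map(boat) : boat);
    if (once) active = false; // auto-remove after the first match
    return true;
  };
}

const seen = [];
const listener = makeListener((id) => seen.push(id), {
  when: (boat) => boat.needsInspection,
  unless: (boat) => boat.exempt,
  map: (boat) => boat.id,
  once: true,
});

listener({ id: 1, needsInspection: false }); // filtered out by `when`
listener({ id: 2, needsInspection: true, exempt: true }); // filtered out by `unless`
listener({ id: 3, needsInspection: true }); // handled
listener({ id: 4, needsInspection: true }); // ignored, `once` already fired
console.log(seen); // [ 3 ]
```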

river.hope(options)

You can get to the river and expect to see a specific boat, giving up after a certain time.

Wait for one matching boat. Returns a promise that resolves with the hoped-for boat or with the value of the notFound option (defaults to undefined).

timeout is required and must be a finite positive number to prevent memory leaks.

An AbortSignal (obtained from an AbortController) can be passed in the options to make the observer lose hope before the timeout.

const hopeController = new AbortController();

river
  .hope({
    /* Look for a specific boat */
    when: (boat) => boat.id === 42,
    /* Pass an AbortSignal as argument */
    signal: hopeController.signal,
    /* Wait for 12 seconds */
    timeout: 12_000,
    /* `notFound` can be any default value */
    notFound: { missing: true },
  })
  .then(({ missing }) =>
    console.log(missing ? "Oh well..." : "I got THE answer!")
  );

/* Observer gets distracted before timeout */
hopeController.abort();

// > "Oh well..."

Damaging

Boats might face challenges on their way that could damage them.

river.sink(sinker)

Obstacles or even enemy submarines could target specific boats and sink them.

Register sink rules to remove boats before listeners/forks.

Sinker options:

  • when(boat): sink when true.
  • unless(boat): do not sink when true.

Returns { quit() }.

If when is omitted, the sinker sinks every boat by default, except those for which unless returns true.

const antiPiracy = river.sink({
  when: (boat) => boat.contraband === true,
});

antiPiracy.quit();
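
The default behaviour when `when` is omitted can be written as a small predicate. This is a sketch of the rule only: `matches` is a hypothetical name, and letting `unless` win when both predicates return true is an assumption, since that case is not spelled out here.

```javascript
/* Toy predicate for when/unless rules; not the library's implementation.
 * Assumption: `unless` wins when both `when` and `unless` return true. */
function matches({ when, unless } = {}, boat) {
  const wanted = when ? Boolean(when(boat)) : true; // no `when` means "match by default"
  const spared = unless ? Boolean(unless(boat)) : false;
  return wanted && !spared;
}

console.log(matches({}, { name: "any boat" })); // true
console.log(matches({ unless: (boat) => boat.friendly }, { friendly: true })); // false
console.log(matches({ when: (boat) => boat.contraband }, { contraband: false })); // false
```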

river.blow()

People can get very upset about having boats on their river and decide to blow them all up at once!

Clear queued boats (and pending additions in fixedBuffer).

Continuing

At the end of the river, boats can encounter branches to veer into or harbours to dock at.

river.fork(branchesOrFork)

The river can have branches, or forks, that boats sail into next.

Add a junction that routes boats to other rivers.

  • branchesOrFork can either be a single branch or an array of branches.
  • river.fork can be called multiple times.
  • Forking stops at the first matching branch across junction order (boats can only take one branch of a junction).

Branch fields:

  • river: destination river (required).
  • when(boat): choose branch when true.
  • unless(boat): skip branch when true.
  • map(boat): transform before forwarding. Must be synchronous.

Returns { join() } to remove that junction.

const east = riverside({ name: "East Branch" });
const west = riverside({ name: "West Branch" });

river.fork([
  {
    river: east,
    when: (boat) => boat.region === "east",
    map: (boat) => ({ ...boat, branch: "east" }),
  },
  { river: west },
]);

const seasonal = river.fork({ river: east, when: (boat) => boat.spring });
seasonal.join();
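
The first-match rule can be illustrated with a plain loop over junctions and branches. This is a toy model under stated assumptions: `pickRiver` is a hypothetical name, rivers are represented by strings, and junctions are checked in the order they were added.

```javascript
/* Toy model of first-match routing; rivers are plain strings here.
 * A junction is either a single branch or an array of branches. */
function pickRiver(junctions, boat) {
  for (const junction of junctions) {
    const branches = Array.isArray(junction) ? junction : [junction];
    for (const branch of branches) {
      if (branch.when && !branch.when(boat)) continue; // branch not chosen
      if (branch.unless && branch.unless(boat)) continue; // branch skipped
      // First matching branch wins; map (if any) transforms the forwarded boat.
      return { river: branch.river, boat: branch.map ? branch.map(boat) : boat };
    }
  }
  return null; // no branch matched: the boat is not forwarded
}

const junctions = [
  [
    {
      river: "east",
      when: (boat) => boat.region === "east",
      map: (boat) => ({ ...boat, branch: "east" }),
    },
    { river: "west" }, // no predicate: catches everything else
  ],
];

console.log(pickRiver(junctions, { region: "east" }));
// { river: 'east', boat: { region: 'east', branch: 'east' } }
console.log(pickRiver(junctions, { region: "north" }));
// { river: 'west', boat: { region: 'north' } }
```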

river.dock(harbour)

Instead of continuing into a river fork, a boat can dock at a harbour.

Register dock rules. Docked boats do not continue to forks.

Harbour options:

  • when(boat): dock when true.
  • unless(boat): do not dock when true.

Returns { shut() }.

If when is omitted, docking is the default unless unless returns true.

const localDock = river.dock({
  when: (boat) => boat.destination === "local",
});

localDock.shut();

Controlling

river.clog()

Big obstacles might fall into the river and clog it, preventing any boat from navigating.

Pause processing.

river.plow()

To unclog a river, we simply plow it and remove the obstacles. Boats can navigate again!

Resume processing after clog().

river.seal()

As the river owner, you can decide not to let any new boats in, although you cannot stop those already on the river.

Close river for new boats.

river.open()

Due to community pressure, you might choose to open the river once more to new boats.

Re-open a sealed river.

river.tide()

You might want to check the water level first...

Read current processing concurrency.

console.log(river.tide());
// > 10

river.rise(capacityOrFunction)

Water level changes the number of boats that can navigate the river simultaneously.

Update processing concurrency at runtime. Argument can be:

  • a number to set the capacity to
  • a function of the current capacity (capacity) => newCapacity

Increasing capacity lets more boats process at once; decreasing limits new starts until in-flight work drops below the new limit.

river.rise(25);
console.log(river.tide());
// > 25

river.rise((level) => level + 5);
console.log(river.tide());
// > 30
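
Because rise accepts a function of the current capacity, adjustment policies can be written as plain functions and checked on their own. In this sketch `backOff` and `rampUp` are hypothetical names; whether rise validates or clamps its result is not stated here, so the helpers clamp the value themselves.

```javascript
/* Hypothetical capacity policies meant to be passed to river.rise(...).
 * They are pure functions of the current capacity. */

// Halve the capacity, but never go below one boat at a time.
const backOff = (level) => Math.max(1, Math.floor(level / 2));

// Add five slots, but never exceed the given ceiling.
const rampUp = (max) => (level) => Math.min(max, level + 5);

console.log(backOff(10)); // 5
console.log(backOff(1)); // 1
console.log(rampUp(50)(30)); // 35
console.log(rampUp(50)(48)); // 50
```

With a river at hand, these would be used as `river.rise(backOff)` or `river.rise(rampUp(50))`.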

river.size(pending?)

You can count boats in-flight, queued, and pending.

Current size from the active buffer.

With default fixedBuffer:

  • river.size() returns in-flight + queued + pending.
  • river.size(false) returns in-flight + queued.
  • river.size(true) returns pending only.

console.log("Total:", river.size());
console.log("In-flight + queued:", river.size(false));
console.log("Pending:", river.size(true));
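
The three variants add up as follows. This is a toy restatement of the list above with hand-made counts; `sizeOf` is a hypothetical name and does not read anything from a real river.

```javascript
/* Toy restatement of river.size() with the default fixedBuffer.
 * The counts are supplied by hand for the illustration. */
function sizeOf({ inFlight, queued, pending }, onlyPending) {
  if (onlyPending === true) return pending; // pending only
  if (onlyPending === false) return inFlight + queued; // everything but pending
  return inFlight + queued + pending; // no argument: grand total
}

const counts = { inFlight: 2, queued: 3, pending: 4 };
console.log(sizeOf(counts)); // 9
console.log(sizeOf(counts, false)); // 5
console.log(sizeOf(counts, true)); // 4
```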

Buffers

Buffers decouple producers from consumers by bounding how many boats can wait between them.

import riverside, {
  fixedBuffer,
  slidingBuffer,
  droppingBuffer,
} from "async-rivers";

fixedBuffer(size, options?)

Capacity-limited buffer with pending queue.

When main capacity is full:

  • add() becomes pending.
  • push() adds to pending directly.

Options:

  • maxPending: Optional limit to pending list size.
  • onOverflow: "drop", "slide", or custom function.

const river = riverside({
  buffer: fixedBuffer(2, {
    maxPending: 3,
    onOverflow: "drop",
  }),
});

river.sail("A");
river.sail("B");
const third = river.sail("C"); // Waits until space is free

onOverflow function

A function passed as onOverflow returns an array representing the new pendings list (or undefined to keep the current one).

onOverflow: (pendings, pending) => reorderedPendings

  • pendings: array of current pending entries, each shaped like { _id, val }
  • pending: the new incoming pending entry, shaped like { _id, val }

Return an array describing the new pendings list, using those same { _id, val } entries.

Rules:

  • Must return an array (or undefined to keep current pendings).
  • Only _ids are considered in reconciliation.
  • Unknown _ids are ignored.
  • Duplicate entries are deduplicated.
  • Any candidate (existing or new) not kept is resolved as not added (false).
  • The returned list is not checked against maxPending.

const custom = fixedBuffer(1, {
  maxPending: 2,
  /* Keep only high priority elements when overflowing */
  onOverflow: (pendings, incoming) =>
    [...pendings, incoming].filter((entry) => entry.val?.priority > 3),
});
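
The reconciliation rules can be sketched as a standalone function. This is an illustration of the listed rules, not the library's code: `reconcile` is a hypothetical name, and treating the incoming entry as rejected when the function returns undefined is an assumption.

```javascript
/* Toy reconciliation of an onOverflow result against the known entries.
 * Assumption: returning undefined keeps the current pendings and rejects the newcomer. */
function reconcile(pendings, incoming, returned) {
  const candidates = [...pendings, incoming];
  if (returned === undefined) return { kept: pendings, rejected: [incoming] };

  const known = new Map(candidates.map((entry) => [entry._id, entry]));
  const keptIds = new Set(); // a Set drops duplicates and keeps first-seen order
  for (const entry of returned) {
    // Only _ids matter; entries with unknown _ids are ignored.
    if (entry && known.has(entry._id)) keptIds.add(entry._id);
  }
  return {
    kept: [...keptIds].map((id) => known.get(id)),
    // Every candidate not kept would resolve as not added (false).
    rejected: candidates.filter((entry) => !keptIds.has(entry._id)),
  };
}

const current = [{ _id: 1, val: "a" }, { _id: 2, val: "b" }];
const newcomer = { _id: 3, val: "c" };
const result = reconcile(current, newcomer, [
  newcomer,
  current[0],
  current[0], // duplicate, collapsed
  { _id: 99, val: "ghost" }, // unknown _id, ignored
]);

console.log(result.kept.map((entry) => entry._id)); // [ 3, 1 ]
console.log(result.rejected.map((entry) => entry._id)); // [ 2 ]
```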

slidingBuffer(size)

Keeps latest size boats. Oldest is removed when full.

const river = riverside({ buffer: slidingBuffer(2) });
river.dump("A");
river.dump("B");
river.dump("C"); // A leaves the chat, B and C stay

droppingBuffer(size)

Keeps first size boats. New arrivals are dropped when full.

const river = riverside({ buffer: droppingBuffer(2) });
await river.sail("A"); // true
await river.sail("B"); // true
await river.sail("C"); // false
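
The difference between the two policies shows up clearly on plain arrays. The functions below are a toy model with hypothetical names (`slidingAdd`, `droppingAdd`), assuming nothing is consumed from the buffer while the boats arrive.

```javascript
/* Toy models of the two policies on plain arrays; async-rivers is not used here. */

// Sliding: always accept the newcomer, evict the oldest when over size.
function slidingAdd(items, size, boat) {
  const next = [...items, boat];
  return next.length > size ? next.slice(next.length - size) : next;
}

// Dropping: accept only while there is room, otherwise ignore the newcomer.
function droppingAdd(items, size, boat) {
  return items.length < size ? [...items, boat] : items;
}

const arrivals = ["A", "B", "C"];
console.log(arrivals.reduce((items, boat) => slidingAdd(items, 2, boat), [])); // [ 'B', 'C' ]
console.log(arrivals.reduce((items, boat) => droppingAdd(items, 2, boat), [])); // [ 'A', 'B' ]
```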

Batching

canalLock(options)

Rivers can flow freely, but sometimes boats should gather and cross together through a canal lock.

Create a canal lock that groups boats from an input river into batches, then forwards each locked batch into a canal (another river).

A canal lock is useful when downstream work is more efficient in groups (bulk writes, batched API calls, grouped processing).

Options:

  • size: Required. Maximum number of boats per lock cycle (must be a positive finite number).
  • waitMs: Time to wait in ms before locking a partial batch when no more boats are currently passing. Default: 100.
  • river: Options used to create the upstream river where individual boats are received.
  • canal: Options used to create the downstream canal where grouped boats are sent.
  • retryCount: Number of retries when a lock transfer fails. Default: 0.
  • onLockFail: What to do if lock transfer still fails after retries:
    • "prepend" (default): put failed boats back at the front of the waiting batch.
    • "append": put failed boats at the end of the waiting batch.
    • "drop": discard failed boats.
    • function(boats): custom failure handler.
    • falsy value: fire-and-forget transfer (dump) with no failure handling.
  • onError(error): Optional handler for unexpected internal errors.
  • ...rest: Copied directly onto the returned lock object (name, metadata, etc.).

import { canalLock } from "async-rivers";

const lock = canalLock({
  name: "North Canal Lock",
  size: 3,
  waitMs: 500,
  retryCount: 2,
  onLockFail: "prepend",
});

/* Boats arrive one by one in the river */
lock.dump({ id: "A" });
lock.dump({ id: "B" });
lock.dump({ id: "C" }); // lock is full -> [A,B,C] sent to canal
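
The grouping behaviour can be pictured with a synchronous toy batcher. This sketch is not the library's implementation: `makeToyLock` is a hypothetical name, the waitMs timer is left out, and a manual lock() call stands in for the flush of a partial batch.

```javascript
/* Toy synchronous batcher: groups boats by size and hands each batch to a callback.
 * No timers and no retries; those parts of canalLock are out of scope here. */
function makeToyLock(size, onBatch) {
  let waiting = [];

  // Send whatever is waiting as one batch (no-op when nothing waits).
  const lock = () => {
    if (waiting.length === 0) return;
    const batch = waiting;
    waiting = [];
    onBatch(batch);
  };

  // Add one boat; a full batch is sent immediately.
  const dump = (boat) => {
    waiting.push(boat);
    if (waiting.length >= size) lock();
  };

  return { dump, lock };
}

const batches = [];
const toyLock = makeToyLock(3, (batch) => batches.push(batch));
["A", "B", "C", "D"].forEach((id) => toyLock.dump(id));
toyLock.lock(); // flush the partial batch that is still waiting
console.log(batches); // [ [ 'A', 'B', 'C' ], [ 'D' ] ]
```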

Canal Lock Methods

The lock exposes most river controls for incoming boats:

  • dump, sail
  • hail, hope
  • sink
  • blow, clog, plow, seal, open
  • size

And lock-specific methods:

  • scan(fn, options): hail batches on the canal side (each boat is now a batch array).
  • lock(): manually request a lock/flush attempt.
  • getSize(): current batch size.
  • setSize(size): update batch size.

lock.scan(async (boats) => {
  console.log(
    "Lock opened for batch:",
    boats.map((b) => b.id)
  );
});

Lock Failure Strategies

When lock transfer uses sail and fails (false or null), onLockFail decides what happens next.

const resilientLock = canalLock({
  size: 5,
  retryCount: 3,
  onLockFail: "prepend", // keep failed boats first in line
});

const lossyLock = canalLock({
  size: 5,
  onLockFail: "drop", // discard failed lock batches
});

const customLock = canalLock({
  size: 5,
  onLockFail: async (boats) => {
    await saveToDeadLetterQueue(boats);
  },
});

This way, the river can stay fluid while the lock controls how boats are grouped and how failures are handled.
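
The three string strategies differ only in where the failed boats end up relative to the boats already waiting. A minimal sketch, assuming the waiting batch is a plain array; `afterLockFail` is a hypothetical name and custom functions are not modeled.

```javascript
/* Toy model of the string onLockFail strategies on plain arrays. */
function afterLockFail(waiting, failed, strategy) {
  switch (strategy) {
    case "prepend":
      return [...failed, ...waiting]; // failed boats go back to the front
    case "append":
      return [...waiting, ...failed]; // failed boats go to the end
    case "drop":
      return waiting; // failed boats are discarded
    default:
      throw new Error(`Unsupported strategy in this sketch: ${strategy}`);
  }
}

const waitingBoats = ["D", "E"];
const failedBoats = ["A", "B", "C"];
console.log(afterLockFail(waitingBoats, failedBoats, "prepend")); // [ 'A', 'B', 'C', 'D', 'E' ]
console.log(afterLockFail(waitingBoats, failedBoats, "append")); // [ 'D', 'E', 'A', 'B', 'C' ]
console.log(afterLockFail(waitingBoats, failedBoats, "drop")); // [ 'D', 'E' ]
```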

Patterns

Simple Work Queue

const jobs = riverside({ bufferSize: 5 });

jobs.hail(
  async (job) => {
    await job.run();
  },
  { when: (job) => job.enabled !== false }
);

Wait For A Specific Event

const events = riverside();

const confirmationP = events.hope({
  when: (event) =>
    event.type === "payment.confirmed" && event.orderId === "o-123",
  timeout: 5_000,
  notFound: null,
});

events.dump({ type: "payment.confirmed", orderId: "o-123" });
const confirmation = await confirmationP;

Route By Predicate

const premium = riverside({ name: "Premium Stream" });
const standard = riverside({ name: "Standard Stream" });
const intake = riverside();

intake.fork([
  { river: premium, when: (boat) => boat.plan === "premium" },
  { river: standard },
]);

Decoupled request/response

hope enables decoupled request/response behaviours:

Process A

  1. Sail a uniquely identifiable boat on toProcessRiver (to ensure it has been queued)
  2. Hope for a boat with same id on processedRiver with a predefined timeout

Process B

  1. Hail (potentially filtered) incoming boats on toProcessRiver
  2. Sail a response boat with same id on processedRiver

Process A could use a single AbortController signal to cancel both sail and hope. If hope resolves to the notFound value, it may abort the shared signal.

import { riverside } from "async-rivers";

const requests = riverside();
const responses = riverside();

/* Process B */
requests.hail(async (req) => {
  const result = req.value * 2;
  await responses.sail({ id: req.id, result });
});

/* Process A */
const controller = new AbortController();
const id = "req-1";

const queued = await requests.sail(
  { id, value: 21 },
  { signal: controller.signal }
);

if (queued === true) {
  const reply = await responses.hope({
    when: (boat) => boat.id === id,
    timeout: 2000,
    notFound: null,
    signal: controller.signal,
  });

  if (reply === null) controller.abort(); // optional shared cancel
  console.log(reply); // { id: 'req-1', result: 42 } or null
}

License

ISC