async-rivers
v1.1.0
Asynchronous processing using intuitive rivers analogy.
- Overview
- Rationale
- Installing
- Creating
- Navigating
- Reacting
- Damaging
- Continuing
- Controlling
- Buffers
- Batching
- Patterns
- License
Overview
async-rivers is a small queue/channel style library where producers and consumers are decoupled.
Instead of calling handlers directly, you send boats into a river and let the current handle the rest. Each boat accepted onto the river is processed in first-in-first-out order. Nobody likes it when other people jump the line!
Each boat flows through this pipeline, in order:
- sink rules try to sink it.
- hail and hope observers react.
- dock rules can stop further travel.
- fork rules can route it to another river.
Boats can be any JavaScript value.
import { riverside } from "async-rivers";
const river = riverside();
/* Register a function to hail incoming boats */
river.hail((boat) => {
console.log(`Look Mama! Another boat called '${boat.name}'!`);
});
/* Place boats on the river and let them sail! */
river.dump({ name: "Endurance" });
river
.sail({ name: "Knot on Call" })
.then((res) => console.log(res ? "Here we go!" : "Something went wrong..."));
// > Look Mama! Another boat called 'Endurance'!
// > Here we go!
// > Look Mama! Another boat called 'Knot on Call'!
Rationale
What better to represent information flow than... rivers?
Direct async calls create tight coupling between producers and consumers.
Information should flow inside and between systems.
Producers of information should not have to know who will consume it.
Consumers of information should be ignorant of how the data was produced.
Consuming information doesn't destroy it; multiple consumers can see it.
A buffer between producers and consumers allows processing variability.
Installing
npm install async-rivers
Creating
riverside(options?)
Create a new river to interact with.
Options:
- buffer: Custom buffer object.
- bufferSize: Shorthand for fixedBuffer(bufferSize). Default is 0.
- ramp: Alias for bufferSize (fixed buffer main queue size). If both are provided, ramp is used.
- parking: Shorthand for fixedBuffer(..., { maxPending: parking }).
- onOverflow: Shorthand for fixedBuffer(..., { onOverflow }). Used with parking.
- capacity: Maximum number of boats processed simultaneously. Default is 10.
- fork: One predefined junction (see river.fork).
- forks: Multiple predefined junctions.
- onError: (error, context). Function to handle errors in boat processing. Receives a context with { boat?, handler?, process?, stage } props. Defaults to console.error.
- ...rest: Copied directly onto the returned river object (name, metadata, etc.).
bufferSize: 0 (or ramp: 0) in default fixedBuffer means no queue size limit.
If buffer is provided, bufferSize/ramp/parking/onOverflow are ignored.
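For instance, a custom onError handler can log the stage at which a boat failed, using the context props listed above (a sketch; the river name and log format are illustrative):

```javascript
import { riverside } from "async-rivers";

const audited = riverside({
  name: "Audited River",
  /* Called when boat processing throws; `stage` tells where it failed. */
  onError: (error, { boat, stage }) =>
    console.warn(`Boat failed during '${stage}':`, boat, error),
});
```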
import { riverside } from "async-rivers";
const high = riverside({ name: "High Priority Canal" });
const low = riverside({ name: "Low Priority Canal" });
const intake = riverside({
name: "Intake River",
bufferSize: 3,
forks: [
[{ river: high, when: (boat) => boat.priority === "high" }],
[{ river: low }],
],
});
Capacity And Backpressure Model
When using riverside with a fixedBuffer (or shorthand properties), three quantities shape throughput and pressure:
- capacity: boats already on the river = currently being processed (in-flight concurrency).
- ramp: boats lined up at the ramp = queued in the buffer, waiting for a processing slot.
- parking: boats lined up in the parking = waiting to enter the buffer queue once the ramp is full.
- onOverflow: what happens when the parking is full ("drop", "slide", or a custom function).
This means pressure is applied in layers: active processing, queued, then pending overflow.
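Under this layered model, a fully constrained river could be sketched as follows (the numbers are illustrative):

```javascript
import { riverside } from "async-rivers";

/* Up to 2 boats processed at once, 5 queued on the ramp,
 * 10 more waiting in the parking; beyond that, new boats
 * are handled by the "drop" overflow policy. */
const constrained = riverside({
  capacity: 2,
  ramp: 5,
  parking: 10,
  onOverflow: "drop",
});
```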
Navigating
Boats can be anything you want: canoe, yacht, cruise ship...
Boats can be any JavaScript value (primitive, object, array, error, function...).
river.sail(boat, { signal })
Take a boat to the river and wait your turn to set sail.
Queue a boat and wait for acceptance on the river.
Returns a promise that resolves once the boat is placed (or refused a place) on the river, i.e. among the boats being processed.
Resolved value can be:
- true: Boat accepted; it is afloat and will get processed.
- false: Boat NOT accepted (buffer overflow policy, signal aborted...).
- null: Boat NOT accepted; river is sealed.
const departed = await river.sail("This is a boat too");
console.log(departed);
// > true
An AbortController signal (AbortSignal) can be passed as an argument to abort the boat's sailing.
const spouseController = new AbortController();
/* Let's say the river already has a lot of boats pending.
* Boat owner still gets in line and waits... */
river
.sail({ id: "B" }, { signal: spouseController.signal })
.then((departed) => {
/* This won't happen just now, it depends on the wait. */
console.log(departed ? "Let's sail!" : "Let's go home...");
});
/* Time passes...
* Spouse sends the signal to abort and come back home */
spouseController.abort();
// > "Let's go home..."
river.dump(boat)
Take a boat to the river and rudely dump it in line. Surely, someone else will eventually place it on the river!
Fire-and-forget enqueue. No status is returned.
This method does not provide backpressure to the caller.
When you dump, you do not wait for acceptance, scheduling, or processing outcome. The caller keeps moving immediately, and any overflow/close behavior is handled internally by the configured buffer policy.
Use river.dump when best-effort delivery is fine.
Use river.sail when you need flow control and an explicit accepted/rejected signal.
/* Told you boats could be anything! */
river.dump(42);
Reacting
You can get to the side of the river and react to the passing boats.
Multiple hail and hope handlers run concurrently for the same boat.
river.hail(callback, options)
You can hail the boats you like, wave at them.
Register a listener for passing boats.
Handler signature:
fn(boat, { remainingCount })
Options:
- when(boat): run only when true.
- unless(boat): skip when true.
- map(boat): transform the boat before the handler. Must be synchronous.
- once: auto-remove after the first successful match.
- timeout: auto-remove after N ms.
- onTimeout(timeout): callback if the timeout removes it.
Returns { quit() }.
/* Inspector hails all boats needing inspection. */
const inspector = river.hail(
(boat) => {
planInspection(boat.id);
},
{
when: (boat) => boat.needsInspection,
/* Inspector stays by the river for two hours only. */
timeout: 2 * 3600 * 1000,
}
);
/* Inspector must leave prematurely... */
inspector.quit();
river.hope(options)
You can get to the river and expect to see a specific boat, abandoning after a certain time.
Wait for one matching boat. Returns a promise that resolves with the hoped-for boat, or with the notFound option value (defaults to undefined).
timeout is required and must be a finite positive number to prevent memory leaks.
An AbortController signal (AbortSignal) can be passed as an argument to control when the observer loses hope.
const hopeController = new AbortController();
river
.hope({
/* Look for a specific boat */
when: (boat) => boat.id === 42,
/* Pass an AbortSignal as argument */
signal: hopeController.signal,
/* Wait for 12 seconds */
timeout: 12_000,
/* `notFound` can be any default value */
notFound: { missing: true },
})
.then(({ missing }) =>
console.log(missing ? "Oh well..." : "I got THE answer!")
);
/* Observer gets distracted before timeout */
hopeController.abort();
// > "Oh well..."
Damaging
Boats might face challenges on their way that could damage them.
river.sink(sinker)
Obstacles or even enemy submarines could target specific boats and sink them.
Register sink rules to remove boats before listeners/forks.
Sinker options:
- when(boat): sink when true.
- unless(boat): do not sink when true.
Returns { quit() }.
If when is omitted, the sinker behaves like "sink by default unless blocked by unless".
const antiPiracy = river.sink({
when: (boat) => boat.contraband === true,
});
antiPiracy.quit();
river.blow()
People can get very upset about having boats on their river and decide to blow them all at once!
Clear queued boats (and pending additions in fixedBuffer).
Continuing
At the end of the river, boats can encounter branches to veer to or harbours to dock to.
river.fork(branchesOrFork)
The river could have branches or forks boats might sail to next.
Add a junction that routes boats to other rivers.
- branchesOrFork can either be a single branch or an array of branches.
- river.fork can be called multiple times.
- Forking stops at the first matching branch across junction order (boats can only take one branch of a junction).
Branch fields:
- river: destination river (required).
- when(boat): choose branch when true.
- unless(boat): skip branch when true.
- map(boat): transform before forwarding. Must be synchronous.
Returns { join() } to remove that junction.
const east = riverside({ name: "East Branch" });
const west = riverside({ name: "West Branch" });
river.fork([
{
river: east,
when: (boat) => boat.region === "east",
map: (boat) => ({ ...boat, branch: "east" }),
},
{ river: west },
]);
const seasonal = river.fork({ river: east, when: (boat) => boat.spring });
seasonal.join();
river.dock(harbour)
To avoid continuing on a river fork, a boat might instead dock to a harbour.
Register dock rules. Docked boats do not continue to forks.
Harbour options:
- when(boat): dock when true.
- unless(boat): do not dock when true.
Returns { shut() }.
If when is omitted, docking is the default unless unless returns true.
const localDock = river.dock({
when: (boat) => boat.destination === "local",
});
localDock.shut();
Controlling
river.clog()
Big obstacles might fall into the river and clog it, preventing any boat from navigating.
Pause processing.
river.plow()
To unclog a river, we simply plow it and remove the obstacles. Boats can navigate again!
Resume processing after clog().
river.seal()
As the river owner, you could decide not to let any new boat navigate, without stopping those already on it.
Close river for new boats.
river.open()
Due to community pressure, you might choose to open the river once more to new boats.
Re-open a sealed river.
river.tide()
You might want to check the water level first...
Read current processing concurrency.
console.log(river.tide());
// > 10
river.rise(capacityOrFunction)
Water level changes the number of boats that can navigate the river simultaneously.
Update processing concurrency at runtime. Argument can be:
- a number to set the capacity to
- a function of the current capacity
(capacity) => newCapacity
Increasing capacity lets more boats process at once; decreasing limits new starts until in-flight work drops below the new limit.
river.rise(25);
console.log(river.tide());
// > 25
river.rise((level) => level + 5);
console.log(river.tide());
// > 30
river.size(pending?)
You can count boats in-flight, queued, and pending.
Current size from the active buffer.
With default fixedBuffer:
- river.size() returns in-flight + queued + pending.
- river.size(false) returns in-flight + queued.
- river.size(true) returns pending only.
console.log("Total:", river.size());
console.log("Queued:", river.size(false));
console.log("Pending:", river.size(true));
Buffers
Buffers decouple producers and consumers by constraining how many boats can wait between them.
import riverside, {
fixedBuffer,
slidingBuffer,
droppingBuffer,
} from "async-rivers";
fixedBuffer(size, options?)
Capacity-limited buffer with pending queue.
When main capacity is full:
- add() becomes pending.
- push() adds to pending directly.
Options:
- maxPending: Optional limit on the pending list size.
- onOverflow: "drop", "slide", or a custom function.
const river = riverside({
buffer: fixedBuffer(2, {
maxPending: 3,
onOverflow: "drop",
}),
});
river.sail("A");
river.sail("B");
const third = river.sail("C"); // Waits until space is free
onOverflow function
A function argument to onOverflow must return an array representing the new pendings list.
onOverflow: (pendings, pending) => reorderedPendings
- pendings: array of current pending entries, each shaped like { _id, val }.
- pending: the new incoming pending entry, shaped like { _id, val }.
Return an array describing the new pendings list, using those same { _id, val } entries.
Rules:
- Must return an array (or undefined to keep the current pendings).
- Only _ids are considered in reconciliation.
- Unknown _ids are ignored.
- Duplicate entries are deduplicated.
- Any candidate (existing or new) not kept is resolved as not added (false).
- No check is done on the returned value to see if it respects the maxPending.
const custom = fixedBuffer(1, {
maxPending: 2,
/* Keep only high priority elements when overflowing */
onOverflow: (pendings, val) =>
[...pendings, val].filter((pending) => pending.val?.priority > 3),
});
slidingBuffer(size)
Keeps latest size boats. Oldest is removed when full.
const river = riverside({ buffer: slidingBuffer(2) });
river.dump("A");
river.dump("B");
river.dump("C"); // A leaves the chat, B and C stay
droppingBuffer(size)
Keeps first size boats. New arrivals are dropped when full.
const river = riverside({ buffer: droppingBuffer(2) });
await river.sail("A"); // true
await river.sail("B"); // true
await river.sail("C"); // false
Batching
canalLock(options)
Rivers can flow freely, but sometimes boats should gather and cross together through a canal lock.
Create a canal lock that groups boats from an input river into batches, then forwards each locked batch into a canal (another river).
A canal lock is useful when downstream work is more efficient in groups (bulk writes, batched API calls, grouped processing).
Options:
- size: Required. Maximum number of boats per lock cycle (must be a positive finite number).
- waitMs: Time to wait in ms before locking a partial batch when no more boats are currently passing. Default: 100.
- river: Options used to create the upstream river where individual boats are received.
- canal: Options used to create the downstream canal where grouped boats are sent.
- retryCount: Number of retries when a lock transfer fails. Default: 0.
- onLockFail: What to do if the lock transfer still fails after retries:
  - "prepend" (default): put failed boats back at the front of the waiting batch.
  - "append": put failed boats at the end of the waiting batch.
  - "drop": discard failed boats.
  - function(boats): custom failure handler.
  - falsy value: fire-and-forget transfer (dump) with no failure handling.
- onError(error): Optional handler for unexpected internal errors.
- ...rest: Copied directly onto the returned lock object (name, metadata, etc.).
import { canalLock } from "async-rivers";
const lock = canalLock({
name: "North Canal Lock",
size: 3,
waitMs: 500,
retryCount: 2,
onLockFail: "prepend",
});
/* Boats arrive one by one in the river */
lock.dump({ id: "A" });
lock.dump({ id: "B" });
lock.dump({ id: "C" }); // lock is full -> [A,B,C] sent to canal
Canal Lock Methods
The lock exposes most river controls for incoming boats:
- dump, sail
- hail, hope
- sink
- blow, clog, plow, seal, open
- size
And lock-specific methods:
- scan(fn, options): hail batches on the canal side (each boat is now a batch array).
- lock(): manually request a lock/flush attempt.
- getSize(): current batch size.
- setSize(size): update batch size.
lock.scan(async (boats) => {
console.log(
"Lock opened for batch:",
boats.map((b) => b.id)
);
});
Lock Failure Strategies
When lock transfer uses sail and fails (false or null), onLockFail decides what happens next.
const resilientLock = canalLock({
size: 5,
retryCount: 3,
onLockFail: "prepend", // keep failed boats first in line
});
const lossyLock = canalLock({
size: 5,
onLockFail: "drop", // discard failed lock batches
});
const customLock = canalLock({
size: 5,
onLockFail: async (boats) => {
await saveToDeadLetterQueue(boats);
},
});
This way, the river can stay fluid while the lock controls how boats are grouped and how failures are handled.
Patterns
Simple Work Queue
const jobs = riverside({ bufferSize: 5 });
jobs.hail(
async (job) => {
await job.run();
},
{ when: (job) => job.enabled !== false }
);
Wait For A Specific Event
const events = riverside();
const confirmationP = events.hope({
when: (event) =>
event.type === "payment.confirmed" && event.orderId === "o-123",
timeout: 5_000,
notFound: null,
});
events.dump({ type: "payment.confirmed", orderId: "o-123" });
const confirmation = await confirmationP;
Route By Predicate
const premium = riverside({ name: "Premium Stream" });
const standard = riverside({ name: "Standard Stream" });
const intake = riverside();
intake.fork([
{ river: premium, when: (boat) => boat.plan === "premium" },
{ river: standard },
]);
Decoupled request/response
hope enables decoupled request/response behaviours:
Process A
- Sail a uniquely identifiable boat on toProcessRiver (to ensure it has been queued).
- Hope for a boat with the same id on processedRiver, with a predefined timeout.
Process B
- Hail (potentially filtered) incoming boats on toProcessRiver.
- Sail a response boat with the same id on processedRiver.
Process A could use a single AbortController signal to cancel both sail and hope.
If hope resolves to the notFound value, it may abort the shared signal.
import { riverside } from "async-rivers";
const requests = riverside();
const responses = riverside();
/* Process B */
requests.hail(async (req) => {
const result = req.value * 2;
await responses.sail({ id: req.id, result });
});
/* Process A */
const controller = new AbortController();
const id = "req-1";
const queued = await requests.sail(
{ id, value: 21 },
{ signal: controller.signal }
);
if (queued === true) {
const reply = await responses.hope({
when: (boat) => boat.id === id,
timeout: 2000,
notFound: null,
signal: controller.signal,
});
if (reply === null) controller.abort(); // optional shared cancel
console.log(reply); // { id: 'req-1', result: 42 } or null
}
License
ISC
