npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

callbag-distribute

v2.0.0

Published

callbag that distributes (shared) sources to many sinks

Readme

callbag-distribute

Callbag operator that broadcasts a single source to multiple sinks. Does reference counting on sinks and starts the source when the first sink gets connected, similar to RxJS .share(). Works on either pullable or listenable sources. Distributes on data requests one at a time.

How it works

The distribute operator acts as a load balancer. It distributes data items from a single source to multiple sinks (workers). When a sink requests data, distribute forwards that request to the source. When the source emits data, distribute routes it specifically to the sink that requested it.

sequenceDiagram
    participant Source
    participant Distribute
    participant Worker1
    participant Worker2

    Note over Worker1, Distribute: 1. Worker joins
    Worker1->>Distribute: Handshake (0)
    Distribute->>Source: Handshake (0)

    Note over Worker1: 2. Worker asks for data
    Worker1->>Distribute: Request (1)
    Distribute->>Source: Request (1)
    Source->>Distribute: Data (1)
    Distribute->>Worker1: DataPayload (1)

    Note over Worker2: 3. Another Worker joins
    Worker2->>Distribute: Handshake (0)
   
    Note over Worker2: 4. Second Worker asks
    Worker2->>Distribute: Request (1)
    Distribute->>Source: Request (1)
    Source->>Distribute: Data (1)
    Distribute->>Worker2: DataPayload (1)

Usages

This pattern is ideal for a Task Queue -> Task Runner architecture. For example, using fromIter with an array of tasks, you can have multiple workers pulling tasks as they complete their previous work.

makeWorker takes three arguments:

  1. A Promise-returning function (unit of work)
  2. A callback for when the queue/worker is exhausted
  3. An optional ID for logging

When ran with NODE_ENV=dev, detailed logs of the queue processing are emitted.

installation

npm install callbag-distribute

example

Share a pullable source to 5 pullers:


import { distribute, makeWorker } from 'callbag-distribute'
import { fromIter } from 'callbag-from-iter';


function randomIntFromInterval(min: number, max: number): number {
    // min and max included
    return Math.floor(Math.random() * (max - min + 1) + min);
}
const MAX_WORKERS = 5;

// 1, 2, 3, ... 150
const arrayOfWork = Array.from(Array(150).keys());

const source = distribute(
    fromIter(arrayOfWork),
);

const DYNAMIC_WORKERS = arrayOfWork.length < MAX_WORKERS ? arrayOfWork.length : MAX_WORKERS;

for (let step = 0; step < DYNAMIC_WORKERS; step++) {
    source(
        0,
        makeWorker(
            data => { // argument one - what to do with 'each' data value from source.  this is your business logic.
                const randomTime = randomIntFromInterval(2000, 5000);
                console.log(`start ${data} expecting to take ${randomTime}`);
                console.time(`stop  ${data}`);
                return new Promise(resolve => {
                    const id = setTimeout(() => {
                        console.timeEnd(`stop  ${data}`);
                        clearTimeout(id);
                        resolve();
                    }, randomTime);
                });
            },
            () => console.log(`worker ${step} complete`), // upon completion, invoke this callback. this could resolve a promise in conjunction of Promise.all. 
        ),
    );
}

results in:

{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'0,223164', event: 'handshake'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'0,2108004', event: 'handshake done...  asking for first '}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'0,2271832', event: 'starting', value:' 10'}
start 10 expecting to take 2442
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'0,3892', event: 'handshake'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'0,60011', event: 'handshake done...  asking for first '}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'0,111226', event: 'starting', value:' 20'}
start 20 expecting to take 3607
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'0,3151', event: 'handshake'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'0,62442', event: 'handshake done...  asking for first '}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'0,103227', event: 'starting', value:' 30'}
start 30 expecting to take 2883
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'0,6640', event: 'handshake'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'0,52010', event: 'handshake done...  asking for first '}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'0,88967', event: 'starting', value:' 40'}
start 40 expecting to take 4497
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'0,2073', event: 'handshake'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'0,44151', event: 'handshake done...  asking for first '}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'0,82840', event: 'starting', value:' 50'}
start 50 expecting to take 3686
stop  10: 2442.566ms
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'2,445532320', event: 'ending/next', value:' 10'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'2,445671245', event: 'starting', value:' 60'}
start 60 expecting to take 4210
stop  30: 2883.590ms
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'2,885258026', event: 'ending/next', value:' 30'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'2,885691943', event: 'starting', value:' 70'}
start 70 expecting to take 4283
stop  20: 3611.561ms
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'3,612048365', event: 'ending/next', value:' 20'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'3,612151050', event: 'starting', value:' 80'}
start 80 expecting to take 3960
stop  50: 3686.255ms
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'3,686604276', event: 'ending/next', value:' 50'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'3,686682518', event: 'starting', value:' 90'}
start 90 expecting to take 3349
stop  40: 4497.284ms
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'4,497598329', event: 'ending/next', value:' 40'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'4,497673581', event: 'starting', value:' 100'}
start 100 expecting to take 2215
stop  60: 4214.282ms
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'6,660369948', event: 'ending/next', value:' 60'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'6,660509683', event: 'starting', value:' 110'}
start 110 expecting to take 2133
stop  100: 2216.248ms
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'6,714199881', event: 'ending/next', value:' 100'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'6,714290982', event: 'starting', value:' 120'}
start 120 expecting to take 4390
stop  90: 3352.460ms
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'7,39403927', event: 'ending/next', value:' 90'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'7,39469828', event: 'starting', value:' 130'}
start 130 expecting to take 4979
stop  70: 4285.245ms
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'7,171343486', event: 'ending/next', value:' 70'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'7,171479174', event: 'starting', value:' 140'}
start 140 expecting to take 3710
stop  80: 3963.032ms
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'7,575604833', event: 'ending/next', value:' 80'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'7,575750841', event: 'starting', value:' 150'}
start 150 expecting to take 2744
stop  110: 2133.248ms
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'8,794234102', event: 'ending/next', value:' 110'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'8,794378890', event: 'starting', value:' 160'}
start 160 expecting to take 4429
stop  150: 2746.237ms
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'10,322319219', event: 'ending/next', value:' 150'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'10,322390166', event: 'starting', value:' 170'}
start 170 expecting to take 3706
stop  140: 3710.244ms
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'10,882026939', event: 'ending/next', value:' 140'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'10,882087033', event: 'starting', value:' 180'}
start 180 expecting to take 2475
stop  120: 4391.874ms
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'11,106455004', event: 'ending/next', value:' 120'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'11,106540831', event: 'starting', value:' 190'}
start 190 expecting to take 3592
stop  130: 4980.057ms
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'12,19704877', event: 'ending/next', value:' 130'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'12,19755390', event: 'starting', value:' 200'}
start 200 expecting to take 4229
stop  160: 4429.071ms
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'13,223731139', event: 'ending/next', value:' 160'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'13,223796726', event: 'tasks exhausted'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'13,223846271', event: 'finished'}
worker 0 complete
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'13,221035639', event: 'tasks exhausted'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'13,220764960', event: 'tasks exhausted'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'13,220577889', event: 'tasks exhausted'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'13,220420585', event: 'tasks exhausted'}
stop  180: 2476.131ms
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'13,358533569', event: 'ending/next', value:' 180'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'13,358622798', event: 'finished', value:' 180'}
worker 2 complete
stop  170: 3707.192ms
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'14,29777617', event: 'ending/next', value:' 170'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'14,29829570', event: 'finished', value:' 170'}
worker 1 complete
stop  190: 3595.368ms
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'14,702740279', event: 'ending/next', value:' 190'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'14,702915802', event: 'finished', value:' 190'}
worker 3 complete
stop  200: 4230.908ms
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'16,250910924', event: 'ending/next', value:' 200'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'16,251002813', event: 'finished', value:' 200'}
worker 4 complete

Thanks to

https://blog.krawaller.se/posts/explaining-callbags-via-typescript-definitions/