callbag-distribute
v2.0.0
callbag that distributes (shared) sources to many sinks
Callbag operator that shares a single source among multiple sinks. It reference-counts its sinks and starts the source when the first sink connects, similar to RxJS .share(). Works on either pullable or listenable sources. Data requests are served one at a time.
How it works
The distribute operator acts as a load balancer. It distributes data items from a single source to multiple sinks (workers). When a sink requests data, distribute forwards that request to the source. When the source emits data, distribute routes it specifically to the sink that requested it.
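The routing described above can be sketched without the library. This is a simplified illustration, not the package's actual implementation: `fromArray`, `distributeSketch` and `connect` are hypothetical names, and the sketch assumes a pullable source that greets synchronously.

```typescript
// Callbag signals: 0 = handshake, 1 = data (or a request, when sent upstream), 2 = end.
type Signal = 0 | 1 | 2;
type Callbag = (type: Signal, payload?: any) => void;

// Hypothetical pullable source over an array: emits one item per request.
function fromArray<T>(items: T[]): Callbag {
  return (start: Signal, sink?: any) => {
    if (start !== 0) return;
    let i = 0;
    let done = false;
    sink(0, (t: Signal) => {
      if (done) return;
      if (t === 1 && i < items.length) sink(1, items[i++]);
      else {
        done = true;
        if (t === 1) sink(2); // asked for more, but nothing is left
      }
    });
  };
}

// Simplified stand-in for distribute (assumption: not the real implementation).
function distributeSketch(source: Callbag): Callbag {
  const sinks: Callbag[] = [];   // connected sinks (reference counting)
  const waiting: Callbag[] = []; // sinks with an unanswered request, oldest first
  let talkback: Callbag | undefined;
  let ended = false;

  return (start: Signal, sink: Callbag) => {
    if (start !== 0) return;
    sinks.push(sink);

    // Start the source only when the first sink connects.
    if (!talkback && sinks.length === 1) {
      source(0, (t: Signal, d?: any) => {
        if (t === 0) talkback = d;
        else if (t === 1) waiting.shift()?.(1, d); // route data to the requester only
        else {
          ended = true;
          waiting.length = 0;
          sinks.splice(0).forEach((s) => s(2, d)); // end reaches every sink
        }
      });
    }

    sink(0, (t: Signal) => {
      if (t === 1) {
        if (ended) return;
        waiting.push(sink); // remember who asked
        talkback?.(1);      // forward the request to the source
      } else if (t === 2) {
        const i = sinks.indexOf(sink);
        if (i >= 0) sinks.splice(i, 1);
        if (sinks.length === 0 && !ended) talkback?.(2); // last sink left: stop the source
      }
    });
  };
}

const shared = distributeSketch(fromArray(['a', 'b', 'c']));
const log: string[] = [];

// Connects a recording sink and returns a function that requests one item.
function connect(name: string): () => void {
  let tb: Callbag | undefined;
  shared(0, (t: Signal, d?: any) => {
    if (t === 0) tb = d;
    else log.push(t === 1 ? `${name}:${d}` : `${name}:end`);
  });
  return () => tb?.(1);
}

const pull1 = connect('w1');
const pull2 = connect('w2');
pull1(); // w1 receives 'a'
pull2(); // w2 receives 'b'
pull1(); // w1 receives 'c'
pull2(); // source is exhausted: both sinks receive end
console.log(log); // [ 'w1:a', 'w2:b', 'w1:c', 'w1:end', 'w2:end' ]
```

Each item reaches exactly one sink, which is what distinguishes this load-balancing behaviour from a broadcast, where every sink would see every item.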
sequenceDiagram
participant Source
participant Distribute
participant Worker1
participant Worker2
Note over Worker1, Distribute: 1. Worker joins
Worker1->>Distribute: Handshake (0)
Distribute->>Source: Handshake (0)
Note over Worker1: 2. Worker asks for data
Worker1->>Distribute: Request (1)
Distribute->>Source: Request (1)
Source->>Distribute: Data (1)
Distribute->>Worker1: DataPayload (1)
Note over Worker2: 3. Another Worker joins
Worker2->>Distribute: Handshake (0)
Note over Worker2: 4. Second Worker asks
Worker2->>Distribute: Request (1)
Distribute->>Source: Request (1)
Source->>Distribute: Data (1)
Distribute->>Worker2: DataPayload (1)
Usages
This pattern is ideal for a Task Queue -> Task Runner architecture.
For example, using fromIter with an array of tasks, you can have multiple workers pulling tasks as they complete their previous work.
makeWorker takes three arguments:
- A Promise-returning function (unit of work)
- A callback for when the queue/worker is exhausted
- An optional ID for logging
When run with NODE_ENV=dev, detailed logs of the queue processing are emitted.
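To make the three arguments concrete, here is a rough sketch of what a worker sink of this shape could look like. It is an assumption about the behaviour described above, not the code shipped in the package: `makeWorkerSketch` and the demo `source` are hypothetical names, and the dev logging is left out.

```typescript
type Signal = 0 | 1 | 2;
type Callbag = (type: Signal, payload?: any) => void;

// Hypothetical worker sink: pulls one item, runs the work, then pulls the next.
function makeWorkerSketch<T>(
  work: (item: T) => Promise<unknown>, // unit of work
  onDone: () => void,                  // called once when the worker is finished
  id: string = 'worker',               // optional ID, used here only in error output
): Callbag {
  let talkback: Callbag | undefined;
  let busy = false;
  let exhausted = false;
  let finished = false;

  const finish = () => {
    if (finished) return;
    finished = true;
    onDone();
  };

  // Runs after each task settles: either finish or ask for more work.
  const next = () => {
    busy = false;
    if (exhausted) finish();
    else talkback?.(1);
  };

  return (type: Signal, payload?: any) => {
    if (type === 0) {
      talkback = payload;
      talkback?.(1); // handshake done, ask for the first item
    } else if (type === 1) {
      busy = true;
      work(payload as T).then(next, (err) => {
        console.error(`${id}: task failed`, err);
        next();
      });
    } else {
      exhausted = true;      // the queue has ended
      if (!busy) finish();   // otherwise finish once the in-flight task settles
    }
  };
}

// Hypothetical single-consumer pullable source, used only to drive the demo.
const numbers = [1, 2, 3];
const source: Callbag = (start: Signal, sink?: any) => {
  if (start !== 0) return;
  let i = 0;
  sink(0, (t: Signal) => {
    if (t !== 1) return;
    if (i < numbers.length) sink(1, numbers[i++]);
    else sink(2);
  });
};

const processed: number[] = [];
const done = new Promise<void>((resolve) => {
  const work = async (n: number) => { processed.push(n * 2); };
  source(0, makeWorkerSketch<number>(work, resolve, 'w0'));
});
done.then(() => console.log(processed)); // [ 2, 4, 6 ]
```

The worker only requests the next item after the current promise settles, so a slow task never causes items to pile up on that worker.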
Installation
npm install callbag-distribute
Example
Share a pullable source among 5 pullers:
import { distribute, makeWorker } from 'callbag-distribute';
import { fromIter } from 'callbag-from-iter';

// Random integer between min and max, both included.
function randomIntFromInterval(min: number, max: number): number {
  return Math.floor(Math.random() * (max - min + 1) + min);
}

const MAX_WORKERS = 5;

// 0, 1, 2, ... 149
const arrayOfWork = Array.from(Array(150).keys());

const source = distribute(
  fromIter(arrayOfWork),
);

// Never start more workers than there are tasks.
const DYNAMIC_WORKERS = Math.min(arrayOfWork.length, MAX_WORKERS);

for (let step = 0; step < DYNAMIC_WORKERS; step++) {
  source(
    0,
    makeWorker(
      // Argument one: what to do with each data value from the source. This is your business logic.
      data => {
        const randomTime = randomIntFromInterval(2000, 5000);
        console.log(`start ${data} expecting to take ${randomTime}`);
        console.time(`stop ${data}`);
        return new Promise<void>(resolve => {
          setTimeout(() => {
            console.timeEnd(`stop ${data}`);
            resolve();
          }, randomTime);
        });
      },
      // Argument two: invoked upon completion. It could resolve a promise for use with Promise.all.
      () => console.log(`worker ${step} complete`),
    ),
  );
}
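With NODE_ENV=dev, a run produces output like the following. This log was captured from a run over the values 10, 20, ... 200 rather than the array above: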
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'0,223164', event: 'handshake'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'0,2108004', event: 'handshake done... asking for first '}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'0,2271832', event: 'starting', value:' 10'}
start 10 expecting to take 2442
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'0,3892', event: 'handshake'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'0,60011', event: 'handshake done... asking for first '}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'0,111226', event: 'starting', value:' 20'}
start 20 expecting to take 3607
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'0,3151', event: 'handshake'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'0,62442', event: 'handshake done... asking for first '}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'0,103227', event: 'starting', value:' 30'}
start 30 expecting to take 2883
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'0,6640', event: 'handshake'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'0,52010', event: 'handshake done... asking for first '}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'0,88967', event: 'starting', value:' 40'}
start 40 expecting to take 4497
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'0,2073', event: 'handshake'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'0,44151', event: 'handshake done... asking for first '}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'0,82840', event: 'starting', value:' 50'}
start 50 expecting to take 3686
stop 10: 2442.566ms
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'2,445532320', event: 'ending/next', value:' 10'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'2,445671245', event: 'starting', value:' 60'}
start 60 expecting to take 4210
stop 30: 2883.590ms
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'2,885258026', event: 'ending/next', value:' 30'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'2,885691943', event: 'starting', value:' 70'}
start 70 expecting to take 4283
stop 20: 3611.561ms
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'3,612048365', event: 'ending/next', value:' 20'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'3,612151050', event: 'starting', value:' 80'}
start 80 expecting to take 3960
stop 50: 3686.255ms
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'3,686604276', event: 'ending/next', value:' 50'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'3,686682518', event: 'starting', value:' 90'}
start 90 expecting to take 3349
stop 40: 4497.284ms
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'4,497598329', event: 'ending/next', value:' 40'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'4,497673581', event: 'starting', value:' 100'}
start 100 expecting to take 2215
stop 60: 4214.282ms
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'6,660369948', event: 'ending/next', value:' 60'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'6,660509683', event: 'starting', value:' 110'}
start 110 expecting to take 2133
stop 100: 2216.248ms
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'6,714199881', event: 'ending/next', value:' 100'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'6,714290982', event: 'starting', value:' 120'}
start 120 expecting to take 4390
stop 90: 3352.460ms
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'7,39403927', event: 'ending/next', value:' 90'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'7,39469828', event: 'starting', value:' 130'}
start 130 expecting to take 4979
stop 70: 4285.245ms
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'7,171343486', event: 'ending/next', value:' 70'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'7,171479174', event: 'starting', value:' 140'}
start 140 expecting to take 3710
stop 80: 3963.032ms
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'7,575604833', event: 'ending/next', value:' 80'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'7,575750841', event: 'starting', value:' 150'}
start 150 expecting to take 2744
stop 110: 2133.248ms
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'8,794234102', event: 'ending/next', value:' 110'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'8,794378890', event: 'starting', value:' 160'}
start 160 expecting to take 4429
stop 150: 2746.237ms
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'10,322319219', event: 'ending/next', value:' 150'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'10,322390166', event: 'starting', value:' 170'}
start 170 expecting to take 3706
stop 140: 3710.244ms
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'10,882026939', event: 'ending/next', value:' 140'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'10,882087033', event: 'starting', value:' 180'}
start 180 expecting to take 2475
stop 120: 4391.874ms
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'11,106455004', event: 'ending/next', value:' 120'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'11,106540831', event: 'starting', value:' 190'}
start 190 expecting to take 3592
stop 130: 4980.057ms
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'12,19704877', event: 'ending/next', value:' 130'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'12,19755390', event: 'starting', value:' 200'}
start 200 expecting to take 4229
stop 160: 4429.071ms
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'13,223731139', event: 'ending/next', value:' 160'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'13,223796726', event: 'tasks exhausted'}
{worker: 'dc464ce0-5d6b-11ea-9530-b78c45c386cc', time:'13,223846271', event: 'finished'}
worker 0 complete
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'13,221035639', event: 'tasks exhausted'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'13,220764960', event: 'tasks exhausted'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'13,220577889', event: 'tasks exhausted'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'13,220420585', event: 'tasks exhausted'}
stop 180: 2476.131ms
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'13,358533569', event: 'ending/next', value:' 180'}
{worker: 'dc46c211-5d6b-11ea-9530-b78c45c386cc', time:'13,358622798', event: 'finished', value:' 180'}
worker 2 complete
stop 170: 3707.192ms
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'14,29777617', event: 'ending/next', value:' 170'}
{worker: 'dc46c210-5d6b-11ea-9530-b78c45c386cc', time:'14,29829570', event: 'finished', value:' 170'}
worker 1 complete
stop 190: 3595.368ms
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'14,702740279', event: 'ending/next', value:' 190'}
{worker: 'dc46c212-5d6b-11ea-9530-b78c45c386cc', time:'14,702915802', event: 'finished', value:' 190'}
worker 3 complete
stop 200: 4230.908ms
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'16,250910924', event: 'ending/next', value:' 200'}
{worker: 'dc46c213-5d6b-11ea-9530-b78c45c386cc', time:'16,251002813', event: 'finished', value:' 200'}
worker 4 complete
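The completion callback in the example can be tied to Promise.all so that the caller knows when every worker has finished. The sketch below shows only that wiring: `startWorker` is a hypothetical stand-in for `source(0, makeWorker(work, onDone))` and merely simulates a worker that completes asynchronously.

```typescript
// Hypothetical stand-in for `source(0, makeWorker(work, onDone))`:
// it only simulates a worker that finishes asynchronously.
function startWorker(step: number, onDone: () => void): void {
  console.log(`worker ${step} started`);
  Promise.resolve().then(onDone);
}

const WORKERS = 3;
const completions: Promise<number>[] = [];

for (let step = 0; step < WORKERS; step++) {
  // Each worker's completion callback resolves its own promise.
  completions.push(
    new Promise<number>((resolve) => startWorker(step, () => resolve(step))),
  );
}

// Resolves once every worker has reported completion.
const allDone = Promise.all(completions).then((steps) => {
  console.log(`all ${steps.length} workers complete`);
  return steps;
});
```

Promise.all preserves the order of the input promises, so `steps` lists the workers in start order regardless of which one finished first.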
Thanks to
https://blog.krawaller.se/posts/explaining-callbags-via-typescript-definitions/
