@billdaddy/queuekit
v0.1.1
Published
Tiny, type-safe async task queue — concurrency limit, priority, pause/resume, onIdle/onEmpty drain, and AbortSignal. Zero dependencies.
Maintainers
Readme
queuekit
Tiny, type-safe async task queue — concurrency limit, priority, pause/resume,
onIdle/onEmptydrain, andAbortSignal. Zero dependencies.
When you have a known list, you map it with a concurrency limit and you're done.
But when work arrives over time — webhook handlers, a crawler discovering
links, jobs trickling off a stream — you need something stateful: a queue you
can keep adding to, that bounds how much runs at once and lets you await the
drain. That's queuekit.
import { Queue } from "@billdaddy/queuekit";
const queue = new Queue({ concurrency: 4 });
for (const url of discoverUrls()) {
queue.add(() => crawl(url)); // returns a promise for this task's result
}
await queue.onIdle(); // resolves when everything has settledWhy queuekit?
- Stateful, not one-shot. Add tasks whenever; each
add()returns a promise that settles with that task's value (or its error — failures don't stall the queue). - Concurrency you can change live. Set a limit up front, raise or lower it at runtime and the queue immediately starts (or stops feeding) tasks.
- Priority. Higher-priority tasks jump ahead; ties keep FIFO order.
- Pause / resume / clear. Stage work with
autoStart: false,pause()to hold,clear()to drop what's waiting. - Await the drain.
onEmpty()(nothing waiting) andonIdle()(nothing waiting and nothing running). - Cancellable. A queue-level
AbortSignal, plus a per-task signal to cancel a single queued task. - Zero dependencies, ESM + CJS + types.
Install
npm install @billdaddy/queuekit
# or: pnpm add @billdaddy/queuekit / yarn add @billdaddy/queuekit / bun add @billdaddy/queuekitAPI
new Queue(options?)
interface QueueOptions {
concurrency?: number; // max running at once (default Infinity)
autoStart?: boolean; // process as added (default true)
signal?: AbortSignal; // abort clears pending + blocks new adds
}queue.add(task, options?) → Promise<T>
interface AddOptions {
priority?: number; // higher first, ties FIFO (default 0)
signal?: AbortSignal; // cancel this task while still queued
}
const value = await queue.add(() => fetchJson(url));
await queue.add(() => fetchJson(url), { priority: 10 });addAll(tasks, options?) enqueues many and resolves to their results in order.
Control & inspection
| Member | Description |
| --- | --- |
| start() / pause() | Resume / stop starting new tasks (in-flight finish). |
| clear() | Drop pending tasks; each rejects with QueueClearedError. |
| onEmpty() | Promise that resolves when nothing is waiting. |
| onIdle() | Promise that resolves when fully drained. |
| size | Tasks waiting to start. |
| pending | Tasks currently running. |
| isPaused | Whether processing is paused. |
| concurrency | Get/set the limit (settable at runtime). |
Patterns
Rate-stable worker — stage, then run in priority order:
const queue = new Queue({ concurrency: 2, autoStart: false });
jobs.forEach((j) => queue.add(() => run(j), { priority: j.urgent ? 1 : 0 }));
queue.start();
await queue.onIdle();Backpressure — wait for room before queueing more:
for (const item of hugeStream) {
if (queue.size > 1000) await queue.onEmpty();
queue.add(() => process(item));
}
await queue.onIdle();Pairs well with
| Need | Use |
| --- | --- |
| Map a known list with a concurrency cap | runpool |
| Cap rate (per second / token bucket) | ratebucket |
| Serialize a critical section | mutexkit |
| Retry a flaky task | retryfn |
Contributors ✨
This project follows the all-contributors specification. Contributions of any kind are welcome — code, docs, bug reports, ideas, reviews! See the emoji key for how each contribution is recognized, and open a PR or issue to get involved.
Thanks goes to these wonderful people:
License
MIT © Tung Tran
