@nxtedition/scheduler
v4.1.12
A high-performance, priority-based task scheduler for Node.js with support for concurrency limiting, byte-rate throttling, and multi-worker coordination via `SharedArrayBuffer`.
Install
npm install @nxtedition/scheduler
Usage
Basic
import { Scheduler } from '@nxtedition/scheduler'
const scheduler = new Scheduler({ concurrency: 4 })
const result = await scheduler.run(async () => {
  const res = await fetch('https://example.com')
  return res.json()
})
Priority
Seven priority levels are available:
| Name | Value |
| --------- | ----- |
| lowest | -3 |
| lower | -2 |
| low | -1 |
| normal | 0 |
| high | 1 |
| higher | 2 |
| highest | 3 |
// Using string priority
await scheduler.run(() => importantWork(), 'high')
// Using static constants
await scheduler.run(() => backgroundWork(), Scheduler.LOW)
Per-Priority Concurrency
Concurrency can be configured per priority to reserve capacity for higher-priority work. A task at priority p is admitted on the fast path only when total running tasks are below that priority's cap; otherwise the task queues. The overall max is always a hard ceiling.
// At most 100 concurrent tasks total. Background work (low/lowest) is held
// to a small fraction so interactive (normal/high) traffic isn't queued behind
// a flood of batch jobs.
const scheduler = new Scheduler({
  concurrency: { max: 100, low: 20, lowest: 5 },
})
An unspecified priority between explicit ones inherits the cap of the nearest explicitly configured priority below it. Priorities above the topmost explicit one fall back to max, and an unspecified lowest (along with anything between it and the bottommost explicit priority) takes the bottommost explicit value. The example above therefore resolves to lowest = lower = 5, low = 20, and normal through highest = 100. Each per-priority value is itself capped at max.
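The resolution rule above can be written out as a short function. This is a hypothetical restatement for illustration (the `resolveCaps` name and the array ordered from lowest to highest are assumptions), not the library's internal code.

```javascript
// Hypothetical helper restating the documented cap-resolution rule.
// Not part of @nxtedition/scheduler. Index 0 = lowest, 6 = highest.
const PRIORITY_NAMES = ['lowest', 'lower', 'low', 'normal', 'high', 'higher', 'highest']

function resolveCaps(concurrency = Infinity) {
  const opts = typeof concurrency === 'number' ? { max: concurrency } : concurrency
  const max = opts.max ?? Infinity
  const explicit = PRIORITY_NAMES.map((name) => opts[name])

  // Positions of the bottommost and topmost explicitly configured priority.
  let bottom = -1
  let top = -1
  for (let i = 0; i < explicit.length; i++) {
    if (explicit[i] === undefined) continue
    if (bottom === -1) bottom = i
    top = i
  }

  const caps = []
  for (let i = 0; i < explicit.length; i++) {
    let cap
    if (explicit[i] !== undefined) cap = explicit[i] // set explicitly
    else if (top === -1 || i > top) cap = max // above the topmost explicit cap
    else if (i === 0) cap = explicit[bottom] // unspecified lowest seeds from bottommost
    else cap = caps[i - 1] // inherit from the priority just below
    caps.push(Math.min(cap, max)) // never above the overall max
  }
  return caps
}

console.log(JSON.stringify(resolveCaps({ max: 100, low: 20, lowest: 5 })))
// [5,5,20,100,100,100,100]
```

Returning a plain array keeps the lookup at dispatch time to a single index, which is why the sketch resolves everything up front.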
Starvation prevention
Per-priority caps act as backpressure: once the total number of running tasks reaches a priority's cap, new tasks at that priority queue. If higher-priority tasks keep arriving, the capped queue could be starved indefinitely. To prevent that, the dispatch loop's fairness lottery (already used to give lower priorities a turn under uniform concurrency) bypasses per-priority caps when it picks a queued priority, so a fully capped queue still drains slowly. The overall max is still respected. When the lottery picks an empty tier, its share goes to the highest-priority backlogged queue (within caps), so unused tiers never inflate lower-priority throughput.
In SharedArrayBuffer mode, the second constructor argument carries per-priority limits (the buffer carries max across workers):
const sharedState = Scheduler.makeSharedState(100) // global max=100
const scheduler = new Scheduler(sharedState, {
  concurrency: { low: 20, lowest: 5 }, // per-worker caps
})
Low-Level API
For more control, use acquire / release directly:
scheduler.acquire(
  async (opaque) => {
    try {
      await doWork(opaque)
    } finally {
      scheduler.release()
    }
  },
  Scheduler.NORMAL,
  opaqueData,
)
Error contract: if fn throws synchronously, the scheduler releases the slot itself. On the fast path the error propagates to the acquire() caller; when thrown from a queued dispatch it surfaces as a deferred uncaught exception. Do not also call release() on the synchronous-throw path: a try/finally around synchronous work would release twice and free a slot belonging to some other in-flight task. With an async callback, as above, the finally is correct: a rejecting async function does not throw synchronously, so the slot is yours to release exactly once.
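To make the double-release hazard concrete, here is a toy model of the contract described above. `ToySlots` is hypothetical: it has no queue and is not the library's implementation; it only mimics the documented rule that a synchronous throw from fn releases the slot automatically.

```javascript
// Toy model of the documented acquire()/release() error contract.
// ToySlots is hypothetical and has no queue; it only tracks the counter.
class ToySlots {
  constructor(max) {
    this.max = max
    this.running = 0
  }

  acquire(fn) {
    if (this.running >= this.max) return false // the real scheduler would queue here
    this.running++
    try {
      fn()
    } catch (err) {
      this.running-- // contract: a synchronous throw releases the slot for you
      throw err
    }
    return true
  }

  release() {
    this.running--
  }
}

const fail = () => {
  throw new Error('sync failure')
}

// Correct for synchronous work: release only on the success path.
const good = new ToySlots(1)
try {
  good.acquire(() => {
    fail()
    good.release() // not reached when fail() throws
  })
} catch {}
console.log(good.running) // 0

// Wrong: try/finally releases a second time after the scheduler already did.
const bad = new ToySlots(1)
try {
  bad.acquire(() => {
    try {
      fail()
    } finally {
      bad.release()
    }
  })
} catch {}
console.log(bad.running) // -1: a slot that belonged to nobody was freed
```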
Multi-Worker Coordination
Share a concurrency limit across worker threads using SharedArrayBuffer:
// Main thread
import { Scheduler } from '@nxtedition/scheduler'
import { Worker } from 'node:worker_threads'
const sharedState = Scheduler.makeSharedState(8)
const worker = new Worker('./worker.js', { workerData: sharedState })
// Worker thread
import { Scheduler } from '@nxtedition/scheduler'
import { workerData } from 'node:worker_threads'
const scheduler = new Scheduler(workerData)
await scheduler.run(() => work())
The global max is a hard cap across all workers: a worker that finds the global counter at the limit queues its task locally and waits (via Atomics.waitAsync on the shared counter). When another worker releases a slot, Atomics.notify wakes the waiter, which retries dispatch.
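The wait/notify handshake can be sketched with plain Atomics on an Int32Array. The buffer layout (index 0 = global running count, index 1 = max) and the helper names are assumptions for illustration; the library's actual shared-state layout is not documented here.

```javascript
// Hypothetical layout: [0] = global running count, [1] = global max.
function makeCounter(max) {
  const state = new Int32Array(new SharedArrayBuffer(2 * Int32Array.BYTES_PER_ELEMENT))
  state[1] = max
  return state
}

// Take a global slot with compare-and-swap; never overshoots max.
function tryTakeSlot(state) {
  for (;;) {
    const running = Atomics.load(state, 0)
    if (running >= Atomics.load(state, 1)) return false
    if (Atomics.compareExchange(state, 0, running, running + 1) === running) return true
  }
}

// Give a slot back and wake one worker parked in takeSlot().
function returnSlot(state) {
  Atomics.sub(state, 0, 1)
  Atomics.notify(state, 0, 1)
}

// Wait asynchronously (without blocking the thread) until a slot is free.
async function takeSlot(state) {
  while (!tryTakeSlot(state)) {
    const seen = Atomics.load(state, 0)
    if (seen < Atomics.load(state, 1)) continue // freed in the meantime, retry
    const result = Atomics.waitAsync(state, 0, seen)
    // If the counter already changed, waitAsync reports 'not-equal' synchronously.
    if (result.async) await result.value
  }
}
```

Waiting on the exact value just observed closes the race between "counter is full" and "go to sleep": if a release lands in between, waitAsync returns immediately and the loop retries.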
Atomics.waitAsync does not keep the Node event loop alive. If a worker is fully idle (running === 0) with tasks queued waiting for capacity, you need something else holding the loop open — usually trivial in real applications (HTTP servers, intervals, etc.), but standalone scripts may need a setInterval(() => {}, …) until their work is done.
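One way to hold the loop open in a standalone script is to park a no-op interval for the duration of the work. `withKeepAlive` is a hypothetical helper, not part of the package, and the one-hour interval is an arbitrary choice.

```javascript
// Hypothetical helper: keep the event loop alive while fn's promise is pending.
async function withKeepAlive(fn) {
  // An active interval is a ref'ed handle, so Node will not exit while it exists.
  const keepAlive = setInterval(() => {}, 60 * 60 * 1000)
  try {
    return await fn()
  } finally {
    clearInterval(keepAlive) // let the process exit normally afterwards
  }
}
```

In the worker above this would read `await withKeepAlive(() => scheduler.run(() => work()))`; applications that already keep the loop busy (servers, timers) do not need it.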
UV Thread Pool Scheduling
A practical use case is coordinating access to the libuv thread pool (UV_THREADPOOL_SIZE) across multiple worker threads. For example, several HTTP file-serving workers can share a scheduler so that file-system operations (which consume UV thread pool slots) are prioritized and throttled globally:
// main.js
import { Scheduler } from '@nxtedition/scheduler'
import { Worker } from 'node:worker_threads'
const UV_THREADPOOL_SIZE = parseInt(process.env.UV_THREADPOOL_SIZE || '4', 10)
const sharedState = Scheduler.makeSharedState(UV_THREADPOOL_SIZE)
for (let i = 0; i < 4; i++) {
  new Worker('./http-worker.js', { workerData: { sharedState } })
}
// http-worker.js
import { Scheduler, parsePriority } from '@nxtedition/scheduler'
import { workerData } from 'node:worker_threads'
import fs from 'node:fs/promises'
const scheduler = new Scheduler(workerData.sharedState)
// filePath is supplied by the caller (e.g. after validating the request path)
async function handleRequest(req, res, filePath) {
  // Derive priority from a request header, e.g. "X-Priority: high"
  const priority = parsePriority(req.headers['x-priority'] || 'normal')
  const data = await scheduler.run(() => fs.readFile(filePath), priority)
  res.end(data)
}
This ensures that high-priority requests get file-system access first, while low-priority background work (thumbnails, transcoding, etc.) yields thread pool capacity without starving entirely, thanks to the built-in starvation prevention.
Monitoring
const { running, pending, queues } = scheduler.stats
running: currently executing tasks
pending: tasks waiting in queues
queues: per-priority queue counts
API
new Scheduler(opts, options?)
opts.concurrency: concurrency configuration (default: Infinity). Either a number (the overall max) or an object with max and per-priority caps:
type ConcurrencyOptions =
  | number
  | {
      max?: number
      highest?: number
      higher?: number
      high?: number
      normal?: number
      low?: number
      lower?: number
      lowest?: number
    }
opts may also be a SharedArrayBuffer created by Scheduler.makeSharedState(). In that case, an optional second argument { concurrency } may carry per-priority caps (the buffer's max is always the global ceiling).
scheduler.run(fn, priority?, opaque?): Promise<T>
Execute fn within the scheduler. Returns a promise that resolves with the return value of fn (thenables are assimilated via Promise.resolve). After the scheduler has been disposed, run() rejects with Error('Scheduler is disposed') instead of hanging; with an overall concurrency of 0 (which could never dispatch anything) it rejects with Error('Scheduler concurrency is 0').
scheduler.acquire(fn, priority?, opaque?): boolean
Low-level task acquisition. Returns true if a slot was available and fn ran synchronously, or false if the task was queued (also false if the scheduler has been disposed). The fast path additionally requires that nothing is queued at the same priority, preserving FIFO order within a priority; a fresh task can still start ahead of work queued at a different priority while a freed slot is in flight (a deliberate latency trade-off, bounded to a microtask in shared mode). You must call scheduler.release() when fn completes — except when fn throws synchronously, in which case the scheduler has already released the slot (see the Low-Level API example).
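Read together with the per-priority caps, the fast-path condition amounts to a three-part check. This is a hedged paraphrase of the documented rule in code form; `canRunNow` and its argument shape are hypothetical.

```javascript
// Hypothetical paraphrase of the documented acquire() fast-path condition.
// caps[p]: resolved cap for priority p (already <= max).
// queued[p]: number of tasks currently waiting at priority p.
function canRunNow({ running, max, caps, queued }, p) {
  return (
    running < max && // overall ceiling
    running < caps[p] && // per-priority cap applies to the total running count
    queued[p] === 0 // FIFO within a priority: never overtake our own queue
  )
}

const state = { running: 3, max: 4, caps: { low: 2, normal: 4 }, queued: { low: 0, normal: 0 } }
console.log(canRunNow(state, 'normal')) // true
console.log(canRunNow(state, 'low')) // false: 3 running >= low cap of 2
```

Only the task's own priority queue is consulted, which is why a fresh task can start ahead of work queued at a different priority.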
scheduler.tryAcquire(priority?): boolean
Non-blocking. Takes a slot and returns true only if one is free at the given priority and nothing is queued at that priority (mirroring limiter.tryConsume); otherwise returns false without queuing. A successful tryAcquire must be paired with release().
scheduler.release(): void
Signal task completion. Dequeues the next pending task if concurrency allows.
scheduler.concurrency
The overall concurrency ceiling (Infinity when unlimited). In shared mode this is the buffer's global max; a per-instance options.concurrency.max is enforced but not reflected here.
scheduler.stats
{ running, pending, queues, shared } — locally running tasks, locally queued tasks, per-priority queue counts, and (shared mode only) the global { running, waiters } counters. Allocates fresh objects on every read; poll accordingly.
Scheduler.makeSharedState(concurrency): SharedArrayBuffer
Create shared state for cross-worker scheduling. concurrency must be Infinity or a non-negative integer that fits in a signed 32-bit integer (≤ 2,147,483,647) — the counter is stored as Int32.
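The accepted range can be restated as a predicate, together with what happens when a value one past the Int32 maximum is written into an Int32 slot. `isValidSharedConcurrency` is a hypothetical helper, not an export of the package.

```javascript
// Hypothetical predicate restating the documented makeSharedState() rule.
const INT32_MAX = 2 ** 31 - 1 // 2,147,483,647

function isValidSharedConcurrency(value) {
  return value === Infinity || (Number.isInteger(value) && value >= 0 && value <= INT32_MAX)
}

// Why the ceiling exists: an Int32 slot silently wraps one past INT32_MAX.
const slot = new Int32Array(1)
slot[0] = INT32_MAX + 1
console.log(slot[0]) // -2147483648
```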
scheduler[Symbol.dispose]()
Disposes the scheduler: releases any global slots it still holds (shared mode), rejects queued run() promises with Error('Scheduler is disposed'), drops the reference to a pending shared-mode Atomics.waitAsync, and makes all further calls inert (acquire / tryAcquire return false; run rejects). Use when abandoning a shared-mode Scheduler with tasks still queued so the instance and its parked wait can be garbage-collected (it is otherwise kept alive until process exit so its held slots can be released). Works with using. Note that slots held by still-in-flight tasks are returned to the shared pool immediately — until those tasks actually finish, other workers can transiently admit more than max (the limit is soft by design).
parsePriority(value): number
Parse a string or number into a normalized priority value.
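A sketch of the name-to-value mapping from the priority table. `toPriority` is hypothetical: the table fixes the values for the seven names, but how the real parsePriority treats unknown strings or out-of-range numbers is not documented here, so this sketch simply throws.

```javascript
// Values from the priority table; toPriority is a hypothetical stand-in.
const PRIORITIES = {
  lowest: -3,
  lower: -2,
  low: -1,
  normal: 0,
  high: 1,
  higher: 2,
  highest: 3,
}

function toPriority(value) {
  if (typeof value === 'number' && Number.isInteger(value) && value >= -3 && value <= 3) {
    return value // already a valid numeric priority
  }
  if (typeof value === 'string' && Object.prototype.hasOwnProperty.call(PRIORITIES, value)) {
    return PRIORITIES[value] // named priority from the table
  }
  throw new RangeError(`Unknown priority: ${String(value)}`) // assumption: reject everything else
}
```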
Limiter
Limiter is a token-bucket rate limiter that controls how many tokens per second are processed — typically bytes, but any integer unit works. It shares the same priority system as Scheduler.
Previously exported as Throttle. The Throttle alias is retained for backward compatibility but is deprecated; prefer Limiter.
Tokens refill automatically: the constructor starts an internal timer, and refills are also triggered lazily from consume() / tryConsume(). There is no manual refill step. While work is queued the timer holds the event loop open (so a script never exits with queued callbacks silently dropped); an idle limiter does not keep the process alive.
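The lazy refill can be sketched as a token bucket that tops itself up from elapsed time whenever it is asked for tokens. This is a simplified single-threaded model, not the library's implementation: `LazyBucket` is hypothetical, it has no queue or timer, it assumes the bucket starts full, and the clock is injected so the arithmetic is easy to follow.

```javascript
// Simplified, single-threaded model of a lazily refilled token bucket.
class LazyBucket {
  constructor(tokensPerSecond, now = () => performance.now()) {
    this.rate = tokensPerSecond
    this.capacity = tokensPerSecond // capacity equals the per-second rate
    this.tokens = tokensPerSecond // assumption: starts full
    this.now = now
    this.last = now()
  }

  // Credit whole tokens for the time elapsed since the last refill, up to capacity.
  refill() {
    const t = this.now()
    const earned = Math.floor(((t - this.last) * this.rate) / 1000)
    if (earned <= 0) return
    if (this.tokens + earned >= this.capacity) {
      this.tokens = this.capacity
      this.last = t // full: nothing to carry over
    } else {
      this.tokens += earned
      this.last += (earned * 1000) / this.rate // keep the fractional remainder
    }
  }

  // Take n tokens only if they are available right now (no queuing).
  tryTake(n) {
    this.refill()
    if (this.tokens < n) return false
    this.tokens -= n
    return true
  }
}
```

Refilling on demand means an idle bucket costs nothing between calls; the timer described above is only needed to drain queued work when no new calls arrive.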
Basic
import { Limiter } from '@nxtedition/scheduler'
// Allow 1 MB/s (tokens are bytes here)
const limiter = new Limiter({ tokensPerSecond: 1_000_000 })
// Consume `chunk.length` tokens at normal priority, running the callback when available
limiter.consume(() => send(chunk), chunk.length)
Streaming
limiter.stream() returns a Node.js Transform stream that enforces backpressure — it won't pass a chunk through until enough tokens are available. Each chunk consumes chunk.length tokens:
import { Limiter } from '@nxtedition/scheduler'
import { createReadStream, createWriteStream } from 'node:fs'
import { pipeline } from 'node:stream/promises'
const limiter = new Limiter({ tokensPerSecond: 1_000_000 }) // 1 MB/s
await pipeline(createReadStream('input.mp4'), limiter.stream(), createWriteStream('output.mp4'))
Priority
Both consume() and stream() accept a priority. Higher-priority work drains first when tokens are available:
// Low-priority stream (yields to higher-priority work)
const stream = limiter.stream('low')
Multi-Worker Coordination
Share a token bucket across worker threads using SharedArrayBuffer, exactly like Scheduler:
// Main thread
import { Limiter } from '@nxtedition/scheduler'
import { Worker } from 'node:worker_threads'
const sharedState = Limiter.makeSharedState(1_000_000) // 1 MB/s shared across all workers
new Worker('./worker.js', { workerData: sharedState })
// Worker thread
import { Limiter } from '@nxtedition/scheduler'
import { workerData } from 'node:worker_threads'
const limiter = new Limiter(workerData)
limiter.consume(() => send(chunk), chunk.length)
Low-Level API
const ran = limiter.consume(
  () => {
    sendPacket(data)
  },
  data.byteLength,
  'normal',
)
consume returns true if the callback ran immediately, or false if it was queued. It runs immediately when tokens are available and nothing is queued ahead of it; the call first refills the bucket from elapsed time, so an idle or freshly created limiter answers true whenever the accumulated credit covers the request. The boundary is not airtight, though: a task reported as false (queued) can still run synchronously within the same call if the drain that the call triggers reaches it immediately.
API
new Limiter(opts)
opts.tokensPerSecond: tokens (e.g. bytes) per second. Required; must be a positive integer that fits in a signed 32-bit integer (1 to 2,147,483,647), since tokens are tracked as Int32. Zero, negative, non-integer, NaN, Infinity, and values above 2,147,483,647 all throw.
opts may also be a SharedArrayBuffer created by Limiter.makeSharedState(tokensPerSecond).
limiter.consume(fn, bytes, priority?, opaque?): boolean
Consume bytes tokens, running fn(opaque) when they are available. Returns true if fn ran immediately, false if it was queued. opaque is passed through to fn on both paths.
bytes must be a non-negative integer no larger than 2,147,483,647 (the signed Int32 max, since tokens are tracked as Int32). Negative, non-integer, NaN, Infinity, and out-of-range values throw; a negative or wrapping value would otherwise add tokens via Atomics.sub and corrupt the shared bucket. A bytes value of 0 consumes no tokens and never waits on the bucket. Such a call runs immediately, returning true, only when nothing is queued at all (pending === 0). Otherwise consume returns false, but once the task reaches the front of its priority queue it drains regardless of the token balance, even while the bucket is empty or in debt. Queued behind token-starved work at the same priority, it waits its turn like any other task (FIFO head-of-line; a different, unblocked priority queue can drain it sooner).
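Why the range check matters: Atomics.sub converts its operand to the element type, so on an Int32Array both a negative value and a value just below 2^32 end up adding tokens. The snippet uses only standard Atomics on a standalone buffer, not the library's shared state; `assertValidBytes` is a hypothetical guard restating the documented contract.

```javascript
// A standalone Int32 "bucket" holding 1000 tokens.
const bucket = new Int32Array(new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT))
bucket[0] = 1000

Atomics.sub(bucket, 0, -100) // subtracting a negative adds
console.log(bucket[0]) // 1100

Atomics.sub(bucket, 0, 2 ** 32 - 100) // converts to -100 as an Int32, so it also adds
console.log(bucket[0]) // 1200

// Hypothetical guard matching the documented bytes contract.
function assertValidBytes(bytes) {
  if (!Number.isInteger(bytes) || bytes < 0 || bytes > 2 ** 31 - 1) {
    throw new RangeError(`invalid bytes: ${bytes}`)
  }
}
```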
Starvation guard: a queued task larger than the instantaneous token balance could otherwise starve under sustained traffic — smaller tasks would keep spending each refill's credit first. Any queue whose front item makes no progress for ~1 second is force-admitted into token debt; the negative balance is repaid by subsequent refills before anything else drains, so the long-run rate still holds (the trade-off is a transient burst, at most one stalled item per queue per second). The same mechanism bounds the wait for a request larger than tokensPerSecond (the bucket's capacity, which it could never fit): it is admitted once the bucket fills or the ~1s stall guard fires, whichever comes first — i.e. it is throttled, not dropped or stalled forever. In shared mode the guard is per-instance best-effort: other workers may keep consuming the shared bucket.
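A worked example of the debt arithmetic, assuming tokensPerSecond = 1000 (so the bucket holds at most 1000 tokens) and a single 2500-token request admitted when the bucket is full. The two helpers are hypothetical and model only the balance, not the queue or the ~1 s stall timer.

```javascript
// Hypothetical helpers modelling only the token balance.
const RATE = 1000 // tokens per second; also the bucket capacity

// Refill after `ms` milliseconds: debt is paid down first, then the cap applies.
function refill(balance, ms) {
  return Math.min(RATE, balance + Math.floor((ms * RATE) / 1000))
}

// Force-admit a request regardless of the balance, possibly going into debt.
function forceAdmit(balance, bytes) {
  return balance - bytes
}

let balance = RATE // bucket full after an idle second
balance = forceAdmit(balance, 2500) // -1500: the oversized request goes out now
balance = refill(balance, 1000) // -500: still in debt (per the text, nothing else drains)
balance = refill(balance, 500) // 0: debt cleared 1.5 s after admission
balance = refill(balance, 2000) // 1000: capped at capacity
```

From the start of the idle second to the moment the debt clears, 2500 tokens flow in 2.5 s, which is exactly the configured 1000 tokens/s; the price is that they leave as one burst.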
limiter.tryConsume(bytes): boolean
Non-blocking. Consumes bytes and returns true only if enough tokens are available right now and nothing is queued; otherwise returns false without queuing.
limiter.tokensPerSecond
The configured rate (and bucket capacity).
limiter.stats
{ tokens, pending, queues } — current token balance (negative while in debt), queued tasks, and per-priority queue counts. Allocates fresh objects on every read.
limiter.stream(priority?, options?): Transform
Returns a Transform stream that rate-limits data passing through it. Each chunk consumes chunk.length tokens; a chunk with no non-negative-integer length (e.g. in objectMode) is treated as weightless (0 tokens) — it never waits on tokens, though it still preserves the limiter's queue ordering (if other work is already queued, it drains in turn rather than jumping ahead). options is forwarded to the underlying Transform (e.g. objectMode, highWaterMark). Destroying the stream while a chunk is still queued in the limiter does not push into the dead stream.
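The per-chunk weighting rule can be restated as a one-line function; `chunkWeight` is a hypothetical name, not an export of the package.

```javascript
// Hypothetical restatement of the documented per-chunk weight rule:
// a non-negative integer `length` is the cost; anything else weighs 0 tokens.
function chunkWeight(chunk) {
  const len = chunk != null ? chunk.length : undefined
  return Number.isInteger(len) && len >= 0 ? len : 0
}

console.log(chunkWeight(Buffer.alloc(16))) // 16
console.log(chunkWeight({ id: 1 })) // 0 (typical objectMode record)
```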
Limiter.makeSharedState(tokensPerSecond): SharedArrayBuffer
Create shared state for cross-worker rate limiting.
License
MIT
