# @nxtedition/scheduler

v4.1.0

A high-performance, priority-based task scheduler for Node.js with support for concurrency limiting, byte-rate throttling, and multi-worker coordination via `SharedArrayBuffer`.
## Install

```sh
npm install @nxtedition/scheduler
```

## Usage
### Basic

```js
import { Scheduler } from '@nxtedition/scheduler'

const scheduler = new Scheduler({ concurrency: 4 })

const result = await scheduler.run(async () => {
  const res = await fetch('https://example.com')
  return res.json()
})
```

### Priority
Seven priority levels are available:
| Name | Value |
| --------- | ----- |
| lowest | -3 |
| lower | -2 |
| low | -1 |
| normal | 0 |
| high | 1 |
| higher | 2 |
| highest | 3 |
```js
// Using string priority
await scheduler.run(() => importantWork(), 'high')

// Using static constants
await scheduler.run(() => backgroundWork(), Scheduler.LOW)
```

### Low-Level API
For more control, use `acquire`/`release` directly:
```js
scheduler.acquire(
  (opaque) => {
    try {
      doWork(opaque)
    } finally {
      scheduler.release()
    }
  },
  Scheduler.NORMAL,
  opaqueData,
)
```

### Multi-Worker Coordination
Share a concurrency limit across worker threads using `SharedArrayBuffer`:
```js
// Main thread
import { Scheduler } from '@nxtedition/scheduler'
import { Worker } from 'node:worker_threads'

const sharedState = Scheduler.makeSharedState(8)
const worker = new Worker('./worker.js', { workerData: sharedState })
```

```js
// Worker thread
import { Scheduler } from '@nxtedition/scheduler'
import { workerData } from 'node:worker_threads'

const scheduler = new Scheduler(workerData)
await scheduler.run(() => work())
```

### UV Thread Pool Scheduling
A practical use case is coordinating access to the libuv thread pool (`UV_THREADPOOL_SIZE`) across multiple worker threads. For example, several HTTP file-serving workers can share a scheduler so that file-system operations (which consume UV thread pool slots) are prioritized and throttled globally:
```js
// main.js
import { Scheduler } from '@nxtedition/scheduler'
import { Worker } from 'node:worker_threads'

const UV_THREADPOOL_SIZE = parseInt(process.env.UV_THREADPOOL_SIZE || '4', 10)
const sharedState = Scheduler.makeSharedState(UV_THREADPOOL_SIZE)

for (let i = 0; i < 4; i++) {
  new Worker('./http-worker.js', { workerData: { sharedState } })
}
```

```js
// http-worker.js
import { Scheduler, parsePriority } from '@nxtedition/scheduler'
import { workerData } from 'node:worker_threads'
import fs from 'node:fs/promises'

const scheduler = new Scheduler(workerData.sharedState)

async function handleRequest(req, res) {
  // Derive priority from a request header, e.g. "X-Priority: high"
  const priority = parsePriority(req.headers['x-priority'] || 'normal')
  // Map the request path to a local file (simplified; no sanitization)
  const filePath = '.' + req.url
  const data = await scheduler.run(() => fs.readFile(filePath), priority)
  res.end(data)
}
```

This ensures that high-priority requests get file-system access first, while low-priority background work (thumbnails, transcoding, etc.) yields thread pool capacity without starving entirely, thanks to the built-in starvation prevention.
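The cross-worker accounting above presumably boils down to atomic operations on a counter inside the `SharedArrayBuffer`. The sketch below shows that underlying mechanism; the names `tryAcquire`/`release` and the single-counter layout are illustrative assumptions, not the library's actual internals:

```js
// Hypothetical sketch of cross-worker slot accounting (not the library's
// real layout): a shared Int32Array holds the number of in-flight tasks,
// and workers use Atomics.compareExchange to claim a slot without races.
const CONCURRENCY = 4

const sab = new SharedArrayBuffer(4)
const state = new Int32Array(sab) // state[0] = tasks currently running

// Try to claim a slot; returns true on success, false when the pool is full.
function tryAcquire() {
  while (true) {
    const current = Atomics.load(state, 0)
    if (current >= CONCURRENCY) return false
    // Only succeeds if no other worker changed the counter in between.
    if (Atomics.compareExchange(state, 0, current, current + 1) === current) {
      return true
    }
  }
}

// Give a slot back and wake one waiting worker, if any.
function release() {
  Atomics.sub(state, 0, 1)
  Atomics.notify(state, 0, 1)
}
```

Because the counter lives in shared memory, every worker that wraps the same `SharedArrayBuffer` sees the same in-flight count, which is what makes a global concurrency limit possible.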
### Monitoring

```js
const { running, pending, deferred, queues } = scheduler.stats
```

- `running` — currently executing tasks
- `pending` — tasks waiting in queues
- `deferred` — total tasks that were queued (lifetime counter)
- `queues` — per-priority queue counts
## API

### new Scheduler(opts)

- `opts.concurrency` — max concurrent tasks (default: `Infinity`)
- `opts` may also be a `SharedArrayBuffer` created by `Scheduler.makeSharedState()`
### scheduler.run(fn, priority?, opaque?): Promise&lt;T&gt;

Execute `fn` within the scheduler. Returns a promise that resolves with the return value of `fn`.
### scheduler.acquire(fn, priority?, opaque?): void

Low-level task acquisition. You must call `scheduler.release()` when done.
### scheduler.release(): void

Signal task completion. Dequeues the next pending task if concurrency allows.
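Taken together, `acquire`/`release` behave like a counting semaphore, and `run` is essentially that pair wrapped around a task. A minimal sketch of the contract (the `MiniScheduler` class and its internals are illustrative, not the library's code):

```js
// Hypothetical model of the acquire/release contract as a counting semaphore.
class MiniScheduler {
  constructor(concurrency) {
    this.concurrency = concurrency
    this.running = 0
    this.queue = []
  }

  // Run fn now if a slot is free, otherwise defer it.
  acquire(fn) {
    if (this.running < this.concurrency) {
      this.running++
      fn()
    } else {
      this.queue.push(fn)
    }
  }

  // Free a slot and start the next deferred task, if any.
  release() {
    this.running--
    const next = this.queue.shift()
    if (next) {
      this.running++
      next()
    }
  }

  // run() = acquire + release wrapped around an (async) task.
  run(fn) {
    return new Promise((resolve, reject) => {
      this.acquire(() => {
        Promise.resolve()
          .then(fn)
          .then(resolve, reject)
          .finally(() => this.release())
      })
    })
  }
}
```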
### Scheduler.makeSharedState(concurrency): SharedArrayBuffer

Create shared state for cross-worker scheduling.
### parsePriority(value): number

Parse a string or number into a normalized priority value.
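Based on the priority table above, the behavior can be sketched roughly as follows; the clamping of numeric input and the error handling are assumptions, and the real implementation may differ:

```js
// Hypothetical sketch of parsePriority using the documented name/value table.
const PRIORITIES = {
  lowest: -3, lower: -2, low: -1,
  normal: 0,
  high: 1, higher: 2, highest: 3,
}

function parsePriority(value) {
  if (typeof value === 'string') {
    if (!(value in PRIORITIES)) throw new Error(`unknown priority: ${value}`)
    return PRIORITIES[value]
  }
  // Assumed: clamp numeric input to the supported -3..3 range.
  return Math.max(-3, Math.min(3, value | 0))
}
```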
## Throttle

`Throttle` is a token-bucket rate limiter that controls how many bytes per second are processed. It shares the same priority system as `Scheduler`.
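The token-bucket idea can be sketched independently of the library. This is a simplified model; the `Bucket` class and its methods are illustrative and do not mirror `Throttle`'s API:

```js
// Simplified token bucket: a budget of tokens (bytes) that refills at a
// fixed rate and is spent as data passes through.
class Bucket {
  constructor(bytesPerSecond) {
    this.rate = bytesPerSecond
    this.capacity = bytesPerSecond
    this.tokens = bytesPerSecond // start with one second's worth of budget
  }

  // Add tokens for an elapsed interval, capped at capacity.
  refill(elapsedMs) {
    this.tokens = Math.min(this.capacity, this.tokens + (this.rate * elapsedMs) / 1000)
  }

  // Spend tokens for `bytes`; returns false when the budget is exhausted.
  take(bytes) {
    if (this.tokens < bytes) return false
    this.tokens -= bytes
    return true
  }
}
```

Capping at `capacity` is what distinguishes a token bucket from a plain counter: idle periods can bank at most one interval's worth of burst.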
### Basic

```js
import { Throttle } from '@nxtedition/scheduler'

// Allow 1 MB/s
const throttle = new Throttle({ bytesPerSecond: 1_000_000 })

// Refill tokens every 10 ms
setInterval(() => throttle.refill(), 10)
```

### Multi-Worker Coordination

As with `Scheduler`, a byte budget can be shared across worker threads by passing a `SharedArrayBuffer` created by `Throttle.makeSharedState()` to the constructor.
### Streaming

`throttle.stream()` returns a Node.js `Transform` stream that enforces backpressure — it won't call the next write until tokens are available:
```js
import { Throttle } from '@nxtedition/scheduler'
import { createReadStream, createWriteStream } from 'node:fs'
import { pipeline } from 'node:stream/promises'

const throttle = new Throttle({ bytesPerSecond: 1_000_000 }) // 1 MB/s
setInterval(() => throttle.refill(), 10)

await pipeline(
  createReadStream('input.mp4'),
  throttle.stream(),
  createWriteStream('output.mp4'),
)
```

### Priority
Both `run()` and `stream()` accept a priority. Higher-priority work drains first when tokens are available:
```js
// Low-priority stream (yields to higher-priority work)
const stream = throttle.stream('low')
```

### Low-Level API
```js
throttle.acquire(
  () => {
    sendPacket(data)
  },
  data.byteLength,
  'normal',
)
```

`acquire` returns `true` if the callback ran immediately (tokens were available), or `false` if it was queued.
### API

#### new Throttle(opts)

- `opts.bytesPerSecond` — bytes per second (default: `Infinity`)
- `opts` may also be a `SharedArrayBuffer` created by `Throttle.makeSharedState()`
#### throttle.acquire(fn, bytes, priority?): boolean

Low-level acquisition. Returns `true` if `fn` ran immediately, `false` if queued.
#### throttle.stream(priority?): Transform

Returns a `Transform` stream that rate-limits data passing through it. Each chunk consumes `chunk.length` tokens.
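The backpressure idea behind such a stream can be sketched with a plain `Transform` whose callback is deferred until the budget allows the chunk through. This is a hypothetical sketch, not the library's implementation; `throttledStream` and the `bucket` interface are assumptions:

```js
import { Transform } from 'node:stream'

// Hypothetical sketch: hold the transform callback until `bucket` has
// enough tokens for the chunk, which propagates backpressure upstream.
function throttledStream(bucket) {
  return new Transform({
    transform(chunk, _enc, callback) {
      const attempt = () => {
        if (bucket.take(chunk.length)) {
          callback(null, chunk) // budget available: forward the chunk
        } else {
          setTimeout(attempt, 10) // wait for the next refill tick
        }
      }
      attempt()
    },
  })
}
```

Because Node only issues the next write after the previous callback fires, deferring the callback is all that is needed to slow the whole pipeline down.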
## License
MIT
