npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@nxtedition/scheduler

v4.1.12

Published

A high-performance, priority-based task scheduler for Node.js with support for concurrency limiting, byte-rate throttling, and multi-worker coordination via `SharedArrayBuffer`.

Readme

@nxtedition/scheduler

A high-performance, priority-based task scheduler for Node.js with support for concurrency limiting, byte-rate throttling, and multi-worker coordination via SharedArrayBuffer.

Install

npm install @nxtedition/scheduler

Usage

Basic

import { Scheduler } from '@nxtedition/scheduler'

const scheduler = new Scheduler({ concurrency: 4 })

const result = await scheduler.run(async () => {
  const res = await fetch('https://example.com')
  return res.json()
})

Priority

Seven priority levels are available:

| Name | Value | | --------- | ----- | | lowest | -3 | | lower | -2 | | low | -1 | | normal | 0 | | high | 1 | | higher | 2 | | highest | 3 |

// Using string priority
await scheduler.run(() => importantWork(), 'high')

// Using static constants
await scheduler.run(() => backgroundWork(), Scheduler.LOW)

Per-Priority Concurrency

Concurrency can be configured per priority to reserve capacity for higher-priority work. A task at priority p is admitted on the fast path only when total running tasks are below that priority's cap; otherwise the task queues. The overall max is always a hard ceiling.

// At most 100 concurrent tasks total. Background work (low/lowest) is held
// to a small fraction so interactive (normal/high) traffic isn't queued behind
// a flood of batch jobs.
const scheduler = new Scheduler({
  concurrency: { max: 100, low: 20, lowest: 5 },
})

Unspecified priorities inherit the cap from the priority just below them — caps propagate downward, not upward. Priorities above the topmost explicit fall back to max, and an unspecified lowest seeds from the bottommost explicit. The example above resolves to lowest=lower=5, low=20, normal..highest=100. Each per-priority value is itself capped at max.

Starvation prevention

Per-priority caps act as backpressure: when running tasks exceed a priority's cap, new tasks at that priority queue. If higher-priority tasks keep arriving, the capped queue could be starved indefinitely. To prevent that, the dispatch loop's fairness lottery (already used to give lower priorities a turn under uniform concurrency) bypasses per-priority caps when it picks a queued priority — so a fully-capped queue still drains slowly. The overall max is still respected. When the lottery picks an empty tier, its share goes to the highest-priority backlogged queue (within caps), so unused tiers never inflate lower-priority throughput.

In SharedArrayBuffer mode, the second constructor argument carries per-priority limits (the buffer carries max across workers):

const sharedState = Scheduler.makeSharedState(100) // global max=100
const scheduler = new Scheduler(sharedState, {
  concurrency: { low: 20, lowest: 5 }, // per-worker caps
})

Low-Level API

For more control, use acquire / release directly:

scheduler.acquire(
  async (opaque) => {
    try {
      await doWork(opaque)
    } finally {
      scheduler.release()
    }
  },
  Scheduler.NORMAL,
  opaqueData,
)

Error contract: if fn throws synchronously, the scheduler releases the slot itself (the error propagates to the acquire() caller on the fast path, and surfaces as a deferred uncaught exception when thrown from a queued dispatch). Do not also call release() on the synchronous-throw path — a try/finally around synchronous work would double-release and free a slot belonging to some other in-flight task. With an async callback as above the finally is correct: a rejecting async function does not throw synchronously, so the slot is yours to release exactly once.

Multi-Worker Coordination

Share a concurrency limit across worker threads using SharedArrayBuffer:

// Main thread
import { Scheduler } from '@nxtedition/scheduler'
import { Worker } from 'node:worker_threads'

const sharedState = Scheduler.makeSharedState(8)
const worker = new Worker('./worker.js', { workerData: sharedState })

// Worker thread
import { Scheduler } from '@nxtedition/scheduler'
import { workerData } from 'node:worker_threads'

const scheduler = new Scheduler(workerData)
await scheduler.run(() => work())

The global max is a hard cap across all workers — a worker that finds the global counter at the limit queues its task locally and waits (via Atomics.waitAsync on the shared counter). When another worker releases a slot, Atomics.notify wakes the waiter, which retries dispatch.

Atomics.waitAsync does not keep the Node event loop alive. If a worker is fully idle (running === 0) with tasks queued waiting for capacity, you need something else holding the loop open — usually trivial in real applications (HTTP servers, intervals, etc.), but standalone scripts may need a setInterval(() => {}, …) until their work is done.

UV Thread Pool Scheduling

A practical use case is coordinating access to the libuv thread pool (UV_THREADPOOL_SIZE) across multiple worker threads. For example, several HTTP file-serving workers can share a scheduler so that file-system operations (which consume UV thread pool slots) are prioritized and throttled globally:

// main.js
import { Scheduler } from '@nxtedition/scheduler'
import { Worker } from 'node:worker_threads'

const UV_THREADPOOL_SIZE = parseInt(process.env.UV_THREADPOOL_SIZE || '4', 10)
const sharedState = Scheduler.makeSharedState(UV_THREADPOOL_SIZE)

for (let i = 0; i < 4; i++) {
  new Worker('./http-worker.js', { workerData: { sharedState } })
}
// http-worker.js
import { Scheduler, parsePriority } from '@nxtedition/scheduler'
import { workerData } from 'node:worker_threads'
import fs from 'node:fs/promises'

const scheduler = new Scheduler(workerData.sharedState)

async function handleRequest(req, res) {
  // Derive priority from a request header, e.g. "X-Priority: high"
  const priority = parsePriority(req.headers['x-priority'] || 'normal')

  const data = await scheduler.run(() => fs.readFile(filePath), priority)
  res.end(data)
}

This ensures that high-priority requests get file-system access first, while low-priority background work (thumbnails, transcoding, etc.) yields thread pool capacity without starving entirely — thanks to the built-in starvation prevention.

Monitoring

const { running, pending, queues } = scheduler.stats
  • running — currently executing tasks
  • pending — tasks waiting in queues
  • queues — per-priority queue counts

API

new Scheduler(opts, options?)

  • opts.concurrency — concurrency configuration (default: Infinity). Either a number (the overall max) or an object with max and per-priority caps:

    type ConcurrencyOptions =
      | number
      | {
          max?: number
          highest?: number
          higher?: number
          high?: number
          normal?: number
          low?: number
          lower?: number
          lowest?: number
        }
  • opts may also be a SharedArrayBuffer created by Scheduler.makeSharedState(). In that case, an optional second argument { concurrency } may carry per-priority caps (the buffer's max is always the global ceiling).

scheduler.run(fn, priority?, opaque?): Promise<T>

Execute fn within the scheduler. Returns a promise that resolves with the return value of fn (thenables are assimilated via Promise.resolve). After the scheduler has been disposed, run() rejects with Error('Scheduler is disposed') instead of hanging; with an overall concurrency of 0 (which could never dispatch anything) it rejects with Error('Scheduler concurrency is 0').

scheduler.acquire(fn, priority?, opaque?): boolean

Low-level task acquisition. Returns true if a slot was available and fn ran synchronously, or false if the task was queued (also false if the scheduler has been disposed). The fast path additionally requires that nothing is queued at the same priority, preserving FIFO order within a priority; a fresh task can still start ahead of work queued at a different priority while a freed slot is in flight (a deliberate latency trade-off, bounded to a microtask in shared mode). You must call scheduler.release() when fn completes — except when fn throws synchronously, in which case the scheduler has already released the slot (see the Low-Level API example).

scheduler.tryAcquire(priority?): boolean

Non-blocking. Takes a slot and returns true only if one is free at the given priority and nothing is queued at that priority (mirroring limiter.tryConsume); otherwise returns false without queuing. A successful tryAcquire must be paired with release().

scheduler.release(): void

Signal task completion. Dequeues the next pending task if concurrency allows.

scheduler.concurrency

The overall concurrency ceiling (Infinity when unlimited). In shared mode this is the buffer's global max; a per-instance options.concurrency.max is enforced but not reflected here.

scheduler.stats

{ running, pending, queues, shared } — locally running tasks, locally queued tasks, per-priority queue counts, and (shared mode only) the global { running, waiters } counters. Allocates fresh objects on every read; poll accordingly.

Scheduler.makeSharedState(concurrency): SharedArrayBuffer

Create shared state for cross-worker scheduling. concurrency must be Infinity or a non-negative integer that fits in a signed 32-bit integer (≤ 2,147,483,647) — the counter is stored as Int32.

scheduler[Symbol.dispose]()

Disposes the scheduler: releases any global slots it still holds (shared mode), rejects queued run() promises with Error('Scheduler is disposed'), drops the reference to a pending shared-mode Atomics.waitAsync, and makes all further calls inert (acquire / tryAcquire return false; run rejects). Use when abandoning a shared-mode Scheduler with tasks still queued so the instance and its parked wait can be garbage-collected (it is otherwise kept alive until process exit so its held slots can be released). Works with using. Note that slots held by still-in-flight tasks are returned to the shared pool immediately — until those tasks actually finish, other workers can transiently admit more than max (the limit is soft by design).

parsePriority(value): number

Parse a string or number into a normalized priority value.


Limiter

Limiter is a token-bucket rate limiter that controls how many tokens per second are processed — typically bytes, but any integer unit works. It shares the same priority system as Scheduler.

Previously exported as Throttle. The Throttle alias is retained for backward compatibility but is deprecated — prefer Limiter.

Tokens refill automatically: the constructor starts an internal timer, and refills are also triggered lazily from consume() / tryConsume(). There is no manual refill step. While work is queued the timer holds the event loop open (so a script never exits with queued callbacks silently dropped); an idle limiter does not keep the process alive.

Basic

import { Limiter } from '@nxtedition/scheduler'

// Allow 1 MB/s (tokens are bytes here)
const limiter = new Limiter({ tokensPerSecond: 1_000_000 })

// Consume `chunk.length` tokens at normal priority, running the callback when available
limiter.consume(() => send(chunk), chunk.length)

Streaming

limiter.stream() returns a Node.js Transform stream that enforces backpressure — it won't pass a chunk through until enough tokens are available. Each chunk consumes chunk.length tokens:

import { createReadStream, createWriteStream } from 'node:fs'
import { pipeline } from 'node:stream/promises'

const limiter = new Limiter({ tokensPerSecond: 1_000_000 }) // 1 MB/s

await pipeline(createReadStream('input.mp4'), limiter.stream(), createWriteStream('output.mp4'))

Priority

Both consume() and stream() accept a priority. Higher-priority work drains first when tokens are available:

// Low-priority stream (yields to higher-priority work)
const stream = limiter.stream('low')

Multi-Worker Coordination

Share a token bucket across worker threads using SharedArrayBuffer, exactly like Scheduler:

// Main thread
import { Limiter } from '@nxtedition/scheduler'
import { Worker } from 'node:worker_threads'

const sharedState = Limiter.makeSharedState(1_000_000) // 1 MB/s shared across all workers
new Worker('./worker.js', { workerData: sharedState })

// Worker thread
import { Limiter } from '@nxtedition/scheduler'
import { workerData } from 'node:worker_threads'

const limiter = new Limiter(workerData)
limiter.consume(() => send(chunk), chunk.length)

Low-Level API

const ran = limiter.consume(
  () => {
    sendPacket(data)
  },
  data.byteLength,
  'normal',
)

consume returns true if the callback ran immediately (tokens were available and nothing was queued ahead of it — the call refills the bucket from elapsed time first, so an idle or freshly created limiter answers true when the credit covers the request), or false if it was queued. Note the boundary is not airtight: a false (queued) task can still run synchronously within the same call when the drain the call triggers reaches it immediately.

API

new Limiter(opts)

  • opts.tokensPerSecond — tokens (e.g. bytes) per second. Required; must be a positive integer that fits in a signed 32-bit integer (1 … 2,147,483,647), since tokens are tracked as Int32. 0, negative, non-integer, NaN, Infinity, and values above 2,147,483,647 all throw.
  • opts may also be a SharedArrayBuffer created by Limiter.makeSharedState(tokensPerSecond).

limiter.consume(fn, bytes, priority?, opaque?): boolean

Consume bytes tokens, running fn(opaque) when they are available. Returns true if fn ran immediately, false if it was queued. opaque is passed through to fn on both paths.

bytes must be a non-negative integer no larger than 2,147,483,647 (the signed Int32 max — tokens are tracked as Int32); negative, non-integer, NaN, Infinity, and out-of-range values throw (a negative or wrapping value would otherwise add tokens via Atomics.sub and corrupt the shared bucket). A bytes of 0 consumes no tokens and never waits on the bucket. It runs immediately, returning true, only when nothing is queued at all (pending === 0); otherwise consume returns false, but once it reaches the front of its priority queue it drains regardless of the token balance — even while the bucket is empty or in debt. Queued behind token-starved work at the same priority it waits its turn like any other task (FIFO head-of-line; a different, unblocked priority queue can drain it sooner).

Starvation guard: a queued task larger than the instantaneous token balance could otherwise starve under sustained traffic — smaller tasks would keep spending each refill's credit first. Any queue whose front item makes no progress for ~1 second is force-admitted into token debt; the negative balance is repaid by subsequent refills before anything else drains, so the long-run rate still holds (the trade-off is a transient burst, at most one stalled item per queue per second). The same mechanism bounds the wait for a request larger than tokensPerSecond (the bucket's capacity, which it could never fit): it is admitted once the bucket fills or the ~1s stall guard fires, whichever comes first — i.e. it is throttled, not dropped or stalled forever. In shared mode the guard is per-instance best-effort: other workers may keep consuming the shared bucket.

limiter.tryConsume(bytes): boolean

Non-blocking. Consumes bytes and returns true only if enough tokens are available right now and nothing is queued; otherwise returns false without queuing.

limiter.tokensPerSecond

The configured rate (and bucket capacity).

limiter.stats

{ tokens, pending, queues } — current token balance (negative while in debt), queued tasks, and per-priority queue counts. Allocates fresh objects on every read.

limiter.stream(priority?, options?): Transform

Returns a Transform stream that rate-limits data passing through it. Each chunk consumes chunk.length tokens; a chunk with no non-negative-integer length (e.g. in objectMode) is treated as weightless (0 tokens) — it never waits on tokens, though it still preserves the limiter's queue ordering (if other work is already queued, it drains in turn rather than jumping ahead). options is forwarded to the underlying Transform (e.g. objectMode, highWaterMark). Destroying the stream while a chunk is still queued in the limiter does not push into the dead stream.

Limiter.makeSharedState(tokensPerSecond): SharedArrayBuffer

Create shared state for cross-worker rate limiting.


License

MIT