npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

concurrex

v1.0.1

Published

Fully adaptive task executor. Self-regulates concurrency limits, sheds load under pressure, and schedules fairly across lanes — no manual tuning, no environment-specific configuration.

Downloads

222

Readme

Concurrex

Fully adaptive task executor for Node.js and browsers. Self-regulates concurrency limits, sheds load under pressure, and schedules fairly across lanes — no manual tuning, no environment-specific configuration. Automatically derives detection thresholds, filter parameters, and convergence rates.

Zero dependencies. ESM-only. Works in Node.js (>=18), Deno, Bun, and browsers.

Install

npm install concurrex

Quick Start

import { Executor } from 'concurrex';

const executor = new Executor();
executor.registerPool("http", { baselineConcurrency: 50 });
executor.start();

await executor.run("http", () => handleRequest(), { lane: "tenant-123" });

executor.stop(); // rejects queued tasks, cleans up timers

Use Cases

Concurrex wraps any async operation — if it can saturate, overload, or fail under pressure, Concurrex can manage it.

  • HTTP servers — admission control for Express, Fastify, Koa. Shed excess requests with instant 503s instead of letting the entire server slow down.
  • Database access — cap concurrent queries to Postgres, MySQL, Redis. Prevent connection pool exhaustion and protect downstream capacity.
  • External API calls — rate-limit outbound requests to third-party services. Per-lane fairness ensures one tenant cannot monopolize API quota.
  • Message processing — throttle Kafka, RabbitMQ, or SQS consumers. Back-pressure signal (isOverloaded) tells the consumer when to pause fetching.
  • Background jobs — manage parallel execution of cron jobs, data pipelines, or batch imports without overwhelming shared resources.
  • AI/LLM inference — control concurrent requests to model endpoints with automatic latency-based backoff.
  • File I/O and uploads — limit parallel disk or network operations to avoid thrashing.
  • Multi-tenant systems — lane-based fairness prevents noisy neighbors. Each tenant gets equal access regardless of request volume.

Multiple pools let you isolate different workloads (e.g. user-facing commands vs background sync) with independent limits and detection sensitivity.

How It Works

Five mechanisms cooperate:

  1. ProDel (Probabilistic Delay Load-shedding) — sojourn-based AQM. Drop probability P = 1 - threshold/sojourn. Adaptive LIFO/FIFO admission (FIFO when healthy, LIFO when dropping to protect fresh work).
  2. Probabilistic early shedding — rejects new arrivals at enqueue time with P = dropRate/(dropRate+completionRate) * shrinkage when ProDel is dropping and pool is at capacity. Instant rejections.
  3. WoLF-EWMA throughput regulator — latency detection via operational Little's Law (W = integral N(t)dt / completions), log-transformed, smoothed by a WoLF Kalman filter (IMQ-weighted outlier rejection), with a z-test on the filtered trend. Concurrency adjusted via a convergent step formula with bisection damping for O(log L) equilibrium convergence.
  4. Per-lane error shedding — each lane tracks its own error rate EWMA. High-error lanes probabilistically reject new requests without affecting pool-wide concurrency.
  5. Fair lane scheduling — round-robin across lanes (per-tenant, per-user, or shared). Prevents noisy neighbors from monopolizing capacity.

Single-Constant Design

All statistical parameters derive from one constant: zScoreThreshold (default: 2). This determines HALF_LIFE (EWMA decay), WoLF Kalman Q/R, IMQ outlier threshold, Bayesian shrinkage strength, warm-up period, evaluation cadence, and detection sensitivity. Configurable globally and per-pool.

// Global default
const executor = new Executor({ zScoreThreshold: 2 });

// Per-pool override — tighter detection for user-facing, looser for background
executor.registerPool("commands", { zScoreThreshold: 1.5 });
executor.registerPool("background", { zScoreThreshold: 3 });
executor.registerPool("queries"); // inherits global z=2

Pool Configuration

executor.registerPool("commands", {
    delayThreshold: 100,       // Max acceptable sojourn time (ms) before ProDel reacts
    controlWindow: 100,        // Time window for ProDel grace period and throughput measurement
    baselineConcurrency: 50,   // Starting concurrency limit; gravity target during recovery
    minimumConcurrency: 5,     // Floor — limit never decreases below this
    maximumConcurrency: 200,   // Ceiling — regulator never increases above this
    zScoreThreshold: 1.5,      // Override detection sensitivity for this pool
});

| Parameter | Default | Description | |---|---|---| | delayThreshold | 100ms | Sojourn time above which ProDel considers the queue unhealthy | | controlWindow | 100ms | Time window for ProDel grace period and throughput measurement | | baselineConcurrency | 100 | Starting limit; gravity pulls back to this during recovery | | minimumConcurrency | 1 | Absolute floor for the concurrency limit | | maximumConcurrency | Infinity | Absolute ceiling for the concurrency limit | | zScoreThreshold | (inherit) | Detection sensitivity; overrides the executor-level default |

Lanes

// Per-user lane — fairness across users
await executor.run("commands", handleCommand, { lane: "tenant-123" });

// No lane — each call gets a unique transient lane (maximum fairness)
await executor.run("commands", handleCommand);

// Shared lane — all requests compete in one queue
await executor.run("http", handleRequest, { lane: "shared" });

Debouncing

import { Executor, DebounceMode } from 'concurrex';

// Only executes once even if called 3 times concurrently
const p1 = executor.runDebounced("queries", "user-123", () => fetchUser("123"));
const p2 = executor.runDebounced("queries", "user-123", () => fetchUser("123"));
const p3 = executor.runDebounced("queries", "user-123", () => fetchUser("123"));
// p1, p2, p3 all resolve to the same result

// BeforeResult mode — deduplicate until the task completes
executor.runDebounced("queries", "user-123", () => fetchUser("123"), {
    mode: DebounceMode.BeforeResult
});

Two modes:

  • DebounceMode.BeforeExecution (default): Deduplicate until the task starts running.
  • DebounceMode.BeforeResult: Deduplicate until the task completes.

Back-Pressure and Inspection

executor.isOverloaded("commands");         // true when in DROPPING state
executor.isThroughputDegraded("commands"); // latency degradation detected
executor.getInFlight("commands");          // current in-flight count
executor.getQueueLength("commands");       // current queue depth
executor.getConcurrencyLimit("commands");  // current regulated limit
executor.getRegulatorState("commands");    // full filter state snapshot

isOverloaded returns true only during confirmed sustained overload (dropping state). Use this to pause upstream work fetching.

getRegulatorState returns a RegulatorState with the filter internals: logW, logWBar, zScore, degrading, se, dLogWBarEwma, dLogWBarVariance, inFlightEwma, completionRateEwma, dropRateEwma, errorRateEwma, regulationPhase, regulationDepth, elapsedWindows, alpha.

Error Handling

import { Executor, ResourceExhaustedError, ConcurrexError } from 'concurrex';

try {
    await executor.run("commands", () => handleCommand());
} catch (err) {
    if (err instanceof ResourceExhaustedError) {
        // Task rejected — overloaded, early shed, or per-lane shed
        return res.status(503).send("Service busy");
    }
    throw err; // re-throw application errors
}

// Or catch all concurrex errors
try { ... } catch (err) {
    if (err instanceof ConcurrexError) { /* any concurrex error */ }
}

Error classes:

  • ConcurrexError — base class for all concurrex errors. Use for catch-all.
  • ResourceExhaustedError — task rejected due to overload (ProDel drop, early shed, or per-lane shed).
  • ExecutorNotRunningErrorrun() called after stop().
  • ArgumentError — invalid configuration (duplicate pool, bad parameters).

Logger

Defaults to console. Pass any object with info, warn, error, and debug methods (pino, winston, etc.).

const executor = new Executor(); // uses console
const executor2 = new Executor({ logger: myPinoLogger });

API

class Executor {
    constructor(options?: { logger?: Logger; zScoreThreshold?: number });

    // Lifecycle
    start(): void;
    stop(): void;

    // Configuration
    registerPool(name: string, options?: PoolOptions): void;

    // Task execution
    run<T>(pool: string, task: () => T, options?: TaskRunOptions): Promise<T>;
    runDebounced<T>(pool: string, key: string, task: () => Promise<T> | T,
                    options?: TaskRunDebouncedOptions): Promise<T>;

    // Inspection
    isOverloaded(pool: string): boolean;
    isThroughputDegraded(pool: string): boolean;
    getInFlight(pool: string): number;
    getQueueLength(pool: string): number;
    getConcurrencyLimit(pool: string): number;
    getRegulatorState(pool: string): RegulatorState;

    // Derived constants (read-only, executor-level defaults)
    readonly zScoreThreshold: number;
    readonly halfLife: number;
}

Theory

See THEORY.md for formal analysis with 16 theorems and proofs covering convergence guarantees, stability bounds, and fairness properties.

License

MIT