@mattstrom/async-primitives

v0.3.1

Collection of async primitives

A collection of async primitives for TypeScript: semaphores, mutexes, queues, resource pools, cancellable tasks, retry with backoff, rate limiters, circuit breakers, and bounded-concurrency mapping.

Installation

npm install @mattstrom/async-primitives

Primitives


Semaphore

Limits the number of concurrent operations to a fixed number of permits.

import { Semaphore } from '@mattstrom/async-primitives';

const sem = new Semaphore(3); // allow 3 concurrent operations

async function fetchWithLimit(url: string) {
	await sem.acquire();
	try {
		return await fetch(url);
	} finally {
		sem.release();
	}
}

Semaphore implements Disposable — callers waiting in the queue are dropped when disposed.
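
To make the permit mechanics concrete, here is a minimal sketch of a counting semaphore. It is an illustration only, not the package's implementation: the names `SketchSemaphore` and `measurePeakConcurrency` are hypothetical, and disposal is left out.

```typescript
// Hypothetical minimal counting semaphore, for illustration only.
// This is not the package's implementation.
class SketchSemaphore {
	private permits: number;
	private waiters: Array<() => void> = [];

	constructor(permits: number) {
		this.permits = permits;
	}

	async acquire(): Promise<void> {
		if (this.permits > 0) {
			this.permits--;
			return;
		}
		// No permit is free: park until release() hands one over.
		await new Promise<void>((resolve) => this.waiters.push(resolve));
	}

	release(): void {
		const next = this.waiters.shift();
		if (next) {
			next(); // pass the permit straight to the oldest waiter
		} else {
			this.permits++;
		}
	}
}

// Runs `count` simulated jobs through the semaphore and reports the
// highest number of jobs that were in flight at the same time.
async function measurePeakConcurrency(limit: number, count: number): Promise<number> {
	const sem = new SketchSemaphore(limit);
	let inFlight = 0;
	let peak = 0;
	const job = async (): Promise<void> => {
		await sem.acquire();
		try {
			inFlight++;
			peak = Math.max(peak, inFlight);
			await new Promise<void>((resolve) => setTimeout(resolve, 5));
		} finally {
			inFlight--;
			sem.release();
		}
	};
	await Promise.all(Array.from({ length: count }, job));
	return peak;
}
```

Handing the permit directly to the oldest waiter in `release()` keeps the queue FIFO and avoids a window in which a newcomer could take the permit ahead of a parked caller.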


Mutex

A mutual-exclusion lock. Only one caller holds the lock at a time; others wait in FIFO order.

import { Mutex } from '@mattstrom/async-primitives';

const mutex = new Mutex();

// Manual acquire/release
const unlock = await mutex.acquire();
try {
	// critical section
} finally {
	unlock();
}

// Convenience wrapper
await mutex.withLock(async () => {
	// critical section
});

// Non-blocking attempt (named differently so it does not redeclare `unlock` above)
const maybeUnlock = mutex.tryAcquire();
if (maybeUnlock) {
	try {
		/* ... */
	} finally {
		maybeUnlock();
	}
}

Pass a timeout (ms) to the constructor to detect deadlocks — waiting callers are rejected with "Deadlock timeout" if the lock is held longer than the threshold.
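
The sketch below shows why a lock matters even in single-threaded code: a read, an `await`, then a write can lose updates when calls interleave. It uses a promise chain to get FIFO ordering, which is one possible technique and not necessarily how the package does it. `SketchMutex`, `sleepMs`, and `incrementAll` are hypothetical names, and the deadlock timeout is not modelled.

```typescript
// Hypothetical promise-chain lock, for illustration only.
class SketchMutex {
	private tail: Promise<void> = Promise.resolve();

	// Runs fn once every previously queued caller has finished (FIFO).
	withLock<T>(fn: () => Promise<T>): Promise<T> {
		const run = this.tail.then(fn);
		// Keep the chain alive even when fn rejects.
		this.tail = run.then(
			() => undefined,
			() => undefined,
		);
		return run;
	}
}

const sleepMs = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Increments a shared counter `times` times using read, await, write.
async function incrementAll(times: number, useLock: boolean): Promise<number> {
	const mutex = new SketchMutex();
	let counter = 0;
	const increment = async (): Promise<void> => {
		const seen = counter; // read
		await sleepMs(1); // other callers can interleave here
		counter = seen + 1; // write
	};
	const calls = Array.from({ length: times }, () =>
		useLock ? mutex.withLock(increment) : increment(),
	);
	await Promise.all(calls);
	return counter;
}
```

Without the lock every caller reads the same starting value, so five increments leave the counter at 1; with the lock each critical section runs to completion before the next one starts.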


AsyncQueue

A FIFO queue that supports backpressure and async iteration. Producers that enqueue into a full queue are suspended until a consumer dequeues.

import { AsyncQueue } from '@mattstrom/async-primitives';

const queue = new AsyncQueue<number>(10); // capacity of 10

// Producer
async function produce() {
	for (let i = 0; i < 100; i++) {
		await queue.enqueue(i); // suspends when queue is full
	}
	queue.close();
}

// Consumer
async function consume() {
	for await (const item of queue) {
		console.log(item);
	}
}
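
Backpressure can be pictured with a small bounded queue: producers park while the buffer is full, consumers park while it is empty. The following is a rough sketch under that assumption. `SketchQueue` and its `size` getter are hypothetical, and behaviour such as enqueueing after `close()` is not handled.

```typescript
// Hypothetical bounded queue, for illustration only.
class SketchQueue<T> {
	private items: T[] = [];
	private blockedProducers: Array<() => void> = [];
	private blockedConsumers: Array<() => void> = [];
	private closed = false;
	private capacity: number;

	constructor(capacity: number) {
		this.capacity = capacity;
	}

	get size(): number {
		return this.items.length;
	}

	async enqueue(item: T): Promise<void> {
		// Suspend while the buffer is full; re-check after every wake-up.
		while (this.items.length >= this.capacity) {
			await new Promise<void>((resolve) => this.blockedProducers.push(resolve));
		}
		this.items.push(item);
		this.blockedConsumers.shift()?.(); // wake one waiting consumer
	}

	close(): void {
		this.closed = true;
		for (const wake of this.blockedConsumers.splice(0)) wake();
	}

	async *[Symbol.asyncIterator](): AsyncGenerator<T> {
		while (true) {
			if (this.items.length > 0) {
				const item = this.items.shift() as T;
				this.blockedProducers.shift()?.(); // a slot opened up
				yield item;
			} else if (this.closed) {
				return; // drained and closed: end the iteration
			} else {
				await new Promise<void>((resolve) => this.blockedConsumers.push(resolve));
			}
		}
	}
}
```

Because the producer only pushes after confirming there is room, the buffer never grows past `capacity`, no matter how slow the consumer is.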

Pipeline

Connects a source AsyncQueue to a sink AsyncQueue through an async worker function, with controlled concurrency.

import { AsyncQueue, pipeline } from '@mattstrom/async-primitives';

const source = new AsyncQueue<string>();
const sink = new AsyncQueue<number>();

await pipeline(source, async (item) => item.length, sink, 4 /* concurrency */);
// sink is automatically closed when the source is drained

Pool

A generic resource pool with lazy creation, FIFO waiting, and automatic cleanup.

import { Pool } from '@mattstrom/async-primitives';

const pool = new Pool({
	factory: () => createDatabaseConnection(),
	destroy: (conn) => conn.close(),
	maxSize: 10,
});

// Manual acquire/release
const conn = await pool.acquire();
try {
	await conn.query('SELECT 1');
} finally {
	pool.release(conn);
}

// Convenience wrapper (recommended)
const result = await pool.withResource(async (conn) => {
	return conn.query('SELECT 1');
});

// Inspect pool state
const { size, available, pending } = pool.stats();

// Tear down the pool
await pool.destroy(); // or: await using pool = new Pool(...)

Cancellation

CancellableTask

Wraps an AbortSignal-aware function with cancellation support.

import { CancellableTask } from '@mattstrom/async-primitives';

const task = new CancellableTask(async (signal) => {
	const res = await fetch('/api/data', { signal });
	return res.json();
});

const promise = task.start();
task.cancel(); // aborts the fetch

TaskGroup

Manages a collection of cancellable tasks.

import { TaskGroup } from '@mattstrom/async-primitives';

const group = new TaskGroup();

group.add(async (signal) => pollEndpoint(signal));
group.add(async (signal) => pollEndpoint(signal));

// Cancel everything
group.cancelAll();

// Wait for all tasks to settle
await group.waitForAll();

// Race: returns the first success, cancels the rest
const result = await group.race([(signal) => tryServer('us-east', signal), (signal) => tryServer('eu-west', signal)]);

withTimeout

Runs a task and cancels it if it doesn't complete within a time limit.

import { withTimeout } from '@mattstrom/async-primitives';

const result = await withTimeout(async (signal) => {
	return fetch('/slow-api', { signal }).then((r) => r.json());
}, 5000);
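
One plausible way to build such a helper is to race the task against a timer and abort the task's signal when the timer wins. This is a sketch under that assumption, not the package's code. `sketchWithTimeout`, `sleepTask`, and the error messages are hypothetical.

```typescript
// Hypothetical helper, for illustration only; the package's withTimeout may differ.
async function sketchWithTimeout<T>(
	task: (signal: AbortSignal) => Promise<T>,
	ms: number,
): Promise<T> {
	const controller = new AbortController();
	let timer: ReturnType<typeof setTimeout> | undefined;
	const timeout = new Promise<never>((_, reject) => {
		timer = setTimeout(() => {
			// Reject first so the race reports the timeout, then abort the task.
			reject(new Error(`Timed out after ${ms} ms`));
			controller.abort();
		}, ms);
	});
	try {
		return await Promise.race([task(controller.signal), timeout]);
	} finally {
		clearTimeout(timer); // do not leave the timer running after a fast task
	}
}

// Stand-in task: resolves after `ms` unless the signal aborts first.
function sleepTask(ms: number): (signal: AbortSignal) => Promise<string> {
	return (signal) =>
		new Promise<string>((resolve, reject) => {
			const timer = setTimeout(() => resolve('done'), ms);
			signal.addEventListener('abort', () => {
				clearTimeout(timer);
				reject(new Error('aborted'));
			});
		});
}
```

The timeout only helps if the task actually observes the signal; a task that ignores it keeps running in the background even though the caller has already received the timeout error.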

retry

Retries an async function with exponential backoff and optional jitter.

import { retry } from '@mattstrom/async-primitives';

const data = await retry(() => fetch('/api/resource').then((r) => r.json()), {
	maxAttempts: 5,
	baseDelayMs: 100, // default
	maxDelayMs: 10000, // default
	jitter: true, // default
	shouldRetry: (error, attempt) => !(error instanceof AuthError),
});

| Option        | Default     | Description                      |
| ------------- | ----------- | -------------------------------- |
| maxAttempts   | Infinity    | Maximum number of attempts       |
| baseDelayMs   | 100         | Initial delay in milliseconds    |
| maxDelayMs    | 10000       | Maximum delay cap                |
| jitter        | true        | Randomize delay by ±50%          |
| shouldRetry   | always true | Predicate to stop retrying early |
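
The delay schedule these options imply can be sketched as a pure function. The README does not state the growth factor or whether the cap is applied before or after jitter, so the doubling and the cap-then-jitter order below are assumptions. `backoffDelay` and `BackoffOptions` are hypothetical names.

```typescript
// Hypothetical delay calculation, for illustration only.
interface BackoffOptions {
	baseDelayMs: number;
	maxDelayMs: number;
	jitter: boolean;
}

// `attempt` is 1-based: the delay to wait before retry number `attempt`.
// `random` is injectable so the result can be made deterministic.
function backoffDelay(
	attempt: number,
	opts: BackoffOptions,
	random: () => number = Math.random,
): number {
	// Assumption: the delay doubles on every attempt.
	const exponential = opts.baseDelayMs * 2 ** (attempt - 1);
	const capped = Math.min(exponential, opts.maxDelayMs);
	if (!opts.jitter) return capped;
	// Scale by a factor in [0.5, 1.5) to get a ±50% spread.
	return capped * (0.5 + random());
}
```

With the defaults and jitter off, the first eight delays in this sketch are 100, 200, 400, 800, 1600, 3200, 6400, and 10000 ms. Jitter spreads retries from many clients so they do not all hit the server at the same instant.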


CircuitBreaker

Prevents cascading failures by tracking successes and failures and tripping open when a threshold is exceeded. Transitions automatically from open to half-open after a reset timeout, then back to closed on the next success.

States:

  • closed — requests pass through normally.
  • open — requests fail immediately with "Circuit open".
  • half-open — one request is allowed through to probe recovery; a success closes the circuit, a failure re-opens it.

import { CircuitBreaker } from '@mattstrom/async-primitives';

const breaker = new CircuitBreaker({
	failureThreshold: 5, // open after 5 consecutive failures
	resetTimeoutMs: 10000, // try again after 10s
});

try {
	const result = await breaker.execute(() => fetch('/api/data').then((r) => r.json()));
} catch (err) {
	// `err` is `unknown` under strict TypeScript settings, so narrow it first
	if (err instanceof Error && err.message === 'Circuit open') {
		// fast-fail — upstream is unhealthy
	}
}

// Inspect state
breaker.getState(); // 'closed' | 'open' | 'half-open'
breaker.getStats(); // { successes: number, failures: number }
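
The three states form a small state machine, sketched below with an injectable clock so the transitions can be followed step by step. This is an illustration under stated assumptions, not the package's implementation: `SketchBreaker` is a hypothetical name, the constructor takes positional arguments, and the sketch does not limit half-open to a single concurrent probe.

```typescript
// Hypothetical circuit breaker state machine, for illustration only.
type BreakerState = 'closed' | 'open' | 'half-open';

class SketchBreaker {
	private state: BreakerState = 'closed';
	private consecutiveFailures = 0;
	private openedAt = 0;
	private failureThreshold: number;
	private resetTimeoutMs: number;
	private now: () => number;

	constructor(failureThreshold: number, resetTimeoutMs: number, now: () => number = Date.now) {
		this.failureThreshold = failureThreshold;
		this.resetTimeoutMs = resetTimeoutMs;
		this.now = now;
	}

	getState(): BreakerState {
		// Lazily move open -> half-open once the reset timeout has elapsed.
		if (this.state === 'open' && this.now() - this.openedAt >= this.resetTimeoutMs) {
			this.state = 'half-open';
		}
		return this.state;
	}

	async execute<T>(fn: () => Promise<T>): Promise<T> {
		if (this.getState() === 'open') throw new Error('Circuit open');
		try {
			const result = await fn();
			// Any success closes the circuit and clears the failure streak.
			this.consecutiveFailures = 0;
			this.state = 'closed';
			return result;
		} catch (err) {
			this.consecutiveFailures++;
			// A failed probe, or too many failures in a row, trips the breaker.
			if (this.state === 'half-open' || this.consecutiveFailures >= this.failureThreshold) {
				this.state = 'open';
				this.openedAt = this.now();
			}
			throw err;
		}
	}
}
```

Checking the timeout lazily inside `getState()` avoids a background timer, so an idle breaker holds no resources while it waits to probe again.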

pMap / pMapSemaphore

Apply an async function to every item in an array with bounded concurrency. Both functions preserve result order and reject immediately on the first error.

pMap — uses an internal queue, dispatching the next item as soon as a slot opens:

import { pMap } from '@mattstrom/async-primitives';

const results = await pMap(
	urls,
	(url) => fetch(url).then((r) => r.json()),
	4, // at most 4 in-flight at once
);

pMapSemaphore — starts all tasks simultaneously but gates them through a Semaphore, so all promises are created eagerly. Prefer pMap for large arrays where eager allocation is wasteful.

import { pMapSemaphore } from '@mattstrom/async-primitives';

const results = await pMapSemaphore(urls, (url) => fetch(url).then((r) => r.json()), 4);
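
The lazy dispatch described for pMap can be approximated with a fixed set of workers pulling from a shared index. The sketch below assumes that approach and is not the package's code. `sketchPMap` is a hypothetical name.

```typescript
// Hypothetical bounded-concurrency map, for illustration only.
async function sketchPMap<T, R>(
	items: readonly T[],
	fn: (item: T, index: number) => Promise<R>,
	concurrency: number,
): Promise<R[]> {
	const results = new Array<R>(items.length);
	let next = 0;
	const worker = async (): Promise<void> => {
		while (next < items.length) {
			const index = next++; // claim the next item synchronously
			// Writing by index preserves input order regardless of finish order.
			results[index] = await fn(items[index] as T, index);
		}
	};
	const workers = Array.from({ length: Math.min(concurrency, items.length) }, worker);
	await Promise.all(workers); // rejects as soon as any worker rejects
	return results;
}
```

Only `concurrency` promises exist at any moment, which is the memory advantage over the eager variant. Note that in this sketch the surviving workers keep claiming items after a failure; a production version would likely set a flag to stop them.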

TokenBucket

Smooths bursts by refilling tokens at a fixed rate. Callers that exceed the rate wait until tokens are available.

import { TokenBucket } from '@mattstrom/async-primitives';

const bucket = new TokenBucket({ capacity: 10, refillRate: 5 }); // 5 tokens/sec

await bucket.acquire(); // wait for 1 token
await bucket.acquire(3); // wait for 3 tokens

// Non-blocking
if (bucket.tryAcquire()) {
	// proceed immediately
}
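
The refill arithmetic behind a token bucket is short enough to sketch. The version below assumes the bucket starts full and refills continuously based on elapsed time; neither detail is confirmed by the README. `SketchTokenBucket` and `waitTimeMs` are hypothetical names, and the clock is injectable so the numbers can be checked by hand.

```typescript
// Hypothetical token bucket, for illustration only.
class SketchTokenBucket {
	private tokens: number;
	private lastRefill: number;
	private capacity: number;
	private refillRate: number; // tokens per second
	private now: () => number;

	constructor(capacity: number, refillRate: number, now: () => number = Date.now) {
		this.capacity = capacity;
		this.refillRate = refillRate;
		this.now = now;
		this.tokens = capacity; // assumption: the bucket starts full
		this.lastRefill = now();
	}

	private refill(): void {
		const current = this.now();
		const earned = ((current - this.lastRefill) * this.refillRate) / 1000;
		// Idle time earns tokens, but never more than the capacity.
		this.tokens = Math.min(this.capacity, this.tokens + earned);
		this.lastRefill = current;
	}

	tryAcquire(count = 1): boolean {
		this.refill();
		if (this.tokens < count) return false;
		this.tokens -= count;
		return true;
	}

	// Milliseconds until `count` tokens would be available.
	waitTimeMs(count = 1): number {
		this.refill();
		const missing = Math.max(0, count - this.tokens);
		return (missing * 1000) / this.refillRate;
	}
}
```

With `capacity: 10` and `refillRate: 5`, a caller can spend 10 tokens in one burst and then has to wait 200 ms for each further token.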

LeakyBucket

Enforces a strict output rate by queuing requests and draining them one at a time at a fixed rate. Unlike TokenBucket, bursts are not allowed — every request waits its turn regardless of prior idle time.

import { LeakyBucket } from '@mattstrom/async-primitives';

const bucket = new LeakyBucket({ capacity: 10, drainRate: 5 }); // 5 req/sec, up to 10 queued

try {
	await bucket.acquire(); // wait in line until drained
} catch {
	// thrown immediately when the queue is full — request is dropped
}

bucket.pending(); // number of requests currently queued
bucket.isFull(); // true when the queue has reached capacity
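
The contrast with TokenBucket can be seen by computing release times: each admitted request is assigned the next slot, spaced `1000 / drainRate` ms apart, and idle time earns no credit. The sketch below is an assumption-heavy illustration, not the package's code. `SketchLeakyBucket` and `schedule` are hypothetical, and it assumes a request arriving at an idle bucket proceeds immediately without counting as queued.

```typescript
// Hypothetical leaky bucket scheduler, for illustration only.
class SketchLeakyBucket {
	private nextSlot = 0; // earliest time the next request may leave
	private queued: number[] = []; // release times of requests still waiting
	private capacity: number;
	private intervalMs: number;
	private now: () => number;

	constructor(capacity: number, drainRate: number, now: () => number = Date.now) {
		this.capacity = capacity;
		this.intervalMs = 1000 / drainRate;
		this.now = now;
	}

	// Returns the timestamp (ms) at which the request may proceed,
	// or null when the queue is full and the request is dropped.
	schedule(): number | null {
		const current = this.now();
		// Forget requests whose release time has already passed.
		this.queued = this.queued.filter((releaseAt) => releaseAt > current);
		if (this.queued.length >= this.capacity) return null;
		const releaseAt = Math.max(current, this.nextSlot);
		this.nextSlot = releaseAt + this.intervalMs;
		if (releaseAt > current) this.queued.push(releaseAt);
		return releaseAt;
	}
}
```

With a capacity of 2 and a drain rate of 5 per second, four requests arriving at time 0 are released at 0, 200, and 400 ms, and the fourth is dropped.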

SlidingWindowLimiter

Enforces a maximum number of requests within a rolling time window.

import { SlidingWindowLimiter } from '@mattstrom/async-primitives';

const controller = new AbortController();
const limiter = new SlidingWindowLimiter({
	maxRequests: 100,
	windowMs: 60_000, // 100 requests per minute
	signal: controller.signal,
});

await limiter.acquire(); // waits if the window is full

// Non-blocking
if (limiter.tryAcquire()) {
	// proceed immediately
}

// Dispose when done
controller.abort();
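
A rolling window is commonly tracked as a list of recent request timestamps: entries older than `windowMs` are discarded, and a request is admitted only while fewer than `maxRequests` remain. The sketch below assumes that design and is not the package's implementation. `SketchSlidingWindow` is a hypothetical name, and only the non-blocking path is shown.

```typescript
// Hypothetical sliding window limiter, for illustration only.
class SketchSlidingWindow {
	private timestamps: number[] = [];
	private maxRequests: number;
	private windowMs: number;
	private now: () => number;

	constructor(maxRequests: number, windowMs: number, now: () => number = Date.now) {
		this.maxRequests = maxRequests;
		this.windowMs = windowMs;
		this.now = now;
	}

	tryAcquire(): boolean {
		const current = this.now();
		const cutoff = current - this.windowMs;
		// Drop timestamps that have slid out of the window.
		while (this.timestamps.length > 0 && (this.timestamps[0] as number) <= cutoff) {
			this.timestamps.shift();
		}
		if (this.timestamps.length >= this.maxRequests) return false;
		this.timestamps.push(current);
		return true;
	}
}
```

Unlike a fixed window that resets on a boundary, this approach never admits more than `maxRequests` in any span of `windowMs`, at the cost of storing one timestamp per recent request.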

rateLimitedMap

Applies an async function to each item in an array, throttled by a TokenBucket or SlidingWindowLimiter.

import { TokenBucket, rateLimitedMap } from '@mattstrom/async-primitives';

const bucket = new TokenBucket({ capacity: 10, refillRate: 10 });

const results = await rateLimitedMap(urls, (url) => fetch(url).then((r) => r.json()), bucket);

Utilities

createDeferred

Creates a Promise with externalized resolve and reject — useful for bridging callback-based APIs or coordinating across async boundaries.

import { createDeferred } from '@mattstrom/async-primitives';

const deferred = createDeferred<string>();

someEmitter.once('done', (value) => deferred.resolve(value));
someEmitter.once('error', (err) => deferred.reject(err));

const result = await deferred.promise;
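
A deferred is usually a thin wrapper that captures the executor's `resolve` and `reject` callbacks. The following minimal sketch shows the idea. `sketchCreateDeferred` and `SketchDeferred` are hypothetical names, and the package's own version may expose more than this.

```typescript
// Hypothetical deferred, for illustration only.
interface SketchDeferred<T> {
	promise: Promise<T>;
	resolve: (value: T | PromiseLike<T>) => void;
	reject: (reason?: unknown) => void;
}

function sketchCreateDeferred<T>(): SketchDeferred<T> {
	// The executor runs synchronously, so both callbacks are assigned
	// before the function returns.
	let resolve!: (value: T | PromiseLike<T>) => void;
	let reject!: (reason?: unknown) => void;
	const promise = new Promise<T>((res, rej) => {
		resolve = res;
		reject = rej;
	});
	return { promise, resolve, reject };
}
```

As with any promise, only the first call to `resolve` or `reject` has an effect; later calls are ignored.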

delay

A cancellable promise-based sleep.

import { delay } from '@mattstrom/async-primitives';

await delay(1000); // sleep 1s

const controller = new AbortController();
await delay(5000, controller.signal); // abortable
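
An abortable sleep combines `setTimeout` with an `abort` listener on the signal. The sketch below is one way to write it. `sketchDelay` is a hypothetical name, and rejecting with `new Error('Aborted')` is an assumption, since the README does not say what the package's `delay` rejects with.

```typescript
// Hypothetical abortable sleep, for illustration only.
function sketchDelay(ms: number, signal?: AbortSignal): Promise<void> {
	return new Promise<void>((resolve, reject) => {
		if (signal?.aborted) {
			reject(new Error('Aborted')); // already cancelled: do not start a timer
			return;
		}
		const onAbort = (): void => {
			clearTimeout(timer);
			reject(new Error('Aborted'));
		};
		const timer = setTimeout(() => {
			signal?.removeEventListener('abort', onAbort); // avoid leaking the listener
			resolve();
		}, ms);
		signal?.addEventListener('abort', onAbort, { once: true });
	});
}
```

Clearing the timer on abort matters in Node.js, where a pending 5-second timer would otherwise keep the process alive after the caller has moved on.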

Development

vp install   # install dependencies
vp test      # run tests
vp pack      # build the library

License

MIT