@rubyleung/p-flow

v0.1.0

Tiny, zero-dependency concurrency orchestrator for Promises: concurrency limit, rate limiting, retry with backoff & jitter, timeout, priority queue, and adaptive backpressure.

p-flow

Tiny, zero-dependency concurrency orchestrator for Promises. Concurrency limit · rate limiting · retry with backoff & jitter · timeout · priority queue · adaptive backpressure.

Why p-flow?

p-limit, p-queue, and p-retry are great — but you usually need several of them at once, wired together by hand. p-flow bundles concurrency, rate limiting, retries, timeouts, and a priority queue behind one intuitive API — and adds something none of them have:

🌟 Adaptive backpressure: when the remote starts pushing back (an HTTP 429 or 503 status, or a Retry-After header), p-flow automatically cuts concurrency, then raises it again step by step once requests succeed (AIMD, additive increase / multiplicative decrease, the scheme behind TCP congestion control). The aim is to stay close to the highest throughput the remote tolerates without getting banned.

  • 🪶 Zero dependencies, tiny, fully typed.
  • 🌍 Runs anywhere: Node, Bun, Deno, edge, browsers.
  • 🧩 One object instead of four libraries.
  • 🛡️ Honors Retry-After, supports AbortSignal, per-task overrides.

npm install @rubyleung/p-flow

Quick start

import { PFlow } from '@rubyleung/p-flow'

const flow = new PFlow({
  concurrency: 8,             // at most 8 at a time
  interval: 1000,             // ...and
  intervalCap: 20,            // at most 20 started per second
  timeout: 10_000,            // 10s per task
  retry: { retries: 3, jitter: true, respectRetryAfter: true },
  adaptive: true,             // auto-throttle on 429/503
})

// run a single task
const me = await flow.run(({ signal }) => fetch('/api/me', { signal }).then((r) => r.json()))

// map over many — order preserved, limits enforced.
// throw on bad responses so retry / respectRetryAfter kick in
// (fetch never rejects on 4xx/5xx, so retries only fire if you throw)
const pages = await flow.map(urls, async (url, _i, { signal }) => {
  const res = await fetch(url, { signal })
  if (!res.ok) throw Object.assign(new Error(`HTTP ${res.status}`), { status: res.status, headers: res.headers })
  return res.text()
})
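
The "throw on bad responses" step recurs in every fetch example, so it can be factored into a small helper. The sketch below is one way to do that; `ResponseLike`, `HttpError`, and `ensureOk` are hypothetical names for illustration and are not part of p-flow. It only assumes what the examples show: the thrown error carries `status` and `headers`.

```typescript
// Minimal structural view of a fetch Response, so the sketch type-checks
// without DOM typings. A real Response satisfies it.
interface ResponseLike {
  ok: boolean
  status: number
  headers: { get(name: string): string | null }
}

// Hypothetical error type carrying the same fields the README examples
// attach with Object.assign: status and headers.
class HttpError extends Error {
  status: number
  headers: ResponseLike['headers']

  constructor(res: ResponseLike) {
    super(`HTTP ${res.status}`)
    this.name = 'HttpError'
    this.status = res.status
    this.headers = res.headers
  }
}

// Returns the response unchanged when it is ok; otherwise throws,
// so a retry policy that only reacts to errors gets to see the failure.
function ensureOk<T extends ResponseLike>(res: T): T {
  if (!res.ok) throw new HttpError(res)
  return res
}
```

Inside a task this would read `const res = ensureOk(await fetch(url, { signal }))`.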

The problem it solves

Naive:   Promise.all(urls.map(fetch))   →  1000 sockets at once  →  💥 banned / OOM
Serial:  for (const u of urls) await…    →  safe but painfully slow
p-flow:  flow.map(urls, fetch)           →  capped, rate-limited, retried,
                                            self-throttling on 429  →  ✅ fast & safe

Recipes

Polite, ban-resistant scraping

const flow = new PFlow({
  concurrency: 10,
  interval: 1000, intervalCap: 30,
  retry: { retries: 5, backoff: 'exponential', jitter: 'full', respectRetryAfter: true },
  adaptive: { min: 2 },              // never drop below 2, ramp back to 10
})

flow.on('throttle', ({ concurrency }) => console.warn('backing off →', concurrency))
flow.on('retry', ({ attempt, delay }) => console.log(`retry #${attempt} in ${delay}ms`))

const results = await flow.map(urls, async (url, _i, { signal }) => {
  const res = await fetch(url, { signal })
  // throw on push-back so retry + respectRetryAfter (reads res.headers) engage
  if (res.status === 429 || res.status === 503 || !res.ok) {
    throw Object.assign(new Error(`HTTP ${res.status}`), { status: res.status, headers: res.headers })
  }
  return res.json()
})
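
For context on what honoring Retry-After involves: per RFC 9110 the header value is either a number of seconds or an HTTP date. The sketch below converts either form into a wait in milliseconds. `retryAfterMs` is a hypothetical helper written for illustration; how p-flow parses the header internally is not stated in this README.

```typescript
// Convert a Retry-After header value into a wait in milliseconds.
// RFC 9110 allows two forms: delay-seconds ("120") or an HTTP-date.
// `now` is injectable so the function is deterministic in tests.
function retryAfterMs(value: string | null, now: number = Date.now()): number | undefined {
  if (value === null) return undefined
  const trimmed = value.trim()

  // Form 1: a non-negative integer number of seconds.
  if (/^\d+$/.test(trimmed)) return Number(trimmed) * 1000

  // Form 2: an HTTP-date; wait until that moment, never a negative time.
  const date = Date.parse(trimmed)
  if (Number.isNaN(date)) return undefined
  return Math.max(0, date - now)
}
```

A caller would typically combine the result with its own backoff delay, for example by taking the larger of the two.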

Prioritize some work

flow.run(() => criticalJob(), { priority: 10 })   // jumps the queue
flow.run(() => backgroundJob(), { priority: 0 })
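
A priority option of this kind is commonly backed by an ordered queue: higher numbers leave first, and entries with equal priority leave in arrival order. The sketch below shows that ordering rule. `PriorityQueue` is a hypothetical class, and FIFO tie-breaking is an assumption; the README does not say how p-flow orders equal priorities.

```typescript
// A small priority queue: higher priority dequeues first,
// equal priorities dequeue in insertion order (assumed FIFO tie-break).
class PriorityQueue<T> {
  private items: { value: T; priority: number }[] = []

  enqueue(value: T, priority = 0): void {
    // Insert before the first entry with strictly lower priority,
    // which keeps earlier entries of the same priority ahead of this one.
    const index = this.items.findIndex((entry) => entry.priority < priority)
    const entry = { value, priority }
    if (index === -1) this.items.push(entry)
    else this.items.splice(index, 0, entry)
  }

  dequeue(): T | undefined {
    return this.items.shift()?.value
  }

  get size(): number {
    return this.items.length
  }
}
```

Priority only reorders work that is still waiting. A task submitted while a slot is free would normally start right away regardless of its priority.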

Wait for everything to finish

for (const job of jobs) flow.run(job).catch(console.error)   // fire-and-forget, with failures logged instead of surfacing as unhandled rejections
await flow.onIdle()                     // resolves when fully drained

Gotchas

  • Retries fire only on throw. fetch resolves a Response even for 429/503, so it won't retry unless you throw on bad responses (see the examples). adaptive backpressure does react to a returned 429/503, but retry/respectRetryAfter need an error.
  • timeout & signal are cooperative. On timeout the task's signal is aborted and run() rejects, but JS can't force-cancel a Promise — a task that ignores signal keeps running in the background, so actually executing bodies can briefly exceed concurrency. Always forward ctx.signal to fetch/IO.
  • clear() rejects pending tasks with AbortError (so awaiters aren't stuck). Catch them, or they surface as unhandled rejections.
  • Two kinds of attempt. ctx.attempt is 0-based (0 = first try). The retry event's attempt and retryOn(error, attempt) are 1-based (1 = first retry).
  • Invalid config fails fast. concurrency/intervalCap must be a finite number >= 1 (or Infinity); otherwise the constructor throws RangeError rather than silently deadlocking.
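
The fail-fast rule in the last item can be sketched as a small validator. `assertLimit` is a hypothetical function, not p-flow's code; it implements only the rule as stated (a finite number >= 1, or Infinity, else RangeError). Whether p-flow also requires an integer is not stated, so the sketch does not check for one.

```typescript
// Validate a limit such as concurrency or intervalCap.
// Accepts any number >= 1, including Infinity; rejects everything else.
function assertLimit(name: string, value: unknown): number {
  // NaN fails the explicit check; 0, negatives and -Infinity fail `value < 1`.
  if (typeof value !== 'number' || Number.isNaN(value) || value < 1) {
    throw new RangeError(`${name} must be a finite number >= 1 or Infinity, got ${String(value)}`)
  }
  return value
}
```

Rejecting a value like 0 up front matters because a queue that may never start a task would otherwise wait forever without reporting an error.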

API

new PFlow(options)

| option | type | default | description |
| --- | --- | --- | --- |
| concurrency | number | Infinity | Max tasks running at once. |
| interval | number | – | Rate-limit window (ms). Pair with intervalCap. |
| intervalCap | number | Infinity | Max tasks started per interval. |
| timeout | number | 0 | Per-task timeout (ms). Aborts the task's signal. |
| retry | number \| RetryOptions | 0 | Retry policy (number = retries). |
| adaptive | boolean \| AdaptiveOptions | false | AIMD auto-throttling on push-back. |
| autoStart | boolean | true | Start processing immediately. |
| throttleOn | (info) => boolean | – | Custom throttle detection. |
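
To make the interval / intervalCap pairing concrete, here is a minimal sketch of a fixed-window start counter: at most `intervalCap` starts are allowed per `interval` milliseconds. `IntervalWindow` is a hypothetical class, and fixed (rather than sliding) windows are an assumption; the README does not say which scheme p-flow uses.

```typescript
// Fixed-window rate limiter: allow at most `intervalCap` task starts
// in each window of `interval` milliseconds.
class IntervalWindow {
  private readonly interval: number
  private readonly intervalCap: number
  private windowStart = -Infinity // forces a fresh window on first use
  private started = 0

  constructor(interval: number, intervalCap: number) {
    this.interval = interval
    this.intervalCap = intervalCap
  }

  // Returns true and counts the start if the current window has room.
  // `now` is a timestamp in ms, passed in to keep the sketch deterministic.
  tryStart(now: number): boolean {
    if (now - this.windowStart >= this.interval) {
      this.windowStart = now
      this.started = 0
    }
    if (this.started >= this.intervalCap) return false
    this.started += 1
    return true
  }
}
```

With `interval: 1000` and `intervalCap: 20`, the 21st start inside one second is refused and has to wait for the next window, independently of the concurrency limit.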

RetryOptions: retries, backoff ('exponential' | 'linear' | 'fixed'), factor, minDelay, maxDelay, jitter (true | 'full' | 'equal'), respectRetryAfter, retryOn(error, attempt).
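
How these options might combine into a delay is sketched below. The option names come from the list above, but the formulas are assumptions based on the usual definitions: exponential as `minDelay * factor^(attempt - 1)`, full jitter as a random value in `[0, delay)`, equal jitter as half the delay plus a random half. Treating `jitter: true` as full jitter is also an assumption. `retryDelay` is a hypothetical function, not p-flow's implementation.

```typescript
type Backoff = 'exponential' | 'linear' | 'fixed'
type Jitter = boolean | 'full' | 'equal'

interface DelayOptions {
  backoff: Backoff
  factor: number
  minDelay: number // ms
  maxDelay: number // ms
  jitter: Jitter
}

// Delay before retry number `attempt` (1-based: 1 = first retry).
// `random` is injectable so results can be made deterministic.
function retryDelay(attempt: number, o: DelayOptions, random: () => number = Math.random): number {
  let base: number
  if (o.backoff === 'exponential') base = o.minDelay * o.factor ** (attempt - 1)
  else if (o.backoff === 'linear') base = o.minDelay * attempt
  else base = o.minDelay

  // Never wait longer than maxDelay, whatever the growth curve says.
  const capped = Math.min(o.maxDelay, base)

  if (o.jitter === 'equal') return capped / 2 + random() * (capped / 2)
  if (o.jitter === true || o.jitter === 'full') return random() * capped
  return capped
}
```

Jitter spreads retries out in time, so many clients that failed together do not all retry at the same instant.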

AdaptiveOptions: min, max, decreaseFactor (default 0.5), increaseStep (default 1), successesToIncrease (default 10).
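
The AIMD idea behind these options can be sketched as a small controller: multiply the limit by `decreaseFactor` on push-back, and add `increaseStep` after `successesToIncrease` consecutive successes, always staying within `[min, max]`. `AimdLimit` is a hypothetical class; the exact update rule (rounding down, resetting the success streak on push-back, starting at `max`) is an assumption, since the README only names the parameters and their defaults.

```typescript
interface AdaptiveSettings {
  min: number
  max: number
  decreaseFactor: number      // README default: 0.5
  increaseStep: number        // README default: 1
  successesToIncrease: number // README default: 10
}

// Additive-increase / multiplicative-decrease concurrency limit.
class AimdLimit {
  private readonly settings: AdaptiveSettings
  private successStreak = 0
  current: number

  constructor(settings: AdaptiveSettings) {
    this.settings = settings
    this.current = settings.max // assumed: start at the configured ceiling
  }

  // Multiplicative decrease: called when the remote pushes back (e.g. 429/503).
  onPushBack(): number {
    const { min, decreaseFactor } = this.settings
    this.successStreak = 0
    this.current = Math.max(min, Math.floor(this.current * decreaseFactor))
    return this.current
  }

  // Additive increase: one step up after enough consecutive successes.
  onSuccess(): number {
    const { max, increaseStep, successesToIncrease } = this.settings
    this.successStreak += 1
    if (this.successStreak >= successesToIncrease) {
      this.successStreak = 0
      this.current = Math.min(max, this.current + increaseStep)
    }
    return this.current
  }
}
```

Under these assumptions, a limit of 10 with `min: 2` drops to 5 and then 2 on two consecutive push-backs, and climbs back by one for every ten successful tasks.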

Methods

  • run(fn, options?) → Promise<T>: enqueue a task. fn receives { signal, attempt }. Options: priority, signal, timeout, retry.
  • map(items, fn, options?) → Promise<R[]>: map with limits, order preserved. fn receives (item, index, ctx), as in the examples above.
  • pause() / start() / clear() / onIdle()
  • getters: size, pending, concurrency, isPaused

Events

flow.on(event, cb) (returns an unsubscribe fn): retry {error, attempt, delay} · throttle {concurrency} · concurrency (n) · idle · active · resolve · reject.
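
The "on returns an unsubscribe function" pattern can be sketched with a tiny typed emitter. `TinyEmitter` and `DemoEvents` are hypothetical names used only to show the pattern; they are not p-flow's event implementation, and the payload shapes are copied from the list above.

```typescript
type Listener<P> = (payload: P) => void

// Minimal emitter where on() hands back a function that removes the listener.
class TinyEmitter<Events extends Record<string, unknown>> {
  private listeners = new Map<keyof Events, Set<Listener<any>>>()

  on<K extends keyof Events>(event: K, cb: Listener<Events[K]>): () => void {
    const set = this.listeners.get(event) ?? new Set<Listener<any>>()
    this.listeners.set(event, set)
    set.add(cb)
    // The returned closure is the unsubscribe function.
    return () => {
      set.delete(cb)
    }
  }

  emit<K extends keyof Events>(event: K, payload: Events[K]): void {
    this.listeners.get(event)?.forEach((cb) => cb(payload))
  }
}

// Payload shapes taken from the event list above.
type DemoEvents = {
  throttle: { concurrency: number }
  retry: { attempt: number; delay: number }
}
```

Keeping the returned function and calling it later is what stops the callback from firing, which avoids having to pass the same callback reference to a separate off() method.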

vs. p-limit / p-queue

| | p-limit | p-queue | p-flow |
| --- | :---: | :---: | :---: |
| Concurrency limit | ✅ | ✅ | ✅ |
| Rate limiting | ❌ | ✅ | ✅ |
| Priority queue | ❌ | ✅ | ✅ |
| Retry + backoff + jitter | ❌ | ❌ | ✅ |
| Per-task timeout | ❌ | ✅ | ✅ |
| Adaptive backpressure | ❌ | ❌ | ✅ |
| Dependencies | 1 | 1 | 0 |


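Chinese README, translated

p-limit, p-queue, and p-retry all work well, but you often need several of them at once and have to wire them together by hand. p-flow folds concurrency control, rate limiting, retries, timeouts, and a priority queue into one intuitive API, and adds one ability none of them have:

🌟 Adaptive backpressure: when the remote starts rate-limiting (HTTP 429/503 or Retry-After), p-flow automatically lowers concurrency, then gradually raises it again once things stabilize (AIMD, the algorithm used by TCP congestion control). Saturate throughput without getting banned.

  • 🪶 Zero dependencies, tiny, complete TypeScript types.
  • 🌍 Runs everywhere: Node, Bun, Deno, edge, browsers.
  • 🧩 One object in place of four libraries.
  • 🛡️ Honors Retry-After, supports AbortSignal, allows per-task overrides.

npm install @rubyleung/p-flow

import { PFlow } from '@rubyleung/p-flow'

const flow = new PFlow({
  concurrency: 8,            // at most 8 at the same time
  interval: 1000,            // per second
  intervalCap: 20,           // at most 20 started
  timeout: 10_000,           // 10-second timeout per task
  retry: { retries: 3, jitter: true, respectRetryAfter: true },
  adaptive: true,            // slow down automatically on 429/503
})

const data = await flow.run(() => fetch(url).then((r) => r.json()))   // single task
const all  = await flow.map(urls, (u) => fetch(u))                     // batch, order preserved

What problem it solves

All at once:  Promise.all(urls.map(fetch))  →  1000 connections instantly  →  💥 banned / out of memory
Serial:       for (const u of urls) await…  →  safe but painfully slow
p-flow:       flow.map(urls, fetch)         →  concurrency cap + rate limit + retries + automatic slow-down on 429  →  ✅ fast and stable

The API, events, and options are the same as in the tables above. For complete examples, see examples/ and the English Recipes above.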

License

MIT © Ruby Leung