@rubyleung/p-flow
v0.1.0
Tiny, zero-dependency concurrency orchestrator for Promises: concurrency limit, rate limiting, retry with backoff & jitter, timeout, priority queue, and adaptive backpressure.
p-flow
Tiny, zero-dependency concurrency orchestrator for Promises. Concurrency limit · rate limiting · retry with backoff & jitter · timeout · priority queue · adaptive backpressure.
Why p-flow?
p-limit, p-queue, and p-retry are great — but you usually need several of them at once, wired together by hand. p-flow bundles concurrency, rate limiting, retries, timeouts, and a priority queue behind one intuitive API — and adds something none of them have:
🌟 Adaptive backpressure: when the remote starts pushing back (HTTP 429/503 or Retry-After), p-flow automatically dials concurrency down, then ramps it back up once things recover (AIMD, the algorithm behind TCP congestion control). Push maximum throughput without getting banned.
- 🪶 Zero dependencies, tiny, fully typed.
- 🌍 Runs anywhere: Node, Bun, Deno, edge, browsers.
- 🧩 One object instead of four libraries.
- 🛡️ Honors Retry-After, supports AbortSignal, per-task overrides.
npm install @rubyleung/p-flow

Quick start
import { PFlow } from '@rubyleung/p-flow'

const flow = new PFlow({
  concurrency: 8, // at most 8 at a time
  interval: 1000, // ...and
  intervalCap: 20, // at most 20 started per second
  timeout: 10_000, // 10s per task
  retry: { retries: 3, jitter: true, respectRetryAfter: true },
  adaptive: true, // auto-throttle on 429/503
})

// run a single task
const me = await flow.run(({ signal }) => fetch('/api/me', { signal }).then((r) => r.json()))

// map over many: order preserved, limits enforced.
// throw on bad responses so retry / respectRetryAfter kick in
// (fetch never rejects on 4xx/5xx, so retries only fire if you throw)
const pages = await flow.map(urls, async (url, _i, { signal }) => {
  const res = await fetch(url, { signal })
  if (!res.ok) throw Object.assign(new Error(`HTTP ${res.status}`), { status: res.status, headers: res.headers })
  return res.text()
})

The problem it solves
Naive:   Promise.all(urls.map(fetch))   → 1000 sockets at once → 💥 banned / OOM
Serial:  for (const u of urls) await…   → safe but painfully slow
p-flow:  flow.map(urls, fetch)          → capped, rate-limited, retried, self-throttling on 429 → ✅ fast & safe

Recipes
Polite, ban-resistant scraping
const flow = new PFlow({
  concurrency: 10,
  interval: 1000,
  intervalCap: 30,
  retry: { retries: 5, backoff: 'exponential', jitter: 'full', respectRetryAfter: true },
  adaptive: { min: 2 }, // never drop below 2, ramp back to 10
})

flow.on('throttle', ({ concurrency }) => console.warn('backing off →', concurrency))
flow.on('retry', ({ attempt, delay }) => console.log(`retry #${attempt} in ${delay}ms`))

const results = await flow.map(urls, async (url, _i, { signal }) => {
  const res = await fetch(url, { signal })
  // throw on push-back (429/503) and on any other non-2xx status
  // so retry + respectRetryAfter (reads res.headers) engage
  if (!res.ok) {
    throw Object.assign(new Error(`HTTP ${res.status}`), { status: res.status, headers: res.headers })
  }
  return res.json()
})

Prioritize some work
flow.run(() => criticalJob(), { priority: 10 }) // jumps the queue
flow.run(() => backgroundJob(), { priority: 0 })

Wait for everything to finish
for (const job of jobs) flow.run(job) // fire-and-forget
await flow.onIdle() // resolves when fully drained

Gotchas
- Retries fire only on throw. fetch resolves a Response even for 429/503, so it won't retry unless you throw on bad responses (see the examples). adaptive backpressure does react to a returned 429/503, but retry / respectRetryAfter need an error.
- timeout & signal are cooperative. On timeout the task's signal is aborted and run() rejects, but JS can't force-cancel a Promise; a task that ignores signal keeps running in the background, so the number of task bodies actually executing can briefly exceed concurrency. Always forward ctx.signal to fetch/IO.
- clear() rejects pending tasks with AbortError (so awaiters aren't stuck). Catch them, or they surface as unhandled rejections.
- Two kinds of attempt. ctx.attempt is 0-based (0 = first try). The retry event's attempt and retryOn(error, attempt) are 1-based (1 = first retry).
- Invalid config fails fast. concurrency / intervalCap must be a finite number >= 1 (or Infinity); otherwise the constructor throws RangeError rather than silently deadlocking.
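The first gotcha depends on the thrown error carrying the response headers. As a rough sketch of what honoring Retry-After can involve (an illustration only, not p-flow's actual code; retryAfterToMs and delayFromError are hypothetical names), the header holds either a number of seconds or an HTTP date, and both can be reduced to a delay in milliseconds:

```javascript
// Hypothetical helpers for illustration; not part of p-flow's documented API.

// Converts a Retry-After header value into a delay in milliseconds.
// The header is either a non-negative integer (seconds) or an HTTP date.
function retryAfterToMs(value, now = Date.now()) {
  if (value == null) return null
  const text = String(value).trim()
  if (/^\d+$/.test(text)) return Number(text) * 1000 // delay-seconds form
  const when = Date.parse(text) // HTTP-date form
  if (Number.isNaN(when)) return null // unparseable: no usable hint
  return Math.max(0, when - now) // a date in the past means "retry now"
}

// Reads the header from an error shaped like the ones thrown in the examples:
// Object.assign(new Error(...), { status, headers })
function delayFromError(error, now = Date.now()) {
  const headers = error && error.headers
  if (!headers || typeof headers.get !== 'function') return null
  return retryAfterToMs(headers.get('retry-after'), now)
}
```

Returning null for a missing or unparseable header lets the caller fall back to its normal backoff delay.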
API
new PFlow(options)
| option | type | default | description |
| --- | --- | --- | --- |
| concurrency | number | Infinity | Max tasks running at once. |
| interval | number | – | Rate-limit window (ms). Pair with intervalCap. |
| intervalCap | number | Infinity | Max tasks started per interval. |
| timeout | number | 0 | Per-task timeout (ms). Aborts the task's signal. |
| retry | number \| RetryOptions | 0 | Retry policy (number = retries). |
| adaptive | boolean \| AdaptiveOptions | false | AIMD auto-throttling on push-back. |
| autoStart | boolean | true | Start processing immediately. |
| throttleOn | (info) => boolean | – | Custom throttle detection. |
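To make the interval / intervalCap pair concrete, here is a minimal fixed-window limiter. It assumes windows that reset every interval milliseconds; whether p-flow uses fixed or sliding windows is not stated above, and WindowLimiter and tryStart are hypothetical names.

```javascript
// Illustrative fixed-window limiter. Option names mirror the table above,
// but the windowing strategy is an assumption, not p-flow's documented behavior.
class WindowLimiter {
  constructor({ interval, intervalCap }) {
    this.interval = interval
    this.intervalCap = intervalCap
    this.windowStart = -Infinity // no window opened yet
    this.started = 0 // tasks started in the current window
  }

  // Returns 0 if a task may start now, otherwise the ms to wait
  // until the current window ends.
  tryStart(now = Date.now()) {
    if (now - this.windowStart >= this.interval) {
      this.windowStart = now // open a fresh window
      this.started = 0
    }
    if (this.started < this.intervalCap) {
      this.started += 1
      return 0
    }
    return this.windowStart + this.interval - now
  }
}
```

With interval: 1000 and intervalCap: 20, the 21st call inside the same second gets back the remaining time in that window instead of a go-ahead.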
RetryOptions: retries, backoff ('exponential' | 'linear' | 'fixed'), factor, minDelay, maxDelay, jitter (true | 'full' | 'equal'), respectRetryAfter, retryOn(error, attempt).
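One way the RetryOptions fields could combine into a delay is sketched below. The default values (factor 2, minDelay 100, maxDelay 30000), the treatment of jitter: true as full jitter, and the function name backoffDelay are assumptions for illustration, not p-flow's documented behavior.

```javascript
// Hypothetical delay calculation; defaults are placeholders, not p-flow's.
// attempt is 1-based here (1 = first retry), matching the retry event.
function backoffDelay(attempt, options = {}, random = Math.random) {
  const {
    backoff = 'exponential',
    factor = 2,
    minDelay = 100,
    maxDelay = 30_000,
    jitter = false,
  } = options

  let base
  if (backoff === 'fixed') base = minDelay
  else if (backoff === 'linear') base = minDelay * attempt
  else base = minDelay * factor ** (attempt - 1)

  const capped = Math.min(base, maxDelay)

  // "Equal" jitter keeps half the delay and randomizes the other half.
  if (jitter === 'equal') return capped / 2 + random() * (capped / 2)
  // "Full" jitter (assumed for `true` as well) picks anywhere in [0, capped).
  if (jitter) return random() * capped
  return capped
}
```

Jitter spreads retries from many clients over time, so they do not all hit the server again at the same instant.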
AdaptiveOptions: min, max, decreaseFactor (default 0.5), increaseStep (default 1), successesToIncrease (default 10).
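The AIMD idea behind these options can be sketched in a few lines: cut concurrency multiplicatively on push-back, and raise it additively after a streak of successes. This is a minimal illustration under assumptions, not the library's implementation: AimdController and its method names are hypothetical, and the min and max defaults are placeholders (only decreaseFactor, increaseStep, and successesToIncrease defaults come from the line above).

```javascript
// Hypothetical AIMD controller for illustration only.
class AimdController {
  constructor({ min = 1, max = 10, decreaseFactor = 0.5, increaseStep = 1, successesToIncrease = 10 } = {}) {
    Object.assign(this, { min, max, decreaseFactor, increaseStep, successesToIncrease })
    this.concurrency = max // start at the configured ceiling
    this.successes = 0 // current success streak
  }

  // Push-back (e.g. 429/503): multiplicative decrease, never below min.
  onPushBack() {
    this.successes = 0
    this.concurrency = Math.max(this.min, Math.floor(this.concurrency * this.decreaseFactor))
    return this.concurrency
  }

  // Success: additive increase once the streak is long enough, never above max.
  onSuccess() {
    this.successes += 1
    if (this.successes >= this.successesToIncrease) {
      this.successes = 0
      this.concurrency = Math.min(this.max, this.concurrency + this.increaseStep)
    }
    return this.concurrency
  }
}
```

Starting at 10 with min: 2, two push-backs take the limit to 5 and then 2; it then takes ten consecutive successes to climb back to 3.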
Methods
- run(fn, options?) → Promise<T>: enqueue a task. fn receives { signal, attempt }. Options: priority, signal, timeout, retry.
- map(items, fn, options?) → Promise<R[]>: map with limits, order preserved.
- pause() / start() / clear() / onIdle()
- getters: size, pending, concurrency, isPaused
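How run, priority, size, and pending relate can be seen in a stripped-down queue. The sketch below is a hypothetical MiniQueue, not p-flow's code: it assumes higher priority numbers run first and equal priorities run in arrival order, and it omits the { signal, attempt } context, retries, and timeouts.

```javascript
// Hypothetical minimal priority queue with a concurrency cap.
class MiniQueue {
  constructor(concurrency) {
    this.concurrency = concurrency
    this.queue = [] // waiting entries, highest priority first
    this.pending = 0 // tasks currently running
  }

  // Number of tasks still waiting to start.
  get size() {
    return this.queue.length
  }

  run(fn, { priority = 0 } = {}) {
    return new Promise((resolve, reject) => {
      const entry = { fn, priority, resolve, reject }
      // Insert before the first entry with a strictly lower priority,
      // so equal priorities keep their arrival (FIFO) order.
      const index = this.queue.findIndex((e) => e.priority < priority)
      if (index === -1) this.queue.push(entry)
      else this.queue.splice(index, 0, entry)
      this.drain()
    })
  }

  // Starts queued tasks while there is spare capacity.
  drain() {
    while (this.pending < this.concurrency && this.queue.length > 0) {
      const { fn, resolve, reject } = this.queue.shift()
      this.pending += 1
      Promise.resolve()
        .then(fn) // runs fn asynchronously; a synchronous throw becomes a rejection
        .then(resolve, reject)
        .finally(() => {
          this.pending -= 1
          this.drain()
        })
    }
  }
}
```

With a concurrency of 1, the first task starts immediately and everything enqueued afterwards waits in priority order until it settles.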
Events
flow.on(event, cb) (returns an unsubscribe fn): retry {error, attempt, delay} · throttle {concurrency} · concurrency (n) · idle · active · resolve · reject.
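The "returns an unsubscribe fn" pattern is easy to picture with a tiny emitter. This is a generic sketch with a hypothetical TinyEmitter class, not p-flow's internals:

```javascript
// Hypothetical emitter showing on() returning an unsubscribe function.
class TinyEmitter {
  constructor() {
    this.listeners = new Map() // event name -> Set of callbacks
  }

  on(event, cb) {
    if (!this.listeners.has(event)) this.listeners.set(event, new Set())
    this.listeners.get(event).add(cb)
    // Calling the returned function removes this callback again.
    return () => this.listeners.get(event).delete(cb)
  }

  emit(event, payload) {
    for (const cb of this.listeners.get(event) ?? []) cb(payload)
  }
}
```

Keeping the returned function and calling it later avoids the need for a separate off(event, cb) method and for holding on to the callback reference.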
vs. p-limit / p-queue
| | p-limit | p-queue | p-flow |
| --- | :---: | :---: | :---: |
| Concurrency limit | ✅ | ✅ | ✅ |
| Rate limiting | ❌ | ✅ | ✅ |
| Priority queue | ❌ | ✅ | ✅ |
| Retry + backoff + jitter | ❌ | ❌ | ✅ |
| Per-task timeout | ❌ | ✅ | ✅ |
| Adaptive backpressure | ❌ | ❌ | ✅ |
| Dependencies | 1 | 1 | 0 |
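Summary (translated from Chinese)
p-limit, p-queue, and p-retry are all handy, but you often need several of them at once and have to wire them together by hand. p-flow folds concurrency control, rate limiting, retries, timeouts, and a priority queue into one intuitive API, and adds a capability none of them has:
🌟 Adaptive backpressure: when the remote side starts throttling (HTTP 429/503 or Retry-After), p-flow automatically lowers concurrency, then gradually raises it again once things stabilize (AIMD, the algorithm used by TCP congestion control). Saturate throughput without getting banned.
- 🪶 Zero dependencies, tiny, complete TypeScript types.
- 🌍 Runs everywhere: Node, Bun, Deno, edge, browsers.
- 🧩 One object in place of four libraries.
- 🛡️ Respects Retry-After, supports AbortSignal, allows per-task overrides.

npm install @rubyleung/p-flow

import { PFlow } from '@rubyleung/p-flow'

const flow = new PFlow({
  concurrency: 8, // at most 8 at once
  interval: 1000, // per second...
  intervalCap: 20, // ...start at most 20
  timeout: 10_000, // 10 s timeout per task
  retry: { retries: 3, jitter: true, respectRetryAfter: true },
  adaptive: true, // slow down automatically on 429/503
})

const data = await flow.run(() => fetch(url).then((r) => r.json())) // single task
const all = await flow.map(urls, (u) => fetch(u)) // batch, order preserved

What problem it solves
All at once:  Promise.all(urls.map(fetch))   → 1000 connections in an instant → 💥 banned / out of memory
One by one:   for (const u of urls) await…   → safe but agonizingly slow
p-flow:       flow.map(urls, fetch)          → concurrency cap + rate limit + retries + automatic slowdown on 429 → ✅ fast and stable

The API, events, and options are the same as in the tables above. For complete examples, see examples/ and the Recipes section above.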
License
MIT © Ruby Leung
