hurried
v2.3.0
Modern, type-safe parallel execution for Node.js — workers, pools and parallel iterators with first-class TypeScript support.
Parallel execution for Node.js, done right.
Workers · Pools · Parallel iterators · A typed event bus across the worker boundary — all behind an API that fits on a sticky note.
30 seconds to your first parallel task
import { Thread } from 'hurried';
const thread = Thread.fromFunction((n: number) => n * 2);
await thread.run(21); // → 42, on a worker thread
await thread.terminate();
That's it. Three lines, fully typed, CPU work off the event loop, no separate worker file.
Need progress events? Add a typed Events map and use the bus:
import { Thread } from 'hurried';
type Events = { progress: { done: number; total: number } };
const thread = Thread.fromFunction<Events, number, number>((bus, n) => {
for (let i = 0; i < n; i++) {
if (i % 1_000_000 === 0) bus.emit('progress', { done: i, total: n });
}
return n;
});
thread.on('progress', (p) => console.log(`${p.done}/${p.total}`));
await thread.run(50_000_000);
await thread.terminate();
Live, end-to-end-typed progress events across the worker boundary. That's the whole demo.
✨ Why hurried?
CPU-bound JavaScript blocks the event loop. The standard Node fix — worker_threads — is powerful but unfriendly: a separate file, untyped postMessage, no pools, no progress events, no cancellation. hurried is a small library that wraps that primitive in an API real codebases actually want to use.
💡 What makes hurried different? Every primitive (Thread, Pool, parallel, mapParallel) speaks the same tiny API: run / on / emit / terminate. Learn it once, scale from a single worker to a 16-CPU pool without rewriting your code.
📦 Install
npm install hurried
pnpm add hurried
yarn add hurried
bun add hurried
Requires Node.js 18.17+. Ships ESM + CJS + .d.ts, so there is no build step on your side.
⚡ Quick start
1) Run a single inline task
import { Thread } from 'hurried';
const t = Thread.fromFunction((n: number) => n * 2);
await t.run(21); // → 42
await t.terminate();
2) Spin up a pool
import { Pool } from 'hurried';
const pool = new Pool({
size: 4,
task: (n: number) => {
let total = 0;
for (let i = 0; i < n; i++) total += i;
return total;
},
});
await pool.map([1e6, 2e6, 3e6, 4e6]);
await pool.terminate();
3) Skip lifecycle entirely
import { mapParallel } from 'hurried';
const squares = await mapParallel(
[1, 2, 3, 4, 5, 6, 7, 8],
(n) => n * n,
{ concurrency: 4 },
);
4) Stream results as they finish
import { mapParallelStream } from 'hurried';
// Source is pulled lazily — it can be huge, or infinite. Memory stays flat.
for await (const parsed of mapParallelStream(readLines('huge.log'), parseLine, {
concurrency: 8,
ordered: false, // emit as-completed; drop it for input order
})) {
save(parsed); // each result, the moment a worker produces it
}
A true parallel iterator: bounded concurrency, backpressure, and clean teardown on early break. Already have a Pool? Stream through it with pool.stream(items).
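The bounded-concurrency idea behind an as-completed stream can be sketched without any workers at all. The snippet below is a minimal in-process illustration, not hurried's implementation: `mapBounded` is a hypothetical helper that takes async functions rather than worker tasks, and it pulls the next input only when a slot frees up, which is what keeps memory flat over a large source.

```typescript
// Hypothetical helper for illustration only; not part of hurried.
// Yields results in completion order while keeping at most `concurrency`
// tasks in flight. The source is pulled lazily, one item per free slot.
async function* mapBounded<T, R>(
  source: Iterable<T>,
  task: (item: T) => Promise<R>,
  concurrency: number,
): AsyncGenerator<R> {
  const iterator = source[Symbol.iterator]();
  const inFlight = new Map<number, Promise<{ id: number; value: R }>>();
  let nextId = 0;
  let exhausted = false;

  while (true) {
    // Top up: only pull from the source while there is a free slot.
    while (!exhausted && inFlight.size < concurrency) {
      const step = iterator.next();
      if (step.done) {
        exhausted = true;
        break;
      }
      const id = nextId++;
      inFlight.set(id, task(step.value).then((value) => ({ id, value })));
    }

    if (inFlight.size === 0) return; // source drained and nothing pending

    // Wait for whichever in-flight task settles first, then free its slot.
    const winner = await Promise.race(Array.from(inFlight.values()));
    inFlight.delete(winner.id);
    yield winner.value;
  }
}
```

Consumed with `for await (const r of mapBounded(items, fn, 8))`, the consumer's pace limits how fast the source is read. Unlike the library helpers described above, this sketch makes no attempt to cancel in-flight work on an early break, and a rejected task simply propagates out of the generator.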
5) Retry flaky work, with backoff
// One option, on any primitive. `retry: 3` = up to 4 attempts.
await pool.run(input, {
retry: { retries: 3, minDelay: 100, factor: 2, jitter: true },
});
await mapParallel(urls, fetchAndParse, { retry: 3, concurrency: 8 });
Exponential backoff, jitter, a custom shouldRetry/onRetry, and full AbortSignal support: cancellation interrupts even a pending backoff. Abort and teardown errors are never retried.
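To make the retry options above concrete, here is a small sketch of how an exponential backoff schedule is commonly computed. The formula (`minDelay * factor^(n-1)`) and the "full jitter" variant are widespread conventions and are assumptions here; the README does not state the exact formula hurried uses. `backoffDelay`, `backoffSchedule`, and `withJitter` are hypothetical names.

```typescript
// Illustrative only: a conventional exponential backoff, not hurried's code.
interface BackoffOptions {
  retries: number;  // number of retries after the first attempt
  minDelay: number; // delay before the first retry, in ms
  factor: number;   // multiplier applied for each subsequent retry
}

// Delay before retry number `attempt` (1-based), without jitter.
function backoffDelay(attempt: number, opts: BackoffOptions): number {
  return opts.minDelay * Math.pow(opts.factor, attempt - 1);
}

// Full schedule of un-jittered delays, one entry per retry.
function backoffSchedule(opts: BackoffOptions): number[] {
  return Array.from({ length: opts.retries }, (_, i) => backoffDelay(i + 1, opts));
}

// "Full jitter": pick a random delay in [0, base) so that many failing
// callers do not retry in lockstep. `random` is injectable for testing.
function withJitter(base: number, random: () => number = Math.random): number {
  return Math.floor(random() * base);
}

// retries: 3 means up to 4 attempts in total, with 3 waits in between.
const schedule = backoffSchedule({ retries: 3, minDelay: 100, factor: 2 });
// schedule is [100, 200, 400]
```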
6) Survive partial failure — allSettled for workers
import { mapParallelSettled } from 'hurried';
// One bad item no longer rejects the whole batch.
const results = await mapParallelSettled(urls, fetchAndParse, { concurrency: 8 });
for (const r of results) {
if (r.status === 'fulfilled') save(r.value);
else log.warn(r.reason);
}
Every map/stream helper has a settled twin (mapParallelSettled, mapParallelStreamSettled, pool.mapSettled, pool.streamSettled), each yielding a typed SettledResult per item. Task failures are recorded; cancellation still stops everything.
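A settled result is easiest to work with once it is split into successes and failures. The sketch below assumes `SettledResult` has the same shape as the standard `PromiseSettledResult` (the example above confirms `status`, `value`, and `reason`; the `'rejected'` literal is an assumption borrowed from `Promise.allSettled`). `partitionSettled` is a hypothetical helper, not a library export.

```typescript
// Assumed shape, mirroring PromiseSettledResult; declared locally so the
// sketch stands alone.
type SettledResult<T> =
  | { status: 'fulfilled'; value: T }
  | { status: 'rejected'; reason: unknown };

// Split a settled batch into the values that succeeded and the reasons
// for the items that failed, preserving relative order within each group.
function partitionSettled<T>(
  results: readonly SettledResult<T>[],
): { values: T[]; reasons: unknown[] } {
  const values: T[] = [];
  const reasons: unknown[] = [];
  for (const r of results) {
    if (r.status === 'fulfilled') values.push(r.value); // r narrows to the fulfilled arm
    else reasons.push(r.reason);                        // r narrows to the rejected arm
  }
  return { values, reasons };
}
```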
🚌 The Bus — typed pub/sub across threads
The feature you've always wished worker_threads had: send strongly-typed events both ways with one shared contract.
type Events = {
progress: { done: number; total: number };
log: string;
cancel: void; // void events → no payload arg
};
const thread = Thread.fromFunction<Events, number, number>((bus, n) => {
bus.on('cancel', () => { /* gracefully stop */ });
for (let i = 0; i < n; i++) {
bus.emit('progress', { done: i, total: n });
}
bus.emit('log', 'done');
return n;
});
thread.on('progress', (p) => render(p.done / p.total));
thread.on('log', (msg) => console.log(msg));
thread.emit('cancel');
One event map, two endpoints. Both thread.on(...) on the main side and bus.on(...) inside the worker are typed against the same Events. Rename a field; both sides break at compile time.
Five methods, total surface. emit / on / once / off / waitFor — and on() returns its own unsubscribe function, so cleanup is one variable.
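The typing technique behind such a bus can be shown with a single-process sketch. `LocalBus` below is a hypothetical stand-in written for this README, not hurried's `Bus`: it does not cross a thread boundary, and for brevity it does not model the "void events take no payload" rule. What it does show is how one event map constrains both `emit` and `on`, and how `on()` can return its own unsubscribe function.

```typescript
// Hypothetical in-process bus, for illustration of the typing only.
type Listener<P> = (payload: P) => void;

class LocalBus<E extends Record<string, unknown>> {
  private listeners = new Map<keyof E, Set<Listener<any>>>();

  // Subscribe; the returned function removes exactly this listener.
  on<K extends keyof E>(event: K, listener: Listener<E[K]>): () => void {
    let set = this.listeners.get(event);
    if (!set) {
      set = new Set();
      this.listeners.set(event, set);
    }
    set.add(listener);
    return () => this.off(event, listener);
  }

  // Subscribe for a single delivery, then unsubscribe automatically.
  once<K extends keyof E>(event: K, listener: Listener<E[K]>): () => void {
    const unsubscribe = this.on(event, (payload) => {
      unsubscribe();
      listener(payload);
    });
    return unsubscribe;
  }

  off<K extends keyof E>(event: K, listener: Listener<E[K]>): void {
    this.listeners.get(event)?.delete(listener);
  }

  // The payload type is looked up from the event map by key.
  emit<K extends keyof E>(event: K, payload: E[K]): void {
    const set = this.listeners.get(event);
    if (!set) return;
    // Copy first so listeners may unsubscribe while we iterate.
    for (const listener of Array.from(set)) listener(payload);
  }

  // Resolve with the next payload emitted for `event`.
  waitFor<K extends keyof E>(event: K): Promise<E[K]> {
    return new Promise<E[K]>((resolve) => {
      this.once(event, resolve);
    });
  }
}
```

With `new LocalBus<{ log: string }>()`, a call such as `bus.emit('log', 42)` or `bus.on('lgo', ...)` fails to compile, which is the same class of error the shared `Events` map is meant to catch on both sides of the worker boundary.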
// worker.ts
import { defineWorker, workerBus } from 'hurried';
export type Events = { progress: { done: number; total: number } };
const bus = workerBus<Events>();
export default defineWorker({
process(items: string[]) {
items.forEach((item, i) => {
bus.emit('progress', { done: i + 1, total: items.length });
});
return items.length;
},
});
// main.ts
import { Thread } from 'hurried';
import type { Events } from './worker.ts';
const thread = Thread.fromFile<Events>(new URL('./worker.js', import.meta.url));
thread.on('progress', (p) => console.log(p));
const count = await thread.run('process', ['a', 'b', 'c']);
const pool = new Pool<Events, number, number>({
size: 4,
task: (bus, n) => {
bus.emit('progress', { done: n, total: n });
return n;
},
});
pool.on('progress', (p) => console.log(p)); // events from ANY worker
pool.emit('cancel'); // broadcasts to ALL workers
🎯 Pattern showcase
Five real TypeScript pain points: each is awkward with raw worker_threads and takes only a few lines with hurried.
1. Typed RPC across the worker boundary
// worker.ts
import { defineWorker } from 'hurried';
export const handlers = defineWorker({
add: (a: number, b: number) => a + b,
greet: (name: string) => `Hello, ${name}!`,
hash: async (input: string) =>
(await import('node:crypto')).createHash('sha256').update(input).digest('hex'),
});
export type Handlers = typeof handlers;
// main.ts
import { Thread } from 'hurried';
import type { Handlers } from './worker.ts';
const thread = Thread.fromFile(new URL('./worker.js', import.meta.url));
await thread.run('add', 2, 5); // Promise<number>
await thread.run('greet', 'world'); // Promise<string>
Worker file = single source of truth. Export its handler-map type; import it back in main.
2. Streaming progress from CPU-bound work
type Events = { progress: { done: number; total: number; pctText: string } };
const thread = Thread.fromFunction<Events, number, number>((bus, total) => {
for (let i = 0; i < total; i++) {
if (i % Math.max(1, Math.floor(total / 20)) === 0) { // guard: total < 20 would give a zero step
bus.emit('progress', {
done: i, total, pctText: `${((i / total) * 100).toFixed(0)}%`,
});
}
}
return total;
});
thread.on('progress', (p) => process.stdout.write(`\r${p.pctText}`));
await thread.run(50_000_000);
Live progress bar, fully typed, no polling.
3. Cooperative cancellation through the bus
type Events = { cancel: void; cancelled: { atIteration: number } };
const thread = Thread.fromFunction<Events, number, 'completed' | 'cancelled'>(
async (bus, n) => {
let stop = false;
bus.on('cancel', () => { stop = true; });
const chunk = 5_000_000;
for (let i = 0; i < n; i += chunk) {
if (stop) { bus.emit('cancelled', { atIteration: i }); return 'cancelled'; }
for (let j = 0; j < chunk; j++) Math.sqrt(i + j);
await new Promise((r) => setImmediate(r)); // drain pending messages
}
return 'completed';
},
);
setTimeout(() => thread.emit('cancel'), 200);
💬 Pro tip: Node workers won't process incoming messages while inside a tight sync loop. await setImmediate is the cooperative yield that lets cancellation actually arrive.
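The same chunk-then-yield shape can be tried in a single process, without workers. The sketch below is an illustration under stated assumptions: `crunch`, `CancelToken`, and `yieldToEventLoop` are hypothetical names, the cancel flag stands in for a bus event, and `setTimeout(..., 0)` is used as a portable stand-in for the `setImmediate` yield shown above.

```typescript
// Hypothetical names throughout; this only demonstrates cooperative yielding.
interface CancelToken {
  cancelled: boolean;
}

// Hand control back to the event loop so pending callbacks can run.
const yieldToEventLoop = (): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, 0));

async function crunch(
  total: number,
  chunk: number,
  token: CancelToken,
): Promise<{ status: 'completed' | 'cancelled'; processed: number }> {
  let processed = 0;
  while (processed < total) {
    // The flag can only have changed while we were yielded, so check here.
    if (token.cancelled) return { status: 'cancelled', processed };

    const end = Math.min(processed + chunk, total);
    for (let i = processed; i < end; i++) Math.sqrt(i); // tight synchronous chunk
    processed = end;

    await yieldToEventLoop(); // without this, the flag could never be set mid-run
  }
  return { status: 'completed', processed };
}
```

Chunk size is the trade-off: larger chunks mean less yielding overhead but slower reaction to a cancel request.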
4. Aggregated events from many workers
type Events = {
progress: { workerLabel: string; done: number; total: number };
done: { workerLabel: string };
};
const pool = new Pool<Events, { id: number; size: number }, number>({
size: 4,
task: (bus, { id, size }) => {
const workerLabel = `worker-${id}`;
for (let i = 0; i < size; i++) {
if (i % Math.max(1, Math.floor(size / 4)) === 0) // guard: size < 4 would give a zero step
bus.emit('progress', { workerLabel, done: i, total: size });
}
bus.emit('done', { workerLabel });
return size;
},
});
pool.on('progress', (p) => console.log(`${p.workerLabel}: ${p.done}/${p.total}`));
pool.on('done', (d) => console.log(`${d.workerLabel} ✓`));
5. Worker as a finite state machine
type WorkerState =
| { phase: 'init' }
| { phase: 'downloading'; url: string }
| { phase: 'processing'; chunk: number }
| { phase: 'done'; bytes: number }
| { phase: 'error'; message: string };
type Events = { state: WorkerState };
const thread = Thread.fromFunction<Events, string, number>(async (bus, url) => {
bus.emit('state', { phase: 'init' });
bus.emit('state', { phase: 'downloading', url });
bus.emit('state', { phase: 'processing', chunk: 1 });
bus.emit('state', { phase: 'done', bytes: 1234 });
return 1234;
});
thread.on('state', (s) => {
switch (s.phase) {
case 'downloading': console.log(`↓ ${s.url}`); break; // s narrows here
case 'processing': console.log(`⚙ chunk ${s.chunk}`); break;
case 'done': console.log(`✓ ${s.bytes} bytes`); break;
case 'error': console.error(s.message); break;
}
});
TypeScript narrows the union per branch, with no as any and no manual type guards.
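Narrowing pairs well with an exhaustiveness check, so that adding a new phase to the union becomes a compile error until every consumer handles it. The sketch below reuses the `WorkerState` union from the example above; `describeState` is a hypothetical helper and the strings it returns are arbitrary.

```typescript
type WorkerState =
  | { phase: 'init' }
  | { phase: 'downloading'; url: string }
  | { phase: 'processing'; chunk: number }
  | { phase: 'done'; bytes: number }
  | { phase: 'error'; message: string };

// Hypothetical helper: turn a state into a human-readable line.
function describeState(s: WorkerState): string {
  switch (s.phase) {
    case 'init':
      return 'starting';
    case 'downloading':
      return `downloading ${s.url}`; // s.url is only visible in this branch
    case 'processing':
      return `processing chunk ${s.chunk}`;
    case 'done':
      return `done (${s.bytes} bytes)`;
    case 'error':
      return `failed: ${s.message}`;
    default: {
      // If a new phase is added to WorkerState and not handled above,
      // `s` is no longer `never` here and this assignment fails to compile.
      const unreachable: never = s;
      throw new Error(`unhandled state: ${JSON.stringify(unreachable)}`);
    }
  }
}
```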
📚 API reference
Full interactive API reference lives at the docs site. Below is the cheat sheet.
Thread
| Method | Description |
| --- | --- |
| Thread.fromFunction(task, options?) | Spawn from inline function. Declare two params (bus, arg) to receive a typed Bus. |
| Thread.fromFile(filename, options?) | Spawn from a module file (pair with defineWorker / workerBus). |
| Thread.fromScript(script, options?) | Spawn from a raw code string. |
| thread.run(arg, options?) | Invoke the default inline task. |
| thread.run(name, ...args) | Invoke a named handler. |
| thread.on / once / off / emit / bus() | Typed event API. |
| thread.terminate() | Stop the worker; pending calls reject with TerminatedError. |
Pool
| Method | Description |
| --- | --- |
| new Pool({ task, size?, maxQueue?, timeout?, … }) | Fixed-size pool of workers running the same task. |
| pool.run / pool.map | Run one or many. Inputs preserve order. |
| pool.mapSettled / pool.streamSettled | Fault-tolerant map/stream — a SettledResult per item; failures don't sink the batch. |
| pool.stream(items, options?) | Stream results as an async iterator — lazy, bounded, ordered or as-completed. |
| pool.on / emit / bus() | Aggregated event bus across workers. |
| pool.size / idleCount / queueLength | Inspection. |
| pool.terminate() | Reject queued tasks, tear down workers. |
Helpers
| Function | Description |
| --- | --- |
| parallel(tasks, options?) | Run an array of inline functions concurrently. |
| mapParallel(items, task, options?) | Parallel-map an iterable through one task using a pool. |
| mapParallelStream(items, task, options?) | Streaming mapParallel — async iterator of results, bounded memory over huge/infinite sources. |
| mapParallelSettled / mapParallelStreamSettled | Fault-tolerant batch / stream — SettledResult per item (allSettled for workers). |
| defineWorker(handlers) | Register a typed handler map inside a worker file. |
| workerBus<Events>() | Get the typed Bus<Events> inside a worker. |
Options
interface RunOptions {
timeout?: number; // per-call timeout (ms), applied per attempt
signal?: AbortSignal; // cancellation (interrupts pending backoff too)
transferList?: TransferListItem[];// zero-copy transfers
retry?: number | RetryOptions; // retry on failure, with optional backoff
}
ThreadOptions extends Node's WorkerOptions (env, execArgv, stdin/out/err, workerData, resourceLimits, name) and adds a timeout default.
Errors
All errors extend HurriedError: TaskError (handler threw — original in .cause), TaskTimeoutError, TaskAbortedError, TerminatedError (you tore the worker down), WorkerExitedError (the worker crashed on its own).
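A hierarchy like this lets callers branch with `instanceof`. The sketch below declares local stand-in classes with the names listed above so that it runs on its own; they are not the library's classes, and their constructors are assumptions. `isRetryable` is a hypothetical predicate: the only rule taken from this README is that abort and teardown errors are never retried, and treating every other library error as retryable is an illustrative choice.

```typescript
// Local stand-ins that mirror the documented names; not imported from hurried.
class HurriedError extends Error {
  constructor(message: string) {
    super(message);
    this.name = new.target.name;
    // Keeps instanceof working even when compiled to an ES5 target.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}
class TaskError extends HurriedError {}
class TaskTimeoutError extends HurriedError {}
class TaskAbortedError extends HurriedError {}
class TerminatedError extends HurriedError {}
class WorkerExitedError extends HurriedError {}

// Hypothetical retry policy for illustration.
function isRetryable(err: unknown): boolean {
  // Documented rule: abort and teardown errors are never retried.
  if (err instanceof TaskAbortedError || err instanceof TerminatedError) return false;
  // Assumption: other library errors (task failure, timeout, crash) may be retried,
  // and anything that is not a HurriedError is left alone.
  return err instanceof HurriedError;
}
```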
🧪 Testing
- Vitest with v8 coverage. CI enforces ≥ 50% thresholds; the suite ships at ~95% statements.
- 156 tests across 14 files: bus, runtime, protocol, thread, pool, parallel & streaming helpers, retry/backoff, settled batches, legacy handler API.
- Matrix CI: Node 18 / 20 / 22 / 24 × Ubuntu / macOS / Windows × lint + typecheck + format + test + coverage + build + examples.
npm test # run once
npm run test:watch # watch mode
npm run test:coverage # full report
🤝 Contributing
npm install
npm run lint # eslint flat config
npm run typecheck # tsc --noEmit
npm test # vitest
npm run build # tsup → dist/ (ESM + CJS + .d.ts)
npm run docs:dev # local docs preview
npm run docs:build # static site → website/build
PRs welcome. Code is TypeScript-first with no shortcuts: every public surface is fully typed, and the test suite enforces both behavior and coverage.
🗺 Migration from v1
- const { Thread, makeExecutable } = require('hurried');
+ import { Thread, defineWorker, workerBus } from 'hurried';
- module.exports.slow = slow;
- makeExecutable(slow, 'slow');
+ export default defineWorker({ slow });

- ESM-first build; CJS still works via the require export.
- New Thread.fromFunction replaces ad-hoc fromScript for inline tasks and is fully type-safe.
- New Pool / parallel / mapParallel cover the common parallel-map case.
- New Bus<Events>: typed pub/sub across the worker boundary.
- AbortSignal and per-call timeout are first-class everywhere.
- Legacy makeExecutable still works, so v1 worker files don't need to change.
📜 License
