# moroutine

v1.3.0

> Offload functions to worker threads with shared memory primitives for Node.js.
## Installation

```sh
npm install moroutine
```

Requires Node.js v24+.
## Quick Start

```ts
// main.ts
import { isPrime } from './is-prime.ts';

const result = await isPrime(999_999_937);
console.log(result); // true
```

```ts
// is-prime.ts
import { mo } from 'moroutine';

export const isPrime = mo(import.meta, (n: number): boolean => {
  if (n < 2) return false;
  for (let i = 2; i * i <= n; i++) {
    if (n % i === 0) return false;
  }
  return true;
});
```

Define a function with `mo()` in its own module, then import and run it on a worker pool. Moroutine modules must be side-effect free — workers import them to find the registered functions.
## Core API

### `mo(import.meta, fn)`

Wraps a function so it runs on a worker thread. The function must be defined at module scope (not dynamically).
```ts
// math.ts
import { mo } from 'moroutine';

export const add = mo(import.meta, (a: number, b: number): number => {
  return a + b;
});
```

### `workers(size?, opts?)`

Creates a pool of worker threads. Returns a `Runner` that dispatches tasks. Disposable via `using` or `[Symbol.dispose]()`. Defaults to `os.availableParallelism()` workers and round-robin scheduling when arguments are omitted.
```ts
import { workers } from 'moroutine';
import { add } from './math.ts';

{
  using run = workers(2);
  const result = await run(add(3, 4)); // single task
  const [a, b] = await run([add(1, 2), add(3, 4)]); // batch
}
```

## Graceful Shutdown

Use `await using` for graceful async shutdown. The pool exposes a `signal` that fires when disposal begins — thread it into tasks for cooperative cancellation.
```ts
{
  await using run = workers(4);
  run(longTask(run.signal)); // task can react to abort
  run(otherTask()); // runs to completion
}
// signal fired, waited for both tasks, then workers terminated
```

Use `using` (without `await`) for immediate termination, same as before.
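`longTask` above is only illustrative. The cooperative pattern inside such a task is plain `AbortSignal` checking between units of work; a self-contained sketch with a hypothetical `cooperativeWork` helper (no moroutine specifics):

```ts
// Sketch: check the signal between units of work and bail out early.
async function cooperativeWork(signal: AbortSignal): Promise<number> {
  let done = 0;
  for (let i = 0; i < 1_000; i++) {
    if (signal.aborted) break; // disposal began, stop cleanly
    done++;
  }
  return done;
}

const ac = new AbortController();
ac.abort(); // simulate the pool's signal firing
const n = await cooperativeWork(ac.signal);
console.log(n); // 0: aborted before any work ran
```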
A `shutdownTimeout` option force-terminates workers if graceful shutdown takes too long:
```ts
{
  await using run = workers(4, { shutdownTimeout: 5000 });
  // ...
}
```

## Load Balancing

The pool uses round-robin scheduling by default. Pass a `balance` option to change the strategy:
```ts
import { workers, leastBusy } from 'moroutine';

{
  using run = workers(4, { balance: leastBusy() });
  // tasks dispatched to whichever worker has the fewest in-flight tasks
}
```

Built-in balancers:

- `roundRobin()` — cycles through workers in order (default)
- `leastBusy()` — picks the worker with the lowest active task count

Custom balancers implement the `Balancer` interface:
```ts
import type { Balancer, WorkerHandle, Task } from 'moroutine';

const random: Balancer = {
  select(workers: readonly WorkerHandle[], task: Task) {
    return workers[Math.floor(Math.random() * workers.length)];
  },
};
```

Each `WorkerHandle` exposes `activeCount` (in-flight tasks) and `thread` (the underlying `worker_threads.Worker`) for building custom strategies.
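For example, a least-busy selection reduces to a few lines over anything exposing `activeCount` (a structural sketch, not the library's implementation; `pickLeastBusy` is a hypothetical name):

```ts
// Structural sketch: choose the handle with the fewest in-flight tasks.
type HandleLike = { activeCount: number };

function pickLeastBusy<T extends HandleLike>(handles: readonly T[]): T {
  return handles.reduce((best, h) => (h.activeCount < best.activeCount ? h : best));
}

const chosen = pickLeastBusy([{ activeCount: 3 }, { activeCount: 1 }, { activeCount: 2 }]);
console.log(chosen.activeCount); // 1
```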
`isTask(moroutine, task)` narrows a task to the descriptor type produced by a specific moroutine — useful inside a balancer to route by task kind or by a key in the args. For example, a worker-affinity balancer can hash a shard key out of the args so that every call for the same key hits the worker that already has its state loaded:
```ts
import { isTask, roundRobin } from 'moroutine';
import type { Balancer } from 'moroutine';
import { increment, read } from './counter.ts';

export function keyAffinity(): Balancer {
  const fallback = roundRobin();
  return {
    select(workers, task) {
      let key: string | undefined;
      if (isTask(increment, task)) key = task.args[0];
      else if (isTask(read, task)) key = task.args[0];
      if (key === undefined) return fallback.select(workers, task);
      return workers[hash(key) % workers.length];
    },
  };
}
```

Inside the `isTask` branch, `task.args` is typed as the moroutine's argument tuple (e.g. `[key: string, n: number]` for `increment`). See `examples/worker-affinity` for the full demo, including per-worker state that only stays consistent under affinity routing.
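The `hash` helper in the snippet above is left to the user; a minimal 32-bit FNV-1a sketch (an assumption here, any stable string hash works):

```ts
// Minimal 32-bit FNV-1a hash for shard keys (illustrative choice).
function hash(key: string): number {
  let h = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < key.length; i++) {
    h ^= key.charCodeAt(i);
    h = Math.imul(h, 0x01000193); // FNV prime
  }
  return h >>> 0; // unsigned, so `% workers.length` stays non-negative
}
```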
## Dedicated Workers

Awaiting a task directly (without a pool) runs it on a dedicated worker thread, one per moroutine function.

```ts
const result = await add(3, 4); // runs on a dedicated worker for `add`
```

## Task-Args
Pass a task as an argument to another task. The result is resolved on the worker and cached, so it never crosses back to the main thread. This is useful for non-transferable context like a database connection.
```ts
// db.ts
import { DatabaseSync } from 'node:sqlite';
import { mo } from 'moroutine';

export const openDb = mo(import.meta, (filename: string): DatabaseSync => {
  return new DatabaseSync(filename);
});

export const exec = mo(import.meta, (db: DatabaseSync, sql: string): void => {
  db.exec(sql);
});

export const query = mo(import.meta, (db: DatabaseSync, sql: string): unknown[] => {
  return db.prepare(sql).all();
});
```

```ts
import { workers } from 'moroutine';
import { openDb, exec, query } from './db.ts';

const db = openDb(':memory:');

{
  using run = workers(1);
  await run(exec(db, `CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)`));
  await run(exec(db, `INSERT INTO users (name) VALUES ('Alice')`));
  const rows = await run(query(db, 'SELECT * FROM users')); // [{ id: 1, name: 'Alice' }]
}
```

`openDb()` returns a `Task<DatabaseSync>`, and `exec()`/`query()` accept it in place of `DatabaseSync`. The database is opened once on the worker and reused for every subsequent call — the main thread never touches it.
## Shared Memory

### Descriptors and `shared()`

Shared-memory types are created with descriptor functions or the `shared()` allocator.

```ts
import { shared, int32, bool, mutex, string, bytes } from 'moroutine';
```

### Primitives
```ts
const counter = int32(); // standalone Int32
const flag = bool(); // standalone Bool
const big = int64(); // standalone Int64 (bigint)
```

### Atomics
Atomic variants use `Atomics.*` for thread-safe operations without a lock.

```ts
const counter = int32atomic();
counter.add(1); // atomic increment, returns previous value
counter.load(); // atomic read
```

Full atomic operations: `load`, `store`, `add`, `sub`, `and`, `or`, `xor`, `exchange`, `compareExchange`.
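These map onto the standard `Atomics` operations over a `SharedArrayBuffer`-backed `Int32Array`; the same semantics with plain platform primitives (a sketch of the underlying behavior, not library internals):

```ts
// Plain-Atomics equivalent of add/load: add returns the previous value.
const slot = new Int32Array(new SharedArrayBuffer(4));

const prev = Atomics.add(slot, 0, 5); // 0 (value before the add)
const now = Atomics.load(slot, 0); // 5
```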
`Int32Atomic` additionally exposes futex-style wait / wake:

```ts
const slot = int32atomic();

// In one thread — wait until the slot's value differs from 0,
// with an optional timeout.
await slot.waitAsync(0); // 'ok' | 'not-equal' | 'timed-out'
await slot.waitAsync(0, 100 /* ms */); // with timeout

// In another thread — change the value and wake waiters.
slot.store(1);
slot.notify(); // wakes all waiters on this slot
slot.notify(2); // wakes at most 2
```

`waitAsync` returns `'not-equal'` synchronously (no microtask hop) if the slot already holds a value different from expected. The underlying `Atomics.wait`/`notify` futex is available only on `Int32Atomic`.
### Structs

Plain objects in `shared()` create structs backed by a single `SharedArrayBuffer`.

```ts
const point = shared({ x: int32, y: int32 });
point.load(); // { x: 0, y: 0 }
point.store({ x: 10, y: 20 });
point.fields.x.store(10); // direct field access
```

Structs nest:
```ts
const rect = shared({
  pos: { x: int32, y: int32 },
  size: { w: int32, h: int32 },
});
```

### Tuples
Arrays in `shared()` create fixed-length tuples.

```ts
const pair = shared([int32, bool]);
pair.load(); // [0, false]
pair.store([42, true]);
pair.elements[0].store(99);
```

### Bytes and Strings
```ts
const buf = bytes(32); // fixed 32-byte buffer
buf.store(new Uint8Array(32)); // exact length required
buf.load(); // Readonly<Uint8Array> view
buf.view[0] = 0xff; // direct mutable access

const name = string(64); // UTF-8, max 64 bytes
name.store('hello');
name.load(); // 'hello'
```

### Value Shorthand
Primitive values in schemas infer their type.

```ts
shared(0); // Int32 initialized to 0
shared(true); // Bool initialized to true
shared(0n); // Int64 initialized to 0n
shared({ x: 10, y: 20 }); // struct with Int32 fields
```

### Locks
#### Mutex

```ts
const mu = mutex();

using guard = await mu.lock();
// exclusive access
// auto-unlocks when guard is disposed

// or manually:
await mu.lock();
mu.unlock();

// Non-blocking attempt — returns null if held elsewhere.
using attempt = mu.tryLock();
if (!attempt) return;
```

#### RwLock
```ts
const rw = rwlock();

using reader = await rw.readLock(); // multiple readers OK
using writer = await rw.writeLock(); // exclusive access

// Non-blocking variants — return null if unavailable.
using r = rw.tryReadLock(); // null if write-locked
using w = rw.tryWriteLock(); // null if any lock is held
```

### Using with Workers
Shared-memory types pass through `postMessage` automatically. They're reconstructed on the worker side with the same shared backing memory.
```ts
// update-position.ts
import { mo } from 'moroutine';
import type { Mutex, SharedStruct, Int32 } from 'moroutine';

type Position = SharedStruct<{ x: Int32; y: Int32 }>;

export const updatePosition = mo(
  import.meta,
  async (mu: Mutex, pos: Position, dx: number, dy: number): Promise<void> => {
    using guard = await mu.lock();
    const current = pos.load();
    pos.store({ x: current.x + dx, y: current.y + dy });
  },
);
```

```ts
// main.ts
import { workers, shared, int32, mutex } from 'moroutine';
import { updatePosition } from './update-position.ts';

const mu = mutex();
const pos = shared({ x: int32, y: int32 });

{
  using run = workers(4);
  await run([updatePosition(mu, pos, 1, 0), updatePosition(mu, pos, 0, 1)]);
}

console.log(pos.load()); // { x: 1, y: 1 }
```

## Streaming
### Streaming Moroutines

Wrap an `async function*` with `mo()` to create a streaming moroutine. Values are streamed between threads via `MessageChannel` with atomics-based backpressure — the producer parks when `highWaterMark` items (default 16) sit in flight and resumes when the consumer drains below the cap.
```ts
// count.ts
import { mo } from 'moroutine';

export const countUp = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) {
    yield i;
  }
});
```

Iterate directly (dedicated worker) or dispatch via a pool:
```ts
import { workers } from 'moroutine';
import { countUp } from './count.ts';

// Dedicated worker
for await (const n of countUp(5)) {
  console.log(n); // 0, 1, 2, 3, 4
}

// Worker pool
{
  using run = workers(2);
  for await (const n of run(countUp(5))) {
    console.log(n); // 0, 1, 2, 3, 4
  }
}
```

### `channel()` and Fan-out
When you pass the same `AsyncIterable` or streaming task argument to multiple tasks, each task gets its own copy of the data. Use `channel()` to share a single source across multiple workers — each item goes to exactly one consumer (work stealing).
```ts
import { workers, channel, assign, mo } from 'moroutine';

const generate = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) yield i;
});

const process = mo(import.meta, async (input: AsyncIterable<number>): Promise<number[]> => {
  const items: number[] = [];
  for await (const n of input) items.push(n);
  return items;
});

const ch = channel(generate(100));

{
  using run = workers();
  const fanout = run.workers.map((w) => {
    return assign(w, process(ch));
  });
  const results = await run(fanout);
  // Items distributed across workers — no duplicates, no gaps
}
```

Use `assign(worker, task)` to pin a task to a specific worker. `run.workers` is a read-only array of worker handles, one per pool worker.
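The "each item to exactly one consumer" guarantee is the natural behavior of multiple readers pulling from one async iterator; a single-threaded sketch of the idea (not the library's implementation):

```ts
async function* source(n: number) {
  for (let i = 0; i < n; i++) yield i;
}

// Two consumers share one iterator; each next() hands out a distinct item.
async function drain(it: AsyncIterator<number>, sink: number[]) {
  for (;;) {
    const { value, done } = await it.next();
    if (done) return;
    sink.push(value);
  }
}

const it = source(10)[Symbol.asyncIterator]();
const a: number[] = [];
const b: number[] = [];
await Promise.all([drain(it, a), drain(it, b)]);
// a and b together contain 0..9, each item delivered exactly once
```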
Without `channel()`, `AsyncIterable` and streaming task arguments are auto-detected and streamed to a single consumer. `channel()` is only needed for fan-out.
### `map()` — Bounded Fan-out

Dispatch a stream of tasks to a pool with bounded concurrency, yielding results in completion order. Wrap each task with `inert()` so it passes through the stream as-is instead of being auto-awaited.
```ts
// main.ts
import { readdir } from 'node:fs/promises';
import { join } from 'node:path';
import { workers, map, inert } from 'moroutine';
import type { Task } from 'moroutine';
import { hashFile, type FileHash } from './hash-file.ts';

{
  using run = workers();
  for await (const { path, hash } of map(run, walk('./src'), { concurrency: 4 })) {
    console.log(`${hash.slice(0, 12)} ${path}`);
  }
}

async function* walk(dir: string): AsyncGenerator<Task<FileHash>> {
  for (const entry of await readdir(dir, { withFileTypes: true })) {
    const p = join(dir, entry.name);
    if (entry.isDirectory()) {
      yield* walk(p);
    } else {
      yield inert(hashFile(p));
    }
  }
}
```

```ts
// hash-file.ts
import { readFile } from 'node:fs/promises';
import { createHash } from 'node:crypto';
import { mo } from 'moroutine';

export type FileHash = { path: string; hash: string };

export const hashFile = mo(import.meta, async (path: string): Promise<FileHash> => {
  const buf = await readFile(path);
  return { path, hash: createHash('sha256').update(buf).digest('hex') };
});
```

`map()` accepts a sync iterable, async iterable, or generator of tasks. `concurrency` caps in-flight dispatches (default 1). Mixed task types unify: `map` over `Task<string> | Task<number>` yields `string | number`. An optional `signal` aborts iteration — and, since moroutine auto-transfers `AbortSignal` args, the same signal passed to tasks will also cancel in-flight work.
### Pipelines

Chain streaming moroutines by passing one as an argument to the next. Each stage runs on its own dedicated worker.
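The stages compose like ordinary async generators. As a single-threaded sketch of the data flow (`generate`, `double`, and `square` here are illustrative stand-ins for streaming moroutines):

```ts
async function* generate(n: number) {
  for (let i = 0; i < n; i++) yield i;
}
async function* double(src: AsyncIterable<number>) {
  for await (const x of src) yield x * 2;
}
async function* square(src: AsyncIterable<number>) {
  for await (const x of src) yield x * x;
}

const out: number[] = [];
for await (const n of square(double(generate(3)))) out.push(n);
console.log(out); // [0, 4, 16]
```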
```ts
const doubled = double(generate(5));
const squared = square(doubled);

for await (const n of squared) {
  console.log(n);
}
```

## Transfers
Use `transfer()` for zero-copy movement of `ArrayBuffer`, `TypedArray`, `MessagePort`, or streams.

```ts
import { transfer } from 'moroutine';

const buf = new ArrayBuffer(1024);
await run(processData(transfer(buf)));
// buf is now detached (zero-length) — ownership moved to worker
```

Return values from workers are auto-transferred when possible.
## Examples

All examples require Node v24+ and can be run directly, e.g. `node examples/primes/main.ts`.

- `examples/primes` -- CPU-bound prime checking on a dedicated worker
- `examples/non-blocking` -- main thread stays responsive during heavy computation
- `examples/parallel-batch` -- sequential vs parallel batch processing
- `examples/atomics` -- shared atomic counter across workers
- `examples/shared-state` -- mutex-protected shared struct
- `examples/multi-module` -- moroutines from multiple modules on one worker
- `examples/transfer` -- zero-copy buffer transfer to and from a worker
- `examples/sqlite` -- shared SQLite database on a worker via task-arg caching
- `examples/pipeline` -- streaming pipeline across dedicated workers
- `examples/channel-fanout` -- fan-out a channel to multiple workers via work stealing
- `examples/bounded-map` -- recursive tree walk hashing files with `map()`
- `examples/load-balancing` -- round-robin vs least-busy with variable-cost tasks
- `examples/worker-affinity` -- custom balancer using `isTask()` to route tasks by key
- `examples/benchmark` -- roundtrip channel throughput with 1–N workers
