npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

moroutine

v1.3.0

Published

> Offload functions to worker threads with shared memory primitives for Node.js.

Downloads

1,057

Readme

moroutine

Offload functions to worker threads with shared memory primitives for Node.js.

Installation

npm install moroutine

Requires Node.js v24+.

Quick Start

// main.ts
import { isPrime } from './is-prime.ts';

const result = await isPrime(999_999_937);
console.log(result); // true
// is-prime.ts
import { mo } from 'moroutine';

export const isPrime = mo(import.meta, (n: number): boolean => {
  if (n < 2) return false;
  for (let i = 2; i * i <= n; i++) {
    if (n % i === 0) return false;
  }
  return true;
});

Define a function with mo() in its own module, then import and run it on a worker pool. Moroutine modules must be side-effect free — workers import them to find the registered functions.

Core API

mo(import.meta, fn)

Wraps a function so it runs on a worker thread. The function must be defined at module scope (not dynamically).

// math.ts
import { mo } from 'moroutine';

export const add = mo(import.meta, (a: number, b: number): number => {
  return a + b;
});

workers(size?, opts?)

Creates a pool of worker threads. Returns a Runner that dispatches tasks. Disposable via using or [Symbol.dispose](). Defaults to os.availableParallelism() workers and round-robin scheduling when arguments are omitted.

import { workers } from 'moroutine';
import { add } from './math.ts';

{
  using run = workers(2);

  const result = await run(add(3, 4)); // single task
  const [a, b] = await run([add(1, 2), add(3, 4)]); // batch
}

Graceful Shutdown

Use await using for graceful async shutdown. The pool exposes a signal that fires when disposal begins — thread it into tasks for cooperative cancellation.

{
  await using run = workers(4);

  run(longTask(run.signal)); // task can react to abort
  run(otherTask()); // runs to completion
}
// signal fired, waited for both tasks, then workers terminated

Use using (without await) for immediate termination, same as before.

A shutdownTimeout option force-terminates workers if graceful shutdown takes too long:

{
  await using run = workers(4, { shutdownTimeout: 5000 });
  // ...
}

Load Balancing

The pool uses round-robin scheduling by default. Pass a balance option to change the strategy:

import { workers, leastBusy } from 'moroutine';

{
  using run = workers(4, { balance: leastBusy() });
  // tasks dispatched to whichever worker has the fewest in-flight tasks
}

Built-in balancers:

  • roundRobin() — cycles through workers in order (default)
  • leastBusy() — picks the worker with the lowest active task count

Custom balancers implement the Balancer interface:

import type { Balancer, WorkerHandle, Task } from 'moroutine';

const random: Balancer = {
  select(workers: readonly WorkerHandle[], task: Task) {
    return workers[Math.floor(Math.random() * workers.length)];
  },
};

Each WorkerHandle exposes activeCount (in-flight tasks) and thread (the underlying worker_threads.Worker) for building custom strategies.

isTask(moroutine, task) narrows a task to the descriptor type produced by a specific moroutine — useful inside a balancer to route by task kind or by a key in the args. For example, a worker-affinity balancer can hash a shard key out of the args so that every call for the same key hits the worker that already has its state loaded:

import { isTask, roundRobin } from 'moroutine';
import type { Balancer } from 'moroutine';
import { increment, read } from './counter.ts';

export function keyAffinity(): Balancer {
  const fallback = roundRobin();
  return {
    select(workers, task) {
      let key: string | undefined;
      if (isTask(increment, task)) key = task.args[0];
      else if (isTask(read, task)) key = task.args[0];
      if (key === undefined) return fallback.select(workers, task);
      return workers[hash(key) % workers.length];
    },
  };
}

Inside the isTask branch, task.args is typed as the moroutine's argument tuple (e.g. [key: string, n: number] for increment). See examples/worker-affinity for the full demo, including per-worker state that only stays consistent under affinity routing.

Dedicated Workers

Awaiting a task directly (without a pool) runs it on a dedicated worker thread, one per moroutine function.

const result = await add(3, 4); // runs on a dedicated worker for `add`

Task-Args

Pass a task as an argument to another task. The result is resolved on the worker and cached, so it never crosses back to the main thread. This is useful for non-transferable context like a database connection.

// db.ts
import { DatabaseSync } from 'node:sqlite';
import { mo } from 'moroutine';

export const openDb = mo(import.meta, (filename: string): DatabaseSync => {
  return new DatabaseSync(filename);
});

export const exec = mo(import.meta, (db: DatabaseSync, sql: string): void => {
  db.exec(sql);
});

export const query = mo(import.meta, (db: DatabaseSync, sql: string): unknown[] => {
  return db.prepare(sql).all();
});
import { workers } from 'moroutine';
import { openDb, exec, query } from './db.ts';

const db = openDb(':memory:');

{
  using run = workers(1);
  await run(exec(db, `CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)`));
  await run(exec(db, `INSERT INTO users (name) VALUES ('Alice')`));
  const rows = await run(query(db, 'SELECT * FROM users')); // [{ id: 1, name: 'Alice' }]
}

openDb() returns a Task<DatabaseSync>, and exec()/query() accept it in place of DatabaseSync. The database is opened once on the worker and reused for every subsequent call — the main thread never touches it.

Shared Memory

Descriptors and shared()

Shared-memory types are created with descriptor functions or the shared() allocator.

import { shared, int32, bool, mutex, string, bytes } from 'moroutine';

Primitives

const counter = int32(); // standalone Int32
const flag = bool(); // standalone Bool
const big = int64(); // standalone Int64 (bigint)

Atomics

Atomic variants use Atomics.* for thread-safe operations without a lock.

const counter = int32atomic();
counter.add(1); // atomic increment, returns previous value
counter.load(); // atomic read

Full atomic operations: load, store, add, sub, and, or, xor, exchange, compareExchange.

Int32Atomic additionally exposes futex-style wait / wake:

const slot = int32atomic();

// In one thread — wait until the slot's value differs from 0,
// with an optional timeout.
await slot.waitAsync(0); // 'ok' | 'not-equal' | 'timed-out'
await slot.waitAsync(0, 100 /*ms*/); // with timeout

// In another thread — change the value and wake waiters.
slot.store(1);
slot.notify(); // wakes all waiters on this slot
slot.notify(2); // wakes at most 2

waitAsync returns 'not-equal' synchronously (no microtask hop) if the slot already holds a value different from expected. The underlying Atomics.wait/notify futex is available only on Int32Atomic.

Structs

Plain objects in shared() create structs backed by a single SharedArrayBuffer.

const point = shared({ x: int32, y: int32 });

point.load(); // { x: 0, y: 0 }
point.store({ x: 10, y: 20 });
point.fields.x.store(10); // direct field access

Structs nest:

const rect = shared({
  pos: { x: int32, y: int32 },
  size: { w: int32, h: int32 },
});

Tuples

Arrays in shared() create fixed-length tuples.

const pair = shared([int32, bool]);
pair.load(); // [0, false]
pair.store([42, true]);
pair.elements[0].store(99);

Bytes and Strings

const buf = bytes(32); // fixed 32-byte buffer
buf.store(new Uint8Array(32)); // exact length required
buf.load(); // Readonly<Uint8Array> view
buf.view[0] = 0xff; // direct mutable access

const name = string(64); // UTF-8, max 64 bytes
name.store('hello');
name.load(); // 'hello'

Value Shorthand

Primitive values in schemas infer their type.

shared(0); // Int32 initialized to 0
shared(true); // Bool initialized to true
shared(0n); // Int64 initialized to 0n
shared({ x: 10, y: 20 }); // struct with Int32 fields

Locks

Mutex

const mu = mutex();

using guard = await mu.lock();
// exclusive access
// auto-unlocks when guard is disposed

// or manually:
await mu.lock();
mu.unlock();

// Non-blocking attempt — returns null if held elsewhere.
using guard = mu.tryLock();
if (!guard) return;

RwLock

const rw = rwlock();

using guard = await rw.readLock(); // multiple readers OK
using guard = await rw.writeLock(); // exclusive access

// Non-blocking variants — return null if unavailable.
using r = rw.tryReadLock(); // null if write-locked
using w = rw.tryWriteLock(); // null if any lock is held

Using with Workers

Shared-memory types pass through postMessage automatically. They're reconstructed on the worker side with the same shared backing memory.

// update-position.ts
import { mo } from 'moroutine';
import type { Mutex, SharedStruct, Int32 } from 'moroutine';

type Position = SharedStruct<{ x: Int32; y: Int32 }>;

export const updatePosition = mo(
  import.meta,
  async (mu: Mutex, pos: Position, dx: number, dy: number): Promise<void> => {
    using guard = await mu.lock();
    const current = pos.load();
    pos.store({ x: current.x + dx, y: current.y + dy });
  },
);
// main.ts
import { workers, shared, int32, mutex } from 'moroutine';
import { updatePosition } from './update-position.ts';

const mu = mutex();
const pos = shared({ x: int32, y: int32 });

{
  using run = workers(4);
  await run([updatePosition(mu, pos, 1, 0), updatePosition(mu, pos, 0, 1)]);
}

console.log(pos.load()); // { x: 1, y: 1 }

Streaming

Streaming Moroutines

Wrap an async function* with mo() to create a streaming moroutine. Values are streamed between threads via MessageChannel with atomics-based backpressure — the producer parks when highWaterMark items (default 16) sit in flight and resumes when the consumer drains below the cap.

// count.ts
import { mo } from 'moroutine';

export const countUp = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) {
    yield i;
  }
});

Iterate directly (dedicated worker) or dispatch via a pool:

import { workers } from 'moroutine';
import { countUp } from './count.ts';

// Dedicated worker
for await (const n of countUp(5)) {
  console.log(n); // 0, 1, 2, 3, 4
}

// Worker pool
{
  using run = workers(2);
  for await (const n of run(countUp(5))) {
    console.log(n); // 0, 1, 2, 3, 4
  }
}

channel() and Fan-out

When you pass the same AsyncIterable or streaming task argument to multiple tasks, each task gets its own copy of the data. Use channel() to share a single source across multiple workers — each item goes to exactly one consumer (work stealing).

import { workers, channel, assign, mo } from 'moroutine';

const generate = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) yield i;
});

const process = mo(import.meta, async (input: AsyncIterable<number>): Promise<number[]> => {
  const items: number[] = [];
  for await (const n of input) items.push(n);
  return items;
});
const ch = channel(generate(100));

{
  using run = workers();
  const fanout = run.workers.map((w) => {
    return assign(w, process(ch));
  });
  const results = await run(fanout);
  // Items distributed across workers — no duplicates, no gaps
}

Use assign(worker, task) to pin a task to a specific worker. run.workers is a read-only array of worker handles, one per pool worker.

Without channel(), AsyncIterable and streaming task arguments are auto-detected and streamed to a single consumer. channel() is only needed for fan-out.

map() — Bounded Fan-out

Dispatch a stream of tasks to a pool with bounded concurrency, yielding results in completion order. Wrap each task with inert() so it passes through the stream as-is instead of being auto-awaited.

// main.ts
import { readdir } from 'node:fs/promises';
import { join } from 'node:path';
import { workers, map, inert } from 'moroutine';
import type { Task } from 'moroutine';
import { hashFile, type FileHash } from './hash-file.ts';

{
  using run = workers();
  for await (const { path, hash } of map(run, walk('./src'), { concurrency: 4 })) {
    console.log(`${hash.slice(0, 12)}  ${path}`);
  }
}

async function* walk(dir: string): AsyncGenerator<Task<FileHash>> {
  for (const entry of await readdir(dir, { withFileTypes: true })) {
    const p = join(dir, entry.name);
    if (entry.isDirectory()) {
      yield* walk(p);
    } else {
      yield inert(hashFile(p));
    }
  }
}
// hash-file.ts
import { readFile } from 'node:fs/promises';
import { createHash } from 'node:crypto';
import { mo } from 'moroutine';

export type FileHash = { path: string; hash: string };

export const hashFile = mo(import.meta, async (path: string): Promise<FileHash> => {
  const buf = await readFile(path);
  return { path, hash: createHash('sha256').update(buf).digest('hex') };
});

map() accepts a sync iterable, async iterable, or generator of tasks. concurrency caps in-flight dispatches (default 1). Mixed task types unify: map over Task<string> | Task<number> yields string | number. An optional signal aborts iteration — and, since moroutine auto-transfers AbortSignal args, the same signal passed to tasks will also cancel in-flight work.

Pipelines

Chain streaming moroutines by passing one as an argument to the next. Each stage runs on its own dedicated worker.

const doubled = double(generate(5));
const squared = square(doubled);
for await (const n of squared) {
  console.log(n);
}

Transfers

Use transfer() for zero-copy movement of ArrayBuffer, TypedArray, MessagePort, or streams.

import { transfer } from 'moroutine';

const buf = new ArrayBuffer(1024);
await run(processData(transfer(buf)));
// buf is now detached (zero-length) — ownership moved to worker

Return values from workers are auto-transferred when possible.

Examples

All examples require Node v24+ and can be run directly, e.g. node examples/primes/main.ts.