wise-workers

v1.1.0

A convenient worker thread pool implementation for Node.js

A worker thread pool for CPU-bound tasks. It requires no configuration and has many powerful features:

  • Generator and AsyncGenerator function support
    • Worker functions can be generator functions or async generator functions, making it easy to stream results back to the main thread. Iteration happens eagerly, to maximize parallelism (i.e., the main thread does not pause the generator function).
  • Functions (callbacks) as arguments
    • Functions can be passed to worker tasks. They become async functions in the worker thread, using MessagePort for communication under the hood.
  • Cancellation support
  • Automatic thread management
    • Crashed threads are automatically respawned, unless they're crashing during startup (to prevent an infinite spawn loop).
  • Zero-copy data transfer
    • Data can be efficiently moved between threads without copying it. This is beneficial to performance when passing large Buffers between threads.

Installation

npm install wise-workers

Requires Node.js v16.x.x or later.

Usage

const ThreadPool = require('wise-workers');

const pool = new ThreadPool({ filename: require.resolve('./worker') });

const result = await pool.call('add', 2, 2); // => 4

worker.js

exports.add = (a, b) => a + b;

Example: Zero-copy

const ThreadPool = require('wise-workers');

const pool = new ThreadPool({ filename: require.resolve('./worker') });

const data = Buffer.alloc(1024 * 1024);

// pool.invoke() allows you to provide more options than pool.call()
const compressedData = await pool.invoke('compress', {
    args: [data],
    transferList: [data.buffer], // Pass the ArrayBuffer in the transferList
});

worker.js

const zlib = require('zlib');
const { move } = require('wise-workers');

exports.compress = (data) => {
    const compressedData = zlib.gzipSync(data);

    // Use move() to include a transferList in the return value.
    return move(compressedData, [compressedData.buffer]);
};
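
To make the effect of a transferList more concrete, here is a minimal sketch that uses only Node's built-in worker_threads module rather than wise-workers itself. The helper name transferDemo is hypothetical. It sends a typed array through a MessageChannel with its ArrayBuffer in the transfer list, then reports what the sending and receiving sides can still see.

```javascript
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');

// Hypothetical helper (not part of wise-workers): moves a typed array through
// a MessageChannel and reports the byte lengths visible on each side.
function transferDemo(size) {
    const { port1, port2 } = new MessageChannel();
    const data = new Uint8Array(size);
    data[0] = 42;

    // Listing data.buffer in the transfer list moves the memory instead of
    // copying it.
    port1.postMessage(data, [data.buffer]);
    const { message: received } = receiveMessageOnPort(port2);

    port1.close();
    port2.close();

    return {
        senderByteLength: data.byteLength, // the sender's view is detached
        receiverByteLength: received.byteLength,
        firstByte: received[0],
    };
}

console.log(transferDemo(1024));
```

After the transfer the sender's view is empty, so the calling code should not read from a buffer once it has been placed in a transferList. Buffer.alloc() gives each Buffer its own ArrayBuffer, whereas small Buffers created with Buffer.from() or Buffer.allocUnsafe() may share a pooled ArrayBuffer, which makes them poor candidates for transfer.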

Example: Generator function

When calling a generator function, you will get an async iterable object.

const ThreadPool = require('wise-workers');

const pool = new ThreadPool({ filename: require.resolve('./worker') });

const asyncIterable = await pool.call('readFile', 'data.csv');

for await (const chunk of asyncIterable) {
    console.log(`got chunk of size ${chunk.byteLength} bytes`);
}

worker.js

const fs = require('fs');
const { move } = require('wise-workers');

exports.readFile = function* (filename, chunkSize = 1024 * 16) {
    const fd = fs.openSync(filename);
    try {
        while (true) {
            const buffer = Buffer.alloc(chunkSize);
            const bytesRead = fs.readSync(fd, buffer, 0, chunkSize);
            if (bytesRead > 0) {
                const chunk = buffer.subarray(0, bytesRead);
                // You can move() yielded values too
                yield move(chunk, [chunk.buffer]);
            }
            if (bytesRead < chunkSize) {
                break;
            }
        }
    } finally {
        fs.closeSync(fd);
    }
};
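
If you want the whole result rather than a stream, the chunks can be gathered on the main thread. The sketch below is only an illustration: collectChunks is a hypothetical helper, and fakeChunks is a stand-in for the async iterable that pool.call() would return, so that the snippet runs without a worker. It assumes each chunk is a Buffer or Uint8Array.

```javascript
// Stand-in for the async iterable returned by pool.call() on a generator
// function. In real use this would come from the thread pool.
async function* fakeChunks() {
    yield new Uint8Array([1, 2, 3]);
    yield new Uint8Array([4, 5]);
}

// Hypothetical helper: drains an async iterable of byte chunks into one Buffer.
async function collectChunks(asyncIterable) {
    const chunks = [];
    for await (const chunk of asyncIterable) {
        // Wrap the chunk's memory without copying; Buffer.concat() below
        // performs the single copy into the final Buffer.
        chunks.push(Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength));
    }
    return Buffer.concat(chunks);
}

collectChunks(fakeChunks()).then((buffer) => {
    console.log(`collected ${buffer.length} bytes`);
});
```

Collecting everything gives up the memory benefit of streaming, so this is best kept for results that comfortably fit in memory.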

Example: Callback function

You can pass callback functions to the worker, but they must be in the top-level arguments (they can't be nested within some other object). Callback functions can also be async functions.

const ThreadPool = require('wise-workers');

const pool = new ThreadPool({ filename: require.resolve('./worker') });

const allowedList = new Set(getHugeDataset());
const result = await pool.call('search', searchTerm, (value) => {
    return allowedList.has(value);
});

worker.js

exports.search = async (searchTerm, filter) => {
    const matches = [];
    for (const match of searchFor(searchTerm)) {
        if (await filter(match)) {
            matches.push(match);
        }
    }
    return matches;
};

Currently, callback functions do not support "zero-copy" data transfer in their arguments or return values. This restriction may be lifted in the future.
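
To give a feel for why a callback becomes an async function on the other side, here is a rough sketch of proxying a function call over a MessagePort. It is not the library's actual implementation; exposeCallback, wrapPort and the message shape are all hypothetical, and both ends run in one thread here so that the snippet is self-contained.

```javascript
const { MessageChannel } = require('worker_threads');

// Owner side (hypothetical): answers each request by running fn and posting
// the outcome back on the same channel.
function exposeCallback(fn) {
    const { port1, port2 } = new MessageChannel();
    port1.on('message', async ({ id, args }) => {
        try {
            port1.postMessage({ id, value: await fn(...args) });
        } catch (err) {
            port1.postMessage({ id, error: String(err) });
        }
    });
    // Closing one end of the channel closes the other end as well.
    return { port: port2, close: () => port1.close() };
}

// Caller side (hypothetical): turns the port into an async function. Every
// call has to wait for a reply message, so it can only return a promise.
function wrapPort(port) {
    let nextId = 0;
    const pending = new Map();
    port.on('message', ({ id, value, error }) => {
        const { resolve, reject } = pending.get(id);
        pending.delete(id);
        if (error !== undefined) reject(new Error(error));
        else resolve(value);
    });
    return (...args) => new Promise((resolve, reject) => {
        const id = nextId++;
        pending.set(id, { resolve, reject });
        port.postMessage({ id, args });
    });
}

const allowedList = new Set(['a', 'b']);
const exposed = exposeCallback((value) => allowedList.has(value));
const filter = wrapPort(exposed.port);
filter('a').then((isAllowed) => {
    console.log(isAllowed);
    exposed.close(); // release the ports so the process can exit
});
```

Each call costs a message round trip between threads, so a callback invoked once per item in a very large loop can dominate the running time. Batching values per call is one way to reduce that overhead.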

Example: AbortSignal (cancellation)

Calling controller.abort() will forcefully terminate the thread that's assigned to the associated task.

const ThreadPool = require('wise-workers');

const pool = new ThreadPool({ filename: require.resolve('./worker') });

const controller = new AbortController();
setTimeout(() => {
    controller.abort();
}, 1000);

await pool.invoke('infiniteLoop', {
    signal: controller.signal,
});

worker.js

exports.infiniteLoop = () => {
    while (true) {}
};

Forcefully aborting a thread is not a cheap operation, so it should only be used for exceptional/rare situations. For more common situations where performance is critical, you can use util.transferableAbortSignal() to implement your own co-operative cancellation logic.
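
As a rough illustration of the co-operative approach, the sketch below shows only the worker-side pattern: do the work in small batches and check the signal between them. The function name countBatchesUntilAborted is hypothetical, and the snippet assumes the function has already been handed an AbortSignal. How that signal reaches the worker (for example via util.transferableAbortSignal()) is left out, so everything here runs in a single thread.

```javascript
// Hypothetical worker-side function: works in small batches and checks the
// signal between batches, instead of relying on forceful termination.
async function countBatchesUntilAborted(signal, batchSize = 10000) {
    let batches = 0;
    while (!signal.aborted) {
        let sum = 0;
        for (let i = 0; i < batchSize; i++) {
            sum += i; // placeholder for real CPU-bound work
        }
        batches++;
        // Yield to the event loop so that an abort can be observed. A loop
        // that never yields would never see the signal change.
        await new Promise((resolve) => setImmediate(resolve));
    }
    return batches;
}

const controller = new AbortController();
setTimeout(() => controller.abort(), 50);
countBatchesUntilAborted(controller.signal).then((batches) => {
    console.log(`stopped co-operatively after ${batches} batches`);
});
```

The batch size is a trade-off: larger batches mean less time spent yielding, while smaller batches respond to cancellation sooner.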

API

new ThreadPool(options)

Creates a new thread pool. The following options are supported:

  • filename (string, required)
    • The absolute path to the worker script or module. Both CommonJS and ESM modules are supported.
  • minThreads (number, optional)
    • The minimum number of worker threads to keep in the pool. By default, this is equal to half the number of physical CPUs on the machine.
  • maxThreads (number, optional)
    • The maximum number of worker threads to keep in the pool. By default, this is equal to the number of physical CPUs on the machine.
  • The following options are passed directly to new Worker() under the hood:
    • execArgv
    • argv
    • env
    • workerData
    • resourceLimits
    • trackUnmanagedFds
    • name

ThreadPool is an EventEmitter. It emits the following events:

  • error: This occurs if a worker crashes unexpectedly. If this occurs while a worker is starting up, the ThreadPool will be destroyed.
  • online:n: Where n is an integer, these events occur when the number of online threads changes (i.e., when a new thread comes online or when an existing thread goes offline).
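
These events can be combined with events.once() to wait for the pool to warm up. The sketch below assumes that "online:n" means one event name per count (for example 'online:2') and that every count is passed through as threads come online one at a time. Both points are interpretations of the description above rather than confirmed behaviour. waitForOnline is a hypothetical helper, and fakePool is a stand-in EventEmitter so that the snippet runs on its own.

```javascript
const { once, EventEmitter } = require('events');

// Hypothetical helper: resolves once the pool reports `count` online threads.
// events.once() also rejects if the emitter emits 'error' while waiting, so a
// crash during startup surfaces as a rejected promise.
async function waitForOnline(pool, count) {
    if (pool.onlineThreadCount >= count) {
        return;
    }
    await once(pool, `online:${count}`);
}

// Stand-in for a ThreadPool, for illustration only.
const fakePool = new EventEmitter();
fakePool.onlineThreadCount = 0;
setImmediate(() => {
    fakePool.onlineThreadCount = 1;
    fakePool.emit('online:1');
});

waitForOnline(fakePool, 1).then(() => {
    console.log(`${fakePool.onlineThreadCount} thread online`);
});
```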

pool.call(methodName, [...args]) -> promise

Invokes a function exported by a worker thread. Even if the worker's function is synchronous, this method always returns a Promise.

The args can contain any value that is supported by the HTML structured clone algorithm. Additionally, functions may be passed within the top-level arguments (i.e., not nested within some other object), in which case they appear as async functions in the worker thread.
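
To check ahead of time whether a value will survive the trip, you can round-trip it through the structured clone algorithm yourself. The sketch below uses only Node's built-in worker_threads module; roundTrip is a hypothetical helper, not part of wise-workers. It shows that Date, Set and Map values are preserved, while a function nested inside an object is rejected.

```javascript
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');

// Hypothetical helper: copies a value the same way postMessage() would.
function roundTrip(value) {
    const { port1, port2 } = new MessageChannel();
    try {
        port1.postMessage(value); // throws if the value cannot be cloned
        return receiveMessageOnPort(port2).message;
    } finally {
        port1.close();
        port2.close();
    }
}

const copy = roundTrip({
    when: new Date(0),
    tags: new Set(['a']),
    lookup: new Map([['k', 1]]),
});
console.log(copy.when instanceof Date, copy.tags.has('a'), copy.lookup.get('k'));

try {
    roundTrip({ nested: () => {} });
} catch (err) {
    // Functions are not cloneable, which is why only top-level function
    // arguments receive special handling.
    console.log(`could not clone: ${err.name}`);
}
```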

If the worker method is a generator or async generator function, the returned promise will be resolved with an async iterable object.

pool.invoke(methodName, [options]) -> promise

This is the same as pool.call(), except it supports more options:

  • args (Array, optional)
    • The arguments to pass to the worker function.
  • transferList (Array, optional)
    • A list of transferable objects within args that should be moved, rather than copied, to the worker thread ("zero-copy").
  • signal (AbortSignal, optional)
    • An AbortSignal that, when aborted, will forcefully stop this task. If the signal is aborted after the task completes, nothing happens.
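
One use of the signal option is a per-task timeout. The sketch below is a hypothetical helper built only on the pool.invoke() options described above. fakePool is a stand-in so that the snippet runs without a worker, and the error it rejects with on abort is an assumption, since the rejection value used by the real pool is not documented here.

```javascript
// Hypothetical helper: runs a task through pool.invoke() and aborts it if it
// takes longer than `ms` milliseconds.
async function invokeWithTimeout(pool, methodName, args, ms) {
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), ms);
    try {
        return await pool.invoke(methodName, { args, signal: controller.signal });
    } finally {
        // Clearing the timer avoids aborting after the task has completed.
        clearTimeout(timer);
    }
}

// Stand-in for a ThreadPool, for illustration only: "doubles" args[0] after
// args[1] milliseconds, and rejects if the signal is aborted first.
const fakePool = {
    invoke(methodName, { args = [], signal } = {}) {
        return new Promise((resolve, reject) => {
            const timer = setTimeout(() => resolve(args[0] * 2), args[1]);
            signal.addEventListener('abort', () => {
                clearTimeout(timer);
                reject(new Error('aborted'));
            }, { once: true });
        });
    },
};

invokeWithTimeout(fakePool, 'double', [21, 5], 1000).then((value) => {
    console.log(value);
});
```

Because aborting forcefully terminates the assigned thread, a timeout like this is better suited as a safety net for runaway tasks than as routine flow control.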

pool.destroy([error]) -> promise

Destroys the thread pool, cancelling any pending tasks and permanently terminating all threads. After being destroyed, the thread pool is no longer usable.

If an error object is provided, all pending tasks will be rejected with it. Otherwise, a default error is used.

The returned promise resolves when all threads have finished shutting down.

ThreadPool properties

  • pool.filename: The filename of the worker script being used.
  • pool.threadCount: The number of threads currently spawned within the pool.
  • pool.onlineThreadCount: The number of threads which are fully online, which means they have completed their initialization/startup and are capable of handling tasks.
  • pool.activeThreadCount: The number of threads which are busy with a pending task.
  • pool.pendingTaskCount: The number of pending tasks yet to be resolved.
  • pool.destroyed: Whether or not the thread pool is destroyed (boolean).

Static properties

  • ThreadPool.PHYSICAL_CORES: The number of physical CPU cores detected on the machine. This number is used to calculate defaults for minThreads and maxThreads when constructing a ThreadPool.

License

MIT