npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

multiprocessing

v1.2.0

Published

Dead simple parallel processing for node

Downloads

474

Readme

node-multiprocessing

Dead simple parallel processing for node

Version 1.0 upgrade notes

Example

const Pool = require('multiprocessing').Pool;

function square(x) {
  return x * x;
}

const pool = new Pool(4);  // spawns 4 child processes to complete your jobs

pool.map([1, 2, 3], square).then(result => console.log(result));

// [1, 4, 9]

Promise + Module worker example

// ./worker.js
const P = require('bluebird');

module.exports = function squareAsync(x) {
  return P.resolve().then(() => x * x);
};
// ./main.js
const Pool = require('multiprocessing').Pool;
(new Pool(4)).map([1, 2, 3], __dirname + '/worker')
  .then(function (res) {
    // [1, 4, 9]
  });

Installation

Via npm:

npm install multiprocessing

Writing a mapper function

Functions passed to the mapper can't reference any variables declared outside of their block scope. This is because they must be stringified in order to be passed to the child processes.

function good(x) {
  return x * x;  // I don't reference any outside variables
}

const two = 2;
function bad(x) {
  return x * two;  // "two" wont be defined after being passed to the child proc
}

API Reference

new Pool([int numWorkers]) -> Pool

Create a new Pool with specified number of worker child processes.

Default number of workers will be the numbers of logical CPUs on the machine.

.map(Array arr, Function|String fnOrModulePath[, int|Object chunksizeOrOptions]) -> Promise

The second argument should either be the mapper function or the absolute path of a module that exports the mapper function.

As the function must be stringified before being passed to the child process, I recommend instead using the module path for functions of non-trivial size. It will be much easier than trying to keep track of what your mapper function references.

Option: chunksize

Chunksize determines the number of array elements to be passed to the work process at once. By default, the chunksize will default to the array length divided by the number of available workers. Setting this to 1 is fine for tasks that are expected to be very large, but smaller tasks will run much faster with a larger chunksize.

Option: onResult

Takes a function that is called with each result as it comes in. Useful for streaming.

NOTE: The onResult is called in whatever order results come back in. This may not be the same order as the input array.

const Pool = require('multiprocessing').Pool;

function square(x) {
  return x * x;
}

const pool = new Pool(4);

result = []
pool.map([1, 2, 3], square, {onResult: val => { result.push(val) }})
  .then(() => console.log(result));
// prints "[4, 9, 1]"
// OR in some other order, depending on how we receive the results!
Option: timeout [experimental]

Approximate maximum processing time to allow for a single item in the array. If more than the alloted time passes, the mapper promise will be rejected with an error saying that the task timed out.

Recommended that you use this only for longer tasks, or as a way to prevent infinite loops. Timeouts below 200ms or so can be unreliable.

const Pool = require('multiprocessing').Pool;

function anInfiniteLoop() {
  while (true) {}
}

const pool = new Pool(4);

pool.map([1, 2, 3, 4, 5], anInfiniteLoop, {timeout: 1000})
  .catch(err => console.log(err));

// "Task timed out!"
.apply(any arg, Function|String fnOrModulePath[, Object options]) -> Promise

A convenience method for calling map with a single argument. Useful for when you want to use the pool as a queue that processes jobs in a first-come, first-served manner.

Uses same options as map, but chunksize will be ignored.

.close()

Terminates worker processes after waiting for outstanding jobs. Calling methods of the pool after this will result in an error.

.terminate()

Like #close, but will immediately terminate worker processes. All outstanding jobs at the time this method is called will have their promises rejected.

new PriorityQueue(numWorkers) -> PriorityQueue

A max priority queue built off of a pool of worker processes. Items with a higher priority will be processed first.

.push(any arg, number priority, Function|String fnOrModulePath[, Object options]) -> Promise

Pushes an item onto the queue and returns a promise that will be resolved with the result or rejected if any errors were raised.

const PriorityQueue = require('multiprocessing').PriorityQueue;

function square(x) {
  return x * x;
}

// one worker guarantees ordering -- multiple workers will only start tasks in order
const pq = new PriorityQueue(1);

pq.push(25, 1, square).then(console.log),
pq.push(100, 3, square).then(console.log),
pq.push(50, 2, square).then(console.log),
pq.push(10, 4, square).then(console.log)

// >>> 625   <- low priority, but gets kicked off first
// >>> 100   <- highest priority
// >>> 10000
// >>> 2500

Uses same options as Pool.map, but chunksize will be ignored.

License

MIT