© 2024 – Pkg Stats / Ryan Hefner

thread-factory

v2.0.1

Framework to manage the worker/manager relationship to maximise threads and performance with zero dependencies.

Downloads

22

Readme

WORK IN PROGRESS

thread-factory

Zero-dependency framework to manage the worker/manager relationship to maximise threads and performance.

Install

npm i -S thread-factory

What is it?

Thread factory is a module that uses all the cores on your machine rather than just the one that Node uses by default. It uses a manager/worker relationship: each worker advertises itself as available for work, and the manager holds the list of jobs that the workers need to carry out.

Wait, Node only uses one of my cores? But I have 8!

Modern processors have traded clock speed for cores. In simple terms, this trades serialisation for parallelisation: we can do many things more slowly rather than one thing really fast. This is an oversimplified description, but the important thing to remember is that your Node programme runs its JavaScript on a single thread, so CPU-bound work only makes use of one core. If you have 8 cores, you are only using 12.5% (1/8) of your CPU.
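The 12.5% figure is simply one core out of eight, expressed as a percentage. A quick way to check your own machine is Node's built-in `os.cpus()`, which counts logical cores (so hyper-threads are included). `singleThreadShare` is a hypothetical helper for the arithmetic, not part of thread-factory.

```javascript
const os = require('os');

// Number of logical CPU cores Node can see on this machine.
const cores = os.cpus().length;

// A single-threaded, CPU-bound Node programme keeps at most one core busy,
// so its share of total CPU capacity is roughly 100% / cores.
function singleThreadShare(coreCount) {
  return 100 / coreCount;
}

console.log(`${cores} cores; one busy thread uses ~${singleThreadShare(cores).toFixed(1)}% of the CPU`);
```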

Why not just use pm2?

PM2 can automatically run your Node programme on each core, but each of these processes has no idea what the others are working on: they can't communicate with each other. That might be what you want for a web server, but if you are working through a dataset, the workload needs to be centrally managed.

Why not use pm2 with RabbitMQ or SQS?

These are great production solutions for continuous and evergreen workloads, but they come with a high barrier to entry and a lot of setup. Thread factory is ideal for workloads where you do not want to spin up multiple machines, deal with external queues or manage your own processes and threads.

OK, so how does it work?

When you start your Node application, the manager starts and looks at the jobs it has been asked to hand out. This list is an array of instructions whose state the manager keeps track of. The manager then creates one child process per core on the machine. Each worker tells the manager that it is ready to work. The manager distributes jobs to the workers, and each worker reports back to the manager when it has completed a job or encountered a problem.

Workers cannot communicate with each other, only with their manager. A worker can either write its data to a file or pass it back to the manager.
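The hand-off described above can be sketched with Node's built-in `worker_threads` module. This is not thread-factory's implementation: the library uses child processes, and this sketch swaps in worker threads only so that everything fits in one file. `runJobs` and the summing task are hypothetical. Each worker announces it is ready, the manager gives it the next job from the queue, and the worker posts its result back, so a fast worker simply pulls more jobs.

```javascript
const { Worker } = require('worker_threads');
const os = require('os');

// Worker code, evaluated in each thread. It announces itself as ready,
// then answers every job it receives with a result message.
// The task (summing 1..n) is a hypothetical stand-in for real work.
const workerSource = `
  const { parentPort } = require('worker_threads');
  parentPort.on('message', (job) => {
    let total = 0;
    for (let i = 1; i <= job.n; i += 1) total += i;
    parentPort.postMessage({ type: 'done', job, result: total });
  });
  parentPort.postMessage({ type: 'ready' });
`;

// Manager: holds the job queue and hands out one job at a time to whichever
// worker reports that it is free. Resolves with all results once every
// worker has been shut down.
function runJobs(jobs, size = os.cpus().length) {
  return new Promise((resolve, reject) => {
    const queue = [...jobs];
    const results = [];
    const poolSize = Math.max(1, Math.min(size, queue.length));
    let running = poolSize;

    if (queue.length === 0) {
      resolve(results);
      return;
    }

    for (let i = 0; i < poolSize; i += 1) {
      const worker = new Worker(workerSource, { eval: true });

      worker.on('message', (msg) => {
        if (msg.type === 'done') results.push(msg);
        // Hand out the next job, or shut the worker down when none are left
        if (queue.length > 0) worker.postMessage(queue.shift());
        else worker.terminate();
      });
      worker.on('error', reject);
      worker.on('exit', () => {
        running -= 1;
        if (running === 0) resolve(results);
      });
    }
  });
}
```

Posting each result with `postMessage` corresponds to the "pass it back to the manager" option; a worker could instead append its output to its own file.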

Why would I use something like this?

I created this because I needed to search through thousands of large files on AWS S3. The options at the time were Hadoop or Athena, both of which need a level of setup that is overkill for some projects. These tools feel like they were created for data scientists or analysts who use CSV and Python; JSON and JS are second-class citizens in that world, and this module is meant to help with that.

Example Scenario

I have a month of log data stored in S3. The data is formatted as ld-json (one JSON document per line, delimited by \n). Each file contains a million rows totalling 500 MB, gzipped down to 30 MB. I want to find all errors with error code 505 that happened over a given time period.
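The filtering step in this scenario can be tried locally without S3. The sketch below assumes the rows are gzipped ld-json and that the error code lives in a top-level `error` field, as in the worker further down. `findErrors` is a hypothetical helper; any readable stream of gzip bytes (such as an S3 object stream) can be passed as `source`.

```javascript
const zlib = require('zlib');
const readline = require('readline');
const { Readable } = require('stream');

// Decompress a gzipped ld-json stream and keep the rows whose `error`
// field equals `code`. Error handling on `source` is omitted for brevity.
async function findErrors(source, code) {
  const gunzip = zlib.createGunzip();
  const rl = readline.createInterface({ input: source.pipe(gunzip), crlfDelay: Infinity });
  const matches = [];
  for await (const line of rl) {
    if (!line.trim()) continue; // skip blank lines
    const row = JSON.parse(line); // one JSON document per line
    if (row.error === code) matches.push(row);
  }
  return matches;
}
```

Reading line by line keeps memory flat, which matters when each file expands to 500 MB.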

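Require the modules used in the example and create an S3 client.

const ThreadFactory = require('thread-factory');
const AWS = require('aws-sdk');
const zlib = require('zlib');
const fs = require('fs');
const readline = require('readline');

// aws-sdk v2 S3 client shared by the manager and worker functions
const s3 = new AWS.S3();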

The manager asks AWS S3 for all keys that match the prefix (in this case, all keys for November 2017). It maps the returned objects to just their keys and passes the result to the promise's resolve function.

const manager = () => (
  new Promise((resolve, reject) => {
    const params = {
      Bucket: 'name-of-log-bucket',
      Prefix: '2017-11-',
    };
    s3.listObjectsV2(params, (err, res) => {
      if (err) {
        reject(err);
        return;
      }
      const keys = res.Contents.map(row => row.Key);
      resolve(keys);
    });
  })
);
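A single `listObjectsV2` call returns at most 1,000 keys, so the manager above only sees the first page if a month of logs spans more objects than that. Below is a sketch of a paginating variant, assuming the aws-sdk v2 callback API used above: it keeps requesting pages with `ContinuationToken` set to the previous `NextContinuationToken` while `IsTruncated` is true. `listAllKeys` is a hypothetical helper name.

```javascript
// Collect every key under a prefix, following continuation tokens.
// `s3` is assumed to be an aws-sdk v2 S3 client (callback-style listObjectsV2).
async function listAllKeys(s3, params) {
  const keys = [];
  let token;
  do {
    // Only send ContinuationToken once S3 has handed us one
    const pageParams = token ? { ...params, ContinuationToken: token } : params;
    const res = await new Promise((resolve, reject) => {
      s3.listObjectsV2(pageParams, (err, data) => (err ? reject(err) : resolve(data)));
    });
    keys.push(...(res.Contents || []).map((row) => row.Key));
    token = res.IsTruncated ? res.NextContinuationToken : undefined;
  } while (token);
  return keys;
}
```

The manager could then become `const manager = () => listAllKeys(s3, { Bucket: 'name-of-log-bucket', Prefix: '2017-11-' });`.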

Once the manager's promise resolves, the workers start up and notify the manager that they are ready for work. The manager then passes one element of the keys array to each worker.

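const worker = (key, resume) => (
  new Promise((resolve, reject) => {
    const params = {
      Bucket: 'name-of-log-bucket',
      Key: key,
    };

    // Stream the gzipped object from S3 and decompress it on the fly
    const source = s3.getObject(params).createReadStream();
    const gunzip = zlib.createGunzip();
    source.on('error', reject);
    gunzip.on('error', reject);

    // Read the decompressed ld-json one line (one JSON document) at a time
    const rl = readline.createInterface({
      input: source.pipe(gunzip),
      crlfDelay: Infinity,
    });

    rl.on('line', (data) => {
      if (!data.trim()) return; // skip blank lines
      const json = JSON.parse(data);
      if (json.error === 505) {
        // Append the matching row to this worker's own output file
        fs.appendFileSync(`output-${resume.id}.ldjson`, `${data}\n`);
      }
    });
    rl.on('close', () => resolve());
  })
);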

Each worker runs in its own process, so the operating system can spread the workers across the cores. Each one writes its output to its own file and, when it finishes, notifies the manager that it is ready for the next key in the array.


// init Thread Factory with the manager and worker functions
const factory = new ThreadFactory({
  manager,
  worker,
});

factory.on('finished', () => {
  console.log('finished');
});

factory.on('error', (e) => {
  console.log(e);
});

factory.start();

API

ThreadFactory takes a single argument: an object of functions.

const ThreadFactory = require('thread-factory');
const spec = {
  manager, //<Function> Runs when ThreadFactory starts up and returns a Promise that resolves with an array of jobs. Required.
  managerFinish, //<Function> Runs once all items in the manager array have completed. Returns a Promise. Optional.
  workerStartup, //<Function> For one-off setup in each worker, such as opening a socket or connecting to a database. Returns a Promise. Optional.
  workerProcess, //<Function> Runs every time the manager passes the worker an instruction. Returns a Promise. Required.
  workerFinish, //<Function> Cleanup function fired when the manager has no more work for the worker. Optional.
};

const factory = new ThreadFactory(spec);

// fired each time a worker finishes a job
factory.on('completed', (d) => {
  console.log(d);
});

// fired when all of the manager's tasks have been completed
factory.on('finished', (d) => {
  console.log('finished');
});

// fired on error, passing the Error object
factory.on('error', (e) => {
  console.log(e);
});

// starts the manager
factory.start();