forq

v0.0.7

Published

manage forked processes using async.queue

Downloads

8

Readme

Forq

Manage process forks using a task queue

Build Status

Branch | Build Status | Version
------- | ------------ | ----
master | build status | npm version
development | build status | 0.0.7

Installation

npm install forq

Usage

Require the 'forq' module at the top of your file:

var Forq = require('forq');

Task

Set up a node module to be your task worker. For example, './slow_printer' is a node module that logs to the console after 500ms and looks like this:

// slow_printer.js

setTimeout(function(){
  console.log('printing slowly');
}, 500)

Pool

Set up an array of tasks that reference one or more node modules:

// make tasks
var tasks = [];

tasks.push({
  path: './slow_printer',
  // you can specify arguments as an array here
  args: [ '-f', 'yo' ],
  description: 'task #1'
});

tasks.push({
  path: './fast_printer',
  // you can specify arguments as an array here
  args: [ '-f', 'blah'],
  description: 'task #2'
});
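
The README does not show how a task module reads its args. The following is a minimal sketch, assuming forq forwards args to the child the way Node's child_process.fork(modulePath, args) does, so that they appear after the first two entries of process.argv. The helper flagValue is a hypothetical name used for illustration and is not part of forq.

```javascript
// fast_printer.js (hypothetical worker; assumes args arrive via process.argv)

// Return the value that follows `flag` in an argv-style array, or undefined.
function flagValue(argv, flag) {
  var i = argv.indexOf(flag);
  return i !== -1 && i + 1 < argv.length ? argv[i + 1] : undefined;
}

// process.argv[0] is the node binary and process.argv[1] is the script path,
// so the task's own arguments start at index 2.
var value = flagValue(process.argv.slice(2), '-f');
console.log('printing quickly: ' + value);
```

With the task definition above, and under the stated assumption, the worker would see '-f' followed by 'blah'.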

Initialization

Initialize your new fork queue with the array of tasks

// initialize new task queue
var queue = new Forq({
  todo: tasks,
  // set your desired concurrency
  concurrency: 4
});

Start processing the queue

Start all the tasks in the array passed into your fork queue using .run:

queue.run();

Tasks

After a task queue has been initialized, additional work can be added as a Task. To use a task, first require the Task constructor:

var Task = require('forq/task');

Then use the .addTask method to add it to the queue:

var queue = new Forq({
  todo: tasks,
  onfinished: function () {
    
    // waiting to add another task
    setTimeout(function(){
    
      // adding another task
      queue.addTask(new Task({
        path: './test/printer',
        description: 'a later task'
      }, queue ));
    
    }, 1000);
  
  }
});

queue.run();

Tasks also accept callbacks:

var queue = new Forq({
  todo: tasks,
  onfinished: function () {
    
    // waiting to add another task
    setTimeout(function(){
    
      // adding another task
      queue.addTask(new Task({
        path: './test/printer',
        description: 'a later task'
      }, 

      // pass in a queue as a second argument (can be a different queue)
      queue, 

      // this is a callback that fires when the task has been processed
      function (err) {
        // insert your callback logic here
      }));
    
    }, 1000);
  
  }
});

queue.run();

Callbacks

You may also set an optional callback to fire when the fork queue has been notified that all task forks have terminated:

// initialize new queue
var queue = new Forq({
  todo: tasks,
  // set your desired concurrency
  concurrency: 4,
  // optional callback
  onfinished: function() {
    console.log("Queue is drained!");
  }
});

This callback fires when all tasks have been completed.
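
To make the relationship between concurrency and onfinished concrete, here is a minimal stand-in queue in plain JavaScript. It is not forq's implementation (the package description says forq builds on async.queue). MiniQueue and its properties are hypothetical names used only for illustration, and each task is a plain function that takes a done callback rather than a forked process.

```javascript
// MiniQueue: illustrative only, not forq's API.
// Runs at most `concurrency` tasks at once and calls `onfinished`
// once no task is waiting and no task is active.
function MiniQueue(opts) {
  this.todo = opts.todo.slice();
  this.concurrency = opts.concurrency || 1;
  this.onfinished = opts.onfinished || function () {};
  this.active = 0;
  this.maxActive = 0; // highest number of simultaneously active tasks seen
}

MiniQueue.prototype.run = function () {
  var self = this;
  // start tasks until the concurrency limit is reached
  while (self.active < self.concurrency && self.todo.length > 0) {
    var task = self.todo.shift();
    self.active += 1;
    self.maxActive = Math.max(self.maxActive, self.active);
    task(function done() {
      self.active -= 1;
      if (self.todo.length > 0) {
        self.run(); // a slot opened up: start the next waiting task
      } else if (self.active === 0) {
        self.onfinished(); // drained: nothing waiting, nothing active
      }
    });
  }
};

// Usage: three timed tasks, at most two running at once.
var demo = new MiniQueue({
  todo: [
    function (done) { setTimeout(done, 20); },
    function (done) { setTimeout(done, 10); },
    function (done) { setTimeout(done, 5); }
  ],
  concurrency: 2,
  onfinished: function () { console.log('Queue is drained!'); }
});
demo.run();
```

In this sketch the third task starts only after one of the first two finishes, and the callback runs once, after the last active task completes.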

Events

Communication with each fork can be done through events.

Binding

These events can be bound on the events key on queue initialization.

var queue = new Forq({
  todo: tasks,
  onfinished: function () {
   // stuff to do when the task queue finishes all tasks and there are no active processes/tasks
  },
  // task events to listen for and react to
  events: {
    exit: function(code, err) {
      // stuff to do on process exit
    },
    myCustomEvent: function(data){
      // stuff to do during custom event
    }
  }
});

Default Events

By default, tasks and queues emit a variety of events:

Queue Events

Event Name | Parameters Sent to Handler | Notes
----------- | -------------------------- | --------
finished | -> ( { status: '...' } ) | possible statuses: 'completed', 'aborted'
error | -> ( err ) | see the errors.js module for different error types
taskError | -> ( err ) | when any task fires an error
taskError:{idOfTask} | -> ( err ) | when a specific task with the specified id fires an error

Task Events

Event Name | Parameters Sent to Handler | Notes
----------- | -------------------------- | --------
finished | -> ( {} ) | empty object
error | -> ( err ) | see the errors.js module for different error types

Custom Events

Each custom event can be fired from within a task child process by passing an object with data and event keys to the process.send method:

// my_task_worker.js

process.send({ event: 'myCustomEvent', data: { hello: 'world', temp: 100 }});

Each event's this is bound to the Process instance that triggered it. The data object is then sent to the event handler as the first argument and is accessible in its scope.
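
A worker that is run directly rather than forked has no IPC channel, and in that case Node leaves process.send undefined. The sketch below guards against that, then shows on the parent side how a message of the { event, data } shape could be routed to a handler with this bound to a context object. sendEvent and dispatch are hypothetical helpers for illustration, and forq's own dispatch code may differ.

```javascript
// Worker side: send a custom event only when an IPC channel exists.
// `proc` is normally the global `process`; it is a parameter so the
// helper can be exercised without forking.
function sendEvent(proc, event, data) {
  if (typeof proc.send !== 'function') {
    return false; // not started via fork(), so there is nothing to send to
  }
  proc.send({ event: event, data: data });
  return true;
}

// Parent side (illustrative): route a message to the matching handler,
// binding `this` to `context` and passing `data` as the first argument.
function dispatch(handlers, context, message) {
  var handler = handlers[message.event];
  if (typeof handler !== 'function') {
    return false; // no handler bound for this event
  }
  handler.call(context, message.data);
  return true;
}

// Example: a fake process object stands in for a forked child.
var received = [];
var fakeChild = {
  send: function (message) {
    dispatch({
      myCustomEvent: function (data) { received.push([this.id, data.temp]); }
    }, { id: 'task_a' }, message);
  }
};
sendEvent(fakeChild, 'myCustomEvent', { hello: 'world', temp: 100 });
console.log(JSON.stringify(received)); // [["task_a",100]]
```

In a real worker the call would be sendEvent(process, 'myCustomEvent', { ... }), which returns false instead of throwing when the module is run on its own.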

Sharing Data Among Tasks In a Queue

Tasks can share data by attaching it to the queue. For example:

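// _.sample below is assumed to come from lodash (underscore also provides it)
var _ = require('lodash');

// initialize the queue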
var queue = new Forq({
  todo: tasks,
  onfinished: function () {
    // check updated values
    console.log(this.__data.tempCounter);
    console.log(this.__data.statuses)
  },
  oninit: function() {
    // initialize data containers
    this.__data = {};
    this.__data.tempCounter = 0;
    this.__data.statuses = [];
  },
  events: {
    myCustomEvent: function(data){
      // update the data containers or increment their values
      var statuses = [ 'fine', 'cool', 'hungry', 'bored'];
      this.queue.__data.tempCounter += data.temp;
      this.queue.__data.statuses.push(_.sample(statuses));
    }
  }
});

// start processing the tasks in the queue
queue.run();

Errors

Fork-Halting Errors

Errors are logged to the stderr of their task's fork and emit an 'error' event on the queue. They can be listened for at the queue level:

queue.on('error', function(err){
  // handle the error somehow
});

Or they can be listened for on the tasks themselves using the following namespace:

var tasks = [
  {
    path: './worker_module',
    description: 'task a',
    // set the task id here
    id: 'task_a'
  },
  {
    path: './worker_module',
    description: 'task b',
    // set the task id here
    id: 'task_b'
  }
];

var queue = new Forq({
  todo: tasks,
  concurrency: 10,
  onfinished: function () {
    // all done!
  }
});

queue.on('taskError:task_a', function(err){
  // handle the error for just task_a somehow
});
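
The two-level naming from the Queue Events table (taskError for any task, taskError:{idOfTask} for one task) can be mimicked with Node's built-in EventEmitter. This is a sketch of the naming pattern only. reportTaskError is a hypothetical helper, and the emitter stands in for a forq queue, which is assumed to behave similarly.

```javascript
var EventEmitter = require('events').EventEmitter;

// Stand-in for a queue; not a real forq instance.
var fakeQueue = new EventEmitter();

// Emit both the generic event and the task-specific one.
function reportTaskError(emitter, taskId, err) {
  emitter.emit('taskError', err);
  emitter.emit('taskError:' + taskId, err);
}

var seen = [];
fakeQueue.on('taskError', function (err) {
  seen.push('any: ' + err.message);
});
fakeQueue.on('taskError:task_a', function (err) {
  seen.push('task_a: ' + err.message);
});

reportTaskError(fakeQueue, 'task_a', new Error('boom'));
reportTaskError(fakeQueue, 'task_b', new Error('bang'));
console.log(seen.join('\n'));
```

The generic listener sees both errors, while the task_a listener sees only the first.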

.errors

Each queue has an array of arrays called .errors containing errors raised by their respective tasks and their forks.

Forq.Errors

The Forq module includes a few Error constructors that can be used for

Timeouts

Both tasks and queues can have a timeout set through the killTimeout attribute at initialization. There is also a pollFrequency value, which adjusts how often the main process checks for timeouts.

Pool Timeout

var queue = new Forq({
  todo: tasks,
  concurrency: 10,
  // set a 10 second timeout for the pool
  killTimeout: 10000,
  // poll frequency of 1 second
  pollFrequency: 1000
});

Worker Timeout

var tasks = []
tasks.push({
  path: './test/slow_worker',
  args: [ ],
  id: 'slow_task',
  // a 10 second timeout on the task
  killTimeout: 10000,
  // poll frequency of 1 second
  pollFrequency: 1000
});
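
How killTimeout and pollFrequency interact can be sketched as a periodic check that compares each active task's start time against its timeout. The names findExpired, startPolling and startedAt are hypothetical. This illustrates the polling idea under those assumptions and is not forq's code.

```javascript
// Return the ids of tasks that have run longer than their killTimeout.
// Each entry: { id, startedAt (ms timestamp), killTimeout (ms, optional) }.
function findExpired(activeTasks, now) {
  return activeTasks
    .filter(function (t) {
      return typeof t.killTimeout === 'number' &&
        now - t.startedAt > t.killTimeout;
    })
    .map(function (t) { return t.id; });
}

// Poll every `pollFrequency` ms and call `onExpired` with any overdue ids.
// Returns the timer so the caller can stop polling with clearInterval.
function startPolling(activeTasks, pollFrequency, onExpired) {
  return setInterval(function () {
    var expired = findExpired(activeTasks, Date.now());
    if (expired.length > 0) {
      onExpired(expired);
    }
  }, pollFrequency);
}
```

In a polling design like this one, an overdue task is noticed at the next poll, so with a killTimeout of 10000 and a pollFrequency of 1000 it could be detected up to about one second late. A smaller pollFrequency tightens that window at the cost of more frequent checks.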

Changelog

0.0.2

  • Now using native Node Domain for worker pool management
  • Improved Error handling
  • Added Worker Error namespacing support

0.0.3

  • Added monitoring for active forks and handling for stale/hanging forks.
  • Added killTimeout options for pools and forks
  • Added 'onfinished' option to use in place of queue.drain

0.0.4

  • Readme updates and minor improvements to index.js

0.0.7

  • Added noLimit mode that ignores concurrency limit
  • Changed 'worker' to 'tasks' and 'pool' to 'queue' throughout