kued

v1.3.1

Extensions for the Kue library (Daemonization, Checkpointing, etc.)

Workers

Kued simplifies worker creation by organizing Job handling around classes:

const Worker = require('kued').Worker;

class MyWorker extends Worker {

  // This is where you can register handlers.
  init(){
    this.process('my-task', 'myTaskHandler');
    this.process('my-task-2', this.myTask2Handler.bind(this));
    // Retrieve up to 50 messages at a time (default is 1).
    this.process('my-task-3', 50, this.myTask3Handler.bind(this));
    this.process('my-task-4', (job, context, done) => done());
  }

  myTaskHandler(job, context, done){
    done();
  }

  myTask2Handler(job, context, done){
    done();
  }

  myTask3Handler(job, context, done){
    done();
  }
}
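
The registration calls above take three shapes: a method name, a function, and an optional concurrency number before the handler. The following is a minimal sketch of how such arguments could be normalized. It is not Kued's source; `SketchWorker`, `DemoWorker`, and the `handlers` map are hypothetical names used only for illustration, and nothing here talks to Kue or Redis.

```javascript
// Hypothetical in-memory stand-in, NOT Kued's implementation.
class SketchWorker {
  constructor() {
    this.handlers = new Map();
    this.init();
  }

  // Subclasses override init() to register handlers.
  init() {}

  // Accepts process(topic, handler) or process(topic, concurrency, handler).
  process(topic, concurrencyOrHandler, maybeHandler) {
    const hasConcurrency = typeof concurrencyOrHandler === 'number';
    const concurrency = hasConcurrency ? concurrencyOrHandler : 1;
    let handler = hasConcurrency ? maybeHandler : concurrencyOrHandler;

    // A string is treated as the name of a method on this instance.
    if (typeof handler === 'string') {
      const name = handler;
      if (typeof this[name] !== 'function') {
        throw new TypeError(`No method named "${name}" on ${this.constructor.name}`);
      }
      handler = this[name].bind(this);
    }
    if (typeof handler !== 'function') {
      throw new TypeError(`Handler for "${topic}" must be a function or method name`);
    }
    this.handlers.set(topic, { concurrency, handler });
  }
}

class DemoWorker extends SketchWorker {
  init() {
    this.process('by-name', 'onJob');
    this.process('batched', 50, (job, context, done) => done());
  }

  onJob(job, context, done) {
    done(null, `handled ${job.data.id}`);
  }
}

const demo = new DemoWorker();
```

Resolving the string form with `bind(this)` is what lets a handler such as `myTaskHandler` use `this.queue` without the caller binding it by hand.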

The queue is available to Workers, so the outcome of one job can be another:

const Worker = require('kued').Worker;

class MyWorker extends Worker {

  // This is where you can register handlers.
  init(){
    this.process('my-task', 'myTaskHandler');
  }

  myTaskHandler(job, context, done){
    this.queue.create('new-task', { foo: 'bar' }).save(done);
  }
}

Time-based Intervals

Sometimes you just need tasks executed at a regular interval. For this use case, we have a special CronWorker:

const CronWorker = require('kued').CronWorker;

class Synchronizer extends CronWorker {

  constructor(config, logger, _queue){
    const cron = { "cronTime": "10 * * * * *", "runOnInit": true };
    super(cron, config, logger, _queue);
  }

  tick(){
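    // tick() receives no `done` callback, so handle the save result here.
    this.queue.create('new-task', { foo: 'bar' }).save((err) => {
      if (err) console.error(err);
    });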
  }
}
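
The `cronTime` and `runOnInit` options look like those of the `cron` npm package, so the sketch below assumes CronWorker passes them through to it (the README does not confirm this). Under that assumption the expression has six fields and the first one is seconds, so `"10 * * * * *"` fires once a minute at second 10, not every ten seconds (that would be `"*/10 * * * * *"`). `labelCronFields` is a hypothetical helper that only names the fields; it does not schedule anything.

```javascript
// Field order assumed from the six-field syntax of the `cron` package.
const CRON_FIELDS = ['second', 'minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek'];

// Hypothetical helper: pairs each field of a six-field expression with its name.
function labelCronFields(cronTime) {
  const parts = cronTime.trim().split(/\s+/);
  if (parts.length !== CRON_FIELDS.length) {
    throw new Error(`Expected ${CRON_FIELDS.length} fields, got ${parts.length}`);
  }
  return Object.fromEntries(CRON_FIELDS.map((name, i) => [name, parts[i]]));
}

const fields = labelCronFields('10 * * * * *');
```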

Checkpoints

With distributed workers, it is sometimes important to skip a record, for example when it is older than the last "modified" point. For this use case, Kued provides a "checkpointing" feature that applies a predicate test before processing a record.

Checkpoints wrap a typical subscription (i.e. use of Worker.process()):

const Worker = require('kued').Worker;
const moment = require('moment');

class CheckpointedWorker extends Worker {

  // This is where you can register handlers.
  init(){
    this.checkpoint()
        .topic('stock-quotes')
        .concurrency(1)
        .keyFactory((quote) => {
          // dynamically determining the checkpoint key
          // allows checkpoints to be scoped to practically
          // anything.
          return `quotes:${quote.symbol}`;
        })
        .iff((quote, checkpoint, callback) => {
          return moment(quote.datetime).valueOf() > moment(checkpoint.lastseen).valueOf();
        })
        .process('myTaskHandler');
  }

  myTaskHandler(job, context, done){
    const quote = job.data;
    this.queue.create('stock-update', quote).save((err) => {
      // Second argument is the new checkpoint value.
      done(err, { lastseen: quote.datetime });
    });
  }
}
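
To make the flow concrete, here is a minimal in-memory sketch of the checkpoint idea: derive a key, look up the stored checkpoint, apply the predicate, and store the new checkpoint the handler returns. It is not Kued's checkpointer. `createCheckpointFilter` is a hypothetical name, the store is a plain `Map` rather than Redis, and processing the first record for a key (when no checkpoint exists yet) is an assumption.

```javascript
// Hypothetical in-memory model of checkpointing, NOT Kued's implementation.
function createCheckpointFilter({ keyFactory, iff, handler }) {
  const checkpoints = new Map();

  return function handle(data) {
    const key = keyFactory(data);
    const checkpoint = checkpoints.get(key);

    // Assumption: with no checkpoint yet there is nothing to compare, so process.
    if (checkpoint !== undefined && !iff(data, checkpoint)) {
      return false; // skipped by the predicate
    }

    // The handler returns the new checkpoint value for this key.
    const next = handler(data);
    if (next !== undefined) checkpoints.set(key, next);
    return true; // processed
  };
}

const handleQuote = createCheckpointFilter({
  keyFactory: (quote) => `quotes:${quote.symbol}`,
  iff: (quote, checkpoint) =>
    Date.parse(quote.datetime) > Date.parse(checkpoint.lastseen),
  handler: (quote) => ({ lastseen: quote.datetime }),
});
```

Because the key includes the symbol, a stale quote for one symbol is skipped without affecting quotes for any other symbol.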

Bridges

Another common requirement of a worker is to receive messages from sources other than Kue. A bridge is a mechanism that takes messages off a provider (like IronMQ) and forwards them to a Worker.

const BridgeFactory = require('kued').BridgeFactory;
const bridgeFactory = new BridgeFactory(config, logger);

// Specific options for the Bridge
const opts = {};

// This is not implemented yet.
const bridge = bridgeFactory.create('imq:queue', 'kue:task-worker-topic', opts);

TaskManager

TaskManager is a thin wrapper around Kue used to build and submit Jobs. It is mostly a convenience for managing the Kue connection when you are not inside a worker.

const moment = require('moment');
const TaskManager = require('kued').TaskManager;
const taskManager = new TaskManager(config, logger);


const stockQuote = { symbol: 'ABCD', value: 35.2, datetime: moment().toISOString() };

// Simplest form of enqueuing a task
taskManager.enqueue('stock-quotes', stockQuote, (err) => {
  if (err) console.error(err);
});

// Omit the callback to get the Kue job back. Make further adjustments as
// necessary, but don't forget to call `save()` to submit the job.
taskManager.enqueue('stock-quotes', stockQuote).priority('high').attempts(2).save((err) => {
  if (err) console.error(err);
  // Close the connection when you're done!
  taskManager.close();
});

Daemonizing

Worker config file: workers.json

{
  "providers": [
    {
      "provides": "kue",
      "connection": {
        "prefix": "myservice",
        "redis": {
          "port": 6379,
          "host": "localhost",
          "auth": "alright_alright_alright"
        }
      }
    },
    {
      "provides": "imq",
      "token": "abcde12345",
      "project_id": "asdfadfadsf"
    },
    {
      "provides": "checkpointer",
      "name": "redis-checkpointer",
      "require": "kued/lib/checkpointers/redis"
    }
  ],
  "workers": [
    {
      "name": "MyWorker",
      "require": "./lib/workers/myworker",
      "options": {
        "mongo": "mongodb://blah"
      }
    },
    {
      "name": "YourWorker",
      "require": "./lib/workers/yourworker",
      "options": {
        "db": "mysql://blah"
      }
    }
  ],
  "bridges": [
    {
      "name": "GoldenGate",
      "from": "imq:queue",
      "to": "kue:task-worker-topic",
      "options": {}
    }
  ],
  "workgroups": [
    {
      "workers": ["MyWorker", "YourWorker"],
      "instances": 3
    },
    {
      "workers": ["GoldenGate"],
      "instances": 1
    }
  ]
}
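
In the sample above, each workgroup lists members by name, and those names match entries under `workers` or `bridges` (for example `GoldenGate`). A misspelled name is easy to miss, so a quick check before starting the daemon can help. The sketch below assumes that workgroups may reference either workers or bridges, which is inferred from the sample and not a documented rule. `findUnknownWorkgroupMembers` is a hypothetical helper and is not part of Kued.

```javascript
// Hypothetical pre-flight check, NOT part of Kued.
// Returns workgroup member names that match no declared worker or bridge.
function findUnknownWorkgroupMembers(config) {
  const known = new Set([
    ...(config.workers || []).map((worker) => worker.name),
    ...(config.bridges || []).map((bridge) => bridge.name),
  ]);

  const unknown = [];
  for (const group of config.workgroups || []) {
    for (const name of group.workers || []) {
      if (!known.has(name)) unknown.push(name);
    }
  }
  return unknown;
}

// Trimmed-down version of the workers.json shown above.
const sampleConfig = {
  workers: [{ name: 'MyWorker' }, { name: 'YourWorker' }],
  bridges: [{ name: 'GoldenGate' }],
  workgroups: [
    { workers: ['MyWorker', 'YourWorker'], instances: 3 },
    { workers: ['GoldenGate'], instances: 1 },
  ],
};

const unknownMembers = findUnknownWorkgroupMembers(sampleConfig);
```

For the real file, the same function can be called on `JSON.parse(fs.readFileSync('workers.json', 'utf8'))`.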

To start those workers, just use the workers.js entrypoint:

# Your project
npm install
./node_modules/.bin/kued workers --config=workers.json

Evergreen Integration

Evergreen is a side project of ours for making Node configuration easier and more powerful. We created a simple wrapper that lets a Workgroup be spawned after importing an Evergreen-compatible configuration file.

# Get the range of options by specifying the --help flag
everkued --help

# Example of spawning a Workgroup:
everkued --config config.json --workgroup ImageProcessor \
         --workers-path foo.bar.workers --modules trbl-evergreen-mongo

Keep in mind, this is used to spawn only one workgroup.

Task Enqueuing from the CLI

# Your project
npm install
./node_modules/.bin/kued task --config=tasks.json taskname --params