kued
v1.3.1
Published
Extensions for the Kue library (Daemonization, Checkpointing, etc.)
Downloads
19
Readme
kued
Extensions for Kue (Daemonization, Checkpointing, etc.)
Workers
Kued simplifies worker creation by organizing Job handling around classes:
const Worker = require('kued').Worker;
class MyWorker extends Worker {
// This is where you can register handlers.
init(){
this.process('my-task', 'myTaskHandler');
this.process('my-task-2', this.myTask2Handler.bind(this));
// Retrieve up to 50 messages at a time (default is 1).
this.process('my-task-3', 50, this.myTask3Handler.bind(this));
this.process('my-task-4', (job, context, done) -> done());
}
myTaskHandler(job, context, done){
done();
}
myTask2Handler(job, context, done){
done();
}
myTask3Handler(job, context, done){
done();
}
}
The queue is available to Worker
s, so the outcome of one job can be another:
const Worker = require('kued').Worker;
class MyWorker extends Worker {
// This is where you can register handlers.
init(){
this.process('my-task', 'myTaskHandler');
}
myTaskHandler(job, context, done){
this.queue.create('new-task', { foo: 'bar' }).save(done);
}
}
Time-based Intervals.
Sometimes you just need tasks executed at a regular interval. For this usecase, we have a special CronWorker
:
const CronWorker = require('kued').CronWorker;
class Synchronizer extends CronWorker {
constructor(config, logger, _queue){
const cron = { "cronTime": "10 * * * * *", "runOnInit": true };
super(cron, config, logger, _queue);
}
tick(){
this.queue.create('new-task', { foo: 'bar' }).save(done);
}
}
Checkpoints.
With distributed workers, sometimes it's important to not process a record for some reason, like if it's older than a last "modified" point. For this use case, Kued provides a "checkpointing" feature that applies a predicate test before processing a record.
Checkpoints wrap a typical subscription (i.e. use of Worker.process()
):
const Worker = require('kued').Worker;
const moment = require('moment');
class CheckpointedWorker extends Worker {
// This is where you can register handlers.
init(){
this.checkpoint()
.topic('stock-quotes')
.concurrency(1)
.keyFactory((quote) => {
// dynamically determining the checkpoint key
// allows checkpoints to be scoped to practically
// anything.
return `quotes:${quote.symbol}`;
})
.iff((quote, checkpoint, callback) => {
return moment(quote.datetime).valueOf() > moment(checkpoint.lastseen).valueOf();
})
.process('myTaskHandler');
}
myTaskHandler(job, context, done){
const quote = job.data;
this.queue.create('stock-update', quote).save((err) => {
// Second argument is the new checkpoint value.
done(err, { lastseen: quote.datetime });
});
}
}
Bridges.
Another common requirement of a worker is to receive messages from sources other than Kue. A bridge is a mechanism to take messages off of a provider (like IronMQ) and forward it to a Worker
.
const BridgeFactory = require('kued').BridgeFactory;
const bridgeFactory = new BridgeFactory(config, logger);
// Specific options for the Bridge
const opts = {};
// This is not implemented yet.
const bridge = bridgeFactory.create('imq:queue', 'kue:task-worker-topic', opts);
TaskManager
TaskManager is simply a tiny wrapper around Kue used to build and submit Jobs. It's more of a convenience mechanism for managing the Kue connection outside of using a worker.
const moment = require('moment');
const TaskManager = require('kued').TaskManager;
const taskManager = new TaskManager(config, logger);
const stockQuote = { symbol: 'ABCD', value: 35.2, datetime: moment().toISOString() };
// Simplest form of enqueuing a task
taskManager.enqueue('stock-quotes', stockQuote,(err) => {
if (err) console.error(err);
});
// Omit the Callback and get the Kue job (make further adjustments as necessary,
// but don't forget to call `save()` to submit the job.
taskManager.enqueue('stock-quotes', stockQuote).priority('high').attempts(2).save((err) => {
if (err) console.error(err);
// Close the connection when your done!
taskManager.close();
});
Daemonizing
Worker config file: workers.json
{
"providers": [
{
"provides": "kue",
"connection": {
"prefix": "myservice",
"redis": {
"port": 6379,
"host": "localhost",
"auth": "alright_alright_alright"
}
}
},
{
"provides": "imq",
"token": "abcde12345",
"project_id": "asdfadfadsf"
},
{
"provides": "checkpointer",
"name": "redis-checkpointer",
"require": "kued/lib/checkpointers/redis"
}
],
"workers": [
{
"name": "MyWorker",
"require": "./lib/workers/myworker",
"options": {
"mongo": "mongodb://blah"
}
},
{
"name": "YourWorker",
"require": "./lib/workers/yourworker",
"options": {
"db": "mysql://blah"
}
}
],
"bridges": [
{
"name": "GoldenGate",
"to": "imq:queue",
"from": "kue:task-worker-topic",
"options": {}
}
],
"workgroups": [
{
"workers": ["MyWorker", "YourWorker"],
"instances": 3
},
{
"workers": ["GoldenGate"],
"instances": 1
}
]
}
And to start that workers, just use the workers.js
entrypoint:
# Your project
npm install
./node_modules/.bin/kued workers --config=workers.json
Evergreen Integration
Evergreen is side project of ours for making Node configuration easier and more powerful. We created a simple wrapper to allow a Workgroup
to be spawned after importing an Evergreen compatible configuration file.
# Get the range of options by specifying the --help flag
everkued --help
# Example of spawning a Workgroup:
everkued --config config.json -workgroup ImageProcessor \
--workers-path foo.bar.workers --modules trbl-evergreen-mongo
Keep in mind, this is used to spawn only one workgroup.
Task Enqueuing from Cli
# Your project
npm install
./node_modules/.bin/kued task --config=tasks.json taskname --params