superqueue
v0.0.6
Published
Variably concurrent Promise queue
Readme
superqueue
The only asynchronous promise queue you'll ever need. Very simple to use, with optionally configurable concurrency, rate, interval, priority, and more.
Installation
$ npm install --save superqueueUsage
Always start by requiring the package.
const SuperQueue = require('superqueue');Simple Single Concurrency Queue
To create a simple promise queue with a concurrency of one (i.e. only one function is executing at a time):
const SuperQueue = require('superqueue');
const myQueue = new SuperQueue();
function basicThennableFunction(param1) {
return new Promise((resolve, reject) => {
basicAsyncFunction((err, data) => {
console.log(param1);
if (err) {
reject(err);
return;
}
resolve(data);
});
});
}
// Add thennable to queue
myQueue.push(basicThennableFunction, 'One')
// This 'then' will be called after the function clears the queue
.then((data) => {
// Use the data
})
.catch((err) => {
// Handle the error
});
myQueue.push(basicThennableFunction, 'Two');
myQueue.push(basicThennableFunction, 'Three');
// Console output: 'One', 'Two', 'Three'Simple Adjustable Concurrency Queue
A SuperQueue's concurrency may be adjusted to specify how many functions can execute at once. To create a SuperQueue with a concurrency of 5:
const SuperQueue = require('superqueue');
const myQueue = new SuperQueue(5);A concurrency of 0 means unlimited.
Advanced Queue Configuration: Interval and Rate
A SuperQueue can also optionally be configured with an interval and/or a rate, useful for rate-limited APIs. There is a subtle difference between the two:
- The
intervalis the amount of time (in ms) that must pass between queued items being executed. - The
rateis the number of requests which can be made over a given period of time (therateDenominator). Theratemust be an integer.
For example, a queue with an interval of 200 will execute one request every 200ms, whereas a queue with a rate of 5 (and a rateDenominator of 1000) will execute 5 requests immediately, wait 1s, and then execute the next 5.
const queueConfig = {
concurrency: 10,
interval: 200 // Time in ms, defaults to 0
};
const myQueue = new SuperQueue(queueConfig);
Or:
const queueConfig = {
concurrency: 10,
rate: 10, // Defaults to 0 (No restriction)
rateDenominator: 2000, // Time in ms, defaults to 1000
};
const myQueue = new SuperQueue(queueConfig);It is possible to set both a rate and an interval on the same Queue.
Advanced Queue Item Configuration: Priority and Name
Each item added to the queue may be accompanied by options, including name (returned by EventEmitter when item begins or finishes execution) and priority (larger numbers = higher priority, default = 10).
const SuperQueue = require('superqueue');
const myQueue = SuperQueue();
function lowPriorityFunc(param1) {
return new Promise((resolve, reject) => {...});
}
function highPriorityFunc(param1) {
return new Promise((resolve, reject) => {...});
}
const priorityConfig = {
// Default priority is 10, higher numbers happen sooner
priority: 11,
};
myQueue.push(lowPriorityFunc, 'one');
myQueue.push(lowPriorityFunc, 'two');
myQueue.push(priorityConfig, highPriorityFunc, 'three');
/*
* Order will be: 'one', 'three', 'two', so long as the first
* function didn't resolve before the third one was added.
*/
Advanced Queue Configuration: Flags
By creating a flag, and by assigning specific queue items to that flag, restrictions on execution (concurrency, interval, rate, rateDenominator) can be applied to a subset of items in the Queue.
E.g using an API which has both public and private calls. The API is rate-limited to 5 requests per second. Public requests are simple, but the private requests require a nonce value which must always be increasing.
To implement this, simply set up a 'private' flag:
const SuperQueue = require('superqueue');
const queueConfig = {
concurrency: 0, // Unlimited concurrency
rate: 5,
};
const myQueue = new SuperQueue(queueConfig);
const flagConfig = {
name: 'private',
concurrency: 1,
};
myQueue.addFlag(flagConfig);
function publicFunc(param1) {
return new Promise((resolve, reject) => {...});
}
function privateFunc(param1, param2) {
return new Promise((resolve, reject) => {...});
}
const privateConfig = {
priority: 11, // If the private API calls are more important
flags: ['private'],
name: 'privateFunction',
};
myQueue.push(privateConfig, privateFunc, param1, param2)
.then((data) => {
// Use data
})
.catch((err) => {
// Handle error
});
myQueue.push(publicFunc, param1)
.then((data) => {
// Use the data
})
.catch((err) => {
// Handle error
});Methods
.push(func, [...args]);
.push({
priority, // ?Number<10>
flags, // ?[String]
name, // ?String<''>
}, func, [...args]);
.addFlag({
name, // String
concurrency, // ?Integer<1>
interval, // ?Number
rate, // ?Number
rateDenominator, // ?Number<1000>
});
.pause();
// Stops executing queued items. Returns false if already paused,
// true otherwise.
.pause(flag);
// Stops executing queued items with flag. If any flag on an item
// is paused, the item will never execute. Returns false if already
// paused, true otherwise.
.unpause();
// Resumes queue execution. Returns false if already unpaused,
// true otherwise.
.unpause(flag);
// Resumes queue execution for flag. Returns false if already
// unpaused, true otherwise.
.getLength();
// Returns number of queued (non-executing) items
.getLength(flag);
// Returns number of queued (non-executing) items under flag
.getConcurrent();
// Returns number of items currently executing
.getConcurrent(flag);
// Returns number of items categorized under flag currently
// executing
.on('start', ({ name, flags }) => {} );
// Whenever a function starts executing
.on('complete', ({ name, duration, flags, error, result }) => {} );
// Whenever a function completes
.on('empty', () => {} );
// Whenever the queue becomes empty