clever-queue
v0.4.1
A queuing system for promises that handles concurrency, throttling, weighting and prioritization in a clever fashion.
Purpose
A queuing system for promises that handles concurrency, throttling, distribution, weighting and prioritization in a clever fashion.
Zero dependencies.
Features
- [X] Task Promises
- [X] Distribution (multiple queues)
- [X] Priority queues
- [X] Weighted queues
- [X] Concurrency (multiple runners)
- [X] Priority runners
- [ ] Throttling
Architecture
clever-queue implements 4 classes :
- engine : the starting point of clever-queue. You must instantiate at least one engine in your program; you may instantiate several if needed.
- task : the work to execute. It may be an async or a sync function.
- runner : executes tasks, one at a time. An engine must have at least one runner (one best-effort runner is created automatically when the engine is instantiated). If you need concurrent execution, you may instantiate as many runners as needed. Each runner can be configured to execute only tasks from queues whose priority is equal to or higher than the runner's priority.
- queue : keeps tasks in a deterministic order (FIFO : First In, First Out, by default) while they wait for an available runner. An engine must have at least one queue. You may instantiate as many queues as you need and manage priorities and weights between them.
import * as cleverQueue from "clever-queue"; // import the library - no dependencies
const engine = cleverQueue.createEngine(); // create and start (by default) engine with a single best effort runner (by default)
const queue = engine.createQueue(); // create your first queue, with standard priority (by default) and default weight (by default)
const myAsyncTaskToExecute: cleverQueue.tasks.FunctionToExecute = async function (message: string, timeout: number) {
  // do your stuff here - in this case, just wait for the timeout and return the message
  await new Promise((resolve) => setTimeout(resolve, timeout));
  return message;
};
(async () => {
  const result = await queue.createTaskAndEnqueue(() => myAsyncTaskToExecute("myValue", 1000));
  console.log(result);
  engine.stop(); // stop the engine when you have finished
})();
Behavior / Semantics
Engine
Stop and shutdown behavior
clever-queue treats stopping as a transition that prevents new work from being accepted while allowing in-flight work to finish safely.
- engine.stop() stops every queue and runner in the engine
- queued-but-not-started tasks are rejected when their queue stops
- a runner that is already executing a task finishes that task, then exits without taking another one
- queue.stop() prevents any later enqueue() call from succeeding
This means stop is safe for shutdown, but it is not a drain operation: pending tasks are cancelled rather than silently left unresolved.
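The stop rules above can be illustrated with a small self-contained model. This is only a sketch and not clever-queue's implementation: `SketchStoppableQueue`, its methods and the outcome labels are hypothetical names chosen for illustration, and task execution is reduced to bookkeeping.

```typescript
// Hypothetical model of the stop semantics described above.
// It does not use clever-queue; every name here is illustrative.
type StopOutcome = "finished" | "rejected";

class SketchStoppableQueue {
  private waiting: string[] = [];          // queued-but-not-started task labels
  private inFlight: string | null = null;  // task currently held by the runner
  private stopped = false;

  // Accept a task unless the queue has been stopped.
  enqueue(label: string): boolean {
    if (this.stopped) return false;
    this.waiting.push(label);
    return true;
  }

  // The runner takes the next waiting task (FIFO), unless stopped or busy.
  startNext(): string | null {
    if (this.stopped || this.inFlight !== null) return null;
    const next = this.waiting.shift();
    this.inFlight = next === undefined ? null : next;
    return this.inFlight;
  }

  // Stop: waiting tasks are rejected, the in-flight task is allowed to finish.
  stop(): Map<string, StopOutcome> {
    this.stopped = true;
    const outcomes = new Map<string, StopOutcome>();
    if (this.inFlight !== null) outcomes.set(this.inFlight, "finished");
    for (const label of this.waiting) outcomes.set(label, "rejected");
    this.waiting = [];
    this.inFlight = null;
    return outcomes;
  }
}

const stopDemo = new SketchStoppableQueue();
stopDemo.enqueue("a");
stopDemo.enqueue("b");
stopDemo.enqueue("c");
stopDemo.startNext();                            // "a" is now running
const stopOutcomes = stopDemo.stop();            // a: finished, b and c: rejected
const acceptedAfterStop = stopDemo.enqueue("d"); // false: no new work after stop
```

In this model nothing is left unresolved after `stop()`: every task that was accepted ends up either finished or rejected.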
Task
A task moves through these observable states:
- initialiazing: the task object has been created
- queued: the task has been accepted by a queue and is waiting for a runner
- running: a runner has started executing the task function
- completed: the task completed successfully
- exception: the task failed
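One way to picture these states is as a small transition table. The state names below are copied from the list above; the allowed transitions are an assumption inferred from the descriptions, not taken from the library. The state of a task whose queue is stopped while it waits is not described here, so the sketch leaves it out.

```typescript
// State names come from the list above; the transition table is an
// assumption inferred from the descriptions, not the library's own code.
type SketchTaskState = "initialiazing" | "queued" | "running" | "completed" | "exception";

const sketchTransitions: Record<SketchTaskState, SketchTaskState[]> = {
  initialiazing: ["queued"],
  queued: ["running"],
  running: ["completed", "exception"],
  completed: [],
  exception: [],
};

// True when the model allows moving directly from one state to another.
function canMove(from: SketchTaskState, to: SketchTaskState): boolean {
  return sketchTransitions[from].indexOf(to) !== -1;
}

// A state is terminal when no further transition is allowed.
function isTerminal(state: SketchTaskState): boolean {
  return sketchTransitions[state].length === 0;
}
```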
Task return and failure behavior
The promise returned by queue.enqueue() / queue.createTaskAndEnqueue() follows the task result:
- resolved task function => promise resolves with the task return value
- thrown Error => promise rejects with that same Error
- thrown non-Error value => promise rejects with FunctionRaisedAnHundledException
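The rejection rule can be sketched as a single mapping function. How clever-queue exposes its wrapper error is not shown in this README, so the local `SketchWrappedThrow` class is a hypothetical stand-in for it, and `toRejection` is an illustrative helper rather than a library function.

```typescript
// Stand-in for the library's wrapper error. The real class and how it is
// exported are not shown in this README, so this local class is hypothetical.
class SketchWrappedThrow extends Error {
  readonly thrown: unknown;
  constructor(thrown: unknown) {
    super("task threw a non-Error value");
    this.name = "SketchWrappedThrow";
    this.thrown = thrown;
  }
}

// Map whatever a task threw to the Error its promise would reject with:
// an Error is passed through unchanged, anything else is wrapped.
function toRejection(thrown: unknown): Error {
  return thrown instanceof Error ? thrown : new SketchWrappedThrow(thrown);
}

const thrownError = new TypeError("bad input");
const sameError = toRejection(thrownError);         // the very same TypeError instance
const wrappedError = toRejection("just a string");  // wrapped, original value kept in .thrown
```

Callers can therefore always treat the rejection reason as an `Error`, whatever the task function threw.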
Queue
Queuing
A queue delivers tasks to runner(s) in a deterministic order (FIFO : First In, First Out, by default).
Example : One standard priority queue delivering tasks to a standard runner.
const engine = cleverQueue.createEngine();
const queueA = engine.createQueue();
Queue Distribution
Use multiple queues to distribute work fairly. You can dedicate a queue to each requester or group of requesters: an overload from one of them will not block the others, because queues are served equally by default. The priority and weight features described below change this default.
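To see why an overloaded requester does not starve the others, here is a minimal round-robin sketch with one busy queue and one light queue. It assumes "served equally" means one task per non-empty queue per pass; the library's actual selection algorithm is not described here, and `roundRobinOrder` is a hypothetical helper.

```typescript
// Round-robin across equal queues: one task from each non-empty queue per pass.
// This is an assumed reading of "served equally", not the library's code.
function roundRobinOrder(queues: string[][]): string[] {
  const pending = queues.map((q) => q.slice()); // copy so the inputs stay untouched
  const served: string[] = [];
  while (pending.some((q) => q.length > 0)) {
    for (const q of pending) {
      const next = q.shift();
      if (next !== undefined) served.push(next);
    }
  }
  return served;
}

const fairOrder = roundRobinOrder([
  ["A1", "A2", "A3", "A4", "A5", "A6"], // a busy requester
  ["B1", "B2"],                         // a light requester
]);
// fairOrder: A1, B1, A2, B2, A3, A4, A5, A6
```

With a single FIFO queue, B2 would wait behind all six A tasks; here it is served fourth.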
Example : 2 x Queues / 1 x Runner / 8 x Tasks
const engine = cleverQueue.createEngine();
const queueA = engine.createQueue();
const queueB = engine.createQueue();
Queue Priorities
Queues are checked from absolute priority (cleverQueue.queues.Priorities.Absolute: 255) down to best-effort priority (cleverQueue.queues.Priorities.BestEffort: 0).
You may instantiate as many queues as you wish, and multiple queues may have the same priority.
A lower-priority queue does not get turns while a higher-priority queue still has waiting tasks.
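The strict ordering can be sketched as "always pick the non-empty queue with the highest priority". The interface and function names below are hypothetical, and the sketch ignores how queues of equal priority share turns, since that is the job of weights.

```typescript
// Hypothetical shape for illustration; not a clever-queue type.
interface SketchPriorityQueue {
  label: string;
  priority: number;  // 0 (best effort) to 255 (absolute), as in the text above
  waiting: string[];
}

// Pick the non-empty queue with the highest priority, or undefined if all are empty.
function pickByPriority(queues: SketchPriorityQueue[]): SketchPriorityQueue | undefined {
  let best: SketchPriorityQueue | undefined;
  for (const q of queues) {
    if (q.waiting.length === 0) continue;
    if (best === undefined || q.priority > best.priority) best = q;
  }
  return best;
}

// Drain every queue in the order a single runner would serve the tasks.
function drainByPriority(queues: SketchPriorityQueue[]): string[] {
  const served: string[] = [];
  let next = pickByPriority(queues);
  while (next !== undefined) {
    served.push(next.waiting.shift() as string);
    next = pickByPriority(queues);
  }
  return served;
}

const priorityOrder = drainByPriority([
  { label: "C", priority: 0, waiting: ["C1", "C2"] },
  { label: "A", priority: 255, waiting: ["A1", "A2"] },
  { label: "B", priority: 100, waiting: ["B1"] },
]);
// priorityOrder: A1, A2, B1, C1, C2
```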
Example : 3 queues with different priorities
const engine = cleverQueue.createEngine({ autostart: false });
const queueA = engine.createQueue({ priority: cleverQueue.queues.Priorities.Absolute, weight: 1 }); // Absolute priority (255), will be served first
const queueB = engine.createQueue({ priority: 100, weight: 1 }); // Arbitrary priority (any value between 0 and 255), will be served after queueA
const queueC = engine.createQueue({ priority: cleverQueue.queues.Priorities.BestEffort, weight: 1 }); // Best effort priority (0), will be served last
Queue Weights
Weights apply only between queues of the same priority. They define how often those queues are selected relative to each other.
- valid active weights are integers from cleverQueue.queues.Weights.Lowest: 1 to cleverQueue.queues.Weights.Highest: 255
- larger weights receive proportionally more dequeue turns over time
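A simple weighted round-robin shows what "proportionally more dequeue turns" can look like. This is a sketch under the assumption that each queue gets up to `weight` consecutive turns per cycle; the scheduling algorithm clever-queue actually uses is not described here, and the names are hypothetical.

```typescript
// Hypothetical shape for illustration; not a clever-queue type.
interface SketchWeightedQueue {
  weight: number;    // integer from 1 to 255, per the range above
  waiting: string[];
}

// Each cycle, a queue may dequeue up to `weight` tasks before the next queue gets its turn.
// Queues are assumed to share the same priority. The input queues are drained in place.
function weightedOrder(queues: SketchWeightedQueue[]): string[] {
  const served: string[] = [];
  while (queues.some((q) => q.waiting.length > 0)) {
    for (const q of queues) {
      for (let turn = 0; turn < q.weight; turn++) {
        const next = q.waiting.shift();
        if (next === undefined) break; // this queue is empty, move on
        served.push(next);
      }
    }
  }
  return served;
}

const weightedResult = weightedOrder([
  { weight: 2, waiting: ["A1", "A2", "A3", "A4"] },
  { weight: 1, waiting: ["B1", "B2"] },
]);
// weightedResult: A1, A2, B1, A3, A4, B2
```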
Example : two standard-priority queues with weights 2 and 1 will tend toward a 2:1 dequeue pattern while both stay non-empty.
const engine = cleverQueue.createEngine({ autostart: false });
const queueA = engine.createQueue({ priority: cleverQueue.queues.Priorities.Standard, weight: 2 }); // Same priority (128) as queueB, but higher weight (2 vs 1)
const queueB = engine.createQueue({ priority: cleverQueue.queues.Priorities.Standard, weight: 1 });
Runners
Each runner executes one task at a time.
Runners Concurrency
Concurrency comes from creating multiple runners on the same engine.
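The effect of adding runners can be estimated with a small simulation. This sketch assumes that every runner can serve every task and that a free runner immediately takes the next task in FIFO order; `simulateMakespan` is a hypothetical helper and does not call clever-queue.

```typescript
// Simulated total time: each runner executes one task at a time and takes the
// next task (FIFO) as soon as it is free. Durations are in milliseconds.
function simulateMakespan(durations: number[], runnerCount: number): number {
  if (runnerCount < 1) throw new Error("at least one runner is required");
  // freeAt[i] is the simulated time at which runner i becomes free.
  const freeAt: number[] = Array.from({ length: runnerCount }, () => 0);
  for (const duration of durations) {
    // The earliest-free runner picks up the next task.
    let earliest = 0;
    for (let i = 1; i < freeAt.length; i++) {
      if (freeAt[i] < freeAt[earliest]) earliest = i;
    }
    freeAt[earliest] += duration;
  }
  return Math.max(...freeAt);
}

const oneRunner = simulateMakespan([1000, 1000, 1000, 1000], 1);  // 4000
const twoRunners = simulateMakespan([1000, 1000, 1000, 1000], 2); // 2000
```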
Example : two best-effort runners (the first one is created when the engine is instantiated).
const engine = cleverQueue.createEngine();
engine.createRunner({ priority: cleverQueue.queues.Priorities.BestEffort });
const queueA = engine.createQueue();
Runners Priority
Runner priority sets the lowest queue priority a runner is allowed to consume. This lets you reserve a runner for high-priority work: if a high-priority task arrives while all other runners are busy with lower-priority tasks, the reserved runner remains available to run it immediately.
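The eligibility rule reduces to a single comparison, sketched below. `SketchQueueInfo` and `queuesServedBy` are hypothetical names for illustration; the priority values 255, 128 and 0 are the absolute, standard and best-effort priorities used elsewhere in this README.

```typescript
// Hypothetical shape for illustration; not a clever-queue type.
interface SketchQueueInfo {
  label: string;
  priority: number;
}

// A runner may consume a queue only if the queue's priority is at least the runner's.
function queuesServedBy(runnerPriority: number, queues: SketchQueueInfo[]): string[] {
  return queues.filter((q) => q.priority >= runnerPriority).map((q) => q.label);
}

const sketchQueues: SketchQueueInfo[] = [
  { label: "queueA", priority: 255 }, // absolute
  { label: "queueB", priority: 128 }, // standard
];
const bestEffortRunner = queuesServedBy(0, sketchQueues); // queueA and queueB
const reservedRunner = queuesServedBy(255, sketchQueues); // queueA only
```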
Example : The first runner (created automatically by the engine) is a best-effort runner (priority 0); it serves both queueA (absolute priority, 255) and queueB (standard priority, 128). The second runner is configured to serve only queues with priority >= 255 (absolute priority), so it serves only queueA.
const engine = cleverQueue.createEngine({ autostart: false }); // create the engine and the first runner with best effort priority (0)
engine.createRunner({ priority: cleverQueue.queues.Priorities.Absolute }); // create a second runner with absolute priority (255)
const queueA = engine.createQueue({ priority: cleverQueue.queues.Priorities.Absolute, weight: 1 }); // Queue with absolute priority (255), served first, by both runners.
const queueB = engine.createQueue({ priority: cleverQueue.queues.Priorities.Standard, weight: 1 }); // Queue with standard priority (128), served only by the first runner.
