express-queued
v1.0.1
Independent queue system for Express with support for hooks
Express-Queued
Express-Queued is an independent queue system for Express.js that enables asynchronous execution of tasks in multiple concurrent queues. This system is ideal for time-consuming operations such as image processing, email sending, payment processing, and other operations that shouldn't block the immediate response to user requests.
Features
- Support for N queues: Create as many queues as needed, each with its own configuration.
- Configurable concurrency: Control how many tasks execute simultaneously per queue.
- Custom hooks: Customize processing and fallback logic for each queue using the hooks system from express-hooked.
- Retry system: Automatic failure handling with configurable retries.
- Priority management: Assign priorities to tasks to determine their execution order.
- Fallback system: Handle tasks that still fail after all retries are exhausted.
- Fully asynchronous: Does not block the main execution thread.
- Express integration: Middleware to inject the queue system into Express requests.
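To make the "configurable concurrency" feature concrete, here is a minimal sketch of a concurrency-limited runner in plain JavaScript. It is not express-queued's implementation; `createLimitedRunner` and its `add` method are hypothetical names used only to illustrate the idea of capping how many tasks run at once.

```javascript
// Illustrative only: a tiny concurrency-limited runner, not express-queued's code.
function createLimitedRunner(concurrency) {
  const pending = []; // tasks waiting for a free slot
  let running = 0;    // tasks currently executing

  function next() {
    // Start tasks until the limit is reached or nothing is waiting
    while (running < concurrency && pending.length > 0) {
      const { task, resolve, reject } = pending.shift();
      running++;
      Promise.resolve()
        .then(task)
        .then(resolve, reject)
        .finally(() => {
          running--;
          next(); // a slot is free, so try to start another task
        });
    }
  }

  return {
    // Queue a function that returns a value or a promise
    add(task) {
      return new Promise((resolve, reject) => {
        pending.push({ task, resolve, reject });
        next();
      });
    }
  };
}

// Usage: at most 2 of these 5 tasks run at the same time
const runner = createLimitedRunner(2);
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const jobs = [1, 2, 3, 4, 5].map((n) =>
  runner.add(async () => {
    await sleep(10);
    return n * 2;
  })
);
Promise.all(jobs).then((results) => console.log(results));
```

A concurrency of 1 turns this into a strictly sequential queue, which is the kind of setting the payment-processing example later in this README uses.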
Installation
npm install express-queued express-hooked
Basic Usage
1. Import and initialize
const express = require('express');
const ExpressQueueIntegration = require('express-queued');
const app = express();
const expressQueue = new ExpressQueueIntegration();
2. Create a queue
// Create a queue with default configuration
expressQueue.createQueue('general-processing', {
  concurrency: 2,   // Maximum 2 tasks running simultaneously
  retryAttempts: 3, // Retry up to 3 times if it fails
  retryDelay: 1000  // Wait 1 second between retries
});
3. Define a task
// Function representing a task
function processImage(data, taskObj) {
  return new Promise((resolve, reject) => {
    // Simulate processing
    setTimeout(() => {
      console.log(`Processing image:`, data);
      resolve('Image processing completed');
    }, 2000);
  });
}
4. Add tasks to the queue
// Add a task to the queue
const taskId = expressQueue.addTask(
  'general-processing',              // Queue name
  processImage,                      // Task function
  { id: 1, filename: 'image1.jpg' }, // Data for the task
  0                                  // Priority (0 is highest)
);
5. Start the queue system
// Start task execution
expressQueue.start();
Express Usage
1. Add the middleware
// Middleware to inject the queue system into requests
app.use(expressQueue.queueMiddleware());
2. Use the queue system in routes
// Route to add a task
app.post('/process-image', (req, res) => {
  // req.body is only populated if a body parser such as
  // app.use(express.json()) has been registered beforehand
  const { filename, size } = req.body;
  const taskId = req.addTask('general-processing', processImage, {
    filename,
    size
  });
  res.json({
    message: 'Image processing task added to queue',
    taskId
  });
});
// Route to get queue system status
app.get('/queue-status', (req, res) => {
  const status = req.getQueueStatus();
  res.json(status);
});
Custom Hooks
The queue system integrates with the hooks system from express-hooked, allowing customization of system behavior through extension points.
Available Hooks
- queue_execute_task: Executes before processing a task. Allows modifying task logic.
- queue_task_completed: Executes when a task completes successfully.
- queue_task_failed: Executes when a task fails.
- queue_task_retry: Executes when a task is retried.
- queue_task_fallback: Executes when a task definitively fails after all retries.
- queue_task_added: Executes when a task is added to the queue.
- queue_system_started: Executes when the queue system starts.
- queue_system_stopped: Executes when the queue system stops.
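As a rough mental model for how named hooks like these can be dispatched, the sketch below implements a minimal action registry. It is not express-hooked's implementation: `TinyHooks` and `doAction` are hypothetical names, and the `(queueName, task)` callback arguments are an assumption modelled on the fallback example in this README.

```javascript
// Illustrative only: a minimal action registry, not express-hooked's implementation.
class TinyHooks {
  constructor() {
    this.actions = new Map(); // hook name -> array of callbacks
  }

  // Register a callback under a hook name (mirrors the addAction calls in this README)
  addAction(name, callback) {
    if (!this.actions.has(name)) this.actions.set(name, []);
    this.actions.get(name).push(callback);
  }

  // Hypothetical dispatcher: call every callback registered for `name`
  doAction(name, ...args) {
    for (const callback of this.actions.get(name) || []) {
      callback(...args);
    }
  }
}

const demoHooks = new TinyHooks();
const events = [];

demoHooks.addAction('queue_task_added', (queueName, task) => {
  events.push(`added ${task.id} to ${queueName}`);
});
demoHooks.addAction('queue_task_completed', (queueName, task) => {
  events.push(`completed ${task.id}`);
});

// A queue would fire these at the matching points in a task's lifecycle
demoHooks.doAction('queue_task_added', 'images', { id: 'task-1' });
demoHooks.doAction('queue_task_completed', 'images', { id: 'task-1' });
console.log(events);
```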
Custom Hook Example
const { HookSystem } = require('express-hooked');
// Create a specific hook system for a queue
const imageHooks = new HookSystem();
// Hook to customize processing logic
imageHooks.addAction('queue_execute_task', (task, queueName, taskObj) => {
  console.log(`Preparing to process image: ${taskObj.id}`);
  // Wrap the original task with additional logic
  const wrappedTask = async (data, taskObj) => {
    console.log(`Starting image processing: ${taskObj.id}`);
    try {
      const result = await task(data, taskObj);
      console.log(`Image processed successfully: ${taskObj.id}`);
      return result;
    } catch (error) {
      console.log(`Error processing image: ${taskObj.id} - ${error.message}`);
      throw error;
    }
  };
  return wrappedTask;
});
// Create the queue with custom hooks
expressQueue.createQueue('images', {
  concurrency: 3,
  retryAttempts: 2,
  retryDelay: 1000
}, imageHooks);
Error Handling and Retries
Failed tasks are retried automatically according to the queue's configuration, and tasks that exhaust their retries are handed to the fallback hook:
Retry Configuration
expressQueue.createQueue('reliable-processing', {
  retryAttempts: 5, // Maximum number of retries
  retryDelay: 2000  // Milliseconds to wait between retries
});
Fallback Logic
When a task fails after all retries, the fallback logic executes:
const hooks = new HookSystem();
hooks.addAction('queue_task_fallback', (queueName, task, error) => {
  // Implement custom logic to handle failed tasks
  console.log(`Definitively failed task:`, task.id);
  console.log(`Error:`, error.message);
  // For example, save to a database for manual processing
  // or send an alert to the support team
});
Advanced Usage
Multiple Queues with Different Configurations
// Image processing queue
expressQueue.createQueue('image-processing', {
  concurrency: 3,
  retryAttempts: 2,
  retryDelay: 1000
});
// Email sending queue
expressQueue.createQueue('email-sending', {
  concurrency: 2,
  retryAttempts: 3,
  retryDelay: 2000
});
// Payment processing queue
expressQueue.createQueue('payment-processing', {
  concurrency: 1,
  retryAttempts: 5,
  retryDelay: 3000
});
Task Priorities
Tasks can be assigned different priority levels:
// High priority task (priority 0)
expressQueue.addTask('image-processing', processImage, { filename: 'urgent.jpg' }, 0);
// Medium priority task (priority 5)
expressQueue.addTask('image-processing', processImage, { filename: 'normal.jpg' }, 5);
// Low priority task (priority 10)
expressQueue.addTask('image-processing', processImage, { filename: 'optional.jpg' }, 10);
Queue Management via API
// Pause a queue
app.post('/pause-queue/:queueName', (req, res) => {
  const { queueName } = req.params;
  expressQueue.pauseQueue(queueName);
  res.json({ message: `Queue ${queueName} paused` });
});
// Resume a queue
app.post('/resume-queue/:queueName', (req, res) => {
  const { queueName } = req.params;
  expressQueue.resumeQueue(queueName);
  res.json({ message: `Queue ${queueName} resumed` });
});
// Clear a queue
app.post('/clear-queue/:queueName', (req, res) => {
  const { queueName } = req.params;
  expressQueue.clearQueue(queueName);
  res.json({ message: `Queue ${queueName} cleared` });
});
// Retry failed tasks
app.post('/retry-failed/:queueName', (req, res) => {
  const { queueName } = req.params;
  expressQueue.retryFailedTasks(queueName);
  res.json({ message: `Retrying failed tasks in queue ${queueName}` });
});
API
ExpressQueueIntegration
constructor(options = {})
- options.concurrency: Default number of concurrent workers (default: 1)
- options.retryAttempts: Default number of retries (default: 3)
- options.retryDelay: Default delay between retries in ms (default: 1000)
createQueue(queueName, options = {}, hooks = null)
Creates a queue with the specified configuration.
- queueName: Name of the queue
- options.concurrency: Number of concurrent workers
- options.retryAttempts: Number of retries
- options.retryDelay: Delay between retries in ms
- hooks: Hook system for this queue
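The retry options can be read as a simple loop. The sketch below shows one plausible interpretation and is not the library's code: it assumes `retryAttempts` counts retries after the first attempt, and `runWithRetries` and `delay` are hypothetical helper names.

```javascript
// Illustrative only: one possible meaning of retryAttempts and retryDelay.
// Assumption: retryAttempts counts retries made after the first attempt.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function runWithRetries(task, data, { retryAttempts = 3, retryDelay = 1000 } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= retryAttempts; attempt++) {
    try {
      return await task(data, { attempt });
    } catch (error) {
      lastError = error;
      // Wait before the next attempt, but not after the final one
      if (attempt < retryAttempts) await delay(retryDelay);
    }
  }
  // Every attempt failed: this is the point where a fallback handler would take over
  throw lastError;
}

// Usage: a task that fails twice and then succeeds
let flakyCalls = 0;
const flakyTask = async () => {
  flakyCalls++;
  if (flakyCalls < 3) throw new Error('temporary failure');
  return 'ok';
};

runWithRetries(flakyTask, {}, { retryAttempts: 3, retryDelay: 10 })
  .then((result) => console.log(result, 'after', flakyCalls, 'calls'));
```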
addTask(queueName, task, data = {}, priority = 0)
Adds a task to the specified queue.
- queueName: Name of the queue
- task: Function representing the task
- data: Data for the task
- priority: Task priority (lower number = higher priority)
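The "lower number = higher priority" rule can be sketched as a sorted pending list. This is an illustration rather than the library's data structure; `createPriorityList` is a hypothetical helper, and keeping insertion order among equal priorities is an assumption.

```javascript
// Illustrative only: ordering pending tasks so lower priority numbers run first.
// Assumption: tasks with equal priority keep their insertion order.
function createPriorityList() {
  const items = [];
  let sequence = 0; // insertion counter used as a tie-breaker

  return {
    add(name, priority = 0) {
      items.push({ name, priority, sequence: sequence++ });
      items.sort((a, b) => a.priority - b.priority || a.sequence - b.sequence);
    },
    // Remove and return the next task to run, or undefined when empty
    next() {
      return items.shift();
    },
    get size() {
      return items.length;
    }
  };
}

const pendingTasks = createPriorityList();
pendingTasks.add('optional.jpg', 10);
pendingTasks.add('normal.jpg', 5);
pendingTasks.add('urgent.jpg', 0);
pendingTasks.add('also-urgent.jpg', 0);

const executionOrder = [];
while (pendingTasks.size > 0) executionOrder.push(pendingTasks.next().name);
console.log(executionOrder);
// [ 'urgent.jpg', 'also-urgent.jpg', 'normal.jpg', 'optional.jpg' ]
```

Re-sorting on every insert is fine for short queues; a binary heap would be the usual choice if a queue is expected to hold many pending tasks.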
start()
Starts task execution in all queues.
stop()
Stops task execution in all queues.
getStatus()
Gets the current status of the queue system.
retryFailedTasks(queueName)
Retries all failed tasks in the specified queue.
queueMiddleware()
Middleware to inject the queue system into Express requests.
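For readers unfamiliar with this pattern, the sketch below shows how a middleware could attach queue helpers to each request. The `req.addTask` and `req.getQueueStatus` names come from the route examples in this README; everything else, including `makeQueueMiddleware` and the stand-in `fakeQueueSystem`, is hypothetical and not taken from the library's source.

```javascript
// Illustrative only: how a middleware could attach queue helpers to each request.
// `queueSystem` is a hypothetical stand-in exposing addTask and getStatus methods.
function makeQueueMiddleware(queueSystem) {
  return function queueMiddleware(req, res, next) {
    req.addTask = (queueName, task, data = {}, priority = 0) =>
      queueSystem.addTask(queueName, task, data, priority);
    req.getQueueStatus = () => queueSystem.getStatus();
    next(); // hand control to the next middleware or route handler
  };
}

// Stand-in queue system used only to exercise the middleware
const fakeQueueSystem = {
  tasks: [],
  addTask(queueName, task, data, priority) {
    this.tasks.push({ queueName, task, data, priority });
    return `task-${this.tasks.length}`;
  },
  getStatus() {
    return { pending: this.tasks.length };
  }
};

// Simulate one request passing through the middleware
const fakeReq = {};
makeQueueMiddleware(fakeQueueSystem)(fakeReq, {}, () => {});
const demoTaskId = fakeReq.addTask('general-processing', () => 'done', { filename: 'a.jpg' });
console.log(demoTaskId, fakeReq.getQueueStatus());
```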
pauseQueue(queueName)
Pauses the specified queue.
resumeQueue(queueName)
Resumes the specified queue.
clearQueue(queueName)
Clears pending tasks from the specified queue.
Repository
Source code available at: https://gitlab.com/bytedogssyndicate1/express-queued
License
Apache 2.0
