@e-techsolutions/sdk-queues
v1.0.5
A queue manager for Node.js
E-BaaS SDK Queue Manager
A robust SDK for queue management using Redis, with support for persistent and real-time messaging.
Installation
npm install @e-techsolutions/sdk-queues
# Or
pnpm add @e-techsolutions/sdk-queues
Quick Setup
Requirements
- Redis Server
Basic Example
import { QueueManager } from "@e-techsolutions/sdk-queues";
// Initialize the queue manager
const queueManager = new QueueManager({
host: "redis-host",
port: 6379, // Redis port (6379 is the Redis default; replace with your own)
username: "redis-user", // Optional: Redis username
password: "redis-password", // Optional: Redis password
db: 0, // Optional: Redis database number (default: 0)
// Optional: Connection settings (available via RedisOptions)
connectTimeout: 10000, // Connection timeout in ms
commandTimeout: 5000, // Command timeout in ms
retryDelayOnFailover: 100, // Retry delay on failover
maxRetriesPerRequest: 3, // Max retries per request (null = unlimited)
// Optional: Redis connection options
retryStrategy: (times) => Math.min(times * 50, 2000), // Retry strategy function
reconnectOnError: (err) => err.message.includes("READONLY"), // Reconnect on specific errors
enableReadyCheck: true, // Enable ready check
});
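The retryStrategy option above decides how long to wait before the next reconnection attempt. Assuming the option is passed through to ioredis unchanged (the RedisOptions reference suggests this, but it is an assumption), the function receives the attempt count and returns a delay in milliseconds. The standalone sketch below evaluates the same formula; retryDelay is a name introduced here for illustration only.

```typescript
// Same formula as the retryStrategy option above: linear backoff of 50 ms
// per attempt, capped at 2000 ms.
function retryDelay(times: number): number {
  return Math.min(times * 50, 2000);
}

// Print the delay for a few attempt numbers to see where the cap applies.
for (const attempt of [1, 2, 10, 40, 100]) {
  console.log(`attempt ${attempt}: wait ${retryDelay(attempt)} ms`);
}
```

With these numbers the delay grows by 50 ms per attempt and stops growing at attempt 40, where it reaches the 2000 ms cap.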
// Publish message
async function sendMessage() {
await queueManager.publish("notification", {
userId: "123",
message: "New notification",
});
console.log("✅ Message published");
}
// Consume messages
function processNotification(message) {
console.log("📬 Notification received:", message);
// Process the message...
return Promise.resolve();
}
// Start consumer
queueManager.consumer("notification", processNotification);
// Publish example message
sendMessage();
Publishing Options
// High priority message
await queueManager.publish("payment", data, { priority: 2 });
// Real-time message (non-persistent)
await queueManager.publish("chat", data, { persistent: false });
// Set number of processing attempts
await queueManager.publish("email", data, { attempts: 5 });
Monitoring
// Get status of all queues
const status = await queueManager.getQueueStatus();
console.log(status);
// Get pending messages
const pending = await queueManager.getPendingMessages("notification");
console.log(`Pending messages: ${pending.length}`);
Advanced Features
Workflow Management
import { WorkflowManager } from "@e-techsolutions/sdk-queues";
const workflow = new WorkflowManager(queueManager);
// Define a multi-step workflow
await workflow.createWorkflow("user-onboarding", [
{ step: "send-welcome-email", queue: "email" },
{ step: "create-user-profile", queue: "user-management" },
{ step: "send-notification", queue: "notification" },
]);
// Start workflow
await workflow.startWorkflow("user-onboarding", { userId: "123" });
Scheduled Tasks
import { Scheduler } from "@e-techsolutions/sdk-queues";
const scheduler = new Scheduler(queueManager);
// Schedule a recurring task
await scheduler.schedule("daily-report", "0 9 * * *", {
action: "generate-report",
recipients: ["[email protected]"],
});
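The string "0 9 * * *" in the call above reads as a standard five-field cron expression (minute, hour, day of month, month, day of week), which would mean 09:00 every day. Whether the Scheduler accepts exactly this dialect is an assumption. The sketch below only splits an expression into named fields; splitCron and CronFields are hypothetical helpers, not part of the SDK.

```typescript
// Named view of a five-field cron expression.
interface CronFields {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
}

// Split an expression on whitespace and label each field.
// Throws if the expression does not have exactly five fields.
function splitCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts;
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

const daily = splitCron("0 9 * * *");
console.log(`runs at hour ${daily.hour}, minute ${daily.minute}`);
```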
// Schedule a one-time task
await scheduler.scheduleOnce("reminder", new Date(Date.now() + 3600000), {
userId: "123",
message: "Don't forget your meeting!",
});
Storage Integration
// MySQL storage for persistent queues
import { MySQLStorage } from "@e-techsolutions/sdk-queues";
const mysqlStorage = new MySQLStorage({
host: "localhost",
port: 3306,
user: "queue_user",
password: "queue_password",
database: "queue_db",
});
const queueManager = new QueueManager({
host: "localhost",
port: 6379,
storage: mysqlStorage, // For persistent backup
});
Telemetry and Monitoring
import { TelemetryService } from "@e-techsolutions/sdk-queues";
// Enable telemetry
const telemetry = new TelemetryService({
endpoint: "http://your-telemetry-endpoint.com",
apiKey: "your-api-key",
});
const queueManager = new QueueManager({
host: "localhost",
port: 6379,
telemetry: telemetry,
});
// Get metrics
const metrics = await queueManager.getMetrics();
console.log("Queue metrics:", metrics);
Configuration Options
Connection Configuration
const queueManager = new QueueManager({
// Redis connection
host: "localhost",
port: 6379,
username: "redis-user", // Optional
password: "redis-password", // Optional
// Connection pool
maxConnections: 10,
retryAttempts: 3,
retryDelay: 1000,
// Queue settings
defaultPriority: 1,
defaultAttempts: 3,
messageTimeout: 30000,
// Monitoring
enableMetrics: true,
metricsInterval: 60000,
// Logging
logLevel: "info", // debug, info, warn, error
enableTracing: true,
});
Environment Variables
# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_USERNAME=your-username
REDIS_PASSWORD=your-password
# Queue Settings
QUEUE_DEFAULT_PRIORITY=1
QUEUE_DEFAULT_ATTEMPTS=3
QUEUE_MESSAGE_TIMEOUT=30000
# Monitoring
ENABLE_QUEUE_METRICS=true
METRICS_INTERVAL=60000
# Logging
LOG_LEVEL=info
ENABLE_TRACING=true
Error Handling
// Handle connection errors
queueManager.on("error", (error) => {
console.error("Queue manager error:", error);
});
// Handle message processing errors
queueManager.on("message-error", (error, message) => {
console.error("Message processing failed:", error);
console.log("Failed message:", message);
});
// Handle reconnection events
queueManager.on("reconnect", () => {
console.log("Successfully reconnected to Redis");
});
Best Practices
Message Structure
// Recommended message structure
interface QueueMessage {
id: string;
type: string;
payload: any;
metadata: {
createdAt: string;
source: string;
version: string;
};
}
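One way to keep every message in this shape is to build it through a small factory that fills in the id and metadata. The sketch below is an illustration, not an SDK feature: createMessage is a hypothetical helper, and the interface is repeated with a generic payload type (instead of any) so the block is self-contained.

```typescript
import { randomUUID } from "node:crypto";

// Same fields as the recommended structure, with a typed payload.
interface QueueMessage<TPayload> {
  id: string;
  type: string;
  payload: TPayload;
  metadata: {
    createdAt: string;
    source: string;
    version: string;
  };
}

// Build a message with a fresh UUID and an ISO-8601 creation timestamp.
function createMessage<TPayload>(
  type: string,
  payload: TPayload,
  source: string,
  version: string = "1.0.0",
): QueueMessage<TPayload> {
  return {
    id: randomUUID(),
    type,
    payload,
    metadata: { createdAt: new Date().toISOString(), source, version },
  };
}

const registration = createMessage("USER_REGISTRATION", { userId: "123" }, "web-app");
console.log(registration.type, registration.metadata.source);
```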
// Example usage
await queueManager.publish("user-action", {
id: crypto.randomUUID(),
type: "USER_REGISTRATION",
payload: {
userId: "123",
email: "[email protected]",
},
metadata: {
createdAt: new Date().toISOString(),
source: "web-app",
version: "1.0.0",
},
});
Consumer Patterns
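// Idempotent consumer
// Assumes `redis` is an already connected Redis client (for example ioredis)
// and `processMessage` is your own message handler.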
async function processIdempotent(message) {
const processedKey = `processed:${message.id}`;
// Check if already processed
const alreadyProcessed = await redis.get(processedKey);
if (alreadyProcessed) {
console.log("Message already processed, skipping...");
return;
}
try {
// Process message
await processMessage(message);
// Mark as processed
await redis.setex(processedKey, 3600, "true"); // 1 hour TTL
} catch (error) {
console.error("Processing failed:", error);
throw error; // Re-throw to trigger retry
}
}
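The function above checks the key with get and only writes it after processing, so two consumers receiving the same message at nearly the same time could both pass the check. A common variation is to claim the key atomically before processing (in Redis, SET with the NX and EX options) and release it if processing fails. The sketch below shows that pattern against an in-memory stand-in so it runs without Redis; ClaimStore and makeIdempotent are hypothetical names, not SDK exports.

```typescript
// In-memory stand-in for an atomic "set if absent, with expiry" operation.
class ClaimStore {
  private readonly expiresAt = new Map<string, number>();

  // Returns true only for the first caller until the entry expires.
  claim(key: string, ttlMs: number, now: number = Date.now()): boolean {
    const expiry = this.expiresAt.get(key);
    if (expiry !== undefined && expiry > now) {
      return false; // an unexpired claim already exists
    }
    this.expiresAt.set(key, now + ttlMs);
    return true;
  }

  release(key: string): void {
    this.expiresAt.delete(key);
  }
}

interface IdentifiedMessage {
  id: string;
}

// Wrap a handler so each message id is processed at most once per TTL.
// The wrapper resolves to true if the handler ran, false if it was skipped.
function makeIdempotent<T extends IdentifiedMessage>(
  store: ClaimStore,
  handler: (message: T) => Promise<void>,
  ttlMs: number = 3600000, // 1 hour, the same TTL as the example above
): (message: T) => Promise<boolean> {
  return async (message: T): Promise<boolean> => {
    const key = `processed:${message.id}`;
    if (!store.claim(key, ttlMs)) {
      return false; // duplicate delivery, skipped
    }
    try {
      await handler(message);
      return true;
    } catch (error) {
      store.release(key); // let a retry claim the message again
      throw error; // re-throw so the queue can trigger its retry
    }
  };
}
```

Claiming first trades one risk for another: if the process crashes between the claim and the release, the message is skipped until the TTL expires, so the TTL should be chosen with that in mind.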
queueManager.consumer("idempotent-queue", processIdempotent);
Performance Tuning
Batch Processing
// Process messages in batches
const batchProcessor = new BatchProcessor(queueManager, {
queueName: "bulk-processing",
batchSize: 100,
batchTimeout: 5000, // 5 seconds
processor: async (messages) => {
console.log(`Processing batch of ${messages.length} messages`);
// Bulk process messages
await processBatch(messages);
},
});
await batchProcessor.start();
Connection Pooling
// Optimized for high throughput
const queueManager = new QueueManager({
host: "localhost",
port: 6379,
maxConnections: 20,
connectionPool: {
min: 5,
max: 20,
acquireTimeoutMillis: 30000,
idleTimeoutMillis: 30000,
},
});
Security
Message Encryption
import { EncryptedQueueManager } from "@e-techsolutions/sdk-queues";
const encryptedManager = new EncryptedQueueManager({
host: "localhost",
port: 6379,
encryption: {
algorithm: "aes-256-gcm",
key: process.env.ENCRYPTION_KEY,
rotationInterval: 86400000, // 24 hours
},
});
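To make the aes-256-gcm setting concrete, here is a minimal sketch of authenticated encryption using Node's built-in crypto module. It is not the SDK's implementation: encryptMessage, decryptMessage and EncryptedPayload are hypothetical, and how the SDK turns ENCRYPTION_KEY into a 32-byte key or rotates it is not documented here.

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from "node:crypto";

// Everything the receiver needs besides the key, base64-encoded.
interface EncryptedPayload {
  iv: string;
  authTag: string;
  ciphertext: string;
}

// Serialize the message to JSON and encrypt it with AES-256-GCM.
function encryptMessage(message: unknown, key: Buffer): EncryptedPayload {
  const iv = randomBytes(12); // 96-bit nonce, must be unique per message
  const cipher = createCipheriv("aes-256-gcm", key, iv);
  const ciphertext = Buffer.concat([
    cipher.update(JSON.stringify(message), "utf8"),
    cipher.final(),
  ]);
  return {
    iv: iv.toString("base64"),
    authTag: cipher.getAuthTag().toString("base64"),
    ciphertext: ciphertext.toString("base64"),
  };
}

// Decrypt and parse; final() throws if the key or auth tag does not match.
function decryptMessage(payload: EncryptedPayload, key: Buffer): unknown {
  const decipher = createDecipheriv("aes-256-gcm", key, Buffer.from(payload.iv, "base64"));
  decipher.setAuthTag(Buffer.from(payload.authTag, "base64"));
  const plaintext = Buffer.concat([
    decipher.update(Buffer.from(payload.ciphertext, "base64")),
    decipher.final(),
  ]);
  return JSON.parse(plaintext.toString("utf8"));
}

const demoKey = randomBytes(32); // AES-256 needs a 32-byte key
const roundTrip = decryptMessage(encryptMessage({ ssn: "***-**-****" }, demoKey), demoKey);
console.log(roundTrip);
```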
// Messages are automatically encrypted/decrypted
await encryptedManager.publish("sensitive-data", {
creditCard: "****-****-****-1234",
ssn: "***-**-****",
});
Testing
import { MockQueueManager } from "@e-techsolutions/sdk-queues/testing";
// Use mock for unit tests
const mockQueue = new MockQueueManager();
// Simulate message publishing
await mockQueue.publish("test-queue", { test: "data" });
// Verify messages
const messages = mockQueue.getMessages("test-queue");
expect(messages).toHaveLength(1);
expect(messages[0].payload).toEqual({ test: "data" });
API Reference
QueueManager
Methods
- publish(queue, message, options?) - Publish a message to a queue
- consumer(queue, processor, options?) - Start consuming messages from a queue
- getQueueStatus() - Get status of all queues
- getPendingMessages(queue) - Get pending messages for a queue
- getMetrics() - Get performance metrics
- close() - Close all connections
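To show how publish, getPendingMessages and getQueueStatus might relate to one another, here is a small in-memory stand-in. It is a sketch under assumptions, not the SDK's behavior: InMemoryQueues is hypothetical, its methods are synchronous for brevity (the SDK's are awaited), a larger priority number is assumed to mean higher priority, and the status is assumed to be a count per queue.

```typescript
interface PublishOptions {
  priority?: number; // assumed: larger number = delivered earlier
}

interface Entry {
  message: unknown;
  priority: number;
}

class InMemoryQueues {
  private readonly queues = new Map<string, Entry[]>();

  // Append the message, then order by priority (highest first).
  // Array.prototype.sort is stable, so equal priorities stay in FIFO order.
  publish(queue: string, message: unknown, options: PublishOptions = {}): void {
    const entries = this.queues.get(queue) ?? [];
    entries.push({ message, priority: options.priority ?? 1 });
    entries.sort((a, b) => b.priority - a.priority);
    this.queues.set(queue, entries);
  }

  // Messages waiting in one queue, in delivery order.
  getPendingMessages(queue: string): unknown[] {
    return (this.queues.get(queue) ?? []).map((entry) => entry.message);
  }

  // Number of pending messages per queue.
  getQueueStatus(): Record<string, number> {
    const status: Record<string, number> = {};
    this.queues.forEach((entries, name) => {
      status[name] = entries.length;
    });
    return status;
  }
}

const demoQueues = new InMemoryQueues();
demoQueues.publish("payment", { order: "A" });
demoQueues.publish("payment", { order: "B" }, { priority: 2 });
console.log(demoQueues.getPendingMessages("payment"));
console.log(demoQueues.getQueueStatus());
```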
Events
- error - Connection or processing errors
- message-error - Message processing failures
- reconnect - Successful reconnection to Redis
- message-processed - Successful message processing
- queue-empty - Queue becomes empty
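The events listed above can be pictured with Node's own EventEmitter, which is used here as a stand-in for the queue manager. The event names come from the list; the arguments passed to message-processed and queue-empty are assumptions, since this README only documents the arguments for error and message-error.

```typescript
import { EventEmitter } from "node:events";

// Stand-in emitter; in real code the listeners would go on the queue manager.
const events = new EventEmitter();
const log: string[] = [];

// Without an "error" listener, Node's EventEmitter throws on emit("error").
events.on("error", (error: Error) => {
  log.push(`error: ${error.message}`);
});
events.on("message-error", (error: Error, message: unknown) => {
  log.push(`failed: ${error.message} ${JSON.stringify(message)}`);
});
events.on("message-processed", (message: unknown) => {
  log.push(`processed: ${JSON.stringify(message)}`); // argument is assumed
});
events.on("queue-empty", (queue: string) => {
  log.push(`empty: ${queue}`); // argument is assumed
});

// Simulate a short sequence of events.
events.emit("message-processed", { id: "1" });
events.emit("message-error", new Error("boom"), { id: "2" });
events.emit("queue-empty", "notification");
console.log(log.join("\n"));
```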
Version History
- v1.0.4 - Advanced obfuscation, performance improvements
- v1.0.3 - Enhanced telemetry and monitoring
- v1.0.2 - MySQL storage integration
- v1.0.1 - Workflow management features
- v1.0.0 - Initial release
Support
For support and questions:
- Email: [email protected]
- Documentation: E-BaaS Documentation
- GitHub Issues: Report Issues
License
MIT - See LICENSE file for details.
Enterprise Backend as a Service (E-BaaS) - Powering scalable applications with robust queue management.
