
@e-techsolutions/sdk-queues

v1.0.5

A queue manager for Node.js

E-BaaS SDK Queue Manager

A robust SDK for queue management using Redis, with support for persistent and real-time messaging.

Installation

npm install @e-techsolutions/sdk-queues
# Or
pnpm add @e-techsolutions/sdk-queues

Quick Setup

Requirements

  • Redis Server

Basic Example

import { QueueManager } from "@e-techsolutions/sdk-queues";

// Initialize the queue manager
const queueManager = new QueueManager({
  host: "redis-host",
  port: 6379, // Redis port (6379 is the Redis default)
  username: "redis-user", // Optional: Redis username
  password: "redis-password", // Optional: Redis password
  db: 0, // Optional: Redis database number (default: 0)

  // Optional: Connection settings (available via RedisOptions)
  connectTimeout: 10000, // Connection timeout in ms
  commandTimeout: 5000, // Command timeout in ms
  retryDelayOnFailover: 100, // Retry delay on failover
  maxRetriesPerRequest: 3, // Max retries per request (null = unlimited)

  // Optional: Redis connection options
  retryStrategy: (times) => Math.min(times * 50, 2000), // Retry strategy function
  reconnectOnError: (err) => err.message.includes("READONLY"), // Reconnect on specific errors
  enableReadyCheck: true, // Enable ready check
});

// Publish message
async function sendMessage() {
  await queueManager.publish("notification", {
    userId: "123",
    message: "New notification",
  });
  console.log("✅ Message published");
}

// Consume messages
function processNotification(message) {
  console.log("📬 Notification received:", message);
  // Process the message...
  return Promise.resolve();
}

// Start consumer
queueManager.consumer("notification", processNotification);

// Publish example message
sendMessage();
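
The `retryStrategy` option in the example above derives a reconnect delay from the attempt count. As a quick illustration, the sketch below evaluates that same function in isolation (no Redis connection involved) to show how the delay grows and where it is capped. The attempt numbers are arbitrary sample values.

```javascript
// Same function as the retryStrategy option in the example above.
const retryStrategy = (times) => Math.min(times * 50, 2000);

// Delay in ms for selected attempts: 50 ms per attempt, capped at 2000 ms.
const attempts = [1, 2, 10, 40, 100];
const delays = attempts.map(retryStrategy);

console.log(delays); // [ 50, 100, 500, 2000, 2000 ]
```

With these values the cap is reached at attempt 40, so later attempts all wait 2 seconds.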

Publishing Options

// High priority message
await queueManager.publish("payment", data, { priority: 2 });

// Real-time message (non-persistent)
await queueManager.publish("chat", data, { persistent: false });

// Set number of processing attempts
await queueManager.publish("email", data, { attempts: 5 });
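
How the SDK applies the `attempts` option internally is not described here. As a rough mental model only, a limit on processing attempts can be pictured as the loop below. `runWithAttempts` and `flakyHandler` are hypothetical names made up for this sketch and are not part of the SDK.

```javascript
// Hypothetical helper, not part of the SDK: call `handler` until it succeeds
// or `attempts` tries have been used, then rethrow the last error.
async function runWithAttempts(handler, message, attempts) {
  let lastError;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await handler(message, attempt);
    } catch (error) {
      lastError = error;
    }
  }
  throw lastError;
}

// Example handler that fails twice and succeeds on the third call.
let calls = 0;
async function flakyHandler(message) {
  calls += 1;
  if (calls < 3) throw new Error(`temporary failure #${calls}`);
  return `delivered to ${message.to}`;
}

const result = runWithAttempts(flakyHandler, { to: "user-123" }, 5);
result.then((value) => console.log(value, `after ${calls} calls`));
```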

Monitoring

// Get status of all queues
const status = await queueManager.getQueueStatus();
console.log(status);

// Get pending messages
const pending = await queueManager.getPendingMessages("notification");
console.log(`Pending messages: ${pending.length}`);

Advanced Features

Workflow Management

import { WorkflowManager } from "@e-techsolutions/sdk-queues";

const workflow = new WorkflowManager(queueManager);

// Define a multi-step workflow
await workflow.createWorkflow("user-onboarding", [
  { step: "send-welcome-email", queue: "email" },
  { step: "create-user-profile", queue: "user-management" },
  { step: "send-notification", queue: "notification" },
]);

// Start workflow
await workflow.startWorkflow("user-onboarding", { userId: "123" });

Scheduled Tasks

import { Scheduler } from "@e-techsolutions/sdk-queues";

const scheduler = new Scheduler(queueManager);

// Schedule a recurring task
await scheduler.schedule("daily-report", "0 9 * * *", {
  action: "generate-report",
  recipients: ["team@example.com"], // placeholder address
});

// Schedule a one-time task
await scheduler.scheduleOnce("reminder", new Date(Date.now() + 3600000), {
  userId: "123",
  message: "Don't forget your meeting!",
});
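
The recurring task above uses the cron string `"0 9 * * *"`. Assuming the scheduler follows the standard five-field cron layout (this README does not say whether a seconds field or a time zone is supported), the fields can be read as in the sketch below. `parseCron` is a hypothetical helper for illustration, not an SDK function.

```javascript
// Field names of a standard five-field cron expression, in order.
const CRON_FIELDS = ["minute", "hour", "dayOfMonth", "month", "dayOfWeek"];

// Split a cron expression into named fields; throws unless it has five fields.
function parseCron(expression) {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== CRON_FIELDS.length) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  return Object.fromEntries(CRON_FIELDS.map((name, i) => [name, parts[i]]));
}

const daily = parseCron("0 9 * * *");
console.log(daily); // minute "0", hour "9", "*" for the remaining three fields

// The one-time example uses a millisecond offset: 3600000 ms is one hour.
const oneHourMs = 60 * 60 * 1000;
console.log(oneHourMs === 3600000); // true
```

Read this way, `"0 9 * * *"` fires at minute 0 of hour 9 every day.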

Storage Integration

// MySQL storage for persistent queues
import { MySQLStorage, QueueManager } from "@e-techsolutions/sdk-queues";

const mysqlStorage = new MySQLStorage({
  host: "localhost",
  port: 3306,
  user: "queue_user",
  password: "queue_password",
  database: "queue_db",
});

const queueManager = new QueueManager({
  host: "localhost",
  port: 6379,
  storage: mysqlStorage, // For persistent backup
});

Telemetry and Monitoring

import { QueueManager, TelemetryService } from "@e-techsolutions/sdk-queues";

// Enable telemetry
const telemetry = new TelemetryService({
  endpoint: "http://your-telemetry-endpoint.com",
  apiKey: "your-api-key",
});

const queueManager = new QueueManager({
  host: "localhost",
  port: 6379,
  telemetry: telemetry,
});

// Get metrics
const metrics = await queueManager.getMetrics();
console.log("Queue metrics:", metrics);

Configuration Options

Connection Configuration

const queueManager = new QueueManager({
  // Redis connection
  host: "localhost",
  port: 6379,
  username: "redis-user", // Optional
  password: "redis-password", // Optional

  // Connection pool
  maxConnections: 10,
  retryAttempts: 3,
  retryDelay: 1000,

  // Queue settings
  defaultPriority: 1,
  defaultAttempts: 3,
  messageTimeout: 30000,

  // Monitoring
  enableMetrics: true,
  metricsInterval: 60000,

  // Logging
  logLevel: "info", // debug, info, warn, error
  enableTracing: true,
});

Environment Variables

# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_USERNAME=your-username
REDIS_PASSWORD=your-password

# Queue Settings
QUEUE_DEFAULT_PRIORITY=1
QUEUE_DEFAULT_ATTEMPTS=3
QUEUE_MESSAGE_TIMEOUT=30000

# Monitoring
ENABLE_QUEUE_METRICS=true
METRICS_INTERVAL=60000

# Logging
LOG_LEVEL=info
ENABLE_TRACING=true
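
Whether the SDK reads these variables on its own is not stated here. If it does not, one possible approach is to map them onto the constructor options by hand. The sketch below assumes each variable corresponds to the similarly named option from the Connection Configuration section. `configFromEnv` is a hypothetical helper, and the fallbacks simply mirror the sample values above.

```javascript
// Hypothetical helper: build a QueueManager-style options object from the
// environment variables listed above.
function configFromEnv(env = process.env) {
  const toInt = (value, fallback) => {
    const parsed = Number.parseInt(value ?? "", 10);
    return Number.isNaN(parsed) ? fallback : parsed;
  };
  return {
    host: env.REDIS_HOST ?? "localhost",
    port: toInt(env.REDIS_PORT, 6379),
    username: env.REDIS_USERNAME, // undefined when not set
    password: env.REDIS_PASSWORD, // undefined when not set
    defaultPriority: toInt(env.QUEUE_DEFAULT_PRIORITY, 1),
    defaultAttempts: toInt(env.QUEUE_DEFAULT_ATTEMPTS, 3),
    messageTimeout: toInt(env.QUEUE_MESSAGE_TIMEOUT, 30000),
    enableMetrics: env.ENABLE_QUEUE_METRICS === "true",
    metricsInterval: toInt(env.METRICS_INTERVAL, 60000),
    logLevel: env.LOG_LEVEL ?? "info",
    enableTracing: env.ENABLE_TRACING === "true",
  };
}

const config = configFromEnv({ REDIS_HOST: "redis.internal", REDIS_PORT: "6380" });
console.log(config.host, config.port, config.enableMetrics);
// redis.internal 6380 false
```

The resulting object could then be passed to `new QueueManager(config)`.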

Error Handling

// Handle connection errors
queueManager.on("error", (error) => {
  console.error("Queue manager error:", error);
});

// Handle message processing errors
queueManager.on("message-error", (error, message) => {
  console.error("Message processing failed:", error);
  console.log("Failed message:", message);
});

// Handle reconnection events
queueManager.on("reconnect", () => {
  console.log("Successfully reconnected to Redis");
});
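
To try handlers like these without a Redis server, a plain Node.js `EventEmitter` can stand in for the queue manager. This assumes `QueueManager` follows the usual `on(event, listener)` convention, which the calls above suggest but which is not confirmed here. The sketch uses CommonJS `require` so it runs as a standalone script, and `fakeManager` is a stand-in object, not an SDK class.

```javascript
const { EventEmitter } = require("node:events");

// Stand-in for a QueueManager instance; only the event interface is modelled.
const fakeManager = new EventEmitter();
const seen = [];

fakeManager.on("error", (error) => seen.push(`error: ${error.message}`));
fakeManager.on("message-error", (error, message) =>
  seen.push(`message-error: ${error.message} (id ${message.id})`)
);
fakeManager.on("reconnect", () => seen.push("reconnect"));

// Emit the events by hand to exercise the handlers.
fakeManager.emit("error", new Error("connection lost"));
fakeManager.emit("message-error", new Error("bad payload"), { id: "42" });
fakeManager.emit("reconnect");

console.log(seen);
```

Registering an `error` listener matters with a plain `EventEmitter`, because emitting `error` with no listener attached throws.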

Best Practices

Message Structure

// Recommended message structure
interface QueueMessage {
  id: string;
  type: string;
  payload: any;
  metadata: {
    createdAt: string;
    source: string;
    version: string;
  };
}

// Example usage
await queueManager.publish("user-action", {
  id: crypto.randomUUID(),
  type: "USER_REGISTRATION",
  payload: {
    userId: "123",
    email: "user@example.com", // placeholder address
  },
  metadata: {
    createdAt: new Date().toISOString(),
    source: "web-app",
    version: "1.0.0",
  },
});
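
A small factory and a shape check can keep producers consistent with the recommended structure. The sketch below is one way to do that in plain JavaScript. `buildMessage` and `validateMessage` are hypothetical helpers, not SDK functions, and it uses CommonJS `require` so it runs as a standalone script.

```javascript
const { randomUUID } = require("node:crypto");

// Hypothetical helper: wrap a payload in the recommended envelope.
function buildMessage(type, payload, source, version = "1.0.0") {
  return {
    id: randomUUID(),
    type,
    payload,
    metadata: { createdAt: new Date().toISOString(), source, version },
  };
}

// Hypothetical check: return a list of problems, empty when the shape is valid.
function validateMessage(message) {
  const problems = [];
  if (typeof message?.id !== "string") problems.push("id must be a string");
  if (typeof message?.type !== "string") problems.push("type must be a string");
  if (!("payload" in Object(message))) problems.push("payload is required");
  for (const key of ["createdAt", "source", "version"]) {
    if (typeof message?.metadata?.[key] !== "string") {
      problems.push(`metadata.${key} must be a string`);
    }
  }
  return problems;
}

const message = buildMessage("USER_REGISTRATION", { userId: "123" }, "web-app");
console.log(validateMessage(message)); // []
console.log(validateMessage({ id: 1 }).length); // 6
```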

Consumer Patterns

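// Idempotent consumer
// Assumes `redis` is an already-connected Redis client exposing get/setex
// (for example ioredis) and `processMessage` is your own handler.
async function processIdempotent(message) {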
  const processedKey = `processed:${message.id}`;

  // Check if already processed
  const alreadyProcessed = await redis.get(processedKey);
  if (alreadyProcessed) {
    console.log("Message already processed, skipping...");
    return;
  }

  try {
    // Process message
    await processMessage(message);

    // Mark as processed
    await redis.setex(processedKey, 3600, "true"); // 1 hour TTL
  } catch (error) {
    console.error("Processing failed:", error);
    throw error; // Re-throw to trigger retry
  }
}

queueManager.consumer("idempotent-queue", processIdempotent);
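
The consumer above checks the key and sets it in two separate steps, so two deliveries of the same message arriving close together could both pass the check. One common way to narrow that window is to claim the id before doing the work and release it on failure. With Redis that claim is typically a single `SET key value NX EX ttl` command. The sketch below shows the same claim-then-release flow with an in-memory `Set` standing in for Redis. `processOnce` is a hypothetical name.

```javascript
// In-memory stand-in for the Redis "processed" keys. The check and the insert
// run in one synchronous step, so two deliveries cannot both claim an id.
const processed = new Set();
const handled = [];

async function processOnce(message, handler) {
  if (processed.has(message.id)) return false; // duplicate delivery, skip
  processed.add(message.id); // claim before doing the work
  try {
    await handler(message);
    return true;
  } catch (error) {
    processed.delete(message.id); // release the claim so a retry can run
    throw error;
  }
}

const handler = async (message) => {
  handled.push(message.id);
};

const done = (async () => {
  const first = await processOnce({ id: "a" }, handler);
  const second = await processOnce({ id: "a" }, handler); // redelivery
  console.log(first, second, handled); // true false [ 'a' ]
  return { first, second };
})();
```

Unlike the Redis version, this stand-in has no TTL and does not survive a restart. It only illustrates the ordering of the claim, the work, and the release.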

Performance Tuning

Batch Processing

// Process messages in batches
const batchProcessor = new BatchProcessor(queueManager, {
  queueName: "bulk-processing",
  batchSize: 100,
  batchTimeout: 5000, // 5 seconds
  processor: async (messages) => {
    console.log(`Processing batch of ${messages.length} messages`);
    // Bulk process messages (processBatch is a placeholder for your own handler)
    await processBatch(messages);
  },
});

await batchProcessor.start();
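
To make the effect of `batchSize` concrete, the sketch below splits a list of messages into groups of at most 100 items. It illustrates the size limit only. How `BatchProcessor` combines `batchSize` with `batchTimeout` is not described here, and `toBatches` is a hypothetical helper.

```javascript
// Split an array of messages into batches of at most `batchSize` items.
function toBatches(messages, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const batches = [];
  for (let i = 0; i < messages.length; i += batchSize) {
    batches.push(messages.slice(i, i + batchSize));
  }
  return batches;
}

const messages = Array.from({ length: 250 }, (_, i) => ({ id: i }));
const batches = toBatches(messages, 100);

console.log(batches.map((batch) => batch.length)); // [ 100, 100, 50 ]
```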

Connection Pooling

// Optimized for high throughput
const queueManager = new QueueManager({
  host: "localhost",
  port: 6379,
  maxConnections: 20,
  connectionPool: {
    min: 5,
    max: 20,
    acquireTimeoutMillis: 30000,
    idleTimeoutMillis: 30000,
  },
});

Security

Message Encryption

import { EncryptedQueueManager } from "@e-techsolutions/sdk-queues";

const encryptedManager = new EncryptedQueueManager({
  host: "localhost",
  port: 6379,
  encryption: {
    algorithm: "aes-256-gcm",
    key: process.env.ENCRYPTION_KEY,
    rotationInterval: 86400000, // 24 hours
  },
});

// Messages are automatically encrypted/decrypted
await encryptedManager.publish("sensitive-data", {
  creditCard: "****-****-****-1234",
  ssn: "***-**-****",
});
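
For readers curious what AES-256-GCM encryption of a message involves, here is a minimal sketch using Node's built-in `crypto` module. It is not the SDK's implementation. The envelope layout (`iv`, `tag`, `data` as base64 strings) and the function names are assumptions made for illustration, and key rotation is left out. It uses CommonJS `require` so it runs as a standalone script.

```javascript
const { createCipheriv, createDecipheriv, randomBytes } = require("node:crypto");

// Encrypt a JSON-serializable message with AES-256-GCM.
// `key` must be a 32-byte Buffer; a fresh 12-byte IV is generated per message.
function encryptMessage(message, key) {
  const iv = randomBytes(12);
  const cipher = createCipheriv("aes-256-gcm", key, iv);
  const ciphertext = Buffer.concat([
    cipher.update(JSON.stringify(message), "utf8"),
    cipher.final(),
  ]);
  return {
    iv: iv.toString("base64"),
    tag: cipher.getAuthTag().toString("base64"),
    data: ciphertext.toString("base64"),
  };
}

// Decrypt and authenticate; throws if the ciphertext or tag was tampered with.
function decryptMessage(envelope, key) {
  const iv = Buffer.from(envelope.iv, "base64");
  const decipher = createDecipheriv("aes-256-gcm", key, iv);
  decipher.setAuthTag(Buffer.from(envelope.tag, "base64"));
  const plaintext = Buffer.concat([
    decipher.update(Buffer.from(envelope.data, "base64")),
    decipher.final(),
  ]);
  return JSON.parse(plaintext.toString("utf8"));
}

const key = randomBytes(32); // demo key; a real one would come from a secret store
const envelope = encryptMessage({ userId: "123" }, key);
console.log(decryptMessage(envelope, key)); // { userId: '123' }
```

GCM authenticates as well as encrypts, so a modified ciphertext fails in `decipher.final()` instead of producing corrupted output.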

Testing

import { MockQueueManager } from "@e-techsolutions/sdk-queues/testing";

// Use mock for unit tests
const mockQueue = new MockQueueManager();

// Simulate message publishing
await mockQueue.publish("test-queue", { test: "data" });

// Verify messages
const messages = mockQueue.getMessages("test-queue");
expect(messages).toHaveLength(1);
expect(messages[0].payload).toEqual({ test: "data" });
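
The `expect` calls above assume a Jest-style test runner. The idea behind such a mock can also be sketched with a hand-rolled in-memory double. `InMemoryQueue` below is hypothetical and mirrors only `publish()` and `getMessages()` as used in the example. It is not the SDK's `MockQueueManager`.

```javascript
// Hypothetical in-memory queue double; mirrors only publish() and getMessages().
class InMemoryQueue {
  constructor() {
    this.queues = new Map();
  }

  // Store the payload together with its options, keyed by queue name.
  async publish(queue, payload, options = {}) {
    const entries = this.queues.get(queue) ?? [];
    entries.push({ payload, options });
    this.queues.set(queue, entries);
  }

  // Return a copy so tests cannot mutate internal state by accident.
  getMessages(queue) {
    return [...(this.queues.get(queue) ?? [])];
  }
}

const queue = new InMemoryQueue();
const published = queue
  .publish("test-queue", { test: "data" })
  .then(() => queue.getMessages("test-queue"));

published.then((messages) => console.log(messages.length, messages[0].payload));
// 1 { test: 'data' }
```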

API Reference

QueueManager

Methods

  • publish(queue, message, options?) - Publish a message to a queue
  • consumer(queue, processor, options?) - Start consuming messages from a queue
  • getQueueStatus() - Get status of all queues
  • getPendingMessages(queue) - Get pending messages for a queue
  • getMetrics() - Get performance metrics
  • close() - Close all connections

Events

  • error - Connection or processing errors
  • message-error - Message processing failures
  • reconnect - Successful reconnection to Redis
  • message-processed - Successful message processing
  • queue-empty - Queue becomes empty

Version History

  • v1.0.4 - Advanced obfuscation, performance improvements
  • v1.0.3 - Enhanced telemetry and monitoring
  • v1.0.2 - MySQL storage integration
  • v1.0.1 - Workflow management features
  • v1.0.0 - Initial release

Support

For support and questions:

License

MIT - See LICENSE file for details.


Enterprise Backend as a Service (E-BaaS) - Powering scalable applications with robust queue management.