townkrier-queue

v1.0.0-alpha.6

Queue adapter system for TownKrier notifications with retry capabilities, similar to Hangfire.

Features

  • 🔄 Retry Logic: Configurable retry attempts with exponential backoff
  • 🎯 Multiple Adapters: In-memory and BullMQ (Redis-backed) adapters
  • 📊 Job Tracking: Track job status, attempts, and execution history
  • Delayed Jobs: Schedule notifications for future delivery
  • 🔍 Job Management: List, retry, and delete jobs
  • 💾 Persistent Storage: BullMQ adapter with Redis for production use
  • 🔧 Worker-based Processing: Efficient background job processing
  • 🎨 Priority Queues: Support for job priorities (CRITICAL, HIGH, NORMAL, LOW)

Installation

npm install townkrier-queue

# For BullMQ adapter (Redis-backed)
npm install bullmq ioredis

Queue Adapters

In-Memory Adapter (Development)

Perfect for development and testing:

import { QueueManager, InMemoryQueueAdapter } from 'townkrier-queue';
import { NotificationManager } from 'townkrier-core';

const queueAdapter = new InMemoryQueueAdapter({
  maxRetries: 3,
  retryDelay: 1000, // ms
  timeout: 30000, // ms
  pollInterval: 1000, // ms
});

const queueManager = new QueueManager(queueAdapter, notificationManager);

BullMQ Adapter (Production)

Redis-backed persistent queue for production:

import { QueueManager, BullMQQueueAdapter } from 'townkrier-queue';

const queueAdapter = new BullMQQueueAdapter({
  redis: {
    host: 'localhost',
    port: 6379,
    password: process.env.REDIS_PASSWORD, // Optional
    db: 0,
    maxRetriesPerRequest: null, // Required for BullMQ
  },
  queueName: 'townkrier-notifications',
  maxRetries: 3,
  retryDelay: 1000,
  timeout: 30000,
  pollInterval: 1000,
});

const queueManager = new QueueManager(queueAdapter, notificationManager);

Basic Usage

Sending Notifications

import { JobPriority } from 'townkrier-queue';

// Send immediately, bypassing the queue
const result = await queueManager.sendNow(notification, recipient);

// Queue for background processing
const job = await queueManager.enqueue(notification, recipient, {
  priority: JobPriority.HIGH,
  maxRetries: 5,
  metadata: { userId: '123' },
});

// Schedule for future delivery
const scheduledJob = await queueManager.enqueue(notification, recipient, {
  scheduledFor: new Date(Date.now() + 3600000), // 1 hour from now
  priority: JobPriority.CRITICAL,
});

Starting the Queue Processor

// Start processing jobs
queueManager.startProcessing({
  pollInterval: 2000, // Check for new jobs every 2 seconds
});

// Stop processing (graceful shutdown)
await queueManager.stopProcessing();

Managing Jobs

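// JobStatus is assumed to be exported from townkrier-queue, like JobPriority
import { JobStatus } from 'townkrier-queue';

// Get job by ID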
const job = await queueManager.getJob(jobId);

// List jobs with filters
const jobs = await queueManager.getJobs({
  status: JobStatus.FAILED,
  limit: 50,
  offset: 0,
});

// Retry a failed job
await queueManager.retryJob(jobId);

// Delete a job
await queueManager.deleteJob(jobId);

// Get queue statistics
const stats = await queueManager.getStats();
console.log(stats);
// {
//   pending: 5,
//   processing: 2,
//   completed: 100,
//   failed: 3,
//   retrying: 1,
//   scheduled: 10
// }
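
The stats object can feed a simple health check. The sketch below is not part of townkrier-queue: the QueueStats fields are copied from the sample output above, while failureRate, queueWarnings and both thresholds are hypothetical names and example values.

```typescript
// Field names taken from the sample getStats() output above.
interface QueueStats {
  pending: number;
  processing: number;
  completed: number;
  failed: number;
  retrying: number;
  scheduled: number;
}

// Hypothetical helper: share of finished jobs that ended up failed.
function failureRate(stats: QueueStats): number {
  const finished = stats.completed + stats.failed;
  return finished === 0 ? 0 : stats.failed / finished;
}

// Hypothetical check; both thresholds are arbitrary example values.
function queueWarnings(
  stats: QueueStats,
  maxBacklog = 1000,
  maxFailureRate = 0.05,
): string[] {
  const warnings: string[] = [];
  const backlog = stats.pending + stats.retrying;
  if (backlog > maxBacklog) {
    warnings.push(`backlog of ${backlog} jobs`);
  }
  const rate = failureRate(stats);
  if (rate > maxFailureRate) {
    warnings.push(`failure rate ${(rate * 100).toFixed(1)}%`);
  }
  return warnings;
}
```

Calling queueWarnings(await queueManager.getStats()) on a timer and logging any non-empty result is one way to keep an eye on the queue without a dashboard.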

Job Priorities

import { JobPriority } from 'townkrier-queue';

JobPriority.LOW; // 1  - Background tasks
JobPriority.NORMAL; // 5  - Standard notifications
JobPriority.HIGH; // 10 - Important notifications
JobPriority.CRITICAL; // 20 - Urgent notifications (password resets, etc.)

// Higher priority jobs are processed first
await queueManager.enqueue(notification, recipient, {
  priority: JobPriority.CRITICAL,
});
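
To illustrate what "higher priority jobs are processed first" means, here is a minimal sketch of an ordering rule. It is not how either adapter actually schedules jobs: the PendingJob shape and byProcessingOrder are hypothetical, and the first-in-first-out tie-break within a priority is an assumption. Only the numeric values come from the listing above.

```typescript
// Numeric values taken from the priority listing above.
const Priority = { LOW: 1, NORMAL: 5, HIGH: 10, CRITICAL: 20 } as const;

// Hypothetical job shape, for illustration only.
interface PendingJob {
  id: string;
  priority: number;
  createdAt: number; // epoch milliseconds
}

// Highest priority first; ties go to the oldest job (assumed FIFO).
function byProcessingOrder(a: PendingJob, b: PendingJob): number {
  return b.priority - a.priority || a.createdAt - b.createdAt;
}

const pending: PendingJob[] = [
  { id: 'newsletter', priority: Priority.LOW, createdAt: 1 },
  { id: 'welcome', priority: Priority.NORMAL, createdAt: 2 },
  { id: 'password-reset', priority: Priority.CRITICAL, createdAt: 3 },
  { id: 'receipt', priority: Priority.NORMAL, createdAt: 4 },
];

// Sort a copy so the original array keeps its insertion order.
const order = pending.slice().sort(byProcessingOrder).map((job) => job.id);
console.log(order); // [ 'password-reset', 'welcome', 'receipt', 'newsletter' ]
```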

Retry Logic

Jobs automatically retry on failure with exponential backoff:

  • 1st retry: After retryDelay ms (e.g., 1000ms)
  • 2nd retry: After retryDelay * 2 ms (e.g., 2000ms)
  • 3rd retry: After retryDelay * 4 ms (e.g., 4000ms)
  • Nth retry: After retryDelay * 2^(N-1) ms

Jobs are marked as failed after maxRetries attempts.
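
The schedule above can be written as a small helper. This is a sketch that restates the documented formula, not the package's internal code; retryBackoffDelay is a hypothetical name.

```typescript
// Hypothetical helper, not part of townkrier-queue: restates the documented
// schedule, delay = retryDelay * 2^(N-1) for the Nth retry (N starts at 1).
function retryBackoffDelay(retryNumber: number, retryDelay = 1000): number {
  if (!Number.isInteger(retryNumber) || retryNumber < 1) {
    throw new RangeError('retryNumber must be an integer >= 1');
  }
  return retryDelay * Math.pow(2, retryNumber - 1);
}

// Delays before the 1st, 2nd and 3rd retry with retryDelay = 1000.
const delays = [1, 2, 3].map((n) => retryBackoffDelay(n, 1000));
console.log(delays); // [ 1000, 2000, 4000 ]
```

Summing the delays gives a rough lower bound on how long a notification can sit in the queue before it is finally marked as failed (7 seconds in this example, plus the time spent in each attempt).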

BullMQ Features

Worker-based Processing

BullMQ uses a worker-based architecture for efficient job processing:

// Worker automatically starts when processing begins
queueManager.startProcessing();

// Jobs are processed by BullMQ workers
// Multiple workers can process jobs concurrently

Redis Persistence

All jobs are persisted in Redis:

  • Survives application restarts
  • Distributed queue support
  • Horizontal scaling capability

Job Metadata

Store custom metadata with jobs:

await queueManager.enqueue(notification, recipient, {
  metadata: {
    userId: '123',
    source: 'web-app',
    campaign: 'welcome-series',
  },
});

Configuration

InMemoryQueueAdapter Config

interface QueueAdapterConfig {
  maxRetries?: number; // Default: 3
  retryDelay?: number; // Default: 1000ms
  timeout?: number; // Default: 30000ms
  pollInterval?: number; // Default: 1000ms
}
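
Because every field is optional, a partial config is completed with the defaults listed in the comments. A minimal sketch of that resolution, assuming the adapter simply falls back field by field (resolveQueueConfig and DEFAULT_QUEUE_CONFIG are hypothetical names, not exports of townkrier-queue):

```typescript
// Same shape as the QueueAdapterConfig interface above.
interface QueueAdapterConfig {
  maxRetries?: number;
  retryDelay?: number;
  timeout?: number;
  pollInterval?: number;
}

// Default values copied from the comments in the interface above.
const DEFAULT_QUEUE_CONFIG: Required<QueueAdapterConfig> = {
  maxRetries: 3,
  retryDelay: 1000,
  timeout: 30000,
  pollInterval: 1000,
};

// Hypothetical helper: fills in any field the caller left out.
// Using ?? (rather than object spread) means an explicit `undefined`
// also falls back to the default, while 0 is kept as a real value.
function resolveQueueConfig(
  config: QueueAdapterConfig = {},
): Required<QueueAdapterConfig> {
  return {
    maxRetries: config.maxRetries ?? DEFAULT_QUEUE_CONFIG.maxRetries,
    retryDelay: config.retryDelay ?? DEFAULT_QUEUE_CONFIG.retryDelay,
    timeout: config.timeout ?? DEFAULT_QUEUE_CONFIG.timeout,
    pollInterval: config.pollInterval ?? DEFAULT_QUEUE_CONFIG.pollInterval,
  };
}
```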

BullMQQueueAdapter Config

interface BullMQQueueAdapterConfig extends QueueAdapterConfig {
  redis?: {
    host?: string; // Default: 'localhost'
    port?: number; // Default: 6379
    password?: string; // Optional
    db?: number; // Default: 0
    maxRetriesPerRequest?: null; // Required: null
  };
  queueName?: string; // Default: 'townkrier-notifications'
}

Redis Setup

Using Docker

# Run Redis container
docker run -d -p 6379:6379 --name redis redis:alpine

# Or with persistence
docker run -d -p 6379:6379 --name redis \
  -v redis-data:/data \
  redis:alpine redis-server --appendonly yes

# Check Redis is running
docker exec redis redis-cli ping
# Should return: PONG

Using Docker Compose

version: '3.8'
services:
  redis:
    image: redis:alpine
    ports:
      - '6379:6379'
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes

volumes:
  redis-data:

Environment Variables

# .env file
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-secure-password
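
These variables still have to be mapped onto the adapter's redis option. A minimal sketch of one way to do it, assuming the variable names from the .env example and the defaults from BullMQQueueAdapterConfig (redisOptionsFromEnv is a hypothetical helper, not part of townkrier-queue):

```typescript
// Shape copied from the `redis` block of BullMQQueueAdapterConfig.
interface RedisOptions {
  host: string;
  port: number;
  password?: string;
  db: number;
  maxRetriesPerRequest: null;
}

// Hypothetical helper: reads the variables from the .env example and falls
// back to the documented defaults when a variable is missing or empty.
function redisOptionsFromEnv(
  env: Record<string, string | undefined>,
): RedisOptions {
  const port = Number(env.REDIS_PORT || '6379');
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }
  return {
    host: env.REDIS_HOST || 'localhost',
    port,
    password: env.REDIS_PASSWORD || undefined, // empty string means no password
    db: 0,
    maxRetriesPerRequest: null, // required for BullMQ
  };
}
```

Pass the result as the redis option, for example redis: redisOptionsFromEnv(process.env). The .env file itself must be loaded into the process environment first, for instance with the dotenv package or Node's --env-file flag.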

Job Statuses

  • PENDING: Waiting to be processed
  • PROCESSING: Currently being processed
  • COMPLETED: Successfully completed
  • FAILED: Failed after max retries
  • RETRYING: Failed but will retry
  • SCHEDULED: Scheduled for future processing
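
How a job moves between these statuses can be sketched as follows. This is an illustration under stated assumptions, not the package's state machine: Status is a local stand-in for JobStatus (the real enum values may differ), both function names are hypothetical, and maxRetries is read as a cap on total attempts, following "marked as failed after maxRetries attempts" in the Retry Logic section.

```typescript
// Local stand-in for the package's JobStatus; the real enum values may differ.
type Status =
  | 'PENDING'
  | 'PROCESSING'
  | 'COMPLETED'
  | 'FAILED'
  | 'RETRYING'
  | 'SCHEDULED';

// Assumption: maxRetries caps the total number of attempts.
// attemptsSoFar includes the attempt that just failed.
function statusAfterFailure(attemptsSoFar: number, maxRetries = 3): Status {
  return attemptsSoFar < maxRetries ? 'RETRYING' : 'FAILED';
}

// Status at enqueue time: a scheduledFor date in the future means SCHEDULED,
// anything else is ready to run and therefore PENDING.
function initialStatus(now: Date, scheduledFor?: Date): Status {
  return scheduledFor !== undefined && scheduledFor.getTime() > now.getTime()
    ? 'SCHEDULED'
    : 'PENDING';
}
```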

Best Practices

Production Deployment

  1. Use BullMQ adapter for persistent queue storage
  2. Configure Redis with persistence (AOF or RDB)
  3. Set appropriate retry limits based on notification type
  4. Monitor queue stats regularly
  5. Handle graceful shutdown properly

// Graceful shutdown
process.on('SIGTERM', async () => {
  console.log('Shutting down...');
  await queueManager.stopProcessing();

  // Close BullMQ connections
  const adapter = queueManager.getAdapter() as BullMQQueueAdapter;
  await adapter.close();

  process.exit(0);
});

Performance Tuning

  • Adjust pollInterval based on your load
  • Use job priorities to handle urgent notifications first
  • Configure Redis memory limits appropriately
  • Monitor Redis memory usage
  • Consider Redis clustering for high availability

Monitoring

Use the townkrier-dashboard package for monitoring:

import { DashboardServer } from 'townkrier-dashboard';

const dashboard = new DashboardServer({
  queueManager,
  storageManager,
  port: 3000,
  path: '/dashboard',
});

dashboard.start();

Examples

See examples/bullmq-dashboard-example.ts for a complete working example.

API Reference

QueueManager

class QueueManager {
  // Send immediately (like Laravel's sendNow)
  sendNow(notification, recipient): Promise<Map<Channel, any>>;

  // Queue for background (like Laravel's send)
  enqueue(notification, recipient, config?): Promise<QueueJob>;

  // Job management
  getJob(jobId): Promise<QueueJob | null>;
  getJobs(filters?): Promise<QueueJob[]>;
  retryJob(jobId): Promise<void>;
  deleteJob(jobId): Promise<void>;

  // Statistics
  getStats(): Promise<QueueStats>;

  // Processing control
  startProcessing(options?): void;
  stopProcessing(): Promise<void>;

  // Get underlying adapter
  getAdapter(): IQueueAdapter;
}

Troubleshooting

Jobs not processing

  1. Ensure processing is started:

queueManager.startProcessing();

  2. For BullMQ, check the Redis connection:

redis-cli ping

  3. Check queue stats:

const stats = await queueManager.getStats();
console.log(stats);

BullMQ connection errors

  • Verify Redis is running
  • Check connection configuration
  • Ensure maxRetriesPerRequest: null is set
  • Check firewall rules

Memory issues

  • Monitor Redis memory usage
  • Configure Redis eviction policy
  • Clean up old completed jobs
  • Adjust job retention settings

License

MIT