
queen-mq

v0.12.1

High-performance C++ message queue backed by PostgreSQL

Queen MQ - JavaScript Client

Modern, high-performance message queue client for Node.js



What is Queen MQ?

Queen MQ is a PostgreSQL-backed message queue system with a powerful feature set:

  • FIFO Partitions - Unlimited ordered partitions within queues
  • Consumer Groups - Kafka-style consumer groups for scalability
  • Flexible Semantics - Exactly-once, at-least-once, and at-most-once delivery
  • Transactions - Atomic operations across push and ack
  • High Performance - 200K+ messages/sec with proper batching
  • Subscription Modes - Process from beginning, new messages only, or from timestamp
  • Dead Letter Queue - Automatic failure handling and monitoring
  • Message Tracing - Debug distributed workflows with trace timelines
  • Client-Side Buffering - 10x-100x throughput boost for high-volume pushes
  • Real-time Streaming - Windowed aggregation and processing

This client provides a fluent, promise-based API for Node.js applications.


Installation

npm install queen-mq

Requirements: Node.js 22+


Quick Start

import { Queen } from 'queen-mq'

// Connect to Queen server
const queen = new Queen('http://localhost:6632')

// Create a queue
await queen.queue('tasks').create()

// Push messages
await queen.queue('tasks').push([
  { data: { task: 'send-email', to: '[email protected]' } }
])

// Consume messages
await queen.queue('tasks').consume(async (message) => {
  console.log('Processing:', message.data)
  // Auto-ack on success, auto-retry on error
})

Core Concepts

Queues

Logical containers for messages with configurable settings:

  • Lease time - How long a consumer has to process a message
  • Retry limit - Number of retry attempts before DLQ
  • Priority - Queue priority for multi-queue consumers
  • Encryption - Message payload encryption at rest
  • Retention - Automatic cleanup policies

await queen.queue('orders')
  .config({
    leaseTime: 300,        // 5 minutes
    retryLimit: 3,
    priority: 5,
    encryptionEnabled: false
  })
  .create()

Partitions

Ordered lanes within a queue. Messages in the same partition are processed sequentially:

// All messages for user-123 are processed in order
await queen.queue('user-events')
  .partition('user-123')
  .push([
    { data: { event: 'login' } },
    { data: { event: 'view-page' } },
    { data: { event: 'logout' } }
  ])

Use cases:

  • Per-user ordering
  • Per-tenant isolation
  • Sharding for parallelism
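The sharding use case above can be sketched with a small helper that maps an entity id onto a fixed set of partitions, keeping per-shard ordering while allowing parallel consumers. `shardKey` and `SHARD_COUNT` are hypothetical names for illustration, not part of queen-mq; only the `partition()` call in the usage comment is the documented API.

```javascript
// Hypothetical helper: map an entity id onto SHARD_COUNT partitions so
// ordering is preserved within each shard while shards are consumed in parallel.
const SHARD_COUNT = 8

function shardKey(entityId) {
  // Stable string hash (djb2-style); any deterministic hash works here.
  let hash = 5381
  for (const ch of String(entityId)) {
    hash = ((hash * 33) + ch.charCodeAt(0)) >>> 0
  }
  return `shard-${hash % SHARD_COUNT}`
}

// Usage with the documented partition() API:
// await queen.queue('user-events').partition(shardKey(userId)).push([{ data: ev }])
```

The same id always lands on the same partition, so per-entity ordering survives while up to `SHARD_COUNT` consumers drain shards concurrently.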

Consumer Groups

Multiple consumers sharing work, with independent progress tracking:

// Worker 1 & 2 share the load
await queen.queue('emails')
  .group('processors')
  .consume(async (message) => {
    await sendEmail(message.data)
  })

// Separate group processes same messages independently
await queen.queue('emails')
  .group('analytics')
  .consume(async (message) => {
    await logMetrics(message.data)
  })

Subscription Modes

Control whether consumer groups process historical messages:

// Default: Process ALL messages (including backlog)
await queen.queue('events')
  .group('batch-analytics')
  .consume(async (message) => { /* all messages */ })

// Skip history, only new messages
await queen.queue('events')
  .group('realtime-monitor')
  .subscriptionMode('new')
  .consume(async (message) => { /* new only */ })

// Start from specific timestamp
await queen.queue('events')
  .group('replay')
  .subscriptionFrom('2025-10-28T10:00:00.000Z')
  .consume(async (message) => { /* from timestamp */ })

Connection Options

Single Server

const queen = new Queen('http://localhost:6632')

Multiple Servers (High Availability)

const queen = new Queen([
  'http://server1:6632',
  'http://server2:6632'
])

Full Configuration

const queen = new Queen({
  urls: ['http://server1:6632', 'http://server2:6632'],
  timeoutMillis: 30000,
  retryAttempts: 3,
  loadBalancingStrategy: 'round-robin',  // or 'session'
  enableFailover: true
})

Basic Usage Patterns

Push Messages

// Simple push
await queen.queue('tasks').push([
  { data: { job: 'resize-image', imageId: 123 } }
])

// With partition
await queen.queue('tasks')
  .partition('tenant-456')
  .push([{ data: { action: 'process' } }])

// With custom transaction ID (for exactly-once)
await queen.queue('tasks').push([
  {
    transactionId: 'unique-id-123',
    data: { value: 42 }
  }
])

Consume Messages (Long-Running Workers)

// Runs forever, processes messages as they arrive
await queen.queue('tasks')
  .concurrency(10)        // 10 parallel workers
  .batch(20)              // Fetch 20 at a time
  .consume(async (message) => {
    await processTask(message.data)
    // Auto-ack on success, auto-retry on error
  })

// Process with limit and stop
await queen.queue('tasks')
  .limit(100)
  .consume(async (message) => {
    await processTask(message.data)
  })

Pop Messages (On-Demand Processing)

// Grab messages manually
const messages = await queen.queue('tasks')
  .batch(10)
  .wait(true)  // Long polling
  .pop()

// Manual acknowledgment
for (const message of messages) {
  try {
    await processMessage(message.data)
    await queen.ack(message, true)  // Success
  } catch (error) {
    await queen.ack(message, false)  // Retry
  }
}

Transactions (Atomic Operations)

// Pop from queue A
const messages = await queen.queue('input').pop()

// Atomically: ack input AND push output
await queen.transaction()
  .ack(messages[0])
  .queue('output')
  .push([{ data: processedResult }])
  .commit()

// If commit fails, nothing happens - message stays in input queue

Client-Side Buffering (High Throughput)

// Buffer messages locally, batch to server
for (let i = 0; i < 10000; i++) {
  await queen.queue('events')
    .buffer({ messageCount: 500, timeMillis: 1000 })
    .push([{ data: { id: i } }])
}

// Flush remaining buffered messages
await queen.flushAllBuffers()

// Result: 10x-100x faster than individual pushes

Dead Letter Queue

// Enable DLQ on queue
await queen.queue('risky')
  .config({ retryLimit: 3, dlqAfterMaxRetries: true })
  .create()

// Query failed messages
const dlq = await queen.queue('risky')
  .dlq()
  .limit(10)
  .get()

console.log(`Found ${dlq.total} failed messages`)
for (const msg of dlq.messages) {
  console.log('Error:', msg.errorMessage)
}

Message Tracing

await queen.queue('orders').consume(async (msg) => {
  const orderId = msg.data.orderId
  
  // Record trace with name for cross-service correlation
  await msg.trace({
    traceName: `order-${orderId}`,
    eventType: 'info',
    data: { text: 'Order processing started' }
  })
  
  await processOrder(msg.data)
  
  await msg.trace({
    traceName: `order-${orderId}`,
    eventType: 'processing',
    data: { 
      text: 'Order completed',
      total: msg.data.total
    }
  })
})

// View traces in webapp: Traces → Search "order-12345"

Examples

Complete Pipeline with Consumer Groups

import { Queen } from 'queen-mq'

const queen = new Queen('http://localhost:6632')

// Stage 1: Ingest with buffering
async function ingestEvents() {
  for (let i = 0; i < 10000; i++) {
    await queen.queue('raw-events')
      .partition(`user-${i % 100}`)
      .buffer({ messageCount: 500, timeMillis: 1000 })
      .push([{ data: { userId: i % 100, event: 'page_view' } }])
  }
  await queen.flushAllBuffers()
}

// Stage 2: Process with transactions
async function processEvents() {
  await queen.queue('raw-events')
    .group('processors')
    .concurrency(5)
    .batch(10)
    .autoAck(false)
    .consume(async (messages) => {
      const results = messages.map(m => process(m.data))
      
      // Atomic: ack all inputs, push all outputs
      const txn = queen.transaction()
      for (const msg of messages) txn.ack(msg)
      txn.queue('processed-events').push(results.map(r => ({ data: r })))
      await txn.commit()
    })
}

// Stage 3: Separate analytics consumer (fan-out)
async function analytics() {
  await queen.queue('raw-events')
    .group('analytics')
    .subscriptionMode('new')  // Skip backlog
    .consume(async (message) => {
      await logMetrics(message.data)
    })
}

await ingestEvents()
await Promise.all([processEvents(), analytics()])

Long-Running Tasks with Lease Renewal

await queen.queue('video-processing')
  .renewLease(true, 60000)  // Renew every 60 seconds
  .consume(async (message) => {
    // Can take hours - lease keeps renewing automatically
    await processVideo(message.data)
  })

Error Handling with Callbacks

await queen.queue('tasks')
  .autoAck(false)
  .consume(async (message) => {
    return await riskyOperation(message.data)
  })
  .onSuccess(async (message, result) => {
    console.log('Success:', result)
    await queen.ack(message, true)
  })
  .onError(async (message, error) => {
    console.error('Failed:', error.message)
    
    // Custom retry logic
    if (error.message.includes('temporary')) {
      await queen.ack(message, false)  // Retry
    } else {
      await queen.ack(message, 'failed', { error: error.message })
    }
  })

API Reference

Queue Operations

// Create
await queen.queue('my-queue').create()
await queen.queue('my-queue').config({ priority: 5 }).create()

// Delete
await queen.queue('my-queue').delete()

// Get info
const info = await queen.getQueueInfo('my-queue')

Push

await queen.queue('q').push([{ data: { value: 1 } }])
await queen.queue('q').partition('p1').push([{ data: { value: 1 } }])
await queen.queue('q').buffer({ messageCount: 100, timeMillis: 1000 }).push([...])

Pop

const msgs = await queen.queue('q').pop()
const msgs = await queen.queue('q').batch(10).pop()
const msgs = await queen.queue('q').batch(10).wait(true).pop()

Consume

await queen.queue('q').consume(async (msg) => { /* process */ })
await queen.queue('q').limit(10).consume(async (msg) => { /* process */ })
await queen.queue('q').concurrency(5).consume(async (msg) => { /* 5 workers */ })
await queen.queue('q').group('my-group').consume(async (msg) => { /* consumer group */ })

Acknowledgment

await queen.ack(message, true)   // Success
await queen.ack(message, false)  // Retry
await queen.ack(message, false, { error: 'reason' })
await queen.ack([msg1, msg2], true)  // Batch ack

Transactions

await queen.transaction()
  .ack(message)
  .queue('output')
  .push([{ data: { result: 'processed' } }])
  .commit()

Lease Renewal

await queen.renew(message)
await queen.renew([msg1, msg2, msg3])
await queen.queue('q').renewLease(true, 60000).consume(async (msg) => { /* auto-renew */ })

Buffering

await queen.flushAllBuffers()
await queen.queue('q').flushBuffer()
const stats = queen.getBufferStats()

Dead Letter Queue

const dlq = await queen.queue('q').dlq().limit(10).get()
const dlq = await queen.queue('q').dlq('consumer-group').limit(10).get()
const dlq = await queen.queue('q').dlq().from('2025-01-01').to('2025-01-31').get()

Shutdown

await queen.close()  // Flush buffers and close connections

Configuration Defaults

Client Defaults

{
  timeoutMillis: 30000,
  retryAttempts: 3,
  retryDelayMillis: 1000,
  loadBalancingStrategy: 'round-robin',
  enableFailover: true
}

Queue Defaults

{
  leaseTime: 300,         // 5 minutes
  retryLimit: 3,
  priority: 0,
  delayedProcessing: 0,
  windowBuffer: 0,
  maxSize: 0,            // Unlimited
  retentionSeconds: 0,   // Keep forever
  encryptionEnabled: false
}

Consume Defaults

{
  concurrency: 1,
  batch: 1,
  autoAck: true,
  wait: true,            // Long polling
  timeoutMillis: 30000,
  limit: null,           // Run forever
  renewLease: false
}

Logging

Enable detailed logging for debugging:

export QUEEN_CLIENT_LOG=true
node your-app.js

Example output:

[2025-10-28T10:30:45.123Z] [INFO] [Queen.constructor] {"status":"initialized","urls":1}
[2025-10-28T10:30:45.234Z] [INFO] [QueueBuilder.push] {"queue":"tasks","partition":"Default","count":5}

Best Practices

  1. Use consume() for workers - Simpler API, handles retries automatically
  2. Use pop() for control - When you need precise control over acking
  3. Buffer for speed - Always use buffering when pushing many messages
  4. Partitions for order - Use partitions when message order matters
  5. Consumer groups for scale - Run multiple workers in the same group
  6. Transactions for consistency - Use transactions for atomic operations
  7. Enable DLQ - Always enable DLQ in production
  8. Renew long leases - Use auto-renewal for long-running tasks
  9. Graceful shutdown - Always call queen.close() before exiting
  10. Monitor DLQ - Regularly check for failed messages
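Practice 9 can be wired up with a small shutdown hook. This is a sketch, not queen-mq API: `registerShutdown` is a hypothetical helper, the only library call it assumes is the documented `queen.close()`, and the signal handling is standard Node.js.

```javascript
// Hypothetical helper: flush buffers and close connections exactly once
// when the process receives a termination signal.
function registerShutdown(queen) {
  let closing = false
  const shutdown = async (signal) => {
    if (closing) return          // ignore repeated signals
    closing = true
    console.log(`${signal} received, draining...`)
    await queen.close()          // flushes client-side buffers, closes connections
    process.exit(0)
  }
  process.on('SIGINT', shutdown)
  process.on('SIGTERM', shutdown)
  return shutdown                // returned so callers can also invoke it directly
}

// Usage: registerShutdown(new Queen('http://localhost:6632'))
```

The `closing` flag guards against a second Ctrl-C interrupting an in-flight `close()`, which would otherwise drop buffered messages.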

TypeScript Support

Full TypeScript definitions included:

import { Queen, Message, QueueConfig } from 'queen-mq'

const queen: Queen = new Queen('http://localhost:6632')

interface OrderData {
  orderId: number
  amount: number
}

const messages: Message<OrderData>[] = await queen.queue('orders').pop()

License

Apache 2.0 - See LICENSE.md