queen-mq
v0.12.1
High-performance C++ message queue backed by PostgreSQL
Queen MQ - JavaScript Client
Modern, high-performance message queue client for Node.js
Quick Start • Complete Guide • Examples • API Reference
What is Queen MQ?
Queen MQ is a PostgreSQL-backed message queue system with a powerful feature set:
- FIFO Partitions - Unlimited ordered partitions within queues
- Consumer Groups - Kafka-style consumer groups for scalability
- Flexible Semantics - Exactly-once, at-least-once, and at-most-once delivery
- Transactions - Atomic operations across push and ack
- High Performance - 200K+ messages/sec with proper batching
- Subscription Modes - Process from beginning, new messages only, or from timestamp
- Dead Letter Queue - Automatic failure handling and monitoring
- Message Tracing - Debug distributed workflows with trace timelines
- Client-Side Buffering - 10x-100x throughput boost for high-volume pushes
- Real-time Streaming - Windowed aggregation and processing
This client provides a fluent, promise-based API for Node.js applications.
Installation
npm install queen-mq
Requirements: Node.js 22+
Quick Start
import { Queen } from 'queen-mq'
// Connect to Queen server
const queen = new Queen('http://localhost:6632')
// Create a queue
await queen.queue('tasks').create()
// Push messages
await queen.queue('tasks').push([
{ data: { task: 'send-email', to: '[email protected]' } }
])
// Consume messages
await queen.queue('tasks').consume(async (message) => {
console.log('Processing:', message.data)
// Auto-ack on success, auto-retry on error
})
Core Concepts
Queues
Logical containers for messages with configurable settings:
- Lease time - How long a consumer has to process a message
- Retry limit - Number of retry attempts before DLQ
- Priority - Queue priority for multi-queue consumers
- Encryption - Message payload encryption at rest
- Retention - Automatic cleanup policies
await queen.queue('orders')
.config({
leaseTime: 300, // 5 minutes
retryLimit: 3,
priority: 5,
encryptionEnabled: false
})
.create()
Partitions
Ordered lanes within a queue. Messages in the same partition are processed sequentially:
// All messages for user-123 are processed in order
await queen.queue('user-events')
.partition('user-123')
.push([
{ data: { event: 'login' } },
{ data: { event: 'view-page' } },
{ data: { event: 'logout' } }
])
Use cases:
- Per-user ordering
- Per-tenant isolation
- Sharding for parallelism
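For the sharding use case, one possible approach is to hash a key onto a fixed set of partition names, so the number of ordered lanes stays bounded while each key still keeps its own order. This is a minimal sketch under that assumption: `partitionFor` and the `shard-N` naming are hypothetical and not part of the queen-mq API.

```javascript
// Hypothetical helper: map an arbitrary key onto one of `count` partition names.
// Uses the 32-bit FNV-1a hash, so the same key always lands in the same partition.
function partitionFor(key, count) {
  let hash = 0x811c9dc5 // FNV-1a offset basis
  for (const byte of new TextEncoder().encode(String(key))) {
    hash ^= byte
    hash = Math.imul(hash, 0x01000193) >>> 0 // multiply by the FNV prime, keep unsigned
  }
  return `shard-${hash % count}`
}

// Possible usage with the client (not executed here):
// await queen.queue('user-events')
//   .partition(partitionFor('user-123', 16))
//   .push([{ data: { event: 'login' } }])
```

Keys that map to the same shard are processed sequentially relative to each other, so a smaller `count` trades parallelism for fewer partitions.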
Consumer Groups
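As a mental model, a consumer group can be pictured as a named cursor over the queue: workers in the same group advance one shared cursor, while each group has its own. The sketch below is an in-memory illustration of that idea only. `GroupedLog` is a hypothetical name and this is not how Queen stores progress.

```javascript
// In-memory model (not Queen's implementation): one shared log, one cursor per group.
class GroupedLog {
  constructor() {
    this.messages = []
    this.cursors = new Map() // group name -> index of the next message to hand out
  }

  push(data) {
    this.messages.push(data)
  }

  // Hand the next unread message to whichever worker of `group` asks first.
  next(group) {
    const index = this.cursors.get(group) ?? 0
    if (index >= this.messages.length) return undefined
    this.cursors.set(group, index + 1)
    return this.messages[index]
  }
}

const log = new GroupedLog()
for (const id of [1, 2, 3, 4]) log.push({ id })

// Two workers in 'processors' split the messages between them...
const worker1 = [log.next('processors'), log.next('processors')]
const worker2 = [log.next('processors'), log.next('processors')]
// ...while 'analytics' advances its own cursor and sees all four.
const analytics = [1, 2, 3, 4].map(() => log.next('analytics'))
```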
Multiple consumers sharing work, with independent progress tracking:
// Worker 1 & 2 share the load
await queen.queue('emails')
.group('processors')
.consume(async (message) => {
await sendEmail(message.data)
})
// Separate group processes same messages independently
await queen.queue('emails')
.group('analytics')
.consume(async (message) => {
await logMetrics(message.data)
})
Subscription Modes
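One way to picture a subscription mode is as the rule that picks where a brand-new consumer group starts reading. The sketch below is a rough in-memory illustration, not Queen's implementation: `startOffset`, the `'all'` label for the default mode, and the `createdAt` field are assumptions made for the example.

```javascript
// Hypothetical model: where a new consumer group's cursor starts.
// `messages` is ordered by `createdAt` (ISO-8601 strings).
// `mode` is 'all', 'new', or { from: isoTimestamp }.
function startOffset(messages, mode) {
  if (mode === 'all') return 0 // process the whole backlog
  if (mode === 'new') return messages.length // skip everything already queued
  const from = Date.parse(mode.from)
  const index = messages.findIndex((m) => Date.parse(m.createdAt) >= from)
  return index === -1 ? messages.length : index
}

const backlog = [
  { id: 1, createdAt: '2025-10-28T09:00:00.000Z' },
  { id: 2, createdAt: '2025-10-28T10:00:00.000Z' },
  { id: 3, createdAt: '2025-10-28T11:00:00.000Z' }
]

const replayFrom = startOffset(backlog, { from: '2025-10-28T10:00:00.000Z' })
```

With this backlog, `replayFrom` points at the message with `id: 2`, so a replay group would see messages 2 and 3.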
Control whether consumer groups process historical messages:
// Default: Process ALL messages (including backlog)
await queen.queue('events')
.group('batch-analytics')
.consume(async (message) => { /* all messages */ })
// Skip history, only new messages
await queen.queue('events')
.group('realtime-monitor')
.subscriptionMode('new')
.consume(async (message) => { /* new only */ })
// Start from specific timestamp
await queen.queue('events')
.group('replay')
.subscriptionFrom('2025-10-28T10:00:00.000Z')
.consume(async (message) => { /* from timestamp */ })
Connection Options
Single Server
const queen = new Queen('http://localhost:6632')
Multiple Servers (High Availability)
const queen = new Queen([
'http://server1:6632',
'http://server2:6632'
])
Full Configuration
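The `'round-robin'` strategy and `enableFailover` option suggest that requests rotate across the configured URLs and skip servers that are unreachable. The sketch below shows one way such a selector could work. It is an assumption about the behaviour, not the client's actual code, and `RoundRobin`, `markDown`, and `markUp` are hypothetical names.

```javascript
// Sketch of round-robin selection with failover (assumed behaviour, not Queen's code).
class RoundRobin {
  constructor(urls) {
    this.urls = urls
    this.next = 0 // index of the URL to try first on the next pick
    this.down = new Set() // URLs currently considered unhealthy
  }

  markDown(url) { this.down.add(url) }
  markUp(url) { this.down.delete(url) }

  // Return the next healthy URL, or undefined if every server is marked down.
  pick() {
    for (let i = 0; i < this.urls.length; i++) {
      const url = this.urls[(this.next + i) % this.urls.length]
      if (!this.down.has(url)) {
        this.next = (this.next + i + 1) % this.urls.length
        return url
      }
    }
    return undefined
  }
}

const balancer = new RoundRobin(['http://server1:6632', 'http://server2:6632'])
```

While both servers are healthy, successive `pick()` calls alternate between them. After `markDown('http://server2:6632')`, every pick returns `server1` until the server is marked up again.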
const queen = new Queen({
urls: ['http://server1:6632', 'http://server2:6632'],
timeoutMillis: 30000,
retryAttempts: 3,
loadBalancingStrategy: 'round-robin', // or 'session'
enableFailover: true
})
Basic Usage Patterns
Push Messages
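The custom `transactionId` shown in this section is described as being for exactly-once delivery. One plausible reading is that a repeated ID is treated as a duplicate, so a push retried after a timeout does not create a second message. The sketch below models that idea in memory. It is an assumption about the semantics rather than documented server behaviour, and `DedupingQueue` is a hypothetical name.

```javascript
// Conceptual model (assumption, not Queen's implementation): the queue remembers
// transaction IDs it has already accepted and ignores repeats.
class DedupingQueue {
  constructor() {
    this.seen = new Set()
    this.messages = []
  }

  // Returns how many of the pushed items were actually stored.
  push(items) {
    let stored = 0
    for (const { transactionId, data } of items) {
      if (transactionId !== undefined && this.seen.has(transactionId)) continue
      if (transactionId !== undefined) this.seen.add(transactionId)
      this.messages.push(data)
      stored++
    }
    return stored
  }
}

const queue = new DedupingQueue()
const first = queue.push([{ transactionId: 'unique-id-123', data: { value: 42 } }])
// A retry after a timeout re-sends the same ID and is dropped.
const retry = queue.push([{ transactionId: 'unique-id-123', data: { value: 42 } }])
```

Under this model, a stable ID derived from the business event (an order number, for example) is more useful than a random one, because a random ID changes on every retry.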
// Simple push
await queen.queue('tasks').push([
{ data: { job: 'resize-image', imageId: 123 } }
])
// With partition
await queen.queue('tasks')
.partition('tenant-456')
.push([{ data: { action: 'process' } }])
// With custom transaction ID (for exactly-once)
await queen.queue('tasks').push([
{
transactionId: 'unique-id-123',
data: { value: 42 }
}
])
Consume Messages (Long-Running Workers)
// Runs forever, processes messages as they arrive
await queen.queue('tasks')
.concurrency(10) // 10 parallel workers
.batch(20) // Fetch 20 at a time
.consume(async (message) => {
await processTask(message.data)
// Auto-ack on success, auto-retry on error
})
// Process with limit and stop
await queen.queue('tasks')
.limit(100)
.consume(async (message) => {
await processTask(message.data)
})
Pop Messages (On-Demand Processing)
// Grab messages manually
const messages = await queen.queue('tasks')
.batch(10)
.wait(true) // Long polling
.pop()
// Manual acknowledgment
for (const message of messages) {
  try {
    await processMessage(message.data)
    await queen.ack(message, true) // Success
  } catch (error) {
    await queen.ack(message, false) // Retry
  }
}
Transactions (Atomic Operations)
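The guarantee here is all-or-nothing: either the ack and the push both take effect, or neither does. The sketch below illustrates that property with a small in-memory model that validates every staged operation before applying any of them. It is not how Queen implements transactions (which presumably relies on PostgreSQL), and the `Transaction` class and `store` shape are hypothetical.

```javascript
// Conceptual model of an all-or-nothing transaction (not Queen's implementation).
class Transaction {
  constructor(store) {
    this.store = store // { pending: Set of message ids, queues: { name: [data] } }
    this.ops = []
  }

  ack(id) { this.ops.push({ type: 'ack', id }); return this }
  push(queue, data) { this.ops.push({ type: 'push', queue, data }); return this }

  commit() {
    // Validate everything first so a failure leaves the store untouched.
    for (const op of this.ops) {
      if (op.type === 'ack' && !this.store.pending.has(op.id)) {
        throw new Error(`message ${op.id} is not pending`)
      }
    }
    // Only then apply every staged operation.
    for (const op of this.ops) {
      if (op.type === 'ack') this.store.pending.delete(op.id)
      else (this.store.queues[op.queue] ??= []).push(op.data)
    }
  }
}

const store = { pending: new Set(['m1']), queues: {} }
new Transaction(store).ack('m1').push('output', { ok: true }).commit()
```

If any staged ack refers to a message that is not pending, `commit()` throws before anything is written, so the output queue never receives a result for an input that was not acknowledged.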
// Pop from queue A
const messages = await queen.queue('input').pop()
// Atomically: ack input AND push output
await queen.transaction()
.ack(messages[0])
.queue('output')
.push([{ data: processedResult }])
.commit()
// If commit fails, nothing happens - message stays in input queue
Client-Side Buffering (High Throughput)
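The `messageCount` and `timeMillis` options read as two flush thresholds: send the batch once it is large enough or once its oldest message has waited long enough. The sketch below shows that rule in isolation. It is an assumption about the behaviour, not the client's code: `PushBuffer`, `send`, and the injectable `now` clock are hypothetical, and the sketch only checks age when a message is added, whereas a real client would presumably also use a timer.

```javascript
// Sketch of a count-or-time buffer (assumed behaviour, not Queen's code).
// `send` receives each flushed batch; `now` is injectable so timing is testable.
class PushBuffer {
  constructor({ messageCount, timeMillis }, send, now = Date.now) {
    this.messageCount = messageCount
    this.timeMillis = timeMillis
    this.send = send
    this.now = now
    this.items = []
    this.oldest = null // timestamp of the first buffered item
  }

  add(item) {
    if (this.items.length === 0) this.oldest = this.now()
    this.items.push(item)
    const full = this.items.length >= this.messageCount
    const stale = this.now() - this.oldest >= this.timeMillis
    if (full || stale) this.flush()
  }

  flush() {
    if (this.items.length === 0) return
    const batch = this.items
    this.items = []
    this.send(batch)
  }
}

const batches = []
const fixedClock = () => 0 // frozen time, so only the count threshold fires in this demo
const buffer = new PushBuffer(
  { messageCount: 500, timeMillis: 1000 },
  (batch) => batches.push(batch.length),
  fixedClock
)
for (let i = 0; i < 1200; i++) buffer.add({ id: i })
buffer.flush() // send whatever is left, in the spirit of flushAllBuffers()
// batches is now [500, 500, 200]: three sends instead of 1200
```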
// Buffer messages locally, batch to server
for (let i = 0; i < 10000; i++) {
await queen.queue('events')
.buffer({ messageCount: 500, timeMillis: 1000 })
.push([{ data: { id: i } }])
}
// Flush remaining buffered messages
await queen.flushAllBuffers()
// Result: 10x-100x faster than individual pushes
Dead Letter Queue
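Conceptually, `retryLimit` and `dlqAfterMaxRetries` describe what happens to a message whose handler keeps failing. The sketch below models that flow synchronously and in memory. It is not Queen's implementation, `deliver` is a hypothetical helper, and it assumes a message gets one initial attempt plus up to `retryLimit` retries, which may not match how the server counts.

```javascript
// Conceptual model (not Queen's implementation) of retryLimit + dlqAfterMaxRetries.
// Assumption: one initial attempt plus up to `retryLimit` retries.
function deliver(message, handler, { retryLimit, dlqAfterMaxRetries }, dlq) {
  for (let attempt = 0; attempt <= retryLimit; attempt++) {
    try {
      handler(message)
      return { status: 'completed', attempts: attempt + 1 }
    } catch (error) {
      message.errorMessage = error.message // keep the last failure for inspection
    }
  }
  if (dlqAfterMaxRetries) dlq.push(message)
  return { status: dlqAfterMaxRetries ? 'dead-lettered' : 'failed', attempts: retryLimit + 1 }
}

const deadLetters = []
const alwaysFails = () => { throw new Error('boom') }
const result = deliver(
  { data: { id: 1 } },
  alwaysFails,
  { retryLimit: 3, dlqAfterMaxRetries: true },
  deadLetters
)
```

Here `result.status` is `'dead-lettered'` after four attempts, and `deadLetters[0].errorMessage` holds the last error.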
// Enable DLQ on queue
await queen.queue('risky')
.config({ retryLimit: 3, dlqAfterMaxRetries: true })
.create()
// Query failed messages
const dlq = await queen.queue('risky')
.dlq()
.limit(10)
.get()
console.log(`Found ${dlq.total} failed messages`)
for (const msg of dlq.messages) {
console.log('Error:', msg.errorMessage)
}
Message Tracing
await queen.queue('orders').consume(async (msg) => {
  const orderId = msg.data.orderId
  // Record trace with name for cross-service correlation
  await msg.trace({
    traceName: `order-${orderId}`,
    eventType: 'info',
    data: { text: 'Order processing started' }
  })
  await processOrder(msg.data)
  await msg.trace({
    traceName: `order-${orderId}`,
    eventType: 'processing',
    data: {
      text: 'Order completed',
      total: msg.data.total
    }
  })
})
// View traces in webapp: Traces → Search "order-12345"
Examples
Complete Pipeline with Consumer Groups
import { Queen } from 'queen-mq'
const queen = new Queen('http://localhost:6632')
// Stage 1: Ingest with buffering
async function ingestEvents() {
  for (let i = 0; i < 10000; i++) {
    await queen.queue('raw-events')
      .partition(`user-${i % 100}`)
      .buffer({ messageCount: 500, timeMillis: 1000 })
      .push([{ data: { userId: i % 100, event: 'page_view' } }])
  }
  await queen.flushAllBuffers()
}
// Stage 2: Process with transactions
async function processEvents() {
  await queen.queue('raw-events')
    .group('processors')
    .concurrency(5)
    .batch(10)
    .autoAck(false)
    .consume(async (messages) => {
      // transformEvent is your own function; it is not named `process`
      // because that would shadow Node's global `process` object
      const results = messages.map(m => transformEvent(m.data))
      // Atomic: ack all inputs, push all outputs
      const txn = queen.transaction()
      for (const msg of messages) txn.ack(msg)
      txn.queue('processed-events').push(results.map(r => ({ data: r })))
      await txn.commit()
    })
}
// Stage 3: Separate analytics consumer (fan-out)
async function analytics() {
await queen.queue('raw-events')
.group('analytics')
.subscriptionMode('new') // Skip backlog
.consume(async (message) => {
await logMetrics(message.data)
})
}
await ingestEvents()
await Promise.all([processEvents(), analytics()])
Long-Running Tasks with Lease Renewal
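A lease can be thought of as a deadline: the consumer owns the message until the deadline passes, and renewing moves the deadline forward. The sketch below is a conceptual model only, not Queen's implementation. `Lease` is a hypothetical class, and all times are in milliseconds, whereas the queue's `leaseTime` setting elsewhere in this README is annotated in minutes.

```javascript
// Conceptual model of a lease (not Queen's implementation). Times are in milliseconds.
class Lease {
  constructor(leaseMillis, now) {
    this.leaseMillis = leaseMillis
    this.expiresAt = now + leaseMillis
  }

  // Renewing pushes the deadline out by a full lease period from `now`.
  renew(now) {
    if (this.isExpired(now)) return false // too late: the message may already be redelivered
    this.expiresAt = now + this.leaseMillis
    return true
  }

  isExpired(now) {
    return now >= this.expiresAt
  }
}

// A 5-minute lease taken at t = 0 and renewed after 60 seconds.
const lease = new Lease(300000, 0)
const renewed = lease.renew(60000) // true; the deadline moves to t = 360000
```

As long as renewals arrive more often than the lease period, the deadline never catches up with a slow handler. A renewal that arrives after expiry returns `false` in this model.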
await queen.queue('video-processing')
.renewLease(true, 60000) // Renew every 60 seconds
.consume(async (message) => {
// Can take hours - lease keeps renewing automatically
await processVideo(message.data)
})
Error Handling with Callbacks
await queen.queue('tasks')
  .autoAck(false)
  .consume(async (message) => {
    return await riskyOperation(message.data)
  })
  .onSuccess(async (message, result) => {
    console.log('Success:', result)
    await queen.ack(message, true)
  })
  .onError(async (message, error) => {
    console.error('Failed:', error.message)
    // Custom retry logic
    if (error.message.includes('temporary')) {
      await queen.ack(message, false) // Retry
    } else {
      await queen.ack(message, 'failed', { error: error.message })
    }
  })
API Reference
Queue Operations
// Create
await queen.queue('my-queue').create()
await queen.queue('my-queue').config({ priority: 5 }).create()
// Delete
await queen.queue('my-queue').delete()
// Get info
const info = await queen.getQueueInfo('my-queue')
Push
await queen.queue('q').push([{ data: { value: 1 } }])
await queen.queue('q').partition('p1').push([{ data: { value: 1 } }])
await queen.queue('q').buffer({ messageCount: 100, timeMillis: 1000 }).push([...])
Pop
const msgs = await queen.queue('q').pop()
const msgs = await queen.queue('q').batch(10).pop()
const msgs = await queen.queue('q').batch(10).wait(true).pop()
Consume
await queen.queue('q').consume(async (msg) => { /* process */ })
await queen.queue('q').limit(10).consume(async (msg) => { /* process */ })
await queen.queue('q').concurrency(5).consume(async (msg) => { /* 5 workers */ })
await queen.queue('q').group('my-group').consume(async (msg) => { /* consumer group */ })
Acknowledgment
await queen.ack(message, true) // Success
await queen.ack(message, false) // Retry
await queen.ack(message, false, { error: 'reason' })
await queen.ack([msg1, msg2], true) // Batch ack
Transactions
await queen.transaction()
.ack(message)
.queue('output')
.push([{ data: { result: 'processed' } }])
.commit()
Lease Renewal
await queen.renew(message)
await queen.renew([msg1, msg2, msg3])
await queen.queue('q').renewLease(true, 60000).consume(async (msg) => { /* auto-renew */ })
Buffering
await queen.flushAllBuffers()
await queen.queue('q').flushBuffer()
const stats = queen.getBufferStats()
Dead Letter Queue
const dlq = await queen.queue('q').dlq().limit(10).get()
const dlq = await queen.queue('q').dlq('consumer-group').limit(10).get()
const dlq = await queen.queue('q').dlq().from('2025-01-01').to('2025-01-31').get()
Shutdown
await queen.close() // Flush buffers and close connections
Configuration Defaults
Client Defaults
{
timeoutMillis: 30000,
retryAttempts: 3,
retryDelayMillis: 1000,
loadBalancingStrategy: 'round-robin',
enableFailover: true
}
Queue Defaults
{
leaseTime: 300, // 5 minutes
retryLimit: 3,
priority: 0,
delayedProcessing: 0,
windowBuffer: 0,
maxSize: 0, // Unlimited
retentionSeconds: 0, // Keep forever
encryptionEnabled: false
}
Consume Defaults
{
concurrency: 1,
batch: 1,
autoAck: true,
wait: true, // Long polling
timeoutMillis: 30000,
limit: null, // Run forever
renewLease: false
}
Logging
Enable detailed logging for debugging:
export QUEEN_CLIENT_LOG=true
node your-app.js
Example output:
[2025-10-28T10:30:45.123Z] [INFO] [Queen.constructor] {"status":"initialized","urls":1}
[2025-10-28T10:30:45.234Z] [INFO] [QueueBuilder.push] {"queue":"tasks","partition":"Default","count":5}
Best Practices
- ✅ Use consume() for workers - Simpler API, handles retries automatically
- ✅ Use pop() for control - When you need precise control over acking
- ✅ Buffer for speed - Always use buffering when pushing many messages
- ✅ Partitions for order - Use partitions when message order matters
- ✅ Consumer groups for scale - Run multiple workers in the same group
- ✅ Transactions for consistency - Use transactions for atomic operations
- ✅ Enable DLQ - Always enable DLQ in production
- ✅ Renew long leases - Use auto-renewal for long-running tasks
- ✅ Graceful shutdown - Always call queen.close() before exiting
- ✅ Monitor DLQ - Regularly check for failed messages
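For the graceful-shutdown item, one common Node.js pattern is to close the client when the process receives SIGINT or SIGTERM. The helper below is a minimal sketch of that pattern rather than part of queen-mq: `closeOnSignal` is a hypothetical name, and it only assumes the client exposes an async `close()`, as `queen.close()` is used in this README.

```javascript
// Sketch: close a client once on SIGINT/SIGTERM, then exit.
// `client` is anything with an async close(); `proc` and `exit` are injectable for testing.
function closeOnSignal(client, proc = process, exit = (code) => process.exit(code)) {
  let closing = null
  const handler = () => {
    // Reuse the same promise so a second signal does not close twice.
    closing ??= client.close().then(
      () => exit(0),
      (error) => {
        console.error('Shutdown failed:', error)
        exit(1)
      }
    )
    return closing
  }
  proc.once('SIGINT', handler)
  proc.once('SIGTERM', handler)
  return handler
}
```

Calling `closeOnSignal(queen)` once at startup would be enough under these assumptions. Buffered messages then get flushed by `close()` before the process exits.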
TypeScript Support
Full TypeScript definitions included:
import { Queen, Message, QueueConfig } from 'queen-mq'
const queen: Queen = new Queen('http://localhost:6632')
interface OrderData {
orderId: number
amount: number
}
const messages: Message<OrderData>[] = await queen.queue('orders').pop()
Documentation
- Complete V2 Guide - Full tutorial with all features (94 test examples)
- HTTP API Reference - Raw HTTP endpoints
- Server Guide - Server setup and configuration
- Architecture Guide - Deep dive into internals
Support
- GitHub: smartpricing/queen
- Issues: GitHub Issues
- LinkedIn: Smartness
License
Apache 2.0 - See LICENSE.md
