@dotdo/pg-queue
A high-performance, pgmq-style message queue built on Cloudflare Durable Objects with SQLite storage. Designed for cost efficiency and low latency with Cap'n Web RPC integration.
Key Features
- Single Alarm for Multiple Visibility Timeouts - O(1) instead of O(n) alarm operations per dequeue, reducing alarm costs by ~95% at scale
- Cap'n Web RPC - Promise pipelining allows multiple operations in a single round-trip
- SKIP LOCKED Pattern - Atomic message claiming prevents concurrent consumers from receiving the same message
- Batch Operations - enqueueBatch, ackBatch, and nackBatch for high-throughput scenarios
- Dead Letter Queue (DLQ) - Automatic DLQ routing after configurable retry attempts
- Visibility Timeout Management - Automatic message re-delivery if not acknowledged
- Archive Support - Optional message archiving for audit trails
Installation
npm install @dotdo/pg-queue
# or
pnpm add @dotdo/pg-queue
# or
yarn add @dotdo/pg-queue
Quick Start
1. Configure Wrangler
Add the Durable Object binding to your wrangler.toml:
name = "my-worker"
main = "src/index.ts"
compatibility_date = "2024-12-30"
compatibility_flags = ["nodejs_compat"]
[[durable_objects.bindings]]
name = "QUEUE"
class_name = "QueueDO"
[[migrations]]
tag = "v1"
new_sqlite_classes = ["QueueDO"]
2. Export the Durable Object
import { QueueDO } from '@dotdo/pg-queue'
export { QueueDO }
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const id = env.QUEUE.idFromName('my-queue')
const stub = env.QUEUE.get(id)
return stub.fetch(request)
}
}
interface Env {
QUEUE: DurableObjectNamespace
}
3. Use the Queue
// Enqueue a message
const response = await stub.fetch(new Request('https://queue/enqueue', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
payload: { task: 'process-image', url: 'https://example.com/image.jpg' },
delaySeconds: 0,
maxRetries: 3
})
}))
// Dequeue messages
const messages = await stub.fetch(new Request('https://queue/dequeue', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ vtSeconds: 30, qty: 10 })
}))
API Reference
Types
interface Message<T = unknown> {
msg_id: string
payload: T
status: 'pending' | 'processing' | 'completed' | 'dead'
created_at: number
vt: number // Visibility timeout (Unix timestamp in ms)
read_count: number
max_retries: number
worker_id: string | null
archived_at: number | null
}
interface SendOptions {
delaySeconds?: number // Delay before message becomes visible (default: 0)
maxRetries?: number // Max retries before moving to DLQ (default: 3)
}
interface ReadOptions {
qty?: number // Max messages to read (default: 1)
workerId?: string // Worker ID for tracking (auto-generated if not provided)
}
interface QueueMetrics {
total: number
pending: number
processing: number
completed: number
dead: number
archived: number
oldest_pending_age_seconds: number | null
next_alarm_at: number | null
alarm_invocations: number
}
QueueRpc Interface
The QueueRpc interface defines all available queue operations:
interface QueueRpc {
// Single-Message Operations
enqueue<T>(payload: T, options?: SendOptions): Promise<SendResult>
dequeue<T>(vtSeconds: number, options?: ReadOptions): Promise<Message<T>[]>
ack(msgId: string): Promise<DeleteResult>
nack(msgId: string): Promise<NackResult>
archive(msgId: string): Promise<ArchiveResult>
extend(msgId: string, additionalSeconds: number): Promise<boolean>
// Batch Operations
enqueueBatch<T>(items: BatchEnqueueItem<T>[]): Promise<BatchEnqueueResult>
ackBatch(msgIds: string[]): Promise<BatchAckResult>
nackBatch(msgIds: string[]): Promise<BatchNackResult>
// Monitoring & Management
metrics(): Promise<QueueMetrics>
listDLQ<T>(limit?: number): Promise<Message<T>[]>
retryDLQ(msgId: string): Promise<boolean>
purge(): Promise<number>
}
Usage Examples
Basic Producer/Consumer Pattern
import type { QueueRpc, Message } from '@dotdo/pg-queue'
interface MyTask {
task: string
url: string
}
// Producer: Enqueue jobs
async function enqueueJob(stub: DurableObjectStub, task: MyTask) {
const response = await stub.fetch(new Request('https://queue/enqueue', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ payload: task })
}))
const result = await response.json() as { data: { msg_id: string } }
return result.data.msg_id
}
// Consumer: Process jobs
async function processJobs(stub: DurableObjectStub) {
// Dequeue up to 10 messages with 30-second visibility timeout
const response = await stub.fetch(new Request('https://queue/dequeue', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ vtSeconds: 30, qty: 10 })
}))
const { data: messages } = await response.json() as { data: Message<MyTask>[] }
for (const message of messages) {
try {
await processTask(message.payload)
// Acknowledge successful processing
await stub.fetch(new Request(`https://queue/ack/${message.msg_id}`, {
method: 'DELETE'
}))
} catch (error) {
// Negative acknowledge - returns to queue or moves to DLQ
const nackResponse = await stub.fetch(new Request(`https://queue/nack/${message.msg_id}`, {
method: 'POST'
}))
const { data } = await nackResponse.json() as { data: { moved_to_dlq: boolean } }
if (data.moved_to_dlq) {
console.error(`Message ${message.msg_id} moved to DLQ after exhausting retries`)
}
}
}
}
Delayed Jobs (Scheduling)
// Schedule a job to run in 5 minutes
await stub.fetch(new Request('https://queue/enqueue', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
payload: { task: 'send-reminder', userId: '123' },
delaySeconds: 300 // 5 minutes
})
}))
// Schedule a job to run in 1 hour with more retries
await stub.fetch(new Request('https://queue/enqueue', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
payload: { task: 'daily-report' },
delaySeconds: 3600,
maxRetries: 5
})
}))
Batch Operations (High Throughput)
// Enqueue multiple messages efficiently
const batchResponse = await stub.fetch(new Request('https://queue/batch/enqueue', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
items: [
{ payload: { task: 'job1' } },
{ payload: { task: 'job2' }, options: { delaySeconds: 60 } },
{ payload: { task: 'job3' }, options: { maxRetries: 5 } }
]
})
}))
const { data: batchResult } = await batchResponse.json() as { data: { successCount: number } }
console.log(`Enqueued ${batchResult.successCount} messages`)
// Acknowledge multiple messages at once
const messages = await dequeueMessages(stub, 100)
const processedIds = await processAllMessages(messages)
await stub.fetch(new Request('https://queue/batch/ack', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ msgIds: processedIds })
}))
Extending Visibility Timeout (Heartbeat Pattern)
For long-running tasks, extend the visibility timeout to prevent re-delivery:
async function processLongRunningTask(stub: DurableObjectStub, message: Message<MyTask>) {
// Set up heartbeat to extend visibility every 20 seconds
const heartbeatInterval = setInterval(async () => {
const response = await stub.fetch(new Request(`https://queue/extend/${message.msg_id}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ additionalSeconds: 30 })
}))
const { data } = await response.json() as { data: { extended: boolean } }
if (!data.extended) {
// Message was already deleted or visibility expired
clearInterval(heartbeatInterval)
}
}, 20000)
try {
await longRunningTask(message.payload) // May take several minutes
await stub.fetch(new Request(`https://queue/ack/${message.msg_id}`, {
method: 'DELETE'
}))
} finally {
clearInterval(heartbeatInterval)
}
}
Dead Letter Queue Management
// List messages in the DLQ
const dlqResponse = await stub.fetch(new Request('https://queue/dlq?limit=50'))
const { data: dlqMessages } = await dlqResponse.json() as { data: Message<MyTask>[] }
for (const msg of dlqMessages) {
console.log(`Failed message: ${msg.msg_id}`)
console.log(` Payload: ${JSON.stringify(msg.payload)}`)
console.log(` Attempts: ${msg.read_count}`)
}
// Retry a specific DLQ message after fixing the issue
await stub.fetch(new Request(`https://queue/dlq/${msgId}/retry`, {
method: 'POST'
}))
// Or delete it permanently
await stub.fetch(new Request(`https://queue/ack/${msgId}`, {
method: 'DELETE'
}))
Monitoring Queue Health
async function checkQueueHealth(stub: DurableObjectStub) {
const response = await stub.fetch(new Request('https://queue/metrics'))
const { data: metrics } = await response.json() as { data: QueueMetrics }
console.log(`Queue Status:`)
console.log(` Pending: ${metrics.pending}`)
console.log(` Processing: ${metrics.processing}`)
console.log(` Dead (DLQ): ${metrics.dead}`)
console.log(` Archived: ${metrics.archived}`)
console.log(` Oldest pending age: ${metrics.oldest_pending_age_seconds}s`)
console.log(` Alarm invocations: ${metrics.alarm_invocations}`)
// Alert if DLQ is growing
if (metrics.dead > 100) {
console.warn('DLQ has over 100 messages - investigate failures!')
}
// Alert if queue depth is high
if (metrics.pending > 10000) {
console.warn('High queue depth - consider scaling consumers!')
}
}
Cap'n Web RPC (Promise Pipelining)
For maximum efficiency, use Cap'n Web RPC with promise pipelining:
import { createRpcClient } from 'capnweb/client'
import type { QueueRpc } from '@dotdo/pg-queue'
// WebSocket connection (hibernated for cost savings)
const ws = new WebSocket('wss://your-worker.example.com/rpc/my-queue')
const queue = createRpcClient<QueueRpc>(ws)
// Promise pipelining - all operations in single round-trip
const [result1, result2, stats] = await Promise.all([
queue.enqueue({ task: 'job1' }),
queue.enqueue({ task: 'job2' }),
queue.metrics()
])
// Batch operations for even higher throughput
const batchResult = await queue.enqueueBatch([
{ payload: { task: 'job1' } },
{ payload: { task: 'job2' }, options: { delaySeconds: 60 } },
{ payload: { task: 'job3' }, options: { maxRetries: 5 } }
])
// Process messages
const messages = await queue.dequeue<MyTask>(30, { qty: 100 })
await processAllTasks(messages)
await queue.ackBatch(messages.map(m => m.msg_id))
REST API Reference
The queue exposes a REST API for operations:
Single-Message Operations
| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | /enqueue or /send | Enqueue a message |
| POST | /dequeue or /read | Dequeue messages with visibility timeout |
| DELETE | /ack/:msgId or /delete/:msgId | Acknowledge (delete) a message |
| POST | /nack/:msgId | Negative acknowledge (retry or DLQ) |
| POST | /archive/:msgId | Archive a message for audit trail |
| POST | /extend/:msgId | Extend visibility timeout |
Batch Operations
| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | /batch/enqueue | Enqueue multiple messages |
| POST | /batch/ack | Acknowledge multiple messages |
| POST | /batch/nack | Nack multiple messages |
Monitoring & Management
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | /metrics | Get queue metrics |
| GET | /dlq | List dead letter queue |
| POST | /dlq/:msgId/retry | Retry a DLQ message |
| DELETE | /purge | Purge all messages (use with caution!) |
Configuration Options
SendOptions
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| delaySeconds | number | 0 | Delay before message becomes visible |
| maxRetries | number | 3 | Maximum retry attempts before DLQ |
ReadOptions
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| qty | number | 1 | Maximum messages to read |
| workerId | string | auto-generated | Worker ID for tracking |
Architecture
Single Alarm Optimization
Traditional queue implementations schedule a separate alarm for each message's visibility timeout, resulting in O(n) alarm operations. This package uses a single-alarm approach:
- One alarm is set to the earliest visibility timeout expiration
- When the alarm fires, all expired messages are processed
- The alarm is rescheduled to the next earliest expiration
This reduces Durable Object alarm costs by approximately 95% at scale.
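The scheduling logic can be sketched in a few lines. This is a simplified illustration of the idea only, not the package's actual implementation:

```typescript
// Simplified sketch of the single-alarm strategy (illustrative only).
// One alarm is always aimed at the earliest outstanding visibility timeout.
type Pending = { msgId: string; vt: number } // vt = expiry as Unix ms

// Where should the single alarm point? At the minimum vt across in-flight messages.
function nextAlarmAt(inFlight: Pending[]): number | null {
  if (inFlight.length === 0) return null
  return Math.min(...inFlight.map(m => m.vt))
}

// When the alarm fires, every expired message is recovered in one pass,
// then the alarm is re-armed using nextAlarmAt() on the remainder.
function expired(inFlight: Pending[], now: number): Pending[] {
  return inFlight.filter(m => m.vt <= now)
}
```

Because a dequeue only needs to compare its new visibility timeout against the currently scheduled alarm time, each dequeue performs a constant amount of alarm work regardless of how many messages are in flight.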
SKIP LOCKED Pattern
Messages are claimed atomically using SQLite's UPDATE with a subquery:
UPDATE messages
SET status = 'processing', vt = ?, worker_id = ?
WHERE msg_id IN (
SELECT msg_id FROM messages
WHERE status = 'pending' AND vt <= ?
ORDER BY created_at ASC
LIMIT ?
)
RETURNING *
This prevents race conditions when multiple consumers dequeue simultaneously.
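In TypeScript terms, the claim step behaves like the following pure function (an illustrative model, not the package's code). The atomicity itself comes from the single UPDATE running inside the Durable Object, which processes requests serially:

```typescript
// Illustrative model of the atomic claim step (the real work is one SQL UPDATE).
interface ClaimableMsg {
  msg_id: string
  status: 'pending' | 'processing' | 'completed' | 'dead'
  vt: number
  created_at: number
  worker_id: string | null
}

function claim(messages: ClaimableMsg[], now: number, limit: number, workerId: string): ClaimableMsg[] {
  // Select the oldest visible pending messages, then mark them as processing.
  const claimed = messages
    .filter(m => m.status === 'pending' && m.vt <= now)
    .sort((a, b) => a.created_at - b.created_at)
    .slice(0, limit)
  for (const m of claimed) {
    m.status = 'processing'
    m.worker_id = workerId
  }
  return claimed
}
```

Since select-and-mark happens as one indivisible step, a message claimed by one worker is never visible as pending to another.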
Message Lifecycle
pending -> processing -> (ack) -> deleted
-> (nack) -> pending (retry)
-> (nack + max retries) -> dead (DLQ)
-> (archive) -> archived
-> (vt expires) -> pending (auto-retry)
Troubleshooting
Messages Not Being Processed
- Check queue metrics: Use /metrics to see if messages are pending
- Verify visibility timeout: Messages reappear after the VT expires if not acked
- Check DLQ: Failed messages may have moved to the dead letter queue
High DLQ Count
- Review error logs: Check why processing is failing
- Inspect DLQ messages: Use /dlq to see failed payloads
- Increase maxRetries: If failures are transient, allow more attempts
- Fix and retry: After fixing issues, use /dlq/:id/retry
Duplicate Processing
- Visibility timeout too short: Increase vtSeconds for long tasks
- Missing acknowledgment: Ensure ack() is called after processing
- Use heartbeat pattern: Call extend() for long-running tasks
Memory Issues
The SQLite storage in Durable Objects has limits. For very high-volume queues:
- Archive processed messages: Use /archive instead of /ack if an audit trail is needed
- Purge completed messages: Periodically clean up old data
- Use multiple queue instances: Shard by queue name
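Sharding can be as simple as deriving the Durable Object name from a stable hash of a routing key. A minimal sketch, assuming you shard by a per-message key such as a user ID (the shardName helper is hypothetical, not a package export):

```typescript
// Hypothetical helper: pick a shard-specific queue name from a routing key.
function fnv1a(s: string): number {
  // FNV-1a 32-bit hash - cheap and stable, good enough for shard selection.
  let h = 0x811c9dc5
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i)
    h = Math.imul(h, 0x01000193) >>> 0
  }
  return h
}

function shardName(queue: string, key: string, shards = 4): string {
  return `${queue}-${fnv1a(key) % shards}`
}

// In a Worker:
//   const stub = env.QUEUE.get(env.QUEUE.idFromName(shardName('emails', userId)))
```

Keeping the hash stable means all messages for a given key land on the same shard, preserving per-key ordering while spreading total load across instances.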
Exports
// Main Durable Object
export { QueueDO, handleQueueRpc } from '@dotdo/pg-queue'
// Store (for standalone/testing use)
export { QueueStore, QUEUE_SCHEMA } from '@dotdo/pg-queue/store'
// Visibility management utilities
export {
VisibilityAlarmManager,
parseMessageRow,
generateMsgId,
generateWorkerId,
VISIBILITY_QUERIES,
} from '@dotdo/pg-queue/visibility'
// Types
export type {
Message,
MessageRow,
MessageStatus,
SendOptions,
ReadOptions,
QueueMetrics,
SendResult,
DeleteResult,
ArchiveResult,
NackResult,
BatchEnqueueItem,
BatchEnqueueResult,
BatchAckResult,
BatchNackResult,
QueueEnv,
QueueRpc,
} from '@dotdo/pg-queue'
License
MIT
