@satoshibits/queue
Backend-agnostic queue abstraction that gets out of your way.
A thin, honest abstraction over queue providers. Switch between BullMQ, AWS SQS, and RabbitMQ without rewriting your application code.
Why @satoshibits/queue?
The Problem: Queue libraries have incompatible APIs. Switching from BullMQ to SQS means rewriting your entire job system.
Our Solution: A unified interface that translates to native provider features. No vendor lock-in. No hidden magic.
What We Do
✅ Unified API - One interface for all providers
✅ Honest Abstractions - We don't fake features. If a provider doesn't support something, we tell you.
✅ Event-Driven - Comprehensive lifecycle events for observability, monitoring, and custom logic
✅ Provider Strengths - Leverage each provider's native capabilities
✅ Escape Hatch - Access provider-specific features when you need them
What We Don't Do
❌ No Framework Magic - We're a translator, not a framework
❌ No Reimplementation - We use provider retries, not custom engines
❌ No Business Logic - Circuit breaking, idempotency, logging are your job
❌ No Feature Virtualization - We don't fake features that don't exist
📖 Understanding Queues: A Beginner's Guide
New to queues? Read this section first. Experienced? Jump to Quick Start or Production Features.
What Is a Queue System?
Simple Analogy: Think of a busy restaurant. A waiter (the Producer) takes an order and puts it on a ticket spike (the Queue Provider). The chef (the Worker) picks up the ticket when they're ready and prepares the meal. The waiter can immediately go take another order without waiting for the chef to finish cooking. This system decouples the waiter from the chef, allowing the restaurant to serve more customers efficiently.
In Software: A queue system decouples work from your web requests. Instead of doing heavy work inline, you add jobs to a queue and process them in the background:
Web Request → Queue.add() → [Provider Storage] → Worker.fetch() → Process
  (instant)         (Redis/SQS/etc)        (background)
Benefits:
- Fast response times - API returns immediately, work happens later
- Resilience - Jobs survive server restarts (stored in Redis/SQS)
- Scalability - Add more workers to handle more load
- Rate limiting - Control processing speed to avoid overwhelming external APIs
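For example, here is a minimal sketch of the contrast (assuming an Express-style app and a hypothetical generateThumbnails helper):
```typescript
import { Queue } from '@satoshibits/queue';

const thumbnailQueue = new Queue('thumbnails');

// ❌ inline: the request is blocked until the heavy work finishes
app.post('/upload-inline', async (req, res) => {
  await generateThumbnails(req.body.fileId); // seconds of work before responding
  res.json({ ok: true });
});

// ✅ queued: enqueue a job and respond immediately; a worker processes it later
app.post('/upload', async (req, res) => {
  await thumbnailQueue.add('generate-thumbnails', { fileId: req.body.fileId });
  res.json({ ok: true }); // instant response, work happens in the background
});
```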
The Producer-Consumer Pattern
Queue systems use two separate classes that communicate through a provider:
Note for BullMQ users: You may notice BullMQ has its own Queue and Worker classes. This library wraps them! You always use @satoshibits/queue's Queue and Worker - the provider handles the BullMQ classes internally. This abstraction lets you switch providers without changing your code.
┌─────────────────────────────────────────────────────────┐
│ Your Application │
├──────────────────────┬──────────────────────────────────┤
│ PRODUCER SIDE │ CONSUMER SIDE │
│ (Web Server) │ (Worker Process) │
│ │ │
│ Queue │ Worker │
│ • add() │ • fetches jobs │
│ • getStats() │ • processes handler │
│ • pause/resume() │ • emits events │
└──────────┬───────────┴────────────┬─────────────────────┘
│ │
└────────────┬───────────┘
│
┌─────────▼──────────┐
│ Provider Storage │
│ (Redis, SQS, etc) │
└────────────────────┘
Critical Understanding:
- Queue and Worker don't talk directly to each other
- They communicate through the provider (Redis, SQS, etc.)
- Can run in separate processes or even separate servers
- Queue pushes jobs in, Worker pulls jobs out
How They're Deployed:
┌─────────────────────┐ ┌──────────────────┐ ┌─────────────────────┐
│ Your API Server │ │ Queue Provider │ │ Your Worker Process │
│ (contains Queue) │──adds──>│ (Redis/SQS/etc) │<──fetches──│ (contains Worker) │
└─────────────────────┘ job └──────────────────┘ job └─────────────────────┘
  Process 1               Shared Storage             Process 2
Job Lifecycle
Every job goes through these states (from src/core/types.mts:46-51):
type JobStatus = "waiting" | "delayed" | "active" | "completed" | "failed"
The Flow:
queue.add()
│
▼
┌─────────┐ (if delay option) ┌─────────┐
│ waiting │ ─────────────────→ │ delayed │
└────┬────┘ └────┬────┘
│ │
│ ◄────────────────────────────┘
│ (when delay expires)
│
│ worker.fetch()
▼
┌────────┐
│ active │ ← job is being processed
└───┬────┘
│
├─→ Success? ──→ ┌───────────┐
│ │ completed │
│ └───────────┘
│
└─→ Failed? ──┬─→ attempts < maxAttempts? ──→ back to waiting (retry)
│
└─→ attempts >= maxAttempts? ──→ ┌────────┐
│ failed │ → DLQ
└────────┘
State Details:
- waiting: Job is in queue, ready to be picked up
- delayed: Job is scheduled for future (will move to waiting when time comes)
- active: Worker is currently processing this job
- completed: Job succeeded
- failed: Job exhausted all retry attempts
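To make these states concrete, here is a minimal sketch that schedules a delayed job and inspects its status (the queue name and payload are illustrative; add() and getJob() return Result-wrapped values, as shown later in this guide):
```typescript
import { Queue } from '@satoshibits/queue';

const queue = new Queue('reports');

// starts in "delayed", moves to "waiting" after 2s, then "active" once a worker picks it up
const added = await queue.add('nightly-report', { day: '2024-01-01' }, {
  delay: 2000,
  attempts: 3 // a failed attempt goes back to "waiting" until attempts are exhausted
});

if (added.success) {
  const job = await queue.getJob(added.data.id);
  if (job.success && job.data) {
    console.log(job.data.status); // "delayed" now; later "waiting" → "active" → "completed"/"failed"
  }
}
```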
How Queue and Worker Collaborate
Step-by-step example:
// ========================================
// FILE: api-server.ts (Producer)
// ========================================
import { Queue } from '@satoshibits/queue';
const emailQueue = new Queue('emails');
// When user signs up
app.post('/signup', async (req, res) => {
const user = await createUser(req.body);
// Add job to queue (returns immediately)
await emailQueue.add('send-welcome', {
userId: user.id,
email: user.email
});
res.json({ success: true }); // ← instant response
});
// ========================================
// FILE: worker.ts (Consumer - separate process!)
// ========================================
import { Worker } from '@satoshibits/queue';
import { Result } from '@satoshibits/functional';
// Worker continuously polls the queue
const emailWorker = new Worker('emails', async (data, job) => {
console.log(`Processing ${job.name} for user ${data.userId}`);
// Do the actual work
await sendEmail(data.email, 'Welcome!');
// Return Result type
return Result.ok(undefined);
});
// Listen to lifecycle events
emailWorker.on('completed', (payload) => {
console.log(`✅ Job ${payload.jobId} completed`);
});
emailWorker.on('failed', (payload) => {
console.error(`❌ Job ${payload.jobId} failed: ${payload.error}`);
console.log(`Will retry: ${payload.willRetry}`);
});
emailWorker.on('job.retrying', (payload) => {
console.log(`🔄 Job ${payload.jobId} retrying (attempt ${payload.attempts}/${payload.maxAttempts})`);
});
// Start processing
await emailWorker.start();
Key Points:
- Queue and Worker use the same queue name ('emails') to find each other
- They can run in different files/processes/servers
- Multiple workers can process the same queue (for parallelism)
- One worker process can consume multiple queues (create one Worker instance per queue; see the sketch below)
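A minimal sketch of both scaling patterns (the sendEmail and sendSms helpers are hypothetical):
```typescript
import { Worker } from '@satoshibits/queue';
import { Result } from '@satoshibits/functional';

// pattern 1: several Worker instances drain the same queue in parallel
// (typically one per process or machine)
const emailWorkerA = new Worker('emails', async (data) => {
  await sendEmail(data.email);
  return Result.ok(undefined);
});
const emailWorkerB = new Worker('emails', async (data) => {
  await sendEmail(data.email);
  return Result.ok(undefined);
});

// pattern 2: one process consuming several queues - one Worker instance per queue
const smsWorker = new Worker('sms', async (data) => {
  await sendSms(data.phone);
  return Result.ok(undefined);
});

await Promise.all([emailWorkerA.start(), emailWorkerB.start(), smsWorker.start()]);
```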
🎯 Division of Responsibilities
Understanding who does what prevents confusion and helps you build robust systems. This library follows a clear responsibility model.
Tier 1: Your Application's Core Responsibilities
These are your responsibility. The library provides hooks (events), but you implement the policy.
| Responsibility | Why It's Yours | How to Implement |
|----------------|----------------|------------------|
| Business Logic | Every application is different | Implement in your job handler |
| Error Classification | You know which errors are transient | Check error type, throw to retry or return Ok to skip |
| Idempotency | You know what "already processed" means | Use job IDs, check DB before processing |
| Circuit Breaking | You decide when to stop trying | Track failures in failed event, check before processing |
| Observability (Monitoring & Alerting) | You define what metrics matter and when to alert | Monitor queue depth, DLQ size, job latency; set up alerts for anomalies |
| Security | You know what data is sensitive and who can access what | Don't put secrets in payloads (pass references); perform authorization checks in handlers |
| Logging | You decide what's important to log | Use event listeners to log job lifecycle |
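Example: Payload Security
A minimal sketch of the Security row above; the secrets store, can() authorization helper, and syncCrm() call are hypothetical stand-ins for your own infrastructure:
```typescript
// ❌ BAD: the secret travels through (and may be persisted by) the queue provider
await queue.add('sync-crm', { apiKey: process.env.CRM_API_KEY, accountId: 42 });

// ✅ GOOD: pass a reference; resolve the secret and authorize inside the handler
await queue.add('sync-crm', { accountId: 42 });

const worker = new Worker('sync-crm', async (data, job) => {
  // authorization check belongs in the handler
  if (!(await can(data.accountId, 'crm:sync'))) {
    logger.warn('Unauthorized job skipped', { jobId: job.id });
    return Result.ok(undefined);
  }
  const apiKey = await secrets.get('crm-api-key'); // fetched at processing time, never enqueued
  await syncCrm(apiKey, data.accountId);
  return Result.ok(undefined);
});
await worker.start();
```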
Example: Error Classification
const worker = new Worker('payments', async (data, job) => {
try {
await processPayment(data);
return Result.ok(undefined);
} catch (error) {
// YOU classify the error
if (error.code === 'RATE_LIMIT') {
throw error; // transient → retry
} else if (error.code === 'INVALID_CARD') {
logger.error('Permanent failure', { jobId: job.id, error });
return Result.ok(undefined); // permanent → don't retry, mark complete
}
throw error; // unknown → retry
}
});
await worker.start();
Example: Circuit Breaking
const breaker = new CircuitBreaker({ threshold: 5, timeout: 60000 });
const worker = new Worker('payments', async (data) => {
if (breaker.isOpen()) {
throw new Error('Circuit open - payment service degraded');
}
return processPayment(data);
});
// register listeners after the worker exists
worker.on('failed', () => breaker.recordFailure());
worker.on('completed', () => breaker.recordSuccess());
Tier 2: Configuring Library & Provider Features
These features are available but you must configure them. The library coordinates, the provider executes.
| Feature | Configured Via | What It Does | Provider Support |
|---------|---------------|--------------|------------------|
| Retries | attempts: 3 | Provider re-queues failed jobs. See Error Classification | All providers |
| Backoff | backoff: { type: 'exponential', delay: 1000 } | Provider delays retries | BullMQ, RabbitMQ (SQS uses fixed) |
| Delays | delay: 5000 | Provider schedules job for future | All providers |
| Priorities | priority: 1 | Provider orders job processing | BullMQ, RabbitMQ (SQS: use separate queues) |
| DLQ | Provider config | Provider moves exhausted jobs to DLQ. See Ignoring the DLQ | All providers (provider-specific setup) |
| Timeouts | timeout: 5000 | Worker marks job as failed after timeout* | Library (all providers) |
| Job Cancellation | See Cancellation Guide | Remove queued jobs, abort active jobs (userland patterns), graceful shutdown | Provider-dependent |
| Graceful Shutdown | worker.close({ finishActiveJobs: true }) | Worker waits for active jobs. See Forgetting Graceful Shutdown | Library (all providers) |
| Events | worker.on('failed', ...) | Get notified at lifecycle points. See Not Using Events | Library (all providers) |
| Health Checks | queue.getHealth() | Get queue depth, error rate | Provider-dependent |
| Stats | queue.getStats() | Get job counts by state | Provider-dependent |
Important Notes:
- Timeout caveat: JavaScript can't cancel async functions. Timeouts mark jobs as failed but don't stop execution. Implement AbortController in your handler for true cancellation (see the sketch below).
- Warn-and-degrade: If you request a feature the provider doesn't support (e.g., priority on SQS), you'll get a warning and the option is ignored.
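A minimal sketch of cooperative cancellation under that caveat: the handler creates its own AbortController mirroring the job timeout and passes the signal to abortable APIs (fetch supports it natively; buildReport and the sourceUrl field are hypothetical):
```typescript
import { Worker } from '@satoshibits/queue';
import { Result } from '@satoshibits/functional';

const worker = new Worker('reports', async (data, job) => {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), 30_000); // mirror your timeout option
  try {
    // abortable APIs stop promptly once the signal fires
    const res = await fetch(data.sourceUrl, { signal: controller.signal });
    await buildReport(await res.json());
    return Result.ok(undefined);
  } finally {
    clearTimeout(timer);
  }
});
```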
Example: Full Configuration
await queue.add('process-order', data, {
// normalized options (library translates to provider)
attempts: 5,
priority: 10,
delay: 60000, // 1 minute
timeout: 30000, // 30 seconds
// provider-specific escape hatch
providerOptions: {
bullmq: {
removeOnComplete: 1000,
stackTraceLimit: 0
},
sqs: {
MessageGroupId: 'orders',
MessageDeduplicationId: uuid()
}
}
});
Tier 3: Handled Automatically by the Library
These are fully managed by the library and provider. You don't think about them.
| Responsibility | Handled By | Details |
|----------------|------------|---------|
| Job Persistence | Provider | Jobs survive restarts (Redis/SQS stores them) |
| Concurrency Control | Library | Worker respects concurrency limit, manages parallel execution |
| Fetch Loop | Library | Worker continuously polls provider (pull model) |
| Ack/Nack | Library + Provider | Worker acknowledges success/failure to provider |
| Event Emission | Library | Consistent events emitted at all lifecycle points |
| Backpressure | Library | Worker stops fetching when at concurrency limit |
| Provider Connection | Provider | Connection pooling, reconnection logic |
| Job Serialization | Library + Provider | Jobs are serialized to JSON automatically |
| Visibility Timeout | Provider | Provider hides active jobs from other workers |
What This Means:
- ✅ You don't write fetch loops
- ✅ You don't manage connection pools
- ✅ You don't track which jobs are "in flight"
- ✅ You don't serialize/deserialize jobs
- ✅ You focus on: "what should this job do?"
⚠️ Common Mistakes & Best Practices
Learn from others' mistakes. These patterns will save you hours of debugging.
Mistake 1: Treating All Errors the Same
Problem:
// ❌ BAD: All errors trigger retry
const worker = new Worker('payments', async (data) => {
await chargeCustomer(data.cardId); // what if card is invalid?
return Result.ok(undefined);
});
If the customer's card is permanently invalid, retrying 3 times accomplishes nothing. You burn resources and delay the inevitable failure.
Solution: Classify Errors
// ✅ GOOD: Classify errors
const worker = new Worker('payments', async (data, job) => {
try {
await chargeCustomer(data.cardId);
return Result.ok(undefined);
} catch (error) {
// permanent errors - don't retry
if (error.code === 'CARD_INVALID' || error.code === 'INSUFFICIENT_FUNDS') {
logger.error('Permanent payment failure', { jobId: job.id, error });
return Result.ok(undefined); // mark complete, don't retry
}
// transient errors - retry
if (error.code === 'NETWORK_ERROR' || error.code === 'SERVICE_UNAVAILABLE') {
throw error; // let provider retry
}
// unknown error - retry to be safe
throw error;
}
});
await worker.start();
Alternative: Using QueueError with retryable: false
For pull-model providers (MemoryProvider) or when you need explicit control, you can throw a QueueError with retryable: false to signal permanent failure:
import type { QueueError } from '@satoshibits/queue';
const worker = new Worker('payments', async (data, job) => {
try {
await chargeCustomer(data.cardId);
return Result.ok(undefined);
} catch (error) {
if (error.code === 'CARD_INVALID') {
// throw QueueError with retryable: false to skip all retries
const permanentError: QueueError = {
type: 'DataError',
code: 'VALIDATION',
message: `Invalid card: ${error.message}`,
retryable: false, // signals provider to skip retry
};
throw permanentError;
}
// transient errors - throw plain Error to retry
throw error;
}
});
When nack() receives an error with retryable: false, the provider will:
- Skip all remaining retry attempts
- Move the job directly to failed state
- Respect the removeOnFail option (job is removed or kept in failed state)
This works for both push-model (BullMQ) and pull-model (MemoryProvider) providers.
Mistake 2: Forgetting Graceful Shutdown
Problem:
// ❌ BAD: No shutdown logic
const worker = new Worker('emails', sendEmail);
await worker.start();
process.on('SIGTERM', () => {
process.exit(0); // kills active jobs immediately!
});
When your server restarts (deployments, scaling), active jobs get killed mid-processing. This can leave your system in inconsistent states.
Solution: Graceful Shutdown
// ✅ GOOD: Wait for active jobs
const worker = new Worker('emails', sendEmail);
await worker.start();
process.on('SIGTERM', async () => {
console.log('Shutting down gracefully...');
await worker.close({
timeout: 30000, // wait up to 30s
finishActiveJobs: true // let active jobs complete
});
process.exit(0);
});
Mistake 3: Ignoring the Dead Letter Queue
Problem:
// ❌ BAD: Set up DLQ but never check it
const queue = new Queue('orders', {
deadLetter: { queue: 'failed-orders', maxAttempts: 3 }
});
// ... jobs fail and pile up in DLQ forever
Failed jobs accumulate in your DLQ. You never know about problems until a customer complains.
Solution: Monitor the DLQ
// ✅ GOOD: Actively monitor DLQ
const queue = new Queue('orders', {
deadLetter: { queue: 'failed-orders', maxAttempts: 3 }
});
// check DLQ periodically
setInterval(async () => {
const dlqJobs = await queue.getDLQJobs(100);
if (dlqJobs.success && dlqJobs.data.length > 0) {
logger.warn(`${dlqJobs.data.length} jobs in DLQ`, {
jobs: dlqJobs.data.map(j => ({ id: j.id, error: j.error }))
});
// alert ops team if threshold exceeded
if (dlqJobs.data.length > 50) {
await alertOps('High DLQ count', { count: dlqJobs.data.length });
}
}
}, 60000); // check every minute
Mistake 4: Assuming Immediate Processing
Problem:
// ❌ BAD: Expect immediate processing
await queue.add('send-email', { userId: 123 });
await sendSlackNotification('Email sent to user 123'); // too early!
Queues are asynchronous. Adding a job returns immediately, but processing happens later. The email might not send for seconds or minutes.
Solution: Use Events for Confirmation
// ✅ GOOD: React to completion events
await queue.add('send-email', { userId: 123 }, { jobId: 'email-123' });
// in your worker process
worker.on('completed', async (payload) => {
if (payload.jobId === 'email-123') {
await sendSlackNotification('Email sent to user 123');
}
});
// OR: Check job status
const job = await queue.getJob('email-123');
if (job.success && job.data?.status === 'completed') {
// email was sent
}
Mistake 5: Not Implementing Idempotency
Problem:
// ❌ BAD: Job runs twice, charges customer twice
const worker = new Worker('payments', async (data) => {
await chargeCustomer(data.amount);
return Result.ok(undefined);
});
Networks are unreliable. A job might get processed twice (worker crashes after processing but before ack). Without idempotency, you double-charge customers.
Solution: Make Jobs Idempotent
// ✅ GOOD: Check if already processed
const worker = new Worker('payments', async (data, job) => {
// check if we already processed this job
const existingCharge = await db.charges.findOne({ jobId: job.id });
if (existingCharge) {
logger.info('Job already processed', { jobId: job.id });
return Result.ok(undefined); // skip, already done
}
// process and record
const charge = await chargeCustomer(data.amount);
await db.charges.insert({ jobId: job.id, chargeId: charge.id });
return Result.ok(undefined);
});
await worker.start();
Mistake 6: Putting Large Payloads in Queue
Problem:
// ❌ BAD: Embed 5MB file in job data
await queue.add('process-video', {
videoData: largeVideoBuffer // 5MB!
});
Most queue providers have payload limits (SQS: 256KB, Redis: 512MB). Even if allowed, large payloads slow down serialization and network transfer.
Solution: Store Large Data Separately
// ✅ GOOD: Store file externally, pass reference
const videoUrl = await s3.upload(videoBuffer);
await queue.add('process-video', {
videoUrl, // just the URL
userId: 123
});
// worker fetches the file
const worker = new Worker('videos', async (data) => {
const videoBuffer = await s3.download(data.videoUrl);
await processVideo(videoBuffer);
return Result.ok(undefined);
});
await worker.start();
Mistake 7: Not Using Worker Events
Problem:
// ❌ BAD: No visibility into what's happening
const worker = new Worker('emails', sendEmail);
await worker.start();
// ... jobs fail silently, you have no idea
Without event listeners, you're blind to failures, retries, and performance issues.
Solution: Listen to Events
// ✅ GOOD: Comprehensive event handling
const worker = new Worker('emails', sendEmail);
worker.on('active', (payload) => {
logger.info('Job started', { jobId: payload.jobId });
});
worker.on('completed', (payload) => {
logger.info('Job completed', {
jobId: payload.jobId,
duration: payload.duration
});
metrics.recordJobSuccess(payload.duration);
});
worker.on('failed', (payload) => {
logger.error('Job failed', {
jobId: payload.jobId,
error: payload.error,
willRetry: payload.willRetry
});
metrics.recordJobFailure();
});
worker.on('job.retrying', (payload) => {
logger.warn('Job retrying', {
jobId: payload.jobId,
attempt: payload.attempts,
maxAttempts: payload.maxAttempts
});
});
worker.on('queue.error', (payload) => {
logger.fatal('Queue error', payload.error);
alertOps('Critical queue error', payload);
});
await worker.start();
Mistake 8: Not Considering Worker Resource Limits
Problem:
// ❌ BAD: Setting concurrency without considering resources
const worker = new Worker('heavy-jobs', processVideo, {
concurrency: 100 // can this machine handle 100 concurrent video encodings?
});
Setting high concurrency (e.g., 100 concurrent jobs) on a machine that can only handle 10 leads to CPU/memory exhaustion and worker crashes. The library manages concurrency, but you must provision infrastructure appropriately.
Solution: Load Test and Right-Size
// ✅ GOOD: Set concurrency based on actual capacity
const worker = new Worker('heavy-jobs', processVideo, {
concurrency: 5 // tested limit for this machine
});
// Monitor resource usage
worker.on('active', () => {
const usage = process.memoryUsage();
if (usage.heapUsed > MEMORY_THRESHOLD) {
logger.warn('High memory usage', { heapUsed: usage.heapUsed });
}
});
await worker.start();
Best Practices:
- Load test your workers to find safe concurrency limits for your hardware
- Monitor CPU, memory, and I/O during job processing
- Start conservative (low concurrency) and increase gradually
- Consider job type: CPU-bound jobs need fewer workers than I/O-bound jobs (see the sketch below)
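A minimal sketch of sizing by job type (processVideo and sendEmailHandler are hypothetical handlers; the numbers are illustrative starting points, not recommendations):
```typescript
import os from 'node:os';
import { Worker } from '@satoshibits/queue';

// CPU-bound work: roughly one concurrent job per core
const videoWorker = new Worker('videos', processVideo, {
  concurrency: os.cpus().length
});

// I/O-bound work: cores aren't the bottleneck - start higher, then load test
const emailWorker = new Worker('emails', sendEmailHandler, {
  concurrency: 25
});
```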
Best Practice Checklist
Before going to production, verify:
- [ ] Error Classification: Distinguish transient from permanent errors
- [ ] Graceful Shutdown: Implement a SIGTERM handler with worker.close()
- [ ] DLQ Monitoring: Check dead letter queue regularly
- [ ] Idempotency: Jobs can run multiple times safely
- [ ] Event Listeners: Log job lifecycle events
- [ ] Small Payloads: Store large data externally, pass references
- [ ] Timeouts: Set realistic job timeouts
- [ ] Health Checks: Expose a /health endpoint with queue metrics
- [ ] Alerting: Alert on high failure rates or DLQ buildup
- [ ] Resource Limits: Configure concurrency based on available resources
Quick Start
Prerequisites: Read Understanding Queues first if you're new to queues.
Installation
npm install @satoshibits/queue
Basic Usage (Development)
Perfect for local development and testing. Uses in-memory provider (zero config).
import { Queue, Worker } from '@satoshibits/queue';
import { Result } from '@satoshibits/functional';
// ========================================
// STEP 1: Create Queue (Producer)
// ========================================
const emailQueue = new Queue('emails'); // defaults to in-memory provider
// ========================================
// STEP 2: Add Jobs
// ========================================
await emailQueue.add('send-welcome', {
userId: 123,
email: '[email protected]'
});
console.log('Job added! Processing will happen in worker...');
// ========================================
// STEP 3: Create Worker (Consumer)
// ========================================
const emailWorker = new Worker('emails', async (data, job) => {
console.log(`Processing ${job.name} for user ${data.userId}`);
// do the actual work
await sendEmail(data.email, 'Welcome!');
// must return Result type
return Result.ok(undefined);
});
// ========================================
// STEP 4: Listen to Events (optional but recommended)
// ========================================
emailWorker.on('completed', (payload) => {
console.log(`✅ Job ${payload.jobId} completed in ${payload.duration}ms`);
});
emailWorker.on('failed', (payload) => {
console.error(`❌ Job ${payload.jobId} failed: ${payload.error}`);
console.log(`Will retry: ${payload.willRetry}`);
});
// ========================================
// STEP 5: Start Worker
// ========================================
await emailWorker.start();
console.log('Worker started, waiting for jobs...');
Providers
The queue package supports multiple providers. Each provider has optional peer dependencies that you only install if you use that provider.
Available Providers
| Provider | Import Path | Peer Dependencies | Best For |
|----------|------------|-------------------|----------|
| Memory | @satoshibits/queue (default) | None | Development, testing |
| BullMQ | @satoshibits/queue/providers/bullmq | bullmq, ioredis | Production workhorse |
| SQS | @satoshibits/queue/providers/sqs | @aws-sdk/client-sqs | Serverless, AWS ecosystem |
Installing Provider Dependencies
For BullMQ (Redis-backed):
npm install bullmq ioredis
# or
pnpm add bullmq ioredis
For SQS (AWS):
npm install @aws-sdk/client-sqs
# or
pnpm add @aws-sdk/client-sqs
For Memory (default):
# No additional dependencies needed
Production Usage (BullMQ)
import { Queue, Worker } from '@satoshibits/queue';
import { BullMQProvider } from '@satoshibits/queue/providers/bullmq';
import { Result } from '@satoshibits/functional';
// ========================================
// Configure Provider (once per app)
// ========================================
const providerFactory = new BullMQProvider({
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379')
}
// For full config options, see BullMQProviderConfig in src/providers/bullmq/bullmq.provider.mts
});
// ========================================
// Producer Side (API Server)
// ========================================
const queue = new Queue('emails', {
provider: providerFactory.forQueue('emails')
});
await queue.add('send-welcome', {
userId: 123,
email: '[email protected]'
}, {
attempts: 3,
priority: 1,
delay: 5000,
timeout: 30000
});
// ========================================
// Consumer Side (Worker Process)
// ========================================
const worker = new Worker('emails',
async (data, job) => {
await sendEmail(data.email, 'Welcome!');
return Result.ok(undefined);
},
{
provider: providerFactory.forQueue('emails'),
concurrency: 10,
pollInterval: 100
}
);
// graceful shutdown
process.on('SIGTERM', async () => {
await worker.close({ finishActiveJobs: true });
process.exit(0);
});
await worker.start();
Bull Board Integration (BullMQ Only)
Bull Board is a popular UI dashboard for monitoring BullMQ queues. The BullMQProvider exposes underlying BullMQ Queue instances for this purpose:
import { Queue } from '@satoshibits/queue';
import { BullMQProvider } from '@satoshibits/queue/providers/bullmq';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
// ========================================
// Create provider factory (keep reference!)
// ========================================
const providerFactory = new BullMQProvider({
connection: { host: 'localhost', port: 6379 }
});
// ========================================
// Create your queues
// ========================================
const emailQueue = new Queue('emails', {
provider: providerFactory.forQueue('emails')
});
const smsQueue = new Queue('sms', {
provider: providerFactory.forQueue('sms')
});
// ========================================
// Setup bull-board using the provider factory
// ========================================
const bullQueues = Array.from(providerFactory.getBullMQQueues().values());
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: bullQueues.map(q => new BullMQAdapter(q)),
serverAdapter
});
// ========================================
// Mount in Express
// ========================================
app.use('/admin/queues', serverAdapter.getRouter());
Important: Keep a reference to your BullMQProvider instance. You'll need it to access the underlying BullMQ queues for monitoring tools.
Production Usage (SQS)
import { Queue, Worker } from '@satoshibits/queue';
import { SQSProvider } from '@satoshibits/queue/providers/sqs';
import { Result } from '@satoshibits/functional';
// ========================================
// Configure Provider
// ========================================
const providerFactory = new SQSProvider({
region: 'us-east-1',
// Map queue names to their full SQS URLs upfront
queueUrls: {
'emails': 'https://sqs.us-east-1.amazonaws.com/123/emails',
'notifications': 'https://sqs.us-east-1.amazonaws.com/123/notifications'
}
// Credentials loaded from environment/IAM role by default
// For full config options, see SQSProviderConfig in src/providers/sqs/sqs.provider.mts
});
// ========================================
// Producer Side
// ========================================
const queue = new Queue('emails', {
provider: providerFactory.forQueue('emails')
});
await queue.add('send-welcome', {
userId: 123,
email: '[email protected]'
}, {
attempts: 3,
delay: 5000
});
// ========================================
// Consumer Side
// ========================================
const worker = new Worker('emails',
async (data, job) => {
await sendEmail(data.email, 'Welcome!');
return Result.ok(undefined);
},
{
provider: providerFactory.forQueue('emails'),
concurrency: 10
}
);
await worker.start();
Key Differences from Development:
- Provider: Import from subpath, instantiate with config, use the forQueue() method
- Processes: Queue and Worker run in separate processes (API vs worker)
- Configuration: Set retries, timeouts, concurrency
- Shutdown: Implement graceful shutdown for deployments
📚 Want a Complete Example? See Production Setup Example for a runnable demonstration of all best practices covered in this guide (error classification, graceful shutdown, DLQ monitoring, idempotency, events, and more).
Core Concepts
1. Normalized Configuration
Configure common features with a consistent API. We map to native provider capabilities.
await queue.add('process-payment', data, {
// normalized options
attempts: 3, // provider's native retries
priority: 1, // provider's priority queue
delay: 5000, // provider's delay mechanism
backoff: {
type: 'exponential',
delay: 1000
}
});
Warn-and-Degrade Policy: If a provider doesn't support a feature, we log a warning and continue:
WARNING: SQS provider does not support job priorities.
The 'priority' option will be ignored.
No crashes. No fake implementations. Just honesty.
2. Event-Driven Architecture
We emit events at key lifecycle points. You implement the policy.
// circuit breaking in userland
const breaker = new CircuitBreaker();
const worker = new Worker('jobs', async (data, job) => {
if (breaker.isOpen()) {
throw new Error('Circuit open');
}
return processJob(data);
});
worker.on('failed', (payload) => {
breaker.recordFailure();
logger.error('Job failed', { jobId: payload.jobId, error: payload.error });
});
worker.on('completed', (payload) => {
breaker.recordSuccess();
});
await worker.start();
Available Events:
- Queue events:
  - queue.on('queue.paused', ...) - Queue processing paused
  - queue.on('queue.resumed', ...) - Queue processing resumed
  - queue.on('queue.drained', ...) - Queue became empty
  - queue.on('queue.error', ...) - Error occurred in queue operations
- Worker events:
  - worker.on('active', ...) - Job processing started
  - worker.on('completed', ...) - Job processing succeeded
  - worker.on('failed', ...) - Job processing failed
  - worker.on('job.retrying', ...) - Job will be retried after failure
  - worker.on('queue.error', ...) - Error occurred during job processing
  - worker.on('processor.shutting_down', ...) - Worker is shutting down
  - worker.on('processor.shutdown_timeout', ...) - Graceful shutdown timeout exceeded
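The worker-side events appear throughout this guide; here is a minimal sketch of the queue-side listeners (logger is your own; the queue.error payload shape follows the worker example shown earlier):
```typescript
import { Queue } from '@satoshibits/queue';

const queue = new Queue('emails');

queue.on('queue.paused', () => logger.info('emails queue paused'));
queue.on('queue.resumed', () => logger.info('emails queue resumed'));
queue.on('queue.drained', () => logger.info('emails queue is empty'));
queue.on('queue.error', (payload) => {
  logger.error('queue operation failed', { error: payload.error });
});
```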
3. TypeScript Support
Full type safety with generics:
interface EmailJob {
to: string;
subject: string;
body: string;
}
const queue = new Queue<EmailJob>('emails');
// type error if fields missing
await queue.add('send', {
to: '[email protected]',
subject: 'Welcome',
body: 'Thanks for signing up'
});
const worker = new Worker<EmailJob>('emails', async (data, job) => {
// data is fully typed as EmailJob
// job is typed as ActiveJob<EmailJob>
await sendEmail(data.to, data.subject, data.body);
return Result.ok(undefined);
});
await worker.start();
Job Handler Signature:
Job handlers receive two parameters: the job's data payload and an ActiveJob object, which contains the data plus persistent state and runtime metadata.
type JobHandler<T> = (
data: T, // the job's data payload
job: ActiveJob<T> // job with persistent state + runtime metadata
) => Promise<Result<void, QueueError | Error>>;
ActiveJob<T> vs Job<T>:
The library separates a job's persistent state (Job) from its runtime metadata (ActiveJob). Job represents what is stored in the queue provider, while ActiveJob is what your worker receives when processing. This separation allows for provider-specific runtime details (like SQS receipt handles) without polluting the core job model.
For full details on these interfaces, see src/core/types.mts.
When You Need the job Object:
Most handlers only use data. However, you might need the full job object for:
// accessing job metadata
const worker = new Worker('emails', async (data, job) => {
console.log(`Processing job ${job.id} (attempt ${job.attempts}/${job.maxAttempts})`);
// access custom metadata
const userId = job.metadata?.userId;
await sendEmail(data);
return Result.ok(undefined);
});
// idempotency checks
const worker = new Worker('payments', async (data, job) => {
// check if already processed using job.id
if (await alreadyProcessed(job.id)) {
return Result.ok(undefined);
}
await processPayment(data);
await markProcessed(job.id);
return Result.ok(undefined);
});
4. Escape Hatch
Access provider-specific features when needed:
await queue.add('send-email', data, {
attempts: 3, // normalized
// provider-specific options
providerOptions: {
bullmq: {
removeOnComplete: 100,
stackTraceLimit: 0
},
sqs: {
MessageGroupId: 'emails',
MessageDeduplicationId: uuid()
}
}
});
5. Provider-Specific Namespaces
For advanced provider-specific features that don't exist across all providers, use the typed namespace pattern:
BullMQ Queue Extensions
Access BullMQ Queue-specific features through the queue.bullmq namespace:
import { Queue } from '@satoshibits/queue';
import { BullMQProvider } from '@satoshibits/queue/providers/bullmq';
const provider = new BullMQProvider({
connection: { host: 'localhost', port: 6379 }
});
const queue = new Queue('my-queue', {
provider: provider.forQueue('my-queue')
});
// access BullMQ-specific queue features
const extensions = queue.bullmq;
if (extensions) {
// get the underlying BullMQ Queue instance
const queueResult = extensions.getBullMQQueue();
if (queueResult.success && queueResult.data) {
const bullQueue = queueResult.data;
// use native BullMQ Queue features not exposed by core API
const isPaused = await bullQueue.isPaused();
// access queue-level logs
const logs = await bullQueue.getJobLogs('job-id');
}
// recurring job scheduler (BullMQ only)
await extensions.upsertJobScheduler('daily-report', {
pattern: '0 9 * * *', // cron pattern: 9 AM daily
jobName: 'generate-report',
data: { reportType: 'daily' },
timezone: 'America/New_York',
jobOptions: {
priority: 10,
attempts: 3,
backoff: { type: 'exponential', delay: 5000 }
}
});
// list all recurring jobs
const result = await extensions.getJobSchedulers();
if (result.success) {
const schedulers = result.data;
schedulers.forEach(s => {
console.log(`Scheduler: ${s.id}`);
console.log(`Pattern: ${s.pattern}`);
console.log(`Next run: ${s.next}`);
});
}
// remove a scheduler
await extensions.removeJobScheduler('daily-report');
}
Key Points:
- Provider-specific namespaces are optional - they return undefined for unsupported providers
- Always check if the namespace exists before using it (TypeScript will enforce this)
- Code using provider namespaces is not portable across providers
- Use the escape hatch pattern when you need features that don't have multi-provider equivalents
When to use queue extensions:
- ✅ Direct access to underlying BullMQ Queue instance
- ✅ BullMQ's recurring job schedulers (no SQS equivalent)
- ✅ Provider-specific advanced features that don't map across backends
- ❌ Per-job options (use providerOptions instead)
- ❌ Features that could be abstracted across providers
BullMQ Worker Extensions
Access BullMQ Worker-specific features through the worker.bullmq namespace:
import { Worker } from '@satoshibits/queue';
import { BullMQProvider } from '@satoshibits/queue/providers/bullmq';
const provider = new BullMQProvider({
connection: { host: 'localhost', port: 6379 }
});
const worker = new Worker('my-queue', async (data, job) => {
// process job
return { success: true, data: undefined };
}, {
provider: provider.forQueue('my-queue'),
concurrency: 5
});
await worker.start();
// access BullMQ-specific worker features
const extensions = worker.bullmq;
if (extensions) {
// get the underlying BullMQ Worker instance for advanced use cases
const result = extensions.getBullMQWorker();
if (result.success && result.data) {
const bullWorker = result.data;
// use native BullMQ Worker features not exposed by core API
const isPaused = await bullWorker.isPaused();
const isRunning = await bullWorker.isRunning();
// access worker events not exposed by core API
bullWorker.on('progress', (job, progress) => {
console.log(`Job ${job.id} progress: ${progress}%`);
});
// use advanced BullMQ Worker features
const metrics = await bullWorker.getMetrics();
console.log('Worker metrics:', metrics);
}
}
Lifecycle Behavior:
// before start() - extensions exist but worker instance is undefined
const ext1 = worker.bullmq?.getBullMQWorker();
// Result.ok(undefined) - no worker yet
await worker.start();
// after start() - worker instance is available
const ext2 = worker.bullmq?.getBullMQWorker();
// Result.ok(BullWorker) - native instance available
await worker.close();
// after close() - worker instance is destroyed
const ext3 = worker.bullmq?.getBullMQWorker();
// Result.ok(undefined) - worker cleaned up
When to use worker extensions:
- ✅ You need access to BullMQ Worker features not in the core API
- ✅ You're integrating with BullMQ-specific monitoring tools
- ✅ You need fine-grained control over worker behavior
- ✅ You're implementing custom event handlers for BullMQ events
- ❌ Features available through the core Worker API (use core API instead)
Accessing the BullMQ Job During Processing
For advanced use cases requiring direct access to the underlying BullMQ Job instance, the job is available via providerMetadata during processing:
import type { Job as BullJob } from 'bullmq';
const worker = new Worker('my-queue', async (data, job) => {
// access the underlying BullMQ Job instance
const bullJob = job.providerMetadata?.bullmq?.job as BullJob | undefined;
if (bullJob) {
// use native BullMQ Job features
await bullJob.updateProgress(50);
await bullJob.log('Processing started...');
// access BullMQ-specific job properties
const parentKey = bullJob.parentKey;
const repeatJobKey = bullJob.repeatJobKey;
}
await processData(data);
return Result.ok(undefined);
});
⚠️ Important Lifecycle Warnings:
1. Valid Only During Processing: The job handle is only valid during job processing. Do not cache or use it after your handler returns:
// ❌ WRONG: Caching job reference
let cachedJob: BullJob;
const worker = new Worker('queue', async (data, job) => {
  cachedJob = job.providerMetadata?.bullmq?.job; // don't do this!
  return Result.ok(undefined);
});
// later...
await cachedJob.updateProgress(100); // may fail or cause undefined behavior
2. No structuredClone(): The BullMQ Job contains circular references and cannot be cloned:
// ❌ WRONG: Will throw an error
const clonedJob = structuredClone(job);
// ✅ CORRECT: Clone only the data you need
const clonedData = structuredClone(job.data);
3. JSON Serialization Works: The job is stored as a non-enumerable property, so JSON.stringify(job) works correctly (it excludes the BullMQ Job instance):
// ✅ Safe: Non-enumerable properties are excluded
const serialized = JSON.stringify(job); // works fine
When to access the BullMQ Job:
- ✅ Progress updates (job.updateProgress())
- ✅ Job logging (job.log())
- ✅ Accessing parent job relationships (job.parentKey)
- ✅ Accessing repeat job keys (job.repeatJobKey)
- ✅ Custom flow control features
- ❌ Basic job data and metadata (use the job parameter directly)
Accessing BullMQ Jobs Outside of Processing
For accessing jobs outside of handler execution (e.g., in tests, monitoring, or admin tools), use getBullMQQueue() to access BullMQ's native Queue API:
const extensions = queue.bullmq;
if (extensions) {
const result = extensions.getBullMQQueue();
if (result.success && result.data) {
const bullQueue = result.data;
// get a specific job by ID
const job = await bullQueue.getJob('job-id');
// get jobs by state
const waitingJobs = await bullQueue.getWaiting();
const activeJobs = await bullQueue.getActive();
const completedJobs = await bullQueue.getCompleted();
const failedJobs = await bullQueue.getFailed();
// get job counts
const counts = await bullQueue.getJobCounts();
}
}
Testing Example:
import { Queue } from '@satoshibits/queue';
import { BullMQProvider } from '@satoshibits/queue/providers/bullmq';
describe('Job Processing', () => {
it('should update job progress during processing', async () => {
const provider = new BullMQProvider({ connection: { host: 'localhost', port: 6379 } });
const queue = new Queue('test-queue', { provider: provider.forQueue('test-queue') });
// add a job
const addResult = await queue.add('test-job', { data: 'test' });
const jobId = addResult.success ? addResult.data.id : null;
// ... start worker and process job ...
// verify job state using native BullMQ Queue
const extensions = queue.bullmq;
if (extensions && jobId) {
const result = extensions.getBullMQQueue();
if (result.success && result.data) {
const bullQueue = result.data;
const job = await bullQueue.getJob(jobId);
expect(job?.progress).toBe(100);
expect(job?.finishedOn).toBeDefined();
}
}
});
});
Key Distinction:
| Context | Access Method | Use Case |
|---------|---------------|----------|
| During processing | job.providerMetadata?.bullmq?.job | Progress updates, logging, flow control |
| Outside processing | queue.bullmq.getBullMQQueue() | Tests, monitoring, admin tools, job inspection |
Multiple Provider Instances Pattern
When different queues or workers need different provider-specific configurations, create multiple provider instances instead of mixing configuration into the Queue/Worker layer.
Why: Provider is the infrastructure configuration layer. Queue/Worker are the operation layer. Keeping them separate maintains clean architecture.
Different Queue Configurations
import { Queue } from '@satoshibits/queue';
import { BullMQProvider } from '@satoshibits/queue/providers/bullmq';
// High-throughput queue with larger event stream
const highThroughputProvider = new BullMQProvider({
connection: { host: 'localhost', port: 6379 },
prefix: 'bull',
queueOptions: {
streams: {
events: { maxLen: 100000 } // Keep more events for monitoring
}
}
});
// Memory-constrained queue with smaller event stream
const lowMemoryProvider = new BullMQProvider({
connection: { host: 'localhost', port: 6379 },
prefix: 'bull',
queueOptions: {
streams: {
events: { maxLen: 1000 } // Minimal event retention
}
}
});
const analyticsQueue = new Queue('analytics', {
provider: highThroughputProvider.forQueue('analytics')
});
const notificationsQueue = new Queue('notifications', {
provider: lowMemoryProvider.forQueue('notifications')
});
Different Worker Configurations
import { Worker } from '@satoshibits/queue';
import { BullMQProvider } from '@satoshibits/queue/providers/bullmq';
// Fast jobs with short lock duration
const fastJobProvider = new BullMQProvider({
connection: { host: 'localhost', port: 6379 },
workerOptions: {
lockDuration: 30000, // 30s lock
stalledInterval: 5000, // Check every 5s
maxStalledCount: 2 // Max 2 stall recoveries
}
});
// Slow jobs with long lock duration
const slowJobProvider = new BullMQProvider({
connection: { host: 'localhost', port: 6379 },
workerOptions: {
lockDuration: 300000, // 5min lock
stalledInterval: 60000, // Check every 1min
maxStalledCount: 5 // More tolerant of stalls
}
});
const emailWorker = new Worker('emails', emailHandler, {
provider: fastJobProvider.forQueue('emails'),
concurrency: 10
});
const reportWorker = new Worker('reports', reportHandler, {
provider: slowJobProvider.forQueue('reports'),
concurrency: 2
});
Combined Configuration
import { Queue, Worker } from '@satoshibits/queue';
import { BullMQProvider } from '@satoshibits/queue/providers/bullmq';
// Production-optimized provider
const productionProvider = new BullMQProvider({
connection: {
host: process.env.REDIS_HOST,
port: 6379,
maxRetriesPerRequest: 3
},
prefix: 'prod',
defaultJobOptions: {
attempts: 5,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000, // Keep last 1000 completed jobs
removeOnFail: 5000 // Keep last 5000 failed jobs
},
queueOptions: {
streams: { events: { maxLen: 10000 } }
},
workerOptions: {
lockDuration: 60000,
stalledInterval: 30000,
maxStalledCount: 3
}
});
// Development provider with looser settings
const devProvider = new BullMQProvider({
connection: { host: 'localhost', port: 6379 },
prefix: 'dev',
defaultJobOptions: {
attempts: 1, // Fail fast in development
removeOnComplete: false, // Keep all jobs for debugging
removeOnFail: false
},
queueOptions: {
streams: { events: { maxLen: 100 } } // Minimal for local dev
}
});
const provider = process.env.NODE_ENV === 'production'
? productionProvider
: devProvider;
const queue = new Queue('orders', { provider: provider.forQueue('orders') });
const worker = new Worker('orders', processOrder, {
provider: provider.forQueue('orders'),
concurrency: process.env.NODE_ENV === 'production' ? 10 : 1
});
Key Benefits:
- ✅ Explicit Configuration: Each provider's purpose is clear from its name and settings
- ✅ Type Safety: Full TypeScript support for native BullMQ options
- ✅ No Complexity: No configuration merging or precedence rules to understand
- ✅ Better Testing: Each provider can be tested independently
- ✅ Immutable: Providers are configured once at startup, reducing runtime surprises
When to Use:
- Different queues need different performance characteristics (lock duration, stalled intervals)
- Different queues need different resource limits (stream maxLen, memory settings)
- Different environments need different configurations (prod vs dev)
- Testing scenarios requiring isolated configurations
When NOT to Use:
- Per-job variations (use JobOptions.providerOptions instead)
- Runtime configuration changes (providers are immutable)
- Temporary overrides (providers represent stable infrastructure configuration)
Production Features
Worker Lifecycle Management
const worker = new Worker('jobs', handler, {
concurrency: 10, // process 10 jobs in parallel
batchSize: 5, // fetch 5 jobs per poll (if provider supports)
pollInterval: 100, // poll every 100ms when queue is empty
errorBackoff: 1000 // wait 1s after errors before retrying
});
// graceful shutdown
process.on('SIGTERM', async () => {
await worker.close({
timeout: 30000, // wait up to 30s for active jobs
finishActiveJobs: true, // let currently active jobs complete
disconnectProvider: false // keep provider connected (for shared providers)
});
process.exit(0);
});
Worker.close() Options:
- timeout: number (ms) - Max time to wait for active jobs to finish (default: 30s).
- finishActiveJobs: boolean - Whether to wait for active jobs to complete (default: true).
- disconnectProvider: boolean - Whether to disconnect the provider after closing (default: false). Use true only when the provider instance is not shared.
Example with Shared Provider:
// shared provider factory across multiple queues/workers
const providerFactory = new BullMQProvider({ connection: redis });
const emailQueue = new Queue('emails', { provider: providerFactory.forQueue('emails') });
const emailWorker = new Worker('emails', emailHandler, { provider: providerFactory.forQueue('emails') });
const smsQueue = new Queue('sms', { provider: providerFactory.forQueue('sms') });
const smsWorker = new Worker('sms', smsHandler, { provider: providerFactory.forQueue('sms') });
// shutdown workers - keep provider connected
await emailWorker.close({ disconnectProvider: false });
await smsWorker.close({ disconnectProvider: false });
// user explicitly manages shared provider lifecycle
await providerFactory.disconnect();
Queue Lifecycle Management
const queue = new Queue('emails', { provider });
// close queue and disconnect owned provider
await queue.close({
disconnectProvider: true // disconnect provider (for non-shared providers)
});
Queue.close() Options:
- disconnectProvider: boolean - Disconnect the provider after closing (default: false).
Observability (Userland Responsibility)
The library provides comprehensive lifecycle events for you to implement observability. How you collect, aggregate, and export metrics is your policy decision.
OpenTelemetry Integration Example:
import { trace } from '@opentelemetry/api';
const tracer = trace.getTracer('my-app');
const worker = new Worker('orders', async (data, job) => {
const span = tracer.startSpan('process-order', {
attributes: { jobId: job.id, jobName: job.name }
});
try {
await processOrder(data);
span.setStatus({ code: 1 }); // OK (SpanStatusCode.OK = 1; 0 is UNSET)
return Result.ok(undefined);
} catch (error) {
span.setStatus({ code: 2, message: error.message }); // ERROR
throw error;
} finally {
span.end();
}
});
worker.on('completed', (payload) => {
tracer.startSpan('job.completed').setAttribute('duration', payload.duration).end();
});
await worker.start();
Prometheus Metrics Example:
import { Counter, Histogram, Gauge, register } from 'prom-client';
const jobsProcessed = new Counter({
name: 'queue_jobs_processed_total',
help: 'Total jobs processed',
labelNames: ['queue', 'status']
});
const jobDuration = new Histogram({
name: 'queue_job_duration_seconds',
help: 'Job processing duration',
labelNames: ['queue', 'job_name']
});
const queueDepth = new Gauge({
name: 'queue_size',
help: 'Number of waiting jobs',
labelNames: ['queue']
});
const worker = new Worker('payments', processPayment);
worker.on('completed', (payload) => {
jobsProcessed.inc({ queue: 'payments', status: 'completed' });
jobDuration.observe({ queue: 'payments', job_name: payload.jobName }, payload.duration / 1000);
});
worker.on('failed', (payload) => {
jobsProcessed.inc({ queue: 'payments', status: 'failed' });
});
// periodically update queue depth
setInterval(async () => {
const stats = await queue.getStats();
if (stats.success) {
queueDepth.set({ queue: 'payments' }, stats.data.waiting);
}
}, 5000);
// expose at /metrics endpoint
app.get('/metrics', async (req, res) => {
res.set('Content-Type', register.contentType);
res.end(await register.metrics());
});
await worker.start();
Why Userland? Observability strategies vary by organization. By providing events and primitives, you can integrate with your existing monitoring stack (Datadog, New Relic, custom dashboards) without library lock-in.
Health Checks
app.get('/health', async (req, res) => {
const health = await queue.getHealth();
res.json({
status: health.isHealthy ? 'ok' : 'degraded',
workers: health.activeWorkers,
waiting: health.queueDepth,
errorRate: health.errorRate
});
});
Dead Letter Queues
const queue = new Queue('payments', {
deadLetter: {
queue: 'failed-payments', // maps to provider DLQ
maxAttempts: 5
}
});
// inspect failed jobs (Result-wrapped, matching getDLQJobs shown earlier)
const failed = await queue.getDLQJobs(100);
if (failed.success) {
  for (const job of failed.data) {
    console.log(`Failed: ${job.id}`, job.error);
    // optionally retry
    await queue.retryJob(job.id);
  }
}
Supported Providers
| Provider | Import Path | Retries | Priority | Delay | DLQ | Best For |
|----------|-------------|---------|----------|-------|-----|----------|
| In-Memory | @satoshibits/queue (default) | ✅ | ✅ | ✅ | ✅ | Development, testing |
| BullMQ | @satoshibits/queue/providers/bullmq | ✅ | ✅ | ✅ | ✅ | Production workhorse |
| SQS | @satoshibits/queue/providers/sqs | ✅ | ❌ | ✅ | ✅ | Serverless, AWS ecosystem |
Peer Dependencies:
- BullMQ: Requires bullmq and ioredis
- SQS: Requires @aws-sdk/client-sqs
- Memory: No dependencies
Note: We focus on providers with strong native queue capabilities. We don't ship degraded providers (e.g., PostgreSQL) that lack critical features.
Common Patterns
Idempotency
Implement in userland using job IDs:
await queue.add('charge-customer', data, {
jobId: `charge-${customerId}-${month}` // duplicate IDs are rejected
});
const worker = new Worker('charge-customer', async (data, job) => {
// check in handler
if (await alreadyProcessed(job.id)) {
return Result.ok(undefined);
}
await processCharge(data);
await markProcessed(job.id);
return Result.ok(undefined);
});
await worker.start();
Circuit Breaking
Respond to failure events:
const breaker = new CircuitBreaker({
failureThreshold: 5,
timeout: 30000
});
const worker = new Worker('api-calls', async (data, job) => {
if (breaker.isOpen()) {
throw new Error('Circuit open - service degraded');
}
const result = await callExternalAPI(data);
return Result.ok(result);
});
worker.on('failed', () => breaker.recordFailure());
worker.on('completed', () => breaker.recordSuccess());
await worker.start();
Large Payloads
Handle externally before queueing:
async function addJobWithLargePayload(data) {
const size = JSON.stringify(data).length;
if (size > 256_000) { // 256KB (SQS limit)
const ref = await s3.upload(`jobs/${uuid()}`, data);
await queue.add('process', { __ref: ref });
} else {
await queue.add('process', data);
}
}
const worker = new Worker('process', async (data, job) => {
const actualData = data.__ref
? await s3.download(data.__ref)
: data;
await processData(actualData);
return Result.ok(undefined);
});
await worker.start();
Poison Pill Handling
Problem: A job that consistently crashes the worker can block queue processing.
Solution: Use Dead Letter Queues to isolate toxic messages:
const queue = new Queue('orders', {
deadLetter: {
queue: 'failed-orders',
maxAttempts: 3 // after 3 failures, move to DLQ
}
});
// process main queue
const worker = new Worker('orders', async (data, job) => {
await processOrder(data);
return Result.ok(undefined);
});
// monitor DLQ for poison pills
const dlqWorker = new Worker('failed-orders', async (data, job) => {
// log for investigation
logger.error('Poison pill detected', {
jobId: job.id,
data,
attempts: job.attempts
});
// optionally alert ops team
await alertOps('Toxic message in queue', { jobId: job.id });
// decide: fix and retry, or permanently discard
if (shouldRetry(job)) {
await queue.retryJob(job.id);
}
return Result.ok(undefined);
});
await worker.start();
await dlqWorker.start();
Prevention:
const worker = new Worker('orders', async (data, job) => {
try {
await processJob(data);
return Result.ok(undefined);
} catch (error) {
// classify errors
if (isTransientError(error)) {
throw error; // let provider retry
} else {
// permanent failure - log and complete to avoid retries
logger.error('Permanent failure', { jobId: job.id, error });
return Result.ok(undefined); // mark complete, don't retry
}
}
});
await worker.start();
Testing Your Application
Testing Strategy
Unit Testing: Your job handler is just a function. Test its business logic in isolation:
// test the handler logic directly
it('processes payment correctly', async () => {
const mockJob = { id: '123', data: { amount: 100, userId: 'user1' }, ... };
const result = await paymentHandler(mockJob.data, mockJob);
expect(result.success).toBe(true);
expect(mockCharge).toHaveBeenCalledWith({ amount: 100, userId: 'user1' });
});
Integration Testing: For testing the full flow, run the queue provider (e.g., Redis) in a Docker container as part of your test suite. This verifies jobs are enqueued and processed correctly without mocking the library itself.
Using the In-Memory Provider
The library includes an in-memory provider perfect for testing. It's the default, so no imports needed:
import { Queue, Worker } from '@satoshibits/queue';
describe('Order Processing', () => {
let queue: Queue;
let worker: Worker;
beforeEach(() => {
// no setup required - in-memory provider is default
queue = new Queue('test-orders');
worker = new Worker('test-orders', processOrder);
});
afterEach(async () => {
await worker.close();
await queue.close();
});
it('processes orders successfully', async () => {
const processed: any[] = [];
const worker = new Worker('test-orders', async (data, job) => {
processed.push(data);
return Result.ok(undefined);
});
worker.on('completed', () => {});
await worker.start();
await queue.add('new-order', { orderId: 123, amount: 99.99 });
// wait for processing
await new Promise(resolve => {
setTimeout(resolve, 100); // give it time to process
});
expect(processed).toHaveLength(1);
expect(processed[0].orderId).toBe(123);
});
it('handles failures with retries', async () => {
let attempts = 0;
const worker = new Worker('test-orders', async (data, job) => {
attempts++;
if (attempts < 3) {
throw new Error('Transient failure');
}
return Result.ok(undefined);
});
await worker.start();
await queue.add('flaky-order', { orderId: 456 }, {
attempts: 3
});
await new Promise(resolve => {
worker.on('completed', resolve);
});
expect(attempts).toBe(3);
});
});
Integration Tests
Test with real providers in CI:
// test/integration/queue.test.ts
import { Queue } from '@satoshibits/queue';
import { BullMQProvider } from '@satoshibits/queue/providers/bullmq';
import { Redis } from 'ioredis';
describe('Queue with BullMQ', () => {
let redis: Redis;
let queue: Queue;
let providerFactory: BullMQProvider;
beforeAll(async () => {
redis = new Redis(process.env.REDIS_URL);
providerFactory = new BullMQProvider({ connection: redis });
queue = new Queue('test', {
provider: providerFactory.forQueue('test')
});
});
afterAll(async () => {
await queue.close();
await redis.quit();
});
it('persists jobs across restarts', async () => {
await queue.add('persistent-job', { data: 'test' });
// simulate restart
await queue.close();
// reconnect
const newQueue = new Queue('test', {
provider: providerFactory.forQueue('test')
});
const stats = await newQueue.getStats();
expect(stats.success).toBe(true);
if (stats.success) {
expect(stats.data.waiting).toBeGreaterThan(0);
}
});
});
Mocking the Queue
For unit testing application code that uses the queue:
import { vi } from 'vitest';
// mock the queue module
vi.mock('@satoshibits/queue', () => ({
Queue: vi.fn().mockImplementation(() => ({
add: vi.fn().mockResolvedValue({ id: 'mock-job-id' }),
close: vi.fn().mockResolvedValue(undefined)
})),
Worker: vi.fn()
}));
// test application code
it('enqueues order processing', async () => {
const { Queue } = await import('@satoshibits/queue');
const mockAdd = vi.mocked(Queue).mock.results[0].value.add;
await createOrder({ userId: 123, items: [...] });
expect(mockAdd).toHaveBeenCalledWith('process-order', {
userId: 123,
items: expect.any(Array)
});
});
Library Test Quality
The @satoshibits/queue library maintains high test quality standards:
- Total Tests: 362 tests (all passing)
- Test Quality Grade: A (93/100)
- Coverage: Comprehensive unit and integration tests
- Test Reliability: All timing dependencies eliminated (behavior-based waiting)
- Provider Testing: Memory, BullMQ, and SQS providers fully tested
Test Categories:
- Core API tests (Queue, Worker)
- Provider interface tests (Memory, BullMQ, SQS)
- Event lifecycle tests
- Error handling and edge cases
- Integration tests with real providers
See 7-TEST_QUALITY_AUDIT.md for detailed test quality analysis.
Provider Capability Matrix
Understanding what each provider supports helps you choose the right one:
| Feature | In-Memory | BullMQ | SQS |
|---------|-----------|--------|-----|
| Retries | ✅ In-process | ✅ Native | ✅ Redrive Policy |
| Backoff | ✅ Configurable | ✅ Exponential | ❌ Fixed visibility |
| Priority | ✅ Heap-based | ✅ Native | ❌ Use separate queues |
| Delay | ✅ setTimeout | ✅ Delayed ZSET | ✅ DelaySeconds |
| DLQ | ✅ | ✅ | ✅ |
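Since SQS lacks native priorities, the usual substitute (noted in the matrix above) is one queue per priority level, with worker capacity skewed toward the high-priority queue. A minimal sketch, assuming hypothetical queue URLs, an order object, and a processOrder handler:
```typescript
import { Queue, Worker } from '@satoshibits/queue';
import { SQSProvider } from '@satoshibits/queue/providers/sqs';

const provider = new SQSProvider({
  region: 'us-east-1',
  queueUrls: {
    'orders-high': 'https://sqs.us-east-1.amazonaws.com/123/orders-high',
    'orders-low': 'https://sqs.us-east-1.amazonaws.com/123/orders-low'
  }
});

const highQueue = new Queue('orders-high', { provider: provider.forQueue('orders-high') });
const lowQueue = new Queue('orders-low', { provider: provider.forQueue('orders-low') });

// route at enqueue time instead of passing a priority option
await (order.isRush ? highQueue : lowQueue).add('process-order', order);

// give the high-priority queue more processing capacity
const highWorker = new Worker('orders-high', processOrder, {
  provider: provider.forQueue('orders-high'),
  concurrency: 10
});
const lowWorker = new Worker('orders-low', processOrder, {
  provider: provider.forQueue('orders-low'),
  concurrency: 2
});
await highWorker.start();
await lowWorker.start();
```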
