queue-bull
v1.0.0
queue-bull 🐂
A persistent job queue backed by SQLite. No Redis. No lost jobs. Ever.
The Problem
BullMQ requires Redis. Redis means an extra server, extra cost, extra configuration, and an extra failure point. If Redis restarts without persistence (RDB or AOF) enabled, every job that lived only in its memory is gone.
The Solution
npm install queue-bull
import { Queue, Worker } from 'queue-bull';
// Jobs persist in SQLite and survive server crashes automatically
const queue = new Queue('emails', { dbPath: './jobs.db' });
await queue.add('sendEmail', { to: 'alice@example.com', name: 'Alice' });
// Worker recovers stalled jobs on restart; no extra configuration needed
new Worker('emails', async (job) => {
  await sendEmail(job.data); // sendEmail is your own function
  return { sentAt: new Date().toISOString() };
}, { dbPath: './jobs.db' }); // dbPath must match the Queue's dbPath
That's it. No Redis. No configuration. No lost jobs.
Why SQLite?
| | queue-bull | BullMQ | Agenda |
|---|---|---|---|
| External service required | ❌ None | ✅ Redis | ✅ MongoDB |
| Insert throughput | ~50,000/s | ~100,000/s | ~5,000/s |
| Memory footprint | ~30 MB | ~200 MB+ | ~80 MB |
| Crash recovery | ✅ Automatic | ✅ (needs Redis) | ✅ (needs Mongo) |
| TypeScript | ✅ Full | ✅ Full | ⚠️ Partial |
| Setup cost | npm install | Redis + config | MongoDB + config |
| Monthly infra cost | $0 | $15–100+ | $15–50+ |
| Smart Retry | ✅ (Pro) | ❌ | ❌ |
| Dashboard | ✅ (Pro) | ✅ Bull Board | ❌ |
SQLite with WAL mode achieves 50,000+ inserts/second on commodity hardware — fast enough for any workload that doesn't require a Redis-level distributed cluster.
Installation
npm install queue-bull
# or
yarn add queue-bull
# or
pnpm add queue-bull
Requirements: Node.js ≥ 18.0.0
Quick Start
Basic Queue + Worker
import { Queue, Worker } from 'queue-bull';
// ── Producer ────────────────────────────────────────────────────────────────
const queue = new Queue('image-processing', {
  dbPath: './jobs.db',
  defaultJobOptions: {
    attempts: 3,
    retryStrategy: 'exponential',
    retryDelay: 1_000,
  },
});
// Add a job
await queue.add('resize', { imageId: 'img-001', width: 800 });
// High-priority job — processed before others
await queue.add('resize', { imageId: 'img-vip', width: 1200 }, { priority: 100 });
// Delayed job — runs 1 hour from now
await queue.add('cleanup', { imageId: 'img-001-raw' }, { delay: 60 * 60 * 1_000 });
// Bulk insert — single atomic transaction, much faster than individual adds
await queue.addBulk(
  Array.from({ length: 1_000 }, (_, i) => ({
    name: 'resize',
    data: { imageId: `batch-${i}`, width: 400 },
  }))
);
// ── Consumer ────────────────────────────────────────────────────────────────
const worker = new Worker<{ imageId: string; width: number }>(
  'image-processing',
  async (job) => {
    job.log(`Resizing ${job.data.imageId} to ${job.data.width}px`);
    job.updateProgress(0);
    // resizeImage is your own function
    const result = await resizeImage(job.data.imageId, job.data.width);
    job.updateProgress(100);
    return { url: result.url, processedAt: new Date().toISOString() };
  },
  { concurrency: 4, dbPath: './jobs.db' } // dbPath must match the Queue's dbPath
);
worker.on('job:completed', (job, result) => console.log('✅', job.id, result));
worker.on('job:failed', (job, err) => console.error('❌', job.id, err.message));
worker.on('job:retrying', (job, err, d) => console.log(`🔄 retry in ${d}ms`));
Cron Scheduling
import { Queue, Worker, Scheduler } from 'queue-bull';
const queue = new Queue('reports', { dbPath: './jobs.db' });
const scheduler = new Scheduler(queue, { tickInterval: 10_000 });
// At 9 AM, Monday through Friday
await scheduler.every('reports', 'generateDailyReport', '0 9 * * 1-5', {
  data: { type: 'daily' },
});
// Every 5 minutes
await scheduler.every('reports', 'healthCheck', '*/5 * * * *');
// List schedules
const schedules = await scheduler.list('reports');
console.log(schedules);
Job Status Monitoring
const counts = await queue.getJobCounts();
// { waiting: 42, active: 3, completed: 1250, failed: 2, delayed: 7 }
const failedJobs = await queue.getJobs('failed', 0, 10);
for (const job of failedJobs) {
  console.log(job.id, job.lastError, job.attempts);
  await queue.retryJob(job.id); // Retry individually
}
Express.js Integration
import express from 'express';
import { Queue, Worker } from 'queue-bull';
const app = express();
app.use(express.json());
const queue = new Queue('reports', { dbPath: './jobs.db' });
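// processReport is your own processor function; dbPath must match the Queue's dbPath
const worker = new Worker('reports', processReport, { concurrency: 5, dbPath: './jobs.db' });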
app.post('/api/reports', async (req, res) => {
  const job = await queue.add('generate', req.body);
  res.json({ jobId: job.id });
});
app.get('/api/jobs/:id', async (req, res) => {
  const job = await queue.getJob(req.params.id);
  if (!job) return res.status(404).json({ error: 'Not found' });
  res.json({ status: job.status, result: job.result, error: job.lastError });
});
// Graceful shutdown
process.on('SIGTERM', async () => {
  await worker.close();
  await queue.close();
  process.exit(0);
});
API Reference
new Queue(name, options?)
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| dbPath | string | './queue-bull.db' | SQLite file path |
| defaultJobOptions | JobOptions | — | Default options for all jobs |
| maxCompletedJobs | number | 1000 | Auto-prune completed jobs above this count |
| maxFailedJobs | number | 500 | Auto-prune failed jobs above this count |
| verbose | boolean | false | Enable debug logging |
Methods:
| Method | Description |
|--------|-------------|
| queue.add(name, data, opts?) | Enqueue a single job |
| queue.addBulk(jobs[]) | Enqueue multiple jobs atomically |
| queue.getJob(id) | Fetch a job by ID |
| queue.getJobs(status, start, end) | Paginated job list |
| queue.getJobCounts() | Aggregated status counts |
| queue.pause() | Pause job processing |
| queue.resume() | Resume job processing |
| queue.clean(grace, limit, status?) | Remove old jobs |
| queue.retryJob(id) | Reset a failed job to waiting |
| queue.removeJob(id) | Permanently delete a job |
| queue.drain() | Wait until queue is empty |
| queue.obliterate() | Delete queue and all its jobs |
| queue.close() | Close DB connection |
new Worker(queueName, processor, options?)
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| concurrency | number | 1 | Parallel jobs limit |
| dbPath | string | './queue-bull.db' | Must match Queue's dbPath |
| pollInterval | number | 500 | How often to poll for jobs (ms) |
| lockDuration | number | 30000 | Stall detection threshold (ms) |
| stalledInterval | number | 15000 | How often to check for stalled jobs (ms) |
| drainTimeout | number | 5000 | How long close() waits for active jobs (ms) |
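On each poll a worker has to decide which waiting job to claim. The sketch below models one plausible ordering based on the priority and delay job options: skip jobs whose delay has not elapsed, prefer higher priority, and break ties by insertion order. The field names runAt and seq, the helper nextJob, and the first-in-first-out tie-break are assumptions for illustration, not queue-bull's documented behavior.

```typescript
// Illustrative model only; queue-bull's real claim logic runs against SQLite.
interface QueuedJob {
  id: string;
  priority: number; // higher = processed first
  runAt: number;    // epoch ms at which the job becomes eligible (enqueue time + delay)
  seq: number;      // insertion order, used here as an assumed tie-breaker
}

/** Pick the job a poll would claim: eligible jobs only, highest priority first, then oldest. */
function nextJob(waiting: QueuedJob[], now: number): QueuedJob | undefined {
  return waiting
    .filter((j) => j.runAt <= now) // filter() copies, so the input array is not reordered
    .sort((a, b) => b.priority - a.priority || a.seq - b.seq)[0];
}

const t0 = 1_000_000;
const waiting: QueuedJob[] = [
  { id: 'img-001', priority: 0, runAt: t0, seq: 1 },
  { id: 'img-vip', priority: 100, runAt: t0, seq: 2 },
  { id: 'cleanup', priority: 0, runAt: t0 + 60 * 60 * 1_000, seq: 3 }, // delayed by 1 hour
];

console.log(nextJob(waiting, t0)?.id); // img-vip
```

With the high-priority job removed, the same call returns img-001; the delayed cleanup job only becomes eligible once an hour has passed.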
Events:
worker.on('job:active', (job) => { /* job started */ });
worker.on('job:completed', (job, result) => { /* job done */ });
worker.on('job:failed', (job, error) => { /* permanently failed */ });
worker.on('job:retrying', (job, error, ms) => { /* will retry after ms */ });
worker.on('job:progress', (job, progress) => { /* 0–100 */ });
worker.on('stalled:recovered', (count) => { /* stalled jobs rescued */ });
JobOptions
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| priority | number | 0 | Higher = processed first |
| attempts | number | 3 | Max retry attempts |
| delay | number | 0 | Initial delay in ms |
| retryStrategy | 'exponential'\|'linear'\|'none'\|'smart' | 'exponential' | Backoff algorithm |
| retryDelay | number | 1000 | Base retry delay in ms |
| jobId | string | auto UUID | Custom job ID for idempotency |
| groupId | string | — | Logical group for related jobs |
| parentId | string | — | Parent job ID (chaining) |
| ttl | number | — | Result TTL in ms |
| removeOnComplete | boolean | false | Auto-delete completed jobs |
| removeOnFail | boolean | false | Auto-delete permanently failed jobs |
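To make retryStrategy and retryDelay concrete, here is a minimal sketch of how a backoff delay could be derived from them. The formulas (doubling for 'exponential', a multiple of the base delay for 'linear') and the helper name computeRetryDelay are assumptions for illustration; queue-bull's actual schedule may differ. The 'none' and 'smart' strategies are left out because the table does not say how they behave.

```typescript
// Hypothetical helper: not part of the queue-bull API.
type BackoffStrategy = 'exponential' | 'linear';

/**
 * Delay in ms before retry number `retry` (1 = first retry).
 * Assumed formulas: exponential doubles each time, linear adds the base delay each time.
 */
function computeRetryDelay(strategy: BackoffStrategy, retryDelay: number, retry: number): number {
  if (retry < 1) throw new RangeError('retry must be >= 1');
  if (strategy === 'linear') return retryDelay * retry;
  return retryDelay * 2 ** (retry - 1);
}

// With the default retryDelay of 1000 ms:
const exponentialDelays = [1, 2, 3].map((n) => computeRetryDelay('exponential', 1_000, n));
const linearDelays = [1, 2, 3].map((n) => computeRetryDelay('linear', 1_000, n));
console.log(exponentialDelays); // [ 1000, 2000, 4000 ]
console.log(linearDelays);      // [ 1000, 2000, 3000 ]
```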
Crash Recovery
queue-bull guarantees at-least-once delivery through SQLite persistence:
- Every job is written to disk before queue.add() returns.
- Jobs move to active status when a Worker picks them up.
- If the process crashes, the job stays active in SQLite.
- When a new Worker starts, recoverStalledJobs() runs every stalledInterval ms.
- Any job that has been active longer than lockDuration ms is reset to waiting.
- The job is re-processed on the next poll cycle.
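The recovery steps above can be sketched with a small in-memory model. This is not queue-bull's implementation: the JobRow shape, the lockedAt field, and the recoverStalled helper are assumptions chosen to mirror the description, and the real library keeps this state in SQLite.

```typescript
// Illustrative in-memory model of stalled-job recovery.
type JobStatus = 'waiting' | 'active' | 'completed' | 'failed';

interface JobRow {
  id: string;
  status: JobStatus;
  lockedAt: number | null; // epoch ms when a worker claimed the job (assumed column)
}

/** Reset every job that has been active longer than lockDuration; returns how many were reset. */
function recoverStalled(jobs: JobRow[], lockDuration: number, now: number): number {
  let recovered = 0;
  for (const job of jobs) {
    if (job.status === 'active' && job.lockedAt !== null && now - job.lockedAt > lockDuration) {
      job.status = 'waiting'; // picked up again on the next poll cycle
      job.lockedAt = null;
      recovered++;
    }
  }
  return recovered;
}

const now = 100_000;
const jobs: JobRow[] = [
  { id: 'a', status: 'active', lockedAt: now - 45_000 }, // stalled: held for 45 s
  { id: 'b', status: 'active', lockedAt: now - 5_000 },  // still within the lock window
  { id: 'c', status: 'waiting', lockedAt: null },
];

const recoveredCount = recoverStalled(jobs, 30_000, now);
console.log(recoveredCount, jobs.map((j) => `${j.id}:${j.status}`));
// 1 [ 'a:waiting', 'b:active', 'c:waiting' ]
```

Job b shows why lockDuration should exceed the longest expected processing time: a job that is merely slow would otherwise be reset and run a second time.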
Important: Make your job processors idempotent — they should produce the same result if run twice for the same job.
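One common way to get idempotency is to record each job's result under its job ID and return the recorded result when the same job is delivered again. The sketch below is kept synchronous and in-memory for brevity; the names EmailJob, delivered, and processEmail are hypothetical, and a real processor would use a durable store such as a table with a unique key on the job ID.

```typescript
// Hypothetical example: these names are illustrative and not part of queue-bull.
interface EmailJob {
  id: string;
  data: { to: string };
}

// Stands in for a durable store keyed by job ID.
const delivered = new Map<string, { sentAt: string }>();
let sendCount = 0;

function processEmail(job: EmailJob): { sentAt: string } {
  const previous = delivered.get(job.id);
  if (previous) return previous; // duplicate delivery: reuse the recorded result

  sendCount++; // placeholder for the real side effect (sending the email)
  const result = { sentAt: new Date().toISOString() };
  delivered.set(job.id, result);
  return result;
}

const emailJob: EmailJob = { id: 'job-1', data: { to: 'alice@example.com' } };
const first = processEmail(emailJob);
const second = processEmail(emailJob); // simulates re-processing after stall recovery
console.log(sendCount, first === second); // 1 true
```

A crash between the side effect and the record can still cause a duplicate, so where it matters the two should be committed together, or the downstream service should accept its own idempotency key.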
Pro Features
The Pro tier adds enterprise features on top of the MIT core:
- 📊 Visual Dashboard — real-time job monitoring, retry-on-click, audit trail
- 🧠 Smart Retry — error-type-aware backoff (rate limits, network, validation)
- 🔗 Job Chaining — define parent→child job pipelines
- ⏱ Rate Limiting — per-queue job rate caps
- 📡 Webhook Notifications — POST to your endpoint on job completion/failure
- 📈 Prometheus Metrics — export queue stats to Grafana
Learn more → queue-bull.dev/pro
Performance
Measured on a mid-range laptop with WAL mode enabled:
| Operation | Throughput |
|-----------|-----------|
| Single job insert | ~50,000/s |
| Bulk insert (100 jobs/tx) | ~200,000 jobs/s |
| Job claim (atomic) | ~10,000/s |
| getJobCounts() | ~100,000/s |
| Memory footprint | ~30 MB |
| Startup time | < 100 ms |
Migration from BullMQ
// BullMQ — before
import { Queue, Worker } from 'bullmq';
const queue = new Queue('emails', { connection: redisConnection });
// queue-bull — after
import { Queue, Worker } from 'queue-bull';
const queue = new Queue('emails', { dbPath: './jobs.db' });
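// The Queue and Worker calls keep the same shape. Pass the same dbPath to the Worker,
// and rename worker event listeners to the job:-prefixed names (e.g. 'completed' becomes 'job:completed').
Full migration guide: docs/guides/migration-from-bullmq.md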
License
MIT © queue-bull contributors
The Pro Dashboard and advanced features are available under a commercial license.
See LICENSE-PRO for details.
