groupbullmq
v1.0.1
Published
Per-group FIFO queue for Redis with high concurrency, visibility timeouts, retries, and BullMQ-inspired patterns for scale.
Downloads
267
Maintainers
Readme
GroupBullMQ
🚀 Why GroupBullMQ?
GroupBullMQ solves one specific, hard problem: Processing jobs for specific groups (e.g., customers) in parallel, while maintaining order.
Most queues force a trade-off:
- Standard Queues: Fast parallel processing, but zero ordering guarantees.
- FIFO Queues: Strict ordering, but often locked to single-threaded processing per queue.
GroupBullMQ gives you the best of both:
- Concurrency per Group: Process
Njobs from the same group simultaneously. - Concurrency across Groups: Process jobs from different groups in parallel.
- Ordering: Jobs generally maintain insertion order within their concurrency window.
Perfect for SaaS, E-commerce, and High-throughput systems.
✨ Features
- ⚡ Group Concurrency: Configure how many jobs per group run at once.
- 🔄 Automatic Retries: Robust retry logic with exponential backoff.
- 🧹 Auto Cleanup: Self-managing maintenance of completed and failed jobs.
- 🚦 Flow Control: Pause/Resume queues globally.
- ⏱️ Delayed & Scheduled Jobs: Run jobs now, later, or on a recurring Cron schedule.
- 🛡️ Stalled Job Recovery: Auto-recovers jobs if a worker crashes.
- 📊 Bull Board Ready: Visualize your queues with a built-in adapter.
- 🧩 TypeScript: First-class type definitions included.
📦 Installation
npm install groupbullmq ioredis🛠️ Usage
1. Simple Setup
import Redis from 'ioredis';
import { Queue, Worker } from 'groupbullmq';
const redis = new Redis('redis://localhost:6379', { maxRetriesPerRequest: null });
// 1. Create a Queue
const orderQueue = new Queue({
redis,
namespace: 'orders',
groupConcurrency: 5, // 👈 5 orders per customer can be processed at once
});
// 2. Create a Worker
const orderWorker = new Worker({
queue: orderQueue,
concurrency: 20, // 👈 Worker can handle 20 orders total across all customers
handler: async (job) => {
console.log(`Processing Order ${job.data.orderId} for Customer ${job.groupId}`);
await processOrder(job.data);
},
});2. The Power of groupConcurrency
This is the killer feature. You can control concurrency globally or per-job.
Scenario: You have a "Premium" customer who needs faster processing than "Free" customers.
// Free user: Uses default queue settings (Start 1 job at a time)
await queue.add({
groupId: 'free-user-123',
data: { task: 'slow-processing' },
// No override, uses default groupConcurrency
});
// Premium user: Overrides to allow 50 concurrent jobs!
await queue.add({
groupId: 'premium-corp-999',
data: { task: 'fast-processing' },
groupConcurrency: 50, // ⚡ Processing boost for this specific job's group
});3. Advanced Job Types
Delayed Jobs
Process a job after a fixed delay.
await queue.add({
groupId: 'notifications',
data: { message: 'Remind me later' },
delay: 5000, // 5 seconds
});Repeating Jobs (Cron)
Run a job on a schedule.
await queue.addRepeating({
groupId: 'system-maintenance',
data: { task: 'daily-cleanup' },
cron: '0 0 * * *', // Every midnight
});⚙️ Configuration
Queue Options (new Queue(...))
| Option | Type | Default | Description |
| :--- | :--- | :--- | :--- |
| redis | Redis | Required | ioredis instance. |
| namespace | string | Required | Unique name for the queue. |
| groupConcurrency | number | 1 | Key Feature. Max concurrent jobs allowed per group. |
| jobTimeoutMs | number | 30000 | How long a job can run before "stalling". |
| maxAttemptsDefault | number | 3 | Default retry count for jobs. |
| keepCompleted | number | 0 | Number of completed jobs to keep in history. |
| keepFailed | number | 0 | Number of failed jobs to keep in history. |
Worker Options (new Worker(...))
| Option | Type | Default | Description |
| :--- | :--- | :--- | :--- |
| queue | Queue | Required | The queue instance to bind to. |
| handler | Function | Required | Async function to process jobs. |
| concurrency | number | 1 | Total max jobs this worker instance can process in parallel. |
| blockingTimeoutSec | number | 2 | IDLE wait time for Redis connection. |
| lockDuration | number | 30000 | Lock duration for job processing. |
📊 Dashboard (Bull Board)
Visualize your queues easily with the official Bull Board.
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; // Use generic or custom adapter
import { BullBoardGroupMQAdapter } from 'groupbullmq'; // 👈 Provided adapter
import { ExpressAdapter } from '@bull-board/express';
const serverAdapter = new ExpressAdapter();
createBullBoard({
queues: [new BullBoardGroupMQAdapter(myQueue)],
serverAdapter,
});🤝 Credits & License
Maintainer: Techspawn Solutions
This library is built upon the excellent work of the open-source community:
- GroupMQ by OpenPanel.dev (Carl-Gerhard Lindesvärd) - Original architecture and core logic.
- BullMQ by Taskforce.sh - Inspiration for robust patterns, Lua script structure, and concurrency handling.
License: MIT
