@libeilong/mq
v0.2.6
Published
**企业级 Node.js 任务队列解决方案**
Downloads
908
Readme
@libeilong/bullmq
企业级 Node.js 任务队列解决方案
基于 BullMQ 的增强封装库。既提供开箱即用的类型安全 Worker 与 智能重试机制,也支持复杂的多租户公平调度系统。
🌟 为什么选择它?
你可以根据业务场景灵活选择两种使用模式:
Lite 模式 (BaseTask):
- 如果你只需要一个比原生 BullMQ 更易用、支持 TypeScript 类型、支持动态退避(如 API 限流等待)的 Worker 封装。
- ✅ 0 配置成本,直接替代原生 Worker。
Pro 模式 (FairScheduler):
- 如果你面临 SaaS 场景,需要解决“VIP 用户插队”、“防止大户阻塞系统”、“全局并发控制”等复杂调度问题。
- ✅ 企业级调度,实现资源隔离与公平轮询。
📦 安装
npm install @libeilong/bullmq ioredis bullmq🚀 模式一:独立使用 BaseTask (推荐)
最简单的使用方式。你不需要调度器,只需要一个健壮的任务处理器。
核心优势
- Smart Backoff: 抛弃死板的重试间隔,支持在代码中精确控制下一次重试时间(如
retryLater({ delay: 5000 }))。 - Type Safe: 泛型支持,Payload 类型自动推导。
- LifeCycle: 统一的
onCompleted/onFailed钩子。
代码示例
import IORedis from 'ioredis';
import { BaseTask, HandleOptions, Job } from '@libeilong/bullmq';
// 1. 定义任务 Payload 类型
interface MyPayload {
url: string;
}
// 2. 继承 BaseTask
class ScraperTask extends BaseTask<MyPayload> {
constructor(connection: IORedis) {
super(connection, {
name: 'ScraperQueue',
concurrency: 5, // 本地并发限制
defaultJobOptions: { attempts: 5 } // 默认重试 5 次
});
}
// 3. 实现 handle 方法
async handle(job: Job<MyPayload>, opts: HandleOptions) {
console.log(`Scraping: ${job.data.url}`);
try {
await doSomething(job.data.url);
} catch (err: any) {
// --- 智能重试核心 ---
// 场景 A: 触发反爬虫限流 (429),等待 60 秒再试
if (err.status === 429) {
opts.retryLater({
message: 'Rate Limited',
delay: 60 * 1000
});
return;
}
// 场景 B: 偶发网络错误,立即重试 (0 delay)
if (err.code === 'ECONNRESET') {
opts.retryLater({ delay: 0 });
return;
}
// 场景 C: 其他错误,抛出异常,使用默认指数退避
throw err;
}
}
}
// 4. 使用
const redis = new IORedis();
const task = new ScraperTask(redis);
// 发送任务
await task.start({ url: 'https://google.com' });🚀 模式二:多租户公平调度 (FairScheduler)
当你需要管理多个用户(租户),且希望他们之间互不影响、资源隔离时使用。
核心特性
- 资源隔离:VIP 用户可拥有 10 倍于普通用户的并发度。
- 防队头阻塞:大户提交 1万个任务,不会堵死后面提交任务的小户。
- VIP 插队:支持优先级策略,VIP 任务优先处理。
快速上手
1. 初始化调度器
import { FairScheduler, RoundRobinStrategy } from '@libeilong/bullmq';
const scheduler = new FairScheduler({
connection: new IORedis(),
globalConcurrency: 50, // 系统总水位
// 动态获取每个租户的并发上限
getGroupConcurrency: async (groupKey) => {
return groupKey.startsWith('vip') ? 10 : 1;
},
// 默认使用轮询策略
strategy: new RoundRobinStrategy()
});
scheduler.run();2. 定义任务 (继承 FairTask)
注意:这里继承的是 FairTask 而不是 BaseTask。
import { FairTask } from '@libeilong/bullmq';
class ReportGenTask extends FairTask<{ reportId: string }> {
constructor(scheduler: FairScheduler) {
super(scheduler, { name: 'ReportQueue' });
}
async handle(job, opts) {
// 业务逻辑...
}
}
const reportTask = new ReportGenTask(scheduler);
scheduler.register(reportTask);3. 提交任务 (带上 GroupKey)
// 普通用户 (groupKey='user:101') -> 限制并发 1
reportTask.schedule('user:101', { reportId: 'A1' });
// VIP 用户 (groupKey='vip:999') -> 限制并发 10
reportTask.schedule('vip:999', { reportId: 'B1' });模式三:父子任务流 (FairFlow)
如果你需要一个父任务等待多个子任务完成后再处理结果,可以使用 FairFlow。支持混合调度:一部分子任务通过 FairScheduler 进行公平调度,另一部分直接进入 BullMQ。
使用场景
- 数据处理流程:一个父任务发起多个子任务(如数据验证、处理、上传),等待全部完成后进行合并。
- 并行工作流:某些子任务需要公平调度(多用户场景),某些子任务需要快速响应(通知、邮件)。
- 混合调度系统:充分利用 FairScheduler 的多租户隔离优势,同时保留标准队列的灵活性。
简单示例
import { FairFlowProducer, BaseTask, FairTask, FairScheduler } from '@libeilong/bullmq'
import IORedis from 'ioredis'
// 初始化
const redis = new IORedis()
const scheduler = new FairScheduler({
connection: redis,
globalConcurrency: 50,
getGroupConcurrency: async (group) => (group.startsWith('vip:') ? 10 : 2),
})
const flowProducer = new FairFlowProducer(scheduler)
// 定义任务
class VideoProcessTask extends FairTask<{ file: string }> {
async handle(job) {
console.log(`处理视频: ${job.data.file}`)
await new Promise(res => setTimeout(res, 2000))
return { processed: true }
}
}
class NotifyTask extends BaseTask<{ message: string }> {
async handle(job) {
console.log(`发送通知: ${job.data.message}`)
return { sent: true }
}
}
class AggregateTask extends BaseTask {
async handle(job) {
const results = await job.getChildrenValues()
console.log('所有子任务完成!结果:', results)
return { aggregated: true }
}
}
// 注册任务
const videoTask = new VideoProcessTask(scheduler)
const notifyTask = new NotifyTask(redis, { name: 'NotifyTask' })
const aggregateTask = new AggregateTask(redis, { name: 'AggregateTask' })
scheduler.register(videoTask)
scheduler.run()
// 创建 Flow:父任务 + 两个子任务
async function startWorkflow() {
await flowProducer.add({
name: 'AggregateTask',
queueName: 'AggregateTask',
data: { userId: 'user-123' },
children: [
// 子任务 1:需要公平调度的任务 (通过 FairScheduler)
{
type: 'fair',
taskName: 'VideoProcessTask',
groupKey: 'user-123',
data: { file: 'movie.mp4' },
},
// 子任务 2:标准任务 (直接进入 BullMQ)
{
type: 'standard',
taskName: 'NotifyTask',
queueName: 'NotifyTask',
data: { message: '处理已启动!' },
},
],
})
console.log('Flow 已创建!')
}
startWorkflow()工作流程:
- 创建 Flow 时指定父任务和子任务列表
fair类型子任务进入 FairScheduler 的等待队列,按 groupKey 进行公平调度standard类型子任务直接进入 BullMQ,不受 FairScheduler 限制- 使用哨兵模式确保父任务等待所有
fair子任务完成 - 当所有子任务完成后,父任务通过
job.getChildrenValues()获取结果并处理
进阶配置
1. 配置多级优先级调度
如果你希望根据用户等级实现多级优先级调度(不仅是简单的高/低二分),可以使用 PriorityStrategy。
基于 Redis Sorted Set 的权重机制,支持任意粒度的优先级控制。
import { PriorityStrategy } from '@libeilong/bullmq';
const scheduler = new FairScheduler({
// ... 其他配置
strategy: new PriorityStrategy(async (groupKey) => {
// 返回权重值(数字越大优先级越高)
// 权重推荐范围:1-100
if (groupKey.startsWith('super-vip:')) return 100; // 超级VIP
if (groupKey.startsWith('vip:')) return 50; // VIP用户
if (groupKey.startsWith('premium:')) return 10; // 高级用户
return 1; // 普通用户
})
});工作原理:
- 使用权重值对活跃组进行排序
- 每次调度时,优先获取权重高的组
- 高权重组优先处理任务,但低权重组不会完全饿死
- 同级权重内随机排序,保证公平性
高级特性:
- 动态权重更新:用户等级在运行中改变时(如升级为VIP),权重会在下一次调度时自动更新
- 权重值验证:系统会自动校验权重范围(0-100),超出范围的值会被自动夹紧(clamp)
- 错误恢复:权重更新失败不会阻断调度,仅记录警告日志
2. Redis 连接的自动恢复
调度器内置了自动故障恢复机制。当 Redis 连接出现问题时:
- 自动检测:系统会自动识别连接错误(如
ECONNREFUSED,ENOTFOUND,ETIMEDOUT等) - 指数退避重试:连接错误会触发自动重试,重试间隔为
100ms × 2^(n-1)(第一次100ms,第二次200ms,第三次400ms...) - 最多重试5次:超过5次重试后停止特殊处理,改用正常调度间隔
- 业务错误区分:业务逻辑错误不会触发重试,仅记录日志
示例日志输出:
[Scheduler] Redis connection error (attempt 1/5), retrying after 100ms: connect ECONNREFUSED 127.0.0.1:6379
[Scheduler] Redis connection error (attempt 2/5), retrying after 200ms: connect ECONNREFUSED 127.0.0.1:6379
[Scheduler] Redis connection error (attempt 3/5), retrying after 400ms: connect ECONNREFUSED 127.0.0.1:63794. 任务丢失防护与补偿机制
调度器使用了多层防护来确保任务不会丢失:
- 原子操作:使用 Redis 事务保证计数器和队列操作的一致性
- 自动补偿:如果任务发送失败,会自动回滚计数器并将任务重新放回队列
- 异常隔离:队列操作异常不会阻断其他任务队列的处理
- 详细日志:所有补偿操作都有清晰的日志记录
示例补偿场景:
[Scheduler] Failed to send task to BullMQ, rolled back and task re-queued: Error: Queue not found5. 优先级分层采样
PriorityStrategy 采用分层采样算法,确保不同权重的组获得公平但按比例的调度机会:
- 权重比例调度:权重 100 的组会获得约 10 倍于权重 10 的调度机会
- 避免饿死:即使权重最低的组也保证每个调度周期至少被采样一次
- 动态归一化:权重差异大时自动归一化到合理范围,防止权重碾压
示例:假设 3 个组权重为 100/50/1,在 100 个调度候选中的分布:
权重 100 的组: 约 45 次采样
权重 50 的组: 约 23 次采样
权重 1 的组: 约 2 次采样6. 优雅关闭
调度器提供了安全的关闭方式,确保在途任务不会丢失。
关闭场景:
- 🔄 服务重启/升级:需要安全停止调度器
- 📊 负载均衡:从一个实例迁移到另一个实例
- 🛑 优雅降级:当下游服务异常时停止调度
- 🚀 部署:Kubernetes 或容器编排系统中的优雅关闭
三个关闭等级:
// 方式 1:快速停止(仅用于开发/测试环境)
// ❌ 不推荐生产环境,可能导致计数器不一致
scheduler.stop()
// 方式 2:优雅关闭(推荐用于生产环境)
// ✅ 等待当前 dispatch 循环完成再停止
// 这样确保不会有中途中断的操作
await scheduler.gracefulStop()
// 方式 3:完全关闭(最安全但最慢)
// ✅ 等待所有正在进行中的任务完成
// 使用场景:需要确保所有任务都处理完才能停止
await scheduler.gracefulStop({
timeout: 60000, // 最多等待 60 秒
drainPending: true // 等待所有活跃任务完成
})实际示例 - Express 服务器优雅关闭:
import express from 'express'
import { FairScheduler, PriorityStrategy } from '@libeilong/bullmq'
const app = express()
const scheduler = new FairScheduler({
connection: redis,
globalConcurrency: 50,
getGroupConcurrency: async (groupKey) => groupKey.startsWith('vip') ? 10 : 1,
strategy: new PriorityStrategy(async (groupKey) => {
return groupKey.startsWith('vip') ? 50 : 1
})
})
scheduler.run()
// 监听关闭信号(Kubernetes 或 Docker 会发送 SIGTERM)
process.on('SIGTERM', async () => {
console.log('收到关闭信号,开始优雅关闭...')
// 1. 停止接收新请求
server.close(() => {
console.log('HTTP 服务已关闭,停止接收新请求')
})
// 2. 优雅关闭调度器
// 等待最多 30 秒,让当前 dispatch 完成
await scheduler.gracefulStop({
timeout: 30000
})
console.log('所有操作已完成,进程退出')
process.exit(0)
})
// 如需等待所有任务完成(更保险):
process.on('SIGTERM', async () => {
console.log('收到关闭信号,开始优雅关闭...')
server.close()
// 等待所有活跃任务完成(推荐用于关键业务)
await scheduler.gracefulStop({
timeout: 120000, // 最多等待 2 分钟
drainPending: true // 等待所有任务完成
})
console.log('所有任务已完成,进程退出')
process.exit(0)
})日志示例 - 完整关闭过程:
收到关闭信号,开始优雅关闭...
[Scheduler] Starting graceful shutdown...
[Scheduler] Current dispatch loop completed
[Scheduler] Waiting for 5 pending tasks...
[Scheduler] Waiting for 3 pending tasks...
[Scheduler] Waiting for 1 pending tasks...
[Scheduler] All pending tasks drained
[Scheduler] Graceful shutdown completed.
所有任务已完成,进程退出最佳实践:
| 场景 | 推荐方式 | 原因 |
|------|--------|------|
| 本地开发 | stop() | 快速,无需等待 |
| 单元测试 | stop() | 快速,无需等待 |
| 生产环境重启 | gracefulStop() | 平衡速度和安全 |
| 关键业务处理 | gracefulStop({ drainPending: true }) | 最安全,确保无损 |
| Kubernetes Pod 关闭 | gracefulStop() | K8s 有 terminationGracePeriodSeconds 限制 |
三种关闭方式的对比:
stop() gracefulStop() gracefulStop({drainPending: true})
│ │ │
├─ 立即停止调度 ├─ 等待当前循环 ├─ 等待当前循环
├─ 计数器可能不一致 ├─ 计数器始终一致 ├─ 计数器始终一致
├─ 已发出的任务继续处理 ├─ 已发出的任务继续处理 ├─ 已发出的任务继续处理
├─ 未发出的任务放回队列 ├─ 未发出的任务放回队列 ├─ 等待所有任务完成
└─ 关闭时间: 毫秒级 └─ 关闭时间: 秒级 └─ 关闭时间: 分钟级
└─ 推荐生产环境 └─ 关键业务必选关闭流程示意:
启动关闭
↓
step 1: 停止接收新的 dispatch 循环
↓
step 2: 等待当前 dispatch 完成
├─ stop(): [跳过此步骤] ← 风险!
├─ gracefulStop(): [等待] → 完成 ✓
└─ gracefulStop({drainPending}): [等待] → 完成 ✓
↓
step 3: 是否等待所有活跃任务完成?
├─ stop(): [不等待] ← 可能丢失
├─ gracefulStop(): [不等待] ← 任务继续运行,但调度已停止
└─ gracefulStop({drainPending}): [等待所有完成] ✓ 最安全
↓
关闭完成7. 监控和事件系统
调度器提供了完整的监控和事件机制,方便集成到监控系统和实时反馈。
获取实时指标:
// 获取调度器监控指标
const metrics = await scheduler.getMetrics()
console.log(`
运行状态: ${metrics.isRunning}
全局并发: ${metrics.globalRunning}/${metrics.globalConcurrency} (${metrics.globalUsagePercent}%)
活跃组数: ${metrics.activeGroups}
注册任务: ${metrics.registeredTasks}
运行时长: ${metrics.uptime}ms
`)监听调度事件:
// 1. 监听调度事件(有任务被发送给 Worker)
scheduler.on('dispatch', (event) => {
console.log(`Task dispatched: ${event.taskName} for ${event.groupKey}`)
})
// 2. 监听槽位释放事件(任务完成或失败)
scheduler.on('slot-release', (event) => {
console.log(`Slot released for ${event.groupKey}`)
})
// 3. 监听错误事件
scheduler.on('error', (event) => {
console.error(`Scheduler error (${event.context}): ${event.error.message}`)
})
// 4. 只监听一次
scheduler.once('error', (event) => {
// 处理首次错误
notifyAdmin(`Scheduler error: ${event.error.message}`)
})
// 5. 取消监听
const listener = (event) => console.log(event)
scheduler.on('dispatch', listener)
scheduler.off('dispatch', listener)集成 Prometheus 监控:
import prometheus from 'prom-client'
// 创建 Gauge 指标
const concurrencyGauge = new prometheus.Gauge({
name: 'scheduler_concurrency_usage',
help: 'Current concurrency usage percentage',
})
const activeGroupsGauge = new prometheus.Gauge({
name: 'scheduler_active_groups',
help: 'Number of active groups',
})
// 定期更新指标
setInterval(async () => {
const metrics = await scheduler.getMetrics()
concurrencyGauge.set(metrics.globalUsagePercent)
activeGroupsGauge.set(metrics.activeGroups)
}, 10000)
// 监听事件记录计数
const dispatchCounter = new prometheus.Counter({
name: 'scheduler_dispatches_total',
help: 'Total number of tasks dispatched',
})
scheduler.on('dispatch', () => {
dispatchCounter.inc()
})3. 什么是 "Smart Backoff"?
BullMQ 原生的 backoff 通常是静态的(如固定的指数退避)。但在实际业务中,我们往往需要根据错误的类型来决定下一次重试的时间。
BaseTask 提供的 retryLater 让这变得简单:
// 灵活控制下一次尝试的时间,精确到毫秒
opts.retryLater({
error: originalError, // 保留原始错误堆栈
message: 'Token expired, waiting for refresh',
delay: 500 // 500ms 后重试
});当任务进入重试等待期时,它会释放当前的并发名额(在 FairScheduler 模式下),确保系统资源不被挂起的任务占用。
📋 配置指南
选择合适的全局并发数
globalConcurrency 是整个系统能同时处理的最大任务数。选择太小会导致资源浪费,太大会导致系统过载。
推荐公式:
globalConcurrency = CPU核心数 * 2-4示例:
// 对于 4 核 CPU 的服务器
const scheduler = new FairScheduler({
globalConcurrency: 8, // 4 * 2
// ...
})
// 对于 CPU 密集型任务
const scheduler = new FairScheduler({
globalConcurrency: 4, // 4 * 1,避免上下文切换
// ...
})
// 对于 I/O 密集型任务(推荐)
const scheduler = new FairScheduler({
globalConcurrency: 16, // 4 * 4,充分利用等待时间
// ...
})配置组并发限制
getGroupConcurrency 决定每个用户/租户最多能同时运行多少任务。
推荐策略:
const scheduler = new FairScheduler({
globalConcurrency: 50,
getGroupConcurrency: async (groupKey) => {
if (groupKey.startsWith('premium:')) return 20 // 高级用户
if (groupKey.startsWith('vip:')) return 10 // VIP用户
return 2 // 普通用户
},
})注意:
- 所有组的并发之和可以超过 globalConcurrency(这是正常的)
- 实际能运行的任务数取决于
min(globalConcurrency, groupLimit)
调整调度间隔
interval 控制多久检查一次是否有新任务需要调度。
权衡:
interval 越小 → 调度越快,但 Redis 请求越多
interval 越大 → 调度延迟越高,但系统负担越低推荐值:
// 对于响应时间要求高的系统(如 SaaS)
const scheduler = new FairScheduler({
interval: 10, // 10ms,最多延迟 10ms
// ...
})
// 对于通常的后台任务
const scheduler = new FairScheduler({
interval: 50, // 50ms(默认值)
// ...
})
// 对于对延迟不敏感的系统
const scheduler = new FairScheduler({
interval: 200, // 200ms,减少 Redis 调用
// ...
})选择合适的调度策略
RoundRobinStrategy(默认)- 适合大多数场景
import { RoundRobinStrategy } from '@libeilong/bullmq'
const scheduler = new FairScheduler({
strategy: new RoundRobinStrategy(), // 随机轮询
// ...
})PriorityStrategy - 需要优先级区分
import { PriorityStrategy } from '@libeilong/bullmq'
const scheduler = new FairScheduler({
strategy: new PriorityStrategy(async (groupKey) => {
// 返回权重,数字越大优先级越高
if (groupKey.startsWith('vip:')) return 50
return 1
}),
// ...
})性能优化建议
- 使用异步 getGroupConcurrency:
// ❌ 不好:每次都查询数据库
getGroupConcurrency: async (groupKey) => {
const user = await db.query(`SELECT level FROM users WHERE id = ${groupKey}`)
return user.level === 'vip' ? 10 : 1
}
// ✅ 好:使用缓存
const cache = new Map()
getGroupConcurrency: async (groupKey) => {
if (!cache.has(groupKey)) {
const user = await db.query(`SELECT level FROM users WHERE id = ${groupKey}`)
cache.set(groupKey, user.level === 'vip' ? 10 : 1)
}
return cache.get(groupKey)
}- 定期清理过期缓存:
// 每小时清理一次缓存
setInterval(() => cache.clear(), 3600000)- 监控关键指标:
setInterval(async () => {
const metrics = await scheduler.getMetrics()
if (metrics.globalUsagePercent > 90) {
console.warn('Scheduler is under heavy load!')
}
}, 30000)🐛 故障排查
Redis 连接断开
症状:日志中出现 [Scheduler] Redis connection error
诊断:
- 检查 Redis 服务是否正常运行
- 检查网络连接和防火墙规则
- 查看完整的错误日志获取具体错误码
解决:
- 调度器会自动进行指数退避重试,无需手动干预
- 若连接恢复,调度会自动继续
- 若多次重试失败,检查 Redis 服务日志
权重值异常(被自动夹紧)
症状:日志中出现 [PriorityStrategy] Weight ... exceeds maximum
诊断:
- 检查
getWeight()函数是否返回了超过 100 或小于 0 的值 - 检查是否返回了非数值(如
null,undefined,NaN)
解决:
- 系统会自动将权重范围化到 [0, 100]
- 建议检查业务逻辑,确保
getWeight()返回的值在预期范围内
任务被重新放回队列
症状:日志中出现 [Scheduler] Failed to send task to BullMQ, rolled back and task re-queued
诊断:
- 检查 BullMQ Worker 是否正常运行
- 检查 Queue 实例是否正确初始化
- 查看完整的错误堆栈获取具体原因
解决:
- 这是正常的容错机制,任务不会丢失
- 检查并修复导致任务发送失败的根本原因
- 调度器会在下一个周期重试
优雅关闭超时
症状:gracefulStop() 在等待期间超时
诊断:
- 查看日志中是否有待处理的任务(
Waiting for X pending tasks...) - 检查这些任务是否在正常处理中
解决:
- 增大
timeout参数以给予更多时间 - 检查任务处理是否陷入死循环或无限等待
- 若任务明显卡住,可以先调用
stop()强制停止
License
MIT
