npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@libeilong/mq

v0.2.6

Published

**企业级 Node.js 任务队列解决方案**

Downloads

908

Readme

@libeilong/bullmq

企业级 Node.js 任务队列解决方案

基于 BullMQ 的增强封装库。既提供开箱即用的类型安全 Worker智能重试机制,也支持复杂的多租户公平调度系统。

npm version License

🌟 为什么选择它?

你可以根据业务场景灵活选择两种使用模式:

  1. Lite 模式 (BaseTask):

    • 如果你只需要一个比原生 BullMQ 更易用、支持 TypeScript 类型、支持动态退避(如 API 限流等待)的 Worker 封装。
    • 0 配置成本,直接替代原生 Worker。
  2. Pro 模式 (FairScheduler):

    • 如果你面临 SaaS 场景,需要解决“VIP 用户插队”、“防止大户阻塞系统”、“全局并发控制”等复杂调度问题。
    • 企业级调度,实现资源隔离与公平轮询。

📦 安装

npm install @libeilong/bullmq ioredis bullmq

🚀 模式一:独立使用 BaseTask (推荐)

最简单的使用方式。你不需要调度器,只需要一个健壮的任务处理器。

核心优势

  • Smart Backoff: 抛弃死板的重试间隔,支持在代码中精确控制下一次重试时间(如 retryLater({ delay: 5000 }))。
  • Type Safe: 泛型支持,Payload 类型自动推导。
  • LifeCycle: 统一的 onCompleted / onFailed 钩子。

代码示例

import IORedis from 'ioredis';
import { BaseTask, HandleOptions, Job } from '@libeilong/bullmq';

// 1. 定义任务 Payload 类型
interface MyPayload {
  url: string;
}

// 2. 继承 BaseTask
class ScraperTask extends BaseTask<MyPayload> {
  constructor(connection: IORedis) {
    super(connection, {
      name: 'ScraperQueue',
      concurrency: 5, // 本地并发限制
      defaultJobOptions: { attempts: 5 } // 默认重试 5 次
    });
  }

  // 3. 实现 handle 方法
  async handle(job: Job<MyPayload>, opts: HandleOptions) {
    console.log(`Scraping: ${job.data.url}`);

    try {
      await doSomething(job.data.url);
    } catch (err: any) {
      
      // --- 智能重试核心 ---
      
      // 场景 A: 触发反爬虫限流 (429),等待 60 秒再试
      if (err.status === 429) {
        opts.retryLater({ 
          message: 'Rate Limited', 
          delay: 60 * 1000 
        });
        return; 
      }

      // 场景 B: 偶发网络错误,立即重试 (0 delay)
      if (err.code === 'ECONNRESET') {
        opts.retryLater({ delay: 0 });
        return;
      }

      // 场景 C: 其他错误,抛出异常,使用默认指数退避
      throw err;
    }
  }
}

// 4. 使用
const redis = new IORedis();
const task = new ScraperTask(redis);

// 发送任务
await task.start({ url: 'https://google.com' });

🚀 模式二:多租户公平调度 (FairScheduler)

当你需要管理多个用户(租户),且希望他们之间互不影响、资源隔离时使用。

核心特性

  • 资源隔离:VIP 用户可拥有 10 倍于普通用户的并发度。
  • 防队头阻塞:大户提交 1万个任务,不会堵死后面提交任务的小户。
  • VIP 插队:支持优先级策略,VIP 任务优先处理。

快速上手

1. 初始化调度器

import { FairScheduler, RoundRobinStrategy } from '@libeilong/bullmq';

const scheduler = new FairScheduler({
  connection: new IORedis(),
  globalConcurrency: 50, // 系统总水位
  // 动态获取每个租户的并发上限
  getGroupConcurrency: async (groupKey) => {
    return groupKey.startsWith('vip') ? 10 : 1;
  },
  // 默认使用轮询策略
  strategy: new RoundRobinStrategy() 
});

scheduler.run();

2. 定义任务 (继承 FairTask)

注意:这里继承的是 FairTask 而不是 BaseTask

import { FairTask } from '@libeilong/bullmq';

class ReportGenTask extends FairTask<{ reportId: string }> {
  constructor(scheduler: FairScheduler) {
    super(scheduler, { name: 'ReportQueue' });
  }
  
  async handle(job, opts) {
    // 业务逻辑...
  }
}

const reportTask = new ReportGenTask(scheduler);
scheduler.register(reportTask);

3. 提交任务 (带上 GroupKey)

// 普通用户 (groupKey='user:101') -> 限制并发 1
reportTask.schedule('user:101', { reportId: 'A1' });

// VIP 用户 (groupKey='vip:999') -> 限制并发 10
reportTask.schedule('vip:999', { reportId: 'B1' });

模式三:父子任务流 (FairFlow)

如果你需要一个父任务等待多个子任务完成后再处理结果,可以使用 FairFlow。支持混合调度:一部分子任务通过 FairScheduler 进行公平调度,另一部分直接进入 BullMQ。

使用场景

  • 数据处理流程:一个父任务发起多个子任务(如数据验证、处理、上传),等待全部完成后进行合并。
  • 并行工作流:某些子任务需要公平调度(多用户场景),某些子任务需要快速响应(通知、邮件)。
  • 混合调度系统:充分利用 FairScheduler 的多租户隔离优势,同时保留标准队列的灵活性。

简单示例

import { FairFlowProducer, BaseTask, FairTask, FairScheduler } from '@libeilong/bullmq'
import IORedis from 'ioredis'

// 初始化
const redis = new IORedis()
const scheduler = new FairScheduler({
  connection: redis,
  globalConcurrency: 50,
  getGroupConcurrency: async (group) => (group.startsWith('vip:') ? 10 : 2),
})
const flowProducer = new FairFlowProducer(scheduler)

// 定义任务
class VideoProcessTask extends FairTask<{ file: string }> {
  async handle(job) {
    console.log(`处理视频: ${job.data.file}`)
    await new Promise(res => setTimeout(res, 2000))
    return { processed: true }
  }
}

class NotifyTask extends BaseTask<{ message: string }> {
  async handle(job) {
    console.log(`发送通知: ${job.data.message}`)
    return { sent: true }
  }
}

class AggregateTask extends BaseTask {
  async handle(job) {
    const results = await job.getChildrenValues()
    console.log('所有子任务完成!结果:', results)
    return { aggregated: true }
  }
}

// 注册任务
const videoTask = new VideoProcessTask(scheduler)
const notifyTask = new NotifyTask(redis, { name: 'NotifyTask' })
const aggregateTask = new AggregateTask(redis, { name: 'AggregateTask' })
scheduler.register(videoTask)
scheduler.run()

// 创建 Flow:父任务 + 两个子任务
async function startWorkflow() {
  await flowProducer.add({
    name: 'AggregateTask',
    queueName: 'AggregateTask',
    data: { userId: 'user-123' },
    children: [
      // 子任务 1:需要公平调度的任务 (通过 FairScheduler)
      {
        type: 'fair',
        taskName: 'VideoProcessTask',
        groupKey: 'user-123',
        data: { file: 'movie.mp4' },
      },
      // 子任务 2:标准任务 (直接进入 BullMQ)
      {
        type: 'standard',
        taskName: 'NotifyTask',
        queueName: 'NotifyTask',
        data: { message: '处理已启动!' },
      },
    ],
  })
  console.log('Flow 已创建!')
}

startWorkflow()

工作流程

  1. 创建 Flow 时指定父任务和子任务列表
  2. fair 类型子任务进入 FairScheduler 的等待队列,按 groupKey 进行公平调度
  3. standard 类型子任务直接进入 BullMQ,不受 FairScheduler 限制
  4. 使用哨兵模式确保父任务等待所有 fair 子任务完成
  5. 当所有子任务完成后,父任务通过 job.getChildrenValues() 获取结果并处理

进阶配置

1. 配置多级优先级调度

如果你希望根据用户等级实现多级优先级调度(不仅是简单的高/低二分),可以使用 PriorityStrategy

基于 Redis Sorted Set 的权重机制,支持任意粒度的优先级控制。

import { PriorityStrategy } from '@libeilong/bullmq';

const scheduler = new FairScheduler({
  // ... 其他配置
  strategy: new PriorityStrategy(async (groupKey) => {
    // 返回权重值(数字越大优先级越高)
    // 权重推荐范围:1-100
    if (groupKey.startsWith('super-vip:')) return 100;  // 超级VIP
    if (groupKey.startsWith('vip:')) return 50;         // VIP用户
    if (groupKey.startsWith('premium:')) return 10;     // 高级用户
    return 1;                                            // 普通用户
  })
});

工作原理

  • 使用权重值对活跃组进行排序
  • 每次调度时,优先获取权重高的组
  • 高权重组优先处理任务,但低权重组不会完全饿死
  • 同级权重内随机排序,保证公平性

高级特性

  • 动态权重更新:用户等级在运行中改变时(如升级为VIP),权重会在下一次调度时自动更新
  • 权重值验证:系统会自动校验权重范围(0-100),超出范围的值会被自动夹紧(clamp)
  • 错误恢复:权重更新失败不会阻断调度,仅记录警告日志

2. Redis 连接的自动恢复

调度器内置了自动故障恢复机制。当 Redis 连接出现问题时:

  • 自动检测:系统会自动识别连接错误(如 ECONNREFUSED, ENOTFOUND, ETIMEDOUT 等)
  • 指数退避重试:连接错误会触发自动重试,重试间隔为 100ms × 2^(n-1)(第一次100ms,第二次200ms,第三次400ms...)
  • 最多重试5次:超过5次重试后停止特殊处理,改用正常调度间隔
  • 业务错误区分:业务逻辑错误不会触发重试,仅记录日志

示例日志输出:

[Scheduler] Redis connection error (attempt 1/5), retrying after 100ms: connect ECONNREFUSED 127.0.0.1:6379
[Scheduler] Redis connection error (attempt 2/5), retrying after 200ms: connect ECONNREFUSED 127.0.0.1:6379
[Scheduler] Redis connection error (attempt 3/5), retrying after 400ms: connect ECONNREFUSED 127.0.0.1:6379

4. 任务丢失防护与补偿机制

调度器使用了多层防护来确保任务不会丢失:

  • 原子操作:使用 Redis 事务保证计数器和队列操作的一致性
  • 自动补偿:如果任务发送失败,会自动回滚计数器并将任务重新放回队列
  • 异常隔离:队列操作异常不会阻断其他任务队列的处理
  • 详细日志:所有补偿操作都有清晰的日志记录

示例补偿场景:

[Scheduler] Failed to send task to BullMQ, rolled back and task re-queued: Error: Queue not found

5. 优先级分层采样

PriorityStrategy 采用分层采样算法,确保不同权重的组获得公平但按比例的调度机会:

  • 权重比例调度:权重 100 的组会获得约 10 倍于权重 10 的调度机会
  • 避免饿死:即使权重最低的组也保证每个调度周期至少被采样一次
  • 动态归一化:权重差异大时自动归一化到合理范围,防止权重碾压

示例:假设 3 个组权重为 100/50/1,在 100 个调度候选中的分布:

权重 100 的组: 约 45 次采样
权重 50 的组:  约 23 次采样
权重 1 的组:   约 2 次采样

6. 优雅关闭

调度器提供了安全的关闭方式,确保在途任务不会丢失。

关闭场景

  • 🔄 服务重启/升级:需要安全停止调度器
  • 📊 负载均衡:从一个实例迁移到另一个实例
  • 🛑 优雅降级:当下游服务异常时停止调度
  • 🚀 部署:Kubernetes 或容器编排系统中的优雅关闭

三个关闭等级

// 方式 1:快速停止(仅用于开发/测试环境)
// ❌ 不推荐生产环境,可能导致计数器不一致
scheduler.stop()

// 方式 2:优雅关闭(推荐用于生产环境)
// ✅ 等待当前 dispatch 循环完成再停止
// 这样确保不会有中途中断的操作
await scheduler.gracefulStop()

// 方式 3:完全关闭(最安全但最慢)
// ✅ 等待所有正在进行中的任务完成
// 使用场景:需要确保所有任务都处理完才能停止
await scheduler.gracefulStop({
  timeout: 60000,      // 最多等待 60 秒
  drainPending: true   // 等待所有活跃任务完成
})

实际示例 - Express 服务器优雅关闭

import express from 'express'
import { FairScheduler, PriorityStrategy } from '@libeilong/bullmq'

const app = express()
const scheduler = new FairScheduler({
  connection: redis,
  globalConcurrency: 50,
  getGroupConcurrency: async (groupKey) => groupKey.startsWith('vip') ? 10 : 1,
  strategy: new PriorityStrategy(async (groupKey) => {
    return groupKey.startsWith('vip') ? 50 : 1
  })
})

scheduler.run()

// 监听关闭信号(Kubernetes 或 Docker 会发送 SIGTERM)
process.on('SIGTERM', async () => {
  console.log('收到关闭信号,开始优雅关闭...')

  // 1. 停止接收新请求
  server.close(() => {
    console.log('HTTP 服务已关闭,停止接收新请求')
  })

  // 2. 优雅关闭调度器
  // 等待最多 30 秒,让当前 dispatch 完成
  await scheduler.gracefulStop({
    timeout: 30000
  })

  console.log('所有操作已完成,进程退出')
  process.exit(0)
})

// 如需等待所有任务完成(更保险):
process.on('SIGTERM', async () => {
  console.log('收到关闭信号,开始优雅关闭...')

  server.close()

  // 等待所有活跃任务完成(推荐用于关键业务)
  await scheduler.gracefulStop({
    timeout: 120000,      // 最多等待 2 分钟
    drainPending: true    // 等待所有任务完成
  })

  console.log('所有任务已完成,进程退出')
  process.exit(0)
})

日志示例 - 完整关闭过程

收到关闭信号,开始优雅关闭...
[Scheduler] Starting graceful shutdown...
[Scheduler] Current dispatch loop completed
[Scheduler] Waiting for 5 pending tasks...
[Scheduler] Waiting for 3 pending tasks...
[Scheduler] Waiting for 1 pending tasks...
[Scheduler] All pending tasks drained
[Scheduler] Graceful shutdown completed.
所有任务已完成,进程退出

最佳实践

| 场景 | 推荐方式 | 原因 | |------|--------|------| | 本地开发 | stop() | 快速,无需等待 | | 单元测试 | stop() | 快速,无需等待 | | 生产环境重启 | gracefulStop() | 平衡速度和安全 | | 关键业务处理 | gracefulStop({ drainPending: true }) | 最安全,确保无损 | | Kubernetes Pod 关闭 | gracefulStop() | K8s 有 terminationGracePeriodSeconds 限制 |

三种关闭方式的对比

stop()                    gracefulStop()              gracefulStop({drainPending: true})
│                         │                            │
├─ 立即停止调度          ├─ 等待当前循环              ├─ 等待当前循环
├─ 计数器可能不一致      ├─ 计数器始终一致            ├─ 计数器始终一致
├─ 已发出的任务继续处理  ├─ 已发出的任务继续处理     ├─ 已发出的任务继续处理
├─ 未发出的任务放回队列  ├─ 未发出的任务放回队列     ├─ 等待所有任务完成
└─ 关闭时间: 毫秒级      └─ 关闭时间: 秒级            └─ 关闭时间: 分钟级
                          └─ 推荐生产环境              └─ 关键业务必选

关闭流程示意

启动关闭
  ↓
step 1: 停止接收新的 dispatch 循环
  ↓
step 2: 等待当前 dispatch 完成
  ├─ stop():         [跳过此步骤] ← 风险!
  ├─ gracefulStop(): [等待] → 完成 ✓
  └─ gracefulStop({drainPending}): [等待] → 完成 ✓
  ↓
step 3: 是否等待所有活跃任务完成?
  ├─ stop():         [不等待] ← 可能丢失
  ├─ gracefulStop(): [不等待] ← 任务继续运行,但调度已停止
  └─ gracefulStop({drainPending}): [等待所有完成] ✓ 最安全
  ↓
关闭完成

7. 监控和事件系统

调度器提供了完整的监控和事件机制,方便集成到监控系统和实时反馈。

获取实时指标

// 获取调度器监控指标
const metrics = await scheduler.getMetrics()

console.log(`
  运行状态: ${metrics.isRunning}
  全局并发: ${metrics.globalRunning}/${metrics.globalConcurrency} (${metrics.globalUsagePercent}%)
  活跃组数: ${metrics.activeGroups}
  注册任务: ${metrics.registeredTasks}
  运行时长: ${metrics.uptime}ms
`)

监听调度事件

// 1. 监听调度事件(有任务被发送给 Worker)
scheduler.on('dispatch', (event) => {
  console.log(`Task dispatched: ${event.taskName} for ${event.groupKey}`)
})

// 2. 监听槽位释放事件(任务完成或失败)
scheduler.on('slot-release', (event) => {
  console.log(`Slot released for ${event.groupKey}`)
})

// 3. 监听错误事件
scheduler.on('error', (event) => {
  console.error(`Scheduler error (${event.context}): ${event.error.message}`)
})

// 4. 只监听一次
scheduler.once('error', (event) => {
  // 处理首次错误
  notifyAdmin(`Scheduler error: ${event.error.message}`)
})

// 5. 取消监听
const listener = (event) => console.log(event)
scheduler.on('dispatch', listener)
scheduler.off('dispatch', listener)

集成 Prometheus 监控

import prometheus from 'prom-client'

// 创建 Gauge 指标
const concurrencyGauge = new prometheus.Gauge({
  name: 'scheduler_concurrency_usage',
  help: 'Current concurrency usage percentage',
})

const activeGroupsGauge = new prometheus.Gauge({
  name: 'scheduler_active_groups',
  help: 'Number of active groups',
})

// 定期更新指标
setInterval(async () => {
  const metrics = await scheduler.getMetrics()
  concurrencyGauge.set(metrics.globalUsagePercent)
  activeGroupsGauge.set(metrics.activeGroups)
}, 10000)

// 监听事件记录计数
const dispatchCounter = new prometheus.Counter({
  name: 'scheduler_dispatches_total',
  help: 'Total number of tasks dispatched',
})

scheduler.on('dispatch', () => {
  dispatchCounter.inc()
})

3. 什么是 "Smart Backoff"?

BullMQ 原生的 backoff 通常是静态的(如固定的指数退避)。但在实际业务中,我们往往需要根据错误的类型来决定下一次重试的时间。

BaseTask 提供的 retryLater 让这变得简单:

// 灵活控制下一次尝试的时间,精确到毫秒
opts.retryLater({ 
  error: originalError, // 保留原始错误堆栈
  message: 'Token expired, waiting for refresh', 
  delay: 500 // 500ms 后重试
});

当任务进入重试等待期时,它会释放当前的并发名额(在 FairScheduler 模式下),确保系统资源不被挂起的任务占用。


📋 配置指南

选择合适的全局并发数

globalConcurrency 是整个系统能同时处理的最大任务数。选择太小会导致资源浪费,太大会导致系统过载。

推荐公式

globalConcurrency = CPU核心数 * 2-4

示例

// 对于 4 核 CPU 的服务器
const scheduler = new FairScheduler({
  globalConcurrency: 8,  // 4 * 2
  // ...
})

// 对于 CPU 密集型任务
const scheduler = new FairScheduler({
  globalConcurrency: 4,  // 4 * 1,避免上下文切换
  // ...
})

// 对于 I/O 密集型任务(推荐)
const scheduler = new FairScheduler({
  globalConcurrency: 16, // 4 * 4,充分利用等待时间
  // ...
})

配置组并发限制

getGroupConcurrency 决定每个用户/租户最多能同时运行多少任务。

推荐策略

const scheduler = new FairScheduler({
  globalConcurrency: 50,
  getGroupConcurrency: async (groupKey) => {
    if (groupKey.startsWith('premium:')) return 20   // 高级用户
    if (groupKey.startsWith('vip:')) return 10       // VIP用户
    return 2                                          // 普通用户
  },
})

注意

  • 所有组的并发之和可以超过 globalConcurrency(这是正常的)
  • 实际能运行的任务数取决于 min(globalConcurrency, groupLimit)

调整调度间隔

interval 控制多久检查一次是否有新任务需要调度。

权衡

interval 越小 → 调度越快,但 Redis 请求越多
interval 越大 → 调度延迟越高,但系统负担越低

推荐值

// 对于响应时间要求高的系统(如 SaaS)
const scheduler = new FairScheduler({
  interval: 10,  // 10ms,最多延迟 10ms
  // ...
})

// 对于通常的后台任务
const scheduler = new FairScheduler({
  interval: 50,  // 50ms(默认值)
  // ...
})

// 对于对延迟不敏感的系统
const scheduler = new FairScheduler({
  interval: 200, // 200ms,减少 Redis 调用
  // ...
})

选择合适的调度策略

RoundRobinStrategy(默认)- 适合大多数场景

import { RoundRobinStrategy } from '@libeilong/bullmq'

const scheduler = new FairScheduler({
  strategy: new RoundRobinStrategy(), // 随机轮询
  // ...
})

PriorityStrategy - 需要优先级区分

import { PriorityStrategy } from '@libeilong/bullmq'

const scheduler = new FairScheduler({
  strategy: new PriorityStrategy(async (groupKey) => {
    // 返回权重,数字越大优先级越高
    if (groupKey.startsWith('vip:')) return 50
    return 1
  }),
  // ...
})

性能优化建议

  1. 使用异步 getGroupConcurrency
// ❌ 不好:每次都查询数据库
getGroupConcurrency: async (groupKey) => {
  const user = await db.query(`SELECT level FROM users WHERE id = ${groupKey}`)
  return user.level === 'vip' ? 10 : 1
}

// ✅ 好:使用缓存
const cache = new Map()
getGroupConcurrency: async (groupKey) => {
  if (!cache.has(groupKey)) {
    const user = await db.query(`SELECT level FROM users WHERE id = ${groupKey}`)
    cache.set(groupKey, user.level === 'vip' ? 10 : 1)
  }
  return cache.get(groupKey)
}
  1. 定期清理过期缓存
// 每小时清理一次缓存
setInterval(() => cache.clear(), 3600000)
  1. 监控关键指标
setInterval(async () => {
  const metrics = await scheduler.getMetrics()
  if (metrics.globalUsagePercent > 90) {
    console.warn('Scheduler is under heavy load!')
  }
}, 30000)

🐛 故障排查

Redis 连接断开

症状:日志中出现 [Scheduler] Redis connection error

诊断

  1. 检查 Redis 服务是否正常运行
  2. 检查网络连接和防火墙规则
  3. 查看完整的错误日志获取具体错误码

解决

  • 调度器会自动进行指数退避重试,无需手动干预
  • 若连接恢复,调度会自动继续
  • 若多次重试失败,检查 Redis 服务日志

权重值异常(被自动夹紧)

症状:日志中出现 [PriorityStrategy] Weight ... exceeds maximum

诊断

  1. 检查 getWeight() 函数是否返回了超过 100 或小于 0 的值
  2. 检查是否返回了非数值(如 null, undefined, NaN

解决

  • 系统会自动将权重范围化到 [0, 100]
  • 建议检查业务逻辑,确保 getWeight() 返回的值在预期范围内

任务被重新放回队列

症状:日志中出现 [Scheduler] Failed to send task to BullMQ, rolled back and task re-queued

诊断

  1. 检查 BullMQ Worker 是否正常运行
  2. 检查 Queue 实例是否正确初始化
  3. 查看完整的错误堆栈获取具体原因

解决

  • 这是正常的容错机制,任务不会丢失
  • 检查并修复导致任务发送失败的根本原因
  • 调度器会在下一个周期重试

优雅关闭超时

症状gracefulStop() 在等待期间超时

诊断

  1. 查看日志中是否有待处理的任务(Waiting for X pending tasks...
  2. 检查这些任务是否在正常处理中

解决

  • 增大 timeout 参数以给予更多时间
  • 检查任务处理是否陷入死循环或无限等待
  • 若任务明显卡住,可以先调用 stop() 强制停止

License

MIT