amqplib-init
v1.2.4
Published
消息队列初始化 - 模块化架构版本 (支持15分钟长任务)
Downloads
83
Readme
amqplib-init
一个强大且易用的 RabbitMQ 消息队列初始化和管理库,支持自动重连、消息处理、长任务执行等功能。
🚀 特性
- 模块化架构 - 清晰的模块分离,易于维护和扩展
- 自动重连 - 连接断开时自动重连,确保服务稳定性
- 消息处理 - 内置消息确认/拒绝机制,支持延迟确认
- 长任务支持 - 支持最长15分钟的消息处理任务
- 错误处理 - 智能处理空消息和格式错误的消息,防止阻塞
- 自动重载 - 支持自动重载监控
- 配置灵活 - 支持多种配置方式和自动配置获取
📦 安装
npm install amqplib-init🔧 快速开始
基础用法
const { init } = require('amqplib-init');
// 初始化消息队列
await init({
channelName: 'my-queue',
amqpLink: 'amqp://localhost:5672',
callback: async (message) => {
console.log('收到消息:', message);
// 处理消息逻辑
await processMessage(message);
}
});高级用法
const { AMQPInitializer } = require('amqplib-init');
const initializer = new AMQPInitializer();
await initializer.init({
channelName: 'my-queue',
amqpLink: 'amqp://localhost:5672',
prefetch: 1,
durable: true,
delay: 1000,
heartbeat: 60,
timeout: 300000,
messageTimeout: 900000,
callback: async (message) => {
// 消息处理逻辑
return processMessage(message);
},
finish: () => {
console.log('初始化完成');
},
initHook: async ({ channel, connection }) => {
// 初始化钩子
console.log('连接已建立');
}
});
// 获取状态信息
console.log(initializer.getStatus());
// 优雅关闭
await initializer.shutdown();⚙️ 配置参数
| 参数 | 类型 | 默认值 | 描述 |
|------|------|--------|------|
| channelName | string | 'node-test-channel' | 队列名称 |
| amqpLink | string | '' | RabbitMQ 连接地址 |
| amqpAutoLink | string | '' | 自动获取连接地址的API |
| prefetch | number | 1 | 预取消息数量 |
| durable | boolean | true | 队列持久化 |
| delay | number | 0 | 消息确认延迟时间(ms) |
| heartbeat | number | 60 | 心跳间隔(秒) |
| timeout | number | 300000 | 连接超时时间(ms) |
| messageTimeout | number | 900000 | 消息处理超时时间(ms) |
| reconnectDelay | number | 5000 | 重连延迟时间(ms) |
| maxReconnectAttempts | number | 10 | 最大重连次数 |
| autoReload | number | 0 | 自动重载间隔(秒) |
| callback | function | () => {} | 消息处理回调 |
| finish | function | () => {} | 初始化完成回调 |
| initHook | function | () => {} | 初始化钩子 |
| queryHook | function | () => {} | 查询钩子 |
🛡️ 错误处理
v1.2.3+ 版本增强了错误处理能力:
- 空消息处理 - 自动识别并确认空消息,防止消费阻塞
- JSON解析错误 - 智能处理格式错误的消息,避免无限循环
- 连接异常 - 自动重连机制,确保服务稳定性
// 库会自动处理以下情况:
// - 空消息内容
// - null 或 undefined 消息
// - 无效的 JSON 格式
// - 连接断开重连🔍 状态监控
const status = initializer.getStatus();
console.log({
isConnected: status.isConnected, // 连接状态
processingCount: status.processingCount, // 处理中消息数量
autoReloaderActive: status.autoReloaderActive, // 自动重载状态
channelName: status.channelName // 当前队列名
});🚨 注意事项
- 长任务支持 - 支持最长15分钟的消息处理,超时会自动处理
- 消息确认 - 消息处理成功后会自动确认,失败会重新排队
- 重连机制 - 连接断开时会自动重连并恢复消息处理
- 内存管理 - 内置消息状态跟踪,防止内存泄漏
📝 更新日志
v1.2.3 (2025-08-25)
- 🐛 修复 JSON 解析错误导致的消息处理阻塞
- ✨ 增加空消息和无效消息的自动确认机制
- 📝 完善错误日志输出,便于问题排查
v1.2.1
- 🔧 模块化架构重构
- ✨ 支持15分钟长任务处理
- 🚀 优化自动重连逻辑
📄 许可证
ISC
🤝 贡献
欢迎提交 Issue 和 Pull Request!
📞 支持
如果您在使用过程中遇到问题,请通过以下方式获取支持:
- 查看本文档的常见问题
- 提交 GitHub Issue
- 查看示例代码
demo.js
