@millionfor/mq
v2.1.0-alpha.1
Published
A portable function based on mq
Downloads
9
Readme
@millionfor/mq
一個基於 RabbitMQ 的高可靠性消息隊列插件,支持自動重連、心跳監控和錯誤處理。
主要特性
🔄 自動重連機制
- 網絡斷開時自動重連
- 指數退避重連策略
- 可配置最大重連次數
- 連接狀態監控
💓 心跳監控
- 定期檢查連接狀態
- 可配置心跳間隔
- 自動檢測死連接
🛡️ 錯誤處理
- 完善的錯誤處理機制
- 消息處理失敗時自動重新入隊
- 通道錯誤時自動重建
📊 連接狀態管理
- 實時連接狀態監控
- 詳細的錯誤日誌
- 優雅的連接關閉
安裝
npm install @millionfor/mq使用方法
基本使用
const Mq = require('@millionfor/mq')
// 創建 MQ 實例
const mq = new Mq({
connectionSetting: 'amqp://localhost:5672',
heartbeat: 60, // 心跳間隔(秒)
reconnectInterval: 5000, // 重連間隔(毫秒)
maxReconnectAttempts: 10 // 最大重連次數
})
// 初始化連接
await mq.init()
// 發送消息
await mq.post('queue-name', { message: 'Hello World' })
// 消費消息
await mq.consumer(
'amqp://localhost:5672',
'queue-name',
2, // 消費者數量
async (message) => {
console.log('收到消息:', message)
// 處理消息
}
)
// 關閉連接
await mq.close()配置選項
const options = {
connectionSetting: 'amqp://localhost:5672', // RabbitMQ 連接字符串
heartbeat: 60, // 心跳間隔(秒),默認 60
reconnectInterval: 5000, // 重連間隔(毫秒),默認 5000
maxReconnectAttempts: 10 // 最大重連次數,默認 10
}連接狀態監控
// 獲取連接狀態
const status = mq.getConnectionStatus()
console.log('連接狀態:', status)
// 輸出: { isConnected: true, isConnecting: false, reconnectAttempts: 0 }API 文檔
構造函數
new Mq(options: Option)Option 接口
interface Option {
connectionSetting: string // RabbitMQ 連接字符串
heartbeat?: number // 心跳間隔(秒)
reconnectInterval?: number // 重連間隔(毫秒)
maxReconnectAttempts?: number // 最大重連次數
}方法
init(): Promise<void>
初始化連接,建立與 RabbitMQ 的連接和通道。
manualInit(): Promise<void>
手動初始化連接,重置重連次數並嘗試連接。
post(queueName: string, payload: any): Promise<void>
發送消息到指定隊列。
consumer(connectionSetting: string, queueName: string, numConsumers: number, handler: (message: string) => Promise<void>): Promise<void>
啟動消費者,處理隊列中的消息。
getConnectionStatus(): ConnectionState
獲取當前連接狀態。
close(): Promise<void>
優雅關閉連接。
錯誤處理
插件會自動處理以下情況:
- 網絡斷開: 自動重連
- 心跳超時: 重新建立連接
- 通道錯誤: 重新創建通道
- 消息處理失敗: 消息重新入隊
- 連接阻塞: 記錄日誌並監控
重連機制
重連策略
- 初始重連間隔: 配置的
reconnectInterval值 - 指數退避: 每次重連間隔翻倍
- 最大間隔: 30 秒
- 重連限制: 配置的
maxReconnectAttempts值
重連示例
const mq = new Mq({
connectionSetting: 'amqp://localhost:5672',
heartbeat: 5, // 5 秒心跳
reconnectInterval: 1000, // 1 秒重連間隔
maxReconnectAttempts: 1000 // 最多重連 1000 次
})日誌
插件使用 @millionfor/logger 進行日誌記錄,包含以下級別:
INFO: 連接成功、消息發送成功等WARN: 連接關閉、重連嘗試等ERROR: 連接失敗、消息處理錯誤等
最佳實踐
- 連接管理: 在應用啟動時初始化連接,在應用關閉時優雅關閉
- 錯誤處理: 為消息處理函數添加適當的錯誤處理
- 監控: 定期檢查連接狀態
- 配置: 根據網絡環境調整心跳和重連參數
故障排除
常見問題
- 重連不工作: 確保
maxReconnectAttempts設置正確 - 心跳超時: 調整
heartbeat值 - 連接失敗: 檢查 RabbitMQ 服務狀態和網絡連接
調試建議
// 監控連接狀態
setInterval(() => {
const status = mq.getConnectionStatus()
console.log('連接狀態:', status)
}, 5000)示例
完整的使用示例請參考 example-fixed.js 文件。
版本歷史
v2.0.4
- ✅ 添加自動重連機制
- ✅ 實現心跳監控
- ✅ 改善錯誤處理
- ✅ 添加連接狀態管理
- ✅ 修復 TypeScript 類型錯誤
- ✅ 添加優雅關閉機制
- ✅ 完善文檔和示例
- ✅ 修復重連機制問題 - 確保在指定次數內正確重連
授權
MIT License
