npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

amqplib-init

v1.2.4

Published

消息队列初始化 - 模块化架构版本 (支持15分钟长任务)

Downloads

83

Readme

amqplib-init

一个强大且易用的 RabbitMQ 消息队列初始化和管理库,支持自动重连、消息处理、长任务执行等功能。

🚀 特性

  • 模块化架构 - 清晰的模块分离,易于维护和扩展
  • 自动重连 - 连接断开时自动重连,确保服务稳定性
  • 消息处理 - 内置消息确认/拒绝机制,支持延迟确认
  • 长任务支持 - 支持最长15分钟的消息处理任务
  • 错误处理 - 智能处理空消息和格式错误的消息,防止阻塞
  • 自动重载 - 支持自动重载监控
  • 配置灵活 - 支持多种配置方式和自动配置获取

📦 安装

npm install amqplib-init

🔧 快速开始

基础用法

const { init } = require('amqplib-init');

// 初始化消息队列
await init({
  channelName: 'my-queue',
  amqpLink: 'amqp://localhost:5672',
  callback: async (message) => {
    console.log('收到消息:', message);
    // 处理消息逻辑
    await processMessage(message);
  }
});

高级用法

const { AMQPInitializer } = require('amqplib-init');

const initializer = new AMQPInitializer();

await initializer.init({
  channelName: 'my-queue',
  amqpLink: 'amqp://localhost:5672',
  prefetch: 1,
  durable: true,
  delay: 1000,
  heartbeat: 60,
  timeout: 300000,
  messageTimeout: 900000,
  callback: async (message) => {
    // 消息处理逻辑
    return processMessage(message);
  },
  finish: () => {
    console.log('初始化完成');
  },
  initHook: async ({ channel, connection }) => {
    // 初始化钩子
    console.log('连接已建立');
  }
});

// 获取状态信息
console.log(initializer.getStatus());

// 优雅关闭
await initializer.shutdown();

⚙️ 配置参数

| 参数 | 类型 | 默认值 | 描述 | |------|------|--------|------| | channelName | string | 'node-test-channel' | 队列名称 | | amqpLink | string | '' | RabbitMQ 连接地址 | | amqpAutoLink | string | '' | 自动获取连接地址的API | | prefetch | number | 1 | 预取消息数量 | | durable | boolean | true | 队列持久化 | | delay | number | 0 | 消息确认延迟时间(ms) | | heartbeat | number | 60 | 心跳间隔(秒) | | timeout | number | 300000 | 连接超时时间(ms) | | messageTimeout | number | 900000 | 消息处理超时时间(ms) | | reconnectDelay | number | 5000 | 重连延迟时间(ms) | | maxReconnectAttempts | number | 10 | 最大重连次数 | | autoReload | number | 0 | 自动重载间隔(秒) | | callback | function | () => {} | 消息处理回调 | | finish | function | () => {} | 初始化完成回调 | | initHook | function | () => {} | 初始化钩子 | | queryHook | function | () => {} | 查询钩子 |

🛡️ 错误处理

v1.2.3+ 版本增强了错误处理能力:

  • 空消息处理 - 自动识别并确认空消息,防止消费阻塞
  • JSON解析错误 - 智能处理格式错误的消息,避免无限循环
  • 连接异常 - 自动重连机制,确保服务稳定性
// 库会自动处理以下情况:
// - 空消息内容
// - null 或 undefined 消息
// - 无效的 JSON 格式
// - 连接断开重连

🔍 状态监控

const status = initializer.getStatus();
console.log({
  isConnected: status.isConnected,        // 连接状态
  processingCount: status.processingCount, // 处理中消息数量
  autoReloaderActive: status.autoReloaderActive, // 自动重载状态
  channelName: status.channelName         // 当前队列名
});

🚨 注意事项

  1. 长任务支持 - 支持最长15分钟的消息处理,超时会自动处理
  2. 消息确认 - 消息处理成功后会自动确认,失败会重新排队
  3. 重连机制 - 连接断开时会自动重连并恢复消息处理
  4. 内存管理 - 内置消息状态跟踪,防止内存泄漏

📝 更新日志

v1.2.3 (2025-08-25)

  • 🐛 修复 JSON 解析错误导致的消息处理阻塞
  • ✨ 增加空消息和无效消息的自动确认机制
  • 📝 完善错误日志输出,便于问题排查

v1.2.1

  • 🔧 模块化架构重构
  • ✨ 支持15分钟长任务处理
  • 🚀 优化自动重连逻辑

📄 许可证

ISC

🤝 贡献

欢迎提交 Issue 和 Pull Request!

📞 支持

如果您在使用过程中遇到问题,请通过以下方式获取支持:

  1. 查看本文档的常见问题
  2. 提交 GitHub Issue
  3. 查看示例代码 demo.js