npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

mm_queue

v1.0.3

Published

Message queue abstraction layer supporting memory, Redis, RabbitMQ and more

Readme

mm_queue

npm version License Node.js Version GitHub stars

English | 中文

消息队列抽象层

一个支持多种消息队列实现的抽象层,提供统一的API接口,支持内存、Redis、RabbitMQ等多种队列后端。

特性

核心特性

  • 统一抽象接口 - 提供一致的API,支持多种队列实现
  • 插件化架构 - 易于扩展新的队列实现
  • 类型安全 - 完整的TypeScript类型定义
  • 异步操作 - 所有方法返回Promise,支持async/await

支持的队列类型

  • 内存队列 - 高性能内存实现,适合单机应用
  • Redis队列 - 基于Redis的分布式队列,支持持久化
  • RabbitMQ队列 - 企业级消息队列,支持复杂路由

功能特性

  • 消息存储和顺序处理 - 保证FIFO(先进先出)顺序
  • 异步消费模式 - 支持多个消费者并发处理
  • 回调通知机制 - 消息处理完成后通知入队者
  • 请求-响应模式 - 支持类似RPC的请求响应场景
  • 队列管理功能 - 长度限制、状态监控、清空操作
  • 错误处理 - 消费者错误不影响队列运行
  • 消息ID跟踪 - 每个消息有唯一标识,支持结果查询

安装

npm install mm_queue

快速开始

安装依赖

npm install mm_queue
# 如果需要Redis或RabbitMQ支持,还需要安装相应依赖
npm install redis amqplib

基本使用

const { create, TYPE } = require('mm_queue');

// 创建内存队列实例
const queue = create(TYPE.MEMORY, { 
    max_size: 100,
    name: 'my_queue'
});

// 注册消费者
await queue.consume(async (data) => {
    console.log('处理消息:', data);
    return `处理结果: ${data}`;
});

// 入队消息
const msgId = await queue.enqueue('测试消息', (result) => {
    console.log('回调收到结果:', result.results[0].result);
});

console.log('消息ID:', msgId);

多队列类型示例

// 内存队列(无需外部依赖)
const memory_queue = create(TYPE.MEMORY, { max_size: 1000 });

// Redis队列(需要Redis服务)
const redis_queue = create(TYPE.REDIS, {
    host: 'localhost',
    port: 6379,
    name: 'redis_queue'
});

// RabbitMQ队列(需要RabbitMQ服务)
const rabbitmq_queue = create(TYPE.RABBITMQ, {
    host: 'localhost',
    port: 5672,
    name: 'rabbitmq_queue'
});

// 所有队列使用相同的API
await memory_queue.enqueue('内存消息');
await redis_queue.enqueue('Redis消息');
await rabbitmq_queue.enqueue('RabbitMQ消息');

请求-响应模式

// 注册请求处理器
await queue.consume(async (request) => {
    if (request.type === 'query') {
        return { 
            status: 'success', 
            data: `响应数据: ${request.payload}` 
        };
    }
});

// 发送请求
await queue.enqueue({
    type: 'query',
    payload: '用户数据'
}, (response) => {
    console.log('收到响应:', response);
});

API 文档

核心接口

QueueInterface 抽象类

所有队列实现都必须遵循此接口:

/**
 * 消息队列接口
 * @interface
 */
class QueueInterface {
    /**
     * 入队消息
     * @param {any} data - 消息数据
     * @param {Function} [callback] - 处理完成回调
     * @returns {Promise<string>} 消息ID
     */
    async enqueue(data, callback) {}
    
    /**
     * 出队消息
     * @returns {Promise<Object>} 消息对象
     */
    async dequeue() {}
    
    /**
     * 注册消费者
     * @param {Function} handler - 消息处理函数
     * @returns {Promise<void>}
     */
    async consume(handler) {}
    
    /**
     * 获取队列长度
     * @returns {Promise<number>}
     */
    async size() {}
    
    /**
     * 检查队列是否为空
     * @returns {Promise<boolean>}
     */
    async isEmpty() {}
    
    /**
     * 清空队列
     * @returns {Promise<void>}
     */
    async clear() {}
    
    /**
     * 获取队列状态
     * @returns {Promise<Object>}
     */
    async status() {}
    
    /**
     * 获取消息处理结果
     * @param {string} message_id - 消息ID
     * @returns {Promise<Object>}
     */
    async get(message_id) {}
    
    /**
     * 删除消息
     * @param {string} message_id - 消息ID
     * @returns {Promise<boolean>}
     */
    async del(message_id) {}
}

工厂方法

create(type, config)

创建队列实例

参数:

  • type (TYPE) - 队列类型
  • config (Object) - 队列配置

返回值:Queue - 队列实例

getTypes()

获取支持的队列类型

返回值:Array<string> - 支持的队列类型列表

isSupported(type)

检查是否支持指定队列类型

参数:

  • type (string) - 队列类型

返回值:boolean - 是否支持

队列类型枚举

/**
 * 队列类型枚举
 * @enum {string}
 */
const TYPE = {
    MEMORY: 'memory',      // 内存队列
    REDIS: 'redis',        // Redis队列
    RABBITMQ: 'rabbitmq'   // RabbitMQ队列
};

配置参数

通用配置

  • max_size (Number) - 队列最大长度,默认无限制
  • name (String) - 队列名称,用于标识

Redis配置

  • host (String) - Redis主机地址,默认'localhost'
  • port (Number) - Redis端口,默认6379
  • password (String) - Redis密码,可选
  • db (Number) - Redis数据库,默认0

RabbitMQ配置

  • host (String) - RabbitMQ主机地址,默认'localhost'
  • port (Number) - RabbitMQ端口,默认5672
  • username (String) - 用户名,默认'guest'
  • password (String) - 密码,默认'guest'
  • vhost (String) - 虚拟主机,默认'/'

示例

基本示例

const { create, Type } = require('./index');

async function main() {
    // 创建内存队列
    const queue = create(Type.MEMORY, { max_size: 100 });
    
    // 注册消费者
    await queue.consume(async (data) => {
        console.log('处理消息:', data);
        return `处理完成: ${data}`;
    });
    
    // 入队消息
    await queue.enqueue('消息1');
    await queue.enqueue('消息2');
    
    // 获取状态
    const status = await queue.status();
    console.log('队列状态:', status);
}

main().catch(console.error);

回调通知示例

const { create, Type } = require('./index');

async function main() {
    const queue = create(Type.MEMORY);
    
    // 注册消费者
    await queue.consume(async (data) => {
        console.log('处理消息:', data);
        return { result: 'success', data: `已处理: ${data}` };
    });
    
    // 入队带回调的消息
    await queue.enqueue('重要任务', (result) => {
        console.log('回调通知:');
        console.log('  消息ID:', result.message_id);
        console.log('  处理结果:', result.results[0].result);
    });
}

main().catch(console.error);

请求-响应模式

const { create, Type } = require('./index');

async function main() {
    const queue = create(Type.MEMORY);
    
    // 注册请求处理器
    await queue.consume(async (request) => {
        if (request.type === 'get_user') {
            return { user: { id: 1, name: '张三' } };
        }
        return { error: '未知请求类型' };
    });
    
    // 发送请求
    await queue.enqueue({
        type: 'get_user',
        user_id: 1
    }, (response) => {
        console.log('收到响应:', response.results[0].result);
    });
}

main().catch(console.error);

多队列类型切换示例

const { create_queue, QueueType } = require('./index');

async function test_queue(type, config) {
    console.log(`\n测试 ${type} 队列:`);
    
    try {
        const queue = await create_queue(type, config);
        
        // 注册消费者
        await queue.consume(async (data) => {
            console.log(`[${type}] 处理消息:`, data);
            return `处理完成: ${data}`;
        });
        
        // 入队消息
        const msgId = await queue.enqueue(`测试消息 - ${type}`);
        console.log(`[${type}] 消息ID:`, msgId);
        
        // 检查状态
        const status = await queue.status();
        console.log(`[${type}] 队列状态:`, status);
        
        // 关闭连接
        await queue.close();
        
    } catch (err) {
        console.error(`[${type}] 错误:`, err.message);
    }
}

async function main() {
    // 测试内存队列
    await test_queue(QueueType.MEMORY, { max_size: 10 });
    
    // 测试Redis队列(需要Redis服务)
    await test_queue(QueueType.REDIS, {
        host: 'localhost',
        port: 6379
    });
    
    // 测试RabbitMQ队列(需要RabbitMQ服务)
    await test_queue(QueueType.RABBITMQ, {
        host: 'localhost',
        port: 5672
    });
}

main().catch(console.error);

高级用法

批量处理

const { create_queue, QueueType } = require('./index');

async function batch_processing() {
    const queue = await create_queue(QueueType.MEMORY);
    
    // 注册批量处理器
    await queue.consume(async (data) => {
        console.log('批量处理消息:', data);
        return { processed: true, timestamp: Date.now() };
    });
    
    // 批量入队消息
    const messages = ['消息1', '消息2', '消息3'];
    const promises = messages.map(msg => queue.enqueue(msg));
    
    // 等待所有消息入队
    await Promise.all(promises);
    console.log('批量处理完成');
}

batch_processing().catch(console.error);

自定义消息处理

const { create_queue, QueueType } = require('./index');

async function custom_processing() {
    const queue = await create_queue(QueueType.MEMORY);
    
    // 复杂消息处理器
    await queue.consume(async (message) => {
        switch (message.type) {
            case 'email':
                // 处理邮件
                return await send_email(message);
            case 'notification':
                // 处理通知
                return await send_notification(message);
            case 'report':
                // 生成报告
                return await generate_report(message);
            default:
                return { error: '未知消息类型' };
        }
    });
    
    // 发送不同类型的消息
    await queue.enqueue({ type: 'email', to: '[email protected]', subject: '测试邮件' });
    await queue.enqueue({ type: 'notification', user_id: 1, message: '系统通知' });
}

custom_processing().catch(console.error);

队列监控

const { create_queue, QueueType } = require('./index');

async function monitor_queue() {
    const queue = await create_queue(QueueType.MEMORY);
    
    // 定期监控队列状态
    setInterval(async () => {
        try {
            const status = await queue.status();
            console.log('队列监控:');
            console.log('  长度:', status.size);
            console.log('  消费者:', status.consumers);
            console.log('  处理中:', status.processing);
            console.log('  待处理回调:', status.pending_callbacks);
        } catch (err) {
            console.error('监控错误:', err.message);
        }
    }, 5000);
    
    // 模拟消息处理
    await queue.consume(async (data) => {
        await new Promise(resolve => setTimeout(resolve, 1000));
        return `处理完成: ${data}`;
    });
    
    // 持续入队消息
    for (let i = 0; i < 10; i++) {
        await queue.enqueue(`监控消息${i}`);
        await new Promise(resolve => setTimeout(resolve, 2000));
    }
}

monitor_queue().catch(console.error);

架构说明

抽象层设计

新的消息队列抽象层采用插件化架构,包含以下核心组件:

mm_queue/
├── index.js          # 主入口,提供工厂方法和统一接口
├── interface.js      # QueueInterface抽象类定义
├── factory.js        # QueueFactory工厂类
└── impl/            # 具体实现
    ├── memory.js     # 内存队列实现
    ├── redis.js      # Redis队列实现
    └── rabbitmq.js   # RabbitMQ队列实现

设计模式

  • 抽象工厂模式:通过QueueFactory统一创建不同队列实例
  • 策略模式:不同的队列实现可以动态切换
  • 适配器模式:将第三方队列系统适配到统一接口

扩展性

要添加新的队列实现,只需:

  1. impl目录下创建新的实现文件
  2. 实现QueueInterface接口的所有方法
  3. factory.js中注册新的队列类型
  4. 更新QueueType枚举

性能考虑

  • 内存队列:适合单机应用,性能最高
  • Redis队列:适合分布式应用,支持持久化
  • RabbitMQ队列:适合企业级应用,支持复杂路由

最佳实践

1. 选择合适的队列类型

  • 单机应用:使用内存队列,性能最优
  • 分布式应用:使用Redis队列,支持多实例
  • 企业级应用:使用RabbitMQ队列,功能最全

2. 错误处理策略

try {
    await queue.enqueue(data);
} catch (err) {
    console.error('入队失败:', err.message);
    // 重试逻辑或降级处理
}

3. 资源管理

// 使用完毕后关闭连接
await queue.close();

4. 消费者设计

// 消费者应该处理所有可能的错误
await queue.consume(async (data) => {
    try {
        return await process_data(data);
    } catch (err) {
        console.error('处理失败:', err.message);
        return { error: err.message };
    }
});

命名规范

本项目严格遵循以下命名规范:

  • 单字优先: 所有命名首先尝试使用一个单词
  • 禁用废话: 避免使用Manager、Processor、Handler等宽泛词汇
  • 使用缩写: 使用行业通用缩写保持简洁

许可证

MIT