mm_queue
v1.0.3
Published
Message queue abstraction layer supporting memory, Redis, RabbitMQ and more
Maintainers
Readme
mm_queue
English | 中文
消息队列抽象层
一个支持多种消息队列实现的抽象层,提供统一的API接口,支持内存、Redis、RabbitMQ等多种队列后端。
特性
核心特性
- ✅ 统一抽象接口 - 提供一致的API,支持多种队列实现
- ✅ 插件化架构 - 易于扩展新的队列实现
- ✅ 类型安全 - 完整的TypeScript类型定义
- ✅ 异步操作 - 所有方法返回Promise,支持async/await
支持的队列类型
- ✅ 内存队列 - 高性能内存实现,适合单机应用
- ✅ Redis队列 - 基于Redis的分布式队列,支持持久化
- ✅ RabbitMQ队列 - 企业级消息队列,支持复杂路由
功能特性
- ✅ 消息存储和顺序处理 - 保证FIFO(先进先出)顺序
- ✅ 异步消费模式 - 支持多个消费者并发处理
- ✅ 回调通知机制 - 消息处理完成后通知入队者
- ✅ 请求-响应模式 - 支持类似RPC的请求响应场景
- ✅ 队列管理功能 - 长度限制、状态监控、清空操作
- ✅ 错误处理 - 消费者错误不影响队列运行
- ✅ 消息ID跟踪 - 每个消息有唯一标识,支持结果查询
安装
npm install mm_queue快速开始
安装依赖
npm install mm_queue
# 如果需要Redis或RabbitMQ支持,还需要安装相应依赖
npm install redis amqplib基本使用
const { create, TYPE } = require('mm_queue');
// 创建内存队列实例
const queue = create(TYPE.MEMORY, {
max_size: 100,
name: 'my_queue'
});
// 注册消费者
await queue.consume(async (data) => {
console.log('处理消息:', data);
return `处理结果: ${data}`;
});
// 入队消息
const msgId = await queue.enqueue('测试消息', (result) => {
console.log('回调收到结果:', result.results[0].result);
});
console.log('消息ID:', msgId);多队列类型示例
// 内存队列(无需外部依赖)
const memory_queue = create(TYPE.MEMORY, { max_size: 1000 });
// Redis队列(需要Redis服务)
const redis_queue = create(TYPE.REDIS, {
host: 'localhost',
port: 6379,
name: 'redis_queue'
});
// RabbitMQ队列(需要RabbitMQ服务)
const rabbitmq_queue = create(TYPE.RABBITMQ, {
host: 'localhost',
port: 5672,
name: 'rabbitmq_queue'
});
// 所有队列使用相同的API
await memory_queue.enqueue('内存消息');
await redis_queue.enqueue('Redis消息');
await rabbitmq_queue.enqueue('RabbitMQ消息');请求-响应模式
// 注册请求处理器
await queue.consume(async (request) => {
if (request.type === 'query') {
return {
status: 'success',
data: `响应数据: ${request.payload}`
};
}
});
// 发送请求
await queue.enqueue({
type: 'query',
payload: '用户数据'
}, (response) => {
console.log('收到响应:', response);
});API 文档
核心接口
QueueInterface 抽象类
所有队列实现都必须遵循此接口:
/**
* 消息队列接口
* @interface
*/
class QueueInterface {
/**
* 入队消息
* @param {any} data - 消息数据
* @param {Function} [callback] - 处理完成回调
* @returns {Promise<string>} 消息ID
*/
async enqueue(data, callback) {}
/**
* 出队消息
* @returns {Promise<Object>} 消息对象
*/
async dequeue() {}
/**
* 注册消费者
* @param {Function} handler - 消息处理函数
* @returns {Promise<void>}
*/
async consume(handler) {}
/**
* 获取队列长度
* @returns {Promise<number>}
*/
async size() {}
/**
* 检查队列是否为空
* @returns {Promise<boolean>}
*/
async isEmpty() {}
/**
* 清空队列
* @returns {Promise<void>}
*/
async clear() {}
/**
* 获取队列状态
* @returns {Promise<Object>}
*/
async status() {}
/**
* 获取消息处理结果
* @param {string} message_id - 消息ID
* @returns {Promise<Object>}
*/
async get(message_id) {}
/**
* 删除消息
* @param {string} message_id - 消息ID
* @returns {Promise<boolean>}
*/
async del(message_id) {}
}工厂方法
create(type, config)
创建队列实例
参数:
type(TYPE) - 队列类型config(Object) - 队列配置
返回值:Queue - 队列实例
getTypes()
获取支持的队列类型
返回值:Array<string> - 支持的队列类型列表
isSupported(type)
检查是否支持指定队列类型
参数:
type(string) - 队列类型
返回值:boolean - 是否支持
队列类型枚举
/**
* 队列类型枚举
* @enum {string}
*/
const TYPE = {
MEMORY: 'memory', // 内存队列
REDIS: 'redis', // Redis队列
RABBITMQ: 'rabbitmq' // RabbitMQ队列
};配置参数
通用配置
max_size(Number) - 队列最大长度,默认无限制name(String) - 队列名称,用于标识
Redis配置
host(String) - Redis主机地址,默认'localhost'port(Number) - Redis端口,默认6379password(String) - Redis密码,可选db(Number) - Redis数据库,默认0
RabbitMQ配置
host(String) - RabbitMQ主机地址,默认'localhost'port(Number) - RabbitMQ端口,默认5672username(String) - 用户名,默认'guest'password(String) - 密码,默认'guest'vhost(String) - 虚拟主机,默认'/'
示例
基本示例
const { create, Type } = require('./index');
async function main() {
// 创建内存队列
const queue = create(Type.MEMORY, { max_size: 100 });
// 注册消费者
await queue.consume(async (data) => {
console.log('处理消息:', data);
return `处理完成: ${data}`;
});
// 入队消息
await queue.enqueue('消息1');
await queue.enqueue('消息2');
// 获取状态
const status = await queue.status();
console.log('队列状态:', status);
}
main().catch(console.error);回调通知示例
const { create, Type } = require('./index');
async function main() {
const queue = create(Type.MEMORY);
// 注册消费者
await queue.consume(async (data) => {
console.log('处理消息:', data);
return { result: 'success', data: `已处理: ${data}` };
});
// 入队带回调的消息
await queue.enqueue('重要任务', (result) => {
console.log('回调通知:');
console.log(' 消息ID:', result.message_id);
console.log(' 处理结果:', result.results[0].result);
});
}
main().catch(console.error);请求-响应模式
const { create, Type } = require('./index');
async function main() {
const queue = create(Type.MEMORY);
// 注册请求处理器
await queue.consume(async (request) => {
if (request.type === 'get_user') {
return { user: { id: 1, name: '张三' } };
}
return { error: '未知请求类型' };
});
// 发送请求
await queue.enqueue({
type: 'get_user',
user_id: 1
}, (response) => {
console.log('收到响应:', response.results[0].result);
});
}
main().catch(console.error);多队列类型切换示例
const { create_queue, QueueType } = require('./index');
async function test_queue(type, config) {
console.log(`\n测试 ${type} 队列:`);
try {
const queue = await create_queue(type, config);
// 注册消费者
await queue.consume(async (data) => {
console.log(`[${type}] 处理消息:`, data);
return `处理完成: ${data}`;
});
// 入队消息
const msgId = await queue.enqueue(`测试消息 - ${type}`);
console.log(`[${type}] 消息ID:`, msgId);
// 检查状态
const status = await queue.status();
console.log(`[${type}] 队列状态:`, status);
// 关闭连接
await queue.close();
} catch (err) {
console.error(`[${type}] 错误:`, err.message);
}
}
async function main() {
// 测试内存队列
await test_queue(QueueType.MEMORY, { max_size: 10 });
// 测试Redis队列(需要Redis服务)
await test_queue(QueueType.REDIS, {
host: 'localhost',
port: 6379
});
// 测试RabbitMQ队列(需要RabbitMQ服务)
await test_queue(QueueType.RABBITMQ, {
host: 'localhost',
port: 5672
});
}
main().catch(console.error);高级用法
批量处理
const { create_queue, QueueType } = require('./index');
async function batch_processing() {
const queue = await create_queue(QueueType.MEMORY);
// 注册批量处理器
await queue.consume(async (data) => {
console.log('批量处理消息:', data);
return { processed: true, timestamp: Date.now() };
});
// 批量入队消息
const messages = ['消息1', '消息2', '消息3'];
const promises = messages.map(msg => queue.enqueue(msg));
// 等待所有消息入队
await Promise.all(promises);
console.log('批量处理完成');
}
batch_processing().catch(console.error);自定义消息处理
const { create_queue, QueueType } = require('./index');
async function custom_processing() {
const queue = await create_queue(QueueType.MEMORY);
// 复杂消息处理器
await queue.consume(async (message) => {
switch (message.type) {
case 'email':
// 处理邮件
return await send_email(message);
case 'notification':
// 处理通知
return await send_notification(message);
case 'report':
// 生成报告
return await generate_report(message);
default:
return { error: '未知消息类型' };
}
});
// 发送不同类型的消息
await queue.enqueue({ type: 'email', to: '[email protected]', subject: '测试邮件' });
await queue.enqueue({ type: 'notification', user_id: 1, message: '系统通知' });
}
custom_processing().catch(console.error);队列监控
const { create_queue, QueueType } = require('./index');
async function monitor_queue() {
const queue = await create_queue(QueueType.MEMORY);
// 定期监控队列状态
setInterval(async () => {
try {
const status = await queue.status();
console.log('队列监控:');
console.log(' 长度:', status.size);
console.log(' 消费者:', status.consumers);
console.log(' 处理中:', status.processing);
console.log(' 待处理回调:', status.pending_callbacks);
} catch (err) {
console.error('监控错误:', err.message);
}
}, 5000);
// 模拟消息处理
await queue.consume(async (data) => {
await new Promise(resolve => setTimeout(resolve, 1000));
return `处理完成: ${data}`;
});
// 持续入队消息
for (let i = 0; i < 10; i++) {
await queue.enqueue(`监控消息${i}`);
await new Promise(resolve => setTimeout(resolve, 2000));
}
}
monitor_queue().catch(console.error);架构说明
抽象层设计
新的消息队列抽象层采用插件化架构,包含以下核心组件:
mm_queue/
├── index.js # 主入口,提供工厂方法和统一接口
├── interface.js # QueueInterface抽象类定义
├── factory.js # QueueFactory工厂类
└── impl/ # 具体实现
├── memory.js # 内存队列实现
├── redis.js # Redis队列实现
└── rabbitmq.js # RabbitMQ队列实现设计模式
- 抽象工厂模式:通过
QueueFactory统一创建不同队列实例 - 策略模式:不同的队列实现可以动态切换
- 适配器模式:将第三方队列系统适配到统一接口
扩展性
要添加新的队列实现,只需:
- 在
impl目录下创建新的实现文件 - 实现
QueueInterface接口的所有方法 - 在
factory.js中注册新的队列类型 - 更新
QueueType枚举
性能考虑
- 内存队列:适合单机应用,性能最高
- Redis队列:适合分布式应用,支持持久化
- RabbitMQ队列:适合企业级应用,支持复杂路由
最佳实践
1. 选择合适的队列类型
- 单机应用:使用内存队列,性能最优
- 分布式应用:使用Redis队列,支持多实例
- 企业级应用:使用RabbitMQ队列,功能最全
2. 错误处理策略
try {
await queue.enqueue(data);
} catch (err) {
console.error('入队失败:', err.message);
// 重试逻辑或降级处理
}3. 资源管理
// 使用完毕后关闭连接
await queue.close();4. 消费者设计
// 消费者应该处理所有可能的错误
await queue.consume(async (data) => {
try {
return await process_data(data);
} catch (err) {
console.error('处理失败:', err.message);
return { error: err.message };
}
});命名规范
本项目严格遵循以下命名规范:
- 单字优先: 所有命名首先尝试使用一个单词
- 禁用废话: 避免使用Manager、Processor、Handler等宽泛词汇
- 使用缩写: 使用行业通用缩写保持简洁
许可证
MIT
