@develop-x/nest-events
v1.0.15
Published
基于 NestJS 的 RabbitMQ 模块,提供完整的 RabbitMQ 消息队列功能。
Keywords
Readme
@develop-x/nest-rabbitmq
基于 NestJS 的 RabbitMQ 模块,提供完整的 RabbitMQ 消息队列功能。
特性
- ✅ 使用 amqp-connection-manager 进行自动重连
- ✅ 支持消息持久化
- ✅ 支持 fanout 模式(广播消息)
- ✅ 装饰器方式监听消息
- ✅ 支持 forRoot/forRootAsync 动态配置
- ✅ 自动创建交换机和队列
- ✅ 消息确认机制
- ✅ 错误处理和日志记录
- ✅ 使用 DiscoveryModule 自动扫描装饰器类
- ✅ 支持目录过滤功能
- ✅ 异步模块注册
安装
npm install @develop-x/nest-rabbitmq amqp-connection-manager amqplib minimatch
npm install --save-dev @types/amqplib使用方法
1. 静态配置
import { Module } from '@nestjs/common';
import { RabbitMQModule } from '@develop-x/nest-rabbitmq';
@Module({
imports: [
RabbitMQModule.forRoot({
urls: ['amqp://localhost:5672'],
exchange: 'app.events',
exchangeType: 'topic',
durable: true,
// 目录过滤 - 只扫描 events 相关目录
includePatterns: [
'**/events/**',
'**/handlers/**'
],
// 排除测试文件
excludePatterns: [
'**/*.spec.ts',
'**/*.test.ts'
]
}),
],
})
export class AppModule {}2. 异步配置(推荐)
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { RabbitMQModule } from '@develop-x/nest-rabbitmq';
@Module({
imports: [
ConfigModule.forRoot(),
RabbitMQModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
urls: [configService.get('RABBITMQ_URL', 'amqp://localhost:5672')],
exchange: configService.get('RABBITMQ_EXCHANGE', 'app.events'),
exchangeType: 'topic',
durable: true,
prefetch: 10,
// 只包含特定模块的事件处理器
includePatterns: [
'**/src/events/**',
'**/src/modules/*/handlers/**',
'**/src/listeners/**'
],
// 排除不需要的文件
excludePatterns: [
'**/*.spec.ts',
'**/*.test.ts',
'**/node_modules/**',
'**/dist/**'
]
}),
inject: [ConfigService],
}),
],
})
export class AppModule {}3. 自动扫描装饰器类
模块会自动使用 DiscoveryModule 扫描所有带有 @RabbitListener 装饰器的类和方法,无需手动注册:
// src/events/user-events.handler.ts
import { Injectable } from '@nestjs/common';
import { RabbitListener } from '@develop-x/nest-rabbitmq';
@Injectable()
export class UserEventsHandler {
@RabbitListener({
queue: 'user.notifications',
routingKey: 'user.created'
})
async handleUserCreated(data: any) {
console.log('用户创建事件:', data);
// 处理用户创建逻辑
}
@RabbitListener({
queue: 'user.emails',
routingKey: 'user.welcome'
})
async handleWelcomeEmail(data: any) {
console.log('发送欢迎邮件:', data);
// 发送欢迎邮件逻辑
}
}
// src/events/order-events.handler.ts
@Injectable()
export class OrderEventsHandler {
@RabbitListener({
queue: 'order.processing',
routingKey: 'order.*',
exchange: 'order.events'
})
async handleOrderEvents(data: any) {
console.log('订单事件:', data);
// 处理订单事件逻辑
}
}4. 目录过滤配置示例
只监听特定目录
RabbitMQModule.forRoot({
urls: ['amqp://localhost:5672'],
exchange: 'app.events',
exchangeType: 'topic',
durable: true,
// 只包含events目录下的文件
includePatterns: [
'**/events/**',
'**/handlers/**',
'**/listeners/**'
]
})排除测试文件和特定目录
RabbitMQModule.forRoot({
urls: ['amqp://localhost:5672'],
exchange: 'app.events',
exchangeType: 'topic',
durable: true,
// 包含所有events相关目录
includePatterns: ['**/events/**'],
// 排除测试文件
excludePatterns: [
'**/*.spec.ts',
'**/*.test.ts',
'**/test/**',
'**/node_modules/**'
]
})复杂的目录过滤
RabbitMQModule.forRoot({
urls: ['amqp://localhost:5672'],
exchange: 'app.events',
exchangeType: 'topic',
durable: true,
// 只包含特定模块的事件处理器
includePatterns: [
'**/src/events/**',
'**/src/modules/*/events/**',
'**/src/handlers/**'
],
// 排除不需要的文件
excludePatterns: [
'**/*.spec.ts',
'**/node_modules/**',
'**/dist/**'
]
})5. 消息发布
import { Injectable } from '@nestjs/common';
import { RabbitMQService } from '@develop-x/nest-rabbitmq';
@Injectable()
export class UserService {
constructor(private readonly rabbitMQService: RabbitMQService) {}
async createUser(userData: any) {
// 业务逻辑...
// 发布用户创建事件到fanout交换机(广播)
await this.rabbitMQService.publishToFanout(
'user.events',
{
event: 'user.created',
data: userData,
timestamp: new Date(),
}
);
// 发布到指定路由键
await this.rabbitMQService.publish(
'order.events',
'order.created',
{ orderId: 123, userId: userData.id }
);
// 直接发送到队列
await this.rabbitMQService.sendToQueue(
'user.notifications',
{ type: 'welcome', userId: userData.id }
);
}
}6. 消息消费(装饰器方式)
import { Injectable } from '@nestjs/common';
import { ConsumeMessage } from 'amqplib';
import { RabbitMQSubscribe } from '@develop-x/nest-rabbitmq';
@Injectable()
export class NotificationService {
// 监听fanout交换机的广播消息
@RabbitMQSubscribe({
exchange: {
name: 'user.events',
type: 'fanout',
durable: true,
},
// 不指定队列名称,会自动创建临时独占队列
})
async handleUserEvents(data: any, message: ConsumeMessage) {
console.log('收到用户事件:', data);
if (data.event === 'user.created') {
// 处理用户创建事件
console.log('新用户注册:', data.data);
}
}
// 监听指定队列
@RabbitMQSubscribe({
exchange: {
name: 'order.events',
type: 'direct',
durable: true,
},
queue: {
name: 'order.notifications',
durable: true,
},
routingKey: 'order.created',
})
async handleOrderCreated(data: any, message: ConsumeMessage) {
console.log('收到订单创建事件:', data);
// 发送订单确认通知
}
// 监听持久化队列
@RabbitMQSubscribe({
exchange: {
name: 'system.events',
type: 'topic',
durable: true,
},
queue: {
name: 'email.queue',
durable: true,
},
routingKey: 'email.*',
})
async handleEmailEvents(data: any, message: ConsumeMessage) {
console.log('收到邮件事件:', data);
// 处理邮件发送
}
}7. 多服务fanout模式示例
// 服务A - 用户服务
@Injectable()
export class UserService {
constructor(private readonly rabbitMQService: RabbitMQService) {}
async registerUser(userData: any) {
// 注册用户逻辑...
// 广播用户注册事件,所有监听此交换机的服务都会收到
await this.rabbitMQService.publishToFanout('user.events', {
event: 'user.registered',
userId: userData.id,
email: userData.email,
timestamp: new Date(),
});
}
}
// 服务B - 邮件服务
@Injectable()
export class EmailService {
@RabbitMQSubscribe({
exchange: {
name: 'user.events',
type: 'fanout',
durable: true,
},
})
async handleUserEvents(data: any) {
if (data.event === 'user.registered') {
await this.sendWelcomeEmail(data.email);
}
}
}
// 服务C - 统计服务
@Injectable()
export class StatisticsService {
@RabbitMQSubscribe({
exchange: {
name: 'user.events',
type: 'fanout',
durable: true,
},
})
async handleUserEvents(data: any) {
if (data.event === 'user.registered') {
await this.updateUserStats();
}
}
}
// 服务D - 积分服务
@Injectable()
export class PointsService {
@RabbitMQSubscribe({
exchange: {
name: 'user.events',
type: 'fanout',
durable: true,
},
})
async handleUserEvents(data: any) {
if (data.event === 'user.registered') {
await this.giveWelcomePoints(data.userId);
}
}
}API 参考
RabbitMQService
主要的服务类,提供消息发布和消费功能。
方法
publish(exchangeName, routingKey, message, options?)- 发布消息到交换机publishToFanout(exchangeName, message, options?)- 发布消息到fanout交换机sendToQueue(queueName, message, options?)- 直接发送消息到队列consume(queueName, onMessage, options?)- 消费队列消息getConnection()- 获取连接实例getChannel()- 获取通道实例
@RabbitMQSubscribe 装饰器
用于标记消息订阅方法的装饰器。
参数
{
exchange: {
name: string; // 交换机名称
type: string; // 交换机类型:direct, topic, fanout, headers
durable?: boolean; // 是否持久化,默认true
options?: any; // 其他选项
},
queue?: {
name: string; // 队列名称
durable?: boolean; // 是否持久化,默认true
options?: any; // 其他选项
},
routingKey?: string; // 路由键
consumeOptions?: any; // 消费选项
}注意事项
- 消息持久化:默认启用消息持久化,确保消息不会因服务重启而丢失
- 自动重连:使用 amqp-connection-manager 实现自动重连机制
- 错误处理:消息处理失败时会自动nack,不会重新排队
- Fanout模式:不指定队列名称时会创建临时独占队列,适合广播场景
- 确认机制:消息成功处理后会自动ack,确保消息可靠性
许可证
MIT
@develop-x/nest-events
RabbitMQ事件处理模块,支持目录过滤功能。
配置示例
1. 只监听events目录下的handlers
import { Module } from '@nestjs/common';
import { RabbitMQModule } from '@develop-x/nest-events';
@Module({
imports: [
RabbitMQModule.forRoot({
urls: ['amqp://localhost:5672'],
exchange: 'app.events',
exchangeType: 'topic',
durable: true,
// 只包含events目录下的文件
includePatterns: [
'**/events/**',
'**/handlers/**'
]
}),
],
})
export class AppModule {}2. 排除测试文件和特定目录
@Module({
imports: [
RabbitMQModule.forRoot({
urls: ['amqp://localhost:5672'],
exchange: 'app.events',
exchangeType: 'topic',
durable: true,
// 包含所有events相关目录
includePatterns: ['**/events/**'],
// 排除测试文件
excludePatterns: [
'**/*.spec.ts',
'**/*.test.ts',
'**/test/**'
]
}),
],
})
export class AppModule {}3. 复杂的目录过滤
@Module({
imports: [
RabbitMQModule.forRoot({
urls: ['amqp://localhost:5672'],
exchange: 'app.events',
exchangeType: 'topic',
durable: true,
// 只包含特定模块的事件处理器
includePatterns: [
'**/src/events/**',
'**/src/modules/*/events/**',
'**/src/handlers/**'
],
// 排除不需要的文件
excludePatterns: [
'**/*.spec.ts',
'**/node_modules/**',
'**/dist/**'
]
}),
],
})
export class AppModule {}使用事件处理器
// src/events/user-events.handler.ts
import { Injectable } from '@nestjs/common';
import { RabbitListener } from '@develop-x/nest-events';
@Injectable()
export class UserEventsHandler {
@RabbitListener({
queue: 'user.notifications',
routingKey: 'user.created'
})
handleUserCreated(data: any) {
console.log('用户创建事件:', data);
}
}模式说明
*- 匹配任意字符(除路径分隔符)**- 匹配任意字符(包括路径分隔符),用于跨目录匹配- 模式匹配不区分大小写
- 如果同时设置了
includePatterns和excludePatterns,会先检查包含,再检查排除 - 如果没有设置任何模式,默认扫描所有provider
