npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@develop-x/nest-events

v1.0.15

Published

基于 NestJS 的 RabbitMQ 模块,提供完整的 RabbitMQ 消息队列功能。

Readme

@develop-x/nest-rabbitmq

基于 NestJS 的 RabbitMQ 模块,提供完整的 RabbitMQ 消息队列功能。

特性

  • ✅ 使用 amqp-connection-manager 进行自动重连
  • ✅ 支持消息持久化
  • ✅ 支持 fanout 模式(广播消息)
  • ✅ 装饰器方式监听消息
  • ✅ 支持 forRoot/forRootAsync 动态配置
  • ✅ 自动创建交换机和队列
  • ✅ 消息确认机制
  • ✅ 错误处理和日志记录
  • 使用 DiscoveryModule 自动扫描装饰器类
  • 支持目录过滤功能
  • 异步模块注册

安装

npm install @develop-x/nest-rabbitmq amqp-connection-manager amqplib minimatch
npm install --save-dev @types/amqplib

使用方法

1. 静态配置

import { Module } from '@nestjs/common';
import { RabbitMQModule } from '@develop-x/nest-rabbitmq';

@Module({
  imports: [
    RabbitMQModule.forRoot({
      urls: ['amqp://localhost:5672'],
      exchange: 'app.events',
      exchangeType: 'topic',
      durable: true,
      // 目录过滤 - 只扫描 events 相关目录
      includePatterns: [
        '**/events/**',
        '**/handlers/**'
      ],
      // 排除测试文件
      excludePatterns: [
        '**/*.spec.ts',
        '**/*.test.ts'
      ]
    }),
  ],
})
export class AppModule {}

2. 异步配置(推荐)

import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { RabbitMQModule } from '@develop-x/nest-rabbitmq';

@Module({
  imports: [
    ConfigModule.forRoot(),
    RabbitMQModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        urls: [configService.get('RABBITMQ_URL', 'amqp://localhost:5672')],
        exchange: configService.get('RABBITMQ_EXCHANGE', 'app.events'),
        exchangeType: 'topic',
        durable: true,
        prefetch: 10,
        // 只包含特定模块的事件处理器
        includePatterns: [
          '**/src/events/**',
          '**/src/modules/*/handlers/**',
          '**/src/listeners/**'
        ],
        // 排除不需要的文件
        excludePatterns: [
          '**/*.spec.ts',
          '**/*.test.ts',
          '**/node_modules/**',
          '**/dist/**'
        ]
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}

3. 自动扫描装饰器类

模块会自动使用 DiscoveryModule 扫描所有带有 @RabbitListener 装饰器的类和方法,无需手动注册:

// src/events/user-events.handler.ts
import { Injectable } from '@nestjs/common';
import { RabbitListener } from '@develop-x/nest-rabbitmq';

@Injectable()
export class UserEventsHandler {
  @RabbitListener({
    queue: 'user.notifications',
    routingKey: 'user.created'
  })
  async handleUserCreated(data: any) {
    console.log('用户创建事件:', data);
    // 处理用户创建逻辑
  }

  @RabbitListener({
    queue: 'user.emails',
    routingKey: 'user.welcome'
  })
  async handleWelcomeEmail(data: any) {
    console.log('发送欢迎邮件:', data);
    // 发送欢迎邮件逻辑
  }
}

// src/events/order-events.handler.ts
@Injectable()
export class OrderEventsHandler {
  @RabbitListener({
    queue: 'order.processing',
    routingKey: 'order.*',
    exchange: 'order.events'
  })
  async handleOrderEvents(data: any) {
    console.log('订单事件:', data);
    // 处理订单事件逻辑
  }
}

4. 目录过滤配置示例

只监听特定目录

RabbitMQModule.forRoot({
  urls: ['amqp://localhost:5672'],
  exchange: 'app.events',
  exchangeType: 'topic',
  durable: true,
  // 只包含events目录下的文件
  includePatterns: [
    '**/events/**',
    '**/handlers/**',
    '**/listeners/**'
  ]
})

排除测试文件和特定目录

RabbitMQModule.forRoot({
  urls: ['amqp://localhost:5672'],
  exchange: 'app.events',
  exchangeType: 'topic',
  durable: true,
  // 包含所有events相关目录
  includePatterns: ['**/events/**'],
  // 排除测试文件
  excludePatterns: [
    '**/*.spec.ts',
    '**/*.test.ts',
    '**/test/**',
    '**/node_modules/**'
  ]
})

复杂的目录过滤

RabbitMQModule.forRoot({
  urls: ['amqp://localhost:5672'],
  exchange: 'app.events',
  exchangeType: 'topic',
  durable: true,
  // 只包含特定模块的事件处理器
  includePatterns: [
    '**/src/events/**',
    '**/src/modules/*/events/**',
    '**/src/handlers/**'
  ],
  // 排除不需要的文件
  excludePatterns: [
    '**/*.spec.ts',
    '**/node_modules/**',
    '**/dist/**'
  ]
})

5. 消息发布

import { Injectable } from '@nestjs/common';
import { RabbitMQService } from '@develop-x/nest-rabbitmq';

@Injectable()
export class UserService {
  constructor(private readonly rabbitMQService: RabbitMQService) {}

  async createUser(userData: any) {
    // 业务逻辑...
    
    // 发布用户创建事件到fanout交换机(广播)
    await this.rabbitMQService.publishToFanout(
      'user.events',
      {
        event: 'user.created',
        data: userData,
        timestamp: new Date(),
      }
    );

    // 发布到指定路由键
    await this.rabbitMQService.publish(
      'order.events',
      'order.created',
      { orderId: 123, userId: userData.id }
    );

    // 直接发送到队列
    await this.rabbitMQService.sendToQueue(
      'user.notifications',
      { type: 'welcome', userId: userData.id }
    );
  }
}

6. 消息消费(装饰器方式)

import { Injectable } from '@nestjs/common';
import { ConsumeMessage } from 'amqplib';
import { RabbitMQSubscribe } from '@develop-x/nest-rabbitmq';

@Injectable()
export class NotificationService {
  // 监听fanout交换机的广播消息
  @RabbitMQSubscribe({
    exchange: {
      name: 'user.events',
      type: 'fanout',
      durable: true,
    },
    // 不指定队列名称,会自动创建临时独占队列
  })
  async handleUserEvents(data: any, message: ConsumeMessage) {
    console.log('收到用户事件:', data);
    
    if (data.event === 'user.created') {
      // 处理用户创建事件
      console.log('新用户注册:', data.data);
    }
  }

  // 监听指定队列
  @RabbitMQSubscribe({
    exchange: {
      name: 'order.events',
      type: 'direct',
      durable: true,
    },
    queue: {
      name: 'order.notifications',
      durable: true,
    },
    routingKey: 'order.created',
  })
  async handleOrderCreated(data: any, message: ConsumeMessage) {
    console.log('收到订单创建事件:', data);
    // 发送订单确认通知
  }

  // 监听持久化队列
  @RabbitMQSubscribe({
    exchange: {
      name: 'system.events',
      type: 'topic',
      durable: true,
    },
    queue: {
      name: 'email.queue',
      durable: true,
    },
    routingKey: 'email.*',
  })
  async handleEmailEvents(data: any, message: ConsumeMessage) {
    console.log('收到邮件事件:', data);
    // 处理邮件发送
  }
}

7. 多服务fanout模式示例

// 服务A - 用户服务
@Injectable()
export class UserService {
  constructor(private readonly rabbitMQService: RabbitMQService) {}

  async registerUser(userData: any) {
    // 注册用户逻辑...
    
    // 广播用户注册事件,所有监听此交换机的服务都会收到
    await this.rabbitMQService.publishToFanout('user.events', {
      event: 'user.registered',
      userId: userData.id,
      email: userData.email,
      timestamp: new Date(),
    });
  }
}

// 服务B - 邮件服务
@Injectable()
export class EmailService {
  @RabbitMQSubscribe({
    exchange: {
      name: 'user.events',
      type: 'fanout',
      durable: true,
    },
  })
  async handleUserEvents(data: any) {
    if (data.event === 'user.registered') {
      await this.sendWelcomeEmail(data.email);
    }
  }
}

// 服务C - 统计服务
@Injectable()
export class StatisticsService {
  @RabbitMQSubscribe({
    exchange: {
      name: 'user.events',
      type: 'fanout',
      durable: true,
    },
  })
  async handleUserEvents(data: any) {
    if (data.event === 'user.registered') {
      await this.updateUserStats();
    }
  }
}

// 服务D - 积分服务
@Injectable()
export class PointsService {
  @RabbitMQSubscribe({
    exchange: {
      name: 'user.events',
      type: 'fanout',
      durable: true,
    },
  })
  async handleUserEvents(data: any) {
    if (data.event === 'user.registered') {
      await this.giveWelcomePoints(data.userId);
    }
  }
}

API 参考

RabbitMQService

主要的服务类,提供消息发布和消费功能。

方法

  • publish(exchangeName, routingKey, message, options?) - 发布消息到交换机
  • publishToFanout(exchangeName, message, options?) - 发布消息到fanout交换机
  • sendToQueue(queueName, message, options?) - 直接发送消息到队列
  • consume(queueName, onMessage, options?) - 消费队列消息
  • getConnection() - 获取连接实例
  • getChannel() - 获取通道实例

@RabbitMQSubscribe 装饰器

用于标记消息订阅方法的装饰器。

参数

{
  exchange: {
    name: string;      // 交换机名称
    type: string;      // 交换机类型:direct, topic, fanout, headers
    durable?: boolean; // 是否持久化,默认true
    options?: any;     // 其他选项
  },
  queue?: {
    name: string;      // 队列名称
    durable?: boolean; // 是否持久化,默认true
    options?: any;     // 其他选项
  },
  routingKey?: string;    // 路由键
  consumeOptions?: any;   // 消费选项
}

注意事项

  1. 消息持久化:默认启用消息持久化,确保消息不会因服务重启而丢失
  2. 自动重连:使用 amqp-connection-manager 实现自动重连机制
  3. 错误处理:消息处理失败时会自动nack,不会重新排队
  4. Fanout模式:不指定队列名称时会创建临时独占队列,适合广播场景
  5. 确认机制:消息成功处理后会自动ack,确保消息可靠性

许可证

MIT

@develop-x/nest-events

RabbitMQ事件处理模块,支持目录过滤功能。

配置示例

1. 只监听events目录下的handlers

import { Module } from '@nestjs/common';
import { RabbitMQModule } from '@develop-x/nest-events';

@Module({
  imports: [
    RabbitMQModule.forRoot({
      urls: ['amqp://localhost:5672'],
      exchange: 'app.events',
      exchangeType: 'topic',
      durable: true,
      // 只包含events目录下的文件
      includePatterns: [
        '**/events/**',
        '**/handlers/**'
      ]
    }),
  ],
})
export class AppModule {}

2. 排除测试文件和特定目录

@Module({
  imports: [
    RabbitMQModule.forRoot({
      urls: ['amqp://localhost:5672'],
      exchange: 'app.events',
      exchangeType: 'topic',
      durable: true,
      // 包含所有events相关目录
      includePatterns: ['**/events/**'],
      // 排除测试文件
      excludePatterns: [
        '**/*.spec.ts',
        '**/*.test.ts',
        '**/test/**'
      ]
    }),
  ],
})
export class AppModule {}

3. 复杂的目录过滤

@Module({
  imports: [
    RabbitMQModule.forRoot({
      urls: ['amqp://localhost:5672'],
      exchange: 'app.events',
      exchangeType: 'topic',
      durable: true,
      // 只包含特定模块的事件处理器
      includePatterns: [
        '**/src/events/**',
        '**/src/modules/*/events/**',
        '**/src/handlers/**'
      ],
      // 排除不需要的文件
      excludePatterns: [
        '**/*.spec.ts',
        '**/node_modules/**',
        '**/dist/**'
      ]
    }),
  ],
})
export class AppModule {}

使用事件处理器

// src/events/user-events.handler.ts
import { Injectable } from '@nestjs/common';
import { RabbitListener } from '@develop-x/nest-events';

@Injectable()
export class UserEventsHandler {
  @RabbitListener({
    queue: 'user.notifications',
    routingKey: 'user.created'
  })
  handleUserCreated(data: any) {
    console.log('用户创建事件:', data);
  }
}

模式说明

  • * - 匹配任意字符(除路径分隔符)
  • ** - 匹配任意字符(包括路径分隔符),用于跨目录匹配
  • 模式匹配不区分大小写
  • 如果同时设置了 includePatternsexcludePatterns,会先检查包含,再检查排除
  • 如果没有设置任何模式,默认扫描所有provider