@nest-packages/nestjs-queue-events

v0.1.1

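A general-purpose NestJS event system with synchronous and asynchronous event dispatch, configurable retry strategies, extensible alarm adapters, and a plugin-based event handling mechanism.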

Downloads

13

Readme

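Features

  • General event system: register custom plugin handlers for any event type
  • Sync/async execution: supports both synchronous and asynchronous (queue-based) execution modes
  • Plugin architecture: register multiple plugins per event, executed in priority order
  • Retry strategies: configurable retry strategies (fixed delay, exponential backoff)
  • Alarm adapters: custom alarm adapters for handling event failures
  • Queue integration: integrates seamlessly with Bull queues for asynchronous event processing
  • Type safety: full TypeScript type support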
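
The two retry strategies named in the feature list differ only in how the wait before each retry is computed. Here is a minimal sketch of that difference. It is not the package's own code: `BackoffKind`, `computeRetryDelay` and the doubling factor are assumptions made for illustration.

```typescript
// Hypothetical helper for illustration; not part of the package's API.
type BackoffKind = 'fixed' | 'exponential';

/**
 * Returns the wait in milliseconds before retry number `attempt` (1-based).
 * - fixed: always `baseDelayMs`
 * - exponential: `baseDelayMs` doubled for each earlier retry (factor 2 is assumed)
 */
function computeRetryDelay(
  kind: BackoffKind,
  baseDelayMs: number,
  attempt: number,
): number {
  if (attempt < 1) {
    throw new RangeError('attempt must be >= 1');
  }
  return kind === 'fixed'
    ? baseDelayMs
    : baseDelayMs * Math.pow(2, attempt - 1);
}

// Delays for the first three retries with a 1000 ms base.
const fixedDelays = [1, 2, 3].map((n) => computeRetryDelay('fixed', 1000, n));
const exponentialDelays = [1, 2, 3].map((n) =>
  computeRetryDelay('exponential', 1000, n),
);
```

With these assumptions the fixed strategy waits 1000 ms every time, while the exponential one waits 1000, 2000 and 4000 ms.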

Installation

npm install @nest-packages/nestjs-queue-events
# or
pnpm add @nest-packages/nestjs-queue-events

Quick start

1. Import the module

import { Module } from '@nestjs/common';
import { QueueEventsModule } from '@nest-packages/nestjs-queue-events';

@Module({
  imports: [
    QueueEventsModule.forRoot({
      queueName: 'queue-events',
      enableQueue: true,
    }),
  ],
})
export class AppModule {}

2. Create an event plugin

import { Injectable } from '@nestjs/common';
import {
  IEventPlugin,
  PluginContext,
  PluginResult,
} from '@nest-packages/nestjs-queue-events';

// Payload for 'MY_EVENT'; the shape matches the emit example in step 4.
export interface MyEventData {
  message: string;
}

@Injectable()
export class MyEventPlugin implements IEventPlugin<MyEventData, void> {
  getEventType(): string {
    return 'MY_EVENT';
  }

  getPriority(): number {
    return 100; // higher numbers mean higher priority
  }

  getName(): string {
    return 'MyEventPlugin';
  }

  async process(
    data: MyEventData,
    context: PluginContext,
  ): Promise<PluginResult<void>> {
    // Handle the event
    console.log('Processing event:', data);

    return {
      success: true,
      shouldContinue: false, // on success, skip the remaining plugins
    };
  }
}

3. Register the plugin

import { Module } from '@nestjs/common';
import { QueueEventsModule } from '@nest-packages/nestjs-queue-events';
import { MyEventPlugin } from './my-event.plugin';

@Module({
  imports: [
    QueueEventsModule.forRoot({
      plugins: [new MyEventPlugin()],
    }),
  ],
})
export class AppModule {}

4. Emit events

import { Injectable } from '@nestjs/common';
import {
  EventService,
  EventExecutionMode,
} from '@nest-packages/nestjs-queue-events';

@Injectable()
export class MyService {
  constructor(private readonly eventService: EventService) {}

  async doSomething() {
    // Synchronous dispatch
    await this.eventService.emit('MY_EVENT', { message: 'Hello' }, {
      mode: EventExecutionMode.SYNC,
    });

    // Asynchronous dispatch (via the queue)
    await this.eventService.emit('MY_EVENT', { message: 'Hello' }, {
      mode: EventExecutionMode.ASYNC,
      retryConfig: {
        maxRetries: 3,
        retryDelay: 1000,
      },
      onSuccess: async (result) => {
        console.log('Event processed successfully:', result);
      },
      onFailure: async (error) => {
        console.error('Event processing failed:', error);
      },
    });
  }
}

FIELD_NAME_NOT_FOUND event example

Create the field-handling plugin

import { Injectable } from '@nestjs/common';
import {
  FieldNotFoundEventData,
  IEventPlugin,
  PluginContext,
  PluginResult,
} from '@nest-packages/nestjs-queue-events';
// BitableFieldService is your own application service; import it from wherever it lives in your project.

@Injectable()
export class FieldNotFoundPlugin
  implements IEventPlugin<FieldNotFoundEventData, void>
{
  constructor(
    // Inject whatever services you need
    private readonly bitableFieldService: BitableFieldService,
  ) {}

  getEventType(): string {
    return 'FIELD_NAME_NOT_FOUND';
  }

  getPriority(): number {
    return 100;
  }

  getName(): string {
    return 'FieldNotFoundPlugin';
  }

  async process(
    data: FieldNotFoundEventData,
    context: PluginContext,
  ): Promise<PluginResult<void>> {
    try {
      // 1. Fetch the field list
      const fieldsList = await this.bitableFieldService.getFieldList(
        data.request.path,
      );

      // 2. Find the missing fields
      const missingFields = this.findMissingFields(
        data.request.data,
        fieldsList,
      );

      // 3. Create the missing fields
      if (missingFields.length > 0) {
        await this.bitableFieldService.createFields(
          missingFields,
          data.request.path,
        );
      }

      return {
        success: true,
        shouldContinue: false,
      };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error : new Error(String(error)),
        shouldContinue: true,
        retryable: true,
      };
    }
  }

  private findMissingFields(record: any, fieldsList: any[]): string[] {
    // Implement the field lookup logic here
    return [];
  }
}

Emit the event in bitable-base-record.ts

import {
  EventExecutionMode,
  EventService,
  FieldNotFoundEventData,
} from '@nest-packages/nestjs-queue-events';

// Inside the createRecord method
if (res.code === FieldNameNotFound && retryOnFieldNotFound) {
  const eventData: FieldNotFoundEventData = {
    response: res,
    request: { path, data: fields },
    feishuAppType: this.feishuAppType,
  };

  await this.eventService.emit('FIELD_NAME_NOT_FOUND', eventData, {
    mode: EventExecutionMode.ASYNC,
    retryConfig: {
      maxRetries: maxRetries,
      retryDelay: retryDelay,
    },
    onSuccess: async () => {
      // Once the fields have been created, retry createRecord
      return await this.larkClient.bitable.appTableRecord.create({
        data: { fields },
        path,
      });
    },
    onFailure: async (error) => {
      // All retries failed: raise an alarm
      console.error('Field creation failed:', error);
    },
  });
}

API documentation

EventService

The core event service. It provides event registration and dispatch.

Methods

  • register(eventType: string, metadata: EventMetadata): registers an event type
  • registerHandler<TEventData, TResult>(eventType: string, handler: IEventHandler<TEventData, TResult>): registers an event handler
  • registerPlugin<TEventData, TResult>(plugin: IEventPlugin<TEventData, TResult>): registers an event plugin
  • emit<TEventData, TResult>(eventType: string, data: TEventData, options?: EmitOptions): emits an event
  • on<TEventData>(eventType: string, listener: (data: TEventData) => void | Promise<void>): registers an event listener (kept for compatibility with the old API)

IEventPlugin

The event plugin interface.

interface IEventPlugin<TEventData, TResult> {
  process(data: TEventData, context: PluginContext): Promise<PluginResult<TResult>>;
  getEventType(): string;
  getPriority(): number;
  getName(): string;
}

PluginResult

The result of running a plugin.

interface PluginResult<T> {
  success: boolean;
  data?: T;
  error?: Error;
  shouldContinue?: boolean; // whether to run the next plugin
  retryable?: boolean; // whether the failure can be retried
}

Plugin development guide

1. Implement the IEventPlugin interface

export class MyPlugin implements IEventPlugin<MyEventData, void> {
  getEventType(): string {
    return 'MY_EVENT';
  }

  getPriority(): number {
    return 100; // higher numbers mean higher priority
  }

  getName(): string {
    return 'MyPlugin';
  }

  async process(
    data: MyEventData,
    context: PluginContext,
  ): Promise<PluginResult<void>> {
    // Implement the handling logic here
    return {
      success: true,
      shouldContinue: false,
    };
  }
}

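2. Plugin priority

Plugins run from highest to lowest priority (a larger number means a higher priority). If a plugin succeeds and its shouldContinue is false, the remaining plugins are skipped.

3. Chained plugin execution

Multiple plugins can be combined and run in priority order:

// Register multiple plugins
eventService.registerPlugin(new HighPriorityPlugin()); // priority 100
eventService.registerPlugin(new LowPriorityPlugin());  // priority 50

// Execution order: HighPriorityPlugin -> LowPriorityPlugin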
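
The priority and shouldContinue rules can be sketched as a small loop. This is a sketch under stated assumptions, not the package's implementation: `SketchPlugin`, `SketchPluginResult`, `runPluginChain` and `makePlugin` are hypothetical local stand-ins. Letting a failed plugin fall through to the next one is also an assumption, since the README only states what happens on success.

```typescript
// Local stand-ins that mirror the documented interfaces; the real types come from the package.
interface SketchPluginResult<T> {
  success: boolean;
  data?: T;
  error?: Error;
  shouldContinue?: boolean;
}

interface SketchPlugin<TData, TResult> {
  getName(): string;
  getPriority(): number;
  process(data: TData): Promise<SketchPluginResult<TResult>>;
}

/**
 * Runs plugins from highest to lowest priority and stops after the first
 * plugin that succeeds with `shouldContinue === false`.
 * Returns the names of the plugins that actually ran, in order.
 */
async function runPluginChain<TData, TResult>(
  plugins: SketchPlugin<TData, TResult>[],
  data: TData,
): Promise<string[]> {
  // Copy before sorting so the caller's array is left untouched.
  const ordered = plugins
    .slice()
    .sort((a, b) => b.getPriority() - a.getPriority());
  const executed: string[] = [];
  for (const plugin of ordered) {
    const result = await plugin.process(data);
    executed.push(plugin.getName());
    if (result.success && result.shouldContinue === false) {
      break;
    }
  }
  return executed;
}

// Hypothetical factory that builds a plugin returning a fixed result.
function makePlugin(
  pluginName: string,
  priority: number,
  result: SketchPluginResult<void>,
): SketchPlugin<string, void> {
  return {
    getName: () => pluginName,
    getPriority: () => priority,
    process: async () => result,
  };
}
```

Given a priority-100 plugin and a priority-50 plugin that both succeed with shouldContinue set to false, only the priority-100 plugin runs. If the priority-100 plugin fails instead, this sketch moves on to the priority-50 plugin.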

Configuration options

QueueEventsModuleOptions

interface QueueEventsModuleOptions {
  queueName?: string;           // queue name, defaults to 'queue-events'
  enableQueue?: boolean;        // whether to enable the queue, defaults to true
  plugins?: IEventPlugin[];     // list of plugins
  alarmAdapter?: IAlarmAdapter; // alarm adapter
}
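
The documented defaults can be pictured as a small merge step. This is only a sketch: `SketchModuleOptions` and `resolveOptions` are hypothetical names, the empty plugin list is an assumption, and the package's real defaulting logic is not shown in this README.

```typescript
// Local copy of the documented option shape; plugin and adapter types are loosened to `unknown`.
interface SketchModuleOptions {
  queueName?: string;
  enableQueue?: boolean;
  plugins?: unknown[];
  alarmAdapter?: unknown;
}

// Applies the documented defaults: queueName 'queue-events', enableQueue true.
// Defaulting `plugins` to an empty list is an assumption made for this sketch.
function resolveOptions(options: SketchModuleOptions = {}) {
  return {
    queueName: options.queueName ?? 'queue-events',
    enableQueue: options.enableQueue ?? true,
    plugins: options.plugins ?? [],
    alarmAdapter: options.alarmAdapter,
  };
}

const defaults = resolveOptions();
const syncOnly = resolveOptions({ enableQueue: false });
```

The sketch uses `??` rather than `||` so that an explicit `enableQueue: false` is kept instead of being replaced by the default.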

Extensibility

Custom alarm adapter

import { IAlarmAdapter, AlarmData } from '@nest-packages/nestjs-queue-events';

export class CustomAlarmAdapter implements IAlarmAdapter {
  async send(alarmData: AlarmData): Promise<void> {
    // Implement your custom alarm logic here
    console.error('Alarm:', alarmData);
  }
}

// Register it in the module
QueueEventsModule.forRoot({
  alarmAdapter: new CustomAlarmAdapter(),
});

Custom retry strategy

import { IRetryStrategy, RetryContext, RetryResult } from '@nest-packages/nestjs-queue-events';

// MyContext and MyResult are placeholders for your own context and result types.
export class CustomRetryStrategy implements IRetryStrategy<MyContext, MyResult> {
  async retry(
    fn: (context: RetryContext<MyContext>) => Promise<MyResult>,
    context: RetryContext<MyContext>,
  ): Promise<RetryResult<MyResult>> {
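    // Implement your custom retry logic here and return a RetryResult.
    throw new Error('CustomRetryStrategy.retry is not implemented');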
  }
}
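
To make the idea of a retry strategy concrete, here is a self-contained fixed-delay retry loop. It is a sketch under assumptions and does not implement the package's IRetryStrategy: `SketchRetryContext`, `SketchRetryResult`, `Sleep` and `retryWithFixedDelay` are hypothetical, and the shapes of the real RetryContext and RetryResult are not documented here. It also assumes that maxRetries counts the retries made after the first attempt.

```typescript
// Hypothetical shapes for this sketch; the package's RetryContext and RetryResult may differ.
interface SketchRetryContext {
  attempt: number; // 1-based attempt counter
}

interface SketchRetryResult<T> {
  success: boolean;
  attempts: number;
  data?: T;
  error?: Error;
}

type Sleep = (ms: number) => Promise<void>;

const realSleep: Sleep = (ms) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

/**
 * Calls `fn` once, then retries up to `maxRetries` more times,
 * waiting `retryDelay` ms between attempts (fixed delay).
 * `sleep` is injectable so tests do not have to wait in real time.
 */
async function retryWithFixedDelay<T>(
  fn: (context: SketchRetryContext) => Promise<T>,
  maxRetries: number,
  retryDelay: number,
  sleep: Sleep = realSleep,
): Promise<SketchRetryResult<T>> {
  const maxAttempts = maxRetries + 1;
  let lastError: Error = new Error('no attempts were made');
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const data = await fn({ attempt });
      return { success: true, attempts: attempt, data };
    } catch (err) {
      lastError = err instanceof Error ? err : new Error(String(err));
      // Only wait if another attempt will follow.
      if (attempt < maxAttempts) {
        await sleep(retryDelay);
      }
    }
  }
  return { success: false, attempts: maxAttempts, error: lastError };
}
```

Swapping the constant `retryDelay` for a value that grows with `attempt` would turn this into an exponential backoff variant.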

License

MIT