@nest-packages/nestjs-queue-events
v0.1.1
Published
一个通用的 NestJS 事件系统,支持同步/异步事件触发、可配置重试策略、可扩展告警适配器和插件化的事件处理机制。
Downloads
13
Readme
@nest-packages/nestjs-queue-events
一个通用的 NestJS 事件系统,支持同步/异步事件触发、可配置重试策略、可扩展告警适配器和插件化的事件处理机制。
特性
- ✅ 通用事件系统:支持为任何事件类型注册自定义插件处理器
- ✅ 同步/异步执行:支持同步和异步(通过队列)两种执行模式
- ✅ 插件化架构:支持为事件注册多个插件,按优先级执行
- ✅ 重试策略:支持可配置的重试策略(固定延迟、指数退避)
- ✅ 告警适配器:支持自定义告警适配器,处理事件失败情况
- ✅ 队列集成:与 Bull 队列无缝集成,支持异步事件处理
- ✅ 类型安全:完整的 TypeScript 类型支持
安装
npm install @nest-packages/nestjs-queue-events
# 或
pnpm add @nest-packages/nestjs-queue-events快速开始
1. 导入模块
import { Module } from '@nestjs/common';
import { QueueEventsModule } from '@nest-packages/nestjs-queue-events';
@Module({
imports: [
QueueEventsModule.forRoot({
queueName: 'queue-events',
enableQueue: true,
}),
],
})
export class AppModule {}2. 创建事件插件
import { Injectable } from '@nestjs/common';
import {
IEventPlugin,
PluginContext,
PluginResult,
} from '@nest-packages/nestjs-queue-events';
@Injectable()
export class MyEventPlugin implements IEventPlugin<MyEventData, void> {
getEventType(): string {
return 'MY_EVENT';
}
getPriority(): number {
return 100; // 数字越大优先级越高
}
getName(): string {
return 'MyEventPlugin';
}
async process(
data: MyEventData,
context: PluginContext,
): Promise<PluginResult<void>> {
// 处理事件逻辑
console.log('Processing event:', data);
return {
success: true,
shouldContinue: false, // 成功后不再执行其他插件
};
}
}3. 注册插件
import { Module } from '@nestjs/common';
import { QueueEventsModule } from '@nest-packages/nestjs-queue-events';
import { MyEventPlugin } from './my-event.plugin';
@Module({
imports: [
QueueEventsModule.forRoot({
plugins: [new MyEventPlugin()],
}),
],
})
export class AppModule {}4. 触发事件
import { Injectable } from '@nestjs/common';
import {
EventService,
EventExecutionMode,
} from '@nest-packages/nestjs-queue-events';
@Injectable()
export class MyService {
constructor(private readonly eventService: EventService) {}
async doSomething() {
// 同步触发
await this.eventService.emit('MY_EVENT', { message: 'Hello' }, {
mode: EventExecutionMode.SYNC,
});
// 异步触发(通过队列)
await this.eventService.emit('MY_EVENT', { message: 'Hello' }, {
mode: EventExecutionMode.ASYNC,
retryConfig: {
maxRetries: 3,
retryDelay: 1000,
},
onSuccess: async (result) => {
console.log('Event processed successfully:', result);
},
onFailure: async (error) => {
console.error('Event processing failed:', error);
},
});
}
}FIELD_NAME_NOT_FOUND 事件示例
创建字段处理插件
import { Injectable } from '@nestjs/common';
import {
IEventPlugin,
PluginContext,
PluginResult,
} from '@nest-packages/nestjs-queue-events';
import { FieldNotFoundEventData } from '@nest-packages/nestjs-queue-events';
@Injectable()
export class FieldNotFoundPlugin
implements IEventPlugin<FieldNotFoundEventData, void>
{
constructor(
// 注入你需要的服务
private readonly bitableFieldService: BitableFieldService,
) {}
getEventType(): string {
return 'FIELD_NAME_NOT_FOUND';
}
getPriority(): number {
return 100;
}
getName(): string {
return 'FieldNotFoundPlugin';
}
async process(
data: FieldNotFoundEventData,
context: PluginContext,
): Promise<PluginResult<void>> {
try {
// 1. 获取字段列表
const fieldsList = await this.bitableFieldService.getFieldList(
data.request.path,
);
// 2. 找出缺失的字段
const missingFields = this.findMissingFields(
data.request.data,
fieldsList,
);
// 3. 创建缺失的字段
if (missingFields.length > 0) {
await this.bitableFieldService.createFields(
missingFields,
data.request.path,
);
}
return {
success: true,
shouldContinue: false,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error : new Error(String(error)),
shouldContinue: true,
retryable: true,
};
}
}
private findMissingFields(record: any, fieldsList: any[]): string[] {
// 实现字段查找逻辑
return [];
}
}在 bitable-base-record.ts 中触发事件
import { EventService, EventExecutionMode } from '@nest-packages/nestjs-queue-events';
import { FieldNotFoundEventData } from '@nest-packages/nestjs-queue-events';
// 在 createRecord 方法中
if (res.code === FieldNameNotFound && retryOnFieldNotFound) {
const eventData: FieldNotFoundEventData = {
response: res,
request: { path, data: fields },
feishuAppType: this.feishuAppType,
};
await this.eventService.emit('FIELD_NAME_NOT_FOUND', eventData, {
mode: EventExecutionMode.ASYNC,
retryConfig: {
maxRetries: maxRetries,
retryDelay: retryDelay,
},
onSuccess: async () => {
// 字段创建成功后,重新执行 createRecord
return await this.larkClient.bitable.appTableRecord.create({
data: { fields },
path,
});
},
onFailure: async (error) => {
// 所有重试失败后,触发告警
console.error('Field creation failed:', error);
},
});
}API 文档
EventService
核心事件服务,提供事件注册和触发功能。
方法
register(eventType: string, metadata: EventMetadata): 注册事件类型registerHandler<TEventData, TResult>(eventType: string, handler: IEventHandler<TEventData, TResult>): 注册事件处理器registerPlugin<TEventData, TResult>(plugin: IEventPlugin<TEventData, TResult>): 注册事件插件emit<TEventData, TResult>(eventType: string, data: TEventData, options?: EmitOptions): 触发事件on<TEventData>(eventType: string, listener: (data: TEventData) => void | Promise<void>): 注册事件监听器(兼容旧 API)
IEventPlugin
事件插件接口。
interface IEventPlugin<TEventData, TResult> {
process(data: TEventData, context: PluginContext): Promise<PluginResult<TResult>>;
getEventType(): string;
getPriority(): number;
getName(): string;
}PluginResult
插件执行结果。
interface PluginResult<T> {
success: boolean;
data?: T;
error?: Error;
shouldContinue?: boolean; // 是否继续执行下一个插件
retryable?: boolean; // 是否可重试
}插件开发指南
1. 实现 IEventPlugin 接口
export class MyPlugin implements IEventPlugin<MyEventData, void> {
getEventType(): string {
return 'MY_EVENT';
}
getPriority(): number {
return 100; // 数字越大优先级越高
}
getName(): string {
return 'MyPlugin';
}
async process(
data: MyEventData,
context: PluginContext,
): Promise<PluginResult<void>> {
// 实现处理逻辑
return {
success: true,
shouldContinue: false,
};
}
}2. 插件优先级
插件按优先级从高到低执行(数字越大优先级越高)。如果某个插件成功且 shouldContinue 为 false,则停止执行后续插件。
3. 插件链式执行
多个插件可以按优先级组合执行:
// 注册多个插件
eventService.registerPlugin(new HighPriorityPlugin()); // 优先级 100
eventService.registerPlugin(new LowPriorityPlugin()); // 优先级 50
// 执行顺序:HighPriorityPlugin -> LowPriorityPlugin配置选项
QueueEventsModuleOptions
interface QueueEventsModuleOptions {
queueName?: string; // 队列名称,默认 'queue-events'
enableQueue?: boolean; // 是否启用队列,默认 true
plugins?: IEventPlugin[]; // 插件列表
alarmAdapter?: IAlarmAdapter; // 告警适配器
}扩展性
自定义告警适配器
import { IAlarmAdapter, AlarmData } from '@nest-packages/nestjs-queue-events';
export class CustomAlarmAdapter implements IAlarmAdapter {
async send(alarmData: AlarmData): Promise<void> {
// 实现自定义告警逻辑
console.error('Alarm:', alarmData);
}
}
// 在模块中注册
QueueEventsModule.forRoot({
alarmAdapter: new CustomAlarmAdapter(),
});自定义重试策略
import { IRetryStrategy, RetryContext, RetryResult } from '@nest-packages/nestjs-queue-events';
export class CustomRetryStrategy implements IRetryStrategy<MyContext, MyResult> {
async retry(
fn: (context: RetryContext<MyContext>) => Promise<MyResult>,
context: RetryContext<MyContext>,
): Promise<RetryResult<MyResult>> {
// 实现自定义重试逻辑
}
}许可证
MIT
