@kglozhkin/event-bus
v1.6.0
Published
Event bus
Readme
@kglozhkin/event-bus
Библиотека событийной шины на основе NATS JetStream, упакованная как динамический модуль NestJS. Предназначена для микросервисной архитектуры торговых ботов: обеспечивает надёжную доставку событий с дедупликацией, трассировкой и повторными попытками.
Установка
npm install @kglozhkin/event-busPeer-зависимости
Библиотека требует наличия в проекте:
npm install @nestjs/common @nestjs/core reflect-metadata rxjsБыстрый старт
1. Подключение модуля
import { Module } from '@nestjs/common';
import { EventBusModule } from '@kglozhkin/event-bus';
@Module({
imports: [
EventBusModule.forRoot({
servers: ['nats://localhost:4222'],
serviceName: 'my-service',
streamName: 'bot-events', // необязательно
}),
],
})
export class AppModule {}Модуль зарегистрирован как global: true — EventBusService доступен во всём приложении без повторного импорта.
2. Публикация события
import { Injectable } from '@nestjs/common';
import { EventBusService, OrderBookUpdatedPayload } from '@kglozhkin/event-bus';
@Injectable()
export class MarketService {
constructor(private readonly eventBus: EventBusService) {}
async publishOrderBook(data: OrderBookUpdatedPayload) {
await this.eventBus.publish('market.orderbook.updated', data, {
marketId: data.marketId,
correlationId: 'some-trace-id',
});
}
}3. Подписка на событие
import { Injectable, OnModuleInit } from '@nestjs/common';
import { EventBusService, EventEnvelope, OrderBookUpdatedPayload } from '@kglozhkin/event-bus';
@Injectable()
export class StrategyService implements OnModuleInit {
constructor(private readonly eventBus: EventBusService) {}
async onModuleInit() {
await this.eventBus.subscribe<OrderBookUpdatedPayload>(
'market.orderbook.updated',
'strategy-orderbook-consumer', // уникальное имя durable consumer
async (event: EventEnvelope<OrderBookUpdatedPayload>) => {
console.log('Получено событие:', event.eventName, event.payload);
},
);
}
}API
EventBusModule.forRoot(options)
| Параметр | Тип | Описание |
|---|---|---|
| servers | string[] | Адреса NATS-серверов |
| serviceName | string | Имя сервиса (используется как source в событиях) |
| streamName | string? | Имя JetStream-стрима (по умолчанию "bot-events") |
EventBusService
publish<TPayload>(eventName, payload, meta?)
Публикует событие в JetStream. Каждое событие автоматически оборачивается в EventEnvelope.
| Параметр | Тип | Описание |
|---|---|---|
| eventName | EventName | Имя события |
| payload | TPayload | Полезная нагрузка |
| meta.correlationId | string? | ID для трассировки цепочки событий |
| meta.causationId | string? | ID события-причины |
| meta.marketId | string? | Идентификатор рынка |
subscribe<TPayload>(eventName, durableName, handler)
Создаёт (или переиспользует) durable pull-consumer и запускает фоновую обработку сообщений.
| Параметр | Тип | Описание |
|---|---|---|
| eventName | EventName | Имя события |
| durableName | string | Уникальное имя consumer-а |
| handler | (event: EventEnvelope<TPayload>) => Promise<void> \| void | Обработчик |
При успешной обработке сообщение подтверждается (ack). При исключении в handler — отклоняется (nak) и будет доставлено повторно (до 5 раз).
request<TRequest, TResponse>(subject, payload, timeoutMs?)
Синхронный запрос/ответ через core NATS (не JetStream). Таймаут по умолчанию — 1000 мс.
const result = await this.eventBus.request<RequestDto, ResponseDto>(
'risk.check',
{ orderId: '123' },
3000,
);respond<TRequest, TResponse>(subject, handler)
Регистрирует обработчик входящих запросов для request().
await this.eventBus.respond<RequestDto, ResponseDto>(
'risk.check',
async (payload) => {
return { approved: true };
},
);close()
Завершает соединение с NATS (drain + close).
EventEnvelope<TPayload>
Обёртка, в которую упаковывается каждое опубликованное событие:
interface EventEnvelope<TPayload> {
eventId: string; // UUID — уникальный идентификатор события
eventName: EventName; // тип события
source: string; // имя сервиса-отправителя
timestamp: string; // ISO-8601
payload: TPayload; // полезная нагрузка
correlationId?: string;
causationId?: string;
marketId?: string;
}Доменные события (EventName)
| Событие | Тип нагрузки |
|---|---|
| market.orderbook.updated | OrderBookUpdatedPayload |
| market.trade.created | — |
| strategy.quote.generated | QuoteGeneratedPayload |
| risk.quote.approved | RiskQuoteApprovedPayload |
| inventory.updated | — |
| orders.place.requested | OrderPlaceRequestedPayload |
| orders.placed | OrderPlacedPayload |
| orders.cancel.requested | — |
| orders.cancelled | — |
Параметры стрима
Стрим создаётся автоматически при первом подключении:
| Параметр | Значение |
|---|---|
| Subjects | bot.> |
| Хранение | Лимит 24 часа |
| Политика доставки | Только новые сообщения |
| Макс. попыток | 5 |
| Таймаут подтверждения | 30 секунд |
Разработка
npm run build # компиляция
npm run typecheck # проверка типов
npm run test # тесты
npm run test:cov # тесты с покрытием
npm run lint # линтингЛицензия
MIT
