lhisp-eventbus-consumer
v4.0.3
Published
PT-BR | [EN](#english)
Readme
lhisp-eventbus-consumer
PT-BR | EN
Português
Biblioteca TypeScript para consumir eventos via RabbitMQ (AMQP) usando amqplib.
Ela fornece:
LhispEventbus: utilitário leve para conectar, criar channels,assertExchangeepublishMessage.LhispEventbusConsumer: classe abstrata que configura exchange/queue/bind/consume e delega o processamento para o seu métodoconsume().
Instalação
npm i lhisp-eventbus-consumerImportação
import {
LhispEventbus,
LhispEventbusConsumer,
type LhispEventbusConsumerConstructorParams,
type LhispBaseEvent,
type EventBusMessage,
type Logger,
} from "lhisp-eventbus-consumer";Conceitos
- Exchange de entrada: onde sua aplicação consome eventos.
- Queue: fila que será vinculada (bind) à exchange.
- Routing key / pattern: padrão usado no
bindQueue.- Por padrão,
queuePattern = "".
- Por padrão,
- Ack/Nack:
- Por padrão,
noAck = true(RabbitMQ considera a mensagem entregue sem necessidade deack). - Se
noAck = false, a classe dáackquando o processamento termina sem erro; em erro/timeout, dánack.
- Por padrão,
- Prefetch (paralelismo):
- Se
maxParallelProcesses > 0, é aplicadochannel.prefetch(maxParallelProcesses).
- Se
Parâmetros do consumer
A classe LhispEventbusConsumer recebe um objeto com:
eventBusUrl(obrigatório): URL do AMQP, por exemploamqp://user:pass@host:5672.exchangeName(obrigatório): exchange de entrada.queueName(obrigatório): nome da fila.exchangeType(opcional, defaultfanout).exchangeOptions(opcional, default{ durable: true }).queuePattern(opcional, default"").queueOptions(opcional, default{ durable: true, exclusive: false, autoDelete: false }).noAck(opcional, defaulttrue).maxParallelProcesses(opcional, default0).consumeTimeout(opcional, default60000ms): timeout do processamento por mensagem.- Callback/Status exchange (opcionais):
callbackModulo(default""): se definido, publica um evento de callback quando ocorrer erro no consumo.callbackExchangeName(defaultDaemonStatus).callbackExchangeType(defaultfanout).
- Sync exchange (opcionais):
syncExchangeName(defaultSyncEvents).syncExchangeType(defaulttopic).
logger(opcional): precisa implementar a interfaceLogger.
Exemplo: criando um consumer
Crie uma classe que estende LhispEventbusConsumer e implemente:
consume(acao, evento, msg, logger): processamento do evento.handleChannelError(error): handler de erro do channel.
import { LhispEventbusConsumer, type EventBusMessage, type LhispBaseEvent, type Logger } from "lhisp-eventbus-consumer";
interface UserCreatedEvent extends LhispBaseEvent {
payload: {
id: string;
email: string;
};
}
class UserCreatedConsumer extends LhispEventbusConsumer<UserCreatedEvent> {
protected handleChannelError(error: any): void {
this.logger.error({ message: "Channel Error", error });
}
protected async consume(
acao: string,
evento: UserCreatedEvent,
_msg: EventBusMessage,
logger: Logger,
): Promise<void> {
logger.info({ message: "Consuming event", acao, evento });
// Seu processamento aqui
// - use `acao` (routing key)
// - use `evento.dbname`, `evento.EmpresaId`, `evento.payload`
// Se você precisar responder eventos síncronos:
// if (evento.uuid) await this.replySyncEvent(evento.uuid, { ok: true });
}
}
async function main() {
const consumer = new UserCreatedConsumer({
eventBusUrl: process.env.EVENTBUS_URL ?? "amqp://localhost:5672",
exchangeName: "UserEvents",
queueName: "user-created-consumer",
queuePattern: "user.created",
noAck: false,
maxParallelProcesses: 10,
consumeTimeout: 60_000,
// Opcional: publicar callback em caso de erro
// callbackModulo: "my-service",
// callbackExchangeName: "DaemonStatus",
});
consumer.banner();
await consumer.start();
}
main().catch((err) => {
console.error(err);
process.exit(1);
});Exemplo: publicando mensagens
A classe LhispEventbus pode ser usada para publicar mensagens (fire-and-forget) em uma exchange.
import { LhispEventbus } from "lhisp-eventbus-consumer";
async function publishExample() {
const bus = new LhispEventbus({
eventBusUrl: process.env.EVENTBUS_URL ?? "amqp://localhost:5672",
});
await bus.publishMessage("UserEvents", "user.created", {
dbname: "tenant_db",
EmpresaId: "1",
payload: { id: "123", email: "[email protected]" },
});
}Scripts
- Rodar testes:
npm test - Rodar testes em watch:
npm run test:watch - Build:
npm run build
English
TypeScript library to consume RabbitMQ (AMQP) events using amqplib.
It provides:
LhispEventbus: a small helper to connect, create channels,assertExchange, andpublishMessage.LhispEventbusConsumer: an abstract class that sets up exchange/queue/bind/consume and delegates processing to yourconsume()method.
Installation
npm i lhisp-eventbus-consumerImport
import {
LhispEventbus,
LhispEventbusConsumer,
type LhispEventbusConsumerConstructorParams,
type LhispBaseEvent,
type EventBusMessage,
type Logger,
} from "lhisp-eventbus-consumer";Concepts
- Input exchange: where your application consumes events from.
- Queue: the queue that will be bound to the exchange.
- Routing key / pattern: pattern used in
bindQueue.- Default is
queuePattern = "".
- Default is
- Ack/Nack:
- Default is
noAck = true(RabbitMQ auto-acknowledges delivery). - If
noAck = false, the consumer willackwhen processing succeeds; on error/timeout it willnack.
- Default is
- Prefetch (parallelism):
- If
maxParallelProcesses > 0,channel.prefetch(maxParallelProcesses)is applied.
- If
Consumer parameters
LhispEventbusConsumer constructor receives:
eventBusUrl(required): AMQP URL, e.g.amqp://user:pass@host:5672.exchangeName(required): input exchange name.queueName(required): queue name.exchangeType(optional, defaultfanout).exchangeOptions(optional, default{ durable: true }).queuePattern(optional, default"").queueOptions(optional, default{ durable: true, exclusive: false, autoDelete: false }).noAck(optional, defaulttrue).maxParallelProcesses(optional, default0).consumeTimeout(optional, default60000ms): per-message processing timeout.- Callback/Status exchange (optional):
callbackModulo(default""): when provided, publishes a callback event when consumption fails.callbackExchangeName(defaultDaemonStatus).callbackExchangeType(defaultfanout).
- Sync exchange (optional):
syncExchangeName(defaultSyncEvents).syncExchangeType(defaulttopic).
logger(optional): must implement theLoggerinterface.
Example: creating a consumer
Create a class extending LhispEventbusConsumer and implement:
consume(action, event, msg, logger): your message handler.handleChannelError(error): channel error handler.
import { LhispEventbusConsumer, type EventBusMessage, type LhispBaseEvent, type Logger } from "lhisp-eventbus-consumer";
interface UserCreatedEvent extends LhispBaseEvent {
payload: {
id: string;
email: string;
};
}
class UserCreatedConsumer extends LhispEventbusConsumer<UserCreatedEvent> {
protected handleChannelError(error: any): void {
this.logger.error({ message: "Channel Error", error });
}
protected async consume(
action: string,
event: UserCreatedEvent,
_msg: EventBusMessage,
logger: Logger,
): Promise<void> {
logger.info({ message: "Consuming event", action, event });
// Your business logic here.
// If you need to reply a sync request:
// if (event.uuid) await this.replySyncEvent(event.uuid, { ok: true });
}
}
async function main() {
const consumer = new UserCreatedConsumer({
eventBusUrl: process.env.EVENTBUS_URL ?? "amqp://localhost:5672",
exchangeName: "UserEvents",
queueName: "user-created-consumer",
queuePattern: "user.created",
noAck: false,
maxParallelProcesses: 10,
consumeTimeout: 60_000,
// Optional: publish callback events on error
// callbackModulo: "my-service",
// callbackExchangeName: "DaemonStatus",
});
consumer.banner();
await consumer.start();
}
main().catch((err) => {
console.error(err);
process.exit(1);
});Example: publishing messages
Use LhispEventbus for fire-and-forget publishing.
import { LhispEventbus } from "lhisp-eventbus-consumer";
async function publishExample() {
const bus = new LhispEventbus({
eventBusUrl: process.env.EVENTBUS_URL ?? "amqp://localhost:5672",
});
await bus.publishMessage("UserEvents", "user.created", {
dbname: "tenant_db",
EmpresaId: "1",
payload: { id: "123", email: "[email protected]" },
});
}Scripts
- Run tests:
npm test - Run tests (watch):
npm run test:watch - Build:
npm run build
