npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

lhisp-eventbus-consumer

v4.0.3

Published

PT-BR | [EN](#english)

Readme

lhisp-eventbus-consumer

PT-BR | EN

Português

Biblioteca TypeScript para consumir eventos via RabbitMQ (AMQP) usando amqplib.

Ela fornece:

  • LhispEventbus: utilitário leve para conectar, criar channels, assertExchange e publishMessage.
  • LhispEventbusConsumer: classe abstrata que configura exchange/queue/bind/consume e delega o processamento para o seu método consume().

Instalação

npm i lhisp-eventbus-consumer

Importação

import {
  LhispEventbus,
  LhispEventbusConsumer,
  type LhispEventbusConsumerConstructorParams,
  type LhispBaseEvent,
  type EventBusMessage,
  type Logger,
} from "lhisp-eventbus-consumer";

Conceitos

  • Exchange de entrada: onde sua aplicação consome eventos.
  • Queue: fila que será vinculada (bind) à exchange.
  • Routing key / pattern: padrão usado no bindQueue.
    • Por padrão, queuePattern = "".
  • Ack/Nack:
    • Por padrão, noAck = true (RabbitMQ considera a mensagem entregue sem necessidade de ack).
    • Se noAck = false, a classe dá ack quando o processamento termina sem erro; em erro/timeout, dá nack.
  • Prefetch (paralelismo):
    • Se maxParallelProcesses > 0, é aplicado channel.prefetch(maxParallelProcesses).

Parâmetros do consumer

A classe LhispEventbusConsumer recebe um objeto com:

  • eventBusUrl (obrigatório): URL do AMQP, por exemplo amqp://user:pass@host:5672.
  • exchangeName (obrigatório): exchange de entrada.
  • queueName (obrigatório): nome da fila.
  • exchangeType (opcional, default fanout).
  • exchangeOptions (opcional, default { durable: true }).
  • queuePattern (opcional, default "").
  • queueOptions (opcional, default { durable: true, exclusive: false, autoDelete: false }).
  • noAck (opcional, default true).
  • maxParallelProcesses (opcional, default 0).
  • consumeTimeout (opcional, default 60000 ms): timeout do processamento por mensagem.
  • Callback/Status exchange (opcionais):
    • callbackModulo (default ""): se definido, publica um evento de callback quando ocorrer erro no consumo.
    • callbackExchangeName (default DaemonStatus).
    • callbackExchangeType (default fanout).
  • Sync exchange (opcionais):
    • syncExchangeName (default SyncEvents).
    • syncExchangeType (default topic).
  • logger (opcional): precisa implementar a interface Logger.

Exemplo: criando um consumer

Crie uma classe que estende LhispEventbusConsumer e implemente:

  • consume(acao, evento, msg, logger): processamento do evento.
  • handleChannelError(error): handler de erro do channel.
import { LhispEventbusConsumer, type EventBusMessage, type LhispBaseEvent, type Logger } from "lhisp-eventbus-consumer";

interface UserCreatedEvent extends LhispBaseEvent {
  payload: {
    id: string;
    email: string;
  };
}

class UserCreatedConsumer extends LhispEventbusConsumer<UserCreatedEvent> {
  protected handleChannelError(error: any): void {
    this.logger.error({ message: "Channel Error", error });
  }

  protected async consume(
    acao: string,
    evento: UserCreatedEvent,
    _msg: EventBusMessage,
    logger: Logger,
  ): Promise<void> {
    logger.info({ message: "Consuming event", acao, evento });

    // Seu processamento aqui
    // - use `acao` (routing key)
    // - use `evento.dbname`, `evento.EmpresaId`, `evento.payload`

    // Se você precisar responder eventos síncronos:
    // if (evento.uuid) await this.replySyncEvent(evento.uuid, { ok: true });
  }
}

async function main() {
  const consumer = new UserCreatedConsumer({
    eventBusUrl: process.env.EVENTBUS_URL ?? "amqp://localhost:5672",
    exchangeName: "UserEvents",
    queueName: "user-created-consumer",
    queuePattern: "user.created",
    noAck: false,
    maxParallelProcesses: 10,
    consumeTimeout: 60_000,

    // Opcional: publicar callback em caso de erro
    // callbackModulo: "my-service",
    // callbackExchangeName: "DaemonStatus",
  });

  consumer.banner();
  await consumer.start();
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});

Exemplo: publicando mensagens

A classe LhispEventbus pode ser usada para publicar mensagens (fire-and-forget) em uma exchange.

import { LhispEventbus } from "lhisp-eventbus-consumer";

async function publishExample() {
  const bus = new LhispEventbus({
    eventBusUrl: process.env.EVENTBUS_URL ?? "amqp://localhost:5672",
  });

  await bus.publishMessage("UserEvents", "user.created", {
    dbname: "tenant_db",
    EmpresaId: "1",
    payload: { id: "123", email: "[email protected]" },
  });
}

Scripts

  • Rodar testes: npm test
  • Rodar testes em watch: npm run test:watch
  • Build: npm run build

English

TypeScript library to consume RabbitMQ (AMQP) events using amqplib.

It provides:

  • LhispEventbus: a small helper to connect, create channels, assertExchange, and publishMessage.
  • LhispEventbusConsumer: an abstract class that sets up exchange/queue/bind/consume and delegates processing to your consume() method.

Installation

npm i lhisp-eventbus-consumer

Import

import {
  LhispEventbus,
  LhispEventbusConsumer,
  type LhispEventbusConsumerConstructorParams,
  type LhispBaseEvent,
  type EventBusMessage,
  type Logger,
} from "lhisp-eventbus-consumer";

Concepts

  • Input exchange: where your application consumes events from.
  • Queue: the queue that will be bound to the exchange.
  • Routing key / pattern: pattern used in bindQueue.
    • Default is queuePattern = "".
  • Ack/Nack:
    • Default is noAck = true (RabbitMQ auto-acknowledges delivery).
    • If noAck = false, the consumer will ack when processing succeeds; on error/timeout it will nack.
  • Prefetch (parallelism):
    • If maxParallelProcesses > 0, channel.prefetch(maxParallelProcesses) is applied.

Consumer parameters

LhispEventbusConsumer constructor receives:

  • eventBusUrl (required): AMQP URL, e.g. amqp://user:pass@host:5672.
  • exchangeName (required): input exchange name.
  • queueName (required): queue name.
  • exchangeType (optional, default fanout).
  • exchangeOptions (optional, default { durable: true }).
  • queuePattern (optional, default "").
  • queueOptions (optional, default { durable: true, exclusive: false, autoDelete: false }).
  • noAck (optional, default true).
  • maxParallelProcesses (optional, default 0).
  • consumeTimeout (optional, default 60000 ms): per-message processing timeout.
  • Callback/Status exchange (optional):
    • callbackModulo (default ""): when provided, publishes a callback event when consumption fails.
    • callbackExchangeName (default DaemonStatus).
    • callbackExchangeType (default fanout).
  • Sync exchange (optional):
    • syncExchangeName (default SyncEvents).
    • syncExchangeType (default topic).
  • logger (optional): must implement the Logger interface.

Example: creating a consumer

Create a class extending LhispEventbusConsumer and implement:

  • consume(action, event, msg, logger): your message handler.
  • handleChannelError(error): channel error handler.
import { LhispEventbusConsumer, type EventBusMessage, type LhispBaseEvent, type Logger } from "lhisp-eventbus-consumer";

interface UserCreatedEvent extends LhispBaseEvent {
  payload: {
    id: string;
    email: string;
  };
}

class UserCreatedConsumer extends LhispEventbusConsumer<UserCreatedEvent> {
  protected handleChannelError(error: any): void {
    this.logger.error({ message: "Channel Error", error });
  }

  protected async consume(
    action: string,
    event: UserCreatedEvent,
    _msg: EventBusMessage,
    logger: Logger,
  ): Promise<void> {
    logger.info({ message: "Consuming event", action, event });

    // Your business logic here.

    // If you need to reply a sync request:
    // if (event.uuid) await this.replySyncEvent(event.uuid, { ok: true });
  }
}

async function main() {
  const consumer = new UserCreatedConsumer({
    eventBusUrl: process.env.EVENTBUS_URL ?? "amqp://localhost:5672",
    exchangeName: "UserEvents",
    queueName: "user-created-consumer",
    queuePattern: "user.created",
    noAck: false,
    maxParallelProcesses: 10,
    consumeTimeout: 60_000,

    // Optional: publish callback events on error
    // callbackModulo: "my-service",
    // callbackExchangeName: "DaemonStatus",
  });

  consumer.banner();
  await consumer.start();
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});

Example: publishing messages

Use LhispEventbus for fire-and-forget publishing.

import { LhispEventbus } from "lhisp-eventbus-consumer";

async function publishExample() {
  const bus = new LhispEventbus({
    eventBusUrl: process.env.EVENTBUS_URL ?? "amqp://localhost:5672",
  });

  await bus.publishMessage("UserEvents", "user.created", {
    dbname: "tenant_db",
    EmpresaId: "1",
    payload: { id: "123", email: "[email protected]" },
  });
}

Scripts

  • Run tests: npm test
  • Run tests (watch): npm run test:watch
  • Build: npm run build