muncherjs
v0.1.6
Published
Consumidor RabbitMQ com suporte a TypeScript | JavaScript
Maintainers
Readme
📦 MuncherJs
Um projeto criado em TypeScript para facilitar a criação de consumidores RabbitMQ, com suporte a métricas Prometheus, tracing com Elastic APM e um sistema de plugins para consumidores.
📄 Objetivo
Simplificar a integração com consumo de stream de menssagens via RabbitMQ
✨ Funcionalidades
- Métricas: Adicionado métricas de consumo (mensagens processadas, tempo médio, CPU, Memoria, etc.) via prometheus
- Monitoramento: Suporte para integração com APM do Elastic Search
- Resiliencia: Adicionado estrategias de retentativas e DLQ
- Long Process: Mecanismos de fallback para operações longas
- Plugin/Play: Adicionado suporte para criação de plugins para uso em geral
- ManagerConsumer: Controle de Skip, Stop, Retryable consumo de mensagens
- Logger: Suporte de de incoporação do logger de sua escolha
💡 Roadmap Futuro
- Healcheck para consumidores ativos
- Exemplos de
idempotency keycom redis
⚡ Pipeline inteligente de consumo RabbitMQ
mensagens fluem em um ciclo estruturado de pré-processamento, processamento e pós-processamento — com controle total do contexto, resiliência, garantindo consistência e extensibilidade de ponta a ponta.

📊 Metricas
Esta lib utiliza Prometheus para coletar, armazenar e alertar sobre métricas em tempo real, garantindo alta disponibilidade e performance.
🔍 Métricas Coletadas:
- Infraestrutura: CPU, memória, uso de disco, tráfego de rede.
- Consumidor RabbitMQ: Total de mensagens processadas, Tempo de processamento de mensagens, Mensagens atualmente sendo processadas, Total de novas tentativas de processamento, Total de mensagens enviadas para DLQ

🚀 Application Performance Monitoring (APM)
Este lib utiliza APM para monitorar desempenho, rastrear transações distribuídas e identificar gargalos em tempo real.
🔍 Métricas Coletadas:
- Erros corridos em tempo de execução
- Latencia media do consumidor
- Throughput por consumo de mensagens
- CPU e Memoria utilizada

⚙️ Instalação
npm install muncherjs -S
🤓 Exemplo de uso
Simple
import { MuncherConsumerBase } from 'muncher'
class ExampleConsumer extends MuncherConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
}
const consumer = new ExampleConsumer({
connection: {
username: 'admin',
password: 'admin'
},
queues: [{
topic: 'Example.Queue'
}]
})
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});Simple Metadata shared
import { MuncherConsumerBase } from 'muncher'
class ExampleConsumer extends MuncherConsumerBase<Example> {
protected async preProcess(ctx: IMessageContext<Example>): Promise<void> {
ctx.metadata.new = "New Value in ctx" // Data shared in context
}
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx.metadata.new)
}
}
const consumer = new ExampleConsumer({
connection: {
username: 'admin',
password: 'admin'
},
queues: [{
topic: 'Example.Queue'
}]
})
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});Saidas de Error´s
import { MuncherConsumerBase } from 'muncher'
class ExampleConsumer extends MuncherConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
// In Class Domain
protected async onErrorProcess(ctx: IMessageContext<Example>, error: unknown): Promise<void> {
console.log(ctx, error)
}
}
const consumer = new ExampleConsumer({
connection: {
username: 'admin',
password: 'admin'
},
queues: [{
topic: 'Example.Queue'
}]
})
// Out Class Domain
consumer.onError((err) => {
console.log(err)
})
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});Suporte ao APM
import { MuncherConsumerBase } from 'muncher'
class ExampleConsumer extends MuncherConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
}
const consumer = new ExampleConsumer({
connection: {
username: 'admin',
password: 'admin'
},
queues: [{
topic: 'Example.Queue'
}],
apm: {
serverUrl: 'http://localhost:8200'
}
})
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});Suporte ao Prometheus
import { MuncherConsumerBase } from 'muncher'
import http, { Server } from 'http';
class ExampleConsumer extends MuncherConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
}
const consumer = new ExampleConsumer({
connection: {
username: 'admin',
password: 'admin'
},
queues: [{
topic: 'Example.Queue'
}]
})
const server = http.createServer(async (req, res) => {
if (req.url === '/metrics') {
const { metrics, contentType } = await consumer.getMetrics();
res.writeHead(200, { 'Content-Type': contentType });
res.end(metrics);
} else {
res.writeHead(404);
res.end();
}
});
await consumer.start()
server.listen(9091, () => {
console.log('🚀 Metrics server rodando em http://localhost:9091/metrics');
})
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});Suporte para Plugins
import { MuncherConsumerBase, SkipMessageError } from 'muncher'
class PluginValidade extends PluginBase<unknown> {
private schema: AnySchema
constructor(schema: AnySchema) {
super('PluginValidade');
this.schema = schema
}
async preProcess(ctx: IMessageContext<unknown>): Promise<void> {
const validate = this.schema.validate(ctx.)
if (validate.error) {
throw new SkipMessageError(this.name, validate.error.details)
}
}
async initialize(consumer: IConsumer): Promise<void> {
consumer.onError((err) => {
console.log("plugin", err)
})
}
}
class ExampleConsumer extends MuncherConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
}
const consumer = new ExampleConsumer({
connection: {
username: 'admin',
password: 'admin'
},
queues: [{
topic: 'Example.Queue'
}],
plugins: [
new PluginValidade(joi.object({
name: joi.string()
}))
]
})
consumer.withPlugin(new PluginValidade(joi.object({
name: joi.string()
})))
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});Error Types
| Parâmetro | Descrição | Example | ------------------- | -------------------------------------------------------- | ----------------- | StopProcessingError | Consumo da menssagem e parado é marcado como erro | new StopProcessingError('msg', {...params}) | SkipMessageError | Consumo da menssagem e finalizado é marcado como sucesso | new SkipMessageError('msg', {...params}) | RetryableError | Consumo da menssagem e marcado para reprocessamento | new RetryableError('msg', {...params})
Configs
| Parâmetro | Tipo | Obrigatório | Descrição
| ----------- |----------|------------- | --------------------------------------
| connection | object | ✅ | Conexão com RabbitMQ Valores
| exchanges | Array | ❌ | Configuração de consumo Via Exchange Valores
| queues | Array | ❌ | Configuração de consumo Via Queue Valores
| binds | Array | ❌ | Configuração de consumo Via Binding Valores
| plugins | Array | ❌ | Configuração de Plugins
| serviceName | string | ❌ | Nome do serviço
| prefetch | string | ❌ | Quantidade mensagens um consumidor pode processar ao mesmo tempo
| logger | number | ❌ | Logger customizado Default value: console
| DLQ | object | ❌ | Configuração de DLQ Valores
| apm | object | ❌ | Configuração do apm Valores
| retryPolicy | object | ❌ | Configuração de retryPolicy Valores
Connection
| Parâmetro | Descrição | Exemplo
| --------- | ----------------------------------------------------------- | -----------------------------------
| protocol | O protocolo a ser utilizado | Default value: 'amqp'
| hostname | Nome do host usado para conexão ao servidor. | Default value: 'localhost'
| port | Port used for connecting to the server. | Default value: 5672
| username | Nome de usuário usado para autenticação no servidor. | Default value: 'guest'
| password | Senha usada para autenticação no servidor. | Default value: 'guest'
| locale | O local desejado para mensagens de erro. O RabbitMQ usa apenas en_US | Default value: 'en_US'
| frameMax | O tamanho em bytes do quadro máximo permitido na conexão. 0 significa sem limite (mas como os quadros têm um campo de tamanho que é um inteiro sem sinal de 32 bits, ele é forçosamente 2^32 - 1). | Default value: 0x1000 (4kb)
| heartbeat | O período do heartbeat da conexão em segundos. | Default value: 0
| vhost | VHost usado. | Default value: '/'
DLQ
| Parâmetro | Tipo | Obrigatório | Descrição
| -----------|----------|------------- | --------------------------------------
| routingKey | string | ✅ | Nome da fila que receberá a mensagem.
| exchange | string | ❌ | Nome da exchange que envia a mensagem.
Importante!: Caso não seja passado uma exchange o envio de dados ao DLQ será feito via sendToQueue
| Cenário | Use sendToQueue | Use publish |
| --------------------------------------------------- | ----------------- | ------------- |
| Enviar direto pra uma fila específica? | ✅ | ⚠️ |
| Quer usar Exchange + RoutingKey? | ⚠️ | ✅ |
| Precisa de controle com DLX, fanout, topic routing? | ❌ | ✅ |
| Protótipo simples/teste? | ✅ | ⚠️ |
Queues
| Parâmetro | Tipo | Obrigatório | Descrição
| --------------- | -------- | ------------ | -----------------------------------
| topic | string | ✅ | Nome da fila que receberá a mensagem.
| options | object | ❌ | Opções de Assert Valores
| optionsConsumer | object | ❌ | Opções do consumidor Valores
Exchange
| Parâmetro | Tipo | Obrigatório | Descrição
| --------------- | -------- | ------------ | -----------------------------------
| topic | string | ✅ | Nome da fila que receberá a mensagem.
| type | string | ✅ | Tipo de roteamento da exchange. Valores
| options | object | ❌ | Opções de Assert Valores
| optionsConsumer | object | ❌ | Opções do consumidor Valores
Binding
| Parâmetro | Tipo | Obrigatório | Descrição
| ------------ | -------- | ------------- | -----------------------------------
| queue | string | ✅ | Nome da fila que receberá a mensagem.
| exchange | string | ✅ | Nome da exchange que envia a mensagem.
| routingKey | string | ✅ | Chave usada pra rotear a mensagem. Pode ser '', #, *.log, etc.
| args | any | ❌ | Argumentos adicionais (raramente usados; para casos avançados).
Exchange Type
| Tipo | Comportamento |
| --------- | ------------------------------------------------------------------- |
| direct | Roteamento exato via routingKey. |
| topic | Roteamento por padrões (*.log, #.error). |
| fanout | Ignora routingKey, envia para todas as filas ligadas. |
| headers | Roteia por headers, não por routingKey. Mais raro e avançado. |
Options.AssertQueue
| Parâmetro | Tipo | Descrição |
| --------------| --------- | ----------------------------------------------------------------------------------------- |
| durable | boolean | Se true, a fila sobrevive a reinicializações do servidor. |
| exclusive | boolean | Se true, a fila é acessível apenas por esta conexão e será deletada ao desconectar. |
| autoDelete | boolean | Se true, a fila será removida automaticamente quando não tiver mais consumidores. |
| messageTtl | number | Tempo (ms) que a mensagem pode ficar na fila antes de ser descartada. |
| expires | number | Tempo (ms) que a fila pode ficar inativa antes de ser deletada. |
| maxLength | number | Máximo de mensagens que a fila pode conter. |
| maxPriority | number | Define níveis de prioridade de mensagens (quando usado). |
Options.AssertExchange
| Parâmetro | Tipo | Descrição |
| ------------ | --------- | ------------------------------------------------------------------------------------------------ |
| durable | boolean | Se true, a exchange sobrevive a reinicializações do servidor. |
| internal | boolean | Se true, a exchange não pode ser publicada diretamente, apenas usada por outras exchanges. |
| autoDelete | boolean | Se true, a exchange é deletada quando não houver mais filas vinculadas. |
| arguments | any | Usado para configurações adicionais específicas de tipo de exchange. |
Options.Consume
| Parâmetro | Tipo | Descrição |
| ------------- | --------- | -------------------------------------------------------------------------------------------------------------- |
| consumerTag | string | Identificador único do consumidor. Útil para cancelar ou rastrear consumo. |
| noAck | boolean | Se true, não exige ack manual. A mensagem é considerada entregue assim que recebida. ⚠️ Use com cuidado. |
| exclusive | boolean | Se true, somente esta conexão pode consumir da fila. |
| priority | number | Prioridade do consumidor. Maior valor consome primeiro (requer suporte da fila). |
| arguments | any | Argumentos adicionais (ex: filtros, extensões customizadas). |
RetryConfig
| Parâmetro | Tipo | Descrição |
| ------------- | -------- | --------------------------------- |
| maxRetries | number | Número de retentativas |
| backoff | number | Tempo de espera entre tentativas |
APM
| Parâmetro | Tipo | Descrição |
| --------------- | -------- | -------------------------------------------------------------------------------------------------------------- |
| serverUrl | string | URL de conexão com APM |
| apiKey | string | A chave de API. Esta é uma alternativa ao secretToken. (Mais Informações)[https://www.elastic.co/docs/reference/apm/agents/nodejs/configuration#api-key] |
| secretToken | string | O token secreto. (Mais Informações)[https://www.elastic.co/docs/reference/apm/agents/nodejs/configuration#secret-token] |
| cloudProvider | string | Auto configuração para provedores Clound (Mais Informações)[https://www.elastic.co/docs/reference/apm/agents/nodejs/configuration#cloud-provider] |
📫 Contribuições
Sinta-se à vontade para abrir issues ou enviar pull requests. Sugestões são sempre bem-vindas!
🤓 Contato
Desenvolvido por: Ismael Alves 🤓🤓🤓
- Email: [email protected]
- Github: github.com/ismaelalvesgit
- Linkedin: linkedin.com/in/ismael-alves-6945531a0/
