kosumer
v0.1.2
Published
Consumidor kafka com suporte a TypeScript | JavaScript
Maintainers
Readme
📦 Kosumer
Um projeto criado TypeScript para facilitar a criação de consumidores Kafka, com suporte a métricas Prometheus, tracing com Elastic APM e um sistema de plugins para consumidores.
📄 Objetivo
Simplificar a integração com consumo de stream de menssagens via kafka
✨ Funcionalidades
- Métricas: Adicionado métricas de consumo (mensagens processadas, tempo médio, CPU, Memoria, etc.) via prometheus
- Monitoramento: Suporte para integração com APM do Elastic Search
- Resiliencia: Adicionado estrategias de retentativas e DLQ
- Long Process: Mecanismos de fallback para operações longas
- Plugin/Play: Adicionado suporte para criação de plugins para uso em geral
💡 Roadmap Futuro
- Healcheck para consumidores ativos
- Exemplos de
idempotency keycom redis
⚡ Pipeline inteligente de consumo Kafka
mensagens fluem em um ciclo estruturado de pré-processamento, processamento e pós-processamento — com controle total do contexto, resiliência e commit seguro de offset, garantindo consistência e extensibilidade de ponta a ponta.

📊 Metricas
Esta lib utiliza Prometheus para coletar, armazenar e alertar sobre métricas em tempo real, garantindo alta disponibilidade e performance.
🔍 Métricas Coletadas:
- Infraestrutura: CPU, memória, uso de disco, tráfego de rede.
- Consumidor Kafka: Total de mensagens processadas, Tempo de processamento de mensagens, Mensagens atualmente sendo processadas, Total de novas tentativas de processamento, Total de mensagens enviadas para DLQ

🚀 Application Performance Monitoring (APM)
Este lib utiliza APM para monitorar desempenho, rastrear transações distribuídas e identificar gargalos em tempo real.
🔍 Métricas Coletadas:
- Erros corridos em tempo de execução
- Latencia media do consumidor
- Throughput por consumo de mensagens
- CPU e Memoria utilizada

⚙️ Instalação
npm install kosumer -S
🤓 Exemplo de uso
Simple
import { KafkaConsumerBase } from 'kosumer'
class ExampleConsumer extends KafkaConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
}
const consumer = new ExampleConsumer({
groupId: "Example.Group",
topics: ['Queuing.Example'],
kafka: {
brokers: ['127.0.0.1:9092']
},
})
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});Simple Metadata shared
import { KafkaConsumerBase } from 'kosumer'
class ExampleConsumer extends KafkaConsumerBase<Example> {
protected async preProcess(ctx: IMessageContext<Example>): Promise<void> {
ctx.metadata.new = "New Value in ctx" // Data shared in context
}
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
}
const consumer = new ExampleConsumer({
groupId: "Example.Group",
topics: ['Queuing.Example'],
kafka: {
brokers: ['127.0.0.1:9092']
},
})
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});Saidas de Error´s
import { KafkaConsumerBase } from 'kosumer'
class ExampleConsumer extends KafkaConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
// In Class Domain
protected async onErrorProcess(ctx: IMessageContext<Example>, error: unknown): Promise<void> {
console.log(ctx, error)
}
}
const consumer = new ExampleConsumer({
groupId: "Example.Group",
topics: ['Queuing.Example'],
kafka: {
brokers: ['127.0.0.1:9092']
},
})
// Out Class Domain
consumer.onError((err) => {
console.log(err)
})
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});Suporte ao APM
import { KafkaConsumerBase } from 'kosumer'
class ExampleConsumer extends KafkaConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
}
const consumer = new ExampleConsumer({
groupId: "Example.Group",
topics: ['Queuing.Example'],
kafka: {
brokers: ['127.0.0.1:9092']
},
apm: {
serverUrl: 'http://localhost:8200'
}
})
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});Suporte ao Prometheus
import { KafkaConsumerBase } from 'kosumer'
import http, { Server } from 'http';
class ExampleConsumer extends KafkaConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
}
const consumer = new ExampleConsumer({
groupId: "Example.Group",
topics: ['Queuing.Example'],
kafka: {
brokers: ['127.0.0.1:9092']
}
})
const server = http.createServer(async (req, res) => {
if (req.url === '/metrics') {
const { metrics, contentType } = await consumer.getMetrics();
res.writeHead(200, { 'Content-Type': contentType });
res.end(metrics);
} else {
res.writeHead(404);
res.end();
}
});
await consumer.start()
server.listen(9091, () => {
console.log('🚀 Metrics server rodando em http://localhost:9091/metrics');
})
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});Suporte para Plugins
import { KafkaConsumerBase } from 'kosumer'
class PluginValidade extends PluginBase<unknown> {
private schema: AnySchema
constructor(schema: AnySchema) {
super('PluginValidade');
this.schema = schema
}
async preProcess(ctx: IMessageContext<unknown>): Promise<void> {
const validate = this.schema.validate(ctx)
if (validate.error) {
throw Error(JSON.stringify(validate.error.details))
}
}
async initialize(consumer: IConsumer): Promise<void> {
consumer.onError((err) => {
console.log("plugin", err)
})
}
}
class ExampleConsumer extends KafkaConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
}
const consumer = new ExampleConsumer({
groupId: "Example.Group",
topics: ['Queuing.Example'],
kafka: {
brokers: ['127.0.0.1:9092']
},
plugins: [
new PluginValidade(joi.object({
message: joi.number()
}))
]
})
consumer.withPlugin(new PluginValidade(joi.object({
topic: joi.string(),
partition: joi.string(),
message: joi.object({
name: joi.string(),
identifier: joi.string(),
})
})))
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});Error Types
| Parâmetro | Descrição | Example | ------------------- | -------------------------------------------------------- | ----------------- | StopProcessingError | Consumo da menssagem e parado é marcado como erro | new StopProcessingError('msg', {...params}) | SkipMessageError | Consumo da menssagem e finalizado é marcado como sucesso | new SkipMessageError('msg', {...params}) | RetryableError | Consumo da menssagem e marcado para reprocessamento | new RetryableError('msg', {...params})
Configs
Name | Description | Example ----------------------------|---------------------------------------------------------------|------------------ groupId | Identificador do grupo de consumidores ao qual este consumidor pertence. | meu-grupo topics | Nomes das Filas a serem consumidas | ['Queuing.Example'] dlqtopics | Nomes das Filas de DLQ (Dead Letter Queue) | ['Queuing.Example.DLQ'] serviceName | Nome do consumidor | Padrão primeiro nome das filas configuradas fromBeginning | Utilizado Quando você quer reprocessar todas as mensagens de um tópico. | Padrão: false plugins | Lista de plugins utilizados | [new PluginExample()] apm | Configurações do APM (Application Performance Monitoring) | { serverUrl: 'http://localhost:8200' } kafka | Configurações de conexão com o Kafka | { brokers: ['127.0.0.1:9092'] } retryPolicy | Configurações de retry | { backoff: 1000, maxRetries: 3 } allowAutoTopicCreation | Indica se o consumidor pode solicitar a criação automática de tópicos inexistentes. | Padrão: true. partitionAssigners | Lista de estratégias personalizadas para atribuição de partições aos consumidores. | 10 metadataMaxAge | Intervalo em milissegundos para atualização das informações de metadata (como lista de tópicos e partições). | Padrão: 300000 (5 minutos). sessionTimeout | Tempo em milissegundos que o consumidor pode ficar inativo antes de ser considerado morto pelo broker. | Padrão: 30000 (30 segundos). rebalanceTimeout | Tempo máximo permitido para o consumidor concluir o processo de rebalanceamento. | Padrão: 60000 (60 segundos). heartbeatInterval | Intervalo em milissegundos entre os heartbeats enviados ao broker para indicar que o consumidor está ativo. | Padrão: 3000 (3 segundos). maxBytesPerPartition | Quantidade máxima de dados (em bytes) que o consumidor pode ler de cada partição em uma única solicitação. | Padrão: 1048576 (1 MB). minBytes | Quantidade mínima de dados (em bytes) que o broker deve retornar para uma solicitação de leitura. | Padrão: 1 maxBytes | Quantidade máxima de dados (em bytes) que o broker pode retornar para uma solicitação de leitura. | Padrão: 10485760 (10 MB). maxWaitTimeInMs | Tempo máximo em milissegundos que o broker pode esperar antes de retornar dados para uma solicitação de leitura. | Padrão: 5000 (5 segundos). readUncommitted | Indica se o consumidor deve ler mensagens não confirmadas (uncommitted) em tópicos com transações. | Padrão: false. rackId | Identificador do rack onde o consumidor está localizado, utilizado para preferências de leitura em ambientes com replicação. | 1
📫 Contribuições
Sinta-se à vontade para abrir issues ou enviar pull requests. Sugestões são sempre bem-vindas!
🤓 Contato
Desenvolvido por: Ismael Alves 🤓🤓🤓
- Email: [email protected]
- Github: github.com/ismaelalvesgit
- Linkedin: linkedin.com/in/ismael-alves-6945531a0/
