kafka-clamed
v1.0.3
Published
Kafka provider with retry and DLQ support for Clamed applications
Downloads
40
Readme
Kafka Clamed
Provedor Kafka com suporte a retry e DLQ (Dead Letter Queue) para aplicações Clamed.
Funcionalidades
- Consumer Kafka com tratamento de falhas automático
- Retry progressivo com múltiplos tópicos de retry
- DLQ (Dead Letter Queue) para mensagens que excedem o limite de tentativas
- Singleton do cliente Kafka
- Zero logs internos — o pacote apenas lança erros, o app consumidor decide como logar
- Tipagem completa em TypeScript
Instalação
npm install kafka-clamedConfiguração
Variáveis de ambiente
| Variável | Obrigatório | Padrão | Descrição |
|---|---|---|---|
| KAFKA_CLIENT_ID | Não | kafka-clamed | ID do cliente Kafka |
| APINAME | Não | — | Fallback para KAFKA_CLIENT_ID |
| KAFKA_BROKER | Não | localhost:9092 | Endereço do broker Kafka |
| KAFKA_BROKERS | Não | — | Lista de brokers separados por vírgula (sobrescreve KAFKA_BROKER) |
Arquivo de configuração de tópicos
Crie config/kafka-topics.yml:
partitions: 1
replication: 1
topics:
- name: meu-topico-principal
retry_topics:
- name: meu-topico-retry-30s
retention_ms: 30000
- name: meu-topico-retry-60s
retention_ms: 60000
- name: meu-topico-dlq
retention_ms: 0O tópico DLQ é identificado automaticamente pelo sufixo
-dlq.
Uso
Uso básico
import { KafkaConsumer, TConsumerTopicData } from "kafka-clamed";
const consumer = new KafkaConsumer();
const data: TConsumerTopicData = {
topic: "meu-topico-principal",
partition: 0,
};
consumer
.handleConsumer(data, async (message) => {
// Sua lógica de negócio aqui
const payload = message.value?.toString();
console.log("Processando:", payload);
})
.catch((err) => {
// Erro de setup (conexão, configuração, etc.)
console.error("Falha ao iniciar consumer:", err);
});Com tratamento de erro personalizado
const consumer = new KafkaConsumer();
consumer.handleConsumer(
{ topic: "pedidos", partition: 0 },
async (message) => {
const pedido = JSON.parse(message.value!.toString());
if (!pedido.id) {
throw new Error("Pedido sem ID");
}
await salvarPedido(pedido);
}
).catch((err) => {
console.error("Erro fatal no consumer:", err);
process.exit(1);
});Como funciona o retry
Fluxo de processamento
Mensagem recebida
↓
messageHandleFunction() é executada
↓
├── Sucesso → offset é commitado → fim
│
└── Erro → nextRetryTopic() decide o destino
↓
├── attemptRetry < MAX_RETRIES (3)
│ → envia para tópico de retry
│ → offset é commitado
│ → mensagem será reprocessada pelo consumer de retry
│
└── attemptRetry >= MAX_RETRIES (3)
→ envia para DLQ
→ offset é commitado
→ mensagem não será mais processadaCiclo de retry completo
| Tentativa | Tópico destino | Delay | Descrição |
|---|---|---|---|
| 0 | retry-30s | 30s | Primeira tentativa de retry |
| 1 | retry-60s | 60s | Segunda tentativa |
| 2 | Último retry configurado | — | Usa Math.min se não houver mais tópicos |
| 3+ | DLQ | — | Limite excedido, mensagem vai para Dead Letter Queue |
Headers enviados para o tópico de retry
{
"x-retry-delay": "30000",
"attemptRetry": "1",
"erro": "Mensagem de erro original"
}API
KafkaConsumer
new KafkaConsumer()
Cria uma nova instância do consumer.
handleConsumer(data, messageHandleFunction, fromBeginning?)
Inicia o consumer Kafka.
| Parâmetro | Tipo | Padrão | Descrição |
|---|---|---|---|
| data | TConsumerTopicData | — | Tópico e partição |
| messageHandleFunction | (message: KafkaMessage) => Promise<void> | — | Função de processamento |
| fromBeginning | boolean | false | Se deve ler mensagens desde o início |
Erros propagados:
"Consumer not initialized"— consumer não foi configurado"Producer for retry messages not initialized"— producer de retry não foi configurado- Erros de conexão com Kafka
- Erros de configuração de tópicos (arquivo YAML)
kafkaMessageDecode(message)
Decodifica uma mensagem Kafka.
| Parâmetro | Tipo | Retorno |
|---|---|---|
| message | KafkaMessage | object \| string \| null |
- Buffer com JSON válido → objeto
- Buffer com texto inválido → string raw
- String com JSON válido → objeto
- String com texto inválido → string raw
nullouundefined→null
nextRetryTopic(message, retryTopics, dlqTopic)
Decide o próximo tópico de retry baseado no header attemptRetry.
| Parâmetro | Tipo | Descrição |
|---|---|---|
| message | KafkaMessage | Mensagem com headers |
| retryTopics | TTopicConfig[] | Lista de tópicos de retry |
| dlqTopic | TTopicConfig | Tópico DLQ |
topicsConfig(configPath?)
Carrega a configuração de tópicos do arquivo YAML.
| Parâmetro | Tipo | Padrão | Descrição |
|---|---|---|---|
| configPath | string | ./config/kafka-topics.yml | Caminho do arquivo |
Erros propagados:
"File topic config not found"— arquivo vazio ou não encontrado"Invalid topic config format"— YAML inválido"No retry topics configured"— nenhum tópico comretentionMs > 0"No DLQ topic configured"— nenhum tópico com sufixo-dlq
safeDisconnect()
Desconecta consumer e producer com timeout de 5 segundos.
sendRetryMessage(message, topic, retryDelay, attemptRetry, errorMessage)
Envia mensagem para o tópico de retry.
configureConsumer(kafka, retryTopics, data, fromBeginning?)
Configura e conecta o consumer.
configureProducer(kafka)
Configura e conecta o producer.
consumerRun(retryTopics, dlqTopic, messageHandleFunction)
Inicia o loop de consumo de mensagens.
Tipos
type TConsumerTopicData = {
topic: string;
partition: number;
};
type TTopicConfig = {
topic: string;
numPartitions: number;
replicationFactor: number;
retentionMs: number;
cleanupPolicy: string;
};
type TKafkaTopicYamlConfig = {
partitions: number;
replication: number;
topics: { name: string }[];
retry_topics: { name: string; retention_ms: number }[];
};KafkaClient
KafkaClient.createInstance()
Retorna a instância singleton do Kafka.
KafkaClient.resetInstance()
Reseta a instância singleton (útil para testes).
kafkaConfig
kafkaConfig()
Retorna a configuração Kafka baseada em variáveis de ambiente.
const config = kafkaConfig();
// { clientId: string, brokers: string[] }Desenvolvimento
Scripts
npm test # Executa todos os testes
npm run test:watch # Modo watch
npm run test:coverage # Cobertura de testes
npm run build # Compila TypeScript
npm run dev # Modo desenvolvimento com hot-reloadTestes
npm test
# Saída esperada:
# Test Suites: 3 passed, 3 total
# Tests: 31 passed, 31 totalLicença
ISC — Clamed
