npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

kafka-clamed

v1.0.3

Published

Kafka provider with retry and DLQ support for Clamed applications

Downloads

40

Readme

Kafka Clamed

Provedor Kafka com suporte a retry e DLQ (Dead Letter Queue) para aplicações Clamed.

Funcionalidades

  • Consumer Kafka com tratamento de falhas automático
  • Retry progressivo com múltiplos tópicos de retry
  • DLQ (Dead Letter Queue) para mensagens que excedem o limite de tentativas
  • Singleton do cliente Kafka
  • Zero logs internos — o pacote apenas lança erros, o app consumidor decide como logar
  • Tipagem completa em TypeScript

Instalação

npm install kafka-clamed

Configuração

Variáveis de ambiente

| Variável | Obrigatório | Padrão | Descrição | |---|---|---|---| | KAFKA_CLIENT_ID | Não | kafka-clamed | ID do cliente Kafka | | APINAME | Não | — | Fallback para KAFKA_CLIENT_ID | | KAFKA_BROKER | Não | localhost:9092 | Endereço do broker Kafka | | KAFKA_BROKERS | Não | — | Lista de brokers separados por vírgula (sobrescreve KAFKA_BROKER) |

Arquivo de configuração de tópicos

Crie config/kafka-topics.yml:

partitions: 1
replication: 1
topics:
  - name: meu-topico-principal
retry_topics:
  - name: meu-topico-retry-30s
    retention_ms: 30000
  - name: meu-topico-retry-60s
    retention_ms: 60000
  - name: meu-topico-dlq
    retention_ms: 0

O tópico DLQ é identificado automaticamente pelo sufixo -dlq.

Uso

Uso básico

import { KafkaConsumer, TConsumerTopicData } from "kafka-clamed";

const consumer = new KafkaConsumer();

const data: TConsumerTopicData = {
    topic: "meu-topico-principal",
    partition: 0,
};

consumer
    .handleConsumer(data, async (message) => {
        // Sua lógica de negócio aqui
        const payload = message.value?.toString();
        console.log("Processando:", payload);
    })
    .catch((err) => {
        // Erro de setup (conexão, configuração, etc.)
        console.error("Falha ao iniciar consumer:", err);
    });

Com tratamento de erro personalizado

const consumer = new KafkaConsumer();

consumer.handleConsumer(
    { topic: "pedidos", partition: 0 },
    async (message) => {
        const pedido = JSON.parse(message.value!.toString());

        if (!pedido.id) {
            throw new Error("Pedido sem ID");
        }

        await salvarPedido(pedido);
    }
).catch((err) => {
    console.error("Erro fatal no consumer:", err);
    process.exit(1);
});

Como funciona o retry

Fluxo de processamento

Mensagem recebida
    ↓
messageHandleFunction() é executada
    ↓
    ├── Sucesso → offset é commitado → fim
    │
    └── Erro → nextRetryTopic() decide o destino
                ↓
        ├── attemptRetry < MAX_RETRIES (3)
        │   → envia para tópico de retry
        │   → offset é commitado
        │   → mensagem será reprocessada pelo consumer de retry
        │
        └── attemptRetry >= MAX_RETRIES (3)
            → envia para DLQ
            → offset é commitado
            → mensagem não será mais processada

Ciclo de retry completo

| Tentativa | Tópico destino | Delay | Descrição | |---|---|---|---| | 0 | retry-30s | 30s | Primeira tentativa de retry | | 1 | retry-60s | 60s | Segunda tentativa | | 2 | Último retry configurado | — | Usa Math.min se não houver mais tópicos | | 3+ | DLQ | — | Limite excedido, mensagem vai para Dead Letter Queue |

Headers enviados para o tópico de retry

{
    "x-retry-delay": "30000",
    "attemptRetry": "1",
    "erro": "Mensagem de erro original"
}

API

KafkaConsumer

new KafkaConsumer()

Cria uma nova instância do consumer.

handleConsumer(data, messageHandleFunction, fromBeginning?)

Inicia o consumer Kafka.

| Parâmetro | Tipo | Padrão | Descrição | |---|---|---|---| | data | TConsumerTopicData | — | Tópico e partição | | messageHandleFunction | (message: KafkaMessage) => Promise<void> | — | Função de processamento | | fromBeginning | boolean | false | Se deve ler mensagens desde o início |

Erros propagados:

  • "Consumer not initialized" — consumer não foi configurado
  • "Producer for retry messages not initialized" — producer de retry não foi configurado
  • Erros de conexão com Kafka
  • Erros de configuração de tópicos (arquivo YAML)

kafkaMessageDecode(message)

Decodifica uma mensagem Kafka.

| Parâmetro | Tipo | Retorno | |---|---|---| | message | KafkaMessage | object \| string \| null |

  • Buffer com JSON válido → objeto
  • Buffer com texto inválido → string raw
  • String com JSON válido → objeto
  • String com texto inválido → string raw
  • null ou undefinednull

nextRetryTopic(message, retryTopics, dlqTopic)

Decide o próximo tópico de retry baseado no header attemptRetry.

| Parâmetro | Tipo | Descrição | |---|---|---| | message | KafkaMessage | Mensagem com headers | | retryTopics | TTopicConfig[] | Lista de tópicos de retry | | dlqTopic | TTopicConfig | Tópico DLQ |

topicsConfig(configPath?)

Carrega a configuração de tópicos do arquivo YAML.

| Parâmetro | Tipo | Padrão | Descrição | |---|---|---|---| | configPath | string | ./config/kafka-topics.yml | Caminho do arquivo |

Erros propagados:

  • "File topic config not found" — arquivo vazio ou não encontrado
  • "Invalid topic config format" — YAML inválido
  • "No retry topics configured" — nenhum tópico com retentionMs > 0
  • "No DLQ topic configured" — nenhum tópico com sufixo -dlq

safeDisconnect()

Desconecta consumer e producer com timeout de 5 segundos.

sendRetryMessage(message, topic, retryDelay, attemptRetry, errorMessage)

Envia mensagem para o tópico de retry.

configureConsumer(kafka, retryTopics, data, fromBeginning?)

Configura e conecta o consumer.

configureProducer(kafka)

Configura e conecta o producer.

consumerRun(retryTopics, dlqTopic, messageHandleFunction)

Inicia o loop de consumo de mensagens.

Tipos

type TConsumerTopicData = {
    topic: string;
    partition: number;
};

type TTopicConfig = {
    topic: string;
    numPartitions: number;
    replicationFactor: number;
    retentionMs: number;
    cleanupPolicy: string;
};

type TKafkaTopicYamlConfig = {
    partitions: number;
    replication: number;
    topics: { name: string }[];
    retry_topics: { name: string; retention_ms: number }[];
};

KafkaClient

KafkaClient.createInstance()

Retorna a instância singleton do Kafka.

KafkaClient.resetInstance()

Reseta a instância singleton (útil para testes).

kafkaConfig

kafkaConfig()

Retorna a configuração Kafka baseada em variáveis de ambiente.

const config = kafkaConfig();
// { clientId: string, brokers: string[] }

Desenvolvimento

Scripts

npm test              # Executa todos os testes
npm run test:watch    # Modo watch
npm run test:coverage # Cobertura de testes
npm run build         # Compila TypeScript
npm run dev           # Modo desenvolvimento com hot-reload

Testes

npm test

# Saída esperada:
# Test Suites: 3 passed, 3 total
# Tests:       31 passed, 31 total

Licença

ISC — Clamed