
kafkakit

v0.0.1


Introduction

A Node.js/TypeScript toolkit for Kafka with exactly-once semantics (EOS), transactional and idempotent producers, dynamic consumer groups, retry and dead-letter pipelines, producer pool management, multi-cluster support, and graceful shutdown. Fully typed and event-driven, with all internal complexity hidden. Designed to support Saga-based workflows and orchestration patterns for building reliable distributed systems.

Key Features

  • Dual-Mode EOS Message Handling

    • Supports transactional producers for EOS.
    • Can also work in exactly-once mode without full Kafka transactions, using idempotent producers.
    • Manages offsets automatically for transactional and non-transactional consumption.
  • The Transactional Guarantee (EOS)

    • Every partition is assigned to a stable producer identity.
    • Maintains a pool of Kafka producers for transactional sends.
    • Ensures transactional producers are not shared mid-transaction.
    • Constraint: Producer pool size and transactional ID prefix must remain stable across pod restarts to maintain EOS guarantees.
  • The Recovery Pipeline (Retry & DLQ)

    • Failed messages are automatically moved to "Retry Topics" with the required metadata attached to their headers, whether the message is transactional or non-transactional.
    • Once retries are exhausted, or if no retries are specified, messages are safely moved to a DLQ for manual inspection, ensuring your main processing loop is never blocked by a "poison pill" message.
  • Consumer Groups

    • Graceful shutdown support for in-flight message processing.
    • Supports different configs per topic.
    • Built-in retry and dead-letter pipelines.
  • Multi-cluster Support

    • Run multiple independent Kafka clusters (e.g., Finance and Analytics) within a single server instance. All internal state, producer pools, and configurations are strictly isolated.
  • Event-Driven Routing

    • Subscribe to typed events by message event.
    • Each topic can have multiple event handlers.
    • Handles JSON parsing, metadata injection, and transactional context automatically.
  • Internal Metadata Support

    • Each consumer can have custom metadata (consumer ID, custom context such as pod region) injected automatically.
    • Useful for tracking and dynamic routing of messages.
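
To make the stable-identity constraint above concrete, here is a minimal sketch of the underlying idea (my own illustration, not kafkakit internals; `poolSlotFor` and `transactionalIdFor` are hypothetical names): a deterministic partition-to-slot mapping only yields the same transactional ID across restarts if both the prefix and the pool size are unchanged.

```typescript
// Sketch only: deterministic partition -> producer-slot mapping.
// Changing maxPoolSize or the prefix after a restart would change the
// transactional ID that owns a partition, breaking EOS fencing guarantees.

function poolSlotFor(partition: number, maxPoolSize: number): number {
  // Same partition always lands on the same slot while maxPoolSize is stable.
  return partition % maxPoolSize;
}

function transactionalIdFor(
  prefix: string,
  partition: number,
  maxPoolSize: number
): string {
  return `${prefix}-${poolSlotFor(partition, maxPoolSize)}`;
}

// With a stable prefix and pool size, partition 7 always maps to the same ID:
const id = transactionalIdFor("pod-a", 7, 5); // "pod-a-2"
```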

Consumer Retries

(Diagram: consumer retry and dead-letter flow.)
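
The retry/DLQ routing described above can be sketched as a pure decision function. This is a simplified illustration under my own assumptions, not kafkakit's actual wire format: the header name `x-retry-count` and the `routeFailure` helper are hypothetical.

```typescript
// Sketch only: deciding where a failed message goes next, based on a
// retry-count header. Header name and helper are illustrative.

interface RetryPolicy {
  count: number;        // max retries before the DLQ
  retrialTopic: string; // topic where failed messages are retried
}

function routeFailure(
  headers: Record<string, string>,
  policy: RetryPolicy | undefined,
  dlqTopic: string
): { topic: string; headers: Record<string, string> } {
  const attempts = Number(headers["x-retry-count"] ?? "0");
  if (!policy || attempts >= policy.count) {
    // Retries exhausted (or none configured): park the message in the DLQ.
    return { topic: dlqTopic, headers };
  }
  // Re-publish to the retry topic with an incremented attempt counter.
  return {
    topic: policy.retrialTopic,
    headers: { ...headers, "x-retry-count": String(attempts + 1) },
  };
}
```

The key property is that the main topic's consumer never re-processes a poison pill: each failure moves the message forward, either to the retry topic or, once attempts reach the limit, to the DLQ.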

Installation

Install from NPM

npm i kafkakit

Usage

Setting a logger (optional)

import { kafkaConfig } from "kafkakit";

kafkaConfig.setLogger(logger); // Your customized logger

Initialize the Kafka client you will pass in

import { Kafka } from "kafkajs";

const kafkaClient = new Kafka({
  clientId: "example",
  brokers: [], // your broker list
});

Producer

  • Functions Available

    • The connect() function connects your non-transactional producer. Transactional producers are created automatically by the consumers.

    • The disconnect() function gracefully shuts down your producers, waiting for each to finish its pending work so that no in-flight messages are dropped and all internal buffers are flushed before the process exits.

    • The reset() mechanism is specifically designed for Kafka rebalances. It "drains" the current promise queues for transactional producers, allowing pending sends to complete before clearing the pool. This prevents "Zombie Producers" from hanging around after partition ownership has shifted.

      Note: This is an internal safeguard and is not intended for direct use.

    • The send() method publishes messages using a shared idempotent producer and is intended for non-transactional flows. It ensures durability with acks: -1 but does not tie message production to consumer offset commits.

    • The runInTransaction() method enables exactly-once semantics by wrapping message production and offset commits inside a single Kafka transaction. If the task succeeds, both the produced messages and the offset commit are finalized atomically; if it fails, the transaction is aborted and no offsets are advanced.

      Note: This is typically used within consumers and is already configured for you in the consumer examples below, as the package handles the setup automatically.

  • Example

    import { Producer, ProducerConfig } from "kafkakit";
    
    
    /*
     * transactionalIdPrefix & maxPoolSize must remain stable across pod
     * restarts to avoid transactional fencing and PID invalidation.
     */

    const config: ProducerConfig = {
      transactionalIdPrefix: env.POD_NAME,
      // Optional fields:
      // createPartitioner?: ICustomPartitioner
      // retry?: RetryOptions
      // metadataMaxAge?: number
      // allowAutoTopicCreation?: boolean
      // transactionTimeout?: number
      // maxInFlightRequests?: number
    };
    
    const maxPoolSize = 5;

    const producer = new Producer(kafkaClient, config, maxPoolSize);

    await producer.connect();
    await producer.disconnect();
    await producer.reset(); // Internal safeguard, not meant for manual usage
    
    await producer.send("topic",[{
      key: "test",
      value: JSON.stringify({
        event: "OrderCreated",
        data: {
          name: "example"
        }
      })
    }])
    
    
    // Either both topics get the messages or none do.
    // This is meant for consume -> process -> produce -> commit offset atomically (can't be used manually).
    await producer.runInTransaction(async (send)=> {
      // Any processing
      const result = await db.findById(exampleId)
      // If you want to send a message to another topic
      await send("topic-1", [
        {
          event: "OrderCreated",
          data: {
            result,
          },
        },
      ]);

      await send("topic-2", [
        {
          event: "OrderCreated",
          data: {
            result,
          },
        },
      ]);
    }, {
      groupId: "example-group",
      topic,     // topic/partition/message come from the consumed message's context
      partition,
      offset: (BigInt(message.offset) + 1n).toString(),
    });
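
The committed offset above is the next offset to consume (current offset + 1), computed via BigInt because Kafka offsets can exceed Number.MAX_SAFE_INTEGER. A small sketch of just that computation (the `nextOffset` helper is my own illustration, not a kafkakit export):

```typescript
// Kafka commits "the next offset to read", so we add 1 to the offset of the
// message we just processed. BigInt avoids precision loss for large offsets,
// which is why offsets travel as strings.
function nextOffset(currentOffset: string): string {
  return (BigInt(currentOffset) + 1n).toString();
}

nextOffset("41"); // "42"
nextOffset("9007199254740993"); // stays exact where Number arithmetic would round
```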

Consumer Group

  • Functions Available

    • The connect() function connects and starts the consumers belonging to their group.
    • The disconnect() function gracefully shuts down the consumers in the group, waiting for all in-flight messages to finish processing first.
    • The subscribe() function registers consumers in a group to a specific topic and attaches the corresponding event handlers.

Topic Events

  • Properties & Functions Available

    • The topic property returns the topic name.
    • The on<EventDataType>() function registers an event handler for a specific event within a topic.
    • The getSubscription() function returns the topic along with its registered event handlers, ready to be subscribed by a consumer group.

Consumer Group + Topic Events Example

  • Everyone has their own way of writing and initializing things, but here is an illustrative example.

import { ConsumerGroup, TopicEvents, ConsumerGroupConfigs, ConsumerConfigs } from "kafkakit";

interface OrderCreated {
  products: string[]
}

const setupOrdersTopic = async () => {
  /*
   * const db = db client (for example);
   */
  const topicEvents = new TopicEvents("orders");

  topicEvents.on<OrderCreated>("OrderCreated",async({
    key, // undefined if no key was set on the message
    data, // data object without the event
    ctx: {
      producedAt, // timestamp when message was produced
      receivedAt, // timestamp when consumer started processing
      headers, // message header -> might be undefined if no header was set
      metadata: { // additional context
        consumerId, groupId, isLeader, // base metadata of the consumer -> might be undefined before the consumer joins the group
        // context provided by you
        region
      },
      send // Sends messages either transactionally or non-transactionally, depending on the topic configuration
    }
  }) => {
    // Example: Only the leader consumer performs certain actions
    if (isLeader)
      console.log("I am the leader, performing leader-only logic...");

    // Example: Store the region that processed the message
    db.store(region);

    await send("Payment", [{
      key: "example",
      headers: {
        source: "example-service",
      },
      value: JSON.stringify({
        event: "OrderCreated",
        data: {
          orderId: key,
        },
      }),
    }]);
  });
  return topicEvents
};

const topicsHandlerSetupMap: Record<
  string,
  () => Promise<TopicEvents<any>>
> = {
  orders: setupOrdersTopic
}

// Consumer Group

// TMeta describes the custom metadata injected by meta() below
type TMeta = { region: string };

const consumerConfig: ConsumerConfigs<TMeta> = {
  dlqTopic: "dead-letter-queue",
  // Optional fields:
  // sessionTimeout?: number
  // heartbeatInterval?: number
  // rebalanceTimeout?: number
  // partitionsConsumedConcurrently?: number
  /*
   * Additional context that will be passed to consumers.
   * This runs on each message, so avoid database or external API calls here
   * to prevent delays during message processing.
   */
  meta: () => ({
    region: env.POD_REGION,
  }),
};

const consumerGroupConfig: ConsumerGroupConfigs<TMeta> = {
  groupId: "example",
  totalConsumers: 3,
  producer,       // the Producer instance created earlier
  kafkaClient,    // the Kafka client
  consumerConfig,
  topics: [
    {
      topic: "orders",
      useTransaction: true,
      retries: {
        count: 3,
        retrialTopic: "orders-retry"
      }
    },
    {
      topic: "payment",
      useTransaction: false,
      // retries is optional
    }
  ]
}

const consumerGroup = new ConsumerGroup<TMeta>(consumerGroupConfig)
await Promise.all(consumerGroupConfig.topics.map(async ({topic})=> {
  const setupFn = topicsHandlerSetupMap[topic]
  if(!setupFn) return

  const topicEvents = await setupFn();
  const subscription = topicEvents.getSubscription();

  consumerGroup.subscribe(topic, subscription.handler);
}))

await consumerGroup.connect()

await consumerGroup.disconnect()
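
A common way to wire graceful shutdown is to stop consumers first (so no new messages arrive and in-flight work drains), then disconnect the producer (so buffered sends flush). The ordering sketch below is my own pattern, not a kafkakit requirement; the `Disconnectable` interface is a minimal stand-in for the real ConsumerGroup/Producer instances.

```typescript
// Sketch: shutdown ordering on SIGTERM. Consumers stop first, then producers
// flush and disconnect.

interface Disconnectable {
  disconnect(): Promise<void>;
}

async function shutdown(
  consumerGroup: Disconnectable,
  producer: Disconnectable
): Promise<void> {
  await consumerGroup.disconnect(); // waits for in-flight messages to finish
  await producer.disconnect();      // flushes remaining buffered sends
}

// Typical wiring:
// process.once("SIGTERM", () =>
//   shutdown(consumerGroup, producer).then(() => process.exit(0))
// );
```

Stopping consumers before producers matters because a consumer handler may still call send() while draining; disconnecting the producer first could drop those final messages.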