
@drarzter/kafka-client

v0.8.0

Type-safe Kafka client wrapper for NestJS with typed topic-message maps

Downloads

1,242

Readme

@drarzter/kafka-client

License: MIT

Type-safe Kafka client for Node.js. Framework-agnostic core with a first-class NestJS adapter. Built on top of @confluentinc/kafka-javascript (librdkafka).

What is this?

An opinionated, type-safe abstraction over @confluentinc/kafka-javascript (librdkafka). Works standalone (Express, Fastify, raw Node) or as a NestJS DynamicModule. Not a full-featured framework — just a clean, typed layer for producing and consuming Kafka messages.

This library exists so you don't have to think about:

  • rebalance edge cases
  • retry loops and backoff scheduling
  • dead letter queue wiring
  • transaction coordinator warmup
  • graceful shutdown and offset commit pitfalls
  • silent message loss

Safe by default. Configurable when you need it. Escape hatches for when you know what you're doing.

Why?

  • Typed topics — you define a map of topic -> message shape, and the compiler won't let you send the wrong data to the wrong topic
  • Topic descriptors — topic() DX sugar lets you define topics as standalone typed objects instead of string keys
  • Framework-agnostic — use standalone or with NestJS (register() / registerAsync(), DI, lifecycle hooks)
  • Idempotent producer — acks: -1, idempotent: true by default
  • Lamport Clock deduplication — every outgoing message is stamped with a monotonically increasing x-lamport-clock header; the consumer tracks the last processed value per topic:partition and silently drops (or routes to DLQ / a dedicated topic) any message whose clock is not strictly greater than the last seen value
  • Retry + DLQ — exponential backoff with full jitter; dead letter queue with error metadata headers (original topic, error message, stack, attempt count)
  • Batch sending — send multiple messages in a single request
  • Batch consuming — startBatchConsumer() for high-throughput eachBatch processing
  • Partition key support — route related messages to the same partition
  • Custom headers — attach metadata headers to messages
  • Transactions — exactly-once semantics with producer.transaction()
  • EventEnvelope — every consumed message is wrapped in EventEnvelope<T> with eventId, correlationId, timestamp, schemaVersion, traceparent, and Kafka metadata
  • Correlation ID propagation — auto-generated on send, auto-propagated through AsyncLocalStorage so nested sends inherit the same correlation ID
  • OpenTelemetry support — @drarzter/kafka-client/otel entrypoint with otelInstrumentation() for W3C Trace Context propagation
  • Consumer interceptors — before/after/onError hooks with EventEnvelope access
  • Client-wide instrumentation — KafkaInstrumentation hooks for cross-cutting concerns (tracing, metrics)
  • Auto-create topics — autoCreateTopics: true for dev mode — no need to pre-create topics
  • Error classes — KafkaProcessingError and KafkaRetryExhaustedError with topic, message, and attempt metadata
  • Health check — built-in health indicator for monitoring
  • Multiple consumer groups — named clients for different bounded contexts
  • Declarative & imperative — use @SubscribeTo() decorator or startConsumer() directly
  • Async iterator — consume<K>() returns an AsyncIterableIterator<EventEnvelope<T[K]>> for for await consumption; breaking out of the loop stops the consumer automatically
  • Message TTL — messageTtlMs drops or DLQs messages older than a configurable threshold, preventing stale events from poisoning downstream systems after a lag spike
  • Circuit breaker — circuitBreaker option applies a sliding-window breaker per topic-partition; pauses delivery on repeated DLQ failures and resumes after a configurable recovery window
  • Seek to offset — seekToOffset(groupId, assignments) seeks individual partitions to explicit offsets for fine-grained replay
  • Tombstone messages — sendTombstone(topic, key) sends a null-value record to compact a key out of a log-compacted topic; all instrumentation hooks still fire
  • Regex topic subscription — startConsumer([/^orders\..+/], handler) subscribes using a pattern; the broker routes matching topics to the consumer dynamically
  • Compression — per-send compression option (gzip, snappy, lz4, zstd) in SendOptions and BatchSendOptions
  • Partition assignment strategy — partitionAssigner in ConsumerOptions chooses between cooperative-sticky (default), roundrobin, and range
  • Admin API — listConsumerGroups(), describeTopics(), deleteRecords() for group inspection, partition metadata, and message deletion

See the Roadmap for upcoming features and version history.
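The per-partition Lamport-clock check described in the feature list can be sketched in a few lines. This is an illustrative model, not the library's internals; LamportDeduplicator and check are hypothetical names:

```typescript
type DedupDecision = 'process' | 'duplicate';

class LamportDeduplicator {
  // last processed clock value, keyed by "topic:partition"
  private lastSeen = new Map<string, number>();

  check(topic: string, partition: number, clock: number): DedupDecision {
    const key = `${topic}:${partition}`;
    const last = this.lastSeen.get(key);
    // only a strictly greater clock is accepted; equal or lower is a duplicate
    if (last !== undefined && clock <= last) return 'duplicate';
    this.lastSeen.set(key, clock);
    return 'process';
  }
}
```

A message whose clock equals the last seen value counts as a duplicate; only strictly greater clocks advance the stored state, and each topic:partition pair is tracked independently.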

Installation

npm install @drarzter/kafka-client

@confluentinc/kafka-javascript uses a native librdkafka addon. On most systems it builds automatically. For faster installs (skips compilation), install the system library first:

# Arch / CachyOS
sudo pacman -S librdkafka

# Debian / Ubuntu
sudo apt-get install librdkafka-dev

# macOS
brew install librdkafka

Then install with BUILD_LIBRDKAFKA=0 npm install.

For NestJS projects, install peer dependencies: @nestjs/common, @nestjs/core, reflect-metadata, rxjs.

For standalone usage (Express, Fastify, raw Node), no extra dependencies needed — import from @drarzter/kafka-client/core.

Standalone usage (no NestJS)

import { KafkaClient, topic } from '@drarzter/kafka-client/core';

const OrderCreated = topic('order.created').type<{ orderId: string; amount: number }>();

const kafka = new KafkaClient('my-app', 'my-group', ['localhost:9092']);
await kafka.connectProducer();

// Send
await kafka.sendMessage(OrderCreated, { orderId: '123', amount: 100 });

// Consume — handler receives an EventEnvelope
await kafka.startConsumer([OrderCreated], async (envelope) => {
  console.log(`${envelope.topic}:`, envelope.payload.orderId);
});

// Custom logger (winston, pino, etc.)
const kafka2 = new KafkaClient('my-app', 'my-group', ['localhost:9092'], {
  logger: myWinstonLogger,
});

// All module options work in standalone mode too
const kafka3 = new KafkaClient('my-app', 'my-group', ['localhost:9092'], {
  autoCreateTopics: true,   // auto-create topics on first use
  numPartitions: 3,         // partitions for auto-created topics
  strictSchemas: false,     // disable schema enforcement for string topic keys
  instrumentation: [...],   // client-wide tracing/metrics hooks
});

// Health check — available directly, no NestJS needed
const status = await kafka.checkStatus();
// { status: 'up', clientId: 'my-app', topics: ['order.created', ...] }

// Stop all consumers without disconnecting the producer or admin
// Useful when you want to re-subscribe with different options
await kafka.stopConsumer();

Quick start (NestJS)

Send and receive a message in 3 files:

// types.ts
export interface MyTopics {
  'hello': { text: string };
}

// app.module.ts
import { Module } from '@nestjs/common';
import { KafkaModule } from '@drarzter/kafka-client';
import { MyTopics } from './types';
import { AppService } from './app.service';

@Module({
  imports: [
    KafkaModule.register<MyTopics>({
      clientId: 'my-app',
      groupId: 'my-group',
      brokers: ['localhost:9092'],
    }),
  ],
  providers: [AppService],
})
export class AppModule {}

// app.service.ts
import { Injectable } from '@nestjs/common';
import { InjectKafkaClient, KafkaClient, SubscribeTo, EventEnvelope } from '@drarzter/kafka-client';
import { MyTopics } from './types';

@Injectable()
export class AppService {
  constructor(
    @InjectKafkaClient() private readonly kafka: KafkaClient<MyTopics>,
  ) {}

  async send() {
    await this.kafka.sendMessage('hello', { text: 'Hello, Kafka!' });
  }

  @SubscribeTo('hello')
  async onHello(envelope: EventEnvelope<MyTopics['hello']>) {
    console.log('Received:', envelope.payload.text);
  }
}

Usage

1. Define your topic map

Both interface and type work — pick whichever you prefer:

// Explicit: extends TTopicMessageMap — IDE hints that values must be Record<string, any>
import { TTopicMessageMap } from '@drarzter/kafka-client';

export interface OrdersTopicMap extends TTopicMessageMap {
  'order.created': {
    orderId: string;
    userId: string;
    amount: number;
  };
  'order.completed': {
    orderId: string;
    completedAt: string;
  };
}
// Minimal: plain interface or type — works just the same
export interface OrdersTopicMap {
  'order.created': { orderId: string; userId: string; amount: number };
  'order.completed': { orderId: string; completedAt: string };
}

// or
export type OrdersTopicMap = {
  'order.created': { orderId: string; userId: string; amount: number };
  'order.completed': { orderId: string; completedAt: string };
};

Alternative: topic() descriptors

Instead of a centralized topic map, define each topic as a standalone typed object:

import { topic, TopicsFrom } from '@drarzter/kafka-client';

export const OrderCreated = topic('order.created').type<{
  orderId: string;
  userId: string;
  amount: number;
}>();

export const OrderCompleted = topic('order.completed').type<{
  orderId: string;
  completedAt: string;
}>();

// Combine into a topic map for KafkaModule generics
export type OrdersTopicMap = TopicsFrom<typeof OrderCreated | typeof OrderCompleted>;

Topic descriptors work everywhere strings work — sendMessage, sendBatch, transaction, startConsumer, and @SubscribeTo():

// Sending
await kafka.sendMessage(OrderCreated, { orderId: '123', userId: '456', amount: 100 });
await kafka.sendBatch(OrderCreated, [{ value: { orderId: '1', userId: '10', amount: 50 } }]);

// Transactions
await kafka.transaction(async (tx) => {
  await tx.send(OrderCreated, { orderId: '123', userId: '456', amount: 100 });
});

// Consuming (decorator)
@SubscribeTo(OrderCreated)
async handleOrder(envelope: EventEnvelope<OrdersTopicMap['order.created']>) { ... }

// Consuming (imperative)
await kafka.startConsumer([OrderCreated], handler);

2. Register the module

import { KafkaModule } from '@drarzter/kafka-client';
import { OrdersTopicMap } from './orders.types';

@Module({
  imports: [
    KafkaModule.register<OrdersTopicMap>({
      clientId: 'my-service',
      groupId: 'my-consumer-group',
      brokers: ['localhost:9092'],
      autoCreateTopics: true, // auto-create topics on first use (dev mode)
    }),
  ],
})
export class OrdersModule {}

autoCreateTopics calls admin.createTopics() (idempotent — no-op if topic already exists) before the first send and before each startConsumer / startBatchConsumer call. librdkafka errors on unknown topics at subscribe time, so consumer-side creation is required. Useful in development, not recommended for production.

Or with ConfigService:

KafkaModule.registerAsync<OrdersTopicMap>({
  imports: [ConfigModule],
  inject: [ConfigService],
  useFactory: (config: ConfigService) => ({
    clientId: 'my-service',
    groupId: 'my-consumer-group',
    brokers: config.getOrThrow<string>('KAFKA_BROKERS').split(','),
  }),
})

Global module

By default, KafkaModule is scoped — you need to import it in every module that uses @InjectKafkaClient(). Pass isGlobal: true to make the client available everywhere:

// app.module.ts — register once
KafkaModule.register<OrdersTopicMap>({
  clientId: 'my-service',
  groupId: 'my-consumer-group',
  brokers: ['localhost:9092'],
  isGlobal: true,
})

// any other module — no need to import KafkaModule
@Injectable()
export class SomeService {
  constructor(@InjectKafkaClient() private readonly kafka: KafkaClient<OrdersTopicMap>) {}
}

Works with registerAsync() too:

KafkaModule.registerAsync<OrdersTopicMap>({
  isGlobal: true,
  imports: [ConfigModule],
  inject: [ConfigService],
  useFactory: (config: ConfigService) => ({ ... }),
})

3. Inject and use

import { Injectable } from '@nestjs/common';
import { InjectKafkaClient, KafkaClient } from '@drarzter/kafka-client';
import { OrdersTopicMap } from './orders.types';

@Injectable()
export class OrdersService {
  constructor(
    @InjectKafkaClient()
    private readonly kafka: KafkaClient<OrdersTopicMap>,
  ) {}

  async createOrder() {
    await this.kafka.sendMessage('order.created', {
      orderId: '123',
      userId: '456',
      amount: 100,
    });
  }
}

Consuming messages

Three ways — choose what fits your style.

Declarative: @SubscribeTo()

import { Injectable } from '@nestjs/common';
import { SubscribeTo } from '@drarzter/kafka-client';

@Injectable()
export class OrdersHandler {
  @SubscribeTo('order.created')
  async handleOrderCreated(envelope: EventEnvelope<OrdersTopicMap['order.created']>) {
    console.log('New order:', envelope.payload.orderId);
  }

  @SubscribeTo('order.completed', { retry: { maxRetries: 3 }, dlq: true })
  async handleOrderCompleted(envelope: EventEnvelope<OrdersTopicMap['order.completed']>) {
    console.log('Order completed:', envelope.payload.orderId);
  }
}

The module auto-discovers @SubscribeTo() methods on startup and subscribes them.

Imperative: startConsumer()

@Injectable()
export class OrdersService implements OnModuleInit {
  constructor(
    @InjectKafkaClient()
    private readonly kafka: KafkaClient<OrdersTopicMap>,
  ) {}

  async onModuleInit() {
    await this.kafka.startConsumer(
      ['order.created', 'order.completed'],
      async (envelope) => {
        console.log(`${envelope.topic}:`, envelope.payload);
      },
      {
        retry: { maxRetries: 3, backoffMs: 1000 },
        dlq: true,
      },
    );
  }
}

Regex topic subscription

Subscribe to multiple topics matching a pattern — the broker dynamically routes any topic whose name matches the regex to this consumer:

// Subscribe to all topics starting with "orders."
await kafka.startConsumer([/^orders\..+/], handler);

// Mix regexes and literal strings
await kafka.startConsumer([/^payments\..+/, 'audit.global'], handler);

Works with startBatchConsumer and @SubscribeTo too:

@SubscribeTo(/^events\..+/)
async handleEvent(envelope: EventEnvelope<any>) { ... }

Limitation: retryTopics: true is incompatible with regex subscriptions — the library cannot derive static retry topic names from a pattern. An error is thrown at startup if both are combined.

Iterator: consume()

Stream messages from a single topic as an AsyncIterableIterator — useful for scripts, one-off tasks, or any context where you prefer for await over a callback:

for await (const envelope of kafka.consume('order.created')) {
  console.log('Order:', envelope.payload.orderId);
}

// Breaking out of the loop stops the consumer automatically
for await (const envelope of kafka.consume('order.created')) {
  if (envelope.payload.orderId === targetId) break;
}

consume() accepts the same ConsumerOptions as startConsumer():

for await (const envelope of kafka.consume('orders', {
  retry: { maxRetries: 3 },
  dlq: true,
  messageTtlMs: 60_000,
})) {
  await processOrder(envelope.payload);
}

break, return, or any early exit from the loop calls the iterator's return() method, which closes the internal queue and calls handle.stop() on the background consumer.

Backpressure — use queueHighWaterMark to prevent unbounded queue growth when processing is slower than the message rate:

for await (const envelope of kafka.consume('orders', {
  queueHighWaterMark: 100, // pause partition when queue reaches 100 messages
})) {
  await slowProcessing(envelope.payload); // resumes when queue drains below 50
}

The partition is paused when the internal queue reaches queueHighWaterMark and automatically resumed when it drains below 50%. Without this option the queue is unbounded.
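The threshold logic amounts to a small hysteresis gate: pause at the high-water mark, resume only once the queue has drained below half of it. A minimal sketch, with BackpressureGate as a hypothetical name:

```typescript
class BackpressureGate {
  paused = false;
  constructor(private readonly highWaterMark: number) {}

  // call on every queue-length change; returns the action to take
  onQueueLength(len: number): 'pause' | 'resume' | 'none' {
    if (!this.paused && len >= this.highWaterMark) {
      this.paused = true;
      return 'pause'; // caller pauses the partition here
    }
    if (this.paused && len < this.highWaterMark / 2) {
      this.paused = false;
      return 'resume'; // caller resumes the partition here
    }
    return 'none';
  }
}
```

The gap between the pause threshold (100%) and the resume threshold (50%) prevents rapid pause/resume flapping when the queue hovers near the limit.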

Error propagation — if the consumer fails to start (e.g. broker unreachable), the error surfaces on the next next() / for await iteration rather than being silently swallowed.

Multiple consumer groups

Per-consumer groupId

Override the default consumer group for specific consumers. Each unique groupId creates a separate librdkafka Consumer internally:

// Default group from constructor
await kafka.startConsumer(['orders'], handler);

// Custom group — receives its own copy of messages
await kafka.startConsumer(['orders'], auditHandler, { groupId: 'orders-audit' });

// Works with @SubscribeTo too
@SubscribeTo('orders', { groupId: 'orders-audit' })
async auditOrders(envelope) { ... }

Important: You cannot mix eachMessage and eachBatch consumers on the same groupId, and you cannot call startConsumer (or startBatchConsumer) twice on the same groupId without stopping it first. The library throws a clear error in both cases:

Cannot use eachBatch on consumer group "my-group" — it is already running with eachMessage.
Use a different groupId for this consumer.

startConsumer("my-group") called twice — this group is already consuming.
Call stopConsumer("my-group") first or pass a different groupId.

Named clients

Register multiple named clients for different bounded contexts:

@Module({
  imports: [
    KafkaModule.register<OrdersTopicMap>({
      name: 'orders',
      clientId: 'orders-service',
      groupId: 'orders-consumer',
      brokers: ['localhost:9092'],
    }),
    KafkaModule.register<PaymentsTopicMap>({
      name: 'payments',
      clientId: 'payments-service',
      groupId: 'payments-consumer',
      brokers: ['localhost:9092'],
    }),
  ],
})
export class AppModule {}

Inject by name — the string in @InjectKafkaClient() must match the name from register():

@Injectable()
export class OrdersService {
  constructor(
    @InjectKafkaClient('orders')    // ← matches name: 'orders' above
    private readonly kafka: KafkaClient<OrdersTopicMap>,
  ) {}
}

Same with @SubscribeTo() — use clientName to target a specific named client:

@SubscribeTo('payment.received', { clientName: 'payments' })  // ← matches name: 'payments'
async handlePayment(envelope: EventEnvelope<PaymentsTopicMap['payment.received']>) {
  // ...
}

Partition key

Route all events for the same order to the same partition:

await this.kafka.sendMessage(
  'order.created',
  { orderId: '123', userId: '456', amount: 100 },
  { key: '123' },
);

Message headers

Attach metadata to messages:

await this.kafka.sendMessage(
  'order.created',
  { orderId: '123', userId: '456', amount: 100 },
  {
    key: '123',
    headers: { 'x-correlation-id': 'abc-def', 'x-source': 'api-gateway' },
  },
);

Headers work with batch sending too:

await this.kafka.sendBatch('order.created', [
  {
    value: { orderId: '1', userId: '10', amount: 50 },
    key: '1',
    headers: { 'x-correlation-id': 'req-1' },
  },
]);

Batch sending

await this.kafka.sendBatch('order.created', [
  { value: { orderId: '1', userId: '10', amount: 50 }, key: '1' },
  { value: { orderId: '2', userId: '20', amount: 75 }, key: '2' },
  { value: { orderId: '3', userId: '30', amount: 100 }, key: '3' },
]);

Batch consuming

Process messages in batches for higher throughput. The handler receives an array of EventEnvelopes and a BatchMeta object with offset management controls:

await this.kafka.startBatchConsumer(
  ['order.created'],
  async (envelopes, meta) => {
    // envelopes: EventEnvelope<OrdersTopicMap['order.created']>[]
    for (const env of envelopes) {
      await processOrder(env.payload);
      meta.resolveOffset(env.offset);

      // Call heartbeat() during long-running batch processing to prevent
      // the broker from considering the consumer dead (session.timeout.ms)
      await meta.heartbeat();
    }
    await meta.commitOffsetsIfNecessary();
  },
  { retry: { maxRetries: 3 }, dlq: true },
);

With autoCommit: false for full manual offset control:

await this.kafka.startBatchConsumer(
  ['order.created'],
  async (envelopes, meta) => {
    for (const env of envelopes) {
      await processOrder(env.payload);
      meta.resolveOffset(env.offset);
    }
    // commitOffsetsIfNecessary() commits only when autoCommit is off
    // or when the commit interval has elapsed
    await meta.commitOffsetsIfNecessary();
  },
  { autoCommit: false },
);

Note: If your handler calls resolveOffset() or commitOffsetsIfNecessary() without setting autoCommit: false, a debug message is logged at consumer-start time — mixing autoCommit with manual offset control causes offset conflicts. Set autoCommit: false to suppress the message and take full control of offset management.

With @SubscribeTo():

@SubscribeTo('order.created', { batch: true })
async handleOrders(envelopes: EventEnvelope<OrdersTopicMap['order.created']>[], meta: BatchMeta) {
  for (const env of envelopes) { ... }
}

Schema validation runs per-message — invalid messages are skipped (DLQ'd if enabled), valid ones are passed to the handler. Retry applies to the whole batch.

retryTopics: true is also supported on startBatchConsumer. On handler failure, each envelope in the batch is routed individually to <topic>.retry.1; the companion retry consumers call the batch handler one message at a time with a stub BatchMeta (no-op heartbeat/resolveOffset/commitOffsetsIfNecessary):

await kafka.startBatchConsumer(
  ['orders.created'],
  async (envelopes, meta) => { /* same handler */ },
  {
    retry: { maxRetries: 3, backoffMs: 1000 },
    dlq: true,
    retryTopics: true, // ← now supported for batch consumers too
  },
);

BatchMeta exposes:

| Property/Method | Description |
| --------------- | ----------- |
| partition | Partition number for this batch |
| highWatermark | Latest offset in the partition (string). null when the message is replayed via a retry topic consumer — in that path the broker high-watermark is not available. Guard against null before computing lag |
| heartbeat() | Send a heartbeat to keep the consumer session alive — call during long processing loops |
| resolveOffset(offset) | Mark offset as processed (required before commitOffsetsIfNecessary) |
| commitOffsetsIfNecessary() | Commit resolved offsets; respects autoCommit setting |
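A null-safe lag helper along the lines the highWatermark note recommends might look like this. consumerLag is a hypothetical name, and the sketch assumes the KafkaJS convention that highWatermark is the next offset to be written; real offsets are int64 strings, so production code should prefer BigInt over Number:

```typescript
function consumerLag(
  highWatermark: string | null, // null on the retry-topic replay path
  lastResolvedOffset: string,
): number | null {
  if (highWatermark === null) return null; // no broker HWM available — skip lag
  // Number is used for brevity; offsets can exceed Number.MAX_SAFE_INTEGER
  const lag = Number(highWatermark) - (Number(lastResolvedOffset) + 1);
  return Math.max(0, lag);
}
```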

Tombstone messages

Send a null-value Kafka record to compact a specific key out of a log-compacted topic:

// Delete the record with key "user-123" from the log-compacted "users" topic
await kafka.sendTombstone('users', 'user-123');

// With custom headers
await kafka.sendTombstone('users', 'user-123', { 'x-reason': 'gdpr-deletion' });

sendTombstone skips envelope headers, schema validation, and the Lamport clock — the record value is literally null, as required by Kafka's log compaction protocol. Both beforeSend and afterSend instrumentation hooks still fire so tracing works correctly.

Compression

Reduce network bandwidth with per-send compression. Supported codecs: 'gzip', 'snappy', 'lz4', 'zstd':

import { CompressionType } from '@drarzter/kafka-client/core';

// Single message
await kafka.sendMessage('events', payload, { compression: 'gzip' });

// Batch
await kafka.sendBatch('events', messages, { compression: 'snappy' });

Compression is applied at the Kafka message-set level — the broker decompresses transparently on the consumer side. 'snappy' and 'lz4' offer the best throughput/CPU trade-off for most workloads; 'gzip' gives the highest compression ratio; 'zstd' balances both.

Transactions

Send multiple messages atomically with exactly-once semantics:

await this.kafka.transaction(async (tx) => {
  await tx.send('order.created', {
    orderId: '123',
    userId: '456',
    amount: 100,
  });
  await tx.send('order.completed', {
    orderId: '123',
    completedAt: new Date().toISOString(),
  });
  // if anything throws, all messages are rolled back
});

tx.sendBatch() is also available inside transactions:

await this.kafka.transaction(async (tx) => {
  await tx.sendBatch('order.created', [
    { value: { orderId: '1', userId: '10', amount: 50 }, key: '1' },
    { value: { orderId: '2', userId: '20', amount: 75 }, key: '2' },
  ]);
  // if anything throws, all messages are rolled back
});

Consumer interceptors

Add before/after/onError hooks to message processing. Interceptors receive the full EventEnvelope:

import { ConsumerInterceptor } from '@drarzter/kafka-client';

const loggingInterceptor: ConsumerInterceptor<OrdersTopicMap> = {
  before: (envelope) => {
    console.log(`Processing ${envelope.topic}`, envelope.payload);
  },
  after: (envelope) => {
    console.log(`Done ${envelope.topic}`);
  },
  onError: (envelope, error) => {
    console.error(`Failed ${envelope.topic}:`, error.message);
  },
};

await this.kafka.startConsumer(['order.created'], handler, {
  interceptors: [loggingInterceptor],
});

Multiple interceptors run in order. All hooks are optional.
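The run order can be modeled as a simple loop: before hooks in declaration order, then the handler, then after hooks; onError hooks run when the handler throws. A sketch under assumed names (runWithInterceptors) — the library's actual dispatch is not shown here:

```typescript
interface Interceptor<E> {
  before?(envelope: E): void | Promise<void>;
  after?(envelope: E): void | Promise<void>;
  onError?(envelope: E, error: Error): void | Promise<void>;
}

async function runWithInterceptors<E>(
  envelope: E,
  handler: (e: E) => Promise<void>,
  interceptors: Interceptor<E>[],
): Promise<void> {
  for (const i of interceptors) await i.before?.(envelope);
  try {
    await handler(envelope);
    for (const i of interceptors) await i.after?.(envelope);
  } catch (err) {
    for (const i of interceptors) await i.onError?.(envelope, err as Error);
    throw err; // rethrow so retry/DLQ handling still sees the failure
  }
}
```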

Instrumentation

For client-wide cross-cutting concerns (tracing, metrics), use KafkaInstrumentation hooks instead of per-consumer interceptors:

import { otelInstrumentation } from '@drarzter/kafka-client/otel';

const kafka = new KafkaClient('my-app', 'my-group', brokers, {
  instrumentation: [otelInstrumentation()],
});

otelInstrumentation() injects traceparent on send, extracts it on consume, and creates CONSUMER spans automatically. The span is set as the active OTel context for the handler's duration via context.with() — so trace.getActiveSpan() works inside your handler and any child spans are automatically parented to the consume span. Requires @opentelemetry/api as a peer dependency.

Custom instrumentation

beforeConsume can return a BeforeConsumeResult — either the legacy () => void cleanup function, or an object with cleanup and/or wrap:

import { KafkaInstrumentation, BeforeConsumeResult } from '@drarzter/kafka-client';

const myInstrumentation: KafkaInstrumentation = {
  beforeSend(topic, headers) { /* inject headers, start timer */ },
  afterSend(topic) { /* record send latency */ },

  beforeConsume(envelope): BeforeConsumeResult {
    const span = startMySpan(envelope.topic);
    return {
      // cleanup() is called after the handler completes (success or error)
      cleanup() { span.end(); },
      // wrap(fn) runs the handler inside the desired async context
      // call fn() wherever you need it in the context scope
      wrap(fn) { return runWithSpanActive(span, fn); },
    };
  },

  onConsumeError(envelope, error) { /* record error metric */ },
};

The legacy () => void form is still fully supported — return a function directly if you only need cleanup:

beforeConsume(envelope) {
  const timer = startTimer();
  return () => timer.end(); // cleanup only, no context wrapping
},

BeforeConsumeResult is a union:

type BeforeConsumeResult =
  | (() => void)                     // legacy: cleanup only
  | { cleanup?(): void;              // called after handler (success or error)
      wrap?(fn: () => Promise<void>): Promise<void>; // wraps handler execution
    };

When multiple instrumentations each provide a wrap, they compose in declaration order — the first instrumentation's wrap is the outermost.
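That composition rule can be expressed as a right fold: later wraps are applied first so the first-declared wrap ends up outermost. A sketch with a hypothetical composeWraps helper:

```typescript
type Wrap = (fn: () => Promise<void>) => Promise<void>;

function composeWraps(wraps: Wrap[], handler: () => Promise<void>): () => Promise<void> {
  // fold right-to-left so wraps[0] becomes the outermost layer
  return wraps.reduceRight<() => Promise<void>>(
    (inner, wrap) => () => wrap(inner),
    handler,
  );
}
```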

Lifecycle event hooks

Four additional hooks fire for specific events in the consume pipeline:

| Hook | When called | Arguments |
| ---- | ----------- | --------- |
| onMessage | Handler successfully processed a message | (envelope) — use as a success counter for error-rate calculations |
| onRetry | A message is queued for another attempt (in-process backoff or routed to a retry topic) | (envelope, attempt, maxRetries) |
| onDlq | A message is routed to the dead letter queue | (envelope, reason) — reason is 'handler-error', 'validation-error', or 'lamport-clock-duplicate' |
| onDuplicate | A duplicate is detected via Lamport Clock | (envelope, strategy) — strategy is 'drop', 'dlq', or 'topic' |

const myInstrumentation: KafkaInstrumentation = {
  onMessage(envelope) {
    metrics.increment('kafka.processed', { topic: envelope.topic });
  },
  onRetry(envelope, attempt, maxRetries) {
    console.warn(`Retrying ${envelope.topic} — attempt ${attempt}/${maxRetries}`);
  },
  onDlq(envelope, reason) {
    alertingSystem.send({ topic: envelope.topic, reason });
  },
  onDuplicate(envelope, strategy) {
    metrics.increment('kafka.duplicate', { topic: envelope.topic, strategy });
  },
};

Built-in metrics

KafkaClient maintains lightweight in-process event counters independently of any instrumentation:

// Global snapshot — aggregate across all topics
const snapshot = kafka.getMetrics();
// { processedCount: number; retryCount: number; dlqCount: number; dedupCount: number }

// Per-topic snapshot
const orderMetrics = kafka.getMetrics('order.created');
// { processedCount: 5, retryCount: 1, dlqCount: 0, dedupCount: 0 }

kafka.resetMetrics();                // reset all counters
kafka.resetMetrics('order.created'); // reset only one topic's counters

Passing a topic name that has not seen any events returns a zero-valued snapshot — it never throws.

Counters are incremented in the same code paths that fire the corresponding hooks — they are always active regardless of whether any instrumentation is configured.

Options reference

Send options

Options for sendMessage() — the third argument:

| Option | Default | Description |
| ------ | ------- | ----------- |
| key | — | Partition key for message routing |
| headers | — | Custom metadata headers (merged with auto-generated envelope headers) |
| correlationId | auto | Override the auto-propagated correlation ID (default: inherited from ALS context or new UUID) |
| schemaVersion | 1 | Schema version for the payload |
| eventId | auto | Override the auto-generated event ID (UUID v4) |
| compression | — | Compression codec for the message set: 'gzip', 'snappy', 'lz4', 'zstd'; omit to send uncompressed |

sendBatch() accepts compression as a top-level option (not per-message); all other options are per-message inside the array items.

Consumer options

| Option | Default | Description |
| ------ | ------- | ----------- |
| groupId | constructor value | Override consumer group for this subscription |
| fromBeginning | false | Read from the beginning of the topic |
| autoCommit | true | Auto-commit offsets |
| retry.maxRetries | — | Number of retry attempts |
| retry.backoffMs | 1000 | Base delay for exponential backoff in ms |
| retry.maxBackoffMs | 30000 | Maximum delay cap for exponential backoff in ms |
| dlq | false | Send to {topic}.dlq after all retries exhausted — message carries x-dlq-* metadata headers |
| retryTopics | false | Route failed messages through per-level topics ({topic}.retry.1, {topic}.retry.2, …) instead of sleeping in-process; exactly-once routing semantics within the retry chain; requires retry (see Retry topic chain) |
| interceptors | [] | Array of before/after/onError hooks |
| retryTopicAssignmentTimeoutMs | 10000 | Timeout (ms) to wait for each retry level consumer to receive partition assignments after connecting; increase for slow brokers |
| handlerTimeoutMs | — | Log a warning if the handler hasn't resolved within this window (ms) — does not cancel the handler |
| deduplication.strategy | 'drop' | What to do with duplicate messages: 'drop' silently discards, 'dlq' forwards to {topic}.dlq (requires dlq: true), 'topic' forwards to {topic}.duplicates |
| deduplication.duplicatesTopic | {topic}.duplicates | Custom destination for strategy: 'topic' |
| messageTtlMs | — | Drop (or DLQ) messages older than this many milliseconds at consumption time; evaluated against the x-timestamp header; see Message TTL |
| circuitBreaker | — | Enable circuit breaker with {} for zero-config defaults; requires dlq: true; see Circuit breaker |
| circuitBreaker.threshold | 5 | DLQ failures within windowSize that open the circuit |
| circuitBreaker.recoveryMs | 30_000 | Milliseconds to wait in OPEN state before entering HALF_OPEN |
| circuitBreaker.windowSize | threshold × 2, min 10 | Sliding window size in messages |
| circuitBreaker.halfOpenSuccesses | 1 | Consecutive successes in HALF_OPEN required to close the circuit |
| queueHighWaterMark | unbounded | Max messages buffered in the consume() iterator queue before the partition is paused; resumes at 50% drain. Only applies to consume() |
| batch | false | (decorator only) Use startBatchConsumer instead of startConsumer |
| partitionAssigner | 'cooperative-sticky' | Partition assignment strategy: 'cooperative-sticky' (minimal movement on rebalance, best for horizontal scaling), 'roundrobin' (even distribution), 'range' (contiguous partition ranges) |
| onTtlExpired | — | Per-consumer override of the client-level onTtlExpired callback; takes precedence when set. Receives TtlExpiredContext — same shape as the client-level hook |
| subscribeRetry.retries | 5 | Max attempts for consumer.subscribe() when topic doesn't exist yet |
| subscribeRetry.backoffMs | 5000 | Delay between subscribe retry attempts (ms) |

Module options

Passed to KafkaModule.register() or returned from registerAsync() factory:

| Option | Default | Description |
| ------ | ------- | ----------- |
| clientId | — | Kafka client identifier (required) |
| groupId | — | Default consumer group ID (required) |
| brokers | — | Array of broker addresses (required) |
| name | — | Named client identifier for multi-client setups |
| isGlobal | false | Make the client available in all modules without re-importing |
| autoCreateTopics | false | Auto-create topics on first send (dev only) |
| numPartitions | 1 | Number of partitions for auto-created topics |
| strictSchemas | true | Validate string topic keys against schemas registered via TopicDescriptor |
| instrumentation | [] | Client-wide instrumentation hooks (e.g. OTel). Applied to both send and consume paths |
| transactionalId | ${clientId}-tx | Transactional producer ID for transaction() calls. Must be unique per producer instance across the cluster — two instances sharing the same ID will be fenced by Kafka. The client logs a warning when the same ID is registered twice within one process |
| onMessageLost | — | Called when a message is silently dropped without DLQ — use to alert, log to external systems, or trigger fallback logic |
| onTtlExpired | — | Called when a message is dropped due to TTL expiration (messageTtlMs) and dlq is not enabled; receives { topic, ageMs, messageTtlMs, headers } |
| onRebalance | — | Called on every partition assign/revoke event across all consumers created by this client |

Module-scoped (default) — import KafkaModule in each module that needs it:

// orders.module.ts
@Module({
  imports: [
    KafkaModule.register<OrdersTopicMap>({
      clientId: 'orders',
      groupId: 'orders-group',
      brokers: ['localhost:9092'],
    }),
  ],
})
export class OrdersModule {}

App-wide — register once in AppModule with isGlobal: true, inject anywhere:

// app.module.ts
@Module({
  imports: [
    KafkaModule.register<MyTopics>({
      clientId: 'my-app',
      groupId: 'my-group',
      brokers: ['localhost:9092'],
      isGlobal: true,
    }),
  ],
})
export class AppModule {}

// any module — no KafkaModule import needed
@Injectable()
export class PaymentService {
  constructor(@InjectKafkaClient() private readonly kafka: KafkaClient<MyTopics>) {}
}

Error classes

When message processing fails, the library surfaces typed errors that you can inspect in onError interceptors:

import { KafkaProcessingError, KafkaRetryExhaustedError } from '@drarzter/kafka-client';

KafkaProcessingError — base class for processing failures. Has topic, originalMessage, and supports cause:

const err = new KafkaProcessingError('handler failed', 'order.created', rawMessage, { cause: originalError });
err.topic;            // 'order.created'
err.originalMessage;  // the parsed message object
err.cause;            // the original error

KafkaRetryExhaustedError — thrown after all retries are exhausted. Extends KafkaProcessingError and adds attempts:

// In an onError interceptor:
const interceptor: ConsumerInterceptor<MyTopics> = {
  onError: (envelope, error) => {
    if (error instanceof KafkaRetryExhaustedError) {
      console.log(`Failed after ${error.attempts} attempts on ${error.topic}`);
      console.log('Last error:', error.cause);
    }
  },
};

When retry.maxRetries is set and all attempts fail, KafkaRetryExhaustedError is passed to onError interceptors automatically.

KafkaValidationError — thrown when schema validation fails on the consumer side. Has topic, originalMessage, and cause:

import { KafkaValidationError } from '@drarzter/kafka-client';

const interceptor: ConsumerInterceptor<MyTopics> = {
  onError: (envelope, error) => {
    if (error instanceof KafkaValidationError) {
      console.log(`Bad message on ${error.topic}:`, error.cause?.message);
    }
  },
};

Deduplication (Lamport Clock)

Every outgoing message produced by this library is stamped with a monotonically increasing logical clock — the x-lamport-clock header. The counter lives in the KafkaClient instance and increments by one per message (including individual messages inside sendBatch and transaction).

On the consumer side, enable deduplication by passing deduplication to startConsumer or startBatchConsumer. The library checks the incoming clock against the last processed value for that topic:partition combination and skips any message whose clock is not strictly greater.

await kafka.startConsumer(['orders.created'], handler, {
  deduplication: {}, // 'drop' strategy — silently discard duplicates
});

How duplicates happen

The most common scenario: a producer service restarts. Its in-memory clock resets to 0. The consumer already processed messages with clocks 1…N. All new messages from the restarted producer (clocks 1, 2, 3, …) have clocks ≤ N and are treated as duplicates.

Producer A (running): sends clock 1, 2, 3, 4, 5  → consumer processes all 5
Producer A (restarts): sends clock 1, 2, 3         → consumer sees 1 ≤ 5 — duplicate!

Strategies

| Strategy | Behaviour |
| -------- | --------- |
| 'drop' (default) | Log a warning and silently discard the message |
| 'dlq' | Forward to {topic}.dlq with reason metadata headers (x-dlq-reason, x-dlq-duplicate-incoming-clock, x-dlq-duplicate-last-processed-clock). Requires dlq: true |
| 'topic' | Forward to {topic}.duplicates (or duplicatesTopic if set) with reason metadata headers (x-duplicate-reason, x-duplicate-incoming-clock, x-duplicate-last-processed-clock, x-duplicate-detected-at) |

// Strategy: drop (default)
await kafka.startConsumer(['orders'], handler, {
  deduplication: {},
});

// Strategy: DLQ — inspect duplicates from {topic}.dlq
await kafka.startConsumer(['orders'], handler, {
  dlq: true,
  deduplication: { strategy: 'dlq' },
});

// Strategy: dedicated topic — consume from {topic}.duplicates
await kafka.startConsumer(['orders'], handler, {
  deduplication: { strategy: 'topic' },
});

// Strategy: custom topic name
await kafka.startConsumer(['orders'], handler, {
  deduplication: {
    strategy: 'topic',
    duplicatesTopic: 'ops.orders.duplicates',
  },
});

Startup validation

When autoCreateTopics: false and strategy: 'topic', startConsumer / startBatchConsumer validates that the destination topic ({topic}.duplicates or duplicatesTopic) exists before starting the consumer. A clear error is thrown at startup listing every missing topic, rather than silently failing on the first duplicate.

With autoCreateTopics: true the check is skipped — the topic is created automatically instead.

Backwards compatibility

Messages without an x-lamport-clock header pass through unchanged. Producers not using this library are unaffected.

Limitations

Deduplication state is in-memory and per-consumer-instance. Understand what that means:

  • Consumer restart — state is cleared on restart. The first batch of messages after restart is accepted regardless of their clock values, so duplicates spanning a restart window are not caught.
  • Multiple consumer instances (same group, different machines) — each instance tracks its own partition subset. Partitions are reassigned on rebalance, so a rebalance can reset the state for moved partitions.
  • Cross-session duplicates — this guards against duplicates from a producer that restarted within the same consumer session. For durable, cross-restart deduplication, persist the clock state externally (Redis, database) and implement idempotent handlers.

Use this feature as a lightweight first line of defence — not as a substitute for idempotent business logic.
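For durable, cross-restart deduplication as suggested above, the same strictly-greater clock check can be backed by external storage. A minimal sketch, assuming a hypothetical ClockStore abstraction (a stand-in for Redis or a database, not part of this library):

```typescript
// Hypothetical store interface; swap the Map-backed version for Redis GET/SET
// or a database row in production.
interface ClockStore {
  get(key: string): number | undefined;
  set(key: string, value: number): void;
}

// Mirrors the library's rule: a message is a duplicate unless its clock is
// strictly greater than the last processed clock for that topic:partition.
function isDuplicate(
  store: ClockStore,
  topic: string,
  partition: number,
  clock: number,
): boolean {
  const key = `${topic}:${partition}`;
  const last = store.get(key);
  if (last !== undefined && clock <= last) return true;
  store.set(key, clock);
  return false;
}
```

Because the store survives restarts, a producer's clock reset is still caught, but the trade-off is an extra round-trip per message, so most deployments pair this with idempotent handlers rather than relying on it exclusively.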

Retry topic chain

tl;dr — recommended production setup:

await kafka.startConsumer(['orders.created'], handler, {
  retry: { maxRetries: 3, backoffMs: 1_000, maxBackoffMs: 30_000 },
  dlq: true,          // ← messages never silently disappear
  retryTopics: true,  // ← retries survive restarts; routing is exactly-once
});

Just retry + dlq: true is already safe for most workloads — failed messages land in {topic}.dlq after all retries and are never silently dropped. Add retryTopics: true for crash-durable retries and exactly-once routing guarantees within the retry chain.

| Configuration | What happens to a message that always fails | Process crash mid-retry |
| --- | --- | --- |
| retry only | Dropped — onMessageLost fires | Lost if crash between attempts |
| retry + dlq | Lands in {topic}.dlq after all attempts | DLQ write may duplicate (rare) |
| retry + dlq + retryTopics | Lands in {topic}.dlq after all attempts | Retries survive restarts; routing is exactly-once |

By default, retry is handled in-process: the consumer sleeps between attempts while holding the partition. With retryTopics: true, failed messages are routed through a chain of Kafka topics instead — one topic per retry level. A companion consumer auto-starts per level, waits for the scheduled delay using partition pause/resume, then calls the same handler.

Benefits over in-process retry:

  • Durable — retry messages survive a consumer restart; all routing (main → retry.1, level N → N+1, retry → DLQ) is exactly-once via Kafka transactions
  • Non-blocking — the original consumer is free immediately; each level consumer only pauses its specific partition during the delay window, so other partitions continue processing
  • Isolated — each retry level has its own consumer group, so a slow level 3 consumer never blocks a level 1 consumer

await kafka.startConsumer(['orders.created'], handler, {
  retry: { maxRetries: 3, backoffMs: 1000, maxBackoffMs: 30_000 },
  dlq: true,
  retryTopics: true,   // ← opt in
});

With maxRetries: 3, this creates three dedicated topics and three companion consumers:

orders.created.retry.1  →  consumer group: my-group-retry.1  (delay ~1 s)
orders.created.retry.2  →  consumer group: my-group-retry.2  (delay ~2 s)
orders.created.retry.3  →  consumer group: my-group-retry.3  (delay ~4 s)
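The per-level delays shown above follow exponential doubling from retry.backoffMs, capped at retry.maxBackoffMs. A sketch of that computation (the doubling rule is inferred from the ~1 s / ~2 s / ~4 s progression, not a documented formula):

```typescript
// Delay before re-invoking the handler at retry level N (1-based).
// Assumed: backoffMs doubles per level, capped at maxBackoffMs.
function retryDelayMs(level: number, backoffMs: number, maxBackoffMs: number): number {
  return Math.min(backoffMs * 2 ** (level - 1), maxBackoffMs);
}
```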

Message flow with maxRetries: 2 and dlq: true:

orders.created       →  handler fails  →  orders.created.retry.1  (attempt 1, delay ~1 s)
orders.created.retry.1  →  handler fails  →  orders.created.retry.2  (attempt 2, delay ~2 s)
orders.created.retry.2  →  handler fails  →  orders.created.dlq

Each level consumer uses consumer.pause → sleep(remaining) → consumer.resume so the partition offset is never committed before the message is processed. On a process crash during sleep or handler execution, the message is redelivered on restart.

The retry topic messages carry scheduling headers (x-retry-attempt, x-retry-after, x-retry-original-topic, x-retry-max-retries) that each level consumer reads automatically — no manual configuration needed.

Delivery guarantee: the entire retry chain — including the main consumer → retry.1 boundary — is exactly-once. Every routing step (main → retry.1, retry.N → retry.N+1, retry.N → DLQ) is wrapped in a Kafka transaction via sendOffsetsToTransaction: the produce and the consumer offset commit happen atomically. A crash at any point rolls back the transaction: the message is redelivered and the routing is retried, with no duplicate in the next level. If the EOS transaction fails (broker unavailable), the offset stays uncommitted and the message is safely redelivered — it is never lost.

The standard Kafka at-least-once guarantee still applies at the handler level: if your handler succeeds but the process crashes before the manual offset commit completes, the message is redelivered to the handler. Design handlers to be idempotent.
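One common way to make a handler idempotent is to derive a natural key from the payload and upsert rather than insert, so a redelivery overwrites identical data instead of creating a second row. A sketch, where OrderStore and db.upsert are assumed interfaces, not part of this library:

```typescript
// Assumed persistence interface; any upsert-capable store works.
interface OrderStore {
  upsert(id: string, row: { orderId: string; amount: number }): Promise<void>;
}

// Idempotent handler: orderId acts as the idempotency token, so processing
// the same message twice leaves the store in the same state.
async function handleOrderCreated(
  payload: { orderId: string; amount: number },
  db: OrderStore,
): Promise<void> {
  await db.upsert(payload.orderId, payload);
}
```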

Startup validation: retryTopics requires retry to be set — an error is thrown at startup if retry is missing. When autoCreateTopics: false, all {topic}.retry.N topics are validated to exist at startup and a clear error lists any missing ones. With autoCreateTopics: true the check is skipped — topics are created automatically by the ensureTopic path. Supported by both startConsumer and startBatchConsumer.

stopConsumer(groupId) automatically stops all companion retry level consumers started for that group.

stopConsumer

Stop all consumers or a specific group:

// Stop a specific consumer group
await kafka.stopConsumer('my-group');

// Stop all consumers
await kafka.stopConsumer();

stopConsumer(groupId) disconnects and removes only that group's consumer, leaving other groups running. Useful when you want to pause processing for a specific topic without restarting the whole client.

Pause and resume

Temporarily stop delivering messages from specific partitions without disconnecting the consumer:

// Pause partition 0 of 'orders' (default group)
kafka.pauseConsumer(undefined, [{ topic: 'orders', partitions: [0] }]);

// Resume it later
kafka.resumeConsumer(undefined, [{ topic: 'orders', partitions: [0] }]);

// Target a specific consumer group, multiple partitions
kafka.pauseConsumer('payments-group', [{ topic: 'payments', partitions: [0, 1] }]);

The first argument is the consumer group ID — pass undefined to target the default group. A warning is logged if the group is not found.

Pausing is non-destructive: the consumer stays connected and Kafka preserves the partition assignment for as long as the group session is alive. Messages accumulate in the topic and are delivered once the consumer resumes. Typical use: apply backpressure when a downstream dependency (e.g. a database) is temporarily overloaded.
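The backpressure use case reduces to a small policy with hysteresis: pause at a high-water mark, resume only once the downstream queue drains below a lower mark, so the consumer does not flap. A sketch (the thresholds are illustrative; pauseConsumer / resumeConsumer would be invoked on 'pause' / 'resume'):

```typescript
// Decide whether to pause or resume based on downstream queue depth.
// Separate high/low water marks give hysteresis, so the consumer doesn't
// toggle on every message near the threshold.
function backpressureAction(
  depth: number,
  paused: boolean,
  highWater = 1_000,
  lowWater = 200,
): 'pause' | 'resume' | 'none' {
  if (!paused && depth >= highWater) return 'pause';
  if (paused && depth <= lowWater) return 'resume';
  return 'none';
}
```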

Circuit breaker

Automatically pause delivery from a topic-partition when its DLQ error rate exceeds a threshold. After a recovery window the partition is resumed automatically.

dlq: true is required — the breaker counts DLQ events as failures. Without it no failures are recorded and the circuit never opens.

Zero-config start — all options have sensible defaults:

await kafka.startConsumer(['orders'], handler, {
  dlq: true,
  circuitBreaker: {},
});

Full config for fine-tuning:

await kafka.startConsumer(['orders'], handler, {
  dlq: true,
  circuitBreaker: {
    threshold: 10,         // open after 10 failures (default: 5)
    recoveryMs: 60_000,   // wait 60 s before probing (default: 30 s)
    windowSize: 50,        // track last 50 messages (default: threshold × 2, min 10)
    halfOpenSuccesses: 3,  // 3 successes to close (default: 1)
  },
});

State machine per ${groupId}:${topic}:${partition}:

| State | Behaviour |
| ----- | --------- |
| CLOSED (normal) | Messages delivered. Failures recorded in sliding window. Opens when failures ≥ threshold. |
| OPEN | Partition paused via pauseConsumer. After recoveryMs elapses, transitions to HALF_OPEN. |
| HALF_OPEN | Partition resumed. After halfOpenSuccesses consecutive successes the circuit closes. Any single failure immediately re-opens it. |

Successful onMessage completions count as successes. The retry topic path is not subject to the breaker — it has its own backoff and EOS guarantees.

Options:

| Option | Default | Description |
| ------ | ------- | ----------- |
| threshold | 5 | DLQ failures within windowSize that opens the circuit |
| recoveryMs | 30_000 | Milliseconds to wait in OPEN state before entering HALF_OPEN |
| windowSize | threshold × 2, min 10 | Sliding window size in messages |
| halfOpenSuccesses | 1 | Consecutive successes in HALF_OPEN required to close the circuit |

getCircuitState

Inspect the current circuit breaker state for a partition — useful for health endpoints and dashboards:

const state = kafka.getCircuitState('orders', 0);
// undefined — circuit not configured or never tripped
// { status: 'closed', failures: 2, windowSize: 10 }
// { status: 'open',   failures: 5, windowSize: 10 }
// { status: 'half-open', failures: 0, windowSize: 0 }

// With explicit group ID:
const paymentsState = kafka.getCircuitState('orders', 0, 'payments-group');

Returns undefined when circuitBreaker is not configured for the group or the circuit has never been tripped (state is lazily initialised on the first DLQ event).
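For a health endpoint, the per-partition states can be folded into a single verdict. A sketch, where the states array would come from calling getCircuitState per partition (the endpoint shape and partition count are illustrative):

```typescript
// Shape returned by getCircuitState; undefined means the circuit is not
// configured or has never been tripped.
type CircuitState =
  | { status: 'closed' | 'open' | 'half-open'; failures: number; windowSize: number }
  | undefined;

// Healthy unless at least one partition's circuit is currently open.
function summarizeCircuits(states: CircuitState[]): { healthy: boolean; open: number } {
  const open = states.filter((s) => s?.status === 'open').length;
  return { healthy: open === 0, open };
}
```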

Instrumentation hooks — react to state transitions via KafkaInstrumentation:

const kafka = new KafkaClient('svc', 'group', brokers, {
  instrumentation: [{
    onCircuitOpen(topic, partition) {
      metrics.increment('circuit_open', { topic, partition });
    },
    onCircuitHalfOpen(topic, partition) {
      logger.log(`Circuit probing ${topic}[${partition}]`);
    },
    onCircuitClose(topic, partition) {
      metrics.increment('circuit_close', { topic, partition });
    },
  }],
});

Reset consumer offsets

Seek a consumer group's committed offsets to the beginning or end of a topic:

// Seek to the beginning — re-process all existing messages
await kafka.resetOffsets(undefined, 'orders', 'earliest');

// Seek to the end — skip existing messages, process only new ones
await kafka.resetOffsets(undefined, 'orders', 'latest');

// Target a specific consumer group
await kafka.resetOffsets('payments-group', 'orders', 'earliest');

Important: the consumer for the specified group must be stopped before calling resetOffsets. An error is thrown if the group is currently running — this prevents the reset from racing with an active offset commit.

Seek to offset

Seek individual topic-partitions to explicit offsets — useful when resetOffsets is too coarse and you need per-partition control:

// Seek partition 0 of 'orders' to offset 100, partition 1 to offset 200
await kafka.seekToOffset(undefined, [
  { topic: 'orders', partition: 0, offset: '100' },
  { topic: 'orders', partition: 1, offset: '200' },
]);

// Multiple topics in one call
await kafka.seekToOffset('payments-group', [
  { topic: 'payments', partition: 0, offset: '0' },
  { topic: 'refunds',  partition: 0, offset: '500' },
]);

The first argument is the consumer group ID — pass undefined to target the default group. Assignments are grouped by topic internally so each admin.setOffsets call covers all partitions of one topic.

Important: the consumer for the specified group must be stopped before calling seekToOffset. An error is thrown if the group is currently running.

Seek to timestamp

Seek partitions to the offset nearest to a specific point in time — useful for replaying events that occurred after a known incident or deployment:

const ts = new Date('2024-06-01T12:00:00Z').getTime(); // Unix ms

await kafka.seekToTimestamp(undefined, [
  { topic: 'orders', partition: 0, timestamp: ts },
  { topic: 'orders', partition: 1, timestamp: ts },
]);

// Multiple topics in one call
await kafka.seekToTimestamp('payments-group', [
  { topic: 'payments', partition: 0, timestamp: ts },
  { topic: 'refunds',  partition: 0, timestamp: ts },
]);

Uses admin.fetchTopicOffsetsByTime under the hood. If no offset exists at the requested timestamp (e.g. the partition is empty or the timestamp is in the future), the partition falls back to -1 (end of topic — new messages only).

Important: the consumer group must be stopped before seeking. Assignments for the same topic are batched into a single admin.setOffsets call.

Message TTL

Drop or route expired messages using messageTtlMs in ConsumerOptions:

await kafka.startConsumer(['orders'], handler, {
  messageTtlMs: 60_000, // drop messages older than 60 s
  dlq: true,            // route expired messages to DLQ instead of dropping
});

The TTL is evaluated against the x-timestamp header stamped on every outgoing message by the producer. Messages whose age at consumption time exceeds messageTtlMs are:

  • Routed to DLQ with x-dlq-reason: ttl-expired when dlq: true
  • Dropped (calling onTtlExpired if configured) otherwise

Typical use: prevent stale events from poisoning downstream systems after a consumer lag spike — e.g. discard order events or push notifications that are no longer actionable.
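The expiry decision reduces to comparing message age against the TTL. A minimal sketch of the check implied by the description above:

```typescript
// A message is expired when (consumption time − x-timestamp) exceeds the TTL.
function isExpired(xTimestampMs: number, nowMs: number, messageTtlMs: number): boolean {
  return nowMs - xTimestampMs > messageTtlMs;
}
```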

DLQ replay

Re-publish messages from a dead letter queue back to the original topic:

// Re-publish all messages from 'orders.dlq' → 'orders'
const result = await kafka.replayDlq('orders');
// { replayed: 42, skipped: 0 }

Options:

| Option | Default | Description |
| ------ | ------- | ----------- |
| targetTopic | x-dlq-original-topic header | Override the destination topic |
| dryRun | false | Count messages without sending |
| filter | — | (headers) => boolean — skip messages where the callback returns false |

// Dry run — see how many messages would be replayed
const dry = await kafka.replayDlq('orders', { dryRun: true });

// Route to a different topic
const result = await kafka.replayDlq('orders', { targetTopic: 'orders.v2' });

// Only replay messages with a specific correlation ID
const filtered = await kafka.replayDlq('orders', {
  filter: (headers) => headers['x-correlation-id'] === 'corr-123',
});

replayDlq creates a temporary consumer group that reads the DLQ topic up to the high-watermark at the time of the call — messages published after replay starts are not included. DLQ metadata headers (x-dlq-original-topic, x-dlq-error-message, x-dlq-error-stack, x-dlq-failed-at, x-dlq-attempt-count) are stripped from the replayed messages; all other headers (e.g. x-correlation-id) are preserved.
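The header handling described above can be sketched as a filter over the x-dlq- prefix (a simplification of what replayDlq does internally):

```typescript
// Drop DLQ metadata headers, keep everything else (correlation IDs, tracing
// headers, and any application-specific headers).
function stripDlqHeaders(headers: Record<string, string>): Record<string, string> {
  return Object.fromEntries(
    Object.entries(headers).filter(([name]) => !name.startsWith('x-dlq-')),
  );
}
```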

Read snapshot

Read any topic from the beginning to its current high-watermark and return a Map<key, EventEnvelope<T>> with the latest value per key. Useful for bootstrapping in-memory state at service startup without an external cache:

// Build a key → latest-value index for a compacted topic
const orders = await kafka.readSnapshot('orders.state');
orders.get('order-123'); // EventEnvelope with the latest payload for that key

Tombstone records (null-value messages) remove the key from the map, consistent with log-compaction semantics:

const snapshot = await kafka.readSnapshot('orders.state', {
  onTombstone: (key) => console.log(`Key deleted: ${key}`),
});

Optional schema validation skips invalid messages with a warning instead of throwing:

import { z } from 'zod';

const OrderSchema = z.object({ orderId: z.string(), amount: z.number() });

const snapshot = await kafka.readSnapshot('orders.state', {
  schema: OrderSchema,
});

readSnapshot uses a short-lived temporary consumer that is not registered in the client's consumer map — it disconnects as soon as all partitions reach their high-watermark. The call resolves with the complete snapshot; it does not stream.

| Option | Description |
| ------ | ----------- |
| schema | Zod / Valibot / ArkType (any .parse() shape) — invalid messages are skipped with a warning |
| onTombstone | Called for each tombstone key before it is removed from the map |
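The key → latest-value fold that readSnapshot performs can be sketched in a few lines (null values are tombstones, matching the log-compaction semantics described above):

```typescript
// Latest value wins per key; a null value (tombstone) deletes the key.
function foldSnapshot<T>(
  records: Array<{ key: string; value: T | null }>,
): Map<string, T> {
  const snapshot = new Map<string, T>();
  for (const { key, value } of records) {
    if (value === null) snapshot.delete(key); // tombstone
    else snapshot.set(key, value);
  }
  return snapshot;
}
```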

Offset checkpointing

Save and restore consumer group offsets via a dedicated Kafka topic. Useful for point-in-time recovery, blue/green deployments, and disaster recovery without resetting to earliest/latest.

checkpointOffsets

Snapshot the current committed offsets of a consumer group into a Kafka topic:

// Checkpoint the default group
const result = await kafka.checkpointOffsets(undefined, 'checkpoints');
// {
//   groupId: 'orders-group',
//   topics: ['orders', 'payments'],
//   partitionCount: 4,
//   savedAt: 1710000000000,
// }

// Checkpoint a specific group
await kafka.checkpointOffsets('payments-group', 'checkpoints');

Each call appends a new record to the checkpoint topic keyed by groupId, with x-checkpoint-timestamp and x-checkpoint-group-id headers. The checkpoint topic acts as an append-only audit log — use a non-compacted topic to retain history.

Requires connectProducer() to have been called before checkpointing.

restoreFromCheckpoint

Restore a consumer group's committed offsets from the nearest checkpoint:

// Restore to the latest checkpoint
const result = await kafka.restoreFromCheckpoint(undefined, 'checkpoints');
// {
//   groupId: 'orders-group',
//   offsets: [{ topic: 'orders', partition: 0, offset: '1500' }, ...],
//   restoredAt: 1710000000000,
//   checkpointAge: 3600000, // ms since the checkpoint was saved
// }

// Restore to the nearest checkpoint before a specific timestamp
const ts = new Date('2024-06-01T12:00:00Z').getTime();
await kafka.restoreFromCheckpoint(undefined, 'checkpoints', { timestamp: ts });

Checkpoint selection rules:

  1. If timestamp is omitted — the latest checkpoint is selected.
  2. If timestamp is given — the newest checkpoint whose savedAt ≤ timestamp is selected.
  3. If all checkpoints are newer than timestamp — falls back to the oldest checkpoint with a warning.
  4. Throws if no checkpoint exists for the group.
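The four rules above can be sketched as a pure selection over savedAt timestamps (a simplification of the library's internal logic):

```typescript
interface Checkpoint { savedAt: number }

// Implements the selection rules: latest when no timestamp is given, newest
// checkpoint at or before the timestamp, oldest as fallback, and an error
// when the group has no checkpoints at all.
function selectCheckpoint(checkpoints: Checkpoint[], timestamp?: number): Checkpoint {
  if (checkpoints.length === 0) throw new Error('no checkpoint for group'); // rule 4
  const byAge = [...checkpoints].sort((a, b) => a.savedAt - b.savedAt);
  if (timestamp === undefined) return byAge[byAge.length - 1];              // rule 1
  const eligible = byAge.filter((c) => c.savedAt <= timestamp);
  if (eligible.length > 0) return eligible[eligible.length - 1];            // rule 2
  return byAge[0];                                                          // rule 3
}
```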

Important: the consumer group must be stopped before calling restoreFromCheckpoint. An error is thrown if any consumer in the group is currently running.

restoreFromCheckpoint uses a short-lived temporary consumer to read all checkpoint records up to the current high-watermark, then calls admin.setOffsets for every topic-partition in the selected checkpoint.

| Option | Description |
| ------ | ----------- |
| timestamp | Target Unix ms. Omit to restore the latest checkpoint |

Windowed batch consumer

Accumulate messages into a buffer and flush a handler when either a size or time trigger fires — whichever comes first. Gives explicit control over both batch size and processing latency, unlike startBatchConsumer which delivers broker-sized batches of unpredictable size:

const handle = await kafka.startWindowConsumer(
  'orders',
  async (envelopes, meta) => {
    console.log(`Flushing ${envelopes.length} orders (trigger: ${meta.trigger})`);
    await db.bulkInsert(envelopes.map((e) => e.payload));
  },
  {
    maxMessages: 100,  // flush when 100 messages accumulate
    maxMs: 5_000,      // or after 5 s, whichever fires first
  },
);

WindowMeta is passed to the handler on every flush:

| Field | Description |
| ----- | ----------- |
| trigger | "size" — buffer reached maxMessages; "time" — maxMs elapsed |
| windowStart | Unix ms of the first message in the flushed window |
| windowEnd | Unix ms when the flush was initiated |
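The whichever-fires-first rule can be sketched as a small decision function (a simplification; the real consumer also flushes a partial buffer on clean shutdown):

```typescript
// 'size' when the buffer fills first, 'time' when the window elapses with
// messages pending, null when neither trigger has fired yet. The timer is
// measured from the first buffered message, so an empty buffer never flushes.
function flushTrigger(
  buffered: number,
  elapsedMs: number,
  maxMessages: number,
  maxMs: number,
): 'size' | 'time' | null {
  if (buffered >= maxMessages) return 'size';
  if (buffered > 0 && elapsedMs >= maxMs) return 'time';
  return null;
}
```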

On handle.stop() any buffered messages are flushed before the consumer disconnects — no messages are lost on clean shutdown.

retryTopics: true is rejected at startup with a clear error — the retry topic chain is incompatible with windowed accumulation.

| Option | Default | Description |
| ------ | ------- | ----------- |
| maxMessages | required | Flush when the buffer reaches this many messages |
| maxMs | required | Flush after this many ms since the first buffered message |
| All ConsumerOptions fields | — | Standard consumer options apply (retry, dlq, deduplication, etc.) |

Header-based routing

Dispatch messages to different handlers based on the value of a Kafka header — no if/switch boilerplate in a catch-all handler. Useful when one topic carries multiple event types distinguished by a header like x-event-type:

await kafka.startRoutedConsumer(['events'], {
  header: 'x-event-type',
  routes: {
    'order.created':   async (e) => handleOrderCreated(e.payload),
    'order.cancelled': async (e) => handleOrderCancelled(e.payload),
    'order.shipped':   async (e) => handleOrderShipped(e.payload),
  },
  fallback: async (e) => logger.warn('Unknown event type', e.headers),
});

Messages are dispatched to the handler whose key matches envelope.headers[header]. If the header is absent or its value has no matching route:

  • The fallback handler is called if provided.
  • The message is silently skipped if fallback is omitted.
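The dispatch rule reduces to an exact-match lookup with an optional fallback. A sketch (handlers are represented generically; the real handlers receive the full envelope):

```typescript
// Returns the matching route's handler, else the fallback, else undefined,
// in which case the message is skipped.
function resolveRoute<H>(
  headerValue: string | undefined,
  routes: Record<string, H>,
  fallback?: H,
): H | undefined {
  if (headerValue !== undefined && headerValue in routes) return routes[headerValue];
  return fallback;
}
```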

All standard ConsumerOptions apply uniformly across every route — retry, DLQ, deduplication, circuit breaker, interceptors, etc.:

await kafka.startRoutedConsumer(
  ['events'],
  {
    header: 'x-event-type'