@drarzter/kafka-client v0.8.0
@drarzter/kafka-client
Type-safe Kafka client for Node.js. Framework-agnostic core with a first-class NestJS adapter. Built on top of @confluentinc/kafka-javascript (librdkafka).
Table of contents
- What is this?
- Why?
- Installation
- Standalone usage
- Quick start (NestJS)
- Usage
- Consuming messages
- Multiple consumer groups
- Partition key
- Message headers
- Batch sending
- Batch consuming
- Tombstone messages
- Compression
- Transactions
- Consumer interceptors
- Instrumentation
- Options reference
- Error classes
- Deduplication (Lamport Clock)
- Retry topic chain
- stopConsumer
- Pause and resume
- Circuit breaker
- Reset consumer offsets
- Seek to offset
- Seek to timestamp
- Message TTL
- DLQ replay
- Read snapshot
- Offset checkpointing
- Windowed batch consumer
- Header-based routing
- Lag-based producer throttling
- Transactional consumer
- Admin API
- Graceful shutdown
- Consumer handles
- onMessageLost
- onTtlExpired
- onRebalance
- Consumer lag
- Handler timeout warning
- Schema validation
- Health check
- Testing
- Project structure
What is this?
An opinionated, type-safe abstraction over @confluentinc/kafka-javascript (librdkafka). Works standalone (Express, Fastify, raw Node) or as a NestJS DynamicModule. Not a full-featured framework — just a clean, typed layer for producing and consuming Kafka messages.
This library exists so you don't have to think about:
- rebalance edge cases
- retry loops and backoff scheduling
- dead letter queue wiring
- transaction coordinator warmup
- graceful shutdown and offset commit pitfalls
- silent message loss
Safe by default. Configurable when you need it. Escape hatches for when you know what you're doing.
Why?
- Typed topics — you define a map of topic -> message shape, and the compiler won't let you send the wrong data to the wrong topic
- Topic descriptors — topic() DX sugar lets you define topics as standalone typed objects instead of string keys
- Framework-agnostic — use standalone or with NestJS (register()/registerAsync(), DI, lifecycle hooks)
- Idempotent producer — acks: -1, idempotent: true by default
- Lamport Clock deduplication — every outgoing message is stamped with a monotonically increasing x-lamport-clock header; the consumer tracks the last processed value per topic:partition and silently drops (or routes to DLQ / a dedicated topic) any message whose clock is not strictly greater than the last seen value
- Retry + DLQ — exponential backoff with full jitter; dead letter queue with error metadata headers (original topic, error message, stack, attempt count)
- Batch sending — send multiple messages in a single request
- Batch consuming — startBatchConsumer() for high-throughput eachBatch processing
- Partition key support — route related messages to the same partition
- Custom headers — attach metadata headers to messages
- Transactions — exactly-once semantics with producer.transaction()
- EventEnvelope — every consumed message is wrapped in EventEnvelope<T> with eventId, correlationId, timestamp, schemaVersion, traceparent, and Kafka metadata
- Correlation ID propagation — auto-generated on send, auto-propagated through AsyncLocalStorage so nested sends inherit the same correlation ID
- OpenTelemetry support — @drarzter/kafka-client/otel entrypoint with otelInstrumentation() for W3C Trace Context propagation
- Consumer interceptors — before/after/onError hooks with EventEnvelope access
- Client-wide instrumentation — KafkaInstrumentation hooks for cross-cutting concerns (tracing, metrics)
- Auto-create topics — autoCreateTopics: true for dev mode — no need to pre-create topics
- Error classes — KafkaProcessingError and KafkaRetryExhaustedError with topic, message, and attempt metadata
- Health check — built-in health indicator for monitoring
- Multiple consumer groups — named clients for different bounded contexts
- Declarative & imperative — use the @SubscribeTo() decorator or startConsumer() directly
- Async iterator — consume<K>() returns an AsyncIterableIterator<EventEnvelope<T[K]>> for consumption with for await; breaking out of the loop stops the consumer automatically
- Message TTL — messageTtlMs drops or DLQs messages older than a configurable threshold, preventing stale events from poisoning downstream systems after a lag spike
- Circuit breaker — the circuitBreaker option applies a sliding-window breaker per topic-partition; pauses delivery on repeated DLQ failures and resumes after a configurable recovery window
- Seek to offset — seekToOffset(groupId, assignments) seeks individual partitions to explicit offsets for fine-grained replay
- Tombstone messages — sendTombstone(topic, key) sends a null-value record to compact a key out of a log-compacted topic; all instrumentation hooks still fire
- Regex topic subscription — startConsumer([/^orders\..+/], handler) subscribes using a pattern; the broker routes matching topics to the consumer dynamically
- Compression — per-send compression option (gzip, snappy, lz4, zstd) in SendOptions and BatchSendOptions
- Partition assignment strategy — partitionAssigner in ConsumerOptions chooses between cooperative-sticky (default), roundrobin, and range
- Admin API — listConsumerGroups(), describeTopics(), deleteRecords() for group inspection, partition metadata, and message deletion
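The Lamport Clock rule above ("not strictly greater than the last seen value") can be illustrated with a minimal standalone sketch — this is illustrative, not the library's internal implementation:

```typescript
// Tracks the last processed clock per topic:partition. A message is a
// duplicate unless its clock is strictly greater than the recorded value.
const lastSeen = new Map<string, number>();

function isDuplicate(topic: string, partition: number, clock: number): boolean {
  const key = `${topic}:${partition}`;
  const prev = lastSeen.get(key);
  if (prev !== undefined && clock <= prev) {
    return true; // drop / DLQ / route to duplicates topic, per strategy
  }
  lastSeen.set(key, clock);
  return false;
}
```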
See the Roadmap for upcoming features and version history.
Installation
npm install @drarzter/kafka-client
@confluentinc/kafka-javascript uses a native librdkafka addon. On most systems it builds automatically. For faster installs (skips compilation), install the system library first:
# Arch / CachyOS
sudo pacman -S librdkafka
# Debian / Ubuntu
sudo apt-get install librdkafka-dev
# macOS
brew install librdkafka
Then install with BUILD_LIBRDKAFKA=0 npm install.
For NestJS projects, install peer dependencies: @nestjs/common, @nestjs/core, reflect-metadata, rxjs.
For standalone usage (Express, Fastify, raw Node), no extra dependencies needed — import from @drarzter/kafka-client/core.
Standalone usage (no NestJS)
import { KafkaClient, topic } from '@drarzter/kafka-client/core';
const OrderCreated = topic('order.created').type<{ orderId: string; amount: number }>();
const kafka = new KafkaClient('my-app', 'my-group', ['localhost:9092']);
await kafka.connectProducer();
// Send
await kafka.sendMessage(OrderCreated, { orderId: '123', amount: 100 });
// Consume — handler receives an EventEnvelope
await kafka.startConsumer([OrderCreated], async (envelope) => {
console.log(`${envelope.topic}:`, envelope.payload.orderId);
});
// Custom logger (winston, pino, etc.)
const kafka2 = new KafkaClient('my-app', 'my-group', ['localhost:9092'], {
logger: myWinstonLogger,
});
// All module options work in standalone mode too
const kafka3 = new KafkaClient('my-app', 'my-group', ['localhost:9092'], {
autoCreateTopics: true, // auto-create topics on first use
numPartitions: 3, // partitions for auto-created topics
strictSchemas: false, // disable schema enforcement for string topic keys
instrumentation: [...], // client-wide tracing/metrics hooks
});
// Health check — available directly, no NestJS needed
const status = await kafka.checkStatus();
// { status: 'up', clientId: 'my-app', topics: ['order.created', ...] }
// Stop all consumers without disconnecting the producer or admin
// Useful when you want to re-subscribe with different options
await kafka.stopConsumer();
Quick start (NestJS)
Send and receive a message in 3 files:
// types.ts
export interface MyTopics {
'hello': { text: string };
}
// app.module.ts
import { Module } from '@nestjs/common';
import { KafkaModule } from '@drarzter/kafka-client';
import { MyTopics } from './types';
import { AppService } from './app.service';
@Module({
imports: [
KafkaModule.register<MyTopics>({
clientId: 'my-app',
groupId: 'my-group',
brokers: ['localhost:9092'],
}),
],
providers: [AppService],
})
export class AppModule {}
// app.service.ts
import { Injectable } from '@nestjs/common';
import { InjectKafkaClient, KafkaClient, SubscribeTo, EventEnvelope } from '@drarzter/kafka-client';
import { MyTopics } from './types';
@Injectable()
export class AppService {
constructor(
@InjectKafkaClient() private readonly kafka: KafkaClient<MyTopics>,
) {}
async send() {
await this.kafka.sendMessage('hello', { text: 'Hello, Kafka!' });
}
@SubscribeTo('hello')
async onHello(envelope: EventEnvelope<MyTopics['hello']>) {
console.log('Received:', envelope.payload.text);
}
}
Usage
1. Define your topic map
Both interface and type work — pick whichever you prefer:
// Explicit: extends TTopicMessageMap — IDE hints that values must be Record<string, any>
import { TTopicMessageMap } from '@drarzter/kafka-client';
export interface OrdersTopicMap extends TTopicMessageMap {
'order.created': {
orderId: string;
userId: string;
amount: number;
};
'order.completed': {
orderId: string;
completedAt: string;
};
}
// Minimal: plain interface or type — works just the same
export interface OrdersTopicMap {
'order.created': { orderId: string; userId: string; amount: number };
'order.completed': { orderId: string; completedAt: string };
}
// or
export type OrdersTopicMap = {
'order.created': { orderId: string; userId: string; amount: number };
'order.completed': { orderId: string; completedAt: string };
};
Alternative: topic() descriptors
Instead of a centralized topic map, define each topic as a standalone typed object:
import { topic, TopicsFrom } from '@drarzter/kafka-client';
export const OrderCreated = topic('order.created').type<{
orderId: string;
userId: string;
amount: number;
}>();
export const OrderCompleted = topic('order.completed').type<{
orderId: string;
completedAt: string;
}>();
// Combine into a topic map for KafkaModule generics
export type OrdersTopicMap = TopicsFrom<typeof OrderCreated | typeof OrderCompleted>;
Topic descriptors work everywhere strings work — sendMessage, sendBatch, transaction, startConsumer, and @SubscribeTo():
// Sending
await kafka.sendMessage(OrderCreated, { orderId: '123', userId: '456', amount: 100 });
await kafka.sendBatch(OrderCreated, [{ value: { orderId: '1', userId: '10', amount: 50 } }]);
// Transactions
await kafka.transaction(async (tx) => {
await tx.send(OrderCreated, { orderId: '123', userId: '456', amount: 100 });
});
// Consuming (decorator)
@SubscribeTo(OrderCreated)
async handleOrder(envelope: EventEnvelope<OrdersTopicMap['order.created']>) { ... }
// Consuming (imperative)
await kafka.startConsumer([OrderCreated], handler);
2. Register the module
import { KafkaModule } from '@drarzter/kafka-client';
import { OrdersTopicMap } from './orders.types';
@Module({
imports: [
KafkaModule.register<OrdersTopicMap>({
clientId: 'my-service',
groupId: 'my-consumer-group',
brokers: ['localhost:9092'],
autoCreateTopics: true, // auto-create topics on first use (dev mode)
}),
],
})
export class OrdersModule {}
autoCreateTopics calls admin.createTopics() (idempotent — no-op if the topic already exists) before the first send and before each startConsumer / startBatchConsumer call. librdkafka errors on unknown topics at subscribe time, so consumer-side creation is required. Useful in development, not recommended for production.
Or with ConfigService:
KafkaModule.registerAsync<OrdersTopicMap>({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
clientId: 'my-service',
groupId: 'my-consumer-group',
brokers: config.get<string>('KAFKA_BROKERS').split(','),
}),
})
Global module
By default, KafkaModule is scoped — you need to import it in every module that uses @InjectKafkaClient(). Pass isGlobal: true to make the client available everywhere:
// app.module.ts — register once
KafkaModule.register<OrdersTopicMap>({
clientId: 'my-service',
groupId: 'my-consumer-group',
brokers: ['localhost:9092'],
isGlobal: true,
})
// any other module — no need to import KafkaModule
@Injectable()
export class SomeService {
constructor(@InjectKafkaClient() private readonly kafka: KafkaClient<OrdersTopicMap>) {}
}
Works with registerAsync() too:
KafkaModule.registerAsync<OrdersTopicMap>({
isGlobal: true,
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (config: ConfigService) => ({ ... }),
})
3. Inject and use
import { Injectable } from '@nestjs/common';
import { InjectKafkaClient, KafkaClient } from '@drarzter/kafka-client';
import { OrdersTopicMap } from './orders.types';
@Injectable()
export class OrdersService {
constructor(
@InjectKafkaClient()
private readonly kafka: KafkaClient<OrdersTopicMap>,
) {}
async createOrder() {
await this.kafka.sendMessage('order.created', {
orderId: '123',
userId: '456',
amount: 100,
});
}
}
Consuming messages
Three ways — choose what fits your style.
Declarative: @SubscribeTo()
import { Injectable } from '@nestjs/common';
import { SubscribeTo } from '@drarzter/kafka-client';
@Injectable()
export class OrdersHandler {
@SubscribeTo('order.created')
async handleOrderCreated(envelope: EventEnvelope<OrdersTopicMap['order.created']>) {
console.log('New order:', envelope.payload.orderId);
}
@SubscribeTo('order.completed', { retry: { maxRetries: 3 }, dlq: true })
async handleOrderCompleted(envelope: EventEnvelope<OrdersTopicMap['order.completed']>) {
console.log('Order completed:', envelope.payload.orderId);
}
}
The module auto-discovers @SubscribeTo() methods on startup and subscribes them.
Imperative: startConsumer()
@Injectable()
export class OrdersService implements OnModuleInit {
constructor(
@InjectKafkaClient()
private readonly kafka: KafkaClient<OrdersTopicMap>,
) {}
async onModuleInit() {
await this.kafka.startConsumer(
['order.created', 'order.completed'],
async (envelope) => {
console.log(`${envelope.topic}:`, envelope.payload);
},
{
retry: { maxRetries: 3, backoffMs: 1000 },
dlq: true,
},
);
}
}
Regex topic subscription
Subscribe to multiple topics matching a pattern — the broker dynamically routes any topic whose name matches the regex to this consumer:
// Subscribe to all topics starting with "orders."
await kafka.startConsumer([/^orders\..+/], handler);
// Mix regexes and literal strings
await kafka.startConsumer([/^payments\..+/, 'audit.global'], handler);
Works with startBatchConsumer and @SubscribeTo too:
@SubscribeTo(/^events\..+/)
async handleEvent(envelope: EventEnvelope<any>) { ... }
Limitation: retryTopics: true is incompatible with regex subscriptions — the library cannot derive static retry topic names from a pattern. An error is thrown at startup if both are combined.
Iterator: consume()
Stream messages from a single topic as an AsyncIterableIterator — useful for scripts, one-off tasks, or any context where you prefer for await over a callback:
for await (const envelope of kafka.consume('order.created')) {
console.log('Order:', envelope.payload.orderId);
}
// Breaking out of the loop stops the consumer automatically
for await (const envelope of kafka.consume('order.created')) {
if (envelope.payload.orderId === targetId) break;
}
consume() accepts the same ConsumerOptions as startConsumer():
for await (const envelope of kafka.consume('orders', {
retry: { maxRetries: 3 },
dlq: true,
messageTtlMs: 60_000,
})) {
await processOrder(envelope.payload);
}
break, return, or any early exit from the loop calls the iterator's return() method, which closes the internal queue and calls handle.stop() on the background consumer.
Backpressure — use queueHighWaterMark to prevent unbounded queue growth when processing is slower than the message rate:
for await (const envelope of kafka.consume('orders', {
queueHighWaterMark: 100, // pause partition when queue reaches 100 messages
})) {
await slowProcessing(envelope.payload); // resumes when queue drains below 50
}
The partition is paused when the internal queue reaches queueHighWaterMark and automatically resumed when it drains below 50%. Without this option the queue is unbounded.
Error propagation — if the consumer fails to start (e.g. broker unreachable), the error surfaces on the next next() / for await iteration rather than being silently swallowed.
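The queueHighWaterMark pause/resume behavior described above can be sketched as a bounded queue — illustrative semantics only (WatermarkQueue is not part of this library's API): pause at the high-water mark, resume once drained below half of it.

```typescript
// Bounded queue with high/low watermarks: onPause/onResume stand in for
// consumer.pause(partition) / consumer.resume(partition).
class WatermarkQueue<T> {
  private queue: T[] = [];
  paused = false;

  constructor(
    private readonly highWaterMark: number,
    private readonly onPause: () => void,
    private readonly onResume: () => void,
  ) {}

  push(item: T): void {
    this.queue.push(item);
    if (!this.paused && this.queue.length >= this.highWaterMark) {
      this.paused = true;
      this.onPause(); // stop fetching from this partition
    }
  }

  shift(): T | undefined {
    const item = this.queue.shift();
    if (this.paused && this.queue.length < this.highWaterMark / 2) {
      this.paused = false;
      this.onResume(); // queue drained below 50% — resume delivery
    }
    return item;
  }
}
```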
Multiple consumer groups
Per-consumer groupId
Override the default consumer group for specific consumers. Each unique groupId creates a separate librdkafka Consumer internally:
// Default group from constructor
await kafka.startConsumer(['orders'], handler);
// Custom group — receives its own copy of messages
await kafka.startConsumer(['orders'], auditHandler, { groupId: 'orders-audit' });
// Works with @SubscribeTo too
@SubscribeTo('orders', { groupId: 'orders-audit' })
async auditOrders(envelope) { ... }
Important: You cannot mix eachMessage and eachBatch consumers on the same groupId, and you cannot call startConsumer (or startBatchConsumer) twice on the same groupId without stopping it first. The library throws a clear error in both cases:
Cannot use eachBatch on consumer group "my-group" — it is already running with eachMessage.
Use a different groupId for this consumer.
startConsumer("my-group") called twice — this group is already consuming.
Call stopConsumer("my-group") first or pass a different groupId.
Named clients
Register multiple named clients for different bounded contexts:
@Module({
imports: [
KafkaModule.register<OrdersTopicMap>({
name: 'orders',
clientId: 'orders-service',
groupId: 'orders-consumer',
brokers: ['localhost:9092'],
}),
KafkaModule.register<PaymentsTopicMap>({
name: 'payments',
clientId: 'payments-service',
groupId: 'payments-consumer',
brokers: ['localhost:9092'],
}),
],
})
export class AppModule {}
Inject by name — the string in @InjectKafkaClient() must match the name from register():
@Injectable()
export class OrdersService {
constructor(
@InjectKafkaClient('orders') // ← matches name: 'orders' above
private readonly kafka: KafkaClient<OrdersTopicMap>,
) {}
}
Same with @SubscribeTo() — use clientName to target a specific named client:
@SubscribeTo('payment.received', { clientName: 'payments' }) // ← matches name: 'payments'
async handlePayment(envelope: EventEnvelope<PaymentsTopicMap['payment.received']>) {
// ...
}
Partition key
Route all events for the same order to the same partition:
await this.kafka.sendMessage(
'order.created',
{ orderId: '123', userId: '456', amount: 100 },
{ key: '123' },
);
Message headers
Attach metadata to messages:
await this.kafka.sendMessage(
'order.created',
{ orderId: '123', userId: '456', amount: 100 },
{
key: '123',
headers: { 'x-correlation-id': 'abc-def', 'x-source': 'api-gateway' },
},
);
Headers work with batch sending too:
await this.kafka.sendBatch('order.created', [
{
value: { orderId: '1', userId: '10', amount: 50 },
key: '1',
headers: { 'x-correlation-id': 'req-1' },
},
]);
Batch sending
await this.kafka.sendBatch('order.created', [
{ value: { orderId: '1', userId: '10', amount: 50 }, key: '1' },
{ value: { orderId: '2', userId: '20', amount: 75 }, key: '2' },
{ value: { orderId: '3', userId: '30', amount: 100 }, key: '3' },
]);
Batch consuming
Process messages in batches for higher throughput. The handler receives an array of EventEnvelopes and a BatchMeta object with offset management controls:
await this.kafka.startBatchConsumer(
['order.created'],
async (envelopes, meta) => {
// envelopes: EventEnvelope<OrdersTopicMap['order.created']>[]
for (const env of envelopes) {
await processOrder(env.payload);
meta.resolveOffset(env.offset);
// Call heartbeat() during long-running batch processing to prevent
// the broker from considering the consumer dead (session.timeout.ms)
await meta.heartbeat();
}
await meta.commitOffsetsIfNecessary();
},
{ retry: { maxRetries: 3 }, dlq: true },
);
With autoCommit: false for full manual offset control:
await this.kafka.startBatchConsumer(
['order.created'],
async (envelopes, meta) => {
for (const env of envelopes) {
await processOrder(env.payload);
meta.resolveOffset(env.offset);
}
// commitOffsetsIfNecessary() commits only when autoCommit is off
// or when the commit interval has elapsed
await meta.commitOffsetsIfNecessary();
},
{ autoCommit: false },
);
Note: If your handler calls resolveOffset() or commitOffsetsIfNecessary() without setting autoCommit: false, a debug message is logged at consumer-start time — mixing autoCommit with manual offset control causes offset conflicts. Set autoCommit: false to suppress the message and take full control of offset management.
With @SubscribeTo():
@SubscribeTo('order.created', { batch: true })
async handleOrders(envelopes: EventEnvelope<OrdersTopicMap['order.created']>[], meta: BatchMeta) {
for (const env of envelopes) { ... }
}
Schema validation runs per-message — invalid messages are skipped (DLQ'd if enabled), valid ones are passed to the handler. Retry applies to the whole batch.
retryTopics: true is also supported on startBatchConsumer. On handler failure, each envelope in the batch is routed individually to <topic>.retry.1; the companion retry consumers call the batch handler one message at a time with a stub BatchMeta (no-op heartbeat/resolveOffset/commitOffsetsIfNecessary):
await kafka.startBatchConsumer(
['orders.created'],
async (envelopes, meta) => { /* same handler */ },
{
retry: { maxRetries: 3, backoffMs: 1000 },
dlq: true,
retryTopics: true, // ← now supported for batch consumers too
},
);
BatchMeta exposes:
| Property/Method | Description |
| --------------- | ----------- |
| partition | Partition number for this batch |
| highWatermark | Latest offset in the partition (string). null when the message is replayed via a retry topic consumer — in that path the broker high-watermark is not available. Guard against null before computing lag |
| heartbeat() | Send a heartbeat to keep the consumer session alive — call during long processing loops |
| resolveOffset(offset) | Mark offset as processed (required before commitOffsetsIfNecessary) |
| commitOffsetsIfNecessary() | Commit resolved offsets; respects autoCommit setting |
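Because highWatermark is null on the retry-topic replay path, guard before doing offset arithmetic. A hypothetical helper (batchLag is an illustrative name, not part of the API; the exact off-by-one depends on how you interpret the watermark):

```typescript
// Returns approximate lag for the batch's last processed offset, or null
// when no broker high-watermark is available (retry-topic replay path).
function batchLag(lastOffset: string, highWatermark: string | null): number | null {
  if (highWatermark === null) return null; // replayed message — no watermark
  return Number(highWatermark) - Number(lastOffset);
}
```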
Tombstone messages
Send a null-value Kafka record to compact a specific key out of a log-compacted topic:
// Delete the record with key "user-123" from the log-compacted "users" topic
await kafka.sendTombstone('users', 'user-123');
// With custom headers
await kafka.sendTombstone('users', 'user-123', { 'x-reason': 'gdpr-deletion' });
sendTombstone skips envelope headers, schema validation, and the Lamport clock — the record value is literally null, as required by Kafka's log compaction protocol. Both beforeSend and afterSend instrumentation hooks still fire so tracing works correctly.
Compression
Reduce network bandwidth with per-send compression. Supported codecs: 'gzip', 'snappy', 'lz4', 'zstd':
import { CompressionType } from '@drarzter/kafka-client/core';
// Single message
await kafka.sendMessage('events', payload, { compression: 'gzip' });
// Batch
await kafka.sendBatch('events', messages, { compression: 'snappy' });
Compression is applied at the Kafka message-set level; decompression happens transparently on the consumer side. 'snappy' and 'lz4' offer the best throughput/CPU trade-off for most workloads; 'gzip' gives the highest compression ratio; 'zstd' balances both.
Transactions
Send multiple messages atomically with exactly-once semantics:
await this.kafka.transaction(async (tx) => {
await tx.send('order.created', {
orderId: '123',
userId: '456',
amount: 100,
});
await tx.send('order.completed', {
orderId: '123',
completedAt: new Date().toISOString(),
});
// if anything throws, all messages are rolled back
});
tx.sendBatch() is also available inside transactions:
await this.kafka.transaction(async (tx) => {
await tx.sendBatch('order.created', [
{ value: { orderId: '1', userId: '10', amount: 50 }, key: '1' },
{ value: { orderId: '2', userId: '20', amount: 75 }, key: '2' },
]);
// if anything throws, all messages are rolled back
});
Consumer interceptors
Add before/after/onError hooks to message processing. Interceptors receive the full EventEnvelope:
import { ConsumerInterceptor } from '@drarzter/kafka-client';
const loggingInterceptor: ConsumerInterceptor<OrdersTopicMap> = {
before: (envelope) => {
console.log(`Processing ${envelope.topic}`, envelope.payload);
},
after: (envelope) => {
console.log(`Done ${envelope.topic}`);
},
onError: (envelope, error) => {
console.error(`Failed ${envelope.topic}:`, error.message);
},
};
await this.kafka.startConsumer(['order.created'], handler, {
interceptors: [loggingInterceptor],
});
Multiple interceptors run in order. All hooks are optional.
Instrumentation
For client-wide cross-cutting concerns (tracing, metrics), use KafkaInstrumentation hooks instead of per-consumer interceptors:
import { otelInstrumentation } from '@drarzter/kafka-client/otel';
const kafka = new KafkaClient('my-app', 'my-group', brokers, {
instrumentation: [otelInstrumentation()],
});
otelInstrumentation() injects traceparent on send, extracts it on consume, and creates CONSUMER spans automatically. The span is set as the active OTel context for the handler's duration via context.with() — so trace.getActiveSpan() works inside your handler and any child spans are automatically parented to the consume span. Requires @opentelemetry/api as a peer dependency.
Custom instrumentation
beforeConsume can return a BeforeConsumeResult — either the legacy () => void cleanup function, or an object with cleanup and/or wrap:
import { KafkaInstrumentation, BeforeConsumeResult } from '@drarzter/kafka-client';
const myInstrumentation: KafkaInstrumentation = {
beforeSend(topic, headers) { /* inject headers, start timer */ },
afterSend(topic) { /* record send latency */ },
beforeConsume(envelope): BeforeConsumeResult {
const span = startMySpan(envelope.topic);
return {
// cleanup() is called after the handler completes (success or error)
cleanup() { span.end(); },
// wrap(fn) runs the handler inside the desired async context
// call fn() wherever you need it in the context scope
wrap(fn) { return runWithSpanActive(span, fn); },
};
},
onConsumeError(envelope, error) { /* record error metric */ },
};
The legacy () => void form is still fully supported — return a function directly if you only need cleanup:
beforeConsume(envelope) {
const timer = startTimer();
return () => timer.end(); // cleanup only, no context wrapping
},
BeforeConsumeResult is a union:
type BeforeConsumeResult =
| (() => void) // legacy: cleanup only
| { cleanup?(): void; // called after handler (success or error)
wrap?(fn: () => Promise<void>): Promise<void>; // wraps handler execution
};
When multiple instrumentations each provide a wrap, they compose in declaration order — the first instrumentation's wrap is the outermost.
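A minimal sketch of that composition order (illustrative, not the library's source): folding the wrap functions from the right makes the first one outermost.

```typescript
type Wrap = (fn: () => Promise<void>) => Promise<void>;

// wraps[0] ends up wrapping everything else, matching declaration order.
function composeWraps(wraps: Wrap[], handler: () => Promise<void>): Promise<void> {
  return wraps.reduceRight<() => Promise<void>>(
    (inner, wrap) => () => wrap(inner),
    handler,
  )();
}
```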
Lifecycle event hooks
Three additional hooks fire for specific events in the consume pipeline:
| Hook | When called | Arguments |
| ---- | ----------- | --------- |
| onMessage | Handler successfully processed a message | (envelope) — use as a success counter for error-rate calculations |
| onRetry | A message is queued for another attempt (in-process backoff or routed to a retry topic) | (envelope, attempt, maxRetries) |
| onDlq | A message is routed to the dead letter queue | (envelope, reason) — reason is 'handler-error', 'validation-error', or 'lamport-clock-duplicate' |
| onDuplicate | A duplicate is detected via Lamport Clock | (envelope, strategy) — strategy is 'drop', 'dlq', or 'topic' |
const myInstrumentation: KafkaInstrumentation = {
onMessage(envelope) {
metrics.increment('kafka.processed', { topic: envelope.topic });
},
onRetry(envelope, attempt, maxRetries) {
console.warn(`Retrying ${envelope.topic} — attempt ${attempt}/${maxRetries}`);
},
onDlq(envelope, reason) {
alertingSystem.send({ topic: envelope.topic, reason });
},
onDuplicate(envelope, strategy) {
metrics.increment('kafka.duplicate', { topic: envelope.topic, strategy });
},
};Built-in metrics
KafkaClient maintains lightweight in-process event counters independently of any instrumentation:
// Global snapshot — aggregate across all topics
const snapshot = kafka.getMetrics();
// { processedCount: number; retryCount: number; dlqCount: number; dedupCount: number }
// Per-topic snapshot
const orderMetrics = kafka.getMetrics('order.created');
// { processedCount: 5, retryCount: 1, dlqCount: 0, dedupCount: 0 }
kafka.resetMetrics(); // reset all counters
kafka.resetMetrics('order.created'); // reset only one topic's counters
Passing a topic name that has not seen any events returns a zero-valued snapshot — it never throws.
Counters are incremented in the same code paths that fire the corresponding hooks — they are always active regardless of whether any instrumentation is configured.
Options reference
Send options
Options for sendMessage() — the third argument:
| Option | Default | Description |
| ------ | ------- | ----------- |
| key | — | Partition key for message routing |
| headers | — | Custom metadata headers (merged with auto-generated envelope headers) |
| correlationId | auto | Override the auto-propagated correlation ID (default: inherited from ALS context or new UUID) |
| schemaVersion | 1 | Schema version for the payload |
| eventId | auto | Override the auto-generated event ID (UUID v4) |
| compression | — | Compression codec for the message set: 'gzip', 'snappy', 'lz4', 'zstd'; omit to send uncompressed |
sendBatch() accepts compression as a top-level option (not per-message); all other options are per-message inside the array items.
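The default correlationId behavior ("inherited from ALS context or new UUID") can be illustrated with plain Node AsyncLocalStorage — a standalone sketch; resolveCorrelationId and runWithCorrelation are illustrative names, not this library's API:

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';
import { randomUUID } from 'node:crypto';

const als = new AsyncLocalStorage<{ correlationId: string }>();

// Precedence: explicit override > ambient async context > fresh UUID.
function resolveCorrelationId(override?: string): string {
  return override ?? als.getStore()?.correlationId ?? randomUUID();
}

// Everything (including nested sends) inside fn sees the same correlation ID.
function runWithCorrelation<T>(correlationId: string, fn: () => T): T {
  return als.run({ correlationId }, fn);
}
```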
Consumer options
| Option | Default | Description |
| ------ | ------- | ----------- |
| groupId | constructor value | Override consumer group for this subscription |
| fromBeginning | false | Read from the beginning of the topic |
| autoCommit | true | Auto-commit offsets |
| retry.maxRetries | — | Number of retry attempts |
| retry.backoffMs | 1000 | Base delay for exponential backoff in ms |
| retry.maxBackoffMs | 30000 | Maximum delay cap for exponential backoff in ms |
| dlq | false | Send to {topic}.dlq after all retries exhausted — message carries x-dlq-* metadata headers |
| retryTopics | false | Route failed messages through per-level topics ({topic}.retry.1, {topic}.retry.2, …) instead of sleeping in-process; exactly-once routing semantics within the retry chain; requires retry (see Retry topic chain) |
| interceptors | [] | Array of before/after/onError hooks |
| retryTopicAssignmentTimeoutMs | 10000 | Timeout (ms) to wait for each retry level consumer to receive partition assignments after connecting; increase for slow brokers |
| handlerTimeoutMs | — | Log a warning if the handler hasn't resolved within this window (ms) — does not cancel the handler |
| deduplication.strategy | 'drop' | What to do with duplicate messages: 'drop' silently discards, 'dlq' forwards to {topic}.dlq (requires dlq: true), 'topic' forwards to {topic}.duplicates |
| deduplication.duplicatesTopic | {topic}.duplicates | Custom destination for strategy: 'topic' |
| messageTtlMs | — | Drop (or DLQ) messages older than this many milliseconds at consumption time; evaluated against the x-timestamp header; see Message TTL |
| circuitBreaker | — | Enable circuit breaker with {} for zero-config defaults; requires dlq: true; see Circuit breaker |
| circuitBreaker.threshold | 5 | DLQ failures within windowSize that opens the circuit |
| circuitBreaker.recoveryMs | 30_000 | Milliseconds to wait in OPEN state before entering HALF_OPEN |
| circuitBreaker.windowSize | threshold × 2, min 10 | Sliding window size in messages |
| circuitBreaker.halfOpenSuccesses | 1 | Consecutive successes in HALF_OPEN required to close the circuit |
| queueHighWaterMark | unbounded | Max messages buffered in the consume() iterator queue before the partition is paused; resumes at 50% drain. Only applies to consume() |
| batch | false | (decorator only) Use startBatchConsumer instead of startConsumer |
| partitionAssigner | 'cooperative-sticky' | Partition assignment strategy: 'cooperative-sticky' (minimal movement on rebalance, best for horizontal scaling), 'roundrobin' (even distribution), 'range' (contiguous partition ranges) |
| onTtlExpired | — | Per-consumer override of the client-level onTtlExpired callback; takes precedence when set. Receives TtlExpiredContext — same shape as the client-level hook |
| subscribeRetry.retries | 5 | Max attempts for consumer.subscribe() when topic doesn't exist yet |
| subscribeRetry.backoffMs | 5000 | Delay between subscribe retry attempts (ms) |
Module options
Passed to KafkaModule.register() or returned from registerAsync() factory:
| Option | Default | Description |
| ------ | ------- | ----------- |
| clientId | — | Kafka client identifier (required) |
| groupId | — | Default consumer group ID (required) |
| brokers | — | Array of broker addresses (required) |
| name | — | Named client identifier for multi-client setups |
| isGlobal | false | Make the client available in all modules without re-importing |
| autoCreateTopics | false | Auto-create topics on first send (dev only) |
| numPartitions | 1 | Number of partitions for auto-created topics |
| strictSchemas | true | Validate string topic keys against schemas registered via TopicDescriptor |
| instrumentation | [] | Client-wide instrumentation hooks (e.g. OTel). Applied to both send and consume paths |
| transactionalId | ${clientId}-tx | Transactional producer ID for transaction() calls. Must be unique per producer instance across the cluster — two instances sharing the same ID will be fenced by Kafka. The client logs a warning when the same ID is registered twice within one process |
| onMessageLost | — | Called when a message is silently dropped without DLQ — use to alert, log to external systems, or trigger fallback logic |
| onTtlExpired | — | Called when a message is dropped due to TTL expiration (messageTtlMs) and dlq is not enabled; receives { topic, ageMs, messageTtlMs, headers } |
| onRebalance | — | Called on every partition assign/revoke event across all consumers created by this client |
Module-scoped (default) — import KafkaModule in each module that needs it:
// orders.module.ts
@Module({
imports: [
KafkaModule.register<OrdersTopicMap>({
clientId: 'orders',
groupId: 'orders-group',
brokers: ['localhost:9092'],
}),
],
})
export class OrdersModule {}

App-wide — register once in AppModule with isGlobal: true, inject anywhere:
// app.module.ts
@Module({
imports: [
KafkaModule.register<MyTopics>({
clientId: 'my-app',
groupId: 'my-group',
brokers: ['localhost:9092'],
isGlobal: true,
}),
],
})
export class AppModule {}
// any module — no KafkaModule import needed
@Injectable()
export class PaymentService {
constructor(@InjectKafkaClient() private readonly kafka: KafkaClient<MyTopics>) {}
}

Error classes
When a consumer message handler fails after all retries, the library throws typed error objects:
import { KafkaProcessingError, KafkaRetryExhaustedError } from '@drarzter/kafka-client';

KafkaProcessingError — base class for processing failures. Has topic, originalMessage, and supports cause:
const err = new KafkaProcessingError('handler failed', 'order.created', rawMessage, { cause: originalError });
err.topic; // 'order.created'
err.originalMessage; // the parsed message object
err.cause; // the original error

KafkaRetryExhaustedError — thrown after all retries are exhausted. Extends KafkaProcessingError and adds attempts:
// In an onError interceptor:
const interceptor: ConsumerInterceptor<MyTopics> = {
onError: (envelope, error) => {
if (error instanceof KafkaRetryExhaustedError) {
console.log(`Failed after ${error.attempts} attempts on ${error.topic}`);
console.log('Last error:', error.cause);
}
},
};

When retry.maxRetries is set and all attempts fail, KafkaRetryExhaustedError is passed to onError interceptors automatically.
KafkaValidationError — thrown when schema validation fails on the consumer side. Has topic, originalMessage, and cause:
import { KafkaValidationError } from '@drarzter/kafka-client';
const interceptor: ConsumerInterceptor<MyTopics> = {
onError: (envelope, error) => {
if (error instanceof KafkaValidationError) {
console.log(`Bad message on ${error.topic}:`, error.cause?.message);
}
},
};

Deduplication (Lamport Clock)
Every outgoing message produced by this library is stamped with a monotonically increasing logical clock — the x-lamport-clock header. The counter lives in the KafkaClient instance and increments by one per message (including individual messages inside sendBatch and transaction).
On the consumer side, enable deduplication by passing deduplication to startConsumer or startBatchConsumer. The library checks the incoming clock against the last processed value for that topic:partition combination and skips any message whose clock is not strictly greater.
await kafka.startConsumer(['orders.created'], handler, {
deduplication: {}, // 'drop' strategy — silently discard duplicates
});

How duplicates happen
The most common scenario: a producer service restarts. Its in-memory clock resets to 0. The consumer already processed messages with clocks 1…N. All new messages from the restarted producer (clocks 1, 2, 3, …) have clocks ≤ N and are treated as duplicates.
Producer A (running): sends clock 1, 2, 3, 4, 5 → consumer processes all 5
Producer A (restarts): sends clock 1, 2, 3 → consumer sees 1 ≤ 5 — duplicate!

Strategies
| Strategy | Behaviour |
| -------- | --------- |
| 'drop' (default) | Log a warning and silently discard the message |
| 'dlq' | Forward to {topic}.dlq with reason metadata headers (x-dlq-reason, x-dlq-duplicate-incoming-clock, x-dlq-duplicate-last-processed-clock). Requires dlq: true |
| 'topic' | Forward to {topic}.duplicates (or duplicatesTopic if set) with reason metadata headers (x-duplicate-reason, x-duplicate-incoming-clock, x-duplicate-last-processed-clock, x-duplicate-detected-at) |
// Strategy: drop (default)
await kafka.startConsumer(['orders'], handler, {
deduplication: {},
});
// Strategy: DLQ — inspect duplicates from {topic}.dlq
await kafka.startConsumer(['orders'], handler, {
dlq: true,
deduplication: { strategy: 'dlq' },
});
// Strategy: dedicated topic — consume from {topic}.duplicates
await kafka.startConsumer(['orders'], handler, {
deduplication: { strategy: 'topic' },
});
// Strategy: custom topic name
await kafka.startConsumer(['orders'], handler, {
deduplication: {
strategy: 'topic',
duplicatesTopic: 'ops.orders.duplicates',
},
});

Startup validation
When autoCreateTopics: false and strategy: 'topic', startConsumer / startBatchConsumer validates that the destination topic ({topic}.duplicates or duplicatesTopic) exists before starting the consumer. A clear error is thrown at startup listing every missing topic, rather than silently failing on the first duplicate.
With autoCreateTopics: true the check is skipped — the topic is created automatically instead.
Backwards compatibility
Messages without an x-lamport-clock header pass through unchanged. Producers not using this library are unaffected.
Limitations
Deduplication state is in-memory and per-consumer-instance. Understand what that means:
- Consumer restart — state is cleared on restart. The first batch of messages after restart is accepted regardless of their clock values, so duplicates spanning a restart window are not caught.
- Multiple consumer instances (same group, different machines) — each instance tracks its own partition subset. Partitions are reassigned on rebalance, so a rebalance can reset the state for moved partitions.
- Cross-session duplicates — this guards against duplicates from a producer that restarted within the same consumer session. For durable, cross-restart deduplication, persist the clock state externally (Redis, database) and implement idempotent handlers.
Use this feature as a lightweight first line of defence — not as a substitute for idempotent business logic.
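The persist-externally approach can be sketched as follows. This is a hypothetical wrapper, not part of this library's API — ClockStore, InMemoryClockStore, and handleOnce are illustrative names; in production you would back the store with Redis or a database:

```typescript
// Hypothetical sketch (not library API): durable deduplication by persisting
// the last processed Lamport clock per topic:partition in an external store.
interface ClockStore {
  get(key: string): Promise<number | undefined>;
  set(key: string, clock: number): Promise<void>;
}

// In-memory stand-in for illustration — swap for a Redis- or DB-backed store.
class InMemoryClockStore implements ClockStore {
  private clocks = new Map<string, number>();
  async get(key: string) { return this.clocks.get(key); }
  async set(key: string, clock: number) { this.clocks.set(key, clock); }
}

// Skip any message whose clock is not strictly greater than the last
// persisted value for its topic:partition; persist only after success.
async function handleOnce(
  store: ClockStore,
  topic: string,
  partition: number,
  clock: number,
  handler: () => Promise<void>,
): Promise<boolean> {
  const key = `${topic}:${partition}`;
  const last = (await store.get(key)) ?? 0;
  if (clock <= last) return false; // duplicate — already processed
  await handler();
  await store.set(key, clock);
  return true;
}
```

Because the clock is read from the x-lamport-clock header stamped by the producer, this survives consumer restarts and rebalances — the trade-offs the in-memory implementation cannot cover.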
Retry topic chain
tl;dr — recommended production setup:
await kafka.startConsumer(['orders.created'], handler, {
  retry: { maxRetries: 3, backoffMs: 1_000, maxBackoffMs: 30_000 },
  dlq: true,         // ← messages never silently disappear
  retryTopics: true, // ← retries survive restarts; routing is exactly-once
});

Just retry + dlq: true is already safe for most workloads — failed messages land in {topic}.dlq after all retries and are never silently dropped. Add retryTopics: true for crash-durable retries and exactly-once routing guarantees within the retry chain.

| Configuration | What happens to a message that always fails | Process crash mid-retry |
| --- | --- | --- |
| retry only | Dropped — onMessageLost fires | Lost if crash between attempts |
| retry + dlq | Lands in {topic}.dlq after all attempts | DLQ write may duplicate (rare) |
| retry + dlq + retryTopics | Lands in {topic}.dlq after all attempts | Retries survive restarts; routing is exactly-once |
By default, retry is handled in-process: the consumer sleeps between attempts while holding the partition. With retryTopics: true, failed messages are routed through a chain of Kafka topics instead — one topic per retry level. A companion consumer auto-starts per level, waits for the scheduled delay using partition pause/resume, then calls the same handler.
Benefits over in-process retry:
- Durable — retry messages survive a consumer restart; all routing (main → retry.1, level N → N+1, retry → DLQ) is exactly-once via Kafka transactions
- Non-blocking — the original consumer is free immediately; each level consumer only pauses its specific partition during the delay window, so other partitions continue processing
- Isolated — each retry level has its own consumer group, so a slow level 3 consumer never blocks a level 1 consumer
await kafka.startConsumer(['orders.created'], handler, {
retry: { maxRetries: 3, backoffMs: 1000, maxBackoffMs: 30_000 },
dlq: true,
retryTopics: true, // ← opt in
});

With maxRetries: 3, this creates three dedicated topics and three companion consumers:
orders.created.retry.1 → consumer group: my-group-retry.1 (delay ~1 s)
orders.created.retry.2 → consumer group: my-group-retry.2 (delay ~2 s)
orders.created.retry.3 → consumer group: my-group-retry.3 (delay ~4 s)

Message flow with maxRetries: 2 and dlq: true:
orders.created → handler fails → orders.created.retry.1 (attempt 1, delay ~1 s)
orders.created.retry.1 → handler fails → orders.created.retry.2 (attempt 2, delay ~2 s)
orders.created.retry.2 → handler fails → orders.created.dlq

Each level consumer uses consumer.pause → sleep(remaining) → consumer.resume so the partition offset is never committed before the message is processed. On a process crash during sleep or handler execution, the message is redelivered on restart.
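The per-level delays shown above (~1 s, ~2 s, ~4 s from backoffMs: 1000) are consistent with exponential doubling capped at maxBackoffMs. As a hedged sketch — this formula is an assumption about the scheduling, not confirmed library internals:

```typescript
// Assumed scheduling formula (illustrative only): exponential doubling from
// backoffMs, capped at maxBackoffMs. attempt is 1-based.
function retryDelayMs(attempt: number, backoffMs: number, maxBackoffMs: number): number {
  // attempt 1 → backoffMs, attempt 2 → 2×, attempt 3 → 4×, … up to the cap
  return Math.min(backoffMs * 2 ** (attempt - 1), maxBackoffMs);
}
```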
The retry topic messages carry scheduling headers (x-retry-attempt, x-retry-after, x-retry-original-topic, x-retry-max-retries) that each level consumer reads automatically — no manual configuration needed.
Delivery guarantee: the entire retry chain — including the main consumer → retry.1 boundary — is exactly-once. Every routing step (main → retry.1, retry.N → retry.N+1, retry.N → DLQ) is wrapped in a Kafka transaction via
sendOffsetsToTransaction: the produce and the consumer offset commit happen atomically. A crash at any point rolls back the transaction: the message is redelivered and the routing is retried, with no duplicate in the next level. If the EOS transaction fails (broker unavailable), the offset stays uncommitted and the message is safely redelivered — it is never lost.

The standard Kafka at-least-once guarantee still applies at the handler level: if your handler succeeds but the process crashes before the manual offset commit completes, the message is redelivered to the handler. Design handlers to be idempotent.
Startup validation:
retryTopics requires retry to be set — an error is thrown at startup if retry is missing. When autoCreateTopics: false, all {topic}.retry.N topics are validated to exist at startup and a clear error lists any missing ones. With autoCreateTopics: true the check is skipped — topics are created automatically by the ensureTopic path. Supported by both startConsumer and startBatchConsumer.
stopConsumer(groupId) automatically stops all companion retry level consumers started for that group.
stopConsumer
Stop all consumers or a specific group:
// Stop a specific consumer group
await kafka.stopConsumer('my-group');
// Stop all consumers
await kafka.stopConsumer();

stopConsumer(groupId) disconnects and removes only that group's consumer, leaving other groups running. Useful when you want to pause processing for a specific topic without restarting the whole client.
Pause and resume
Temporarily stop delivering messages from specific partitions without disconnecting the consumer:
// Pause partition 0 of 'orders' (default group)
kafka.pauseConsumer(undefined, [{ topic: 'orders', partitions: [0] }]);
// Resume it later
kafka.resumeConsumer(undefined, [{ topic: 'orders', partitions: [0] }]);
// Target a specific consumer group, multiple partitions
kafka.pauseConsumer('payments-group', [{ topic: 'payments', partitions: [0, 1] }]);

The first argument is the consumer group ID — pass undefined to target the default group. A warning is logged if the group is not found.
Pausing is non-destructive: the consumer stays connected and Kafka preserves the partition assignment for as long as the group session is alive. Messages accumulate in the topic and are delivered once the consumer resumes. Typical use: apply backpressure when a downstream dependency (e.g. a database) is temporarily overloaded.
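The backpressure pattern can be sketched with a tiny controller that toggles pause/resume exactly once per state change. This is an illustrative helper, not part of the library — you would wire pause/resume to kafka.pauseConsumer / kafka.resumeConsumer for the partitions you own, and isDbOverloaded is a hypothetical health probe:

```typescript
// Illustrative sketch (not library API): calls pause() when the downstream
// becomes overloaded and resume() when it recovers — never twice in a row.
class Backpressure {
  private paused = false;
  constructor(
    private readonly pause: () => void,
    private readonly resume: () => void,
  ) {}

  // Call on every health-probe tick with the current overload signal.
  update(overloaded: boolean): void {
    if (overloaded && !this.paused) {
      this.paused = true;
      this.pause();
    } else if (!overloaded && this.paused) {
      this.paused = false;
      this.resume();
    }
  }
}
```

Usage might look like: `new Backpressure(() => kafka.pauseConsumer(undefined, parts), () => kafka.resumeConsumer(undefined, parts))`, with `update(isDbOverloaded())` driven by a timer.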
Circuit breaker
Automatically pause delivery from a topic-partition when its DLQ error rate exceeds a threshold. After a recovery window the partition is resumed automatically.
dlq: true is required — the breaker counts DLQ events as failures. Without it no failures are recorded and the circuit never opens.
Zero-config start — all options have sensible defaults:
await kafka.startConsumer(['orders'], handler, {
dlq: true,
circuitBreaker: {},
});

Full config for fine-tuning:
await kafka.startConsumer(['orders'], handler, {
dlq: true,
circuitBreaker: {
threshold: 10, // open after 10 failures (default: 5)
recoveryMs: 60_000, // wait 60 s before probing (default: 30 s)
windowSize: 50, // track last 50 messages (default: threshold × 2, min 10)
halfOpenSuccesses: 3, // 3 successes to close (default: 1)
},
});

State machine per ${groupId}:${topic}:${partition}:
| State | Behaviour |
| ----- | --------- |
| CLOSED (normal) | Messages delivered. Failures recorded in sliding window. Opens when failures ≥ threshold. |
| OPEN | Partition paused via pauseConsumer. After recoveryMs ms transitions to HALF_OPEN. |
| HALF_OPEN | Partition resumed. After halfOpenSuccesses consecutive successes the circuit closes. Any single failure immediately re-opens it. |
Successful onMessage completions count as successes. The retry topic path is not subject to the breaker — it has its own backoff and EOS guarantees.
Options:
| Option | Default | Description |
| ------ | ------- | ----------- |
| threshold | 5 | DLQ failures within windowSize that opens the circuit |
| recoveryMs | 30_000 | Milliseconds to wait in OPEN state before entering HALF_OPEN |
| windowSize | threshold × 2, min 10 | Sliding window size in messages |
| halfOpenSuccesses | 1 | Consecutive successes in HALF_OPEN required to close the circuit |
getCircuitState
Inspect the current circuit breaker state for a partition — useful for health endpoints and dashboards:
const state = kafka.getCircuitState('orders', 0);
// undefined — circuit not configured or never tripped
// { status: 'closed', failures: 2, windowSize: 10 }
// { status: 'open', failures: 5, windowSize: 10 }
// { status: 'half-open', failures: 0, windowSize: 0 }
// With explicit group ID:
const state = kafka.getCircuitState('orders', 0, 'payments-group');

Returns undefined when circuitBreaker is not configured for the group or the circuit has never been tripped (state is lazily initialised on the first DLQ event).
Instrumentation hooks — react to state transitions via KafkaInstrumentation:
const kafka = new KafkaClient('svc', 'group', brokers, {
instrumentation: [{
onCircuitOpen(topic, partition) {
metrics.increment('circuit_open', { topic, partition });
},
onCircuitHalfOpen(topic, partition) {
logger.log(`Circuit probing ${topic}[${partition}]`);
},
onCircuitClose(topic, partition) {
metrics.increment('circuit_close', { topic, partition });
},
}],
});

Reset consumer offsets
Seek a consumer group's committed offsets to the beginning or end of a topic:
// Seek to the beginning — re-process all existing messages
await kafka.resetOffsets(undefined, 'orders', 'earliest');
// Seek to the end — skip existing messages, process only new ones
await kafka.resetOffsets(undefined, 'orders', 'latest');
// Target a specific consumer group
await kafka.resetOffsets('payments-group', 'orders', 'earliest');

Important: the consumer for the specified group must be stopped before calling resetOffsets. An error is thrown if the group is currently running — this prevents the reset from racing with an active offset commit.
Seek to offset
Seek individual topic-partitions to explicit offsets — useful when resetOffsets is too coarse and you need per-partition control:
// Seek partition 0 of 'orders' to offset 100, partition 1 to offset 200
await kafka.seekToOffset(undefined, [
{ topic: 'orders', partition: 0, offset: '100' },
{ topic: 'orders', partition: 1, offset: '200' },
]);
// Multiple topics in one call
await kafka.seekToOffset('payments-group', [
{ topic: 'payments', partition: 0, offset: '0' },
{ topic: 'refunds', partition: 0, offset: '500' },
]);

The first argument is the consumer group ID — pass undefined to target the default group. Assignments are grouped by topic internally so each admin.setOffsets call covers all partitions of one topic.
Important: the consumer for the specified group must be stopped before calling seekToOffset. An error is thrown if the group is currently running.
Seek to timestamp
Seek partitions to the offset nearest to a specific point in time — useful for replaying events that occurred after a known incident or deployment:
const ts = new Date('2024-06-01T12:00:00Z').getTime(); // Unix ms
await kafka.seekToTimestamp(undefined, [
{ topic: 'orders', partition: 0, timestamp: ts },
{ topic: 'orders', partition: 1, timestamp: ts },
]);
// Multiple topics in one call
await kafka.seekToTimestamp('payments-group', [
{ topic: 'payments', partition: 0, timestamp: ts },
{ topic: 'refunds', partition: 0, timestamp: ts },
]);

Uses admin.fetchTopicOffsetsByTime under the hood. If no offset exists at the requested timestamp (e.g. the partition is empty or the timestamp is in the future), the partition falls back to -1 (end of topic — new messages only).
Important: the consumer group must be stopped before seeking. Assignments for the same topic are batched into a single admin.setOffsets call.
Message TTL
Drop or route expired messages using messageTtlMs in ConsumerOptions:
await kafka.startConsumer(['orders'], handler, {
messageTtlMs: 60_000, // drop messages older than 60 s
dlq: true, // route expired messages to DLQ instead of dropping
});

The TTL is evaluated against the x-timestamp header stamped on every outgoing message by the producer. Messages whose age at consumption time exceeds messageTtlMs are:
- Routed to DLQ with x-dlq-reason: ttl-expired when dlq: true
- Dropped (calling onTtlExpired if configured) otherwise
Typical use: prevent stale events from poisoning downstream systems after a consumer lag spike — e.g. discard order events or push notifications that are no longer actionable.
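The check described above can be sketched as a pure function — illustrative, not the library's internal code; it assumes the x-timestamp header carries the producer-side Unix-ms timestamp as documented:

```typescript
// Illustrative TTL check: a message expires when its age at consumption time
// (now − x-timestamp) exceeds messageTtlMs.
function isExpired(
  headers: Record<string, string>,
  messageTtlMs: number,
  now: number = Date.now(),
): boolean {
  const produced = Number(headers['x-timestamp']);
  if (!Number.isFinite(produced)) return false; // no header — never expires
  return now - produced > messageTtlMs;
}
```

This also mirrors the backwards-compatibility behaviour: messages produced without the header (e.g. by other clients) pass through unaffected.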
DLQ replay
Re-publish messages from a dead letter queue back to the original topic:
// Re-publish all messages from 'orders.dlq' → 'orders'
const result = await kafka.replayDlq('orders');
// { replayed: 42, skipped: 0 }

Options:
| Option | Default | Description |
| ------ | ------- | ----------- |
| targetTopic | x-dlq-original-topic header | Override the destination topic |
| dryRun | false | Count messages without sending |
| filter | — | (headers) => boolean — skip messages where the callback returns false |
// Dry run — see how many messages would be replayed
const dry = await kafka.replayDlq('orders', { dryRun: true });
// Route to a different topic
const result = await kafka.replayDlq('orders', { targetTopic: 'orders.v2' });
// Only replay messages with a specific correlation ID
const filtered = await kafka.replayDlq('orders', {
filter: (headers) => headers['x-correlation-id'] === 'corr-123',
});

replayDlq creates a temporary consumer group that reads the DLQ topic up to the high-watermark at the time of the call — messages published after replay starts are not included. DLQ metadata headers (x-dlq-original-topic, x-dlq-error-message, x-dlq-error-stack, x-dlq-failed-at, x-dlq-attempt-count) are stripped from the replayed messages; all other headers (e.g. x-correlation-id) are preserved.
Read snapshot
Read any topic from the beginning to its current high-watermark and return a Map<key, EventEnvelope<T>> with the latest value per key. Useful for bootstrapping in-memory state at service startup without an external cache:
// Build a key → latest-value index for a compacted topic
const orders = await kafka.readSnapshot('orders.state');
orders.get('order-123'); // EventEnvelope with the latest payload for that key

Tombstone records (null-value messages) remove the key from the map, consistent with log-compaction semantics:
const snapshot = await kafka.readSnapshot('orders.state', {
onTombstone: (key) => console.log(`Key deleted: ${key}`),
});

Optional schema validation skips invalid messages with a warning instead of throwing:
import { z } from 'zod';
const OrderSchema = z.object({ orderId: z.string(), amount: z.number() });
const snapshot = await kafka.readSnapshot('orders.state', {
schema: OrderSchema,
});

readSnapshot uses a short-lived temporary consumer that is not registered in the client's consumer map — it disconnects as soon as all partitions reach their high-watermark. The call resolves with the complete snapshot; it does not stream.
| Option | Description |
| ------ | ----------- |
| schema | Zod / Valibot / ArkType (any .parse() shape) — invalid messages are skipped with a warning |
| onTombstone | Called for each tombstone key before it is removed from the map |
Offset checkpointing
Save and restore consumer group offsets via a dedicated Kafka topic. Useful for point-in-time recovery, blue/green deployments, and disaster recovery without resetting to earliest/latest.
checkpointOffsets
Snapshot the current committed offsets of a consumer group into a Kafka topic:
// Checkpoint the default group
const result = await kafka.checkpointOffsets(undefined, 'checkpoints');
// {
// groupId: 'orders-group',
// topics: ['orders', 'payments'],
// partitionCount: 4,
// savedAt: 1710000000000,
// }
// Checkpoint a specific group
await kafka.checkpointOffsets('payments-group', 'checkpoints');

Each call appends a new record to the checkpoint topic keyed by groupId, with x-checkpoint-timestamp and x-checkpoint-group-id headers. The checkpoint topic acts as an append-only audit log — use a non-compacted topic to retain history.
Requires connectProducer() to have been called before checkpointing.
restoreFromCheckpoint
Restore a consumer group's committed offsets from the nearest checkpoint:
// Restore to the latest checkpoint
const result = await kafka.restoreFromCheckpoint(undefined, 'checkpoints');
// {
// groupId: 'orders-group',
// offsets: [{ topic: 'orders', partition: 0, offset: '1500' }, ...],
// restoredAt: 1710000000000,
// checkpointAge: 3600000, // ms since the checkpoint was saved
// }
// Restore to the nearest checkpoint before a specific timestamp
const ts = new Date('2024-06-01T12:00:00Z').getTime();
await kafka.restoreFromCheckpoint(undefined, 'checkpoints', { timestamp: ts });

Checkpoint selection rules:
- If timestamp is omitted — the latest checkpoint is selected.
- If timestamp is given — the newest checkpoint whose savedAt ≤ timestamp is selected.
- If all checkpoints are newer than timestamp — falls back to the oldest checkpoint with a warning.
- Throws if no checkpoint exists for the group.
Important: the consumer group must be stopped before calling restoreFromCheckpoint. An error is thrown if any consumer in the group is currently running.
restoreFromCheckpoint uses a short-lived temporary consumer to read all checkpoint records up to the current high-watermark, then calls admin.setOffsets for every topic-partition in the selected checkpoint.
| Option | Description |
| ------ | ----------- |
| timestamp | Target Unix ms. Omit to restore the latest checkpoint |
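The selection rules above can be expressed as a small pure function — illustrative only, not the library's internal code (the real implementation reads checkpoint records from the Kafka topic first):

```typescript
// Illustrative checkpoint selection: latest when no timestamp is given,
// otherwise the newest checkpoint with savedAt <= timestamp, falling back
// to the oldest when every checkpoint is newer. Throws when none exist.
type Checkpoint = { savedAt: number };

function selectCheckpoint<T extends Checkpoint>(checkpoints: T[], timestamp?: number): T {
  if (checkpoints.length === 0) throw new Error('no checkpoint exists for this group');
  const sorted = [...checkpoints].sort((a, b) => a.savedAt - b.savedAt);
  if (timestamp === undefined) return sorted[sorted.length - 1]; // latest
  const eligible = sorted.filter((c) => c.savedAt <= timestamp);
  if (eligible.length > 0) return eligible[eligible.length - 1]; // newest ≤ timestamp
  return sorted[0]; // all newer than timestamp — oldest, with a warning in the real client
}
```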
Windowed batch consumer
Accumulate messages into a buffer and flush a handler when either a size or time trigger fires — whichever comes first. Gives explicit control over both batch size and processing latency, unlike startBatchConsumer which delivers broker-sized batches of unpredictable size:
const handle = await kafka.startWindowConsumer(
'orders',
async (envelopes, meta) => {
console.log(`Flushing ${envelopes.length} orders (trigger: ${meta.trigger})`);
await db.bulkInsert(envelopes.map((e) => e.payload));
},
{
maxMessages: 100, // flush when 100 messages accumulate
maxMs: 5_000, // or after 5 s, whichever fires first
},
);

WindowMeta is passed to the handler on every flush:
| Field | Description |
| ----- | ----------- |
| trigger | "size" — buffer reached maxMessages; "time" — maxMs elapsed |
| windowStart | Unix ms of the first message in the flushed window |
| windowEnd | Unix ms when the flush was initiated |
On handle.stop() any buffered messages are flushed before the consumer disconnects — no messages are lost on clean shutdown.
retryTopics: true is rejected at startup with a clear error — the retry topic chain is incompatible with windowed accumulation.
| Option | Default | Description |
| ------ | ------- | ----------- |
| maxMessages | required | Flush when the buffer reaches this many messages |
| maxMs | required | Flush after this many ms since the first buffered message |
| All ConsumerOptions fields | — | Standard consumer options apply (retry, dlq, deduplication, etc.) |
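The size-or-time trigger can be sketched as a standalone buffer — illustrative only, not the library's internals (the real consumer also tracks windowStart/windowEnd and flushes on stop):

```typescript
// Illustrative size-or-time window: flush when the buffer reaches
// maxMessages, or maxMs after the first buffered message — whichever first.
type Flush<T> = (batch: T[], trigger: 'size' | 'time') => void;

class Window<T> {
  private buf: T[] = [];
  private timer?: ReturnType<typeof setTimeout>;

  constructor(
    private readonly maxMessages: number,
    private readonly maxMs: number,
    private readonly flushFn: Flush<T>,
  ) {}

  push(msg: T): void {
    if (this.buf.length === 0) {
      // start the time trigger when the first message enters the buffer
      this.timer = setTimeout(() => this.flush('time'), this.maxMs);
    }
    this.buf.push(msg);
    if (this.buf.length >= this.maxMessages) this.flush('size');
  }

  private flush(trigger: 'size' | 'time'): void {
    if (this.timer) clearTimeout(this.timer); // cancel the competing trigger
    const batch = this.buf;
    this.buf = [];
    if (batch.length > 0) this.flushFn(batch, trigger);
  }
}
```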
Header-based routing
Dispatch messages to different handlers based on the value of a Kafka header — no if/switch boilerplate in a catch-all handler. Useful when one topic carries multiple event types distinguished by a header like x-event-type:
await kafka.startRoutedConsumer(['events'], {
header: 'x-event-type',
routes: {
'order.created': async (e) => handleOrderCreated(e.payload),
'order.cancelled': async (e) => handleOrderCancelled(e.payload),
'order.shipped': async (e) => handleOrderShipped(e.payload),
},
fallback: async (e) => logger.warn('Unknown event type', e.headers),
});

Messages are dispatched to the handler whose key matches envelope.headers[header]. If the header is absent or its value has no matching route:
- The fallback handler is called if provided.
- The message is silently skipped if fallback is omitted.
All standard ConsumerOptions apply uniformly across every route — retry, DLQ, deduplication, circuit breaker, interceptors, etc.:
await kafka.startRoutedConsumer(
['events'],
{
header: 'x-event-type'