npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@drarzter/kafka-client

v0.2.2

Published

Type-safe Kafka client wrapper for NestJS with typed topic-message maps

Readme

@drarzter/kafka-client

npm version CI License: MIT

Type-safe Kafka client wrapper for NestJS. Built on top of kafkajs.

What is this?

An opinionated wrapper around kafkajs that integrates with NestJS as a DynamicModule. Not a full-featured framework — just a clean, typed abstraction for producing and consuming Kafka messages.

Why?

  • Typed topics — you define a map of topic -> message shape, and the compiler won't let you send wrong data to wrong topic
  • Topic descriptorstopic() DX sugar lets you define topics as standalone typed objects instead of string keys
  • NestJS-nativeregister() / registerAsync(), DI injection, lifecycle hooks out of the box
  • Idempotent produceracks: -1, idempotent: true by default
  • Retry + DLQ — configurable retries with backoff, dead letter queue for failed messages
  • Batch sending — send multiple messages in a single request
  • Batch consumingstartBatchConsumer() for high-throughput eachBatch processing
  • Partition key support — route related messages to the same partition
  • Custom headers — attach metadata headers to messages
  • Transactions — exactly-once semantics with producer.transaction()
  • Consumer interceptors — before/after/onError hooks for message processing
  • Auto-create topicsautoCreateTopics: true for dev mode — no need to pre-create topics
  • Error classesKafkaProcessingError and KafkaRetryExhaustedError with topic, message, and attempt metadata
  • Health check — built-in health indicator for monitoring
  • Multiple consumer groups — named clients for different bounded contexts
  • Declarative & imperative — use @SubscribeTo() decorator or startConsumer() directly

Installation

npm install @drarzter/kafka-client
# or
pnpm add @drarzter/kafka-client

Peer dependencies: @nestjs/common, @nestjs/core, reflect-metadata, rxjs

Quick start

Send and receive a message in 3 files:

// types.ts
export interface MyTopics {
  'hello': { text: string };
}
// app.module.ts
import { Module } from '@nestjs/common';
import { KafkaModule } from '@drarzter/kafka-client';
import { MyTopics } from './types';
import { AppService } from './app.service';

@Module({
  imports: [
    KafkaModule.register<MyTopics>({
      clientId: 'my-app',
      groupId: 'my-group',
      brokers: ['localhost:9092'],
    }),
  ],
  providers: [AppService],
})
export class AppModule {}
// app.service.ts
import { Injectable } from '@nestjs/common';
import { InjectKafkaClient, KafkaClient, SubscribeTo } from '@drarzter/kafka-client';
import { MyTopics } from './types';

@Injectable()
export class AppService {
  constructor(
    @InjectKafkaClient() private readonly kafka: KafkaClient<MyTopics>,
  ) {}

  async send() {
    await this.kafka.sendMessage('hello', { text: 'Hello, Kafka!' });
  }

  @SubscribeTo('hello')
  async onHello(message: MyTopics['hello']) {
    console.log('Received:', message.text);
  }
}

Usage

1. Define your topic map

Both interface and type work — pick whichever you prefer:

// Explicit: extends TTopicMessageMap — IDE hints that values must be Record<string, any>
import { TTopicMessageMap } from '@drarzter/kafka-client';

export interface OrdersTopicMap extends TTopicMessageMap {
  'order.created': {
    orderId: string;
    userId: string;
    amount: number;
  };
  'order.completed': {
    orderId: string;
    completedAt: string;
  };
}
// Minimal: plain interface or type — works just the same
export interface OrdersTopicMap {
  'order.created': { orderId: string; userId: string; amount: number };
  'order.completed': { orderId: string; completedAt: string };
}

// or
export type OrdersTopicMap = {
  'order.created': { orderId: string; userId: string; amount: number };
  'order.completed': { orderId: string; completedAt: string };
};

Alternative: topic() descriptors

Instead of a centralized topic map, define each topic as a standalone typed object:

import { topic, TopicsFrom } from '@drarzter/kafka-client';

export const OrderCreated = topic('order.created')<{
  orderId: string;
  userId: string;
  amount: number;
}>();

export const OrderCompleted = topic('order.completed')<{
  orderId: string;
  completedAt: string;
}>();

// Combine into a topic map for KafkaModule generics
export type OrdersTopicMap = TopicsFrom<typeof OrderCreated | typeof OrderCompleted>;

Topic descriptors work everywhere strings work — sendMessage, sendBatch, transaction, startConsumer, and @SubscribeTo():

// Sending
await kafka.sendMessage(OrderCreated, { orderId: '123', userId: '456', amount: 100 });
await kafka.sendBatch(OrderCreated, [{ value: { orderId: '1', userId: '10', amount: 50 } }]);

// Transactions
await kafka.transaction(async (tx) => {
  await tx.send(OrderCreated, { orderId: '123', userId: '456', amount: 100 });
});

// Consuming (decorator)
@SubscribeTo(OrderCreated)
async handleOrder(message: OrdersTopicMap['order.created']) { ... }

// Consuming (imperative)
await kafka.startConsumer([OrderCreated], handler);

2. Register the module

import { KafkaModule } from '@drarzter/kafka-client';
import { OrdersTopicMap } from './orders.types';

@Module({
  imports: [
    KafkaModule.register<OrdersTopicMap>({
      clientId: 'my-service',
      groupId: 'my-consumer-group',
      brokers: ['localhost:9092'],
      autoCreateTopics: true, // auto-create topics on first use (dev mode)
    }),
  ],
})
export class OrdersModule {}

autoCreateTopics calls admin.createTopics() (idempotent — no-op if topic already exists) before the first send/consume for each topic. Useful in development, not recommended for production.

Or with ConfigService:

KafkaModule.registerAsync<OrdersTopicMap>({
  imports: [ConfigModule],
  inject: [ConfigService],
  useFactory: (config: ConfigService) => ({
    clientId: 'my-service',
    groupId: 'my-consumer-group',
    brokers: config.get<string>('KAFKA_BROKERS').split(','),
  }),
})

Global module

By default, KafkaModule is scoped — you need to import it in every module that uses @InjectKafkaClient(). Pass isGlobal: true to make the client available everywhere:

// app.module.ts — register once
KafkaModule.register<OrdersTopicMap>({
  clientId: 'my-service',
  groupId: 'my-consumer-group',
  brokers: ['localhost:9092'],
  isGlobal: true,
})

// any other module — no need to import KafkaModule
@Injectable()
export class SomeService {
  constructor(@InjectKafkaClient() private readonly kafka: KafkaClient<OrdersTopicMap>) {}
}

Works with registerAsync() too:

KafkaModule.registerAsync<OrdersTopicMap>({
  isGlobal: true,
  imports: [ConfigModule],
  inject: [ConfigService],
  useFactory: (config: ConfigService) => ({ ... }),
})

3. Inject and use

import { Injectable } from '@nestjs/common';
import { InjectKafkaClient, KafkaClient } from '@drarzter/kafka-client';
import { OrdersTopicMap } from './orders.types';

@Injectable()
export class OrdersService {
  constructor(
    @InjectKafkaClient()
    private readonly kafka: KafkaClient<OrdersTopicMap>,
  ) {}

  async createOrder() {
    await this.kafka.sendMessage('order.created', {
      orderId: '123',
      userId: '456',
      amount: 100,
    });
  }
}

Consuming messages

Two ways — choose what fits your style.

Declarative: @SubscribeTo()

import { Injectable } from '@nestjs/common';
import { SubscribeTo } from '@drarzter/kafka-client';

@Injectable()
export class OrdersHandler {
  @SubscribeTo('order.created')
  async handleOrderCreated(message: OrdersTopicMap['order.created'], topic: string) {
    console.log('New order:', message.orderId);
  }

  @SubscribeTo('order.completed', { retry: { maxRetries: 3 }, dlq: true })
  async handleOrderCompleted(message: OrdersTopicMap['order.completed'], topic: string) {
    console.log('Order completed:', message.orderId);
  }
}

The module auto-discovers @SubscribeTo() methods on startup and subscribes them.

Imperative: startConsumer()

@Injectable()
export class OrdersService implements OnModuleInit {
  constructor(
    @InjectKafkaClient()
    private readonly kafka: KafkaClient<OrdersTopicMap>,
  ) {}

  async onModuleInit() {
    await this.kafka.startConsumer(
      ['order.created', 'order.completed'],
      async (message, topic) => {
        console.log(`${topic}:`, message);
      },
      {
        retry: { maxRetries: 3, backoffMs: 1000 },
        dlq: true,
      },
    );
  }
}

Multiple consumer groups

Per-consumer groupId

Override the default consumer group for specific consumers. Each unique groupId creates a separate kafkajs Consumer internally:

// Default group from constructor
await kafka.startConsumer(['orders'], handler);

// Custom group — receives its own copy of messages
await kafka.startConsumer(['orders'], auditHandler, { groupId: 'orders-audit' });

// Works with @SubscribeTo too
@SubscribeTo('orders', { groupId: 'orders-audit' })
async auditOrders(message) { ... }

Important: You cannot mix eachMessage and eachBatch consumers on the same groupId. The library throws a clear error if you try:

Cannot use eachBatch on consumer group "my-group" — it is already running with eachMessage.
Use a different groupId for this consumer.

Named clients

Register multiple named clients for different bounded contexts:

@Module({
  imports: [
    KafkaModule.register<OrdersTopicMap>({
      name: 'orders',
      clientId: 'orders-service',
      groupId: 'orders-consumer',
      brokers: ['localhost:9092'],
    }),
    KafkaModule.register<PaymentsTopicMap>({
      name: 'payments',
      clientId: 'payments-service',
      groupId: 'payments-consumer',
      brokers: ['localhost:9092'],
    }),
  ],
})
export class AppModule {}

Inject by name — the string in @InjectKafkaClient() must match the name from register():

@Injectable()
export class OrdersService {
  constructor(
    @InjectKafkaClient('orders')    // ← matches name: 'orders' above
    private readonly kafka: KafkaClient<OrdersTopicMap>,
  ) {}
}

Same with @SubscribeTo() — use clientName to target a specific named client:

@SubscribeTo('payment.received', { clientName: 'payments' })  // ← matches name: 'payments'
async handlePayment(message: PaymentsTopicMap['payment.received']) {
  // ...
}

Partition key

Route all events for the same order to the same partition:

await this.kafka.sendMessage(
  'order.created',
  { orderId: '123', userId: '456', amount: 100 },
  { key: '123' },
);

Message headers

Attach metadata to messages:

await this.kafka.sendMessage(
  'order.created',
  { orderId: '123', userId: '456', amount: 100 },
  {
    key: '123',
    headers: { 'x-correlation-id': 'abc-def', 'x-source': 'api-gateway' },
  },
);

Headers work with batch sending too:

await this.kafka.sendBatch('order.created', [
  {
    value: { orderId: '1', userId: '10', amount: 50 },
    key: '1',
    headers: { 'x-correlation-id': 'req-1' },
  },
]);

Batch sending

await this.kafka.sendBatch('order.created', [
  { value: { orderId: '1', userId: '10', amount: 50 }, key: '1' },
  { value: { orderId: '2', userId: '20', amount: 75 }, key: '2' },
  { value: { orderId: '3', userId: '30', amount: 100 }, key: '3' },
]);

Batch consuming

Process messages in batches for higher throughput. The handler receives an array of parsed messages and a BatchMeta object with offset management controls:

await this.kafka.startBatchConsumer(
  ['order.created'],
  async (messages, topic, meta) => {
    // messages: OrdersTopicMap['order.created'][]
    for (const msg of messages) {
      await processOrder(msg);
      meta.resolveOffset(/* ... */);
    }
    await meta.commitOffsetsIfNecessary();
  },
  { retry: { maxRetries: 3 }, dlq: true },
);

With @SubscribeTo():

@SubscribeTo('order.created', { batch: true })
async handleOrders(messages: OrdersTopicMap['order.created'][], topic: string) {
  // messages is an array
}

Schema validation runs per-message — invalid messages are skipped (DLQ'd if enabled), valid ones are passed to the handler. Retry applies to the whole batch.

BatchMeta exposes: partition, highWatermark, heartbeat(), resolveOffset(offset), commitOffsetsIfNecessary().

Transactions

Send multiple messages atomically with exactly-once semantics:

await this.kafka.transaction(async (tx) => {
  await tx.send('order.created', {
    orderId: '123',
    userId: '456',
    amount: 100,
  });
  await tx.send('order.completed', {
    orderId: '123',
    completedAt: new Date().toISOString(),
  });
  // if anything throws, all messages are rolled back
});

tx.sendBatch() is also available inside transactions.

Consumer interceptors

Add before/after/onError hooks to message processing:

import { ConsumerInterceptor } from '@drarzter/kafka-client';

const loggingInterceptor: ConsumerInterceptor<OrdersTopicMap> = {
  before: (message, topic) => {
    console.log(`Processing ${topic}`, message);
  },
  after: (message, topic) => {
    console.log(`Done ${topic}`);
  },
  onError: (message, topic, error) => {
    console.error(`Failed ${topic}:`, error.message);
  },
};

await this.kafka.startConsumer(['order.created'], handler, {
  interceptors: [loggingInterceptor],
});

Multiple interceptors run in order. All hooks are optional.

Options reference

Send options

Options for sendMessage() — the third argument:

| Option | Default | Description | |-----------|---------|--------------------------------------------------| | key | — | Partition key for message routing | | headers | — | Custom metadata headers (Record<string, string>) |

sendBatch() accepts key and headers per message inside the array items.

Consumer options

| Option | Default | Description | |--------|---------|-------------| | groupId | constructor value | Override consumer group for this subscription | | fromBeginning | false | Read from the beginning of the topic | | autoCommit | true | Auto-commit offsets | | retry.maxRetries | — | Number of retry attempts | | retry.backoffMs | 1000 | Base delay between retries (multiplied by attempt number) | | dlq | false | Send to {topic}.dlq after all retries exhausted | | interceptors | [] | Array of before/after/onError hooks | | batch | false | (decorator only) Use startBatchConsumer instead of startConsumer | | subscribeRetry.retries | 5 | Max attempts for consumer.subscribe() when topic doesn't exist yet | | subscribeRetry.backoffMs | 5000 | Delay between subscribe retry attempts (ms) |

Module options

Passed to KafkaModule.register() or returned from registerAsync() factory:

| Option | Default | Description | |--------|---------|-------------| | clientId | — | Kafka client identifier (required) | | groupId | — | Default consumer group ID (required) | | brokers | — | Array of broker addresses (required) | | name | — | Named client identifier for multi-client setups | | isGlobal | false | Make the client available in all modules without re-importing | | autoCreateTopics | false | Auto-create topics on first send (dev only) | | strictSchemas | true | Validate string topic keys against schemas registered via TopicDescriptor |

Module-scoped (default) — import KafkaModule in each module that needs it:

// orders.module.ts
@Module({
  imports: [
    KafkaModule.register<OrdersTopicMap>({
      clientId: 'orders',
      groupId: 'orders-group',
      brokers: ['localhost:9092'],
    }),
  ],
})
export class OrdersModule {}

App-wide — register once in AppModule with isGlobal: true, inject anywhere:

// app.module.ts
@Module({
  imports: [
    KafkaModule.register<MyTopics>({
      clientId: 'my-app',
      groupId: 'my-group',
      brokers: ['localhost:9092'],
      isGlobal: true,
    }),
  ],
})
export class AppModule {}

// any module — no KafkaModule import needed
@Injectable()
export class PaymentService {
  constructor(@InjectKafkaClient() private readonly kafka: KafkaClient<MyTopics>) {}
}

Error classes

When a consumer message handler fails after all retries, the library throws typed error objects:

import { KafkaProcessingError, KafkaRetryExhaustedError } from '@drarzter/kafka-client';

KafkaProcessingError — base class for processing failures. Has topic, originalMessage, and supports cause:

const err = new KafkaProcessingError('handler failed', 'order.created', rawMessage, { cause: originalError });
err.topic;            // 'order.created'
err.originalMessage;  // the parsed message object
err.cause;            // the original error

KafkaRetryExhaustedError — thrown after all retries are exhausted. Extends KafkaProcessingError and adds attempts:

// In an onError interceptor:
const interceptor: ConsumerInterceptor<MyTopics> = {
  onError: (message, topic, error) => {
    if (error instanceof KafkaRetryExhaustedError) {
      console.log(`Failed after ${error.attempts} attempts on ${error.topic}`);
      console.log('Last error:', error.cause);
    }
  },
};

When retry.maxRetries is set and all attempts fail, KafkaRetryExhaustedError is passed to onError interceptors automatically.

KafkaValidationError — thrown when schema validation fails on the consumer side. Has topic, originalMessage, and cause:

import { KafkaValidationError } from '@drarzter/kafka-client';

const interceptor: ConsumerInterceptor<MyTopics> = {
  onError: (message, topic, error) => {
    if (error instanceof KafkaValidationError) {
      console.log(`Bad message on ${error.topic}:`, error.cause?.message);
    }
  },
};

Schema validation

Add runtime message validation using any library with a .parse() method — Zod, Valibot, ArkType, or a custom validator. No extra dependency required.

Defining topics with schemas

import { topic, TopicsFrom } from '@drarzter/kafka-client';
import { z } from 'zod';  // or valibot, arktype, etc.

// Schema-validated — type inferred from schema, no generic needed
export const OrderCreated = topic('order.created').schema(z.object({
  orderId: z.string(),
  userId: z.string(),
  amount: z.number().positive(),
}));

// Without schema — explicit generic (still works)
export const OrderAudit = topic('order.audit')<{ orderId: string; action: string }>();

export type MyTopics = TopicsFrom<typeof OrderCreated | typeof OrderAudit>;

How it works

On sendsendMessage, sendBatch, and transaction call schema.parse(message) before serializing. Invalid messages throw immediately (the schema library's error, e.g. ZodError):

// This throws ZodError — amount must be positive
await kafka.sendMessage(OrderCreated, { orderId: '1', userId: '2', amount: -5 });

On consume — after JSON.parse, the consumer validates each message against the schema. Invalid messages are:

  1. Logged as errors
  2. Sent to DLQ if dlq: true
  3. Passed to onError interceptors as KafkaValidationError
  4. Skipped (handler is NOT called)
@SubscribeTo(OrderCreated, { dlq: true })
async handleOrder(message) {
  // `message` is guaranteed to match the schema
  console.log(message.orderId); // string — validated at runtime
}

Strict schema mode

By default (strictSchemas: true), once a schema is registered via a TopicDescriptor, string topic keys are also validated against it:

// First call registers the schema in the internal registry
await kafka.sendMessage(OrderCreated, { orderId: '1', userId: '2', amount: 100 });

// Now this is ALSO validated — throws if data doesn't match OrderCreated's schema
await kafka.sendMessage('order.created', { orderId: 123, userId: null, amount: -5 });

Disable with strictSchemas: false in KafkaModule.register() options if you want the old behavior (string topics bypass validation).

Bring your own validator

Any object with parse(data: unknown): T works:

import { SchemaLike } from '@drarzter/kafka-client';

const customValidator: SchemaLike<{ id: string }> = {
  parse(data: unknown) {
    const d = data as any;
    if (typeof d?.id !== 'string') throw new Error('id must be a string');
    return { id: d.id };
  },
};

const MyTopic = topic('my.topic').schema(customValidator);

Health check

Monitor Kafka connectivity with the built-in health indicator:

import { Injectable } from '@nestjs/common';
import { InjectKafkaClient, KafkaClient, KafkaHealthIndicator } from '@drarzter/kafka-client';
import { OrdersTopicMap } from './orders.types';

@Injectable()
export class HealthService {
  private readonly health = new KafkaHealthIndicator();

  constructor(
    @InjectKafkaClient()
    private readonly kafka: KafkaClient<OrdersTopicMap>,
  ) {}

  async checkKafka() {
    return this.health.check(this.kafka);
    // { status: 'up', clientId: 'my-service', topics: ['order.created', ...] }
    // or { status: 'down', clientId: 'my-service', error: 'Connection refused' }
  }
}

Testing

Unit tests (mocked kafkajs):

npm test

Integration tests with a real Kafka broker via testcontainers (requires Docker):

npm run test:integration

The integration suite spins up a single-node KRaft Kafka container and tests sending, consuming, batching, transactions, retry + DLQ, interceptors, health checks, and fromBeginning — no mocks.

Both suites run in CI on every push to main.

Project structure

src/
├── client/         # KafkaClient, types, topic(), error classes
├── module/         # KafkaModule, KafkaExplorer, DI constants
├── decorators/     # @InjectKafkaClient(), @SubscribeTo()
├── health/         # KafkaHealthIndicator
└── index.ts        # Public API re-exports

All exported types and methods have JSDoc comments — your IDE will show inline docs and autocomplete.

License

MIT