kafka-consumer-host
v2.2.0
A NestJS Kafka consumer host module for scalable message consumption.
📦 kafka-consumer-host
A NestJS module that provides a plug-and-play Kafka consumer setup, making it easy to build Kafka consumers without boilerplate. Designed for scalability and simplicity, it supports automatic handler registration using a clean OOP pattern.
🚀 Features
- ✅ Quick setup with `KafkaConsumerHostModule.register()`
- ✅ Class-based Kafka handlers using `KafkaHandlerBase`
- ✅ Supports multiple topics and handlers
- ✅ Full NestJS DI support
- ✅ Clean logging & message metadata (topic, partition, offset)
- ✅ Automatic JSON parsing (fallback to raw string)
- ✅ Powered by kafkajs
📦 Installation
```bash
npm install kafka-consumer-host
```

⚙️ Usage
1️⃣ Register the Kafka consumer in your AppModule
```typescript
import { Module } from "@nestjs/common";
import {
  KafkaConsumerHostModule,
  KafkaConsumerOffsetReset,
} from "kafka-consumer-host";
import { YourHandlerService } from "./your-handler.service";

@Module({
  imports: [
    KafkaConsumerHostModule.register({
      bootstrapServer: "localhost:9092",
      groupId: "your-consumer-group",
      clientId: "your-client-id",
      offsetReset: KafkaConsumerOffsetReset.LATEST, // or KafkaConsumerOffsetReset.EARLIEST
    }),
  ],
  providers: [YourHandlerService],
})
export class AppModule {}
```

2️⃣ Create your Kafka handler service
```typescript
import { Injectable } from "@nestjs/common";
import { KafkaHandlerBase } from "kafka-consumer-host";

@Injectable()
export class YourHandlerService extends KafkaHandlerBase {
  // Map each topic to the method that processes its messages.
  registerHandlers(): void {
    // bind(this) keeps `this` intact in case the base class does not bind handlers itself.
    this.handle("your-topic-1", this.handleTopic1.bind(this));
    this.handle("your-topic-2", this.handleTopic2.bind(this));
  }

  async handleTopic1(payload: any, metadata: any): Promise<void> {
    console.log("Received message for your-topic-1:", payload, metadata);
    // Process your message here...
  }

  async handleTopic2(payload: any, metadata: any): Promise<void> {
    console.log("Received message for your-topic-2:", payload, metadata);
    // Process your message here...
  }
}
```

🛠 Handler Parameters
| Parameter | Type | Description |
| ---------- | -------- | -------------------------------------------------------------- |
| `payload` | `any` | Parsed Kafka message payload (JSON or raw string if not JSON) |
| `metadata` | `object` | Includes topic, partition, offset, timestamp, key, and headers |

Example metadata:

```json
{
  "topic": "your-topic-1",
  "partition": 0,
  "offset": "42",
  "timestamp": "1623762345678",
  "key": "customer-123",
  "headers": { "correlationId": "abc-123" }
}
```

🔑 Configuration Options
| Option | Type | Required | Description |
| ----------------- | -------------------------- | -------- | ------------------------------------------------------------------- |
| `bootstrapServer` | `string \| string[]` | ✅ | Kafka broker(s) |
| `groupId` | `string` | ✅ | Kafka consumer group ID |
| `clientId` | `string` | ✅ | Kafka client ID |
| `offsetReset` | `KafkaConsumerOffsetReset` | ✅ | Start from `'earliest'` or `'latest'` if no committed offset exists |

🧠 How It Works
- 🏁 On app startup:
  - Initializes the Kafka consumer
  - Waits for all handlers to be registered
  - Subscribes to all registered topics
- 📩 When a message is received:
  - Automatically parses JSON payload (or provides raw string)
  - Calls your registered handler with payload + metadata
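
The flow above can be pictured with a small, self-contained sketch. It is not the library's source: `HandlerRegistry`, `parsePayload`, and `MessageMetadata` are hypothetical names, and the message value is assumed to be already decoded to a string. It only illustrates the idea of a topic-to-handler map combined with a JSON parse that falls back to the raw string.

```typescript
// Hypothetical illustration only; none of these names come from kafka-consumer-host.
type MessageMetadata = {
  topic: string;
  partition: number;
  offset: string;
};

type Handler = (payload: unknown, metadata: MessageMetadata) => Promise<void>;

// Try JSON first; if parsing fails, hand back the raw string unchanged.
function parsePayload(value: string | null): unknown {
  if (value === null) return null;
  try {
    return JSON.parse(value);
  } catch {
    return value;
  }
}

class HandlerRegistry {
  private readonly handlers = new Map<string, Handler>();

  // Remember which handler owns which topic.
  register(topic: string, handler: Handler): void {
    this.handlers.set(topic, handler);
  }

  // The topics a consumer would need to subscribe to at startup.
  topics(): string[] {
    return [...this.handlers.keys()];
  }

  // Parse the value and call the handler registered for the message's topic.
  // Resolves to false when no handler is registered for that topic.
  async dispatch(metadata: MessageMetadata, value: string | null): Promise<boolean> {
    const handler = this.handlers.get(metadata.topic);
    if (!handler) return false;
    await handler(parsePayload(value), metadata);
    return true;
  }
}
```

In this sketch, a startup routine would call `topics()` once every handler is registered and subscribe to each entry, then route each incoming message through `dispatch()`.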
⚠️ Important Notes
- ✅ Handlers are registered through `KafkaHandlerBase` (inside `registerHandlers()`); no manual topic subscription is needed.
- ✅ Ensure your handler service is added to the `providers` array for DI.
- 🔄 To maintain message ordering, use a message key when producing so Kafka routes related messages to the same partition.
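
To see why a message key preserves ordering, here is a toy sketch of key-based partition selection. The hash below is an assumption chosen to keep the example self-contained; real Kafka clients use their own hash (murmur2 in the Java client), and `toyPartitionFor` is a hypothetical name. The point is only that the same key always maps to the same partition, and Kafka guarantees ordering within a partition.

```typescript
// Toy stand-in for a key-based partitioner (NOT Kafka's actual hash function).
function toyPartitionFor(key: string, partitionCount: number): number {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    // Simple polynomial hash, truncated to an unsigned 32-bit integer.
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0;
  }
  return hash % partitionCount;
}

// Every message keyed "customer-123" lands on the same partition,
// so a consumer sees that customer's messages in production order.
const first = toyPartitionFor("customer-123", 6);
const second = toyPartitionFor("customer-123", 6);
console.log(first === second); // true
```

Messages produced without a key can be spread across partitions, in which case no ordering between them is guaranteed.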
🤝 Contributing
PRs and ideas are welcome—feel free to open issues or suggest improvements!
💬 Questions?
Open an issue or start a discussion.
