nestjs-kafkajs
v0.0.3
A NestJS module for integrating with Apache Kafka using kafkajs
Date: July 1, 2025
Description
This package provides an easy way to publish and subscribe to events on Apache Kafka from a NestJS application, leveraging the kafkajs library.
Features
- Reusable connection to one or multiple brokers.
- @SubscribeTo(topic: string) decorator to subscribe methods to specific topics.
- Automatic loading of event subscribers via a dynamic loader.
- Access to event metadata for extending functionality.
Prerequisites
- Node.js >= 14
- NestJS >= 8
- Access to an Apache Kafka cluster (local or remote)
Installation
npm install --save nestjs-kafkajs kafkajs
Configuration
In your root module (AppModule), import and register the Kafka connection:
// app.module.ts
import { Module } from '@nestjs/common';
// ConfigService is assumed to come from @nestjs/config.
import { ConfigService } from '@nestjs/config';
import { KafkajsModule } from 'nestjs-kafkajs';

@Module({
  imports: [
    KafkajsModule.forRootAsync({
      // Build the Kafka options from application configuration.
      useFactory: (configService: ConfigService) => ({
        client: {
          clientId: configService.get('kafka.clientId'),
          brokers: configService.get('kafka.brokers'),
        },
        consumer: {
          groupId: configService.get('kafka.groupId'),
        },
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}
The KafkaOptions interface is defined in lib/interfaces/kafka-options.interface.ts.
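The factory above reads kafka.clientId, kafka.brokers, and kafka.groupId from configuration. As a minimal sketch of where those values might come from, the helper below builds the same client/consumer shape from environment-style variables. The names buildKafkaSettings, KafkaSettings, KAFKA_CLIENT_ID, KAFKA_BROKERS, and KAFKA_GROUP_ID, along with the fallback values, are hypothetical and not part of this package; the real KafkaOptions interface may accept more fields.

```typescript
// Hypothetical shape mirroring the options used in the configuration example;
// the package's actual KafkaOptions interface may differ.
interface KafkaSettings {
  client: { clientId: string; brokers: string[] };
  consumer: { groupId: string };
}

// Build the settings from environment-style key/value pairs.
// The variable names and default values are assumptions for illustration.
function buildKafkaSettings(
  env: Record<string, string | undefined>,
): KafkaSettings {
  // Accept a comma-separated broker list such as "host1:9092, host2:9092".
  const brokers = (env['KAFKA_BROKERS'] ?? '')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);

  if (brokers.length === 0) {
    throw new Error('KAFKA_BROKERS must list at least one host:port pair');
  }

  return {
    client: { clientId: env['KAFKA_CLIENT_ID'] ?? 'my-app', brokers },
    consumer: { groupId: env['KAFKA_GROUP_ID'] ?? 'my-app-group' },
  };
}
```

Failing fast on an empty broker list surfaces a misconfiguration at startup rather than at the first connection attempt.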
Usage
Publishing Events
To publish a message to a topic:
// example-publisher.service.ts
import { Injectable } from '@nestjs/common';
import { KafkaProducer } from 'nestjs-kafkajs';

@Injectable()
export class PublisherService {
  constructor(private readonly producer: KafkaProducer) {}

  async publish() {
    await this.producer.send({
      topic: 'my-topic',
      messages: [
        { key: 'order', value: JSON.stringify({ id: 1, total: 100 }) },
      ],
    });
  }
}
Subscribing to Events
Use the @SubscribeTo(topic: string) decorator to bind a method to incoming messages:
// order-subscriber.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { SubscribeTo } from 'nestjs-kafkajs';

@Injectable()
export class OrderSubscriber {
  private readonly logger = new Logger(OrderSubscriber.name);

  @SubscribeTo('my-topic')
  handleOrderEvent(message: { key: Buffer; value: Buffer }) {
    const payload = JSON.parse(message.value.toString());
    this.logger.log(
      `Processing order ${payload.id} with total ${payload.total}`,
    );
    // ... additional logic ...
  }
}
The loader in event-subscribers.loader.ts automatically detects classes with decorated methods.
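The publisher and subscriber examples above exchange JSON-encoded payloads. The sketch below shows one way to keep the encoding and decoding symmetric and to avoid throwing on empty or malformed messages (kafkajs types a message value as Buffer or null). The names OrderEvent, RawValue, encodeOrder, and decodeOrder are hypothetical helpers, not part of this package.

```typescript
// Structural stand-in for a kafkajs message value: a Buffer (or anything
// else with toString()) or null, e.g. for tombstone records.
type RawValue = { toString(): string } | null;

// Hypothetical event shape matching the payload used in the examples.
interface OrderEvent {
  id: number;
  total: number;
}

// Publisher side: turn an order into a key/value pair ready to send.
function encodeOrder(order: OrderEvent): { key: string; value: string } {
  return { key: 'order', value: JSON.stringify(order) };
}

// Subscriber side: returns undefined instead of throwing when the value
// is null, is not valid JSON, or lacks the expected numeric fields.
function decodeOrder(value: RawValue): OrderEvent | undefined {
  if (value === null) {
    return undefined;
  }
  try {
    const parsed: unknown = JSON.parse(value.toString());
    if (typeof parsed === 'object' && parsed !== null) {
      const candidate = parsed as Record<string, unknown>;
      const id = candidate['id'];
      const total = candidate['total'];
      if (typeof id === 'number' && typeof total === 'number') {
        return { id, total };
      }
    }
  } catch {
    // Malformed JSON: fall through and report "no event".
  }
  return undefined;
}
```

A handler could call decodeOrder(message.value) and skip the message when the result is undefined, rather than letting JSON.parse raise inside the subscriber.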
Event Metadata
The EventsMetadataAccessor class allows you to:
- Inspect subscribed topics.
- Extend subscription behavior.
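import { EventsMetadataAccessor } from 'nestjs-kafkajs';
// Path assumes the subscriber file from the previous example.
import { OrderSubscriber } from './order-subscriber.service';

const topics = new EventsMetadataAccessor().getTopics(new OrderSubscriber());
Publishing to npm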
Follow these steps to publish the package to the npm registry:
- Ensure the name and version fields in package.json are correctly set.
- Run npm login to authenticate with npm.
- Publish the package:
  npm publish --access public
- To install and use the package from npm:
  npm install --save nestjs-kafkajs
Contributing
- Fork the repository.
- Create a branch for your feature (git checkout -b feature/new-feature).
- Commit your changes.
- Open a pull request describing your contribution.
License
This project is licensed under the MIT License.
