@eqxjs/kafka-server-node-rdkafka
v2.0.8
Kafka for NestJS microservices
A custom Kafka transport strategy for NestJS microservices using node-rdkafka (via @confluentinc/kafka-javascript). This package provides a robust, production-ready Kafka integration with advanced features like automatic topic monitoring, rebalancing support, and flexible configuration.
Features
- 🚀 Full NestJS Microservices Integration - Drop-in replacement for standard Kafka transport
- 🔄 Automatic Topic Monitoring - Detects new topics and handles topic deletions dynamically
- ⚖️ Consumer Group Rebalancing - Built-in rebalancing callbacks for partition management
- 🔐 Authentication Support - SASL/PLAIN, SASL/SCRAM, and SSL/TLS configurations
- 📊 Delivery Reports - Track message delivery success and failures with callbacks
- 💪 High Performance - Uses native node-rdkafka under the hood
- 🛠️ Flexible Configuration - Support for both high-level and low-level producers
- 📝 TypeScript Support - Fully typed with TypeScript definitions
Installation
```bash
npm install @eqxjs/kafka-server-node-rdkafka
```
Peer Dependencies
This package requires the following peer dependencies:
```bash
npm install @nestjs/common @nestjs/microservices
```
Usage
Basic Setup
1. Create a Kafka Server in your NestJS Application
```typescript
import { NestFactory } from '@nestjs/core';
import { CustomServerRdKafka } from '@eqxjs/kafka-server-node-rdkafka';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    strategy: new CustomServerRdKafka({
      options: {
        client: {
          clientId: 'my-app',
          brokers: ['localhost:9092'],
        },
        consumer: {
          groupId: 'my-consumer-group',
        },
      },
    }),
  });
  await app.listen();
}
bootstrap();
```
2. Create Message Handlers
```typescript
import { Controller } from '@nestjs/common';
import { EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { KafkaContext } from '@eqxjs/kafka-server-node-rdkafka';

@Controller()
export class AppController {
  @EventPattern('user.created')
  async handleUserCreated(
    @Payload() data: any,
    @Ctx() context: KafkaContext,
  ) {
    console.log('Received message:', data);
    console.log('Topic:', context.getTopic());
    console.log('Partition:', context.getPartition());
    console.log('Offset:', context.getMessage().offset);
  }
}
```
Configuration Options
With SASL Authentication
```typescript
const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerRdKafka({
    options: {
      client: {
        clientId: 'my-app',
        brokers: ['kafka.example.com:9092'],
        sasl: {
          mechanism: 'plain', // or 'scram-sha-256', 'scram-sha-512'
          username: 'my-username',
          password: 'my-password',
        },
      },
      consumer: {
        groupId: 'my-consumer-group',
        sessionTimeout: 30000,
      },
      producer: {
        retry: {
          maxRetryTime: 3,
        },
      },
    },
  }),
});
```
With SSL/TLS
```typescript
import * as fs from 'fs';

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerRdKafka({
    options: {
      client: {
        clientId: 'my-app',
        brokers: ['kafka.example.com:9093'],
        ssl: {
          ca: fs.readFileSync('./ca-cert.pem'),
          cert: fs.readFileSync('./client-cert.pem'),
          key: fs.readFileSync('./client-key.pem'),
        },
      },
      consumer: {
        groupId: 'my-consumer-group',
      },
    },
  }),
});
```
With SASL + SSL
```typescript
const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerRdKafka({
    options: {
      client: {
        clientId: 'my-app',
        brokers: ['kafka.example.com:9093'],
        sasl: {
          mechanism: 'scram-sha-256',
          username: 'my-username',
          password: 'my-password',
        },
        ssl: true, // or provide SSL certificates as an object
      },
      consumer: {
        groupId: 'my-consumer-group',
      },
    },
  }),
});
```
Using Native ConsumerGlobalConfig and ProducerGlobalConfig
For advanced use cases, you can pass native node-rdkafka configurations directly:
```typescript
import {
  CustomServerRdKafka,
  ConsumerGlobalConfig,
  ProducerGlobalConfig,
} from '@eqxjs/kafka-server-node-rdkafka';

const consumerConfig: ConsumerGlobalConfig = {
  'bootstrap.servers': 'localhost:9092',
  'group.id': 'my-consumer-group',
  'client.id': 'my-client',
  'enable.auto.commit': true,
  'auto.commit.interval.ms': 5000,
  'session.timeout.ms': 30000,
  'max.poll.interval.ms': 300000,
  'fetch.min.bytes': 1,
  'fetch.wait.max.ms': 100,
  'heartbeat.interval.ms': 3000,
  'partition.assignment.strategy': 'range,roundrobin',
};

const producerConfig: ProducerGlobalConfig = {
  'bootstrap.servers': 'localhost:9092',
  'client.id': 'my-client',
  'compression.type': 'snappy',
  'acks': -1, // all replicas
  'retries': 3,
  'max.in.flight.requests.per.connection': 5,
  'linger.ms': 10,
  'batch.size': 16384,
  'request.timeout.ms': 30000,
};

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerRdKafka(
    {
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
      },
    },
    consumerConfig,
    producerConfig,
  ),
});
```
Note: Native configs are merged with, and take precedence over, the options from the first parameter. This approach gives you full access to all librdkafka configuration options. See the librdkafka configuration documentation for the complete list.
Advanced Usage
Producing Messages
```typescript
import { Injectable } from '@nestjs/common';
import { CustomServerRdKafka, Message } from '@eqxjs/kafka-server-node-rdkafka';

@Injectable()
export class KafkaProducerService {
  private kafkaServer: CustomServerRdKafka;

  constructor() {
    this.kafkaServer = CustomServerRdKafka.getInstance();
  }

  async sendMessage(topic: string, message: any) {
    const kafkaMessage: Message = {
      value: Buffer.from(JSON.stringify(message)),
      key: 'my-key',
      partition: 0, // optional, -1 for auto-assignment
      headers: {
        'content-type': 'application/json',
      },
    };
    await this.kafkaServer.produce(topic, kafkaMessage);
  }
}
```
Setting Delivery Callbacks
```typescript
const kafkaServer = CustomServerRdKafka.getInstance();

// Success callback
kafkaServer.setSuccessCallback((err, report) => {
  console.log('Message delivered successfully:', report);
});

// Error callback
kafkaServer.setErrorCallback((err, report) => {
  console.error('Message delivery failed:', err.message);
});
```
Accessing Consumer and Producer Directly
```typescript
const { consumer, producer } = kafkaServer.unwrap();

// Now you can use native node-rdkafka methods
const metadata = await consumer.getMetadata({ timeout: 10000 });
```
Consumer Throughput Control
Control the rate at which messages are consumed:
```bash
# Set environment variable to limit consumer throughput
export KAFKA_CONSUMER_TPS_LIMIT=50  # messages per second (default: 10)
```
Re-exported Types
The package re-exports useful types from @confluentinc/kafka-javascript:
```typescript
import {
  Producer,
  KafkaConsumer,
  LibrdKafkaError,
  TopicPartition,
  ConsumerGlobalConfig,
  ProducerGlobalConfig,
  Message,
  DeliveryReport,
} from '@eqxjs/kafka-server-node-rdkafka';
```
API Reference
CustomServerRdKafka
The main class implementing the Kafka transport strategy.
Methods
- `listen(callback)` - Start the Kafka server
- `close()` - Disconnect consumer and producers
- `produce(topic, message)` - Produce a message using the high-level producer (returns a Promise)
- `sendMessage(topic, message)` - Send a message using the standard producer (fire-and-forget)
- `setSuccessCallback(callback)` - Set callback for successful message delivery
- `setErrorCallback(callback)` - Set callback for failed message delivery
- `setConsumeTopics(topics)` - Manually set topics to consume
- `unwrap()` - Get access to the underlying consumer and producer instances
- `getInstance()` - Static method to get the singleton instance
Properties
- `memberAssignment` - Current partition assignments per topic
- `assignment` - Current topic-partition assignments array
Configuration Details
Consumer Configuration
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| groupId | string | - | Consumer group ID (required) |
| sessionTimeout | number | - | Session timeout in milliseconds |
Producer Configuration
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| retry.maxRetryTime | number | 2 | Maximum number of retries |
Client Configuration
| Option | Type | Description |
|--------|------|-------------|
| clientId | string | Client identifier |
| brokers | string[] | List of Kafka broker URLs |
| sasl | object | SASL authentication configuration |
| ssl | boolean | object | SSL/TLS configuration |
Features in Detail
Automatic Topic Monitoring
The server automatically monitors for:
- New topics: Subscribes to newly created topics matching your handlers
- Deleted topics: Unsubscribes from deleted topics and logs warnings
- Check interval: 5 minutes (300 seconds)
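The package does not expose its monitoring internals, but the core diff step each check interval can be pictured as follows. `diffTopics` is a hypothetical helper for illustration, not part of the package's API.

```typescript
// Compare the topics currently subscribed against the topics the broker
// reports, yielding which topics appeared and which disappeared.
function diffTopics(
  subscribed: string[],
  available: string[],
): { added: string[]; removed: string[] } {
  const availableSet = new Set(available);
  const subscribedSet = new Set(subscribed);
  return {
    // topics that exist on the broker but are not yet subscribed
    added: available.filter((t) => !subscribedSet.has(t)),
    // subscribed topics that no longer exist on the broker
    removed: subscribed.filter((t) => !availableSet.has(t)),
  };
}

const { added, removed } = diffTopics(
  ['user.created', 'user.deleted'],   // currently subscribed
  ['user.created', 'order.placed'],   // reported by the broker
);
// 'order.placed' is new; 'user.deleted' was deleted on the broker
```

A real loop would then subscribe to `added` topics that match registered handlers and unsubscribe from `removed` ones, logging a warning for the latter.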
Rebalancing Support
Built-in rebalancing callback handles:
- Partition assignment changes
- Consumer group coordination
- Graceful partition revocation
- Safe assign/unassign: `consumer.assign()` and `consumer.unassign()` are guarded with `isConnected()` checks and wrapped in try/catch to avoid the `ERR__STATE` ("Local: Erroneous state") crash that occurs when a rebalance fires during shutdown or reconnection
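The guard described in the last bullet can be sketched in isolation. This is illustrative only: `MinimalConsumer` and `safeAssign` are hypothetical names for this sketch, not the package's actual internals.

```typescript
// Minimal shape of the consumer surface the guard needs.
interface MinimalConsumer {
  isConnected(): boolean;
  assign(partitions: { topic: string; partition: number }[]): void;
}

// Only attempt assignment while the consumer reports it is connected,
// and swallow errors (e.g. "Local: Erroneous state") that can race in
// from a state change mid-rebalance. Returns whether assignment applied.
function safeAssign(
  consumer: MinimalConsumer,
  partitions: { topic: string; partition: number }[],
): boolean {
  if (!consumer.isConnected()) {
    return false; // disconnecting/reconnecting: skip this rebalance
  }
  try {
    consumer.assign(partitions);
    return true;
  } catch {
    return false; // state changed between the check and the call
  }
}

// A disconnected consumer is skipped instead of crashing the process:
const disconnected: MinimalConsumer = {
  isConnected: () => false,
  assign: () => { throw new Error('Local: Erroneous state'); },
};
const applied = safeAssign(disconnected, [{ topic: 'user.created', partition: 0 }]);

// A healthy consumer is assigned normally:
const connected: MinimalConsumer = {
  isConnected: () => true,
  assign: () => {},
};
const ok = safeAssign(connected, [{ topic: 'user.created', partition: 0 }]);
```

The same pattern applies symmetrically to `consumer.unassign()` on partition revocation.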
Error Handling
Comprehensive error handling for:
- Connection failures
- Topic/partition errors
- Message delivery failures
- Rebalancing errors
Development
Build
```bash
npm run build
```
Format Code
```bash
npm run format
```
Testing
```bash
npm test
```
Environment Variables
- `KAFKA_CONSUMER_TPS_LIMIT` - Messages to consume per second (default: 10)
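The package does not document how this cap is enforced internally. A minimal interval-based throttle illustrating the idea might look like the sketch below; `nextDelay` is a hypothetical helper, and in practice the limit would be parsed from `KAFKA_CONSUMER_TPS_LIMIT`.

```typescript
// Hardcoded here for the sketch; in practice this would come from
// Number(process.env.KAFKA_CONSUMER_TPS_LIMIT ?? 10).
const tpsLimit = 10;
const minIntervalMs = 1000 / tpsLimit; // minimum spacing between messages

let lastConsumeAt = -Infinity; // timestamp the previous slot was granted

// Returns how long (ms) to wait before handling the next message so the
// long-run rate stays at or below tpsLimit messages per second.
function nextDelay(nowMs: number): number {
  const earliest = lastConsumeAt + minIntervalMs;
  const delay = Math.max(0, earliest - nowMs);
  lastConsumeAt = Math.max(nowMs, earliest);
  return delay;
}

const d1 = nextDelay(0); // first message goes through immediately
const d2 = nextDelay(0); // a second message at the same instant must wait
```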
License
ISC
Author
Atit Plangson
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Troubleshooting
Local: Erroneous state (ERR__STATE) during rebalance
This error surfaces as:
```
Error: Local: Erroneous state
    at KafkaConsumer.assign ...
    at CustomServerRdKafka.rb_callback ...
```
It happens when `consumer.assign()` or `consumer.unassign()` is called from the rebalance callback while the consumer is disconnecting or reconnecting. The fix introduced in v2.0.1 wraps both calls in an `isConnected()` guard and a try/catch, so the rebalance is silently skipped instead of crashing the process.
If you are on an older version, upgrade:
```bash
npm install @eqxjs/kafka-server-node-rdkafka@latest
```
Consumer not receiving messages
- Check that your consumer group ID is set correctly
- Verify topic names match your `@EventPattern` decorators
- Ensure brokers are reachable
- Check consumer logs for rebalancing or connection issues
Messages not being produced
- Verify producer is connected (check logs for "ready" event)
- Check delivery report callbacks for errors
- Ensure topics exist on the Kafka cluster
- Verify authentication credentials if using SASL
Connection issues
- Verify broker URLs are correct and reachable
- Check firewall rules
- Verify SSL/TLS certificates if using encrypted connections
- Check SASL credentials if using authentication
Changelog
v2.0.2
- Fix: `rb_callback` now supports both EAGER and COOPERATIVE rebalance protocols. When `partition.assignment.strategy` is set to `cooperative-sticky` (or any cooperative variant), the callback detects the protocol on the first rebalance event and switches to `incremental_assign(assignment)` / `incremental_unassign(assignment)` for all subsequent rebalances. The protocol is detected at runtime, so no configuration change is required.
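Assuming the native-config path shown earlier in this README, opting into the cooperative protocol is just a matter of the strategy key. This is a sketch: the broker address and group ID are placeholders, and the object would be typed as `ConsumerGlobalConfig` when the package import is available.

```typescript
// Consumer config selecting the cooperative protocol; with this strategy
// the v2.0.2 rb_callback switches to incremental assign/unassign on its own.
const cooperativeConsumerConfig = {
  'bootstrap.servers': 'localhost:9092',       // placeholder broker
  'group.id': 'my-consumer-group',             // placeholder group ID
  'partition.assignment.strategy': 'cooperative-sticky',
};
```

This config would be passed as the second constructor argument, exactly like `consumerConfig` in the native-config example above.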
v2.0.1
- Fix: `consumer.assign()` / `consumer.unassign()` are now guarded with `isConnected()` and wrapped in try/catch inside `rb_callback`, preventing the "Local: Erroneous state" (`ERR__STATE`) crash during partition rebalance when the consumer is in a transitional (disconnecting/reconnecting) state.
Related Projects
- kafkajs - Alternative JavaScript Kafka client
- @nestjs/microservices - NestJS microservices module
- node-rdkafka - High-performance Kafka client for Node.js
