@eqxjs/kafka-server-confluent-kafka
v1.0.5
Kafka transport for NestJS microservices based on @confluentinc/kafka-javascript
@eqxjs/kafka-server-confluent-kafka
A custom Kafka transport strategy for NestJS microservices using node-rdkafka (via @confluentinc/kafka-javascript). This package provides a robust, production-ready Kafka integration with advanced features like automatic topic monitoring, rebalancing support, heap-based back-pressure, and flexible configuration.
Table of Contents
- Features
- Installation
- Usage
- API Reference
- Configuration Details
- Features in Detail
- Development
- Environment Variables
- Troubleshooting
- Changelog
- Related Projects
- License
- Author
- Contributing
Features
- Full NestJS Microservices Integration - Drop-in replacement for standard Kafka transport
- Automatic Topic Monitoring - Detects new topics and handles topic deletions dynamically
- Consumer Group Rebalancing - Full EAGER and COOPERATIVE protocol support with detailed partition assignment logging
- Authentication Support - SASL/PLAIN, SASL/SCRAM, and SSL/TLS configurations
- Delivery Reports - Track message delivery success and failures with callbacks
- High Performance - Uses native node-rdkafka under the hood
- Heap-based Back-pressure - Automatically pauses/resumes the consumer when Node.js heap usage exceeds a configurable threshold
- Flexible Configuration - Support for both high-level and low-level producers
- Rich Connection Logging - Consumer and producers log client identity, bootstrap broker, and full broker list on connect; uptime is logged on disconnect
- TypeScript Support - Fully typed with TypeScript definitions
Installation
npm install @eqxjs/kafka-server-confluent-kafka
Peer Dependencies
This package requires the following peer dependencies:
npm install @nestjs/common @nestjs/microservices
Usage
Basic Setup
1. Create a Kafka Server in your NestJS Application
import { NestFactory } from '@nestjs/core';
import { CustomServerConfluentKafka } from '@eqxjs/kafka-server-confluent-kafka';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, {
strategy: new CustomServerConfluentKafka({
options: {
client: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'my-consumer-group',
},
},
}),
});
await app.listen();
}
bootstrap();
Important: Always provide a groupId. If omitted, the consumer defaults to "nestjs-kafka-consumer". Without a stable group ID, every restart creates a new consumer group, causing offsets to reset and messages to be reprocessed or skipped.
2. Create Message Handlers
import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
@Controller()
export class AppController {
@EventPattern('user.created')
async handleUserCreated(@Payload() data: any) {
console.log('Received message:', data);
}
}
Configuration Options
With SASL Authentication
const app = await NestFactory.createMicroservice(AppModule, {
strategy: new CustomServerConfluentKafka({
options: {
client: {
clientId: 'my-app',
brokers: ['kafka.example.com:9092'],
sasl: {
mechanism: 'plain', // or 'scram-sha-256', 'scram-sha-512'
username: 'my-username',
password: 'my-password',
},
},
consumer: {
groupId: 'my-consumer-group',
sessionTimeout: 30000,
},
producer: {
retry: {
maxRetryTime: 3,
},
},
},
}),
});
With SSL/TLS
import * as fs from 'fs';
const app = await NestFactory.createMicroservice(AppModule, {
strategy: new CustomServerConfluentKafka({
options: {
client: {
clientId: 'my-app',
brokers: ['kafka.example.com:9093'],
ssl: {
ca: fs.readFileSync('./ca-cert.pem'),
cert: fs.readFileSync('./client-cert.pem'),
key: fs.readFileSync('./client-key.pem'),
},
},
consumer: {
groupId: 'my-consumer-group',
},
},
}),
});
With SASL + SSL
const app = await NestFactory.createMicroservice(AppModule, {
strategy: new CustomServerConfluentKafka({
options: {
client: {
clientId: 'my-app',
brokers: ['kafka.example.com:9093'],
sasl: {
mechanism: 'scram-sha-256',
username: 'my-username',
password: 'my-password',
},
ssl: true, // or provide SSL certificates as object
},
consumer: {
groupId: 'my-consumer-group',
},
},
}),
});
Using Native ConsumerGlobalConfig and ProducerGlobalConfig
For advanced use cases, you can pass native node-rdkafka configurations directly:
import {
CustomServerConfluentKafka,
ConsumerGlobalConfig,
ProducerGlobalConfig,
} from '@eqxjs/kafka-server-confluent-kafka';
const consumerConfig: ConsumerGlobalConfig = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-consumer-group',
'client.id': 'my-client',
'enable.auto.commit': true,
'auto.commit.interval.ms': 5000,
'session.timeout.ms': 30000,
'max.poll.interval.ms': 300000,
'fetch.min.bytes': 1,
'fetch.wait.max.ms': 100,
'heartbeat.interval.ms': 3000,
'partition.assignment.strategy': 'range,roundrobin',
};
const producerConfig: ProducerGlobalConfig = {
'bootstrap.servers': 'localhost:9092',
'client.id': 'my-client',
'compression.type': 'snappy',
'acks': -1, // all replicas
'retries': 3,
'max.in.flight.requests.per.connection': 5,
'linger.ms': 10,
'batch.size': 16384,
'request.timeout.ms': 30000,
};
const app = await NestFactory.createMicroservice(AppModule, {
strategy: new CustomServerConfluentKafka(
undefined,
consumerConfig,
producerConfig,
),
});
Native consumerConfig/producerConfig keys are the base layer. If an options (KafkaOptions) parameter is also provided, it overwrites specific keys on top (bootstrap.servers, auth, client.id, group.id, session.timeout.ms). KAFKA_CONSUMER_* / KAFKA_PRODUCER_* env vars are applied last and win over both. See KAFKA-CONFIG.md for a complete reference of all available keys.
Config via Environment Variables Only
You can configure the server entirely through environment variables with zero code-level config. Pass no arguments to the constructor and set KAFKA_CONSUMER_* / KAFKA_PRODUCER_* env vars:
// main.ts — no config in code at all
const app = await NestFactory.createMicroservice(AppModule, {
strategy: new CustomServerConfluentKafka(),
});
await app.listen();
# .env / container environment
KAFKA_CONSUMER_BOOTSTRAP_SERVERS=kafka:9092
KAFKA_CONSUMER_GROUP_ID=my-service
KAFKA_CONSUMER_SESSION_TIMEOUT_MS=30000
KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS=300000
KAFKA_CONSUMER_PARTITION_ASSIGNMENT_STRATEGY=cooperative-sticky
KAFKA_PRODUCER_BOOTSTRAP_SERVERS=kafka:9092
KAFKA_PRODUCER_COMPRESSION_TYPE=snappy
KAFKA_PRODUCER_LINGER_MS=10
KAFKA_HEAP_LIMIT_PERCENT=80
KAFKA_CONSUME_INTERVAL_MS=500
KAFKA_CONSUME_MESSAGES_PER_INTERVAL=20
Conversion rule: strip the KAFKA_CONSUMER_ / KAFKA_PRODUCER_ prefix, lowercase the remainder, and replace every _ with .
KAFKA_CONSUMER_GROUP_ID → group.id
KAFKA_CONSUMER_SESSION_TIMEOUT_MS → session.timeout.ms
KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS → max.poll.interval.ms
KAFKA_CONSUMER_PARTITION_ASSIGNMENT_STRATEGY → partition.assignment.strategy
KAFKA_PRODUCER_COMPRESSION_TYPE → compression.type
KAFKA_PRODUCER_LINGER_MS → linger.ms
Values are automatically coerced — "true"/"false" → boolean, numeric strings → number, everything else stays a string. Any ConsumerGlobalConfig or ProducerGlobalConfig key is supported. See KAFKA-CONFIG.md for the full list.
Configuration Priority
When multiple configuration sources are provided, they are merged in the following order — later sources override earlier ones:
| Priority | Source | How to set |
|----------|--------|------------|
| 1 (lowest) | consumerConfig / producerConfig constructor args | new CustomServerConfluentKafka(options, consumerConfig, producerConfig) |
| 2 | KafkaOptions (options first arg) | Overwrites bootstrap.servers, auth, client.id, group.id, session.timeout.ms |
| 3 (highest) | KAFKA_CONSUMER_* / KAFKA_PRODUCER_* env vars | Applied last — overrides everything above |
The KAFKA_HEAP_LIMIT_PERCENT, KAFKA_CONSUME_INTERVAL_MS, KAFKA_CONSUME_MESSAGES_PER_INTERVAL, KAFKA_DISABLE_TOPIC_MONITOR, and KAFKA_TOPIC_MONITOR_INTERVAL_MS variables are independent behaviour controls read directly at runtime and are not part of the librdkafka config merge.
Heap-based Back-pressure
The consumer automatically pauses fetching when Node.js heap usage exceeds a configurable threshold, and resumes when it recovers. Configure via the 4th constructor argument or an environment variable:
// Pause when heap usage reaches 80%
new CustomServerConfluentKafka(options, consumerConfig, producerConfig, 80);
# Or via environment variable (default: 85)
KAFKA_HEAP_LIMIT_PERCENT=80
The threshold is clamped to [10, 99]. When paused/resumed, the consumer logs:
Consumer PAUSED — heap usage 87.350% (used=694.123 MB, limit=796.000 MB) exceeds limit 85%
Consumer RESUMED — heap usage recovered to 81.200% (used=646.400 MB, limit=796.000 MB)
The pause does not unsubscribe or trigger a rebalance — the consumer stays in the group and simply stops fetching until memory recovers.
Advanced Usage
Producing Messages
import { Injectable } from '@nestjs/common';
import {
CustomServerConfluentKafka,
ProducerRecord,
RecordMetadata,
} from '@eqxjs/kafka-server-confluent-kafka';
@Injectable()
export class KafkaProducerService {
private kafkaServer = CustomServerConfluentKafka.getInstance();
async sendMessage(topic: string, message: any): Promise<RecordMetadata[]> {
const record: ProducerRecord = {
topic,
messages: [
{
key: 'my-key',
value: JSON.stringify(message),
headers: { 'content-type': 'application/json' },
},
],
};
// Returns one RecordMetadata per message
return this.kafkaServer.produce(record);
}
async sendBatch(topic: string, items: any[]): Promise<RecordMetadata[]> {
return this.kafkaServer.produce({
topic,
messages: items.map((item, i) => ({
key: String(i),
value: JSON.stringify(item),
})),
});
}
}
Using the Static Producer Helper
getProducer() returns a KafkaProducer object whose send method is already bound to the singleton. Use it when you want a typed producer reference without holding the full server instance:
import { CustomServerConfluentKafka, KafkaProducer } from '@eqxjs/kafka-server-confluent-kafka';
const producer: KafkaProducer = CustomServerConfluentKafka.getProducer();
// send is bound — safe to destructure and pass around
await producer.send({
topic: 'user.created',
messages: [
{ key: 'user-1', value: JSON.stringify({ id: 1 }) },
],
});
Note: getProducer() must be called after the server has been started (i.e. after app.listen() resolves). The singleton is set during construction, but the produce implementation requires a connected producer client.
Setting Delivery Callbacks
const kafkaServer = CustomServerConfluentKafka.getInstance();
kafkaServer.setSuccessCallback((err, report) => {
console.log('Message delivered:', report);
});
kafkaServer.setErrorCallback((err, report) => {
console.error('Delivery failed:', err.message);
});
Accessing Consumer and Producer Directly
const { consumer, producer } = kafkaServer.unwrap();
const metadata = await consumer.getMetadata({ timeout: 10000 });
Reading Current Partition Assignment
The current partition assignment is always available on the instance:
const server = CustomServerConfluentKafka.getInstance();
// Map of topic → partition[]
console.log(server.memberAssignment);
// { 'orders': [0, 1], 'events': [2] }
// Raw TopicPartition[] array
console.log(server.assignment);
Consumer Throughput Control
# Messages to fetch per consume() call, called once per second (default: 10)
KAFKA_CONSUME_MESSAGES_PER_INTERVAL=50
Re-exported Types
The package re-exports useful types from @confluentinc/kafka-javascript and its own type definitions:
import {
// produce API (KafkaJS-style)
ProducerRecord,
ProducerMessage,
KafkaProducer,
RecordMetadata,
// librdkafka types
Producer,
KafkaConsumer,
LibrdKafkaError,
TopicPartition,
ConsumerGlobalConfig,
ProducerGlobalConfig,
Message,
DeliveryReport,
} from '@eqxjs/kafka-server-confluent-kafka';
API Reference
CustomServerConfluentKafka
The main class implementing the Kafka transport strategy.
Constructor
new CustomServerConfluentKafka(
options?: KafkaOptions,
consumerConfig?: ConsumerGlobalConfig,
producerConfig?: ProducerGlobalConfig,
heapLimitPercent?: number, // default: KAFKA_HEAP_LIMIT_PERCENT env var or 85
)
Methods
Lifecycle
| Method | Signature | Description |
|--------|-----------|-------------|
| listen | (callback) => Promise<void> | Start the Kafka server |
| close | () => Promise<void> | Clear intervals, disconnect consumer and all producers |
| start | (callback) => Promise<void> | Internal bootstrap — connects clients and binds events |
Consume Control
| Method | Signature | Description |
|--------|-----------|-------------|
| setConsumeInterval | () => Promise<void> | Start (or restart) the consume polling interval |
| clearConsumeInterval | () => Promise<void> | Stop and null the consume interval |
| clearTopicMonitorInterval | () => Promise<void> | Stop and null the topic monitor interval |
| pauseConsumer | () => void | Manually pause consuming (clears interval) |
| resumeConsumer | () => void | Manually resume consuming (restarts interval) |
Produce
| Method | Signature | Description |
|--------|-----------|-------------|
| produce | (record: ProducerRecord) => Promise<RecordMetadata[]> | Produce one or more messages — resolves with one RecordMetadata per message, mirroring the KafkaJS send() API |
| sendMessage | (topic: string, message: Partial<Message>, timestamp?: number) => Promise<void> | Low-level fire-and-forget produce — delivery result handled asynchronously via deliveryReport |
| setSuccessCallback | (cb) => void | Register a callback for successful delivery reports |
| setErrorCallback | (cb) => void | Register a callback for failed delivery reports |
Topic & Subscription
| Method | Signature | Description |
|--------|-----------|-------------|
| setConsumeTopics | (topics: string[]) => void | Override the topic list to consume |
| retrieveTopics | (log: boolean) => Promise<string[]> | Fetch matching topics from broker metadata |
Events & Rebalance
| Method | Signature | Description |
|--------|-----------|-------------|
| bindEvents | () => Promise<void> | Attach all consumer event listeners |
| rb_callback | (err, assignment) => void | Rebalance callback — handles ASSIGN and REVOKE |
| deliveryReport | (err?, report?) => Promise<void> | Handle a producer delivery report |
| handleMessage | (payload: Message) => Promise<void> | Dispatch a consumed message to its registered handler |
Utility
| Method | Signature | Description |
|--------|-----------|-------------|
| unwrap<T> | () => T | Returns { consumer, producer } for direct librdkafka access |
| isKafkaConnected | () => boolean | true when consumer and producer are both connected |
| getInstance | static () => CustomServerConfluentKafka | Returns the singleton instance |
| getProducer | static () => KafkaProducer | Convenience wrapper around produce — returns a KafkaProducer whose send is correctly bound to the singleton |
Properties
| Property | Type | Description |
|----------|------|-------------|
| memberAssignment | Record<string, number[]> | Current topic → partition list assignment |
| assignment | TopicPartition[] | Raw current assignment array |
| consumeInterval | NodeJS.Timeout | Reference to the active consume interval |
| topicMonitorInterval | NodeJS.Timeout | Reference to the active topic monitor interval |
| _consumeTopics | string[] | Topics derived from registered @EventPattern handlers |
| instance | static CustomServerConfluentKafka | Singleton reference set on construction |
ProducerRecord, ProducerMessage and KafkaProducer
Types that mirror the KafkaJS producer API, accepted by produce() and getProducer().send().
type ProducerMessage = {
key?: Buffer | string | null; // message key; null for no key
value: Buffer | string | null; // payload; null produces a tombstone
partition?: number; // omit or -1 for auto-partitioning
headers?: Record<string, string | Buffer>;
timestamp?: string; // ms since epoch as a string; defaults to Date.now()
};
type ProducerRecord = {
topic: string;
messages: ProducerMessage[];
};
type KafkaProducer = {
send: (record: ProducerRecord) => Promise<RecordMetadata[]>;
};
getHeapUsage() — utils/get-mem
import { getHeapUsage } from '@eqxjs/kafka-server-confluent-kafka/utils/get-mem';
const heap = getHeapUsage();
heap.usedPercent // 87.3
heap.usedMB // 694.1
heap.limitMB // 796.0
heap.isOverLimit(85) // true
heap.format() // "87.350% (used=694.123 MB, limit=796.000 MB)"
parseEnvConfig() / parseEnvValue() — utils/parse-env
import { parseEnvConfig, parseEnvValue } from '@eqxjs/kafka-server-confluent-kafka/utils/parse-env';
// Returns { consumer: Partial<ConsumerGlobalConfig>, producer: Partial<ProducerGlobalConfig> }
// populated from KAFKA_CONSUMER_* and KAFKA_PRODUCER_* env vars
const { consumer, producer } = parseEnvConfig();
// Coerces a raw string to the most specific primitive type
parseEnvValue("true") // true (boolean)
parseEnvValue("30000") // 30000 (number)
parseEnvValue("snappy") // "snappy" (string)
Configuration Details
Consumer Configuration
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| groupId | string | "nestjs-kafka-consumer" | Consumer group ID — always provide a stable value |
| sessionTimeout | number | — | Session timeout in milliseconds |
Producer Configuration
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| retry.maxRetryTime | number | 2 | Maximum number of send retries |
Client Configuration
| Option | Type | Description |
|--------|------|-------------|
| clientId | string | Client identifier |
| brokers | string[] | List of Kafka broker URLs |
| sasl | object | SASL authentication configuration |
| ssl | boolean | object | SSL/TLS configuration |
Features in Detail
Stable Consumer Group Identity
group.id is always written to consumerConfig — defaulting to "nestjs-kafka-consumer" when not provided. Additionally, group.instance.id defaults to os.hostname() if not already set, enabling static membership which reduces unnecessary rebalances on container restarts. You can override it by setting group.instance.id explicitly in your consumerConfig.
Connection Logging
On the ready event, the consumer and producer log a summary sourced directly from the broker response:
Kafka consumer connected — client=rdkafka#consumer-1, group.id=my-group, assignment.strategy=range, bootstrap.broker=kafka-1:9092, brokers=[kafka-1:9092, kafka-2:9092, kafka-3:9092]
Kafka producer connected — client=rdkafka#producer-1, bootstrap.broker=kafka-1:9092, brokers=[kafka-1:9092, kafka-2:9092, kafka-3:9092]
On disconnected, both clients log their connection uptime:
Kafka consumer disconnected — uptime=43205ms
Kafka producer disconnected — uptime=43210ms
On connection.failure, the error message and uptime are logged for both clients:
consumer: Kafka consumer connection failure — Broker transport failure, uptime=0ms
producer: Kafka connection failure — Broker transport failure, uptime=0ms
Rebalance Logging with Partition State
Every rebalance event logs the consumer's partition state before and after:
ASSIGN:
Rebalance ASSIGN [Assign] — group=my-group, strategy=range | assigned={"orders":[0,1]}
Rebalance ASSIGN — current assignment: {"orders":[0,1],"events":[0]}
REVOKE:
Rebalance REVOKE [Revoke] — group=my-group, strategy=range | before={"orders":[0,1],"events":[0]} | revoking={"events":[0]}
Rebalance REVOKE — remaining assignment: {"orders":[0,1]}
Rebalancing Support
Full support for both EAGER and COOPERATIVE protocols:
- Eager (range, roundrobin): Full revoke and reassign — assign()/unassign() called with the complete new assignment
- Cooperative (cooperative-sticky): Incremental delta — incrementalAssign()/incrementalUnassign() called with only the changed partitions; memberAssignment is updated by merging or filtering rather than replacing
Protocol is auto-detected from partition.assignment.strategy at construction time. All assign/unassign calls are guarded with isConnected() and wrapped in try/catch to prevent ERR__STATE crashes during shutdown.
Heap-based Back-pressure
Every second, before calling consumer.consume(), the server checks Node.js heap usage via v8.getHeapStatistics(). If used_heap_size / heap_size_limit exceeds heapLimitPercent, the consume tick is skipped. The consumer logs PAUSED on the first over-limit tick and RESUMED once it recovers, avoiding log spam.
Automatic Topic Monitoring
The server automatically monitors for:
- New topics: Subscribes to newly created topics matching your handlers
- Deleted topics: Unsubscribes from deleted topics and logs warnings
The monitor runs on a setInterval (stored in topicMonitorInterval) and logs at startup:
Topic monitor started — interval=300000ms
Set KAFKA_DISABLE_TOPIC_MONITOR=true to disable this behaviour. When disabled, the consumer subscribes only to topics available at connect time.
Configure the polling interval:
KAFKA_TOPIC_MONITOR_INTERVAL_MS=60000 # default: 300000 (5 minutes)
Error Handling
Comprehensive error handling for:
- Connection failures (connection.failure) — logged with error message and uptime for consumer and producer
- Topic/partition errors (ERR_NOT_LEADER_FOR_PARTITION, ERR__UNKNOWN_TOPIC, etc.) — trigger automatic topic list refresh and resubscription
- Message delivery failures — routed to the optional error delivery callback
- Rebalancing errors — logged with error code and message
Development
Build
npm run build
Format Code
npm run format
Testing
The test suite uses Jest with ts-jest and requires no running Kafka broker — all librdkafka clients are fully mocked.
# Run tests
npm test
# Run tests with coverage report
npm run test:coverage
Coverage is enforced at 100% for statements, branches, functions, and lines across all src/ files. The suite is located under test/ and is excluded from the published package via .npmignore.
| Test file | What it covers |
|-----------|----------------|
| test/kafka.server.spec.ts | Full CustomServerConfluentKafka class — constructor options, rebalance (eager & cooperative), event listeners, produce API, back-pressure, topic monitor, delivery reports, message handling |
| test/get-mem.spec.ts | getHeapUsage() — all HeapUsage fields and methods |
| test/parse-env.spec.ts | parseEnvValue() and parseEnvConfig() — all coercion branches and env var mapping |
Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| KAFKA_CONSUME_INTERVAL_MS | 1000 | Consume polling interval in milliseconds |
| KAFKA_CONSUME_MESSAGES_PER_INTERVAL | 10 | Messages fetched per consume tick |
| KAFKA_HEAP_LIMIT_PERCENT | 85 | Heap usage % at which the consumer is paused (clamped 10–99) |
| KAFKA_DISABLE_TOPIC_MONITOR | false | Set to true to disable automatic topic monitoring |
| KAFKA_TOPIC_MONITOR_INTERVAL_MS | 300000 | Topic monitor polling interval in milliseconds (default: 5 minutes) |
Note: KAFKA_CONSUMER_* / KAFKA_PRODUCER_* env vars are applied last in the config merge and take final precedence over both consumerConfig/producerConfig constructor args and KafkaOptions. See Configuration Priority for the full order.
librdkafka Config via Environment Variables
Any ConsumerGlobalConfig or ProducerGlobalConfig key can be set via environment variables using the following convention:
| Prefix | Applies to |
|--------|------------|
| KAFKA_CONSUMER_ | ConsumerGlobalConfig |
| KAFKA_PRODUCER_ | ProducerGlobalConfig |
Conversion rule: strip the prefix, lowercase the remainder, replace every _ with .
KAFKA_CONSUMER_GROUP_ID → group.id
KAFKA_CONSUMER_SESSION_TIMEOUT_MS → session.timeout.ms
KAFKA_CONSUMER_BOOTSTRAP_SERVERS → bootstrap.servers
KAFKA_PRODUCER_COMPRESSION_TYPE → compression.type
KAFKA_PRODUCER_LINGER_MS → linger.ms
Env vars are applied last in the merge order — after consumerConfig/producerConfig args and after KafkaOptions — so they take final precedence over all code-level config.
# Example: configure consumer entirely from env
KAFKA_CONSUMER_BOOTSTRAP_SERVERS=kafka:9092
KAFKA_CONSUMER_GROUP_ID=my-service
KAFKA_CONSUMER_SESSION_TIMEOUT_MS=30000
KAFKA_CONSUMER_ENABLE_AUTO_COMMIT=true
KAFKA_PRODUCER_COMPRESSION_TYPE=snappy
KAFKA_PRODUCER_LINGER_MS=10
Note: Values are automatically coerced — "true"/"false" → boolean, numeric strings → number, everything else stays a string. Env vars have the highest config priority — they override both constructor args and KafkaOptions.
For a full list of all supported config keys and their defaults, see KAFKA-CONFIG.md.
Troubleshooting
Consumer behaves like a different consumer group on every restart
Caused by a missing or randomly-generated group.id. Since v2.1.0 the library always sets group.id from config (defaulting to "nestjs-kafka-consumer") so offsets are persisted across restarts. Always specify an explicit groupId for production deployments:
consumer: { groupId: 'my-service-orders-consumer' }
Local: Erroneous state (ERR__STATE) during rebalance
Error: Local: Erroneous state
at KafkaConsumer.assign ...
at CustomServerConfluentKafka.rb_callback ...
This happens when assign() / unassign() is called while the consumer is disconnecting. Fixed in v2.0.1 — both calls are guarded with isConnected() and wrapped in try/catch.
Consumer pausing unexpectedly
Check heap usage. If KAFKA_HEAP_LIMIT_PERCENT is set too low (or Node.js heap is genuinely exhausted), the consumer will pause. Look for:
Consumer PAUSED — heap usage 87.350% (used=694.123 MB, limit=796.000 MB) exceeds limit 85%
Increase the threshold or add --max-old-space-size to the Node.js flags to raise the heap ceiling.
Consumer not receiving messages
- Ensure groupId is stable and not changing between restarts
- Verify topic names match your @EventPattern decorators
- Ensure brokers are reachable
- Check consumer logs for rebalancing or connection issues
Messages not being produced
- Verify producer is connected (check logs for "ready" event)
- Check delivery report callbacks for errors
- Ensure topics exist on the Kafka cluster
- Verify authentication credentials if using SASL
Connection issues
- Verify broker URLs are correct and reachable
- Check firewall rules
- Verify SSL/TLS certificates if using encrypted connections
- Check SASL credentials if using authentication
Changelog
See CHANGELOG.md for the full version history.
Related Projects
- kafkajs - Alternative JavaScript Kafka client
- @nestjs/microservices - NestJS microservices module
- node-rdkafka - High-performance Kafka client for Node.js
License
ISC
Author
Atit Plangson
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
