@eqxjs/kafka-server-confluent-kafka

v1.0.5

Kafka for NestJS microservices, based on @confluentinc/kafka-javascript.

A custom Kafka transport strategy for NestJS microservices using node-rdkafka (via @confluentinc/kafka-javascript). This package provides a robust, production-ready Kafka integration with advanced features like automatic topic monitoring, rebalancing support, heap-based back-pressure, and flexible configuration.

Features

  • Full NestJS Microservices Integration - Drop-in replacement for standard Kafka transport
  • Automatic Topic Monitoring - Detects new topics and handles topic deletions dynamically
  • Consumer Group Rebalancing - Full EAGER and COOPERATIVE protocol support with detailed partition assignment logging
  • Authentication Support - SASL/PLAIN, SASL/SCRAM, and SSL/TLS configurations
  • Delivery Reports - Track message delivery success and failures with callbacks
  • High Performance - Uses native node-rdkafka under the hood
  • Heap-based Back-pressure - Automatically pauses/resumes the consumer when Node.js heap usage exceeds a configurable threshold
  • Flexible Configuration - Support for both high-level and low-level producers
  • Rich Connection Logging - Consumer and producers log client identity, bootstrap broker, and full broker list on connect; uptime is logged on disconnect
  • TypeScript Support - Fully typed with TypeScript definitions

Installation

npm install @eqxjs/kafka-server-confluent-kafka

Peer Dependencies

This package requires the following peer dependencies:

npm install @nestjs/common @nestjs/microservices

Usage

Basic Setup

1. Create a Kafka Server in your NestJS Application

import { NestFactory } from '@nestjs/core';
import { CustomServerConfluentKafka } from '@eqxjs/kafka-server-confluent-kafka';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    strategy: new CustomServerConfluentKafka({
      options: {
        client: {
          clientId: 'my-app',
          brokers: ['localhost:9092'],
        },
        consumer: {
          groupId: 'my-consumer-group',
        },
      },
    }),
  });

  await app.listen();
}
bootstrap();

Important: Always provide a groupId. If omitted, the consumer defaults to "nestjs-kafka-consumer". Without a stable group ID every restart creates a new consumer group, causing offsets to reset and messages to be reprocessed or skipped.

2. Create Message Handlers

import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';

@Controller()
export class AppController {
  @EventPattern('user.created')
  async handleUserCreated(@Payload() data: any) {
    console.log('Received message:', data);
  }
}

Configuration Options

With SASL Authentication

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerConfluentKafka({
    options: {
      client: {
        clientId: 'my-app',
        brokers: ['kafka.example.com:9092'],
        sasl: {
          mechanism: 'plain', // or 'scram-sha-256', 'scram-sha-512'
          username: 'my-username',
          password: 'my-password',
        },
      },
      consumer: {
        groupId: 'my-consumer-group',
        sessionTimeout: 30000,
      },
      producer: {
        retry: {
          maxRetryTime: 3,
        },
      },
    },
  }),
});

With SSL/TLS

import * as fs from 'fs';

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerConfluentKafka({
    options: {
      client: {
        clientId: 'my-app',
        brokers: ['kafka.example.com:9093'],
        ssl: {
          ca: fs.readFileSync('./ca-cert.pem'),
          cert: fs.readFileSync('./client-cert.pem'),
          key: fs.readFileSync('./client-key.pem'),
        },
      },
      consumer: {
        groupId: 'my-consumer-group',
      },
    },
  }),
});

With SASL + SSL

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerConfluentKafka({
    options: {
      client: {
        clientId: 'my-app',
        brokers: ['kafka.example.com:9093'],
        sasl: {
          mechanism: 'scram-sha-256',
          username: 'my-username',
          password: 'my-password',
        },
        ssl: true, // or provide SSL certificates as object
      },
      consumer: {
        groupId: 'my-consumer-group',
      },
    },
  }),
});

Using Native ConsumerGlobalConfig and ProducerGlobalConfig

For advanced use cases, you can pass native node-rdkafka configurations directly:

import {
  CustomServerConfluentKafka,
  ConsumerGlobalConfig,
  ProducerGlobalConfig,
} from '@eqxjs/kafka-server-confluent-kafka';

const consumerConfig: ConsumerGlobalConfig = {
  'bootstrap.servers': 'localhost:9092',
  'group.id': 'my-consumer-group',
  'client.id': 'my-client',
  'enable.auto.commit': true,
  'auto.commit.interval.ms': 5000,
  'session.timeout.ms': 30000,
  'max.poll.interval.ms': 300000,
  'fetch.min.bytes': 1,
  'fetch.wait.max.ms': 100,
  'heartbeat.interval.ms': 3000,
  'partition.assignment.strategy': 'range,roundrobin',
};

const producerConfig: ProducerGlobalConfig = {
  'bootstrap.servers': 'localhost:9092',
  'client.id': 'my-client',
  'compression.type': 'snappy',
  'acks': -1, // all replicas
  'retries': 3,
  'max.in.flight.requests.per.connection': 5,
  'linger.ms': 10,
  'batch.size': 16384,
  'request.timeout.ms': 30000,
};

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerConfluentKafka(
    undefined,
    consumerConfig,
    producerConfig,
  ),
});

Native consumerConfig/producerConfig keys are the base layer. If an options (KafkaOptions) parameter is also provided, it overwrites specific keys on top (bootstrap.servers, auth, client.id, group.id, session.timeout.ms). KAFKA_CONSUMER_* / KAFKA_PRODUCER_* env vars are applied last and win over both. See KAFKA-CONFIG.md for a complete reference of all available keys.

Config via Environment Variables Only

You can configure the server entirely through environment variables with zero code-level config. Pass no arguments to the constructor and set KAFKA_CONSUMER_* / KAFKA_PRODUCER_* env vars:

// main.ts — no config in code at all
const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerConfluentKafka(),
});
await app.listen();

# .env / container environment
KAFKA_CONSUMER_BOOTSTRAP_SERVERS=kafka:9092
KAFKA_CONSUMER_GROUP_ID=my-service
KAFKA_CONSUMER_SESSION_TIMEOUT_MS=30000
KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS=300000
KAFKA_CONSUMER_PARTITION_ASSIGNMENT_STRATEGY=cooperative-sticky
KAFKA_PRODUCER_BOOTSTRAP_SERVERS=kafka:9092
KAFKA_PRODUCER_COMPRESSION_TYPE=snappy
KAFKA_PRODUCER_LINGER_MS=10
KAFKA_HEAP_LIMIT_PERCENT=80
KAFKA_CONSUME_INTERVAL_MS=500
KAFKA_CONSUME_MESSAGES_PER_INTERVAL=20

Conversion rule: strip the KAFKA_CONSUMER_ / KAFKA_PRODUCER_ prefix, lowercase the remainder, replace every _ with .

KAFKA_CONSUMER_GROUP_ID                       → group.id
KAFKA_CONSUMER_SESSION_TIMEOUT_MS             → session.timeout.ms
KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS           → max.poll.interval.ms
KAFKA_CONSUMER_PARTITION_ASSIGNMENT_STRATEGY  → partition.assignment.strategy
KAFKA_PRODUCER_COMPRESSION_TYPE               → compression.type
KAFKA_PRODUCER_LINGER_MS                      → linger.ms

Values are automatically coerced: "true"/"false" → boolean, numeric strings → number, everything else stays a string. Any ConsumerGlobalConfig or ProducerGlobalConfig key is supported. See KAFKA-CONFIG.md for the full list.
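
The conversion and coercion rules above can be sketched as two small pure functions. This is an illustrative re-implementation, not the source of the package's parseEnvConfig() / parseEnvValue(); the names envKeyToConfigKey and coerceEnvValue are hypothetical, and leaving blank strings uncoerced is an assumption.

```typescript
// Hypothetical helpers illustrating the documented rules; not the package's source.
type CoercedValue = string | number | boolean;

const CONFIG_PREFIXES = ['KAFKA_CONSUMER_', 'KAFKA_PRODUCER_'];

// KAFKA_CONSUMER_SESSION_TIMEOUT_MS -> session.timeout.ms
function envKeyToConfigKey(envKey: string): string | undefined {
  const prefix = CONFIG_PREFIXES.find((p) => envKey.startsWith(p));
  if (prefix === undefined) return undefined; // not a librdkafka config variable
  return envKey.slice(prefix.length).toLowerCase().replace(/_/g, '.');
}

// "true"/"false" -> boolean, numeric strings -> number, anything else stays a string.
function coerceEnvValue(raw: string): CoercedValue {
  if (raw === 'true') return true;
  if (raw === 'false') return false;
  // Assumption: blank strings stay strings instead of becoming 0.
  if (raw.trim() !== '' && Number.isFinite(Number(raw))) return Number(raw);
  return raw;
}

console.log(envKeyToConfigKey('KAFKA_CONSUMER_SESSION_TIMEOUT_MS')); // session.timeout.ms
console.log(coerceEnvValue('30000')); // 30000
```

Behaviour-control variables such as KAFKA_HEAP_LIMIT_PERCENT do not carry either prefix, so in this sketch they return undefined and are left out of the config merge.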

Configuration Priority

When multiple configuration sources are provided, they are merged in the following order — later sources override earlier ones:

| Priority | Source | How to set |
|----------|--------|------------|
| 1 (lowest) | consumerConfig / producerConfig constructor args | new CustomServerConfluentKafka(options, consumerConfig, producerConfig) |
| 2 | KafkaOptions (options first arg) | Overwrites bootstrap.servers, auth, client.id, group.id, session.timeout.ms |
| 3 (highest) | KAFKA_CONSUMER_* / KAFKA_PRODUCER_* env vars | Applied last; overrides everything above |

The KAFKA_HEAP_LIMIT_PERCENT, KAFKA_CONSUME_INTERVAL_MS, KAFKA_CONSUME_MESSAGES_PER_INTERVAL, KAFKA_DISABLE_TOPIC_MONITOR, and KAFKA_TOPIC_MONITOR_INTERVAL_MS variables are independent behaviour controls read directly at runtime and are not part of the librdkafka config merge.
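
As a rough mental model, the merge order behaves like a chain of object spreads in which later sources win. The sketch below is a minimal illustration under that assumption; mergeConfig and the sample values are hypothetical and do not come from the package.

```typescript
// Illustrative only: a plain object spread that reproduces the documented precedence.
type RdKafkaConfig = Record<string, string | number | boolean>;

function mergeConfig(
  nativeConfig: RdKafkaConfig,
  fromKafkaOptions: RdKafkaConfig,
  fromEnv: RdKafkaConfig,
): RdKafkaConfig {
  // Later spreads win: native args < KafkaOptions < env vars.
  return { ...nativeConfig, ...fromKafkaOptions, ...fromEnv };
}

const mergedConfig = mergeConfig(
  { 'group.id': 'from-native', 'session.timeout.ms': 10000, 'fetch.min.bytes': 1 },
  { 'group.id': 'from-options', 'session.timeout.ms': 30000 },
  { 'group.id': 'from-env' },
);

// group.id comes from env, session.timeout.ms from KafkaOptions,
// and fetch.min.bytes survives from the native config.
console.log(mergedConfig);
```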

Heap-based Back-pressure

The consumer automatically pauses fetching when Node.js heap usage exceeds a configurable threshold, and resumes when it recovers. Configure via the 4th constructor argument or an environment variable:

// Pause when heap usage reaches 80%
new CustomServerConfluentKafka(options, consumerConfig, producerConfig, 80);

# Or via environment variable (default: 85)
KAFKA_HEAP_LIMIT_PERCENT=80

The threshold is clamped to [10, 99]. When paused/resumed, the consumer logs:

Consumer PAUSED — heap usage 87.350% (used=694.123 MB, limit=796.000 MB) exceeds limit 85%
Consumer RESUMED — heap usage recovered to 81.200% (used=646.400 MB, limit=796.000 MB)

The pause does not unsubscribe or trigger a rebalance — the consumer stays in the group and simply stops fetching until memory recovers.
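
The threshold resolution can be sketched as follows. The helper name resolveHeapLimitPercent is hypothetical, and the precedence shown (constructor argument, then KAFKA_HEAP_LIMIT_PERCENT, then 85) is an assumption inferred from the constructor signature rather than taken from the package's source.

```typescript
// Hypothetical helper: pick a threshold, then clamp it to the documented range [10, 99].
function resolveHeapLimitPercent(arg: number | undefined, envValue: string | undefined): number {
  const fromEnv = envValue !== undefined && envValue.trim() !== '' ? Number(envValue) : NaN;
  // Assumed precedence: explicit argument, then env var, then the default of 85.
  const requested = arg ?? (Number.isFinite(fromEnv) ? fromEnv : 85);
  return Math.min(99, Math.max(10, requested));
}

console.log(resolveHeapLimitPercent(80, undefined)); // 80
console.log(resolveHeapLimitPercent(undefined, '120')); // 99
console.log(resolveHeapLimitPercent(undefined, undefined)); // 85
```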

Advanced Usage

Producing Messages

import { Injectable } from '@nestjs/common';
import {
  CustomServerConfluentKafka,
  ProducerRecord,
  RecordMetadata,
} from '@eqxjs/kafka-server-confluent-kafka';

@Injectable()
export class KafkaProducerService {
  private kafkaServer = CustomServerConfluentKafka.getInstance();

  async sendMessage(topic: string, message: any): Promise<RecordMetadata[]> {
    const record: ProducerRecord = {
      topic,
      messages: [
        {
          key: 'my-key',
          value: JSON.stringify(message),
          headers: { 'content-type': 'application/json' },
        },
      ],
    };

    // Returns one RecordMetadata per message
    return this.kafkaServer.produce(record);
  }

  async sendBatch(topic: string, items: any[]): Promise<RecordMetadata[]> {
    return this.kafkaServer.produce({
      topic,
      messages: items.map((item, i) => ({
        key: String(i),
        value: JSON.stringify(item),
      })),
    });
  }
}

Using the Static Producer Helper

getProducer() returns a KafkaProducer object whose send method is already bound to the singleton. Use it when you want a typed producer reference without holding the full server instance:

import { CustomServerConfluentKafka, KafkaProducer } from '@eqxjs/kafka-server-confluent-kafka';

const producer: KafkaProducer = CustomServerConfluentKafka.getProducer();

// send is bound — safe to destructure and pass around
await producer.send({
  topic: 'user.created',
  messages: [
    { key: 'user-1', value: JSON.stringify({ id: 1 }) },
  ],
});

Note: getProducer() must be called after the server has been started (i.e. after app.listen() resolves). The singleton is set during construction, but the produce implementation requires a connected producer client.
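
Why a bound send matters can be shown without Kafka at all. In the sketch below, FakeKafkaServer is a hypothetical stand-in for the real class: a method detached from its instance loses `this`, while a bound one keeps working after destructuring.

```typescript
// Self-contained illustration of method binding; FakeKafkaServer is not the package's class.
type BoundProducer = { send: (topic: string) => string };

class FakeKafkaServer {
  private readonly label = 'singleton';

  produce(topic: string): string {
    return `${this.label} -> ${topic}`; // relies on `this`
  }

  // Binding fixes `this`, so the returned function works even when detached.
  getProducer(): BoundProducer {
    return { send: this.produce.bind(this) };
  }
}

const fakeServer = new FakeKafkaServer();
const { send } = fakeServer.getProducer(); // destructured: no receiver at call time
console.log(send('user.created')); // singleton -> user.created

const unboundProduce = fakeServer.produce; // detached without bind
let unboundFailed = false;
try {
  unboundProduce('user.created');
} catch {
  unboundFailed = true; // `this` is undefined here because class bodies run in strict mode
}
console.log(unboundFailed); // true
```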

Setting Delivery Callbacks

const kafkaServer = CustomServerConfluentKafka.getInstance();

kafkaServer.setSuccessCallback((err, report) => {
  console.log('Message delivered:', report);
});

kafkaServer.setErrorCallback((err, report) => {
  console.error('Delivery failed:', err.message);
});

Accessing Consumer and Producer Directly

const { consumer, producer } = kafkaServer.unwrap();
const metadata = await consumer.getMetadata({ timeout: 10000 });

Reading Current Partition Assignment

The current partition assignment is always available on the instance:

const server = CustomServerConfluentKafka.getInstance();

// Map of topic → partition[]
console.log(server.memberAssignment);
// { 'orders': [0, 1], 'events': [2] }

// Raw TopicPartition[] array
console.log(server.assignment);

Consumer Throughput Control

# Messages to fetch per consume() call, called once per second (default: 10)
KAFKA_CONSUME_MESSAGES_PER_INTERVAL=50
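
Together with KAFKA_CONSUME_INTERVAL_MS, this setting puts a rough ceiling on throughput per consumer instance. The arithmetic below is a back-of-the-envelope sketch that assumes each tick fetches at most the configured number of messages; maxMessagesPerSecond is a hypothetical helper.

```typescript
// Upper bound only: real throughput also depends on handler latency and broker fetch behaviour.
function maxMessagesPerSecond(messagesPerInterval: number, intervalMs: number): number {
  return (messagesPerInterval * 1000) / intervalMs;
}

console.log(maxMessagesPerSecond(10, 1000)); // 10 (the documented defaults)
console.log(maxMessagesPerSecond(50, 1000)); // 50
console.log(maxMessagesPerSecond(20, 500)); // 40
```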

Re-exported Types

The package re-exports useful types from @confluentinc/kafka-javascript and its own type definitions:

import {
  // produce API (KafkaJS-style)
  ProducerRecord,
  ProducerMessage,
  KafkaProducer,
  RecordMetadata,
  // librdkafka types
  Producer,
  KafkaConsumer,
  LibrdKafkaError,
  TopicPartition,
  ConsumerGlobalConfig,
  ProducerGlobalConfig,
  Message,
  DeliveryReport,
} from '@eqxjs/kafka-server-confluent-kafka';

API Reference

CustomServerConfluentKafka

The main class implementing the Kafka transport strategy.

Constructor

new CustomServerConfluentKafka(
  options?: KafkaOptions,
  consumerConfig?: ConsumerGlobalConfig,
  producerConfig?: ProducerGlobalConfig,
  heapLimitPercent?: number,   // default: KAFKA_HEAP_LIMIT_PERCENT env var or 85
)

Methods

Lifecycle

| Method | Signature | Description |
|--------|-----------|-------------|
| listen | (callback) => Promise<void> | Start the Kafka server |
| close | () => Promise<void> | Clear intervals, disconnect consumer and all producers |
| start | (callback) => Promise<void> | Internal bootstrap: connects clients and binds events |

Consume Control

| Method | Signature | Description |
|--------|-----------|-------------|
| setConsumeInterval | () => Promise<void> | Start (or restart) the consume polling interval |
| clearConsumeInterval | () => Promise<void> | Stop and null the consume interval |
| clearTopicMonitorInterval | () => Promise<void> | Stop and null the topic monitor interval |
| pauseConsumer | () => void | Manually pause consuming (clears interval) |
| resumeConsumer | () => void | Manually resume consuming (restarts interval) |

Produce

| Method | Signature | Description |
|--------|-----------|-------------|
| produce | (record: ProducerRecord) => Promise<RecordMetadata[]> | Produce one or more messages; resolves with one RecordMetadata per message, mirroring the KafkaJS send() API |
| sendMessage | (topic: string, message: Partial<Message>, timestamp?: number) => Promise<void> | Low-level fire-and-forget produce; delivery result handled asynchronously via deliveryReport |
| setSuccessCallback | (cb) => void | Register a callback for successful delivery reports |
| setErrorCallback | (cb) => void | Register a callback for failed delivery reports |

Topic & Subscription

| Method | Signature | Description |
|--------|-----------|-------------|
| setConsumeTopics | (topics: string[]) => void | Override the topic list to consume |
| retrieveTopics | (log: boolean) => Promise<string[]> | Fetch matching topics from broker metadata |

Events & Rebalance

| Method | Signature | Description |
|--------|-----------|-------------|
| bindEvents | () => Promise<void> | Attach all consumer event listeners |
| rb_callback | (err, assignment) => void | Rebalance callback; handles ASSIGN and REVOKE |
| deliveryReport | (err?, report?) => Promise<void> | Handle a producer delivery report |
| handleMessage | (payload: Message) => Promise<void> | Dispatch a consumed message to its registered handler |

Utility

| Method | Signature | Description |
|--------|-----------|-------------|
| unwrap<T> | () => T | Returns { consumer, producer } for direct librdkafka access |
| isKafkaConnected | () => boolean | true when consumer and producer are both connected |
| getInstance | static () => CustomServerConfluentKafka | Returns the singleton instance |
| getProducer | static () => KafkaProducer | Convenience wrapper around produce; returns a KafkaProducer whose send is correctly bound to the singleton |

Properties

| Property | Type | Description |
|----------|------|-------------|
| memberAssignment | Record<string, number[]> | Current topic → partition list assignment |
| assignment | TopicPartition[] | Raw current assignment array |
| consumeInterval | NodeJS.Timeout | Reference to the active consume interval |
| topicMonitorInterval | NodeJS.Timeout | Reference to the active topic monitor interval |
| _consumeTopics | string[] | Topics derived from registered @EventPattern handlers |
| instance | static CustomServerConfluentKafka | Singleton reference set on construction |

ProducerRecord, ProducerMessage and KafkaProducer

Types that mirror the KafkaJS producer API, accepted by produce() and getProducer().send().

type ProducerMessage = {
  key?: Buffer | string | null;      // message key; null for no key
  value: Buffer | string | null;     // payload; null produces a tombstone
  partition?: number;                // omit or -1 for auto-partitioning
  headers?: Record<string, string | Buffer>;
  timestamp?: string;                // ms since epoch as a string; defaults to Date.now()
};

type ProducerRecord = {
  topic: string;
  messages: ProducerMessage[];
};

type KafkaProducer = {
  send: (record: ProducerRecord) => Promise<RecordMetadata[]>;
};
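
A short sketch of how records of this shape might be built. The types are simplified local copies (Buffer variants are omitted so the snippet has no dependencies), and toRecord and tombstoneRecord are hypothetical helpers, not package exports.

```typescript
// Simplified local copies of the documented types; Buffer variants omitted.
type SketchMessage = {
  key?: string | null;
  value: string | null;
  partition?: number;
  headers?: Record<string, string>;
  timestamp?: string;
};
type SketchRecord = { topic: string; messages: SketchMessage[] };

// One JSON message per item, keyed by id, with an explicit timestamp string.
function toRecord(topic: string, items: Array<{ id: string }>, nowMs: number): SketchRecord {
  return {
    topic,
    messages: items.map((item) => ({
      key: item.id,
      value: JSON.stringify(item),
      headers: { 'content-type': 'application/json' },
      timestamp: String(nowMs), // ms since epoch, as a string
    })),
  };
}

// A tombstone is a message with a key and a null value.
function tombstoneRecord(topic: string, key: string): SketchRecord {
  return { topic, messages: [{ key, value: null }] };
}

const createdRecord = toRecord('user.created', [{ id: 'user-1' }], 1700000000000);
console.log(createdRecord.messages[0]?.timestamp); // 1700000000000
console.log(tombstoneRecord('user.created', 'user-1').messages[0]?.value); // null
```

With the real package, a record of this shape would be passed to produce() or getProducer().send().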

getHeapUsage() (utils/get-mem)

import { getHeapUsage } from '@eqxjs/kafka-server-confluent-kafka/utils/get-mem';

const heap = getHeapUsage();
heap.usedPercent          // 87.3
heap.usedMB               // 694.1
heap.limitMB              // 796.0
heap.isOverLimit(85)      // true
heap.format()             // "87.350% (used=694.123 MB, limit=796.000 MB)"

parseEnvConfig() / parseEnvValue() (utils/parse-env)

import { parseEnvConfig, parseEnvValue } from '@eqxjs/kafka-server-confluent-kafka/utils/parse-env';

// Returns { consumer: Partial<ConsumerGlobalConfig>, producer: Partial<ProducerGlobalConfig> }
// populated from KAFKA_CONSUMER_* and KAFKA_PRODUCER_* env vars
const { consumer, producer } = parseEnvConfig();

// Coerces a raw string to the most specific primitive type
parseEnvValue("true")    // true  (boolean)
parseEnvValue("30000")   // 30000 (number)
parseEnvValue("snappy")  // "snappy" (string)

Configuration Details

Consumer Configuration

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| groupId | string | "nestjs-kafka-consumer" | Consumer group ID; always provide a stable value |
| sessionTimeout | number | - | Session timeout in milliseconds |

Producer Configuration

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| retry.maxRetryTime | number | 2 | Maximum number of send retries |

Client Configuration

| Option | Type | Description |
|--------|------|-------------|
| clientId | string | Client identifier |
| brokers | string[] | List of Kafka broker URLs |
| sasl | object | SASL authentication configuration |
| ssl | boolean or object | SSL/TLS configuration |

Features in Detail

Stable Consumer Group Identity

group.id is always written to consumerConfig — defaulting to "nestjs-kafka-consumer" when not provided. Additionally, group.instance.id defaults to os.hostname() if not already set, enabling static membership which reduces unnecessary rebalances on container restarts. You can override it by setting group.instance.id explicitly in your consumerConfig.
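
The defaulting described above amounts to two nullish fallbacks. In this minimal sketch withIdentityDefaults is a hypothetical helper, and the hostname is passed in as a parameter (in Node.js it would come from os.hostname()) so the snippet stays dependency-free.

```typescript
// Hypothetical helper mirroring the documented defaults; not the package's source.
type ConsumerIdentity = { 'group.id'?: string; 'group.instance.id'?: string };

function withIdentityDefaults(config: ConsumerIdentity, hostname: string): Required<ConsumerIdentity> {
  return {
    'group.id': config['group.id'] ?? 'nestjs-kafka-consumer',
    'group.instance.id': config['group.instance.id'] ?? hostname,
  };
}

// Nothing set: both defaults apply.
console.log(withIdentityDefaults({}, 'orders-pod-0'));
// Explicit values are kept as-is.
console.log(withIdentityDefaults({ 'group.id': 'orders', 'group.instance.id': 'orders-a' }, 'orders-pod-0'));
```

Static membership only helps when the instance ID survives a restart, so a hostname that changes on every deployment is a reason to set group.instance.id explicitly.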

Connection Logging

On the ready event, the consumer and producer log a summary sourced directly from the broker response:

Kafka consumer connected — client=rdkafka#consumer-1, group.id=my-group, assignment.strategy=range, bootstrap.broker=kafka-1:9092, brokers=[kafka-1:9092, kafka-2:9092, kafka-3:9092]
Kafka producer connected — client=rdkafka#producer-1, bootstrap.broker=kafka-1:9092, brokers=[kafka-1:9092, kafka-2:9092, kafka-3:9092]

On disconnected, both clients log their connection uptime:

Kafka consumer disconnected — uptime=43205ms
Kafka producer disconnected — uptime=43210ms

On connection.failure, the error message and uptime are logged for both clients:

consumer: Kafka consumer connection failure — Broker transport failure, uptime=0ms
producer: Kafka connection failure — Broker transport failure, uptime=0ms

Rebalance Logging with Partition State

Every rebalance event logs the consumer's partition state before and after:

ASSIGN:

Rebalance ASSIGN [Assign] — group=my-group, strategy=range | assigned={"orders":[0,1]}
Rebalance ASSIGN — current assignment: {"orders":[0,1],"events":[0]}

REVOKE:

Rebalance REVOKE [Revoke] — group=my-group, strategy=range | before={"orders":[0,1],"events":[0]} | revoking={"events":[0]}
Rebalance REVOKE — remaining assignment: {"orders":[0,1]}

Rebalancing Support

Full support for both EAGER and COOPERATIVE protocols:

  • Eager (range, roundrobin): Full revoke and reassign — assign() / unassign() called with the complete new assignment
  • Cooperative (cooperative-sticky): Incremental delta — incrementalAssign() / incrementalUnassign() called with only the changed partitions; memberAssignment is updated by merging or filtering rather than replacing

Protocol is auto-detected from partition.assignment.strategy at construction time. All assign/unassign calls are guarded with isConnected() and wrapped in try/catch to prevent ERR__STATE crashes during shutdown.
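
The difference between the two protocols is easiest to see in how the memberAssignment map would be updated. The sketch below covers the bookkeeping only, with hypothetical function names and no Kafka client involved; the real rebalance callback also calls assign() / incrementalAssign() on the consumer.

```typescript
// Illustrative bookkeeping only; names are hypothetical.
type AssignmentMap = Record<string, number[]>;
type TopicPartitionLike = { topic: string; partition: number };

// Eager: the callback carries the complete assignment, so the map is rebuilt from scratch.
function applyEagerAssign(parts: TopicPartitionLike[]): AssignmentMap {
  const out: AssignmentMap = {};
  for (const { topic, partition } of parts) {
    const list = out[topic] ?? [];
    list.push(partition);
    out[topic] = list;
  }
  return out;
}

// Cooperative assign: only the added partitions arrive, so they are merged in.
function applyIncrementalAssign(current: AssignmentMap, added: TopicPartitionLike[]): AssignmentMap {
  const next: AssignmentMap = {};
  for (const [topic, partitions] of Object.entries(current)) next[topic] = [...partitions];
  for (const { topic, partition } of added) {
    const list = next[topic] ?? [];
    if (!list.includes(partition)) list.push(partition);
    next[topic] = list;
  }
  return next;
}

// Cooperative revoke: only the revoked partitions arrive, so they are filtered out.
function applyIncrementalRevoke(current: AssignmentMap, revoked: TopicPartitionLike[]): AssignmentMap {
  const next: AssignmentMap = {};
  for (const [topic, partitions] of Object.entries(current)) {
    const kept = partitions.filter((p) => !revoked.some((r) => r.topic === topic && r.partition === p));
    if (kept.length > 0) next[topic] = kept;
  }
  return next;
}

const afterAssign = applyIncrementalAssign({ orders: [0, 1] }, [{ topic: 'events', partition: 0 }]);
console.log(JSON.stringify(afterAssign)); // {"orders":[0,1],"events":[0]}
const afterRevoke = applyIncrementalRevoke(afterAssign, [{ topic: 'events', partition: 0 }]);
console.log(JSON.stringify(afterRevoke)); // {"orders":[0,1]}
```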

Heap-based Back-pressure

Every second, before calling consumer.consume(), the server checks Node.js heap usage via v8.getHeapStatistics(). If used_heap_size / heap_size_limit exceeds heapLimitPercent, the consume tick is skipped. The consumer logs PAUSED on the first over-limit tick and RESUMED once it recovers, avoiding log spam.
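
The per-tick check and the log-once behaviour can be sketched as a small state machine. BackPressureGate is a hypothetical class, not the package's implementation; heap numbers are passed in so the snippet runs anywhere, whereas in Node.js they would come from v8.getHeapStatistics() (used_heap_size and heap_size_limit).

```typescript
// Hypothetical gate: decides per tick whether to consume, and logs only on state changes.
type HeapSample = { usedHeapSize: number; heapSizeLimit: number };

class BackPressureGate {
  private paused = false;
  private readonly limitPercent: number;
  readonly log: string[] = [];

  constructor(limitPercent: number) {
    this.limitPercent = limitPercent;
  }

  // Returns true when this tick may call consume().
  shouldConsume(sample: HeapSample): boolean {
    const usedPercent = (sample.usedHeapSize / sample.heapSizeLimit) * 100;
    const over = usedPercent > this.limitPercent;
    if (over && !this.paused) this.log.push(`PAUSED at ${usedPercent.toFixed(1)}%`);
    if (!over && this.paused) this.log.push(`RESUMED at ${usedPercent.toFixed(1)}%`);
    this.paused = over;
    return !over;
  }
}

const gate = new BackPressureGate(85);
const tickResults = [700, 870, 900, 812].map((used) =>
  gate.shouldConsume({ usedHeapSize: used, heapSizeLimit: 1000 }),
);
console.log(tickResults); // [ true, false, false, true ]
console.log(gate.log); // [ 'PAUSED at 87.0%', 'RESUMED at 81.2%' ]
```

The third tick is still over the limit but produces no log line, which is the "avoiding log spam" behaviour described above.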

Automatic Topic Monitoring

The server automatically monitors for:

  • New topics: Subscribes to newly created topics matching your handlers
  • Deleted topics: Unsubscribes from deleted topics and logs warnings

The monitor runs on a setInterval (stored in topicMonitorInterval) and logs at startup:

Topic monitor started — interval=300000ms

Set KAFKA_DISABLE_TOPIC_MONITOR=true to disable this behaviour. When disabled, the consumer subscribes only to topics available at connect time.

Configure the polling interval:

KAFKA_TOPIC_MONITOR_INTERVAL_MS=60000   # default: 300000 (5 minutes)
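
Each monitor tick boils down to comparing the broker's topic list with the current subscription. The sketch below shows one way such a diff could look; diffTopics is a hypothetical helper, and handler matching is reduced to exact topic names, which is an assumption.

```typescript
// Hypothetical diff step for a topic monitor; not the package's implementation.
function diffTopics(
  handlerTopics: string[],
  subscribed: string[],
  brokerTopics: string[],
): { added: string[]; removed: string[]; next: string[] } {
  const onBroker = new Set(brokerTopics);
  const currentlySubscribed = new Set(subscribed);
  // Topics that have a handler and exist on the broker are the ones worth subscribing to.
  const next = handlerTopics.filter((t) => onBroker.has(t));
  return {
    added: next.filter((t) => !currentlySubscribed.has(t)),
    removed: subscribed.filter((t) => !onBroker.has(t)),
    next,
  };
}

const topicDiff = diffTopics(
  ['user.created', 'user.deleted', 'order.placed'], // topics with handlers
  ['user.created', 'user.deleted'], // current subscription
  ['user.created', 'order.placed', 'unrelated.topic'], // broker metadata
);
console.log(topicDiff.added); // [ 'order.placed' ]
console.log(topicDiff.removed); // [ 'user.deleted' ]
console.log(topicDiff.next); // [ 'user.created', 'order.placed' ]
```

A monitor built this way would resubscribe with `next` whenever `added` or `removed` is non-empty, and log a warning for each removed topic.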

Error Handling

Comprehensive error handling for:

  • Connection failures (connection.failure) — logged with error message and uptime for consumer and producer
  • Topic/partition errors (ERR_NOT_LEADER_FOR_PARTITION, ERR__UNKNOWN_TOPIC, etc.) — triggers automatic topic list refresh and resubscription
  • Message delivery failures — routed to the optional error delivery callback
  • Rebalancing errors — logged with error code and message

Development

Build

npm run build

Format Code

npm run format

Testing

The test suite uses Jest with ts-jest and requires no running Kafka broker — all librdkafka clients are fully mocked.

# Run tests
npm test

# Run tests with coverage report
npm run test:coverage

Coverage is enforced at 100% for statements, branches, functions, and lines across all src/ files. The suite is located under test/ and is excluded from the published package via .npmignore.

| Test file | What it covers |
|-----------|----------------|
| test/kafka.server.spec.ts | Full CustomServerConfluentKafka class: constructor options, rebalance (eager & cooperative), event listeners, produce API, back-pressure, topic monitor, delivery reports, message handling |
| test/get-mem.spec.ts | getHeapUsage(): all HeapUsage fields and methods |
| test/parse-env.spec.ts | parseEnvValue() and parseEnvConfig(): all coercion branches and env var mapping |

Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| KAFKA_CONSUME_INTERVAL_MS | 1000 | Consume polling interval in milliseconds |
| KAFKA_CONSUME_MESSAGES_PER_INTERVAL | 10 | Messages fetched per consume tick |
| KAFKA_HEAP_LIMIT_PERCENT | 85 | Heap usage % at which the consumer is paused (clamped 10–99) |
| KAFKA_DISABLE_TOPIC_MONITOR | false | Set to true to disable automatic topic monitoring |
| KAFKA_TOPIC_MONITOR_INTERVAL_MS | 300000 | Topic monitor polling interval in milliseconds (default: 5 minutes) |

Note: KAFKA_CONSUMER_* / KAFKA_PRODUCER_* env vars are applied last in the config merge and take final precedence over both consumerConfig/producerConfig constructor args and KafkaOptions. See Configuration Priority for the full order.

librdkafka Config via Environment Variables

Any ConsumerGlobalConfig or ProducerGlobalConfig key can be set via environment variables using the following convention:

| Prefix | Applies to |
|--------|------------|
| KAFKA_CONSUMER_ | ConsumerGlobalConfig |
| KAFKA_PRODUCER_ | ProducerGlobalConfig |

Conversion rule: strip the prefix, lowercase the remainder, replace every _ with .

KAFKA_CONSUMER_GROUP_ID            → group.id
KAFKA_CONSUMER_SESSION_TIMEOUT_MS  → session.timeout.ms
KAFKA_CONSUMER_BOOTSTRAP_SERVERS   → bootstrap.servers
KAFKA_PRODUCER_COMPRESSION_TYPE    → compression.type
KAFKA_PRODUCER_LINGER_MS           → linger.ms

Env vars are applied last in the merge order — after consumerConfig/producerConfig args and after KafkaOptions — so they take final precedence over all code-level config.

# Example: configure consumer entirely from env
KAFKA_CONSUMER_BOOTSTRAP_SERVERS=kafka:9092
KAFKA_CONSUMER_GROUP_ID=my-service
KAFKA_CONSUMER_SESSION_TIMEOUT_MS=30000
KAFKA_CONSUMER_ENABLE_AUTO_COMMIT=true
KAFKA_PRODUCER_COMPRESSION_TYPE=snappy
KAFKA_PRODUCER_LINGER_MS=10

Note: Values are automatically coerced: "true"/"false" → boolean, numeric strings → number, everything else stays a string. Env vars have the highest config priority; they override both constructor args and KafkaOptions.

For a full list of all supported config keys and their defaults, see KAFKA-CONFIG.md.

Troubleshooting

Consumer behaves like a different consumer group on every restart

Caused by a missing or randomly-generated group.id. Since v2.1.0 the library always sets group.id from config (defaulting to "nestjs-kafka-consumer") so offsets are persisted across restarts. Always specify an explicit groupId for production deployments:

consumer: { groupId: 'my-service-orders-consumer' }

Local: Erroneous state (ERR__STATE) during rebalance

Error: Local: Erroneous state
    at KafkaConsumer.assign ...
    at CustomServerConfluentKafka.rb_callback ...

Happens when assign() / unassign() is called while the consumer is disconnecting. Fixed in v2.0.1 — both calls are guarded with isConnected() and wrapped in try/catch.

Consumer pausing unexpectedly

Check heap usage. If KAFKA_HEAP_LIMIT_PERCENT is set too low (or Node.js heap is genuinely exhausted), the consumer will pause. Look for:

Consumer PAUSED — heap usage 87.350% (used=694.123 MB, limit=796.000 MB) exceeds limit 85%

Increase the threshold or add --max-old-space-size to Node.js flags to raise the heap ceiling.

Consumer not receiving messages

  1. Ensure groupId is stable and not changing between restarts
  2. Verify topic names match your @EventPattern decorators
  3. Ensure brokers are reachable
  4. Check consumer logs for rebalancing or connection issues

Messages not being produced

  1. Verify producer is connected (check logs for "ready" event)
  2. Check delivery report callbacks for errors
  3. Ensure topics exist on the Kafka cluster
  4. Verify authentication credentials if using SASL

Connection issues

  1. Verify broker URLs are correct and reachable
  2. Check firewall rules
  3. Verify SSL/TLS certificates if using encrypted connections
  4. Check SASL credentials if using authentication

Changelog

See CHANGELOG.md for the full version history.

License

ISC

Author

Atit Plangson

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.