@eqxjs/kafka-server-node-rdkafka v2.0.8 · 130 downloads

Kafka for NestJS microservices

@eqxjs/kafka-server-node-rdkafka

A custom Kafka transport strategy for NestJS microservices using node-rdkafka (via @confluentinc/kafka-javascript). This package provides a robust, production-ready Kafka integration with advanced features like automatic topic monitoring, rebalancing support, and flexible configuration.

Features

  • 🚀 Full NestJS Microservices Integration - Drop-in replacement for standard Kafka transport
  • 🔄 Automatic Topic Monitoring - Detects new topics and handles topic deletions dynamically
  • ⚖️ Consumer Group Rebalancing - Built-in rebalancing callbacks for partition management
  • 🔐 Authentication Support - SASL/PLAIN, SASL/SCRAM, and SSL/TLS configurations
  • 📊 Delivery Reports - Track message delivery success and failures with callbacks
  • 💪 High Performance - Uses native node-rdkafka under the hood
  • 🛠️ Flexible Configuration - Support for both high-level and low-level producers
  • 📝 TypeScript Support - Fully typed with TypeScript definitions

Installation

npm install @eqxjs/kafka-server-node-rdkafka

Peer Dependencies

This package requires the following peer dependencies:

npm install @nestjs/common @nestjs/microservices

Usage

Basic Setup

1. Create a Kafka Server in your NestJS Application

import { NestFactory } from '@nestjs/core';
import { CustomServerRdKafka } from '@eqxjs/kafka-server-node-rdkafka';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    strategy: new CustomServerRdKafka({
      options: {
        client: {
          clientId: 'my-app',
          brokers: ['localhost:9092'],
        },
        consumer: {
          groupId: 'my-consumer-group',
        },
      },
    }),
  });

  await app.listen();
}
bootstrap();

2. Create Message Handlers

import { Controller } from '@nestjs/common';
import { EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { KafkaContext } from '@eqxjs/kafka-server-node-rdkafka';

@Controller()
export class AppController {
  @EventPattern('user.created')
  async handleUserCreated(
    @Payload() data: any,
    @Ctx() context: KafkaContext,
  ) {
    console.log('Received message:', data);
    console.log('Topic:', context.getTopic());
    console.log('Partition:', context.getPartition());
    console.log('Offset:', context.getMessage().offset);
  }
}

Configuration Options

With SASL Authentication

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerRdKafka({
    options: {
      client: {
        clientId: 'my-app',
        brokers: ['kafka.example.com:9092'],
        sasl: {
          mechanism: 'plain', // or 'scram-sha-256', 'scram-sha-512'
          username: 'my-username',
          password: 'my-password',
        },
      },
      consumer: {
        groupId: 'my-consumer-group',
        sessionTimeout: 30000,
      },
      producer: {
        retry: {
          maxRetryTime: 3,
        },
      },
    },
  }),
});

With SSL/TLS

import * as fs from 'fs';

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerRdKafka({
    options: {
      client: {
        clientId: 'my-app',
        brokers: ['kafka.example.com:9093'],
        ssl: {
          ca: fs.readFileSync('./ca-cert.pem'),
          cert: fs.readFileSync('./client-cert.pem'),
          key: fs.readFileSync('./client-key.pem'),
        },
      },
      consumer: {
        groupId: 'my-consumer-group',
      },
    },
  }),
});

With SASL + SSL

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerRdKafka({
    options: {
      client: {
        clientId: 'my-app',
        brokers: ['kafka.example.com:9093'],
        sasl: {
          mechanism: 'scram-sha-256',
          username: 'my-username',
          password: 'my-password',
        },
        ssl: true, // or provide SSL certificates as object
      },
      consumer: {
        groupId: 'my-consumer-group',
      },
    },
  }),
});

Using Native ConsumerGlobalConfig and ProducerGlobalConfig

For advanced use cases, you can pass native node-rdkafka configurations directly:

import { 
  CustomServerRdKafka, 
  ConsumerGlobalConfig, 
  ProducerGlobalConfig 
} from '@eqxjs/kafka-server-node-rdkafka';

const consumerConfig: ConsumerGlobalConfig = {
  'bootstrap.servers': 'localhost:9092',
  'group.id': 'my-consumer-group',
  'client.id': 'my-client',
  'enable.auto.commit': true,
  'auto.commit.interval.ms': 5000,
  'session.timeout.ms': 30000,
  'max.poll.interval.ms': 300000,
  'fetch.min.bytes': 1,
  'fetch.wait.max.ms': 100,
  'heartbeat.interval.ms': 3000,
  'partition.assignment.strategy': 'range,roundrobin',
};

const producerConfig: ProducerGlobalConfig = {
  'bootstrap.servers': 'localhost:9092',
  'client.id': 'my-client',
  'compression.type': 'snappy',
  'acks': -1, // all replicas
  'retries': 3,
  'max.in.flight.requests.per.connection': 5,
  'linger.ms': 10,
  'batch.size': 16384,
  'request.timeout.ms': 30000,
};

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerRdKafka(
    {
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
      },
    },
    consumerConfig,
    producerConfig,
  ),
});

Note: When using native configs, they are merged with, and take precedence over, the options from the first parameter. This approach gives you full access to all librdkafka configuration options. See the librdkafka configuration documentation for the complete list.
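The precedence described above can be illustrated with a small sketch. The `mergeConfigs` helper below is hypothetical (the package's actual merge logic is internal); it only shows that native librdkafka keys win over values derived from `options`:

```typescript
// Hypothetical illustration of the merge order: native config keys
// override values derived from the `options` parameter.
type KafkaConf = Record<string, string | number | boolean>;

function mergeConfigs(fromOptions: KafkaConf, native: KafkaConf): KafkaConf {
  // Later spreads win, so native keys take precedence.
  return { ...fromOptions, ...native };
}

const fromOptions: KafkaConf = {
  'bootstrap.servers': 'localhost:9092',
  'client.id': 'my-app',
};
const native: KafkaConf = {
  'client.id': 'my-client',     // overrides the derived value
  'compression.type': 'snappy', // added on top
};

const merged = mergeConfigs(fromOptions, native);
```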

Advanced Usage

Producing Messages

import { Injectable } from '@nestjs/common';
import { CustomServerRdKafka, Message } from '@eqxjs/kafka-server-node-rdkafka';

@Injectable()
export class KafkaProducerService {
  private kafkaServer: CustomServerRdKafka;

  constructor() {
    this.kafkaServer = CustomServerRdKafka.getInstance();
  }

  async sendMessage(topic: string, message: any) {
    const kafkaMessage: Message = {
      value: Buffer.from(JSON.stringify(message)),
      key: 'my-key',
      partition: 0, // optional, -1 for auto-assignment
      headers: {
        'content-type': 'application/json',
      },
    };

    await this.kafkaServer.produce(topic, kafkaMessage);
  }
}

Setting Delivery Callbacks

const kafkaServer = CustomServerRdKafka.getInstance();

// Success callback
kafkaServer.setSuccessCallback((err, report) => {
  console.log('Message delivered successfully:', report);
});

// Error callback
kafkaServer.setErrorCallback((err, report) => {
  console.error('Message delivery failed:', err.message);
});

Accessing Consumer and Producer Directly

const { consumer, producer } = kafkaServer.unwrap();

// Now you can use native node-rdkafka methods
const metadata = await consumer.getMetadata({ timeout: 10000 });

Consumer Throughput Control

Control the rate at which messages are consumed:

# Set environment variable to limit consumer throughput
export KAFKA_CONSUMER_TPS_LIMIT=50  # messages per second (default: 10)
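The effect of the limit can be sketched as a simple per-second budget. This is a hypothetical illustration of the behavior, not the package's internal throttling code:

```typescript
// Hypothetical sketch of a per-second message budget, mirroring the
// KAFKA_CONSUMER_TPS_LIMIT behavior described above.
class TpsLimiter {
  private windowStart = 0;
  private count = 0;

  constructor(private readonly limit: number) {}

  // Returns true if a message may be consumed at time `nowMs`.
  allow(nowMs: number): boolean {
    if (nowMs - this.windowStart >= 1000) {
      this.windowStart = nowMs; // start a new one-second window
      this.count = 0;
    }
    if (this.count < this.limit) {
      this.count += 1;
      return true;
    }
    return false; // budget for this second is spent
  }
}
```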

Re-exported Types

The package re-exports useful types from @confluentinc/kafka-javascript:

import {
  Producer,
  KafkaConsumer,
  LibrdKafkaError,
  TopicPartition,
  ConsumerGlobalConfig,
  ProducerGlobalConfig,
  Message,
  DeliveryReport,
} from '@eqxjs/kafka-server-node-rdkafka';

API Reference

CustomServerRdKafka

The main class implementing the Kafka transport strategy.

Methods

  • listen(callback) - Start the Kafka server
  • close() - Disconnect consumer and producers
  • produce(topic, message) - Produce a message using the high-level producer (returns Promise)
  • sendMessage(topic, message) - Send a message using the standard producer (fire-and-forget)
  • setSuccessCallback(callback) - Set callback for successful message delivery
  • setErrorCallback(callback) - Set callback for failed message delivery
  • setConsumeTopics(topics) - Manually set topics to consume
  • unwrap() - Get access to underlying consumer and producer instances
  • getInstance() - Static method to get the singleton instance

Properties

  • memberAssignment - Current partition assignments per topic
  • assignment - Current topic-partition assignments array

Configuration Details

Consumer Configuration

| Option         | Type   | Default | Description                     |
|----------------|--------|---------|---------------------------------|
| groupId        | string | -       | Consumer group ID (required)    |
| sessionTimeout | number | -       | Session timeout in milliseconds |

Producer Configuration

| Option             | Type   | Default | Description               |
|--------------------|--------|---------|---------------------------|
| retry.maxRetryTime | number | 2       | Maximum number of retries |

Client Configuration

| Option   | Type              | Description                       |
|----------|-------------------|-----------------------------------|
| clientId | string            | Client identifier                 |
| brokers  | string[]          | List of Kafka broker URLs         |
| sasl     | object            | SASL authentication configuration |
| ssl      | boolean \| object | SSL/TLS configuration             |

Features in Detail

Automatic Topic Monitoring

The server automatically monitors for:

  • New topics: Subscribes to newly created topics matching your handlers
  • Deleted topics: Unsubscribes from deleted topics and logs warnings
  • Check interval: 5 minutes (300 seconds)
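The monitoring loop boils down to diffing the cluster's topic list against the current subscriptions on each check. A pure sketch of that diff (hypothetical helper, not the package's internal code):

```typescript
// Hypothetical sketch of the periodic topic diff described above.
interface TopicDiff {
  added: string[];   // newly created topics to subscribe to
  removed: string[]; // deleted topics to unsubscribe from
}

function diffTopics(subscribed: string[], clusterTopics: string[]): TopicDiff {
  const current = new Set(clusterTopics);
  const known = new Set(subscribed);
  return {
    added: clusterTopics.filter((t) => !known.has(t)),
    removed: subscribed.filter((t) => !current.has(t)),
  };
}
```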

Rebalancing Support

Built-in rebalancing callback handles:

  • Partition assignment changes
  • Consumer group coordination
  • Graceful partition revocation
  • Safe assign/unassign - consumer.assign() and consumer.unassign() are guarded with isConnected() checks and wrapped in try/catch to avoid the ERR__STATE ("Local: Erroneous state") crash that occurs when a rebalance fires during shutdown or reconnection

Error Handling

Comprehensive error handling for:

  • Connection failures
  • Topic/partition errors
  • Message delivery failures
  • Rebalancing errors

Development

Build

npm run build

Format Code

npm run format

Testing

npm test

Environment Variables

  • KAFKA_CONSUMER_TPS_LIMIT - Messages to consume per second (default: 10)

License

ISC

Author

Atit Plangson

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Troubleshooting

Local: Erroneous state (ERR__STATE) during rebalance

This error surfaces as:

Error: Local: Erroneous state
    at KafkaConsumer.assign ...
    at CustomServerRdKafka.rb_callback ...

It happens when consumer.assign() or consumer.unassign() is called from the rebalance callback while the consumer is disconnecting or reconnecting. The fix introduced in v2.0.1 wraps both calls with an isConnected() guard and a try/catch, so the rebalance is silently skipped instead of crashing the process.
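The guard can be sketched in isolation. This is a simplified illustration of the behavior described above (the real rb_callback also handles revocation and protocol detection), using a minimal consumer interface rather than the actual node-rdkafka types:

```typescript
// Simplified sketch of the v2.0.1 guard: only touch assignments while
// connected, and swallow ERR__STATE-style errors instead of crashing.
interface GuardableConsumer {
  isConnected(): boolean;
  assign(partitions: { topic: string; partition: number }[]): void;
}

function safeAssign(
  consumer: GuardableConsumer,
  partitions: { topic: string; partition: number }[],
): boolean {
  if (!consumer.isConnected()) {
    // Consumer is disconnecting/reconnecting: skip this rebalance.
    return false;
  }
  try {
    consumer.assign(partitions);
    return true;
  } catch {
    // e.g. "Local: Erroneous state" raced past the guard; skip, don't crash.
    return false;
  }
}
```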

If you are on an older version, upgrade:

npm install @eqxjs/kafka-server-node-rdkafka@latest

Consumer not receiving messages

  1. Check that your consumer group ID is set correctly
  2. Verify topic names match your @EventPattern decorators
  3. Ensure brokers are reachable
  4. Check consumer logs for rebalancing or connection issues

Messages not being produced

  1. Verify producer is connected (check logs for "ready" event)
  2. Check delivery report callbacks for errors
  3. Ensure topics exist on the Kafka cluster
  4. Verify authentication credentials if using SASL

Connection issues

  1. Verify broker URLs are correct and reachable
  2. Check firewall rules
  3. Verify SSL/TLS certificates if using encrypted connections
  4. Check SASL credentials if using authentication

Changelog

v2.0.2

  • Fix: rb_callback now supports both EAGER and COOPERATIVE rebalance protocols. When partition.assignment.strategy is set to cooperative-sticky (or any cooperative variant), the callback automatically detects the protocol on the first rebalance event and switches to incremental_assign(assignment) / incremental_unassign(assignment) for all subsequent rebalances. The protocol is auto-detected at runtime — no configuration change required.
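The protocol switch can be sketched as a small dispatcher. This is a hypothetical simplification of the rb_callback behavior described above, with a minimal interface standing in for the real consumer:

```typescript
// Hypothetical sketch of the EAGER vs COOPERATIVE dispatch described above.
type Partitions = { topic: string; partition: number }[];

interface RebalancingConsumer {
  assign(p: Partitions): void;
  unassign(): void;
  incremental_assign(p: Partitions): void;
  incremental_unassign(p: Partitions): void;
}

function applyRebalance(
  consumer: RebalancingConsumer,
  protocol: 'EAGER' | 'COOPERATIVE',
  isAssign: boolean,
  partitions: Partitions,
): void {
  if (protocol === 'COOPERATIVE') {
    // Cooperative-sticky: only the delta of partitions is (un)assigned.
    isAssign
      ? consumer.incremental_assign(partitions)
      : consumer.incremental_unassign(partitions);
  } else {
    // Eager: the full assignment is replaced on every rebalance.
    isAssign ? consumer.assign(partitions) : consumer.unassign();
  }
}
```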

v2.0.1

  • Fix: consumer.assign() / consumer.unassign() now guarded with isConnected() and wrapped in try/catch inside rb_callback to prevent "Local: Erroneous state" (ERR__STATE) crash during partition rebalance when the consumer is in a transitional (disconnecting / reconnecting) state.

Related Projects