npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@eqxjs/kafka-server-confluent-kafkajs

v1.2.0

Published

Kafka for nestjs microservices based on @confluentinc/kafka-javascript KafkaJS interface

Readme

@eqxjs/kafka-server-confluent-kafkajs

NestJS custom transport strategy for Apache Kafka that uses the KafkaJS-compatible async API of @confluentinc/kafka-javascript — a drop-in replacement for the built-in NestJS Kafka microservice.

Features

  • Fully async/await-based connect / disconnect (no callback events)
  • Producer send() returns Promise<RecordMetadata[]> — no delivery-report polling
  • Consumer uses consumer.run({ eachMessage }) — no manual poll loop
  • Heap-based back-pressure — pauses consumption when heap exceeds a configurable limit, resumes automatically when it recovers
  • Automatic topic-change monitoring — re-subscribes the consumer when topics are added or removed on the broker
  • SASL/SSL support via librdkafka ConsumerConstructorConfig / ProducerConstructorConfig constructor arguments or environment variables
  • NestJS Logger injected into the KafkaJS client for unified log output
  • Singleton accessor (CustomServerConfluentKafkaJS.getInstance()) and static producer accessor (CustomServerConfluentKafkaJS.getProducer())

Installation

npm install @eqxjs/kafka-server-confluent-kafkajs

@nestjs/common and @nestjs/microservices are included as dependencies.


Quick start

// main.ts
import { NestFactory } from "@nestjs/core";
import { CustomServerConfluentKafkaJS } from "@eqxjs/kafka-server-confluent-kafkajs";
import { AppModule } from "./app.module";

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    strategy: new CustomServerConfluentKafkaJS({
      "bootstrap.servers": "localhost:9092",
      "group.id": "my-consumer-group",
      "client.id": "my-service",
    }),
  });

  await app.listen();
}
bootstrap();

Or configure entirely via environment variables — no constructor arguments needed:

# .env
KAFKA_CONSUMER_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_CONSUMER_GROUP_ID=my-consumer-group
KAFKA_CONSUMER_CLIENT_ID=my-service

KAFKA_PRODUCER_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_PRODUCER_COMPRESSION_TYPE=snappy
KAFKA_PRODUCER_RETRIES=3
KAFKA_PRODUCER_LINGER_MS=10
// main.ts
const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerConfluentKafkaJS(),
});
await app.listen();

Handle messages in a NestJS controller using the standard @EventPattern decorator:

import { Controller } from "@nestjs/common";
import { EventPattern, Payload } from "@nestjs/microservices";

@Controller()
export class AppController {
  @EventPattern("user.created")
  async handleUserCreated(@Payload() data: unknown) {
    console.log("Received:", data);
  }
}

Produce messages from anywhere using the static accessor:

import { CustomServerConfluentKafkaJS } from "@eqxjs/kafka-server-confluent-kafkajs";

const results = await CustomServerConfluentKafkaJS.getProducer().send({
  topic: "user.created",
  messages: [{ key: "u-1", value: JSON.stringify({ id: 1 }) }],
});

Constructor

new CustomServerConfluentKafkaJS(
  consumerConfig?,   // ConsumerConstructorConfig (librdkafka dot-notation keys)
  producerConfig?,   // ProducerConstructorConfig (librdkafka dot-notation keys)
  heapLimitPercent?, // number — default 85
)

| Parameter | Type | Default | Description | | ------------------ | --------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------- | | consumerConfig | ConsumerConstructorConfig | {} | librdkafka consumer config using dot-notation keys (e.g. "bootstrap.servers", "group.id"). Merged on top of env-derived settings. | | producerConfig | ProducerConstructorConfig | {} | librdkafka producer config using dot-notation keys. | | heapLimitPercent | number | 85 | Heap usage percentage (10–99) above which consumption is paused. Also read from KAFKA_HEAP_LIMIT_PERCENT. |

Configuration priority (highest → lowest):

  1. KAFKA_CONSUMER_* / KAFKA_PRODUCER_* environment variables
  2. Explicit consumerConfig / producerConfig constructor arguments

Environment variables

All variables are optional. When set they override any value provided via the constructor.

KAFKA_CONSUMER_* variables are mapped to librdkafka consumer config keys by stripping the KAFKA_CONSUMER_ prefix, lowercasing, and replacing _ with .
(e.g. KAFKA_CONSUMER_GROUP_IDgroup.id, KAFKA_CONSUMER_BOOTSTRAP_SERVERSbootstrap.servers).

KAFKA_PRODUCER_* variables follow the same rule for the producer config.

Values "true" / "false" are coerced to booleans; numeric strings are coerced to numbers.

Consumer

| Variable | librdkafka key | Notes | | ---------------------------------------- | --------------------------- | ------------------------------------------------------ | | KAFKA_CONSUMER_BOOTSTRAP_SERVERS | bootstrap.servers | Comma-separated list, e.g. broker1:9092,broker2:9092 | | KAFKA_CONSUMER_GROUP_ID | group.id | | | KAFKA_CONSUMER_CLIENT_ID | client.id | | | KAFKA_CONSUMER_SESSION_TIMEOUT_MS | session.timeout.ms | Milliseconds | | KAFKA_CONSUMER_HEARTBEAT_INTERVAL_MS | heartbeat.interval.ms | Milliseconds | | KAFKA_CONSUMER_MAX_BYTES_PER_PARTITION | max.partition.fetch.bytes | | | KAFKA_CONSUMER_FETCH_MAX_BYTES | fetch.max.bytes | | | KAFKA_CONSUMER_FETCH_WAIT_MAX_MS | fetch.wait.max.ms | Milliseconds | | KAFKA_CONSUMER_ENABLE_AUTO_COMMIT | enable.auto.commit | "true" or "false" | | KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL_MS | auto.commit.interval.ms | Milliseconds |

Producer

| Variable | librdkafka key | Notes | | --------------------------------------- | ------------------------ | --------------------- | | KAFKA_PRODUCER_BOOTSTRAP_SERVERS | bootstrap.servers | | | KAFKA_PRODUCER_ENABLE_IDEMPOTENCE | enable.idempotence | "true" or "false" | | KAFKA_PRODUCER_TRANSACTION_TIMEOUT_MS | transaction.timeout.ms | Milliseconds |

Any other librdkafka key can be set by following the same KAFKA_CONSUMER_ / KAFKA_PRODUCER_ naming convention.

Runtime behaviour

| Variable | Default | Description | | --------------------------------- | -------- | -------------------------------------------- | | KAFKA_HEAP_LIMIT_PERCENT | 85 | Heap % threshold for back-pressure pause | | KAFKA_CONSUME_INTERVAL_MS | 1000 | Heap-check interval in milliseconds | | KAFKA_TOPIC_MONITOR_INTERVAL_MS | 300000 | Topic change poll interval in milliseconds | | KAFKA_DISABLE_TOPIC_MONITOR | — | Set to "true" to disable the topic monitor |


SASL / SSL

Pass SASL and SSL credentials via the librdkafka ConsumerConstructorConfig / ProducerConstructorConfig constructor arguments, or via the equivalent KAFKA_CONSUMER_* / KAFKA_PRODUCER_* environment variables:

new CustomServerConfluentKafkaJS({
  "bootstrap.servers": "pkc-xxx.confluent.cloud:9092",
  "client.id": "my-service",
  "group.id": "my-group",
  "security.protocol": "sasl_ssl",
  "sasl.mechanisms": "PLAIN",
  "sasl.username": "API_KEY",
  "sasl.password": "API_SECRET",
});

Equivalent via environment variables:

KAFKA_CONSUMER_BOOTSTRAP_SERVERS=pkc-xxx.confluent.cloud:9092
KAFKA_CONSUMER_SECURITY_PROTOCOL=sasl_ssl
KAFKA_CONSUMER_SASL_MECHANISMS=PLAIN
KAFKA_CONSUMER_SASL_USERNAME=API_KEY
KAFKA_CONSUMER_SASL_PASSWORD=API_SECRET

Public API

listen(callback) / close()

NestJS lifecycle hooks — called automatically by the framework.

start(callback)

Connects all Kafka clients concurrently, subscribes to discovered topics, and starts the consume loop. Called internally by listen().

produce(record): Promise<RecordMetadata[]>

Send one or more messages via the producer.

await server.produce({
  topic: "order.placed",
  messages: [{ key: "o-1", value: JSON.stringify({ orderId: 1 }) }],
});

static getProducer(): Producer

Returns the raw KafkaJS Producer instance. Throws if the server has not been started.

const producer = CustomServerConfluentKafkaJS.getProducer();
await producer.send({ topic: "...", messages: [...] });

static getInstance(): CustomServerConfluentKafkaJS

Returns the singleton created by the last new CustomServerConfluentKafkaJS() call.

unwrap<T>(): T

Returns { consumer, producer } after the server has been started.

const { consumer, producer } = server.unwrap<{
  consumer: Consumer;
  producer: Producer;
}>();

isKafkaConnected(): boolean

Returns true after all clients have connected successfully.

pauseConsumer() / resumeConsumer()

Manually pause or resume message consumption independent of heap back-pressure.

setConsumeTopics(topics: string[])

Override the topic list at runtime without restarting the server.

retrieveTopics(log: boolean): Promise<string[]>

Returns the intersection of registered handler patterns and topics currently on the broker.

setHeapCheckInterval() / clearHeapCheckInterval()

Create or clear the interval that checks heap pressure. Called automatically by start().

clearTopicMonitorInterval()

Clear the topic-change monitoring interval.


Back-pressure

Every KAFKA_CONSUME_INTERVAL_MS (default 1 s) the server reads the V8 heap snapshot:

  • If heap usage heapLimitPercent% and the consumer is not yet paused → consumer.pause() is called on all assigned partitions.
  • Once heap usage drops below the thresholdconsumer.resume() is called on all paused partitions.

This prevents OOM conditions under heavy message load without any manual tuning.


Logger

The NestJS Logger instance is injected into the KafkaJS client at construction time. All internal KafkaJS log output (connection events, metadata refreshes, etc.) flows through the standard NestJS logging pipeline and respects the application log level.


License

ISC