@eqxjs/kafka-server-confluent-kafkajs (v1.2.0)
NestJS custom transport strategy for Apache Kafka that uses the KafkaJS-compatible async API of @confluentinc/kafka-javascript — a drop-in replacement for the built-in NestJS Kafka microservice.
Features
- Fully async/await-based `connect` / `disconnect` (no callback events)
- Producer `send()` returns `Promise<RecordMetadata[]>` (no delivery-report polling)
- Consumer uses `consumer.run({ eachMessage })` (no manual poll loop)
- Heap-based back-pressure: pauses consumption when heap usage exceeds a configurable limit, resumes automatically when it recovers
- Automatic topic-change monitoring: re-subscribes the consumer when topics are added or removed on the broker
- SASL/SSL support via librdkafka `ConsumerConstructorConfig` / `ProducerConstructorConfig` constructor arguments or environment variables
- NestJS `Logger` injected into the KafkaJS client for unified log output
- Singleton accessor (`CustomServerConfluentKafkaJS.getInstance()`) and static producer accessor (`CustomServerConfluentKafkaJS.getProducer()`)
Installation
```bash
npm install @eqxjs/kafka-server-confluent-kafkajs
```

`@nestjs/common` and `@nestjs/microservices` are included as dependencies.
Quick start
```ts
// main.ts
import { NestFactory } from "@nestjs/core";
import { CustomServerConfluentKafkaJS } from "@eqxjs/kafka-server-confluent-kafkajs";
import { AppModule } from "./app.module";

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    strategy: new CustomServerConfluentKafkaJS({
      "bootstrap.servers": "localhost:9092",
      "group.id": "my-consumer-group",
      "client.id": "my-service",
    }),
  });
  await app.listen();
}
bootstrap();
```

Or configure entirely via environment variables, with no constructor arguments needed:
```dotenv
# .env
KAFKA_CONSUMER_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_CONSUMER_GROUP_ID=my-consumer-group
KAFKA_CONSUMER_CLIENT_ID=my-service
KAFKA_PRODUCER_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_PRODUCER_COMPRESSION_TYPE=snappy
KAFKA_PRODUCER_RETRIES=3
KAFKA_PRODUCER_LINGER_MS=10
```

```ts
// main.ts
const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new CustomServerConfluentKafkaJS(),
});
await app.listen();
```

Handle messages in a NestJS controller using the standard `@EventPattern` decorator:
```ts
import { Controller } from "@nestjs/common";
import { EventPattern, Payload } from "@nestjs/microservices";

@Controller()
export class AppController {
  @EventPattern("user.created")
  async handleUserCreated(@Payload() data: unknown) {
    console.log("Received:", data);
  }
}
```

Produce messages from anywhere using the static accessor:
```ts
import { CustomServerConfluentKafkaJS } from "@eqxjs/kafka-server-confluent-kafkajs";

const results = await CustomServerConfluentKafkaJS.getProducer().send({
  topic: "user.created",
  messages: [{ key: "u-1", value: JSON.stringify({ id: 1 }) }],
});
```

Constructor
```ts
new CustomServerConfluentKafkaJS(
  consumerConfig?,   // ConsumerConstructorConfig (librdkafka dot-notation keys)
  producerConfig?,   // ProducerConstructorConfig (librdkafka dot-notation keys)
  heapLimitPercent?, // number, default 85
)
```

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `consumerConfig` | `ConsumerConstructorConfig` | `{}` | librdkafka consumer config using dot-notation keys (e.g. `"bootstrap.servers"`, `"group.id"`). Merged with env-derived settings; environment variables take precedence. |
| `producerConfig` | `ProducerConstructorConfig` | `{}` | librdkafka producer config using dot-notation keys. |
| `heapLimitPercent` | `number` | `85` | Heap usage percentage (10–99) above which consumption is paused. Also read from `KAFKA_HEAP_LIMIT_PERCENT`. |
Configuration priority (highest → lowest):

1. `KAFKA_CONSUMER_*` / `KAFKA_PRODUCER_*` environment variables
2. Explicit `consumerConfig` / `producerConfig` constructor arguments
Environment variables
All variables are optional. When set they override any value provided via the constructor.
`KAFKA_CONSUMER_*` variables are mapped to librdkafka consumer config keys by stripping the `KAFKA_CONSUMER_` prefix, lowercasing, and replacing `_` with `.`
(e.g. `KAFKA_CONSUMER_GROUP_ID` → `group.id`, `KAFKA_CONSUMER_BOOTSTRAP_SERVERS` → `bootstrap.servers`).
`KAFKA_PRODUCER_*` variables follow the same rule for the producer config.
Values `"true"` / `"false"` are coerced to booleans; numeric strings are coerced to numbers.
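The mapping rule above can be sketched as a small standalone function. This is an illustrative reimplementation of the documented behaviour, not the package's internal code; the name `envToConfig` is an assumption.

```typescript
// Sketch of the documented env-var mapping: strip the prefix, lowercase,
// replace "_" with ".", then coerce booleans and numeric strings.
function envToConfig(
  prefix: string,
  env: Record<string, string | undefined>,
): Record<string, string | number | boolean> {
  const config: Record<string, string | number | boolean> = {};
  for (const [name, raw] of Object.entries(env)) {
    if (!name.startsWith(prefix) || raw === undefined) continue;
    const key = name.slice(prefix.length).toLowerCase().replace(/_/g, ".");
    if (raw === "true" || raw === "false") {
      config[key] = raw === "true"; // "true"/"false" -> boolean
    } else if (raw !== "" && !Number.isNaN(Number(raw))) {
      config[key] = Number(raw); // numeric strings -> number
    } else {
      config[key] = raw; // everything else stays a string
    }
  }
  return config;
}
```

With this rule, `{ KAFKA_CONSUMER_GROUP_ID: "my-group" }` maps to `{ "group.id": "my-group" }`.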
Consumer
| Variable | librdkafka key | Notes |
| ---------------------------------------- | --------------------------- | ------------------------------------------------------ |
| KAFKA_CONSUMER_BOOTSTRAP_SERVERS | bootstrap.servers | Comma-separated list, e.g. broker1:9092,broker2:9092 |
| KAFKA_CONSUMER_GROUP_ID | group.id | |
| KAFKA_CONSUMER_CLIENT_ID | client.id | |
| KAFKA_CONSUMER_SESSION_TIMEOUT_MS | session.timeout.ms | Milliseconds |
| KAFKA_CONSUMER_HEARTBEAT_INTERVAL_MS | heartbeat.interval.ms | Milliseconds |
| KAFKA_CONSUMER_MAX_BYTES_PER_PARTITION | max.partition.fetch.bytes | |
| KAFKA_CONSUMER_FETCH_MAX_BYTES | fetch.max.bytes | |
| KAFKA_CONSUMER_FETCH_WAIT_MAX_MS | fetch.wait.max.ms | Milliseconds |
| KAFKA_CONSUMER_ENABLE_AUTO_COMMIT | enable.auto.commit | "true" or "false" |
| KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL_MS | auto.commit.interval.ms | Milliseconds |
Producer
| Variable | librdkafka key | Notes |
| --------------------------------------- | ------------------------ | --------------------- |
| KAFKA_PRODUCER_BOOTSTRAP_SERVERS | bootstrap.servers | |
| KAFKA_PRODUCER_ENABLE_IDEMPOTENCE | enable.idempotence | "true" or "false" |
| KAFKA_PRODUCER_TRANSACTION_TIMEOUT_MS | transaction.timeout.ms | Milliseconds |
Any other librdkafka key can be set by following the same KAFKA_CONSUMER_ / KAFKA_PRODUCER_ naming convention.
Runtime behaviour
| Variable | Default | Description |
| --------------------------------- | -------- | -------------------------------------------- |
| KAFKA_HEAP_LIMIT_PERCENT | 85 | Heap % threshold for back-pressure pause |
| KAFKA_CONSUME_INTERVAL_MS | 1000 | Heap-check interval in milliseconds |
| KAFKA_TOPIC_MONITOR_INTERVAL_MS | 300000 | Topic change poll interval in milliseconds |
| KAFKA_DISABLE_TOPIC_MONITOR | — | Set to "true" to disable the topic monitor |
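The runtime knobs above can be combined in `.env`; the values here are illustrative, not recommendations:

```dotenv
# Check heap pressure every 2 s and pause at 90 % usage
KAFKA_HEAP_LIMIT_PERCENT=90
KAFKA_CONSUME_INTERVAL_MS=2000
# Poll the broker for topic changes every 10 minutes
KAFKA_TOPIC_MONITOR_INTERVAL_MS=600000
```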
SASL / SSL
Pass SASL and SSL credentials via the librdkafka ConsumerConstructorConfig / ProducerConstructorConfig constructor arguments, or via the equivalent KAFKA_CONSUMER_* / KAFKA_PRODUCER_* environment variables:
```ts
new CustomServerConfluentKafkaJS({
  "bootstrap.servers": "pkc-xxx.confluent.cloud:9092",
  "client.id": "my-service",
  "group.id": "my-group",
  "security.protocol": "sasl_ssl",
  "sasl.mechanisms": "PLAIN",
  "sasl.username": "API_KEY",
  "sasl.password": "API_SECRET",
});
```

Equivalent via environment variables:

```dotenv
KAFKA_CONSUMER_BOOTSTRAP_SERVERS=pkc-xxx.confluent.cloud:9092
KAFKA_CONSUMER_SECURITY_PROTOCOL=sasl_ssl
KAFKA_CONSUMER_SASL_MECHANISMS=PLAIN
KAFKA_CONSUMER_SASL_USERNAME=API_KEY
KAFKA_CONSUMER_SASL_PASSWORD=API_SECRET
```

Public API
listen(callback) / close()
NestJS lifecycle hooks — called automatically by the framework.
start(callback)
Connects all Kafka clients concurrently, subscribes to discovered topics, and starts the consume loop. Called internally by listen().
produce(record): Promise<RecordMetadata[]>
Send one or more messages via the producer.
```ts
await server.produce({
  topic: "order.placed",
  messages: [{ key: "o-1", value: JSON.stringify({ orderId: 1 }) }],
});
```

static getProducer(): Producer
Returns the raw KafkaJS Producer instance. Throws if the server has not been started.
```ts
const producer = CustomServerConfluentKafkaJS.getProducer();
await producer.send({ topic: "...", messages: [...] });
```

static getInstance(): CustomServerConfluentKafkaJS
Returns the singleton created by the last new CustomServerConfluentKafkaJS() call.
unwrap<T>(): T
Returns { consumer, producer } after the server has been started.
```ts
const { consumer, producer } = server.unwrap<{
  consumer: Consumer;
  producer: Producer;
}>();
```

isKafkaConnected(): boolean
Returns true after all clients have connected successfully.
pauseConsumer() / resumeConsumer()
Manually pause or resume message consumption independent of heap back-pressure.
setConsumeTopics(topics: string[])
Override the topic list at runtime without restarting the server.
retrieveTopics(log: boolean): Promise<string[]>
Returns the intersection of registered handler patterns and topics currently on the broker.
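The intersection can be pictured with a small helper; `matchTopics` is a hypothetical illustration of the behaviour, not the package's code:

```typescript
// Keep only broker topics that have a registered handler pattern,
// preserving the broker-side ordering.
function matchTopics(handlerPatterns: string[], brokerTopics: string[]): string[] {
  const registered = new Set(handlerPatterns);
  return brokerTopics.filter((topic) => registered.has(topic));
}
```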
setHeapCheckInterval() / clearHeapCheckInterval()
Create or clear the interval that checks heap pressure. Called automatically by start().
clearTopicMonitorInterval()
Clear the topic-change monitoring interval.
Back-pressure
Every `KAFKA_CONSUME_INTERVAL_MS` (default 1 s) the server samples V8 heap statistics:

- If heap usage ≥ `heapLimitPercent` % and the consumer is not yet paused → `consumer.pause()` is called on all assigned partitions.
- Once heap usage drops below the threshold → `consumer.resume()` is called on all paused partitions.
This prevents OOM conditions under heavy message load without any manual tuning.
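A minimal sketch of this pause/resume decision, assuming the heap percentage is derived from `v8.getHeapStatistics()`; the names `backPressureAction` and `heapUsedPercent` are illustrative, not the library's internals:

```typescript
import * as v8 from "node:v8";

type Action = "pause" | "resume" | "none";

// Pure decision function: given the current heap percentage and whether the
// consumer is already paused, decide what the interval tick should do.
function backPressureAction(heapPercent: number, paused: boolean, limit = 85): Action {
  if (heapPercent >= limit && !paused) return "pause";
  if (heapPercent < limit && paused) return "resume";
  return "none";
}

// Current V8 heap usage as a percentage of the configured heap limit.
function heapUsedPercent(): number {
  const { used_heap_size, heap_size_limit } = v8.getHeapStatistics();
  return (used_heap_size / heap_size_limit) * 100;
}
```

Keeping the decision pure (state in, action out) makes the threshold logic easy to test without touching a real consumer.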
Logger
The NestJS Logger instance is injected into the KafkaJS client at construction time. All internal KafkaJS log output (connection events, metadata refreshes, etc.) flows through the standard NestJS logging pipeline and respects the application log level.
License
ISC
