kafka-crab-js
v4.1.1
Kafka Crab JS
Kafka Crab JS is a native Kafka client for Node.js and TypeScript. It uses Rust, NAPI-RS, and librdkafka to reduce JavaScript heap pressure, expose Kafka's mature native client behavior, and push high-throughput consumer workloads through a small TypeScript-friendly API.
Version 4
kafka-crab-js v4 is the current stable release line. It focuses on high-throughput consumers, lower JavaScript heap
pressure, and a cleaner monorepo release model under flash-tecnologia/crab-js.
The main v4 additions are:
- Native Web Stream consumers through client.createWebStreamConsumer(), with explicit serial and batch modes.
- Direct batch consumption through consumer.recvBatch(size, timeoutMs) for workers that can process chunks.
- Compact native batch streaming internally, reducing native-to-JavaScript marshalling overhead before expanding to the public Message[] API.
- Serial Web Stream prefetching, so message-by-message code still benefits from small native batches.
- Diagnostics-channel events in the core package, with OpenTelemetry tracing and metrics moved to the optional kafka-crab-js-otel package.
- Updated TypeScript exports where runtime enum-like values are now string literal types, avoiding missing runtime exports in ESM/CommonJS consumers.
For production adoption, validate v4 with your broker settings, payload sizes, partition count, commit strategy, and shutdown path before replacing an existing Kafka client.
Why Kafka Crab JS?
KafkaJS is a strong default for many Node.js services. Kafka Crab JS is for the cases where Kafka throughput, memory pressure, native batching, or librdkafka behavior start to matter.
| Need | What Kafka Crab JS Provides |
| ----------------------------------- | ---------------------------------------------------------------------------------------- |
| Higher consumer throughput | Native batch receive paths and Web Stream batch consumers. |
| Lower JavaScript heap pressure | Message fetch and Kafka protocol work happen in Rust/librdkafka before crossing into JS. |
| Predictable Kafka offset handling | commitMessage() commits message.offset + 1, the value Kafka expects. |
| Production Kafka tuning | Advanced librdkafka settings are passed through with their original names. |
| Node.js ecosystem integration | Direct APIs, Node.js Readable streams, native Web Streams, and CommonJS/ESM exports. |
| Observability without core coupling | Diagnostics-channel events power the optional kafka-crab-js-otel package. |
In the local isolated consumer benchmark snapshot, kafka-crab-js v4 (stream, batch) reached about
1.17M messages/sec, ahead of the measured @platformatic/kafka and KafkaJS batch scenarios, while using lower RSS
delta and peak heap than both. Treat benchmark numbers as workload-specific; the full methodology and memory/GC
breakdown are documented in BENCHMARKS.md and Performance Benchmarks.
Highlights
- Native librdkafka client exposed through a TypeScript-friendly API.
- Producer, direct consumer, Node.js Readable stream, and v4 native Web Stream consumer APIs.
- High-throughput batch receive APIs for workloads that can process more than one message at a time.
- Compact batch streaming and serial prefetching to reduce native boundary crossings.
- Manual commit helpers that commit message.offset + 1 correctly.
- Optional diagnostics-channel instrumentation for OpenTelemetry through kafka-crab-js-otel.
- Advanced librdkafka settings are forwarded through configuration without forcing a custom allowlist.
Requirements
- Node.js >= 22.
- A Kafka broker reachable from the Node.js process.
- No separate librdkafka install is required for the published binaries.
Table Of Contents
- Version 4
- Installation
- Module Usage
- Quick Start
- Message Model
- Choosing A Consumer API
- Producer API
- Consumer API
- Stream Consumers
- Batching, Backpressure, And Tuning
- Configuration
- OpenTelemetry
- Performance Benchmarks
- Migration Notes
- Troubleshooting
- Development
- License
Installation
npm install kafka-crab-js
pnpm add kafka-crab-js
yarn add kafka-crab-js
Module Usage
The package publishes both ESM and CommonJS entry points:
import { KafkaClient } from 'kafka-crab-js'
import type { Message, ProducerRecord } from 'kafka-crab-js'
const { KafkaClient } = require('kafka-crab-js')
The public TypeScript types are generated from the native NAPI contract and the JavaScript wrapper. Runtime enum-like values such as commit modes and security protocols are string literals, not runtime objects:
await consumer.commitMessage(message, 'Sync')
const client = new KafkaClient({
brokers: 'localhost:9092',
securityProtocol: 'Plaintext',
})
Quick Start
Produce Messages
import { KafkaClient } from 'kafka-crab-js'
const client = new KafkaClient({
brokers: 'localhost:9092',
clientId: 'orders-api',
securityProtocol: 'Plaintext',
})
const producer = client.createProducer()
const metadata = await producer.send({
topic: 'orders',
messages: [
{
key: Buffer.from('order-123'),
payload: Buffer.from(JSON.stringify({ id: 'order-123', status: 'created' })),
headers: {
'content-type': Buffer.from('application/json'),
},
},
],
})
console.log(metadata)
Consume One Message At A Time
import { KafkaClient } from 'kafka-crab-js'
const client = new KafkaClient({
brokers: 'localhost:9092',
clientId: 'orders-worker',
securityProtocol: 'Plaintext',
})
const consumer = client.createConsumer({
groupId: 'orders-worker',
enableAutoCommit: false,
configuration: {
'auto.offset.reset': 'earliest',
},
})
await consumer.subscribe('orders')
try {
while (true) {
const message = await consumer.recv()
if (!message) {
break
}
const value = JSON.parse(message.payload.toString('utf8'))
console.log({ value, topic: message.topic, partition: message.partition, offset: message.offset })
await consumer.commitMessage(message, 'Sync')
}
} finally {
await consumer.disconnect()
}
Consume In Batches
const consumer = client.createConsumer({
groupId: 'orders-batch-worker',
enableAutoCommit: false,
configuration: {
'auto.offset.reset': 'earliest',
},
})
await consumer.subscribe('orders')
try {
while (true) {
const batch = await consumer.recvBatch(500, 50)
if (batch.length === 0) {
continue
}
for (const message of batch) {
await processOrder(message.payload)
await consumer.commitMessage(message, 'Async')
}
}
} finally {
await consumer.disconnect()
}
Message Model
Messages use Buffer values because the Kafka protocol treats keys, payloads, and headers as bytes. Decode them at the
edge of your application instead of assuming text:
import type { Message, MessageProducer, RecordMetadata } from 'kafka-crab-js'
const outgoing: MessageProducer = {
key: Buffer.from('order-123'),
payload: Buffer.from(JSON.stringify({ id: 'order-123' })),
headers: {
'content-type': Buffer.from('application/json'),
source: Buffer.from('orders-api'),
},
}
function decodeJsonMessage(message: Message) {
return JSON.parse(message.payload.toString('utf8')) as unknown
}
function logDelivery(records: RecordMetadata[]) {
for (const record of records) {
console.log(`${record.topic}[${record.partition}]@${record.offset}`)
}
}
Message values contain the consumed Kafka offset. When committing manually, commit the next offset. Prefer
commitMessage() unless you intentionally need to calculate offsets yourself:
await consumer.commitMessage(message, 'Sync')
await consumer.commit(message.topic, message.partition, message.offset + 1, 'Async')
Keys are optional. An omitted key keeps Kafka producer partitioning semantics for keyless records. An empty
Buffer.alloc(0) is still a present key.
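As a minimal sketch of decoding at the edge, the helper below turns the byte fields into strings and parsed JSON. MessageLike and decodeMessage are hypothetical names for illustration, not exports of kafka-crab-js, and the sketch assumes consumed headers have the same Record<string, Buffer> shape as the producer example above.

```typescript
// Structural stand-in for the library's Message type (assumption: only the
// fields used here are modeled).
interface MessageLike {
  payload: Buffer
  key?: Buffer
  headers?: Record<string, Buffer>
}

interface DecodedMessage {
  key: string | undefined
  headers: Record<string, string>
  value: unknown
}

// Decode bytes once, at the application boundary.
// Throws a SyntaxError if the payload is not valid JSON.
function decodeMessage(message: MessageLike): DecodedMessage {
  const headers: Record<string, string> = {}
  for (const [name, value] of Object.entries(message.headers ?? {})) {
    headers[name] = value.toString('utf8')
  }
  return {
    // An omitted key stays undefined; an empty Buffer decodes to '' and
    // therefore remains distinguishable as a present key.
    key: message.key?.toString('utf8'),
    headers,
    value: JSON.parse(message.payload.toString('utf8')) as unknown,
  }
}
```

Keeping the result typed as unknown pushes schema validation to the caller instead of hiding it behind a cast.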
Choosing A Consumer API
| API | Emits | Best For |
| ---------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------ |
| consumer.recv() | Message \| null | Simple sequential workers and explicit control. |
| consumer.recvBatch() | Message[] | High-throughput workers that can process chunks. |
| consumer.recvStream() | Web ReadableStream<Message> | Pull-based Web Stream workflows. |
| consumer.recvBatchStream() | Web ReadableStream<Message[]> | Native batch streaming with explicit consumer control. |
| client.createStreamConsumer() | Node.js Readable emitting Message | Existing Node stream pipelines. |
| client.createWebStreamConsumer() | Web ReadableStream<Message> or ReadableStream<Message[]> | v4 high-throughput stream consumers. |
Two details matter for choosing correctly:
- createStreamConsumer() is a Node.js Readable compatibility API. Even when batchSize > 1, it emits individual Message objects and uses batching internally to reduce native boundary crossings.
- createWebStreamConsumer() returns a discriminated object. In serial mode it emits Message; in batch mode it emits Message[]. Use this API when your code can process whole batches directly.
Recommended starting points:
- Use consumer.recv() for straightforward workers where one handler processes one message and then commits.
- Use consumer.recvBatch() when the handler can process chunks, flush work in groups, or commit after a batch.
- Use createStreamConsumer() when existing code already expects a Node.js Readable.
- Use createWebStreamConsumer() for new v4 stream code, especially when you can keep batch chunks intact.
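If one handler has to serve both serial and batch emissions while you migrate between APIs, a small adapter can normalize the chunk shape. This is a hypothetical helper (processChunk is not part of kafka-crab-js) and it assumes a message is never itself an array.

```typescript
// A chunk is either one message (serial mode) or an array of messages (batch mode).
type Chunk<M> = M | M[]

// Run the handler sequentially over every message in the chunk and
// return how many messages were handled.
async function processChunk<M>(
  chunk: Chunk<M>,
  handle: (message: M) => Promise<void>,
): Promise<number> {
  const messages = Array.isArray(chunk) ? chunk : [chunk]
  for (const message of messages) {
    await handle(message)
  }
  return messages.length
}
```

Sequential awaiting preserves per-partition order within the chunk, which matters if you commit after each message.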
Producer API
Create a producer from a KafkaClient:
const producer = client.createProducer({
autoFlush: true,
queueTimeout: 5000,
configuration: {
'compression.type': 'lz4',
acks: 'all',
},
})
Send one or more messages:
const records = await producer.send({
topic: 'orders',
messages: [
{ key: Buffer.from('1'), payload: Buffer.from('first') },
{ key: Buffer.from('2'), payload: Buffer.from('second') },
],
})
When autoFlush is enabled, send() waits for delivery confirmation and returns RecordMetadata[]. When autoFlush
is disabled, send() buffers messages and flush() sends pending messages:
const producer = client.createProducer({ autoFlush: false })
await producer.send({
topic: 'orders',
messages: [{ payload: Buffer.from('buffered') }],
})
const records = await producer.flush()
console.log(records)
producer.inFlightCount() returns the number of messages sent but not yet acknowledged.
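One possible use of inFlightCount() is a bounded wait during shutdown. The sketch below is an assumption-laden illustration: waitForInFlightDrain is a hypothetical helper, and because this README does not state whether inFlightCount() is synchronous, the result is awaited so either form works.

```typescript
// Only the method this sketch needs; the real producer has more.
interface InFlightSource {
  inFlightCount(): number | Promise<number>
}

// Poll until no messages are in flight or the timeout elapses.
// Resolves true when drained, false when the deadline passed first.
async function waitForInFlightDrain(
  producer: InFlightSource,
  timeoutMs = 5000,
  pollMs = 25,
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs
  while (true) {
    if ((await producer.inFlightCount()) === 0) return true
    if (Date.now() >= deadline) return false
    await new Promise<void>((resolve) => setTimeout(resolve, pollMs))
  }
}
```

A false result is a signal to log or alert rather than a guarantee of message loss. Calling flush() remains the documented way to send pending messages.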
Delivery Semantics
send() writes records to librdkafka and, with the default autoFlush: true, waits for delivery results before
resolving. Delivery metadata can still contain a per-record error, so production code should inspect it when failed
records must be retried or reported:
const records = await producer.send({
topic: 'orders',
messages: [{ payload: Buffer.from('created') }],
})
for (const record of records) {
if (record.error) {
throw new Error(`Kafka delivery failed: ${record.error.message}`)
}
}
With autoFlush: false, send() returns an empty array and leaves delivery confirmation to the next flush() call.
This can improve throughput for bursty producers, but shutdown code must flush before the process exits:
import type { RecordMetadata } from 'kafka-crab-js'
const producer = client.createProducer({ autoFlush: false })
function assertAllDelivered(records: RecordMetadata[]) {
for (const record of records) {
if (record.error) {
throw new Error(`Kafka delivery failed: ${record.error.message}`)
}
}
}
try {
await producer.send({ topic: 'orders', messages })
const records = await producer.flush()
assertAllDelivered(records)
} finally {
const remaining = await producer.flush()
assertAllDelivered(remaining)
}
Consumer API
Subscribe
The simplest subscription accepts a topic string:
await consumer.subscribe('orders')
Use TopicPartitionConfig[] when you need offsets, partition-specific offsets, or topic creation:
await consumer.subscribe([
{
topic: 'orders',
createTopic: true,
numPartitions: 3,
replicas: 1,
allOffsets: { position: 'Beginning' },
},
])
For partition-specific offsets:
await consumer.subscribe([
{
topic: 'orders',
partitionOffset: [
{
partition: 0,
offset: { offset: 42 },
},
],
},
])
Receive
recv() waits for one message and returns null when the consumer is disconnected:
const message = await consumer.recv()
if (message) {
await handleMessage(message)
}
recvBatch(size, timeoutMs) returns up to size messages:
const messages = await consumer.recvBatch(1000, 100)
An empty array means the timeout elapsed before any message arrived. It does not mean the consumer is closed.
Commit
Use commitMessage() for normal message processing. It commits message.offset + 1, which is the offset Kafka expects:
await consumer.commitMessage(message, 'Sync')
Use commit() when you already calculated the offset:
await consumer.commit(message.topic, message.partition, message.offset + 1, 'Async')
Commit modes are string literal values:
type CommitMode = 'Sync' | 'Async'
For at-least-once processing, disable auto commit, process the message successfully, then commit. If processing fails before the commit, Kafka can redeliver that message after restart or rebalance:
const consumer = client.createConsumer({
groupId: 'orders-worker',
enableAutoCommit: false,
})
const message = await consumer.recv()
if (message) {
await processOrder(message)
await consumer.commitMessage(message, 'Sync')
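}
'Sync' waits for the broker commit response. 'Async' schedules the commit through librdkafka and returns sooner,
which can be useful in high-throughput workers that tolerate the usual async commit tradeoff: a failed commit may surface only later, so more messages can be redelivered after a restart or rebalance.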
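When a worker processes a whole batch before committing, one commit per partition is enough. The sketch below computes those commit targets; MessageLike, CommitTarget, and nextOffsetsToCommit are hypothetical names for illustration, and the result is only safe to commit after every message in the batch has been processed successfully.

```typescript
// Only the fields needed to compute commit positions.
interface MessageLike {
  topic: string
  partition: number
  offset: number
}

interface CommitTarget {
  topic: string
  partition: number
  offset: number // the NEXT offset to consume, i.e. highest processed offset + 1
}

// For each topic-partition in the batch, keep the highest offset seen and add 1.
function nextOffsetsToCommit(batch: MessageLike[]): CommitTarget[] {
  const highest = new Map<string, CommitTarget>()
  for (const { topic, partition, offset } of batch) {
    const key = `${topic}:${partition}`
    const current = highest.get(key)
    if (!current || offset + 1 > current.offset) {
      highest.set(key, { topic, partition, offset: offset + 1 })
    }
  }
  return Array.from(highest.values())
}
```

Each target can then be passed to consumer.commit(target.topic, target.partition, target.offset, 'Async'), matching the commit() call shape shown above.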
Pause, Resume, Seek, And Assignment
consumer.pause()
consumer.resume()
consumer.seek('orders', 0, { position: 'Beginning' })
consumer.seek('orders', 0, { offset: 42 })
const assignment = consumer.assignment()
const subscription = consumer.getSubscription()
Consumer events are available through onEvents():
consumer.onEvents((error, event) => {
if (error) {
console.error(error)
return
}
console.log(event.name, event.payload)
})
Event names are currently PreRebalance, PostRebalance, and CommitCallback. The payload includes the topic-partition
list associated with the event and may include an error string for commit or rebalance failures.
Cleanup
Always disconnect direct consumers:
try {
await consumer.subscribe('orders')
// consume...
} finally {
await consumer.disconnect()
}
Stream Consumers
Node.js Readable Stream
Use createStreamConsumer() when integrating with Node.js stream tooling. It emits Message objects.
const stream = client.createStreamConsumer({
groupId: 'orders-stream',
enableAutoCommit: false,
batchSize: 256,
batchTimeout: 50,
configuration: {
'auto.offset.reset': 'earliest',
},
})
await stream.subscribe('orders')
stream.on('data', async (message) => {
try {
await processOrder(message.payload)
await stream.commitMessage(message, 'Async')
} catch (error) {
stream.destroy(error instanceof Error ? error : new Error(String(error)))
}
})
stream.on('error', (error) => {
console.error('stream error', error)
})
stream.on('close', () => {
console.log('stream closed')
})
Destroy stream consumers instead of calling disconnect() directly. The stream cleanup path cancels the source reader,
unsubscribes, and disconnects the native consumer:
stream.destroy()
Native Web Stream
Use createWebStreamConsumer() when you want a Web Stream and clear serial/batch typing.
Serial mode is selected when batchSize is omitted, 0, or 1:
const webConsumer = client.createWebStreamConsumer({
groupId: 'orders-web-serial',
serialPrefetchSize: 64,
serialPrefetchTimeout: 1,
configuration: {
'auto.offset.reset': 'earliest',
},
})
await webConsumer.consumer.subscribe('orders')
const reader = webConsumer.stream.getReader()
try {
const { value: message, done } = await reader.read()
if (!done && message) {
await processOrder(message.payload)
await webConsumer.consumer.commitMessage(message, 'Async')
}
} finally {
await reader.cancel()
await webConsumer.consumer.disconnect()
}
Batch mode is selected with batchSize > 1 and emits Message[] chunks:
const webConsumer = client.createWebStreamConsumer({
groupId: 'orders-web-batch',
batchSize: 1024,
batchTimeout: 10,
configuration: {
'auto.offset.reset': 'earliest',
},
})
if (webConsumer.mode === 'batch') {
await webConsumer.consumer.subscribe('orders')
const reader = webConsumer.stream.getReader()
try {
const { value: batch, done } = await reader.read()
if (!done && batch) {
for (const message of batch) {
await processOrder(message.payload)
}
}
} finally {
await reader.cancel()
await webConsumer.consumer.disconnect()
}
}
Batching, Backpressure, And Tuning
The main throughput lever is how many messages cross the native-to-JavaScript boundary per call.
| Knob | Applies To | Effect |
| ----------------------- | ----------------------------------------------- | ---------------------------------------------------------------------- |
| recvBatch(size, ms) | Direct consumer | Pulls up to size messages, waiting up to ms for data. |
| batchSize | Node stream and Web Stream batch modes | Sets the maximum native batch size used by the stream. |
| batchTimeout | Node stream and Web Stream batch modes | Bounds how long a partially filled batch waits before being emitted. |
| serialPrefetchSize | createWebStreamConsumer() serial mode | Pulls small native batches and flattens them into individual messages. |
| serialPrefetchTimeout | createWebStreamConsumer() serial mode | Timeout for the serial-mode prefetch batch. |
| streamOptions | createStreamConsumer() Node.js Readable API | Lets Node stream highWaterMark participate in backpressure. |
Use larger batches when throughput matters and your handler can process arrays efficiently. Use smaller batches and shorter timeouts when tail latency matters more than total throughput. Very large batches can increase RSS because more payloads, keys, headers, and metadata must be retained at once.
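A rough way to reason about the RSS cost of a batch size is to multiply it by the average message size. The helper below is a back-of-the-envelope sketch with hypothetical names; it gives a lower bound because it ignores per-object overhead, keys, headers, and anything the handler retains.

```typescript
const BYTES_PER_MIB = 1024 * 1024

// Approximate payload bytes held in memory by one full batch, in MiB.
function estimateBatchMiB(batchSize: number, avgMessageBytes: number): number {
  return (batchSize * avgMessageBytes) / BYTES_PER_MIB
}

// Example: 2 KiB messages (an assumed size, not a measured one).
const largeBatch = estimateBatchMiB(4096, 2048) // 8 MiB per full batch
const smallBatch = estimateBatchMiB(256, 2048) // 0.5 MiB per full batch
```

If several batches can be alive at once (for example one being processed while the next is prefetched), scale the estimate accordingly.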
The v4 Web Stream serial path defaults to serialPrefetchSize: 64 and serialPrefetchTimeout: 1. This keeps the public
serial API message-by-message while reducing native boundary crossings. Batch mode defaults batchTimeout to 1000
milliseconds when it is omitted.
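The selection rule and defaults described in this README can be restated as a small function. This is a sketch of the documented behavior, not the library's implementation; resolveWebStreamMode and its types are hypothetical names.

```typescript
interface WebStreamOptionsLike {
  batchSize?: number
  batchTimeout?: number
  serialPrefetchSize?: number
  serialPrefetchTimeout?: number
}

type ResolvedMode =
  | { mode: 'serial'; serialPrefetchSize: number; serialPrefetchTimeout: number }
  | { mode: 'batch'; batchSize: number; batchTimeout: number }

// Serial mode when batchSize is omitted, 0, or 1; batch mode when batchSize > 1.
function resolveWebStreamMode(options: WebStreamOptionsLike = {}): ResolvedMode {
  const batchSize = options.batchSize ?? 0
  if (batchSize > 1) {
    // Documented default: batchTimeout falls back to 1000 ms in batch mode.
    return { mode: 'batch', batchSize, batchTimeout: options.batchTimeout ?? 1000 }
  }
  // Documented defaults: serialPrefetchSize 64, serialPrefetchTimeout 1.
  return {
    mode: 'serial',
    serialPrefetchSize: options.serialPrefetchSize ?? 64,
    serialPrefetchTimeout: options.serialPrefetchTimeout ?? 1,
  }
}
```

Note the 1000 ms batch default: a latency-sensitive batch consumer on a quiet topic will usually want to set batchTimeout explicitly.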
For broker fetch tuning, pass librdkafka settings through configuration. Keep these values aligned with your expected
message size:
const consumer = client.createConsumer({
groupId: 'orders-worker',
configuration: {
'fetch.min.bytes': 1,
'fetch.wait.max.ms': 10,
'fetch.max.bytes': 1_048_576,
'max.partition.fetch.bytes': 1_048_576,
'message.max.bytes': 1_000_000,
},
})
fetch.max.bytes must be greater than or equal to message.max.bytes. If this invariant is broken, librdkafka rejects
the consumer configuration before the benchmark or application starts.
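To catch a broken invariant earlier, for example in a unit test for your configuration module, a preflight check along these lines may help. checkFetchLimits is a hypothetical helper; it only compares values that are explicitly set and makes no assumption about librdkafka defaults.

```typescript
type LibrdkafkaSettings = Record<string, string | number | boolean>

// Returns a description of the problem, or undefined when the explicit
// values satisfy fetch.max.bytes >= message.max.bytes (or either is unset).
function checkFetchLimits(configuration: LibrdkafkaSettings): string | undefined {
  const fetchMax = configuration['fetch.max.bytes']
  const messageMax = configuration['message.max.bytes']
  if (fetchMax === undefined || messageMax === undefined) {
    return undefined
  }
  if (Number(fetchMax) < Number(messageMax)) {
    return `fetch.max.bytes (${fetchMax}) must be >= message.max.bytes (${messageMax})`
  }
  return undefined
}
```

Running the check before client.createConsumer() turns a startup failure into a readable assertion in your own code.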
Configuration
KafkaClient
const client = new KafkaClient({
brokers: 'localhost:9092',
clientId: 'orders-service',
securityProtocol: 'Plaintext',
logLevel: 'info',
brokerAddressFamily: 'v4',
diagnostics: true,
configuration: {
'socket.keepalive.enable': true,
},
})
| Option                | Type                                                   | Default   | Description                                                               |
| --------------------- | ------------------------------------------------------ | --------- | ------------------------------------------------------------------------- |
| brokers | string | required | Comma-separated broker list, for example localhost:9092,localhost:9093. |
| clientId | string | rdkafka | Client identifier sent to Kafka. |
| securityProtocol | 'Plaintext' \| 'Ssl' \| 'SaslPlaintext' \| 'SaslSsl' | | Security protocol. |
| logLevel | string | error | librdkafka log level. |
| brokerAddressFamily | string | v4 | Address family hint such as v4 or any. |
| diagnostics | boolean | true | Enables diagnostic-channel events used by kafka-crab-js-otel. |
| configuration | Record<string, any> | | Additional librdkafka client settings. |
configuration is passed through to librdkafka after values are converted to strings. Use the original librdkafka
property names, for example sasl.mechanism, queued.min.messages, or fetch.wait.max.ms.
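To see what the string conversion means for typed values, the sketch below applies String() to each entry. This illustrates the documented behavior under the assumption that conversion is equivalent to String(); toLibrdkafkaStrings is a hypothetical name and not the library's code.

```typescript
// Convert every configuration value to the string form librdkafka receives.
function toLibrdkafkaStrings(configuration: Record<string, unknown>): Record<string, string> {
  const converted: Record<string, string> = {}
  for (const [name, value] of Object.entries(configuration)) {
    converted[name] = String(value)
  }
  return converted
}

const example = toLibrdkafkaStrings({
  'socket.keepalive.enable': true, // becomes 'true'
  'fetch.max.bytes': 1_048_576, // becomes '1048576'
  'sasl.mechanism': 'PLAIN', // unchanged
})
```

Because nested objects and arrays do not stringify into anything librdkafka understands, it is safest to keep configuration values to strings, numbers, and booleans.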
Common connection examples:
const localClient = new KafkaClient({
brokers: 'localhost:9092',
clientId: 'orders-local',
securityProtocol: 'Plaintext',
brokerAddressFamily: 'v4',
})
const saslClient = new KafkaClient({
brokers: process.env.KAFKA_BROKERS!,
clientId: 'orders-worker',
securityProtocol: 'SaslSsl',
configuration: {
'sasl.mechanism': 'PLAIN',
'sasl.username': process.env.KAFKA_USERNAME!,
'sasl.password': process.env.KAFKA_PASSWORD!,
},
})
ConsumerConfiguration
const consumer = client.createConsumer({
groupId: 'orders-worker',
enableAutoCommit: false,
fetchMetadataTimeout: 5000,
configuration: {
'auto.offset.reset': 'earliest',
'enable.auto.commit': false,
'fetch.min.bytes': 1,
},
})
| Option                 | Type                  | Description                                |
| ---------------------- | --------------------- | ------------------------------------------ |
| groupId | string | Kafka consumer group id. |
| enableAutoCommit | boolean | Convenience flag for auto commit behavior. |
| fetchMetadataTimeout | number | Metadata fetch timeout in milliseconds. |
| configuration | Record<string, any> | Additional librdkafka consumer settings. |
enableAutoCommit is a convenience option that sets librdkafka enable.auto.commit. You can also pass
'enable.auto.commit' in configuration, but using both with different values makes the intent hard to read.
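A small guard can flag the conflicting case in code review or tests. hasAutoCommitConflict and ConsumerOptionsLike are hypothetical names; the sketch does not say which value wins inside the library, only that the two settings disagree.

```typescript
interface ConsumerOptionsLike {
  enableAutoCommit?: boolean
  configuration?: Record<string, unknown>
}

// True when both settings are present and express different values.
function hasAutoCommitConflict(options: ConsumerOptionsLike): boolean {
  const raw = options.configuration?.['enable.auto.commit']
  if (options.enableAutoCommit === undefined || raw === undefined) {
    return false
  }
  // Accept both boolean and string spellings in the configuration map.
  const fromConfiguration = String(raw).toLowerCase() === 'true'
  return fromConfiguration !== options.enableAutoCommit
}
```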
For new services that need explicit processing guarantees, start with manual commits:
const consumer = client.createConsumer({
groupId: 'orders-worker',
enableAutoCommit: false,
configuration: {
'auto.offset.reset': 'earliest',
'enable.partition.eof': false,
},
})
ProducerConfiguration
const producer = client.createProducer({
autoFlush: true,
queueTimeout: 5000,
configuration: {
'compression.type': 'lz4',
},
})
| Option          | Type                  | Description                                             |
| --------------- | --------------------- | ------------------------------------------------------- |
| autoFlush | boolean | When enabled, send() waits for delivery confirmation. |
| queueTimeout | number | Queue timeout in milliseconds. |
| configuration | Record<string, any> | Additional librdkafka producer settings. |
Producer settings are workload-specific. A low-latency producer might prefer smaller buffering windows, while a throughput-oriented producer can allow librdkafka to coalesce more work:
const producer = client.createProducer({
autoFlush: false,
queueTimeout: 5000,
configuration: {
acks: 'all',
'compression.type': 'lz4',
'queue.buffering.max.ms': 10,
'queue.buffering.max.messages': 100000,
},
})
TopicPartitionConfig
| Option | Type | Description |
| ----------------- | ------------------- | ------------------------------------------------ |
| topic | string | Topic name. |
| allOffsets | OffsetModel | Offset model applied to all assigned partitions. |
| partitionOffset | PartitionOffset[] | Partition-specific offset model. |
| createTopic | boolean | Create the topic before subscribing. |
| numPartitions | number | Partition count when creating a topic. |
| replicas | number | Replica count when creating a topic. |
Offset models:
{
position: 'Beginning'
}
{
position: 'End'
}
{
position: 'Stored'
}
{
position: 'Invalid'
}
{
offset: 42
}
Type-Only Literals
In v4, these names are TypeScript-only exports:
import type { CommitMode, KafkaEventName, PartitionPosition, SecurityProtocol } from 'kafka-crab-js'
Use literal values at runtime:
const commitMode: CommitMode = 'Sync'
const securityProtocol: SecurityProtocol = 'Plaintext'
OpenTelemetry
OpenTelemetry support lives in the separate kafka-crab-js-otel package. The core package emits diagnostic-channel
events when diagnostics !== false; the OTEL package subscribes to those events.
Install the instrumentation package and OpenTelemetry dependencies:
npm install kafka-crab-js-otel @opentelemetry/api @opentelemetry/sdk-node
Enable instrumentation before creating KafkaClient:
import { KafkaClient } from 'kafka-crab-js'
import { enableOtelInstrumentation, endSpan } from 'kafka-crab-js-otel'
enableOtelInstrumentation({
metrics: { enabled: true },
})
const client = new KafkaClient({
brokers: 'localhost:9092',
clientId: 'orders-worker',
diagnostics: true,
})
const consumer = client.createConsumer({ groupId: 'orders-worker' })
await consumer.subscribe('orders')
const message = await consumer.recv()
if (message) {
try {
await processOrder(message.payload)
} finally {
endSpan(message)
}
}
See the OpenTelemetry package README for full tracing and metrics setup.
Performance Benchmarks
The repository includes a benchmark suite that compares kafka-crab-js with KafkaJS and @platformatic/kafka. The
latest captured run in BENCHMARKS.md was recorded on May 25, 2026 on a MacBook Pro with Apple M1,
Node.js v24.16.0, and a local three-broker apache/kafka:4.0.0 cluster.
From the repository root:
podman compose up -d
cd benchmarks/kafka
# Set up benchmark data. Requires Kafka running locally.
KAFKA_BROKERS=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 vp run setup:consumer
# Run the default isolated-process memory benchmark.
KAFKA_BROKERS=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 vp run benchmark
The default benchmark runs one child Node.js process per selected scenario. This avoids carrying V8 heap pages, native allocator arenas, sockets, librdkafka state, and Kafka metadata from one client into the next client's measurement.
Consumer Benchmark Snapshot
The captured run below is equivalent to:
cd benchmarks/kafka
KAFKA_BROKERS=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 vp run benchmark
The run uses the default isolated-process lifecycle-memory benchmark mode.
Throughput
| Rank | Scenario | Result | Relative |
| ---: | ----------------------------------- | --------------------: | -------: |
| 1 | kafka-crab-js v4 (stream, batch) | 1,167,396.71 op/sec | 100.0% |
| 2 | @platformatic/kafka | 653,355.35 op/sec | 56.0% |
| 3 | kafka-crab-js v4 (stream, serial) | 558,968.71 op/sec | 47.9% |
| 4 | KafkaJS (eachBatch) | 478,411.67 op/sec | 41.0% |
| 5 | KafkaJS (eachMessage) | 431,519.93 op/sec | 37.0% |
| 6 | KafkaJS (eachMessage, concurrent) | 427,265.29 op/sec | 36.6% |
kafka-crab-js v4 (stream, batch) is the fastest scenario in this run. It is about 78.7% faster than
@platformatic/kafka and about 144.0% faster than KafkaJS (eachBatch). Its tolerance was +/- 4.40%, so the exact
gap should still be treated as a benchmark snapshot rather than a universal constant.
For message-by-message consumption, kafka-crab-js v4 (stream, serial) is ahead of both KafkaJS eachMessage scenarios
while using much less lifecycle memory. In this run it is about 29.5% faster than KafkaJS (eachMessage) and about
30.8% faster than the concurrent KafkaJS eachMessage scenario.
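The "faster than" percentages above follow from the throughput table as (candidate / baseline - 1) * 100. A short sketch of that arithmetic, using a hypothetical percentFaster helper and the op/sec values from the table:

```typescript
// Relative speedup of candidate over baseline, in percent.
function percentFaster(candidateOpsPerSec: number, baselineOpsPerSec: number): number {
  return (candidateOpsPerSec / baselineOpsPerSec - 1) * 100
}

const batchVsPlatformatic = percentFaster(1_167_396.71, 653_355.35).toFixed(1) // '78.7'
const batchVsKafkaJsBatch = percentFaster(1_167_396.71, 478_411.67).toFixed(1) // '144.0'
const serialVsEachMessage = percentFaster(558_968.71, 431_519.93).toFixed(1) // '29.5'
const serialVsConcurrent = percentFaster(558_968.71, 427_265.29).toFixed(1) // '30.8'
```

The same helper can be applied to your own runs to check whether a gap is larger than the reported tolerance.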
Memory And GC
| Scenario | RSS delta | Peak heap | GC time | GC share | Notes |
| ----------------------------------- | ----------: | ---------: | ---------: | -------: | ------------------------------------------ |
| kafka-crab-js v4 (stream, serial) | 64.7 MiB | 14.6 MiB | 30.46 ms | 3.41% | Lowest lifecycle RSS and heap. |
| kafka-crab-js v4 (stream, batch) | 106.0 MiB | 29.5 MiB | 21.69 ms | 5.06% | Best throughput, efficiency, and GC time. |
| KafkaJS (eachMessage, concurrent) | 178.3 MiB | 72.2 MiB | 83.77 ms | 7.16% | Concurrency did not improve this workload. |
| KafkaJS (eachMessage) | 186.3 MiB | 91.6 MiB | 90.15 ms | 7.78% | Highest peak heap and max GC pause. |
| KafkaJS (eachBatch) | 193.4 MiB | 83.8 MiB | 63.05 ms | 6.03% | KafkaJS batch baseline. |
| @platformatic/kafka | 216.0 MiB | 87.4 MiB | 74.27 ms | 9.71% | Fastest non-crab competitor. |
Lifecycle memory includes module loading, client creation, subscription, consumption, cleanup, and retained RSS after cleanup. GC metrics use the narrower first-to-last-message window used for throughput.
The most important memory result is efficiency, not just raw RSS. kafka-crab-js v4 (stream, batch) delivered
11016 op/sec/MiB, the best score in the run. kafka-crab-js v4 (stream, serial) delivered 8635 op/sec/MiB, ahead
of @platformatic/kafka at 3025 op/sec/MiB and KafkaJS (eachBatch) at 2474 op/sec/MiB.
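The efficiency score is throughput divided by RSS delta. The sketch below recomputes it from the rounded table values with a hypothetical opsPerSecPerMiB helper, so results can differ from the quoted scores by a fraction of a percent.

```typescript
// Throughput per MiB of lifecycle RSS delta.
function opsPerSecPerMiB(opsPerSec: number, rssDeltaMiB: number): number {
  return opsPerSec / rssDeltaMiB
}

const batchEfficiency = opsPerSecPerMiB(1_167_396.71, 106.0) // roughly 11,000
const serialEfficiency = opsPerSecPerMiB(558_968.71, 64.7) // roughly 8,600
const platformaticEfficiency = opsPerSecPerMiB(653_355.35, 216.0) // roughly 3,000
const kafkaJsBatchEfficiency = opsPerSecPerMiB(478_411.67, 193.4) // roughly 2,500
```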
Benchmark Interpretation
- Use kafka-crab-js v4 (stream, batch) when raw throughput or memory efficiency is the priority.
- Use kafka-crab-js v4 (stream, serial) when message-by-message processing needs the lowest lifecycle RSS and heap pressure.
- Compare batch scenarios with batch scenarios and message scenarios with message scenarios.
- @platformatic/kafka is the strongest non-crab throughput competitor in this run, but it used more RSS, heap, and GC time than both v4 scenarios.
- KafkaJS eachMessage concurrency did not help this workload.
Benchmark Configuration
Captured benchmark settings:
BENCHMARK_ITERATIONS=100000
BENCHMARK_RUNS=5
BENCHMARK_FETCH_MIN_BYTES=1
BENCHMARK_FETCH_WAIT_MS=10
BENCHMARK_MAX_BYTES=2048
BENCHMARK_BATCH_SIZE=4096
BENCHMARK_BATCH_TIMEOUT_MS=2
BENCHMARK_KAFKAJS_EACH_MESSAGE_CONCURRENCY=3
BENCHMARK_MEMORY=1
BENCHMARK_MEMORY_SAMPLE_MS=100
BENCHMARK_MEMORY_SETTLE_MS=100
BENCHMARK_TOPIC=benchmarks
BENCHMARK_PARTITIONS=3
See BENCHMARKS.md for the latest captured run and the benchmark README for the full benchmark methodology and environment variables.
Migration Notes
v4 Type-Only Runtime Exports
CommitMode, KafkaEventName, PartitionPosition, and SecurityProtocol are no longer runtime exports from
kafka-crab-js.
Before:
import { CommitMode, SecurityProtocol } from 'kafka-crab-js'
After:
import type { CommitMode, SecurityProtocol } from 'kafka-crab-js'
const commitMode: CommitMode = 'Sync'
const securityProtocol: SecurityProtocol = 'Plaintext'
v3 OpenTelemetry Split
OpenTelemetry instrumentation moved from the core package to kafka-crab-js-otel in v3. This keeps the core package
small and makes tracing/metrics opt-in.
Troubleshooting
The Consumer Does Not Receive Existing Messages
Use an offset reset policy and a fresh consumer group:
const consumer = client.createConsumer({
groupId: `orders-worker-${Date.now()}`,
configuration: {
'auto.offset.reset': 'earliest',
},
})
Or subscribe with an explicit starting position:
await consumer.subscribe([{ topic: 'orders', allOffsets: { position: 'Beginning' } }])
The Process Hangs On Shutdown
Direct consumers should call disconnect():
await consumer.disconnect()
Node stream consumers should call destroy():
stream.destroy()
Web stream consumers should cancel the reader and disconnect the underlying consumer:
await reader.cancel()
await webConsumer.consumer.disconnect()
Localhost Resolves To IPv6 But Kafka Listens On IPv4
Set brokerAddressFamily: 'v4':
const client = new KafkaClient({
brokers: 'localhost:9092',
brokerAddressFamily: 'v4',
})
Need Advanced Kafka Settings
Pass librdkafka settings through configuration:
const client = new KafkaClient({
brokers: 'localhost:9092',
configuration: {
'socket.keepalive.enable': true,
'metadata.max.age.ms': 300000,
},
})
Consumer and producer configuration objects also accept their own configuration maps.
fetch.max.bytes Must Be At Least message.max.bytes
librdkafka validates fetch limits at consumer creation time. If you tune fetch sizes for benchmarks or production and
set message.max.bytes higher than fetch.max.bytes, the consumer will fail to start with an error similar to:
`fetch.max.bytes` must be >= `message.max.bytes`
Keep the fetch caps aligned:
const consumer = client.createConsumer({
groupId: 'orders-worker',
configuration: {
'message.max.bytes': 1_000_000,
'fetch.max.bytes': 1_048_576,
'max.partition.fetch.bytes': 1_048_576,
},
})
Benchmark Numbers Move Between Runs
Kafka benchmarks are sensitive to CPU frequency, power mode, broker state, topic data already in page cache, and V8 heap history. The benchmark package defaults to isolated child processes for memory mode to reduce cross-scenario contamination, but you should still compare multiple runs and focus on large, repeatable gaps.
Development
From the repository root:
vp install
vp run build
vp run test
vp run test:integration
vp check
Useful package-local commands:
vp run --filter kafka-crab-js build
vp run --filter kafka-crab-js test
vp run --filter kafka-crab-js test:integration
Integration tests require Kafka at localhost:9092 unless KAFKA_BROKERS is set.
License
MIT
