@qualithm/kafka-client
v0.1.13
Published
Native Apache Kafka client for JavaScript and TypeScript runtimes.
Maintainers
Readme
Kafka Client
Native Apache Kafka client for JavaScript and TypeScript runtimes. Implements the Kafka binary protocol directly for producing, consuming, and administering Kafka clusters.
Features
- Zero native dependencies — pure TypeScript binary protocol implementation
- Multi-runtime — Bun, Node.js 20+, and Deno
- Producer — batching, partitioning (murmur2/round-robin/custom), retries, idempotent mode
- Consumer — group coordination, offset management, rebalance listeners, auto-commit
- Admin — topic/partition CRUD, config describe/alter
- SASL authentication — PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
- SSL/TLS — mutual TLS support via runtime socket adapters
- Serialisation — built-in JSON/string, pluggable Avro and Protobuf via Schema Registry
- Compression — gzip, snappy, lz4, zstd
Installation
bun add @qualithm/kafka-client
# or
npm install @qualithm/kafka-clientQuick Start
import { createKafka, createNodeSocketFactory } from "@qualithm/kafka-client"
const kafka = createKafka({
config: { brokers: ["localhost:9092"], clientId: "my-app" },
socketFactory: createNodeSocketFactory()
})
await kafka.connect()
const producer = kafka.producer()
await producer.send("my-topic", [
{ key: new TextEncoder().encode("key-1"), value: new TextEncoder().encode("hello") }
])
await producer.close()
await kafka.disconnect()Usage
Producing Messages
import { createKafka, createNodeSocketFactory } from "@qualithm/kafka-client"
const kafka = createKafka({
config: { brokers: ["localhost:9092"], clientId: "my-app" },
socketFactory: createNodeSocketFactory()
})
await kafka.connect()
const producer = kafka.producer()
await producer.send("my-topic", [
{ key: new TextEncoder().encode("key-1"), value: new TextEncoder().encode("hello") }
])
await producer.close()
await kafka.disconnect()Consuming Messages
import { createKafka, createNodeSocketFactory } from "@qualithm/kafka-client"
const kafka = createKafka({
config: { brokers: ["localhost:9092"], clientId: "my-app" },
socketFactory: createNodeSocketFactory()
})
await kafka.connect()
const consumer = kafka.consumer({ groupId: "my-group" })
consumer.subscribe(["my-topic"])
await consumer.connect()
const records = await consumer.poll()
for (const record of records) {
console.log(new TextDecoder().decode(record.message.value!))
}
await consumer.close()
await kafka.disconnect()Admin Operations
const admin = kafka.admin()
await admin.createTopics({
topics: [{ name: "new-topic", numPartitions: 3, replicationFactor: 1 }],
timeoutMs: 30000
})
const topics = await admin.listTopics()Runtime Adapters
// Bun
import { createBunSocketFactory } from "@qualithm/kafka-client"
const socketFactory = createBunSocketFactory()
// Node.js
import { createNodeSocketFactory } from "@qualithm/kafka-client"
const socketFactory = createNodeSocketFactory()
// Deno
import { createDenoSocketFactory } from "@qualithm/kafka-client"
const socketFactory = createDenoSocketFactory()Compression
Register compression providers before producing or consuming compressed record batches:
import { registerCompressionProvider, createSnappyProvider } from "@qualithm/kafka-client"
import snappy from "snappy" // bring your own codec
registerCompressionProvider(createSnappyProvider(snappy))Available: gzipProvider, deflateProvider, createSnappyProvider, createLz4Provider,
createZstdProvider.
Schema Registry
import { SchemaRegistry, createAvroSerde } from "@qualithm/kafka-client"
const registry = new SchemaRegistry({ baseUrl: "http://localhost:8081" })
const serde = createAvroSerde<MyType>({ registry, subject: "my-topic-value", codec: avroCodec })
// Serialize for producing
const encoded = await serde.serialize("my-topic", myData)
// Deserialize when consuming
const decoded = await serde.deserialize("my-topic", record.message.value!)API Reference
Full API documentation is generated with TypeDoc:
bun run docs
# Output in docs/Examples
See the examples/ directory for runnable examples:
| Example | Description |
| ----------------------------------------------------- | ------------------------------- |
| basic-usage.ts | Connect, produce, and consume |
| batch-processing.ts | Batch produce and consume |
| produce-consume.ts | End-to-end produce/consume flow |
| error-handling.ts | Error handling patterns |
bun run examples/basic-usage.tsDevelopment
Prerequisites
Setup
bun installBuilding
bun run buildTesting
bun run test # unit tests
bun run test:integration # integration tests (requires a running broker)
bun run test:coverage # with coverage reportLinting & Formatting
bun run lint
bun run format
bun run typecheckBenchmarks
bun run benchPublishing
The package is automatically published to NPM when CI passes on main. Update the version in
package.json before merging to trigger a new release.
Licence
Apache-2.0
