@luckykiet/kafka
v1.0.1
Generic Kafka client wrapper around kafkajs with injectable logger. Designed for microservice reuse.
@luckykiet/kafka
Lightweight, project-agnostic Kafka client wrapper around kafkajs.
One class. Inject your own logger. Zero domain coupling.
Why
kafkajs is great but every service ends up writing the same connect /
publish / subscribe boilerplate. This package collapses that into a single
KafkaClient class with a small surface, designed to be reused across any
microservice (in this monorepo or another repo entirely).
- Single dep: kafkajs
- No logger lock-in: pass any object with info/warn/error (winston, pino, console)
- No domain types: bring your own event shapes via TypeScript generics
- Lazy connect: producer + consumers connect on demand
- Idempotent disconnect: tears down everything in one call
Install
npm install @luckykiet/kafka kafkajs
# or
yarn add @luckykiet/kafka kafkajs
Usage
Import from the /common subpath:
import { KafkaClient } from "@luckykiet/kafka/common"
Producer
import { KafkaClient } from "@luckykiet/kafka/common"

const kafka = new KafkaClient({
  clientId: "my-service",
  brokers: ["localhost:9092"],
  logger: console, // optional
})

await kafka.connectProducer()
await kafka.publish({
  topic: "orders.events",
  key: "order-123", // same key → same partition (preserves ordering)
  value: { eventType: "order.created", orderId: "order-123", total: 42 },
  headers: { eventType: "order.created" },
})
await kafka.disconnect()
Consumer
import { KafkaClient } from "@luckykiet/kafka/common"

interface OrderEvent {
  eventType: "order.created" | "order.cancelled"
  orderId: string
  total: number
}

const kafka = new KafkaClient({
  clientId: "billing-service",
  brokers: ["localhost:9092"],
})

await kafka.subscribe<OrderEvent>({
  topic: "orders.events",
  groupId: "billing-orders-consumer",
  handler: async (event, raw) => {
    console.log("got", event.eventType, "from offset", raw.offset)
    // throw to retry on the same offset
  },
})
With a custom logger (winston, pino, etc.)
The logger option accepts anything matching:
interface KafkaLogger {
  info: (msg: string, meta?: Record<string, unknown>) => void
  warn: (msg: string, meta?: Record<string, unknown>) => void
  error: (msg: string, meta?: Record<string, unknown>) => void
  debug?: (msg: string, meta?: Record<string, unknown>) => void
}
Winston loggers and pino satisfy this out of the box.
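Because the interface is structural, any object with these methods can be passed. Some loggers prefer the metadata object as the first argument (pino documents a `logger.info(obj, msg)` call form, for example). If you want metadata forwarded in that order, a thin adapter is one option. The sketch below is illustrative only: `ObjectFirstLogger` and `adaptObjectFirstLogger` are hypothetical names, not part of this package, and the in-memory logger stands in for a real one.

```typescript
// Copied from the interface above so the sketch is self-contained.
interface KafkaLogger {
  info: (msg: string, meta?: Record<string, unknown>) => void
  warn: (msg: string, meta?: Record<string, unknown>) => void
  error: (msg: string, meta?: Record<string, unknown>) => void
  debug?: (msg: string, meta?: Record<string, unknown>) => void
}

// Hypothetical shape of a logger that expects the metadata object first.
interface ObjectFirstLogger {
  info: (obj: Record<string, unknown>, msg: string) => void
  warn: (obj: Record<string, unknown>, msg: string) => void
  error: (obj: Record<string, unknown>, msg: string) => void
}

// Swap the argument order so metadata is never dropped.
function adaptObjectFirstLogger(target: ObjectFirstLogger): KafkaLogger {
  return {
    info: (msg, meta) => target.info(meta ?? {}, msg),
    warn: (msg, meta) => target.warn(meta ?? {}, msg),
    error: (msg, meta) => target.error(meta ?? {}, msg),
  }
}

// In-memory stand-in for a real logger, used to show the swap.
const logLines: string[] = []
const fakeLogger: ObjectFirstLogger = {
  info: (obj, msg) => { logLines.push(`info ${msg} ${JSON.stringify(obj)}`) },
  warn: (obj, msg) => { logLines.push(`warn ${msg} ${JSON.stringify(obj)}`) },
  error: (obj, msg) => { logLines.push(`error ${msg} ${JSON.stringify(obj)}`) },
}

const adaptedLogger = adaptObjectFirstLogger(fakeLogger)
adaptedLogger.info("producer connected", { clientId: "my-service" })
adaptedLogger.warn("no metadata supplied")
```

The adapted object can then be passed as the `logger` option in place of the original logger.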
API
new KafkaClient(options)
| Option | Type | Description |
| ------ | ---- | ----------- |
| clientId | string | Required. Kafka client id. |
| brokers | string[] | Required. List of host:port. |
| logger | KafkaLogger | Optional. Defaults to silent no-op. |
| kafka | KafkaConfig | Optional. Extra kafkajs options (SASL, SSL, retry, etc.). |
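As a rough illustration of the `kafka` option, the object below uses field names from kafkajs's `KafkaConfig` (`ssl`, `sasl`, `retry`). The values are placeholders, and it is an assumption here that the wrapper forwards these fields to kafkajs unchanged, as the table suggests.

```typescript
// Placeholder values; load real credentials from your own configuration.
const kafkaOptions = {
  ssl: true,
  sasl: {
    mechanism: "plain" as const,
    username: "my-user",
    password: "my-password",
  },
  retry: {
    retries: 5, // maximum retry attempts per call
    initialRetryTime: 300, // first backoff delay, in milliseconds
  },
}

// Assumed usage, based on the options table above:
// new KafkaClient({
//   clientId: "my-service",
//   brokers: ["broker.example.com:9093"],
//   kafka: kafkaOptions,
// })
```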
connectProducer(producerConfig?)
Lazily creates and connects a kafkajs producer. Idempotent.
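One common way to get "lazy and idempotent" behaviour is to cache the connection promise. The following is a minimal sketch of that pattern, not the package's actual source; `LazyConnector` and `Connectable` are hypothetical names, and the factory stands in for creating a kafkajs producer.

```typescript
// Hypothetical stand-in for a kafkajs producer; only connect() matters here.
interface Connectable {
  connect: () => Promise<void>
}

class LazyConnector {
  private pending: Promise<void> | null = null
  private readonly factory: () => Connectable

  constructor(factory: () => Connectable) {
    this.factory = factory
  }

  // The first call creates the resource and starts connecting. Later calls,
  // including concurrent ones, get the same promise back.
  connect(): Promise<void> {
    if (this.pending === null) {
      this.pending = this.factory().connect()
    }
    return this.pending
  }
}

let producersCreated = 0
const connector = new LazyConnector(() => {
  producersCreated += 1
  return { connect: async () => {} }
})

const firstConnect = connector.connect()
const secondConnect = connector.connect() // reuses the cached promise
```

A production version would probably also clear the cached promise when the connection attempt fails, so that a later call can retry.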
publish({ topic, key?, value, headers? })
Sends one message. value is JSON-stringified unless it's already a string
or Buffer. Throws if the producer hasn't been connected.
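The serialization rule described above can be sketched as a small pure function. This is an illustration of the stated behaviour rather than the package's code; `serializeValue` is a hypothetical name. It checks for `Uint8Array` because Node's `Buffer` is a subclass of it.

```typescript
// Strings and byte arrays pass through untouched; everything else is
// JSON-encoded. Buffer extends Uint8Array, so Buffers take the first branch.
function serializeValue(value: unknown): string | Uint8Array {
  if (typeof value === "string" || value instanceof Uint8Array) {
    return value
  }
  return JSON.stringify(value)
}

const encodedObject = serializeValue({ orderId: "order-123", total: 42 })
const passedString = serializeValue("already-encoded")
const inputBytes = new Uint8Array([1, 2, 3])
const passedBytes = serializeValue(inputBytes)
```

Passing strings through unchanged means a pre-encoded JSON string is not stringified a second time.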
subscribe<T>({ topic, groupId, handler, fromBeginning?, consumer? })
Connects a fresh consumer in the given group, subscribes to topic, and
runs handler for each message. The handler receives:
- event: T, the JSON-parsed value (or the raw string if the value is not JSON)
- raw: { topic, partition, offset, key, headers, rawValue }
Throwing from the handler triggers kafkajs's offset retry. Returns the
underlying Consumer if you need direct access.
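The "JSON-parsed value, or raw string if not JSON" behaviour amounts to a parse with a fallback. A minimal sketch, assuming the message value has already been decoded to a string (`parseValue` is a hypothetical helper, not an export of this package):

```typescript
// Try JSON first; if parsing throws, hand back the original string.
function parseValue<T>(rawValue: string): T | string {
  try {
    return JSON.parse(rawValue) as T
  } catch {
    return rawValue
  }
}

interface OrderCreated {
  eventType: string
  orderId: string
}

const parsedEvent = parseValue<OrderCreated>(
  '{"eventType":"order.created","orderId":"order-123"}'
)
const plainText = parseValue<OrderCreated>("not json at all")
```

Note that the generic parameter is a compile-time cast only: nothing here validates the payload against `T`, and a value such as `"42"` is valid JSON and parses to a number. Handlers that need guarantees should validate the event shape themselves.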
disconnect()
Disconnects the producer and every consumer. Idempotent.
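An idempotent teardown can be built by clearing the list of open connections before awaiting their shutdown. The sketch below shows that idea under simplified assumptions; `ConnectionRegistry` and `Disconnectable` are hypothetical names and the tracked objects stand in for a kafkajs producer and consumers.

```typescript
// Hypothetical stand-in for a kafkajs producer or consumer.
interface Disconnectable {
  disconnect: () => Promise<void>
}

class ConnectionRegistry {
  private open: Disconnectable[] = []

  track(resource: Disconnectable): void {
    this.open.push(resource)
  }

  // Take the current list and clear it before awaiting, so a second call
  // finds nothing left to close.
  async disconnect(): Promise<void> {
    const closing = this.open
    this.open = []
    await Promise.all(closing.map((resource) => resource.disconnect()))
  }
}

let disconnectCalls = 0
const registry = new ConnectionRegistry()
for (let i = 0; i < 3; i++) {
  registry.track({
    disconnect: async () => {
      disconnectCalls += 1
    },
  })
}

void registry.disconnect()
void registry.disconnect() // no-op: the list was already cleared
```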
Design notes
- Per-key ordering: pass a stable key to publish(). kafkajs hashes it to a partition, so all messages for one entity stay ordered.
- Consumer groups own offsets: pick groupId per logical consumer, not per process. Multiple replicas in the same group share partitions.
- Retries: kafkajs default retry policy applies on send. Handler errors re-deliver the same offset until success or rebalance.
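To make the per-key ordering note concrete, here is a toy model of key-based partitioning. It is deliberately not kafkajs's algorithm (its partitioners use a murmur2-based hash); `toyPartition` is a hypothetical function that only demonstrates the property that matters: the same key always maps to the same partition for a fixed partition count.

```typescript
// Toy string hash reduced modulo the partition count. Illustration only:
// the partition numbers will not match what kafkajs assigns.
function toyPartition(key: string, numPartitions: number): number {
  let hash = 0
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0 // keep within unsigned 32 bits
  }
  return hash % numPartitions
}

const partitionCount = 6
const firstSend = toyPartition("order-123", partitionCount)
const secondSend = toyPartition("order-123", partitionCount)
const otherKey = toyPartition("order-456", partitionCount)
```

The mapping depends on the partition count, so ordering for a key holds only while the topic's partition count stays the same.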
License
MIT
