@abinashpatri/kafka
v1.0.1
Production-grade Kafka eventing helpers with typed APIs, retry/DLQ flows, and scoped clients for multi-service runtimes.
Install
npm install @abinashpatri/kafka
Compatibility
- Node.js: >=18
- Module formats: CommonJS and ESM
- Type support: bundled .d.ts
Import Patterns
Use either root namespace imports or subpath imports.
// Root namespace import
import { kafka } from "@abinashpatri/kafka";
// Subpath import
import * as kafkaApi from "@abinashpatri/kafka/kafka";
Feature Overview
- Connect/disconnect producer client.
- Typed publish and consume helpers.
- Retry topic helpers (<topic>.retry.<n>).
- DLQ helper (<topic>.dlq).
- Consumer lifecycle control (stop, disconnect).
- Retry backoff + jitter controls.
- Scoped client factory for non-shared state.
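The retry and DLQ topic naming above can be sketched as plain functions. These are hypothetical re-implementations of the documented convention (<topic>.retry.<n> and <topic>.dlq), not the library's own getRetryTopic/getDLQTopic internals:

```typescript
// Hypothetical sketch of the documented topic-naming convention;
// the library ships its own getRetryTopic/getDLQTopic helpers.
function getRetryTopic(baseTopic: string, retryCount: number): string {
  return `${baseTopic}.retry.${retryCount}`;
}

function getDLQTopic(baseTopic: string): string {
  return `${baseTopic}.dlq`;
}

console.log(getRetryTopic("user.created", 2)); // user.created.retry.2
console.log(getDLQTopic("user.created"));      // user.created.dlq
```

Keeping retry topics numbered per attempt lets each retry stage have its own consumer lag and delay without blocking the base topic.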
Quick Start (TypeScript)
import { kafka } from "@abinashpatri/kafka";
type UserCreatedEvent = {
eventId: string;
userId: string;
email: string;
createdAt: string;
};
await kafka.connect({
clientId: "user-service",
brokers: ["localhost:9092"],
});
await kafka.publish<UserCreatedEvent>({
topic: "user.created",
key: "user_123",
message: {
eventId: "evt_1",
userId: "user_123",
email: "[email protected]",
createdAt: new Date().toISOString(),
},
});
const kafkaConsumer = await kafka.consume<UserCreatedEvent>({
topic: "user.created",
groupId: "analytics-group",
retryLimit: 5,
retryBaseDelayMs: 500,
retryBackoffMultiplier: 2,
retryMaxDelayMs: 30000,
retryJitterMs: 250,
handler: async (event) => {
console.log("analytics processing", event.userId);
},
});
Quick Start (JavaScript)
const { kafka } = require("@abinashpatri/kafka");
async function runKafka() {
await kafka.connect({
clientId: "order-service",
brokers: ["localhost:9092"],
});
await kafka.publish({
topic: "order.created",
key: "order_1",
message: {
eventId: "evt_100",
orderId: "order_1",
total: 129.99,
createdAt: new Date().toISOString(),
},
});
return kafka.consume({
topic: "order.created",
groupId: "fulfillment-group",
retryLimit: 5,
retryBaseDelayMs: 500,
retryBackoffMultiplier: 2,
retryMaxDelayMs: 30000,
retryJitterMs: 250,
handler: async (event) => {
console.log("fulfillment:", event.orderId);
},
});
}
runKafka().catch(console.error);
API Reference
Root export
kafka namespace
Kafka API
- kafka.connect({ clientId, brokers })
- kafka.disconnect()
- kafka.publish({ topic, message, key?, headers?, client? })
- kafka.consume({ topic, groupId, handler, retryLimit?, retryBaseDelayMs?, retryBackoffMultiplier?, retryMaxDelayMs?, retryJitterMs?, client? })
- kafka.createKafkaClient()
- kafka.createScopedKafkaClient()
- kafka.sendToDLQ(baseTopic, message)
- kafka.getRetryTopic(baseTopic, retryCount)
- kafka.getDLQTopic(baseTopic)
- kafka.getRetryCount(headers)
- kafka.buildRetryHeaders(retryCount)
- kafka.isProcessed(id) / kafka.markProcessed(id) / kafka.resetProcessedState()
Kafka consumer control returned by consume():
- stop()
- disconnect()
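The retry timing options on consume() (retryBaseDelayMs, retryBackoffMultiplier, retryMaxDelayMs, retryJitterMs) describe a capped exponential backoff. A standalone sketch of that schedule, under the assumption that the multiplier compounds per attempt and jitter is additive; this is not the library's internal code:

```typescript
// Hypothetical sketch: capped exponential backoff with additive jitter,
// mirroring the retry* options shown on consume(). Assumed semantics.
function retryDelayMs(
  attempt: number,      // 1-based retry attempt
  baseDelayMs: number,
  multiplier: number,
  maxDelayMs: number,
  jitterMs: number
): number {
  // Exponential growth, clamped at the configured ceiling.
  const backoff = Math.min(baseDelayMs * multiplier ** (attempt - 1), maxDelayMs);
  // Random jitter spreads out retries from many failing consumers.
  return backoff + Math.random() * jitterMs;
}

// With the Quick Start values (500 ms base, x2 multiplier, 30 s cap),
// attempts 1..5 back off 500, 1000, 2000, 4000, 8000 ms plus jitter.
```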
Reliability Semantics
- Delivery is at-least-once, so duplicate deliveries are possible.
- Handlers should be idempotent.
- Failed messages are republished to retry topics, then routed to the DLQ once the retry limit is exhausted.
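Because delivery is at-least-once, the same event can reach a handler twice. A minimal deduplication sketch with an in-memory processed-ID set, similar in spirit to the isProcessed/markProcessed helpers; the names and behavior here are illustrative, and a production setup would back the set with durable storage:

```typescript
// Hypothetical dedupe sketch: an in-memory Set does not survive restarts,
// so real deployments would use durable storage (e.g. a database or Redis).
const processedIds = new Set<string>();

function handleOnce(eventId: string, handler: () => void): boolean {
  if (processedIds.has(eventId)) return false; // duplicate delivery: skip
  handler();                  // run the business logic first...
  processedIds.add(eventId);  // ...then record the id as processed
  return true;
}

let count = 0;
handleOnce("evt_1", () => { count += 1; }); // first delivery: handler runs
handleOnce("evt_1", () => { count += 1; }); // duplicate: skipped
console.log(count); // 1
```

Marking the id only after the handler succeeds means a crash mid-handler leads to a reprocess rather than a lost event, which matches at-least-once semantics.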
Graceful Shutdown
On process shutdown, use the consumer control returned by consume() and disconnect the shared client.
import { kafka } from "@abinashpatri/kafka";
const kafkaControl = await kafka.consume({
topic: "user.created",
groupId: "worker",
handler: async () => {},
});
process.on("SIGTERM", async () => {
await Promise.all([kafkaControl.disconnect(), kafka.disconnect()]);
process.exit(0);
});
License
MIT License.
