@tamasha/kafka-connection
KafkaJS connection manager for microservice architecture with connection instances.
- Initialize a connection with await KafkaConnection.init({...}) - returns a connection instance
- Use produce() and subscribe() methods on the connection instance
- Supports multiple connection instances for different microservices
- Safe cleanup with the destroy() method
Install
npm install @tamasha/kafka-connection
Basic Usage
import { KafkaConnection } from "@tamasha/kafka-connection";
// Initialize connection - returns a connection instance
const kafka = await KafkaConnection.init({
brokers: ["localhost:9092"],
clientId: "my-app",
});
// Produce messages
await kafka.produce({
topic: "events",
messages: [
{
key: "user-123",
value: JSON.stringify({ userId: "user-123", event: "login" }),
},
],
});
// Subscribe to topics
await kafka.subscribe(
{
groupId: "my-consumer-group",
},
"events",
async ({ topic, partition, message }) => {
const data = JSON.parse(message.value?.toString() || "{}");
console.log("Received:", data);
// Process message...
}
);
// Cleanup
await kafka.destroy();
Microservice Architecture
Each microservice can create its own connection instance:
// User Service
class UserService {
private constructor(private kafka: any) {}
static async create() {
const kafka = await KafkaConnection.init({
name: "user-service",
brokers: ["localhost:9092"],
clientId: "user-service",
});
return new UserService(kafka);
}
async publishUserEvent(event: any) {
await this.kafka.produce({
topic: "user-events",
messages: [{ key: event.userId, value: JSON.stringify(event) }],
});
}
async startConsuming() {
await this.kafka.subscribe(
{ groupId: "user-service-consumer" },
"order-events",
async ({ message }) => {
// Process order events...
}
);
}
async shutdown() {
await this.kafka.destroy();
}
}
// Order Service
class OrderService {
private constructor(private kafka: any) {}
static async create() {
const kafka = await KafkaConnection.init({
name: "order-service",
brokers: ["localhost:9092"],
clientId: "order-service",
});
return new OrderService(kafka);
}
async publishOrderEvent(event: any) {
await this.kafka.produce({
topic: "order-events",
messages: [{ key: event.orderId, value: JSON.stringify(event) }],
});
}
async shutdown() {
await this.kafka.destroy();
}
}
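For reference, here is one way the services above might be wired together at startup. This main() entry point and the SIGTERM handling are illustrative assumptions, not part of the package:
// Hypothetical entry point that composes the services defined above
async function main() {
  const userService = await UserService.create();
  const orderService = await OrderService.create();

  // Begin consuming order events in the user service
  await userService.startConsuming();

  // Publish an example order event
  await orderService.publishOrderEvent({ orderId: "order-456", status: "created" });

  // Close both Kafka connections on shutdown
  process.once("SIGTERM", async () => {
    await userService.shutdown();
    await orderService.shutdown();
    process.exit(0);
  });
}

main().catch(console.error);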
Connection Management
Named Connections
// Create named connections
const userKafka = await KafkaConnection.init({
name: "user-service",
brokers: ["localhost:9092"],
});
const orderKafka = await KafkaConnection.init({
name: "order-service",
brokers: ["localhost:9092"],
});
// Later, retrieve the same connection
const sameKafka = KafkaConnection.getConnection("user-service");
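Connections are also cached by name (see Notes below), so calling init() again with an existing name returns the cached instance instead of opening a new connection. A quick illustration:
// Re-running init() with the name "user-service" returns the connection created above
const cachedKafka = await KafkaConnection.init({
  name: "user-service",
  brokers: ["localhost:9092"],
});
console.log(cachedKafka === userKafka); // true - same instance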
Default Connection
// If no name is provided, uses "default"
const kafka = await KafkaConnection.init({
brokers: ["localhost:9092"],
});
// Retrieve default connection
const defaultKafka = KafkaConnection.getConnection("default");
Cleanup
// Destroy specific connection
await KafkaConnection.destroyConnection("user-service");
// Destroy all connections
await KafkaConnection.destroyAll();
Advanced Features
Custom Producer Configuration
const kafka = await KafkaConnection.init({
brokers: ["localhost:9092"],
defaultProducerConfig: {
idempotent: true,
maxInFlightRequests: 5,
},
});
// Or specify per message
await kafka.produce(
{
topic: "events",
messages: [{ key: "test", value: "test" }],
},
{
idempotent: false,
}
);
Batch Processing
// Produce multiple messages
await kafka.produceBatch([
{
topic: "events",
messages: [{ key: "1", value: "message1" }],
},
{
topic: "events",
messages: [{ key: "2", value: "message2" }],
},
]);
// Subscribe with batch handler
await kafka.subscribe(
{ groupId: "batch-consumer" },
"events",
async ({ batch }) => {
for (const message of batch.messages) {
// Process each message in batch
}
},
{ useBatch: true }
);
Multiple Topics
await kafka.subscribe(
{ groupId: "multi-topic-consumer" },
["topic1", "topic2", "topic3"],
async ({ topic, message }) => {
console.log(`Received from ${topic}:`, message.value?.toString());
}
);
Notes
- Each connection instance manages its own producers and consumers
- Producers are lazily initialized and connected when first used
- Consumers must be explicitly subscribed using subscribe()
- Call destroy() during graceful shutdown to ensure all connections are closed (see the shutdown sketch below)
- Connection instances are cached by name - calling init() with the same name returns the existing instance
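A minimal graceful-shutdown sketch for the destroy() note above; the signal names and exit handling are assumptions about the host application, not part of the package:
import { KafkaConnection } from "@tamasha/kafka-connection";

// Close every cached connection before the process exits
async function shutdownKafka(signal: string) {
  console.log(`Received ${signal}, closing Kafka connections...`);
  await KafkaConnection.destroyAll();
  process.exit(0);
}

process.once("SIGTERM", () => shutdownKafka("SIGTERM"));
process.once("SIGINT", () => shutdownKafka("SIGINT"));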
