@eventix/kafka-client
v1.0.1
Kafka client library for Eventix services
A functional TypeScript Kafka client library for Eventix services, built on top of KafkaJS.
Features
- Functional Programming: No classes, pure functions with composable APIs
- TypeScript: Full type safety with TypeScript interfaces
- Comprehensive: Producer, Consumer, and Admin operations
- Easy to Use: Simple, intuitive API with sensible defaults
- Environment-based Configuration: Supports environment variables for configuration
Installation
pnpm install @eventix/kafka-client
Usage
Creating a Kafka Client
import {
  createKafkaClient,
  createServiceKafkaClient,
} from "@eventix/kafka-client";
// Create with default configuration
const defaultKafka = createKafkaClient();
// Create with custom configuration
const customKafka = createKafkaClient({
  clientId: "my-service",
  brokers: ["localhost:9092"],
});
// Create for a specific service (uses KAFKA_BROKERS env variable)
const kafka = createServiceKafkaClient("auth-api");
Producer
import {
  createProducer,
  connectProducer,
  sendMessage,
  disconnectProducer,
} from "@eventix/kafka-client";
// Manual connection management
const producer = createProducer(kafka, {
  idempotent: true,
  retry: { retries: 10 },
});
await connectProducer(producer);
await sendMessage(producer, "my-topic", {
  userId: "123",
  action: "user-created",
});
await disconnectProducer(producer);
// Or use auto-connect helper
import { sendMessageWithAutoConnect } from "@eventix/kafka-client";
await sendMessageWithAutoConnect(kafka, "my-topic", {
  userId: "123",
  action: "user-created",
});
Consumer
import {
  createConsumer,
  connectConsumer,
  subscribeToTopics,
  startConsuming,
} from "@eventix/kafka-client";
const consumer = createConsumer(kafka, "my-consumer-group");
await connectConsumer(consumer);
await subscribeToTopics(consumer, ["my-topic"], { fromBeginning: true });
await startConsuming(consumer, async ({ topic, partition, message }) => {
  const value = message.value?.toString();
  console.log(`Received: ${value} from ${topic}`);
  // Process message...
});
// Or use the convenience function
import { createAndStartConsumer } from "@eventix/kafka-client";
// Named differently from `consumer` above so both examples can share a file
const startedConsumer = await createAndStartConsumer(
  kafka,
  "my-consumer-group",
  ["my-topic"],
  async ({ topic, partition, message }) => {
    // Handle message
  },
  {}, // consumer options
  { fromBeginning: true } // subscribe options
);
Admin Operations
import {
  createAdmin,
  connectAdmin,
  createTopics,
  listTopics,
  disconnectAdmin,
} from "@eventix/kafka-client";
const admin = createAdmin(kafka);
await connectAdmin(admin);
// Create topics
await createTopics(admin, [
  {
    topic: "my-topic",
    numPartitions: 3,
    replicationFactor: 1,
  },
]);
// List all topics
const topics = await listTopics(admin);
await disconnectAdmin(admin);
// Or use the convenience function
import { withAdminClient } from "@eventix/kafka-client";
await withAdminClient(kafka, async (admin) => {
  await createTopics(admin, [{ topic: "my-topic", numPartitions: 3 }]);
});
API Reference
Connection
- createKafkaClient(options?) - Create a Kafka client instance
- createServiceKafkaClient(serviceName) - Create a service-specific client
- getDefaultConfig() - Get default configuration
Producer
- createProducer(kafka, options?) - Create a producer instance
- connectProducer(producer) - Connect producer to cluster
- disconnectProducer(producer) - Disconnect producer
- sendMessage(producer, topic, data, options?) - Send a single message
- sendBulkMessages(producer, topic, messages, options?) - Send multiple messages
- createConnectedProducer(kafka, options?) - Create and connect producer
- sendMessageWithAutoConnect(kafka, topic, data, producerOptions?, sendOptions?) - Send with auto-connection
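The README does not say how sendMessage and sendBulkMessages turn their data argument into Kafka messages. As a rough sketch only, assuming payloads are JSON-serialized into the topic/messages record shape that KafkaJS producers accept, the mapping might look like the following. The names OutgoingMessage, ProducerRecordSketch, and toProducerRecord are hypothetical and are not part of this package.

```typescript
// Hypothetical shapes mirroring the KafkaJS record format ({ topic, messages }).
interface OutgoingMessage {
  key?: string;
  value: string;
}

interface ProducerRecordSketch {
  topic: string;
  messages: OutgoingMessage[];
}

// Assumption: each payload is serialized with JSON.stringify before sending.
// The optional key, when given, is attached to every message in the record.
function toProducerRecord(
  topic: string,
  items: unknown[],
  key?: string
): ProducerRecordSketch {
  return {
    topic,
    messages: items.map((item) => {
      const value = JSON.stringify(item);
      return key === undefined ? { value } : { key, value };
    }),
  };
}

// One record carrying a single JSON-encoded payload.
const sampleRecord = toProducerRecord("my-topic", [
  { userId: "123", action: "user-created" },
]);
```

If the library serializes differently (for example with Avro or a schema registry), the consumer side has to decode accordingly, so it is worth confirming against the package source.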
Consumer
- createConsumer(kafka, groupId, options?) - Create a consumer instance
- connectConsumer(consumer) - Connect consumer to cluster
- disconnectConsumer(consumer) - Disconnect consumer
- subscribeToTopics(consumer, topics, options?) - Subscribe to topics
- startConsuming(consumer, messageHandler) - Start consuming messages
- startBatchConsuming(consumer, batchHandler) - Start consuming in batches
- createAndStartConsumer(kafka, groupId, topics, messageHandler, ...) - All-in-one consumer setup
- pauseTopics(consumer, topics) - Pause consumption
- resumeTopics(consumer, topics) - Resume consumption
- seekToOffset(consumer, topic, partition, offset) - Seek to specific offset
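Message handlers receive message.value as raw bytes that may be null, so most handlers start by decoding it. The helper below is a minimal sketch of that step, assuming producers send JSON-encoded payloads. decodeJsonValue, RawValue, and UserEvent are hypothetical names introduced for illustration, not exports of this package.

```typescript
// Structural stand-in for the Buffer-like value a handler receives as message.value.
type RawValue = { toString(): string } | null | undefined;

// Assumption: payloads are JSON. Returns undefined for empty or malformed values
// so the caller can decide whether to skip the message or route it elsewhere.
function decodeJsonValue<T>(value: RawValue): T | undefined {
  if (value === null || value === undefined) {
    return undefined; // empty value, e.g. a tombstone record
  }
  try {
    return JSON.parse(value.toString()) as T;
  } catch {
    return undefined; // payload was not valid JSON
  }
}

interface UserEvent {
  userId: string;
  action: string;
}

// A plain string stands in here for the raw value delivered to a handler.
const decodedEvent = decodeJsonValue<UserEvent>(
  '{"userId":"123","action":"user-created"}'
);
```

Inside a handler passed to startConsuming, this would be called as decodeJsonValue<UserEvent>(message.value). Note that the cast to T is unchecked, so a runtime validator is still advisable for untrusted topics.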
Admin
- createAdmin(kafka) - Create an admin client
- connectAdmin(admin) - Connect admin client
- disconnectAdmin(admin) - Disconnect admin client
- createTopics(admin, topics) - Create topics
- deleteTopics(admin, topics) - Delete topics
- listTopics(admin) - List all topics
- describeTopics(admin, topicNames) - Get topic metadata
- getClusterMetadata(admin) - Get cluster information
- createTopicIfNotExists(admin, topicConfig) - Create topic if it doesn't exist
- createConnectedAdmin(kafka) - Create and connect admin client
- withAdminClient(kafka, operation) - Execute operation with auto-connection
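withAdminClient is described as running an operation with auto-connection. The package's actual implementation is not shown in this README. One common way to structure such a helper is a connect / try / finally wrapper, sketched below against a hypothetical Connectable interface and an in-memory fake rather than a real admin client.

```typescript
// Hypothetical minimal client shape; a real admin client exposes more methods.
interface Connectable {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
}

// Connects, runs the operation, and always disconnects afterwards,
// even when the operation rejects.
async function withConnection<C extends Connectable, R>(
  client: C,
  operation: (client: C) => Promise<R>
): Promise<R> {
  await client.connect();
  try {
    return await operation(client);
  } finally {
    await client.disconnect();
  }
}

// In-memory fake used only to show the call order.
const connectionLog: string[] = [];
const fakeAdmin = {
  connect: async (): Promise<void> => {
    connectionLog.push("connect");
  },
  disconnect: async (): Promise<void> => {
    connectionLog.push("disconnect");
  },
  listTopics: async (): Promise<string[]> => ["my-topic"],
};

const topicsPromise = withConnection(fakeAdmin, (a) => a.listTopics());
```

The finally block is the point of the pattern: callers cannot forget to disconnect, and a failing operation does not leak the connection.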
Environment Variables
- KAFKA_BROKERS - Comma-separated list of Kafka brokers (default: localhost:9092)
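To make the KAFKA_BROKERS format concrete, here is a minimal sketch of how a comma-separated broker list with a localhost:9092 fallback could be parsed. parseBrokers is a hypothetical helper, not a function exported by this package, and the trimming and empty-entry handling are assumptions rather than documented behavior.

```typescript
const DEFAULT_BROKERS: readonly string[] = ["localhost:9092"];

// Assumption: whitespace around entries is ignored and empty entries are dropped.
// Falls back to the documented default when the variable is unset or empty.
function parseBrokers(env: Record<string, string | undefined>): string[] {
  const raw = env["KAFKA_BROKERS"];
  if (raw === undefined) {
    return [...DEFAULT_BROKERS];
  }
  const brokers = raw
    .split(",")
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
  return brokers.length > 0 ? brokers : [...DEFAULT_BROKERS];
}

// Two brokers, with stray whitespace after the comma.
const parsedBrokers = parseBrokers({
  KAFKA_BROKERS: "kafka-1:9092, kafka-2:9092",
});
```

In a Node.js service the argument would typically be process.env. Taking the environment as a parameter keeps the function pure and easy to test.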
License
ISC
