# @woovi/kafka
Kafka client for Woovi microservices. Handles JSON serialization, error handling, tracing, metrics, and graceful shutdown.
## Install

```sh
pnpm add @woovi/kafka
```

## Setup

Initialize the Kafka client once at startup:

```ts
import { createKafkaFromEnv } from '@woovi/kafka';

createKafkaFromEnv('my-service');
```

Or configure manually:
```ts
import { createKafka } from '@woovi/kafka';

createKafka({
  clientId: 'my-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-512',
    username: 'user',
    password: 'password',
  },
});
```

## Environment Variables
| Variable | Description | Default |
|----------|-------------|---------|
| KAFKA_BROKERS or KAFKA_BOOTSTRAP_SERVERS | Comma-separated broker list | localhost:9092 |
| KAFKA_SSL | Enable SSL (true/false) | false |
| KAFKA_SASL_MECHANISM | plain, scram-sha-256, or scram-sha-512 | plain |
| KAFKA_SASL_USERNAME | SASL username | - |
| KAFKA_SASL_PASSWORD | SASL password | - |
| KAFKA_GROUP_ID_SUFFIX | Suffix appended to all consumer group IDs (e.g. my-group becomes my-group-<suffix>) | - |
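For illustration, the broker-list and group-ID-suffix semantics described in the table can be sketched like this. This is an assumed reading of the documented behavior, not the package's actual parsing code:

```typescript
// Sketch only: illustrates the documented env-var semantics.
// The real parsing lives inside @woovi/kafka and may differ in detail.

const parseBrokers = (env: Record<string, string | undefined>): string[] => {
  const raw = env.KAFKA_BROKERS ?? env.KAFKA_BOOTSTRAP_SERVERS ?? 'localhost:9092';
  return raw.split(',').map((b) => b.trim()).filter(Boolean);
};

const applyGroupIdSuffix = (groupId: string, suffix?: string): string =>
  suffix ? `${groupId}-${suffix}` : groupId;

console.log(parseBrokers({ KAFKA_BROKERS: 'kafka-1:9092, kafka-2:9092' }));
console.log(applyGroupIdSuffix('my-group', 'staging')); // "my-group-staging"
```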
## Producer

```ts
import { createProducer } from '@woovi/kafka';

const producer = createProducer({ defaultTopic: 'events' });

// Publish a message (auto-connects on first call)
await producer.publish({ data: { type: 'user.created', userId: '123' } });

// Publish with key and headers
await producer.publish({
  data: { type: 'order.created', orderId: '456' },
  key: 'order-456',
  headers: { source: 'api' },
});

// Publish to a different topic
await producer.publish({
  data: { type: 'notification' },
  topic: 'notifications',
});
```

### Batch Publishing
```ts
// Multiple messages to one topic
await producer.publishBatch({
  messages: [
    { data: { id: 1 }, key: 'key-1' },
    { data: { id: 2 }, key: 'key-2' },
  ],
});

// Multiple messages to multiple topics
await producer.publishToMultipleTopics({
  messages: [
    { topic: 'users', data: { id: 1 } },
    { topic: 'events', data: { type: 'created' } },
  ],
});
```

### Producer Options
```ts
const producer = createProducer({
  defaultTopic: 'events', // Default topic for publish()
  idempotent: true,       // Exactly-once delivery (default: true)
  maxInFlightRequests: 5, // Max concurrent requests (default: 5)
});
```

## Consumer
```ts
import { createConsumer } from '@woovi/kafka';

const consumer = createConsumer({
  groupId: 'my-service-group',
  topics: ['user-events'],
});

await consumer.run({
  handler: async (message) => {
    // message.data is already parsed from JSON
    console.log(message.data);
    console.log(message.key, message.headers, message.topic, message.partition, message.offset);
  },
});
```

### Dead Letter Queue
When a message fails and `continueOnError` is `true` (the default), you can send it to a dead letter queue:

```ts
await consumer.run({
  handler: async (message) => {
    await processEvent(message.data);
  },
  onDeadLetter: async (message, error) => {
    await dlqProducer.publish({
      data: { originalMessage: message, error: error.message },
    });
  },
});
```

### Error Callback
Use `onError` in the consumer config for global error tracking (e.g. Sentry):

```ts
const consumer = createConsumer({
  groupId: 'my-service-group',
  topics: ['events'],
  onError: async (error, message) => {
    await reportToSentry(error, { topic: message.topic, offset: message.offset });
  },
});
```

### Consumer Run Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| handler | (message) => Promise<void> | required | Called for each message. message.data is already parsed JSON. |
| continueOnError | boolean | true | When true, failed messages are skipped and the consumer keeps going. When false, the batch stops on first error. |
| onDeadLetter | (message, error) => Promise<void> | - | Called when a message fails and continueOnError is true. Use this to send failed messages to a DLQ. |
| maxMessagesPerHeartbeat | number | 1 | How many messages to process per chunk before sending a heartbeat. Higher values = more throughput, but too high can cause rebalances if processing is slow. |
| partitionsConsumedConcurrently | number | 1 | Number of partitions to process in parallel. |
| processConcurrently | boolean | false | When true, messages within a chunk are processed in parallel via Promise.all. Offsets are only resolved after all messages in the chunk succeed. |
| commitAfterChunk | boolean | false | When true, offsets are explicitly committed after each chunk. Otherwise relies on auto-commit plus the batch-end commit. |
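The interaction between `maxMessagesPerHeartbeat` and `processConcurrently` can be pictured roughly as follows. This is a sketch of the semantics the table describes, not the library's actual batch loop; `Msg`, `processBatch`, and `heartbeat` are illustrative names:

```typescript
// Sketch only: messages are split into chunks of maxMessagesPerHeartbeat;
// each chunk is processed (in parallel or in order), then a heartbeat is sent.

type Msg = { offset: string };

const chunk = <T>(items: T[], size: number): T[][] => {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) out.push(items.slice(i, i + size));
  return out;
};

const processBatch = async (
  messages: Msg[],
  handler: (m: Msg) => Promise<void>,
  opts: { maxMessagesPerHeartbeat: number; processConcurrently: boolean },
  heartbeat: () => Promise<void>,
): Promise<void> => {
  for (const part of chunk(messages, opts.maxMessagesPerHeartbeat)) {
    if (opts.processConcurrently) {
      await Promise.all(part.map(handler)); // parallel within the chunk
    } else {
      for (const m of part) await handler(m); // strictly in order
    }
    await heartbeat(); // keep the consumer session alive between chunks
  }
};
```

A higher `maxMessagesPerHeartbeat` means fewer heartbeats per batch, which is why slow handlers combined with a high value risk session timeouts and rebalances.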
### Consumer Config Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| groupId | string | required | Consumer group ID. |
| topics | string[] | required | Topics to subscribe to. |
| fromBeginning | boolean | false | Start reading from the earliest offset instead of latest. |
| sessionTimeout | number | 30000 | Time in ms before a consumer is considered dead if no heartbeat is received. |
| heartbeatInterval | number | 3000 | How often in ms the consumer sends heartbeats. Also controls mid-chunk heartbeat frequency in sequential mode. |
| rebalanceTimeout | number | 60000 | Max time in ms for a rebalance to complete. |
| maxBytesPerPartition | number | - | Max bytes to fetch per partition per request. |
| onError | (error, message) => Promise<void> | - | Global error handler called on every processing failure. |
| tracing | { customAttributes?: Record<string, string \| number \| boolean> } | - | OpenTelemetry tracing config. |
| retry | { initialRetryTime: number, retries: number } | { initialRetryTime: 100, retries: 8 } | Retry strategy for the underlying KafkaJS consumer. |
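As a rough guide to the `retry` option, KafkaJS retries with exponential backoff starting from `initialRetryTime`. The sketch below assumes a plain doubling schedule and ignores the jitter and caps KafkaJS actually applies, so real timings differ:

```typescript
// Sketch only: idealized exponential backoff for the documented defaults
// ({ initialRetryTime: 100, retries: 8 }). Real KafkaJS retry adds jitter.

const retryDelays = (initialRetryTime: number, retries: number, multiplier = 2): number[] =>
  Array.from({ length: retries }, (_, i) => initialRetryTime * multiplier ** i);

console.log(retryDelays(100, 8)); // 100, 200, 400, 800, ...
```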
## Pause and Resume

```ts
await consumer.pause();           // Pause all topics
await consumer.pause(['events']); // Pause specific topics
await consumer.resume();          // Resume all topics
```

## Event Envelope
Wrap event data with standardized metadata using the event envelope pattern:

```ts
import { createEventEnvelope } from '@woovi/kafka';

const envelope = createEventEnvelope({
  eventType: 'pix-out.compliance.approved',
  operationType: 'update',
  source: 'woovi-compliance',
  data: { movementId: '123', pixOutId: '456' },
  extraMetadata: { correlationId: 'abc-123', traceId: 'xyz-789' },
});

await producer.publish({ data: envelope });
```

The envelope has the shape:
```ts
{
  metadata: {
    eventId: string;        // auto-generated UUID
    eventType: string;      // e.g. "pix-out.compliance.approved"
    eventTime: string;      // auto-generated ISO 8601 timestamp
    operationType?: string; // e.g. "insert", "update", "delete"
    source: string;         // e.g. "woovi-compliance"
    // ...any extra metadata fields
  };
  data: T;
}
```

### Parsing Event Envelopes (Consumer Side)
Use `parseEventEnvelope` to validate and unwrap envelope-wrapped messages in your consumer handlers:

```ts
import type { ConsumerMessage, EventEnvelope } from '@woovi/kafka';
import { parseEventEnvelope } from '@woovi/kafka';

const handler = async (message: ConsumerMessage<EventEnvelope<MyEvent>>) => {
  const { metadata, data } = parseEventEnvelope(message);

  console.log(metadata.eventId);   // UUID
  console.log(metadata.eventType); // e.g. "pix-out.compliance.approved"
  console.log(metadata.eventTime); // ISO 8601 timestamp
  console.log(metadata.source);    // e.g. "woovi-compliance"
  console.log(data);               // typed as MyEvent
};
```

`parseEventEnvelope` throws a descriptive error if the message does not follow the envelope pattern (missing metadata, missing data, or malformed metadata fields).
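The kind of structural validation this implies can be sketched as below. This is an assumed illustration based on the envelope shape shown above, not the library's actual validation code:

```typescript
// Sketch only: structural checks of the kind parseEventEnvelope is described
// as performing. Field names follow the envelope shape documented above.

type Envelope<T> = {
  metadata: { eventId: string; eventType: string; eventTime: string; source: string };
  data: T;
};

const assertEnvelope = <T>(value: unknown): Envelope<T> => {
  const v = value as Partial<Envelope<T>> | null;
  if (!v || typeof v !== 'object') throw new Error('message is not an object');
  if (!v.metadata || typeof v.metadata !== 'object') throw new Error('missing metadata');
  for (const field of ['eventId', 'eventType', 'eventTime', 'source'] as const) {
    if (typeof v.metadata[field] !== 'string') throw new Error(`malformed metadata: ${field}`);
  }
  if (!('data' in v)) throw new Error('missing data');
  return v as Envelope<T>;
};
```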
## Health Check

```ts
import { healthCheck } from '@woovi/kafka';

app.get('/health', async (req, res) => {
  const { healthy, error } = await healthCheck();
  res.status(healthy ? 200 : 503).json({ status: healthy ? 'ok' : 'error', error });
});
```

## Graceful Shutdown
Producers and consumers disconnect automatically on SIGTERM/SIGINT. You can add custom cleanup:

```ts
import { onShutdown } from '@woovi/kafka';

onShutdown(async () => {
  await myCleanupFunction();
});
```

## Metrics
Prometheus metrics are built in:

```ts
import { getMetrics, getMetricsContentType, enableDefaultMetrics } from '@woovi/kafka';

enableDefaultMetrics(); // optional: Node.js default metrics

app.get('/metrics', async (req, res) => {
  res.set('Content-Type', getMetricsContentType());
  res.send(await getMetrics());
});
```

### Available Metrics
| Metric | Type | Description |
|--------|------|-------------|
| kafka_consumer_message_processing_duration_seconds | Histogram | Message processing latency |
| kafka_consumer_messages_processed_total | Counter | Total messages processed |
| kafka_consumer_messages_failed_total | Counter | Failed messages |
| kafka_consumer_batch_processing_duration_seconds | Histogram | Batch processing latency |
| kafka_consumer_batch_size | Histogram | Messages per batch |
| kafka_consumer_last_message_timestamp_seconds | Gauge | Last processed timestamp |
| kafka_producer_send_duration_seconds | Histogram | Producer send latency |
| kafka_producer_messages_produced_total | Counter | Total messages produced |
## Testing

Mock KafkaJS in your test setup:

```ts
// Vitest
vi.mock('kafkajs', () => import('@woovi/kafka/test-utils'));

// Jest
jest.mock('kafkajs', () => require('@woovi/kafka/test-utils'));
```

Then use the assertion helpers:
```ts
import { kafkaAssert, kafkaAssertLength, getKafkaMessages, clearAllMocks } from '@woovi/kafka/test-utils';

beforeEach(() => clearAllMocks());

it('publishes a user event', async () => {
  await myService.createUser({ name: 'John' });

  kafkaAssert({ topic: 'user-events', message: { type: 'user.created', name: 'John' } });
  kafkaAssertLength({ topic: 'user-events', length: 1 });

  // Or access raw messages
  const messages = getKafkaMessages();
  expect(messages[0].topic).toBe('user-events');
});
```

### Asserting Event Envelopes
Use `kafkaAssertEventEnvelope` to assert messages published with the event envelope pattern:

```ts
import { kafkaAssertEventEnvelope, clearAllMocks } from '@woovi/kafka/test-utils';

beforeEach(() => clearAllMocks());

it('publishes a compliance event', async () => {
  await myService.approveCompliance({ movementId: '123' });

  kafkaAssertEventEnvelope({
    topic: 'pix-out.compliance.approved',
    eventType: 'pix-out.compliance.approved',
    source: 'woovi-compliance',
    data: { movementId: '123', pixOutId: '456' },
  });
});
```

This validates the full envelope structure (metadata with `eventId`, `eventType`, `eventTime`, `source`) and checks that `data` is a subset match.
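"Subset match" means every key you pass in `data` must be present with an equal value in the published message, while extra keys in the message are ignored. A minimal sketch of that comparison, assuming recursive key-by-key semantics (not the helper's actual implementation):

```typescript
// Sketch only: subset matching as assumed for assertion helpers like
// kafkaAssertEventEnvelope. Extra keys on the actual message are ignored.

const isSubsetMatch = (expected: unknown, actual: unknown): boolean => {
  if (typeof expected !== 'object' || expected === null) return Object.is(expected, actual);
  if (typeof actual !== 'object' || actual === null) return false;
  return Object.entries(expected).every(([k, v]) =>
    isSubsetMatch(v, (actual as Record<string, unknown>)[k]),
  );
};

console.log(isSubsetMatch({ movementId: '123' }, { movementId: '123', pixOutId: '456' })); // true
console.log(isSubsetMatch({ movementId: '999' }, { movementId: '123' })); // false
```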
## License

ISC
