npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@woovi/kafka

v1.0.14

Published

Kafka setup and utilities for Woovi microservices

Downloads

901

Readme

@woovi/kafka

Kafka client for Woovi microservices. Handles JSON serialization, error handling, tracing, metrics, and graceful shutdown.

Install

pnpm add @woovi/kafka

Setup

Initialize the Kafka client once at startup:

import { createKafkaFromEnv } from '@woovi/kafka';

createKafkaFromEnv('my-service');

Or configure manually:

import { createKafka } from '@woovi/kafka';

createKafka({
  clientId: 'my-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-512',
    username: 'user',
    password: 'password',
  },
});

Environment Variables

| Variable | Description | Default | |----------|-------------|---------| | KAFKA_BROKERS or KAFKA_BOOTSTRAP_SERVERS | Comma-separated broker list | localhost:9092 | | KAFKA_SSL | Enable SSL (true/false) | false | | KAFKA_SASL_MECHANISM | plain, scram-sha-256, or scram-sha-512 | plain | | KAFKA_SASL_USERNAME | SASL username | - | | KAFKA_SASL_PASSWORD | SASL password | - | | KAFKA_GROUP_ID_SUFFIX | Suffix appended to all consumer group IDs (e.g. my-group becomes my-group-<suffix>) | - |

Producer

import { createProducer } from '@woovi/kafka';

const producer = createProducer({ defaultTopic: 'events' });

// Publish a message (auto-connects on first call)
await producer.publish({ data: { type: 'user.created', userId: '123' } });

// Publish with key and headers
await producer.publish({
  data: { type: 'order.created', orderId: '456' },
  key: 'order-456',
  headers: { source: 'api' },
});

// Publish to a different topic
await producer.publish({
  data: { type: 'notification' },
  topic: 'notifications',
});

Batch Publishing

// Multiple messages to one topic
await producer.publishBatch({
  messages: [
    { data: { id: 1 }, key: 'key-1' },
    { data: { id: 2 }, key: 'key-2' },
  ],
});

// Multiple messages to multiple topics
await producer.publishToMultipleTopics({
  messages: [
    { topic: 'users', data: { id: 1 } },
    { topic: 'events', data: { type: 'created' } },
  ],
});

Producer Options

const producer = createProducer({
  defaultTopic: 'events',      // Default topic for publish()
  idempotent: true,            // Exactly-once delivery (default: true)
  maxInFlightRequests: 5,      // Max concurrent requests (default: 5)
});

Consumer

import { createConsumer } from '@woovi/kafka';

const consumer = createConsumer({
  groupId: 'my-service-group',
  topics: ['user-events'],
});

await consumer.run({
  handler: async (message) => {
    // message.data is already parsed from JSON
    console.log(message.data);
    console.log(message.key, message.headers, message.topic, message.partition, message.offset);
  },
});

Dead Letter Queue

When a message fails and continueOnError is true (the default), you can send it to a dead letter queue:

await consumer.run({
  handler: async (message) => {
    await processEvent(message.data);
  },
  onDeadLetter: async (message, error) => {
    await dlqProducer.publish({
      data: { originalMessage: message, error: error.message },
    });
  },
});

Error Callback

Use onError in the consumer config for global error tracking (e.g. Sentry):

const consumer = createConsumer({
  groupId: 'my-service-group',
  topics: ['events'],
  onError: async (error, message) => {
    await reportToSentry(error, { topic: message.topic, offset: message.offset });
  },
});

Consumer Run Options

| Option | Type | Default | Description | |--------|------|---------|-------------| | handler | (message) => Promise<void> | required | Called for each message. message.data is already parsed JSON. | | continueOnError | boolean | true | When true, failed messages are skipped and the consumer keeps going. When false, the batch stops on first error. | | onDeadLetter | (message, error) => Promise<void> | - | Called when a message fails and continueOnError is true. Use this to send failed messages to a DLQ. | | maxMessagesPerHeartbeat | number | 1 | How many messages to process per chunk before sending a heartbeat. Higher values = more throughput, but too high can cause rebalances if processing is slow. | | partitionsConsumedConcurrently | number | 1 | Number of partitions to process in parallel. | | processConcurrently | boolean | false | When true, messages within a chunk are processed in parallel via Promise.all. Offsets are only resolved after all messages in the chunk succeed. | | commitAfterChunk | boolean | false | When true, offsets are explicitly committed after each chunk. Otherwise relies on auto-commit plus the batch-end commit. |

Consumer Config Options

| Option | Type | Default | Description | |--------|------|---------|-------------| | groupId | string | required | Consumer group ID. | | topics | string[] | required | Topics to subscribe to. | | fromBeginning | boolean | false | Start reading from the earliest offset instead of latest. | | sessionTimeout | number | 30000 | Time in ms before a consumer is considered dead if no heartbeat is received. | | heartbeatInterval | number | 3000 | How often in ms the consumer sends heartbeats. Also controls mid-chunk heartbeat frequency in sequential mode. | | rebalanceTimeout | number | 60000 | Max time in ms for a rebalance to complete. | | maxBytesPerPartition | number | - | Max bytes to fetch per partition per request. | | onError | (error, message) => Promise<void> | - | Global error handler called on every processing failure. | | tracing | { customAttributes?: Record<string, string \| number \| boolean> } | - | OpenTelemetry tracing config. | | retry | { initialRetryTime: number, retries: number } | { initialRetryTime: 100, retries: 8 } | Retry strategy for the underlying KafkaJS consumer. |

Pause and Resume

await consumer.pause();            // Pause all topics
await consumer.pause(['events']);   // Pause specific topics
await consumer.resume();           // Resume all topics

Event Envelope

Wrap event data with standardized metadata using the event envelope pattern:

import { createEventEnvelope } from '@woovi/kafka';

const envelope = createEventEnvelope({
  eventType: 'pix-out.compliance.approved',
  operationType: 'update',
  source: 'woovi-compliance',
  data: { movementId: '123', pixOutId: '456' },
  extraMetadata: { correlationId: 'abc-123', traceId: 'xyz-789' },
});

await producer.publish({ data: envelope });

The envelope has the shape:

{
  metadata: {
    eventId: string;      // auto-generated UUID
    eventType: string;    // e.g. "pix-out.compliance.approved"
    eventTime: string;    // auto-generated ISO 8601 timestamp
    operationType?: string; // e.g. "insert", "update", "delete"
    source: string;       // e.g. "woovi-compliance"
    // ...any extra metadata fields
  },
  data: T;
}

Parsing Event Envelopes (Consumer Side)

Use parseEventEnvelope to validate and unwrap envelope-wrapped messages in your consumer handlers:

import type { ConsumerMessage, EventEnvelope } from '@woovi/kafka';
import { parseEventEnvelope } from '@woovi/kafka';

const handler = async (message: ConsumerMessage<EventEnvelope<MyEvent>>) => {
  const { metadata, data } = parseEventEnvelope(message);

  console.log(metadata.eventId);    // UUID
  console.log(metadata.eventType);  // e.g. "pix-out.compliance.approved"
  console.log(metadata.eventTime);  // ISO 8601 timestamp
  console.log(metadata.source);     // e.g. "woovi-compliance"
  console.log(data);                // typed as MyEvent
};

Throws with a descriptive error if the message does not follow the envelope pattern (missing metadata, missing data, or malformed metadata fields).

Health Check

import { healthCheck } from '@woovi/kafka';

app.get('/health', async (req, res) => {
  const { healthy, error } = await healthCheck();
  res.status(healthy ? 200 : 503).json({ status: healthy ? 'ok' : 'error', error });
});

Graceful Shutdown

Producers and consumers disconnect automatically on SIGTERM/SIGINT. You can add custom cleanup:

import { onShutdown } from '@woovi/kafka';

onShutdown(async () => {
  await myCleanupFunction();
});

Metrics

Prometheus metrics are built in.

import { getMetrics, getMetricsContentType, enableDefaultMetrics } from '@woovi/kafka';

enableDefaultMetrics(); // optional: Node.js default metrics

app.get('/metrics', async (req, res) => {
  res.set('Content-Type', getMetricsContentType());
  res.send(await getMetrics());
});

Available Metrics

| Metric | Type | Description | |--------|------|-------------| | kafka_consumer_message_processing_duration_seconds | Histogram | Message processing latency | | kafka_consumer_messages_processed_total | Counter | Total messages processed | | kafka_consumer_messages_failed_total | Counter | Failed messages | | kafka_consumer_batch_processing_duration_seconds | Histogram | Batch processing latency | | kafka_consumer_batch_size | Histogram | Messages per batch | | kafka_consumer_last_message_timestamp_seconds | Gauge | Last processed timestamp | | kafka_producer_send_duration_seconds | Histogram | Producer send latency | | kafka_producer_messages_produced_total | Counter | Total messages produced |

Testing

Mock KafkaJS in your test setup:

// Vitest
vi.mock('kafkajs', () => import('@woovi/kafka/test-utils'));

// Jest
jest.mock('kafkajs', () => require('@woovi/kafka/test-utils'));

Then use the assertion helpers:

import { kafkaAssert, kafkaAssertLength, getKafkaMessages, clearAllMocks } from '@woovi/kafka/test-utils';

beforeEach(() => clearAllMocks());

it('publishes a user event', async () => {
  await myService.createUser({ name: 'John' });

  kafkaAssert({ topic: 'user-events', message: { type: 'user.created', name: 'John' } });
  kafkaAssertLength({ topic: 'user-events', length: 1 });

  // Or access raw messages
  const messages = getKafkaMessages();
  expect(messages[0].topic).toBe('user-events');
});

Asserting Event Envelopes

Use kafkaAssertEventEnvelope to assert messages published with the event envelope pattern:

import { kafkaAssertEventEnvelope, clearAllMocks } from '@woovi/kafka/test-utils';

beforeEach(() => clearAllMocks());

it('publishes a compliance event', async () => {
  await myService.approveCompliance({ movementId: '123' });

  kafkaAssertEventEnvelope({
    topic: 'pix-out.compliance.approved',
    eventType: 'pix-out.compliance.approved',
    source: 'woovi-compliance',
    data: { movementId: '123', pixOutId: '456' },
  });
});

This validates the full envelope structure (metadata with eventId, eventType, eventTime, source) and checks that the data is a subset match.

License

ISC