@kaapi/kafka-messaging

v0.0.42

Kafka-based messaging for kaapi

Downloads

109

📦 @kaapi/kafka-messaging

@kaapi/kafka-messaging is a lightweight wrapper around kafkajs that integrates with the Kaapi framework to provide a clean and consistent message publishing and consuming interface.

It abstracts Kafka's producer/consumer logic and provides a simple interface to:

  • ✅ Publish messages (single or batch)
  • ✅ Subscribe to topics with flexible consumer group options
  • ✅ Support structured logging via Kaapi's logger
  • ✅ Handle offsets and message metadata
  • ✅ Reuse Kafka producers/consumers with race-condition protection
  • ✅ Handle errors from failed message handlers with a custom callback

✨ Features

  • Simple publish(topic, message) and publishBatch(topic, messages) APIs
  • Flexible subscribe(topic, handler, config) with custom groupId, error handling, and offset tracking
  • Singleton producer with race-condition safe initialization
  • Lazy admin initialization to minimize connections
  • KafkaJS-compatible configuration
  • Structured logging via Kaapi's ILogger
  • Typed message handling with TypeScript
  • Graceful shutdown with detailed summary

🚀 Getting Started with KafkaMessaging

This guide walks you through setting up and using the KafkaMessaging class to publish and consume messages with Apache Kafka.

Installation

npm install @kaapi/kafka-messaging kafkajs

Basic Setup

import { KafkaMessaging } from '@kaapi/kafka-messaging';

const messaging = new KafkaMessaging({
    clientId: 'my-app',
    brokers: ['localhost:9092'],
    name: 'my-service',
    address: 'service-1',
    logger: createLogger() // optional; any logger implementing Kaapi's ILogger
});

The constructor accepts a KafkaMessagingConfig object, which extends KafkaConfig from kafkajs:

| Option | Type | Description |
| ---------- | ---------------- | ------------------------------------------------------------------------- |
| brokers | string[] | List of Kafka broker addresses (e.g. ['localhost:9092']). Required. |
| clientId | string | Unique client identifier for Kafka. |
| logger | ILogger | Optional logger implementing Kaapi's ILogger interface. |
| address | string | Optional unique service address for routing and identification. |
| name | string | Optional human-readable name for service tracking/monitoring. |
| producer | ProducerConfig | Optional default KafkaJS producer configuration. |


Creating a Topic

await messaging.createTopic({
    topic: 'my-topic',
    numPartitions: 1,
    replicationFactor: 1,
}, {
    waitForLeaders: true
});

// ensure the topic is ready before publishing
const timeoutMs = 10000;
const checkIntervalMs = 200;
await messaging.waitForTopicReady('my-topic', timeoutMs, checkIntervalMs);
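The readiness wait above follows a poll-until-timeout pattern. The sketch below shows that general pattern in isolation. `pollUntil` and its `check` callback are hypothetical names used only for illustration; they are not part of @kaapi/kafka-messaging, and the package's internal implementation may differ.

```typescript
// Hypothetical helper (not part of the package): polls `check` until it
// resolves to true or until the timeout would be exceeded.
async function pollUntil(
    check: () => Promise<boolean>,
    timeoutMs = 10000,
    checkIntervalMs = 200,
): Promise<boolean> {
    const deadline = Date.now() + timeoutMs;
    while (true) {
        if (await check()) return true;
        // Stop if the next sleep would take us past the deadline.
        if (Date.now() + checkIntervalMs > deadline) return false;
        await new Promise<void>((resolve) => setTimeout(resolve, checkIntervalMs));
    }
}
```

A shorter interval detects readiness sooner at the cost of more metadata requests, which is why both values are parameters.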

Publishing Messages

Single Message

publish(topic, message) sends a message to a given Kafka topic.

await messaging.publish('my-topic', {
    userId: '123',
    action: 'login',
});

Messages can be:

  • Objects → automatically JSON-serialized
  • Strings → sent as-is
  • Buffers → sent as-is (for binary data)
  • null → sent as null (tombstone messages)
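The payload rules above can be restated as a small function. This is only a sketch of the documented behavior, not the package's actual code; `toKafkaValue` and the `Payload` type are hypothetical names. It checks for `Uint8Array` because Node's `Buffer` is a subclass of it.

```typescript
// Hypothetical restatement of the documented payload rules.
type Payload = string | Uint8Array | null | Record<string, unknown> | unknown[];

function toKafkaValue(message: Payload): string | Uint8Array | null {
    if (message === null) return null;                 // tombstone: sent as null
    if (typeof message === 'string') return message;   // strings: sent as-is
    if (message instanceof Uint8Array) return message; // Buffers: sent as-is
    return JSON.stringify(message);                    // objects: JSON-serialized
}
```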

Batch Publishing

publishBatch(topic, messages) sends multiple messages in a single request for better throughput.

await messaging.publishBatch('user-events', [
    { value: { event: 'user.created', userId: '1' } },
    { value: { event: 'user.created', userId: '2' } },
    { value: { event: 'user.updated', userId: '3' }, key: 'user-3' },
    { value: { event: 'user.deleted', userId: '4' }, headers: { priority: 'high' } },
]);

Each message in the batch can include:

  • value — the message payload (required)
  • key — optional partition key
  • partition — optional specific partition
  • headers — optional custom headers
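As an illustration of the fields above, the following sketch builds a batch in which every event carries a partition key. With Kafka's default partitioner, messages that share a key go to the same partition, which preserves their relative order. `BatchEntry`, `UserEvent`, and `toKeyedBatch` are hypothetical names; the package may export its own types for this.

```typescript
// Hypothetical shape mirroring the documented batch fields.
interface BatchEntry<T> {
    value: T;
    key?: string;
    partition?: number;
    headers?: Record<string, string>;
}

interface UserEvent {
    event: string;
    userId: string;
}

// Keys each event by its userId so that one user's events share a partition key.
function toKeyedBatch(events: UserEvent[]): BatchEntry<UserEvent>[] {
    return events.map((e) => ({ value: e, key: `user-${e.userId}` }));
}
```

The result could then be passed as the second argument to publishBatch, assuming the entry shape matches what the package expects.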

Subscribing to a Topic

subscribe(topic, handler, config?) subscribes to a Kafka topic and calls the provided handler on each message.

await messaging.subscribe('my-topic', async (message, context) => {
    console.log('Received:', message);
    console.log('Offset:', context.offset);
    console.log('Timestamp:', context.timestamp);
}, {
    fromBeginning: true
});

Subscribe Configuration

| Option | Type | Description |
| --------------- | ---------- | --------------------------------------------------------------------------- |
| groupId | string | Custom consumer group ID. Overrides auto-generated ID. |
| groupIdPrefix | string | Prefix for auto-generated group ID (default: service name). |
| fromBeginning | boolean | Start consuming from the beginning of the topic. |
| logOffsets | boolean | Log partition offsets on subscribe (adds admin overhead). Default: false. |
| onReady | function | Callback invoked when the consumer is ready. |
| onError | function | Callback invoked when a message handler throws an error. |

Consumer Group ID Resolution

The consumer group ID is resolved in this order:

  1. groupId if provided
  2. {groupIdPrefix}.{topic} if prefix provided
  3. {name}.{topic} using the service name from config
  4. group.{topic} as fallback

// Using custom group ID
await messaging.subscribe('user-events', handler, { 
    groupId: 'my-custom-consumer-group' 
});

// Using custom prefix → "analytics.user-events"
await messaging.subscribe('user-events', handler, { 
    groupIdPrefix: 'analytics' 
});
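The resolution order listed above can be written as a plain function. This is a sketch of the documented rules only; `resolveGroupId` and `GroupIdOptions` are hypothetical names and not part of the package's public API.

```typescript
// Hypothetical restatement of the documented group ID resolution order.
interface GroupIdOptions {
    groupId?: string;
    groupIdPrefix?: string;
}

function resolveGroupId(
    topic: string,
    options: GroupIdOptions = {},
    serviceName?: string,
): string {
    if (options.groupId) return options.groupId;                           // 1. explicit ID
    if (options.groupIdPrefix) return `${options.groupIdPrefix}.${topic}`; // 2. custom prefix
    if (serviceName) return `${serviceName}.${topic}`;                     // 3. service name
    return `group.${topic}`;                                               // 4. fallback
}
```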

Error Handling

Use the onError callback to handle errors from message handlers without crashing:

await messaging.subscribe('user-events', async (message) => {
    await processMessage(message); // might throw
}, {
    onError: async (error, message, context) => {
        console.error('Failed to process message:', error);
        console.error('Message:', message);
        console.error('Offset:', context.offset);
        
        // Log to external service, send to DLQ, etc.
        await alertService.notify(error);
    }
});

The onError callback receives:

  • error — the error thrown by the handler
  • message — the parsed message that failed
  • context — the message context (offset, headers, timestamp, etc.)
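To make the callback contract above concrete, here is a minimal sketch of how a handler could be wrapped so that a thrown error is routed to onError together with the message and its context. `withErrorHandler`, `Ctx`, and the type aliases are hypothetical and simplified; the package's real context object carries more fields, and its internals may work differently.

```typescript
// Simplified, hypothetical types for illustration only.
interface Ctx {
    offset: string;
}
type Handler<T> = (message: T, context: Ctx) => Promise<void> | void;
type OnError<T> = (error: unknown, message: T, context: Ctx) => Promise<void> | void;

// Wraps a handler so that failures are passed to onError instead of propagating.
function withErrorHandler<T>(handler: Handler<T>, onError: OnError<T>): Handler<T> {
    return async (message, context) => {
        try {
            await handler(message, context);
        } catch (error) {
            await onError(error, message, context);
        }
    };
}
```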

Consumer Ready Callback

await messaging.subscribe('my-topic', handler, {
    onReady: (consumer) => {
        console.log('Consumer is ready!');
        // Access the raw KafkaJS consumer if needed
    }
});

Fetching Topic Offsets

const offsets = await messaging.fetchTopicOffsets('my-topic');

offsets?.forEach((partition) => {
    console.log(`Partition ${partition.partition}: offset=${partition.offset}, high=${partition.high}, low=${partition.low}`);
});
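The high and low watermarks can be combined into a rough upper bound on how many messages a topic currently retains. The sketch below assumes the offsets are decimal strings, as they are in KafkaJS; whether this wrapper returns exactly that shape is an assumption. `PartitionOffsets` and `retainedUpperBound` are hypothetical names.

```typescript
// Assumed shape, modeled on the fields used in the example above.
interface PartitionOffsets {
    partition: number;
    offset: string;
    high: string;
    low: string;
}

// Sums (high - low) across partitions. BigInt avoids precision loss on large
// offsets. Compaction and transaction markers can make the real count smaller.
function retainedUpperBound(offsets: PartitionOffsets[] | undefined): bigint {
    if (!offsets) return BigInt(0);
    return offsets.reduce(
        (total, p) => total + (BigInt(p.high) - BigInt(p.low)),
        BigInt(0),
    );
}
```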

Graceful Shutdown

const result = await messaging.shutdown();

console.log(`Disconnected ${result.successProducers} producers`);
console.log(`Disconnected ${result.successConsumers} consumers`);
console.log(`Disconnected ${result.successAdmins} admins`);
console.log(`Errors: ${result.errorCount}`);

This will disconnect all tracked producers, consumers, and admin clients safely.

// Example: graceful shutdown on SIGTERM
process.on('SIGTERM', async () => {
    const result = await messaging.shutdown();
    console.log(`Shutdown complete: ${result.errorCount} errors`);
    process.exit(result.errorCount > 0 ? 1 : 0); // non-zero exit if any client failed to disconnect
});
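One common way to keep a shutdown from hanging on an unresponsive client is to race each disconnect against a timer. The sketch below illustrates that idea. `disconnectWithTimeout` and `Disconnectable` are hypothetical names, and this is not the package's safeDisconnect implementation.

```typescript
// Minimal interface for anything with a disconnect method.
interface Disconnectable {
    disconnect(): Promise<void>;
}

// Resolves to true if the client disconnected in time, false on timeout or error.
async function disconnectWithTimeout(
    client: Disconnectable,
    timeoutMs = 5000,
): Promise<boolean> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<'timeout'>((resolve) => {
        timer = setTimeout(() => resolve('timeout'), timeoutMs);
    });
    try {
        const outcome = await Promise.race([
            client.disconnect().then(() => 'done' as const),
            timeout,
        ]);
        return outcome === 'done';
    } catch {
        return false; // disconnect rejected
    } finally {
        if (timer !== undefined) clearTimeout(timer); // do not keep the process alive
    }
}
```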

🧱 Example Usage

// messaging.ts

import { Kaapi } from '@kaapi/kaapi';
import { KafkaMessaging } from '@kaapi/kafka-messaging';

const messaging = new KafkaMessaging({
    clientId: 'my-app',
    brokers: ['localhost:9092'],
    name: 'my-service',
    address: 'service-1'
});

/**
 * Initialize the Kaapi app with messaging
 */
const app = new Kaapi({
    port: 3000,
    host: 'localhost',
    messaging,
});

/**
 * Demonstrates how to subscribe and publish a message
 */
async function runExample(): Promise<void> {

    /**
     * Option 1: Use Kaapi app (recommended in app lifecycle)
     */
    // Publish a message
    await app.publish('my-topic', { event: 'user.created', userId: 456 });

    // Subscribe to messages
    await app.subscribe('my-topic', async (message, context) => {
        console.log('Received:', message);
        console.log('Offset:', context.offset);
    });

    /**
     * Option 2: Use messaging directly (standalone)
     */
    // Publish a message
    await messaging.publish('my-topic', { event: 'user.created', userId: 123 });

    // Subscribe with error handling
    await messaging.subscribe('my-topic', async (message, context) => {
        console.log('Received:', message);
        console.log('Offset:', context.offset);
    }, {
        fromBeginning: true,
        onError: (error, message, context) => {
            console.error('Handler failed:', error);
        }
    });

    // Batch publish
    await messaging.publishBatch('my-topic', [
        { value: { event: 'user.created', userId: 1 } },
        { value: { event: 'user.created', userId: 2 } },
    ]);
}

runExample().catch((err) => {
    console.error('❌ Messaging example failed:', err);
});

Public API Contract

The KafkaMessaging class provides a safe and resilient interface for interacting with Kafka. Developers should use the following methods to ensure proper lifecycle management, resource tracking, and graceful shutdown.

Public Methods

| Method | Purpose |
| ------------------------------------------------- | ---------------------------------------------------------------------------- |
| createProducer(config?) | Creates and connects a Kafka producer. Automatically tracked. |
| createConsumer(groupId, config?) | Creates and connects a Kafka consumer. Automatically tracked. |
| createAdmin(config?) | Creates and connects a Kafka admin client. Tracked for shutdown. |
| getProducer() | Gets or creates the singleton producer (race-condition safe). |
| publish(topic, message) | Sends a message to the specified topic. |
| publishBatch(topic, messages) | Sends multiple messages in a single batch. |
| subscribe(topic, handler, config?) | Subscribes to a topic and processes messages with the given handler. |
| fetchTopicOffsets(topic) | Fetches partition offsets for a topic. |
| createTopic(topicConfig, options?) | Creates a Kafka topic with optional validation and leader wait. |
| waitForTopicReady(topic, timeoutMs?, intervalMs?) | Waits for a topic to be ready (has partitions). |
| shutdown() | Gracefully disconnects all tracked clients. Returns a summary. |
| safeDisconnect(client, timeoutMs?) | Disconnects a Kafka client with timeout protection. |
| disconnectProducer() | Disconnects the singleton producer. |

Read-only Properties

| Property | Type | Description |
| ----------------- | ----------------------- | ---------------------------------- |
| activeProducers | ReadonlySet<Producer> | Currently tracked producers. |
| activeConsumers | ReadonlySet<Consumer> | Currently tracked consumers. |

Internal/Protected Methods

| Method | Status | Reason |
| ---------------- | --------- | ------------------------------------------------------------- |
| getKafka() | Protected | Used internally to instantiate Kafka clients. |
| getSharedAdmin() | Protected | Lazy-initialized shared admin for internal operations. |

Best Practices

  • Always use createProducer, createConsumer, or createAdmin to ensure proper tracking.
  • Use getProducer() for the singleton producer pattern (recommended for most use cases).
  • Avoid accessing the raw Kafka instance directly.
  • Call shutdown() during application teardown to release resources.
  • Use createTopic() and waitForTopicReady() in tests or dynamic topic scenarios.
  • Use the onError callback in subscribe() to handle message processing failures gracefully.
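For readers curious about what "race-condition safe" can mean for a singleton such as the one behind getProducer(), a common technique is to cache the in-flight promise rather than the finished instance, so that concurrent callers share one initialization. The sketch below shows that technique in general form. `LazySingleton` is a hypothetical class, and the package may implement its singleton differently.

```typescript
// Hypothetical generic helper: creates the value at most once, even when
// get() is called concurrently before the first creation has finished.
class LazySingleton<T> {
    private pending: Promise<T> | undefined;
    private readonly factory: () => Promise<T>;

    constructor(factory: () => Promise<T>) {
        this.factory = factory;
    }

    get(): Promise<T> {
        if (!this.pending) {
            // Cache the promise itself so later callers await the same work.
            this.pending = this.factory().catch((error) => {
                this.pending = undefined; // allow a retry after a failed attempt
                throw error;
            });
        }
        return this.pending;
    }
}
```

Caching only the resolved instance would leave a window in which two callers both see "no instance yet" and each create their own.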

🛠️ Requirements

  • Node.js 18+
  • A running Kafka instance
  • Optional: integrate into a Kaapi service lifecycle

🧪 Testing

# Run mock tests (no Kafka required)
pnpm test

# Run integration tests (requires Kafka broker)
pnpm test:integration

# Run all tests
pnpm test:all

You can run Kafka locally using Docker:

docker run -d --name kafka \
  -p 9092:9092 \
  apache/kafka:latest

📝 License

MIT