# 📦 @kaapi/kafka-messaging

v0.0.42
@kaapi/kafka-messaging is a lightweight wrapper around kafkajs that integrates with the Kaapi framework to provide a clean and consistent message publishing and consuming interface.
It abstracts Kafka's producer/consumer logic and provides a simple interface to:
- ✅ Publish messages (single or batch)
- ✅ Subscribe to topics with flexible consumer group options
- ✅ Support structured logging via Kaapi's logger
- ✅ Handle offsets and message metadata
- ✅ Reuse Kafka producers/consumers with race-condition protection
- ✅ Custom error handling for failed message handlers
## ✨ Features

- Simple `publish(topic, message)` and `publishBatch(topic, messages)` APIs
- Flexible `subscribe(topic, handler, config)` with custom `groupId`, error handling, and offset tracking
- Singleton producer with race-condition-safe initialization
- Lazy admin initialization to minimize connections
- KafkaJS-compatible configuration
- Structured logging via Kaapi's `ILogger`
- Typed message handling with TypeScript
- Graceful shutdown with detailed summary
## 🚀 Getting Started with KafkaMessaging

This guide walks you through setting up and using the `KafkaMessaging` class to publish and consume messages with Apache Kafka.
### Installation

```bash
npm install @kaapi/kafka-messaging kafkajs
```

### Basic Setup
```ts
import { KafkaMessaging } from '@kaapi/kafka-messaging';

const messaging = new KafkaMessaging({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  name: 'my-service',
  address: 'service-1',
  logger: createLogger() // optional, use Kaapi ILogger
});
```

The constructor accepts a `KafkaMessagingConfig` object, which extends `KafkaConfig` from kafkajs:
| Option     | Type             | Description                                                          |
| ---------- | ---------------- | -------------------------------------------------------------------- |
| `brokers`  | `string[]`       | List of Kafka broker addresses (e.g. `['localhost:9092']`). Required. |
| `clientId` | `string`         | Unique client identifier for Kafka.                                  |
| `logger`   | `ILogger`        | Optional logger implementing Kaapi's `ILogger` interface.            |
| `address`  | `string`         | Optional unique service address for routing and identification.      |
| `name`     | `string`         | Optional human-readable name for service tracking/monitoring.        |
| `producer` | `ProducerConfig` | Optional default KafkaJS producer configuration.                     |
### Creating a Topic

```ts
await messaging.createTopic({
  topic: 'my-topic',
  numPartitions: 1,
  replicationFactor: 1,
}, {
  waitForLeaders: true
});

// ensure the topic is ready before publishing
const timeoutMs = 10000;
const checkIntervalMs = 200;
await messaging.waitForTopicReady('my-topic', timeoutMs, checkIntervalMs);
```

### Publishing Messages
#### Single Message

`publish(topic, message)` sends a message to a given Kafka topic.

```ts
await messaging.publish('my-topic', {
  userId: '123',
  action: 'login',
});
```

Messages can be:
- Objects → automatically JSON-serialized
- Strings → sent as-is
- Buffers → sent as-is (for binary data)
- `null` → sent as `null` (tombstone messages)
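The payload rules above can be summed up in a small sketch. Note that `toKafkaValue` is an illustrative helper written for this README, not part of the library's API:

```typescript
// Illustrative sketch (not the library's actual code): applying the payload
// rules above before handing a value to KafkaJS.
function toKafkaValue(message: unknown): Buffer | string | null {
  if (message === null) return null;               // tombstone message
  if (Buffer.isBuffer(message)) return message;    // binary data, sent as-is
  if (typeof message === 'string') return message; // strings, sent as-is
  return JSON.stringify(message);                  // objects → JSON-serialized
}
```

For example, `toKafkaValue({ userId: '123' })` yields the JSON string `'{"userId":"123"}'`, while a `Buffer` passes through untouched.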
#### Batch Publishing

`publishBatch(topic, messages)` sends multiple messages in a single request for better throughput.

```ts
await messaging.publishBatch('user-events', [
  { value: { event: 'user.created', userId: '1' } },
  { value: { event: 'user.created', userId: '2' } },
  { value: { event: 'user.updated', userId: '3' }, key: 'user-3' },
  { value: { event: 'user.deleted', userId: '4' }, headers: { priority: 'high' } },
]);
```

Each message in the batch can include:
- `value` — the message payload (required)
- `key` — optional partition key
- `partition` — optional specific partition
- `headers` — optional custom headers
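Batch entries can also be built programmatically from domain events. A sketch, where the `UserEvent` shape and `toBatchMessages` helper are illustrative rather than part of the library:

```typescript
// Illustrative: map domain events to batch messages. Keying by userId means
// events for the same user land on the same partition, preserving their order.
interface UserEvent {
  event: string;
  userId: string;
}

function toBatchMessages(events: UserEvent[]) {
  return events.map((e) => ({
    value: e,                   // payload (required)
    key: e.userId,              // optional partition key
    headers: { type: e.event }, // optional custom headers
  }));
}

// Then: await messaging.publishBatch('user-events', toBatchMessages(events));
```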
### Subscribing to a Topic

`subscribe(topic, handler, config?)` subscribes to a Kafka topic and calls the provided handler on each message.

```ts
await messaging.subscribe('my-topic', async (message, context) => {
  console.log('Received:', message);
  console.log('Offset:', context.offset);
  console.log('Timestamp:', context.timestamp);
}, {
  fromBeginning: true
});
```

#### Subscribe Configuration
| Option          | Type       | Description                                                                 |
| --------------- | ---------- | --------------------------------------------------------------------------- |
| `groupId`       | `string`   | Custom consumer group ID. Overrides the auto-generated ID.                  |
| `groupIdPrefix` | `string`   | Prefix for the auto-generated group ID (default: service name).             |
| `fromBeginning` | `boolean`  | Start consuming from the beginning of the topic.                            |
| `logOffsets`    | `boolean`  | Log partition offsets on subscribe (adds admin overhead). Default: `false`. |
| `onReady`       | `function` | Callback invoked when the consumer is ready.                                |
| `onError`       | `function` | Callback invoked when a message handler throws an error.                    |
#### Consumer Group ID Resolution

The consumer group ID is resolved in this order:

1. `groupId` if provided
2. `{groupIdPrefix}.{topic}` if a prefix is provided
3. `{name}.{topic}` using the service name from the config
4. `group.{topic}` as a fallback
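The resolution order can be expressed as a small function. This is an illustrative reimplementation for clarity; the library's internal logic may differ in detail:

```typescript
// Illustrative: the group ID resolution order described above.
interface GroupIdOptions {
  groupId?: string;
  groupIdPrefix?: string;
  name?: string; // service name from the KafkaMessaging config
}

function resolveGroupId(topic: string, opts: GroupIdOptions): string {
  if (opts.groupId) return opts.groupId;                           // 1. explicit ID
  if (opts.groupIdPrefix) return `${opts.groupIdPrefix}.${topic}`; // 2. prefix
  if (opts.name) return `${opts.name}.${topic}`;                   // 3. service name
  return `group.${topic}`;                                         // 4. fallback
}
```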
```ts
// Using custom group ID
await messaging.subscribe('user-events', handler, {
  groupId: 'my-custom-consumer-group'
});

// Using custom prefix → "analytics.user-events"
await messaging.subscribe('user-events', handler, {
  groupIdPrefix: 'analytics'
});
```

### Error Handling

Use the `onError` callback to handle errors from message handlers without crashing the consumer:
```ts
await messaging.subscribe('user-events', async (message) => {
  await processMessage(message); // might throw
}, {
  onError: async (error, message, context) => {
    console.error('Failed to process message:', error);
    console.error('Message:', message);
    console.error('Offset:', context.offset);

    // Log to external service, send to DLQ, etc.
    await alertService.notify(error);
  }
});
```

The `onError` callback receives:
- `error` — the error thrown by the handler
- `message` — the parsed message that failed
- `context` — the message context (offset, headers, timestamp, etc.)
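A common use of `onError` is forwarding failures to a dead-letter topic. A sketch of shaping the dead-letter record; the record shape, field names, and `.dlq` topic suffix here are illustrative conventions, not part of the library:

```typescript
// Illustrative: build a dead-letter record from the onError arguments, keeping
// the original payload alongside the failure reason and offset for later replay.
function buildDeadLetterRecord(
  error: Error,
  message: unknown,
  context: { offset: string },
) {
  return {
    value: { original: message, reason: error.message, offset: context.offset },
    headers: { 'x-dead-letter': 'true' },
  };
}

// Wiring it into subscribe (sketch):
// await messaging.subscribe('user-events', handler, {
//   onError: async (error, message, context) => {
//     const record = buildDeadLetterRecord(error, message, context);
//     await messaging.publish('user-events.dlq', record.value);
//   }
// });
```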
### Consumer Ready Callback

```ts
await messaging.subscribe('my-topic', handler, {
  onReady: (consumer) => {
    console.log('Consumer is ready!');
    // Access the raw KafkaJS consumer if needed
  }
});
```

### Fetching Topic Offsets
```ts
const offsets = await messaging.fetchTopicOffsets('my-topic');

offsets?.forEach((partition) => {
  console.log(`Partition ${partition.partition}: offset=${partition.offset}, high=${partition.high}, low=${partition.low}`);
});
```

### Graceful Shutdown
```ts
const result = await messaging.shutdown();

console.log(`Disconnected ${result.successProducers} producers`);
console.log(`Disconnected ${result.successConsumers} consumers`);
console.log(`Disconnected ${result.successAdmins} admins`);
console.log(`Errors: ${result.errorCount}`);
```

This safely disconnects all tracked producers, consumers, and admin clients.
```ts
// Example: graceful shutdown on SIGTERM
process.on('SIGTERM', async () => {
  const result = await messaging.shutdown();
  console.log(`Shutdown complete: ${result.errorCount} errors`);
  process.exit(0);
});
```

## 🧱 Example Usage
```ts
// messaging.ts
import { Kaapi } from '@kaapi/kaapi';
import { KafkaMessaging } from '@kaapi/kafka-messaging';

const messaging = new KafkaMessaging({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  name: 'my-service',
  address: 'service-1'
});

/**
 * Initialize the Kaapi app with messaging
 */
const app = new Kaapi({
  port: 3000,
  host: 'localhost',
  messaging,
});

/**
 * Demonstrates how to subscribe and publish a message
 */
async function runExample(): Promise<void> {
  /**
   * Option 1: Use the Kaapi app (recommended in the app lifecycle)
   */

  // Publish a message
  await app.publish('my-topic', { event: 'user.created', userId: 456 });

  // Subscribe to messages
  await app.subscribe('my-topic', async (message, context) => {
    console.log('Received:', message);
    console.log('Offset:', context.offset);
  });

  /**
   * Option 2: Use messaging directly (standalone)
   */

  // Publish a message
  await messaging.publish('my-topic', { event: 'user.created', userId: 123 });

  // Subscribe with error handling
  await messaging.subscribe('my-topic', async (message, context) => {
    console.log('Received:', message);
    console.log('Offset:', context.offset);
  }, {
    fromBeginning: true,
    onError: (error, message, context) => {
      console.error('Handler failed:', error);
    }
  });

  // Batch publish
  await messaging.publishBatch('my-topic', [
    { value: { event: 'user.created', userId: 1 } },
    { value: { event: 'user.created', userId: 2 } },
  ]);
}

runExample().catch((err) => {
  console.error('❌ Messaging example failed:', err);
});
```

## Public API Contract

The `KafkaMessaging` class provides a safe and resilient interface for interacting with Kafka. Use the following methods to ensure proper lifecycle management, resource tracking, and graceful shutdown.
### Public Methods

| Method                                              | Purpose                                                              |
| --------------------------------------------------- | -------------------------------------------------------------------- |
| `createProducer(config?)`                           | Creates and connects a Kafka producer. Automatically tracked.        |
| `createConsumer(groupId, config?)`                  | Creates and connects a Kafka consumer. Automatically tracked.        |
| `createAdmin(config?)`                              | Creates and connects a Kafka admin client. Tracked for shutdown.     |
| `getProducer()`                                     | Gets or creates the singleton producer (race-condition safe).        |
| `publish(topic, message)`                           | Sends a message to the specified topic.                              |
| `publishBatch(topic, messages)`                     | Sends multiple messages in a single batch.                           |
| `subscribe(topic, handler, config?)`                | Subscribes to a topic and processes messages with the given handler. |
| `fetchTopicOffsets(topic)`                          | Fetches partition offsets for a topic.                               |
| `createTopic(topicConfig, options?)`                | Creates a Kafka topic with optional validation and leader wait.      |
| `waitForTopicReady(topic, timeoutMs?, intervalMs?)` | Waits for a topic to be ready (i.e. to have partitions).             |
| `shutdown()`                                        | Gracefully disconnects all tracked clients. Returns a summary.       |
| `safeDisconnect(client, timeoutMs?)`                | Disconnects a Kafka client with timeout protection.                  |
| `disconnectProducer()`                              | Disconnects the singleton producer.                                  |
### Read-only Properties

| Property          | Type                    | Description                  |
| ----------------- | ----------------------- | ---------------------------- |
| `activeProducers` | `ReadonlySet<Producer>` | Currently tracked producers. |
| `activeConsumers` | `ReadonlySet<Consumer>` | Currently tracked consumers. |
### Internal/Protected Methods

| Method             | Status    | Reason                                                   |
| ------------------ | --------- | -------------------------------------------------------- |
| `getKafka()`       | Protected | Used internally to instantiate Kafka clients.            |
| `getSharedAdmin()` | Protected | Lazily initialized shared admin for internal operations. |
### Best Practices

- Always use `createProducer`, `createConsumer`, or `createAdmin` to ensure proper tracking.
- Use `getProducer()` for the singleton producer pattern (recommended for most use cases).
- Avoid accessing the raw Kafka instance directly.
- Call `shutdown()` during application teardown to release resources.
- Use `createTopic()` and `waitForTopicReady()` in tests or dynamic topic scenarios.
- Use the `onError` callback in `subscribe()` to handle message processing failures gracefully.
## 🛠️ Requirements
- Node.js 18+
- A running Kafka instance
- Optional: integrate into a Kaapi service lifecycle
## 📚 Related

- KafkaJS — the underlying Kafka client
- Kaapi — the framework powering this abstraction
- `@kaapi/kaapi`
## 🧪 Testing

```bash
# Run mock tests (no Kafka required)
pnpm test

# Run integration tests (requires Kafka broker)
pnpm test:integration

# Run all tests
pnpm test:all
```

You can run Kafka locally using Docker:
```bash
docker run -d --name kafka \
  -p 9092:9092 \
  apache/kafka:latest
```

## 📝 License

MIT
