kafka-pi-technologies
v0.1.16
Published
Reusable, robust Kafka consumer package with dynamic handler mapping, consumer pools and multi-topic support
Readme
kafka-pi
Generic, resilient Kafka consumer & producer toolkit based on kafkajs.
Features
- ResilientConsumer – robust wrapper with automatic retries, JSON parsing, and error propagation.
- ConsumerPool – spawn a configurable pool of consumers per topic.
- TopicManager – orchestrates multiple pools (multi-topic support).
- KafkaProducer – lightweight, connection-reusing producer.
- HandlerRegistry – simple mapping of
type -> async handlerfor dynamic routing.
Installation
npm install @pi-share-technologies/kafka-pi \Quick start
const { initKafka, HandlerRegistry, Producer } = require('kafka-pi');
const registry = new HandlerRegistry();
registry.register('productDetection', async (payload) => {
/* your handler code */
});
await initKafka({
kafkaConfig: {
brokers: ['localhost:9092'],
clientId: 'my-service',
groupId: 'my-group',
topics: ['product-app-events']
},
registry,
poolSize: 4,
});
// Producer example
const producer = new Producer({ brokers: ['localhost:9092'], clientId: 'my-service' });
await producer.ready;
await producer.send('my-topic', [{ key: 'k1', value: JSON.stringify({ msg: 'hi' }) }]);License
MIT
