@anabranch/eventlog-kafka
v0.2.1
Kafka adapter for eventlog using kafkajs with Task/Stream semantics
Kafka adapter for @anabranch/eventlog using kafkajs.
Provides Task/Stream semantics for event-sourced systems with Apache Kafka,
Confluent Cloud, Redpanda, and other Kafka-compatible services.
Usage
import { EventLog } from '@anabranch/eventlog'
import { createKafka } from '@anabranch/eventlog-kafka'

// Shape of the events appended to the 'users' topic below
type UserEvent = {
  type: 'UserCreated'
  userId: string
  email: string
}

const connector = createKafka({
  brokers: ['localhost:9092'],
  clientId: 'my-app',
  consumer: {
    maxWaitTimeInMs: 100,
    sessionTimeout: 6000,
  },
})

const log = await EventLog.connect(connector).run()

// Append events with partition keys
const eventId = await log.append('users', {
  type: 'UserCreated',
  userId: 'user-123',
  email: '[email protected]',
}, { partitionKey: 'user-123' }).run()

// Consume events as a stream
await log
  .consume<UserEvent>('users', 'my-processor', {
    batchSize: 50,
  })
  .tap((batch) => {
    for (const event of batch.events) {
      console.log(event.data)
    }
  })
  .map(async (batch) => {
    // Commit the batch once it has been processed
    await batch.commit()
  })
  .partition()

API
createKafka(options)
Creates a Kafka connector.
import { createKafka } from '@anabranch/eventlog-kafka'

const connector = createKafka({
  brokers: ['localhost:9092'],
  clientId: 'my-app',
  sasl: {
    mechanism: 'plain',
    username: 'admin',
    password: 'secret',
  },
  ssl: true,
  consumer: {
    maxWaitTimeInMs: 100,
    sessionTimeout: 6000,
  },
})

Options
- brokers (required): Array of Kafka broker addresses
- clientId: Client ID for the producer
- sasl: SASL authentication configuration
- ssl: Enable SSL/TLS (default: false)
- consumer: Kafka consumer configuration (passed to kafkajs)
- producer: Kafka producer configuration (passed to kafkajs)
- admin: Kafka admin configuration (passed to kafkajs)
- onMalformedMessage: Callback for unparseable messages
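The exact signature of onMalformedMessage is not given in this README. As a rough illustration of the pattern such a callback supports, the sketch below decodes a raw message and reports failures to a handler instead of throwing. The names MalformedHandler and decodeEvent are hypothetical, and the assumption that messages are JSON-encoded is mine, not the adapter's documented behavior.

```typescript
// Hypothetical types and helper for illustration only; the adapter's real
// onMalformedMessage callback may receive different arguments.
type MalformedHandler = (raw: string, error: unknown) => void

// Try to decode a raw message (assumed here to be JSON). On failure, hand
// the raw payload and the error to the handler and return undefined so the
// caller can skip the message rather than crash the consumer.
function decodeEvent<T>(raw: string, onMalformed: MalformedHandler): T | undefined {
  try {
    return JSON.parse(raw) as T
  } catch (error) {
    onMalformed(raw, error)
    return undefined
  }
}

// Collect rejected payloads so they can be inspected or logged later.
const rejected: string[] = []
const report: MalformedHandler = (raw) => {
  rejected.push(raw)
}

const good = decodeEvent<{ type: string }>('{"type":"UserCreated"}', report)
const bad = decodeEvent<{ type: string }>('{not json', report)
```

Returning undefined for a bad payload keeps one unparseable message from stopping the whole stream, which is the usual reason to expose a callback like this.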
Environment Variables
- KAFKA_URL: Comma-separated list of broker addresses (alternative to brokers)
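How the adapter itself parses KAFKA_URL is not described here. As a minimal sketch of what "comma-separated list" implies, the hypothetical helper below (parseBrokerList is not part of the package) turns such a string into the array shape that brokers expects, trimming whitespace and dropping empty entries.

```typescript
// Hypothetical helper, not part of @anabranch/eventlog-kafka.
// Converts a comma-separated broker string into a string array.
function parseBrokerList(value: string | undefined): string[] {
  if (!value) return []
  return value
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0)
}

// In an application the input would typically come from the environment,
// for example process.env.KAFKA_URL on Node.js or Deno.env.get('KAFKA_URL')
// on Deno. A literal is used here to keep the sketch self-contained.
const brokers = parseBrokerList('kafka-1:9092, kafka-2:9092,kafka-3:9092')
```

The resulting array could then be passed as the brokers option to createKafka when explicit configuration is preferred over the environment variable.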
Requirements
- Node.js 24+ or Deno
- Kafka server (local or remote)
Installation
Deno:

import { createKafka } from '@anabranch/eventlog-kafka'

Node.js:

npm install @anabranch/eventlog-kafka @anabranch/eventlog kafkajs

See @anabranch/eventlog for the core event log abstraction.
See generated documentation for full API details.
