# @composable-backend/kafka

First-party Kafka companion package for composable-backend. Current version: v1.1.0.
## How to use @composable-backend/kafka

### 1. Install

```shell
npm install @composable-backend/kafka
```

### 2. Configure application.yml

```yaml
modules.autostart:
  - 'kafka.adapter'
modules.autostop:
  - 'kafka.adapter'
```

### 3. Configure kafka-adapter.yaml (in resources)
```yaml
consumer:
  - broker: 'localhost:9092'
    topic: 'leads.scored'
    flow: 'process-lead'
    group: 'lead-workflow-internal'
    tracing: true
  - broker: 'localhost:9091'
    topic: 'orders.created'
    flow: 'process-order'
    group: 'order-processing'
    tracing: true

producer:
  - broker: 'localhost:9092'
    topic: 'leads.scored'
  - broker: 'kafka-prod:9092'
    topic: 'notifications.email'
```

Each consumer entry routes incoming messages to a composable-backend flow.
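As a mental model, each consumer entry acts like a row in a topic-to-flow routing table. The sketch below is purely illustrative: the interface and lookup function are invented for this example and are not part of the package's API.

```typescript
// Hypothetical shape mirroring a consumer entry in kafka-adapter.yaml.
interface ConsumerEntry {
  broker: string;
  topic: string;
  flow: string;
  group: string;
  tracing?: boolean;
}

// The two consumer entries from the example configuration above.
const consumers: ConsumerEntry[] = [
  { broker: 'localhost:9092', topic: 'leads.scored', flow: 'process-lead', group: 'lead-workflow-internal', tracing: true },
  { broker: 'localhost:9091', topic: 'orders.created', flow: 'process-order', group: 'order-processing', tracing: true },
];

// Routing: an incoming message is handed to the flow configured for its topic.
function flowFor(topic: string): string | undefined {
  return consumers.find((c) => c.topic === topic)?.flow;
}

console.log(flowFor('leads.scored'));   // 'process-lead'
console.log(flowFor('orders.created')); // 'process-order'
```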
### 4. Define the flow (in resources/flows/)

```yaml
# process-lead.yml
flow:
  id: 'process-lead'
  description: 'Handle incoming scored leads'
  ttl: 10s
  first.task: 'handle.lead'

tasks:
  - name: 'handle.lead'
    input:
      - 'input.body -> *'
      - 'input.header.topic -> header.topic'
    process: 'v1.lead.log-scored'
    output:
      - 'result -> output.body'
    description: 'Process the lead'
    execution: end
```

### 5. Register in preload
```typescript
import { KafkaAdapter, KafkaNotification } from '@composable-backend/kafka';

platform.registerComposable(KafkaAdapter);
platform.registerComposable(KafkaNotification);
```

### 6. Publish from any task (via PostOffice)
```typescript
const req = new EventEnvelope()
  .setTo('kafka.notification')
  .setHeader('topic', 'leads.scored')
  .setBody({ content: myPayload });

const po = new PostOffice();
await po.send(req);
```

The broker is resolved from the `producer` section in kafka-adapter.yaml. You can also override it per-request with a `broker` header:
```typescript
const req = new EventEnvelope()
  .setTo('kafka.notification')
  .setHeader('broker', 'kafka-prod:9092')
  .setHeader('topic', 'notifications.email')
  .setBody({ content: myPayload });
```

### 7. Consume — just write a normal task
```typescript
export const handler = async (evt: EventEnvelope) => {
  const body = evt.getBody();
  log.info(JSON.stringify(body, null, 2));
  return body;
};

export default defineComposable({
  process: 'v1.lead.log-scored', // matches the flow task process
  handler,
  instances: 10,
});
```

### 8. SSL/SASL (optional)
Add `ssl` and `sasl` per entry to connect to secured brokers:

```yaml
consumer:
  - broker: 'kafka-prod:9093'
    topic: 'leads.scored'
    flow: 'process-lead'
    group: 'lead-workflow-internal'
    tracing: true
    ssl: true
    sasl:
      mechanism: 'scram-sha-256'
      username: 'my-user'
      password: 'my-pass'

producer:
  - broker: 'kafka-prod:9093'
    topic: 'notifications.email'
    ssl: true
    sasl:
      mechanism: 'scram-sha-256'
      username: 'my-user'
      password: 'my-pass'
```

Supported SASL mechanisms: `plain`, `scram-sha-256`, `scram-sha-512`.
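Because the mechanism list is closed, the SASL block is straightforward to validate when loading configuration. The sketch below is illustrative only; these type names are invented here and are not exported by the package.

```typescript
// Hypothetical types mirroring the sasl keys shown in the YAML above.
type SaslMechanism = 'plain' | 'scram-sha-256' | 'scram-sha-512';

interface SaslConfig {
  mechanism: SaslMechanism;
  username: string;
  password: string;
}

// Runtime guard for configuration parsed from YAML, where the
// mechanism arrives as a plain string.
function isSupportedMechanism(m: string): m is SaslMechanism {
  return ['plain', 'scram-sha-256', 'scram-sha-512'].includes(m);
}

console.log(isSupportedMechanism('scram-sha-256')); // true
console.log(isSupportedMechanism('gssapi'));        // false
```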
### 9. Independent consumer/producer clusters
Consumer and producer configurations are fully independent — even for the same topic. Each side resolves its own broker addresses and security credentials, so you can consume from one cluster and produce to another:
```yaml
consumer:
  - broker: 'cluster-a:9092'
    topic: 'events'
    flow: 'process-events'
    group: 'my-group'
    ssl: true
    sasl:
      mechanism: 'scram-sha-256'
      username: 'reader'
      password: 'reader-secret'

producer:
  - broker: 'cluster-b:9093'
    topic: 'events'
    ssl: true
    sasl:
      mechanism: 'scram-sha-512'
      username: 'writer'
      password: 'writer-secret'
```

In this example, incoming `events` messages are consumed from cluster-a with one set of credentials, while publishing to `events` goes to cluster-b with a different mechanism and user. No shared state between the two — brokers, SSL, and SASL are resolved independently per direction.
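Conceptually, this behaves like two separate endpoint tables keyed by topic, one per direction. A minimal sketch of that idea (illustrative only; none of these names come from the package):

```typescript
// Hypothetical per-direction endpoint tables; the real adapter reads
// these values from kafka-adapter.yaml.
interface Endpoint {
  broker: string;
  ssl?: boolean;
  saslUsername?: string;
}

const consumerEndpoints: Record<string, Endpoint> = {
  events: { broker: 'cluster-a:9092', ssl: true, saslUsername: 'reader' },
};

const producerEndpoints: Record<string, Endpoint> = {
  events: { broker: 'cluster-b:9093', ssl: true, saslUsername: 'writer' },
};

// The same topic resolves differently depending on direction.
function resolve(direction: 'consumer' | 'producer', topic: string): Endpoint | undefined {
  return direction === 'consumer' ? consumerEndpoints[topic] : producerEndpoints[topic];
}

console.log(resolve('consumer', 'events')?.broker); // 'cluster-a:9092'
console.log(resolve('producer', 'events')?.broker); // 'cluster-b:9093'
```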
## That's it

- No `src/kafka/` folder
- No KafkaJS imports in your app
- No manual consumer/producer code
- No startup/shutdown wiring
- Consumer topics route to flows — decide how to handle messages in YAML
- Add more topics = add more YAML lines
## Publishing

New versions are published to npm automatically via GitHub Actions when a version tag is pushed.

```shell
npm version patch   # or minor / major
git push --follow-tags
```

The workflow builds, runs tests, and publishes with provenance. An `NPM_TOKEN` secret must be configured in the repository settings.
