@octaviaflow/eventbus
v0.1.0
Published
Bun-native NATS JetStream wrapper for Octaviaflow services
Maintainers
Readme
@octaviaflow/eventbus
Bun-native NATS JetStream wrapper for the Octaviaflow platform. Provides a single
EventBus interface used by Octaviaflow-CDC (producer) and downstream services
(Octaviaflow-Engine, Octaviaflow-Scheduler) as consumers.
Install
bun add @octaviaflow/eventbusQuick start (producer)
import { createEventBus, buildIngressSubject, makeEnvelope } from '@octaviaflow/eventbus';
const bus = createEventBus({
url: process.env.NATS_URL!,
serviceName: 'octaviaflow-cdc',
credsFile: process.env.NATS_CREDS_FILE,
});
await bus.connect();
await bus.ensureStreams();
const env = makeEnvelope({
tenantId: 'org_123',
connectionId: 'conn_456',
connector: 'salesforce',
entity: 'Account',
entityId: 'a01xx00000abcdef',
op: 'update',
occurredAt: new Date().toISOString(),
payload: { /* ... */ },
});
const subject = buildIngressSubject(env);
const ack = await bus.publish(subject, env);
// ack.duplicate === true means the broker rejected as a duplicate within the dedupe windowQuick start (consumer)
const stop = await bus.consume(
{
stream: 'EVENTS_INGRESS',
durable: 'engine-worker',
filterSubject: 'ingress.>',
maxAckPending: 64,
maxDeliver: 5,
},
async (env, ack) => {
await processFlow(env);
await ack.ack();
}
);
process.on('SIGTERM', async () => {
await stop();
await bus.disconnect();
});Streams
Three streams are managed by the package via ensureStreams():
| Stream | Subjects | Retention | Max age |
|---|---|---|---|
| EVENTS_INGRESS | ingress.> | WorkQueue | 7d |
| EVENTS_PROCESSED | processed.> | Limits | 30d |
| EVENTS_DLQ | dlq.> | Limits | 30d |
Tests
bun test # unit tests only (no NATS required)
NATS_TEST_URL=nats://localhost:4222 bun run test:integration # integration tests