@nodii/outbox-dispatcher
v0.3.0
Producer-side Pattern-3 outbox drainer for the Nodii microservice stack —
the symmetric peer of @nodii/replica-consumer (the
consumer side). Drains a service's <service>_outbox table to Redis Streams
via XADD, with single-active-worker leasing, retry-budget + DLQ, HWM
exposure for snapshot live-stitch, lag/depth metrics, and graceful drain.
Spec: planning hub feature_doc serviceId=nodii-libs docKey=outbox-dispatcher.
Implements 01-communication-doctrine § 9.2 (drainer, softened by D209:
SET-NX-EX lease is canonical, BullMQ optional) + 09-replication-doctrine
§ 2.4 (XADD envelope) + § 7 (snapshot HWM cursor).
Quick start
    import { defineDispatcher, startDispatcher } from "@nodii/outbox-dispatcher";

    const dispatcher = defineDispatcher({
      serviceName: "nodii-tenant-service",
      outboxTable: "outbox",                      // 09-replication § 2.2
      streamFor: (row) => `tenant.${row.topic.split(".")[1]}`,
      leaseKey: "lock:outbox:tenant-service",     // D209 SET-NX-EX
      leaseTtlMs: 30_000,                         // renew at ttl/2
      pollMs: 500,                                // § 9.2
      batchSize: 50,                              // § 9.2
      publishMaxAttempts: 8,                      // DLQ threshold
      drainDeadlineSeconds: 30,                   // graceful drain
    });

    const handle = await startDispatcher([dispatcher], { pgPool, redis, telemetry });

    // snapshot RPC handler at freeze:
    const hwm = handle.currentHwm("tenant.user_membership"); // → "1748100000000-0"

Drain sequence (§ 5.2)
acquire-lease (D209) → SELECT … FOR UPDATE SKIP LOCKED → XADD canonical
envelope → UPDATE … SET dispatched_at = now(), stream_id = … → release.
Delivery is at-least-once: the consumer dedupes on event_id via
consumed_events. Rows with dispatched_at IS NOT NULL are never re-claimed, so
a restart re-XADDs only rows not yet marked dispatched, and the consumer's
dedupe makes any repeat harmless.
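
The ordering above can be sketched as a single drain pass. This is an illustration only, not the package's internals: DrainRow, DrainDeps, and drainOnce are hypothetical names, and the database and Redis calls are injected as plain functions so the step order is visible without a live Postgres or Redis.

```typescript
// Hypothetical row shape; only the fields this sketch touches.
interface DrainRow {
  id: number;
  event_id: string;
  topic: string;
}

// Hypothetical dependency bundle. Each function stands in for one step of
// the drain sequence; none of these names come from the package.
interface DrainDeps {
  acquireLease(): Promise<boolean>;               // e.g. SET key token NX EX ttl
  releaseLease(): Promise<void>;
  claimBatch(limit: number): Promise<DrainRow[]>; // e.g. SELECT ... FOR UPDATE SKIP LOCKED
  xadd(row: DrainRow): Promise<string>;           // resolves to the stream id to record
  markDispatched(id: number, streamId: string): Promise<void>; // UPDATE ... SET dispatched_at, stream_id
}

// One pass: lease, claim, publish, mark, release. Returns rows dispatched.
async function drainOnce(deps: DrainDeps, batchSize: number): Promise<number> {
  if (!(await deps.acquireLease())) {
    return 0; // another worker holds the lease; do nothing this tick
  }
  try {
    const rows = await deps.claimBatch(batchSize);
    let dispatched = 0;
    for (const row of rows) {
      const streamId = await deps.xadd(row);
      // A crash between xadd and markDispatched leaves the row unmarked,
      // so it is published again later: at-least-once delivery.
      await deps.markDispatched(row.id, streamId);
      dispatched++;
    }
    return dispatched;
  } finally {
    // Runs whether the pass succeeded or threw, once the lease was acquired.
    await deps.releaseLease();
  }
}
```

With real Postgres, the claim and the UPDATE would need to share one transaction, because FOR UPDATE row locks are held only until the transaction ends.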
XADD envelope — consumer contract
The XADD output is byte-identical to what @nodii/replica-consumer reads:
a single Redis-Streams field named event carrying
JSON.stringify({ event_id, topic, aggregate_kind, aggregate_id,
entity_version, payload, stream_id, committed_at }). The outbox column
source_entity_version maps to the envelope key entity_version; the producer
telemetry_envelope is folded into payload._telemetry. The contract test
(tests/contract.test.ts) drives this output back through the consumer's read
path to prove round-trip. (The feature_doc § 5.2 lists individual XADD fields +
source_entity_version; the shipped consumer reads a single JSON event
field keyed entity_version — the consumer's read path is the binding
contract per the feature_doc's own statement; drift filed.)
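
As a rough illustration of the mapping described above, the sketch below builds the JSON string that would go into the single event field. EnvelopeRow and toEventField are hypothetical names, the column types are guesses, and the README does not say how stream_id is obtained before publishing, so it is simply passed in here.

```typescript
// Hypothetical outbox row, using the column names this README mentions.
// Column types are assumptions.
interface EnvelopeRow {
  event_id: string;
  topic: string;
  aggregate_kind: string;
  aggregate_id: string;
  source_entity_version: number;
  payload: Record<string, unknown>;
  telemetry_envelope?: Record<string, unknown>;
  committed_at: string;
}

// Builds the value of the single `event` field.
// streamId: assumption, supplied by the caller; its origin is not specified here.
function toEventField(row: EnvelopeRow, streamId: string): string {
  // Fold the producer telemetry envelope into payload._telemetry when present.
  const payload = row.telemetry_envelope
    ? { ...row.payload, _telemetry: row.telemetry_envelope }
    : row.payload;

  // Keys are written in the order the README lists them; JSON.stringify
  // preserves insertion order for string keys.
  return JSON.stringify({
    event_id: row.event_id,
    topic: row.topic,
    aggregate_kind: row.aggregate_kind,
    aggregate_id: row.aggregate_id,
    entity_version: row.source_entity_version, // column renamed in the envelope
    payload,
    stream_id: streamId,
    committed_at: row.committed_at,
  });
}
```

The resulting string would then be published as one field, along the lines of XADD <stream> * event <json>.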
DLQ (§ 5.4)
Rows hitting publish_attempts >= publishMaxAttempts move to
<service>_outbox_dlq (original_outbox_id, final_error, payload, …) in
the same tx as the failure, and a page-tier outbox.publish_failures counter
fires. DLQ rows are not auto-retried.
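
The threshold rule can be sketched as a small pure function. FailureOutcome and onPublishFailure are hypothetical names, and the sketch assumes the failed attempt is counted before the comparison, which this README does not state.

```typescript
// Hypothetical result type: either keep the row for another attempt or
// move it to the DLQ table with the last error recorded.
type FailureOutcome =
  | { kind: "retry"; publishAttempts: number }
  | { kind: "dlq"; publishAttempts: number; finalError: string };

// Decide what happens to a row whose publish just failed.
function onPublishFailure(
  publishAttempts: number,     // attempts recorded before this failure
  publishMaxAttempts: number,  // the dispatcher's DLQ threshold
  error: Error,
): FailureOutcome {
  // Assumption: the failure increments the counter first.
  const attempts = publishAttempts + 1;
  if (attempts >= publishMaxAttempts) {
    return { kind: "dlq", publishAttempts: attempts, finalError: error.message };
  }
  return { kind: "retry", publishAttempts: attempts };
}
```

Under that assumption and publishMaxAttempts: 8, a row that has already failed seven times moves to the DLQ on its eighth failure; the DLQ insert and the counter update would happen in the same transaction.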
Metrics (§ 5.6)
outbox.depth{topic} · outbox.lag_seconds{topic} · outbox.dispatch_rate{topic}
· outbox.publish_failures{topic,reason} · outbox.hwm{stream} ·
outbox.lease_renewals{result}.
migrate-gen (§ 5.8)
    bunx @nodii/outbox-dispatcher migrate-gen --service nodii-tenant-service [--out-dir .]

Emits the <service>_outbox + <service>_outbox_dlq DDL + a src/outbox/boot.ts
skeleton.
Optional @nodii/db-rls (08-rls § 4)
Pass tenantId on the dispatcher + a dbRls handle at startDispatcher and
the drain SELECT runs inside withSystemContext({ tenantId }). Without it,
the raw pool is used (platform-tenant outbox only).
