@nwire/nats
v0.12.0
Nwire — NATS-backed EventBus adapter. Core pub/sub OR JetStream (at-least-once + DLQ) — same EventBus contract.
NATS-backed EventBus: pick core pub/sub or JetStream + DLQ.
What it does
Two adapters share this package, both satisfying the @nwire/bus EventBus
contract — the runtime doesn't know which is wired in:
| Adapter | Semantics | Durability | Pick when |
| --------------- | ------------- | ------------------------- | ---------------------------------------------------------- |
| NatsEventBus | At-most-once | None (core pub/sub) | Low-ceremony cross-service fan-out. |
| NatsBus (G11) | At-least-once | JetStream + DLQ + retries | Restarts must not drop events; poison messages quarantine. |
Subject layout: <prefix>.<eventName>. Distinct prefixes isolate
deployments on a shared cluster.
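As a rough illustration of the subject layout, the sketch below builds a subject from a prefix and an event name, then checks whether a subject falls under a given prefix. The helpers `subjectFor` and `belongsTo` are hypothetical and are not exported by @nwire/nats; the `staging.events` and `prod.events` prefixes are example values only.

```typescript
// Hypothetical helpers illustrating the <prefix>.<eventName> layout.
// They are NOT part of @nwire/nats; names and prefixes are assumptions.

/** Build the subject an event would be published on. */
function subjectFor(prefix: string, eventName: string): string {
  return `${prefix}.${eventName}`;
}

/**
 * True when `subject` sits under `prefix`, i.e. it would match the
 * NATS wildcard `<prefix>.>` (at least one token after the prefix).
 */
function belongsTo(prefix: string, subject: string): boolean {
  return subject.startsWith(`${prefix}.`) && subject.length > prefix.length + 1;
}

// Two deployments sharing one cluster, separated only by prefix.
const staging = subjectFor("staging.events", "billing.charge-failed");
const prod = subjectFor("prod.events", "billing.charge-failed");

console.log(staging); // "staging.events.billing.charge-failed"
console.log(belongsTo("prod.events", staging)); // false
console.log(belongsTo("prod.events", prod)); // true
```

Because the prefix is the leading part of every subject, a subscriber bound to one prefix never sees events published under another.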
Install
pnpm add @nwire/nats nats
Quick start: JetStream (recommended for production)
import { connect } from "nats";
import { NatsBus } from "@nwire/nats";
import { lxApp } from "@amit/lx";
const bus = new NatsBus({
  servers: "nats://nats:4222",
  prefix: "lemida.events",
  maxDeliver: 5,
  ackWaitMs: 2_000,
  connect,
});
await bus.connect();
const app = lxApp.create({ bus, publishToBus: true, appName: "lx-service" });
await app.start();
Quick start: core pub/sub
import { connect } from "nats";
import { NatsEventBus } from "@nwire/nats";
const nc = await connect({ servers: "nats://nats:4222" });
const bus = new NatsEventBus({ connection: nc, prefix: "lemida" });
NatsBus config
| Option | Default | Meaning |
| ------------ | --------------------- | --------------------------------------------------------- |
| servers | (required) | NATS URL(s). |
| name | none | Client name (visible in nats-top). |
| prefix | nwire.events | Subject prefix; the stream binds <prefix>.>. |
| streamName | derived from prefix | JetStream stream name. |
| dlqSubject | <prefix>.dlq | Where exhausted messages land. |
| maxDeliver | 5 | Maximum delivery attempts before a message goes to the DLQ. |
| ackWaitMs | 1000 | Ack window in milliseconds; also used as the nak backoff. |
| connect | (required) | connect from the nats package (injected for testing). |
| logger | no-op | @nwire/logger instance. |
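The defaults in the table can be read as a small resolution step. The sketch below is one way that step might look, not the package's actual code: `NatsBusOptionsSketch`, `resolveNatsBusDefaults`, and `deriveStreamName` are hypothetical names, and the stream-name derivation is an assumption, since the table only says the name is "derived from prefix". The `name`, `connect`, and `logger` options are left out for brevity.

```typescript
// Hypothetical sketch of default resolution; NOT the @nwire/nats implementation.

interface NatsBusOptionsSketch {
  servers: string | string[];
  prefix?: string;
  streamName?: string;
  dlqSubject?: string;
  maxDeliver?: number;
  ackWaitMs?: number;
}

type ResolvedOptions = Required<NatsBusOptionsSketch>;

// ASSUMPTION: the README does not say how the stream name is derived.
// This sketch upper-cases the prefix and replaces characters outside
// [A-Z0-9_-] with "_", because JetStream stream names cannot contain dots.
function deriveStreamName(prefix: string): string {
  return prefix.toUpperCase().replace(/[^A-Z0-9_-]/g, "_");
}

function resolveNatsBusDefaults(opts: NatsBusOptionsSketch): ResolvedOptions {
  const prefix = opts.prefix ?? "nwire.events";
  return {
    servers: opts.servers,
    prefix,
    streamName: opts.streamName ?? deriveStreamName(prefix),
    dlqSubject: opts.dlqSubject ?? `${prefix}.dlq`, // documented default
    maxDeliver: opts.maxDeliver ?? 5, // documented default
    ackWaitMs: opts.ackWaitMs ?? 1000, // documented default
  };
}

const resolved = resolveNatsBusDefaults({
  servers: "nats://nats:4222",
  prefix: "lemida.events",
  ackWaitMs: 2_000,
});

console.log(resolved.dlqSubject); // "lemida.events.dlq"
console.log(resolved.maxDeliver); // 5
console.log(resolved.ackWaitMs); // 2000
```

Note that a custom prefix moves the default DLQ subject with it, so a consumer draining the DLQ needs to subscribe to `<prefix>.dlq` for the prefix actually in use.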
DLQ pattern
When a handler throws and JetStream's maxDeliver is exhausted, NatsBus
publishes a BusDeadLetterRecord onto dlqSubject:
{
  originalSubject: "nwire.events.billing.charge-failed",
  event: { /* the full BusEventMessage */ },
  error: { message: "card declined", stack: "..." },
  deliveryCount: 5,
  deadLetteredAt: "2026-05-29T12:00:00.000Z",
}
Drain it into your incident pipeline by subscribing through any NATS client:
import { connect } from "nats";
const raw = await connect({ servers: "nats://nats:4222" });
const sub = raw.subscribe("nwire.events.dlq");
for await (const m of sub) {
  const record = JSON.parse(new TextDecoder().decode(m.data));
  // `sentry` stands for your already-initialized Sentry client.
  await sentry.captureMessage(`Dead-letter ${record.event.eventName}`, {
    extra: record,
  });
}
Local development
docker-compose.yml at the repo root ships a nats service with JetStream
enabled. Start it with nwire infra up (or docker compose up -d nats).
Testing
Unit tests run without docker (fake NATS client). Integration tests spin up
a real nats:2-alpine container via testcontainers and are gated behind
RUN_INTEGRATION=1:
pnpm --filter @nwire/nats test # unit only
RUN_INTEGRATION=1 pnpm --filter @nwire/nats test:integration
Within nwire-app
Wire the bus on createApp and the runtime fans cross-service events
through it automatically — no domain code changes.
import { createApp } from "@nwire/forge";
const app = createApp({
  modules: [
    /* ... */
  ],
  bus,
  publishToBus: true,
  appName: "lx-service",
});