@babelqueue/pulsar
v1.0.0
Published
Apache Pulsar adapter for BabelQueue — canonical-envelope publisher and a URN-routed consumer.
Maintainers
Readme
@babelqueue/pulsar
Apache Pulsar adapter for BabelQueue — a canonical-envelope
publisher and a URN-routed consumer over pulsar-client, on @babelqueue/core.
Implements
§5 of the broker-bindings contract,
so a Pulsar-based Node service speaks the same wire contract as the .NET, Java, Python and Go
SDKs.
Install
npm i @babelqueue/pulsar pulsar-clientpulsar-client is an optional peer — you provide the producer/consumer (a Producer /
Consumer from pulsar-client satisfies the adapter structurally).
Use
import Pulsar from "pulsar-client";
import { PulsarPublisher, PulsarConsumer } from "@babelqueue/pulsar";
const client = new Pulsar.Client({ serviceUrl: "pulsar://localhost:6650" });
// produce
const producer = await client.createProducer({ topic: "orders" });
const id = await new PulsarPublisher(producer)
.publish("urn:babel:orders:created", { order_id: 1042 });
// consume (Shared subscription)
const sub = await client.subscribe({
topic: "orders",
subscription: "babelqueue",
subscriptionType: "Shared",
});
const consumer = new PulsarConsumer(
sub,
{
"urn:babel:orders:created": async (env, message) => {
console.log(env.data.order_id, env.trace_id, env.attempts);
},
},
{ onError: (err) => console.error(err) },
);
await consumer.run();Delayed delivery: publish(urn, data, { delayMs: 300000 }) → native deliverAfter. The
consumer routes purely on the bq-job property.
Contract mapping (§5)
| Envelope | Apache Pulsar |
| :--- | :--- |
| body | message payload (byte-identical across SDKs) |
| job (URN) | property bq-job (consumer routes on this) |
| trace_id | property bq-trace-id |
| meta.id | property bq-message-id |
| meta.schema_version | property bq-schema-version |
| meta.lang | property bq-source-lang |
| meta.created_at | publish time (mirror; body authoritative) |
| attempts | property bq-attempts (authoritative), cross-checked against getRedeliveryCount() |
| reserve / ack / retry | acknowledge / negativeAcknowledge |
Pulsar properties are string→string, so bq-attempts carries the contract attempts and is
authoritative. The consumer reconciles to max(bq-attempts, getRedeliveryCount()):
getRedeliveryCount() is 0-based (0 on first delivery) so it maps directly with no −1,
and the max never lowers a higher body count — so a republish-driven retry and a native
redelivery both converge on the same number. A throwing handler negativeAcknowledges, so
the broker redelivers (at-least-once). The poll loop never stops on a bad message — observe
via onError / onUnknownUrn. The envelope is unchanged (schema_version stays 1);
Pulsar is purely additive.
The producer/consumer are injected, so the unit tests use fakes — no Pulsar, no broker. Dual ESM + CJS.
License
MIT
