@arbitro/client
v0.5.2
Official TypeScript client for the Arbitro message broker
Official TypeScript client for the Arbitro stateful flow broker.
Status: under active development. APIs, benchmarks, defaults, and reconnect behavior may still change.
@arbitro/client is built for the features that make Arbitro different from a plain pub/sub broker:
- durable streams and consumers
- exact stream/consumer introspection and idempotent upsert
- automatic reconnect + subscription reattach
- subject-level maxSubjectInflights, the strongest flow-control feature in the system
- live ack_pending queries per consumer
- client + broker metrics for observability
Why Arbitro
The headline feature is maxSubjectInflights — per-subject in-flight caps with wildcard patterns inside a single consumer group, so one hot subject does not starve the rest of the workload.
That means you can run one worker pool and still say:
- payments.critical → max 1
- payments.heavy.> → max 3
- payments.light.> → max 10
without splitting your topology into many queues just to protect fairness.
Requirements
- Node.js >= 20
- Arbitro broker reachable on 127.0.0.1:9898 or your own --addr
Install
npm install @arbitro/client
Run the broker locally (Docker)
The broker ships as a public Docker image (musl-static, ~3 MB, scratch base):
docker run --rm -p 9898:9898 ghcr.io/arbitro-io/arbitro-server:latest
Pin a major/minor tag for production:
- ghcr.io/arbitro-io/arbitro-server:0.1.0 (immutable release tag)
- ghcr.io/arbitro-io/arbitro-server:0.1 (auto-updates within 0.1.*)
- ghcr.io/arbitro-io/arbitro-server:latest (latest tagged release)
Quick start
import { ArbitroClient } from '@arbitro/client'
const client = new ArbitroClient({ servers: ['127.0.0.1:9898'] })
await client.connect()
await client.createStream('orders', {
subjectFilter: 'orders.>',
})
await client.createConsumer('orders', {
name: 'workers',
filter: 'orders.>',
})
const sub = await client.subscribe('workers', (msg) => {
console.log(msg.data().toString())
msg.ack()
})
// publish() returns Promise<void> — caller decides whether to await.
await client.publish('orders', 'orders.new', Buffer.from('hello'))
Publish
publish() returns Promise<void> that resolves once the broker confirms receipt (RepOk). The TypeScript idiom is "everything async, the caller chooses to await" — the same call site supports both semantics:
await client.publish('orders', 'orders.new', data) // wait for broker ack
client.publish('orders', 'orders.new', data) // fire-and-forget
client.publish('orders', 'orders.new', data).catch(onError) // async error path
Because the broker emits RepOk whether or not the caller awaits, a "no-reply" variant would save nothing on the wire. That property lets the TypeScript client expose a single, ergonomic API, whereas the Rust client's lazy-future model has to pick publish (fire-and-forget) or publish_sync (awaited) at the call site.
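The three call styles differ mainly in where a failure surfaces. The sketch below uses a stand-in fakePublish function (hypothetical, not the real client) to show the awaited and the detached error paths side by side. It assumes publish() can reject, which the .catch(onError) form implies.

```typescript
// Stand-in for client.publish(); NOT the real client. It resolves like a
// broker confirmation, or rejects to simulate a failed publish.
function fakePublish(subject: string, fail = false): Promise<void> {
  return fail
    ? Promise.reject(new Error(`publish to ${subject} failed`))
    : Promise.resolve()
}

const errors: string[] = []

// Awaited: the caller sees the confirmation or the thrown error in place.
async function publishAwaited(): Promise<boolean> {
  try {
    await fakePublish('orders.new', true)
    return true
  } catch (err) {
    errors.push((err as Error).message)
    return false
  }
}

// Detached with an error path: .catch() keeps a rejection from going
// unhandled while the caller moves on.
function publishDetached(): Promise<void> {
  return fakePublish('orders.new', true).catch((err: Error) => {
    errors.push(err.message)
  })
}
```

One practical consequence: a bare fire-and-forget call with no .catch() leaves any rejection unhandled, and current Node.js versions terminate the process on an unhandled rejection by default. Attaching a handler is the safer default for detached publishes.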
Durable management
await client.streamExists('orders') // true
await client.getStreamInfo('orders') // StreamInfo | null
await client.listStreams() // StreamInfo[]
await client.consumerExists('workers') // true
await client.getConsumerInfo('workers') // ConsumerInfo | null
await client.listConsumers() // ConsumerInfo[]
Upsert / delete
upsert* is strict: it succeeds when the entity does not exist or already exists with an equivalent config. It does not silently mutate a conflicting durable entity.
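The strict rule can be modelled with a small in-memory registry. This is a sketch of the semantics only: the registry, the return values, and the ConflictError class are hypothetical, and the source does not say how the real client reports a conflict (this version throws).

```typescript
type StreamConfig = { subjectFilter: string }

class ConflictError extends Error {}

// In-memory stand-in for the broker's durable stream registry (hypothetical).
const streams = new Map<string, StreamConfig>()

function upsertStream(name: string, config: StreamConfig): 'created' | 'unchanged' {
  const existing = streams.get(name)
  if (existing === undefined) {
    streams.set(name, { ...config })
    return 'created'
  }
  // Equivalent config: the call is an idempotent no-op.
  if (existing.subjectFilter === config.subjectFilter) return 'unchanged'
  // Conflicting config: refuse rather than mutate the durable entity.
  throw new ConflictError(`stream "${name}" exists with a different config`)
}

const firstCall = upsertStream('orders', { subjectFilter: 'orders.>' })
const secondCall = upsertStream('orders', { subjectFilter: 'orders.>' })

let conflictRejected = false
try {
  upsertStream('orders', { subjectFilter: 'payments.>' })
} catch {
  conflictRejected = true
}
```

Here the first call creates the stream, the second is a no-op, and the third is rejected without touching the stored config.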
await client.upsertStream('orders', { subjectFilter: 'orders.>' })
await client.upsertConsumer('orders', { name: 'workers', filter: 'orders.>' })
await client.deleteConsumer('workers')
await client.deleteStream('orders') // default: delete metadata + data
await client.deleteStream('orders', { deleteData: false }) // preserve journal bytes
// Per-message deletion (tombstones a single message by sequence number)
await client.deleteMessage('orders', 42n) // true if deleted, false if not found/already deleted
await stream.deleteMessage(42n) // convenience: delegates to client.deleteMessage
await consumer.deleteMessage(42n) // convenience: delegates to client.deleteMessage
Stream / consumer sugar
const stream = client.stream('orders')
const consumer = stream.consumer({ name: 'workers', filter: 'orders.>' })
await consumer.create()
const sub = await consumer.subscribe((msg) => {
msg.ack()
})
Per-subject inflight limits
maxSubjectInflights caps the in-flight (delivered, unacked) count per subject pattern, with full wildcard support (*, >). The caps are enforced only when ackPolicy is Explicit; for fire-and-forget consumers they are silently dropped, because the engine does not track in-flight messages without acks.
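To make the semantics concrete, here is a minimal model of a per-pattern in-flight gate. It is not the broker's engine. It assumes NATS-style wildcards (* matches exactly one token, > matches one or more trailing tokens), that the first matching pattern wins, and that the counter is kept per pattern rather than per concrete subject; none of these details are confirmed by the source.

```typescript
type SubjectLimit = { pattern: string; limit: number }

// Assumed NATS-style matching: `*` = one token, `>` = one or more trailing tokens.
function matchesPattern(pattern: string, subject: string): boolean {
  const p = pattern.split('.')
  const s = subject.split('.')
  for (let i = 0; i < p.length; i++) {
    if (p[i] === '>') return s.length > i // needs at least one more token
    if (i >= s.length) return false
    if (p[i] !== '*' && p[i] !== s[i]) return false
  }
  return p.length === s.length
}

class SubjectInflightGate {
  private readonly limits: SubjectLimit[]
  private readonly inflight = new Map<string, number>() // keyed by pattern

  constructor(limits: SubjectLimit[]) {
    this.limits = limits
  }

  // First matching pattern wins (an assumption of this sketch).
  private ruleFor(subject: string): SubjectLimit | undefined {
    return this.limits.find((l) => matchesPattern(l.pattern, subject))
  }

  // Returns true and counts the message as in flight if the cap allows it.
  tryDeliver(subject: string): boolean {
    const rule = this.ruleFor(subject)
    if (!rule) return true // no cap configured for this subject
    const current = this.inflight.get(rule.pattern) ?? 0
    if (current >= rule.limit) return false
    this.inflight.set(rule.pattern, current + 1)
    return true
  }

  // An ack frees one slot under the pattern the subject falls into.
  ack(subject: string): void {
    const rule = this.ruleFor(subject)
    if (!rule) return
    const current = this.inflight.get(rule.pattern) ?? 0
    this.inflight.set(rule.pattern, Math.max(0, current - 1))
  }
}

const gate = new SubjectInflightGate([
  { pattern: 'orders.critical', limit: 1 },
  { pattern: 'orders.heavy.>', limit: 3 },
  { pattern: 'orders.light.>', limit: 10 },
])

const criticalFirst = gate.tryDeliver('orders.critical') // admitted
const criticalSecond = gate.tryDeliver('orders.critical') // held back: cap of 1 reached
const lightWhileBlocked = gate.tryDeliver('orders.light.eu') // unaffected by the critical cap
gate.ack('orders.critical')
const criticalAfterAck = gate.tryDeliver('orders.critical') // admitted again
```

The point of the model is the third call: a saturated subject holds back only its own pattern, so the rest of the consumer group keeps flowing.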
import { AckPolicy, DeliverPolicy } from '@arbitro/client'
await client.createConsumer('orders', {
name: 'workers',
filter: 'orders.>',
ackPolicy: AckPolicy.Explicit,
deliverPolicy: DeliverPolicy.All,
maxAckPending: 20_000,
maxSubjectInflights: [
{ pattern: 'orders.critical', limit: 1 },
{ pattern: 'orders.heavy.>', limit: 3 },
{ pattern: 'orders.light.>', limit: 10 },
],
})
Query pending acks
Live count of messages delivered to a consumer but not yet acked (equivalent of NATS JetStream num_ack_pending). One broker round-trip; engine cost is O(1) per shard.
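One way to read the "O(1) per shard" claim is that each shard keeps a running counter per consumer instead of scanning delivered messages. The sketch below is a hypothetical model of that idea, not the engine's actual data structure; the class and function names are made up for illustration.

```typescript
// Hypothetical per-shard bookkeeping: one running counter per consumer id.
class ShardPendingCounters {
  private readonly counts = new Map<number, number>()

  onDeliver(consumerId: number): void {
    this.counts.set(consumerId, this.pendingFor(consumerId) + 1)
  }

  onAck(consumerId: number): void {
    this.counts.set(consumerId, Math.max(0, this.pendingFor(consumerId) - 1))
  }

  // O(1): a single map lookup, no scan over delivered messages.
  pendingFor(consumerId: number): number {
    return this.counts.get(consumerId) ?? 0
  }
}

// The query adds one counter per shard, so its cost grows with the number
// of shards and not with the size of the backlog.
function totalPending(shards: ShardPendingCounters[], consumerId: number): number {
  return shards.reduce((sum, shard) => sum + shard.pendingFor(consumerId), 0)
}

const shardA = new ShardPendingCounters()
const shardB = new ShardPendingCounters()
shardA.onDeliver(7)
shardA.onDeliver(7)
shardB.onDeliver(7)
shardA.onAck(7)

const pendingForConsumer7 = totalPending([shardA, shardB], 7) // 2 deliveries still unacked
```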
// Via Consumer wrapper
const consumer = await client.stream('orders')
.consumer({ name: 'workers' })
.create()
await consumer.getPendings() // number
// Or directly via client (when you only have the id, or by name)
await client.getPending(consumerId) // number
await client.getPending('orders', 'workers') // number (resolves id by name)
Client metrics
The client tracks atomic counters readable via client.metrics(). Use it as a saturation gauge for dashboards or alerts.
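For dashboards, raw counters are usually turned into rates and gaps between two snapshots. The helper below is a sketch of that derivation. It assumes the counters are cumulative since client start and that every delivery is eventually settled by exactly one ack or nack; the summarize function and its output fields are hypothetical, while the snapshot field names follow the client.metrics() shape.

```typescript
type MetricsSnapshot = {
  publishesSent: number
  publishBatchEntries: number
  deliveriesReceived: number
  activeSubscriptions: number
  acksSent: number
  nacksSent: number
  reconnects: number
  pendingReplies: number
}

// Derives rates over the interval plus a point-in-time backlog estimate.
function summarize(prev: MetricsSnapshot, next: MetricsSnapshot, elapsedMs: number) {
  const seconds = elapsedMs / 1000
  return {
    publishesPerSec: (next.publishesSent - prev.publishesSent) / seconds,
    deliveriesPerSec: (next.deliveriesReceived - prev.deliveriesReceived) / seconds,
    // Deliveries this client has received but not yet acked or nacked.
    unsettled: next.deliveriesReceived - next.acksSent - next.nacksSent,
    reconnected: next.reconnects > prev.reconnects,
  }
}

const earlier: MetricsSnapshot = {
  publishesSent: 12000, publishBatchEntries: 3200, deliveriesReceived: 15000,
  activeSubscriptions: 7, acksSent: 14900, nacksSent: 12, reconnects: 0, pendingReplies: 0,
}
const later: MetricsSnapshot = {
  publishesSent: 12048, publishBatchEntries: 3210, deliveriesReceived: 15258,
  activeSubscriptions: 7, acksSent: 15101, nacksSent: 12, reconnects: 0, pendingReplies: 0,
}

const summary = summarize(earlier, later, 2000)
// summary.publishesPerSec is 24, summary.deliveriesPerSec is 129, summary.unsettled is 145
```

A steadily growing unsettled value is the kind of saturation signal worth alerting on, since it suggests handlers are falling behind deliveries.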
const snap = client.metrics()
// {
// publishesSent: 12048,
// publishBatchEntries: 3210,
// deliveriesReceived: 15258,
// activeSubscriptions: 7, // gauge
// acksSent: 15101,
// nacksSent: 12,
// reconnects: 0,
// pendingReplies: 0,
// }
Typed lazy decode
import { schema } from '@arbitro/client'
const OrderCodec = schema({ id: 'number', status: 'string' })
const sub = await client
.stream('orders')
.consumer({ name: 'workers', filter: 'orders.>' })
.subscribe(OrderCodec, (msg) => {
console.log(msg.id, msg.status)
msg.ack()
})
Zod codec (optional)
If you already model your payloads with zod, use zodCodec for free runtime validation on decode:
import { ArbitroClient, zodCodec } from '@arbitro/client'
import { z } from 'zod'
const Order = z.object({ id: z.number(), status: z.string() })
const codec = zodCodec(Order)
const sub = await client
.stream('orders')
.consumer({ name: 'workers', filter: 'orders.>' })
.subscribe(codec, (msg) => {
// msg is typed as `z.output<typeof Order>` — validated on decode
msg.ack()
})
zod is an optional peer dependency. @arbitro/client references zod only via import type, so users who never call zodCodec pay zero runtime cost and don't need zod installed.
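The idea behind a validating, lazily decoded codec can be sketched without zod at all. Everything below is hypothetical: the Codec interface, validatingCodec, and lazy are illustrative names rather than the real @arbitro/client types, the hand-written parseOrder stands in for a schema's parse function, and a JSON payload is assumed.

```typescript
// Hypothetical codec shape; the real interface is not shown in this README.
interface Codec<T> {
  decode(raw: string): T
}

type Order = { id: number; status: string }

// Hand-written validator standing in for a zod schema's parse().
function parseOrder(value: unknown): Order {
  if (typeof value !== 'object' || value === null) {
    throw new TypeError('order must be an object')
  }
  const o = value as Record<string, unknown>
  const id = o['id']
  const status = o['status']
  if (typeof id !== 'number' || typeof status !== 'string') {
    throw new TypeError('order needs a numeric id and a string status')
  }
  return { id, status }
}

// Wraps any parse function so validation happens on decode.
function validatingCodec<T>(parse: (value: unknown) => T): Codec<T> {
  return { decode: (raw) => parse(JSON.parse(raw)) }
}

// Lazy wrapper: decoding runs on first access and the result is cached.
function lazy<T>(codec: Codec<T>, raw: string): () => T {
  let cached: { value: T } | undefined
  return () => {
    if (cached === undefined) cached = { value: codec.decode(raw) }
    return cached.value
  }
}

const orderCodec = validatingCodec(parseOrder)
const getOrder = lazy(orderCodec, '{"id": 1, "status": "paid"}')
const decoded = getOrder() // parsed and validated here, not before
```

Because the wrapper only depends on a parse function, a schema library is needed only by callers who choose to supply one.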
Cron Scheduling
Register distributed cron jobs with queue semantics — multiple workers, single delivery per fire.
const cron = await client.cron("billing-monthly")
.every("0 0 1 * *")
.tz("America/New_York")
.run(async (ctx) => {
console.log(`fire #${ctx.fireCount} at ${ctx.fireTime}`);
await processBilling();
});
// Stop when done
await cron.stop();
Crons re-register automatically on reconnect. The broker does not persist them: if it restarts, clients re-register their crons when they reconnect.
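Two ideas carry this section: deciding when an expression fires, and making sure only one worker handles each fire. The sketch below illustrates both under stated assumptions. cronMatches handles only * and plain numbers in a standard five-field expression, evaluates in UTC (so it ignores .tz()), and requires every field to match. claimFire is a hypothetical first-claim-wins rule; the source does not describe how the broker picks the worker.

```typescript
// Fields: minute hour day-of-month month day-of-week.
// Ranges, lists, and step values are deliberately left out of this sketch.
function cronMatches(expr: string, date: Date): boolean {
  const fields = expr.trim().split(/\s+/)
  if (fields.length !== 5) {
    throw new Error(`expected 5 fields, got ${fields.length}`)
  }
  const actual = [
    date.getUTCMinutes(),
    date.getUTCHours(),
    date.getUTCDate(),
    date.getUTCMonth() + 1, // JavaScript months are 0-based
    date.getUTCDay(), // 0 = Sunday
  ]
  return fields.every((f, i) => f === '*' || Number(f) === actual[i])
}

// Hypothetical single-delivery rule: the first worker to claim a fire wins.
function claimFire(claimed: Set<string>, job: string, fireTimeMs: number): boolean {
  const key = `${job}@${fireTimeMs}`
  if (claimed.has(key)) return false
  claimed.add(key)
  return true
}

const jan1 = new Date(Date.UTC(2025, 0, 1, 0, 0))
const firesOnJan1 = cronMatches('0 0 1 * *', jan1) // midnight UTC on the 1st

const claimedFires = new Set<string>()
const workerA = claimFire(claimedFires, 'billing-monthly', jan1.getTime()) // wins the fire
const workerB = claimFire(claimedFires, 'billing-monthly', jan1.getTime()) // sees it already claimed
```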
Delayed Publish
Schedule message delivery for the future:
await client.publishDelayed("ORDERS", "orders.reminder", payload, 5000); // 5s delay
Messages are persisted immediately, so a delayed message survives a broker restart.
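Conceptually, a delayed publish stores the message at once and releases it when its due time passes. The in-memory queue below is a sketch of that release logic only: the DelayQueue class is hypothetical, it does not persist anything (the real broker does), and time is passed in explicitly to keep the example deterministic.

```typescript
type DelayedMessage = { subject: string; payload: string; dueAt: number }

class DelayQueue {
  private readonly items: DelayedMessage[] = [] // kept sorted by dueAt

  // Stores the message immediately; delivery waits until now + delayMs.
  schedule(subject: string, payload: string, delayMs: number, now: number): void {
    const entry: DelayedMessage = { subject, payload, dueAt: now + delayMs }
    const idx = this.items.findIndex((e) => e.dueAt > entry.dueAt)
    if (idx === -1) this.items.push(entry)
    else this.items.splice(idx, 0, entry)
  }

  // Removes and returns every message whose due time has been reached.
  popDue(now: number): DelayedMessage[] {
    const firstPending = this.items.findIndex((e) => e.dueAt > now)
    const count = firstPending === -1 ? this.items.length : firstPending
    return this.items.splice(0, count)
  }
}

const delayQueue = new DelayQueue()
delayQueue.schedule('orders.reminder', 'five-seconds', 5000, 0)
delayQueue.schedule('orders.reminder', 'one-second', 1000, 0)

const tooEarly = delayQueue.popDue(999) // nothing is due yet
const afterOneSecond = delayQueue.popDue(1000) // releases 'one-second'
const afterFiveSeconds = delayQueue.popDue(5000) // releases 'five-seconds'
```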
Workflow Orchestration
Client-side linear pipelines over Arbitro streams. The broker has no workflow-specific code -- everything uses streams, consumer groups, and idempotent publish.
WorkflowBuilder API
| Method | Signature | Description |
|--------|-----------|-------------|
| trigger | (subject: string) => this | Subject pattern that triggers new instances. Required. |
| triggerStream | (not yet implemented) | Planned: auto-subscribe to an external stream for trigger. |
| step | (name: string, handler: StepHandler) => this | Append a processing step. |
| compensate | (not yet implemented) | Planned: rollback handler per step (saga pattern). |
| maxRetries | (not yet implemented) | Planned: attempts before DLQ. |
| maxContextSize | (not yet implemented) | Planned: max context payload in bytes. |
| ackWait | (ms: number) => this | Ack timeout for failover (default: 30000). |
| inflight | (n: number) => this | Concurrent tasks per worker (default: 10). |
| start | () => Promise<WorkflowHandle> | Register streams, consumer, and start processing. |
WorkflowHandle API
| Method | Signature | Description |
|--------|-----------|-------------|
| trigger | (client: ArbitroClient, context: Buffer) => Promise<number> | Trigger a new workflow instance. Returns the instance ID. |
| name | string (getter) | Workflow name. |
Complete Example
import { ArbitroClient, WorkflowBuilder } from '@arbitro/client'
import type { StepContext, StepResult } from '@arbitro/client'
const client = new ArbitroClient({ servers: ['127.0.0.1:9898'] })
await client.connect()
const wf = await new WorkflowBuilder(client, 'order-process')
.trigger('orders.created')
// Step 1: validate
.step('validate', async (ctx: StepContext): Promise<StepResult> => {
const validated = await validateOrder(ctx.context)
return { context: validated }
})
// Step 2: charge
.step('charge', async (ctx: StepContext): Promise<StepResult> => {
const receipt = await chargePayment(ctx.context)
return { context: receipt }
})
// Step 3: ship
.step('ship', async (ctx: StepContext): Promise<StepResult> => {
const tracking = await createShipment(ctx.context)
return { context: tracking }
})
.ackWait(30_000)
.inflight(10)
.start()
// Manual trigger
const instanceId = await wf.trigger(client, Buffer.from('order-123-payload'))
console.log(`started instance ${instanceId}`)
Internals
- Tasks flow through _wf.{name}.tasks stream with a consumer _wf.{name}.workers.
- Each step transition publishes with msgId format wf:{instance}:{step}:{attempt} for deduplication.
- ackWait enables failover: if a worker dies, the broker redelivers to another subscriber.
Note: The TypeScript workflow module currently implements the core step pipeline. Compensation (saga), DLQ, triggerStream, maxRetries, and maxContextSize are available in the Rust client and planned for the TS client.
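The msgId format listed under Internals is what makes redelivery safe: a repeated transition carries the same id and can be dropped. The sketch below models that with simplified, synchronous stand-ins. The Step type, IdempotentLog, and runPipeline are hypothetical, and it assumes {step} is the step name and attempts start at 1, neither of which the source specifies.

```typescript
// Simplified stand-in; real step handlers are async and take a StepContext.
type Step = { name: string; run: (context: string) => string }

function stepMsgId(instance: number, step: string, attempt: number): string {
  return `wf:${instance}:${step}:${attempt}`
}

// Stand-in for idempotent publish: a repeated msgId is dropped.
class IdempotentLog {
  private readonly seen = new Set<string>()
  readonly accepted: string[] = []

  publish(msgId: string): boolean {
    if (this.seen.has(msgId)) return false
    this.seen.add(msgId)
    this.accepted.push(msgId)
    return true
  }
}

// Runs the steps in order, publishing one transition per completed step.
function runPipeline(instance: number, steps: Step[], input: string, log: IdempotentLog): string {
  let context = input
  for (const step of steps) {
    context = step.run(context)
    log.publish(stepMsgId(instance, step.name, 1))
  }
  return context
}

const transitionLog = new IdempotentLog()
const pipeline: Step[] = [
  { name: 'validate', run: (c) => `${c}:validated` },
  { name: 'charge', run: (c) => `${c}:charged` },
]

const finalContext = runPipeline(1, pipeline, 'order-123', transitionLog)
// A redelivered task that repeats an already published transition is ignored.
const duplicateAccepted = transitionLog.publish(stepMsgId(1, 'charge', 1))
```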
Reconnect behavior
The TS client reconnects the transport automatically and reattaches active subscriptions and cron jobs after reconnect. That behavior lives in the client, not in the benchmarks. This matters for:
- Docker restarts
- broker failover tests
- chaos scenarios with durable consumers
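The reattach behavior boils down to the client keeping its own registry of subscriptions and replaying it onto each new connection. The following sketch shows that pattern with a fake transport. FakeTransport and ReattachingClient are hypothetical names and do not mirror the real client's internals.

```typescript
type Handler = (data: string) => void

// Fake transport: a fresh instance knows nothing about earlier subscriptions.
class FakeTransport {
  readonly attached = new Set<string>()

  attach(consumer: string): void {
    this.attached.add(consumer)
  }
}

class ReattachingClient {
  private transport = new FakeTransport()
  // The client-side registry is the source of truth across reconnects.
  private readonly subscriptions = new Map<string, Handler>()
  reconnects = 0

  subscribe(consumer: string, handler: Handler): void {
    this.subscriptions.set(consumer, handler)
    this.transport.attach(consumer)
  }

  unsubscribe(consumer: string): void {
    this.subscriptions.delete(consumer)
    this.transport.attached.delete(consumer)
  }

  // Simulates a dropped connection: new transport, then replay the registry.
  reconnect(): void {
    this.transport = new FakeTransport()
    this.reconnects += 1
    this.subscriptions.forEach((_handler, consumer) => {
      this.transport.attach(consumer)
    })
  }

  attachedConsumers(): string[] {
    return Array.from(this.transport.attached).sort()
  }
}

const reattaching = new ReattachingClient()
reattaching.subscribe('workers', () => {})
reattaching.subscribe('auditors', () => {})
reattaching.unsubscribe('auditors')
reattaching.reconnect()

const attachedAfterReconnect = reattaching.attachedConsumers() // only 'workers' is replayed
```

Keeping the registry in the client is what lets a Docker restart or failover test proceed without the application resubscribing by hand.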
Validation
npm run typecheck
npm test
npm run test:integration # requires Docker
Documentation
- CONTRIBUTING.md: dev setup, branch + commit conventions, PR review.
- RELEASING.md: SemVer policy and the npm publish flow.
- .agent/rules/*.md: internal coding rules (hot-path discipline, wire protocol, etc.).
- CLAUDE.md: index pointing at the rule files.
Replication
Replication is transparent to the client -- replicas is set at create_stream time. The client publishes normally; the broker handles replication internally.
License
MIT — see LICENSE.
