@arbitro/client

v0.5.2

Official TypeScript client for the Arbitro message broker

Official TypeScript client for the Arbitro stateful flow broker.

Status: under active development. APIs, benchmarks, defaults, and reconnect behavior may still change.

@arbitro/client is built for the features that make Arbitro different from a plain pub/sub broker:

  • durable streams and consumers
  • exact stream / consumer introspection and idempotent upsert
  • automatic reconnect + subscription reattach
  • subject-level maxSubjectInflights — the strongest flow-control feature in the system
  • live ack_pending queries per consumer
  • client + broker metrics for observability

Why Arbitro

The headline feature is maxSubjectInflights — per-subject in-flight caps with wildcard patterns inside a single consumer group, so one hot subject does not starve the rest of the workload.

That means you can run one worker pool and still say:

  • payments.critical → max 1
  • payments.heavy.> → max 3
  • payments.light.> → max 10

without splitting your topology into many queues just to protect fairness.

Requirements

  • Node.js >= 20
  • Arbitro broker reachable on 127.0.0.1:9898 or your own --addr

Install

npm install @arbitro/client

Run the broker locally (Docker)

The broker ships as a public Docker image (musl-static, ~3 MB, scratch base):

docker run --rm -p 9898:9898 ghcr.io/arbitro-io/arbitro-server:latest

Pin a version tag for production rather than relying on latest:

  • ghcr.io/arbitro-io/arbitro-server:0.1.0 — immutable release tag
  • ghcr.io/arbitro-io/arbitro-server:0.1 — auto-updates within 0.1.*
  • ghcr.io/arbitro-io/arbitro-server:latest — latest tagged release

Quick start

import { ArbitroClient } from '@arbitro/client'

const client = new ArbitroClient({ servers: ['127.0.0.1:9898'] })
await client.connect()

await client.createStream('orders', {
  subjectFilter: 'orders.>',
})

await client.createConsumer('orders', {
  name: 'workers',
  filter: 'orders.>',
})

const sub = await client.subscribe('workers', (msg) => {
  console.log(msg.data().toString())
  msg.ack()
})

// publish() returns Promise<void> — caller decides whether to await.
await client.publish('orders', 'orders.new', Buffer.from('hello'))

Publish

publish() returns Promise<void> that resolves once the broker confirms receipt (RepOk). The TypeScript idiom is "everything async, the caller chooses to await" — the same call site supports both semantics:

await client.publish('orders', 'orders.new', data)            // wait for broker ack
client.publish('orders', 'orders.new', data)                  // fire-and-forget
client.publish('orders', 'orders.new', data).catch(onError)   // async error path

The broker emits RepOk whether or not the caller awaits, so a "no-reply" variant would save nothing on the wire. That property is what lets the TypeScript client expose a single, ergonomic API. The Rust client, whose futures are lazy, instead has to choose between publish (fire-and-forget) and publish_sync (awaited) at the call site.
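
The three call styles differ only in how the returned promise is handled. The sketch below uses a stand-in `fakePublish` function (a hypothetical name, not part of the client) to show a fire-and-forget call that still observes failures. This matters in Node.js because a rejected promise with no handler raises an unhandled rejection.

```typescript
// Stand-in for client.publish(): resolves on a simulated RepOk, rejects otherwise.
// `fakePublish` and `publishInBackground` are hypothetical names for illustration.
function fakePublish(brokerAccepts: boolean): Promise<void> {
  return brokerAccepts
    ? Promise.resolve()
    : Promise.reject(new Error('publish rejected'))
}

interface PublishStats {
  confirmed: number
  failed: number
}

// Fire-and-forget with an error path: the caller is free to ignore the
// returned promise, and a failure is counted instead of going unhandled.
function publishInBackground(brokerAccepts: boolean, stats: PublishStats): Promise<void> {
  return fakePublish(brokerAccepts).then(
    () => { stats.confirmed += 1 },
    () => { stats.failed += 1 },
  )
}
```

A caller that needs the broker's confirmation before continuing would await the promise instead. The call itself is the same either way.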

Durable management

await client.streamExists('orders')           // true
await client.getStreamInfo('orders')          // StreamInfo | null
await client.listStreams()                    // StreamInfo[]

await client.consumerExists('workers')        // true
await client.getConsumerInfo('workers')       // ConsumerInfo | null
await client.listConsumers()                  // ConsumerInfo[]

Upsert / delete

upsert* is strict: it succeeds when the entity does not exist or already exists with an equivalent config. It does not silently mutate a conflicting durable entity.

await client.upsertStream('orders', { subjectFilter: 'orders.>' })
await client.upsertConsumer('orders', { name: 'workers', filter: 'orders.>' })

await client.deleteConsumer('workers')
await client.deleteStream('orders')                        // default: delete metadata + data
await client.deleteStream('orders', { deleteData: false }) // preserve journal bytes

// Per-message deletion (tombstones a single message by sequence number).
// `stream` and `consumer` are the wrappers returned by client.stream(...) and stream.consumer(...).
await client.deleteMessage('orders', 42n)                  // true if deleted, false if not found/already deleted
await stream.deleteMessage(42n)                            // convenience: delegates to client.deleteMessage
await consumer.deleteMessage(42n)                          // convenience: delegates to client.deleteMessage
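
The strict upsert rule can be pictured as a three-way decision. The following is a minimal in-memory sketch, not the broker's implementation: `StreamConfig`, `sameConfig`, and the `Map` store are hypothetical, and "equivalent config" is assumed to mean field-by-field equality.

```typescript
// Hypothetical in-memory model of strict upsert; the real check runs in the broker.
interface StreamConfig {
  subjectFilter: string
}

type UpsertOutcome = 'created' | 'unchanged'

// "Equivalent" is modeled as field-by-field equality, which is an assumption.
function sameConfig(a: StreamConfig, b: StreamConfig): boolean {
  return a.subjectFilter === b.subjectFilter
}

function upsertStream(
  store: Map<string, StreamConfig>,
  name: string,
  config: StreamConfig,
): UpsertOutcome {
  const existing = store.get(name)
  if (existing === undefined) {
    store.set(name, config)
    return 'created'
  }
  if (sameConfig(existing, config)) return 'unchanged'
  // Conflict: refuse rather than silently mutate the durable entity.
  throw new Error(`stream "${name}" exists with a different config`)
}
```

Calling it twice with the same config is safe, which is what makes upsert usable in service start-up code that may run on every deploy.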

Stream / consumer sugar

const stream = client.stream('orders')
const consumer = stream.consumer({ name: 'workers', filter: 'orders.>' })

await consumer.create()

const sub = await consumer.subscribe((msg) => {
  msg.ack()
})

Per-subject inflight limits

maxSubjectInflights caps the in-flight (delivered, unacked) count per subject pattern, with full wildcard support (*, >). The limits are enforced only when ackPolicy is Explicit. For fire-and-forget consumers they are silently dropped, because the engine does not track in-flight messages without acks.

import { AckPolicy, DeliverPolicy } from '@arbitro/client'

await client.createConsumer('orders', {
  name: 'workers',
  filter: 'orders.>',
  ackPolicy: AckPolicy.Explicit,
  deliverPolicy: DeliverPolicy.All,
  maxAckPending: 20_000,
  maxSubjectInflights: [
    { pattern: 'orders.critical',  limit: 1 },
    { pattern: 'orders.heavy.>',   limit: 3 },
    { pattern: 'orders.light.>',   limit: 10 },
  ],
})
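
To make the cap semantics concrete, here is a small client-agnostic model of pattern matching and per-pattern in-flight accounting. It is a sketch under assumptions, not the engine's code: wildcards are assumed to follow NATS conventions (`*` matches exactly one token, `>` matches one or more trailing tokens), the first matching pattern is assumed to win, and `matchesSubject` and `InflightTracker` are hypothetical names.

```typescript
interface SubjectLimit {
  pattern: string
  limit: number
}

// Assumed NATS-style matching: `*` = exactly one token, `>` = one or more
// trailing tokens and only valid as the last token of the pattern.
function matchesSubject(pattern: string, subject: string): boolean {
  const p = pattern.split('.')
  const s = subject.split('.')
  for (let i = 0; i < p.length; i++) {
    if (p[i] === '>') return i === p.length - 1 && s.length > i
    if (i >= s.length) return false
    if (p[i] !== '*' && p[i] !== s[i]) return false
  }
  return p.length === s.length
}

class InflightTracker {
  private readonly limits: SubjectLimit[]
  private readonly counts = new Map<string, number>()

  constructor(limits: SubjectLimit[]) {
    this.limits = limits
  }

  // First matching pattern wins (an assumption; the source does not say
  // how overlapping patterns are resolved).
  private ruleFor(subject: string): SubjectLimit | undefined {
    return this.limits.find((l) => matchesSubject(l.pattern, subject))
  }

  // Returns true and counts the delivery if the subject is under its cap.
  tryDeliver(subject: string): boolean {
    const rule = this.ruleFor(subject)
    if (rule === undefined) return true // no cap configured for this subject
    const current = this.counts.get(rule.pattern) ?? 0
    if (current >= rule.limit) return false
    this.counts.set(rule.pattern, current + 1)
    return true
  }

  // Called on ack: frees one slot for the pattern the subject falls under.
  ack(subject: string): void {
    const rule = this.ruleFor(subject)
    if (rule === undefined) return
    const current = this.counts.get(rule.pattern) ?? 0
    this.counts.set(rule.pattern, Math.max(0, current - 1))
  }
}
```

With the limits from the example above, a second `orders.critical` delivery is held back until the first is acked, while `orders.light.>` traffic keeps flowing through the same worker pool.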

Query pending acks

Returns a live count of messages delivered to a consumer but not yet acked (the equivalent of num_ack_pending in NATS JetStream). The query takes one broker round-trip, and the engine cost is O(1) per shard.

// Via Consumer wrapper
const consumer = await client.stream('orders')
  .consumer({ name: 'workers' })
  .create()
await consumer.getPendings()                     // number

// Or directly via client (when you only have the id, or by name)
await client.getPending(consumerId)              // number
await client.getPending('orders', 'workers')     // number (resolves id by name)

Client metrics

The client tracks atomic counters readable via client.metrics(). Use it as a saturation gauge for dashboards or alerts.

const snap = client.metrics()
// {
//   publishesSent:        12048,
//   publishBatchEntries:  3210,
//   deliveriesReceived:   15258,
//   activeSubscriptions:  7,     // gauge
//   acksSent:             15101,
//   nacksSent:            12,
//   reconnects:           0,
//   pendingReplies:       0,
// }
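
For dashboards, two snapshots taken some time apart can be turned into rates. The helper below is a sketch that assumes the counters are cumulative and only ever increase. The field names come from the snapshot above, while `MetricsSnapshot`, `MetricsRates`, and `ratesBetween` are hypothetical names that the client does not export.

```typescript
// Subset of the snapshot fields shown above; the interface name is hypothetical.
interface MetricsSnapshot {
  publishesSent: number
  deliveriesReceived: number
  acksSent: number
  nacksSent: number
  reconnects: number
}

interface MetricsRates {
  publishesPerSec: number
  deliveriesPerSec: number
  unsettledGrowth: number // deliveries minus acks and nacks over the window
  reconnected: boolean
}

// Derives per-second rates from two snapshots, assuming cumulative counters.
function ratesBetween(
  prev: MetricsSnapshot,
  next: MetricsSnapshot,
  elapsedMs: number,
): MetricsRates {
  if (elapsedMs <= 0) throw new RangeError('elapsedMs must be positive')
  const seconds = elapsedMs / 1000
  const delivered = next.deliveriesReceived - prev.deliveriesReceived
  const settled =
    next.acksSent - prev.acksSent + (next.nacksSent - prev.nacksSent)
  return {
    publishesPerSec: (next.publishesSent - prev.publishesSent) / seconds,
    deliveriesPerSec: delivered / seconds,
    unsettledGrowth: delivered - settled,
    reconnected: next.reconnects > prev.reconnects,
  }
}
```

A steadily positive `unsettledGrowth` suggests handlers are falling behind deliveries, which is the kind of saturation signal an alert could key on.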

Typed lazy decode

import { schema } from '@arbitro/client'

const OrderCodec = schema({ id: 'number', status: 'string' })

const sub = await client
  .stream('orders')
  .consumer({ name: 'workers', filter: 'orders.>' })
  .subscribe(OrderCodec, (msg) => {
    console.log(msg.id, msg.status)
    msg.ack()
  })
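
The source does not show how the decode is deferred, so the following is only one plausible reading of "lazy": the raw payload is kept as received and decoded on first access, then cached. `LazyPayload` and `decodeOrder` are hypothetical, and a JSON wire format is assumed purely for illustration.

```typescript
// Hypothetical illustration of lazy decoding; not the client's codec.
class LazyPayload<T> {
  private readonly raw: string
  private readonly decode: (raw: string) => T
  private cached: T | undefined = undefined
  private isDecoded = false
  decodeCalls = 0 // exposed only so the caching is observable

  constructor(raw: string, decode: (raw: string) => T) {
    this.raw = raw
    this.decode = decode
  }

  // Decodes on first access and reuses the result afterwards.
  get value(): T {
    if (!this.isDecoded) {
      this.cached = this.decode(this.raw)
      this.isDecoded = true
      this.decodeCalls += 1
    }
    return this.cached as T
  }
}

interface Order {
  id: number
  status: string
}

// Validates the two fields used in the schema example above (JSON assumed).
function decodeOrder(raw: string): Order {
  const parsed: unknown = JSON.parse(raw)
  if (typeof parsed !== 'object' || parsed === null) {
    throw new TypeError('expected an object')
  }
  const rec = parsed as Record<string, unknown>
  const id = rec['id']
  const status = rec['status']
  if (typeof id !== 'number' || typeof status !== 'string') {
    throw new TypeError('invalid order payload')
  }
  return { id, status }
}
```

The benefit of this shape is that a handler which only acks, or routes on the subject, never pays for parsing the payload.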

Zod codec (optional)

If you already model your payloads with zod, use zodCodec for free runtime validation on decode:

import { ArbitroClient, zodCodec } from '@arbitro/client'
import { z } from 'zod'

const Order = z.object({ id: z.number(), status: z.string() })
const codec  = zodCodec(Order)

const sub = await client
  .stream('orders')
  .consumer({ name: 'workers', filter: 'orders.>' })
  .subscribe(codec, (msg) => {
    // msg is typed as `z.output<typeof Order>` — validated on decode
    msg.ack()
  })

zod is an optional peer dependency. @arbitro/client references zod only via import type, so users who never call zodCodec pay zero runtime cost and don't need zod installed.

Cron Scheduling

Register distributed cron jobs with queue semantics — multiple workers, single delivery per fire.

const cron = await client.cron("billing-monthly")
    .every("0 0 1 * *")
    .tz("America/New_York")
    .run(async (ctx) => {
        console.log(`fire #${ctx.fireCount} at ${ctx.fireTime}`);
        await processBilling();
    });

// Stop when done
await cron.stop();

Crons re-register automatically on reconnect. The broker does not persist them: if it restarts, clients re-register their crons when they reconnect.
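
The expression in the example has five fields. Assuming the standard cron field order (minute, hour, day of month, month, day of week), `0 0 1 * *` reads as midnight on the first day of every month. The hypothetical `splitCron` helper below names the fields. Whether Arbitro accepts extensions such as a seconds field is not stated in the source.

```typescript
interface CronFields {
  minute: string
  hour: string
  dayOfMonth: string
  month: string
  dayOfWeek: string
}

// Splits a five-field cron expression into named fields without
// interpreting ranges or steps; anything else is rejected.
function splitCron(expr: string): CronFields {
  const parts = expr.trim().split(/\s+/)
  if (parts.length !== 5) {
    throw new SyntaxError(`expected 5 fields, got ${parts.length}`)
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] =
    parts as [string, string, string, string, string]
  return { minute, hour, dayOfMonth, month, dayOfWeek }
}
```

Running a check like this before registering a job is a cheap way to catch a malformed expression on the client side.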

Delayed Publish

Schedule message delivery for the future:

await client.publishDelayed("ORDERS", "orders.reminder", payload, 5000); // 5s delay

Messages are persisted immediately, so they survive a broker restart.
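
The last argument in the example is a delay in milliseconds. When the target is an absolute time, a small conversion helps. `delayUntil` below is a hypothetical helper, not part of the client. It clamps past deadlines to zero on the assumption that an overdue message should go out immediately.

```typescript
// Converts an absolute deadline into a millisecond delay, never negative.
function delayUntil(target: Date, now: Date = new Date()): number {
  const ms = target.getTime() - now.getTime()
  if (Number.isNaN(ms)) throw new RangeError('invalid date')
  return Math.max(0, ms)
}
```

The result can be passed as the delay argument in the publishDelayed call shown above.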

Workflow Orchestration

The workflow module provides client-side linear pipelines over Arbitro streams. The broker has no workflow-specific code: everything uses streams, consumer groups, and idempotent publish.

WorkflowBuilder API

| Method | Signature | Description |
|--------|-----------|-------------|
| trigger | (subject: string) => this | Subject pattern that triggers new instances. Required. |
| triggerStream | (not yet implemented) | Planned: auto-subscribe to an external stream for trigger. |
| step | (name: string, handler: StepHandler) => this | Append a processing step. |
| compensate | (not yet implemented) | Planned: rollback handler per step (saga pattern). |
| maxRetries | (not yet implemented) | Planned: attempts before DLQ. |
| maxContextSize | (not yet implemented) | Planned: max context payload in bytes. |
| ackWait | (ms: number) => this | Ack timeout for failover (default: 30000). |
| inflight | (n: number) => this | Concurrent tasks per worker (default: 10). |
| start | () => Promise<WorkflowHandle> | Register streams, consumer, and start processing. |

WorkflowHandle API

| Method | Signature | Description |
|--------|-----------|-------------|
| trigger | (client: ArbitroClient, context: Buffer) => Promise<number> | Trigger a new workflow instance. Returns the instance ID. |
| name | string (getter) | Workflow name. |

Complete Example

import { ArbitroClient, WorkflowBuilder } from '@arbitro/client'
import type { StepContext, StepResult } from '@arbitro/client'

const client = new ArbitroClient({ servers: ['127.0.0.1:9898'] })
await client.connect()

const wf = await new WorkflowBuilder(client, 'order-process')
  .trigger('orders.created')
  // Step 1: validate
  .step('validate', async (ctx: StepContext): Promise<StepResult> => {
    const validated = await validateOrder(ctx.context)
    return { context: validated }
  })
  // Step 2: charge
  .step('charge', async (ctx: StepContext): Promise<StepResult> => {
    const receipt = await chargePayment(ctx.context)
    return { context: receipt }
  })
  // Step 3: ship
  .step('ship', async (ctx: StepContext): Promise<StepResult> => {
    const tracking = await createShipment(ctx.context)
    return { context: tracking }
  })
  .ackWait(30_000)
  .inflight(10)
  .start()

// Manual trigger
const instanceId = await wf.trigger(client, Buffer.from('order-123-payload'))
console.log(`started instance ${instanceId}`)

Internals

  • Tasks flow through _wf.{name}.tasks stream with a consumer _wf.{name}.workers.
  • Each step transition publishes with msgId format wf:{instance}:{step}:{attempt} for deduplication.
  • ackWait enables failover: if a worker dies, the broker redelivers to another subscriber.
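
The naming scheme and the deduplication idea can be sketched as follows. The stream, consumer, and msgId formats are taken from the bullets above. The helper names and the in-memory `DedupWindow` are hypothetical stand-ins for whatever the broker's idempotent publish does internally, and the type of `{step}` (name or index) is not specified in the source, so both are accepted.

```typescript
function tasksStream(workflow: string): string {
  return `_wf.${workflow}.tasks`
}

function workersConsumer(workflow: string): string {
  return `_wf.${workflow}.workers`
}

// Builds the msgId for one step transition of one workflow instance.
function stepMsgId(instance: number, step: string | number, attempt: number): string {
  return `wf:${instance}:${step}:${attempt}`
}

// Minimal model of idempotent publish: an ID is accepted only once.
class DedupWindow {
  private readonly seen = new Set<string>()

  // Returns true the first time an ID is offered, false for duplicates.
  accept(msgId: string): boolean {
    if (this.seen.has(msgId)) return false
    this.seen.add(msgId)
    return true
  }
}
```

Because the ID is derived from instance, step, and attempt, a worker that crashes after publishing and is then replayed produces the same ID again, and the duplicate transition is dropped instead of running the next step twice.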

Note: The TypeScript workflow module currently implements the core step pipeline. Compensation (saga), DLQ, triggerStream, maxRetries, and maxContextSize are available in the Rust client and planned for the TS client.

Reconnect behavior

The TS client reconnects transport automatically and reattaches active subscriptions and cron jobs after reconnect. That behavior lives in the client, not in the benchmarks. This matters for:

  • Docker restarts
  • broker failover tests
  • chaos scenarios with durable consumers
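
One way to picture reattach is a registry that remembers how each subscription or cron was attached and replays that on every new connection. This is a hypothetical model for illustration. The client's real internals are not shown in the source, and `ReattachRegistry` is not an exported type.

```typescript
type Attach = () => void

class ReattachRegistry {
  private readonly entries = new Map<string, Attach>()

  // Remember how to attach, then attach once for the current connection.
  register(key: string, attach: Attach): void {
    this.entries.set(key, attach)
    attach()
  }

  // Forget an entry, for example after an explicit unsubscribe or cron stop.
  unregister(key: string): void {
    this.entries.delete(key)
  }

  // Replay every remembered attach against the new connection and
  // report how many entries were reattached.
  onReconnect(): number {
    this.entries.forEach((attach) => attach())
    return this.entries.size
  }
}
```

Keeping the registry in the client is what allows application code to subscribe once and stay subscribed across broker restarts.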

Validation

npm run typecheck
npm test
npm run test:integration   # requires Docker

Documentation

  • CONTRIBUTING.md — dev setup, branch + commit conventions, PR review.
  • RELEASING.md — SemVer policy and the npm publish flow.
  • .agent/rules/*.md — internal coding rules (hot-path discipline, wire protocol, etc.).
  • CLAUDE.md — index pointing at the rule files.

Replication

Replication is transparent to the client: replicas is set at create_stream time. The client publishes normally, and the broker handles replication internally.

License

MIT — see LICENSE.