service-bridge

v1.9.0-dev.52

ServiceBridge SDK for Node.js — production-ready RPC, durable events, workflows, jobs, and distributed tracing. One Go runtime + PostgreSQL replaces Istio, RabbitMQ, Temporal, and Jaeger.

service-bridge

The Unified Bridge for Microservices Interaction

Node.js SDK for ServiceBridge — production-ready RPC, durable events, workflows, jobs, and distributed tracing in a single SDK. One Go runtime and PostgreSQL.

┌─────────────────────────────────────────────────────────────────┐
│                    BEFORE: 10 moving parts                      │
│  Istio · Envoy · RabbitMQ · Temporal · Jaeger · Consul ·       │
│  cert-manager · Alertmanager · cron · custom glue              │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│               AFTER: ServiceBridge + PostgreSQL                 │
│  RPC · Events · Workflows · Jobs · Tracing · mTLS · Dashboard  │
│            One SDK  ·  One runtime  ·  Zero sidecars            │
└─────────────────────────────────────────────────────────────────┘


Why ServiceBridge

| Problem | Without ServiceBridge | With ServiceBridge |
|---|---|---|
| Service-to-service calls | Istio/Envoy sidecar proxy per pod | Direct SDK-to-worker gRPC, zero proxy hops |
| Async messaging | Kafka/RabbitMQ + retry logic + DLQ setup | Built-in durable events with retry, DLQ, replay |
| Background jobs | Bull/BullMQ + Redis + cron daemon | Built-in cron and delayed jobs |
| Workflow orchestration | Temporal/Conductor cluster + persistence | Built-in DAG workflows |
| Distributed tracing | Jaeger/Tempo + OTEL collector + dashboards | Built-in traces + realtime UI |
| Service discovery | Consul/etcd + DNS glue | Built-in registry + health-aware balancing |
| mTLS | cert-manager + Vault PKI | Auto-provisioned certs from service key |

Result: 10 tools → 1 runtime. One Go binary + PostgreSQL replaces the entire stack.


Use Cases

Microservice communication — Replace sidecar mesh with direct RPC calls. Get sub-millisecond overhead instead of double proxy hop latency.

Event-driven architecture — Publish durable events with fan-out, retries, DLQ, idempotency, and server-side filtering. No broker infrastructure to manage.

Background job scheduling — Cron jobs, delayed execution, and job-triggered workflows in a single API. No Redis, no separate queue workers.

Saga / distributed transactions — DAG workflows with typed steps (rpc, event, event_wait, sleep, child workflow). Compensations and rollbacks via workflow step dependencies.

AI agent orchestration — Stream LLM tokens via realtime trace streams with replay. Orchestrate multi-step AI pipelines as workflows.

Full-stack observability — Every RPC call, event delivery, workflow step, and HTTP request traced automatically. One timeline, one dashboard. Prometheus metrics and Loki-compatible log API included.


Quick Start

1. Install

npm i service-bridge
# or
bun add service-bridge

2. Create a worker (service that handles calls)

import { ServiceBridge } from "service-bridge";

const sb = new ServiceBridge(
  process.env.SERVICEBRIDGE_URL ?? "localhost:14445",
  process.env.SERVICEBRIDGE_SERVICE_KEY!,
);

sb.rpc.handle("payment.charge", async (payload: { orderId: string; amount: number }) => {
  return { ok: true, txId: `tx_${Date.now()}`, orderId: payload.orderId };
});

await sb.start({ host: "localhost" });

3. Call it from another service

import { ServiceBridge } from "service-bridge";

const sb = new ServiceBridge(
  process.env.SERVICEBRIDGE_URL ?? "localhost:14445",
  process.env.SERVICEBRIDGE_SERVICE_KEY!,
);

const result = await sb.rpc.invoke<{ ok: boolean; txId: string }>("payment.charge", {
  orderId: "ord_42",
  amount: 4990,
});

console.log(result.txId); // tx_1711234567890

That's it. No broker, no sidecar, no proxy — direct gRPC call between services.


Runtime Setup

The SDK connects to a ServiceBridge runtime. The fastest way to start:

bash <(curl -fsSL https://servicebridge.dev/install.sh)

This installs ServiceBridge + PostgreSQL via Docker Compose and generates an admin password automatically. After install, the dashboard is at http://localhost:14444 and the gRPC control plane at localhost:14445.

For manual Docker Compose setup, configuration reference, and all runtime environment variables, see the Runtime Setup section in the main SDK README.


End-to-End Example

A complete order flow: HTTP request → RPC → Event → Event handler with streaming.

import { ServiceBridge } from "service-bridge";

// --- Payments service (worker) ---

const payments = new ServiceBridge("localhost:14445", process.env.SERVICEBRIDGE_SERVICE_KEY!);

payments.rpc.handle("payment.charge", async (payload: { orderId: string; amount: number }, ctx) => {
  await ctx?.stream.write({ status: "charging", orderId: payload.orderId }, "progress");

  // ... charge logic ...

  await ctx?.stream.write({ status: "charged" }, "progress");
  return { ok: true, txId: `tx_${Date.now()}` };
});

await payments.start({ host: "localhost" });

// --- Orders service (caller + event publisher) ---

const orders = new ServiceBridge("localhost:14445", process.env.SERVICEBRIDGE_SERVICE_KEY!);

// Call payments, then publish event
const charge = await orders.rpc.invoke<{ ok: boolean; txId: string }>("payment.charge", {
  orderId: "ord_42",
  amount: 4990,
});

await orders.events.publish("orders.completed", {
  orderId: "ord_42",
  txId: charge.txId,
}, {
  idempotencyKey: "order:ord_42:completed",
  headers: { source: "checkout" },
});

// --- Notifications service (event consumer) ---

const notifications = new ServiceBridge("localhost:14445", process.env.SERVICEBRIDGE_SERVICE_KEY!);

notifications.events.handle("orders.*", async (payload, ctx) => {
  const body = payload as { orderId: string; txId: string };
  await ctx.stream.write({ status: "sending_email", orderId: body.orderId }, "progress");
  // ... send email ...
});

await notifications.start({ host: "localhost" });

// --- Orchestrate as a workflow ---

await orders.workflows.run("order.fulfillment", [
  { id: "reserve", type: "rpc", service: "inventory", ref: "inventory.reserve" },
  { id: "charge", type: "rpc", service: "payment", ref: "payment.charge", deps: ["reserve"] },
  { id: "wait_dlv", type: "event_wait", ref: "shipping.delivered", deps: ["charge"] },
  { id: "notify", type: "event", ref: "orders.fulfilled", deps: ["wait_dlv"] },
]);

Every step above — RPC, event publish, event delivery, workflow execution — appears in a single trace timeline in the built-in dashboard.


Platform Features

Communication

  • Direct RPC — zero-hop gRPC calls with retries, deadlines, and mTLS identity
  • Durable Events — fan-out delivery, guaranteed delivery (RabbitMQ-style), at-least-once guarantees, retries, DLQ, replay, idempotency. If a consumer is offline, the message waits in the server-side queue and is dispatched the moment the consumer reconnects — no retry budget consumed while waiting.
  • Realtime Streams — live chunks with replay for AI/progress/log streaming
  • Service Discovery — automatic endpoint resolution and round-robin balancing
  • HTTP Middleware — Express and Fastify instrumentation with automatic trace propagation

Orchestration

  • Workflows — DAG steps: rpc, event, event_wait, sleep, child workflow
  • Jobs — cron, delayed, and workflow-triggered scheduling

Security

  • TLS by default — control plane TLS + worker mTLS with gRPC certificate provisioning
  • Access Policy — service-level caller/target restrictions and RBAC

Observability

  • Unified Tracing — single trace timeline across HTTP, RPC, events, workflows, and jobs
  • Metrics — Prometheus-compatible /metrics endpoint (30+ metric families)
  • Logs — structured log ingest with Loki-compatible query API
  • Alerts — runtime alerts for delivery failures, errors, and service health
  • Dashboard — realtime web UI for traces, events, workflows, jobs, DLQ, service map, and service keys

How It Compares

| Concern | Istio + Envoy | Dapr | Temporal + Kafka | ServiceBridge |
|---|---|---|---|---|
| RPC data path | Sidecar proxy hop | Sidecar/daemon hop | N/A | Direct (proxyless) |
| Service discovery | K8s control plane | Sidecar placement | External registry | Built-in registry |
| Durable events + DLQ | External broker | Pub/Sub component | Kafka + consumers | Built-in |
| Workflow orchestration | External engine | External engine | Built-in | Built-in |
| Job scheduling | External cron/queue | External scheduler | External scheduler | Built-in |
| Traces + UI | Jaeger/Tempo + dashboards | OTEL backend + dashboards | Temporal UI | Built-in |
| Logs for Grafana | Loki + Promtail pipeline | Log pipeline | Log pipeline | Built-in Loki API |
| Metrics | App/exporter setup | App/exporter setup | Multiple exporters | Built-in /metrics |
| Security model | Mesh PKI + policy | Deployment-dependent mTLS | Mixed | Service keys + auto mTLS |
| Operational footprint | Multi-component mesh | Runtime + sidecars | Workflow + broker + DB | One binary + PostgreSQL |


API Reference

ServiceBridge / ServiceBridgeService surface

Per-instance API for new ServiceBridge(...) (implements ServiceBridgeService):

  • Namespaces: rpc (handle, invoke, declare), events (handle, publish, publishWorker, declare), jobs (run), workflows (run, declare).
  • Lifecycle: start(opts?), stop().
  • Workflows: cancelWorkflow(traceId).
  • HTTP & traces: startHttpSpan(opts), registerHttpEndpoint(opts), watchTrace(traceId, opts?).
  • Module helpers (exported from service-bridge): getTraceContext, withTraceContext, ServiceBridgeError, mapGrpcStatus, SB, SB_MESSAGES. (captureConsole exists internally for log capture but is not part of the public package exports.)

Cross-SDK parity notes

ServiceBridge keeps the core API shape consistent across Node.js, Go, and Python: constructor, rpc / events / jobs / workflows namespaces, streams, start/stop, and ServiceBridgeError.

Constructor-level defaults for timeout, retries, and retryDelay are available across all three SDKs. Parity differences are naming-only (language idioms):

  • Constructor TLS overrides: workerTLS/caCert (Node), WorkerTLS/CACert (Go), worker_tls/ca_cert (Python)
  • Handler hints: timeout/retryable/concurrency/prefetch are advisory in all SDKs
  • Shared start() fields across SDKs: host, max in-flight, instance ID, weight, and per-start TLS override

new ServiceBridge(url, serviceKey, opts?)

class ServiceBridge {
  constructor(url: string, serviceKey: string, opts?: ServiceBridgeOpts);
}

Creates an SDK client instance. Service identity is resolved by the runtime from the sbv2 serviceKey (key id). Use new ServiceBridge(...) as the public entry point from the service-bridge package (the constructor delegates to the same internal client setup used by the SDK; a lower-level factory exists in source but is not exported from the published entry).

ServiceBridgeOpts:

| Option | Type | Default | Description |
|---|---|---|---|
| timeout | number | 30000 | Default hard timeout per rpc.invoke() attempt (ms). |
| retries | number | 3 | Default retry count for rpc.invoke(). |
| retryDelay | number | 300 | Base backoff delay (ms) for rpc.invoke(). |
| discoveryRefreshMs | number | 10000 | Discovery refresh period for endpoint updates. |
| queueMaxSize | number | 1000 | Max offline queue size for control-plane writes. |
| queueOverflow | "drop-oldest" \| "drop-newest" \| "error" | "drop-oldest" | Overflow strategy for offline queue. |
| heartbeatIntervalMs | number | 10000 | Base heartbeat period for worker registrations. |
| captureLogs | boolean | true | Forward console.* logs to ServiceBridge. |
| strictOutboundDeclarations | boolean | false | When true, every outbound rpc.invoke() must be preceded by rpc.declare(fn) for the resolved target. |

Advanced TLS overrides

| Option | Type | Default | Description |
|---|---|---|---|
| workerTLS | WorkerTLSOpts | auto | Explicit cert/key/CA for worker mTLS. |
| caCert | string \| Buffer | from serviceKey | Optional control-plane CA override. By default SDK reads CA from sbv2 service key. |

WorkerTLSOpts:

type WorkerTLSOpts = {
  caCert?: string | Buffer;
  cert?: string | Buffer;
  key?: string | Buffer;
  serverName?: string;
}

rpc.invoke(fn, payload?, opts?)

invoke<T = unknown>(fn: string, payload?: unknown, opts?: RpcOpts): Promise<T>

Calls a registered RPC handler on another worker. Direct gRPC path, no proxy.

Function name: fn is a single global function name (the same string passed to rpc.handle on the callee), e.g. payment.charge or user.get. It must be unique in the catalog and must not contain /.

RpcOpts:

| Option | Type | Description |
|---|---|---|
| timeout | number | Call timeout in ms. |
| retries | number | Retry count override. |
| retryDelay | number | Base retry delay override. |
| traceId | string | Explicit trace id. |
| parentSpanId | string | Explicit parent span id. |
| mode | "direct" \| "proxy" | Transport mode. "direct" (default) connects directly to the worker. "proxy" routes through the control plane when direct connection is unavailable. |

const user = await sb.rpc.invoke<{ id: string; name: string }>("user.get", { id: "u_1" });

const user2 = await sb.rpc.invoke<{ id: string; name: string }>("user.get", { id: "u_1" }, {
  timeout: 5000,
  retries: 2,
});

rpc.invoke() is bounded even when a downstream worker is silent: each attempt has a hard local timeout, retries are finite (retries + 1 total attempts), and after the final failed attempt the root RPC span is closed with error.

Retry delay uses exponential backoff: retryDelay * 2^(attempt-1).
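
The backoff rule and the bound on total attempts can be worked through numerically. The sketch below is not SDK code: `retryDelays` and `worstCaseMs` are hypothetical helpers, and it assumes `attempt` is the 1-based retry index with no jitter or cap applied.

```typescript
// Sketch of the documented rule: retryDelay * 2^(attempt-1).
// Assumption: `attempt` is the 1-based index of the retry; no jitter, no cap.
function retryDelays(retryDelay: number, retries: number): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= retries; attempt++) {
    delays.push(retryDelay * 2 ** (attempt - 1));
  }
  return delays;
}

// Upper bound on the time spent inside one invoke(): every one of the
// (retries + 1) attempts hits the timeout and every backoff is waited in full.
function worstCaseMs(timeout: number, retryDelay: number, retries: number): number {
  const waits = retryDelays(retryDelay, retries).reduce((sum, d) => sum + d, 0);
  return (retries + 1) * timeout + waits;
}

// With the constructor defaults (timeout 30000, retryDelay 300, retries 3):
const delays = retryDelays(300, 3);           // 300, 600, 1200
const bound = worstCaseMs(30_000, 300, 3);    // 4 * 30000 + 2100 = 122100
console.log(delays, bound);
```

Under these assumptions a call with default options gives up after roughly two minutes at most, which may be worth tightening with per-call `timeout` and `retries` on latency-sensitive paths.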


rpc.declare(fn)

declare(fn: string): void

Declares an outbound RPC dependency for registration metadata. When strictOutboundDeclarations is true, you must call rpc.declare(fn) before rpc.invoke(fn, ...) for that function. Does not invoke the remote handler.


events.publish(topic, payload?, opts?)

publish(topic: string, payload?: unknown, opts?: EventOpts): Promise<string>

Publishes a durable event. Returns messageId when online.

EventOpts:

| Option | Type | Description |
|---|---|---|
| traceId | string | Explicit trace id. |
| parentSpanId | string | Explicit parent span id. |
| idempotencyKey | string | Idempotency key for dedup-safe publishing. |
| headers | Record<string, string> | Custom metadata headers. |

await sb.events.publish("orders.created", { orderId: "ord_42" }, {
  idempotencyKey: "order:ord_42",
  headers: { source: "checkout" },
});

events.publishWorker(topic, payload?, opts?)

publishWorker(
  topic: string,
  payload?: unknown,
  opts?: { traceId?: string; parentSpanId?: string; headers?: Record<string, string> },
): Promise<string>

Publishes over the worker session stream (after start()). If no worker session is active, the promise is rejected.


events.declare(topic)

declare(topic: string): void

Declares an outbound event dependency for registration metadata (does not publish a message).


jobs.run(service, fn, opts) / jobs.run(target, opts)

run(service: string, fn: string, opts: ScheduleOpts & { via: "rpc" }): Promise<string>
run(target: string, opts: ScheduleOpts & { via: "event" | "workflow" }): Promise<string>

Registers a scheduled or delayed job. Resolves to the registration key: "${service}/${fn}" for the RPC overload, or the target string for the event / workflow overload.

ScheduleOpts:

| Option | Type | Description |
|---|---|---|
| cron | string | Cron expression. |
| delay | number | Delay in ms before execution. Backed by int32 in the proto, so the maximum is 2,147,483,647 ms (~24.8 days). |
| timezone | string | Timezone for cron execution. |
| misfire | "fire_now" \| "skip" | Misfire policy. |
| via | "event" \| "rpc" \| "workflow" | Target type. |
| retryPolicyJson | string | Retry policy JSON string. |

await sb.jobs.run("payments", "billing.collect", {
  cron: "0 * * * *",
  timezone: "UTC",
  via: "rpc",
});
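
Because `delay` is stored as an int32, a value above the limit cannot be represented. A pre-flight check such as the one below could catch that before calling `jobs.run`. `assertValidDelay` is a hypothetical helper, not part of the SDK, and rejecting negative or fractional values is an assumption.

```typescript
// int32 upper bound, as documented for the `delay` field.
const MAX_DELAY_MS = 2_147_483_647;

// Hypothetical pre-flight check; not part of the SDK.
// Assumption: negative and non-integer delays are also treated as invalid.
function assertValidDelay(delayMs: number): number {
  if (!Number.isInteger(delayMs) || delayMs < 0 || delayMs > MAX_DELAY_MS) {
    throw new RangeError(
      `delay must be an integer in [0, ${MAX_DELAY_MS}] ms, got ${delayMs}`,
    );
  }
  return delayMs;
}

// A 30-day delay exceeds the limit, so it would need a cron schedule
// or a chain of shorter delays instead.
const thirtyDaysMs = 30 * 24 * 60 * 60 * 1000;
console.log(thirtyDaysMs > MAX_DELAY_MS); // true
```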

workflows.run(name, steps, opts?) — register DAG

TypeScript (single method; behavior depends on the second argument):

run(
  nameOrService: string,
  stepsOrName: WorkflowStep[] | string,
  inputOrOpts?: unknown,
  opts?: ExecuteWorkflowOpts,
): Promise<string | ExecuteWorkflowResult>

  • Register: when stepsOrName is WorkflowStep[], nameOrService is the workflow name, inputOrOpts is optional WorkflowOpts, and the promise resolves to that name (string).
  • Execute: when stepsOrName is a string, nameOrService is the target service name, stepsOrName is the workflow name, inputOrOpts is the optional execution input, and opts is optional ExecuteWorkflowOpts (see execute section below).
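
The two modes can be told apart from the second argument alone. The following is a minimal sketch of that dispatch rule, assuming a local `StepLike` stand-in for `WorkflowStep`; `classifyRunCall` is a hypothetical name and the SDK's internal dispatch may be written differently.

```typescript
// Local stand-in for the SDK's WorkflowStep; only the field needed here.
type StepLike = { id: string };

type RunCall =
  | { mode: "register"; name: string }
  | { mode: "execute"; service: string; name: string };

// Mirrors the documented rule: an array second argument means "register",
// a string second argument means "execute".
function classifyRunCall(nameOrService: string, stepsOrName: StepLike[] | string): RunCall {
  if (Array.isArray(stepsOrName)) {
    return { mode: "register", name: nameOrService };
  }
  return { mode: "execute", service: nameOrService, name: stepsOrName };
}

console.log(classifyRunCall("order.fulfillment", [{ id: "reserve" }]).mode); // register
console.log(classifyRunCall("users", "user.onboarding").mode);               // execute
```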

Overload as used when registering:

run(name: string, steps: WorkflowStep[], opts?: WorkflowOpts): Promise<string>

Registers (or updates) a workflow definition as a DAG of typed steps. Returns the workflow name.

WorkflowStep:

| Field | Type | Description |
|---|---|---|
| id | string | Unique step identifier in the DAG. |
| type | "rpc" \| "event" \| "event_wait" \| "sleep" \| "workflow" | Step execution type. |
| service | string | Required for rpc and workflow: target service that owns the function or child workflow. |
| ref | string | Target name: RPC function, event topic, waited topic, or child workflow name (per type). |
| deps | string[] | Dependencies. Empty/omitted means root step. |
| if | string | Optional filter expression (step is skipped if false). |
| timeoutMs | number | Optional timeout for rpc and event_wait steps. |
| durationMs | number | Required for sleep steps. |

WorkflowOpts (third argument when registering a DAG — shape below; the interface is defined in the SDK but not re-exported from the main service-bridge package entry, so use an inline object in app code):

interface WorkflowOpts {
  stateLimitBytes?: number; // default 262144 (256 KB)
  stepTimeoutMs?: number;   // default 30000 (30 s)
}

| Field | Type | Default | Description |
|---|---|---|---|
| stateLimitBytes | number | 262144 (256 KB) | Maximum serialized state size in bytes. |
| stepTimeoutMs | number | 30000 (30 s) | Default per-step timeout in milliseconds. |

await sb.workflows.run("order.fulfillment", [
  { id: "reserve", type: "rpc", service: "inventory", ref: "inventory.reserve" },
  { id: "charge", type: "rpc", service: "payment", ref: "payment.charge", deps: ["reserve"] },
  { id: "wait_5m", type: "sleep", durationMs: 300_000, deps: ["charge"] },
  { id: "notify", type: "event", ref: "orders.fulfilled", deps: ["wait_5m"] },
]);

With explicit limits:

await sb.workflows.run("checkout.flow", steps, { stepTimeoutMs: 60_000 });
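
Since `deps` defines a DAG, it can help to check a step list locally before registering it. The sketch below uses Kahn's algorithm on a local `DagStep` stand-in type; `topoOrder` is a hypothetical helper, and whether the runtime performs the same checks is not stated here.

```typescript
// Local stand-in for WorkflowStep: only `id` and `deps` matter for ordering.
type DagStep = { id: string; deps?: string[] };

// Kahn's algorithm: returns step ids in an order that respects `deps`.
// Throws on duplicate ids, unknown dependencies, or cycles.
function topoOrder(steps: DagStep[]): string[] {
  const indegree = new Map<string, number>();
  const dependents = new Map<string, string[]>();
  for (const s of steps) {
    if (indegree.has(s.id)) throw new Error(`duplicate step id: ${s.id}`);
    indegree.set(s.id, 0);
    dependents.set(s.id, []);
  }
  for (const s of steps) {
    for (const dep of s.deps ?? []) {
      const list = dependents.get(dep);
      if (!list) throw new Error(`step ${s.id} depends on unknown step ${dep}`);
      list.push(s.id);
      indegree.set(s.id, (indegree.get(s.id) ?? 0) + 1);
    }
  }
  // Root steps (no deps) are ready immediately.
  const ready = steps.filter((s) => indegree.get(s.id) === 0).map((s) => s.id);
  const order: string[] = [];
  while (ready.length > 0) {
    const id = ready.shift()!;
    order.push(id);
    for (const next of dependents.get(id) ?? []) {
      const remaining = (indegree.get(next) ?? 0) - 1;
      indegree.set(next, remaining);
      if (remaining === 0) ready.push(next);
    }
  }
  if (order.length !== steps.length) throw new Error("cycle detected in workflow steps");
  return order;
}

const order = topoOrder([
  { id: "reserve" },
  { id: "charge", deps: ["reserve"] },
  { id: "wait_5m", deps: ["charge"] },
  { id: "notify", deps: ["wait_5m"] },
]);
console.log(order.join(" -> ")); // reserve -> charge -> wait_5m -> notify
```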

workflows.declare(service, name)

declare(service: string, name: string): void

Declares an outbound workflow dependency for registration metadata (does not start an execution).


workflows.run(service, name, input?, opts?) — execute

This is the same run method as above when the second argument is the workflow name (string), not a step array:

run(service: string, name: string, input?: unknown, opts?: ExecuteWorkflowOpts): Promise<ExecuteWorkflowResult>

Starts a workflow execution on demand. The workflow must be registered first via workflows.run(name, steps). An alternative to scheduling via jobs.run(target, { via: "workflow", ... }) — triggers the execution immediately.

| Parameter | Type | Default | Description |
|---|---|---|---|
| service / name | string | required | Target service and workflow name. |
| input | unknown | undefined | Optional JSON-serializable input payload (serialized as JSON for the runtime). |

Returns { traceId }. Use traceId with watchTrace() to observe execution in real time.

ExecuteWorkflowOpts (optional fourth argument):

| Option | Type | Description |
|---|---|---|
| traceId | string | Declared on the exported type for API parity; the current Node implementation does not forward this field to the control plane (the gRPC request is built without it). Prefer relying on the returned traceId. |

const { traceId } = await sb.workflows.run("users", "user.onboarding", { userId: "u_123" });

cancelWorkflow(traceId)

cancelWorkflow(traceId: string): Promise<void>

Cancels a running workflow instance.

await sb.cancelWorkflow("trace_01HQ...XYZ");

rpc.handle(fn, handler, opts?)

handle(
  fn: string,
  handler: (payload: unknown, ctx?: RpcContext) => unknown | Promise<unknown>,
  opts?: HandleRpcOpts,
): ServiceBridgeService

Registers an RPC handler. Chainable.

RpcContext:

| Field | Type | Description |
|---|---|---|
| traceId | string | Current trace ID. |
| spanId | string | Current span ID. |
| stream | StreamWriter | Real-time stream writer. |

HandleRpcOpts:

| Option | Type | Description |
|---|---|---|
| timeout | number | Advisory timeout hint (currently metadata-level, not hard-enforced by runtime). |
| retryable | boolean | Advisory retry hint (currently metadata-level, not a strict policy switch). |
| concurrency | number | Advisory concurrency hint (currently not hard-enforced). |
| schema | RpcSchemaOpts | Inline protobuf schema for binary encode/decode. |
| allowedCallers | string[] | Allow-list of caller service names. |

sb.rpc.handle("ai.generate", async (payload: { prompt: string }, ctx) => {
  await ctx?.stream.write({ token: "Hello" }, "output");
  await ctx?.stream.write({ token: " world" }, "output");
  return { text: "Hello world" };
});

StreamWriter:

| Method | Signature | Description |
|---|---|---|
| write | write(data: unknown, key?: string): Promise<void> | Append a real-time chunk to the trace stream. |
| end | end(key?: string): Promise<void> | No-op placeholder for API symmetry (lifecycle managed by runtime). |


events.handle(pattern, handler, opts?)

handle(
  pattern: string,
  handler: (payload: unknown, ctx: EventContext) => void | Promise<void>,
  opts?: HandleEventOpts,
): ServiceBridgeService

Registers an event consumer handler. Chainable.

HandleEventOpts:

| Option | Type | Description |
|---|---|---|
| concurrency | number | Advisory concurrency hint (currently not hard-enforced). |
| prefetch | number | Advisory prefetch hint (currently not hard-enforced). |
| retryPolicyJson | string | Retry policy JSON string. |
| filterExpr | string | Server-side filter expression. |

The consumer group name is fixed as <service-key-id>.<pattern> (derived from your sbv2 key and the pattern string). Registering a second handler for the same pattern throws a duplicate consumer-group error.

Delivery guarantee: once a message is accepted by the runtime, delivery to each consumer group is guaranteed. If the consumer is offline, the message waits in the server-side queue and is dispatched automatically the moment the service reconnects and registers its handlers — no retry budget is consumed while waiting. After SERVICEBRIDGE_DELIVERY_TTL_DAYS (default 7) days without a consumer, the delivery moves to DLQ with reason delivery_ttl_exceeded.
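
The waiting rule can be pictured as a small decision function. This is an illustration only: the real logic lives in the runtime, `decideDelivery` is a hypothetical name, and measuring the TTL from the moment the message was accepted is an assumption.

```typescript
const MS_PER_DAY = 24 * 60 * 60 * 1000;

type DeliveryDecision = "dispatch" | "wait" | "dlq:delivery_ttl_exceeded";

// Illustrative rule only. Assumption: the TTL clock starts when the runtime
// accepts the message, and the boundary is exclusive.
function decideDelivery(
  acceptedAtMs: number,
  nowMs: number,
  consumerOnline: boolean,
  ttlDays = 7, // documented default of SERVICEBRIDGE_DELIVERY_TTL_DAYS
): DeliveryDecision {
  if (consumerOnline) return "dispatch";
  const waitedMs = nowMs - acceptedAtMs;
  return waitedMs > ttlDays * MS_PER_DAY ? "dlq:delivery_ttl_exceeded" : "wait";
}

console.log(decideDelivery(0, 1 * MS_PER_DAY, false)); // wait
console.log(decideDelivery(0, 8 * MS_PER_DAY, false)); // dlq:delivery_ttl_exceeded
```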

EventContext helpers:

  • ctx.traceId — current trace ID
  • ctx.spanId — current span ID
  • ctx.retry(delayMs?) — ask for redelivery with optional delay
  • ctx.reject(reason) — move to DLQ immediately, bypassing remaining retries
  • ctx.refs — metadata (topic, groupName, messageId, attempt, headers)
  • ctx.stream.write(...) — append real-time chunks to trace stream

sb.events.handle("orders.*", async (payload, ctx) => {
  const body = payload as { orderId?: string };
  if (!body.orderId) {
    ctx.reject("missing_order_id");
    return;
  }
  await ctx.stream.write({ status: "processing", orderId: body.orderId }, "progress");
});
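
With at-least-once delivery and `ctx.retry()`, a handler may see the same message more than once, so handlers are usually written to be idempotent. The wrapper below is a sketch under stated assumptions: `idempotent` and `CtxLike` are hypothetical, `messageId` is assumed to stay the same across redeliveries, and an in-memory `Set` stands in for what would need to be a shared, persistent store in a real service.

```typescript
// Minimal shape of the context fields this sketch relies on
// (ctx.refs.messageId is listed among the EventContext helpers).
type CtxLike = { refs: { messageId: string } };
type Handler<P> = (payload: P, ctx: CtxLike) => Promise<void>;

// Wraps a handler so a redelivered message (same messageId) is processed once.
// Assumption: messageId is stable across redeliveries of the same message.
function idempotent<P>(handler: Handler<P>): Handler<P> {
  const seen = new Set<string>();
  return async (payload, ctx) => {
    if (seen.has(ctx.refs.messageId)) return; // duplicate, skip side effects
    await handler(payload, ctx);
    // Mark as seen only after success so a failed attempt can be retried.
    seen.add(ctx.refs.messageId);
  };
}

let emailsSent = 0;
const sendEmailOnce = idempotent<{ orderId: string }>(async () => {
  emailsSent++;
});
```

The wrapped function has the same shape as the original handler, so under these assumptions it could be passed wherever a handler is expected.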

start(opts?)

start(opts?: StartOpts): Promise<void>

Starts the worker gRPC server and registers handlers with the control plane. The promise resolves once startup/registration is complete (it does not block the Node.js process). Throws immediately if no handlers are registered (neither rpc.handle() nor events.handle() has been called).

StartOpts:

| Option | Type | Description |
|---|---|---|
| host | string | Bind host. Default: localhost. Use 0.0.0.0 in Docker/Kubernetes so ServiceBridge can reach the worker. |
| maxInFlight | number | Max in-flight runtime-originated commands over OpenWorkerSession. Default: 128. |
| instanceId | string | Stable worker instance identifier. |
| weight | number | Scheduling/discovery weight hint. |
| tls | WorkerTLSOpts | Per-start worker TLS override. |

await sb.start({
  host: "localhost",
  instanceId: process.env.HOSTNAME,
});

stop()

stop(): void

Gracefully stops the worker gRPC server (try graceful shutdown, then force), heartbeats, channels, and SDK internals.


startHttpSpan(opts)

startHttpSpan(opts: {
  method: string;
  path: string;
  traceId?: string;
  parentSpanId?: string;
}): HttpSpan

Manual HTTP tracing primitive.

const span = sb.startHttpSpan({ method: "GET", path: "/health" });
try {
  // ... handle the request here; a thrown error skips to the catch block ...
  span.end({ statusCode: 200, success: true });
} catch (e) {
  span.end({ success: false, error: String(e) });
}

registerHttpEndpoint(opts)

registerHttpEndpoint(opts: {
  method: string;
  route: string;
  instanceId?: string;
  endpoint?: string;
  allowedCallers?: string[];
  requestSchemaJson?: string;
  responseSchemaJson?: string;
  transport?: string;
}): Promise<void>

Registers HTTP route metadata in the ServiceBridge service catalog (stored and sent on the next Reconcile). Requires a completed worker start(): until start() has finished successfully, the call resolves but does not record the route (HTTP middleware may invoke registerHttpEndpoint on first request; catalog entries appear only after start() has run).

| Option | Type | Description |
|---|---|---|
| method | string | HTTP method: GET, POST, PUT, PATCH, DELETE, etc. |
| route | string | Route pattern with parameter placeholders, e.g. "/users/:id". |
| instanceId | string | Present on the public opts type; not applied by the current Node client when building http_routes for Reconcile (worker identity comes from start()). |
| endpoint | string | Same as above; use start() / deployment wiring for the reachable worker base URL. |
| allowedCallers | string[] | Service names allowed to call (RBAC). |
| requestSchemaJson | string | JSON schema for request validation metadata. |
| responseSchemaJson | string | JSON schema for response validation metadata. |
| transport | string | Present on the public opts type; not sent per route in the current Node reconcile payload. |

await sb.registerHttpEndpoint({
  method: "GET",
  route: "/users/:id",
  requestSchemaJson: '{"type":"object"}',
  transport: "http",
});

watchTrace(traceId, opts?)

watchTrace(traceId: string, opts?: WatchTraceOpts): AsyncIterable<TraceStreamEvent>

Subscribes to a trace stream with replay and live updates. traceId is the stream identifier used by ctx.stream.write(...).

WatchTraceOpts:

| Option | Type | Default | Description |
|---|---|---|---|
| key | string | "" | Stream key filter ("" = all keys). |
| fromSequence | number | 0 | Replay from sequence cursor. |

TraceStreamEvent:

| Field | Type | Description |
|---|---|---|
| type | "chunk" \| "trace_complete" | Event kind. |
| traceId | string | Trace identifier being watched. |
| key | string | Stream lane key. |
| sequence | number | Monotonic sequence number. |
| data | unknown | JSON-decoded chunk payload. |
| traceStatus | string \| undefined | Final status on trace_complete. |

Behavior:

  • Auto-reconnect with exponential backoff (500 ms to 5000 ms) on retryable stream failures.
  • Deduplicates by sequence across reconnects.
  • Enforces strict JSON for type="chunk" payloads (non-JSON chunk terminates stream with fatal error).
  • Enforces internal queue limit 256; overflow is fatal (consumer must drain promptly).

for await (const evt of sb.watchTrace(traceId, { key: "output", fromSequence: 0 })) {
  if (evt.type === "chunk") {
    process.stdout.write(String((evt.data as { token?: string }).token ?? ""));
  }
  if (evt.type === "trace_complete") break;
}
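
The reconnect and deduplication behavior listed above can be sketched in isolation. Both helpers are hypothetical and not SDK code: the doubling growth factor is an assumption (only the 500 ms and 5000 ms bounds are documented), and the filter assumes one monotonically increasing sequence for the watched stream.

```typescript
// Assumption: the delay doubles per failed attempt between the documented
// bounds of 500 ms and 5000 ms; the SDK's exact growth factor is not stated.
function reconnectDelayMs(attempt: number, baseMs = 500, maxMs = 5000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

// Returns a predicate that rejects events already seen, e.g. chunks that
// are replayed after a reconnect.
// Assumption: sequences increase monotonically within the watched stream.
function createSequenceFilter(): (sequence: number) => boolean {
  let lastSeen = -1;
  return (sequence) => {
    if (sequence <= lastSeen) return false; // duplicate from replay
    lastSeen = sequence;
    return true;
  };
}

const accept = createSequenceFilter();
// Sequences 2 and 1 arrive a second time after a simulated reconnect.
const delivered = [1, 2, 2, 3, 1, 4].filter((seq) => accept(seq));
console.log(delivered); // 1, 2, 3, 4 remain
```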

Trace Utilities

getTraceContext()

getTraceContext(): TraceCtx | undefined

Returns the current async-local trace context.

import { getTraceContext } from "service-bridge";

const tc = getTraceContext();
if (tc) {
  console.log(tc.traceId, tc.spanId);
}

withTraceContext(ctx, fn)

withTraceContext<T>(ctx: TraceCtx, fn: () => T): T

Runs a function inside an explicit trace context.

import { withTraceContext } from "service-bridge";

await withTraceContext({ traceId: "trace-1", spanId: "span-1" }, async () => {
  await sb.events.publish("audit.log", { action: "user.login" });
});
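
For readers unfamiliar with async-local context, the sketch below shows the general mechanism using Node's `AsyncLocalStorage`. Whether the SDK uses this exact class internally is an assumption; `runWithTrace`, `currentTrace`, and `TraceCtxLike` are local stand-ins, not SDK exports.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Local stand-ins; the SDK's TraceCtx and helpers may differ internally.
type TraceCtxLike = { traceId: string; spanId: string };
const storage = new AsyncLocalStorage<TraceCtxLike>();

// Runs `fn` with `ctx` visible to everything it calls, sync or async.
function runWithTrace<T>(ctx: TraceCtxLike, fn: () => T): T {
  return storage.run(ctx, fn);
}

// Reads the context of the current async call chain, if any.
function currentTrace(): TraceCtxLike | undefined {
  return storage.getStore();
}

async function audit(): Promise<string | undefined> {
  // The context survives across awaits inside the same call chain.
  await new Promise((resolve) => setTimeout(resolve, 1));
  return currentTrace()?.traceId;
}

const traceCtx = { traceId: "trace-1", spanId: "span-1" };
runWithTrace(traceCtx, audit).then((id) => console.log(id)); // trace-1
console.log(currentTrace()); // undefined outside of runWithTrace
```

This is why no trace argument needs to be threaded through intermediate functions: any code reached from inside the callback can look the context up on its own.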

HTTP Plugins

Express (service-bridge/express)

npm install express

import express from "express";
import { ServiceBridge } from "service-bridge";
import { servicebridgeMiddleware, registerExpressRoutes } from "service-bridge/express";

const sb = new ServiceBridge(process.env.SERVICEBRIDGE_URL!, process.env.SERVICEBRIDGE_SERVICE_KEY!);
const app = express();

app.use(servicebridgeMiddleware({
  client: sb,
  excludePaths: ["/health"],
  autoRegister: true,
}));

app.get("/users/:id", async (req, res) => {
  const user = await req.servicebridge.rpc.invoke("user.get", { id: req.params.id });
  res.json(user);
});

servicebridgeMiddleware(options)

servicebridgeMiddleware(options: {
  client: ServiceBridgeService;
  excludePaths?: string[];
  propagateTraceHeader?: boolean;
  autoRegister?: boolean;
}): express.RequestHandler

  • Attaches req.servicebridge, req.traceId, req.spanId
  • Starts/ends HTTP span automatically
  • Optionally sets x-trace-id response header
  • Optionally auto-registers route pattern in catalog on first hit

registerExpressRoutes(app, client, opts?)

Eager route catalog registration without waiting for first request.

await registerExpressRoutes(app, sb, {
  endpoint: "http://10.0.0.5:3000",
  allowedCallers: ["api-gateway"],
  excludePaths: ["/health"],
});

Fastify (service-bridge/fastify)

npm install fastify

import Fastify from "fastify";
import { ServiceBridge } from "service-bridge";
import { servicebridgePlugin, wrapHandler } from "service-bridge/fastify";

const sb = new ServiceBridge(process.env.SERVICEBRIDGE_URL!, process.env.SERVICEBRIDGE_SERVICE_KEY!);
const app = Fastify();

await app.register(servicebridgePlugin, {
  client: sb,
  excludePaths: ["/health"],
  autoRegister: true,
});

app.get("/users/:id", wrapHandler(async (request, reply) => {
  const user = await request.servicebridge.rpc.invoke("user.get", {
    id: (request.params as any).id,
  });
  return reply.send(user);
}));

servicebridgePlugin(fastify, options)

servicebridgePlugin(fastify, {
  client,
  excludePaths?,
  propagateTraceHeader?,
  autoRegister?,
  register?: {
    instanceId?,
    endpoint?,
    allowedCallers?,
    excludePaths?,
  },
})
  • Decorates request.servicebridge, request.traceId, request.spanId
  • Traces HTTP lifecycle via hooks
  • Auto-registers routes on onRoute before traffic

wrapHandler(handler)

Runs a Fastify handler inside the current trace context so downstream SDK calls inherit the trace.


Trace Utilities (HTTP Plugins)

extractTraceFromHeaders(headers)

import { extractTraceFromHeaders } from "service-bridge/express";
// or
import { extractTraceFromHeaders } from "service-bridge/fastify";

const { traceId, parentSpanId } = extractTraceFromHeaders(req.headers);

Extracts trace context from HTTP headers. Supports W3C traceparent, x-trace-id/x-span-id headers, and generates random IDs as fallback. Useful for custom HTTP framework integrations (Hono, Koa, etc.).
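For a custom integration it can help to see what reading a W3C traceparent header involves. The sketch below is not the SDK's implementation; parseTraceparent and ParsedTraceparent are hypothetical names, and it only handles the four-field "version-traceid-parentid-flags" layout.

```typescript
// Hypothetical helper, not part of service-bridge: parses a W3C `traceparent`
// header of the form "<version>-<trace-id>-<parent-id>-<flags>".
interface ParsedTraceparent {
  traceId: string;      // 32 lowercase hex characters
  parentSpanId: string; // 16 lowercase hex characters
  sampled: boolean;     // lowest bit of the flags byte
}

const TRACEPARENT_RE = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

function parseTraceparent(header: string | undefined): ParsedTraceparent | null {
  if (!header) return null;
  const match = TRACEPARENT_RE.exec(header.trim());
  if (!match) return null;
  const [, version, traceId, parentSpanId, flags] = match;
  // The spec reserves version "ff" and treats all-zero IDs as invalid.
  if (version === "ff") return null;
  if (/^0+$/.test(traceId) || /^0+$/.test(parentSpanId)) return null;
  return { traceId, parentSpanId, sampled: (parseInt(flags, 16) & 1) === 1 };
}
```

A caller would fall back to x-trace-id/x-span-id or freshly generated IDs when this returns null, which mirrors the fallback order described above.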


Configuration

TLS behavior

  • Worker transport is TLS-only.
  • Control plane is TLS-only. Trust source is embedded into sbv2 service key by default.
  • Embedded/explicit CA PEM is validated with strict x509 parsing.
  • If workerTLS is not provided, SDK auto-provisions worker certs via gRPC ProvisionWorkerCertificate.
  • workerTLS.cert and workerTLS.key must be provided together.
  • start({ tls }) overrides global workerTLS for a specific worker instance.

Offline queue behavior

When the control plane is unavailable, the SDK queues write operations (events.publish, jobs.run, workflows.run, telemetry writes).

  • Queue size: queueMaxSize (default: 1000)
  • Overflow policy: queueOverflow (default: "drop-oldest")
  • Return values for queued writes may be empty strings until flushed
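To make the "drop-oldest" overflow policy concrete, here is a minimal sketch of a bounded FIFO that evicts its oldest entry when full. BoundedQueue is a hypothetical name used for illustration; the SDK's internal queue may be built differently.

```typescript
// Illustrative only: a bounded FIFO that evicts the oldest entry on overflow.
// `BoundedQueue` is a hypothetical name, not an SDK export.
class BoundedQueue<T> {
  private items: T[] = [];

  constructor(private readonly maxSize: number) {
    if (Math.floor(maxSize) !== maxSize || maxSize < 1) {
      throw new RangeError("maxSize must be a positive integer");
    }
  }

  // Adds an item; returns the evicted item if the queue was full, else undefined.
  push(item: T): T | undefined {
    let dropped: T | undefined;
    if (this.items.length >= this.maxSize) {
      dropped = this.items.shift();
    }
    this.items.push(item);
    return dropped;
  }

  // Removes and returns everything queued, oldest first.
  drain(): T[] {
    const out = this.items;
    this.items = [];
    return out;
  }

  get size(): number {
    return this.items.length;
  }
}
```

With this policy the newest writes survive a long outage at the cost of the oldest ones, which is worth keeping in mind when choosing queueMaxSize.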

Environment Variables

The SDK takes its settings from the values you pass into new ServiceBridge(...). A common setup reads them from environment variables:

| Variable | Required | Example | Description |
|---|---|---|---|
| SERVICEBRIDGE_URL | yes | localhost:14445 | gRPC control plane URL |
| SERVICEBRIDGE_SERVICE_KEY | yes | sbv2.<id>.<secret>.<ca> | Service authentication key (sbv2 only) |

const sb = new ServiceBridge(
  process.env.SERVICEBRIDGE_URL ?? "localhost:14445",
  process.env.SERVICEBRIDGE_SERVICE_KEY!,
);
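The non-null assertion (!) above only silences the compiler; a missing variable still reaches the constructor as undefined. One way to fail fast is a small lookup helper. requireEnv is a hypothetical name, not part of the SDK, and it takes the environment as a parameter so it can be tested without touching process.env.

```typescript
// Hypothetical helper: returns the named variable or throws a clear error.
function requireEnv(env: Record<string, string | undefined>, name: string): string {
  const value = env[name];
  if (value === undefined || value.trim() === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}
```

In an application this would be called as requireEnv(process.env, "SERVICEBRIDGE_SERVICE_KEY") before constructing the client.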

Error Handling

ServiceBridgeError is exported for normalized SDK and runtime errors.

import { ServiceBridge, ServiceBridgeError } from "service-bridge";

try {
  await sb.rpc.invoke("payment.charge", { orderId: "ord_1" });
} catch (e) {
  if (e instanceof ServiceBridgeError) {
    console.error(e.component, e.operation, e.severity, e.retryable, e.code, e.grpcStatus);
  }
  throw e;
}

| Field | Type | Description |
|---|---|---|
| component | string | SDK subsystem (for example, "rpc" or "event"). |
| operation | string | Operation that failed. |
| severity | "fatal" \| "retriable" \| "ignorable" | Error classification. |
| retryable | boolean | Whether retry is recommended (true when severity === "retriable"). |
| code | ServiceBridgeErrorCode | Stable SDK error id (SB_*). |
| grpcStatus | number \| undefined | gRPC status code when the error came from gRPC. |
| cause | unknown | Original underlying error. |
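The retryable field lends itself to an application-level retry wrapper for operations that do not already have a retry policy. The sketch below is an assumption about how one might use it, not an SDK feature: withRetry and isRetryable are hypothetical names, and the check is duck-typed on retryable === true so the block runs without importing the SDK.

```typescript
// Hypothetical helpers, not SDK exports.
// True when the thrown value carries `retryable === true`,
// as ServiceBridgeError is documented to do for retriable failures.
function isRetryable(e: unknown): boolean {
  return typeof e === "object" && e !== null && (e as { retryable?: unknown }).retryable === true;
}

// Runs `op` up to `maxAttempts` times, waiting `delayMs` between attempts,
// and rethrows immediately on a non-retryable error.
async function withRetry<T>(op: () => Promise<T>, maxAttempts = 3, delayMs = 100): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await op();
    } catch (e) {
      lastError = e;
      if (!isRetryable(e) || attempt === maxAttempts) break;
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  throw lastError;
}
```

In real code the duck-typed check could be replaced with e instanceof ServiceBridgeError && e.retryable, and the fixed delay with a backoff schedule.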


When to Use / When Not to Use

ServiceBridge is a good fit when you:

  • Have 3+ microservices that need to communicate via RPC, events, or both
  • Want RPC + events + workflows + jobs without managing separate infrastructure for each
  • Need end-to-end tracing across all communication patterns in one timeline
  • Want to eliminate sidecar proxies and reduce operational overhead
  • Need durable event delivery with retry, DLQ, and replay without running a broker
  • Are building AI/LLM pipelines and need realtime streaming with replay

Consider alternatives when you:

  • Run a single monolith with no service decomposition plans
  • Need ultra-high-throughput event streaming (100K+ msg/s sustained) — Kafka is purpose-built for this
  • Need a full API gateway with rate limiting, auth plugins, and request transformation — use Kong/Envoy Gateway
  • Already have a mature Istio/Linkerd mesh and only need traffic management (no events/workflows/jobs)
  • Need multi-region event replication — ServiceBridge currently targets single-region deployments

v2 Session API

session_v2.ts implements the new Enterprise Session Protocol: a channel-based bidi stream with an 8-state FSM, adaptive heartbeat, and credit-based flow control. It is symmetric with the Go and Python SDKs.

Session lifecycle (8-state FSM)

connecting → handshaking → ready ↔ active
                                 ↘ suspended → (reconnect)
                                 ↘ draining → closed
                                 ↘ fenced   (permanent)

| State | Description |
|-------|-------------|
| connecting | TCP/TLS connection is being established |
| handshaking | Hello sent, waiting for HelloAck |
| ready | HelloAck received, no commands in flight |
| active | Commands are in flight |
| suspended | Heartbeat missed 2+ times |
| draining | Graceful shutdown initiated |
| fenced | Server sent GOAWAY_FENCED; the session is closed permanently |
| closed | Connection closed |
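One way to read the lifecycle diagram is as a transition table. The sketch below is an interpretation, not the SDK's code: it assumes that suspended, draining, and fenced can be entered from either ready or active, and that a reconnect from suspended starts again at connecting. SessionState, TRANSITIONS, and canTransition are hypothetical names.

```typescript
// Hypothetical model of the 8-state session FSM described above.
type SessionState =
  | "connecting" | "handshaking" | "ready" | "active"
  | "suspended" | "draining" | "fenced" | "closed";

// Assumed edges; the diagram does not pin down every source state.
const TRANSITIONS: Record<SessionState, readonly SessionState[]> = {
  connecting: ["handshaking"],
  handshaking: ["ready"],
  ready: ["active", "suspended", "draining", "fenced"],
  active: ["ready", "suspended", "draining", "fenced"],
  suspended: ["connecting"], // reconnect
  draining: ["closed"],
  fenced: [],                // permanent
  closed: [],
};

function canTransition(from: SessionState, to: SessionState): boolean {
  return TRANSITIONS[from].indexOf(to) !== -1;
}
```

Keeping the edges in one table makes it easy to reject illegal moves, such as leaving fenced, in a single place.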

Quick start

import { V2SessionClient, validateV2Config } from 'service-bridge';

const cfg = {
  serverAddress: 'localhost:9090',
  instanceId: 'worker-1',
  zone: 'us-east-1a',
  transportMode: 'direct' as const,
  maxInflight: 64,
};

validateV2Config(cfg);
const session = new V2SessionClient(cfg);

// Send Hello on connect
const hello = session.getHelloFields();

// Handle the HelloAck from the server
session.onHelloAck({
  sessionId: 'sess-abc',
  resumeToken: 'token-xyz',
  epoch: 1n,
  resumed: false,
  resumeFromSeq: 0n,
  replayedCommands: 0,
  reconciledResults: 0,
  heartbeatIntervalMs: 10_000,
  heartbeatTimeoutMs: 30_000,
  initialPermits: 64,
  maxPermits: 128,
  effectiveTransportMode: 'direct',
});

console.log(session.state); // 'ready'

// Incoming command
const accepted = session.onCommandReceived(1n, 'cmd-001');
if (!accepted) {
  // backpressure — permits = 0
}

// Command completed
session.onCommandCompleted(1n, 'cmd-001');

Adaptive heartbeat (EWMA RTT)

import { AdaptiveHeartbeatV2 } from 'service-bridge';

const hb = new AdaptiveHeartbeatV2(10_000, 30_000);

// Pong received
hb.onPong(25); // rttMs

// Next interval (adapts to the EWMA RTT)
const nextMs = hb.nextIntervalMs();

// Miss: ping faster
const missCount = hb.onMiss();
if (missCount >= 2) {
  // reconnect
}

Algorithm: the base interval is intervalMs / 3; on misses it is divided by 2^miss (min 2s); with a stable RTT < 50ms it is doubled (max 30s).
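The rule above can be written out as a small pure function. This is a sketch under stated assumptions rather than the SDK's implementation: the smoothing factor alpha = 0.2, the reading of "stable RTT" as "smoothed RTT below 50 ms with no misses", and the names ewma and nextHeartbeatIntervalMs are all hypothetical.

```typescript
// Exponentially weighted moving average of RTT samples.
// alpha = 0.2 is an assumed smoothing factor, not taken from the SDK.
function ewma(prev: number | null, sampleMs: number, alpha = 0.2): number {
  return prev === null ? sampleMs : alpha * sampleMs + (1 - alpha) * prev;
}

// Hypothetical reading of the documented rule:
//   base = intervalMs / 3
//   misses > 0       -> base / 2^misses, floored at 2 s
//   smoothed RTT <50 -> base * 2, capped at 30 s
function nextHeartbeatIntervalMs(
  intervalMs: number,
  missCount: number,
  smoothedRttMs: number | null,
): number {
  const MIN_MS = 2_000;
  const MAX_MS = 30_000;
  const base = intervalMs / 3;
  if (missCount > 0) {
    return Math.max(MIN_MS, base / Math.pow(2, missCount));
  }
  if (smoothedRttMs !== null && smoothedRttMs < 50) {
    return Math.min(MAX_MS, base * 2);
  }
  return base;
}
```

For example, with intervalMs = 12_000 the base is 4 s, a single miss halves it to the 2 s floor, and a healthy link with a 25 ms smoothed RTT stretches it to 8 s.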

Credit-based flow control

import { FlowControlStateV2 } from 'service-bridge';

const fc = new FlowControlStateV2(64, 1, 128);

if (fc.tryConsume()) {
  // dispatch command
}

// Command finished: return the permit
fc.release(1);

// Server sent a FlowControlUpdate
fc.setWindow(32);
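Conceptually, credit-based flow control is a counter of permits: each dispatched command consumes one, each completion returns one, and the receiver refuses work at zero. The class below is a minimal illustration of that idea, not the internals of FlowControlStateV2; PermitWindow and its constructor arguments are hypothetical.

```typescript
// Hypothetical permit counter illustrating credit-based flow control.
class PermitWindow {
  private available: number;

  constructor(initial: number, private readonly max: number) {
    this.available = Math.min(initial, max);
  }

  // Takes one permit if any are left; false signals backpressure.
  tryConsume(): boolean {
    if (this.available === 0) return false;
    this.available -= 1;
    return true;
  }

  // Returns permits after commands complete, never exceeding the maximum.
  release(n = 1): void {
    this.available = Math.min(this.max, this.available + n);
  }

  get permits(): number {
    return this.available;
  }
}
```

Capping release at the maximum keeps a duplicate completion signal from inflating the window beyond what the server granted.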

Reconnect and resume

BackoffV2 implements exponential backoff with full jitter (base=100ms, max=30s). On reconnect, getHelloFields() automatically includes resumeToken, epoch, lastReceivedSeq, lastSentSeq, and completedCommandIds, so the server resumes the session from the right position.

import { BackoffV2 } from 'service-bridge';

const backoff = new BackoffV2();

while (true) {
  if (backoff.isCircuitOpen()) break; // 10+ consecutive failures

  const delayMs = backoff.next();
  await new Promise(r => setTimeout(r, delayMs));

  try {
    // reconnect...
    backoff.reset();
  } catch {
    backoff.recordFail();
  }
}
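For readers unfamiliar with "full jitter": the delay is drawn uniformly between zero and an exponentially growing ceiling. The function below sketches that formula with the documented base and max; it is not the BackoffV2 source, and fullJitterDelayMs plus its injectable random parameter are hypothetical, added so the behavior can be checked deterministically.

```typescript
// Hypothetical sketch of exponential backoff with full jitter:
//   ceiling = min(maxMs, baseMs * 2^attempt)
//   delay   = uniform random value in [0, ceiling)
function fullJitterDelayMs(
  attempt: number,
  baseMs = 100,
  maxMs = 30_000,
  random: () => number = Math.random,
): number {
  const ceiling = Math.min(maxMs, baseMs * Math.pow(2, attempt));
  return Math.floor(random() * ceiling);
}
```

Spreading delays across the whole range keeps many workers that lost their connection at the same moment from reconnecting in lockstep.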

ConfigPush: dynamic transport configuration

The server can send a ConfigPush with new routing rules at any time:

session.onConfigPush({
  defaultMode: 'direct',
  serviceOverrides: {
    'payment-svc': { mode: 'proxy', fallbackPolicy: 'fallback_to_direct' },
  },
  functionOverrides: {
    'payment.charge': { mode: 'proxy', timeoutMs: 5000 },
  },
});

// Resolve the transport mode for a function
const mode = session.resolveTransportMode('payment.charge'); // 'proxy'

All session events

| Method | Description |
|--------|-------------|
| getHelloFields() | Fields to send in Hello (initial and resume) |
| onHelloAck(ack) | Handles the HelloAck from the server |
| onCommandReceived(seq, id) | Incoming command; returns false under backpressure |
| onCommandCompleted(seq, id) | Command completed; releases a permit |
| onPermitGrant(n) | Server granted n additional permits |
| onFlowControlUpdate(size, reason) | Server changed the window size |
| onPong(rttMs) | Pong received; updates the EWMA |
| onHeartbeatMiss() | Pong timed out; returns true when the session transitions to suspended |
| onDrain(reason, deadlineMs) | Initiates a graceful drain |
| onGoaway(code, reason) | GoawaySignal from the server |
| onConfigPush(config) | Applies a new transport configuration |
| resolveTransportMode(fnName) | Returns the transport mode for a function |
| stop() | Closes the session immediately |

Exported classes and types

| Symbol | Type | Description |
|--------|------|-------------|
| V2SessionClient | class | Main session client |
| AdaptiveHeartbeatV2 | class | EWMA RTT heartbeat controller |
| FlowControlStateV2 | class | Credit-based flow control |
| BackoffV2 | class | Exponential backoff + circuit |
| PositionTrackerV2 | class | Tracks seq and completed IDs |
| ConfigPushStateV2 | class | Dynamic configuration manager |
| validateV2Config | function | Validates the config; throws Error |
| V2Config | interface | Session configuration |
| SessionStateV2 | type | Union of the 8 FSM states |
| TransportMode | type | 'direct' \| 'proxy' |
| HelloAckV2 | interface | HelloAck data from the server |
| TransportConfigV2 | interface | ConfigPush payload |
| ReconcileRequestV2 | interface | Declarative worker registration request |
| FunctionDeclarationV2 | interface | Function declaration for Reconcile |
| ConsumerGroupDeclarationV2 | interface | Consumer group declaration |
| HttpRouteDeclarationV2 | interface | HTTP route declaration |
| JobDeclarationV2 | interface | Job declaration |
| WorkflowDeclarationV2 | interface | Workflow declaration |
| SubscribeRequestV2 | interface | Registry subscribe request |
| WorkerEndpointV2 | interface | Worker endpoint info |
| IssueCertificateRequestV2 | interface | Certificate request |
| IssueCertificateResponseV2 | interface | Certificate response |
| CircuitBreakerConfigV2 | interface | Circuit breaker config |
| ZoneConfigV2 | interface | Zone-aware config |
| ServiceTransportOverride | interface | Per-service transport override |
| FunctionTransportOverride | interface | Per-function transport override |
| ResumeState | interface | Reconnect resume state |

From the main entry service-bridge, types such as ServiceBridgeOpts, RpcOpts, EventOpts, HandleRpcOpts, HandleEventOpts, ScheduleOpts, StartOpts, ExecuteWorkflowOpts, and ExecuteWorkflowResult are available. The DAG shapes WorkflowStep and WorkflowOpts are documented above but are not named exports from that entry — use inline object literals (inference from workflows.run(...)) unless your toolchain exposes deep paths. Example:

import type {
  RpcContext,
  EventContext,
  StreamWriter,
  TraceCtx,
  RetryPolicy,
  ServiceBridgeErrorSeverity,
} from "service-bridge";

FAQ

How does ServiceBridge handle service failures? RPC calls have configurable retries with exponential backoff and hard per-attempt timeouts, so a silent downstream service cannot keep a call pending forever. Events are durable (PostgreSQL-backed) with at-least-once delivery per consumer group. Failed deliveries are retried according to policy, then moved to DLQ. Workflows track step state and can be resumed.

Is there vendor lock-in? ServiceBridge is self-hosted. The runtime is a single Go binary + PostgreSQL. SDK calls map to standard patterns (RPC, pub/sub, cron) — migrating away means replacing SDK calls with equivalent library calls.

How does tracing work without an OTEL collector? The SDK automatically reports trace spans for every RPC call, event publish/delivery, workflow step, and HTTP request. The runtime stores traces in PostgreSQL and serves them via the built-in dashboard and a Loki-compatible API for Grafana integration.

Can I use ServiceBridge alongside existing infrastructure? Yes. You can adopt incrementally — start with RPC between two services, add events later, then workflows. ServiceBridge doesn't require replacing your existing broker or mesh all at once.

What happens when the control plane is down? In-flight direct RPC calls continue working (they go service-to-service, not through the control plane). New discovery lookups, event publishes, and telemetry writes are queued in the SDK offline queue and flushed when the control plane recovers.

What databases does the runtime support? PostgreSQL 16+. The runtime uses PostgreSQL for all persistence: traces, events, workflows, jobs, service registry, and configuration.


Community and Support


License

Free for non-commercial use. Commercial use requires a separate license. See LICENSE.

Copyright (c) 2026 Eugene Surkov.


Keywords

service-bridge · servicebridge · npm install service-bridge · npm i service-bridge · bun add service-bridge · Node.js SDK · TypeScript SDK · JavaScript microservices · RPC · gRPC · event bus · event-driven · distributed tracing · workflow orchestration · background jobs · cron · mTLS · service mesh · service discovery · zero sidecar · Istio alternative · Envoy alternative · RabbitMQ alternative · Temporal alternative · Jaeger alternative · PostgreSQL · Docker · Kubernetes · DLQ · dead letter queue · saga · distributed transactions · AI agent orchestration · Express middleware · Fastify middleware · HTTP middleware · observability · Prometheus · tracing · service catalog · durable events · retries · idempotency · auto mTLS · runtime dashboard · production ready · microservice communication