@sumicom/quicksave-message-bus
v0.8.6
Transport-agnostic command + subscribe message bus for Quicksave
Transport-agnostic command + subscribe message bus. Built for Quicksave's PWA ↔ agent RPC channel, but has no Quicksave-specific code — it only needs a duplex transport that can deliver JSON frames.
What it gives you
Four primitives over a single transport:
| Primitive | Shape |
| -------------- | ---------------------------------------------------------------- |
| command | One-shot request/response (verb + payload → result or error) |
| subscribe | Path-based (/sessions/:id/cards), delivers snapshot + updates until unsub |
| publish | Server-side fan-out to every peer subscribed to a path |
| getSnapshot | One-shot read of a subscribable path; resolves with the same data a sub would deliver, without creating a subscription |
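The publish row describes fan-out to every peer subscribed to an exact path. A minimal sketch of the bookkeeping that implies is shown below. The class and method names (SubscriptionRegistry, dropPeer, and so on) are hypothetical and are not taken from the package's source; only the 'upd' frame shape comes from the wire protocol in this README.

```typescript
// Hypothetical sketch: not the package's internal implementation.
type PeerId = string;
type UpdFrame<T> = { kind: 'upd'; path: string; data: T };

class SubscriptionRegistry {
  // Concrete path -> peers currently subscribed to it.
  private readonly byPath = new Map<string, Set<PeerId>>();

  add(path: string, peer: PeerId): void {
    let peers = this.byPath.get(path);
    if (!peers) {
      peers = new Set<PeerId>();
      this.byPath.set(path, peers);
    }
    peers.add(peer);
  }

  remove(path: string, peer: PeerId): void {
    const peers = this.byPath.get(path);
    if (!peers) return;
    peers.delete(peer);
    if (peers.size === 0) this.byPath.delete(path);
  }

  // On peer disconnect, forget every subscription that peer held.
  dropPeer(peer: PeerId): void {
    const paths: string[] = [];
    this.byPath.forEach((_peers, path) => paths.push(path));
    paths.forEach((path) => this.remove(path, peer));
  }

  // Fan-out: send one 'upd' frame per subscribed peer; returns the peer count.
  publish<T>(
    path: string,
    data: T,
    send: (peer: PeerId, frame: UpdFrame<T>) => void,
  ): number {
    const peers = this.byPath.get(path);
    if (!peers) return 0;
    peers.forEach((peer) => send(peer, { kind: 'upd', path, data }));
    return peers.size;
  }
}
```

Keying the map by the concrete path (not the pattern) matches the "exact path" wording of the publish primitive; a real server would also need the pattern-to-handler lookup, which is left out here.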
Key properties:
- Snapshot-on-subscribe: the first frame after sub is always a full snapshot. On reconnect the client auto-resends sub frames and gets a fresh snapshot, eliminating the "stale-after-reconnect" window.
- Command queueing: command(..., { queueWhileDisconnected: true }) holds the request until the transport reconnects, then flushes.
- Typed path params: PathParams<'/sessions/:id/cards'> → { id: string }.
- Transport-agnostic: implement ServerTransport / ClientTransport and drop it in. A FakeTransport ships under /fake for tests.
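To illustrate the typed path params property, here is one way such a type can be built with TypeScript template literal types, together with a runtime matcher. This is a sketch under assumptions: the package's real PathParams definition is not shown in this README and may differ, and matchPath is a hypothetical helper introduced only for this example.

```typescript
// Sketch only: the package's actual PathParams may be defined differently.
// Collects every ':name' segment of a pattern into an object type.
type PathParams<P extends string> =
  P extends `${string}:${infer Param}/${infer Rest}`
    ? { [K in Param | keyof PathParams<`/${Rest}`>]: string }
    : P extends `${string}:${infer Param}`
      ? { [K in Param]: string }
      : {};

// Hypothetical runtime counterpart: returns the params when the concrete
// path matches the pattern segment by segment, otherwise null.
function matchPath<P extends string>(pattern: P, path: string): PathParams<P> | null {
  const patternParts = pattern.split('/');
  const pathParts = path.split('/');
  if (patternParts.length !== pathParts.length) return null;

  const params: Record<string, string> = {};
  for (let i = 0; i < patternParts.length; i++) {
    const expected = patternParts[i] ?? '';
    const actual = pathParts[i] ?? '';
    if (expected.charAt(0) === ':') {
      if (actual === '') return null; // a param segment must not be empty
      params[expected.slice(1)] = actual;
    } else if (expected !== actual) {
      return null;
    }
  }
  return params as unknown as PathParams<P>;
}

// The compiler infers { id: string } | null here.
const matched = matchPath('/sessions/:id/cards', '/sessions/abc123/cards');
```

With this shape, a typo such as matched.idd is a compile-time error, not a runtime undefined.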
Install
npm install @sumicom/quicksave-message-bus
Quick example
Server (Node, any transport):
import { MessageBusServer } from '@sumicom/quicksave-message-bus';

const bus = new MessageBusServer(transport);

bus.onCommand<StartPayload, StartResult>('session:start', async (payload, ctx) => {
  return await startSession(payload);
});

bus.onSubscribe<'/sessions/:id/cards', CardHistory, CardUpdate>(
  '/sessions/:id/cards',
  {
    snapshot: async ({ params }) => getCardHistory(params.id),
    onSubscribed: ({ params, peer }) => trackSubscriber(params.id, peer),
  },
);

// Push updates to all subscribers of this exact path
bus.publish('/sessions/abc123/cards', { kind: 'card', event });

Client (browser or Node):
import { MessageBusClient } from '@sumicom/quicksave-message-bus';

const bus = new MessageBusClient(transport);

const result = await bus.command<StartResult, StartPayload>(
  'session:start',
  { prompt: 'hello' },
  { timeoutMs: 30_000, queueWhileDisconnected: true },
);

const unsub = bus.subscribe<CardHistory, CardUpdate>(
  '/sessions/abc123/cards',
  {
    onSnapshot: (history) => setInitialState(history),
    onUpdate: (event) => applyUpdate(event),
    onError: (err) => console.warn('sub failed:', err),
  },
);

// later:
unsub();

Wire protocol
Frames (JSON):
// Client → Server
{ kind: 'cmd', id, verb, payload }
{ kind: 'sub', path }
{ kind: 'unsub', path }
// Server → Client
{ kind: 'result', id, ok: true, data }
{ kind: 'result', id, ok: false, error }
{ kind: 'snap', path, data }
{ kind: 'upd', path, data }
{ kind: 'sub-error', path, error }

Transports are responsible for framing, delivery, and emitting peer connect/disconnect events; the bus has no opinion on wire format below that.
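The frame list above maps naturally onto TypeScript discriminated unions. The sketch below is illustrative and makes assumptions the README does not state: id is typed as string, and payload, data and error are left as unknown. parseClientFrame is a hypothetical helper showing how a transport might validate incoming JSON before handing it to the bus.

```typescript
// Assumed field types: the README lists field names only.
type ClientFrame =
  | { kind: 'cmd'; id: string; verb: string; payload: unknown }
  | { kind: 'sub'; path: string }
  | { kind: 'unsub'; path: string };

type ServerFrame =
  | { kind: 'result'; id: string; ok: true; data: unknown }
  | { kind: 'result'; id: string; ok: false; error: unknown }
  | { kind: 'snap'; path: string; data: unknown }
  | { kind: 'upd'; path: string; data: unknown }
  | { kind: 'sub-error'; path: string; error: unknown };

// Hypothetical helper: decode one JSON text frame, or return null if it is
// not valid JSON or not a well-formed client frame.
function parseClientFrame(text: string): ClientFrame | null {
  let value: unknown;
  try {
    value = JSON.parse(text);
  } catch {
    return null;
  }
  if (typeof value !== 'object' || value === null) return null;

  const { kind, id, verb, path, payload } = value as Record<string, unknown>;
  if (kind === 'cmd' && typeof id === 'string' && typeof verb === 'string') {
    return { kind: 'cmd', id, verb, payload };
  }
  if (kind === 'sub' && typeof path === 'string') return { kind: 'sub', path };
  if (kind === 'unsub' && typeof path === 'string') return { kind: 'unsub', path };
  return null;
}

// Example: build a server frame and serialize it for the wire.
const snapFrame: ServerFrame = { kind: 'snap', path: '/sessions/abc123/cards', data: [] };
const snapText = JSON.stringify(snapFrame);
```

Because kind (plus ok for result frames) is the discriminant, a switch over frame.kind lets the compiler check that every frame type is handled.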
Transport contract
interface ServerTransport {
  send(peer: PeerId, frame: ServerFrame): void;
  onFrame(handler: (peer: PeerId, frame: ClientFrame) => void): void;
  onPeerConnected(handler: (peer: PeerId) => void): void;
  onPeerDisconnected(handler: (peer: PeerId) => void): void;
}

interface ClientTransport {
  send(frame: ClientFrame): void;
  onFrame(handler: (frame: ServerFrame) => void): void;
  onConnected(handler: () => void): void;
  onDisconnected(handler: () => void): void;
  onReestablished(handler: () => void): void;
  isConnected(): boolean;
}

onConnected / onDisconnected track whether the transport is currently
up; they should be idempotent when the transport is already in that
state. onReestablished is distinct: it fires every time a fresh upstream
session has just been established (e.g. a successful handshake-ack),
even when the transport's connected flag never transitioned to
disconnected. The bus uses it to re-send sub frames, because the server
drops a peer's subscriptions when that peer disconnects. If the wire
layer masks brief blips from onDisconnected to keep in-flight commands
alive, only onReestablished tells the bus that its server-side
subscription state needs rebuilding.
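The role of onReestablished can be sketched as follows. This is a simplified illustration, not the bus's actual client code: SubscriptionTracker, ManualTransport and the trimmed ReestablishingTransport interface are hypothetical names, and only the two transport members relevant to resubscription are modelled.

```typescript
type SubFrame = { kind: 'sub'; path: string } | { kind: 'unsub'; path: string };

// Trimmed-down view of ClientTransport: just what resubscription needs.
interface ReestablishingTransport {
  send(frame: SubFrame): void;
  onReestablished(handler: () => void): void;
}

// Remembers active paths and re-sends a sub frame for each one whenever the
// transport reports a fresh upstream session.
class SubscriptionTracker {
  private readonly active = new Set<string>();
  private readonly transport: ReestablishingTransport;

  constructor(transport: ReestablishingTransport) {
    this.transport = transport;
    transport.onReestablished(() => {
      this.active.forEach((path) => this.transport.send({ kind: 'sub', path }));
    });
  }

  subscribe(path: string): () => void {
    this.active.add(path);
    this.transport.send({ kind: 'sub', path });
    // The returned function unsubscribes; calling it twice sends only one unsub.
    return () => {
      if (this.active.delete(path)) this.transport.send({ kind: 'unsub', path });
    };
  }
}

// Test double that records sent frames and lets the caller fire the event.
class ManualTransport implements ReestablishingTransport {
  readonly sent: SubFrame[] = [];
  private readonly handlers: Array<() => void> = [];

  send(frame: SubFrame): void {
    this.sent.push(frame);
  }
  onReestablished(handler: () => void): void {
    this.handlers.push(handler);
  }
  reestablish(): void {
    this.handlers.forEach((handler) => handler());
  }
}
```

Note that nothing here listens to onDisconnected: the resubscribe is driven purely by the new-session signal, which is why it still works when the wire layer hides short outages.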
For an example over an existing WebSocket layer, see Quicksave's
apps/agent/src/messageBus/busServerTransport.ts and
apps/pwa/src/lib/busClientTransport.ts.
Testing with the fake transport
import { FakeServerTransport, FakeClientTransport } from '@sumicom/quicksave-message-bus/fake';

The two fakes pair in memory and let you drive connect/disconnect manually. They are used in this package's own test suite.
License
MIT
