@console-one/relay
v0.1.1
Windowed acknowledged broadcast: many-to-many publisher/subscriber with per-listener sliding windows, three-channel message envelopes (SUCCESS / ERROR / CLOSED), and a small operator library.
Originally extracted from web-server/server/src/core/streams/. Standalone — no dependency on the parent monorepo.
Install
npm install
npm run build
Quick start
import { Relay, of, map, tap } from '@console-one/relay'
const out: number[] = []
tap<number>((x) => out.push(x))(
  map<number, number>((x) => x * 10)(of(1, 2, 3))
).query()
// out: [10, 20, 30]
Or with run() for a Subscription handle:
import { of } from '@console-one/relay'
const sub = of(1, 2, 3).run<number>((value, state, notify) => {
  console.log(value)
  return state
})
await sub.completion // wait for the stream to finish
sub.cancel() // or detach manually
sub.pause(); sub.resume() // flow control
Two activation styles
query() is the low-level "activate the upstream providers, return the relay so you can keep chaining" call. It's lazy by design — handlers register, but nothing flows until query() runs. This is what makes fan-out work: build the graph, fire once, every branch sees the data.
run(handler?, options?) is the ergonomic entry point: register a handler if given, activate, and return a RelaySubscription handle:
| | query() | run() |
|---|---|---|
| Returns | Subscribable (chain more) | RelaySubscription (control handle) |
| Activates upstream | yes | yes |
| Registers handler | no | yes (if passed) |
| cancel / pause / resume / completion | n/a | yes |
run() exists because forgetting query() was the historical pitfall of the API, and because once activated, lifecycle control wants its own surface.
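The lazy activation described above can be illustrated without the library. The following is a minimal sketch, not the package's implementation: `LazySource` and its `subscribe` method are hypothetical stand-ins, and only the "register first, activate once" behaviour of `query()` is modelled.

```typescript
// Hypothetical stand-in for a lazy source; this is NOT the library's Relay class.
type Handler<T> = (value: T) => void

class LazySource<T> {
  private handlers: Handler<T>[] = []
  private activated = false

  constructor(private readonly items: T[]) {}

  // Registering a handler never triggers delivery on its own.
  subscribe(handler: Handler<T>): this {
    this.handlers.push(handler)
    return this
  }

  // Modelled on query(): activate once, then return the source for chaining.
  query(): this {
    if (this.activated) return this
    this.activated = true
    for (const item of this.items) {
      for (const handler of this.handlers) handler(item)
    }
    return this
  }
}

const source = new LazySource([1, 2, 3])
const doubled: number[] = []
const labelled: string[] = []

// Build the graph: two branches hang off the same source.
source.subscribe((x) => doubled.push(x * 2))
source.subscribe((x) => labelled.push(`item-${x}`))

const beforeActivation = doubled.length // 0: nothing has flowed yet
source.query()
// doubled: [2, 4, 6]; labelled: ['item-1', 'item-2', 'item-3']
```

Because both branches were registered before the single `query()` call, each one sees every value. Forgetting that call leaves both arrays empty, which is the pitfall `run()` is meant to remove.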
The operator library
Curried form: op(args)(stream) → Subscribable. Sources (of, after, linked.subscribable, linked.pair) return Subscribables directly.
| Operator | Shape | What it does |
|---|---|---|
| of(...items) | → Subscribable<T> | Emit a fixed sequence then close. |
| map(fn) | (stream) → stream | Apply a pure function to each value. |
| tap(fn) | (stream) → stream | Observe each value; pass through unchanged. |
| reduce(fn, initial) | (stream) → stream | Fold to one final value, emitted at close. |
| scan(fn, initial) | (stream) → stream | Emit the running fold on every step. |
| states(...sources) | → Subscribable<{state, diff}> | Track latest concurrent state across N sources. |
| group(partitioner) | (stream) → stream<grouped> | Partition into named sub-streams. |
| squash() | (stream<stream<T>>) → stream<T> | Flatten a stream-of-streams. |
| after(ms).send(v) | → Subscribable<T> | One-shot: emit v after ms then close. |
| poll(source, ms) | → Subscribable<T> | Repeatedly query source at interval. |
| match(dfa, …) | reducer factory | DFA-driven match-and-emit. |
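To make the curried `op(args)(stream)` shape and the difference between `reduce` and `scan` concrete, here is a self-contained sketch over a toy push-stream. Everything in it is an assumption for illustration: the `Stream` type, the `fromItems`, `mapOp`, `scanOp`, `reduceOp` and `collect` helpers, and the `(state, value)` reducer argument order are hypothetical and may not match the package's real signatures.

```typescript
// Hypothetical push-stream model: call it with callbacks to receive values.
type Stream<T> = (next: (value: T) => void, close: () => void) => void

// Source: emit a fixed sequence, then close.
const fromItems = <T>(...items: T[]): Stream<T> => (next, close) => {
  items.forEach((item) => next(item))
  close()
}

// Curried operator: configure first, apply to a stream second.
const mapOp = <A, B>(fn: (a: A) => B) => (stream: Stream<A>): Stream<B> =>
  (next, close) => stream((a) => next(fn(a)), close)

// scan: emit the running fold on every step.
const scanOp = <A, S>(fn: (state: S, a: A) => S, initial: S) =>
  (stream: Stream<A>): Stream<S> => (next, close) => {
    let state = initial
    stream((a) => { state = fn(state, a); next(state) }, close)
  }

// reduce: stay silent until the upstream closes, then emit one final value.
const reduceOp = <A, S>(fn: (state: S, a: A) => S, initial: S) =>
  (stream: Stream<A>): Stream<S> => (next, close) => {
    let state = initial
    stream((a) => { state = fn(state, a) }, () => { next(state); close() })
  }

// Helper that drains a stream into an array.
const collect = <T>(stream: Stream<T>): T[] => {
  const out: T[] = []
  stream((v) => out.push(v), () => {})
  return out
}

const tens = mapOp((x: number) => x * 10)(fromItems(1, 2, 3))
const running = collect(scanOp((sum: number, x: number) => sum + x, 0)(tens))
const total = collect(reduceOp((sum: number, x: number) => sum + x, 0)(tens))
// running: [10, 30, 60]; total: [60]
```

The same fold function yields three emissions under `scanOp` and a single emission at close under `reduceOp`, which mirrors the distinction drawn in the table.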
linked factories:
| Factory | Returns |
|---|---|
| linked.subscribable(cb) | A relay connected to a callback source. |
| linked.pair() | Matched { subscribable, publisher }. |
| linked.http(uri) | A relay backed by fetch(uri). |
| linked.safePair() | Like pair(), but messages emitted before any subscriber attaches are queued and replayed on attach. |
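The queue-and-replay behaviour attributed to `linked.safePair()` can be sketched as follows. This is a hypothetical model, not the package's code: `makeSafePair`, `publish` and `subscribe` are illustrative names, and replaying the backlog only to the first subscriber that attaches is an assumption.

```typescript
// Hypothetical model of "queue until someone attaches, then replay".
type Listener<T> = (value: T) => void

function makeSafePair<T>() {
  const pending: T[] = []
  const listeners: Listener<T>[] = []

  const publisher = {
    publish(value: T): void {
      if (listeners.length === 0) {
        pending.push(value) // no subscriber yet: hold the message
        return
      }
      for (const listener of listeners) listener(value)
    },
  }

  const subscribable = {
    subscribe(listener: Listener<T>): void {
      listeners.push(listener)
      // Drain the backlog in arrival order to the subscriber that just attached.
      while (pending.length > 0) listener(pending.shift() as T)
    },
  }

  return { subscribable, publisher }
}

const pair = makeSafePair<string>()
pair.publisher.publish('early-1')
pair.publisher.publish('early-2')

const seen: string[] = []
pair.subscribable.subscribe((v) => seen.push(v))
pair.publisher.publish('live')
// seen: ['early-1', 'early-2', 'live']
```

With a plain pair that has no backlog, the two early messages would have no listener to go to; the queue is what preserves them.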
Acknowledged windowed flow control
BufferWindowPublisher is the underlying many-to-listener publisher. Each listener has its own (head, tail, window) cursor; messages are kept until every listener has acknowledged past them. Producers call resolve / reject / close; subscribers call register and supply a per-message handler plus an acknowledgement callback.
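The retention rule ("kept until every listener has acknowledged past them") and the per-listener window can be modelled in a few lines. The sketch below is an illustration under stated assumptions, not `BufferWindowPublisher` itself: the `WindowedBuffer` class, its `register` / `publish` / `retained` methods, and the `acked` / `sent` cursor fields are hypothetical simplifications of the (head, tail, window) cursor.

```typescript
// Hypothetical model; names are illustrative, not BufferWindowPublisher's API.
interface Cursor<T> {
  acked: number   // absolute index of the first unacknowledged message
  sent: number    // absolute index of the next message to deliver
  window: number  // max delivered-but-unacknowledged messages
  deliver: (index: number, value: T) => void
}

class WindowedBuffer<T> {
  private buffer: T[] = []
  private base = 0 // absolute index of buffer[0]
  private cursors: Cursor<T>[] = []

  // Returns this listener's acknowledgement callback.
  register(window: number, deliver: (index: number, value: T) => void): (index: number) => void {
    const cursor: Cursor<T> = { acked: this.base, sent: this.base, window, deliver }
    this.cursors.push(cursor)
    this.pump(cursor)
    return (index) => {
      cursor.acked = Math.max(cursor.acked, index + 1)
      this.trim()
      this.pump(cursor)
    }
  }

  publish(value: T): void {
    this.buffer.push(value)
    for (const cursor of this.cursors) this.pump(cursor)
  }

  retained(): number {
    return this.buffer.length
  }

  // Deliver as much as this listener's window currently allows.
  private pump(cursor: Cursor<T>): void {
    const end = this.base + this.buffer.length
    while (cursor.sent < end && cursor.sent - cursor.acked < cursor.window) {
      const index = cursor.sent++
      cursor.deliver(index, this.buffer[index - this.base] as T)
    }
  }

  // Drop messages that every listener has acknowledged.
  private trim(): void {
    const slowest = Math.min(...this.cursors.map((c) => c.acked))
    this.buffer.splice(0, slowest - this.base)
    this.base = slowest
  }
}

const buf = new WindowedBuffer<string>()
const fast: string[] = []
const slow: string[] = []
const ackFast = buf.register(2, (_index, value) => fast.push(value))
const ackSlow = buf.register(1, (_index, value) => slow.push(value))

buf.publish('a'); buf.publish('b'); buf.publish('c')
const afterPublish = { fast: [...fast], slow: [...slow], retained: buf.retained() }
// afterPublish: fast ['a', 'b'], slow ['a'], retained 3

ackFast(1) // fast listener acknowledges through index 1 and receives 'c'
const retainedAfterFastAck = buf.retained() // 3: the slow listener still pins 'a'

ackSlow(0) // slow listener acknowledges 'a' and receives 'b'
const retainedAfterSlowAck = buf.retained() // 2: 'a' is finally dropped
```

The slowest listener determines how much is retained, while each listener's window determines how far ahead of its own acknowledgements it is allowed to run.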
For richer flow control, wrap the raw ack channel in makeAckController(ack):
import { BufferWindowPublisher, makeAckController } from '@console-one/relay'
const publisher = new BufferWindowPublisher<number>({ windowSize: 10 })
// handle, memoryHigh and memoryOk are application-defined placeholders.
publisher.register((subscribe, ack) => {
  const control = makeAckController(ack)
  subscribe((message) => {
    if (message.type !== 'SUCCESS') return
    handle(message.data)
    control.ack(message.index!)
    if (memoryHigh()) control.pause()
    if (memoryOk()) control.resume(50)
  })
})
| Method | Effect |
|---|---|
| ack(i) | Confirm processed up to index i. |
| ackThrough(i) | Bulk-ack everything up to i. |
| setWindow(n) | Set window size to n. |
| request(n) | Bump window by n. |
| pause() / resume(n?) | Set the window to 0 / reopen the window (to n if given). |
| close() | Listener-side close. |
The controller is itself callable as the original (received) => void shape, so existing raw-ack callers are untouched.
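One way to get an object that is both callable and carries methods is to attach the methods to a function with `Object.assign`. The sketch below shows that pattern only. It is not how `makeAckController` is written: `wrapAck`, the `RawAck` payload shape (`{ index?, window? }`), and the default window of 10 are all hypothetical.

```typescript
// Hypothetical raw ack channel: receives a plain instruction object.
type RawAck = (received: { index?: number; window?: number }) => void

// A function type extended with convenience methods.
interface Controller extends RawAck {
  ack(index: number): void
  pause(): void
  resume(window?: number): void
}

function wrapAck(raw: RawAck, defaultWindow = 10): Controller {
  // Calling the controller directly forwards to the raw channel unchanged.
  const callable: RawAck = (received) => raw(received)
  return Object.assign(callable, {
    ack: (index: number) => raw({ index }),
    pause: () => raw({ window: 0 }),
    resume: (window: number = defaultWindow) => raw({ window }),
  })
}

const log: string[] = []
const control = wrapAck((received) => log.push(JSON.stringify(received)))

control({ index: 0 }) // raw call style still works
control.ack(1)
control.pause()
control.resume(50)
// log: ['{"index":0}', '{"index":1}', '{"window":0}', '{"window":50}']
```

Code written against the raw `(received) => void` shape can be handed the controller without modification, while newer code uses the named methods.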
Surface
Relay, BufferWindowPublisher
relay(id?), fromService(source, id?) // factories
of, map, tap, reduce, scan
states, group, squash
after, poll, match, matchtype
linked.subscribable, linked.pair, linked.http, linked.safePair
Message, MessageStatus, MessageType, ReceivedStatus
DataEmittor, ProviderCallback
Publisher, WindowPublisher, Listener, Listeners
Subscribable, SubscribableHandlerType, SuccessHandler, ErrorHandler, Notify
AckController, makeAckController, NoResolverError? // see ack.ts
RelaySubscription, makeSubscription
Queue, QueueItemLayout
src/
├── index.ts # public surface
├── types.ts # Message, MessageStatus, ReceivedStatus, ProviderCallback, DataEmittor
├── queue.ts # minimal singly-linked queue used by the publisher
├── publisher.ts # Publisher / WindowPublisher / Listener interfaces
├── subscribable.ts # Subscribable interface, handler types, Notify
├── window.ts # BufferWindowPublisher (sliding-window flow control)
├── relay.ts # Relay (the concrete Subscribable) + factories
├── ack.ts # AckController + makeAckController
├── subscription.ts # RelaySubscription + makeSubscription
├── uuid.ts # tiny local uuid
├── operators/
│ ├── index.ts
│ ├── make.ts # linked.subscribable / pair / http / safePair
│ ├── of.ts
│ ├── map.ts
│ ├── tap.ts
│ ├── reduce.ts # reduce + scan
│ ├── states.ts
│ ├── group.ts
│ ├── squash.ts
│ ├── delay.ts # after(ms).send(value)
│ ├── poll.ts
│ └── match.ts
├── test/
│ └── operators.ts # runnable test suite (13 cases)
├── test-runner.ts # @console-one/assessable executor entry
└── smoke.ts # quick standalone smoke
Scripts
npm run build # tsc -p tsconfig.build.json
npm run smoke # node dist/smoke.js (ad-hoc, manual asserts)
npm run test # node dist/test-runner.js (assessable runner)
npm run clean # rm -rf dist
Tests
src/test/operators.ts exercises the operators and lifecycle handles listed in the run below:
== Summary of 13 tests executed in 0.4 seconds ==
100% of tests (13/13) passed.
100% of tests (13/13) completed.
#1:1 Passed of/map/tap chains values and closes
#1:2 Passed reduce produces final state at close
#1:3 Passed scan emits running state on each step
#1:4 Passed linked.pair forwards published values
#1:5 Passed states tracks latest concurrent state
#1:6 Passed group partitions into named substreams
#1:7 Passed after(ms).send delays emission
#1:8 Passed BufferWindowPublisher gates with windowSize
#1:9 Passed Relay routes a callback source through subscribers
#1:10 Passed linked.safePair preserves pre-subscription messages
#1:11 Passed run() returns a Subscription handle
#1:12 Passed run().cancel() detaches from upstream
#1:13 Passed AckController pause/resume gates delivery