@livon/runtime
v0.29.0-rc.11
Published
Deterministic event pipeline runtime for LIVON.
Downloads
1,159
Maintainers
Readme
@livon/runtime
Install
pnpm add @livon/runtimePurpose
@livon/runtime is the event pipeline core. It composes modules and executes hook chains for:
onReceiveonSendonError
Best for
Use this package when you need deterministic event flow orchestration across transports and schema execution.
Basic usage
import {runtime} from '@livon/runtime';
runtime(moduleA, moduleB);Parameters
runtime(...modules):
modules(RuntimeModule[]): ordered module list to register and execute.
RuntimeModule:
name(string): module identifier for debugging and traceability.register((registry) => void): setup callback where hooks are registered.
Execution order (important)
Runtime module order defines execution order.
runtime(moduleA, moduleB, moduleC);Hook chains run from left to right:
moduleAmoduleBmoduleC
This applies to onReceive, onSend, and onError registration order.
Writing a runtime module
import type {RuntimeModule} from '@livon/runtime';
const traceModule: RuntimeModule = {
name: 'trace',
register: ({onReceive, onSend, onError}) => {
onReceive(async (envelope, ctx, next) => {
return next();
});
onSend(async (envelope, ctx, next) => {
return next();
});
onError((error, envelope, ctx) => {
// error handling
});
},
};Hook callback parameters
onReceive((envelope, ctx, next) => ...) and onSend((envelope, ctx, next) => ...):
envelope(EventEnvelope): current event envelope flowing through the chain.ctx(RuntimeContext): emit APIs, room-scoped context, and shared runtime state.next((update?) => Promise<EventEnvelope>): continue chain and optionally merge envelope updates.
onError((error, envelope, ctx) => ...):
error(unknown): thrown/normalized error from runtime chain.envelope(EventEnvelope): failed envelope snapshot.ctx(RuntimeContext): same runtime context for recovery/reporting logic.
Runtime context model
RuntimeContext (ctx in hooks) is runtime control surface.
It is separate from envelope.context (event data flowing through the pipeline).
flowchart TD
Runtime["runtime(...)"] --> Ctx["RuntimeContext"]
Ctx --> EmitReceive["emitReceive(input)"]
Ctx --> EmitSend["emitSend(input)"]
Ctx --> EmitError["emitError(input)"]
Ctx --> EmitEvent["emitEvent(input)"]
Ctx --> Room["room(name) -> RuntimeContext"]
Ctx --> State["state.get/set shared map"]
EmitReceive --> Envelope["EventEnvelope"]
EmitSend --> Envelope
EmitError --> Envelope
EmitEvent --> EnvelopesequenceDiagram
participant M1 as moduleA
participant R as runtime
participant M2 as moduleB
M1->>R: next({ context: { traceId: "t-1" } })
R->>R: mergeContext(base, update)
R->>M2: onReceive(... envelope.context includes traceId)
M2->>R: throw { message, context: { phase: "auth" } }
R->>R: buildFailedEnvelope + mergeContext(error.context)Rules:
ctx.room(name)creates a room-scoped context that injectsmetadata.room.ctx.stateis shared across room scopes for one runtime instance.envelope.contextis immutable hook input; usenext(update)to merge context changes.
Emit APIs in runtime context
Inside hooks/modules you can emit:
ctx.emitReceive(...)ctx.emitSend(...)ctx.emitError(...)ctx.emitEvent(...)(alias to send path)
You can also scope to rooms via ctx.room(roomId).
Emit input parameters
emitReceive, emitSend, emitError, emitEvent all accept one EmitInput:
event(string, required): event name.payload(Uint8Array, required unlesserroris provided): transport payload.error(EventError, required unlesspayloadis provided): error payload.id(string, optional): envelope id override.status('sending' | 'receiving' | 'failed', optional): explicit status override.metadata(Record<string, unknown>, optional): routing/correlation metadata.context(RuntimeEventContext, optional): module context object merged across pipeline.
