@octaviaflow/execution-context
v0.1.1
Durable, tenant-isolated execution context for the Octaviaflow workflow engine: typed step envelopes, ExecutionContextStore (Valkey-backed), n-th level JSON Schema inferencer, and idempotent execution-event stream
Durable, tenant-isolated execution context for the Octaviaflow workflow engine.
Provides:
- Typed step envelopes: { data, schema, schemaHash, lineage, metadata, status } produced by every step.
- ExecutionContextStore: Valkey-backed (via iovalkey) store with named health states; raises StoreUnavailableError rather than silently dropping writes.
- N-th level JSON Schema inferencer: recursive 200-sample × depth-8 walk into JSON Schema 2020-12 with cardinality, nullability, and small-domain enums. Hash output is stable under property reordering.
- Tenant prefix discipline: every Valkey key, S3 prefix, and Mongo DB name is constructed centrally and starts with t:{tenantId}: (or t/{tenantId}/ for object-store paths). Cross-tenant key collision is impossible at the store layer.
- Idempotency primitives: idempotencyKey({executionId, stepId, attempt}) for store writes and event publishes.
- Execution event stream: Valkey Streams transport (XADD / XREADGROUP / XACK) for the Engine ↔ Backend execution event protocol with publisher dedup at receive.
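The reorder-stable hash mentioned for the schema inferencer can be illustrated with a small sketch. This is not the package's schemaHash implementation: stableStringify and sketchSchemaHash are hypothetical names, and hashing a key-sorted JSON string with SHA-256 is an assumption based on the 64-character sha256 digest this README mentions.

```typescript
import { createHash } from 'node:crypto';

type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Serialize with object keys sorted at every level, so two objects that differ
// only in property order produce the same string. Array order is preserved.
function stableStringify(value: Json): string {
  if (value === null || typeof value !== 'object') {
    return JSON.stringify(value);
  }
  if (Array.isArray(value)) {
    return `[${value.map((item) => stableStringify(item)).join(',')}]`;
  }
  const entries = Object.keys(value)
    .sort()
    .map((key) => `${JSON.stringify(key)}:${stableStringify(value[key] as Json)}`);
  return `{${entries.join(',')}}`;
}

// Hypothetical stand-in for schemaHash: SHA-256 over the canonical string.
function sketchSchemaHash(schema: Json): string {
  return createHash('sha256').update(stableStringify(schema)).digest('hex');
}

// Same schema, properties written in a different order.
const a: Json = {
  type: 'object',
  properties: { id: { type: 'integer' }, name: { type: 'string' } },
};
const b: Json = {
  properties: { name: { type: 'string' }, id: { type: 'integer' } },
  type: 'object',
};

console.log(sketchSchemaHash(a) === sketchSchemaHash(b)); // true
console.log(sketchSchemaHash(a).length); // 64
```

The sketch leaves array order untouched, so two schemas whose required arrays list the same names in a different order would hash differently here; whether the package normalizes that case is not stated in this README.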
The execution event stream is distinct from the cross-service, NATS-based @octaviaflow/eventbus, which carries CDC ingress events. This package's stream carries intra-execution events: step.committed, step.failed, execution.status, and execution.log.
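The idempotency primitives and the dedup-at-receive behaviour listed above can be sketched as follows. Everything here is illustrative: sketchIdempotencyKey, its colon-joined key format, and the in-memory Set are assumptions, and the package's own idempotencyKey may format keys differently. A real consumer would more likely track seen keys in a shared store with an expiry.

```typescript
interface StepAttempt {
  executionId: string;
  stepId: string;
  attempt: number;
}

// Hypothetical key format: deterministic for a given (executionId, stepId, attempt).
function sketchIdempotencyKey({ executionId, stepId, attempt }: StepAttempt): string {
  return `${executionId}:${stepId}:${attempt}`;
}

// Wrap a handler so that redelivered events with an already-seen key are skipped.
// Returns true when the event was handled, false when it was dropped as a duplicate.
function dedupeOnReceive<E extends StepAttempt>(
  handler: (event: E) => void,
): (event: E) => boolean {
  const seen = new Set<string>();
  return (event) => {
    const key = sketchIdempotencyKey(event);
    if (seen.has(key)) return false; // duplicate delivery, ignore
    seen.add(key);
    handler(event);
    return true;
  };
}

const handled: string[] = [];
const receive = dedupeOnReceive<StepAttempt>((event) => {
  handled.push(`${event.stepId}#${event.attempt}`);
});

const first: StepAttempt = { executionId: 'exec_abc', stepId: 'fetch_customers', attempt: 0 };
receive(first); // handled
receive({ ...first }); // same key, skipped
receive({ ...first, attempt: 1 }); // a retry is a new attempt, handled

console.log(handled.length); // 2
```

Including attempt in the key means a retried step is treated as a new write, while a redelivery of the same attempt is dropped.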
Install
bun add @octaviaflow/execution-context iovalkey
Quick start: store
import {
  createValkeyExecutionContextStore,
  type StepEnvelope,
} from '@octaviaflow/execution-context';

const store = createValkeyExecutionContextStore({
  url: process.env.VALKEY_URL,
});

const scope = { tenantId: 'org_42', executionId: 'exec_abc' };

const envelope: StepEnvelope = {
  executionId: scope.executionId,
  stepId: 'fetch_customers',
  attempt: 0,
  data: [{ id: 1, name: 'Acme' }],
  schema: { type: 'array', items: { type: 'object' } },
  schemaHash: '...',
  schemaVersion: 1,
  lineage: [],
  metadata: { rowCount: 1, bytes: 120, elapsedMs: 42, producedAt: new Date().toISOString() },
  status: 'success',
};

await store.putStep(scope, envelope);
const back = await store.getStep(scope, 'fetch_customers');
Quick start: schema inference
import { inferSchema, schemaHash } from '@octaviaflow/execution-context';

const rows = [
  { id: 1, name: 'a', address: { city: 'NYC' } },
  { id: 2, name: null, address: { city: 'SF' } },
];

const schema = inferSchema(rows);
// { type: 'object',
//   properties: {
//     id: { type: 'integer' },
//     name: { type: 'string' },
//     address: { type: 'object',
//                properties: { city: { type: 'string', enum: ['NYC', 'SF'] } },
//                required: ['city'] } },
//   required: ['address', 'id'] }

const hash = schemaHash(schema); // 64-char sha256, stable under property reordering
Quick start: execution event stream
import Valkey from 'iovalkey';
import {
  ExecutionEventPublisher,
  ExecutionEventSubscriber,
} from '@octaviaflow/execution-context';

const client = new Valkey(process.env.VALKEY_URL!);

const pub = new ExecutionEventPublisher({ client });
await pub.publish(
  { tenantId: 'org_42' },
  {
    kind: 'step.committed',
    executionId: 'exec_abc',
    attempt: 0,
    stepId: 'fetch_customers',
    ts: new Date().toISOString(),
  },
);

const sub = new ExecutionEventSubscriber({
  client,
  group: 'backend-mongo-writer',
  consumer: 'host-1',
});
await sub.runLoop({ tenantId: 'org_42' }, async (event, meta) => {
  // persistEvent is application code (not shown here).
  await persistEvent(event, meta);
});
Tenant prefix discipline
All keys are constructed by typed helpers. Calls with an invalid tenantId throw InvalidTenantError. The valid tenant pattern is ^[a-zA-Z0-9_-]{1,64}$.
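A minimal sketch of the validation and prefixing described here, using the tenant pattern above and the hot-tier step key layout listed in this section. The stepKey and assertTenant helpers and the local InvalidTenantError class are illustrative stand-ins, not the package's exports.

```typescript
// Local stand-in for the package's InvalidTenantError (assumption for this sketch).
class InvalidTenantError extends Error {
  constructor(tenantId: string) {
    super(`invalid tenantId: ${JSON.stringify(tenantId)}`);
    this.name = 'InvalidTenantError';
  }
}

// Tenant pattern as documented in this README.
const TENANT_PATTERN = /^[a-zA-Z0-9_-]{1,64}$/;

// Return the tenantId unchanged when valid, otherwise throw.
function assertTenant(tenantId: string): string {
  if (!TENANT_PATTERN.test(tenantId)) throw new InvalidTenantError(tenantId);
  return tenantId;
}

// Hypothetical helper producing the hot-tier step key layout.
function stepKey(tenantId: string, executionId: string, stepId: string): string {
  return `t:${assertTenant(tenantId)}:exec:${executionId}:steps:${stepId}`;
}

console.log(stepKey('org_42', 'exec_abc', 'fetch_customers'));
// t:org_42:exec:exec_abc:steps:fetch_customers
```

Because the pattern excludes both ':' and '/', a tenantId can never contain the separators used in the key and path layouts, so one tenant's id cannot be crafted to land inside another tenant's prefix.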
hot-tier (Valkey)     → t:{tenantId}:exec:{executionId}:steps:{stepId}
hot-tier (Valkey)     → t:{tenantId}:flow:{flowId}:vars
hot-tier (Valkey)     → t:{tenantId}:exec:{executionId}:lease
event stream (Valkey) → t:{tenantId}:events
cold-tier (S3)        → t/{tenantId}/exec/{executionId}/steps/{stepId}.json
cold-tier (S3)        → t/{tenantId}/archive/{flowId}/{executionId}.json
Logger
The package is logger-agnostic. Pass any object satisfying the Logger interface (debug / info / warn / error) to createValkeyExecutionContextStore, ExecutionEventPublisher, or ExecutionEventSubscriber. The default is a no-op logger.
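A sketch of a console-backed logger with the four methods named above. The Logger interface is declared locally, and its parameter shape (a message plus optional fields) is an assumption: this README names the methods but not their signatures, so check the package's exported type before relying on it. makeLogger is a hypothetical helper.

```typescript
type Level = 'debug' | 'info' | 'warn' | 'error';
type Fields = Record<string, unknown>;

// Assumed shape; the package's actual Logger type may differ.
interface Logger {
  debug(message: string, fields?: Fields): void;
  info(message: string, fields?: Fields): void;
  warn(message: string, fields?: Fields): void;
  error(message: string, fields?: Fields): void;
}

// Build a Logger that forwards every call to a single sink function.
function makeLogger(sink: (level: Level, message: string, fields?: Fields) => void): Logger {
  return {
    debug: (message, fields) => sink('debug', message, fields),
    info: (message, fields) => sink('info', message, fields),
    warn: (message, fields) => sink('warn', message, fields),
    error: (message, fields) => sink('error', message, fields),
  };
}

// Console-backed instance: each level maps to the console method of the same name.
const consoleLogger = makeLogger((level, message, fields) => {
  console[level](message, fields ?? {});
});

consoleLogger.warn('store degraded', { tenantId: 'org_42' });
```

Routing all four methods through one sink makes it straightforward to swap the console for a structured logger, or for an in-memory array in tests.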
License
Apache-2.0
