@quarry-systems/drift-core
v0.0.2-alpha.6
Published
A managed cyclic graph library for building complex workflows with cycles, conditional logic, and multiple endpoints
Maintainers
Readme
@quarry-systems/drift-core
A powerful managed cyclic graph library for building complex workflows with cycles, conditional logic, and multiple terminal nodes.
Overview
Drift Core is the execution engine for managed cyclic graphs. It provides a fluent API for building graphs, a robust execution system with persistence, and comprehensive support for guards, actions, events, middleware, and plugins.
Installation
npm install @quarry-systems/drift-core @quarry-systems/drift-contractsFeatures
- ✅ Cyclic Graph Support: Native support for cycles and loops
- ✅ Fluent Builder API: Intuitive graph construction with
ManagedCyclicGraph - ✅ Execution Manager: Orchestrate graph execution with persistence and resumption
- ✅ Guards: Conditional logic for edge traversal, with
guardAnd/guardOrcomposition - ✅ Actions: Transform context data during node execution
- ✅ Event System: Subscribe to graph and node lifecycle events
- ✅ Middleware: Intercept and observe execution flow
- ✅ Plugin System: Extend functionality with custom node types
- ✅ Persistence: Save and resume execution state
- ✅ Service Injection: Provide services to nodes at runtime
- ✅ Error Handling: Comprehensive error tracking and remediation
- ✅ Execution Metrics: Per-node timings plus authoritative
completedNodestracking via the metrics middleware - ✅ TypeScript First: Full type safety with generics
Quick Start
Basic Graph
import { ManagedCyclicGraph, GuardPresets, Manager } from '@quarry-systems/drift-core';
// Build a simple workflow
const graph = new ManagedCyclicGraph('hello-world')
.guard('any', GuardPresets.any())
.node('start', {
label: 'Start',
execute: [(ctx) => {
console.log('Starting workflow...');
return ctx;
}]
})
.node('process', {
label: 'Process Data',
execute: [(ctx) => ({
...ctx,
data: { ...ctx.data, processed: true }
})]
})
.node('end', {
label: 'End'
})
.edge('start', 'process', 'any')
.edge('process', 'end', 'any')
.start('start')
.build();
// Execute the graph
const manager = new Manager(graph);
const runId = await manager.start({ userId: '123' });
console.log('Execution complete:', runId);Conditional Branching
const graph = new ManagedCyclicGraph('conditional-flow')
.guard('any', GuardPresets.any())
.guard('isValid', (ctx) => ctx.data.valid === true)
.guard('isInvalid', (ctx) => ctx.data.valid !== true)
.node('validate', {
label: 'Validate Input',
execute: [(ctx) => ({
...ctx,
data: {
...ctx.data,
valid: ctx.data.value > 0
}
})]
})
.node('success', { label: 'Success Path' })
.node('failure', { label: 'Failure Path' })
.edge('validate', 'success', 'isValid')
.edge('validate', 'failure', 'isInvalid')
.start('validate')
.build();
const manager = new Manager(graph);
await manager.start({ value: 10 }); // Goes to 'success'
await manager.start({ value: -5 }); // Goes to 'failure'Cyclic Workflows
const graph = new ManagedCyclicGraph('retry-loop')
.guard('any', GuardPresets.any())
.guard('shouldRetry', (ctx) => ctx.data.attempts < 3)
.guard('maxAttempts', (ctx) => ctx.data.attempts >= 3)
.node('attempt', {
label: 'Attempt Operation',
execute: [(ctx) => ({
...ctx,
data: {
...ctx.data,
attempts: (ctx.data.attempts || 0) + 1,
success: Math.random() > 0.5
}
})]
})
.node('success', { label: 'Success' })
.node('failed', { label: 'Max Retries' })
.edge('attempt', 'success', (ctx) => ctx.data.success === true)
.edge('attempt', 'attempt', 'shouldRetry') // Loop back
.edge('attempt', 'failed', 'maxAttempts')
.start('attempt')
.build();Core Concepts
Context (Ctx)
The context object flows through the graph and contains all execution state:
interface Ctx {
runId: string; // Unique run identifier
data: Record<string, any>; // Mutable workflow data
global: Record<string, any>; // Immutable global config
injected: Record<string, any>; // Runtime injected data
errors: ErrorState[]; // Error tracking
events: Event[]; // Event log
}Guards
Guards determine if an edge can be traversed:
// Function guard
.guard('hasUser', (ctx) => ctx.data.userId !== undefined)
// Inline guard
.edge('start', 'end', (ctx) => ctx.data.ready === true)
// Guard presets
.guard('any', GuardPresets.any())
.guard('none', GuardPresets.none())Guard Composition
Retrieve and compose previously registered guards without copy-pasting their
logic. getGuard(id) returns a registered guard (or undefined); guardAnd
and guardOr register a new compound guard under a fresh id.
const graph = new ManagedCyclicGraph('composed-guards')
.guard('hasUser', (ctx) => ctx.data.userId !== undefined)
.guard('isVerified', (ctx) => ctx.data.verified === true)
.guard('isAdmin', (ctx) => ctx.data.role === 'admin')
// AND: passes only if every referenced guard passes; short-circuits on
// first failure. Missing guard ids are treated as failures.
.guardAnd('hasVerifiedUser', ['hasUser', 'isVerified'])
// OR: passes if any referenced guard passes; short-circuits on first
// success. Missing ids are skipped.
.guardOr('canAccessAdmin', ['isAdmin', 'hasVerifiedUser'])
.node('gate', { label: 'Gate' })
.node('admin', { label: 'Admin Area' })
.edge('gate', 'admin', 'canAccessAdmin')
.start('gate')
.build();
// Introspection: fetch a compound (or any) guard back out
const admin = graph.getGuard('canAccessAdmin');Compound guards thread prevResult through to every member guard, so member
guards that inspect the previous node's handler output still work when called
via an AND/OR combinator.
Edges
Edges define transitions between nodes. In addition to guards and priority,
edges support a maxTraversals cap that bounds how often the edge is allowed
to fire within a single run:
// Remediation loop that retries at most 3 times before falling through.
// No manual `attempts` counter in ctx.data — the runtime enforces it.
const graph = new ManagedCyclicGraph('bounded-retry')
.guard('any', GuardPresets.any())
.guard('failed', (ctx) => ctx.data.ok !== true)
.node('execute', { label: 'Execute' })
.node('verify', { label: 'Verify' })
.node('giveUp', { label: 'Give Up' })
.node('done', { label: 'Done' })
.edge('execute', 'verify', 'any')
.edge('verify', 'done', (ctx) => ctx.data.ok === true)
// This edge fires at most 3 times — after that it is skipped and
// selection falls through to the giveUp edge.
.edge({
source: 'verify',
target: 'execute',
guards: ['failed'],
maxTraversals: 3
})
.edge('verify', 'giveUp', 'failed')
.start('execute')
.build();Omit maxTraversals (or set it to 0) for unlimited traversals — the
default, fully backward compatible.
Actions
Actions transform the context during node execution:
.node('transform', {
label: 'Transform Data',
execute: [
// Action 1: Add timestamp
(ctx) => ({
...ctx,
data: { ...ctx.data, timestamp: Date.now() }
}),
// Action 2: Process data
async (ctx) => {
const result = await processData(ctx.data);
return { ...ctx, data: { ...ctx.data, result } };
}
]
})Advanced Features
Persistence & Resumption
import { Manager, InMemorySnapshotStore } from '@quarry-systems/drift-core';
const store = new InMemorySnapshotStore();
const manager = new Manager(graph, { snapshotStore: store });
// Start execution
const runId = await manager.start({ userId: '123' });
// Later, resume from a snapshot
const snapshot = await store.load(runId);
await manager.resume(runId, snapshot);Resume validates the graph version before any state restoration (alpha.3+). A Manager constructed against a different graph or with a mismatched graphVersion throws ValidationError(field: 'graphId' | 'graphVersion') from resume() — no run state is registered, no services are resolved, no wait-registry handlers leak. Snapshot-to-Manager mismatches are caught loud, before they can corrupt audit trails.
Service Injection
import { defineService } from '@quarry-systems/drift-core';
// Define a service
const httpService = defineService({
name: 'http',
version: '1.0.0',
scope: 'run',
factory: (ctx) => ({
get: async (url: string) => fetch(url).then(r => r.json())
})
});
// Use in graph
const graph = new ManagedCyclicGraph('with-services')
.node('fetch', {
label: 'Fetch Data',
execute: [async (ctx) => {
const http = ctx.services.http;
const data = await http.get('https://api.example.com/data');
return { ...ctx, data: { ...ctx.data, apiData: data } };
}]
})
.build();
// Provide services at runtime
const manager = new Manager(graph, {
services: { http: httpService }
});Middleware
import { createLoggingMiddleware, createMetricsMiddleware } from '@quarry-systems/drift-core';
const manager = new Manager(graph, {
middleware: [
createLoggingMiddleware({ level: 'info' }),
createMetricsMiddleware()
]
});Event Handling
const graph = new ManagedCyclicGraph('with-events')
.on('graph:start', (event) => {
console.log('Graph started:', event.runId);
})
.on('node:start', (event) => {
console.log('Node started:', event.nodeId);
})
.on('node:end', (event) => {
console.log('Node completed:', event.nodeId, event.duration);
})
.on('graph:end', (event) => {
console.log('Graph completed:', event.runId);
})
.build();Plugin System
import type { Plugin, NodeHandler } from '@quarry-systems/drift-core';
// Create a custom node handler.
// Current signature is a single args object: { ctx, node, runId, step }.
const delayHandler: NodeHandler = async ({ ctx, node }) => {
const ms = (node.meta as { delay?: number } | undefined)?.delay ?? 1000;
await new Promise(resolve => setTimeout(resolve, ms));
return ctx;
};
// Define plugin. name + version are required.
const delayPlugin: Plugin = {
name: 'delay-plugin',
version: '1.0.0',
nodes: { 'delay': delayHandler },
};
// Use plugin
const graph = new ManagedCyclicGraph('with-plugin')
.use(delayPlugin)
.node('wait', {
type: 'delay',
meta: { delay: 5000 }
})
.build();Error Handling
const graph = new ManagedCyclicGraph('error-handling')
.guard('any', GuardPresets.any())
.guard('hasErrors', (ctx) => ctx.errors.length > 0)
.guard('noErrors', (ctx) => ctx.errors.length === 0)
.node('risky', {
label: 'Risky Operation',
execute: [async (ctx) => {
try {
const result = await riskyOperation();
return { ...ctx, data: { ...ctx.data, result } };
} catch (error) {
return {
...ctx,
errors: [...ctx.errors, {
kind: 'execution',
nodeId: 'risky',
message: error.message,
severity: 'error'
}]
};
}
}]
})
.node('handleError', {
label: 'Error Handler',
execute: [(ctx) => {
console.error('Errors occurred:', ctx.errors);
return ctx;
}]
})
.node('done', { label: 'Done' })
// Route on errors via edge guards rather than node-level gating
.edge('risky', 'handleError', 'hasErrors')
.edge('risky', 'done', 'noErrors')
.start('risky')
.build();Resume-time graph-version errors (alpha.3+): ValidationError(field: 'graphId') distinguishes a wrong-graph resume; ValidationError(field: 'graphVersion') covers explicit-version validation failures and comparator 'mismatch' outcomes; ConfigurationError(configKey: 'graphVersionComparator') wraps any throw from a consumer-supplied comparator.
API Reference
ManagedCyclicGraph
Main builder class for constructing graphs.
Methods:
.guard(id, fn)— Register a guard function.guardAnd(id, [guardIds])— Compose registered guards with AND (short-circuits on first failure).guardOr(id, [guardIds])— Compose registered guards with OR (short-circuits on first success).getGuard(id)— Retrieve a registered guard for introspection or composition.node(id, config)— Add a node.edge(from, to, guard?, priority?)or.edge({ source, target, guards, maxTraversals, ... })— Add an edge (positional or config object).meter({ id, maxTraversals, edges })— Bound a remediation loop with a shared traversal counter across multiple edges.start(...nodeIds)— Mark one or more start nodes.service(key, definition)— Register a service on the builder.use(plugin)— Register a plugin.on(event, handler)— Subscribe to events.build()— Build the graph (GraphDef).buildSeparated()— Build returning{ graph, runtime }so the runtime portion can be serialized separately from the structural definition (alpha.4+)
Manager
Execution orchestrator for graphs.
Methods:
.start(injected?, options?)- Start a new execution.resume(runId, snapshot?)- Resume from snapshot.step(runId, options?)- Execute a single step.getRunState(runId)- Get the in-memoryRunState(synchronous,undefinedif the run is not in memory).getSnapshot(runId)- Load the most recent persisted snapshot.getStatus(runId)- Unified async lookup returning{ status, source, nodeId }. Checks in-memory state first, then falls back to the snapshot store — callers no longer need to mergegetRunState()andgetSnapshot()themselves.sourceis'memory','store', or (for an unknown run)'store'withstatus: 'unknown'..stop(runId)- Stop execution
const info = await manager.getStatus(runId);
if (info.status === 'unknown') {
// Neither memory nor store knows about this run
} else if (info.source === 'memory') {
// Live run — info.nodeId is the current node
} else {
// info.source === 'store' — run has been evicted from memory
// and info reflects the last persisted snapshot
}Graph version pinning (alpha.3+):
const manager = new Manager(graph, {
graphVersion: 'v1.2.3', // explicit pin (recommended for prod)
graphVersionComparator: strictGraphVersionComparator, // optional; defaults to smart
});options.graphVersion: string— explicit version pinned at construction. Empty/whitespace/hash-prefix-shaped values throwValidationError(field: 'graphVersion').options.graphVersionComparator: GraphVersionComparator— pluggable comparator. Defaults todefaultGraphVersionComparator(smart: consults structural hash).strictGraphVersionComparatoropt-in for pure equality.manager.getGraphVersion(): GraphVersionResolution— read the resolved version (cached at construction). Returns{ version, source, structuralHash }.
When unset, drift-core computes a deterministic structural hash (sha256-v1:<hex>) of the GraphDef as a development-time drift safety net. Production consumers should pin explicit versions; the hash backstops "I forgot to bump."
GuardPresets
Common guard functions:
GuardPresets.any()- Always trueGuardPresets.none()- Always false
Utilities
randomId()- Generate unique IDsdeepClone(obj)- Deep clone objectssleep(ms)- Async sleeptimeout(ms, promise)- Add timeout to promisegetNestedValue(obj, path)- Get nested propertysetNestedValue(obj, path, value)- Set nested property
Storage Implementations
InMemorySnapshotStore
import { InMemorySnapshotStore } from '@quarry-systems/drift-core';
const store = new InMemorySnapshotStore();
const manager = new Manager(graph, { snapshotStore: store });FilesystemArtifactStore
import { FilesystemArtifactStore } from '@quarry-systems/drift-core';
const artifactStore = new FilesystemArtifactStore('./artifacts');TypeScript Support
Full TypeScript support with generics:
interface MyGlobal {
apiKey: string;
}
interface MyInjected {
userId: string;
}
const graph = new ManagedCyclicGraph<MyGlobal, MyInjected>('typed-graph')
.node('start', {
execute: [(ctx) => {
// ctx.global.apiKey is typed as string
// ctx.injected.userId is typed as string
return ctx;
}]
})
.build();Lifecycle Hooks
Middleware can intercept node execution at two execute-phase hooks in addition
to the existing observer-only onNodeStart / onNodeEnd hooks.
| Hook | When | Can propose patches |
|---|---|---|
| onBeforeNodeExecute | After onEnter committed, before execute actions run | Yes |
| onAfterNodeExecute | After execute actions, before edge selection | Yes |
| onNodeStart | Node begins (observer only) | No |
| onNodeEnd | Node finishes (observer only) | No |
Tier-1 vs Tier-2
- Tier-2 (middleware) —
onBeforeNodeExecuteandonAfterNodeExecutecallevent.propose(patch)to emit patches into the runtime's Tier-2 patch buffer. Patches are committed at phase boundaries and are not immediately visible to later middleware in the same chain. - Tier-1 (internal) — built-in contributors (
Tier1Contributor) run before Tier-2 hooks and have patches committed immediately. Not extensible by plugins.
event.propose(patch)
Both execute-phase events expose propose():
interface NodeBeforeExecuteEvent<TCtx> {
nodeId: string;
timestamp: number;
context: TCtx;
runId: string;
graphId: string;
propose(patch: Omit<Patch, 'ts' | 'by'> & { by?: string }): void;
}
interface NodeAfterExecuteEvent<TCtx> extends NodeBeforeExecuteEvent<TCtx> {
/** Return value of the last execute action; undefined if no actions ran. */
prevResult: unknown;
}Tier-2 Isolation Property
Tier-2 middleware see Tier-1 results (already committed) but do not see other
Tier-2 middleware's mid-phase patches. Each Tier-2 hook runs against the
post-Tier-1 ctx snapshot; patches proposed via event.propose() are buffered and
committed in registration order after the entire chain completes. This means
third-party plugins authored by different teams don't depend on each other's
internal state.
Example
import type { Middleware } from '@quarry-systems/drift-contracts';
const auditMiddleware: Middleware = {
name: 'audit',
version: '1.0.0',
async onBeforeNodeExecute(event) {
// Inject an audit timestamp before actions run
event.propose({ op: 'set', path: 'auditStart', value: event.timestamp });
},
async onAfterNodeExecute(event) {
// Record the last action's return value
if (event.prevResult !== undefined) {
event.propose({ op: 'set', path: 'lastResult', value: event.prevResult });
}
},
};
const manager = new Manager(graph, { middleware: [auditMiddleware] });Resource Meters
Meters are declarative accumulators with limits. Pass a MetersPolicy via
ManagerOptions.meters and the runtime:
- Auto-registers a per-meter
metersOk:<meterId>guard for each meter. - Wraps edges per the meter's
scope:membersmeters (default foredge-traversals) wrap only their declared member edges;non-essential-sourcesmeters (default formetrics/timer/builtin) wrap every outbound edge whose source node is not markedmeta.essential: true. - After each node, snapshots current meter values into
ctx.data.__meterState. - Fires a
MeterExceededevent on the unified event bus when any meter breaches its limit.
MeterDef and MetersPolicy
type MeterSource =
| { kind: 'metrics'; key: string } // reads from metrics plugin total(key)
| { kind: 'timer' } // reads ctx.timer.elapsed (requires timer: true)
| { kind: 'builtin'; name: 'steps' }; // reads ctx.data.__currentStep
interface MeterDef {
limit: number; // Maximum allowed value; breach fires MeterExceeded
source: MeterSource;
description?: string; // Human description for logs/errors
}
type MetersPolicy = Record<string, MeterDef>;Meter Sources
| kind | Reads from | Requires |
|---|---|---|
| metrics | metrics plugin total(key) | metrics service registered |
| timer | ctx.timer.elapsed | ManagerOptions.timer: true |
| builtin: 'steps' | ctx.data.__currentStep | nothing |
Essential Nodes
Nodes marked meta.essential: true are excluded from the non-essential-sources-scoped
metersOk:<meterId> wraps — their outgoing edges fire even when those meters are
exhausted. Use this for graceful-shutdown or error-handling nodes that must run
regardless of budgets. (members-scoped meters ignore essentiality entirely; they
only wrap their declared member edges.)
Example
import { ManagedCyclicGraph, Manager } from '@quarry-systems/drift-core';
const graph = new ManagedCyclicGraph('metered-flow')
.guard('any', (ctx) => true)
.node('work', { label: 'Do Work' })
.node('cleanup', { label: 'Cleanup', meta: { essential: true } })
.node('done', { label: 'Done' })
.edge('work', 'done', 'any')
.edge('work', 'cleanup', 'any')
.start('work')
.build(); // pass builder, not built graph, so meters can wrap edges
const manager = new Manager(graph, {
meters: {
steps: {
limit: 50,
source: { kind: 'builtin', name: 'steps' },
description: 'Max 50 steps per run',
},
},
});Note: Pass the
ManagedCyclicGraphbuilder instance (not the result of.build()) toManagerwhen usingmeters. The Manager callsbuildSeparated()internally after wiring the per-metermetersOk:<meterId>guards and edge wraps.
Shared traversal cap (graph.meter())
Bound a remediation loop that fans out to multiple targets with one shared counter:
new ManagedCyclicGraph('council')
.guard('not-ready', notReadyGuard)
.guard('is-ready', isReadyGuard)
.node('arbiter', { execute: [arbiterAction] })
.node('specialist-a', { execute: [specialistA] })
.node('specialist-b', { execute: [specialistB] })
.node('done', { execute: [doneAction] })
.start('arbiter')
.edge({ from: 'arbiter', to: 'done', guards: ['is-ready'] })
.meter({
id: 'council-remediation',
maxTraversals: 3,
edges: [
{ from: 'arbiter', to: 'specialist-a', guards: ['not-ready'] },
{ from: 'arbiter', to: 'specialist-b', guards: ['not-ready'] },
],
});After 3 total member-edge traversals (any split), both member edges are blocked by the meter's per-meter metersOk:council-remediation guard and a MeterExceeded event fires.
Exit routing: Non-member edges (like your is-ready exit) are NOT wrapped by the meter's guard. When the cap trips, member edges are blocked but the exit routes normally as soon as its own guards pass. See docs/superpowers/specs/2026-04-15-drift-meter-scoping-design.md for the per-meter scoping design.
Wiring note: The Manager does not auto-read metersPolicy from the built graph. You must extract and pass it explicitly:
const meters = graph.build().metersPolicy;
const manager = new Manager(graph, { store, meters });Each meter gets its own guard id (metersOk:<meterId>); there is no longer a single shared metersOk guard.
Key constraints:
- The meter is the declaration site for its member edges — do NOT also declare them via
graph.edge(). - An edge may belong to at most one meter (v1 restriction).
- Counter lives at
ctx.data.__meterCounters[meterId]; survives suspend/resume via the standard snapshot pipeline.
Dependency Injection
At the start of every node's before lifecycle phase, Drift populates
ctx.node[nodeId].upstream with the results of the node's direct predecessors.
This eliminates manual ctx.data.results walks in most cases.
How It Works
- Regular / fork nodes —
upstreamis keyed by thesourcenodeId of each incoming edge. - Join nodes (with
awaitBranches) —upstreamis keyed by each entry inawaitBranches. - Scope — one-hop only. Transitive predecessors are not included.
- Source — values are read from
ctx.data.results[upstreamNodeId].
Example
.node('merge', {
nodeType: 'join',
awaitBranches: ['branchA', 'branchB'],
execute: [(ctx) => {
const a = ctx.node['merge'].upstream['branchA'];
const b = ctx.node['merge'].upstream['branchB'];
return { ...ctx, data: { ...ctx.data, combined: [a, b] } };
}],
})If no upstream results are available for a node (e.g., it is the start node),
ctx.node[nodeId].upstream is not written and remains whatever it was
previously (typically undefined).
Wait / Inject Lifecycle
Drift supports mid-run suspension while a node waits for external data — for example, waiting for a human approval, a webhook callback, or a remote API response.
ctx.waitForInjection(key, options?)
Called from within a node's execute phase. Uses mark-and-return semantics:
- The call registers the wait and returns immediately — the action continues executing normally.
- Suspension happens at the next
onAfterNodeExecutephase boundary. - The run is persisted with status
'waiting'and aWaitSuspendedevent is fired on the event bus.
interface WaitOptions {
/** Wall-clock timeout in ms; on expiry ctx.injected.__waits[key] = { __timedOut: true } */
timeoutMs?: number;
/** Reserved for future 'any' mode; v1 only supports all-satisfied resume. */
mode?: 'all';
}The injected value lands in ctx.injected.__waits[key] and is consumed by
the next node after resume (not the suspended node).
Manager.inject(runId, key, value)
Called from outside the graph (e.g., a webhook handler) to deliver a value to a waiting run:
await manager.inject(runId, 'approvalResult', { approved: true });
// Then resume the run:
await manager.resume({ runId });inject() throws if the run is not in a waiting state. It does not
auto-resume — the caller must call manager.resume() separately.
Timeout Sentinel
If a timeoutMs expires before inject() is called, the Manager writes
{ __timedOut: true } to ctx.injected.__waits[key] and fires
WaitResolved with timedOut: true. The run remains suspended until
explicitly resumed.
Events
| Event | Fired when |
|---|---|
| WaitSuspended | Run enters waiting state (runId, nodeId, keys, timeoutAt?) |
| WaitResolved | A key is satisfied via inject or timeout (runId, key, timedOut) |
| RunSuspended | Alias emitted alongside WaitSuspended |
| RunResumed | Run resumes from waiting state |
Services
Services provide infrastructure adapters (HTTP clients, secret stores, vector
DBs, etc.) to nodes via ctx.services. They can be registered either on the
graph builder or passed at runtime to the Manager.
graph.service(key, definition)
Register a service on the builder. Accepts either a bare instance or a factory descriptor ({ instance, scope }). Both .service() and .use(plugin) populate the same internal registry.
const graph = new ManagedCyclicGraph('my-graph')
.service('db', myDatabaseAdapter)
.service('secrets', mySecretsAdapter)
.node('fetchUser', {
execute: [async (ctx) => {
const user = await ctx.services.db.findUser(ctx.injected.userId);
return { ...ctx, data: { ...ctx.data, user } };
}],
})
.build();Runtime Service Injection
Services passed to ManagerOptions.services are merged with plugin-registered
services according to serviceMergeStrategy (default: 'runtime-overrides'):
const manager = new Manager(graph, {
services: {
db: myDatabaseAdapter,
secrets: { get: (key) => process.env[key] },
},
serviceMergeStrategy: 'runtime-overrides', // 'plugin-overrides' | 'error-on-conflict'
requiredServices: ['db', 'secrets'], // fail fast if missing
});Service Definitions (Scoping)
Wrap a service in a ServiceDefinition to control its scope:
import type { ServiceDefinition, ServiceFactory } from '@quarry-systems/drift-contracts';
// Per-run factory: rebuilt for each run (survives cross-process resume)
const dbDef: ServiceDefinition = {
instance: (runCtx) => createDbConnection(runCtx.runId),
scope: 'per-run',
};
const manager = new Manager(graph, {
services: { db: dbDef },
});| Scope | Instance created | Survives resume? |
|---|---|---|
| 'global' (default) | Once, shared across all runs | No — bare instance |
| 'per-run' | Per manager.start() / manager.resume() call | Yes |
Cross-Process Resume Warning
Global bare-instance services (plain objects / class instances with
scope: 'global') cannot be serialized into a snapshot and will not survive a
daemon restart or cross-process resume. The Manager emits a console.warn for
each such service at construction time.
Set allowBareInstances: true in ManagerOptions to silence the warning when
cross-process resume is not required:
const manager = new Manager(graph, {
services: { logger: console },
allowBareInstances: true,
});ServiceRebuilt Event
Fired on the unified event bus when a per-run service factory is invoked to
rebuild a service instance during resume:
{ type: 'ServiceRebuilt'; runId: string; key: string; timestamp: number }Related Packages
- @quarry-systems/drift-contracts - Type definitions
- @quarry-systems/drift-ai-core - AI agent functionality
- @quarry-systems/drift-cli - CLI tools
- @quarry-systems/drift-testing - Testing utilities
Official Plugins
@quarry-systems/drift-http- HTTP requests@quarry-systems/drift-timer- Delays and scheduling@quarry-systems/drift-openai- OpenAI integration@quarry-systems/drift-secrets- Secrets management@quarry-systems/drift-store-sqlite- SQLite persistence@quarry-systems/drift-vector-chroma- Vector storage
Examples
See the examples directory for complete examples:
- Basic workflows
- Conditional branching
- Retry loops
- Service integration
- Plugin development
Contributing
This package is part of the Drift monorepo. See the main repository for contribution guidelines.
License
Dual-licensed under:
- AGPL-3.0 for open source projects
- Commercial License for proprietary use
See LICENSE.md for details.
For commercial licensing:
- Email: [email protected]
- Web: https://quarry-systems.com/license
