@quarry-systems/drift-core

v0.0.2-alpha.6

A managed cyclic graph library for building complex workflows with cycles, conditional logic, and multiple endpoints

A powerful managed cyclic graph library for building complex workflows with cycles, conditional logic, and multiple terminal nodes.

Overview

Drift Core is the execution engine for managed cyclic graphs. It provides a fluent API for building graphs, a robust execution system with persistence, and comprehensive support for guards, actions, events, middleware, and plugins.

Installation

npm install @quarry-systems/drift-core @quarry-systems/drift-contracts

Features

  • Cyclic Graph Support: Native support for cycles and loops
  • Fluent Builder API: Intuitive graph construction with ManagedCyclicGraph
  • Execution Manager: Orchestrate graph execution with persistence and resumption
  • Guards: Conditional logic for edge traversal, with guardAnd / guardOr composition
  • Actions: Transform context data during node execution
  • Event System: Subscribe to graph and node lifecycle events
  • Middleware: Intercept and observe execution flow
  • Plugin System: Extend functionality with custom node types
  • Persistence: Save and resume execution state
  • Service Injection: Provide services to nodes at runtime
  • Error Handling: Comprehensive error tracking and remediation
  • Execution Metrics: Per-node timings plus authoritative completedNodes tracking via the metrics middleware
  • TypeScript First: Full type safety with generics

Quick Start

Basic Graph

import { ManagedCyclicGraph, GuardPresets, Manager } from '@quarry-systems/drift-core';

// Build a simple workflow
const graph = new ManagedCyclicGraph('hello-world')
  .guard('any', GuardPresets.any())
  
  .node('start', { 
    label: 'Start',
    execute: [(ctx) => {
      console.log('Starting workflow...');
      return ctx;
    }]
  })
  
  .node('process', {
    label: 'Process Data',
    execute: [(ctx) => ({
      ...ctx,
      data: { ...ctx.data, processed: true }
    })]
  })
  
  .node('end', { 
    label: 'End'
  })
  
  .edge('start', 'process', 'any')
  .edge('process', 'end', 'any')
  .start('start')
  .build();

// Execute the graph
const manager = new Manager(graph);
const runId = await manager.start({ userId: '123' });
console.log('Execution complete:', runId);

Conditional Branching

const graph = new ManagedCyclicGraph('conditional-flow')
  .guard('any', GuardPresets.any())
  .guard('isValid', (ctx) => ctx.data.valid === true)
  .guard('isInvalid', (ctx) => ctx.data.valid !== true)
  
  .node('validate', {
    label: 'Validate Input',
    execute: [(ctx) => ({
      ...ctx,
      data: { 
        ...ctx.data, 
        valid: ctx.data.value > 0 
      }
    })]
  })
  
  .node('success', { label: 'Success Path' })
  .node('failure', { label: 'Failure Path' })
  
  .edge('validate', 'success', 'isValid')
  .edge('validate', 'failure', 'isInvalid')
  .start('validate')
  .build();

const manager = new Manager(graph);
await manager.start({ value: 10 }); // Goes to 'success'
await manager.start({ value: -5 }); // Goes to 'failure'

Cyclic Workflows

const graph = new ManagedCyclicGraph('retry-loop')
  .guard('any', GuardPresets.any())
  .guard('shouldRetry', (ctx) => ctx.data.attempts < 3)
  .guard('maxAttempts', (ctx) => ctx.data.attempts >= 3)
  
  .node('attempt', {
    label: 'Attempt Operation',
    execute: [(ctx) => ({
      ...ctx,
      data: {
        ...ctx.data,
        attempts: (ctx.data.attempts || 0) + 1,
        success: Math.random() > 0.5
      }
    })]
  })
  
  .node('success', { label: 'Success' })
  .node('failed', { label: 'Max Retries' })
  
  .edge('attempt', 'success', (ctx) => ctx.data.success === true)
  .edge('attempt', 'attempt', 'shouldRetry') // Loop back
  .edge('attempt', 'failed', 'maxAttempts')
  .start('attempt')
  .build();

Core Concepts

Context (Ctx)

The context object flows through the graph and contains all execution state:

interface Ctx {
  runId: string;           // Unique run identifier
  data: Record<string, any>;      // Mutable workflow data
  global: Record<string, any>;    // Immutable global config
  injected: Record<string, any>;  // Runtime injected data
  errors: ErrorState[];    // Error tracking
  events: Event[];         // Event log
}

Guards

Guards determine if an edge can be traversed:

// Function guard
.guard('hasUser', (ctx) => ctx.data.userId !== undefined)

// Inline guard
.edge('start', 'end', (ctx) => ctx.data.ready === true)

// Guard presets
.guard('any', GuardPresets.any())
.guard('none', GuardPresets.none())

Guard Composition

Retrieve and compose previously registered guards without copy-pasting their logic. getGuard(id) returns a registered guard (or undefined); guardAnd and guardOr register a new compound guard under a fresh id.

const graph = new ManagedCyclicGraph('composed-guards')
  .guard('hasUser', (ctx) => ctx.data.userId !== undefined)
  .guard('isVerified', (ctx) => ctx.data.verified === true)
  .guard('isAdmin', (ctx) => ctx.data.role === 'admin')

  // AND: passes only if every referenced guard passes; short-circuits on
  // first failure. Missing guard ids are treated as failures.
  .guardAnd('hasVerifiedUser', ['hasUser', 'isVerified'])

  // OR: passes if any referenced guard passes; short-circuits on first
  // success. Missing ids are skipped.
  .guardOr('canAccessAdmin', ['isAdmin', 'hasVerifiedUser'])

  .node('gate', { label: 'Gate' })
  .node('admin', { label: 'Admin Area' })
  .edge('gate', 'admin', 'canAccessAdmin')
  .start('gate')
  .build();

// Introspection: fetch a compound (or any) guard back out
const admin = graph.getGuard('canAccessAdmin');

Compound guards thread prevResult through to every member guard, so member guards that inspect the previous node's handler output still work when called via an AND/OR combinator.
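The composition rules above can be modelled in a few lines. The sketch below is a standalone illustration, not drift-core's implementation: GuardRegistry, the Guard signature, and DemoCtx are assumptions made for the example. It shows missing ids failing under AND, being skipped under OR, and prevResult being passed through to each member guard.

```typescript
// Illustrative model only: these types and helpers are assumptions,
// not drift-core internals.
type Guard<C> = (ctx: C, prevResult?: unknown) => boolean;

class GuardRegistry<C> {
  private guards = new Map<string, Guard<C>>();

  guard(id: string, fn: Guard<C>): this {
    this.guards.set(id, fn);
    return this;
  }

  getGuard(id: string): Guard<C> | undefined {
    return this.guards.get(id);
  }

  // AND: every member must pass. Array.prototype.every stops at the first
  // failure, and a missing id evaluates to undefined, which counts as a failure.
  guardAnd(id: string, ids: string[]): this {
    return this.guard(id, (ctx, prev) =>
      ids.every((m) => this.guards.get(m)?.(ctx, prev) === true));
  }

  // OR: any member may pass. Array.prototype.some stops at the first
  // success, and a missing id is simply skipped.
  guardOr(id: string, ids: string[]): this {
    return this.guard(id, (ctx, prev) =>
      ids.some((m) => this.guards.get(m)?.(ctx, prev) === true));
  }
}

type DemoCtx = { data: { userId?: string; verified?: boolean; role?: string } };

const registry = new GuardRegistry<DemoCtx>()
  .guard('hasUser', (ctx) => ctx.data.userId !== undefined)
  .guard('isVerified', (ctx) => ctx.data.verified === true)
  .guard('isAdmin', (ctx) => ctx.data.role === 'admin')
  .guardAnd('hasVerifiedUser', ['hasUser', 'isVerified'])
  .guardOr('canAccessAdmin', ['isAdmin', 'hasVerifiedUser'])
  .guardAnd('withMissing', ['hasUser', 'doesNotExist']);
```

In this model member guards are looked up at evaluation time, so a compound guard can reference another compound guard, as canAccessAdmin does with hasVerifiedUser.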

Edges

Edges define transitions between nodes. In addition to guards and priority, edges support a maxTraversals cap that bounds how often the edge is allowed to fire within a single run:

// Remediation loop that retries at most 3 times before falling through.
// No manual `attempts` counter in ctx.data — the runtime enforces it.
const graph = new ManagedCyclicGraph('bounded-retry')
  .guard('any', GuardPresets.any())
  .guard('failed', (ctx) => ctx.data.ok !== true)

  .node('execute', { label: 'Execute' })
  .node('verify', { label: 'Verify' })
  .node('giveUp', { label: 'Give Up' })
  .node('done', { label: 'Done' })

  .edge('execute', 'verify', 'any')
  .edge('verify', 'done', (ctx) => ctx.data.ok === true)
  // This edge fires at most 3 times — after that it is skipped and
  // selection falls through to the giveUp edge.
  .edge({
    source: 'verify',
    target: 'execute',
    guards: ['failed'],
    maxTraversals: 3
  })
  .edge('verify', 'giveUp', 'failed')
  .start('execute')
  .build();

Omit maxTraversals (or set it to 0) for unlimited traversals — the default, fully backward compatible.
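To make the fall-through behaviour concrete, here is a small standalone model of capped edge selection. It is a sketch under stated assumptions: EdgeModel and selectEdge are hypothetical names, edges are tried in declaration order (priority is ignored), and the real runtime may differ in detail.

```typescript
// Illustrative model only; EdgeModel and selectEdge are assumptions.
interface EdgeModel<C> {
  source: string;
  target: string;
  guard: (ctx: C) => boolean;
  maxTraversals?: number; // undefined or 0 means unlimited
}

// Returns the first eligible edge leaving `from`, or undefined if none fires.
// `counts[i]` holds how often edges[i] has fired in the current run.
function selectEdge<C>(
  edges: EdgeModel<C>[],
  from: string,
  ctx: C,
  counts: number[],
): EdgeModel<C> | undefined {
  for (let i = 0; i < edges.length; i++) {
    const edge = edges[i];
    if (edge.source !== from) continue;
    const used = counts[i] ?? 0;
    const capped = !!edge.maxTraversals && used >= edge.maxTraversals;
    if (capped || !edge.guard(ctx)) continue; // skipped: selection falls through
    counts[i] = used + 1;
    return edge;
  }
  return undefined;
}

type VerifyCtx = { ok: boolean };
const failed = (ctx: VerifyCtx) => ctx.ok !== true;

const retryEdges: EdgeModel<VerifyCtx>[] = [
  { source: 'verify', target: 'done', guard: (ctx) => ctx.ok === true },
  { source: 'verify', target: 'execute', guard: failed, maxTraversals: 3 },
  { source: 'verify', target: 'giveUp', guard: failed },
];

// Simulate four failed verifications in a row.
const traversalCounts: number[] = [];
const targets: string[] = [];
for (let i = 0; i < 4; i++) {
  const edge = selectEdge(retryEdges, 'verify', { ok: false }, traversalCounts);
  targets.push(edge?.target ?? 'none');
}
// targets is ['execute', 'execute', 'execute', 'giveUp']
```

The capped edge is never removed from the graph; it is just skipped once its counter reaches the cap, which is why the giveUp edge only becomes reachable on the fourth failure.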

Actions

Actions transform the context during node execution:

.node('transform', {
  label: 'Transform Data',
  execute: [
    // Action 1: Add timestamp
    (ctx) => ({
      ...ctx,
      data: { ...ctx.data, timestamp: Date.now() }
    }),
    
    // Action 2: Process data
    async (ctx) => {
      const result = await processData(ctx.data);
      return { ...ctx, data: { ...ctx.data, result } };
    }
  ]
})

Advanced Features

Persistence & Resumption

import { Manager, InMemorySnapshotStore } from '@quarry-systems/drift-core';

const store = new InMemorySnapshotStore();
const manager = new Manager(graph, { snapshotStore: store });

// Start execution
const runId = await manager.start({ userId: '123' });

// Later, resume from a snapshot
const snapshot = await store.load(runId);
await manager.resume(runId, snapshot);

Resume validates the graph identity and version before restoring any state (alpha.3+). If the Manager was constructed against a different graph, or with a mismatched graphVersion, resume() throws ValidationError(field: 'graphId' | 'graphVersion'). When that happens, no run state is registered, no services are resolved, and no wait-registry handlers leak, so snapshot-to-Manager mismatches fail loudly before they can corrupt audit trails.
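The validate-before-restore ordering can be sketched as follows. This is a simplified standalone model: the local ValidationError class, SnapshotMeta, and ResumeModel are assumptions for illustration, and versions are compared by plain equality rather than through a pluggable comparator.

```typescript
// Illustrative model only. The local ValidationError and SnapshotMeta shapes
// are assumptions that mirror the description above.
class ValidationError extends Error {
  readonly field: 'graphId' | 'graphVersion';

  constructor(field: 'graphId' | 'graphVersion', message: string) {
    super(message);
    this.name = 'ValidationError';
    this.field = field;
  }
}

interface SnapshotMeta {
  runId: string;
  graphId: string;
  graphVersion: string;
}

class ResumeModel {
  readonly activeRuns = new Set<string>();
  private readonly graphId: string;
  private readonly graphVersion: string;

  constructor(graphId: string, graphVersion: string) {
    this.graphId = graphId;
    this.graphVersion = graphVersion;
  }

  resume(snapshot: SnapshotMeta): void {
    // Validate first, so a mismatch leaves no partial state behind.
    if (snapshot.graphId !== this.graphId) {
      throw new ValidationError('graphId', `snapshot belongs to graph ${snapshot.graphId}`);
    }
    if (snapshot.graphVersion !== this.graphVersion) {
      throw new ValidationError('graphVersion', `snapshot was taken at ${snapshot.graphVersion}`);
    }
    // Only now is any run state registered.
    this.activeRuns.add(snapshot.runId);
  }
}
```

A caller can branch on the field property to tell a wrong-graph resume apart from a version mismatch.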

Service Injection

import { defineService } from '@quarry-systems/drift-core';

// Define a service
const httpService = defineService({
  name: 'http',
  version: '1.0.0',
  scope: 'run',
  factory: (ctx) => ({
    get: async (url: string) => fetch(url).then(r => r.json())
  })
});

// Use in graph
const graph = new ManagedCyclicGraph('with-services')
  .node('fetch', {
    label: 'Fetch Data',
    execute: [async (ctx) => {
      const http = ctx.services.http;
      const data = await http.get('https://api.example.com/data');
      return { ...ctx, data: { ...ctx.data, apiData: data } };
    }]
  })
  .build();

// Provide services at runtime
const manager = new Manager(graph, {
  services: { http: httpService }
});

Middleware

import { createLoggingMiddleware, createMetricsMiddleware } from '@quarry-systems/drift-core';

const manager = new Manager(graph, {
  middleware: [
    createLoggingMiddleware({ level: 'info' }),
    createMetricsMiddleware()
  ]
});

Event Handling

const graph = new ManagedCyclicGraph('with-events')
  .on('graph:start', (event) => {
    console.log('Graph started:', event.runId);
  })
  .on('node:start', (event) => {
    console.log('Node started:', event.nodeId);
  })
  .on('node:end', (event) => {
    console.log('Node completed:', event.nodeId, event.duration);
  })
  .on('graph:end', (event) => {
    console.log('Graph completed:', event.runId);
  })
  .build();

Plugin System

import type { Plugin, NodeHandler } from '@quarry-systems/drift-core';

// Create a custom node handler.
// Current signature is a single args object: { ctx, node, runId, step }.
const delayHandler: NodeHandler = async ({ ctx, node }) => {
  const ms = (node.meta as { delay?: number } | undefined)?.delay ?? 1000;
  await new Promise(resolve => setTimeout(resolve, ms));
  return ctx;
};

// Define plugin. name + version are required.
const delayPlugin: Plugin = {
  name: 'delay-plugin',
  version: '1.0.0',
  nodes: { 'delay': delayHandler },
};

// Use plugin
const graph = new ManagedCyclicGraph('with-plugin')
  .use(delayPlugin)
  .node('wait', {
    type: 'delay',
    meta: { delay: 5000 }
  })
  .build();

Error Handling

const graph = new ManagedCyclicGraph('error-handling')
  .guard('any', GuardPresets.any())
  .guard('hasErrors', (ctx) => ctx.errors.length > 0)
  .guard('noErrors',  (ctx) => ctx.errors.length === 0)

  .node('risky', {
    label: 'Risky Operation',
    execute: [async (ctx) => {
      try {
        const result = await riskyOperation();
        return { ...ctx, data: { ...ctx.data, result } };
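      } catch (error) {
        // `error` is `unknown` under strict TypeScript, so narrow it before reading `.message`
        const message = error instanceof Error ? error.message : String(error);
        return {
          ...ctx,
          errors: [...ctx.errors, {
            kind: 'execution',
            nodeId: 'risky',
            message,
            severity: 'error'
          }]
        };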
      }
    }]
  })
  .node('handleError', {
    label: 'Error Handler',
    execute: [(ctx) => {
      console.error('Errors occurred:', ctx.errors);
      return ctx;
    }]
  })
  .node('done', { label: 'Done' })

  // Route on errors via edge guards rather than node-level gating
  .edge('risky', 'handleError', 'hasErrors')
  .edge('risky', 'done',        'noErrors')
  .start('risky')
  .build();

Resume-time graph-version errors (alpha.3+): ValidationError(field: 'graphId') distinguishes a wrong-graph resume; ValidationError(field: 'graphVersion') covers explicit-version validation failures and comparator 'mismatch' outcomes; ConfigurationError(configKey: 'graphVersionComparator') wraps any throw from a consumer-supplied comparator.

API Reference

ManagedCyclicGraph

Main builder class for constructing graphs.

Methods:

  • .guard(id, fn) — Register a guard function
  • .guardAnd(id, [guardIds]) — Compose registered guards with AND (short-circuits on first failure)
  • .guardOr(id, [guardIds]) — Compose registered guards with OR (short-circuits on first success)
  • .getGuard(id) — Retrieve a registered guard for introspection or composition
  • .node(id, config) — Add a node
  • .edge(from, to, guard?, priority?) or .edge({ source, target, guards, maxTraversals, ... }) — Add an edge (positional or config object)
  • .meter({ id, maxTraversals, edges }) — Bound a remediation loop with a shared traversal counter across multiple edges
  • .start(...nodeIds) — Mark one or more start nodes
  • .service(key, definition) — Register a service on the builder
  • .use(plugin) — Register a plugin
  • .on(event, handler) — Subscribe to events
  • .build() — Build the graph (GraphDef)
  • .buildSeparated() — Build returning { graph, runtime } so the runtime portion can be serialized separately from the structural definition (alpha.4+)

Manager

Execution orchestrator for graphs.

Methods:

  • .start(injected?, options?) - Start a new execution
  • .resume(runId, snapshot?) - Resume from snapshot
  • .step(runId, options?) - Execute a single step
  • .getRunState(runId) - Get the in-memory RunState (synchronous, undefined if the run is not in memory)
  • .getSnapshot(runId) - Load the most recent persisted snapshot
  • .getStatus(runId) - Unified async lookup returning { status, source, nodeId }. Checks in-memory state first, then falls back to the snapshot store, so callers no longer need to merge getRunState() and getSnapshot() themselves. source is 'memory' or 'store'; a run known to neither is reported with source: 'store' and status: 'unknown'.
  • .stop(runId) - Stop execution

const info = await manager.getStatus(runId);
if (info.status === 'unknown') {
  // Neither memory nor store knows about this run
} else if (info.source === 'memory') {
  // Live run — info.nodeId is the current node
} else {
  // info.source === 'store' — run has been evicted from memory
  // and info reflects the last persisted snapshot
}
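The memory-first, store-fallback order described for getStatus() can be modelled like this. It is a synchronous sketch under stated assumptions: lookupStatus, StatusInfo, and RunRecord are hypothetical names, and plain Maps stand in for the Manager's in-memory state and the snapshot store.

```typescript
// Illustrative model only; the names and shapes here are assumptions.
type RunRecord = { status: string; nodeId: string };
type StatusInfo = { status: string; source: 'memory' | 'store'; nodeId?: string };

function lookupStatus(
  runId: string,
  memory: Map<string, RunRecord>,
  store: Map<string, RunRecord>,
): StatusInfo {
  // 1. Live runs win: the in-memory state is the freshest view.
  const live = memory.get(runId);
  if (live) return { ...live, source: 'memory' };

  // 2. Otherwise fall back to the last persisted snapshot.
  const persisted = store.get(runId);
  if (persisted) return { ...persisted, source: 'store' };

  // 3. Unknown runs are reported with source 'store' and status 'unknown'.
  return { status: 'unknown', source: 'store' };
}

const memoryRuns = new Map<string, RunRecord>([
  ['run-live', { status: 'running', nodeId: 'verify' }],
]);
const storedRuns = new Map<string, RunRecord>([
  ['run-live', { status: 'running', nodeId: 'execute' }], // older snapshot
  ['run-evicted', { status: 'waiting', nodeId: 'approve' }],
]);

const liveInfo = lookupStatus('run-live', memoryRuns, storedRuns);
const evictedInfo = lookupStatus('run-evicted', memoryRuns, storedRuns);
const unknownInfo = lookupStatus('run-missing', memoryRuns, storedRuns);
```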

Graph version pinning (alpha.3+):

const manager = new Manager(graph, {
  graphVersion: 'v1.2.3',                        // explicit pin (recommended for prod)
  graphVersionComparator: strictGraphVersionComparator,  // optional; defaults to smart
});

  • options.graphVersion: string — explicit version pinned at construction. Empty/whitespace/hash-prefix-shaped values throw ValidationError(field: 'graphVersion').
  • options.graphVersionComparator: GraphVersionComparator — pluggable comparator. Defaults to defaultGraphVersionComparator (smart: consults structural hash). strictGraphVersionComparator opt-in for pure equality.
  • manager.getGraphVersion(): GraphVersionResolution — read the resolved version (cached at construction). Returns { version, source, structuralHash }.

When unset, drift-core computes a deterministic structural hash (sha256-v1:<hex>) of the GraphDef as a development-time drift safety net. Production consumers should pin explicit versions; the hash backstops "I forgot to bump."

GuardPresets

Common guard functions:

  • GuardPresets.any() - Always true
  • GuardPresets.none() - Always false

Utilities

  • randomId() - Generate unique IDs
  • deepClone(obj) - Deep clone objects
  • sleep(ms) - Async sleep
  • timeout(ms, promise) - Add timeout to promise
  • getNestedValue(obj, path) - Get nested property
  • setNestedValue(obj, path, value) - Set nested property

Storage Implementations

InMemorySnapshotStore

import { InMemorySnapshotStore } from '@quarry-systems/drift-core';

const store = new InMemorySnapshotStore();
const manager = new Manager(graph, { snapshotStore: store });

FilesystemArtifactStore

import { FilesystemArtifactStore } from '@quarry-systems/drift-core';

const artifactStore = new FilesystemArtifactStore('./artifacts');

TypeScript Support

Full TypeScript support with generics:

interface MyGlobal {
  apiKey: string;
}

interface MyInjected {
  userId: string;
}

const graph = new ManagedCyclicGraph<MyGlobal, MyInjected>('typed-graph')
  .node('start', {
    execute: [(ctx) => {
      // ctx.global.apiKey is typed as string
      // ctx.injected.userId is typed as string
      return ctx;
    }]
  })
  .build();

Lifecycle Hooks

Middleware can intercept node execution at two execute-phase hooks in addition to the existing observer-only onNodeStart / onNodeEnd hooks.

| Hook | When | Can propose patches |
|---|---|---|
| onBeforeNodeExecute | After onEnter committed, before execute actions run | Yes |
| onAfterNodeExecute | After execute actions, before edge selection | Yes |
| onNodeStart | Node begins (observer only) | No |
| onNodeEnd | Node finishes (observer only) | No |

Tier-1 vs Tier-2

  • Tier-2 (middleware): onBeforeNodeExecute and onAfterNodeExecute call event.propose(patch) to emit patches into the runtime's Tier-2 patch buffer. Patches are committed at phase boundaries and are not immediately visible to later middleware in the same chain.
  • Tier-1 (internal): built-in contributors (Tier1Contributor) run before Tier-2 hooks and have their patches committed immediately. Not extensible by plugins.

event.propose(patch)

Both execute-phase events expose propose():

interface NodeBeforeExecuteEvent<TCtx> {
  nodeId: string;
  timestamp: number;
  context: TCtx;
  runId: string;
  graphId: string;
  propose(patch: Omit<Patch, 'ts' | 'by'> & { by?: string }): void;
}

interface NodeAfterExecuteEvent<TCtx> extends NodeBeforeExecuteEvent<TCtx> {
  /** Return value of the last execute action; undefined if no actions ran. */
  prevResult: unknown;
}

Tier-2 Isolation Property

Tier-2 middleware see Tier-1 results (already committed) but do not see other Tier-2 middleware's mid-phase patches. Each Tier-2 hook runs against the post-Tier-1 ctx snapshot; patches proposed via event.propose() are buffered and committed in registration order after the entire chain completes. This means third-party plugins authored by different teams don't depend on each other's internal state.
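The isolation property is easiest to see in a toy model of one phase. The sketch below is not the runtime's code: runTier2Phase, SetPatch, and Hook are assumptions, and patch paths are treated as flat top-level keys for brevity.

```typescript
// Illustrative model only; names and the flat-key patch format are assumptions.
type SetPatch = { op: 'set'; path: string; value: unknown; by: string };
type Propose = (patch: Omit<SetPatch, 'by'>) => void;
type Hook = (snapshot: Readonly<Record<string, unknown>>, propose: Propose) => void;

function runTier2Phase(
  ctx: Record<string, unknown>,
  chain: { name: string; hook: Hook }[],
): Record<string, unknown> {
  // Every hook sees the same frozen snapshot taken before the chain starts.
  const snapshot = Object.freeze({ ...ctx });
  const buffer: SetPatch[] = [];

  for (const { name, hook } of chain) {
    hook(snapshot, (patch) => {
      buffer.push({ ...patch, by: name });
    });
  }

  // Commit the buffered patches in registration order after the whole chain ran.
  const next = { ...ctx };
  for (const patch of buffer) next[patch.path] = patch.value;
  return next;
}

const afterPhase = runTier2Phase({ tier1Ready: true }, [
  {
    name: 'audit',
    hook: (_snapshot, propose) => propose({ op: 'set', path: 'auditStart', value: 1 }),
  },
  {
    name: 'observer',
    // Runs second, but still cannot see the patch proposed by 'audit'.
    hook: (snapshot, propose) =>
      propose({ op: 'set', path: 'sawAuditStart', value: 'auditStart' in snapshot }),
  },
]);
// afterPhase.auditStart is 1 and afterPhase.sawAuditStart is false
```

Both patches land in the committed context, but the second middleware's view of the first one's patch is deferred to the next phase.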

Example

import type { Middleware } from '@quarry-systems/drift-contracts';

const auditMiddleware: Middleware = {
  name: 'audit',
  version: '1.0.0',

  async onBeforeNodeExecute(event) {
    // Inject an audit timestamp before actions run
    event.propose({ op: 'set', path: 'auditStart', value: event.timestamp });
  },

  async onAfterNodeExecute(event) {
    // Record the last action's return value
    if (event.prevResult !== undefined) {
      event.propose({ op: 'set', path: 'lastResult', value: event.prevResult });
    }
  },
};

const manager = new Manager(graph, { middleware: [auditMiddleware] });

Resource Meters

Meters are declarative accumulators with limits. Pass a MetersPolicy via ManagerOptions.meters and the runtime:

  1. Auto-registers a per-meter metersOk:<meterId> guard for each meter.
  2. Wraps edges per the meter's scope: members meters (default for edge-traversals) wrap only their declared member edges; non-essential-sources meters (default for metrics / timer / builtin) wrap every outbound edge whose source node is not marked meta.essential: true.
  3. After each node, snapshots current meter values into ctx.data.__meterState.
  4. Fires a MeterExceeded event on the unified event bus when any meter breaches its limit.

MeterDef and MetersPolicy

type MeterSource =
  | { kind: 'metrics'; key: string }   // reads from metrics plugin total(key)
  | { kind: 'timer' }                   // reads ctx.timer.elapsed (requires timer: true)
  | { kind: 'builtin'; name: 'steps' }; // reads ctx.data.__currentStep

interface MeterDef {
  limit: number;        // Maximum allowed value; breach fires MeterExceeded
  source: MeterSource;
  description?: string; // Human description for logs/errors
}

type MetersPolicy = Record<string, MeterDef>;

Meter Sources

| kind | Reads from | Requires |
|---|---|---|
| metrics | metrics plugin total(key) | metrics service registered |
| timer | ctx.timer.elapsed | ManagerOptions.timer: true |
| builtin: 'steps' | ctx.data.__currentStep | nothing |

Essential Nodes

Nodes marked meta.essential: true are excluded from the non-essential-sources-scoped metersOk:<meterId> wraps — their outgoing edges fire even when those meters are exhausted. Use this for graceful-shutdown or error-handling nodes that must run regardless of budgets. (members-scoped meters ignore essentiality entirely; they only wrap their declared member edges.)
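A rough model of how the non-essential-sources wrap treats essential nodes is shown below. It is a sketch only: canLeave, MeterModel, and NodeModel are hypothetical, and treating a value at or above the limit as exhausted is an assumption about the breach condition.

```typescript
// Illustrative model only; names and the >= breach rule are assumptions.
interface MeterModel {
  limit: number;
  read: () => number; // current value of the meter's source
}

interface NodeModel {
  id: string;
  essential?: boolean; // stands in for meta.essential
}

// Models the combined metersOk:<meterId> wrap on a node's outbound edges.
function canLeave(node: NodeModel, meters: Record<string, MeterModel>): boolean {
  if (node.essential) return true; // essential sources are never wrapped
  return Object.keys(meters).every((id) => meters[id].read() < meters[id].limit);
}

const stepsTaken = 50;
const demoMeters: Record<string, MeterModel> = {
  steps: { limit: 50, read: () => stepsTaken },
};

const workCanLeave = canLeave({ id: 'work' }, demoMeters);
const cleanupCanLeave = canLeave({ id: 'cleanup', essential: true }, demoMeters);
// workCanLeave is false, cleanupCanLeave is true
```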

Example

import { ManagedCyclicGraph, Manager } from '@quarry-systems/drift-core';

const graph = new ManagedCyclicGraph('metered-flow')
  .guard('any', (ctx) => true)
  .node('work',    { label: 'Do Work' })
  .node('cleanup', { label: 'Cleanup', meta: { essential: true } })
  .node('done',    { label: 'Done' })
  .edge('work',    'done',    'any')
  .edge('work',    'cleanup', 'any')
  .start('work');  // no .build() here: pass the builder itself so the Manager can wrap edges

const manager = new Manager(graph, {
  meters: {
    steps: {
      limit: 50,
      source: { kind: 'builtin', name: 'steps' },
      description: 'Max 50 steps per run',
    },
  },
});

Note: Pass the ManagedCyclicGraph builder instance (not the result of .build()) to Manager when using meters. The Manager calls buildSeparated() internally after wiring the per-meter metersOk:<meterId> guards and edge wraps.

Shared traversal cap (graph.meter())

Bound a remediation loop that fans out to multiple targets with one shared counter:

new ManagedCyclicGraph('council')
  .guard('not-ready', notReadyGuard)
  .guard('is-ready', isReadyGuard)
  .node('arbiter', { execute: [arbiterAction] })
  .node('specialist-a', { execute: [specialistA] })
  .node('specialist-b', { execute: [specialistB] })
  .node('done', { execute: [doneAction] })
  .start('arbiter')
  .edge({ from: 'arbiter', to: 'done', guards: ['is-ready'] })
  .meter({
    id: 'council-remediation',
    maxTraversals: 3,
    edges: [
      { from: 'arbiter', to: 'specialist-a', guards: ['not-ready'] },
      { from: 'arbiter', to: 'specialist-b', guards: ['not-ready'] },
    ],
  });

After 3 total member-edge traversals (any split), both member edges are blocked by the meter's per-meter metersOk:council-remediation guard and a MeterExceeded event fires.

Exit routing: Non-member edges (like your is-ready exit) are NOT wrapped by the meter's guard. When the cap trips, member edges are blocked but the exit routes normally as soon as its own guards pass. See docs/superpowers/specs/2026-04-15-drift-meter-scoping-design.md for the per-meter scoping design.

Wiring note: The Manager does not auto-read metersPolicy from the built graph. You must extract and pass it explicitly:

const meters = graph.build().metersPolicy;
const manager = new Manager(graph, { snapshotStore: store, meters });

Each meter gets its own guard id (metersOk:<meterId>); there is no longer a single shared metersOk guard.

Key constraints:

  • The meter is the declaration site for its member edges — do NOT also declare them via graph.edge().
  • An edge may belong to at most one meter (v1 restriction).
  • Counter lives at ctx.data.__meterCounters[meterId]; survives suspend/resume via the standard snapshot pipeline.
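The shared counter can be pictured with a small standalone model. This is a sketch, not the library's implementation: SharedMeterModel and its method names are assumptions, and the counter is held in a field rather than in ctx.data.__meterCounters.

```typescript
// Illustrative model only; SharedMeterModel is an assumption.
interface MemberEdge {
  from: string;
  to: string;
}

class SharedMeterModel {
  private count = 0;
  private readonly maxTraversals: number;
  private readonly members: MemberEdge[];

  constructor(maxTraversals: number, members: MemberEdge[]) {
    this.maxTraversals = maxTraversals;
    this.members = members;
  }

  // Returns true if the edge may fire; member edges consume the shared budget.
  tryTraverse(from: string, to: string): boolean {
    const isMember = this.members.some((e) => e.from === from && e.to === to);
    if (!isMember) return true; // non-member edges are not wrapped by the meter
    if (this.count >= this.maxTraversals) return false; // cap tripped
    this.count += 1;
    return true;
  }
}

const council = new SharedMeterModel(3, [
  { from: 'arbiter', to: 'specialist-a' },
  { from: 'arbiter', to: 'specialist-b' },
]);

const attempts = [
  council.tryTraverse('arbiter', 'specialist-a'), // 1st shared traversal
  council.tryTraverse('arbiter', 'specialist-b'), // 2nd
  council.tryTraverse('arbiter', 'specialist-a'), // 3rd: budget now spent
  council.tryTraverse('arbiter', 'specialist-b'), // blocked
  council.tryTraverse('arbiter', 'specialist-a'), // blocked
  council.tryTraverse('arbiter', 'done'),         // exit edge still routes
];
// attempts is [true, true, true, false, false, true]
```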

Dependency Injection

At the start of every node's before lifecycle phase, Drift populates ctx.node[nodeId].upstream with the results of the node's direct predecessors. This eliminates manual ctx.data.results walks in most cases.

How It Works

  • Regular / fork nodes: upstream is keyed by the source nodeId of each incoming edge.
  • Join nodes (with awaitBranches): upstream is keyed by each entry in awaitBranches.
  • Scope: one-hop only. Transitive predecessors are not included.
  • Source: values are read from ctx.data.results[upstreamNodeId].

Example

.node('merge', {
  nodeType: 'join',
  awaitBranches: ['branchA', 'branchB'],
  execute: [(ctx) => {
    const a = ctx.node['merge'].upstream['branchA'];
    const b = ctx.node['merge'].upstream['branchB'];
    return { ...ctx, data: { ...ctx.data, combined: [a, b] } };
  }],
})

If no upstream results are available for a node (e.g., it is the start node), ctx.node[nodeId].upstream is not written and remains whatever it was previously (typically undefined).
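The lookup rules can be summarised in a short standalone function. It is a sketch under stated assumptions: collectUpstream and EdgeRef are hypothetical names, and returning undefined stands in for "upstream is not written".

```typescript
// Illustrative model only; collectUpstream and EdgeRef are assumptions.
interface EdgeRef {
  source: string;
  target: string;
}

function collectUpstream(
  nodeId: string,
  edges: EdgeRef[],
  results: Record<string, unknown>, // stands in for ctx.data.results
  awaitBranches?: string[],
): Record<string, unknown> | undefined {
  // Join nodes key by awaitBranches; other nodes key by direct predecessors.
  const keys = awaitBranches
    ?? edges.filter((e) => e.target === nodeId).map((e) => e.source);

  const upstream: Record<string, unknown> = {};
  for (const key of keys) {
    if (key in results) upstream[key] = results[key];
  }
  // Nothing available: leave upstream unwritten.
  return Object.keys(upstream).length > 0 ? upstream : undefined;
}

const demoEdges: EdgeRef[] = [
  { source: 'start', target: 'branchA' },
  { source: 'start', target: 'branchB' },
  { source: 'branchA', target: 'merge' },
  { source: 'branchB', target: 'merge' },
];
const demoResults = { start: 'seed', branchA: 1, branchB: 2 };

const mergeUpstream = collectUpstream('merge', demoEdges, demoResults, ['branchA', 'branchB']);
const startUpstream = collectUpstream('start', demoEdges, demoResults);
// mergeUpstream is { branchA: 1, branchB: 2 }; 'start' is two hops away and is excluded
// startUpstream is undefined because the start node has no predecessors
```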


Wait / Inject Lifecycle

Drift supports mid-run suspension while a node waits for external data — for example, waiting for a human approval, a webhook callback, or a remote API response.

ctx.waitForInjection(key, options?)

Called from within a node's execute phase. Uses mark-and-return semantics:

  1. The call registers the wait and returns immediately — the action continues executing normally.
  2. Suspension happens at the next onAfterNodeExecute phase boundary.
  3. The run is persisted with status 'waiting' and a WaitSuspended event is fired on the event bus.

interface WaitOptions {
  /** Wall-clock timeout in ms; on expiry ctx.injected.__waits[key] = { __timedOut: true } */
  timeoutMs?: number;
  /** Reserved for future 'any' mode; v1 only supports all-satisfied resume. */
  mode?: 'all';
}

The injected value lands in ctx.injected.__waits[key] and is consumed by the next node after resume (not the suspended node).

Manager.inject(runId, key, value)

Called from outside the graph (e.g., a webhook handler) to deliver a value to a waiting run:

await manager.inject(runId, 'approvalResult', { approved: true });
// Then resume the run:
await manager.resume(runId);

inject() throws if the run is not in a waiting state. It does not auto-resume — the caller must call manager.resume() separately.

Timeout Sentinel

If a timeoutMs expires before inject() is called, the Manager writes { __timedOut: true } to ctx.injected.__waits[key] and fires WaitResolved with timedOut: true. The run remains suspended until explicitly resumed.
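The mark-and-return, inject, and timeout-sentinel rules fit together as in the model below. It is a sketch only: WaitModel and its methods are assumptions, time is passed in explicitly instead of using real timers, and events are omitted.

```typescript
// Illustrative model only; WaitModel is an assumption, not the Manager's code.
class WaitModel {
  readonly waits: Record<string, unknown> = {}; // stands in for ctx.injected.__waits
  status: 'running' | 'waiting' = 'running';
  // Pending keys mapped to their absolute deadline (undefined = no timeout).
  private pending = new Map<string, number | undefined>();

  // Mark-and-return: registers the wait, the calling action keeps running.
  waitForInjection(key: string, now: number, timeoutMs?: number): void {
    this.pending.set(key, timeoutMs === undefined ? undefined : now + timeoutMs);
  }

  // Called at the phase boundary after the node's execute actions.
  suspendAtBoundary(): void {
    if (this.pending.size > 0) this.status = 'waiting';
  }

  inject(key: string, value: unknown): void {
    if (this.status !== 'waiting') throw new Error('run is not waiting');
    this.waits[key] = value;
    this.pending.delete(key);
  }

  // Expire overdue waits by writing the timeout sentinel.
  tick(now: number): void {
    this.pending.forEach((deadline, key) => {
      if (deadline !== undefined && now >= deadline) {
        this.waits[key] = { __timedOut: true };
        this.pending.delete(key);
      }
    });
  }

  // v1 'all' mode: resumable only once every key is satisfied.
  // Resuming is still an explicit, separate step.
  canResume(): boolean {
    return this.status === 'waiting' && this.pending.size === 0;
  }
}

const run = new WaitModel();
run.waitForInjection('approval', 0, 1000); // times out at t=1000
run.waitForInjection('payment', 0);        // no timeout
run.suspendAtBoundary();
run.tick(1000);                            // 'approval' gets the sentinel
const resumableAfterTimeout = run.canResume(); // false: 'payment' still pending
run.inject('payment', { paid: true });
const resumableAfterInject = run.canResume();  // true
```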

Events

| Event | Fired when |
|---|---|
| WaitSuspended | Run enters waiting state (runId, nodeId, keys, timeoutAt?) |
| WaitResolved | A key is satisfied via inject or timeout (runId, key, timedOut) |
| RunSuspended | Alias emitted alongside WaitSuspended |
| RunResumed | Run resumes from waiting state |


Services

Services provide infrastructure adapters (HTTP clients, secret stores, vector DBs, etc.) to nodes via ctx.services. They can be registered either on the graph builder or passed at runtime to the Manager.

graph.service(key, definition)

Register a service on the builder. Accepts either a bare instance or a factory descriptor ({ instance, scope }). Both .service() and .use(plugin) populate the same internal registry.

const graph = new ManagedCyclicGraph('my-graph')
  .service('db', myDatabaseAdapter)
  .service('secrets', mySecretsAdapter)
  .node('fetchUser', {
    execute: [async (ctx) => {
      const user = await ctx.services.db.findUser(ctx.injected.userId);
      return { ...ctx, data: { ...ctx.data, user } };
    }],
  })
  .build();

Runtime Service Injection

Services passed to ManagerOptions.services are merged with plugin-registered services according to serviceMergeStrategy (default: 'runtime-overrides'):

const manager = new Manager(graph, {
  services: {
    db: myDatabaseAdapter,
    secrets: { get: (key) => process.env[key] },
  },
  serviceMergeStrategy: 'runtime-overrides', // 'plugin-overrides' | 'error-on-conflict'
  requiredServices: ['db', 'secrets'],        // fail fast if missing
});
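The three merge strategies and the requiredServices check can be approximated with the following standalone function. It is a sketch, not the Manager's implementation: mergeServices and its error messages are assumptions.

```typescript
// Illustrative model only; mergeServices is an assumption.
type MergeStrategy = 'runtime-overrides' | 'plugin-overrides' | 'error-on-conflict';

function mergeServices(
  pluginServices: Record<string, unknown>,
  runtimeServices: Record<string, unknown>,
  strategy: MergeStrategy = 'runtime-overrides',
  requiredServices: string[] = [],
): Record<string, unknown> {
  if (strategy === 'error-on-conflict') {
    const clashes = Object.keys(runtimeServices).filter((k) => k in pluginServices);
    if (clashes.length > 0) {
      throw new Error(`Conflicting services: ${clashes.join(', ')}`);
    }
  }

  // Later spreads win, so the order decides which side overrides.
  const merged = strategy === 'plugin-overrides'
    ? { ...runtimeServices, ...pluginServices }
    : { ...pluginServices, ...runtimeServices };

  // Fail fast if a required service is absent after merging.
  const missing = requiredServices.filter((k) => !(k in merged));
  if (missing.length > 0) {
    throw new Error(`Missing required services: ${missing.join(', ')}`);
  }
  return merged;
}

const fromPlugins = { db: 'plugin-db', http: 'plugin-http' };
const fromRuntime = { db: 'runtime-db' };

const runtimeWins = mergeServices(fromPlugins, fromRuntime);
const pluginWins = mergeServices(fromPlugins, fromRuntime, 'plugin-overrides');
// runtimeWins.db is 'runtime-db'; pluginWins.db is 'plugin-db'; both keep http
```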

Service Definitions (Scoping)

Wrap a service in a ServiceDefinition to control its scope:

import type { ServiceDefinition, ServiceFactory } from '@quarry-systems/drift-contracts';

// Per-run factory: rebuilt for each run (survives cross-process resume)
const dbDef: ServiceDefinition = {
  instance: (runCtx) => createDbConnection(runCtx.runId),
  scope: 'per-run',
};

const manager = new Manager(graph, {
  services: { db: dbDef },
});

| Scope | Instance created | Survives resume? |
|---|---|---|
| 'global' (default) | Once, shared across all runs | No (bare instance) |
| 'per-run' | Per manager.start() / manager.resume() call | Yes |

Cross-Process Resume Warning

Global bare-instance services (plain objects / class instances with scope: 'global') cannot be serialized into a snapshot and will not survive a daemon restart or cross-process resume. The Manager emits a console.warn for each such service at construction time.

Set allowBareInstances: true in ManagerOptions to silence the warning when cross-process resume is not required:

const manager = new Manager(graph, {
  services: { logger: console },
  allowBareInstances: true,
});

ServiceRebuilt Event

Fired on the unified event bus when a per-run service factory is invoked to rebuild a service instance during resume:

{ type: 'ServiceRebuilt'; runId: string; key: string; timestamp: number }

Related Packages

Official Plugins

  • @quarry-systems/drift-http - HTTP requests
  • @quarry-systems/drift-timer - Delays and scheduling
  • @quarry-systems/drift-openai - OpenAI integration
  • @quarry-systems/drift-secrets - Secrets management
  • @quarry-systems/drift-store-sqlite - SQLite persistence
  • @quarry-systems/drift-vector-chroma - Vector storage

Examples

See the examples directory for complete examples:

  • Basic workflows
  • Conditional branching
  • Retry loops
  • Service integration
  • Plugin development

Contributing

This package is part of the Drift monorepo. See the main repository for contribution guidelines.

License

Dual-licensed under:

  • AGPL-3.0 for open source projects
  • Commercial License for proprietary use

See LICENSE.md for details.

For commercial licensing:

Support