@quarry-systems/drift-core
v0.3.0-alpha.3
A powerful managed cyclic graph library for building complex workflows with cycles, conditional logic, and multiple endpoints.
Overview
Drift Core is the execution engine for managed cyclic graphs. It provides a fluent API for building graphs, a robust execution system with persistence, and comprehensive support for guards, rules, actions, events, middleware, and plugins.
Installation
npm install @quarry-systems/drift-core @quarry-systems/drift-contracts
Features
- ✅ Cyclic Graph Support: Native support for cycles and loops
- ✅ Fluent Builder API: Intuitive graph construction with ManagedCyclicGraph
- ✅ Execution Manager: Orchestrate graph execution with persistence and resumption
- ✅ Guards & Rules: Conditional logic for edge traversal and node execution
- ✅ Actions: Transform context data during node execution
- ✅ Event System: Subscribe to graph and node lifecycle events
- ✅ Middleware: Intercept and observe execution flow
- ✅ Plugin System: Extend functionality with custom node types
- ✅ Persistence: Save and resume execution state
- ✅ Service Injection: Provide services to nodes at runtime
- ✅ Error Handling: Comprehensive error tracking and remediation
- ✅ TypeScript First: Full type safety with generics
Quick Start
Basic Graph
import { ManagedCyclicGraph, GuardPresets, Manager } from '@quarry-systems/drift-core';
// Build a simple workflow
const graph = new ManagedCyclicGraph('hello-world')
  .guard('any', GuardPresets.any())
  .node('start', {
    label: 'Start',
    execute: [(ctx) => {
      console.log('Starting workflow...');
      return ctx;
    }]
  })
  .node('process', {
    label: 'Process Data',
    execute: [(ctx) => ({
      ...ctx,
      data: { ...ctx.data, processed: true }
    })]
  })
  .node('end', {
    label: 'End',
    isEndpoint: true
  })
  .edge('start', 'process', 'any')
  .edge('process', 'end', 'any')
  .start('start')
  .build();
// Execute the graph
const manager = new Manager(graph);
const runId = await manager.start({ userId: '123' });
console.log('Execution complete:', runId);
Conditional Branching
const graph = new ManagedCyclicGraph('conditional-flow')
  .guard('any', GuardPresets.any())
  .guard('isValid', (ctx) => ctx.data.valid === true)
  .guard('isInvalid', (ctx) => ctx.data.valid !== true)
  .node('validate', {
    label: 'Validate Input',
    execute: [(ctx) => ({
      ...ctx,
      data: {
        ...ctx.data,
        valid: ctx.data.value > 0
      }
    })]
  })
  .node('success', { label: 'Success Path', isEndpoint: true })
  .node('failure', { label: 'Failure Path', isEndpoint: true })
  .edge('validate', 'success', 'isValid')
  .edge('validate', 'failure', 'isInvalid')
  .start('validate')
  .build();
const manager = new Manager(graph);
await manager.start({ value: 10 }); // Goes to 'success'
await manager.start({ value: -5 }); // Goes to 'failure'
Cyclic Workflows
const graph = new ManagedCyclicGraph('retry-loop')
  .guard('any', GuardPresets.any())
  .guard('shouldRetry', (ctx) => ctx.data.attempts < 3)
  .guard('maxAttempts', (ctx) => ctx.data.attempts >= 3)
  .node('attempt', {
    label: 'Attempt Operation',
    execute: [(ctx) => ({
      ...ctx,
      data: {
        ...ctx.data,
        attempts: (ctx.data.attempts || 0) + 1,
        success: Math.random() > 0.5
      }
    })]
  })
  .node('success', { label: 'Success', isEndpoint: true })
  .node('failed', { label: 'Max Retries', isEndpoint: true })
  .edge('attempt', 'success', (ctx) => ctx.data.success === true)
  .edge('attempt', 'attempt', 'shouldRetry') // Loop back
  .edge('attempt', 'failed', 'maxAttempts')
  .start('attempt')
  .build();
Core Concepts
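Before the individual concepts, it may help to see how they could fit together. The following is a minimal, self-contained sketch of a traversal loop, not Drift Core's actual engine. The names `MiniCtx`, `MiniNode`, `MiniEdge`, and `runSketch` are hypothetical. The sketch assumes that a node's actions run in order, that the first outgoing edge whose guard passes is taken, and that execution stops at an endpoint; the real engine is asynchronous and also handles persistence, events, and errors.

```typescript
// Illustrative only: a toy traversal loop, not the drift-core engine.
type MiniCtx = { data: Record<string, unknown> };
type MiniGuard = (ctx: MiniCtx) => boolean;
type MiniAction = (ctx: MiniCtx) => MiniCtx;

interface MiniNode {
  execute?: MiniAction[];
  isEndpoint?: boolean;
}
interface MiniEdge {
  from: string;
  to: string;
  guard: MiniGuard;
}

function runSketch(
  nodes: Record<string, MiniNode>,
  edges: MiniEdge[],
  start: string,
  initial: MiniCtx,
  maxSteps = 100 // safety cap, since cycles are allowed
): { ctx: MiniCtx; path: string[] } {
  let ctx = initial;
  let current = start;
  const path: string[] = [];

  for (let step = 0; step < maxSteps; step++) {
    const node = nodes[current];
    if (!node) throw new Error(`Unknown node: ${current}`);
    path.push(current);

    // Actions run in order; each receives the previous action's result.
    for (const action of node.execute ?? []) ctx = action(ctx);
    if (node.isEndpoint) return { ctx, path };

    // Assumption: take the first outgoing edge whose guard passes.
    const next = edges.find((e) => e.from === current && e.guard(ctx));
    if (!next) throw new Error(`No traversable edge from: ${current}`);
    current = next.to;
  }
  throw new Error(`Exceeded ${maxSteps} steps`);
}

// A retry loop that succeeds deterministically on the third attempt.
const sketchResult = runSketch(
  {
    attempt: {
      execute: [
        (ctx) => {
          const attempts = Number(ctx.data.attempts ?? 0) + 1;
          return { data: { ...ctx.data, attempts, success: attempts === 3 } };
        },
      ],
    },
    success: { isEndpoint: true },
    failed: { isEndpoint: true },
  },
  [
    { from: "attempt", to: "success", guard: (ctx) => ctx.data.success === true },
    { from: "attempt", to: "attempt", guard: (ctx) => Number(ctx.data.attempts) < 5 },
    { from: "attempt", to: "failed", guard: () => true },
  ],
  "attempt",
  { data: {} }
);
console.log(sketchResult.path.join(" -> ")); // attempt -> attempt -> attempt -> success
```

The `maxSteps` cap is a design choice of the sketch: a cyclic graph whose guards never let it reach an endpoint would otherwise loop forever.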
Context (Ctx)
The context object flows through the graph and contains all execution state:
interface Ctx {
  runId: string;                  // Unique run identifier
  data: Record<string, any>;      // Mutable workflow data
  global: Record<string, any>;    // Immutable global config
  injected: Record<string, any>;  // Runtime injected data
  errors: ErrorState[];           // Error tracking
  events: Event[];                // Event log
}
Guards
Guards determine if an edge can be traversed:
// Function guard
.guard('hasUser', (ctx) => ctx.data.userId !== undefined)
// Inline guard
.edge('start', 'end', (ctx) => ctx.data.ready === true)
// Guard presets
.guard('any', GuardPresets.any())
.guard('none', GuardPresets.none())
Rules
Rules determine if a node should execute:
.node('conditional', {
  label: 'Conditional Node',
  rule: (ctx) => ctx.data.enabled === true,
  execute: [/* actions */]
})
Actions
Actions transform the context during node execution:
.node('transform', {
  label: 'Transform Data',
  execute: [
    // Action 1: Add timestamp
    (ctx) => ({
      ...ctx,
      data: { ...ctx.data, timestamp: Date.now() }
    }),
    // Action 2: Process data
    async (ctx) => {
      const result = await processData(ctx.data);
      return { ...ctx, data: { ...ctx.data, result } };
    }
  ]
})
Advanced Features
Persistence & Resumption
import { Manager, InMemorySnapshotStore } from '@quarry-systems/drift-core';
const store = new InMemorySnapshotStore();
const manager = new Manager(graph, { snapshotStore: store });
// Start execution
const runId = await manager.start({ userId: '123' });
// Later, resume from a snapshot
const snapshot = await store.load(runId);
await manager.resume(runId, snapshot);
Service Injection
import { ManagedCyclicGraph, Manager, defineService } from '@quarry-systems/drift-core';
// Define a service
const httpService = defineService({
  name: 'http',
  version: '1.0.0',
  scope: 'run',
  factory: (ctx) => ({
    get: async (url: string) => fetch(url).then(r => r.json())
  })
});
// Use in graph
const graph = new ManagedCyclicGraph('with-services')
  .node('fetch', {
    label: 'Fetch Data',
    execute: [async (ctx) => {
      const http = ctx.services.http;
      const data = await http.get('https://api.example.com/data');
      return { ...ctx, data: { ...ctx.data, apiData: data } };
    }]
  })
  .build();
// Provide services at runtime
const manager = new Manager(graph, {
  services: { http: httpService }
});
Middleware
import { Manager, createLoggingMiddleware, createMetricsMiddleware } from '@quarry-systems/drift-core';
const manager = new Manager(graph, {
  middleware: [
    createLoggingMiddleware({ level: 'info' }),
    createMetricsMiddleware()
  ]
});
Event Handling
const graph = new ManagedCyclicGraph('with-events')
  .on('graph:start', (event) => {
    console.log('Graph started:', event.runId);
  })
  .on('node:start', (event) => {
    console.log('Node started:', event.nodeId);
  })
  .on('node:end', (event) => {
    console.log('Node completed:', event.nodeId, event.duration);
  })
  .on('graph:end', (event) => {
    console.log('Graph completed:', event.runId);
  })
  .build();
Plugin System
import { ManagedCyclicGraph } from '@quarry-systems/drift-core';
import type { Plugin, NodeHandler } from '@quarry-systems/drift-core';
// Create a custom node handler
const delayHandler: NodeHandler = async (node, ctx, meta) => {
  const ms = node.meta?.delay || 1000;
  await new Promise(resolve => setTimeout(resolve, ms));
  return ctx;
};
// Define plugin
const delayPlugin: Plugin = {
  name: 'delay-plugin',
  version: '1.0.0',
  nodes: { 'delay': delayHandler }
};
// Use plugin
const graph = new ManagedCyclicGraph('with-plugin')
  .use(delayPlugin)
  .node('wait', {
    type: 'delay',
    meta: { delay: 5000 }
  })
  .build();
Error Handling
const graph = new ManagedCyclicGraph('error-handling')
  .node('risky', {
    label: 'Risky Operation',
    execute: [async (ctx) => {
      try {
        const result = await riskyOperation();
        return { ...ctx, data: { ...ctx.data, result } };
      } catch (error) {
        return {
          ...ctx,
          errors: [...ctx.errors, {
            kind: 'execution',
            nodeId: 'risky',
            // `error` is `unknown` under strict TypeScript, so narrow it before reading `.message`
            message: error instanceof Error ? error.message : String(error),
            severity: 'error'
          }]
        };
      }
    }]
  })
  .node('handleError', {
    label: 'Error Handler',
    rule: (ctx) => ctx.errors.length > 0,
    execute: [(ctx) => {
      console.error('Errors occurred:', ctx.errors);
      return ctx;
    }]
  })
  .build();
API Reference
ManagedCyclicGraph
Main builder class for constructing graphs.
Methods:
- .guard(name, fn) - Register a guard function
- .rule(name, fn) - Register a rule function
- .node(id, config) - Add a node
- .edge(from, to, guard) - Add an edge
- .start(nodeId) - Mark a start node
- .use(plugin) - Register a plugin
- .on(event, handler) - Subscribe to events
- .build() - Build the graph
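The examples in this README pass either a registered guard name or an inline function to `.edge()`. As a rough illustration of how a fluent builder could support both, here is a self-contained sketch. `SketchBuilder` and its internals are hypothetical and are not the `ManagedCyclicGraph` implementation; the sketch assumes that each method returns the builder for chaining and that guard names are resolved when `build()` is called.

```typescript
// Illustrative only: a toy fluent builder, not ManagedCyclicGraph itself.
type SketchCtx = { data: Record<string, unknown> };
type SketchGuard = (ctx: SketchCtx) => boolean;

class SketchBuilder {
  private guards = new Map<string, SketchGuard>();
  private nodeIds = new Set<string>();
  private edges: { from: string; to: string; guard: string | SketchGuard }[] = [];
  private startId?: string;

  // Each method returns `this` so calls can be chained.
  guard(name: string, fn: SketchGuard): this {
    this.guards.set(name, fn);
    return this;
  }
  node(id: string): this {
    this.nodeIds.add(id);
    return this;
  }
  edge(from: string, to: string, guard: string | SketchGuard): this {
    this.edges.push({ from, to, guard });
    return this;
  }
  start(id: string): this {
    this.startId = id;
    return this;
  }

  // Assumption: names are resolved and references validated at build time.
  build() {
    if (!this.startId || !this.nodeIds.has(this.startId)) {
      throw new Error("Start node is missing or unknown");
    }
    const edges = this.edges.map(({ from, to, guard }) => {
      if (!this.nodeIds.has(from) || !this.nodeIds.has(to)) {
        throw new Error(`Edge references unknown node: ${from} -> ${to}`);
      }
      const fn = typeof guard === "string" ? this.guards.get(guard) : guard;
      if (!fn) throw new Error(`Unknown guard: ${String(guard)}`);
      return { from, to, guard: fn };
    });
    return { start: this.startId, nodes: Array.from(this.nodeIds), edges };
  }
}

const built = new SketchBuilder()
  .guard("ready", (ctx) => ctx.data.ready === true)
  .node("a")
  .node("b")
  .edge("a", "b", "ready") // named guard
  .edge("b", "a", (ctx) => ctx.data.retry === true) // inline guard
  .start("a")
  .build();
console.log(built.edges.length); // 2
```

Resolving names in `build()` rather than in `edge()` lets guards be registered in any order relative to the edges that use them.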
Manager
Execution orchestrator for graphs.
Methods:
- .start(injected?, options?) - Start a new execution
- .resume(runId, snapshot?) - Resume from snapshot
- .step(runId, options?) - Execute a single step
- .getSnapshot(runId) - Get current state
- .stop(runId) - Stop execution
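To illustrate what step-wise execution with snapshots could look like, the sketch below models a run as a plain snapshot object that is advanced one node at a time and stored between steps. It is not the `Manager` implementation: `SketchSnapshot`, `stepOnce`, and the `Map`-based store are hypothetical, and the snapshot shape is an assumption. The `Manager` calls in this README are awaited; the sketch is synchronous for brevity.

```typescript
// Illustrative only: step-wise execution with resumable snapshots.
interface SketchSnapshot {
  runId: string;
  currentNode: string;
  data: Record<string, unknown>;
  done: boolean;
}

// A linear three-node flow: each entry names the next node, or null at the end.
const nextNode: Record<string, string | null> = {
  start: "process",
  process: "end",
  end: null,
};

// Advance a run by one node and return a new snapshot (the input is not mutated).
function stepOnce(snapshot: SketchSnapshot): SketchSnapshot {
  if (snapshot.done) return snapshot;
  const visited = [...((snapshot.data.visited as string[]) ?? []), snapshot.currentNode];
  const next = nextNode[snapshot.currentNode] ?? null;
  return {
    ...snapshot,
    currentNode: next ?? snapshot.currentNode,
    data: { ...snapshot.data, visited },
    done: next === null,
  };
}

// Stand-in for a snapshot store, keyed by run ID.
const snapshots = new Map<string, SketchSnapshot>();

// Start a run, take one step, and persist the state.
let snap: SketchSnapshot = { runId: "run-1", currentNode: "start", data: {}, done: false };
snap = stepOnce(snap);
snapshots.set(snap.runId, snap);

// Later: load the snapshot and resume until the run completes.
let resumed = snapshots.get("run-1")!;
while (!resumed.done) {
  resumed = stepOnce(resumed);
  snapshots.set(resumed.runId, resumed);
}
console.log(resumed.data.visited); // [ 'start', 'process', 'end' ]
```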
GuardPresets
Common guard functions:
- GuardPresets.any() - Always true
- GuardPresets.none() - Always false
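Because the presets are called before being registered (for example `GuardPresets.any()`), they appear to be factories that return guard functions. A minimal sketch under that assumption follows; `sketchPresets` is a hypothetical stand-in, not the library's source.

```typescript
// Illustrative only: presets modeled as factories that return guard functions.
type PresetCtx = { data: Record<string, unknown> };
type PresetGuard = (ctx: PresetCtx) => boolean;

const sketchPresets = {
  any: (): PresetGuard => () => true, // edge is always traversable
  none: (): PresetGuard => () => false, // edge is never traversable
};

const always = sketchPresets.any();
const never = sketchPresets.none();
console.log(always({ data: {} }), never({ data: {} })); // true false
```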
Utilities
- randomId() - Generate unique IDs
- deepClone(obj) - Deep clone objects
- sleep(ms) - Async sleep
- timeout(ms, promise) - Add timeout to promise
- getNestedValue(obj, path) - Get nested property
- setNestedValue(obj, path, value) - Set nested property
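The path format accepted by `getNestedValue` and `setNestedValue` is not documented here. The sketch below shows one common convention, dot-separated paths, using the hypothetical helpers `getBySketchPath` and `setBySketchPath`. Treat it as an illustration of the idea rather than a description of the library's behavior.

```typescript
// Illustrative only: dot-path helpers; the real path syntax is an assumption.
type PlainObject = Record<string, unknown>;

function isPlainObject(value: unknown): value is PlainObject {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Read a nested property, returning undefined if any segment is missing.
function getBySketchPath(obj: PlainObject, path: string): unknown {
  let current: unknown = obj;
  for (const key of path.split(".")) {
    if (!isPlainObject(current)) return undefined;
    current = current[key];
  }
  return current;
}

// Write a nested property in place, creating intermediate objects as needed.
function setBySketchPath(obj: PlainObject, path: string, value: unknown): void {
  const keys = path.split(".");
  const last = keys.pop() as string;
  let current = obj;
  for (const key of keys) {
    const child = current[key];
    if (isPlainObject(child)) {
      current = child;
    } else {
      const created: PlainObject = {};
      current[key] = created;
      current = created;
    }
  }
  current[last] = value;
}

const state: PlainObject = { user: { profile: { name: "Ada" } } };
setBySketchPath(state, "user.settings.theme", "dark");
console.log(getBySketchPath(state, "user.profile.name")); // Ada
console.log(getBySketchPath(state, "user.settings.theme")); // dark
console.log(getBySketchPath(state, "user.missing.key")); // undefined
```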
Storage Implementations
InMemorySnapshotStore
import { Manager, InMemorySnapshotStore } from '@quarry-systems/drift-core';
const store = new InMemorySnapshotStore();
const manager = new Manager(graph, { snapshotStore: store });
FilesystemArtifactStore
import { FilesystemArtifactStore } from '@quarry-systems/drift-core';
const artifactStore = new FilesystemArtifactStore('./artifacts');
TypeScript Support
Full TypeScript support with generics:
interface MyGlobal {
  apiKey: string;
}
interface MyInjected {
  userId: string;
}
const graph = new ManagedCyclicGraph<MyGlobal, MyInjected>('typed-graph')
  .node('start', {
    execute: [(ctx) => {
      // ctx.global.apiKey is typed as string
      // ctx.injected.userId is typed as string
      return ctx;
    }]
  })
  .build();
Related Packages
- @quarry-systems/drift-contracts - Type definitions
- @quarry-systems/drift-ai-core - AI agent functionality
- @quarry-systems/drift-cli - CLI tools
- @quarry-systems/drift-testing - Testing utilities
Official Plugins
- @quarry-systems/drift-http - HTTP requests
- @quarry-systems/drift-timer - Delays and scheduling
- @quarry-systems/drift-openai - OpenAI integration
- @quarry-systems/drift-secrets - Secrets management
- @quarry-systems/drift-store-sqlite - SQLite persistence
- @quarry-systems/drift-vector-chroma - Vector storage
Examples
See the examples directory for complete examples:
- Basic workflows
- Conditional branching
- Retry loops
- Service integration
- Plugin development
Contributing
This package is part of the Drift monorepo. See the main repository for contribution guidelines.
License
Dual-licensed under:
- AGPL-3.0 for open source projects
- Commercial License for proprietary use
See LICENSE.md for details.
For commercial licensing:
- Email: [email protected]
- Web: https://quarry-systems.com/license
