@cogstream/agent-graph
v0.4.0
LangGraph-based decision graph for the CogStream agent. Receives behavioral episodes, runs interpretation, evaluates intervention triggers, builds interventions, and emits AG-UI events.
Server-only. Never import from browser-executed code.
Installation
npm install @cogstream/agent-graph
Quick start
import { getCogstreamGraph } from '@cogstream/agent-graph';

// Uses the default CogStream interpretation pipeline.
// Requires @cogstream/interpretation to be installed.
const graph = getCogstreamGraph();

// `episode` (the behavioral episode to process) and `sendToClient`
// (your transport for emitted events) are supplied by your application.
const result = await graph.invoke(
  {
    sessionId: 'user-abc-123',
    incomingEpisode: episode,
  },
  {
    configurable: {
      thread_id: 'user-abc-123',
      streamWriter: (event) => sendToClient(event),
    },
  },
);
Checkpointing
Thread state persists across invocations using LangGraph's checkpointer. By default it uses an in-memory store (suitable for local dev and tests).
Set REDIS_URL to enable Redis-backed persistence:
REDIS_URL=rediss://your-redis-host:6380
Both redis:// and rediss:// (TLS) schemes are supported.
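As a rough illustration of the rule described above, the following sketch shows one way to choose between an in-memory and a Redis-backed store from the environment. The helper `resolveCheckpointerKind` and its return shape are hypothetical and not part of this package's API; the package's own selection logic may differ.

```typescript
// Hypothetical helper: mirrors the documented rule, not the package's real code.
type CheckpointerKind =
  | { kind: 'memory' }
  | { kind: 'redis'; url: string; tls: boolean };

function resolveCheckpointerKind(
  env: Record<string, string | undefined>,
): CheckpointerKind {
  const raw = env.REDIS_URL;
  // No REDIS_URL: fall back to the in-memory store (local dev and tests).
  if (!raw) return { kind: 'memory' };

  const { protocol } = new URL(raw); // throws on a malformed URL
  if (protocol !== 'redis:' && protocol !== 'rediss:') {
    throw new Error(`Unsupported REDIS_URL scheme: ${protocol}`);
  }
  // rediss:// means the connection is made over TLS.
  return { kind: 'redis', url: raw, tls: protocol === 'rediss:' };
}
```

Calling `resolveCheckpointerKind(process.env)` at startup is one way to fail fast on a mistyped scheme before any thread state is written.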
Custom interpretation adapter
InterpretationAdapter is the public extension point for custom episode → USM (UserStateModel) pipelines. Implement it to integrate your own interpretation logic without depending on the private @cogstream/interpretation package.
import { getCogstreamGraph } from '@cogstream/agent-graph';
import type {
  InterpretationAdapter,
  InterpretationSnapshot,
  InterpretationResult,
} from '@cogstream/agent-graph';
import type { EpisodeV2 } from '@cogstream/types';

class MyAdapter implements InterpretationAdapter {
  async processEpisode(
    episode: EpisodeV2,
    snapshot: InterpretationSnapshot,
  ): Promise<InterpretationResult> {
    // `myPipeline` stands for your own interpretation logic.
    const usm = await myPipeline.process(episode);
    // Pass adapterState through unchanged; this adapter keeps no state of its own.
    return { usm, adapterState: snapshot.adapterState };
  }
}

const graph = getCogstreamGraph(new MyAdapter());
API
getCogstreamGraph(adapter?)
Returns a compiled LangGraph graph. If adapter is omitted, the default CogStream interpretation adapter is loaded dynamically from @cogstream/interpretation.
AgentStateAnnotation / AgentState
The LangGraph state annotation and its TypeScript type. Use these to extend the graph or read state in custom nodes.
getCheckpointer()
Returns the configured checkpointer (Redis when REDIS_URL is set, MemorySaver otherwise). Useful when building a custom graph that reuses CogStream's checkpointing configuration.
InterpretationAdapter
interface InterpretationAdapter {
  processEpisode(
    episode: EpisodeV2,
    snapshot: InterpretationSnapshot,
  ): Promise<InterpretationResult>;
}
InterpretationSnapshot
Per-thread state passed into and returned from the adapter on each invocation. usm is read by trigger-evaluation nodes; adapterState is an opaque blob owned by the adapter.
interface InterpretationSnapshot {
  sessionId: string;
  usm: UserStateModel | null;
  adapterState?: unknown;
}
InterpretationResult
interface InterpretationResult {
  usm: UserStateModel;
  adapterState?: unknown;
  applicablePolicies?: PolicyContext[]; // ranked by final_confidence
}
applicablePolicies is optional: adapters without graph access return undefined or [].
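To make the snapshot and result round trip more concrete, here is a minimal sketch of an adapter that keeps a counter in adapterState. It assumes the graph hands the previously returned adapterState back in the next snapshot for the same thread, as the description above suggests. The local type stand-ins, `CounterState`, `readState`, and `CountingAdapter` are hypothetical; in a real project the interfaces would be imported from the package rather than redeclared.

```typescript
// Stand-in types so the sketch compiles on its own (hypothetical shapes).
type EpisodeV2 = { kind: string };
type UserStateModel = { episodesSeen: number };
// Field name inferred from the "ranked by final_confidence" comment.
type PolicyContext = { final_confidence: number };

interface InterpretationSnapshot {
  sessionId: string;
  usm: UserStateModel | null;
  adapterState?: unknown;
}
interface InterpretationResult {
  usm: UserStateModel;
  adapterState?: unknown;
  applicablePolicies?: PolicyContext[];
}

// Private state this adapter keeps between invocations (hypothetical).
type CounterState = { count: number };

function readState(raw: unknown): CounterState {
  // adapterState is typed as unknown, so validate it before trusting it.
  if (typeof raw === 'object' && raw !== null && 'count' in raw) {
    const count = (raw as { count: unknown }).count;
    if (typeof count === 'number') return { count };
  }
  return { count: 0 }; // nothing stored yet for this thread
}

class CountingAdapter {
  async processEpisode(
    _episode: EpisodeV2,
    snapshot: InterpretationSnapshot,
  ): Promise<InterpretationResult> {
    const next: CounterState = {
      count: readState(snapshot.adapterState).count + 1,
    };
    return {
      usm: { episodesSeen: next.count },
      adapterState: next,
      applicablePolicies: [], // this sketch has no policy store
    };
  }
}
```

Because adapterState arrives as `unknown`, narrowing it in one place (here `readState`) keeps the rest of the adapter type-safe even if the stored shape changes between versions.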
Policy-driven decisions
The graph uses learned policies for intervention decisions instead of hardcoded rules. Policies are loaded by the adapter (from Neo4j or equivalent), passed via InterpretationResult.applicablePolicies, and selected in the evaluate_triggers node based on confidence, user preferences, and element context.
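The selection step might look roughly like the sketch below. Only `final_confidence` is named in this README; the `id` and `interventionType` fields, the `SelectionOptions` shape, and the `selectPolicy` function are assumptions made for illustration and do not describe the actual evaluate_triggers node.

```typescript
// Hypothetical policy shape: only `final_confidence` comes from the README.
interface PolicyContext {
  id: string;
  interventionType: string;
  final_confidence: number;
}

// Hypothetical knobs standing in for "confidence" and "user preferences".
interface SelectionOptions {
  minConfidence: number;
  disabledTypes: Set<string>;
}

function selectPolicy(
  policies: PolicyContext[] | undefined,
  opts: SelectionOptions,
): PolicyContext | null {
  // Adapters may return undefined or [], so normalise first.
  const candidates = (policies ?? [])
    .filter((p) => p.final_confidence >= opts.minConfidence)
    .filter((p) => !opts.disabledTypes.has(p.interventionType));

  // Sort a filtered copy by descending confidence instead of relying on
  // the adapter's ordering; the input array is left untouched.
  candidates.sort((a, b) => b.final_confidence - a.final_confidence);
  return candidates[0] ?? null; // null means "do not intervene"
}
```

Returning `null` when no policy qualifies keeps the "no intervention" outcome explicit for whatever node consumes the result.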
Element-level context (DOM element difficulty, semantic hints) is forwarded through the agent state so that interventions are proportional: high-stakes elements receive explanations, while simple fields receive brief tips.
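One way to picture this proportionality is a small mapping from element context to intervention form. The `ElementContext` fields, the 0 to 1 difficulty scale, the default threshold, and `chooseInterventionForm` are all assumptions made for this sketch; the package's real state shape is not documented here.

```typescript
// Hypothetical element context; field names and scale are assumptions.
interface ElementContext {
  difficulty: number; // assumed to range from 0 (trivial) to 1 (hard)
  highStakes: boolean; // e.g. a payment or irreversible action
}

type InterventionForm = 'explanation' | 'tip';

function chooseInterventionForm(
  element: ElementContext,
  difficultyThreshold = 0.7, // hypothetical cut-off
): InterventionForm {
  // High-stakes or difficult elements get the fuller explanation;
  // everything else gets a brief tip.
  return element.highStakes || element.difficulty >= difficultyThreshold
    ? 'explanation'
    : 'tip';
}
```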
