npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

llmrx-core

v0.3.0

Published

Reactive DG/DAG execution engine for LLM agents. Single file. One dependency (rxjs). Zero opinions about your stack.

Readme

llmrx-core

npm version license

Reactive graph execution engine for LLM agents. DAG, DG, or hybrid. Single file. One dependency (rxjs).

Every action — LLM call, tool execution, API request, human approval — flows through exec$ with two-phase policy resolution, limit tracking, and interrupt handling.

⚠ API unstable — breaking changes possible until v1.0. Pin to an exact version.

Install

pnpm add llmrx-core

Runnable examples live in examples/:

What it does

  • DAG/DG/Hybrid scheduling — topological ordering, parallel branches, back-edge detection, round-based cycle iteration
  • Oracle types — every async action goes through a registered oracle with its own policies. Hierarchical keys: "llm:anthropic:claude" inherits from "llm:anthropic" and "llm"
  • Two-phase policy resolution — capture (top-down, force only) then bubble (bottom-up, first deny/stop wins)
  • Built-in limit tracking — cost, calls, context, rounds, timeout, retries. Time-windowed counters per (oracle, type, window)
  • Mutable policies — declare which oracles may raise a limit's max at runtime, with ceiling functions. Auto-interrupts instead of killing
  • Nested graphsspawnSync (blocking), spawnAsync (fire-and-forget)
  • Full observability — every exec$, policy check, mutation, and lifecycle event is a typed Signal

What it does NOT do

  • Choose your LLM provider — you inject call
  • Define your tools — you inject executeTool
  • Structure your prompts — you implement loadPrompt
  • Pick your database — your handlers do IO
  • Decide what permissions mean — your policies resolve them
  • Choose your approval flow — you implement handleInterrupt
  • Log anything — subscribe to the signal stream

Quick start

import { createEngine, constraints } from "llmrx-core";

const engine = createEngine({
  oracles: {
    "llm": {
      policies: [
        { type: "calls", max: 1000, window: "day" },
      ],
    },
    "llm:anthropic:claude": {
      policies: [
        {
          type: "cost", max: 10, window: "day",
          extract: (r) => {
            const u = (r as { usage: { input_cost: number; output_cost: number } }).usage;
            return { prompt: u.input_cost, completion: u.output_cost };
          },
          mutable: {
            "oracle:human:uri:telegram": (next, current, max) => next <= 50,
            "oracle:human:cfo:email": true,
          },
        },
        { type: "timeout", max: 45_000 },
        { type: "retries", max: 2 },
      ],
    },
    "tool": {
      policies: [
        { type: "calls", max: 200, window: "invocation" },
      ],
    },
    "api:market:alpaca": {
      policies: [
        { type: "calls", max: 200, window: "minute" },
        { policy: (action) => {
            const h = new Date(action.timestamp)
              .toLocaleString("en", { timeZone: "CET", hour: "numeric", hour12: false });
            return +h >= 9 && +h < 17
              ? { approval: "allow" as const }
              : { approval: "deny" as const, reason: "outside CET trading hours" };
          },
        },
      ],
    },
  },
  policies: [constraints.maxCycles(0), constraints.maxDepth(10)],
  handleInterrupt: async (interrupt, ctx) => {
    // route to approval oracle, return retry/deny/result/mutate
  },
});

engine.exec$(manifest, "router", "user message")
  .subscribe((event) => {
    // typed union — switch on event.type
  });

Oracle API

Inside Node.execute(), the consumer gets ctx.oracle:

oracle.llm() — the LLM loop

Call LLM, execute tools, loop. Round limits come from oracle policies.

execute(layers, ctx) {
  return ctx.oracle.llm({
    oracle: "llm:anthropic:claude",
    // turns: append-only transcript of assistant + tool_result turns,
    // grown by the engine across rounds. Your provider translates it
    // to its wire format (Anthropic blocks, OpenAI tool_call_id, etc.)
    // and pairs each tool_result with its tool_use_id.
    call: async (turns) => {
      const r = await anthropic.messages.create({ system: layers, messages: toMessages(turns) });
      const text = r.content.find(b => b.type === "text")?.text ?? "";
      const tool_calls = r.content.filter(b => b.type === "tool_use")
        .map(b => ({ id: b.id, name: b.name, input: b.input }));
      return { text, tool_calls };
    },
    executeTool: (tc, ctx) => toolRegistry.execute(tc, ctx), // tc: { id, name, input }
  });
}

oracle.call() — generic tracked call

Any async work. Tracked, policy-checked, timed.

ctx.oracle.call("api:market:alpaca", () => fetchPrices("AAPL"))
ctx.oracle.call("oracle:human:uri:telegram", () => sendAndWait(trade))

exec$

Two forms — registered oracle or ad-hoc:

// Registered oracle — engine calls its executor
ctx.exec$("api:market", { symbol: "AAPL" })

// Ad-hoc — you provide the function
ctx.exec$("llm:anthropic:claude", () => callProvider(layers))

Policies

Two kinds on one array:

Built-in policies (OraclePolicy)

Engine handles counting, windows, and extraction:

| type | auto-counted | needs extract | description | |------|-------------|---------------|-------------| | cost | | yes | granular breakdown: { prompt: 0.02, completion: 0.01 } | | calls | +1 per exec$ | | | | context | | yes | e.g. { input_tokens: 3200, output_tokens: 1000 } | | rounds | +1 per llm loop | | only inside oracle.llm() | | retries | | | engine wraps exec$ with retry(max). optional backoff | | timeout | | | engine wraps exec$ with timeout(max) ms | | x:* | | yes | custom. engine just tracks the counter |

interface OraclePolicy {
  type: string;
  max: number;
  window?: TimeWindow;
  extract?: (result: unknown) => Record<string, number>;
  mutable?: Record<string, true | ((next: number, current: number, max: number) => boolean)>;
  backoff?: "exponential" | number | ((attempt: number) => number);
}

Backoff applies to type: "retries":

  • "exponential" — 1s, 2s, 4s, 8s... (1000 * 2^attempt)
  • number — fixed ms delay between retries
  • (attempt) => number — custom function, receives 0-indexed attempt

Custom policies (CustomPolicy)

Consumer implements policy():

interface CustomPolicy {
  id?: string;
  policy(action: PolicyAction): PolicyResult;
}

PolicyResult — what policies return

{ approval: "allow" }                              continue bubbling
{ approval: "allow", stop: true }                  stop bubbling here
{ approval: "allow", force: true }                 capture phase — override everything
{ approval: "deny", reason: "..." }                deny, stop
{ approval: "deny", reason: "...", force: true }   capture phase — force deny
{ approval: "interrupt", interrupt: ... }          pause execution

Two-phase resolution

Every exec$ resolves policies in bubble order:

exec$("llm:anthropic:claude", ...)

CAPTURE (top-down):  global → graph → node → "llm" → "llm:anthropic" → "llm:anthropic:claude"
  Only force policies. If any fires, that's final.

BUBBLE (bottom-up):  "llm:anthropic:claude" → "llm:anthropic" → "llm" → node → graph → global
  Normal policies. First deny/stop wins.

Mutable policies

Any limit can declare who may raise its max at runtime:

{
  type: "cost", max: 10, window: "day",
  extract: (r) => ({ prompt: r.usage.input_cost, completion: r.usage.output_cost }),
  mutable: {
    "oracle:human:uri:telegram": (next, current, max) => next <= 50,  // ceiling function
    "oracle:human:cfo:email": true,                                   // approve anything
  },
}

When exceeded: auto-interrupt → handleInterrupt → approver returns { decision: "mutate", oracle, limitType, max } → engine validates ceiling → mutate + retry.

Constraints

Shipped as CustomPolicy factories. Put them at any level — global, graph, or node.

constraints.maxCycles(0)                // no node can repeat in ancestry
constraints.maxDepth(10)                // max spawn depth
constraints.maxLoop("feedback_loop", 3) // allow this node to loop 3x (stops bubbling)

Time windows

| window | resets | |--------|--------| | "minute" / "hour" / "day" / "week" / "month" | sliding window | | "graph" | on each graph execution start | | "invocation" | on each exec$() call | | number | custom ms, sliding |

Signals

Every side effect is a typed Signal. All carry { nodeKey, invocationId, graphId, nodeId, ancestry }.

| type | when | |------|------| | graph:before / graph:after / graph:error | graph lifecycle (carry key) | | node:before / node:after / node:error | node lifecycle | | oracle:exec:in / oracle:exec:out | exec$ lifecycle | | oracle:denied | custom policy denied | | oracle:exceeded | limit exceeded pre-exec | | oracle:interrupt:in / oracle:interrupt:out | interrupt lifecycle | | oracle:policy:${key} | policy limit check post-exec | | oracle:mutation:${key} | policy limit mutated at runtime | | x:* | consumer extension namespace |

Architecture

exec$(manifest, key, message, signal?)
  └─ executeGraph$(topology)     — schedule nodes by edges, handle cycles
       └─ executeNode$(nodeKey)  — load prompt → execute → after
            └─ exec$()           — policies → check → execute → extract → accumulate

Types

// ── Manifest — what the consumer provides ──

interface Manifest {
  getNode(key: string): Node;
  getNodeKeys(): string[];
  isNode(key: string): boolean;
  getGraph(key: string): Graph;
  getGraphPolicies?(key: string): Policy[] | null;
  getNodePolicies?(nodeKey: string): Policy[] | null;
  newId?(entity: "manifest" | "graph" | "node" | "oracle" | "policy"): string;
  getMap?(nodeKey: string): MapConfig | null;
}

interface Graph {
  readonly nodes: string[];
  readonly edges: Array<{ from: string; to: string }>;
}

interface MapConfig {
  readonly items: string;         // key in upstream output to iterate
  readonly subTopology: Graph;    // sub-graph executed per item
}

interface Oracle {
  policies: Policy[];
  executor?: (input: unknown) => Promise<unknown>;
}

interface Node {
  loadPrompt(input: unknown, ctx: NodeExec): Observable<Layer[]>;
  execute(layers: Layer[], ctx: NodeExec): Observable<NodeResult>;
}

// ── createEngine → Engine ──

interface EngineDef {
  oracles: Record<string, Oracle>;
  policies?: CustomPolicy[];
  handleInterrupt?: (interrupt: Interrupt, ctx: InterruptCtx) => Promise<InterruptResult>;
  interruptTimeout?: number;      // default 120_000 ms
}

function createEngine(config: EngineDef): Engine;

interface Engine {
  exec$(manifest: Manifest, key: string, input: unknown, signal?: AbortSignal): Observable<Signal>;
  snapshot(): PolicySnapshot;
  restore(snap: PolicySnapshot): void;
}

// ── Errors — thrown by exec$ ──

class Interrupt { constructor(public readonly payload: unknown) {} }
class Denied extends Error { readonly node: string; readonly oracle: string; readonly reason: string; }
class PolicyExceeded extends Error { readonly label: string; readonly value: number; readonly limit: number; }
class Aborted extends Error {}

// ── Runtime — what execute() receives ──

interface NodeExec {
  counters(oracle: string): ReadonlyArray<{ type: string; accumulated: number; max: number; window?: TimeWindow }>;
  exec$<T>(oracle: string, input?: unknown): Observable<T>;
  exec$<T>(oracle: string, fn: () => Promise<T>, input?: unknown): Observable<T>;
  oracle: OracleExec;
  spawnSync(opts: { key: string; data: unknown }): Observable<Signal>;
  spawnAsync(opts: { key: string; data: unknown }): void;
  handleInterrupt(interrupt: Interrupt): Observable<ResolvedInterrupt>;
  checkAncestry(target: string): string | null;
  node(key: string): { layers: Layer[]; output: unknown } | null;
  ancestry: readonly AncestryEntry[];
  keys: string[];
  isNode(key: string): boolean;
  readonly invocationId: string;
  readonly graphId: string;
  readonly nodeId: string;
}

interface ToolCall { id: string; name: string; input: unknown; }
interface ToolResult { tool_use_id: string; content: string; is_error?: boolean; }
type Turn =
  | { role: "user"; content: string }
  | { role: "assistant"; text: string; tool_calls?: ToolCall[] }
  | { role: "tool_result"; results: ToolResult[] };

interface OracleExec {
  llm(opts: {
    oracle: string;
    call: (turns: Turn[]) => Promise<{ text: string; tool_calls?: ToolCall[] }>;
    executeTool?: (tc: ToolCall, ctx: NodeExec) => Promise<{ content: string; is_error?: boolean; signals?: Signal[] }>;
  }): Observable<NodeResult>;
  call<T>(oracle: string, fn: () => Promise<T>, input?: unknown): Observable<T>;
}

interface NodeResult {
  output: unknown;
  signals?: Signal[];
}

interface Layer {
  name: string;
  content: unknown;
}

// ── Policy types ──

interface PolicyAction {
  readonly node: string;
  readonly oracle: string;
  readonly input: unknown;
  readonly history: readonly string[];
  readonly ancestry: readonly AncestryEntry[];
  readonly target?: string;
  readonly timestamp: number;
  readonly invocationId: string;
  readonly graphId: string;
  readonly nodeId: string;
  readonly oracleId: string;
  readonly policyId: string;
}

type InterruptResult =
  | { decision: "retry" }
  | { decision: "deny"; reason: string }
  | { decision: "result"; value: unknown }
  | { decision: "mutate"; oracle: string; limitType: string; max: number }

// ── Primitives ──

type AncestryEntry = {
  readonly nodeKey: string;
  readonly nodeId: string;
  readonly graphId: string;
  readonly oracleId: string;
  readonly policyId: string;
};

type TimeWindow = "minute" | "hour" | "day" | "week" | "graph" | "invocation" | number;

type Policy = OraclePolicy | CustomPolicy;

type PolicySnapshot = ReadonlyArray<{
  key: string;
  accumulated: number;
  max: number;
  originalMax: number;
  windowStart: number;
}>;

Persistence

// shutdown
const snap = engine.snapshot();
await db.save("llmrx_state", snap);

// boot
const saved = await db.load("llmrx_state");
if (saved) engine.restore(saved);

Abort

Pass an AbortSignal to exec$(). Pending exec$ calls throw Aborted.

License

MIT