llm-state-graph
v1.1.3
Published
轻量级前端状态图工作流引擎,支持线性/条件/并行/循环执行
Maintainers
Readme
LLM State Graph
轻量级前端状态图工作流引擎,支持线性/条件/并行/循环执行
LLM State Graph 是一个类型安全的状态图工作流引擎,专为复杂业务流程编排而设计。它将业务逻辑抽象为状态、节点和边的图结构,提供声明式的 API 和强大的编译时验证能力。
✨ 特性
- 🎯 类型安全 - 完整的 TypeScript 类型推导,编译时捕获错误
- 🚀 高性能 - O(1) 路由查找,优化的批处理执行
- 🔀 多模式执行 - 支持线性、条件分支、并行、循环执行
- ✅ 编译时验证 - 检测不可达节点、死端节点等配置错误
- 🌊 流式执行 - 支持异步生成器,实时产出执行事件
- 🔄 错误恢复/重试 - 节点级重试策略,支持退避、降级、跳过
- ⏪ 状态回滚 - 每步自动快照,支持回滚到任意步骤
- 🛑 人工介入 - 中断点暂停执行,等待人工输入后恢复
- 🪵 调试日志 - 内置 Logger,
isDebug一键开启调试日志 - 📦 零依赖 - 纯 TypeScript 实现,无第三方依赖
- 🔄 通用兼容 - 可运行在浏览器、小程序、Node.js 环境
- 🔧 依赖注入 - 支持 Context 依赖注入,节点和路由函数可访问外部服务
📦 安装
npm install llm-state-graph🚀 快速开始
基础示例:智能客服
import { StateGraph, START, END } from 'llm-state-graph'
// 1. 定义状态
const ChatState = {
userInput: { default: '' },
intent: { default: '' },
reply: { default: '' },
} as const
// 2. 构建图
const graph = new StateGraph({ state: ChatState })
.addNode('detect', state => {
const intent = state.userInput.includes('订单') ? 'order' : 'chat'
return { intent }
})
.addNode('orderReply', () => ({ reply: '您的订单已发货' }))
.addNode('chatReply', () => ({ reply: '你好,有什么可以帮您?' }))
.addEdge(START, 'detect')
// intent 条件分支,根据意图跳转不同回复节点
// intent 为 'order' 时,跳转到 'orderReply' 节点
.addConditionalEdges('detect', state => state.intent, {
order: 'orderReply',
chat: 'chatReply',
})
.addEdge('orderReply', END)
.addEdge('chatReply', END)
// 3. 编译并执行
const app = graph.compile()
const result = await app.invoke({ userInput: '查订单' })
console.log(result.state.reply) // "您的订单已发货"带依赖注入的示例
// 定义上下文(依赖注入)
interface AppContext {
llmService: {
analyze: (text: string) => Promise<string>
}
dbService: {
query: (id: string) => Promise<any>
}
}
const context: AppContext = {
llmService: { analyze: async text => 'intent:result' },
dbService: { query: async id => ({ id }) },
}
// 构建图,传入 context
const graph = new StateGraph({ state: ChatState, context })
.addNode('detect', async (state, ctx) => {
// 使用 context 中的 LLM 服务
const intent = await ctx.llmService.analyze(state.userInput)
return { intent }
})
.addNode('orderQuery', async (state, ctx) => {
// 使用 context 中的数据库服务
const order = await ctx.dbService.query('12345')
return { reply: `订单状态: ${order.status}` }
})
.addConditionalEdges('detect', (state, ctx) => state.intent)
.addEdge(START, 'detect')
.addEdge('orderQuery', END)并行执行示例
const ReportState = {
topic: { default: '' },
macroResult: { default: '' },
competitorResult: { default: '' },
report: { default: '' },
} as const
const graph = new StateGraph({ state: ReportState })
.addNode('fanOut', () => ({}))
.addNode('macroAnalysis', () => ({ macroResult: '宏观环境稳定' }))
.addNode('competitorAnalysis', () => ({ competitorResult: '竞品分析完成' }))
.addNode('summary', state => ({
report: `${state.macroResult}\n${state.competitorResult}`,
}))
.addEdge(START, 'fanOut')
// 创建 fanOut 节点,并行执行 macroAnalysis 和 competitorAnalysis 节点
// 当返回的是数组时,就是并行边
.addConditionalEdges('fanOut', () => ['macroAnalysis', 'competitorAnalysis'], {
macroAnalysis: 'macroAnalysis',
competitorAnalysis: 'competitorAnalysis',
})
.addEdge('macroAnalysis', 'summary')
.addEdge('competitorAnalysis', 'summary')
.addEdge('summary', END)
const app = graph.compile()
const result = await app.invoke({ topic: 'AI 行业研报' })流式执行
const stream = app.stream({ userInput: '你好' })
for await (const event of stream) {
switch (event.type) {
case 'nodeStart':
console.log(`开始执行节点: ${event.node}`)
break
case 'nodeEnd':
console.log(`节点执行完成: ${event.node}`)
break
case 'stateSnapshot':
console.log('当前状态:', event.state)
break
}
}StreamEvent 事件格式
前端通过 app.stream() 的 for await 逐步接收以下事件,根据 type 字段决定如何渲染 UI:
| type | 触发时机 | 字段 | 前端渲染建议 |
| --------------- | ------------ | -------------------------------------------------- | ---------------------------------------------- |
| nodeStart | 节点开始执行 | node: string | 显示加载状态,如"正在分析..." |
| nodeEnd | 节点执行完成 | node: string, output: object | 隐藏加载,展示节点输出 |
| stateSnapshot | 批次合并后 | state: object | 更新全局状态展示 |
| nodeRetry | 节点重试中 | node: string, attempt: number, error: Error | 提示重试进度,如"重试第 2 次..." |
| interrupt | 执行到中断点 | node: string, prompt?: string, state: object | 弹出确认框/输入框,等待用户操作后调 resume() |
| error | 节点执行出错 | node: string, error: Error | 展示错误信息 |
一个完整的 Agent 澄清问答流程,前端收到的事件序列如下:
{ "type": "nodeStart", "node": "analyze" }
{ "type": "nodeEnd", "node": "analyze", "output": { "needsClarification": true, "clarifyQuestion": "请补充信息", "answer": "" } }
{ "type": "interrupt", "node": "analyze", "prompt": "Agent 需要您澄清问题", "state": { "question": "那个东西怎么弄", "needsClarification": true, "clarifyQuestion": "请补充信息", "answer": "" } }用户澄清后,resume() 继续执行:
{ "type": "nodeStart", "node": "reply" }
{ "type": "nodeEnd", "node": "reply", "output": { "answer": "已为您处理:那个东西怎么弄(补充:我想查订单物流)" } }
{ "type": "stateSnapshot", "state": { "question": "那个东西怎么弄", "clarification": "我想查订单物流", "needsClarification": true, "clarifyQuestion": "请补充信息", "answer": "已为您处理:那个东西怎么弄(补充:我想查订单物流)" } }如果问题明确不需要澄清,事件序列为:
{ "type": "nodeStart", "node": "analyze" }
{ "type": "nodeEnd", "node": "analyze", "output": { "needsClarification": false, "answer": "安全通过" } }
{ "type": "stateSnapshot", "state": { "question": "我的订单什么时候发货", "needsClarification": false, "answer": "安全通过" } }
{ "type": "nodeStart", "node": "reply" }
{ "type": "nodeEnd", "node": "reply", "output": { "answer": "已为您处理:我的订单什么时候发货" } }
{ "type": "stateSnapshot", "state": { "question": "我的订单什么时候发货", "needsClarification": false, "answer": "已为您处理:我的订单什么时候发货" } }📖 核心概念
状态(State)
状态是全局共享的数据对象,作为"黑板"在节点间传递信息。
const MyState = {
counter: { default: 0 },
messages: {
reducer: (a: string[], b: string[]) => a.concat(b),
default: () => [] as string[],
},
} as const上下文(Context)
上下文用于依赖注入,节点和路由函数可以访问外部服务(如 LLM、数据库等)。
interface MyContext {
llm: LLMService
db: DatabaseService
}
const graph = new StateGraph({ state: MyState, context: myContext })
// 节点函数签名:(state, context) => Update
.addNode('analyze', async (state, ctx) => {
const result = await ctx.llm.analyze(state.text)
return { analysis: result }
})节点(Node)
节点是执行具体业务逻辑的最小单元,接收状态和上下文,返回状态更新。
async function myNode(state: MyStateType, context: MyContext) {
// 执行业务逻辑,可使用 context 中的服务
return { counter: state.counter + 1 }
}边(Edge)
边定义节点间的流转逻辑,支持静态指向和动态路由。
// 静态边
.addEdge('nodeA', 'nodeB')
// 条件边(路由函数也可访问 context)
.addConditionalEdges('router', (state, ctx) => state.route, {
path1: 'nodeA',
path2: 'nodeB',
})错误恢复/重试
节点支持配置重试策略和错误处理,避免因临时故障导致整个流程中断。
const graph = new StateGraph({ state: MyState })
// 自动重试:最多重试 3 次,指数退避
.addNode(
'apiCall',
async (state, ctx) => {
return await ctx.service.fetch(state.query)
},
{
retry: {
maxRetries: 3,
delay: attempt => Math.pow(2, attempt) * 100, // 100ms, 200ms, 400ms
},
onError: {
strategy: 'fallback', // abort(中止) | skip(跳过) | fallback(降级)
fallbackOutput: { result: '服务暂时不可用' },
},
}
)retry.maxRetries- 最大重试次数(默认 0,不重试)retry.delay- 重试间隔(ms),支持函数实现退避策略onError.strategy- 重试耗尽后的策略:'abort'(默认)— 中止整个执行'skip'— 跳过该节点,不产出状态更新,继续后续节点'fallback'— 使用fallbackOutput作为节点输出
流式模式会产出 nodeRetry 事件,可用于 UI 展示重试进度。
状态回滚
执行过程中每步自动保存状态快照,支持回滚到任意步骤。
const result = await app.invoke({ value: 0 })
// 查看最终状态
console.log(result.state)
// 回滚到第 1 步之前的状态
const rolledBack = result.rollback(0)
console.log(rolledBack) // 回滚后的状态快照人工介入 (Human-in-the-Loop)
通过 interrupt 配置标记中断点,执行到该节点时暂停,等待人工输入后恢复。
const FormState = {
name: { default: '' },
email: { default: '' },
confirmed: { default: false },
} as const
const graph = new StateGraph({ state: FormState })
.addNode('input', () => ({ name: 'lizhooh' }), {
interrupt: { prompt: '请确认您的信息' },
})
.addNode('confirm', s => ({ confirmed: s.email !== '' }))
.addEdge(START, 'input')
.addEdge('input', 'confirm')
.addEdge('confirm', END)
const app = graph.compile()
// 第一次执行,中断在 input 节点
const intResult = await app.invoke({})
// intResult 是 InterruptResult:{ node, prompt, state, steps }
// 人工输入 email 后恢复执行
const finalResult = await app.resume(intResult, { email: '[email protected]' })
// finalResult.state.confirmed === true动态中断(Agent 自主决定是否需要澄清)
通过 when 条件函数,让 Agent 根据运行时状态动态决定是否中断请求用户澄清:
const ChatState = {
question: { default: '' },
clarification: { default: '' },
needsClarification: { default: false },
clarifyQuestion: { default: '' },
answer: { default: '' },
} as const
const graph = new StateGraph({ state: ChatState, context: { llm: myLLMService } })
.addNode(
'analyze',
async (state, ctx) => {
// Agent 分析用户问题,判断是否需要澄清
const result = await ctx.llm.analyze(state.question)
return {
needsClarification: result.needsClarification,
clarifyQuestion: result.clarifyQuestion, // "您能具体说一下需求吗?"
answer: result.draftAnswer,
}
},
{
interrupt: {
prompt: 'Agent 需要您澄清问题',
// 动态判断:只有当 Agent 认为问题模糊时才中断
when: state => state.needsClarification === true,
},
}
)
.addNode('reply', async (state, ctx) => {
// 根据用户澄清后的信息给出最终回答
const answer = await ctx.llm.answer(state.question, state.clarification)
return { answer }
})
.addEdge(START, 'analyze')
.addEdge('analyze', 'reply')
.addEdge('reply', END)
const app = graph.compile()
// 问题明确 → when 返回 false → 不中断,直接完成
const directResult = await app.invoke({ question: '我的订单什么时候发货' })
// directResult.state.answer === '已为您处理:我的订单什么时候发货'
// 问题模糊 → when 返回 true → 中断,等待用户澄清
const intResult = await app.invoke({ question: '那个东西怎么弄' })
// intResult 是 InterruptResult,包含 Agent 的澄清问题
// 需要自行判断是否是 InterruptResult
// 用户提供澄清后恢复执行
const finalResult = await app.resume(intResult, { clarification: '我想查订单物流' })
// finalResult.state.answer === '已为您处理:那个东西怎么弄(补充:我想查订单物流)'interrupt.prompt- 中断提示信息interrupt.inputKey- 中断时注入状态值的键名interrupt.when- 动态中断条件函数,返回true时中断,不传则始终中断app.resume(interruptResult, humanInput)- 从中断点恢复执行
🎯 使用场景
智能客服对话代理
此场景展示了如何利用条件分支与循环来处理用户的动态需求。
用户输入
↓
[开始节点] 初始化对话状态
↓
[意图识别节点] 判断意图 → 写入状态
↓
路由决策边 ──────────────────────────────┐
├─ "查询订单" ──→ [订单查询工具节点]
├─ "投诉建议" ──→ [工单创建工具节点]
└─ "闲聊" ──→ [通用回复节点]
↓
[回复生成节点] 生成回复文本
↓
输出回复给用户
↓
是否多轮追问?
├─ 是 → 回环至 [意图识别节点]
└─ 否 → 流程结束多维度市场研报生成
此场景展示了经典的 扇出-扇入 模式,用于处理耗时且独立的并行任务。
研报主题输入
↓
[初始化节点] 设定时间范围
↓
[大纲规划节点] 生成大纲 → 维度列表写入状态
↓
并行分发边 ──────────────────────────────┐
├──────→ [宏观分析节点] 调用政策数据库
├──────→ [竞品分析节点] 抓取财报与新闻
└──────→ [技术分析节点] 检索专利与论文
↓ ↓ ↓
═══════════ 汇聚等待 ═══════════
↓
[汇总生成节点] 拼接研报草稿
↓
输出最终研报智能代码审查助手
此场景展示了如何利用自我修正循环来提升任务质量。
代码提交
↓
[代码提交节点] 接收代码片段
↓
[静态检查节点] 基础语法检查
↓
[AI 审查节点] 深度审查 → 生成审查意见列表
↓
质量评估边 ──────────────────────────────┐
├─ 存在严重问题 ──→ [代码修正节点] 生成修复代码
│ ↓
│ 回环至 [AI 审查节点] ←──┘
│ ↓ (二次审查)
│ 仍存在问题?
│ ├─ 是 → 继续循环
│ └─ 否 → ↓
└─ 无问题 ──────────→ [通过节点] 输出评分与证书
↓
流程结束电商大促活动配置向导
此场景展示了如何利用线性流程处理严格顺序的用户交互。
[活动创建节点] 填写名称、时间
↓
[规则配置节点] 配置满减、折扣规则
↓
[选品节点] 选择参与活动商品
↓
校验边:必填项是否完整?
├─ 否 → 返回对应节点补充
└─ 是 → ↓
[预览节点] 生成活动预览链接
↓
[发布节点] 调用发布接口上线
↓
跳转至活动管理列表实时协作白板同步
此场景展示了事件驱动与状态合并的混合模式。
[连接节点] 建立 WebSocket 连接
↓
[事件监听] 持续监听推送事件流 ←──────────┐
↓ │
事件分发边 │
├─ 绘图事件 ──→ [图形渲染节点] │
├─ 文本事件 ──→ [文本更新节点] │
└─ 光标事件 ──→ [光标同步节点] │
↓ │
更新 Canvas/DOM │
↓ │
是否断开信号? │
├─ 否 → 回环 ──────────┘
└─ 是 → [断开连接]
↓
流程结束License
MIT © lizhooh
