@luisss/zero
v0.1.1
Published
Zero Agent — functional, event-stream-driven agent runtime for LLMs
Downloads
191
Maintainers
Readme
@luisss/zero
一个 FP 风格、基于 RxJS 的 agent 运行时。 所有数据(system / messages / tools)都由调用方管理,框架不持有任何隐性状态 —— 想持久化、序列化、数据分析、回放、分支,全都顺手就能做。
pnpm add @luisss/zero这是什么
市面上的 agent 框架大多是"给你一个 Agent 对象,你喂它一句话,它内部维护对话历史、调用工具、产生答复"。方便,但有两个问题:
- 状态被框架黑箱持有 —— 想把一次对话存到数据库、恢复到另一台机器接着聊、在 A 对话中开出一个 B 对话分支,都很麻烦。
- 扩展点有限 —— 框架暴露什么钩子你就用什么。想在"发给 LLM 前改 prompt" / "工具执行前加一层权限检查" / "整个 run 超时 cancel" —— 每个都是一次新 PR。
@luisss/zero 走相反的路:
- 所有数据外置。system prompt、messages 历史、tool registry 都是你的纯数据,框架只是一个"接收这堆数据、跑一次 agent loop、吐出事件流"的纯函数。
- 一切皆中间件。agent loop 是一条 Koa 风格的中间件链,循环本身也是其中一个中间件。你可以在任意位置插入自己的逻辑 —— 改 prompt、加预算控制、rewrite tool call、加审计日志,都是同一套机制。
- 事件流 = RxJS Observable。token 一点点吐,工具一个个跑,usage、错误、完成信号,全都通过统一的
AgentEvent流。rxjs 的操作符任你组合 —— 取消、限速、合并、去重、广播 … - 没有 class、没有单例、没有隐藏的"setup"。
createAgent(config)返回的是一个纯配置对象,每次runWithMessages(messages)独立运行,互不影响。
不想写 rxjs?提供 runAndWait (一行拿 Promise<Messages>)和 toAsyncIterable(for await 消费)桥。
架构一眼
你的代码:
messages (你持有) ─┐
system (你持有) ├──▶ createAgent(config).runWithMessages(messages)
tools (你持有) ─┘ │
▼
Observable<AgentEvent>
│
├── text_delta (LLM 吐 token)
├── tool_start / _end (工具调度)
├── turn_start / _end (每轮边界)
├── usage (token 用量)
├── error (LLM 失败)
└── done (reason, messages)Agent loop 的默认链:
[ runLoop, callLLM, maybeRunTools ]
↑ ↑ ↑
while 一次 LLM 一次工具并发批
驱动 调用 + (按 isConcurrencySafe 分区)
turn 边界你可以在任意位置插入自己的中间件。
快速上手
最小例子:一句话 + 一个工具
import {
createAgent,
buildTool,
buildUserTextMessage,
emptyMessages,
emptyTools,
registerTools,
appendMessage,
systemFrom,
createOpenAILLM,
runAndWait,
} from '@luisss/zero';
// 1. 定义一个工具
const nowTool = buildTool({
name: 'now',
description: 'Get the current date and time in ISO format.',
parameters: {
type: 'object',
properties: {},
additionalProperties: false,
},
handler: () => new Date().toISOString(),
});
// 2. 创建 agent —— 纯配置,没有隐藏状态
const agent = createAgent({
llm: createOpenAILLM({ model: 'gpt-4o-mini' }), // OPENAI_API_KEY 环境变量自动读取
system: systemFrom(['You are a helpful assistant.']),
tools: registerTools(emptyTools, [nowTool]),
maxTurns: 5,
});
// 3. 准备 messages —— 你持有,框架只读
const messages = appendMessage(
emptyMessages,
buildUserTextMessage('What time is it?'),
);
// 4. 跑一次,等最终结果
const finalMessages = await runAndWait(agent.runWithMessages(messages));
// finalMessages 现在包含:
// [user, assistant(tool_call), tool(tool_result), assistant(text)]
console.log(finalMessages);监听事件流(流式输出、进度提示)
import { ofEventType, textOnly, shareRun } from '@luisss/zero';
const run$ = agent.runWithMessages(messages).pipe(shareRun());
// token 一出来就打
run$.pipe(textOnly()).subscribe((chunk) => process.stdout.write(chunk));
// 工具调用提示
run$.pipe(ofEventType('tool_start')).subscribe((ev) => {
console.log(`\n[tool] ${ev.name}(${JSON.stringify(ev.input)})`);
});
// 等结束
await new Promise<void>((resolve) => {
run$.pipe(ofEventType('done')).subscribe((ev) => {
console.log(`\n[done] ${ev.reason}`);
resolve();
});
});为什么要
shareRun()?Observable 冷流,每订阅一次 = 跑一次 agent。shareRun()把它变成热流,多路订阅共享同一次运行。
多轮对话 —— 数据自己管
import { createMessagesRef, syncMessages, runAndWait } from '@luisss/zero';
// 用一个 ref 持有历史
const history = createMessagesRef(emptyMessages);
// 第 1 轮
const r1 = await runAndWait(
agent
.runWithMessages(appendMessage(history.current, buildUserTextMessage('Hi, I am Alice.')))
.pipe(syncMessages(history)), // ★ syncMessages 会自动把 history.current 同步到最新
);
// 第 2 轮 —— history.current 已经包含第 1 轮所有消息
await runAndWait(
agent
.runWithMessages(appendMessage(history.current, buildUserTextMessage('What was my name?')))
.pipe(syncMessages(history)),
);
// 想持久化?history.current 就是一个 readonly Messages 数组,JSON.stringify 直接存
localStorage.setItem('chat', JSON.stringify(history.current));注意:
createMessagesRef/syncMessages是可选糖 —— 你也可以用自己的状态容器(Zustand / Redux / 自定义)- 真·状态保存就是
JSON.stringify(messages),框架不插手
不想写 rxjs?用 async iterable
import { toAsyncIterable, ofEventType } from '@luisss/zero';
const events$ = agent.runWithMessages(messages);
for await (const ev of toAsyncIterable(events$)) {
if (ev.type === 'text_delta') process.stdout.write(ev.text);
if (ev.type === 'tool_start') console.log(`\n[tool] ${ev.name}`);
if (ev.type === 'done') console.log(`\n[done] ${ev.reason}`);
}数据模型
三类数据,全由你持有。
1. Messages — 对话历史
两层结构:Message 容器 + Part 内容。一条 message 属于 user / assistant / tool 三种之一,内部可以装多个 part(text / tool_call / tool_result / thinking / image)。
// 顶层
type IMessage = IUserMessage | IAssistantMessage | IToolMessage;
type Messages = readonly IMessage[];
// 用户消息
interface IUserMessage {
readonly kind: 'user';
readonly parts: readonly IUserPart[]; // user_text | user_image
}
// 助手消息(可同时含 text + 多个 tool_call)
interface IAssistantMessage {
readonly kind: 'assistant';
readonly parts: readonly IAssistantPart[]; // assistant_text | assistant_tool_call | assistant_thinking
}
// 工具结果(和 assistant 的 tool_call 通过 callId 配对)
interface IToolMessage {
readonly kind: 'tool';
readonly parts: readonly IToolResultPart[];
}构造(永远用 build* 工厂,不要手写字面量 —— 未来加字段时免改):
import {
buildUserTextMessage,
buildUserMessage,
buildUserTextPart,
buildUserImagePart,
} from '@luisss/zero';
// 纯文本 —— 最常见的
const m1 = buildUserTextMessage('Read the file README.md');
// 文字 + 图片(多模态)
const m2 = buildUserMessage([
buildUserTextPart('What is in this image?'),
buildUserImagePart({
kind: 'base64',
mediaType: 'image/png',
data: '<base64-data>',
}),
]);持久化:Messages 是纯 JSON(全部字段都是 string / number / boolean / 数组 / 对象,readonly 只是 TS 层面的标记)。
// 存
const serialized = JSON.stringify(messages);
await db.save(chatId, serialized);
// 读
const messages = JSON.parse(await db.load(chatId)) as Messages;
agent.runWithMessages(messages); // 直接用,类型兼容2. SystemSpec — System Prompt
支持多片段拼接 —— 方便按模块组织(基础指令、工具说明、动态上下文、用户偏好 ...)。
import { systemFrom, addFragment, buildSystem } from '@luisss/zero';
// 最简 —— 一组字符串
const system = systemFrom([
'You are a coding assistant.',
'Always use the tools to access the file system.',
]);
// 分片段(未来要动态追加):
let spec = systemFrom(['You are a coding assistant.']);
spec = addFragment(spec, 'Current working directory: /home/alice/project');
spec = addFragment(spec, async () => `Current time: ${new Date().toISOString()}`);
// ↑ 也可以是 Promise —— 每次 run 前才求值
// 框架内部会在 makeLLMCtx 时自动调 buildSystem —— 用户通常不需要手动调3. ToolRegistry — 工具注册表
不可变的 Record<name, ITool>。每次 registerTool 返回新注册表,不改动原值。
import { emptyTools, registerTool, registerTools, unregisterTool } from '@luisss/zero';
// 从零开始
let tools = emptyTools;
tools = registerTool(tools, readTool);
tools = registerTool(tools, writeTool);
// 或者一次批量
tools = registerTools(emptyTools, [readTool, writeTool, bashTool]);
// 去掉一个
tools = unregisterTool(tools, 'bash');
// tools 就是 { read: ITool, write: ITool, bash: ITool },纯数据工具(Tool)
定义工具的唯一入口是 buildTool。它接受"宽松"的输入规范,填充默认值,输出一个类型完整的 ITool。
最小工具
const add = buildTool({
name: 'add',
description: 'Add two numbers.',
parameters: {
type: 'object',
properties: {
a: { type: 'number' },
b: { type: 'number' },
},
required: ['a', 'b'],
additionalProperties: false,
},
handler: ({ a, b }) => String((a as number) + (b as number)),
});parameters是 JSON Schema 的 object 形式。框架类型里的IToolParameters固定了type: 'object'+ 常用字段,拼错会编译报错handler返回string或Promise<string>—— 统一字符串输出,由下游 LLM 适配器翻译格式
四个可选字段
buildTool 会自动填默认值,用户通常不用操心;但语义值得了解:
const bash = buildTool({
name: 'bash',
description: '...',
parameters: { ... },
handler: async ({ command }, { signal } = {}) => {
// ① handler 第二参数 signal —— agent loop 取消令牌,传给 fetch / exec 让子进程协作退出
const result = await execa('bash', ['-c', command as string], { signal });
return result.stdout;
},
// ② isDisabled —— 整个工具此刻是否可用(不依赖具体 call)
// 默认 false。生效时机:发给 LLM 前过滤 + 执行时兜底
isDisabled: () => process.env.READ_ONLY === '1',
// ↑ 也可以 boolean / async function
// ③ isConcurrencySafe —— 这个工具能否和别的工具并发跑
// 默认 true。bash/write 这类有副作用的**必须显式 false**
isConcurrencySafe: false,
// ④ shouldDefer —— 是否把 schema 发给 LLM
// 默认 false。标 true 的工具仍在 registry 里(能调),但 schema
// 不发给 LLM,用来省 token。解锁机制由应用层自己管。
shouldDefer: false,
});并发调度(自动)
一轮里 LLM 发出多个 tool_call,框架按每个 tool 的 isConcurrencySafe 自动分批:
LLM 发出: [read_a, read_b, write_x, read_c, bash]
(read 默认 safe / write 和 bash 显式 unsafe)
执行:
[[read_a, read_b]] ← 第 1 批:并发
[[write_x]] ← 第 2 批:独占
[[read_c]] ← 第 3 批:独占(被 write 隔开后不能跟 read_a 合并)
[[bash]] ← 第 4 批:独占保序不重排 —— LLM 的调用顺序可能有语义依赖(read_c 要读 write_x 刚写的文件),不能打乱。
工具执行中间件
在工具真正调用前/后做事 —— 日志、权限、重试、缓存、脱敏、审计。一个中间件就是 (next) => (call, opts) => Promise<ToolOutcome>:
import type { ToolMiddleware } from '@luisss/zero';
// 示例:日志
const withLogging = (label = 'log'): ToolMiddleware => (next) => async (call, opts) => {
console.log(`[${label}] → ${call.name}(${JSON.stringify(call.input)})`);
const outcome = await next(call, opts);
console.log(`[${label}] ${outcome.isError ? '✗' : '✓'} ${call.name}`);
return outcome;
};
// 示例:用户 confirm
const withConfirm = (
ask: (call: ToolCall) => Promise<boolean>,
): ToolMiddleware => (next) => async (call, opts) => {
if (!(await ask(call))) {
return { id: call.id, output: `user denied ${call.name}`, isError: true };
}
return next(call, opts);
};
// 使用
const agent = createAgent({
llm,
tools,
toolMiddlewares: [
withLogging('audit'), // 最外层 —— 最先看到 call,最后看到 outcome
withConfirm(askUser), // 中间层 —— confirm 通过才下传
withRetry({ maxAttempts: 3 }), // 内层 —— 失败时重试几次
],
});内置的重试中间件 withRetry 从 @luisss/zero 直接 import:
import { withRetry } from '@luisss/zero';
toolMiddlewares: [withRetry({ maxAttempts: 3, delayMs: 200 })]Agent 配置
interface AgentConfig {
llm: LLM; // 必填,LLM 客户端(openai / claude / 自定义)
system?: SystemSpec; // 可选,system prompt
tools?: ToolRegistry; // 可选,工具注册表
toolMiddlewares?: ToolMiddleware[]; // 可选,工具侧中间件
loopMiddlewares?: (defaults) => MW[]; // 可选,agent loop 中间件(高级)
maxTurns?: number; // 可选,默认 10。LLM 一直在调工具时的保险
}换 LLM 厂商
import { createOpenAILLM, createClaudeLLM, createMockLLM } from '@luisss/zero';
// OpenAI
const llm = createOpenAILLM({ model: 'gpt-4o-mini' });
// Anthropic
const llm = createClaudeLLM({ model: 'claude-3-5-sonnet-20241022' });
// 本地 Ollama(用 OpenAI 协议)
const llm = createOpenAILLM({
model: 'qwen2.5:14b',
baseURL: 'http://localhost:11434/v1',
client: { apiKey: 'ollama' },
});
// Mock(测试 / 回放 / demo)
const llm = createMockLLM({
script: [
[...streamText('Sure.'), { type: 'done', reason: 'stop' }],
],
});自定义 LLM? LLM 就是 (ctx: LLMContext) => Observable<LLMEvent> —— 实现这个函数就能接任何厂商、任何协议,甚至 HTTP polling。
Loop 中间件(高级扩展点)
默认链是 [runLoop, callLLM, maybeRunTools]。想在某个位置塞自己的中间件:
import { insertBefore, insertAfter, callLLM, maybeRunTools } from '@luisss/zero';
const myPromptRewriter: AgentLoopMiddleware = async (ctx, next) => {
// 在 callLLM 前可以修改 ctx.input.system(通过 makeLLMCtx 的覆盖)
await next();
};
const agent = createAgent({
llm, tools,
loopMiddlewares: (defaults) =>
insertBefore(callLLM, myPromptRewriter)(defaults),
});每个 await next() = 下游中间件跑一遍。runLoop 就是用反复调 next() 驱动多轮循环的。你也可以在 runLoop 前再加一层 —— 得到"整个 run 级别"的钩子(比如 withRunTimeout / withRunLogging,已内置)。
事件契约
type AgentEvent =
| { type: 'turn_start'; turn: number }
| { type: 'turn_end'; turn: number }
| { type: 'text_delta'; text: string } // token 流
| { type: 'tool_start'; id; name; input } // 工具开始
| { type: 'tool_end'; id; name; output; isError } // 工具结束
| { type: 'usage'; scope: 'turn' | 'total'; ... }
| { type: 'error'; message: string }
| { type: 'done'; reason; messages }; // 唯一终止事件保证:
done事件一定会发,即使中间出错或被取消。reason告诉你为什么结束:'completed'/'max_turns'/'cancelled'/'error'done.messages是最终的完整Messages(等价于跑到结束时 history.current)
常用 operator:
import { textOnly, toMessages, ofEventType } from '@luisss/zero';
run$.pipe(textOnly()) // 只拿 text_delta 的 text
run$.pipe(toMessages(initial)) // 从事件流重建 messages 快照
run$.pipe(ofEventType('tool_start')) // 过滤某种事件(类型收窄)为什么这样设计(TL;DR)
| 设计 | 你得到什么 | |---|---| | 所有数据外置 | 持久化 / 序列化 / 回放 / 分支 / 迁移,全都 free | | 一切皆中间件 | 需求来了就插一个 mw,不用改框架 | | Event 流 = Observable | 取消、限速、广播、debounce、错误边界,rxjs 操作符直接用 | | 纯数据 + 纯函数 | 好测、好改、好并发 —— 没有隐藏状态能咬你 | | 多个桥(rxjs / Promise / AsyncIterable) | 不喜欢 rxjs 也能用,没绑死 | | LLM 抽象 = 一个函数 | 接任何厂商都简单,mock 测试零成本 |
开发 / demo
# 安装
pnpm install
# 跑 mock demo —— 不需要 API key,看一下事件流全貌
pnpm demo:core
# 跑 Ollama demo —— 本地 LLM 实跑
OLLAMA_MODEL=qwen2.5:14b pnpm demo:ollama
# 类型检查
pnpm --filter @luisss/zero build
# 监听模式
pnpm --filter @luisss/zero devLicense
MIT
