@turbo-agent/runtime
v0.1.6-rc.1
Published
前端对话运行时:单条对话历史与事件流逻辑状态管理
Downloads
110
Maintainers
Readme
turbo-agent-runtime
前端对话运行时:基于 turbo-agent-core 的 Conversation 管理对话状态与事件流。
推荐阅读
- 最新的 runtime + react 对话接入说明见 docs/developer-guide/examples/08-runtime-react-chat.md
- 如果你要同时接入 ConversationRuntime、ChatConversationView、ChatInput,优先参考上面的组合文档
核心概念
| 概念 | 说明 |
|------|------|
| ConversationRuntime | 对话运行时实例,持有 Conversation,管理 send / stream / abort |
| StreamSession | 单条流式生成的状态管理,封装 EventTreeAggregator |
| ConversationTransport | 传输层接口,由使用方实现(HTTP / WebSocket 等) |
| StreamState | 流式会话的实时状态(phase / content / reasoning / aggregation) |
工作流程
runtime.send({ message: "你好" })
│
├─ 1. 创建 user Message → 追加到 conversation.messages
│ → onStateChange(conversation, null)
│
├─ 2. transport.send() → 后端返回 traceId
│ → onStateChange(conversation, { phase: "connecting" })
│
├─ 3. transport.openEventStream(traceId) → 事件流
│ → onStateChange(conversation, { phase: "streaming", content: "你", ... })
│ → onStateChange(conversation, { phase: "streaming", content: "你好", ... })
│ → ...每收到一个事件触发一次
│
├─ 4. 流结束 → aggregation() 产出 assistant Message
│ → 追加到 conversation.messages
│ → onStreamCommitted(message)
│ → onStateChange(conversation, null)
│
└─ 返回 traceId(= assistant message id)
安装
pnpm add turbo-agent-runtime
使用方式
第一步:实现 Transport
ConversationTransport 接口定义了两个方法,由使用方根据自身后端 API 实现:
import type { ConversationTransport } from "turbo-agent-runtime";
const transport: ConversationTransport = {
/**
 * Posts the outgoing message to the backend and returns the identifiers
 * the runtime needs to open the event stream.
 *
 * @param request - Send request; its fields are mapped to the snake_case
 *   keys of the example `/api/chat` endpoint.
 * @param signal - Abort signal forwarded to `fetch`.
 * @returns The `traceId` and `conversationId` read from the response JSON.
 * @throws Error when the backend answers with a non-2xx status.
 */
async send(request, signal) {
  const res = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      message: request.message,
      conversation_id: request.conversationId,
      conversation_mode: request.conversationMode,
      agent_mode: request.agentMode,
      executor_id: request.executorId,
      executor: request.executor,
    }),
    signal,
  });
  // fetch() only rejects on network failure; without this check a 4xx/5xx
  // error body would be parsed and returned with an undefined traceId.
  if (!res.ok) {
    throw new Error(`POST /api/chat failed: ${res.status} ${res.statusText}`);
  }
  const data = await res.json();
  return { traceId: data.trace_id, conversationId: data.conversation_id };
},
/**
 * Opens the server-sent event stream for a trace.
 *
 * @param traceId - Trace id placed in the stream URL path.
 * @param conversationId - Optional conversation id, sent as the
 *   `conversation_id` query parameter when present.
 * @param signal - Abort signal forwarded to `fetch`.
 * @returns Whatever `parseSSE` produces for the response; the interface
 *   expects an `AsyncIterable<BaseEvent>`.
 * @throws Error when the backend answers with a non-2xx status.
 */
async openEventStream(traceId, conversationId, signal) {
  // Encode both ids so reserved URL characters cannot alter the path or query.
  const query = conversationId
    ? `?conversation_id=${encodeURIComponent(conversationId)}`
    : "";
  const res = await fetch(`/api/chat/stream/${encodeURIComponent(traceId)}${query}`, {
    headers: { Accept: "text/event-stream" },
    signal,
  });
  // Fail fast instead of handing an HTTP error body to the SSE parser.
  if (!res.ok) {
    throw new Error(`GET /api/chat/stream failed: ${res.status} ${res.statusText}`);
  }
  // NOTE(review): parseSSE is not defined in this README; it must be supplied by the integrator.
  return parseSSE(res); // returns AsyncIterable<BaseEvent>
},
};
第二步:创建 Runtime 并绑定 UI 更新
React 示例
import { useState, useRef } from "react";
import { ConversationRuntime } from "turbo-agent-runtime";
import type { StreamState, ConversationTransport } from "turbo-agent-runtime";
import type { Conversation } from "turbo-agent-core";
function ChatPage() {
const [conversation, setConversation] = useState<Conversation | null>(null);
const [streamState, setStreamState] = useState<StreamState | null>(null);
const runtimeRef = useRef<ConversationRuntime | null>(null);
// 初始化 runtime(仅一次)
if (!runtimeRef.current) {
runtimeRef.current = new ConversationRuntime({
transport,
callbacks: {
// ★ 核心回调:每次状态变化都触发,直接 setState 即可驱动渲染
onStateChange(conv, stream) {
setConversation({ ...conv });
setStreamState(stream ? { ...stream } : null);
},
onConversationCreated(id) {
window.history.pushState(null, "", `/chat/${id}`);
},
},
});
}
const runtime = runtimeRef.current;
// 发送消息
const handleSend = async (text: string) => {
await runtime.send({ message: text });
};
// 中止生成
const handleAbort = () => runtime.abort();
return (
<div>
{/* 历史消息 */}
{conversation?.messages.map((msg) => (
<MessageBubble key={msg.id} message={msg} />
))}
{/* 流式生成中 */}
{streamState?.phase === "streaming" && (
<div className="streaming">
{streamState.reasoning && (
<div className="thinking">{streamState.reasoning}</div>
)}
<div>{streamState.content}</div>
<button onClick={handleAbort}>停止</button>
</div>
)}
{/* 输入框 */}
<ChatInput
onSend={handleSend}
disabled={streamState?.phase === "streaming" || streamState?.phase === "connecting"}
/>
</div>
);
}
React — 封装为 Hook
import { useState, useRef, useCallback } from "react";
import { ConversationRuntime } from "turbo-agent-runtime";
import type { StreamState, ConversationTransport, ConversationRuntimeCallbacks } from "turbo-agent-runtime";
import type { Conversation } from "turbo-agent-core";
/** Options accepted by the `useChatRuntime` hook. */
interface UseChatRuntimeOptions {
/** Transport implementation handed to the `ConversationRuntime` constructor. */
transport: ConversationTransport;
/**
 * Additional runtime callbacks. `onStateChange` is excluded because the hook
 * installs its own handler to drive React state.
 */
callbacks?: Omit<ConversationRuntimeCallbacks, "onStateChange">;
/**
 * Conversation used as the hook's initial `conversation` state and passed to
 * the runtime as its `conversation` option.
 */
initialConversation?: Conversation;
}
function useChatRuntime(options: UseChatRuntimeOptions) {
const [conversation, setConversation] = useState<Conversation | null>(
options.initialConversation ?? null
);
const [streamState, setStreamState] = useState<StreamState | null>(null);
const runtimeRef = useRef<ConversationRuntime | null>(null);
if (!runtimeRef.current) {
runtimeRef.current = new ConversationRuntime({
transport: options.transport,
conversation: options.initialConversation,
callbacks: {
...options.callbacks,
onStateChange(conv, stream) {
setConversation({ ...conv });
setStreamState(stream ? { ...stream } : null);
},
},
});
}
const send = useCallback(
(message: string) => runtimeRef.current!.send({ message }),
[]
);
const abort = useCallback(() => runtimeRef.current!.abort(), []);
const load = useCallback(
(conv: Conversation) => runtimeRef.current!.load(conv),
[]
);
return {
runtime: runtimeRef.current,
conversation,
streamState,
isStreaming: streamState?.phase === "streaming" || streamState?.phase === "connecting",
send,
abort,
load,
};
}
Vue 3 示例(Composition API)
<script setup lang="ts">
import { computed, onUnmounted, shallowRef } from "vue";
import { useRouter } from "vue-router";
import { ConversationRuntime } from "turbo-agent-runtime";
import type { ConversationTransport, StreamState } from "turbo-agent-runtime";
import type { Conversation } from "turbo-agent-core";

const props = defineProps<{
  transport: ConversationTransport;
}>();

// Router used to navigate to the conversation URL once the conversation is created.
const router = useRouter();

// Latest conversation snapshot and stream state, replaced wholesale on every change.
const conversation = shallowRef<Conversation | null>(null);
const streamState = shallowRef<StreamState | null>(null);

const runtime = new ConversationRuntime({
  transport: props.transport,
  callbacks: {
    onStateChange(conv, stream) {
      // shallowRef only reacts to `.value` being reassigned, so assign a new
      // object reference each time to trigger a Vue 3 update.
      conversation.value = { ...conv };
      streamState.value = stream ? { ...stream } : null;
    },
    onConversationCreated(id) {
      router.push(`/chat/${id}`);
    },
  },
});

// True while a request is connecting or a response is streaming; used to disable the input.
const isStreaming = computed(
  () => streamState.value?.phase === "streaming" || streamState.value?.phase === "connecting"
);

/** Sends the user's text through the runtime. */
async function handleSend(text: string) {
  await runtime.send({ message: text });
}

/** Stops the current stream. */
function handleAbort() {
  runtime.abort();
}

// Abort on unmount, as the component that owns the runtime is going away.
onUnmounted(() => {
  runtime.abort();
});
</script>
<template>
<div>
<!-- 历史消息 -->
<div v-for="msg in conversation?.messages" :key="msg.id">
<MessageBubble :message="msg" />
</div>
<!-- 流式生成中 -->
<div v-if="streamState?.phase === 'streaming'">
<div v-if="streamState.reasoning" class="thinking">
{{ streamState.reasoning }}
</div>
<div>{{ streamState.content }}</div>
<button @click="handleAbort">停止</button>
</div>
<!-- 输入框 -->
<ChatInput :disabled="isStreaming" @send="handleSend" />
</div>
</template>
Vue 3 — 封装为 Composable
// composables/useChatRuntime.ts
import { shallowRef, computed, onUnmounted } from "vue";
import { ConversationRuntime } from "turbo-agent-runtime";
import type { StreamState, ConversationTransport, ConversationRuntimeCallbacks } from "turbo-agent-runtime";
import type { Conversation } from "turbo-agent-core";
export function useChatRuntime(
transport: ConversationTransport,
extraCallbacks?: Omit<ConversationRuntimeCallbacks, "onStateChange">
) {
const conversation = shallowRef<Conversation | null>(null);
const streamState = shallowRef<StreamState | null>(null);
const runtime = new ConversationRuntime({
transport,
callbacks: {
...extraCallbacks,
onStateChange(conv, stream) {
conversation.value = { ...conv };
streamState.value = stream ? { ...stream } : null;
},
},
});
const isStreaming = computed(
() => streamState.value?.phase === "streaming" || streamState.value?.phase === "connecting"
);
onUnmounted(() => runtime.abort());
return {
runtime,
conversation,
streamState,
isStreaming,
send: (msg: string) => runtime.send({ message: msg }),
abort: () => runtime.abort(),
load: (conv: Conversation) => runtime.load(conv),
};
}
Vue 2 示例(Options API)
import { ConversationRuntime } from "turbo-agent-runtime";
export default {
data() {
return {
conversation: null,
streamState: null,
};
},
computed: {
messages() {
return this.conversation?.messages ?? [];
},
isStreaming() {
const phase = this.streamState?.phase;
return phase === "streaming" || phase === "connecting";
},
},
created() {
this.runtime = new ConversationRuntime({
transport: this.createTransport(),
callbacks: {
onStateChange: (conv, stream) => {
// Vue 2 的 $set 或直接赋值均可触发响应式
this.conversation = conv;
this.streamState = stream;
},
onConversationCreated: (id) => {
this.$router.push(`/chat/${id}`);
},
},
});
},
beforeDestroy() {
this.runtime.abort();
},
methods: {
createTransport() {
return {
async send(request, signal) {
const res = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
message: request.message,
conversation_id: request.conversationId,
executor_id: request.executorId,
executor: request.executor,
}),
signal,
});
const data = await res.json();
return { traceId: data.trace_id, conversationId: data.conversation_id };
},
async openEventStream(traceId, conversationId, signal) {
const query = conversationId ? `?conversation_id=${conversationId}` : "";
const res = await fetch(`/api/chat/stream/${traceId}${query}`, {
headers: { Accept: "text/event-stream" },
signal,
});
return parseSSE(res);
},
};
},
async handleSend(text) {
await this.runtime.send({ message: text });
},
handleAbort() {
this.runtime.abort();
},
},
};
API 参考
ConversationRuntime
new ConversationRuntime(options: ConversationRuntimeOptions)
| 属性/方法 | 类型 | 说明 |
|-----------|------|------|
| conversation | Conversation | 当前对话数据(只读) |
| stream | StreamSession \| null | 当前流式会话(只读) |
| isStreaming | boolean | 是否有流在进行中 |
| streamState | StreamState \| null | 当前流的状态 |
| send(request) | Promise<string> | 发送消息,返回 traceId |
| resumeStream(traceId) | Promise<string> | 恢复断线的事件流 |
| abort() | void | 中止当前流 |
| load(conversation) | void | 替换整个 Conversation |
| update(patch) | void | 更新对话属性 |
ConversationTransport
interface ConversationTransport {
send(request: SendRequest, signal?: AbortSignal): Promise<SendResult>;
openEventStream(traceId: string, conversationId?: string, signal?: AbortSignal): Promise<AsyncIterable<BaseEvent>>;
}
ConversationRuntimeCallbacks
interface ConversationRuntimeCallbacks {
onStateChange?: (conversation: Conversation, stream: StreamState | null) => void;
onConversationCreated?: (conversationId: string) => void;
onStreamEvent?: (event: BaseEvent) => void;
onStreamCommitted?: (message: Message) => void;
}
StreamState
/**
 * State of a streaming session as passed to `onStateChange`; that callback
 * receives `null` instead when no stream state applies.
 */
interface StreamState {
/** Trace id of the stream — presumably the id returned by `transport.send()`; confirm against the runtime. */
traceId: string;
/** Current phase of the stream. The examples treat "connecting" and "streaming" as busy. */
phase: "idle" | "connecting" | "streaming" | "committing" | "error";
/** Content text; in the workflow illustration it grows as events arrive. */
content: string;
/** Reasoning text; the examples render it in a separate "thinking" element when non-empty. */
reasoning: string;
/**
 * Aggregated result. Per the workflow, the aggregation yields the assistant
 * Message when the stream ends.
 * NOTE(review): when a ToolCallRecord appears here is not shown in this README — confirm.
 */
aggregation: Message | ToolCallRecord | null;
/** Error description — presumably set when `phase` is "error"; confirm. */
error?: string | null;
}