@ariaflowagents/cf-agent
AriaFlow integration for Cloudflare Workers using AIChatAgent. Build multi-agent AI systems with automatic persistence, resumable streaming, and real-time sync.
Features
- All AriaFlow Agent Types: LLM, Flow, Triage, and Composite agents
- Automatic Persistence: Messages stored in SQLite, survive restarts
- Resumable Streaming: Reconnect without data loss
- Multi-Client Sync: Real-time updates across all connected clients
- Full Tool Support: Server-side, client-side, and approval tools
- Zero Config: Works out of the box with sensible defaults
Quick Start
# Create a new project
mkdir my-agent && cd my-agent
npm init -y
# Install dependencies
npm install @ariaflowagents/cf-agent ai zod
npm install -D @cloudflare/vite-plugin vite wrangler
# Create your agent
# See examples below...
# Run locally
npm run dev

Open http://localhost:5173 to see your agent in action.
Installation
npm install @ariaflowagents/cf-agent

Project Structure
my-agent/
├── src/
│ ├── server.ts # Agent implementation
│ ├── app.tsx # React chat UI
│ └── client.tsx # React entry point
├── package.json
├── tsconfig.json
├── vite.config.ts
└── wrangler.jsonc

Basic Example
1. Create Agent (src/server.ts)
import { AriaFlowAIChatAgent } from "@ariaflowagents/cf-agent";
import { createOpenAI } from "@ai-sdk/openai";
import { tool } from "ai";
import { z } from "zod";
interface Env {
OPENAI_API_KEY: string;
}
export class MyAgent extends AriaFlowAIChatAgent<Env> {
protected getRuntimeConfig() {
const openai = createOpenAI({ apiKey: this.env.OPENAI_API_KEY });
return {
agents: [{
id: "assistant",
type: "llm" as const,
name: "Assistant",
model: openai("gpt-4o"),
prompt: "You are a helpful assistant",
tools: {
getWeather: tool({
description: "Get weather for a city",
inputSchema: z.object({ city: z.string() }),
execute: async ({ city }) => {
return { temperature: 72, condition: "sunny" };
},
}),
},
}],
defaultAgentId: "assistant",
};
}
}
export default MyAgent;

2. Create UI (src/app.tsx)
import { useAgent } from "agents/react";
import { useAgentChat } from "@cloudflare/ai-chat/react";
export function Chat() {
const agent = useAgent({ agent: "MyAgent" });
const { messages, sendMessage, status } = useAgentChat({ agent });
return (
<div>
{messages.map((msg) => (
<div key={msg.id}>
<strong>{msg.role}:</strong>
{msg.parts.map((part, i) =>
part.type === "text" ? <span key={i}>{part.text}</span> : null
)}
</div>
))}
<form onSubmit={(e) => {
e.preventDefault();
const input = e.currentTarget.elements.namedItem("input") as HTMLInputElement;
sendMessage({ text: input.value });
input.value = "";
}}>
<input name="input" placeholder="Type a message..." />
<button type="submit" disabled={status === "streaming"}>Send</button>
</form>
</div>
);
}

3. Configuration Files
vite.config.ts:
import { defineConfig } from "vite";
import { cloudflare } from "@cloudflare/vite-plugin";
import react from "@vitejs/plugin-react";
export default defineConfig({
plugins: [cloudflare(), react()],
});

wrangler.jsonc:
{
"name": "my-agent",
"main": "src/server.ts",
"compatibility_date": "2026-01-28",
"compatibility_flags": ["nodejs_compat"],
"ai": { "binding": "AI" },
"durable_objects": {
"bindings": [{ "name": "MyAgent", "class_name": "MyAgent" }]
},
"migrations": [{ "tag": "v1", "new_sqlite_classes": ["MyAgent"] }]
}

package.json scripts:
{
  "scripts": {
    "dev": "vite dev",
    "build": "vite build",
    "deploy": "vite build && wrangler deploy",
    "types": "wrangler types"
  }
}

Agent Types
LLMAgent
Standard conversational agent with tools:
{
id: "sales",
type: "llm" as const,
name: "Sales Agent",
model: openai("gpt-4o"),
prompt: "You are a sales specialist...",
tools: { /* ... */ },
canHandoffTo: ["support", "billing"]
}

FlowAgent
Structured node-based conversation flows:
{
id: "order-flow",
type: "flow" as const,
name: "Order Flow",
model: openai("gpt-4o"),
flow: {
nodes: [
{
id: "initial",
task: "Ask what they want",
transitions: [{ to: "confirm", on: "product_selected" }]
},
{
id: "confirm",
task: "Confirm order",
transitions: [{ to: "end", on: "confirmed" }]
}
],
hybrid: true // Allow off-flow questions
}
}

TriageAgent
Intelligent routing between agents:
{
id: "router",
type: "triage" as const,
name: "Router",
model: openai("gpt-4o-mini"),
routingConfig: {
agents: ["sales", "support", "billing"],
instructions: "Route based on user intent..."
}
}

CompositeAgent
Multi-agent coordination:
{
id: "supervisor",
type: "composite" as const,
name: "Supervisor",
model: openai("gpt-4o"),
agents: ["researcher", "analyst", "writer"],
coordinationMode: "sequential"
}

Tools
Server-Side Tools
Run automatically on the server:
tools: {
getWeather: tool({
description: "Get weather for a city",
inputSchema: z.object({ city: z.string() }),
execute: async ({ city }) => {
// Runs on server
return await fetchWeather(city);
}
})
}

Client-Side Tools
No execute function - browser provides the result:
// Server
tools: {
getLocation: tool({
description: "Get user location",
inputSchema: z.object({})
// No execute - client handles it
})
}
// Client
useAgentChat({
onToolCall: async ({ toolCall, addToolOutput }) => {
if (toolCall.toolName === "getLocation") {
// getCurrentPosition is callback-based, so wrap it in a Promise
const pos = await new Promise<GeolocationPosition>((resolve, reject) =>
  navigator.geolocation.getCurrentPosition(resolve, reject)
);
addToolOutput({
toolCallId: toolCall.toolCallId,
output: { lat: pos.coords.latitude, lng: pos.coords.longitude }
});
}
}
});

Approval Tools
Human-in-the-loop for sensitive operations:
tools: {
processRefund: tool({
description: "Process a refund",
inputSchema: z.object({ orderId: z.string(), amount: z.number() }),
needsApproval: ({ amount }) => amount > 100,
execute: async ({ orderId, amount }) => {
// Only runs after user approval
return await processRefund(orderId, amount);
}
})
}

Hooks
React to agent lifecycle events:
protected getHooks() {
return {
onHandoff: async (context, from, to, reason) => {
console.log(`Handoff: ${from} -> ${to} (${reason})`);
},
onToolCall: async (context, toolCall) => {
console.log(`Tool: ${toolCall.toolName}`);
},
onNodeEnter: async (context, node) => {
console.log(`Flow node: ${node.name}`);
}
};
}

State Persistence
AriaFlow stores two types of data:
- CF Tables (cf_ai_chat_*): Chat messages, stream chunks, resumption data
- AriaFlow Tables (ariaflow_sessions): Session state, working memory, flow state
SQLite Database:
├── cf_ai_chat_agent_messages (UIMessages for UI)
├── cf_ai_chat_stream_chunks (Stream chunks)
├── cf_ai_chat_stream_metadata (Stream state)
└── ariaflow_sessions (AriaFlow Session state)

Custom Configuration
Stream Adapter
Control which events are sent to the client:
protected getStreamAdapterConfig() {
return {
includeHandoffs: true, // Show agent handoffs in UI
includeFlowEvents: true, // Show flow node transitions
includeTripwires: true, // Show guardrail triggers
includeStepEvents: false, // Hide step lifecycle
includeAgentEvents: false // Hide agent lifecycle
};
}

Session ID
Override default session ID generation:
protected getSessionId(): string {
// Default uses Durable Object ID
// Override for custom session management
return `user-${this.getUserId()}`;
}

Scripts
# Development (with hot reload)
npm run dev
# Deploy to Cloudflare
npm run deploy
# Generate types
npm run types
# Build only
npm run build

Using Different AI Providers
Workers AI (No API key needed)
import { createWorkersAI } from "workers-ai-provider";
const workersai = createWorkersAI({ binding: this.env.AI });
// In agent config:
model: workersai("@cf/meta/llama-3.3-70b-instruct-fp8-fast")

OpenAI
npm install @ai-sdk/openai

import { createOpenAI } from "@ai-sdk/openai";
const openai = createOpenAI({ apiKey: this.env.OPENAI_API_KEY });
// In agent config:
model: openai("gpt-4o")

Anthropic
npm install @ai-sdk/anthropic

import { createAnthropic } from "@ai-sdk/anthropic";
const anthropic = createAnthropic({ apiKey: this.env.ANTHROPIC_API_KEY });
// In agent config:
model: anthropic("claude-sonnet-4-20250514")

API Reference
AriaFlowAIChatAgent
Abstract base class extending CF's AIChatAgent.
abstract class AriaFlowAIChatAgent<Env, State> {
// Required
protected abstract getRuntimeConfig(): HarnessConfig;
// Optional overrides
protected getHooks(): Partial<HarnessHooks>;
protected getStreamAdapterConfig(): Partial<StreamAdapterConfig>;
protected getSessionId(): string;
// Utility methods
protected async getSession(): Promise<Session | null>;
protected getSessionStats(): SessionStats | null;
}

Environment Variables
# For OpenAI
OPENAI_API_KEY=your-key
# For Anthropic
ANTHROPIC_API_KEY=your-key
# For custom providers
PROVIDER_API_KEY=your-key

Architecture
Client (useAgentChat)
│
▼ WebSocket
CF AIChatAgent (Durable Object)
├─► CF SQLite: Chat messages, stream chunks
└─► AriaFlow Runtime
├─► CloudflareSessionStore (SQLite)
├─► Multi-agent execution
└─► StreamAdapter
└─► UIMessageChunk format

Realtime Voice Reconnect Contract
withRealtimeVoice (RFC-05) spans two orthogonal WebSocket edges, each with its own reconnect model:
| Edge | Owner | Reconnect strategy |
|---|---|---|
| Browser ↔ Durable Object | Client | Built in — useVoiceAgent / partysocket handles it transparently. DO is keyed by path; reconnecting to the same path wakes the same session (hibernation-aware). |
| Durable Object ↔ Gemini Live | Server (this mixin) | On goAway, re-opens the provider WS using the persisted resumption handle. Full-jitter exponential backoff: 500ms base, 8s cap, 3 attempts. |
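The full-jitter schedule described in the table can be sketched as follows. fullJitterDelay is a hypothetical helper shown only to illustrate the documented formula (delay drawn uniformly from [0, min(cap, base * 2^attempt)]); it is not part of the package API:

```typescript
// Full-jitter exponential backoff: the delay is drawn uniformly from
// [0, min(cap, base * 2^attempt)], matching the tunables documented below.
function fullJitterDelay(
  attempt: number, // 0-based retry attempt
  baseMs = 500,    // reconnectBaseDelayMs
  capMs = 8000,    // reconnectCapDelayMs
): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.random() * ceiling;
}

// Three attempts, matching reconnectMaxAttempts: 3.
// Ceilings grow 500ms -> 1000ms -> 2000ms; actual delays are random below each.
for (let attempt = 0; attempt < 3; attempt++) {
  const ceiling = Math.min(8000, 500 * 2 ** attempt);
  console.log(`attempt ${attempt}: wait up to ${ceiling}ms, got ${fullJitterDelay(attempt).toFixed(0)}ms`);
}
```

Jitter spreads reconnect attempts out in time, so many sessions hitting goAway at once do not stampede the provider simultaneously.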
During a server-side reconnect (handle-based re-open on goAway) the mixin sends {type:"reconnecting", reason:"goAway"} on entry and {type:"reconnected"} on success. Inbound PCM frames from the client are buffered in a per-connection ring (default 48KB ≈ 1.5s @ 16kHz mono PCM16) and drained to the new provider session after reconnect. Oldest frames drop when the cap is exceeded; a warning logs the dropped byte count.
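The buffering behavior above can be sketched as a byte-capped frame ring. PcmFrameBuffer is a hypothetical class for illustration, not the mixin's internal implementation:

```typescript
// Sketch of a per-connection audio buffer: frames accumulate during a
// provider reconnect; when the byte cap is exceeded, the oldest frames
// drop and the dropped byte count is logged as a warning.
class PcmFrameBuffer {
  private frames: Uint8Array[] = [];
  private bytes = 0;

  constructor(private capBytes = 48_000) {} // reconnectAudioBufferBytes default

  // Returns the number of bytes dropped to stay under the cap.
  push(frame: Uint8Array): number {
    this.frames.push(frame);
    this.bytes += frame.length;
    let dropped = 0;
    while (this.bytes > this.capBytes && this.frames.length > 1) {
      const oldest = this.frames.shift()!;
      this.bytes -= oldest.length;
      dropped += oldest.length;
    }
    if (dropped > 0) console.warn(`audio buffer full: dropped ${dropped} bytes`);
    return dropped;
  }

  // After the provider WS reopens: flush buffered frames in arrival order.
  drain(send: (frame: Uint8Array) => void): void {
    for (const frame of this.frames) send(frame);
    this.frames = [];
    this.bytes = 0;
  }
}
```

Dropping from the oldest end keeps the most recent ~1.5s of speech, which is usually the part the model still needs after a short reconnect gap.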
Resumption handle semantics. Gemini Live handles are single-use and expire after 2 hours of session inactivity. The mixin's cf_realtime_resumption SQLite table tracks used_at and updated_at; a handle is refused if already consumed ({type:"error", code:"resumption_consumed"}) or expired ({type:"error", code:"resumption_expired"}). Successful reconnect drains the audio buffer; exhausted retries send {type:"error", code:"reconnect_exhausted"} and tear down the session.
Writing a custom browser client. If you bypass useVoiceAgent and open the WS yourself, implement reconnect with exponential backoff and a 10-second floor (per Cloudflare's own workers-chat-demo precedent). The DO is the source of truth — on reconnect, simply re-open the WS to the same path; the mixin hydrates the Gemini session from SQLite transparently. Do NOT layer retry on top of useVoiceAgent — it already reconnects via partysocket and stacking will produce overlapping provider sessions and double-consume resumption handles.
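A minimal hand-rolled client loop under those constraints might look like the sketch below. backoffDelayMs and connectLoop are hypothetical names, and the 1-second base is an assumption; only the 10-second floor comes from the guidance above:

```typescript
// Exponential backoff with a hard 10-second floor between reconnects.
function backoffDelayMs(attempt: number, floorMs = 10_000, baseMs = 1_000): number {
  return Math.max(floorMs, baseMs * 2 ** attempt);
}

// Re-open the WS to the same path on every drop; the DO is the source of
// truth and hydrates the Gemini session from SQLite on its side.
async function connectLoop(url: string, onMessage: (data: unknown) => void): Promise<never> {
  let attempt = 0;
  for (;;) {
    await new Promise<void>((resolve) => {
      const ws = new WebSocket(url);
      ws.onopen = () => { attempt = 0; };      // healthy connection resets backoff
      ws.onmessage = (ev) => onMessage(ev.data);
      ws.onclose = () => resolve();            // dropped: fall through to backoff
      ws.onerror = () => ws.close();
    });
    await new Promise((r) => setTimeout(r, backoffDelayMs(attempt++)));
  }
}
```

Again, only do this if you are not using useVoiceAgent; stacking a loop like this on top of partysocket produces the overlapping-session problem described above.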
Tunables (pass to withRealtimeVoice or AriaFlowRealtimeVoiceAgent.voiceOptions):
{
reconnectOnGoAway: true, // auto-reconnect when timeLeft < threshold
reconnectThresholdMs: 2000,
reconnectMaxAttempts: 3,
reconnectBaseDelayMs: 500, // full-jitter: delay ∈ [0, min(cap, base * 2^attempt)]
reconnectCapDelayMs: 8000,
reconnectAudioBufferBytes: 48_000, // ring buffer for user PCM during reconnect
}
License
MIT
