agent-socket
v0.1.1
Protocol primitives for agent UI/runtime state, ack, replay, and recovery.
agent-socket is a thin, transport-agnostic wire protocol plus a small set of helpers for keeping an agent UI in sync with a long-running runtime. The package ships:
- a typed JSON envelope (`AgentSocketEnvelope`) and a `makeEnvelope`/`parseEnvelope` pair
- a server-side `AgentSocketServerSession` and a client-side `AgentSocketClient` (the transport is your own; see `AgentSocketTransport`)
- an in-memory FIFO event store (`MemoryEventStore`) that powers replay
- a stale-UI detector (`detectStaleUi`) that catches "client thinks it's sending, server is idle" races
- a reference `InMemoryTransport` for tests and embedding
The protocol is versioned (`AGENT_SOCKET_VERSION = 1`) and every event carries a monotonic `seq` per client, so a reconnecting UI can send `replay_request(since=N)` and rebuild its view from the events the server has buffered after seq N.
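Because every envelope carries a per-client `seq`, the UI can tell from the numbers alone when it has missed something. A minimal sketch, assuming seqs are contiguous integers starting at 1 (plausible given `MemoryEventStore.nextSeq`, but not a documented guarantee); `classifySeq` is a hypothetical helper, not part of the package:

```typescript
// Hypothetical helper (not part of agent-socket): classify an incoming seq against
// the highest seq this UI has already applied.
// Assumption: per-client seqs are contiguous integers (1, 2, 3, ...).
type SeqVerdict = 'duplicate' | 'next' | 'gap'

function classifySeq(lastSeenSeq: number, incoming: number): SeqVerdict {
  if (incoming <= lastSeenSeq) return 'duplicate' // already applied: ignore it
  if (incoming === lastSeenSeq + 1) return 'next' // apply it and advance the cursor
  return 'gap' // something was missed: send replay_request(since=lastSeenSeq)
}

// The UI last applied seq 41 and now receives seq 44.
const verdict = classifySeq(41, 44)
```

With a `'gap'` verdict the UI would ask for a replay starting after its own cursor rather than after the incoming seq, so nothing in between is lost.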
Problem it solves
An agent runtime does many things in parallel: turns stream text deltas, tool calls are dispatched, approvals come and go, and the runtime can crash, restart, or be reloaded mid-turn. A naive "poll /chat/history" approach can:
- lose the tail of a turn when the page reloads
- show "thinking" forever after a runtime reload even though the server is idle
- replay old events after a reconnect without knowing where it left off
- re-send a `turn_start` after a network blip and accidentally fork the turn
agent-socket addresses each of these with four mechanisms:
- Sequenced, typed envelopes: every event has a `seq` and a `type` from a closed enum.
- Replay: `replay_request(since=N)` returns everything the client missed.
- Runtime generation: `runtime_generation_changed` tells the UI the server restarted; the UI should then request a replay (`client.requestReplay()`).
- Stale-UI detection: `detectStaleUi(local, state)` compares what the UI believes with what the server knows and returns an `action` of `'clear_sending' | 'show_lost_tail' | 'request_replay'`.
Install
```sh
bun add agent-socket   # or: npm install agent-socket
```

The package has zero runtime dependencies. It is published as ESM (`"type": "module"`) and ships TypeScript declarations.
Subpath exports:
| import | what you get |
| --- | --- |
| agent-socket | core: AgentSocketEnvelope, makeEnvelope, parseEnvelope, MemoryEventStore, InMemoryTransport, detectStaleUi |
| agent-socket/client | AgentSocketClient (browser/UI side) |
| agent-socket/server | createAgentSocketServerSession (runtime side) |
| agent-socket/adapters/browser | reference browser adapter (see src/adapters/browser.ts) |
Quick start
Browser (UI side)
```ts
import {
  AgentSocketClient,
  type AgentSocketTransport,
  type AgentSocketEnvelope,
} from 'agent-socket/client'
import { agentSocketBrowserTransport } from 'agent-socket/adapters/browser'

// 1. bring your own WebSocket (or anything that implements AgentSocketTransport)
const ws = new WebSocket(agentSocketBrowserTransport('/agent-socket'))
const client = new AgentSocketClient({
  clientId: 'web-a',
  transport: ws,
  lastSeenSeq: Number(localStorage.getItem('lastSeq') ?? 0),
  runtimeGeneration: localStorage.getItem('runtimeGen'),
})

// 2. subscribe to event types you care about
client.on('turn_text_delta', env => appendToActiveTurn(env.payload))
client.on('tool_result', env => renderToolResult(env.payload))
client.on('state_snapshot', env => applyRuntimeState(env.payload))
client.on('runtime_generation_changed', () => client.requestReplay())

// 3. connect (server replies with `welcome` + `state_snapshot`)
client.connect()
```

Runtime (server side)
```ts
import { createAgentSocketServerSession } from 'agent-socket/server'
import { makeEnvelope, type AgentSocketTransport, type AgentRuntimeState } from 'agent-socket'

// Placeholders for your own runtime code:
declare const yourTransport: AgentSocketTransport
declare function currentRuntimeStateFor(clientId: string): AgentRuntimeState
declare const payload: unknown, input: unknown, turnId: string, operationId: string

const session = createAgentSocketServerSession(yourTransport, {
  runtimeGeneration: process.env.RUNTIME_GEN ?? 'dev-1',
  handlers: {
    getState: async clientId => currentRuntimeStateFor(clientId),
    onStartTurn: async ({ clientId, turnId, payload, emit }) => {
      // `emit` takes a full AgentSocketEnvelope, so build one with makeEnvelope
      emit(makeEnvelope('turn_queued', payload, { clientId, turnId }))
      // ... run the turn, emit `turn_phase`, `turn_text_delta`, `turn_done` ...
    },
    onApprovalResolve: async ({ clientId, operationId, payload }) => {
      // resolve a pending approval
    },
  },
})

// Whenever your runtime changes state (a new turn starts, a tool finishes, etc.):
session.emit('turn_start', payload, { turnId })
session.emit('tool_start', { name: 'castle.create_elf', input }, { operationId })
// ...
```

Detecting "stuck thinking" on reload
```ts
import {
  detectStaleUi,
  shouldClearThinking,
  type LocalUiState,
  type AgentRuntimeState,
} from 'agent-socket'

const local: LocalUiState = {
  sending: true,
  knownTurnId: 't-1',
  runtimeGeneration: 'gen-1', // generation cached before the reload
}
const server: AgentRuntimeState = { runtimeGeneration: 'gen-2', activeTurn: null, tool: { status: 'idle' } }

const stale = detectStaleUi(local, server)
// {
//   stale: true,
//   reason: 'runtime_generation_changed',
//   action: 'request_replay',
// }
```

If `shouldClearThinking(local, server)` returns `true`, you should also drop any "thinking…" indicator from the UI.
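One way to act on the result is to switch on `action`. This is a sketch rather than package code: it copies the documented `StaleUiDetection` type so it stands alone, and `UiEffects` with its three callbacks is a hypothetical interface to your own UI (in practice `requestReplay` would call `client.requestReplay()`).

```typescript
// Local copy of the documented StaleUiDetection shape, so the sketch is self-contained.
type StaleUiDetection = {
  stale: boolean
  reason?:
    | 'client_thinks_sending_but_server_idle'
    | 'client_turn_missing_on_server'
    | 'runtime_generation_changed'
  action?: 'clear_sending' | 'show_lost_tail' | 'request_replay'
}

// Hypothetical UI hooks (not part of agent-socket); wire these to your own state.
type UiEffects = {
  clearSending(): void
  showLostTail(): void
  requestReplay(): void
}

function applyStaleUi(detection: StaleUiDetection, ui: UiEffects): void {
  if (!detection.stale) return // UI and runtime agree: nothing to do
  switch (detection.action) {
    case 'clear_sending':
      ui.clearSending() // server is idle: drop the "sending"/"thinking" state
      break
    case 'show_lost_tail':
      ui.showLostTail() // the turn the UI knew about is gone: surface it as lost
      break
    case 'request_replay':
      ui.requestReplay() // runtime restarted: rebuild the view from a replay
      break
  }
}
```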
Replaying missed events
```ts
// On reconnect, ask the server for everything after seq 42
client.requestReplay(42)
// The server replies with `replay_events` containing the buffered events.
```

The default `MemoryEventStore` keeps a FIFO of the last 500 events per client; you can swap in a persistent store by implementing `AgentSocketEventStore` (see src/event-store.ts).
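When `replay_events` arrives, the UI still has to merge it with what it already applied. The sketch below assumes the reply's payload is an array of envelopes (the payload shape is not specified here) and that any event with `seq <= lastSeenSeq` is already on screen; `mergeReplay` is a hypothetical helper.

```typescript
// Minimal envelope shape for this sketch (a subset of AgentSocketEnvelope).
type Envelope = { type: string; seq?: number; payload?: unknown }

// Hypothetical helper (not part of agent-socket): merge the events from a
// replay_events reply (assumed to be an array of envelopes) into a UI whose
// cursor is lastSeenSeq.
function mergeReplay(
  lastSeenSeq: number,
  events: Envelope[],
): { toApply: Envelope[]; lastSeenSeq: number } {
  const seen = new Set<number>()
  const toApply = events
    .filter(e => typeof e.seq === 'number' && e.seq > lastSeenSeq) // skip already-applied events
    .sort((a, b) => (a.seq ?? 0) - (b.seq ?? 0)) // apply in seq order
    .filter(e => {
      const seq = e.seq ?? 0
      if (seen.has(seq)) return false // drop duplicates of the same seq
      seen.add(seq)
      return true
    })
  // New cursor: the highest seq applied, or the old cursor if nothing was new.
  const next = toApply.reduce((max, e) => Math.max(max, e.seq ?? 0), lastSeenSeq)
  return { toApply, lastSeenSeq: next }
}
```

Dropping duplicates by `seq` keeps the merge idempotent, so it stays safe when live events and replayed events overlap around a reconnect.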
Architecture
```
┌────────┐                                   ┌─────────────┐
│   UI   │ ─── AgentSocketTransport ───────▶ │   Runtime   │
│        │ ◀── envelopes (typed JSON) ────── │             │
└────────┘                                   └─────────────┘
    │                                               │
    │ seq/ack + replay                              │ EventStore (in-memory or
    │                                               │ your own impl)
    ▼                                               ▼
LocalUiState                                AgentRuntimeState
(sending, lastSeenSeq,                      (activeTurn, tool,
 knownTurnId)                                runtimeGeneration)
    │                                               │
    └───────── detectStaleUi(local, state) ─────────┘
```

`AgentSocketTransport` is a small interface: `send`, `close`, `onMessage`, `onClose`, and an optional `onError`. Bring your own: `InMemoryTransport` is included for tests and embedding, and `adapters/browser.ts` ships a thin WebSocket adapter. The runtime is expected to provide its own server-side transport (Bun's WebSocket server, `ws`, uWebSockets.js, etc.).
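Bringing your own transport mostly means mapping a socket's event listeners onto the callback-registration methods of `AgentSocketTransport`. A minimal sketch, assuming a structural `WebSocketLike` type (a subset of the standard WebSocket API, invented here for illustration); `fromWebSocketLike` is hypothetical and is not the package's `adapters/browser.ts`:

```typescript
// Local copy of the documented transport type, so the sketch stands alone.
type AgentSocketTransport = {
  send(data: string): void
  close(code?: number, reason?: string): void
  onMessage(cb: (data: string | ArrayBuffer | Uint8Array) => void): void
  onClose(cb: (event?: unknown) => void): void
  onError?(cb: (error: unknown) => void): void
}

// Hypothetical structural subset of a WebSocket that this adapter relies on.
type WebSocketLike = {
  send(data: string): void
  close(code?: number, reason?: string): void
  addEventListener(
    type: 'message' | 'close' | 'error',
    listener: (ev: { data?: unknown }) => void,
  ): void
}

// Hypothetical adapter: wraps a WebSocketLike as an AgentSocketTransport.
function fromWebSocketLike(ws: WebSocketLike): AgentSocketTransport {
  return {
    send: data => ws.send(data),
    close: (code, reason) => ws.close(code, reason),
    onMessage: cb =>
      ws.addEventListener('message', ev => {
        const data = ev.data
        // Forward only frame types the transport contract allows (Blob frames are dropped).
        if (typeof data === 'string' || data instanceof ArrayBuffer || data instanceof Uint8Array) cb(data)
      }),
    onClose: cb => ws.addEventListener('close', ev => cb(ev)),
    onError: cb => ws.addEventListener('error', ev => cb(ev)),
  }
}
```

Browsers deliver binary frames as `Blob` by default, which this sketch ignores; setting `ws.binaryType = 'arraybuffer'` on a real `WebSocket` makes them arrive as `ArrayBuffer` instead.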
API reference
Types
```ts
// top-level constants
export const AGENT_SOCKET_PROTOCOL = 'agent-socket' as const
export const AGENT_SOCKET_VERSION = 1 as const

// closed enum of event types
export type AgentSocketEventType =
  | 'hello' | 'welcome' | 'heartbeat' | 'ack'
  | 'state_request' | 'state_snapshot'
  | 'replay_request' | 'replay_events'
  | 'turn_start' | 'turn_queued' | 'turn_running' | 'turn_phase'
  | 'turn_text_delta' | 'turn_done' | 'turn_error' | 'turn_aborted' | 'turn_lost'
  | 'tool_start' | 'tool_waiting_approval' | 'tool_result'
  | 'tool_failed' | 'tool_lost' | 'tool_retry_available'
  | 'approval_request' | 'approval_resolve' | 'approval_result'
  | 'attachment_uploading' | 'attachment_ready' | 'attachment_error'
  | 'runtime_generation_changed' | 'runtime_recovered'

export type AgentSocketEnvelope<T = unknown> = {
  protocol: 'agent-socket'
  version: 1
  type: AgentSocketEventType
  clientId?: string
  socketId?: string
  runtimeGeneration?: string
  turnId?: string
  operationId?: string
  seq?: number
  ack?: number
  ts: number
  payload?: T
}

export type AgentTurnState = {
  id: string
  status: 'queued' | 'running' | 'done' | 'error' | 'aborted' | 'lost'
  phase?: string
  startedAt?: number
  updatedAt?: number
  error?: string | null
}

export type AgentToolState = {
  id?: string
  name?: string
  status: 'idle' | 'waiting_approval' | 'running' | 'done' | 'failed' | 'lost'
  path?: string | null
  command?: string | null
  startedAt?: number | null
  updatedAt?: number | null
  resultSeen?: boolean
  retryable?: boolean
  error?: string | null
  outputPreview?: string | null
}

export type AgentRuntimeState = {
  runtimeGeneration: string
  activeTurn: AgentTurnState | null
  tool: AgentToolState
  brainQueue?: unknown[]
  lastLostTool?: AgentToolState | null
}

export type LocalUiState = {
  sending: boolean
  knownTurnId?: string | null
  lastSeenSeq?: number
  runtimeGeneration?: string | null
}

export type StaleUiDetection = {
  stale: boolean
  reason?:
    | 'client_thinks_sending_but_server_idle'
    | 'client_turn_missing_on_server'
    | 'runtime_generation_changed'
  action?: 'clear_sending' | 'show_lost_tail' | 'request_replay'
}
```

Envelope
```ts
export function makeEnvelope<T>(
  type: AgentSocketEventType,
  payload?: T,
  meta?: { ts?: number; clientId?: string; turnId?: string; operationId?: string; seq?: number; ack?: number; runtimeGeneration?: string },
): AgentSocketEnvelope<T>
export function isAgentSocketEnvelope(value: unknown): value is AgentSocketEnvelope
export function parseEnvelope(raw: string | ArrayBuffer | Uint8Array): AgentSocketEnvelope
export function serializeEnvelope(env: AgentSocketEnvelope): string
```

`parseEnvelope` throws if any of the schema-checked fields (`protocol`, `version`, `type`, `ts`) fails validation.
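To make the schema check concrete, here is a self-contained sketch of an envelope builder and validator that follows the documented types. It is not the package's source: `makeEnvelopeSketch` and `parseEnvelopeSketch` are hypothetical names, decoding binary frames as UTF-8 JSON is an assumption, and the error messages are invented.

```typescript
// Documented event types, copied so the sketch stands alone.
const EVENT_TYPES = [
  'hello', 'welcome', 'heartbeat', 'ack',
  'state_request', 'state_snapshot', 'replay_request', 'replay_events',
  'turn_start', 'turn_queued', 'turn_running', 'turn_phase',
  'turn_text_delta', 'turn_done', 'turn_error', 'turn_aborted', 'turn_lost',
  'tool_start', 'tool_waiting_approval', 'tool_result',
  'tool_failed', 'tool_lost', 'tool_retry_available',
  'approval_request', 'approval_resolve', 'approval_result',
  'attachment_uploading', 'attachment_ready', 'attachment_error',
  'runtime_generation_changed', 'runtime_recovered',
] as const
type EventType = (typeof EVENT_TYPES)[number]

type Envelope<T = unknown> = {
  protocol: 'agent-socket'
  version: 1
  type: EventType
  ts: number
  clientId?: string
  socketId?: string
  runtimeGeneration?: string
  turnId?: string
  operationId?: string
  seq?: number
  ack?: number
  payload?: T
}
type Meta = Partial<Omit<Envelope, 'protocol' | 'version' | 'type' | 'payload'>>

// Hypothetical builder: fills in protocol/version and defaults ts to "now".
function makeEnvelopeSketch<T>(type: EventType, payload?: T, meta: Meta = {}): Envelope<T> {
  return { ...meta, protocol: 'agent-socket', version: 1, type, ts: meta.ts ?? Date.now(), payload }
}

// Hypothetical validator: checks protocol, version, type and ts, throwing on failure.
function parseEnvelopeSketch(raw: string | ArrayBuffer | Uint8Array): Envelope {
  const text = typeof raw === 'string' ? raw : new TextDecoder().decode(raw) // assumption: UTF-8 JSON
  const value: unknown = JSON.parse(text) // throws SyntaxError on malformed JSON
  if (typeof value !== 'object' || value === null) throw new Error('envelope must be an object')
  const e = value as Record<string, unknown>
  if (e.protocol !== 'agent-socket') throw new Error('bad protocol')
  if (e.version !== 1) throw new Error('bad version')
  if (!(EVENT_TYPES as readonly unknown[]).includes(e.type)) throw new Error('unknown type')
  if (typeof e.ts !== 'number') throw new Error('bad ts')
  return e as unknown as Envelope
}
```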
Transport
```ts
export type AgentSocketTransport = {
  send(data: string): void
  close(code?: number, reason?: string): void
  onMessage(cb: (data: string | ArrayBuffer | Uint8Array) => void): void
  onClose(cb: (event?: unknown) => void): void
  onError?(cb: (error: unknown) => void): void
}

export class InMemoryTransport implements AgentSocketTransport {
  static pair(): [InMemoryTransport, InMemoryTransport]
}
```

`InMemoryTransport.pair()` returns two transports that route messages to each other via `queueMicrotask`, which is handy for unit tests of the protocol and the reconcile logic.
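The pairing behaviour is small enough to sketch. `LoopbackTransport` below is a hypothetical stand-in for `InMemoryTransport`, written against the documented `AgentSocketTransport` type; the rule that closing one end also closes its peer is an assumption, not documented behaviour.

```typescript
type Data = string | ArrayBuffer | Uint8Array

// Local copy of the documented transport type.
type AgentSocketTransport = {
  send(data: string): void
  close(code?: number, reason?: string): void
  onMessage(cb: (data: Data) => void): void
  onClose(cb: (event?: unknown) => void): void
  onError?(cb: (error: unknown) => void): void
}

// Hypothetical stand-in for InMemoryTransport: two linked endpoints whose
// send() hands the message to the peer's listeners on a microtask.
class LoopbackTransport implements AgentSocketTransport {
  private peer: LoopbackTransport | null = null
  private closed = false
  private readonly messageCbs: Array<(data: Data) => void> = []
  private readonly closeCbs: Array<(event?: unknown) => void> = []

  static pair(): [LoopbackTransport, LoopbackTransport] {
    const a = new LoopbackTransport()
    const b = new LoopbackTransport()
    a.peer = b
    b.peer = a
    return [a, b]
  }

  send(data: string): void {
    const peer = this.peer
    if (this.closed || peer === null) return // sends after close are dropped
    // Asynchronous like a real socket, but still within the current tick.
    queueMicrotask(() => peer.messageCbs.forEach(cb => cb(data)))
  }

  close(code?: number, reason?: string): void {
    if (this.closed) return
    this.closed = true
    this.closeCbs.forEach(cb => cb({ code, reason }))
    this.peer?.close(code, reason) // assumption: closing one end closes its peer
  }

  onMessage(cb: (data: Data) => void): void {
    this.messageCbs.push(cb)
  }

  onClose(cb: (event?: unknown) => void): void {
    this.closeCbs.push(cb)
  }
}
```

Because delivery goes through `queueMicrotask`, a test sees the message only after the current synchronous code finishes, which is close enough to a real socket to surface ordering bugs without timers.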
Event store
```ts
export interface AgentSocketEventStore {
  append(event: AgentSocketEnvelope): Promise<void> | void
  getSince(clientId: string, seq: number): Promise<AgentSocketEnvelope[]> | AgentSocketEnvelope[]
  latestSeq(clientId: string): Promise<number> | number
  trim?(clientId: string): Promise<void> | void
}

export class MemoryEventStore implements AgentSocketEventStore {
  constructor(options?: { maxEvents?: number }) // default 500
  nextSeq(clientId: string): number
  append(event: AgentSocketEnvelope): void
  getSince(clientId: string, seq: number): AgentSocketEnvelope[]
  latestSeq(clientId: string): number
  clear(clientId?: string): void
}
```

The store is a FIFO with a configurable cap (`maxEvents`, default 500). If you want a persistent store (SQLite, Redis, etc.), implement `AgentSocketEventStore` and pass it via the `store` option to `createAgentSocketServerSession`.
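A rough model of the store contract, useful as a starting point when writing your own `AgentSocketEventStore`. `CappedEventStore` is hypothetical; the per-client cap and 500-event default follow this README, while keying client-less events under an empty string is an assumption.

```typescript
type StoredEvent = { clientId?: string; seq?: number; type: string }

// Hypothetical model of the documented contract (not the package's MemoryEventStore):
// per-client seq counters and a FIFO holding at most `maxEvents` events per client.
class CappedEventStore {
  private readonly maxEvents: number
  private readonly events = new Map<string, StoredEvent[]>()
  private readonly counters = new Map<string, number>()

  constructor(options: { maxEvents?: number } = {}) {
    this.maxEvents = options.maxEvents ?? 500
  }

  // Hand out the next seq for a client: 1, 2, 3, ...
  nextSeq(clientId: string): number {
    const next = (this.counters.get(clientId) ?? 0) + 1
    this.counters.set(clientId, next)
    return next
  }

  append(event: StoredEvent): void {
    const key = event.clientId ?? '' // assumption: client-less events share one bucket
    const list = this.events.get(key) ?? []
    list.push(event)
    while (list.length > this.maxEvents) list.shift() // FIFO: evict the oldest first
    this.events.set(key, list)
  }

  // Events strictly after `seq`, so getSince(id, latestSeq(id)) is empty.
  getSince(clientId: string, seq: number): StoredEvent[] {
    return (this.events.get(clientId) ?? []).filter(e => (e.seq ?? 0) > seq)
  }

  latestSeq(clientId: string): number {
    return (this.events.get(clientId) ?? []).reduce((max, e) => Math.max(max, e.seq ?? 0), 0)
  }
}
```

One consequence of the cap: a client whose cursor is older than the window gets a truncated replay (with `maxEvents: 3` and seqs 1 to 5 appended, `getSince('web-a', 1)` silently skips seq 2), so long disconnects may call for a persistent store or a fresh `state_request`.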
Reconcile
```ts
export function detectStaleUi(local: LocalUiState, state: AgentRuntimeState): StaleUiDetection
export function shouldClearThinking(local: LocalUiState, state: AgentRuntimeState): boolean
```

`detectStaleUi` returns:

| condition | reason | action |
| --- | --- | --- |
| `local.runtimeGeneration !== state.runtimeGeneration` | `runtime_generation_changed` | `request_replay` |
| `local.sending && !state.activeTurn` | `client_thinks_sending_but_server_idle` | `clear_sending` |
| `local.sending && local.knownTurnId !== state.activeTurn.id` | `client_turn_missing_on_server` | `show_lost_tail` |
| otherwise | none | none (`stale: false`) |
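The table can be read as a short chain of checks. The sketch below is a hypothetical re-implementation, not the package's `detectStaleUi`: it assumes rows are evaluated top to bottom (consistent with the reload example, where a generation change wins over an idle server) and that a missing local generation does not count as a change.

```typescript
// Local copies of the documented types (the runtime side trimmed to what the checks read).
type LocalUiState = {
  sending: boolean
  knownTurnId?: string | null
  lastSeenSeq?: number
  runtimeGeneration?: string | null
}
type RuntimeSnapshot = { runtimeGeneration: string; activeTurn: { id: string } | null }
type StaleUiDetection = {
  stale: boolean
  reason?: 'client_thinks_sending_but_server_idle' | 'client_turn_missing_on_server' | 'runtime_generation_changed'
  action?: 'clear_sending' | 'show_lost_tail' | 'request_replay'
}

// Hypothetical re-implementation of the reconcile table, checked top to bottom.
function detectStaleUiSketch(local: LocalUiState, state: RuntimeSnapshot): StaleUiDetection {
  // Row 1: the runtime restarted since the UI cached its generation.
  // Assumption: an unknown (null/undefined) local generation is not treated as a change.
  if (local.runtimeGeneration != null && local.runtimeGeneration !== state.runtimeGeneration) {
    return { stale: true, reason: 'runtime_generation_changed', action: 'request_replay' }
  }
  // Row 2: the UI shows "sending", but the runtime has no active turn.
  if (local.sending && !state.activeTurn) {
    return { stale: true, reason: 'client_thinks_sending_but_server_idle', action: 'clear_sending' }
  }
  // Row 3: the runtime is busy with a different turn than the one the UI knows.
  if (local.sending && state.activeTurn && local.knownTurnId !== state.activeTurn.id) {
    return { stale: true, reason: 'client_turn_missing_on_server', action: 'show_lost_tail' }
  }
  return { stale: false }
}
```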
Client
```ts
export class AgentSocketClient {
  constructor(options: {
    clientId: string
    transport: AgentSocketTransport
    lastSeenSeq?: number
    runtimeGeneration?: string | null
  })

  get seq(): number
  get generation(): string | null

  on<T>(type: AgentSocketEnvelope<T>['type'], handler: (event: AgentSocketEnvelope<T>) => void): void
  // handler bound to '*' is invoked for every event

  connect(): void                      // sends `hello`
  requestState(): void                 // sends `state_request`
  requestReplay(since?: number): void  // sends `replay_request`
  startTurn(payload: unknown, turnId: string): void
  abortTurn(turnId: string): void
  ack(seq?: number): void
  send<T>(type, payload?, meta?): void
}
```

`handleRaw` (internal) is called by the transport; it advances `lastSeenSeq` and `runtimeGeneration` from incoming envelopes, dispatches typed handlers, and acks the last seq it has seen.
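A hypothetical `MiniClient` that mirrors the `handleRaw` behaviour described above: it advances the cursor and generation, dispatches typed and `'*'` handlers, then acks. It is not the package's client; the handler order (typed before `'*'`) is an assumption, and the flat ack shape (`{ type: 'ack', ack }`) is a simplification of what would be a full envelope.

```typescript
type Envelope = { type: string; seq?: number; ack?: number; runtimeGeneration?: string; payload?: unknown }
type Handler = (env: Envelope) => void

// Hypothetical mini-client (not AgentSocketClient): advances the cursor and
// generation from incoming envelopes, dispatches handlers, then acks.
class MiniClient {
  seq: number
  generation: string | null
  private readonly sendRaw: (data: string) => void
  private readonly handlers = new Map<string, Handler[]>()

  constructor(sendRaw: (data: string) => void, lastSeenSeq = 0, runtimeGeneration: string | null = null) {
    this.sendRaw = sendRaw
    this.seq = lastSeenSeq
    this.generation = runtimeGeneration
  }

  on(type: string, handler: Handler): void {
    const list = this.handlers.get(type) ?? []
    list.push(handler)
    this.handlers.set(type, list)
  }

  handleRaw(raw: string): void {
    const env = JSON.parse(raw) as Envelope
    // The cursor only moves forward, so replayed or duplicate events cannot rewind it.
    if (typeof env.seq === 'number' && env.seq > this.seq) this.seq = env.seq
    if (env.runtimeGeneration) this.generation = env.runtimeGeneration
    for (const h of this.handlers.get(env.type) ?? []) h(env) // typed handlers first (assumption)
    for (const h of this.handlers.get('*') ?? []) h(env) // then catch-all handlers
    // Simplified ack of the highest seq seen; a real client would send a full envelope.
    if (typeof env.seq === 'number') this.sendRaw(JSON.stringify({ type: 'ack', ack: this.seq }))
  }
}
```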
Server
```ts
export type AgentSocketServerHandlers = {
  getState: (clientId: string) => AgentRuntimeState | Promise<AgentRuntimeState>
  onStartTurn?: (args: {
    clientId: string
    turnId?: string
    payload: unknown
    emit: (event: AgentSocketEnvelope) => void
  }) => void | Promise<void>
  onAbortTurn?: (args: { clientId: string; turnId?: string; payload: unknown }) => void | Promise<void>
  onApprovalResolve?: (args: { clientId: string; operationId?: string; payload: unknown }) => void | Promise<void>
}

export type AgentSocketServerOptions = {
  runtimeGeneration: string
  handlers: AgentSocketServerHandlers
  store?: AgentSocketEventStore
}

export class AgentSocketServerSession {
  emit<T>(type, payload?, meta?): AgentSocketEnvelope<T>
  // ... everything else is internal
}

export function createAgentSocketServerSession(
  transport: AgentSocketTransport,
  options: AgentSocketServerOptions,
): AgentSocketServerSession
```

The server appends every emitted envelope to the store before sending it. As a result, any event a client has received is already in the replay window, so a client that sends `replay_request(since=N)` with N equal to the store's `latestSeq` correctly gets nothing back.
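The append-then-send ordering can be shown with a toy session. `MiniServerSession` is hypothetical and keeps its log in a plain array; it only illustrates why a replay from `latestSeq` comes back empty and why the event being sent is already replayable at the moment it goes out.

```typescript
type Envelope = { type: string; clientId: string; seq: number; ts: number; payload?: unknown }

// Hypothetical toy session (not AgentSocketServerSession): every emit assigns
// a seq, appends to the log, and only then sends.
class MiniServerSession {
  private readonly clientId: string
  private readonly sendRaw: (data: string) => void
  private readonly log: Envelope[] = []
  private lastSeq = 0

  constructor(clientId: string, sendRaw: (data: string) => void) {
    this.clientId = clientId
    this.sendRaw = sendRaw
  }

  emit(type: string, payload?: unknown): Envelope {
    this.lastSeq += 1
    const env: Envelope = { type, clientId: this.clientId, seq: this.lastSeq, ts: Date.now(), payload }
    this.log.push(env) // 1. store first...
    this.sendRaw(JSON.stringify(env)) // 2. ...then send
    return env
  }

  latestSeq(): number {
    return this.lastSeq
  }

  // Answer replay_request(since): everything strictly after `since`.
  replay(since: number): Envelope[] {
    return this.log.filter(e => e.seq > since)
  }
}
```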
Plans
- Persistent event store adapter: implement `AgentSocketEventStore` for your storage of choice (SQLite, Postgres, Redis, …). Drop-in replacement for the in-memory default; the protocol is storage-agnostic.
- Server-side reference adapter: a thin Bun `WebSocket` adapter, parallel to `adapters/browser.ts`, for quick local dev.
- Compression for replay: `replay_request(since=N, until=M, max=K)` plus a `getSinceRange` helper on the store.
- Server-side rate limit / backpressure: cap on `turn_text_delta` frequency and `seq` gaps.
- Heartbeat semantics: currently a no-op placeholder (`'heartbeat'` event type), to be specified.
- v2 protocol: when we add new event types, plan a `version: 2` with a clean migration path.
See the source (src/) and tests (tests/protocol.test.ts) for the current contract.
License
MIT — see LICENSE.
