@durable-effect/sockets
v0.0.1
Effect-native WebSocket rooms on top of Cloudflare Durable Objects.
Declare a room with schemas, write handlers, get type-safe broadcast,
per-connection state, and hibernation-ready Cloudflare deployment. Sibling
of @durable-effect/task: tasks are for fire-and-forget
events; sockets are for live multi-party stateful connections.
  ┌─ WS client ─┐               ┌─ worker code ─┐
  │ ws.send(…)  │               │ rooms.send(…) │
  └──────┬──────┘               └───────┬───────┘
         │                              │
         ▼                              ▼
┌────────────────────────────────────────────────┐
│  Durable Object instance (one Room)            │
│                                                │
│  onConnect │ onEvent → state → onBroadcast     │
│  onClose   │                                   │
│  onAlarm   │                                   │
│                                                │
│  state (room) ── connections[] ── attachments  │
└───┬────────────────────────────────────────┬───┘
    │                                        │
    └─ ctx.broadcast / conn.send ────────────┘
                        ▼
               connected WS clients
Install
pnpm add @durable-effect/sockets @durable-effect/task effect@^4.0.0-beta.50
@durable-effect/task is a peer dependency: the two packages share
Storage, Alarm, and CloudflareEnv services so they compose in a single
worker.
Exports:
- @durable-effect/sockets: declarations, registry, in-memory runtime
- @durable-effect/sockets/cloudflare: the Durable Object adapter
- @durable-effect/sockets/testing: re-exports of test utilities
Core concepts
| Concept | Description |
|---|---|
| Room | A named tuple of schemas (state, connection, event, broadcast) + handlers. One room type can have many instances, each keyed by an id. |
| Instance | One (roomType, id) pair. In Cloudflare, one instance = one DO. Holds room state, live connections, alarms. |
| Connection | A live WebSocket attached to an instance. Has its own id, tags, and schema-typed per-connection state (the attachment). |
| Registry | The set of rooms you've declared + the handler implementations. Produces a runtime. |
| Runtime | What you call to upgrade / send / broadcast / get state. Cloudflare (makeSocketGroupDO) or in-memory (makeInMemorySocketRuntime). |
The four schemas per room:
- state: persistent shared state for the instance (like Task's state)
- connection: per-connection attachment, persisted in the WS attachment
- event: inbound messages and direct-API events (one schema, two paths)
- broadcast: outbound messages the server pushes to clients
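The "one schema, two paths" idea for event can be sketched without the library. The following is a minimal, dependency-free illustration: the ChatEvent type, decodeEvent, fromWsFrame, fromDirectApi, and render are all hypothetical names, and the hand-rolled decoder only stands in for the room's event schema. It is not the package's implementation.

```typescript
// Hypothetical event union mirroring the Chat room used in the Quick start.
type ChatEvent =
  | { readonly _tag: "Say"; readonly text: string }
  | { readonly _tag: "Typing" }

// Hand-rolled decoder standing in for the room's `event` schema (assumption).
function decodeEvent(raw: unknown): ChatEvent | null {
  if (typeof raw !== "object" || raw === null) return null
  const r = raw as Record<string, unknown>
  if (r._tag === "Say" && typeof r.text === "string") {
    return { _tag: "Say", text: r.text }
  }
  if (r._tag === "Typing") return { _tag: "Typing" }
  return null
}

// Path 1: a WebSocket text frame arrives as a JSON string.
function fromWsFrame(frame: string): ChatEvent | null {
  try {
    return decodeEvent(JSON.parse(frame))
  } catch {
    return null // the frame was not valid JSON
  }
}

// Path 2: a direct-API call passes an already-structured value.
function fromDirectApi(value: unknown): ChatEvent | null {
  return decodeEvent(value)
}

// Both paths converge on one handler; only the connection differs
// (null stands for a direct-API call with no socket attached).
function render(event: ChatEvent, connectionId: string | null): string {
  const who = connectionId ?? "system"
  return event._tag === "Say" ? `${who} says ${event.text}` : `${who} is typing`
}
```

Because both entry points run the same decoder, a malformed payload is rejected the same way whether it came from a socket or from worker code.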
Quick start
1. Declare the room
// src/rooms/chat/registry.ts
import { Schema } from "effect"
import { SocketRegistry, WebSocketRoom } from "@durable-effect/sockets"

export const Chat = WebSocketRoom.make("chat", {
  state: Schema.Struct({
    createdAt: Schema.Number,
    messageCount: Schema.Number,
  }),
  connection: Schema.Struct({
    userId: Schema.String,
    joinedAt: Schema.Number,
  }),
  event: Schema.Union([
    Schema.Struct({ _tag: Schema.Literal("Say"), text: Schema.String }),
    Schema.Struct({ _tag: Schema.Literal("Typing") }),
  ]),
  broadcast: Schema.Union([
    Schema.Struct({
      _tag: Schema.Literal("Message"),
      text: Schema.String,
      userId: Schema.String,
      ts: Schema.Number,
    }),
    Schema.Struct({ _tag: Schema.Literal("UserJoined"), userId: Schema.String }),
    Schema.Struct({ _tag: Schema.Literal("UserLeft"), userId: Schema.String }),
    Schema.Struct({ _tag: Schema.Literal("Typing"), userId: Schema.String }),
  ]),
})

export const registry = SocketRegistry.make(Chat)
2. Implement the handlers
// src/rooms/chat/handlers.ts
import { Effect } from "effect"
import { registry } from "./registry.js"

const c = registry.for("chat")

const onConnect = c.onConnect((ctx) =>
  Effect.gen(function* () {
    // Initialize room state on first connection
    if ((yield* ctx.recall()) === null) {
      yield* ctx.save({ createdAt: Date.now(), messageCount: 0 })
    }
    const conn = ctx.connection
    if (!conn) return
    yield* ctx.broadcast(
      { _tag: "UserJoined", userId: conn.state.userId },
      { except: conn },
    )
  }),
)

const onEvent = c.onEvent((ctx, event) =>
  Effect.gen(function* () {
    switch (event._tag) {
      case "Say":
        yield* ctx.update((s) => ({ ...s, messageCount: s.messageCount + 1 }))
        yield* ctx.broadcast({
          _tag: "Message",
          text: event.text,
          userId: ctx.connection?.state.userId ?? "system",
          ts: Date.now(),
        })
        return
      case "Typing":
        if (ctx.connection)
          yield* ctx.broadcast(
            { _tag: "Typing", userId: ctx.connection.state.userId },
            { except: ctx.connection },
          )
        return
    }
  }),
)

const onClose = c.onClose((ctx) =>
  Effect.gen(function* () {
    const uid = ctx.connection?.state.userId
    if (uid)
      yield* ctx.broadcast(
        { _tag: "UserLeft", userId: uid },
        { except: ctx.connection ?? [] },
      )
  }),
)

export const chatHandler = registry.handler("chat", { onConnect, onEvent, onClose })
3. Wire the runtime for Cloudflare
// src/rooms/index.ts
import { makeSocketGroupDO } from "@durable-effect/sockets/cloudflare"
import { registry } from "./chat/registry.js"
import { chatHandler } from "./chat/handlers.js"

const config = registry.build({ chat: chatHandler })
export const socketGroup = makeSocketGroupDO(config)
export const ChatDO = socketGroup.DO // export for wrangler
4. Add the DO binding
// wrangler.jsonc
{
  "durable_objects": {
    "bindings": [{ "name": "CHAT_DO", "class_name": "ChatDO" }]
  },
  "migrations": [
    { "tag": "v1", "new_sqlite_classes": ["ChatDO"] }
  ]
}
5. Serve upgrades and REST calls
Plain fetch entry:
// src/index.ts
export { ChatDO } from "./rooms/index.js"
import { Effect } from "effect"
import { socketGroup } from "./rooms"

export default {
  async fetch(request: Request, env: Env) {
    const url = new URL(request.url)
    const rooms = socketGroup.client(env.CHAT_DO)

    // WS upgrade: /ws/chat/:id?userId=…
    const wsMatch = url.pathname.match(/^\/ws\/chat\/(.+)$/)
    if (wsMatch && request.headers.get("upgrade") === "websocket") {
      const [_, roomId] = wsMatch
      const userId = url.searchParams.get("userId") ?? "anon"
      return Effect.runPromise(
        rooms.room("chat").upgrade(roomId, request, {
          connection: {
            id: userId,
            tags: [`user:${userId}`],
            initialState: { userId, joinedAt: Date.now() },
          },
        }),
      )
    }

    // Server-initiated message: POST /api/chat/:id/say { text }
    const sayMatch = url.pathname.match(/^\/api\/chat\/([^/]+)\/say$/)
    if (sayMatch && request.method === "POST") {
      const [_, roomId] = sayMatch
      const { text } = await request.json<{ text: string }>()
      await Effect.runPromise(
        rooms.room("chat").send(roomId, { _tag: "Say", text }),
      )
      return Response.json({ status: "ok" })
    }

    return new Response("not found", { status: 404 })
  },
} satisfies ExportedHandler<Env>
6. Connect a client
const ws = new WebSocket("wss://your-worker/ws/chat/r1?userId=alice")

ws.addEventListener("message", (ev) => {
  const msg = JSON.parse(ev.data) // typed as ChatBroadcast on your side
  console.log("←", msg)
})

ws.addEventListener("open", () => {
  ws.send(JSON.stringify({ _tag: "Say", text: "hi" }))
})
Handler surface (the RoomCtx)
Inside handlers, ctx gives you:
ctx.id // the instance id (e.g. "r1")
ctx.name // the room type name (e.g. "chat")
ctx.connection // Connection<C, B> | null — null for direct-API calls
// Room state
yield* ctx.recall() // Effect<S | null, RoomError>
yield* ctx.save(s) // Effect<void, RoomError>
yield* ctx.update((s) => …) // Effect<void, RoomError> — no-op if state is null
// Connections
ctx.connections.all() // Connection<C, B>[]
ctx.connections.byTag("admin") // Connection<C, B>[]
ctx.connections.byId("alice") // Connection<C, B> | null
ctx.connections.filter(pred) // Connection<C, B>[]
ctx.connections.count() // number
// Broadcast
yield* ctx.broadcast(msg) // everyone
yield* ctx.broadcast(msg, { except: conn }) // exclude sender
yield* ctx.broadcast(msg, { tag: "admin" }) // filter by tag
yield* ctx.broadcast(msg, { to: [a, b] }) // multicast
// Unicast
yield* conn.send(msg) // to one
yield* conn.setState(newState) // update attachment (schema-validated,
// 2 KB cap enforced, typed error if over)
yield* conn.close(1000, "bye")
// Scheduling (persisted across hibernation)
yield* ctx.scheduleIn("30 seconds")
yield* ctx.scheduleAt(new Date(…))
yield* ctx.cancelSchedule()
yield* ctx.nextAlarm() // number | null
// Sibling dispatch to another registered room
yield* ctx.room("notifications").send(id, event)
yield* ctx.room("notifications").getState(id)
yield* ctx.room("notifications").broadcast(id, msg)
// Lifecycle
yield* ctx.purge() // clears storage + alarm, exits handler
ctx.systemFailure // SystemFailure | null on alarm retries
Client (worker-side)
const rooms = socketGroup.client(env.CHAT_DO)
rooms.room("chat").send(id, event) // Effect<void, RoomError>
rooms.room("chat").getState(id) // Effect<State | null, RoomError>
rooms.room("chat").getConnections(id, tag?) // Effect<ConnectionInfo[], RoomError>
rooms.room("chat").broadcast(id, broadcastMsg) // Effect<void, RoomError>
rooms.room("chat").fireAlarm(id) // Effect<void, RoomError>
rooms.room("chat").upgrade(id, request, opts) // Effect<Response, UpgradeError | …>
upgrade takes an UpgradeOpts:
{
  connection: {
    id?: string // caller-assigned; defaults to random
    tags?: readonly string[]
    meta?: Record<string, unknown> // one-shot, not persisted beyond onConnect
    initialState: C // the connection schema, required
  }
  onDuplicate?: "allow" | "replace" | "reject" // default: allow
}
As an Effect service
If you prefer providing the client via Layer rather than calling .client(...) at the call site:
import { SocketClientService } from "@durable-effect/sockets/cloudflare"
const RoomsLayer = socketGroup.layer(env.CHAT_DO)
// then inside handlers:
const rooms = yield* SocketClientService
yield* rooms.room("chat").send(id, event)
Integrating with effect/unstable/httpapi
Rooms slot into an HttpApi group exactly like any other endpoint group. The only wrinkle is the WebSocket upgrade — it returns a raw Response (status 101 + webSocket field), which doesn't fit a schema-typed success. Two tools handle it:
- HttpApiSchema.Empty(101) as the endpoint's success schema
- handlers.handleRaw(...) to skip payload decoding, plus HttpServerResponse.raw(response, { status: 101 }) so the builder emits the Response as-is without re-encoding
Example (reproduced from examples/effect-worker-api/):
// api.ts
export const ChatGroup = HttpApiGroup.make("chat")
  .add(
    HttpApiEndpoint.get("upgrade", "/ws/:roomId", {
      params: S.Struct({ roomId: S.String }),
      success: HttpApiSchema.Empty(101),
    }),
  )
  .add(
    HttpApiEndpoint.post("say", "/:roomId/say", {
      params: S.Struct({ roomId: S.String }),
      payload: S.Struct({ text: S.String }),
      success: S.Struct({ status: S.Literal("ok") }),
    }),
  )
  .add(
    HttpApiEndpoint.get("state", "/:roomId/state", {
      params: S.Struct({ roomId: S.String }),
      success: S.NullOr(ChatStateResponseSchema),
    }),
  )
  .prefix("/chat")

// handlers/chat.ts
export const ChatGroupLive = HttpApiBuilder.group(WorkerApi, "chat", (h) =>
  h.handleRaw("upgrade", ({ request, params }) =>
    Effect.gen(function* () {
      const env = yield* currentEnv
      const rawRequest = request.source as Request // structural narrow
      const userId = new URL(rawRequest.url).searchParams.get("userId") ?? "anon"
      const rooms = socketGroup.client(env!.CHAT_DO)
      const response = yield* rooms.room("chat").upgrade(params.roomId, rawRequest, {
        connection: {
          id: userId,
          initialState: { userId, joinedAt: Date.now() },
        },
      }).pipe(Effect.orDie)
      return HttpServerResponse.raw(response, { status: 101 })
    }),
  )
  .handle("say", ({ params, payload }) =>
    Effect.gen(function* () {
      const env = yield* currentEnv
      const rooms = socketGroup.client(env!.CHAT_DO)
      yield* rooms.room("chat")
        .send(params.roomId, { _tag: "Say", text: payload.text })
        .pipe(Effect.orDie)
      return { status: "ok" as const }
    }),
  )
  .handle("state", ({ params }) =>
    Effect.gen(function* () {
      const env = yield* currentEnv
      const rooms = socketGroup.client(env!.CHAT_DO)
      return yield* rooms.room("chat").getState(params.roomId).pipe(Effect.orDie)
    }),
  ),
)
See examples/effect-worker-api/src/handlers/chat.ts for the full wiring,
including getConnections and the registration into HttpGroupsLive.
Providing services to handlers
Two flavours — same as @durable-effect/task.
Pure services
import { Context, Effect, Layer } from "effect"
import { withServices } from "@durable-effect/sockets"

class Analytics extends Context.Service<Analytics, {
  readonly track: (event: string, payload: unknown) => Effect.Effect<void>
}>()("@app/Analytics") {}

const AnalyticsLive = Layer.succeed(Analytics, {
  track: (event, payload) => Effect.log(`[analytics] ${event}`, payload),
})

registry.handler("chat",
  withServices(
    { onConnect, onEvent, onClose },
    AnalyticsLive,
  ),
)
withServices wraps every handler in Effect.provide(layer) and reduces the
R channel of each handler to never.
Cloudflare-env-dependent services
When a service's construction depends on env (which isn't available at
module scope), provide it lazily via CloudflareEnv from @durable-effect/task:
import { Effect, Layer } from "effect"
import { CloudflareEnv } from "@durable-effect/sockets/cloudflare"

// ConfigService and makeConfig are your own application code
const ConfigLive = Layer.effect(ConfigService, Effect.gen(function* () {
  const env = yield* CloudflareEnv
  return makeConfig({ apiKey: env.OPENAI_KEY })
}))
The DO runtime provides CloudflareEnv automatically from the bindings it
receives in its constructor.
Testing
Use the in-memory runtime. Same handler code, no Cloudflare harness needed:
import { Effect } from "effect"
import { makeInMemorySocketRuntime } from "@durable-effect/sockets"

const rt = makeInMemorySocketRuntime(config)

// Drive a connection
const alice = await Effect.runPromise(
  rt.testingConnect("chat", "room-1", {
    connection: {
      id: "alice",
      initialState: { userId: "alice", joinedAt: Date.now() },
    },
  }),
)

// TestConnection gives you:
alice.id // "alice"
alice.tags // readonly string[]
alice.received // readonly unknown[]: server → client messages
alice.isOpen() // boolean
alice.close(1000, "bye") // fires onClose

// Simulate a typed inbound event
await Effect.runPromise(alice.receive({ _tag: "Say", text: "hi" }))

// Or a raw untyped event (for testing validation)
await Effect.runPromise(alice.receiveRaw({ _tag: "BadType" }))

// Drive a direct-API send (no connection)
await Effect.runPromise(rt.room("chat").send("room-1", { _tag: "Say", text: "from server" }))

// Get state
await Effect.runPromise(rt.room("chat").getState("room-1"))

// Fire a scheduled alarm manually
await Effect.runPromise(rt.room("chat").fireAlarm("room-1"))
Lifecycle: hibernation and onConnect
On Cloudflare, a DO can hibernate while sockets stay connected. The adapter preserves correctness:
- onConnect fires exactly once per socket, during the HTTP upgrade, and never again on wake.
- The constructor rehydrates the in-memory connection map from ctx.getWebSockets() + attachments before any handler runs.
- onEvent for a WS-inbound message sees ctx.connection typed correctly whether the DO just woke or has been in memory for hours.
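The rehydration step can be pictured with a small model. This is a sketch under stated assumptions, not the adapter's code: SocketLike is a local fake whose deserializeAttachment method mirrors the Cloudflare WebSocket method of the same name, and the Attachment layout (id, tags, state) plus the isAttachment and rehydrate helpers are hypothetical.

```typescript
// Local fake for a hibernatable WebSocket (assumption: only this method matters here).
interface SocketLike {
  deserializeAttachment(): unknown
}

// Assumed attachment layout; the adapter's real layout may differ.
interface Attachment {
  readonly id: string
  readonly tags: readonly string[]
  readonly state: unknown
}

interface LiveConnection extends Attachment {
  readonly socket: SocketLike
}

// Runtime guard so a malformed attachment cannot produce a bogus connection.
function isAttachment(value: unknown): value is Attachment {
  if (typeof value !== "object" || value === null) return false
  const v = value as Record<string, unknown>
  return typeof v.id === "string" && Array.isArray(v.tags) && "state" in v
}

// Rebuild the in-memory connection list from the sockets that survived
// hibernation. Sockets with a missing or malformed attachment are skipped.
function rehydrate(sockets: readonly SocketLike[]): LiveConnection[] {
  const live: LiveConnection[] = []
  for (const socket of sockets) {
    const attachment = socket.deserializeAttachment()
    if (isAttachment(attachment)) {
      live.push({
        id: attachment.id,
        tags: attachment.tags,
        state: attachment.state,
        socket,
      })
    }
  }
  return live
}
```

Running a step like this before any handler is what would let onEvent see a populated ctx.connection right after a wake, without onConnect firing again.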
Error taxonomy
All errors are Schema.TaggedErrorClass classes:
| Error | Where | Meaning |
|---|---|---|
| RoomError | Most ctx.* ops | storage/alarm/schema |
| UpgradeError | client.upgrade | handshake or onConnect failed |
| SendError | conn.send, ctx.broadcast | socket not OPEN, send threw |
| BroadcastError | ctx.broadcast | one or more sends failed; includes failures array |
| AttachmentTooLargeError | conn.setState | encoded attachment exceeds 2 KB cap |
| RoomValidationError | onEvent entry | inbound frame failed schema decode |
| RoomExecutionError | anywhere in handlers | handler threw or returned an unmapped error |
| RoomNotFoundError | client.room(name) | name not in registry |
| PurgeSignal | ctx.purge() | control flow; triggers cleanup |
| SystemFailure | ctx.systemFailure | non-null on alarm retries post-crash |
Effect.catchTag-friendly throughout.
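To illustrate why a _tag discriminant makes these errors easy to handle, here is a dependency-free sketch that dispatches on _tag with a plain switch instead of Effect.catchTag. The SketchError union, its field names (size, limit, failures, reason), and the checkAttachment and explain helpers are assumptions for illustration; the size check measures JSON-encoded UTF-8 bytes, which may not match how the package measures the 2 KB cap.

```typescript
// The 2 KB cap from the table above, expressed in bytes.
const ATTACHMENT_LIMIT_BYTES = 2048

// Hypothetical error shapes; only the tag names come from the table.
type SketchError =
  | { readonly _tag: "AttachmentTooLargeError"; readonly size: number; readonly limit: number }
  | { readonly _tag: "BroadcastError"; readonly failures: readonly string[] }
  | { readonly _tag: "RoomError"; readonly reason: string }

// Returns a tagged error when the encoded state exceeds the cap, else null.
// Assumption: size is the UTF-8 byte length of the JSON encoding.
function checkAttachment(state: unknown): SketchError | null {
  const size = new TextEncoder().encode(JSON.stringify(state)).byteLength
  return size > ATTACHMENT_LIMIT_BYTES
    ? { _tag: "AttachmentTooLargeError", size, limit: ATTACHMENT_LIMIT_BYTES }
    : null
}

// Exhaustive dispatch on the discriminant: the compiler narrows each branch.
function explain(error: SketchError): string {
  switch (error._tag) {
    case "AttachmentTooLargeError":
      return `attachment is ${error.size} bytes, limit ${error.limit}`
    case "BroadcastError":
      return `${error.failures.length} send(s) failed`
    case "RoomError":
      return `room operation failed: ${error.reason}`
  }
}
```

In real handler code the same narrowing would presumably be done with Effect.catchTag, with each tag handled independently in the error channel.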
Error handling on the handler side
Each channel can pair with an optional onError that receives the
handler's error:
registry.handler("chat", {
  onEvent: {
    handler: (ctx, event) => Effect.gen(function* () { … }),
    onError: (ctx, err) => Effect.logError(err),
  },
  onConnect: {
    handler: (ctx) => Effect.gen(function* () { … }),
    onError: (ctx, err) =>
      Effect.gen(function* () {
        yield* ctx.connection?.close(1011, "auth failed") ?? Effect.void
      }),
  },
  // for inbound schema-decode failures specifically:
  onDecodeError: (ctx, raw, cause) =>
    Effect.logError(`bad frame`, { raw, cause }),
})
Connection duplicates
When upgrade({ connection: { id: "alice" } }) is called twice in a row, the outcome depends on onDuplicate:
- onDuplicate: "allow" (default) keeps both sockets, which share the id
- onDuplicate: "replace" closes the old socket with code 1012 and accepts the new one
- onDuplicate: "reject" returns 409 Conflict and does not upgrade
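The three policies can be modeled in a few lines. This is a sketch of the semantics listed above, not the adapter's implementation: FakeSocket, DuplicateModel, and its methods are hypothetical, while the 1012 close code and the 409 status come from the list.

```typescript
type OnDuplicate = "allow" | "replace" | "reject"

// Hypothetical socket stand-in that records the close code it received.
interface FakeSocket {
  readonly label: string
  closedWith: number | null
}

class DuplicateModel {
  private readonly byId = new Map<string, FakeSocket[]>()

  // Returns the HTTP status the upgrade would produce: 101 on accept, 409 on reject.
  upgrade(id: string, socket: FakeSocket, policy: OnDuplicate = "allow"): 101 | 409 {
    const existing = this.byId.get(id) ?? []
    if (existing.length > 0 && policy === "reject") return 409
    if (existing.length > 0 && policy === "replace") {
      // Close every older socket with 1012, then keep only the new one.
      for (const old of existing) old.closedWith = 1012
      this.byId.set(id, [socket])
      return 101
    }
    // First connection for this id, or policy "allow": sockets share the id.
    this.byId.set(id, [...existing, socket])
    return 101
  }

  // Labels of the sockets still registered (and open) under this id.
  open(id: string): string[] {
    return (this.byId.get(id) ?? [])
      .filter((s) => s.closedWith === null)
      .map((s) => s.label)
  }
}
```

With "allow", a lookup by id can match more than one socket, which is worth keeping in mind when using the id as a unicast address.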
What this package is not
- Not a pub/sub bus. Each room is a stateful actor with a list of currently connected clients. Broadcast fans out per-call; there's no persistent topic retention.
- Not request/response. Events are one-way. If you need typed request/response, pair this with effect/unstable/rpc (see the examples/effect-worker-rpc/ example).
- Not transport-polyglot. Cloudflare DO WebSocket today. An in-memory runtime exists for tests, but there's no Node.js or SSE adapter.
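The "fans out per-call, no retention" point can be made concrete with a toy model. Everything here is hypothetical (Client, Target, RoomModel); the except, tag, and to options only mirror the broadcast options shown in the handler surface, and the real package may resolve them differently.

```typescript
interface Client {
  readonly id: string
  readonly tags: readonly string[]
  readonly inbox: string[]
}

// Mirrors the documented broadcast options; semantics here are an assumption.
interface Target {
  readonly except?: Client
  readonly tag?: string
  readonly to?: readonly Client[]
}

class RoomModel {
  private clients: Client[] = []

  join(id: string, tags: readonly string[] = []): Client {
    const client: Client = { id, tags, inbox: [] }
    this.clients.push(client)
    return client
  }

  leave(client: Client): void {
    this.clients = this.clients.filter((c) => c !== client)
  }

  // Delivers only to clients connected right now; nothing is stored for later.
  broadcast(msg: string, target: Target = {}): number {
    const pool = target.to ?? this.clients
    const recipients = pool.filter(
      (c) =>
        this.clients.includes(c) &&
        c !== target.except &&
        (target.tag === undefined || c.tags.includes(target.tag)),
    )
    for (const c of recipients) c.inbox.push(msg)
    return recipients.length
  }
}
```

A client that joins after a broadcast starts with an empty inbox in this model, which is the behavioral difference from a topic with retention.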
Full end-to-end example
See examples/effect-worker-api/:
- src/rooms/chat/: room declaration + handlers
- src/rooms/index.ts: runtime wiring + DO export
- src/handlers/chat.ts: HttpApi group implementation with WS upgrade
- src/api.ts: ChatGroup declaration
- scripts/e2e-chat-effect.mjs: pure-Effect client test (3 clients, 24 assertions)
- scripts/e2e-chat.mjs: plain-ws client test (5-client fan-out, 42 assertions)
Run:
pnpm --filter effect-worker-api wrangler dev --port 8787 --local
# in another shell:
node examples/effect-worker-api/scripts/e2e-chat-effect.mjs
Related packages
- @durable-effect/task: the sibling package for fire-and-forget events + scheduled work. Shares Storage, Alarm, CloudflareEnv. Use sockets for live connections, tasks for durable event-driven workflows.
