npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@durable-effect/sockets

v0.0.1

Published

Effect-native WebSocket rooms on top of Cloudflare Durable Objects.

Readme

@durable-effect/sockets

Effect-native WebSocket rooms on top of Cloudflare Durable Objects.

Declare a room with schemas, write handlers, get type-safe broadcast, per-connection state, and hibernation-ready Cloudflare deployment. Sibling of @durable-effect/task: tasks are for fire-and-forget events; sockets are for live multi-party stateful connections.

┌─ WS client ─┐                ┌─ worker code ─┐
│ ws.send(…)  │                │ rooms.send(…) │
└──────┬──────┘                └───────┬───────┘
       │                               │
       ▼                               ▼
┌────────────────────────────────────────────────┐
│  Durable Object instance (one Room)             │
│                                                 │
│  onConnect │ onEvent → state → onBroadcast      │
│  onClose   │                                    │
│  onAlarm   │                                    │
│                                                 │
│  state (room) ── connections[] ── attachments   │
└───┬────────────────────────────────────────┬───┘
    │                                        │
    └─ ctx.broadcast / conn.send ────────────┘
                      ▼
              connected WS clients

Install

pnpm add @durable-effect/sockets @durable-effect/task effect@^4.0.0-beta.50

@durable-effect/task is a peer dependency — the two packages share Storage, Alarm, and CloudflareEnv services so they compose in a single worker.

Exports:

  • @durable-effect/sockets — declarations, registry, in-memory runtime
  • @durable-effect/sockets/cloudflare — the Durable Object adapter
  • @durable-effect/sockets/testing — re-exports of test utilities

Core concepts

| | | |---|---| | Room | A named tuple of schemas (state, connection, event, broadcast) + handlers. One room type can have many instances, each keyed by an id. | | Instance | One (roomType, id) pair. In Cloudflare, one instance = one DO. Holds room state, live connections, alarms. | | Connection | A live WebSocket attached to an instance. Has its own id, tags, and schema-typed per-connection state (the attachment). | | Registry | The set of rooms you've declared + the handler implementations. Produces a runtime. | | Runtime | What you call to upgrade / send / broadcast / get state. Cloudflare (makeSocketGroupDO) or in-memory (makeInMemorySocketRuntime). |

The five schemas per room:

  • state — persistent shared state for the instance (like Task's state)
  • connection — per-connection attachment, persisted in the WS attachment
  • event — inbound messages AND direct-API events — one schema, two paths
  • broadcast — outbound messages the server pushes to clients

Quick start

1. Declare the room

// src/rooms/chat/registry.ts
import { Schema } from "effect"
import { SocketRegistry, WebSocketRoom } from "@durable-effect/sockets"

export const Chat = WebSocketRoom.make("chat", {
  state: Schema.Struct({
    createdAt: Schema.Number,
    messageCount: Schema.Number,
  }),
  connection: Schema.Struct({
    userId: Schema.String,
    joinedAt: Schema.Number,
  }),
  event: Schema.Union([
    Schema.Struct({ _tag: Schema.Literal("Say"), text: Schema.String }),
    Schema.Struct({ _tag: Schema.Literal("Typing") }),
  ]),
  broadcast: Schema.Union([
    Schema.Struct({
      _tag: Schema.Literal("Message"),
      text: Schema.String,
      userId: Schema.String,
      ts: Schema.Number,
    }),
    Schema.Struct({ _tag: Schema.Literal("UserJoined"), userId: Schema.String }),
    Schema.Struct({ _tag: Schema.Literal("UserLeft"), userId: Schema.String }),
    Schema.Struct({ _tag: Schema.Literal("Typing"), userId: Schema.String }),
  ]),
})

export const registry = SocketRegistry.make(Chat)

2. Implement the handlers

// src/rooms/chat/handlers.ts
import { Effect } from "effect"
import { registry } from "./registry.js"

const c = registry.for("chat")

const onConnect = c.onConnect((ctx) =>
  Effect.gen(function* () {
    // Initialize room state on first connection
    if ((yield* ctx.recall()) === null) {
      yield* ctx.save({ createdAt: Date.now(), messageCount: 0 })
    }
    const conn = ctx.connection
    if (!conn) return
    yield* ctx.broadcast(
      { _tag: "UserJoined", userId: conn.state.userId },
      { except: conn },
    )
  }),
)

const onEvent = c.onEvent((ctx, event) =>
  Effect.gen(function* () {
    switch (event._tag) {
      case "Say":
        yield* ctx.update((s) => ({ ...s, messageCount: s.messageCount + 1 }))
        yield* ctx.broadcast({
          _tag: "Message",
          text: event.text,
          userId: ctx.connection?.state.userId ?? "system",
          ts: Date.now(),
        })
        return
      case "Typing":
        if (ctx.connection)
          yield* ctx.broadcast(
            { _tag: "Typing", userId: ctx.connection.state.userId },
            { except: ctx.connection },
          )
        return
    }
  }),
)

const onClose = c.onClose((ctx) =>
  Effect.gen(function* () {
    const uid = ctx.connection?.state.userId
    if (uid)
      yield* ctx.broadcast(
        { _tag: "UserLeft", userId: uid },
        { except: ctx.connection ?? [] },
      )
  }),
)

export const chatHandler = registry.handler("chat", { onConnect, onEvent, onClose })

3. Wire the runtime for Cloudflare

// src/rooms/index.ts
import { makeSocketGroupDO } from "@durable-effect/sockets/cloudflare"
import { registry } from "./chat/registry.js"
import { chatHandler } from "./chat/handlers.js"

const config = registry.build({ chat: chatHandler })
export const socketGroup = makeSocketGroupDO(config)

export const ChatDO = socketGroup.DO  // export for wrangler

4. Add the DO binding

// wrangler.jsonc
{
  "durable_objects": {
    "bindings": [{ "name": "CHAT_DO", "class_name": "ChatDO" }]
  },
  "migrations": [
    { "tag": "v1", "new_sqlite_classes": ["ChatDO"] }
  ]
}

5. Serve upgrades and REST calls

Plain fetch entry:

// src/index.ts
export { ChatDO } from "./rooms/index.js"
import { Effect } from "effect"
import { socketGroup } from "./rooms"

export default {
  async fetch(request: Request, env: Env) {
    const url = new URL(request.url)
    const rooms = socketGroup.client(env.CHAT_DO)

    // WS upgrade: /ws/chat/:id?userId=…
    const wsMatch = url.pathname.match(/^\/ws\/chat\/(.+)$/)
    if (wsMatch && request.headers.get("upgrade") === "websocket") {
      const [_, roomId] = wsMatch
      const userId = url.searchParams.get("userId") ?? "anon"
      return Effect.runPromise(
        rooms.room("chat").upgrade(roomId, request, {
          connection: {
            id: userId,
            tags: [`user:${userId}`],
            initialState: { userId, joinedAt: Date.now() },
          },
        }),
      )
    }

    // Server-initiated message: POST /api/chat/:id/say { text }
    const sayMatch = url.pathname.match(/^\/api\/chat\/([^/]+)\/say$/)
    if (sayMatch && request.method === "POST") {
      const [_, roomId] = sayMatch
      const { text } = await request.json<{ text: string }>()
      await Effect.runPromise(
        rooms.room("chat").send(roomId, { _tag: "Say", text }),
      )
      return Response.json({ status: "ok" })
    }

    return new Response("not found", { status: 404 })
  },
} satisfies ExportedHandler<Env>

6. Connect a client

const ws = new WebSocket("wss://your-worker/ws/chat/r1?userId=alice")
ws.addEventListener("message", (ev) => {
  const msg = JSON.parse(ev.data)       // typed as ChatBroadcast on your side
  console.log("←", msg)
})
ws.addEventListener("open", () => {
  ws.send(JSON.stringify({ _tag: "Say", text: "hi" }))
})

Handler surface (the RoomCtx)

Inside handlers, ctx gives you:

ctx.id                         // the instance id (e.g. "r1")
ctx.name                       // the room type name (e.g. "chat")
ctx.connection                 // Connection<C, B> | null — null for direct-API calls

// Room state
yield* ctx.recall()            // Effect<S | null, RoomError>
yield* ctx.save(s)             // Effect<void,    RoomError>
yield* ctx.update((s) => …)    // Effect<void,    RoomError> — no-op if state is null

// Connections
ctx.connections.all()          // Connection<C, B>[]
ctx.connections.byTag("admin") // Connection<C, B>[]
ctx.connections.byId("alice")  // Connection<C, B> | null
ctx.connections.filter(pred)   // Connection<C, B>[]
ctx.connections.count()        // number

// Broadcast
yield* ctx.broadcast(msg)                          // everyone
yield* ctx.broadcast(msg, { except: conn })        // exclude sender
yield* ctx.broadcast(msg, { tag: "admin" })        // filter by tag
yield* ctx.broadcast(msg, { to: [a, b] })          // multicast

// Unicast
yield* conn.send(msg)                              // to one
yield* conn.setState(newState)                     // update attachment (schema-validated,
                                                   // 2 KB cap enforced, typed error if over)
yield* conn.close(1000, "bye")

// Scheduling (persisted across hibernation)
yield* ctx.scheduleIn("30 seconds")
yield* ctx.scheduleAt(new Date(…))
yield* ctx.cancelSchedule()
yield* ctx.nextAlarm()                             // number | null

// Sibling dispatch to another registered room
yield* ctx.room("notifications").send(id, event)
yield* ctx.room("notifications").getState(id)
yield* ctx.room("notifications").broadcast(id, msg)

// Lifecycle
yield* ctx.purge()                                 // clears storage + alarm, exits handler
ctx.systemFailure                                  // SystemFailure | null on alarm retries

Client (worker-side)

const rooms = socketGroup.client(env.CHAT_DO)

rooms.room("chat").send(id, event)                 // Effect<void, RoomError>
rooms.room("chat").getState(id)                    // Effect<State | null, RoomError>
rooms.room("chat").getConnections(id, tag?)        // Effect<ConnectionInfo[], RoomError>
rooms.room("chat").broadcast(id, broadcastMsg)     // Effect<void, RoomError>
rooms.room("chat").fireAlarm(id)                   // Effect<void, RoomError>
rooms.room("chat").upgrade(id, request, opts)      // Effect<Response, UpgradeError | …>

upgrade takes a UpgradeOpts:

{
  connection: {
    id?: string                        // caller-assigned; defaults to random
    tags?: readonly string[]
    meta?: Record<string, unknown>     // one-shot, not persisted beyond onConnect
    initialState: C                    // the connection schema — required
  }
  onDuplicate?: "allow" | "replace" | "reject"   // default: allow
}

As an Effect service

If you prefer providing the client via Layer rather than calling .client(...) at the call site:

import { SocketClientService } from "@durable-effect/sockets/cloudflare"

const RoomsLayer = socketGroup.layer(env.CHAT_DO)

// then inside handlers:
const rooms = yield* SocketClientService
yield* rooms.room("chat").send(id, event)

Integrating with effect/unstable/httpapi

Rooms slot into an HttpApi group exactly like any other endpoint group. The only wrinkle is the WebSocket upgrade — it returns a raw Response (status 101 + webSocket field), which doesn't fit a schema-typed success. Two tools handle it:

  • HttpApiSchema.Empty(101) as the endpoint's success schema
  • handlers.handleRaw(...) to skip payload decoding, + HttpServerResponse.raw(response, { status: 101 }) so the builder emits the Response as-is without re-encoding

Example (reproduced from the examples/effect-worker-api/ example):

// api.ts
export const ChatGroup = HttpApiGroup.make("chat")
  .add(
    HttpApiEndpoint.get("upgrade", "/ws/:roomId", {
      params: S.Struct({ roomId: S.String }),
      success: HttpApiSchema.Empty(101),
    }),
  )
  .add(
    HttpApiEndpoint.post("say", "/:roomId/say", {
      params: S.Struct({ roomId: S.String }),
      payload: S.Struct({ text: S.String }),
      success: S.Struct({ status: S.Literal("ok") }),
    }),
  )
  .add(
    HttpApiEndpoint.get("state", "/:roomId/state", {
      params: S.Struct({ roomId: S.String }),
      success: S.NullOr(ChatStateResponseSchema),
    }),
  )
  .prefix("/chat")

// handlers/chat.ts
export const ChatGroupLive = HttpApiBuilder.group(WorkerApi, "chat", (h) =>
  h.handleRaw("upgrade", ({ request, params }) =>
    Effect.gen(function* () {
      const env = yield* currentEnv
      const rawRequest = request.source as Request          // structural narrow
      const userId = new URL(rawRequest.url).searchParams.get("userId") ?? "anon"
      const rooms = socketGroup.client(env!.CHAT_DO)
      const response = yield* rooms.room("chat").upgrade(params.roomId, rawRequest, {
        connection: {
          id: userId,
          initialState: { userId, joinedAt: Date.now() },
        },
      }).pipe(Effect.orDie)
      return HttpServerResponse.raw(response, { status: 101 })
    }),
  )
  .handle("say", ({ params, payload }) =>
    Effect.gen(function* () {
      const env = yield* currentEnv
      const rooms = socketGroup.client(env!.CHAT_DO)
      yield* rooms.room("chat")
        .send(params.roomId, { _tag: "Say", text: payload.text })
        .pipe(Effect.orDie)
      return { status: "ok" as const }
    }),
  )
  .handle("state", ({ params }) =>
    Effect.gen(function* () {
      const env = yield* currentEnv
      const rooms = socketGroup.client(env!.CHAT_DO)
      return yield* rooms.room("chat").getState(params.roomId).pipe(Effect.orDie)
    }),
  ),
)

See examples/effect-worker-api/src/handlers/chat.ts for the full wiring, including getConnections and the registration into HttpGroupsLive.

Providing services to handlers

Two flavours — same as @durable-effect/task.

Pure services

import { Context, Effect, Layer } from "effect"
import { withServices } from "@durable-effect/sockets"

class Analytics extends Context.Service<Analytics, {
  readonly track: (event: string, payload: unknown) => Effect.Effect<void>
}>()("@app/Analytics") {}

const AnalyticsLive = Layer.succeed(Analytics, {
  track: (event, payload) => Effect.log(`[analytics] ${event}`, payload),
})

registry.handler("chat",
  withServices(
    { onConnect, onEvent, onClose },
    AnalyticsLive,
  ),
)

withServices wraps every handler's Effect.provide(layer) and reduces the R channel of each handler to never.

Cloudflare-env-dependent services

When a service's construction depends on env (which isn't available at module scope), provide it lazily via CloudflareEnv from @durable-effect/task:

import { CloudflareEnv } from "@durable-effect/sockets/cloudflare"

const ConfigLive = Layer.effect(ConfigService, Effect.gen(function* () {
  const env = yield* CloudflareEnv
  return makeConfig({ apiKey: env.OPENAI_KEY })
}))

The DO runtime provides CloudflareEnv automatically from the bindings it receives in its constructor.

Testing

Use the in-memory runtime. Same handler code, no Cloudflare harness needed:

import { Effect } from "effect"
import { makeInMemorySocketRuntime } from "@durable-effect/sockets"

const rt = makeInMemorySocketRuntime(config)

// Drive a connection
const alice = await Effect.runPromise(
  rt.testingConnect("chat", "room-1", {
    connection: {
      id: "alice",
      initialState: { userId: "alice", joinedAt: Date.now() },
    },
  }),
)

// TestConnection gives you:
alice.id                                 // "alice"
alice.tags                               // readonly string[]
alice.received                           // readonly unknown[] — server → client messages
alice.isOpen()                           // boolean
alice.close(1000, "bye")                 // fires onClose

// Simulate a typed inbound event
await Effect.runPromise(alice.receive({ _tag: "Say", text: "hi" }))

// Or a raw untyped event (for testing validation)
await Effect.runPromise(alice.receiveRaw({ _tag: "BadType" }))

// Drive a direct-API send (no connection)
await Effect.runPromise(rt.room("chat").send("room-1", { _tag: "Say", text: "from server" }))

// Get state
await Effect.runPromise(rt.room("chat").getState("room-1"))

// Fire a scheduled alarm manually
await Effect.runPromise(rt.room("chat").fireAlarm("room-1"))

Lifecycle: hibernation and onConnect

On Cloudflare, a DO can hibernate while sockets stay connected. The adapter preserves correctness:

  • onConnect fires exactly once per socket, during the HTTP upgrade — never again on wake.
  • Constructor rehydrates the in-memory connection map from ctx.getWebSockets() + attachments before any handler runs.
  • onEvent for a WS-inbound message sees ctx.connection typed correctly whether the DO just woke or has been in memory for hours.

Error taxonomy

All errors are Schema.TaggedErrorClasses:

| | Where | | |---|---|---| | RoomError | Most ctx.* ops | storage/alarm/schema | | UpgradeError | client.upgrade | handshake or onConnect failed | | SendError | conn.send, ctx.broadcast | socket not OPEN, send threw | | BroadcastError | ctx.broadcast | one or more sends failed; includes failures array | | AttachmentTooLargeError | conn.setState | encoded attachment exceeds 2 KB cap | | RoomValidationError | onEvent entry | inbound frame failed schema decode | | RoomExecutionError | anywhere in handlers | handler threw or returned an unmapped error | | RoomNotFoundError | client.room(name) | name not in registry | | PurgeSignal | ctx.purge() | control flow; triggers cleanup | | SystemFailure | ctx.systemFailure | non-null on alarm retries post-crash |

Effect.catchTag-friendly throughout.

Error handling on the handler side

Each channel can pair with an optional onError that receives the handler's error:

registry.handler("chat", {
  onEvent: {
    handler: (ctx, event) => Effect.gen(function* () { … }),
    onError: (ctx, err) => Effect.logError(err),
  },
  onConnect: {
    handler: (ctx) => Effect.gen(function* () { … }),
    onError: (ctx, err) =>
      Effect.gen(function* () {
        yield* ctx.connection?.close(1011, "auth failed") ?? Effect.void
      }),
  },
  // for inbound schema-decode failures specifically:
  onDecodeError: (ctx, raw, cause) =>
    Effect.logError(`bad frame`, { raw, cause }),
})

Connection duplicates

upgrade({ connection: { id: "alice" } }) twice in a row:

  • onDuplicate: "allow" (default) — both sockets remain, share the id
  • onDuplicate: "replace" — close the old socket with code 1012, accept the new
  • onDuplicate: "reject" — return 409 Conflict, don't upgrade

What this package is not

  • Not a pub/sub bus. Each room is a stateful actor with a list of currently connected clients. Broadcast fans out per-call; there's no persistent topic retention.
  • Not request/response. Events are one-way. If you need typed request/response, pair this with effect/unstable/rpc (see the examples/effect-worker-rpc/ example).
  • Not transport-polyglot. Cloudflare DO WebSocket today. An in-memory runtime exists for tests, but there's no Node.js or SSE adapter.

Full end-to-end example

See examples/effect-worker-api/:

  • src/rooms/chat/ — room declaration + handlers
  • src/rooms/index.ts — runtime wiring + DO export
  • src/handlers/chat.ts — HttpApi group implementation with WS upgrade
  • src/api.tsChatGroup declaration
  • scripts/e2e-chat-effect.mjs — pure-Effect client test (3 clients, 24 assertions)
  • scripts/e2e-chat.mjs — plain-ws client test (5-client fan-out, 42 assertions)

Run:

pnpm --filter effect-worker-api wrangler dev --port 8787 --local
# in another shell:
node examples/effect-worker-api/scripts/e2e-chat-effect.mjs

Related packages

  • @durable-effect/task — the sibling package for fire-and-forget events + scheduled work. Shares Storage, Alarm, CloudflareEnv. Use sockets for live connections, tasks for durable event-driven workflows.