npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@codeforbreakfast/eventsourcing-transport-websocket

v0.5.7

Published

WebSocket transport implementation for event sourcing - Protocol-agnostic message transport over WebSocket connections

Downloads

42

Readme

@codeforbreakfast/eventsourcing-transport-websocket

WebSocket transport implementation for event sourcing - Real-time bidirectional message transport over WebSocket connections.

Overview

This package provides WebSocket client and server transport implementations that follow the transport contracts defined in @codeforbreakfast/eventsourcing-transport. It enables real-time, bidirectional communication between clients and servers using WebSockets.

Key Features

  • Real-time Communication: WebSocket-based bidirectional messaging
  • Client & Server Support: Both WebSocket client connector and server acceptor
  • Effect-Native: Built with Effect for composability and type safety
  • Contract Compliant: Implements transport contract interfaces
  • Connection Management: Handles connection lifecycle with proper cleanup
  • Message Filtering: Subscribe to specific message types
  • Broadcasting: Server can broadcast messages to all connected clients
  • Scope-based Resources: Automatic cleanup with Effect Scope

Installation

bun add @codeforbreakfast/eventsourcing-transport-websocket

Client Usage

Basic Connection

import { Effect, Stream, pipe } from 'effect';
import { WebSocketConnector } from '@codeforbreakfast/eventsourcing-transport-websocket';
import { makeMessageId } from '@codeforbreakfast/eventsourcing-transport';

const program = Effect.scoped(
  Effect.gen(function* () {
    // Connect to WebSocket server
    const transport = yield* WebSocketConnector.connect('ws://localhost:8080');

    // Monitor connection state
    yield* pipe(
      transport.connectionState,
      Stream.runForEach((state) => Effect.sync(() => console.log('Connection state:', state))),
      Effect.fork
    );

    // Subscribe to messages
    const subscription = yield* transport.subscribe();

    // Handle incoming messages
    yield* pipe(
      subscription,
      Stream.runForEach((message) => Effect.sync(() => console.log('Received:', message))),
      Effect.fork
    );

    // Publish a message
    yield* transport.publish({
      id: makeMessageId('msg-1'),
      type: 'chat.message',
      payload: JSON.stringify({ text: 'Hello WebSocket!' }),
      metadata: {},
    });
  })
);

// Run with automatic cleanup
await Effect.runPromise(program);

Message Filtering

import { Effect, Stream, pipe } from 'effect';
import { WebSocketConnector } from '@codeforbreakfast/eventsourcing-transport-websocket';

const program = Effect.scoped(
  Effect.gen(function* () {
    const transport = yield* WebSocketConnector.connect('ws://localhost:8080');

    // Subscribe only to specific message types
    const chatMessages = yield* transport.subscribe((msg) => msg.type.startsWith('chat.'));

    // Handle filtered messages
    yield* pipe(
      chatMessages,
      Stream.runForEach((message) => Effect.sync(() => console.log('Chat message:', message)))
    );
  })
);

Server Usage

Basic Server

import { Effect, Stream, pipe } from 'effect';
import { WebSocketAcceptor } from '@codeforbreakfast/eventsourcing-transport-websocket';
import { makeMessageId } from '@codeforbreakfast/eventsourcing-transport';

const program = Effect.scoped(
  Effect.gen(function* () {
    // Create WebSocket server
    const acceptor = yield* WebSocketAcceptor.make({
      port: 8080,
      host: 'localhost',
    });

    // Start accepting connections
    const transport = yield* acceptor.start();

    // Handle new client connections
    yield* pipe(
      transport.connections,
      Stream.runForEach((connection) =>
        Effect.gen(function* () {
          console.log('Client connected:', connection.clientId);

          // Subscribe to messages from this client
          const messages = yield* connection.transport.subscribe();

          // Echo messages back to client
          yield* pipe(
            messages,
            Stream.runForEach((message) =>
              connection.transport.publish({
                ...message,
                id: makeMessageId(`echo-${message.id}`),
                type: 'echo.response',
              })
            ),
            Effect.fork
          );
        })
      )
    );
  })
);

// Run server
await Effect.runPromise(program);

Broadcasting to All Clients

import { Effect } from 'effect';
import { WebSocketAcceptor } from '@codeforbreakfast/eventsourcing-transport-websocket';
import { makeMessageId } from '@codeforbreakfast/eventsourcing-transport';

const program = Effect.scoped(
  Effect.gen(function* () {
    const acceptor = yield* WebSocketAcceptor.make({
      port: 8080,
      host: 'localhost',
    });

    const transport = yield* acceptor.start();

    // Broadcast to all connected clients
    yield* transport.broadcast({
      id: makeMessageId('broadcast-1'),
      type: 'server.announcement',
      payload: JSON.stringify({ message: 'Server is shutting down in 5 minutes' }),
      metadata: {},
    });
  })
);

Connection States

The transport tracks connection states:

  • connecting: Initial connection attempt
  • connected: Successfully connected
  • disconnected: Connection closed
  • error: Connection error occurred
import { Effect, Stream, pipe } from 'effect';
import { WebSocketConnector } from '@codeforbreakfast/eventsourcing-transport-websocket';

const program = Effect.scoped(
  Effect.gen(function* () {
    const transport = yield* WebSocketConnector.connect('ws://localhost:8080');

    // Monitor connection state changes
    yield* pipe(
      transport.connectionState,
      Stream.runForEach((state) =>
        Effect.sync(() => {
          switch (state) {
            case 'connected':
              console.log('Connected to server');
              break;
            case 'disconnected':
              console.log('Disconnected from server');
              break;
            case 'error':
              console.error('Connection error');
              break;
          }
        })
      )
    );
  })
);

Error Handling

The transport uses typed errors from the contracts package:

import { Effect, pipe } from 'effect';
import { WebSocketConnector } from '@codeforbreakfast/eventsourcing-transport-websocket';

const program = Effect.scoped(
  pipe(
    WebSocketConnector.connect('ws://invalid-server:9999'),
    Effect.catchTag('ConnectionError', (error) =>
      Effect.sync(() => console.error('Failed to connect:', error.message))
    )
  )
);

Testing

The package includes comprehensive tests:

Unit Tests

Tests edge cases using mock WebSockets:

  • Connection errors
  • Malformed messages
  • Rapid state changes

Integration Tests

Tests real WebSocket communication:

  • Client-server message exchange
  • Broadcasting
  • Multiple client connections
  • Connection lifecycle
  • Resource cleanup
# Run all tests
bun test packages/eventsourcing-transport-websocket

# Run unit tests only
bun test packages/eventsourcing-transport-websocket/src/lib/websocket-transport.test.ts

# Run integration tests only
bun test packages/eventsourcing-transport-websocket/src/tests/integration/

Architecture

This package implements the transport layer abstraction:

┌─────────────────────────────────────┐
│     Application Layer               │  ← Your application
├─────────────────────────────────────┤
│     Protocol Layer                  │  ← Domain protocols (optional)
├─────────────────────────────────────┤
│     Transport Layer                 │  ← This package (WebSocket)
└─────────────────────────────────────┘

The transport is protocol-agnostic and moves TransportMessage objects between clients and servers without understanding their content.

API Reference

Client API

WebSocketConnector

import { Effect, Scope } from 'effect';
import { Client, ConnectionError } from '@codeforbreakfast/eventsourcing-transport';

declare const WebSocketConnector: {
  connect(url: string): Effect.Effect<Client.Transport, ConnectionError, Scope.Scope>;
};

Client.Transport

import { Effect, Stream } from 'effect';
import { TransportError, ConnectionState } from '@codeforbreakfast/eventsourcing-transport';

declare const transport: {
  connectionState: Stream.Stream<ConnectionState, never, never>;

  publish(message: unknown): Effect.Effect<void, TransportError, never>;

  subscribe(
    filter?: (message: unknown) => boolean
  ): Effect.Effect<Stream.Stream<unknown, never, never>, TransportError, never>;
};

Server API

WebSocketAcceptor

import { Effect, Context } from 'effect';
import { Server } from '@codeforbreakfast/eventsourcing-transport';

declare const WebSocketAcceptor: {
  make(config: {
    port: number;
    host: string;
  }): Effect.Effect<Context.Tag.Service<typeof Server.Acceptor>, never, never>;
};

Server.Transport

import { Effect, Stream } from 'effect';
import { Server, TransportError } from '@codeforbreakfast/eventsourcing-transport';

declare const transport: {
  connections: Stream.Stream<Server.ClientConnection, never, never>;

  broadcast(message: unknown): Effect.Effect<void, TransportError, never>;
};

Implementation Notes

  • Uses Bun's native WebSocket implementation for performance
  • Handles connection lifecycle with Effect's Scope for automatic cleanup
  • Message parsing errors are silently dropped to prevent stream corruption
  • Each client subscription gets its own queue for message isolation
  • Server maintains a map of connected clients with their state

Contributing

This package is part of the @codeforbreakfast/eventsourcing monorepo. See the main repository for contribution guidelines.

License

MIT