
@codeforbreakfast/eventsourcing-transport

v0.3.8


Pure transport layer abstractions and contracts for event sourcing - Define transport interfaces that any implementation (WebSocket, HTTP, SSE) can implement


Overview

This package provides the core abstractions for building transport layers in event sourcing systems. It contains only contracts and testing utilities - no implementations. This ensures clean separation between transport protocols and the business logic that uses them.

Key Features

  • Protocol Agnostic: Abstractions work with any transport (WebSocket, HTTP, SSE, etc.)
  • Type Safe: Full TypeScript support with generic message types
  • Effect-Native: Built on Effect for functional programming patterns
  • Contract Testing: Complete test suite to verify transport implementations
  • Connection Management: Standardized connection lifecycle management
  • Stream Support: First-class support for message streaming
  • Error Handling: Comprehensive error types for transport failures

Core Interfaces

Client.Transport

The main interface representing a fully connected transport that can send and receive messages:

import { Effect, Stream } from 'effect';
import type {
  TransportMessage,
  ConnectionState,
  TransportError,
} from '@codeforbreakfast/eventsourcing-transport';

interface Transport {
  readonly connectionState: Stream.Stream<ConnectionState, never, never>;
  readonly publish: (message: TransportMessage) => Effect.Effect<void, TransportError, never>;
  readonly subscribe: (
    filter?: (message: TransportMessage) => boolean
  ) => Effect.Effect<Stream.Stream<TransportMessage, never, never>, TransportError, never>;
}

Client.Connector

The service tag for creating connected transports. This design prevents calling transport methods before establishing a connection:

import { Context, Effect, Scope } from 'effect';
import type { Client, ConnectionError } from '@codeforbreakfast/eventsourcing-transport';

class Connector extends Context.Tag('@transport/Client.Connector')<
  Connector,
  {
    readonly connect: (
      url: string
    ) => Effect.Effect<Client.Transport, ConnectionError, Scope.Scope>;
  }
>() {}

TransportMessage

The base message type that all transport implementations must handle:

import { Brand } from 'effect';
import type { TransportMessage } from '@codeforbreakfast/eventsourcing-transport';

// TransportMessage has this structure:
type MessageStructure = {
  readonly id: string & Brand.Brand<'MessageId'>;
  readonly type: string;
  readonly payload: string; // Always a JSON string - transport doesn't parse it
  readonly metadata: Record<string, unknown>;
};

The transport layer only validates the envelope structure. The payload is always a serialized JSON string - the transport doesn't care what's inside.
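As a rough illustration of "validate the envelope, ignore the payload", here is a dependency-free sketch. The names `Envelope`, `isEnvelope` and `readChatText` are hypothetical and are not exports of this package, and the branded id is modelled with a plain intersection type instead of Effect's Brand module.

```typescript
// Stand-in for the branded MessageId; an assumption for illustration only.
type MessageId = string & { readonly __brand: 'MessageId' };

type Envelope = {
  readonly id: MessageId;
  readonly type: string;
  readonly payload: string; // opaque to the transport
  readonly metadata: Record<string, unknown>;
};

// Hypothetical helper: checks only the envelope shape and never parses the payload.
const isEnvelope = (value: unknown): value is Envelope => {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v['id'] === 'string' &&
    typeof v['type'] === 'string' &&
    typeof v['payload'] === 'string' &&
    typeof v['metadata'] === 'object' &&
    v['metadata'] !== null &&
    !Array.isArray(v['metadata'])
  );
};

// The consumer, not the transport, decides how to interpret the payload.
const readChatText = (message: Envelope): string | undefined => {
  try {
    const body: unknown = JSON.parse(message.payload);
    if (typeof body === 'object' && body !== null && 'text' in body) {
      const text = (body as { text: unknown }).text;
      return typeof text === 'string' ? text : undefined;
    }
    return undefined;
  } catch {
    return undefined; // payload was not valid JSON
  }
};
```

Keeping payload parsing on the consumer side is what lets one transport carry any message format.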

Error Types

The package provides standardized error types:

  • TransportError - General transport operation failures
  • ConnectionError - Connection establishment/management failures
  • MessageParseError - Message serialization/deserialization failures
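Callers usually want to react differently to each kind of failure. The sketch below assumes each error carries a `_tag` discriminant, as is conventional in Effect code; the shapes and the `shouldRetry` policy are hypothetical and are not copied from this package.

```typescript
// Illustrative shapes only: the real error classes may carry different fields.
type TransportFailure =
  | { readonly _tag: 'TransportError'; readonly message: string }
  | { readonly _tag: 'ConnectionError'; readonly message: string }
  | { readonly _tag: 'MessageParseError'; readonly message: string };

// Hypothetical policy: which failures are worth retrying is an application decision.
const shouldRetry = (failure: TransportFailure): boolean => {
  switch (failure._tag) {
    case 'ConnectionError':
      return true; // the endpoint may come back
    case 'TransportError':
      return true; // a send may succeed on a later attempt
    case 'MessageParseError':
      return false; // the same bytes will fail to parse again
  }
};
```

Because the switch is exhaustive over the union, adding a new error kind to the type produces a compile error here until it is handled.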

Contract Testing

The @codeforbreakfast/eventsourcing-testing-contracts package provides comprehensive contract tests to verify your transport implementation meets all requirements.

See the testing contracts package for documentation on how to use runClientTransportContractTests to validate your implementation.
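The idea behind a contract test can be sketched without that package: the checks are written once against the interface and then run against every implementation. Everything below (`MiniTransport`, `checkContract`, `makeInMemoryTransport`) is a hypothetical, synchronous simplification and says nothing about the real signature of runClientTransportContractTests.

```typescript
type Msg = { readonly type: string; readonly payload: string };

// Simplified stand-in for the transport interface: subscribe returns a live buffer.
interface MiniTransport {
  readonly publish: (message: Msg) => void;
  readonly subscribe: (filter?: (message: Msg) => boolean) => Msg[];
}

// The contract: written once, independent of any implementation.
const checkContract = (makeTransport: () => MiniTransport): string[] => {
  const failures: string[] = [];

  const ordered = makeTransport();
  const all = ordered.subscribe();
  ['1', '2', '3'].forEach((payload) => ordered.publish({ type: 'n', payload }));
  if (all.map((m) => m.payload).join(',') !== '1,2,3') failures.push('ordering');

  const filtered = makeTransport();
  const chatOnly = filtered.subscribe((m) => m.type === 'chat');
  filtered.publish({ type: 'chat', payload: '"a"' });
  filtered.publish({ type: 'system', payload: '"b"' });
  if (chatOnly.length !== 1) failures.push('filtering');

  return failures;
};

// One implementation under test: an in-memory transport.
const makeInMemoryTransport = (): MiniTransport => {
  const subscribers: Array<{
    filter: ((message: Msg) => boolean) | undefined;
    buffer: Msg[];
  }> = [];
  return {
    publish: (message) => {
      for (const s of subscribers) {
        if (!s.filter || s.filter(message)) s.buffer.push(message);
      }
    },
    subscribe: (filter) => {
      const buffer: Msg[] = [];
      subscribers.push({ filter, buffer });
      return buffer;
    },
  };
};

const contractFailures = checkContract(makeInMemoryTransport);
```

An implementation that passes returns an empty list; one that, say, ignores the filter would be reported under 'filtering'.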

Usage Example

import { Effect, Stream, pipe } from 'effect';
import { Client, makeMessageId } from '@codeforbreakfast/eventsourcing-transport';
import type { TransportMessage } from '@codeforbreakfast/eventsourcing-transport';

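// The resulting effect still requires a Client.Connector service and a Scope:
// provide a connector implementation and wrap it with Effect.scoped to run it.
const useTransport = pipe(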
  Client.Connector,
  Effect.andThen((connector) =>
    pipe(
      connector.connect('ws://localhost:8080'),
      Effect.andThen((transport: Client.Transport) =>
        pipe(
          // Subscribe to messages
          transport.subscribe((msg: TransportMessage) => msg.type === 'chat'),
          Effect.andThen((messageStream: Stream.Stream<TransportMessage, never, never>) =>
            pipe(
              // Send a message
              transport.publish({
                id: makeMessageId(crypto.randomUUID()),
                type: 'chat',
                payload: JSON.stringify({ text: 'Hello, world!' }),
                metadata: {},
              }),
              Effect.andThen(() =>
                // Process incoming messages
                Stream.runForEach(messageStream, (message: TransportMessage) =>
                  Effect.logInfo(`Received: ${message.payload}`)
                )
              )
            )
          )
        )
      )
    )
  )
);

Design Principles

1. Protocol Independence

The abstractions work with any underlying protocol. Whether you're using WebSockets, Server-Sent Events, HTTP long polling, or custom TCP protocols, the same interfaces apply.

2. Connection Safety

The Client.Connector pattern ensures you cannot call transport methods before establishing a connection: the only way to obtain a Client.Transport is as the result of a successful connect call. This prevents a whole class of runtime errors.
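The shape of this guarantee can be shown with a dependency-free sketch. It uses plain callbacks in place of Effect and Stream, and `LoopbackTransport`, `LoopbackConnector` and `loopbackConnector` are hypothetical names for illustration only.

```typescript
type ChatMessage = { readonly type: string; readonly payload: string };

interface LoopbackTransport {
  readonly publish: (message: ChatMessage) => void;
  // Returns an unsubscribe function.
  readonly subscribe: (handler: (message: ChatMessage) => void) => () => void;
}

// The connector exposes only `connect`; it has no publish or subscribe of its own.
interface LoopbackConnector {
  readonly connect: (url: string) => LoopbackTransport;
}

// Hypothetical in-memory connector that loops published messages back to subscribers.
const loopbackConnector: LoopbackConnector = {
  connect: (_url) => {
    const handlers = new Set<(message: ChatMessage) => void>();
    return {
      publish: (message) => handlers.forEach((handler) => handler(message)),
      subscribe: (handler) => {
        handlers.add(handler);
        return () => {
          handlers.delete(handler);
        };
      },
    };
  },
};

const received: string[] = [];
const transport = loopbackConnector.connect('ws://localhost:8080');
const unsubscribe = transport.subscribe((message) => received.push(message.payload));
transport.publish({ type: 'chat', payload: JSON.stringify({ text: 'Hello' }) });
unsubscribe();
transport.publish({ type: 'chat', payload: JSON.stringify({ text: 'ignored' }) });

// loopbackConnector.publish(...) would not compile: the connector has no such member.
```

The "not connected yet" state has no publish method at all, so the mistake is caught by the type checker rather than at runtime.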

3. Type Safety

Generic message types ensure compile-time safety for your specific message formats while allowing the transport layer to remain generic.

4. Effect Integration

Built on Effect for composable, type-safe error handling and resource management. Connections are automatically cleaned up when scopes exit.
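What "cleaned up when scopes exit" means in practice can be sketched with a small dependency-free analogue. In real code Effect's own `Effect.acquireRelease` and `Effect.scoped` provide this behaviour; `withScope` and `openConnection` below are hypothetical stand-ins, not part of Effect or of this package.

```typescript
type Finalizer = () => void;

// Finalizers registered during `use` run in reverse order when `use` returns or throws.
const withScope = <A>(use: (addFinalizer: (finalizer: Finalizer) => void) => A): A => {
  const finalizers: Finalizer[] = [];
  try {
    return use((finalizer) => {
      finalizers.push(finalizer);
    });
  } finally {
    for (const finalizer of finalizers.reverse()) finalizer();
  }
};

const events: string[] = [];

// Hypothetical connection that records its lifecycle instead of opening a socket.
const openConnection = (url: string, addFinalizer: (finalizer: Finalizer) => void) => {
  events.push(`open ${url}`);
  addFinalizer(() => {
    events.push(`close ${url}`);
  });
  return {
    send: (text: string) => {
      events.push(`send ${text}`);
    },
  };
};

withScope((addFinalizer) => {
  const connection = openConnection('ws://localhost:8080', addFinalizer);
  connection.send('hello');
});
```

The caller never closes the connection explicitly; leaving the scope is what triggers the cleanup, including when the body throws.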

5. Testing First

The contract test suite ensures any implementation correctly handles edge cases like reconnection, message ordering, and error conditions.

Related Packages

  • @codeforbreakfast/eventsourcing-websocket-transport - WebSocket implementation
  • @codeforbreakfast/eventsourcing-store - Event store abstractions
  • @codeforbreakfast/eventsourcing-aggregates - Aggregate root patterns

License

MIT