@kyneta/unix-socket-transport

v1.8.0

Unix domain socket transport for @kyneta/exchange — stream-oriented server-to-server transport with StreamFrameParser

Stream-oriented Unix domain socket transport for @kyneta/exchange — server-to-server communication for co-located services, bypassing the TCP/IP stack. ~3.8× faster than TCP localhost for small messages.

Overview

  • Stream-oriented — unlike the WebSocket transport (message-oriented), write() returns a boolean backpressure signal and onDrain notifies when the kernel buffer is available again.
  • Binary CBOR encoding with stream framing via StreamFrameParser from @kyneta/wire. No fragmentation — UDS has no message size limits.
  • No transport prefixes — stream framing handles message boundaries directly.
  • No "ready" handshake — UDS connections are bidirectionally ready immediately (unlike WebSocket which needs a text "ready" signal). The client calls establishChannel directly after connect.
  • Leaderless topology: createUnixSocketPeer handles connect-or-listen negotiation automatically. First peer listens, subsequent peers connect, survivors heal when the listener dies.
  • FC/IS boundary: feedBytes (pure) produces frames from the byte stream; connection handlers (imperative) dispatch decoded messages and manage the write queue.
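
The pure side of the FC/IS boundary can be illustrated with a minimal sketch of a feed function. The wire format used here (a 4-byte big-endian length prefix) and the names `ParserState`, `feedBytesSketch`, and `encodeFrameSketch` are assumptions made for illustration; the actual StreamFrameParser format in @kyneta/wire may differ.

```typescript
// Hypothetical framing: [4-byte big-endian payload length][payload bytes].
// This is NOT the documented @kyneta/wire format, only an illustration.
interface ParserState {
  readonly buffer: Uint8Array // bytes received but not yet emitted as frames
}

interface FeedResult {
  readonly state: ParserState
  readonly frames: Uint8Array[]
}

const HEADER_SIZE = 4

const emptyParser: ParserState = { buffer: new Uint8Array(0) }

// Pure: no I/O, no mutation of the input state. Returns the next state
// plus every complete frame contained in the bytes seen so far.
function feedBytesSketch(state: ParserState, chunk: Uint8Array): FeedResult {
  const buffer = new Uint8Array(state.buffer.length + chunk.length)
  buffer.set(state.buffer, 0)
  buffer.set(chunk, state.buffer.length)

  const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength)
  const frames: Uint8Array[] = []
  let offset = 0

  while (buffer.length - offset >= HEADER_SIZE) {
    const length = view.getUint32(offset, false)
    // Partial frame: keep the bytes and wait for the next chunk.
    if (buffer.length - offset - HEADER_SIZE < length) break
    const start = offset + HEADER_SIZE
    frames.push(buffer.slice(start, start + length))
    offset = start + length
  }

  return { state: { buffer: buffer.slice(offset) }, frames }
}

// Helper for the example: wrap a payload in the hypothetical header.
function encodeFrameSketch(payload: Uint8Array): Uint8Array {
  const out = new Uint8Array(HEADER_SIZE + payload.length)
  new DataView(out.buffer).setUint32(0, payload.length, false)
  out.set(payload, HEADER_SIZE)
  return out
}
```

Because the function is pure, chunk boundaries that split a frame can be tested by feeding byte slices directly, with no sockets involved.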

Export

| Export | Entry point |
|--------|-------------|
| @kyneta/unix-socket-transport | ./dist/index.js |

Everything is available from the top-level import — server, client, peer negotiation, connection, types, and platform wrappers.

Install

pnpm add @kyneta/unix-socket-transport

Quick Start

Leaderless Peer (Recommended)

The simplest way to use the transport — createUnixSocketPeer handles role negotiation, transport swaps, and healing automatically:

import { Exchange } from "@kyneta/exchange"
import { createUnixSocketPeer } from "@kyneta/unix-socket-transport"

const exchange = new Exchange({
  identity: { peerId: "service-a", name: "Service A" },
})

const peer = createUnixSocketPeer(exchange, {
  path: "/tmp/kyneta.sock",
})

// peer.role is "listener" | "connector" | "negotiating" | "disposed"
// Kill the listener → a connector re-negotiates and takes over
// No code changes needed — healing is automatic

Explicit Server + Client

For cases where you need direct control over server and client roles:

Server

import { Exchange } from "@kyneta/exchange"
import { UnixSocketServerTransport } from "@kyneta/unix-socket-transport"

const serverTransport = new UnixSocketServerTransport({
  path: "/tmp/kyneta.sock",
  cleanup: true,
})

const exchange = new Exchange({
  identity: { peerId: "server", name: "server", type: "service" },
  transports: [() => serverTransport],
})

Client

import { Exchange } from "@kyneta/exchange"
import { createUnixSocketClient } from "@kyneta/unix-socket-transport"

const exchange = new Exchange({
  identity: { peerId: "service-a", name: "Service A", type: "service" },
  transports: [
    createUnixSocketClient({ path: "/tmp/kyneta.sock" }),
  ],
})

// Wait for the connection to be established
const client = exchange.getTransport("unix-socket-client")
await client.waitForStatus("connected")

API Reference

createUnixSocketPeer(exchange, options)

Create a leaderless unix socket peer that manages topology negotiation automatically.

The first peer to start becomes the listener; subsequent peers become connectors. If the listener dies, a connector re-negotiates and becomes the new listener. Uses exchange.addTransport() / exchange.removeTransport() to swap transports at runtime — the Exchange, all documents, and all CRDT state survive across transport swaps.

Internally, the peer is a Program<PeerMsg, PeerModel, PeerEffect> from @kyneta/machine — a pure Mealy machine whose transitions are deterministically testable. The imperative shell interprets data effects as I/O. All negotiation logic lives in the pure createPeerProgram() function; this wrapper just wires the executor to the Exchange.

Returns a UnixSocketPeer.

UnixSocketPeerOptions

| Option | Default | Description |
|--------|---------|-------------|
| path | — | Path to the unix socket file. |
| reconnect.enabled | true | Enable automatic reconnection (for connector role). |
| reconnect.maxAttempts | 5 | Maximum reconnection attempts before re-negotiating. |
| reconnect.baseDelay | 1000 | Base delay in ms for exponential backoff. |
| reconnect.maxDelay | 30000 | Maximum delay cap in ms. |

UnixSocketPeer

| Member | Type | Description |
|--------|------|-------------|
| role | "listener" \| "connector" \| "negotiating" \| "disposed" | Current role — changes over time as healing occurs. |
| dispose() | () => Promise<void> | Remove the transport from the Exchange and clean up the socket file. |

UnixSocketServerOptions

| Option | Default | Description |
|--------|---------|-------------|
| path | — | Path to the unix socket file. |
| cleanup | true | Remove stale socket file on start. |

UnixSocketClientOptions

| Option | Default | Description |
|--------|---------|-------------|
| path | — | Path to the unix socket file. |
| reconnect.enabled | true | Enable automatic reconnection. |
| reconnect.maxAttempts | 10 | Maximum reconnection attempts before giving up. |
| reconnect.baseDelay | 1000 | Base delay in ms for exponential backoff. |
| reconnect.maxDelay | 30000 | Maximum delay cap in ms. |

UnixSocketServerTransport

| Method | Signature | Description |
|--------|-----------|-------------|
| getConnection | (peerId: string) => UnixSocketConnection \| undefined | Get an active connection by peer ID. |
| getAllConnections | () => UnixSocketConnection[] | Get all active connections. |
| isConnected | (peerId: string) => boolean | Check if a peer is connected. |
| unregisterConnection | (peerId: string) => void | Remove a connection and its channel. |
| broadcast | (msg: ChannelMsg) => void | Send a message to all connected peers. |
| connectionCount | number (getter) | Number of connected peers. |

UnixSocketClientTransport

| Method | Signature | Description |
|--------|-----------|-------------|
| getState | () => UnixSocketClientState | Get the current connection state. |
| waitForStatus | (status, options?) => Promise<UnixSocketClientState> | Wait for a specific status. |
| waitForState | (predicate, options?) => Promise<UnixSocketClientState> | Wait for a state matching a predicate. |
| subscribeToTransitions | (listener) => () => void | Subscribe to state transitions. Returns unsubscribe function. |
| isConnected | boolean (getter) | Whether the client is connected. |

createUnixSocketClient(options)

Factory function returning a TransportFactory. Pass directly to Exchange({ transports: [...] }).

UnixSocket

Framework-agnostic stream-oriented socket interface. Unlike WebSocket's send(), write() returns false when the kernel buffer is full.

interface UnixSocket {
  write(data: Uint8Array): boolean
  end(): void
  onData(handler: (data: Uint8Array) => void): void
  onClose(handler: () => void): void
  onError(handler: (error: Error) => void): void
  onDrain(handler: () => void): void
}

Platform Wrappers

| Wrapper | Input |
|---------|-------|
| wrapNodeUnixSocket(socket) | Node.js net.Socket |
| wrapBunUnixSocket(socket) | Bun unix socket |

wrapBunUnixSocket returns { unixSocket, handlers } — the caller wires handlers into Bun's callback-based socket structure.
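
To show what a platform wrapper has to do, here is a minimal sketch that adapts an event-emitter style socket to the UnixSocket interface. The names `NodeLikeSocket` and `wrapNodeSocketSketch` are hypothetical; the event names ("data", "close", "error", "drain") are the ones Node.js net.Socket emits, but the package's own wrapNodeUnixSocket may be implemented differently.

```typescript
// The UnixSocket interface as documented above.
interface UnixSocket {
  write(data: Uint8Array): boolean
  end(): void
  onData(handler: (data: Uint8Array) => void): void
  onClose(handler: () => void): void
  onError(handler: (error: Error) => void): void
  onDrain(handler: () => void): void
}

// Minimal structural subset of a Node-style socket used by this sketch.
interface NodeLikeSocket {
  write(data: Uint8Array): boolean
  end(): void
  on(event: string, listener: (...args: any[]) => void): unknown
}

// Hypothetical stand-in for wrapNodeUnixSocket.
function wrapNodeSocketSketch(socket: NodeLikeSocket): UnixSocket {
  return {
    // A false return value is the backpressure signal.
    write: data => socket.write(data),
    end: () => socket.end(),
    onData: handler => {
      socket.on("data", handler)
    },
    // The "close" event carries a hadError flag in Node; it is dropped here.
    onClose: handler => {
      socket.on("close", () => handler())
    },
    onError: handler => {
      socket.on("error", handler)
    },
    onDrain: handler => {
      socket.on("drain", handler)
    },
  }
}
```

Keeping the adapter this thin means everything above it (framing, write queue, lifecycle) can be exercised against a fake socket.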

Connection Lifecycle

The client connection lifecycle is a Program<Msg, Model, Fx> from @kyneta/machine — a pure Mealy machine with data effects. The transport class (UnixSocketClientTransport) is a thin imperative shell that interprets data effects as I/O (FC/IS design). The program is deterministically testable — every state × event combination is covered by pure data tests (no sockets, no timing, never flaky).

The public observation API (getState, subscribeToTransitions, waitForState, waitForStatus) is preserved, powered by createObservableProgram.

UDS uses a 4-state lifecycle (no "ready" phase — UDS connections are bidirectionally ready immediately, and there is no ready handshake). Stream framing via StreamFrameParser handles message boundaries on the raw byte stream.

disconnected → connecting → connected
                   ↓            ↓
              reconnecting ← ─ ─┘
                   ↓
              connecting (retry)
                   ↓
              disconnected (max retries)

| State | Description |
|-------|-------------|
| disconnected | No active connection. Optional reason field describes why. |
| connecting | Socket handshake in progress. Tracks attempt number. |
| connected | Connection open, messages can flow immediately. |
| reconnecting | Connection lost, scheduling next attempt. Tracks attempt and nextAttemptMs. |
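
The four states can be sketched as a pure transition function that returns the next state together with data effects. This is an illustration only: the message names, the "open-socket" and "close-socket" effects, the attempt-counting rule, and the treatment of nextAttemptMs as a delay are all assumptions, and the real Program in the package may be shaped differently. Only the state names and the two timer effect names come from this README.

```typescript
type ClientState =
  | { status: "disconnected"; reason?: string }
  | { status: "connecting"; attempt: number }
  | { status: "connected" }
  | { status: "reconnecting"; attempt: number; nextAttemptMs: number }

// Hypothetical message names.
type Msg =
  | { type: "start" }
  | { type: "socket-opened" }
  | { type: "socket-closed" }
  | { type: "reconnect-timer-fired" }
  | { type: "stop" }

// Effects are plain data; an imperative shell would interpret them as I/O.
type Fx =
  | { type: "open-socket" }
  | { type: "close-socket" }
  | { type: "start-reconnect-timer"; delayMs: number }
  | { type: "cancel-reconnect-timer" }

const MAX_ATTEMPTS = 10 // documented client default
const BASE_DELAY_MS = 1000 // documented default
const MAX_DELAY_MS = 30000 // documented default

function update(state: ClientState, msg: Msg): [ClientState, Fx[]] {
  switch (msg.type) {
    case "start":
      if (state.status !== "disconnected") return [state, []]
      return [{ status: "connecting", attempt: 1 }, [{ type: "open-socket" }]]
    case "socket-opened":
      if (state.status !== "connecting") return [state, []]
      return [{ status: "connected" }, []]
    case "socket-closed": {
      if (state.status === "connecting" && state.attempt >= MAX_ATTEMPTS) {
        return [{ status: "disconnected", reason: "max-retries-exceeded" }, []]
      }
      if (state.status !== "connecting" && state.status !== "connected") return [state, []]
      const attempt = state.status === "connecting" ? state.attempt + 1 : 1
      // Jitter is omitted here to keep the transition deterministic.
      const delayMs = Math.min(MAX_DELAY_MS, BASE_DELAY_MS * Math.pow(2, attempt - 1))
      return [
        { status: "reconnecting", attempt, nextAttemptMs: delayMs },
        [{ type: "start-reconnect-timer", delayMs }],
      ]
    }
    case "reconnect-timer-fired":
      if (state.status !== "reconnecting") return [state, []]
      return [{ status: "connecting", attempt: state.attempt }, [{ type: "open-socket" }]]
    case "stop":
      if (state.status === "disconnected") return [state, []]
      return [
        { status: "disconnected", reason: "intentional" },
        [state.status === "reconnecting" ? { type: "cancel-reconnect-timer" } : { type: "close-socket" }],
      ]
  }
}
```

Since `update` touches no sockets or timers, each state and message pair can be checked with plain data assertions.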

Peer Negotiation Lifecycle

createUnixSocketPeer layers on top of the client state machine:

negotiate → probe socket path
             ├── connected     → become connector (add client transport)
             ├── enoent        → become listener  (add server transport)
             ├── econnrefused  → become listener  (add server transport)
             └── eaddrinuse    → retry after delay

connector: client disconnects (max retries) → re-negotiate
listener:  runs until dispose
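
The probe outcomes above map onto a small pure decision function. The sketch below uses hypothetical names (`ProbeResult`, `Decision`, `decideRole`) and a made-up retry delay, since the README does not state the real value; the comments on each outcome are a plausible reading, not a statement of the package's internals.

```typescript
type ProbeResult = "connected" | "enoent" | "econnrefused" | "eaddrinuse"

type Decision =
  | { action: "become-connector" }
  | { action: "become-listener" }
  | { action: "retry"; delayMs: number }

// Hypothetical value; the source does not document the retry delay.
const RETRY_DELAY_MS = 100

function decideRole(probe: ProbeResult): Decision {
  switch (probe) {
    case "connected":
      // Someone is already listening on the path.
      return { action: "become-connector" }
    case "enoent": // no socket file at the path
    case "econnrefused": // socket file exists but nothing accepts on it
      return { action: "become-listener" }
    case "eaddrinuse":
      // Presumably another peer claimed the path first; probe again later.
      return { action: "retry", delayMs: RETRY_DELAY_MS }
  }
}
```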

Observing State

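import { Exchange } from "@kyneta/exchange"
import { createUnixSocketClient } from "@kyneta/unix-socket-transport"

// createUnixSocketClient returns a TransportFactory, so register it with an
// Exchange and look up the live transport, as in the Client quick start.
const exchange = new Exchange({
  identity: { peerId: "service-a", name: "Service A", type: "service" },
  transports: [
    createUnixSocketClient({
      path: "/tmp/kyneta.sock",
      reconnect: { enabled: true },
    }),
  ],
})

const transport = exchange.getTransport("unix-socket-client")

// Subscribe to transitions programmatically
const unsub = transport.subscribeToTransitions(({ from, to }) => {
  console.log(`${from.status} → ${to.status}`)
})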

Backpressure

UnixSocket.write() returns false when the kernel buffer is full. The UnixSocketConnection manages a write queue:

  1. send(msg) encodes via encodeComplete(cborCodec, msg) → socket.write(frameBytes).
  2. If write() returns false, the connection enters draining mode — subsequent frames are queued.
  3. When the drain event fires, queued frames are flushed in order.
  4. If any flush write returns false, the connection waits for the next drain.
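
The four steps above can be sketched as a small write queue. `WritableSocket` and `WriteQueueSketch` are hypothetical names, encoding is left out (frames arrive as bytes), and the real UnixSocketConnection may differ. The sketch assumes Node-style semantics, where a write that returns false has still been accepted and only later writes need to wait.

```typescript
// Only the parts of the documented UnixSocket interface this sketch needs.
interface WritableSocket {
  write(data: Uint8Array): boolean
  onDrain(handler: () => void): void
}

class WriteQueueSketch {
  private readonly socket: WritableSocket
  private readonly queue: Uint8Array[] = []
  private draining = false

  constructor(socket: WritableSocket) {
    this.socket = socket
    // Step 3: flush when the socket reports that buffer space is available.
    socket.onDrain(() => this.flush())
  }

  // Steps 1 and 2: write directly, or queue while waiting for a drain.
  send(frame: Uint8Array): void {
    if (this.draining) {
      this.queue.push(frame)
      return
    }
    if (!this.socket.write(frame)) this.draining = true
  }

  // Steps 3 and 4: flush in order, pausing again if the buffer refills.
  private flush(): void {
    this.draining = false
    while (this.queue.length > 0) {
      const frame = this.queue.shift()
      if (frame === undefined) break
      if (!this.socket.write(frame)) {
        this.draining = true
        return
      }
    }
  }

  get pending(): number {
    return this.queue.length
  }
}
```

A production queue would likely also bound its length or surface an error when a peer stops reading; that policy is outside what this README describes.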

Stale Socket Cleanup

When cleanup: true (the default), the server transport removes leftover socket files on start. This prevents EADDRINUSE after a crash where the previous process didn't clean up.

On stop, the server always unlinks the socket file.

Reconnection

Reconnection is encoded as effects within the pure program — start-reconnect-timer and cancel-reconnect-timer are data effects interpreted by the imperative shell. Backoff delay is computed by the shared computeBackoffDelay from @kyneta/transport (exponential backoff with jitter), the same algorithm used by the WebSocket and SSE transports.
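
For intuition, exponential backoff with jitter can be sketched as below. The function name, its signature, and the choice of "equal jitter" (half the delay fixed, half randomized) are assumptions; the README does not describe how computeBackoffDelay applies jitter, so treat this as one common variant and not the package's algorithm.

```typescript
interface BackoffOptions {
  baseDelay: number // ms
  maxDelay: number // ms
}

// Hypothetical helper; not the signature of computeBackoffDelay.
function backoffDelaySketch(
  attempt: number, // 1-based attempt number
  options: BackoffOptions,
  random: () => number = Math.random, // injectable for deterministic tests
): number {
  const exponential = options.baseDelay * Math.pow(2, attempt - 1)
  const capped = Math.min(options.maxDelay, exponential)
  // Equal jitter: the result lies between capped / 2 and capped.
  return Math.round(capped / 2 + random() * (capped / 2))
}
```

With the documented defaults (baseDelay 1000, maxDelay 30000), the upper bound doubles per attempt (1000, 2000, 4000, ...) until it reaches the 30000 ms cap.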

The DisconnectReason discriminated union carries socket-specific context:

| Variant | Fields | Description |
|---------|--------|-------------|
| intentional | — | Clean shutdown via onStop(). |
| error | error, errno? | Socket error. errno carries codes like ENOENT, ECONNREFUSED, EADDRINUSE, EACCES. |
| closed | — | Server closed the connection. |
| max-retries-exceeded | attempts | Reconnect limit reached. |
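
A discriminated union like this is typically consumed with an exhaustive switch. The sketch below guesses at the shape: the discriminant field name `type` and the field types are assumptions, and `describeReason` is a hypothetical helper, so check the package's exported types before relying on it.

```typescript
// Assumed shape; only the variant and field names come from the table above.
type DisconnectReasonSketch =
  | { type: "intentional" }
  | { type: "error"; error: Error; errno?: string }
  | { type: "closed" }
  | { type: "max-retries-exceeded"; attempts: number }

function describeReason(reason: DisconnectReasonSketch): string {
  switch (reason.type) {
    case "intentional":
      return "stopped on request"
    case "error":
      // errno is optional, so fall back to the error message alone.
      return reason.errno
        ? `socket error ${reason.errno}: ${reason.error.message}`
        : `socket error: ${reason.error.message}`
    case "closed":
      return "server closed the connection"
    case "max-retries-exceeded":
      return `gave up after ${reason.attempts} attempts`
  }
}
```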

Design

Why No Fragmentation?

WebSocket and WebRTC transports use @kyneta/wire's fragmentation layer to stay within infrastructure limits (128KB for AWS API Gateway, ~256KB for SCTP). Unix domain sockets have no such limits — they transfer data as a byte stream. StreamFrameParser handles message boundary extraction; encodeComplete writes complete frames directly.

Why No Transport Prefixes?

WebSocket multiplexes text and binary frames and uses a text "ready" signal. UDS is a raw byte stream with a single purpose — there's nothing to multiplex and no handshake phase.

Why No "Ready" Handshake?

WebSocket connections need a server-sent "ready" signal after the HTTP upgrade completes. UDS connections are bidirectionally ready the moment connect resolves — the client sends establish-request immediately.

Why Leaderless?

Fixed server/client roles require external coordination — someone decides who listens. createUnixSocketPeer eliminates this: every peer runs the same code, the first one to arrive listens, and survivors heal when the listener dies. This makes the topology symmetric and self-organizing.

Peer Dependencies

{
  "peerDependencies": {
    "@kyneta/exchange": "^1.1.0",
    "@kyneta/machine": "^1.0.0",
    "@kyneta/wire": "^1.1.0"
  }
}

License

MIT