@obinexusltd/obix-driver-network-stream

v0.1.1

OBIX Network Stream Driver - WebSocket/SSE for telemetry and real-time state sync

WebSocket and Server-Sent Events driver for the OBIX SDK, built on the WHATWG Streams API. Provides typed wrappers for ReadableStream, WritableStream, and TransformStream with backpressure, pipe chains, and byte stream support.

Installation

npm install @obinexusltd/obix-driver-network-stream

Quick Start

import { createNetworkStreamDriver } from '@obinexusltd/obix-driver-network-stream';

const driver = createNetworkStreamDriver({ wsUrl: 'wss://api.example.com/ws' });

driver.on('message', (msg) => console.log(msg));
driver.on('open', () => console.log('connected'));

await driver.initialize();
await driver.connect('websocket');

await driver.send({ type: 'ping', data: null, timestamp: Date.now() });
await driver.destroy();

API Reference

Driver

createNetworkStreamDriver(config) / createWebSocketDriver(config)

Creates the main driver, which implements NetworkStreamDriverAPI. Both names refer to the same function; createNetworkStreamDriver is the backwards-compatible alias.

interface NetworkStreamDriverConfig {
  wsUrl?: string;           // WebSocket endpoint
  sseUrl?: string;          // SSE endpoint
  reconnectInterval?: number; // ms, default 1000
  authToken?: string;
  readStrategy?: ObixQueuingStrategy<StreamMessage>;
  writeStrategy?: ObixQueuingStrategy<StreamMessage>;
}

Methods:

| Method | Description |
|--------|-------------|
| initialize() | Initialise the driver |
| connect(protocol) | Connect via 'websocket' or 'sse' |
| disconnect() | Close the connection |
| send(message) | Send a StreamMessage |
| on(type, handler) | Subscribe to open, message, error, close |
| off(type, handler) | Unsubscribe |
| isConnected() | Returns boolean |
| getLatency() | Returns round-trip latency in ms |
| setReconnectInterval(ms) | Update reconnect interval |
| destroy() | Disconnect and clear all handlers |

The driver also exposes Web Streams endpoints after connect():

const driver = createWebSocketDriver(config);
await driver.connect('websocket');

const reader = driver.readable?.getReader(); // ReadableStream<StreamMessage>
const writer = driver.writable?.getWriter(); // WritableStream<StreamMessage>

ReadableStream Wrappers

import {
  createMessageReadableStream,
  createArrayReadableStream,
  createIterableReadableStream,
} from '@obinexusltd/obix-driver-network-stream';

createMessageReadableStream(source, strategy?)

Wraps any MessageSource (WebSocket, EventSource) into a ReadableStream<StreamMessage>.

const ws = new WebSocket('wss://example.com');
const readable = createMessageReadableStream(ws);
const reader = readable.getReader();
const { value } = await reader.read();
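To make the wrapping idea concrete, here is a minimal sketch built only on the standard ReadableStream constructor. It is not the package's implementation: `SketchSource` and `sourceToReadable` are hypothetical names, and the real MessageSource type may look different.

```typescript
// Hypothetical, simplified source shape (an assumption, not the package's MessageSource).
interface SketchSource {
  onmessage: ((data: string) => void) | null;
  onclose: (() => void) | null;
}

function sourceToReadable(source: SketchSource): ReadableStream<string> {
  return new ReadableStream<string>({
    start(controller) {
      // Forward every incoming message into the stream's internal queue.
      source.onmessage = (data) => controller.enqueue(data);
      // Close the stream once the source reports that it has finished.
      source.onclose = () => controller.close();
    },
    cancel() {
      // Detach the handlers when the consumer cancels the stream.
      source.onmessage = null;
      source.onclose = null;
    },
  });
}
```

A push-style source like this cannot be paused from `pull()`, so messages queue up whenever the consumer is slower than the producer.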

createArrayReadableStream(messages, strategy?)

Streams a static array — useful for testing.
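For comparison, a static array can be streamed with the native API in a few lines. This is a sketch under the assumption that the helper behaves like a plain pull source; `arrayToReadable` and `readAll` are hypothetical names, not exports of this package.

```typescript
// Emit one array item per pull, then close the stream.
function arrayToReadable<T>(items: T[]): ReadableStream<T> {
  let index = 0;
  return new ReadableStream<T>({
    pull(controller) {
      if (index < items.length) controller.enqueue(items[index++]);
      else controller.close();
    },
  });
}

// Test helper: drain a stream into an array.
async function readAll<T>(readable: ReadableStream<T>): Promise<T[]> {
  const reader = readable.getReader();
  const out: T[] = [];
  for (;;) {
    const result = await reader.read();
    if (result.done) return out;
    out.push(result.value);
  }
}
```

In a test, `await readAll(arrayToReadable(fixtures))` should give back the fixtures in their original order.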

createIterableReadableStream(asyncIterable, strategy?)

Converts an async generator or iterable into a ReadableStream<StreamMessage>.
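The conversion can be sketched with the standard constructor by advancing the iterator once per `pull()`. The names `iterableToReadable` and `countTo` are hypothetical, and the package's version may handle errors or strategies differently.

```typescript
function iterableToReadable<T>(iterable: AsyncIterable<T>): ReadableStream<T> {
  const iterator = iterable[Symbol.asyncIterator]();
  return new ReadableStream<T>({
    async pull(controller) {
      // Advance the iterator only when the stream asks for more data.
      const step = await iterator.next();
      if (step.done) controller.close();
      else controller.enqueue(step.value);
    },
    async cancel() {
      // Give the generator a chance to run its cleanup code.
      await iterator.return?.();
    },
  });
}

// Example async generator used as input.
async function* countTo(limit: number): AsyncGenerator<number> {
  for (let i = 1; i <= limit; i++) yield i;
}
```

Because values are produced inside `pull()`, the generator only runs as fast as the consumer reads.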


WritableStream Wrappers

import {
  createMessageWritableStream,
  createCollectorWritableStream,
  createCallbackWritableStream,
} from '@obinexusltd/obix-driver-network-stream';

createMessageWritableStream(sink, strategy?)

Wraps a MessageSink (WebSocket) into a WritableStream<StreamMessage>. Chunks are JSON-serialised before sending.
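The serialise-then-send behaviour can be illustrated with a native WritableStream. This is a sketch only: `SketchSink` and `sinkToWritable` are hypothetical names, and the package's MessageSink type may expose more than a `send` method.

```typescript
// Hypothetical sink shape: anything with a WebSocket-like send(string).
interface SketchSink {
  send(data: string): void;
}

function sinkToWritable<T>(sink: SketchSink): WritableStream<T> {
  return new WritableStream<T>({
    write(chunk) {
      // Serialise each chunk to JSON before handing it to the sink.
      sink.send(JSON.stringify(chunk));
    },
  });
}
```

Writing `{ type: 'ping', data: null }` through this stream hands the string `{"type":"ping","data":null}` to `sink.send`.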

createCollectorWritableStream(strategy?)

Returns { writable, messages }. All written chunks accumulate in messages.

const { writable, messages } = createCollectorWritableStream();
await readable.pipeTo(writable);
console.log(messages); // StreamMessage[]

createCallbackWritableStream(onWrite, onClose?, strategy?)

Calls onWrite per chunk and optionally onClose when the stream closes.


TransformStream Utilities

import {
  createJsonTransformStream,
  createTimestampTransformStream,
  createFilterTransformStream,
  createMapTransformStream,
  createMessageEncoderStream,
} from '@obinexusltd/obix-driver-network-stream';

createJsonTransformStream(writableStrategy?, readableStrategy?)

Parses raw JSON strings into StreamMessage objects.

rawReadable
  .pipeThrough(createJsonTransformStream())
  .pipeTo(writable);
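A JSON-parsing stage like this one can be sketched with a plain TransformStream. `jsonParseTransform` is a hypothetical name, and whether the package validates or drops malformed input is not stated in this README; in this sketch a parse error simply errors the stream.

```typescript
function jsonParseTransform<T>(): TransformStream<string, T> {
  return new TransformStream<string, T>({
    transform(chunk, controller) {
      // JSON.parse throws on malformed input, which errors the whole pipeline.
      controller.enqueue(JSON.parse(chunk) as T);
    },
  });
}
```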

createTimestampTransformStream(maxAgeMs?, ...)

Stamps each chunk with the current time if the timestamp is missing or older than maxAgeMs.
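The stamping rule can be written out as a short sketch. `SketchMessage`, `timestampTransform`, and the injectable `now` parameter are assumptions made for illustration (the clock is injectable only to keep the example testable); the package's actual signature is abbreviated above and may differ.

```typescript
// Hypothetical message shape, loosely modelled on the Quick Start payload.
interface SketchMessage {
  type: string;
  data: unknown;
  timestamp?: number;
}

function timestampTransform(
  maxAgeMs: number,
  now: () => number = Date.now,
): TransformStream<SketchMessage, SketchMessage> {
  return new TransformStream<SketchMessage, SketchMessage>({
    transform(msg, controller) {
      const current = now();
      // Re-stamp when the timestamp is missing or older than maxAgeMs.
      const stale = msg.timestamp === undefined || current - msg.timestamp > maxAgeMs;
      controller.enqueue(stale ? { ...msg, timestamp: current } : msg);
    },
  });
}
```

With `maxAgeMs = 5000` and a clock reading 10000, a message stamped 9000 passes through unchanged, while one stamped 1000 (or not stamped at all) leaves with timestamp 10000.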

createFilterTransformStream(predicate, ...)

Only passes chunks where predicate(chunk) returns true.

createMapTransformStream(mapper, ...)

Transforms each chunk using mapper.
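Filter and map stages are thin wrappers over TransformStream. The sketch below shows one plausible shape using only the standard API; `filterTransform` and `mapTransform` are hypothetical names and omit the strategy arguments that the package's functions accept.

```typescript
function filterTransform<T>(predicate: (chunk: T) => boolean): TransformStream<T, T> {
  return new TransformStream<T, T>({
    transform(chunk, controller) {
      // Chunks that fail the predicate are dropped rather than forwarded.
      if (predicate(chunk)) controller.enqueue(chunk);
    },
  });
}

function mapTransform<I, O>(mapper: (chunk: I) => O): TransformStream<I, O> {
  return new TransformStream<I, O>({
    transform(chunk, controller) {
      controller.enqueue(mapper(chunk));
    },
  });
}
```

Piping the numbers 1 to 4 through `filterTransform((n) => n % 2 === 0)` and then `mapTransform((n) => n * 10)` yields 20 and 40.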

createMessageEncoderStream(...)

Encodes StreamMessage → Uint8Array (UTF-8 JSON).
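The encoding step amounts to `JSON.stringify` followed by the standard TextEncoder, which always produces UTF-8. A minimal sketch, with `encoderTransform` as a hypothetical name:

```typescript
function encoderTransform<T>(): TransformStream<T, Uint8Array> {
  const encoder = new TextEncoder(); // TextEncoder always emits UTF-8
  return new TransformStream<T, Uint8Array>({
    transform(chunk, controller) {
      controller.enqueue(encoder.encode(JSON.stringify(chunk)));
    },
  });
}
```

Decoding the output with `new TextDecoder().decode(bytes)` gives back the JSON text, which makes the stage easy to check in a test.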


Pipe Chain Helpers

import {
  pipeMessageStream,
  transformMessageStream,
  transformMessageStreamTo,
  teeMessageStream,
  composeTransforms,
} from '@obinexusltd/obix-driver-network-stream';

pipeMessageStream(readable, writable, options?)

Typed wrapper for readable.pipeTo(writable).

transformMessageStream(readable, transform, options?)

Typed wrapper for readable.pipeThrough(transform).

teeMessageStream(readable)

Splits a stream into two branches, each receiving all chunks.

const [branch1, branch2] = teeMessageStream(readable);
branch1.pipeTo(logger);
branch2.pipeTo(processor);
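The standard `tee()` method provides the same splitting behaviour, which the sketch below uses to show both branches receiving every chunk. `drain` and `teeSketch` are hypothetical helpers, and it is an assumption that teeMessageStream delegates to `tee()`.

```typescript
// Helper: collect every chunk of a stream into an array.
async function drain<T>(readable: ReadableStream<T>): Promise<T[]> {
  const reader = readable.getReader();
  const out: T[] = [];
  for (;;) {
    const result = await reader.read();
    if (result.done) return out;
    out.push(result.value);
  }
}

// Split a stream and read both branches concurrently.
async function teeSketch<T>(readable: ReadableStream<T>): Promise<[T[], T[]]> {
  const [left, right] = readable.tee();
  return Promise.all([drain(left), drain(right)]);
}
```

With native `tee()`, a chunk read from the source is queued on both branches, so a slow branch buffers data while the fast one keeps reading. Keep that in mind for long-running telemetry streams.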

composeTransforms(readable, transforms[])

Chains multiple TransformStream stages in sequence.

const pipeline = composeTransforms(readable, [
  createFilterTransformStream((m) => m.type === 'telemetry'),
  createTimestampTransformStream(5000),
  createMapTransformStream(normalise),
]);
await pipeline.pipeTo(writable);
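Chaining stages in sequence is a fold over `pipeThrough`. The sketch below is a simplified illustration, not the package's code: `composeSketch` is a hypothetical name, and it restricts every stage to the same chunk type, which the real helper may not do.

```typescript
function composeSketch<T>(
  readable: ReadableStream<T>,
  transforms: TransformStream<T, T>[],
): ReadableStream<T> {
  // Each stage consumes the output of the previous one, left to right.
  return transforms.reduce((stream, stage) => stream.pipeThrough(stage), readable);
}
```

Order matters: with stages "add 1" then "double", the input 1, 2, 3 becomes 4, 6, 8.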

Backpressure

import {
  createCountStrategy,
  createByteLengthStrategy,
  resolveStrategy,
} from '@obinexusltd/obix-driver-network-stream';

createCountStrategy(highWaterMark?)

Returns a CountQueuingStrategy. Each chunk counts as 1 unit.

createByteLengthStrategy(highWaterMark?)

Returns a ByteLengthQueuingStrategy. Each chunk is sized by .byteLength.

resolveStrategy(strategy?)

Converts an ObixQueuingStrategy descriptor into a native QueuingStrategy.

const readable = createArrayReadableStream(messages, {
  highWaterMark: 16,
  size: () => 1,
});
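To see what a high-water mark does, it helps to inspect `controller.desiredSize`, which is the high-water mark minus the total size of the chunks currently queued. The sketch below uses the native CountQueuingStrategy directly; `queueRoomAfter` is a hypothetical helper written for this illustration.

```typescript
// Report how much room is left in the queue after enqueuing `enqueued` chunks.
function queueRoomAfter(enqueued: number, highWaterMark: number): number | null {
  let room: number | null = null;
  new ReadableStream<number>(
    {
      start(controller) {
        for (let i = 0; i < enqueued; i++) controller.enqueue(i);
        // desiredSize = highWaterMark - number of queued chunks (each counts as 1).
        room = controller.desiredSize;
      },
    },
    new CountQueuingStrategy({ highWaterMark }),
  );
  return room;
}
```

`queueRoomAfter(3, 16)` returns 13, and `queueRoomAfter(5, 4)` returns -1. A value of zero or below is the signal that a well-behaved source should stop producing until the consumer catches up.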

Byte Streams

import {
  createByteReadableStream,
  readExactBytes,
  concatByteStream,
} from '@obinexusltd/obix-driver-network-stream';

createByteReadableStream(config)

Creates a ReadableStream<Uint8Array> with ReadableByteStreamController. Supports BYOB readers for zero-copy reads.

const stream = createByteReadableStream({
  chunkSize: 4096,
  highWaterMark: 65536,
  onPull: async (controller) => {
    const chunk = await fetchNextChunk();
    if (chunk) controller.enqueue(chunk);
    else controller.close();
  },
});

// BYOB read
const reader = stream.getReader({ mode: 'byob' });
const buf = new Uint8Array(4096);
const { value } = await reader.read(buf);

readExactBytes(stream, length)

Reads exactly length bytes using a BYOB reader.

concatByteStream(stream)

Drains the entire stream into a single Uint8Array.
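Draining into one buffer can be sketched in two passes: collect the chunks while summing their lengths, then copy them into a single allocation. `concatChunks` is a hypothetical name, and this version uses a default reader rather than a BYOB reader.

```typescript
async function concatChunks(stream: ReadableStream<Uint8Array>): Promise<Uint8Array> {
  const reader = stream.getReader();
  const chunks: Uint8Array[] = [];
  let total = 0;
  for (;;) {
    const result = await reader.read();
    if (result.done) break;
    chunks.push(result.value);
    total += result.value.byteLength;
  }
  // Allocate once, then copy each chunk to its offset.
  const out = new Uint8Array(total);
  let offset = 0;
  for (const chunk of chunks) {
    out.set(chunk, offset);
    offset += chunk.byteLength;
  }
  return out;
}
```

Memory use grows with the stream, so this approach only suits streams with a known and modest upper bound.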


SSE Driver

import {
  createSSEReadableStream,
  createNamedSSEStream,
} from '@obinexusltd/obix-driver-network-stream';

createSSEReadableStream(config)

Wraps EventSource into a ReadableStream<StreamMessage>.

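const events = createSSEReadableStream({ url: '/api/events', highWaterMark: 4 });
// Async iteration of a ReadableStream is not available in every runtime;
// where it is missing, read with events.getReader() in a loop instead.
for await (const msg of events) {
  console.log(msg);
}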

createNamedSSEStream(config, eventName)

Listens for a specific SSE event name (e.g. event: telemetry).


Compatibility

All stream primitives target browsers and Node.js environments that implement the WHATWG Streams API:

| Feature | Minimum |
|---------|---------|
| ReadableStream | Chrome 43, Firefox 65, Node 18 |
| WritableStream | Chrome 59, Firefox 100, Node 18 |
| TransformStream | Chrome 67, Firefox 102, Node 18 |
| BYOB readers | Chrome 61, Node 18 |


License

MIT — OBINexus <[email protected]>