npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@soffinal/stream

v0.2.4

Published

Type-safe event emitters that scale

Downloads

20

Readme

@soffinal/stream

npm version TypeScript License: MIT Bundle Size

Type-safe event emitters that scale

Stream is like EventEmitter, but better. Send events to multiple listeners, transform data with filter and map, and never worry about memory leaks. Works with DOM elements, WebSockets, user interactions, or any async data source. Fully typed, zero dependencies, 5.5KB.

Table of Contents

Features

  • Adaptive Constraints - Transformers that learn and evolve based on stream history
  • Universal Primitives - Four primitives: filter, map, merge, flat
  • Documentation-as-Distribution - Copy-paste transformers embedded in JSDoc, no separate packages needed
  • Async-First - Native async/await support with configurable concurrency control
  • Concurrency Strategies - Sequential, concurrent-unordered, concurrent-ordered processing
  • Automatic Cleanup - WeakRef-based listener cleanup prevents memory leaks
  • Multicast Streams - One stream, unlimited consumers
  • Awaitable - await stream for next value
  • Async Iterable - Native for await loop support
  • Pipe Composition - Stream-to-stream functional composition
  • Type Guards - Built-in TypeScript type narrowing support
  • Reactive State - Stateful values with automatic change propagation
  • Reactive Collections - Lists, Maps, Sets with fine-grained events
  • Stream Termination - Declarative stream lifecycle control
  • Zero Dependencies - Lightweight and tree-shakeable
  • Universal - Node.js, browsers, Deno, Bun, Cloudflare Workers
  • Full TypeScript - Complete type safety without the burden

Quick Start

import { Stream } from "@soffinal/stream";

const events = new Stream<string>();

events.listen(console.log);

events.push("Hello"); //log: Hello

Examples

import { Stream, State, filter, map, merge } from "@soffinal/stream";

// Create  streams
const events = new Stream<string>();
const numbers = new Stream<number>();

// Pull-based stream from async generator
const websocketStream = new Stream(async function* () {
  const ws = new WebSocket("ws://localhost:8080");
  while (ws.readyState === WebSocket.OPEN) {
    yield await new Promise((resolve) => {
      ws.onmessage = (event) => resolve(JSON.parse(event.data));
    });
  }
});

// Simple transformations
const processed = events
  .pipe(filter((msg) => msg.length > 3)) // Simple filtering
  .pipe(map((msg) => msg.toUpperCase())); // Transform to uppercase

// Async transformations with concurrency
const validated = events.pipe(
  filter(
    async (msg) => {
      const isValid = await validateAsync(msg);
      return isValid;
    },
    { strategy: "concurrent-ordered" }
  ) // Parallel validation, ordered results
);

// Stateful transformers that learn and adapt
const runningAverage = numbers
  .pipe(
    filter({ count: 0 }, (state, value) => {
      // Only pass every 3rd number, terminate after 10
      if (state.count >= 10) return; // Stream termination
      return [(state.count + 1) % 3 === 0, { count: state.count + 1 }];
    })
  )
  .pipe(
    map({ sum: 0, count: 0 }, (state, value) => {
      const newSum = state.sum + value;
      const newCount = state.count + 1;
      const average = newSum / newCount;
      return [
        { value, average },
        { sum: newSum, count: newCount },
      ];
    })
  );

// Automatic cleanup with DOM elements
const element = document.createElement('div');
events.listen(value => {
  element.textContent = value;
}, element); // Auto-removed when element is GC'd

// Copy-paste transformers from JSDoc
const limited = numbers.pipe(take(5)); // Limit to 5 items
const indexed = events.pipe(withIndex()); // Add indices
const delayed = processed.pipe(delay(100)); // Delay each value

// Multiple consumers
processed.listen((msg) => console.log("Processed:", msg));
validated.listen((msg) => console.log("Validated:", msg));
runningAverage.listen(({ value, average }) => console.log(`Value: ${value}, Running Average: ${average}`));

// Reactive state
const counter = new State(0);
counter.listen((count) => (document.title = `Count: ${count}`));
counter.value++; // UI updates automatically

Installation

Package Managers

# npm
npm install @soffinal/stream

# yarn
yarn add @soffinal/stream

# pnpm
pnpm add @soffinal/stream

# bun
bun add @soffinal/stream

# Deno
deno add jsr:@soffinal/stream

CDN (Browser)

<!-- Production (minified) -->
<script type="module">
  import { Stream, State } from "https://cdn.jsdelivr.net/npm/@soffinal/stream@latest/dist/index.js";
</script>

<!-- Alternative CDNs -->
<script type="module">
  import { Stream } from "https://esm.sh/@soffinal/stream";
  import { Stream } from "https://cdn.skypack.dev/@soffinal/stream";
</script>

Core Concepts

Streams: Multicast Event Pipelines

A Stream is a multicast, async iterable that pushes values to multiple listeners while being awaitable for the next value.

const userEvents = new Stream<UserEvent>();

// Multiple consumers automatically share the same data
userEvents.listen((event) => analytics.track(event));
userEvents.listen((event) => notifications.send(event));
userEvents.listen((event) => database.save(event));

// Await the next event
const nextEvent = await userEvents;

// Async iteration
for await (const event of userEvents) {
  if (event.type === "critical") break;
  processEvent(event);
}

Automatic Listener Cleanup

Stream provides three cleanup mechanisms to prevent memory leaks:

const stream = new Stream<string>();

// 1. Manual cleanup
const cleanup = stream.listen(value => console.log(value));
cleanup(); // Remove listener

// 2. AbortSignal cleanup
const controller = new AbortController();
stream.listen(value => console.log(value), controller.signal);
controller.abort(); // Remove listener

// 3. WeakRef automatic cleanup (NEW!)
const element = document.createElement('div');
stream.listen(value => {
  element.textContent = value;
}, element);
// Listener automatically removed when element is garbage collected
// Perfect for DOM elements, components, and temporary objects

WeakRef Benefits:

  • Zero memory leaks with DOM elements
  • No manual cleanup needed
  • Works with any object (components, instances, etc.)
  • Leverages JavaScript's garbage collector
  • Ideal for UI frameworks (React, Vue, Svelte, etc.)
// Real-world example: Component lifecycle
function createComponent() {
  const element = document.createElement('div');
  const dataStream = new Stream<Data>();
  
  // Auto-cleanup when component unmounts
  dataStream.listen(data => {
    element.innerHTML = renderTemplate(data);
  }, element);
  
  return element;
}

// When element is removed from DOM and GC'd, listener is automatically cleaned up

Pipe: Stream-to-Stream Composition

The pipe method enforces composition - it only accepts functions that return Stream instances, maintaining the infinite pipeline:

// All transformers return Streams - infinite chaining
stream.pipe(filter((v) => v > 0)); // → Stream<T>
stream.pipe(map((v) => v.toString())); // → Stream<string>
stream.pipe(toState("initial")); // → State<string> (extends Stream)

// Infinite chaining - every pipe returns a Stream
const result = stream
  .pipe(filter((v) => v > 0))
  .pipe(map((v) => v * 2))
  .pipe(take(5))
  .pipe(delay(100))
  .pipe(distinct()); // Always chainable

Streams are infinite - Like event emitters, they don't terminate naturally. The pipe constraint ensures you maintain the reactive paradigm throughout your entire pipeline.

Perfect TypeScript inference - no annotations needed:

const numbers = new Stream<number>();

// TypeScript knows these are all Streams
const doubled = numbers.pipe(map((n) => n * 2)); // Stream<number>
const strings = numbers.pipe(map((n) => n.toString())); // Stream<string>
const state = numbers.pipe(toState(0)); // State<number>

Universal Primitives: The Four Algebraic Operations

All stream operations are built from four universal primitives with Adaptive Constraints:

1. Filter

import { filter } from "@soffinal/stream";

// Simple filtering
stream.pipe(filter((value) => value > 0));

// Type guard filtering
stream.pipe(filter((value): value is number => typeof value === "number"));

// Async filtering with concurrency strategies
stream.pipe(
  filter(
    async (value) => {
      const isValid = await validateAsync(value);
      return isValid;
    },
    { strategy: "concurrent-ordered" }
  ) // Parallel validation, ordered results
);

// Stateful filtering with termination
stream.pipe(
  filter({ count: 0 }, (state, value) => {
    if (state.count >= 10) return; // Terminate after 10 items
    return [value > 0, { count: state.count + 1 }];
  })
);

📖 Complete Filter Documentation →

2. Map - Adaptive Transformer

import { map } from "@soffinal/stream";

// Simple transformation
stream.pipe(map((value) => value * 2));

// Type transformation
stream.pipe(map((value: number) => value.toString()));

// Async transformation with concurrency strategies
stream.pipe(
  map(
    async (value) => {
      const enriched = await enrichWithAPI(value);
      return enriched;
    },
    { strategy: "concurrent-unordered" }
  ) // Parallel processing, results as completed
);

// Stateful transformation with context
stream.pipe(
  map({ sum: 0 }, (state, value) => {
    const newSum = state.sum + value;
    return [{ value, runningSum: newSum }, { sum: newSum }];
  })
);

📖 Complete Map Documentation →

3. Merge - Stream Orchestration

import { merge } from "@soffinal/stream";

const stream1 = new Stream<number>();
const stream2 = new Stream<string>();

// Combine multiple streams with type safety
const combined = stream1.pipe(merge(stream2));
// Type: Stream<number | string>

combined.listen((value) => {
  if (typeof value === "number") {
    console.log("Number:", value);
  } else {
    console.log("String:", value);
  }
});

📖 Complete Merge Documentation →

4. Flat - Event Multiplication

import { flat } from "@soffinal/stream";

// Transform 1 array event → N individual events
const arrayStream = new Stream<number[]>();
const individualNumbers = arrayStream.pipe(flat());

arrayStream.push([1, 2, 3]); // Emits: 1, 2, 3 as separate events
// Type: Stream<number>

// Configurable depth flattening
const deepArrays = new Stream<number[][][]>();
const flattened = deepArrays.pipe(flat(2)); // Flatten 2 levels deep
// Type: Stream<number>

📖 Complete Flat Documentation →

Documentation-as-Distribution: Copy-Paste Transformers

No separate repos, no CLI tools, no package management - just copy-paste ready transformers embedded in JSDoc!

The Educational Transparency

Our approach makes every implementation pattern visible and learnable:

// 📦 All transformers are copy-pastable from IntelliSense!
// Hover over 'Stream' to see the complete transformers library

// Example: Users don't just get functions - they get implementation education
const searchInput = new Stream<string>(); // ← Hover here for full library
const searchResults = searchInput
  .pipe(distinct()) // Copy from Stream JSDoc - learn deduplication patterns
  .pipe(take(10)) // Copy from Stream JSDoc - learn termination patterns
  .pipe(delay(300)) // Copy from Stream JSDoc - learn async transformation
  .pipe(simpleMap((query) => searchAPI(query))); // Copy from Stream JSDoc - learn mapping patterns

What Users Actually Learn

When users hover over any function in JSDoc, they see complete implementation patterns:

// Users see EXACTLY how to build transformers
const take = <T>(n: number) =>
  filter<T, { count: number }>({ count: 0 }, (state, value) => {
    if (state.count >= n) return; // ← Learn termination patterns
    return [true, { count: state.count + 1 }]; // ← Learn state evolution
  });

const distinct = <T>() =>
  filter<T, { seen: Set<T> }>({ seen: new Set() }, (state, value) => {
    if (state.seen.has(value)) return [false, state]; // ← Learn deduplication logic
    state.seen.add(value); // ← Learn state mutation patterns
    return [true, state];
  });

From Consumers to Creators

This transparency empowers users to become transformer architects:

// After learning from JSDoc examples, users create their own:
const withTimestamp = <T>() =>
  map<T, {}, { value: T; timestamp: number }>(
    {}, // ← Learned: empty state when no memory needed
    (_, value) => [
      { value, timestamp: Date.now() }, // ← Learned: transformation pattern
      {}, // ← Learned: state management
    ]
  );

const rateLimited = <T>(maxPerSecond: number) =>
  filter<T, { timestamps: number[] }>({ timestamps: [] }, (state, value) => {
    const now = Date.now();
    const recent = state.timestamps.filter((t) => now - t < 1000);
    if (recent.length >= maxPerSecond) return [false, { timestamps: recent }];
    return [true, { timestamps: [...recent, now] }];
  });

Benefits Beyond Bundle Size

  • Zero friction - Copy-paste ready transformers
  • Perfect discoverability - IntelliSense shows all available transformers
  • Always up-to-date - Examples match current API version
  • No ecosystem fragmentation - Everything in one place
  • Educational transparency - Users learn implementation patterns
  • Infinite extensibility - Users become transformer creators
  • Self-documenting - Usage examples included with working code
  • Zero bundle cost - JSDoc stripped at compile time

The Network Effect

Documentation-as-Distribution creates multiplicative value:

  1. User discovers transformer in JSDoc
  2. User learns implementation pattern
  3. User creates custom transformers for their domain
  4. User shares patterns with their team
  5. Team creates hundreds of variations
  6. Knowledge multiplies exponentially across the community

How it works:

  1. Hover over Stream in your IDE to see the complete transformers library
  2. Or hover over individual functions for quick references
  3. Copy the transformer you need
  4. Use immediately - perfect TypeScript inference included!
  5. Learn the patterns and create your own infinite variations

Available Transformers (via JSDoc):

  • take(n), skip(n), distinct(), tap(fn) - Essential filtering patterns
  • withIndex(), delay(ms), pluck(key), scan(fn, initial) - Common transformation patterns
  • toState(initialValue) - Convert streams to reactive state
  • More transformers added with each release!

📊 Bundle Size Impact:

  • Package size: Currently ~15KB, grows with JSDoc transformer examples over time
  • Your app bundle: Always only 5.5KB (runtime code only, zero JSDoc overhead)
  • Tree-shaking: Only imported functions included in final bundle
  • JSDoc transformers: "Free" - rich transformer library without production cost

You're not just building applications - you're learning a paradigm that scales infinitely.

Manual Composition

// You can still build transformers manually
const customTransformer = <T>(count: number) =>
  filter<T, { taken: number }>({ taken: 0 }, (state, value) => {
    if (state.taken >= count) return; // Terminate after N items
    return [true, { taken: state.taken + 1 }];
  });

Reactive State: Stateful Values

State extends Stream with a current value that can be read and written:

const user = new State<User | null>(null);
const theme = new State<"light" | "dark">("light");
const counter = new State(0);

// Read current value
console.log(counter.value); // 0

// Write triggers all listeners
counter.value = 5;

// State from transformed streams
const source = new Stream<number>();
const derivedState = new State(0, source.pipe(map((v) => v * 2)));

// Derived state using transformers
const isLoggedIn = user.pipe(map((u) => u !== null));

const userDisplayName = user.pipe(
  filter((u) => u !== null),
  map((u) => `${u.firstName} ${u.lastName}`)
);

// Convert streams to state with toState transformer
const processedState = source
  .pipe(filter((v) => v > 0))
  .pipe(map((v) => v.toString()))
  .pipe(toState("0")); // Explicit initial value

// Automatic UI updates
isLoggedIn.listen((loggedIn) => {
  document.body.classList.toggle("authenticated", loggedIn);
});

// State changes propagate through the pipeline
user.value = { firstName: "John", lastName: "Doe" };
// Triggers: isLoggedIn → true, userDisplayName → 'John Doe'

Reactive Collections: Fine-Grained Change Events

Collections that emit specific change events for efficient UI updates:

import { List, Map, Set } from "@soffinal/stream";

const todos = new List<Todo>();
const userCache = new Map<string, User>();
const activeUsers = new Set<string>();

// React to specific operations
todos.insert.listen(([index, todo]) => {
  console.log(`Todo inserted at ${index}:`, todo);
  renderTodoAtIndex(index, todo);
});

todos.delete.listen(([index, todo]) => {
  console.log(`Todo removed from ${index}:`, todo);
  removeTodoFromDOM(index);
});

// Map changes
userCache.set.listen(([key, user]) => {
  console.log(`User cached: ${key}`, user);
  updateUserInUI(key, user);
});

// Set changes
activeUsers.add.listen((userId) => {
  console.log(`User ${userId} came online`);
  showOnlineIndicator(userId);
});

activeUsers.delete.listen((userId) => {
  console.log(`User ${userId} went offline`);
  hideOnlineIndicator(userId);
});

// Use like normal collections
todos.push({ id: 1, text: "Learn streams", done: false });
userCache.set("user1", { name: "Alice", email: "[email protected]" });
activeUsers.add("user1");

API Reference

Stream<T>

Core Methods

  • push(...values: T[]): void - Emit values to all listeners (auto-removes GC'd listeners)
  • listen(callback: (value: T) => void, context?: AbortSignal | Stream<any> | object): () => void - Add listener with optional cleanup
  • pipe<OUTPUT extends Stream<any>>(transformer: (stream: this) => OUTPUT): OUTPUT - Apply any transformer
  • withContext(context: object): AsyncIterator<T> - Async iterator bound to context lifetime

Async Interface

  • then<U>(callback?: (value: T) => U): Promise<U> - Promise interface for next value
  • [Symbol.asyncIterator](): AsyncIterator<T> - Async iteration support

Properties

  • hasListeners: boolean - Whether stream has active listeners
  • listenerAdded: Stream<void> - Emits when listener is added
  • listenerRemoved: Stream<void> - Emits when listener is removed

State<T> extends Stream<T>

Constructor

  • new State(initialValue: T) - Create state with initial value
  • new State(initialValue: T, stream: Stream<T>) - Create state from stream

Additional Properties

  • value: T - Current state value (get/set)

Universal Transformers

filter(predicate, options?)

  • Simple: filter((value) => boolean)
  • Type Guard: filter((value): value is Type => boolean) (sync only)
  • Async: filter(async (value) => boolean, { strategy? }) with concurrency options
  • Stateful: filter(state, (state, value) => [boolean, newState]) (always sequential)
  • Termination: Return undefined to terminate stream
  • Strategies: "sequential" | "concurrent-unordered" | "concurrent-ordered"

map(mapper, options?)

  • Simple: map((value) => newValue)
  • Async: map(async (value) => newValue, { strategy? }) with concurrency options
  • Stateful: map(state, (state, value) => [newValue, newState]) (always sequential)
  • Strategies: "sequential" | "concurrent-unordered" | "concurrent-ordered"

merge(...streams)

  • Basic: stream.pipe(merge(stream2, stream3))
  • Type-Safe: Automatically creates union types
  • Temporal Order: Maintains chronological sequence

flat(depth?)

  • Basic: stream.pipe(flat()) - Flatten one level
  • Deep: stream.pipe(flat(2)) - Flatten N levels
  • Event Multiplication: 1 array event → N individual events

Reactive Collections

List<T>

  • insert: Stream<[number, T]> - Insertion events
  • delete: Stream<[number, T]> - Deletion events
  • clear: Stream<void> - Clear events

Map<K,V> extends globalThis.Map<K,V>

  • set: Stream<[K, V]> - Set events (only on changes)
  • delete: Stream<[K, V]> - Delete events
  • clear: Stream<void> - Clear events

Set<T> extends globalThis.Set<T>

  • add: Stream<T> - Add events (only new values)
  • delete: Stream<T> - Delete events
  • clear: Stream<void> - Clear events

Performance

Bundle Size

  • Runtime bundle - 5.5KB minified, 1.6KB gzipped
  • Package size - Starts small, grows with JSDoc transformer library
  • Your production app - Always gets only the 5.5KB runtime code
  • Tree-shakeable - Import only what you use

Benchmarks

  • Fast startup - Zero dependencies, instant initialization
  • Efficient pipelines - Optimized transformer composition
  • Memory bounded - Built-in backpressure handling
  • Automatic cleanup - WeakRef prevents memory leaks

Runtime Support

  • Modern browsers supporting ES2020+ (WeakRef support)
  • Node.js 16+
  • Deno 1.0+
  • Bun 1.0+
  • Cloudflare Workers

Migration Guide

From EventEmitter

// EventEmitter
import { EventEmitter } from "events";
const emitter = new EventEmitter();
emitter.on("data", console.log);
emitter.emit("data", "hello");

// @soffinal/stream
import { Stream } from "@soffinal/stream";
const stream = new Stream();
stream.listen(console.log);
stream.push("hello");

Documentation

Transformer Guides

Contributing

We welcome contributions! Please see our Contributing Guide for details.

Development Setup

git clone https://github.com/soffinal/stream.git
cd stream
bun install
bun test

License

MIT © Soffinal

Contact: [email protected]