@soffinal/stream
v0.3.0
Published
Composable reactive primitives for TypeScript
Maintainers
Readme
@soffinal/stream
Composable reactive primitives for TypeScript
Minimal reactive streaming library: type-safe events, stateful transformations, async iteration, automatic cleanup.
Quick Start
import { Stream } from "@soffinal/stream";
const events = new Stream<string>();
events.listen((value) => console.log(value));
events.push("Hello", "World");
// Output: Hello, WorldInstallation
npm install @soffinal/streamCore Concepts
1. Stream - Push & Pull
// Push: Emit values manually
const stream = new Stream<number>();
stream.listen((n) => console.log(n));
stream.push(1, 2, 3);
// Pull: Generate values from async source
const countdown = new Stream(async function* () {
for (let i = 5; i > 0; i--) {
yield i;
await new Promise(resolve => setTimeout(resolve, 1000));
}
});
// Both: Push to a pull-based stream
const messages = new Stream(async function* () {
const ws = new WebSocket("ws://localhost");
while (ws.readyState === WebSocket.OPEN) {
yield await new Promise(resolve => {
ws.onmessage = e => resolve(e.data);
});
}
});
messages.push("manual event"); // Can still push!2. Automatic Cleanup
// Disposable pattern
using stream.listen(handler);
// Manual cleanup
const cleanup = stream.listen(handler);
cleanup();
// AbortSignal
stream.listen(handler, controller.signal);
// Stream as signal
const stop = new Stream<void>();
stream.listen(handler, stop);
stop.push(); // Remove listener3. Transform with pipe
const numbers = new Stream<number>();
const doubled = numbers.pipe(filter((n) => n > 0)).pipe(map((n) => n * 2));
doubled.listen((n) => console.log(n));
numbers.push(-1, 2, 3);
// Output: 4, 64. Async Iteration
for await (const value of stream) {
console.log(value);
if (done) break;
}Transformers
state - Reactive State
const counter = new Stream<number>().pipe(state(0));
counter.listen((n) => console.log(n));
counter.state.value = 5; // Triggers listener
console.log(counter.state.value); // 5gate - Flow Control
const stream = new Stream<number>().pipe(gate());
stream.listen((n) => console.log(n));
stream.push(1); // Logs: 1
stream.gate.close();
stream.push(2); // Blocked
stream.gate.open();
stream.push(3); // Logs: 3filter - Remove Values
// Simple
stream.pipe(filter((n) => n > 0));
// Async
stream.pipe(filter(async (n) => await validate(n)));
// Stateful
stream.pipe(
filter({ count: 0 }, (state, n) => {
if (state.count >= 10) return; // Stop after 10
return [n > 0, { count: state.count + 1 }];
}),
);map - Transform Values
// Simple
stream.pipe(map((n) => n * 2));
// Async
stream.pipe(map(async (n) => await process(n)));
// Stateful
stream.pipe(
map({ sum: 0 }, (state, n) => {
const newSum = state.sum + n;
return [newSum, { sum: newSum }];
}),
);merge - Combine Streams
const numbers = new Stream<number>();
const strings = new Stream<string>();
const combined = numbers.pipe(merge(strings));
// Type: Stream<number | string>flat - Flatten Arrays
const arrays = new Stream<number[]>();
const numbers = arrays.pipe(flat());
arrays.push([1, 2, 3]); // Emits: 1, 2, 3Write Your Own
A transformer is just a function:
const double = (stream: Stream<number>) =>
new Stream(async function* () {
for await (const n of stream) yield n * 2;
});
stream.pipe(double);API
Stream<T>
push(...values: T[])- Emit valueslisten(callback, context?)- Add listenerpipe(transformer)- Transform streamclear()- Remove all listeners
Async
await stream- Wait for next valuefor await (const value of stream)- Iterate
Philosophy
2 primitives: Stream + pipe
6 transformers: state, gate, filter, map, merge, flat
Everything else you compose yourself.
Efficient by design: Transformers execute once per value. Multiple listeners share the same computation:
const expensive = source.pipe(map(async v => await heavyComputation(v)));
expensive.listen(v => updateUI(v));
expensive.listen(v => logToAnalytics(v));
expensive.listen(v => saveToCache(v));
// heavyComputation() runs ONCE per value, not 3 timesPerformance
- 1.6KB gzipped
- Zero dependencies
- Automatic memory management
- Tree-shakeable
Browser Support
Chrome 84+, Firefox 79+, Safari 14.1+, Node.js 16+, Deno, Bun
License
MIT © Soffinal
