@sgmonda/streamfu

v0.5.6

Functional programming utilities for working with streams in JS/TS

streamfu

Streams should feel like arrays. Now they do.


Functional stream utilities for JavaScript & TypeScript



Why streamfu?

Streams are one of the most powerful primitives in JavaScript. They handle infinite data, backpressure, and async flows — things arrays simply can't do.

But the standard API makes you pay for that power with boilerplate, footguns, and unreadable code.
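To make the "infinite data" and backpressure points concrete, here is a minimal sketch that uses only the standard Web Streams API. The names `naturals` and `take` are hypothetical helpers introduced for illustration; they are not part of streamfu.

```typescript
// Hypothetical helper: an endless stream of 0, 1, 2, ...
function naturals(): ReadableStream<number> {
  let next = 0
  return new ReadableStream<number>({
    // pull() is only called while the internal queue has room,
    // so values are produced on demand rather than all at once
    pull(controller) {
      controller.enqueue(next++)
    },
  })
}

// Hypothetical helper: read the first `count` chunks, then cancel the source
async function take<T>(stream: ReadableStream<T>, count: number): Promise<T[]> {
  const taken: T[] = []
  const reader = stream.getReader()
  while (taken.length < count) {
    const result = await reader.read()
    if (result.done) break
    taken.push(result.value)
  }
  await reader.cancel() // signal the producer that no more data is needed
  reader.releaseLock()
  return taken
}
```

Calling `take(naturals(), 3)` reads three values and stops the producer, something an eagerly built array could not express for an unbounded source.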

The problem: Native streams are painful

Here's a real scenario — read a stream of numbers, keep only even ones, double them, and collect the results:

// ❌ Native Web Streams — imperative, verbose, error-prone
const reader = readable.getReader()
const results: number[] = []

while (true) {
  const { done, value } = await reader.read()
  if (done) break
  if (value % 2 === 0) {
    results.push(value * 2)
  }
}

reader.releaseLock()

Manual reader management. Mutable state. An infinite while (true) loop. And this is the simple case.

Need to split a stream? Native tee() only gives you two copies. Want to merge streams? Build your own. Want to zip? Good luck.

// ❌ Native — splitting a stream into 4 branches
const [a, rest1] = stream.tee()
const [b, rest2] = rest1.tee()
const [c, d] = rest2.tee()
// Hope you got the order right...
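If you had to generalize that chain yourself, it might look roughly like the sketch below. `teeN` is a hypothetical helper written against the standard `tee()` method; it is not how streamfu is known to implement branching.

```typescript
// Hypothetical helper: split one stream into `n` copies by chaining tee()
function teeN<T>(stream: ReadableStream<T>, n: number): ReadableStream<T>[] {
  if (!Number.isInteger(n) || n < 1) {
    throw new RangeError("n must be a positive integer")
  }
  const copies: ReadableStream<T>[] = []
  let rest = stream
  // Each tee() peels off one copy and leaves a remainder to split further
  for (let i = 1; i < n; i++) {
    const [copy, remainder] = rest.tee()
    copies.push(copy)
    rest = remainder
  }
  copies.push(rest) // the last remainder is the final copy
  return copies
}
```

Even in this small form there is bookkeeping to get right (the off-by-one in the loop, the final remainder), which is the kind of detail a library helper hides.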

The solution: streamfu

// ✅ streamfu — declarative, composable, readable
import { createReadable, filter, list, map, pipe } from "@sgmonda/streamfu"

const readable = createReadable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

const stream = pipe(
  readable,
  (r) => filter(r, (n) => n % 2 === 0),
  (r) => map(r, (n) => n * 2),
)
const results = await list(stream)

Same result. No manual readers. No mutable state. No while (true). Just pure transformations.

// ✅ streamfu — branch into any number of copies
import { branch } from "@sgmonda/streamfu"

const [a, b, c, d] = branch(stream, 4)

Side-by-side comparison

| Task                  | Native Streams                          | streamfu                 |
| --------------------- | --------------------------------------- | ------------------------ |
| Transform each chunk  | pipeThrough(new TransformStream({...})) | map(stream, fn)          |
| Filter chunks         | Manual reader loop + condition          | filter(stream, fn)       |
| Reduce to value       | Manual reader loop + accumulator        | reduce(stream, fn, init) |
| Combine streams       | Manual reader orchestration             | zip(s1, s2, s3)          |
| Concatenate streams   | Complex async pull logic                | concat(s1, s2, s3)       |
| Split stream          | Nested .tee() chains                    | branch(stream, n)        |
| Get element at index  | Manual counter + reader                 | at(stream, i)            |
| Check if value exists | Manual loop + early exit                | includes(stream, val)    |
| Chain operations      | Deeply nested pipeThrough               | pipe(stream, f1, f2, f3) |

If you know Array.prototype, you already know streamfu.


Getting Started

Install

npx jsr add @sgmonda/streamfu     # npm
yarn dlx jsr add @sgmonda/streamfu # yarn
pnpm dlx jsr add @sgmonda/streamfu # pnpm
bunx jsr add @sgmonda/streamfu     # bun
deno add jsr:@sgmonda/streamfu

Quick start

import { createReadable, filter, map, pipe, reduce } from "@sgmonda/streamfu"

// Create a stream from any iterable
const numbers = createReadable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

// Compose transformations with pipe
const stream = pipe(
  numbers,
  (r) => filter(r, (n) => n % 2 === 0), // keep even: 2, 4, 6, 8, 10
  (r) => map(r, (n) => n * 2), // double:   4, 8, 12, 16, 20
)
const sumOfDoubledEvens = await reduce(stream, (a, b) => a + b, 0) // sum: 60

console.log(sumOfDoubledEvens) // 60

API Reference

Stream creation

| Function                 | Description                                                           |
| ------------------------ | --------------------------------------------------------------------- |
| createReadable(iterable) | Create a stream from any iterable: arrays, generators, sets, strings  |
| createWritable(fn)       | Create a writable stream from a callback function                     |
| range(min, max, step?)   | Generate a stream of numbers in a range                               |
| words(chars, length)     | Generate a stream of random strings                                   |
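For a sense of what turning an iterable into a stream involves, here is a minimal sketch built on the standard `ReadableStream` constructor. `fromIterable` is a hypothetical name; this is an assumption about the general technique, not streamfu's actual `createReadable` implementation.

```typescript
// Hypothetical helper: wrap any synchronous iterable in a ReadableStream
function fromIterable<T>(iterable: Iterable<T>): ReadableStream<T> {
  const iterator = iterable[Symbol.iterator]()
  return new ReadableStream<T>({
    // Produce one chunk per pull so lazy iterables (generators) stay lazy
    pull(controller) {
      const result = iterator.next()
      if (result.done) controller.close()
      else controller.enqueue(result.value)
    },
    // Let generators run their cleanup if the consumer stops early
    cancel() {
      iterator.return?.()
    },
  })
}
```

Because the iterator is advanced inside `pull()`, the same helper works for arrays, sets, strings, and generators alike.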

Transformations (non-consuming)

These return a new stream — the original is not consumed.

| Function                               | Description                                                  |
| -------------------------------------- | ------------------------------------------------------------ |
| map(stream, ...fns)                    | Transform each chunk (supports chaining multiple transforms) |
| filter(stream, fn)                     | Keep only chunks matching a predicate                        |
| flat(stream, depth?)                   | Flatten a stream of arrays                                   |
| flatMap(stream, fn)                    | Map + flatten in one step                                    |
| slice(stream, start, end?)             | Extract a portion of the stream                              |
| splice(stream, start, count, ...items) | Remove/insert chunks at a position                           |
| concat(...streams)                     | Concatenate multiple streams sequentially                    |
| zip(...streams)                        | Combine streams into a stream of tuples                      |
| pipe(stream, ...fns)                   | Chain multiple stream operations                             |
| branch(stream, n)                      | Split a stream into n independent copies                     |
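Chaining in the style of pipe() can be pictured as a left fold over a list of functions. The sketch below is a simplified illustration, not streamfu's implementation: `pipeSketch` is a hypothetical name, and it assumes every step takes and returns the same type, a restriction the real pipe() may not have.

```typescript
// Hypothetical helper: feed `input` through each step, left to right
function pipeSketch<T>(input: T, ...steps: Array<(value: T) => T>): T {
  return steps.reduce((current, step) => step(current), input)
}
```

With this shape, `pipeSketch(x, f, g)` is equivalent to `g(f(x))`, which is why the steps read top to bottom in the order they run.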

Consumers (consuming)

These consume the stream — it cannot be reused afterward.

| Function                 | Description                           |
| ------------------------ | ------------------------------------- |
| reduce(stream, fn, init) | Reduce to a single value              |
| list(stream)             | Collect all chunks into an array      |
| some(stream, fn)         | Check if any chunk matches            |
| every(stream, fn)        | Check if all chunks match             |
| forEach(stream, fn)      | Execute a function for each chunk     |
| includes(stream, value)  | Check if a value exists in the stream |
| at(stream, index)        | Get the chunk at a specific index     |
| indexOf(stream, value)   | Find the index of a value             |

Consuming vs non-consuming

Rule of thumb: If it returns a ReadableStream, it's non-consuming. If it returns a Promise, it consumes the stream.

Need to consume a stream multiple times? Use branch() first:

const [forSum, forCount] = branch(numbers, 2)

const sum = await reduce(forSum, (a, b) => a + b, 0)
const count = await reduce(forCount, (acc) => acc + 1, 0)
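To see why a consumed stream cannot be reused, the following sketch drains one native stream twice using only the standard API. `drain` and `drainTwice` are hypothetical helpers for illustration and are not part of streamfu.

```typescript
// Hypothetical helper: read a stream to the end into an array
async function drain<T>(stream: ReadableStream<T>): Promise<T[]> {
  const chunks: T[] = []
  const reader = stream.getReader()
  try {
    while (true) {
      const result = await reader.read()
      if (result.done) break
      chunks.push(result.value)
    }
  } finally {
    reader.releaseLock()
  }
  return chunks
}

// Drain the same stream twice to observe what is left for a second consumer
async function drainTwice<T>(stream: ReadableStream<T>): Promise<[T[], T[]]> {
  const first = await drain(stream)
  const second = await drain(stream) // stream is already closed: nothing to read
  return [first, second]
}
```

The first pass receives every chunk and the second receives an empty array, which is why a stream has to be split before it is consumed rather than after.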

Streams the Hard Way vs streamfu

Every example below shows a real task done the hard way with native Web Streams, then the easy way with streamfu.

Before — native Web Streams:

const transform1 = new TransformStream({
  transform(line, ctrl) {
    ctrl.enqueue(line.split(","))
  },
})
const transform2 = new TransformStream({
  transform(cols, ctrl) {
    ctrl.enqueue({ name: cols[0].toUpperCase(), age: Number(cols[1]) })
  },
})

const reader = csvStream.pipeThrough(transform1).pipeThrough(transform2).getReader()
const results = []
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  results.push(value)
}
reader.releaseLock()

After — streamfu:

import { list, map } from "@sgmonda/streamfu"

const stream = map(
  csvStream,
  (line) => line.split(","),
  (cols) => ({ name: cols[0].toUpperCase(), age: Number(cols[1]) }),
)
const results = await list(stream)

Before — native Web Streams:

const filter = new TransformStream({
  transform(user, ctrl) {
    if (user.status === "active") ctrl.enqueue(user)
  },
})

const reader = usersStream.pipeThrough(filter).getReader()
const active = []
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  active.push(value)
}
reader.releaseLock()

After — streamfu:

import { filter, list } from "@sgmonda/streamfu"

const active = await list(filter(usersStream, (u) => u.status === "active"))

Before — native Web Streams:

const reader = numbersStream.getReader()
let total = 0
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  total += value
}
reader.releaseLock()

After — streamfu:

import { reduce } from "@sgmonda/streamfu"

const total = await reduce(numbersStream, (sum, n) => sum + n, 0)

Before — native Web Streams:

const filterStep = new TransformStream({
  transform(n, ctrl) {
    if (n % 2 === 0) ctrl.enqueue(n)
  },
})
const doubleStep = new TransformStream({
  transform(n, ctrl) {
    ctrl.enqueue(n * 2)
  },
})
const toStringStep = new TransformStream({
  transform(n, ctrl) {
    ctrl.enqueue(`Value: ${n}`)
  },
})

const reader = numbersStream
  .pipeThrough(filterStep)
  .pipeThrough(doubleStep)
  .pipeThrough(toStringStep)
  .getReader()
const results = []
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  results.push(value)
}
reader.releaseLock()

After — streamfu:

import { filter, list, map, pipe } from "@sgmonda/streamfu"

const results = await list(pipe(
  numbersStream,
  (r) => filter(r, (n) => n % 2 === 0),
  (r) => map(r, (n) => n * 2),
  (r) => map(r, (n) => `Value: ${n}`),
))

Before — native Web Streams:

const sources = [page1Stream, page2Stream, page3Stream]
const reader1 = sources[0].getReader()
const reader2 = sources[1].getReader()
const reader3 = sources[2].getReader()
const all = []

for (const reader of [reader1, reader2, reader3]) {
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    all.push(value)
  }
  reader.releaseLock()
}
const first50 = all.slice(0, 50)

After — streamfu:

import { concat, list, slice } from "@sgmonda/streamfu"

const first50 = await list(slice(concat(page1Stream, page2Stream, page3Stream), 0, 50))

Before — native Web Streams:

const readerA = namesStream.getReader()
const readerB = scoresStream.getReader()
const leaderboard = []

while (true) {
  const [a, b] = await Promise.all([readerA.read(), readerB.read()])
  if (a.done || b.done) break
  leaderboard.push(`${a.value}: ${b.value}`)
}
readerA.releaseLock()
readerB.releaseLock()

After — streamfu:

import { list, map, zip } from "@sgmonda/streamfu"

const leaderboard = await list(map(zip(namesStream, scoresStream), ([name, score]) => `${name}: ${score}`))

Before — native Web Streams:

const [copy1, copy2] = numbersStream.tee()

const reader1 = copy1.getReader()
let sum = 0
while (true) {
  const { done, value } = await reader1.read()
  if (done) break
  sum += value
}
reader1.releaseLock()

const reader2 = copy2.getReader()
let max = -Infinity
while (true) {
  const { done, value } = await reader2.read()
  if (done) break
  if (value > max) max = value
}
reader2.releaseLock()

After — streamfu:

import { branch, reduce } from "@sgmonda/streamfu"

const [forSum, forMax] = branch(numbersStream, 2)

const [sum, max] = await Promise.all([
  reduce(forSum, (a, b) => a + b, 0),
  reduce(forMax, (a, b) => (b > a ? b : a), -Infinity),
])

Before — native Web Streams:

// Each chunk is an array like [item1, item2, item3] from a paginated API
const flatten = new TransformStream({
  transform(page, ctrl) {
    for (const item of page) ctrl.enqueue(item)
  },
})
const label = new TransformStream({
  transform(item, ctrl) {
    ctrl.enqueue(`#${item.id}: ${item.title}`)
  },
})

const reader = pagesStream.pipeThrough(flatten).pipeThrough(label).getReader()
const items = []
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  items.push(value)
}
reader.releaseLock()

After — streamfu:

import { flatMap, list, map } from "@sgmonda/streamfu"

const items = await list(map(flatMap(pagesStream, (page) => page), (item) => `#${item.id}: ${item.title}`))

Before — native Web Streams:

// Need 5 copies — tee() only gives 2, so we chain:
const [s1, rest1] = dataStream.tee()
const [s2, rest2] = rest1.tee()
const [s3, rest3] = rest2.tee()
const [s4, s5] = rest3.tee()

// Does it contain 42?
let hasFortyTwo = false
const r1 = s1.getReader()
while (true) {
  const { done, value } = await r1.read()
  if (done) break
  if (value === 42) {
    hasFortyTwo = true
    break
  }
}
r1.releaseLock()

// Are all values positive?
let allPositive = true
const r2 = s2.getReader()
while (true) {
  const { done, value } = await r2.read()
  if (done) break
  if (value <= 0) {
    allPositive = false
    break
  }
}
r2.releaseLock()

// Is any value greater than 100?
let anyOver100 = false
const r3 = s3.getReader()
while (true) {
  const { done, value } = await r3.read()
  if (done) break
  if (value > 100) {
    anyOver100 = true
    break
  }
}
r3.releaseLock()

// Where is 7?
let indexOf7 = -1
let idx = 0
const r4 = s4.getReader()
while (true) {
  const { done, value } = await r4.read()
  if (done) break
  if (value === 7) {
    indexOf7 = idx
    break
  }
  idx++
}
r4.releaseLock()

// What's the third element?
let third = undefined
let count = 0
const r5 = s5.getReader()
while (true) {
  const { done, value } = await r5.read()
  if (done) break
  if (count === 2) {
    third = value
    break
  }
  count++
}
r5.releaseLock()

After — streamfu:

import { at, branch, every, includes, indexOf, some } from "@sgmonda/streamfu"

const [s1, s2, s3, s4, s5] = branch(dataStream, 5)

const [hasFortyTwo, allPositive, anyOver100, indexOf7, third] = await Promise.all([
  includes(s1, 42),
  every(s2, (n) => n > 0),
  some(s3, (n) => n > 100),
  indexOf(s4, 7),
  at(s5, 2),
])

Before — native Web Streams:

// Generate 1..10 manually
const numbers = new ReadableStream({
  start(ctrl) {
    for (let i = 1; i <= 10; i++) ctrl.enqueue(i)
    ctrl.close()
  },
})

// Splice: remove 3 items at index 3, insert 99 and 100
let idx = 0
const removed = 3
const spliceTransform = new TransformStream({
  transform(chunk, ctrl) {
    if (idx === 3 + removed) {
      ctrl.enqueue(99)
      ctrl.enqueue(100)
    }
    if (idx < 3 || idx >= 3 + removed) {
      ctrl.enqueue(chunk)
    }
    idx++
  },
  flush(ctrl) {
    if (idx <= 3 + removed) {
      ctrl.enqueue(99)
      ctrl.enqueue(100)
    }
  },
})

// Double each value
const doubleTransform = new TransformStream({
  transform(n, ctrl) {
    ctrl.enqueue(n * 2)
  },
})

const reader = numbers.pipeThrough(spliceTransform).pipeThrough(doubleTransform).getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  console.log(value)
}
reader.releaseLock()

After — streamfu:

import { forEach, map, pipe, range, splice } from "@sgmonda/streamfu"

await forEach(
  pipe(
    range(1, 10),
    (r) => splice(r, 3, 3, 99, 100),
    (r) => map(r, (n) => n * 2),
  ),
  (value) => console.log(value),
)

Design principles

  • Functional & pure — No side effects, no mutations. Every operation returns a new stream.
  • Familiar API — Mirrors Array.prototype methods. Zero learning curve.
  • Universal — Built on the Web Streams API. Works in Node.js, Deno, Bun, and browsers.
  • Type-safe — Full TypeScript support with precise generics.
  • Tested — 100% code coverage. Every function, every edge case.

Error handling

One of the trickiest parts of working with streams is error handling. The traditional Node.js approach relies on listening to events, which splits your logic across multiple callbacks and makes the control flow hard to follow:

// ❌ Event-based error handling: scattered logic, easy to forget a listener
stream.on("data", (chunk) => {/* process chunk */})
stream.on("error", (err) => {/* handle error */})
stream.on("end", () => {/* cleanup */})

This pattern has several problems:

  • Error handling is disconnected from the processing logic
  • Forgetting the error listener can cause unhandled exceptions that crash your process
  • Coordinating end and error to know when the stream is truly done requires extra state
  • It doesn't compose well with async/await code

In streamfu, errors propagate automatically through chained operations. If a map() transformer or a filter() predicate throws, the error bubbles up and rejects the promise returned by any consuming operation (list(), reduce(), every(), etc.). This means you can use a standard try/catch block:

// ✅ Errors propagate through the entire chain
const stream = pipe(
  createReadable(data),
  (r) => map(r, transformFn), // if this throws...
  (r) => filter(r, predicateFn),
)

try {
  const results = await list(stream) // ...the error rejects here
} catch (err) {
  console.error("Something failed:", err)
}

No special error listeners, no extra plumbing. Errors flow naturally through map(), filter(), and any other chained transformation, all the way to whichever consumer ends the pipeline.
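The same propagation can be observed with the underlying Web Streams API, which streamfu is built on. The sketch below is an illustration using standard calls only; `failOn` and `collectOrReport` are hypothetical names, and it does not show streamfu's internals.

```typescript
// Hypothetical stage: passes chunks through but throws on one specific value
function failOn(bad: number): TransformStream<number, number> {
  return new TransformStream<number, number>({
    transform(chunk, controller) {
      if (chunk === bad) throw new Error(`bad chunk: ${chunk}`)
      controller.enqueue(chunk)
    },
  })
}

// Hypothetical consumer: a throw inside the transform rejects reader.read(),
// so a plain try/catch around the read loop is enough to observe it
async function collectOrReport(source: ReadableStream<number>): Promise<string> {
  const reader = source.pipeThrough(failOn(2)).getReader()
  const seen: number[] = []
  try {
    while (true) {
      const result = await reader.read()
      if (result.done) return `ok: ${seen.join(",")}`
      seen.push(result.value)
    }
  } catch (err) {
    return `failed: ${(err as Error).message}`
  } finally {
    reader.releaseLock()
  }
}
```

The error thrown in the middle of the pipeline surfaces at the point where the stream is consumed, which is the behavior the try/catch pattern above relies on.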

Contributing

Contributions welcome! Fork the repo, make your changes, and submit a PR.

deno task test

Requirements:

Publishing

Releases are published to JSR automatically by GitHub CI when the version in deno.json changes on main.


MIT License · Made with care by @sgmonda and contributors