
anabranch

v0.10.0

Async streams that keep flowing, even when things go wrong


Anabranch

Async stream processing where errors are collected alongside values instead of stopping the pipeline.

The name: an anabranch is a river branch that diverges from the main channel and may rejoin it later.

The problem

When processing async data in a loop, a single failure kills everything:

for await (const item of source) {
  const result = await process(item) // one throw stops the whole loop
}

Wrapping each item in try/catch works but produces messy, hard-to-compose code.
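For comparison, the manual version might look like this (a plain-TypeScript sketch, independent of the library; the helper name processAll is made up for illustration):

```typescript
// Manual per-item error handling: it works, but the bookkeeping
// obscures the processing logic and doesn't compose into a pipeline.
async function processAll<T, R>(
  source: AsyncIterable<T>,
  process: (item: T) => Promise<R>,
): Promise<{ successes: R[]; errors: unknown[] }> {
  const successes: R[] = []
  const errors: unknown[] = []
  for await (const item of source) {
    try {
      successes.push(await process(item))
    } catch (err) {
      errors.push(err)
    }
  }
  return { successes, errors }
}
```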

The solution

Anabranch wraps each item as either { type: "success", value } or { type: "error", error }. Operations like map, filter, and flatMap work only on successes; errors pass through until you decide what to do with them.

import { Source } from 'anabranch'

const stream = Source.from<string, Error>(async function* () {
  yield 'https://example.com/1'
  yield 'https://example.com/2'
  yield 'https://example.com/3'
})

const { successes, errors } = await stream
  .withConcurrency(4)
  .map(async (url) => {
    const res = await fetch(url)
    if (!res.ok) throw new Error(`HTTP ${res.status}`)
    return res.json()
  })
  .filter((data) => data.active)
  .partition()
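The wrapping itself is a small idea. A minimal sketch of the pattern in plain TypeScript (illustrative only, not the library's internals):

```typescript
type Result<T, E> =
  | { type: 'success'; value: T }
  | { type: 'error'; error: E }

// Wrap each item so a throwing mapper yields an error result
// instead of terminating the iteration.
async function* wrapMap<T, R, E = unknown>(
  source: AsyncIterable<T>,
  fn: (item: T) => Promise<R> | R,
): AsyncIterable<Result<R, E>> {
  for await (const item of source) {
    try {
      yield { type: 'success', value: await fn(item) }
    } catch (error) {
      yield { type: 'error', error: error as E }
    }
  }
}
```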

Installation

Deno (JSR)

import { Source } from 'jsr:@anabranch/anabranch'

Node / Bun (npm)

npm install anabranch

Core concepts

Creating a stream

Use Source.from() with an async generator function, or with an existing AsyncIterable:

const stream = Source.from<number, Error>(async function* () {
  yield 1
  yield 2
  yield 3
})

const stream2 = Source.from<number, Error>(someAsyncIterable)

For push-based streams where external producers send values as they arrive, use Channel:

import { Channel } from 'anabranch'

const channel = Channel.create<PriceUpdate, Error>()
  .withBufferSize(100)
  .withOnDrop((update) => console.log('dropped:', update))

// External producer sends values:
channel.send({ symbol: 'AAPL', price: 150 })
channel.send({ symbol: 'GOOGL', price: 2750 })

// Consumer reads from the channel like any stream:
for await (const result of channel) {
  // result is { type: "success", value } or { type: "error", error }
}

channel.close() // signals no more items

channel.fail bypasses the buffer and injects errors directly into the stream:

channel.send({ symbol: 'AAPL', price: 150 })
channel.fail(new Error('feed disconnected')) // goes straight to consumer
channel.send({ symbol: 'GOOGL', price: 2750 }) // still processes
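Conceptually, a channel is a queue bridging a push-based producer and a pull-based consumer. A minimal sketch of that idea (plain TypeScript; it omits the library's buffering, drop callbacks, fail, and Result wrapping — the class name MiniChannel is made up):

```typescript
// A minimal push-to-pull channel: send() enqueues, the async
// iterator dequeues; close() ends iteration once the queue drains.
class MiniChannel<T> {
  private queue: T[] = []
  private waiters: Array<() => void> = []
  private closed = false

  send(value: T): void {
    if (this.closed) return
    this.queue.push(value)
    this.waiters.shift()?.()
  }

  close(): void {
    this.closed = true
    while (this.waiters.length) this.waiters.shift()!()
  }

  async *[Symbol.asyncIterator](): AsyncIterableIterator<T> {
    while (true) {
      if (this.queue.length) {
        yield this.queue.shift()!
      } else if (this.closed) {
        return
      } else {
        // Queue is empty but not closed: park until the next send/close.
        await new Promise<void>((resolve) => this.waiters.push(resolve))
      }
    }
  }
}
```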

Single async operations with Task

For single async operations with retries, timeouts, and signal handling, use Task:

import { Task } from 'anabranch'

const task = Task.of(async () => {
  const res = await fetch('https://example.com')
  if (!res.ok) throw new Error('Bad response')
  return res.json()
})

const result = await task
  .retry({ attempts: 3, delay: (attempt) => 200 * 2 ** attempt })
  .timeout(5_000)
  .result()
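Retry with a per-attempt delay is a standard pattern; here is a self-contained sketch of what a policy like attempts: 3 with an exponential delay function implies (plain TypeScript, not the library's code):

```typescript
// Run fn, retrying on failure; delay(attempt) is awaited between
// attempts (attempt is 0-based). The last error is rethrown when
// all attempts are exhausted.
async function retry<T>(
  fn: () => Promise<T>,
  attempts: number,
  delay: (attempt: number) => number,
): Promise<T> {
  let lastError: unknown
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await fn()
    } catch (err) {
      lastError = err
      if (attempt < attempts - 1) {
        await new Promise((resolve) => setTimeout(resolve, delay(attempt)))
      }
    }
  }
  throw lastError
}
```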

Task composes with flatMap for chaining, and Task.allSettled / Task.race for concurrency:

const combined = Task.of(() => Promise.resolve(2))
  .flatMap((value) => Task.of(() => Promise.resolve(value * 3)))
  .timeout(500)

const results = await Task.allSettled([
  Task.of(() => fetch('/api/users')),
  Task.of(() => fetch('/api/posts')),
]).run()

const fastest = await Task.race([
  Task.of(() => fetch('/fast')),
  Task.of(() => fetch('/slow')),
]).run()

Abort signals thread through the task lifecycle:

const controller = new AbortController()
const task = Task.of(async (signal) => {
  const res = await fetch('/long-request', { signal })
  return res.json()
}).withSignal(controller.signal)

controller.abort()

For resource lifecycle management, use Task.acquireRelease to acquire a resource, run a composed task, and release it regardless of success or failure:

const task = Task.acquireRelease({
  acquire: (signal) => db.connect(signal),
  release: (conn) => conn.close(),
  use: (conn) =>
    Task.of(() => query(conn))
      .retry({ attempts: 3 })
      .timeout(5_000),
})

const result = await task.result()
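The guarantee behind acquire/release is the familiar try/finally shape; a self-contained sketch of the pattern (plain TypeScript, not the library's implementation — it drops the signal and Task composition):

```typescript
// acquire → use → release, with release guaranteed by finally
// whether use() resolves or rejects.
async function acquireRelease<R, T>(
  acquire: () => Promise<R>,
  release: (resource: R) => Promise<void> | void,
  use: (resource: R) => Promise<T>,
): Promise<T> {
  const resource = await acquire()
  try {
    return await use(resource)
  } finally {
    await release(resource)
  }
}
```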

Task error types are not runtime-checked; the E type is a hint for how you expect the task to fail.

Transforming values

map, flatMap, and filter work on successes. If the callback throws, the item becomes an error result:

stream
  .map((n) => n * 2)
  .flatMap((n) => [n, n + 1])
  .filter((n) => n % 2 === 0)

Operations are applied lazily without creating intermediate collections. Items flow through the pipeline as they are processed, so you can chain as many operations as you like without extra memory cost.
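This laziness is the ordinary async-generator property: each stage pulls one item at a time from the stage before it. A sketch of a single lazy map stage (illustrative, not the library's internals):

```typescript
// A lazy stage: nothing runs until the consumer pulls, and only
// one item is in flight per pull — no intermediate arrays.
async function* lazyMap<T, R>(
  source: AsyncIterable<T>,
  fn: (item: T) => R,
): AsyncIterable<R> {
  for await (const item of source) {
    yield fn(item)
  }
}
```

If the consumer stops early, for await closes the chain and the source produces nothing further.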

The stream itself is an AsyncIterable, so you can iterate directly:

for await (const result of stream) {
  if (result.type === 'success') {
    console.log(result.value)
  } else {
    console.error(result.error)
  }
}

Handling errors

Each success-side operation has an error-side counterpart:

stream
  .mapErr((e) => new WrappedError(e)) // transform errors
  .filterErr((e) => e.retryable) // drop errors that don't match
  .recoverWhen( // convert matching errors back to successes
    (e): e is NetworkError => e instanceof NetworkError,
    (e) => fallbackValue,
  )
  .recover((e) => defaultValue) // recover all remaining errors

Use throwOn to throw specific errors immediately, terminating iteration:

stream.throwOn((e): e is FatalError => e instanceof FatalError)

Accumulating state

scan is like fold but emits the running accumulator after each value:

const payments = Source.from<Payment, Error>(async function* () {
  /* stream of payment events */
})

payments
  .scan((total, payment) => total + payment.amount, 0)
  .tap((total) => updateDashboard(total))
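The semantics of scan can be sketched as a small generator (plain TypeScript, not the library's code):

```typescript
// scan: a fold that emits every intermediate accumulator.
async function* scan<T, A>(
  source: AsyncIterable<T>,
  fn: (acc: A, value: T) => A,
  init: A,
): AsyncIterable<A> {
  let acc = init
  for await (const value of source) {
    acc = fn(acc, value)
    yield acc
  }
}
```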

chunks groups consecutive successes into fixed-size arrays:

const records = Source.from<Record, Error>(async function* () {
  /* stream of database records */
})

// Batch records for bulk insert
records
  .chunks(100)
  .map((batch) => db.insertMany(batch))
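Batching behaves like this sketch, including the final partial chunk (illustrative plain TypeScript):

```typescript
// Group consecutive values into arrays of `size`, flushing any
// final partial chunk when the source ends.
async function* chunks<T>(
  source: AsyncIterable<T>,
  size: number,
): AsyncIterable<T[]> {
  let batch: T[] = []
  for await (const value of source) {
    batch.push(value)
    if (batch.length === size) {
      yield batch
      batch = []
    }
  }
  if (batch.length) yield batch
}
```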

Concurrency and backpressure

Source.from(generator)
  .withConcurrency(8) // up to 8 concurrent map/flatMap operations
  .withBufferSize(16) // pause the source if results pile up
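The effect of a concurrency limit can be sketched with a worker pool over an array (a simplification: the real stream applies the limit per item with backpressure, and this helper name is made up):

```typescript
// Map over items with at most `limit` fn calls in flight,
// preserving input order in the results.
async function mapConcurrent<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length)
  let next = 0
  async function worker(): Promise<void> {
    while (next < items.length) {
      const i = next++ // claim the next index (single-threaded, so no race)
      results[i] = await fn(items[i])
    }
  }
  await Promise.all(
    Array.from({ length: Math.min(limit, items.length) }, worker),
  )
  return results
}
```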

Collecting results

Method        Returns                            Throws?
collect()     T[] (successes only)               Yes, AggregateError if any errors
partition()   { successes: T[], errors: E[] }    No
toArray()     Result<T, E>[] (tagged)            No
successes()   AsyncIterable<T>                   No
errors()      AsyncIterable<E>                   No

Other utilities

  • tap(fn) / tapErr(fn): run a side effect without changing the stream
  • take(n): stop after n successful values
  • takeWhile(fn): stop when the predicate returns false
  • fold(fn, init) / foldErr(fn, init): reduce the stream to a single value
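Utilities like take are small generators under the hood; a sketch of the idea (plain TypeScript, not the library's code):

```typescript
// take: yield at most n values, then stop — returning early
// closes the source so it produces nothing further.
async function* take<T>(
  source: AsyncIterable<T>,
  n: number,
): AsyncIterable<T> {
  if (n <= 0) return
  let count = 0
  for await (const value of source) {
    yield value
    if (++count >= n) return
  }
}
```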

API reference

See generated documentation for full API details.