npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

nozzle-js

v0.10.0

Published

My awesome typescript library

Readme

Nozzle is a utility library for manipulating streams of text, and in particular streamed responses from LLMs.

Installation

npm i nozzle-js # or pnpm / bun / yarn

nozzle is written in TypeScript and has both cjs and esm builds.

Usage

const stream = await openai.chat.completions.create({ ...args, stream: true })

/*
# Reasoning:
3x3 is equal to 9.

# Answer:
The product is 9.

# Check:
9 / 3 = 3, so I think this answer is correct.

=> 
The product is 9.
*/
// extract the section between # Answer and # Reasoning; return the individual sentences at least 100ms apart.
return nz(stream)
  .after("# Answer")
  .before("# Check")
  .split(/ .;,/g)
  .trim() // trim the overall response of whitespace.
  .minInterval(100)
  .value()

// wait, does regex work with ^? probably not, since we truncate all the time, right? // because really, .trim() should just be .replace(^\s+, '').replace(\s+$, ''). // it could also be

import { parse, STR, OBJ } from "nozzle-json";

const input = `
Sure, the object that answers your question is:
\`\`\`json
{"product": 9}
\`\`\`
`

// should have .throwifnotfound or something, as well as .throwiffound, .censor, etc?
return nz(stream)
  .after("```json")
  .before("```")
  .trim()
  .accumulate()
  .map((prefix) => parse(prefix))
  .pairs()
  .filter(x => ) // only allow json values which have xyz
  .value()
```

Testing

Install the library:

git clone https://github.com/Robert-Cunningham/nozzle
cd nozzle
npm i

Then run the tests:

npm run test

License

This library is licensed under the MIT license.

API Reference

Functions

aperture()

function aperture<T>(source: Iterable<T>, n: number): AsyncGenerator<T[]>;

Parameters

| Parameter | Type | | ------ | ------ | | source | Iterable<T> | | n | number |

Accumulation

accumulate()

function accumulate(iterator: AsyncIterable<string>): AsyncGenerator<string>;

Yields a cumulative prefix of the input stream.

Example

const stream = accumulate(streamOf(["This ", "is ", "a ", "test!"]))
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["This ", "This is ", "This is a ", "This is a test!"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<string> | An asynchronous iterable of strings. |

diff()

function diff(iterator: AsyncIterable<string>): AsyncGenerator<string>;

Yields the difference between the current and previous string in the input stream.

Example

const stream = diff(streamOf(["This ", "This is ", "This is a ", "This is a test!"]))
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["This ", "is ", "a ", "test!"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<string> | An asynchronous iterable of strings. |

Buffering

buffer()

function buffer<T>(source: AsyncIterable<T>, n?: number): AsyncGenerator<T>;

Buffers up to N items from the source iterator, consuming them eagerly and yielding them on demand. If n is undefined, buffers unlimited items.

The buffer() function "slurps up" as much of the input iterator as it can as fast as it can, storing items in an internal buffer. When items are requested from the buffer, they are yielded from this pre-filled buffer. This creates a decoupling between the consumption rate and the production rate.

Error handling follows the pattern described in file://./../../ASYNC_ERROR_HANDLING.md. This function serves as a reference implementation for proper error handling with background consumers.

Example

// Buffer up to 10 items
const buffered = buffer(source, 10)

// Buffer unlimited items
const unbuffered = buffer(source)

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | source | AsyncIterable<T> | The async iterable source of values. | | n? | number | The maximum number of items to buffer. If undefined, buffers unlimited items. |

Conversion

asList()

function asList<T>(iterator: AsyncIterable<T>): Promise<T[]>;

Consumes an async iterator and returns all values as an array.

Example

const result = await asList(streamOf(["Hello", "World", "!"]))
console.log(result) // => ["Hello", "World", "!"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<T> | An asynchronous iterable of strings. |

asString()

function asString(iterator: AsyncIterable<string>): Promise<string>;

Consumes an async iterator and returns the final accumulated string. Equivalent to calling accumulate().last() but more efficient.

Example

const result = await asString(streamOf(["Hello", " ", "World"]))
console.log(result) // => "Hello World"

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<string> | An asynchronous iterable of strings. |

fromList()

function fromList<T>(list: T[]): AsyncGenerator<T>;

Converts an array to an async iterator.

Example

const stream = fromList(["Hello", "World", "!"])
for await (const chunk of stream) {
  console.log(chunk)
}
// => "Hello", "World", "!"

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | list | T[] | An array of values. |

Elements

asyncMap()

function asyncMap<T, U>(iterator: AsyncIterable<T>, fn: (value: T) => Promise<U>): AsyncGenerator<U>;

Transforms each value from the input stream using the provided async function. Applies the async function to each item as soon as it comes off the iterator and yields results as they complete, allowing multiple function calls to run concurrently.

Error handling follows the pattern described in file://./../../ASYNC_ERROR_HANDLING.md to ensure errors are thrown during await ticks for proper try/catch handling.

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<T> | An asynchronous iterable of strings. | | fn | (value: T) => Promise<U> | An async function that transforms each string value. |

Examples

const stream = asyncMap(streamOf(["hello", "world"]), async x => {
  await new Promise(resolve => setTimeout(resolve, 100))
  return x.toUpperCase()
})
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["HELLO", "WORLD"]
// Fetch data for each URL as they come in
const urls = streamOf(["api/users", "api/posts"])
const responses = asyncMap(urls, async url => {
  const response = await fetch(url)
  return await response.json()
})
for await (const data of responses) {
  console.log(data)
}

filter()

function filter<T>(iterator: AsyncIterable<T>, predicate: (chunk: T) => boolean): AsyncGenerator<T>;

Filters the input stream based on a predicate function.

Example

const stream = filter(streamOf(["Hello", "Hi", "World"]), (chunk: string) => chunk.length > 5)
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["Hello", "World"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<T> | An asynchronous iterable of strings. | | predicate | (chunk: T) => boolean | A function that returns true for items to keep. |

map()

function map<T, U>(iterator: AsyncIterable<T>, fn: (value: T) => U): AsyncGenerator<U>;

Transforms each value from the input stream using the provided function.

Example

const stream = map(streamOf(["hello", "world"]), x => x.toUpperCase())
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["HELLO", "WORLD"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<T> | An asynchronous iterable of strings. | | fn | (value: T) => U | A function that transforms each string value. |

Error Handling

unwrap()

function unwrap<T, R>(iterator: AsyncIterable<{
  error?: any;
  return?: R;
  value?: T;
}>): AsyncGenerator<T, undefined | R, any>;

Unwraps results from wrap() back into a normal iterator that throws/returns/yields. The opposite of wrap() - takes {value, return, error} objects and converts them back to normal iterator behavior.

Example

const wrappedStream = wrap(streamOf(["hello", "world"]))
const unwrapped = unwrap(wrappedStream)
for await (const value of unwrapped) {
  console.log("Got:", value) // "hello", "world"
}

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<{ error?: any; return?: R; value?: T; }> | An asynchronous iterable of wrapped result objects. |

wrap()

function wrap<T>(iterator: AsyncIterable<T>): AsyncGenerator<{
  error?: unknown;
  return?: any;
  value?: T;
}>;

Wraps an iterator to catch any errors and return them in a result object format. Instead of throwing, errors are yielded as {error} and successful values as {value}.

Example

const stream = wrap(streamOf(["hello", "world"]))
for await (const result of stream) {
  if (result.value !== undefined) {
    console.log("Got:", result.value)
  } else if (result.return !== undefined) {
    console.log("Return:", result.return)
  } else {
    console.log("Error:", result.error)
  }
}

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<T> | An asynchronous iterable. |

Filtering

compact()

function compact(iterator: AsyncIterable<string>): AsyncGenerator<string>;

Filters out empty strings from the input stream.

Example

const stream = compact(streamOf(["Hello", "", "World", ""]))
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["Hello", "World"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<string> | An asynchronous iterable of strings. |

Indexing

first()

function first<T>(iterator: AsyncIterable<T>): AsyncGenerator<T>;

Yields only the first value from the input stream.

Example

const stream = first(streamOf(["Hello", "World", "!"]))
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["Hello"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<T> | An asynchronous iterable of strings. |

head()

function head<T>(iterator: AsyncIterable<T>): AsyncGenerator<T, any, any>;

Yields only the first value from the input stream.

Example

const stream = head(streamOf(["Hello", "World", "!"]))
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["Hello"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<T> | An asynchronous iterable of values. |

initial()

function initial<T>(iterator: AsyncIterable<T>): AsyncGenerator<T, any, any>;

Yields all values except the last from the input stream.

Example

const stream = initial(streamOf(["Hello", "World", "!"]))
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["Hello", "World"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<T> | An asynchronous iterable of values. |

last()

function last<T>(iterator: AsyncIterable<T>): AsyncGenerator<T, any, any>;

Yields only the last value from the input stream.

Example

const stream = last(streamOf(["Hello", "World", "!"]))
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["!"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<T> | An asynchronous iterable of values. |

slice()

function slice<T>(
   iterator: AsyncIterable<T>, 
   start: number, 
end?: number): AsyncGenerator<T>;

Yields a slice of the input stream between start and end indices. Supports negative indices by maintaining an internal buffer.

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<T> | The async iterable to slice | | start | number | Starting index (inclusive). Negative values count from end. | | end? | number | Ending index (exclusive). Negative values count from end. If undefined, slices to end. |

Examples

const stream = slice(streamOf(["a", "b", "c", "d", "e"]), 1, 3)
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["b", "c"]
const stream = slice(streamOf(["a", "b", "c", "d", "e"]), -2)
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["d", "e"]

tail()

function tail<T>(iterator: AsyncIterable<T>): AsyncGenerator<T, any, any>;

Yields all values except the first from the input stream.

Example

const stream = tail(streamOf(["Hello", "World", "!"]))
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["World", "!"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<T> | An asynchronous iterable of values. |

Regex

replace()

function replace(
   input: AsyncIterable<string>, 
   regex: RegExp, 
replacement: string): AsyncGenerator<string>;

Replaces matches of a regex pattern with a replacement string in the input stream.

Uses earliestPossibleMatchIndex to efficiently yield tokens as soon as we know they don't match the regex, while holding back potential matches until we can determine if they should be replaced.

Example

const stream = replace(streamOf(["a", "b", "b", "a"]), /a[ab]*a/g, "X")
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["X"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | input | AsyncIterable<string> | - | | regex | RegExp | The regular expression pattern to match. | | replacement | string | The string to replace matches with. |

Side Effects

tap()

function tap<T, R>(iterator: AsyncIterable<T, R>, fn: (value: T) => void): AsyncGenerator<T, R, undefined>;

Executes a side effect for each value without modifying the stream.

Example

const stream = tap(streamOf(["Hello", "World", "!"]), console.log)
for await (const chunk of stream) {
  // console.log will have printed each chunk
  console.log("Processed:", chunk)
}
// => logs: "Hello", "World", "!", then "Processed: Hello", "Processed: World", "Processed: !"

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterable<T, R> | An asynchronous iterable of strings. | | fn | (value: T) => void | A function to execute for each value. |

tee()

function tee<T>(iterator: AsyncIterator<T>, n: number): AsyncGenerator<T, any, any>[];

Splits a single iterator into N independent iterables.

Error handling follows the pattern described in file://./../../ASYNC_ERROR_HANDLING.md to ensure errors are thrown during await ticks for proper try/catch handling.

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | iterator | AsyncIterator<T> | The source async iterator to split. | | n | number | Number of independent iterables to create. |

Splitting

after()

function after(source: StringIterable, pattern: string | RegExp): AsyncGenerator<string>;

Emit everything after the accumulated prefix that matches pattern.

Example

const stream = after(streamOf(["a", "b", "c", "d", "e"]), /bc/)
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["d", "e"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | source | StringIterable | stream or iterable to scan | | pattern | string | RegExp | first RegExp that marks the cut-off |

before()

function before(source: StringIterable, separator: string | RegExp): AsyncGenerator<string>;

Emit everything before the accumulated prefix that contains separator.

Example

const stream = before(streamOf(["a", "b", "c", "d", "e"]), "cd")
for await (const chunk of stream) {
  console.log(chunk)
}
// => ["a", "b"]

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | source | StringIterable | stream or iterable to scan | | separator | string | RegExp | string that marks the cut-off |

chunk()

function chunk(source: AsyncIterable<string>, size: number): AsyncGenerator<string>;

Groups input tokens into chunks of the specified size and yields the joined result. Takes N input items and yields N/size output items, where each output is the concatenation of size input items.

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | source | AsyncIterable<string> | The async iterable source of strings (tokens). | | size | number | The number of input tokens to group together in each output chunk. |

split()

function split(source: AsyncIterable<string>, separator: string | RegExp): AsyncGenerator<string>;

Takes incoming chunks, merges them, and then splits them by a string separator.

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | source | AsyncIterable<string> | The async iterable source of strings. | | separator | string | RegExp | The string separator to split by. |

splitAfter()

function splitAfter(source: AsyncIterable<string>, separator: string | RegExp): AsyncGenerator<string>;

Takes incoming chunks, merges them, and then splits them by a string separator, keeping the separator at the end of each part (except the last).

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | source | AsyncIterable<string> | The async iterable source of strings. | | separator | string | RegExp | The string separator to split by. |

splitBefore()

function splitBefore(source: AsyncIterable<string>, separator: string | RegExp): AsyncGenerator<string>;

Takes incoming chunks, merges them, and then splits them by a string separator, keeping the separator at the beginning of each part (except the first).

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | source | AsyncIterable<string> | The async iterable source of strings. | | separator | string | RegExp | The string separator to split by. |

Timing

minInterval()

function minInterval<T>(source: AsyncIterable<T>, delayMs: number): AsyncGenerator<T>;

Enforces a minimum delay between adjacent tokens in a stream. The first token is yielded immediately, then subsequent tokens are delayed to ensure at least delayMs milliseconds pass between each yield.

Error handling follows the pattern described in file://./../../ASYNC_ERROR_HANDLING.md to ensure errors are thrown during await ticks for proper try/catch handling.

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | source | AsyncIterable<T> | The async iterable source of tokens. | | delayMs | number | The minimum delay in milliseconds between adjacent tokens. |

throttle()

function throttle<T>(
   source: AsyncIterable<T>, 
   intervalMs: number, 
merge: (values: T[]) => T): AsyncGenerator<T>;

Throttles the output from a source, with special timing behavior:

  • The first chunk is yielded immediately
  • Subsequent chunks are batched and yielded together after the interval
  • If no chunks arrive during an interval, the next chunk is yielded immediately when it arrives

Error handling follows the pattern described in file://./../../ASYNC_ERROR_HANDLING.md to ensure errors are thrown during await ticks for proper try/catch handling.

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | source | AsyncIterable<T> | The async iterable source of values. | | intervalMs | number | The throttling interval in milliseconds. | | merge | (values: T[]) => T | - |

Transformation

flatten()

function flatten<T>(src: Iterable<Iterable<T> | T[]>): AsyncGenerator<T>;

Flattens nested arrays or iterables into a single stream.

Example

const stream = fromList([["a", "b"], ["c", "d"], ["e"]])
const flattened = flatten(stream)
for await (const chunk of flattened) {
  console.log(chunk)
}
// => "a", "b", "c", "d", "e"

Parameters

| Parameter | Type | Description | | ------ | ------ | ------ | | src | Iterable<Iterable<T> | T[]> | The source iterable containing nested arrays or iterables. |