stream-pipes
Modular and composable data transformation streams for Node.js.
stream-pipes helps you process data on the fly using clean, functional-style pipelines built on Node's stream API.
Features
- Object-mode stream transforms
- Convert objects to newline-delimited JSON
- Parse delimited text lines into objects
- Batch objects into fixed-size arrays
- Apply custom object transformations
- Perform side effects without altering the data (tap)
- Read and write files as streams, with optional Gzip compression
- Read and write lines from/to files and generic streams
Installation
```bash
npm install stream-pipes
```

Example

```js
import { pipeline } from 'stream/promises'
import {
createFileReader,
createFileWriter,
createDelimitedParseTransform,
createMapTransform,
createBatchStream,
createTapTransform,
createJsonTransform
} from 'stream-pipes'
await pipeline(
createFileReader('./data.csv.gz'),
createDelimitedParseTransform(),
createMapTransform((obj) => ({ ...obj, processed: true })),
createTapTransform(() => console.log('Processed!')),
createBatchStream(10),
createJsonTransform(),
createFileWriter('./output.jsonl.gz')
)
```

Available Streams & Utilities
Summary
| Function                                          | Description                                        | Input                 | Output                       |
| ------------------------------------------------- | -------------------------------------------------- | --------------------- | ---------------------------- |
| createFileReader(path, options)                   | Reads a file (Gzip supported)                      | File content          | Stream of chunks             |
| createFileWriter(path, options)                   | Writes chunks to a file (Gzip supported)           | String/Buffer chunks  | Raw data written to file     |
| createLineReader(options)                         | Splits a text stream into lines                    | Stream of text chunks | One line per chunk (string)  |
| createLineWriter(options)                         | Writes each string with a trailing newline         | String chunks         | One line per write           |
| createDelimitedParseTransform(delimiter, options) | Parses delimited text into objects                 | "name,age\nAlice,30"  | { name: "Alice", age: "30" } |
| createMapTransform(fn, options)                   | Applies a custom function to each object           | { name: "Alice" }     | fn({ name: "Alice" })        |
| createBatchStream(size, options)                  | Groups objects into fixed-size arrays              | {...}, {...}, {...}   | [ {...}, {...}, {...} ]      |
| createJsonTransform(options)                      | Converts each object to a JSON string plus newline | { name: "Alice" }     | '{"name":"Alice"}\n'         |
| createTapTransform(fn, options)                   | Runs a side-effect function per object             | { name: "Alice" }     | { name: "Alice" }            |
createFileReader(path, options)
Creates a readable stream from a file. If the path ends with .gz, it's automatically decompressed.
```js
import { createFileReader } from 'stream-pipes'
const textStream = createFileReader('file.txt')
const gzipStream = createFileReader('file.txt.gz')
```

createFileWriter(path, options)
Creates a writable stream to a file. If the path ends with .gz, the stream automatically compresses the output using Gzip.
```js
import { createFileWriter } from 'stream-pipes'
const writer = createFileWriter('output.txt') // writes plain text lines
const gzipWriter = createFileWriter('output.txt.gz') // compresses with Gzip
```
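Because compression is inferred from the extension, the reader and writer compose into a Gzip round trip. A minimal sketch (the file names are illustrative):

```js
import { pipeline } from 'stream/promises'
import { createFileReader, createFileWriter } from 'stream-pipes'

// Copies input.txt.gz to copy.txt.gz: the reader decompresses on the
// way in, the writer recompresses on the way out, both triggered by
// the .gz extensions.
await pipeline(
  createFileReader('./input.txt.gz'),
  createFileWriter('./copy.txt.gz')
)
```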
createLineReader(options)
Returns a transform stream that splits incoming text by newlines.
```js
import { createLineReader } from 'stream-pipes'
source.pipe(createLineReader()).on('data', console.log)
```

createLineWriter(options)
Returns a transform stream that writes each incoming string followed by a trailing newline.
```js
import { createLineWriter } from 'stream-pipes'
stream.pipe(createLineWriter()).pipe(createFileWriter('lines.txt'))
```
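Together with the file streams, the line reader and writer support line-oriented processing. A minimal sketch, assuming createMapTransform accepts string chunks as well as objects (the file names are illustrative):

```js
import { pipeline } from 'stream/promises'
import {
  createFileReader,
  createLineReader,
  createMapTransform,
  createLineWriter,
  createFileWriter
} from 'stream-pipes'

// Uppercase every line of input.txt into output.txt.
await pipeline(
  createFileReader('./input.txt'),
  createLineReader(),
  createMapTransform((line) => line.toUpperCase()), // assumes string chunks are fine
  createLineWriter(),
  createFileWriter('./output.txt')
)
```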
createDelimitedParseTransform(delimiter, options)
Transforms delimited text lines (e.g., CSV) into JavaScript objects, using the first line as headers.
```js
import { createDelimitedParseTransform } from 'stream-pipes'
const csvParser = createDelimitedParseTransform(',')
```
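For instance, a header row followed by data rows parses into one object per row. A minimal sketch (the inline sample data stands in for a real CSV source):

```js
import { Readable, Writable } from 'stream'
import { pipeline } from 'stream/promises'
import { createDelimitedParseTransform } from 'stream-pipes'

// The first line ('name,age') supplies the keys; each subsequent
// line becomes an object.
await pipeline(
  Readable.from(['name,age\nAlice,30\nBob,25']),
  createDelimitedParseTransform(','),
  new Writable({
    objectMode: true,
    write(obj, _enc, cb) {
      console.log(obj) // { name: 'Alice', age: '30' }, then { name: 'Bob', age: '25' }
      cb()
    }
  })
)
```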
createMapTransform(fn, options)
Applies a synchronous or asynchronous function fn to each incoming object and pushes the result downstream.
```js
import { createMapTransform } from 'stream-pipes'
const transform = createMapTransform(obj => ({
...obj,
processed: true
}))
```
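Because fn may return a Promise, the mapper can await per-object work. A sketch with a hypothetical async lookup (fetchUserDetails is not part of the library):

```js
import { createMapTransform } from 'stream-pipes'

// fetchUserDetails is a hypothetical async helper; any function
// returning a Promise should behave the same way here.
const enrich = createMapTransform(async (obj) => {
  const details = await fetchUserDetails(obj.name)
  return { ...obj, details }
})
```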
createBatchStream(size, options)
Groups incoming objects into arrays of a fixed size before pushing them downstream.
```js
import { createBatchStream } from 'stream-pipes'
const batcher = createBatchStream(10) // emit arrays of 10 objects
```
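A quick way to see the batching behavior end to end; a minimal sketch (whether a trailing partial batch is flushed at end of stream is an assumption here):

```js
import { Readable, Writable } from 'stream'
import { pipeline } from 'stream/promises'
import { createBatchStream } from 'stream-pipes'

await pipeline(
  Readable.from(Array.from({ length: 25 }, (_, i) => ({ id: i }))),
  createBatchStream(10),
  new Writable({
    objectMode: true,
    write(batch, _enc, cb) {
      console.log('batch of', batch.length) // 10, 10, then (assumed) a final 5
      cb()
    }
  })
)
```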
createJsonTransform(options)
Converts each incoming object to a JSON string followed by a newline character (\n), ideal for creating JSON Lines (JSONL) streams.
```js
import { createJsonTransform } from 'stream-pipes'
const jsonStringify = createJsonTransform()
```
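Paired with createFileWriter, this writes a JSON Lines file directly. A minimal sketch (the sample objects and output path are illustrative):

```js
import { Readable } from 'stream'
import { pipeline } from 'stream/promises'
import { createJsonTransform, createFileWriter } from 'stream-pipes'

// Each object becomes one line in users.jsonl.
await pipeline(
  Readable.from([{ name: 'Alice' }, { name: 'Bob' }]),
  createJsonTransform(),
  createFileWriter('./users.jsonl')
)
```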
createTapTransform(fn, options)
Runs a side-effect function fn on each object passing through, without modifying the data.
```js
import { createTapTransform } from 'stream-pipes'
const logger = createTapTransform(obj => console.log('Passing object:', obj))
```