streamops
v0.1.20
StreamOps
A lightweight streaming operations library for JS that provides a flexible pipeline-based approach to data processing. StreamOps leverages generators and async generators to create efficient data processing pipelines with built-in support for parallel processing, error handling, and state management.
Installation
Key Features
- Pipeline-based streaming operations: Build complex data processing pipelines with ease
- Async/sync generator support: Seamlessly mix sync and async operations
- Parallel processing: Process data concurrently with parallel branches
- State management: Share state across pipeline steps
- Configurable error handling: Robust error handling with timeouts
- Rich operator set: Comprehensive set of built-in operators
Getting Started
Basic Pipeline
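A minimal sketch of the pipeline idea in plain JavaScript, with no StreamOps imports: each step is a generator that consumes the previous step's output lazily. The names `mapStep`, `filterStep`, and `runPipeline` are hypothetical and chosen for illustration; they are not StreamOps exports.

```javascript
// Hypothetical helpers illustrating a generator pipeline; not the StreamOps API.
function mapStep(fn) {
  return function* (input) {
    for (const item of input) yield fn(item);
  };
}

function filterStep(predicate) {
  return function* (input) {
    for (const item of input) if (predicate(item)) yield item;
  };
}

// Feed the output of each step into the next one, lazily.
function runPipeline(source, steps) {
  return steps.reduce((stream, step) => step(stream), source);
}

function* numbers() {
  yield 1;
  yield 2;
  yield 3;
}

const basicResult = [...runPipeline(numbers(), [
  mapStep((x) => x * 2),
  filterStep((x) => x > 2),
])];
console.log(basicResult); // [ 4, 6 ]
```

Because every step is a generator, no intermediate arrays are built: each item flows through all steps before the next one is pulled from the source.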
Chaining Style
Real-World Example: Processing API Data
Configuration
Timeout Behaviors
yieldTimeoutBehavior: Controls timeout handling
- 'warn': Log warning and continue (default)
- 'yield-null': Yield null value and continue
- 'cancel': Cancel pipeline
- 'block': Stop yielding from timed-out step
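To make two of these behaviors concrete, here is a rough plain-JavaScript sketch of how a per-yield timeout could be applied to an async source. It covers only 'yield-null' and 'cancel', and every name in it (`withYieldTimeout`, `slowSource`, `TIMED_OUT`) is an assumption for illustration; it does not reflect StreamOps internals.

```javascript
// Hypothetical sketch; names and semantics are assumptions, not StreamOps internals.
const TIMED_OUT = Symbol('timed out');

function sleep(ms, value) {
  return new Promise((resolve) => setTimeout(resolve, ms, value));
}

async function* withYieldTimeout(source, ms, behavior) {
  const iterator = source[Symbol.asyncIterator]();
  let pending = iterator.next();
  while (true) {
    // Whichever settles first wins: the next item or the timer.
    const winner = await Promise.race([pending, sleep(ms, TIMED_OUT)]);
    if (winner === TIMED_OUT) {
      if (behavior === 'cancel') {
        throw new Error(`No value within ${ms}ms`);
      }
      // 'yield-null': emit a placeholder, then keep waiting on the same pending item.
      yield null;
      continue;
    }
    if (winner.done) return;
    yield winner.value;
    pending = iterator.next();
  }
}

// A source whose second item arrives later than the 100ms timeout used below.
async function* slowSource() {
  yield 'a';
  await sleep(150);
  yield 'b';
}

async function demoYieldNull() {
  const seen = [];
  for await (const item of withYieldTimeout(slowSource(), 100, 'yield-null')) {
    seen.push(item);
  }
  return seen;
}

demoYieldNull().then((seen) => console.log(seen)); // [ 'a', null, 'b' ]
```

With `'cancel'` in place of `'yield-null'`, the same loop would throw after `'a'` instead of emitting `null`.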
Error Handling
Using catchError Operator
Timeout Protection
Stream Control
End of Stream Handling
Flow Control with Accrue
The accrue operator collects all items before continuing:
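The effect can be sketched in plain JavaScript as a generator that drains its input before releasing anything downstream. `accrueAll` is a hypothetical stand-in, and the sketch assumes items are re-emitted one by one after collection.

```javascript
// Hypothetical stand-in for an accrue-style step; not the library's implementation.
function* accrueAll(input) {
  const collected = [...input]; // drain the upstream completely first
  yield* collected;             // only then release items downstream
}

const events = [];
function* source() {
  for (const n of [1, 2, 3]) {
    events.push(`produced ${n}`);
    yield n;
  }
}

for (const n of accrueAll(source())) {
  events.push(`consumed ${n}`);
}
console.log(events);
// [ 'produced 1', 'produced 2', 'produced 3', 'consumed 1', 'consumed 2', 'consumed 3' ]
```

Without the accrue-style step, the log would interleave: each item would be consumed as soon as it was produced.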
Advanced Features
Parallel Processing
Results from parallel branches are merged in order.
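One possible reading of "merged in order" is that, for each item, all branches run concurrently and their results are emitted in branch order regardless of which finishes first. The sketch below illustrates that reading in plain JavaScript; `parallelBranches` is a hypothetical helper and the library's actual merge semantics may differ.

```javascript
// Hypothetical sketch: run every branch on each item concurrently,
// then emit the results in branch order.
async function* parallelBranches(source, branches) {
  for await (const item of source) {
    const results = await Promise.all(branches.map((branch) => branch(item)));
    yield* results;
  }
}

const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function demoParallel() {
  const out = [];
  const branches = [
    async (x) => { await delay(20); return `slow:${x}`; },
    async (x) => `fast:${x}`,
  ];
  for await (const value of parallelBranches([1, 2], branches)) out.push(value);
  return out;
}

demoParallel().then((out) => console.log(out));
// [ 'slow:1', 'fast:1', 'slow:2', 'fast:2' ]
```

`Promise.all` preserves the order of its input array, so the slow branch's result still comes first even though the fast branch resolves earlier.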
State Management
Maintain state via 'this' context:
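As an illustration of the mechanism, the plain-JavaScript sketch below calls every step with the same `this` object, so a value written by one step is visible to the next. `runWithSharedState` is a hypothetical runner, not a StreamOps function; note that steps must be `function*` declarations rather than arrow functions for `this` to be bound.

```javascript
// Hypothetical runner that calls every step with the same `this` object.
function runWithSharedState(source, steps) {
  const state = {}; // shared across all steps
  return steps.reduce((stream, step) => step.call(state, stream), source);
}

const withState = [...runWithSharedState([5, 10, 15], [
  function* (input) {
    this.total = 0;
    for (const n of input) {
      this.total += n; // write to shared state
      yield n;
    }
  },
  function* (input) {
    // read what the previous step stored
    for (const n of input) yield { n, runningTotal: this.total };
  },
])];
console.log(withState.map((r) => r.runningTotal)); // [ 5, 15, 30 ]
```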
API Documentation
Built-in Operators
Basic Operators
- map(fn): Transform each item using the provided function
- filter(predicate): Only allow items that match the predicate
- reduce(reducer, initialValue): Accumulate values, yielding intermediate results
- flatMap(fn): Map each item to multiple items
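The two less obvious entries are reduce, which yields every intermediate accumulation rather than a single final value, and flatMap. A plain-JavaScript sketch of those behaviors, using hypothetical generator helpers (`scanReduce`, `flatMapItems`) rather than the library's operators:

```javascript
// Hypothetical generator versions of two operators, following the descriptions above.
function* scanReduce(input, reducer, initialValue) {
  let acc = initialValue;
  for (const item of input) {
    acc = reducer(acc, item);
    yield acc; // emit every intermediate accumulation
  }
}

function* flatMapItems(input, fn) {
  for (const item of input) yield* fn(item); // fn returns an iterable per item
}

const runningSums = [...scanReduce([1, 2, 3, 4], (sum, x) => sum + x, 0)];
const words = [...flatMapItems(['a b', 'c'], (s) => s.split(' '))];
console.log(runningSums); // [ 1, 3, 6, 10 ]
console.log(words);       // [ 'a', 'b', 'c' ]
```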
Control Operators
- take(n): Limit stream to first n items
- skip(n): Skip first n items
- batch(size, options): Group items into arrays of specified size. Options:
  - yieldIncomplete: Whether to yield incomplete batches (default: true)
- distinct(equalityFn): Remove duplicates using optional equality function
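The effect of `yieldIncomplete` and of a custom equality function can be sketched in plain JavaScript. `batchItems` and `distinctItems` are hypothetical helpers that mirror the descriptions above; they are not the library's code.

```javascript
// Hypothetical helpers mirroring the batch and distinct descriptions.
function* batchItems(input, size, { yieldIncomplete = true } = {}) {
  let current = [];
  for (const item of input) {
    current.push(item);
    if (current.length === size) {
      yield current;
      current = [];
    }
  }
  // A trailing partial batch is emitted only when yieldIncomplete is true.
  if (current.length > 0 && yieldIncomplete) yield current;
}

function* distinctItems(input, equalityFn = (a, b) => a === b) {
  const seen = [];
  for (const item of input) {
    if (!seen.some((prev) => equalityFn(prev, item))) {
      seen.push(item);
      yield item;
    }
  }
}

const allBatches = [...batchItems([1, 2, 3, 4, 5], 2)];
const fullBatches = [...batchItems([1, 2, 3, 4, 5], 2, { yieldIncomplete: false })];
const unique = [...distinctItems([1, 1, 2, 1, 3])];
console.log(allBatches);  // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
console.log(fullBatches); // [ [ 1, 2 ], [ 3, 4 ] ]
console.log(unique);      // [ 1, 2, 3 ]
```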
Advanced Operators
- mergeAggregate(options): Merge objects into arrays by key
- waitUntil(condition): Buffer items until condition is met
- bufferBetween(startToken, endToken, mapFn): Capture content between tokens
Error Handling
- catchError(handler): Handle errors in the pipeline
- timeout(ms): Fail if processing takes too long
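As a rough illustration of the catchError idea, the plain-JavaScript sketch below wraps an upstream generator so that a thrown error is passed to a handler instead of crashing the consumer. `catchErrors` is a hypothetical helper, and ending the stream after the first error is an assumption made here, not documented StreamOps behavior.

```javascript
// Hypothetical error-catching step; the recovery policy is an assumption.
function* catchErrors(input, handler) {
  try {
    yield* input;
  } catch (error) {
    handler(error); // the upstream generator is finished once it throws, so the stream ends here
  }
}

function* flaky() {
  yield 1;
  yield 2;
  throw new Error('boom');
}

const handled = [];
const survived = [...catchErrors(flaky(), (e) => handled.push(e.message))];
console.log(survived, handled); // [ 1, 2 ] [ 'boom' ]
```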
Utility Operators
- tap(fn): Execute side effects without modifying stream
- accrue(): Collect all items before proceeding
- dam(): Alias for accrue()
Stream Control Operators
withEndSignal(fn): Mark a function/generator to receive end signals
Simple Interface
StreamOps also provides a simplified interface for creating pipelines:
The simple interface automatically injects operators and handles pipeline creation.
Debugging
Logging
Set logLevel in configuration:
Use tap operator for debugging:
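The pattern behind tap-based debugging can be sketched in plain JavaScript: a pass-through step runs a callback on each item and forwards the item untouched. `tapItems` is a hypothetical helper used for illustration, not the library's operator.

```javascript
// Hypothetical pass-through step for inspecting items mid-pipeline.
function* tapItems(input, fn) {
  for (const item of input) {
    fn(item); // side effect only; the item passes through unchanged
    yield item;
  }
}

const logLines = [];
const passed = [...tapItems([1, 2, 3], (x) => logLines.push(`saw ${x}`))];
console.log(logLines); // [ 'saw 1', 'saw 2', 'saw 3' ]
console.log(passed);   // [ 1, 2, 3 ]
```

In real debugging the callback would typically be `console.log`; collecting into an array here just makes the effect easy to inspect.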
Common Issues
Memory Leaks
- Use batch operator for large streams
- Use accrue sparingly: it collects every item before proceeding, so the whole stream is held in memory
Timeouts
- Adjust timeout configuration
- Use appropriate yieldTimeoutBehavior
Backpressure
- Monitor downstreamTimeout warnings
- Use batch operator to control flow
License
MIT
