@augceo/iterators
v0.1.0
Published
Concurrent iterator utilities for rotery
Concurrent iterator utilities for working with async generators in JavaScript/TypeScript.
- Enables efficient parallel IO operations
- Works with any iterable source (arrays, async generators, streams)
- Creates strongly-typed async processing pipelines
- Computes values on-demand with lazy evaluation
- Gives precise control over concurrency limits and buffer sizes
- Includes ready-made tools for batching and stream multiplexing
- Optimizes throughput for constrained resources like DB connections
- Respects back-pressure end-to-end, allowing slower steps to throttle consumption
- Intuitive ergonomics - streamed code looks like synchronous code
Installation
npm install @augceo/iterators
Usage
import * as I from '@augceo/iterators';
// Create a pipeline with concurrent processing
const pipeline = I.pipe(
  [getDataSource(), getDataSource(), getDataSource()],
  I.concat(2, 4), // process 2 streams concurrently, buffer 4 items
  I.chunk(50), // in batches of 50
  I.map(myBatchTransform, 4), // process 4 batches concurrently
  I.concat(), // flatten batches back into stream
  I.map(myItemTransform), // process items in stream one by one
  I.filter(Boolean), // remove nulls
  I.chunk(20), // 20 results per chunk
  I.take(10), // don't go over 10 batches
  I.map(processChunk, 2), // process 2 batches in parallel
  I.concat(), // flatten stream again
  // Split each record into two parts and process them in parallel streams
  I.dispatch(
    record => ({ meta: record.metadata, data: record.data }), // Split record into metadata and data
    {
      // Process metadata in batches of 1000
      meta: I.pipe(
        I.chunk(1000),
        I.map(writeMetadata, 2) // Process 2 metadata batches concurrently
      ),
      // Process data in batches of 2000
      data: I.pipe(
        I.chunk(2000),
        I.map(writeData, 3) // Process 3 data batches concurrently
      ),
    }
  )
);
// Looping over the pipeline starts computation
let i = 0;
for await (const result of pipeline) {
  if (++i > 25) {
    // Can stop production at will, here after the first 25 items,
    // which will compute 3 batches in total:
    // 20 items from first batch, 5 items from second batch, and one full batch of 20 buffered by processChunk
    break;
  }
}
Features
- map: Transform items with controlled concurrency
- filter: Filter items with support for transformation
- concat: Process multiple async iterators concurrently
- pipe: Compose functions or data flows
- chunk: Group items in pages of N items
- take: Limit number of processed items
Syntax
- Basic: map(iterator, fn, concurrency)
- Curried: map(fn, concurrency)(iterator)
- Pipe: I.pipe(map(fn, concurrency), concat(concurrency))
Concurrency Explained
Why Concurrent Map?
The map(fn, concurrency?: number) function with concurrency control enables processing multiple items simultaneously without overwhelming system resources:
IO Concurrency: Avoids blocking on IO operations by allowing multiple operations to happen in parallel
// Race up to 3 network requests concurrently
for await (const result of I.map(requestData, fetchFromApi, 3)) {
  console.log('One request done', result);
}
Backpressure Handling: Automatically manages processing speed to match consumption speed
// Buffer up to 2 results, so when slow operation is done, it has another result to pull from
for await (const result of I.map(items, processFn, 2)) {
  // Only one slow operation runs at once
  await slowOperation(result);
}
Composition: Chain multiple concurrent operations together
// Create a pipeline of concurrent operations
const pipeline = I.pipe(
  items,
  I.map(validateItem, 4), // Validate 4 items concurrently
  I.map(transformItem, 2) // Transform 2 items concurrently
);
Familiar semantics: Map without concurrency acts as regular map over asynchronous iterator
for await (const i of map([1,2,3], String)) {
  console.log(i) // 1, 2, 3
}
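To make the concurrency window concrete, here is a minimal sketch of how a bounded concurrent map over an iterable could work. `mapConcurrent` is a hypothetical helper written for illustration, not the library's implementation; in particular, it yields results in input order, which is an assumption and may differ from how `I.map` orders its output.

```javascript
// Hypothetical helper, not the library's implementation.
// Maps `fn` over a sync or async iterable, keeping at most `concurrency`
// calls in flight and yielding results in input order.
async function* mapConcurrent(source, fn, concurrency = 1) {
  const iterator = source[Symbol.asyncIterator]
    ? source[Symbol.asyncIterator]()
    : source[Symbol.iterator]();
  const inFlight = []; // promises for started calls, oldest first
  let done = false;

  // Pull inputs and start calls until the window is full or input ends.
  const fill = async () => {
    while (!done && inFlight.length < concurrency) {
      const next = await iterator.next();
      if (next.done) {
        done = true;
      } else {
        inFlight.push(Promise.resolve(fn(next.value)));
      }
    }
  };

  await fill();
  while (inFlight.length > 0) {
    const result = await inFlight.shift(); // wait for the oldest call
    yield result; // the generator pauses here until the consumer asks again
    await fill(); // top the window back up
  }
}
```

Because the generator only refills its window when the consumer requests the next value, a slow consumer naturally slows down how fast new calls are started.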
Why Concurrent Concat?
The concat(concurrency?: number, buffer?: number) function processes multiple async iterators concurrently, intelligently managing how data flows between streams:
Parallel Processing: Streams are processed simultaneously, yielding values as soon as they're ready
// Mix results from two streams in FIFO order
const streams = [fastStream, slowStream];
const results = I.concat(streams, Infinity, Infinity);
Control Over Input Sources: Limit how many input streams are active simultaneously
// Only process 2 streams at a time, tapping the next stream when one completes
const results = I.pipe([stream1, stream2, stream3, stream4], I.concat(2, Infinity));
Buffer Management: Control how many received values are buffered for the next concurrent steps to consume
// Tap into 3 streams, keep 2 prepared items for final step
const pipeline = I.pipe(
  [stream1, stream2, stream3],
  // Allow all streams to be active, but only buffer up to 2 values total
  I.concat(Infinity, 2), // Keep 2 values in output buffer at all times
  I.map(processItem) // once processItem is finished, it can immediately pull value from previous step's buffer
);
Stream Multiplexing: Combine multiple streams and process them concurrently
// Process values from multiple streams with controlled concurrency
const results = I.pipe(
  [stream1, stream2, stream3],
  I.concat(2, 4), // Tap into 2 streams at once, buffer 4 values
  I.map(processValue, 4) // Process 4 values concurrently
);
Familiar semantics: With concurrency below 2, concat acts as a regular concat() over an async stream, flattening incoming iterables
for await (const i of concat([[1, 2], [3, 4]], 1, 1)) {
  console.log(i) // 1, 2, 3, 4
}
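The idea of tapping several streams at once and yielding whichever value arrives first can be sketched with `Promise.race`. `mergeConcurrent` below is a hypothetical helper, not the library's `concat`: it limits only the number of open sources and requests one value ahead per source, so the separate buffer-size control described above is not modelled.

```javascript
// Hypothetical helper, not the library's implementation.
// Merges several iterables, yielding each value as soon as any open
// source produces one. At most `maxActive` sources are open at a time.
async function* mergeConcurrent(sources, maxActive = Infinity) {
  const waiting = sources.map((s) =>
    (s[Symbol.asyncIterator] || s[Symbol.iterator]).call(s)
  );
  const pending = new Map(); // iterator -> promise of its next result

  // Ask an iterator for its next value, remembering which one answered.
  const request = (it) =>
    pending.set(it, Promise.resolve(it.next()).then((result) => ({ it, result })));

  // Open more sources while there is room.
  const open = () => {
    while (pending.size < maxActive && waiting.length > 0) {
      request(waiting.shift());
    }
  };

  open();
  while (pending.size > 0) {
    const { it, result } = await Promise.race(pending.values());
    if (result.done) {
      pending.delete(it); // source exhausted: free its slot
      open();
    } else {
      request(it); // request one value ahead from this source
      yield result.value;
    }
  }
}
```

With `maxActive` set to 1 the sketch degrades to a plain sequential concatenation, which mirrors the "familiar semantics" case described above.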
Why Concurrent Dispatch?
The dispatch function enables several powerful patterns:
Conditional Branching: Route items to different processors based on their properties
// Route items to different processors based on their needs
const pipeline = I.pipe(
  items,
  I.dispatch(
    item => ({
      validate: item.needsValidation ? item : undefined,
      enrich: item.needsEnrichment ? item : undefined,
      transform: item.needsTransformation ? item : undefined,
    }),
    { validate, enrich, transform }
  )
);
Record Splitting: Process different parts of a record in parallel streams
// Split each record into metadata and data for parallel processing
const pipeline = I.pipe(
  records,
  I.dispatch(
    ({ data, meta }) => ({ meta, data }), // Destructure record into two properties
    {
      meta: I.pipe(I.chunk(1000), I.map(writeMetadata, 2)),
      data: I.pipe(I.chunk(2000), I.map(writeData, 3)),
    }
  )
);
Fan-out: Process the same item through multiple streams simultaneously
// Process each item through multiple streams in parallel
const pipeline = I.pipe(
  items,
  I.dispatch(
    item => ({ a: item, b: item, c: item }), // Send the same item to all three processors
    {
      a: I.pipe(I.map(processA), I.map(writeToFileA)),
      b: I.pipe(I.map(processB), I.map(writeToFileB)),
      c: I.pipe(I.map(processC), I.map(writeToFileC)),
    }
  )
);
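The routing rule these patterns rely on can be illustrated without any streaming machinery. The sketch below uses a hypothetical `routeByKey` helper that eagerly collects what each branch would receive; it assumes, based on the conditional-branching example, that a splitter value of `undefined` means "skip this branch". It deliberately ignores concurrency and back-pressure, which the real `dispatch` handles.

```javascript
// Hypothetical, eager illustration of the routing rule only.
// The splitter returns one value per branch key; `undefined` is assumed
// to mean that the branch receives nothing for this item.
function routeByKey(items, splitter) {
  const branches = {};
  for (const item of items) {
    for (const [key, part] of Object.entries(splitter(item))) {
      if (part === undefined) continue;
      if (!branches[key]) branches[key] = [];
      branches[key].push(part);
    }
  }
  return branches;
}

const routed = routeByKey(
  [
    { id: 1, needsValidation: true, needsEnrichment: false },
    { id: 2, needsValidation: true, needsEnrichment: true },
  ],
  (item) => ({
    validate: item.needsValidation ? item.id : undefined,
    enrich: item.needsEnrichment ? item.id : undefined,
  })
);
// routed is { validate: [1, 2], enrich: [2] }
```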
Advanced Concurrency Patterns
These utilities enable sophisticated concurrency patterns for maximizing throughput in async processing pipelines:
Saturating Slowest Steps: Buffering keeps slower pipeline stages continuously fed with data
// Ensure the slow database query step always has inputs ready
const pipeline = I.pipe(
  dataSource,
  I.map(parseData, 10), // Fast operation, high concurrency
  I.map(validateData, 8), // Medium operation, medium concurrency
  I.map(databaseQuery, 3) // Slow operation, lower concurrency but always fed
);
By having higher concurrency in earlier steps, the slowest step (database queries) will always have items waiting, eliminating idle time.
Controlled Resource Usage: Limit concurrency for resource-intensive operations
// Control database connection pool usage
const results = I.map(
  userIds,
  async id => await pool.query('SELECT * FROM users WHERE id = ?', [id]),
  5 // Limit to 5 concurrent database queries
);
Multiplex Queue with Producer/Consumer Pools: Use concat() to create sophisticated work distribution systems
// Create multiple producer and consumer streams
const results = I.pipe(
  [
    I.map(produceFromSource1, 3)(source1),
    I.map(produceFromSource2, 2)(source2),
    I.map(produceFromSource3, 4)(source3),
  ],
  I.concat(2, 10), // 2 active sources, buffer of 10
  I.map(processWork, 8) // 8 concurrent workers consuming from the queue
);
This pattern creates a flexible system where work from multiple sources is intelligently buffered and fed to a pool of workers.
Batching & Unbatching for Efficiency: Create batch processing workflows with optimal throughput
// Process items in batches for efficiency, then flatten results
const efficientPipeline = I.pipe(
  sourceItems,
  I.chunk(100), // Group into batches of 100 items
  I.map(async batch => {
    // Process each batch as a unit (e.g., bulk database operations)
    const results = await db.bulkInsert(batch);
    return results; // Returns array of results
  }, 3), // Process 3 batches concurrently
  I.concat(), // Flatten batch results back to individual items
  I.map(enrichItem) // Process individual results
);
for await (const item of efficientPipeline) {
  // Each item is processed individually after batch operations
  console.log(item);
}
API Reference
map(fn, concurrency)
Creates a function that maps values from an iterable with specified concurrency.
- fn: Function to apply to each item in the iterable
- concurrency: Maximum number of concurrent operations (default: 1)
- Returns: A function that takes an iterable and returns an async generator
filter(fn)
Filters elements based on a predicate. If the predicate returns a non-boolean/non-null value, that value is yielded instead.
- fn: Predicate function to test each item
- Returns: A function that takes an iterable and returns an async generator
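One possible reading of the filter-with-transformation rule is sketched below. `filterMap` is a hypothetical helper, and the treatment of edge cases (for example, a predicate returning `0` or an empty string) is an assumption, since the description above does not spell them out.

```javascript
// Hypothetical sketch of the rule described above, not the library's code.
// true               -> keep the original item
// false / null / undefined -> drop the item
// anything else      -> yield the returned value in place of the item
async function* filterMap(source, fn) {
  for await (const item of source) {
    const verdict = await fn(item);
    if (verdict === true) {
      yield item;
    } else if (verdict === false || verdict == null) {
      continue;
    } else {
      yield verdict;
    }
  }
}
```

Under this reading, `filterMap(items, Boolean)` behaves like a plain filter, while a predicate such as `x => (x.ok ? x.payload : false)` filters and unwraps in a single step.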
concat(maxConcurrentInputs, bufferSize)
Processes multiple async iterables concurrently, yielding values as they become available.
- maxConcurrentInputs: Maximum number of concurrent iterators to process (default: Infinity)
- bufferSize: Maximum number of values to buffer (default: Infinity)
- Returns: A function that takes an iterable of iterators and returns an async generator
pipe(source, ...fns)
Composes multiple iterator transformations into a single processing pipeline.
- source: Source iterable
- fns: Functions to apply in sequence
- Returns: An async generator with all transformations applied
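The source-first form of `pipe` amounts to threading an iterable through a list of curried transforms. The sketch below shows that idea with two hypothetical helpers, `pipeline` and `mapWith`; it does not cover the source-less form used elsewhere in this README to compose transforms into a reusable function.

```javascript
// Hypothetical sketch: thread a source through transforms, left to right.
function pipeline(source, ...fns) {
  return fns.reduce((iterable, fn) => fn(iterable), source);
}

// Hypothetical curried transform: takes `fn`, returns a function that
// accepts an iterable and returns an async generator.
const mapWith = (fn) =>
  async function* (source) {
    for await (const item of source) {
      yield await fn(item);
    }
  };

const incrementedThenDoubled = pipeline(
  [1, 2, 3],
  mapWith((x) => x + 1),
  mapWith((x) => x * 2)
);
// Iterating incrementedThenDoubled yields 4, 6, 8
```

Nothing runs until the result is iterated, because each stage is an async generator that only advances when the next stage pulls from it.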
take(n)
Takes a specified number of elements from an iterable.
- n: Number of elements to take
- Returns: A function that takes an iterable and returns an async generator
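Why `take` can stop upstream work early is easiest to see in a small sketch. `takeFirst` is a hypothetical helper, not the library's code: returning from inside `for await` closes the source iterator, so an infinite or expensive source is never pulled further than needed.

```javascript
// Hypothetical sketch, not the library's implementation.
async function* takeFirst(source, n) {
  if (n <= 0) return;
  let taken = 0;
  for await (const item of source) {
    yield item;
    if (++taken >= n) return; // leaving the loop closes the source iterator
  }
}

// An infinite source that records how many values were actually produced.
let produced = 0;
async function* naturals() {
  let i = 0;
  while (true) {
    produced++;
    yield i++;
  }
}
```

Iterating `takeFirst(naturals(), 3)` to completion yields 0, 1, 2 and leaves `produced` at 3.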
chunk(size)
Groups elements from an iterable into arrays of specified size.
- size: Number of elements to include in each chunk
- Returns: A function that takes an iterable and returns an async generator yielding arrays whose length is the given size
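A minimal sketch of chunking over an async source follows. `chunkBy` is a hypothetical helper; emitting a shorter final chunk when the input does not divide evenly is an assumption, as the reference above does not say what happens to leftover items.

```javascript
// Hypothetical sketch, not the library's implementation.
async function* chunkBy(source, size) {
  let batch = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  // Assumed behaviour: leftover items are emitted as a shorter final chunk.
  if (batch.length > 0) yield batch;
}
```

Under that assumption, `chunkBy([1, 2, 3, 4, 5], 2)` yields `[1, 2]`, `[3, 4]`, and `[5]`.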
dispatch(splitter, ...processors, concurrency)
Routes items from a single input iterable to multiple processing streams based on a splitter function, creating a one-directional fan-out pattern.
- splitter: Function that determines which processors should receive each input item
- processors: Processing functions for each possible output from the splitter
- concurrency: Maximum number of concurrent splitter operations (default: 1)
- Returns: A function that takes an iterable and returns an async generator
writer(callback, concurrency)
Turns an async pipe into a writable stream destination that you can push values into, inverting the typical iterator flow.
- callback: A function that processes an iterable and returns another iterable
- concurrency: Optional concurrency limit for the underlying queue buffer
- Returns: An object with the following methods:
- write: Pushes a value through the pipe and returns the processed result
- publish: Adds a value to the processing queue without waiting for the result
- end: Signals the end of input and closes the stream
Writer inverts the typical generator flow by allowing the source to push messages into the pipeline rather than the pipeline pulling from the source. It automatically respects backpressure - if the pipe is slow to process values, the source will be throttled to prevent overwhelming the system.
This pattern is ideal for:
- Event-driven Sources: Process events from callbacks or event emitters
- External Control: When you need the source to control the flow of data
- Sink Operations: Creating pipeline endpoints that consume data
- Closing Streams: When you need explicit control over when a stream ends
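The push-to-pull inversion can be sketched as a small channel that a producer writes into and a consumer iterates over. `createChannel` is a hypothetical helper, not the library's `writer`: here `write` resolves once the consumer has pulled the value (which is what throttles the producer), whereas the library's `write` is described as returning the processed result.

```javascript
// Hypothetical push-to-pull bridge, not the library's writer().
function createChannel() {
  const queue = []; // pushed but not yet consumed: { value, taken }
  let wake = null; // resolves the consumer while it waits for data
  let ended = false;

  const notify = () => {
    if (wake) {
      wake();
      wake = null;
    }
  };

  return {
    // Resolves once the consumer has pulled the value (back-pressure).
    write(value) {
      return new Promise((taken) => {
        queue.push({ value, taken });
        notify();
      });
    },
    // Signals that no more values will be written.
    end() {
      ended = true;
      notify();
    },
    async *[Symbol.asyncIterator]() {
      while (true) {
        if (queue.length > 0) {
          const { value, taken } = queue.shift();
          taken(); // release the producer waiting on this write
          yield value;
        } else if (ended) {
          return;
        } else {
          await new Promise((resolve) => {
            wake = resolve;
          });
        }
      }
    },
  };
}
```

A producer that awaits each `write` call can never get more than one value ahead of the consumer, which is the throttling behaviour described above in its simplest form.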
Todo
- Eager/lazy control for concat()
License
MIT
Acknowledgements
This library was inspired by the rotery library, itself inspired by ramda.
