Streamo
A TypeScript implementation of collection streaming. Loosely based on Java's Streaming API. It provides:
- map, filter, reduce functions on data sets that do not have to be in memory all at once
- concurrent processing with AsyncStream
- composable, expressive algorithms for data processing, synchronously and asynchronously
Primary Use Case and Rationale
Consider the following code:
```ts
const count = [1, 2, 3]
  .map((item) => item * 2)
  .map((item) => `${item}`)
  .filter((item) => item !== '6')
  .length;
```
In this example, we've created 3 instances of the array to get to the end result. With a streaming operation, we iterate the data through each operation and do not need to create intermediate arrays:
```ts
import { Stream } from '@webcompere/streamo';

const count = Stream.of(1, 2, 3)
  .map((item) => item * 2)
  .map((item) => `${item}`)
  .filter((item) => item !== '6')
  .count();
```
Here, the map and filter operations modify the stream and it's only the count operation which causes iteration to occur, with the data transformed on the fly.
This really comes into its own when we're searching:
```ts
const firstEvenNumber = Stream.of(1, 1, 2, 8, 8, 8, 8, 8)
  .filter((item) => item % 2 === 0)
  .findFirst();

console.log(firstEvenNumber.orElse('unknown'));
```
Here the first even number is 2. The findFirst method returns an Optional<number> in this case, which contains the 2. Unlike with JavaScript arrays, the source is not converted into a fully filtered copy by calling the predicate passed to filter on all 8 items. Instead, the data is streamed only until findFirst gets its first result.
The Optional class, also copied from Java, is a way to represent a value, or the absence of a value, as
found in a stream.
Synchronous Usage
Overview
- Create a Stream from some source data
- Apply operations like distinct, sorted, map or filter to change the data passing through it
- Use skip, limit, takeWhile, or dropWhile to trim the data available
- Then use a terminal operation to get a final result, such as findFirst, anyMatch, noneMatch, count, toArray, max, min, reduce, or collect
There's a special subtype of Stream for numbers - NumberStream which allows a sum operator, and some
useful default comparators for sorting/max/min.
Note: Streams can only be used once. Once we reach a terminal operation, we cannot call any other functions on the stream.
Create a Stream
Streams of Literals
If we have an array, we can use ofArray:
```ts
const myArray = ['a', 'b', 'c'];
const strings = Stream.ofArray(myArray);
```
If we have some literal values we can use of:
```ts
const strings = Stream.of('a', 'b', 'c');
```
Number Streams
We can construct a stream of numbers using ofNumbers:
```ts
const numbers = Stream.ofNumbers(1, 2, 3);
```
And if we have a stream of something we want to make into a number stream, we can map it to a number stream with mapToNumber, which converts the items into numbers:
```ts
const arrayOfStrings = ['1', '2', '3'];
const numbers = Stream.ofArray(arrayOfStrings).mapToNumber((num) => +num);
```
which means we can use NumberStream methods like sum:
```ts
expect(numbers.sum()).toBe(6);
```
Streams of Entries
We can also create a stream of the entries from a Map or the key value pairs from an Object:
```ts
const map = new Map<string, string>([
  ['a', 'b'],
  ['c', 'd'],
]);
const streamOfEntries = Stream.ofMap(map);
```
which will allow us to filter the key value pairs, or collect them into an object:
```ts
expect(streamOfEntries.collect(Collectors.toObjectFromEntries())).toEqual({
  a: 'b',
  c: 'd',
});
```
If we need to provide an empty stream, then Stream.empty() will do so.
Joining Streams
If we have multiple streams, we can add them together with Stream.concat:
```ts
const superStream = Stream.concat(Stream.of(1, 2, 3), Stream.of(4, 5, 6));
```
Familiar Operations from Array
Functions like map, flatMap, filter and reduce are familiar from Array.
Filtering
```ts
const filtered = Stream.of('a', 'b', 'cc', 'd')
  .filter((item) => item.length === 1)
  .toArray(); // a, b, d
```
Note: if we're just doing a single operation and then converting back to an array, there's no advantage to using Stream. The runtime advantages of streams take effect with multiple operations and the ability to compose functions to route just enough data from a producer to a consumer.
Mapping
We can also use map:
```ts
const mapped = Stream.of({ name: 'Bill' }, { name: 'Bob' })
  .map((item) => item.name)
  .toArray(); // Bill, Bob
```
If we want to map to a Stream and have the individual elements of that stream pass down to the following operators, then we use flatMap.
```ts
// stream of two arrays
const individualItems = Stream.of([1, 2, 3], [4, 5, 6])
  .flatMap((array) => Stream.ofArray(array))
  .count(); // will receive 6 items
```
flatMap can return a stream, but as a shortcut, it can also return an Array, which will be converted into a stream:
```ts
// stream of two arrays
const individualItems = Stream.of([1, 2, 3], [4, 5, 6])
  .flatMap(identity) // the identity function maps the item to itself
  .count(); // will receive 6 items
```
Reducing
For reduce we have two options. We can reduce using a single binary operator:
```ts
const reduced = Stream.of('A', 'B', 'C').reduce((a, b) => a + b); // Optional.of('ABC')

// get the value from the optional
expect(reduced.get()).toBe('ABC');
```
When reducing with a binary operator, there's a chance that the stream is empty, so the result is wrapped in an Optional. If we use reduceFrom, where we supply an initial value, then even if the stream is empty, there's a guaranteed value.
```ts
const reduced = Stream.of('A', 'B', 'C').reduceFrom(
  '',
  (a, b) => a + b,
  identity
);
expect(reduced).toBe('ABC');
```
The reduceFrom function takes three inputs:
- initialValue - the starting value
- accumulator - a binary operation which takes the accumulator and the next value and adds them together to make the new accumulator
- converter - a function which converts from element values to the type of the accumulator
In the above example, we're accumulating in the same type as the element type - string - so we can use the convenience function identity with the reduceFrom function.
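As a sketch of the other case, where the accumulator type differs from the element type, the converter maps each element into the accumulator's type. This assumes reduceFrom applies the converter to each element before accumulating, as described above:
```ts
// accumulate the total length of the strings: the accumulator works on numbers,
// so the converter turns each string into its length first (assumed behaviour)
const totalLength = Stream.of('foo', 'barbaz').reduceFrom(
  0,
  (total, next) => total + next,
  (item) => item.length // converter: string -> number
);
expect(totalLength).toBe(9);
```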
Searching Functions
Find First
We can stop iteration at the first available value with findFirst. This can be supplied with a predicate
of its own for a search, or can be placed after various map and filter operations so that it
pulls the items through the stream until it reaches the first one. It returns an Optional:
```ts
// no first item on an empty stream
expect(Stream.empty().findFirst().isPresent()).toBeFalsy();

// find first even number
expect(
  Stream.of(1, 2, 3)
    .findFirst((num) => num % 2 === 0)
    .get()
).toBe(2);

// find first number after an even filter
expect(
  Stream.of(1, 2, 3)
    .filter((num) => num % 2 === 0)
    .findFirst()
    .get()
).toBe(2);
```
Matching
We can check if any items match a predicate using anyMatch. This is like Array.prototype.some():
```ts
expect(Stream.of(1, 2, 3).anyMatch((item) => item === 2)).toBeTruthy();
```
We can check if no items match a predicate using noneMatch:
```ts
expect(Stream.of(1, 2, 3).noneMatch((item) => item === 2)).toBeFalsy();
```
We can check if all items match a predicate using allMatch:
```ts
expect(Stream.of(1, 2, 3).allMatch((item) => item < 100)).toBeTruthy();
```
Terminal Operations
Calling a terminal operation causes the iterators to run. These cannot be run more than once, as the stream is used up. Do not reuse a stream after a terminal operation is used.
Embedded Collectors
The toArray function will collect the items into an array:
```ts
expect(Stream.of('a', 'b', 'c').toArray()).toEqual(['a', 'b', 'c']);
```
The toMap function will collect the items into a Map:
```ts
const expectedMap = new Map<string, { name: string; age: number }>();
expectedMap.set('John', { name: 'John', age: 41 });
expectedMap.set('Bill', { name: 'Bill', age: 23 });

expect(
  Stream.of({ name: 'John', age: 41 }, { name: 'Bill', age: 23 }).toMap(
    (item) => item.name,
    identity
  )
).toEqual(expectedMap);
```
We need to provide a keyMapper and a valueMapper function. In this example, the utility function identity is called to map the object to itself as the value in the map.
To get the maximum item in a Stream we use max (and for the minimum we use min):
```ts
const maxString = Stream.of('a', 'b', 'c')
  .max(compareString); // an optional, containing 'c'
```
The result is an Optional which is empty when the Stream is empty.
We need to provide a comparator here, for which the utility function compareString can help us. If we're
using a NumberStream the comparator defaults to compareNumber and, thus, is optional.
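For example, a minimal sketch of relying on that default (assuming max can then be called with no argument on a NumberStream):
```ts
// no comparator needed on a NumberStream
expect(Stream.ofNumbers(3, 1, 2).max().get()).toBe(3);
```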
We can build a comparator of a property using comparingBy:
```ts
const maxElement = Stream.of({ name: 'a', val: 1 }, { name: 'b', val: 2 }).max(
  comparingBy((element) => element.name, compareString)
);
```
comparingBy lets us compose a comparator from a function to select the property of the item, and then another comparator. Or we can build our own comparator from scratch, following the same rules as a sorting compareFn in JavaScript.
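A small sketch of a hand-written comparator, following the standard compareFn rules (nothing library-specific is assumed beyond max taking a comparator, as shown above):
```ts
// compare strings by length, shortest first
const byLength = (a: string, b: string): number => a.length - b.length;

const longest = Stream.of('aa', 'a', 'aaa').max(byLength); // an Optional containing 'aaa'
```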
Collectors
The collect function uses a Collector to produce a final value from the contents of the Stream. This is
similar to reduceFrom but collector objects are also composable. The Collectors class contains some ready
made objects. Let's look at collecting to an array as an example:
```ts
const array = Stream.of('foo', 'bar').collect(Collectors.toArray()); // ['foo', 'bar']
```
The collector uses three functions:
- supplier - creates an empty accumulator
- accumulator - adds the next element to the accumulator
- finisher - converts the accumulator into the final returned object
Other Collectors functions include:
- toObject - called with a key mapper (which must map to string) and a value mapper - this produces a Record<string, V> from the stream
- toObjectFromEntries - does the same, but assumes the stream is already made of Entry objects from an original Map or Record
- toMap - as with toObject but the key mapper can map to anything
- toMapFromEntries - as above
- counting - will count the number of entries in the stream
- summing - can be used with Stream<number> or NumberStream and provides the sum of the values
- averaging - can be used with Stream<number> or NumberStream and calculates the mean average - returning NaN if there are no values
- joining - allows an optional delimiter, prefix and suffix, and can only operate on Stream<string> (map items to string using map if necessary)
- collectingAndThen - allows us to first apply a collector and then apply a mapping function to convert the output of that to something else
- minBy - find the smallest element using a given Comparator - same as the min function on the Stream itself, but also composable with other collectors
- maxBy - as minBy but with the maximum element
- groupingByToArray - group the items according to an identity mapper and return a Map with the identity as a key and an array of matching items as the value
- groupingBy - as with groupingByToArray but the items that share an identity are collected using another collector - so we can, for example, group by name and then collect the maximum of each group
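For example, a hedged sketch of collectingAndThen, assuming it takes the collector first and the mapping function second (mirroring the Java API it is modelled on):
```ts
// collect to an array, then convert that array into its length
const size = Stream.of('a', 'b', 'c').collect(
  Collectors.collectingAndThen(Collectors.toArray(), (array) => array.length)
);
expect(size).toBe(3);
```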
While some of these collectors replicate terminal operations on Stream and NumberStream, they do so to allow them to be composed with other collectors. E.g. groupingBy may partition by an identity and then the sub-items can be further collected into a count with counting or to an array with toArray.
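As a sketch of that composition, assuming groupingBy takes an identity mapper followed by a downstream collector as described above:
```ts
// count names by their first letter; expected: Map { 'A' => 2, 'B' => 1 }
const countsByInitial = Stream.of('Anna', 'Alex', 'Bob').collect(
  Collectors.groupingBy((name) => name[0], Collectors.counting())
);
```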
Length Functions
We can use skip to ignore some items:
```ts
expect(Stream.of(1, 2, 3).skip(2).toArray()).toEqual([3]);
```
We can use limit to stop the stream after it has provided so many values - this is useful when we're using infinite generators:
```ts
expect(
  Stream.generate(() => 'dave')
    .limit(3)
    .toArray()
).toEqual(['dave', 'dave', 'dave']);
```
We can use takeWhile to keep reading from the stream until a predicate stops being true, and dropWhile to skip items in the stream until a predicate stops being true.
```ts
expect(
  Stream.ofNumericArray([1, 2, 3, 4, 5])
    .takeWhile((num) => num < 4)
    .sum()
).toBe(6);
```
Generators
A stream does not have to come from a finite data source. We can use generators.
Suppliers
The simplest generator is a Supplier:
```ts
// a stream of 10 random numbers
const tenRandoms = Stream.generate(() => Math.random()).limit(10);
```
If the supplier returns an Optional, using Optional.empty to indicate the end of the stream, then we can use generateFinite. This will stop when the supply of new values runs out:
```ts
// collect even random integers between 0 and 100, stopping at the first odd one
const evenRandoms = Stream.generateFinite(() =>
  Optional.of(Math.floor(Math.random() * 100)).filter((coin) => coin % 2 === 0)
).toArray();
```
Generate with iterate
We can use the iterate function to provide a source of data from a seed. We provide the seed, the operator on the last value to produce the next, and an optional predicate on when to stop:
```ts
expect(
  Stream.iterate(
    0, // initial seed 0
    (a) => a + 1 // increment by one each time
    // defaults to always has next
  )
    .limit(4)
    .toArray()
).toEqual([0, 1, 2, 3]);
```
With a predicate:
```ts
expect(
  Stream.iterate(
    0,
    (a) => a + 1,
    (a) => a < 4 // keep going while the next number is less than 4
  ).toArray()
).toEqual([0, 1, 2, 3]);
```
Numeric Streams
We can produce NumberStreams from numeric ranges:
```ts
// specify the limit of a range
expect(Stream.ofRange(0, 4).toArray()).toEqual([0, 1, 2, 3]);

// or specify the last number
expect(Stream.ofRangeClosed(0, 3).toArray()).toEqual([0, 1, 2, 3]);
```
and the increment between each number defaults to 1 but can be provided, so we get the in-between numbers with a delta of 0.5:
```ts
expect(Stream.ofRange(0, 4, 0.5).toArray()).toEqual([
  0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5,
]);
expect(Stream.ofRangeClosed(0, 3, 0.5).toArray()).toEqual([
  0, 0.5, 1, 1.5, 2, 2.5, 3,
]);
```
Iterables
If it's more convenient, we can create a subclass of Iterable to produce elements, and construct a stream
to wrap that Iterable. Similarly, we can use getIterable on the Stream to use the iterable externally,
or even modify that iterable to produce new elements and feed that to a new Stream.
Internally, most of the streaming operations involve adding wrappers around the stream's iterable.
Value Modifying Functions
Indexing
We can use indexed to provide a position value next to each value in the stream:
```ts
// here we're going to take the even numbered items from
// the list according to their position
expect(
  Stream.of('blue', 'green', 'white', 'black')
    .indexed()
    .filter((item) => item.index % 2 !== 0)
    .map((item) => item.value)
    .toArray()
).toEqual(['green', 'black']);
```
indexed converts each item into {index: number, value: item}. Its numbering is dependent on where in the
streaming operation the .indexed is inserted. Here, it's close to the beginning, but if we added it after a
filtering operation, then it would count items post-filter.
In this example, we also used map to convert back from the indexed form to the individual items before
collecting to an array.
Transformation
We can add a transformer to the middle of a stream to modify the contents of the stream in a stateful way.
E.g. for batching:
```ts
// the `batch` transformer will convert the stream into sub
// arrays sized according to the batch size
const stream = Stream.of('a', 'b', 'c', 'd', 'e').transform(
  Transformers.batch(2)
);
expect(stream.toArray()).toEqual([['a', 'b'], ['c', 'd'], ['e']]);
```
Transformers are formed of:
- supplier - which creates an empty state
- transformer - which adds the next item from the stream into that state, possibly emitting a new item for the downstream Stream to use; when emitting that item the transformer indicates whether the state is still valid or needs clearing
- finisher - which has the opportunity to emit one last item when the upstream stream is exhausted
We could use a transformer to provide all the prime numbers in a range:
```ts
expect(
  Stream.ofRange(2, Number.MAX_VALUE)
    .transform({
      supplier: (): number[] => [],
      transformer: (a, t) => {
        if (Stream.ofArray(a).noneMatch((prime) => t % prime === 0)) {
          a.push(t);
          return { value: Optional.of(t) };
        }
        return { value: Optional.empty() };
      },
      finisher: () => Optional.empty(),
    })
    .limit(10)
    .toArray()
).toEqual([2, 3, 5, 7, 11, 13, 17, 19, 23, 29]);
```
Optional
The equivalent Java Optional class is a first class citizen in this library. Optional is a special
collection of 0 or 1 items. We can convert it into a Stream with its stream function. We can
check it for presence and absence of its element with isEmpty and we can perform filter and
map functions on it, which will transform the element present, or cause it to become not present.
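For instance, a minimal sketch of treating an Optional as a stream source (assuming stream yields zero or one items, as described):
```ts
// a present Optional streams its single value; an empty one streams nothing
expect(Optional.of('foo').stream().toArray()).toEqual(['foo']);
expect(Optional.empty().stream().count()).toBe(0);
```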
In Optional, the only value for missing is undefined. If the type of the Optional allows null to exist, then there's a special filterNotNull that will take null out of the type of the Optional.
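A sketch of how filterNotNull might be used, assuming it behaves as described; lookupName here is a hypothetical function typed to return string | null:
```ts
const maybeName: string | null = lookupName(); // hypothetical lookup

// filterNotNull removes null from the Optional's type, leaving Optional<string>
const name = Optional.of(maybeName).filterNotNull().orElse('anonymous');
```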
Note: Optional is provided as a reflection of the original Java API from which Streamo is ported but has also been a useful tool in implementing Streamo.
Creating an Optional
We can create an Optional using of:
```ts
const optional = Optional.of('foo');
```
and we can create an empty Optional:
```ts
const emptyOptional = Optional.empty();
```
If we need to make this a typed optional, e.g. an empty string optional:
```ts
const emptyString = Optional.empty<string>();
```
Creating from Alternatives - Coalescing
Optional can replace the ?? coalescing operator in TypeScript. Let's look at the comparison:
```ts
const a = 'a';
const b = undefined;
const c = undefined;

const coalesceTs = a ?? b ?? c;
expect(coalesceTs).toBe('a');
```
The same coalesce can be done using Optional.of:
```ts
const optionalCoalesce = Optional.of(a, b, c).get();
```
With variables, this is fine, but the short-circuiting of ?? would be better than passing all values to Optional.of when we're calling functions to find alternatives. So:
```ts
const coalesceTs = fnA() ?? fnB() ?? fnC(123);
```
should be converted to:
```ts
const optionalCoalesce = Optional.ofSupplier(fnA, fnB, () => fnC(123));
```
The only advantage of using Optional over ?? for coalescing is that .orElse is a clearer way to demonstrate that we're providing a guaranteed fallback value. E.g.
```ts
const optionalCoalesce = Optional.ofSupplier(fnA, fnB).orElse(12);
```
The benefits of Optional as a fluent interface over a potentially absent element can be achieved after coalescing with ?? or in place of it.
The or function can be used to bring together multiple suppliers of Optional until the first present one:
```ts
const optional = Optional.ofSupplier(fnA, fnB).or(fnC, fnD);
```
Filtering
If the Optional has a value, then filter will apply the predicate to it, producing a new Optional with the value removed if the predicate isn't satisfied:
```ts
// filtering on string of length 1 blanks out our input
expect(
  Optional.of('foo')
    .filter((st) => st.length === 1)
    .isPresent()
).toBeFalsy();
```
Map and FlatMap
We can map the Optional to another value:
```ts
expect(
  Optional.of('foo')
    .map((opt) => `${opt}!`)
    .get()
).toBe('foo!');
```
If the mapping function itself returns an Optional, we can flatten that by using flatMap:
```ts
expect(
  Optional.of('foo')
    .flatMap((opt) => Optional.of(`${opt}!`))
    .get()
).toBe('foo!');
```
Getting the Result
isEmpty and isPresent will tell us if the result is available.
The get function will retrieve the value, but can also return undefined. To guarantee a value of type T, we need to use orElse:
```ts
expect(Optional.of('foo').orElse('bar')).toBe('foo');
```
If the optional is blank, the value provided in orElse will be returned:
```ts
expect(Optional.empty().orElse('bar')).toBe('bar');
```
If there's effort in producing the value for else, we use orElseGet to invoke a function to produce the result:
```ts
expect(Optional.empty().orElseGet(() => 'bar')).toBe('bar');
```
If not having a value is an error then we can use orElseThrow:
```ts
const nextItem = optionalValue.orElseThrow();
const lastItem = optionalValue.orElseThrow(
  () => new Error('How did this happen?')
);
```
If we want to do something with the value when present, we can use ifPresent:
```ts
Optional.of('foo').ifPresent((item) => console.log(item));
```
And this can replace if/else entirely using ifPresentOrElse:
```ts
Optional.of('foo').ifPresentOrElse(
  (item) => console.log(item),
  () => console.log('none')
);
```
Async Support
AsyncStreams
The AsyncStream is an alternative implementation of Stream which allows the source and modification functions to be asynchronous. Where it allows asynchronous functions such as AsyncMapper, it also allows the synchronous version Mapper to be used.
We can build an AsyncStream to process using asynchronous functions, and add a buffer to it to create concurrency.
The terminal operations of the AsyncStream result in a Promise we can await.
There's no equivalent of NumberStream for async use.
Creating
A Stream can be converted into an AsyncStream:
```ts
const asyncStream = Stream.of('a', 'b', 'c').async();
```
Or an AsyncStream can be constructed with the of and ofArray functions. As with Stream, we can use concat to bring together two AsyncStreams of the same type:
```ts
const unifiedStream = AsyncStream.concat(
  AsyncStream.of(1, 2),
  AsyncStream.ofArray([3, 4])
);
```
We can create a stream from a generating function using generateFinite:
```ts
const streamOfGenerated = AsyncStream.generate(() =>
  // let's assume readNextRequest is a function that returns a promise of something
  AsyncOptional.of(readNextRequest())
).limit(10);
```
The generating function can be synchronous or async. If used in conjunction with a buffer (see later), this might allow the elements to be fetched concurrently and to enter the stream out of order. We can add an optional boolean parameter of true to force one-by-one calls to the generator:
```ts
const streamOfGenerated = AsyncStream.generate(
  () =>
    // using this sequentially
    AsyncOptional.of(readNextRequest()),
  true
).limit(10);
```
This allows the implementation of iterate, where a seed value or function can be used in conjunction with a function to calculate the next item (or return empty to signal none) to produce a stream:
```ts
const streamOfIterated = AsyncStream.iterate(
  'first',
  (last: string): AsyncOptional<string> => findEntryAfter(last)
);
```
Mapping, Filtering
AsyncStream can accept synchronous or asynchronous functions to map, flatMap, filter, etc.
```ts
const result = await Stream.of('a', 'b', 'c')
  .async()
  .map((item) => lookupItemInWebService(item)) // async mapper
  .filter((item) => item.value === 'Success') // synchronous predicate
  .toArray();
```
The flatMap modifier will extract the items from an AsyncStream produced from the items:
```ts
const stream = Stream.of([1, 2, 3], [4, 5, 6])
  .async()
  .flatMap((array) => AsyncStream.ofArray(array)); // now AsyncStream<number>
```
The limit function is also available to reduce the number of items allowed past it.
Terminal Operations
The usual functions all apply:
- findFirst, anyMatch, noneMatch, allMatch - allowing async and synchronous predicates
- toArray - for collecting to a final array
- count - to get the count
- collect - which uses a synchronous collector from Collectors and thus also allows us to use the maxBy, minBy and other collectors available for Stream
Conversion
Using indexed will attach the source order at the point of indexing (though if running concurrently, this can be random):
```ts
const stream = AsyncStream.of(1, 2, 3)
  .map((item) => item * 2)
  .buffer(3) // applies concurrency
  .indexed(); // now a stream of <{ index: number; value: number; }>
```
The transform method uses a synchronous transformer to rewrite the stream with processed values, such as batching.
As with Stream, the sorted method can be used to put the elements into order, and distinct will filter out duplicates: these are implemented via transforms.
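For example, a brief sketch (assuming sorted accepts a compareFn-style comparator, as max and min do):
```ts
const ordered = await AsyncStream.of(3, 1, 3, 2)
  .distinct() // drop the duplicate 3
  .sorted((a, b) => a - b) // sort ascending
  .toArray(); // [1, 2, 3]
```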
Buffering and Concurrency
Note: this is still incubating - watch out for infinite waits, or missing items - please raise issues if you find any
Without a buffer, the stream uses await on each item coming through the stream individually, guaranteeing the source order. However,
one of the benefits of using asynchronous sources and asynchronous mapping functions is that async code can allow us to exploit concurrency.
The solution to this is to add a buffer to the stream.
The buffer should be added directly after the map or filter operations that do the most asynchronous work. If the buffer is added too soon, or too late, then it will not be able to parallelise the work.
It's probably advisable to add only one buffer to the stream.
```ts
const results = await AsyncStream.of('user1', 'user2', 'user3', 'user4')
  .map((user) => loadUserProfile(user))
  .filter((user) => user.isAdmin)
  .buffer(2) // do this two users at a time
  .toArray();
```
Buffers have finite upper limits which we're using to control the volume of concurrent requests we might make to external services, as well as the amount of unprocessed data being kept in the buffer before it hits something like a findFirst. If we wanted maximum concurrency, we could set the buffer size to Number.MAX_VALUE.
Performance tuning may need to consider where the buffer operation is put compared to more sequential activities such as flatMap and transform. Similarly, the presence of buffer will affect the output order of the data.
AsyncOptional
AsyncStream is built on top of AsyncOptional, which is the returned value from findFirst. However, AsyncOptional can be
used stand-alone.
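For instance, a small sketch of receiving an AsyncOptional from findFirst and awaiting one of its terminal operations (described further below):
```ts
// findFirst returns an AsyncOptional; orElse must be awaited
const firstBig = await AsyncStream.of(1, 5, 10)
  .filter((item) => item > 3)
  .findFirst()
  .orElse(-1); // 5
```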
For one-off async operations, we could continue to use Optional, which supports mapAsync, flatMapAsync and filterAsync. These take either a synchronous or async version of the Mapper or Predicate and return a Promise<Optional> which can be awaited to get the actual optional.
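For example, a sketch of mapAsync, assuming the behaviour described above; fetchDetails is a hypothetical async function:
```ts
// mapAsync returns a Promise<Optional>, so we await it to get the Optional back
const details = await Optional.of('id-123').mapAsync((id) => fetchDetails(id));
```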
However, for fluent async operations on the Promise of an Optional, we can use AsyncOptional.
We can convert an Optional to AsyncOptional:
```ts
const asyncOptional = Optional.of('someValue').async();
```
Or we can construct an AsyncOptional from a Promise of a value:
```ts
const asyncOptional = AsyncOptional.of(functionThatReturnsPromise());
```
Then we can use map, filter, flatMap on the AsyncOptional with a mix of synchronous functions,
or functions that return promises.
```ts
const asyncOptional = AsyncOptional.of('someValue')
  .map(someAsyncFunction)
  .filter((s) => s !== 'error')
  .map((s) => `${s}!`)
  .filter(asyncLookupFunction);
```
The terminal operations of the AsyncOptional, such as get, isPresent, isEmpty, orElse, etc. must themselves be awaited, and the AsyncOptional can be converted back to an Optional by awaiting it:
```ts
const value = await AsyncOptional.of(callAsync())
  .filter(someAsyncFilter)
  .toOptional(); // now we have a standard optional
```
or
```ts
const value = await AsyncOptional.of(callAsync())
  .filter(someAsyncFilter)
  .orElseGet(someAsyncSupplierOfNewValue);
```