npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@motorcycle/stream

v3.0.0

Published

Functional and reactive event streams for Motorcycle.ts

Downloads

42

Readme

@motorcycle/stream -- 2.1.0

Functional and reactive event streams for Motorcycle.ts

Get it

yarn add @motorcycle/stream
# or
npm install --save @motorcycle/stream

API Documentation

All functions are curried!

ap<A, B>(fns: Stream<(value: A) => B>, values: Stream<A>): Stream<B>

Applies a stream of functions to the latest from a stream of values.

import { ap, now, periodic, scan, skip, observe } from '@motorcycle/stream'

const count$ = scan(x => x + 1, 0, skip(1, periodic(100)))

const fn$ = now(x => x * x)

const stream = ap(fn$, count$)

observe(console.log, stream)
// 0
// 4
// 9
// ...

export { ap } from '@most/core'

at<A>(time: Time, value: A): Stream<A>

Creates a stream that emits a value after a given amount of time.


export { at } from '@most/core'

at<A>(time: number, value: A): Stream<A>

Create a stream containing a single event at a specific time.

import { at, observe } from '@motorcycle/stream'

observe(console.log, at(1000, 'Hello'))
// After 1 second
// logs 'Hello'

export { at } from '@most/core'

awaitPromises<A>(stream: Stream<Promise<A>>): Stream<A>

Turn a stream of promises into a stream containing the promises' values. Note that order is always preserved, regardless of promise fulfillment order.

import { mergeArray, fromPromise, at, now, observe } from '@motorcycle/stream'

// ----1------->
const a = new Promise(resolve => setTimeout(resolve, 100, 1))
// ---------2-->
const b = new Promise(resolve => setTimeout(resolve, 200, 2))
// --3--------->
const c = new Promise(resolve => setTimeout(resolve, 50, 3))

// bc---a------->
const source = mergeArray([ at(100, a), now(b), now(c) ])

// -----1----23->
const stream = awaitPromises(source)

export { awaitPromises } from '@most/core'

chain<A, B>(f: (value: A) => Stream<B>, stream: Stream<A>): Stream<B>

Creates a new stream by applying a stream-returning function to every event value and merging them into the resulting stream.

import { chain, now, observe } from '@motorcycle/stream'

const stream = chain(x => now(x * 2), now(1000))

observe(console.log, stream) 
// 2000

export { chain } from '@most/core'

combine<A, B, C>(f: (a: A, b: B) => C, a$: Stream<A>, b$: Stream<B>): Stream<C>

Apply a function to the most recent event from each stream when a new event arrives on any stream.

import { combine, at, merge, observe } from '@motorcycle/stream'

const a$ = merge(at(100, 100), at(200, 200))
const b$ = merge(at(200, 3000), at(250, 100))

const stream = combine(add, a$, b$)

observe(console.log, stream)
// 3200 -- at time 200 as a result of add(200, 3000)
// 350 -- at time 250 as a result of add(200, 100)

export { combine } from '@most/core'

combineArray<A, B, C>(f: (a: A, b: B) => C, streams: [ Stream<A>, Stream<B> ]): Stream<C>

Applies a function to the most recent event from all streams when a new event arrives on any stream.

import { combineArray, now, merge, at, observe } from '@motorcycle/stream'

const a$ = now(1000)
const b$ = now(2000)
const c$ = merge(at(100, 1), at(200, 2))

const sum = (x, y, z) => x + y + z

const stream = combineArray(sum, [ a$, b$, c$ ])

observe(console.log, stream)
// 3001 -- at time 100 as result of sum(1000, 2000, 1)
// 30002 -- at time 200 as result of sum(1000, 2000, 2)

export { combineArray } from '@most/core'

combineObj<Obj extends object>(obj: { [K in keyof Obj]: Stream<Obj[K]> }): Stream<Obj>

Takes an object of streams and returns a Stream of an object.

import { combineObj, now } from '@motorcycle/stream'

const obj = { a: now(1), b: now(2), c: now(3) }

const stream: Stream<{ a: number, b: number, c: number }> = combineObj(obj)

export function combineObj<Obj extends object>(
  object: { readonly [K in keyof Obj]: Stream<Obj[K]> }
): Stream<Obj> {
  const objectKeys = keys(object)
  const sources = values(object) as Array<Stream<Obj[keyof Obj]>>

  return combineArray((...values: Array<Obj[keyof Obj]>) => {
    const valuesMap = {} as Obj

    for (let i = 0; i < length(values); ++i) valuesMap[objectKeys[i]] = values[i]

    return valuesMap
  }, sources)
}

concatMap<A, B>(f: (value: A) => Stream<B>, stream: Stream<A>): Stream<B>

Creates a new stream by lazily applying a stream-returning function to each event value of a stream concatenating that stream's values to the resulting stream.

import { concatMap, now, observe } from '@motorcycle/stream'

const source = // --104--101--108--108--111|

const f = (x: number) => now(String.fromCharCode(x))

const stream = concatMap(f, source)

observe(console.log, stream)
// h
// e
// l
// l
// o

export { concatMap } from '@most/core'

constant<A>(value: A, stream: Stream<any>): Stream<A>

Replace each event on a stream with a given value.

import { constant, periodic, observe } from '@motorcycle/stream'

const stream = constant(100, periodic(1000))

observe(console.log, stream) // every 1 second logs 100

export { constant } from '@most/core'

continueWith(f: () => Stream<A>, stream: Stream<A>): Stream<A>

Replace the end signal with a new stream returned by f. Note that f must return a stream.

import { continueWith, at } from '@motorcycle/stream'

// ----1------>
const a = at(100, 1)
// ----2------>
const b = at(100, 2)

// ----1----2->
const stream = continueWith(() => b, a)

export { continueWith } from '@most/core'

debounce<A>(ms: number, stream: Stream<A>): Stream<A>

Wait for a burst of events to subside and keep only the last event in the burst.

import { debounce } from '@motorcycle/stream'

const source = // abcd----abcd--->
//                -----d-------d->
const stream = debounce(2, source)

export { debounce } from '@most/core'

delay<A>(ms: number, stream: Stream<A>): Stream<A>

Timeshift a stream by a number of milliseconds.

import { delay } from '@motorcycle/stream'

const source = -1--2--3--4--5---->
//             ----1--2--3--4--5->
const stream = delay(3, source)

export { delay } from '@most/core'

drain<A>(stream: Stream<A>): Promise<void>

Activates a stream using an default scheduler instance from @most/scheduler, returning a promise of completion.

import { drain } from '@motorcycle/stream'

drain(stream)
 .then(() => console.log('complete'))

export const drain = <A>(stream: Stream<A>): Promise<void> => runEffects(stream, scheduler)

during<A>(signal: Stream<Stream<any>>, stream: Stream<A>): Stream<A>

Keep events that occur during a time window defined by a higher-order stream.

import { during } from '@motorcycle/stream'

const source = // -1-2-3-4-5-6-7-8->
const signal = // ------s---------->
const s      = //       --------x-->
//                -------4-5-6-7|
const stream = during(signal, source)

export { during } from '@most/core'

empty<A>(): Stream<A>

Create a stream containing no events, which ends immediately.

import { empty, drain } from '@motorcycle/stream'

const stream = empty()

drain(stream)
 .then(() => console.log('complete'))

export { empty } from '@most/core'

filter<A>(predicate: (value: A) => boolean, stream: Stream<A>): Stream<A>

Retain only events for which a predicate is truthy.

import { filter, observe } from '@motorcycle/stream'

const source = // ---true---false---true---|

// resulting stream only contains truthy values
const stream = filter(Boolean, source)

observe(console.log, stream)
// true
// true

export { filter } from '@most/core'

fromPromise<A>(promise: Promise<A>): Stream<A>

Create a stream containing a promise's value.

import { fromPromise, observe } from '@motorcycle/stream'

const a = fromPromise(Promise.resolve(1))
const b = fromPromise(Promise.reject(new Error('failure')))

observe(console.log, a)
 .then(() => console.log('done'))
 .catch(err => console.error(err.message))
// 1
// done

observe(console.log, b)
 .then(() => console.log('done'))
 .catch(err => console.error(err.message))
// 'failure'

export { fromPromise } from '@most/core'

hold<A>(stream: Stream<A>): Stream<A>

Deliver the most recently seen event to each new observer the instant it begins observing. A held stream is always multicast.

Given an input stream:

stream:    -a---b---c---d-\>

observers which begin observing at different times will see:

observer1: -a---b---c---d-\>
observer2:    a-b---c---d-\>
observer3:           c--d-\>
import { createDocumentDomSource events } from '@motorcycle/dom'
import { drain, hold, map } from @motorcycle/stream'

const doc = createDocumentDomSource(now(document))

// start holding on first subscription
const click$ = hold(map(e => ({ x: e.clientX, y: e.clientY }), events('click', doc)))

// hold the latest event even before the first subscription
drain(click$)

export function hold<A>(stream: Stream<A>): Stream<A> {
  return new Hold<A>(stream)
}

class Hold<A> extends MulticastSource<A> implements Stream<A> {
  private has: boolean
  private value: A
  private scheduler: Scheduler

  constructor(stream: Stream<A>) {
    super(stream)
  }

  public run(sink: Sink<A>, scheduler: Scheduler) {
    this.scheduler = scheduler

    return super.run(sink, scheduler)
  }

  public add(sink: Sink<A>) {
    if (this.has) sink.event(this.scheduler.currentTime(), this.value)

    return super.add(sink)
  }

  public event(time: Time, value: A) {
    this.has = true
    this.value = value

    return super.event(time, value)
  }
}

last<A>(stream: Stream<A>): Stream<A>

Returns a stream that will only emit it's last value right before ending. If the stream does not end, then no events will ever occur. If the stream ends before emitting a value, no value will emit.


export function last<A>(stream: Stream<A>): Stream<A> {
  return new Last(stream)
}

class Last<A> implements Stream<A> {
  constructor(private source: Stream<A>) {}

  public run(sink: Sink<A>, scheduler: Scheduler): Disposable {
    return this.source.run(new LastSink(sink), scheduler)
  }
}

class LastSink<A> implements Sink<A> {
  private has: boolean = false
  private value: A

  constructor(private sink: Sink<A>) {}

  public event(_: Time, value: A) {
    this.has = true
    this.value = value
  }

  public error(time: Time, error: Error) {
    this.has = false
    this.sink.error(time, error)
  }

  public end(time: Time) {
    if (this.has) this.sink.event(time, this.value)

    this.has = false

    this.sink.end(time)
  }
}

loop<A, B, C>(f: (accumulator: B, value: A) => { seed: B, value: C }, initial: B, stream: Stream<A>): Stream<A>

Accumulate results using a feedback loop that emits one value and feeds back another to be used in the next iteration.

It allows you to maintain and update a "state" while emitting a different value. In contrast, scan feeds back and produces the same value.

import { loop, periodic, filter, observe } from '@motorcycle/stream'

function pairwiseInterval (acc: number): { seed: number, value: [number, number] } {
  const seed = acc + 1
  const value =  [ acc, seed ]

  return { seed, value }
}

const stream = loop(pairwiseInterval, periodic(100))

observe(console.log, stream)
// [ 0, 1 ]
// [ 1, 2 ]
// [ 2, 3 ]
// ....

export { loop } from '@most/core'

map<A, B>(f: (value: A) => B, stream: Stream<A>): Stream<B>

Apply a function to each event value of a stream, returning a new stream containing the returned values.

import { map, now, observe } from '@motorcycle/stream'

const stream = map(x => x + 1, now(100))

observe(console.log, stream) // 101

export { map } from '@most/core'

mapList<A, B>(f: (value: A, index: number) => B, sinksList$: Stream<ArrayLike<A>>): Stream<ReadonlyArray<B>>

Applies a function to all Sinks in a list of Sinks.

import { mapList } from '@motorcycle/stream'

function Component(sources) {
  const { listOfData$ } = sources

  const sinksList$: Stream<ReadonlyArray<Sinks>> = mapList(
    data => ChildComponent({ ...sources, data$: now(data) })), 
    listOfData$,
  )

  const childViews$: Stream<ReadonlyArray<Stream<VNode>> = 
    mapList(({ view$ }) => view$, sinksList$)

  ...
}

export const mapList: MapList = curry2(__mapList)

export type MapList = {
  <A, B>(f: (value: A, index: number) => B, list$: Stream<ArrayLike<A>>): Stream<ReadonlyArray<B>>
  <A, B>(f: (value: A, index: number) => B): (
    list$: Stream<ArrayLike<A>>
  ) => Stream<ReadonlyArray<B>>
}

function __mapList<A, B>(
  f: (value: A) => B,
  list$: Stream<ArrayLike<A>>
): Stream<ReadonlyArray<B>> {
  return map<ArrayLike<A>, ReadonlyArray<B>>(mapArray(f), list$)
}

merge<A>(a: Stream<A>, b: Stream<A>): Stream<A>

Creates a new Stream containing events from both streams.

import { merge, at, observe } from '@motorcycle/stream'

const stream = merge(at(1000, 'World'), at(100, 'Hello'))

observe(console.log, stream)
// Hello -- at time 100
// World -- at time 1000

export { merge } from '@most/core'

mergeArray<A>(stream: Array<Stream<A>>): Stream<A>

Creates a new stream containing all events of underlying streams.

import { at, mergeArray, observe } from '@motorcycle/stream'

const stream = mergeArray([
  at(100, 'foo'),
  at(300, 'baz')
  at(200, 'bar'),
])

observe(console.log, stream)
// foo -- at time 100
// bar -- at time 200
// baz -- at time 300

export { mergeArray } from '@most/core'

multicast<A>(stream: Stream<A>): Stream<A>

Returns a stream equivalent to the original, but which can be shared more efficiently among multiple consumers.

import { multicast, observe } from '@motorcycle/stream'

// --1--2--3--4--5--6--7--8-->
const source = // ...

// --1--2--3--4--5--6--7--8-->
observe(console.log, source)

setTimeout(() => {
// --------------1--2--3--4--5--6--7--8-->
  observe(console.log, source)
}, 5)

const stream = multicast(source)

// --1--2--3--4--5--6--7--8-->
observe(console.log, stream)

setTimeout(() => {
// --------------5--6--7--8-->
  observe(console.log, stream)
}, 5)

export { multicast } from '@most/core'

never<A>(): Stream<A>

Create a stream containing no events, which never ends.

import { never, drain } from '@motorcycle/stream'

const stream = never()

drain(stream) // Returns a promise that never fulfills.

export { never } from '@most/core'

now<A>(value: A): Stream<A>

Create a stream containing a single event at time 0

import { now, observe } from '@motorcycle/stream'

const stream = now(1)

observe(console.log, stream)
// 1

export { now } from '@most/core'

observe<A>(f: (value: A) => any, stream: Stream<A>): Promise<void>

Activates a stream, calling a function f with each event value, and returns a Promise of completion.


export const observe: Observe = curry2(<A>(f: (value: A) => any, stream: Stream<A>): Promise<
  void
> => drain(tap(f, stream)))

export interface Observe {
  <A>(f: (value: A) => any, stream: Stream<A>): Promise<void>
  <A>(f: (value: A) => any): (stream: Stream<A>) => Promise<void>
}

periodic(ms: number): Stream<void>

Creates a stream that emits ever time 0 and every n milliseconds after.

import { periodic } from '@motorcycle/stream'

// void----void----void----void---->
const stream = periodic(5)

export { periodic } from '@most/core'

recoverWith<A>((err: Error) => Stream<A>, stream: Stream<A>): Stream<A>

Recover from a stream failure by calling a function to create a new stream.

import { recoverWith } from '@motorcycle/stream'

// -1-2-3X------->
const a = // ...
// -4-5-6-------->
const b = // ...

// -1-2-3-4-5-6-->
const stream = recoverWith(() => b, a)

export { recoverWith } from '@most/core'

runEffects<A>(stream: Stream<A>, scheduler: Scheduler): Promise<void>

Activate an event stream, and consume all its events.

import { runEffects, tap } from '@motorcycle/stream'
import { newDefaultScheduler } from '@most/scheduler'

const logStream = tap(console.log, stream)

runEffects(logStream, newDefaultScheduler())
 .then(() => console.log('complete'))
 .catch(err => console.error(err))

export { runEffects } from '@most/core'

sample<A, B, C>(f: (a: A, b: B) => C, sampler: Stream<A>, stream: Stream<B>): Stream<C>

For each event in a sampler stream, apply a function to combine it with the most recent event in another stream. The resulting stream will contain the same number of events as the sampler stream.

s1:                       -1--2--3--4--5->
sampler:                  -1-----2-----3->
sample(sum, sampler, s1): -2-----5-----8->

s1:                       -1-----2-----3->
sampler:                  -1--2--3--4--5->
sample(sum, sampler, s1): -2--3--5--6--8->

export { sample } from '@most/core'

sampleWith<A>(sampler: Stream<any>, stream: Stream<A>): Stream<A>

Given each event occurrence from a sampler stream takes the latest value from the given stream.

import { sampleWith } from '@motorcycle/stream'

function submit(dom: DomSource): Stream<string> {
  const button = query('button', dom)
  const input = query('input', dom)

  const click$ = events('click', button)
  const value$ = map(ev => ev.target.value, events('input', input))

  return sampleWith(click$, value$)
}

export const sampleWith = sample(takeRight) as SampleWith

export interface SampleWith {
  <A>(sampler: Stream<any>, stream: Stream<A>): Stream<A>
  <A>(sampler: Stream<any>): (stream: Stream<A>) => Stream<A>
  (sampler: Stream<any>): <A>(stream: Stream<A>) => Stream<A>
}

function takeRight<A>(_: any, value: A): A {
  return value
}

scan<A, B>(f: (seed: B, value: A) => B, initial: B, stream: Stream<A>): Stream<B>

Incrementally accumulate results, starting with the provided initial value.

import { scan, periodic, observe } from '@motorcycle/stream'

// creates a stream that increments by 1 every 1000ms
const count$ = scan(x => x + 1, 0, periodic(1000))

observe(console.log, count$)

export { scan } from '@most/core'

scheduler (Scheduler)

A shared instance of the default scheduler from @most/scheduler

import { scheduler, now } from '@motorcycle/stream'

const stream = now(1)

const sink = {
  event(time: number, value: number) { ... },
  error(time: number, err: Error) { ... },
  end(time: number) { ... }
}

const disposable = stream.run(sink, scheduler)

// later 
disposable.dispose()

export const scheduler: Scheduler = newDefaultScheduler()

since<A>(startSingal: Stream<any>, stream: Stream<A>): Stream<A>

Discard all events in one stream until the first event occurs in another.:

import { since } from '@motorcycle/stream'

const source = // -1-2-3-4-5-6-7-8->
const start =  // --------x-------->
//                ---------5-6-7-8->
const stream = since(start, source)

export { since } from '@most/core'

skip<A>(quanity: number, stream: Stream<A>): Stream<A>

Skip the first n number of events.

import { skip } from '@motorcycle/stream'

const source = // -1-2-3-4-5-6-7-8-9-10->
//                -----------6-7-8-9-10->
const stream = skip(5, source)

export { skip } from '@most/core'

skipAfter<A>(predicate: (value: A) => boolean, stream: Stream<A>): Stream<A>

Discard all events after the first event for which predicate returns true.

import { skipAfter } from '@motorcycle/stream'

const source = // --1-2-3-4-5-6-7-8->
//                --1-2|
const stream = skipAfter(even, source)

export { skipAfter } from '@most/core'

skipRepeats<A>(stream: Stream<A>): Stream<A>

Remove adjacent events that are equal in terms of value equality.


const a = { a: 1 }
const b = Object.assign({}, a)
const c = { c: 2 }

const source = // --a--b--a--c-->
//                --a--------c-->
const stream = skipRepeats(source)

observe(console.log, stream)
// { a: 1 }
// { c: 2 }

export const skipRepeats: SkipRepeats = skipRepeatsWith(equals)

export type SkipRepeats = <A>(stream: Stream<A>) => Stream<A>

skipRepeatsWith<A>(predicate: (a: A, b: A) => boolean, stream: Stream<A>): Stream<A>

Remove adjacent repeated events, using the provided equality function to compare adjacent events.:

import { skipRepeatsWith, observe } from '@motorcycle/stream'
 
const source = // --a-b-B-c-D-d-e->

const equalsIgnoreCase = (a: string, b: string) =>
 a.toLowerCase() === b.toLowerCase()

const stream = skipRepeatsWith(equalsIgnoreCase, source)

observe(console.log, stream)
// a
// b
// c
// D
// e

export { skipRepeatsWith } from '@most/core'

skipWhile(predicate: (value: A) => boolean, stream: Stream<A>): Stream<A>

Discard all events until predicate returns false, and keep the rest.

import { skipWhile } from '@motorcycle/stream'

const source = // -2-4-5-6-8->
//                 ----5-6-8->
const stream = skipWhile(even, source)

export { skipWhile } from '@most/core'

slice<A>(skip: number, take: number, stream: Stream<A>): Stream<A>

Keep only events in a range, where start <= index < end, and index is the ordinal index of an event in stream.

import { slice } from '@most/core'

const source = // --1--2--3--4--5--6--7--8--9--10-->
//                --------3--4--5|
const stream = slice(2, 3, source)

export { slice } from '@most/core'

startWith<A>(initialValue: A, stream: Stream<A>): Stream<A>

Prepends an event to a stream at time 0.

import { startWith, at, observe } from '@motorcycle/stream'

const stream = startWith('Hello', at(1000, 'world'))

observe(console.log, stream)
// At time 0 logs 'Hello'
// At time 1000 logs 'world'

export { startWith } from '@most/core'

state<A, B>(f: (acc: A, value: B) => A, seed$: Stream<A>, values$: Stream<B>): Stream<A>

Especially useful when keeping local state that also needs to be updated from an outside source.

import { Stream } from '@motorcycle/types'
import { query, dragOverEvent, dragstartEvent, dropEvent } from '@motorcycle/dom'
import { sample, map, state, mapList } from '@motorcycle/stream'
import { move } from '@typed/prelude'

export function ReorderableList(sources) {
  const { list$, dom } = sources
  const li = query('li', dom)
  const dragOver$ = dragOverEvent(li)
  const dragStart$ = dragstartEvent(li)
  const drop$ = dropEvent(li)
  const reducer$: Stream<(list: Array<string>) => Array<string>> = 
    sample((to, from) => move(from, to), map(getKey, drop$), map(getKey, dragStart$))
  const reorderedList$ = state((x, f) => f(x), list$, reducer$)
  // create all of our <li> tags
  const childViews$ = mapList(listItem, reorderedList$)
  // create our <ul> containgin our <li> tags
  const view$ = map(view, childViews$)

  return {
    view$,
    preventDefault$: dragOver$,
  }
}

export const state: State = curry3(__state)

function __state<A, B>(
  f: (accumulator: B, value: A) => B,
  seed$: Stream<B>,
  values$: Stream<A>
): Stream<B> {
  return switchMap(seed => scan(f, seed, values$), seed$)
}

export interface State {
  <A, B>(f: (accumulator: A, value: B) => A, seed$: Stream<A>, values$: Stream<B>): Stream<A>
  <A, B>(f: (accumulator: A, value: B) => A, seed$: Stream<A>): (values$: Stream<B>) => Stream<A>
  <A, B>(f: (accumulator: A, value: B) => A): {
    (seed$: Stream<A>, values$: Stream<B>): Stream<A>
    (seed$: Stream<A>): (values$: Stream<B>) => Stream<A>
  }
}

switchCombine<A>(streamList$: Stream<Array<Stream<A>>): Stream<ReadonlyArray<A>>

Flattens an array of streams into an array of values. Particularly useful when dealing with a list of children components.

import { switchCombine, mapSinks, map, now } from '@motorcycle/stream'

function Component(sources) {
  const { listOfData$ } = sources

  const childSinks$ = map(
    listOfData => listOfData.map(data => ChildComponent({ ...sources, data$: now(data) }))
    listOfData$
  )

  const childViews$: Stream<ReadonlyArray<VNode>> = 
    switchCombine(mapSinks(sinks => sinks.view$, childSinks$))

  const view$ = map(view, childView$)

  return { view$ }
}

function view(childViews: ReadonlyArray<VNode>): VNode {
  // ...
}

export function switchCombine<A>(streamList$: Stream<Array<Stream<A>>>): Stream<ReadonlyArray<A>> {
  return switchLatest(
    map(
      streams => (streams.length === 0 ? now([]) : combineArray((...items) => items, streams)),
      streamList$
    )
  )
}

switchLatest<A>(stream: Stream<Stream<A>>): Stream<A>

Given a higher-order stream, return a new stream that adopts the behavior of (ie emits the events of) the most recent inner stream.

import { switchLatest, now } from '@motorcycle/stream'

const A = // -1--2--3----->
const B = // -4--5--6----->
const C = // -7--8--9----->

// --A-----B-----C-------->
const source = // ...

// ---1--2--4--5--7--8--9->
const stream = switchLatest(source)

export { switchLatest } from '@most/core'

switchMap<A, B = A>(f: (a: A) => Stream<B>, s: Stream<A>): Stream<B>

Applies a function, which returns a higher-order stream, to each event value of a stream and returns a new stream that adopts the behavior of (i.e., emits the events of) the most recent inner stream.

import { now, scan, switchMap, observe, skip } from '@motorcycle/stream'

const a$ = now(1)
const b$ = now(2)
const f = (a: number) => scan((x, y) => x + y, a, b$)
const s = skip(1, switchMap(f, a$))

observe(console.log, s) // 3

export const switchMap: SwitchMapArity2 = curry2(function switchMap<A, B = A>(
  f: (a: A) => Stream<B>,
  s: Stream<A>
): Stream<B> {
  return switchLatest(map(f, s))
})

export interface SwitchMapArity2 {
  <A, B = A>(f: (a: A) => Stream<B>, s: Stream<A>): Stream<B>

  <A, B = A>(f: (a: A) => Stream<B>): SwitchMapArity1<A, B>
}

export interface SwitchMapArity1<A, B = A> {
  (s: Stream<A>): Stream<B>
}

switchMerge<A>(streams$: Stream<Array<Stream<A>>): Stream<A>

Merges a list of streams into a single stream containing events from all of the stream. Particularly useful when dealing with a list of child components.

import { switchMerge, mapSinks, now } from '@motorcycle/stream'

function Component(sources) {
  const { listOfData$ } = sources

  const childSinks$ = map(
    listOfData => listOfData.map(data => ChildComponent({ ...sources, data$: now(data) }))),
    listOfData$
  )

  const foo$ = switchMerge(mapSinks(sinks => sinks.foo$, childSinks$))

  return { foo$ } 
}

export function switchMerge<A>(streams$: Stream<Array<Stream<A>>>): Stream<A> {
  return switchLatest(map(mergeArray, streams$))
}

switchSinkOr<Sinks, K extends keyof Sinks>(or$: Sinks[K], sinkName: K, sinks$: Stream<Sinks>): Sinks[K]

Flattens a stream of sinks into a single sink.

import { switchSinkOr, map, now, never } from '@motorcycle/stream'

const switchSinkOrNever = switchSinkOr(never())

function Component(sources) {
  const { listOfItems$ } = sources

 const sinks$ = map(items => SubComponent({ ...sources, items$: now(items) }), listOfItems$)

 const history$ = switchSinkOrNever('history$', sinks$)

 return { history$ } 
}

export const switchSinkOr: SwitchSinkOr = curry3<any, any, any, any>(function switchSinkOr<
  Sinks extends { readonly [key: string]: Stream<any> },
  K extends keyof Sinks = keyof Sinks
>(or$: Sinks[K], sinkName: K, sinks$: Stream<Sinks>): Sinks[K] {
  return switchLatest(map(sinks => sinks[sinkName] || or$, sinks$))
})

export interface SwitchSinkOr {
  <Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
    or$: Sinks[K],
    sinkName: K,
    sinks$: Stream<Sinks>
  ): Sinks[K]

  <Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
    or$: Sinks[K],
    sinkName: K
  ): (sinks$: Stream<Sinks>) => Sinks[K]

  <Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
    or$: Sinks[K]
  ): (sinkName: K, sinks$: Stream<Sinks>) => Sinks[K]

  <Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
    or$: Sinks[K]
  ): (sinkName: K) => (sinks$: Stream<Sinks>) => Sinks[K]
}

take<A>(quantity: number, stream: Stream<A>): Stream<A>

Take at most the first n events of a stream.

import { take } from '@motorcycle/stream'

const source = // -1-2-3-4-5-6-7-8-9-10->
//                -1-2-3|
const stream = take(3, source)

export { take } from '@most/core'

takeWhile<A>(predicate: (value: A) => boolean, stream: Stream<A>): Stream<A>

Keep all events until predicate returns false, and discard the rest.

import { takeWhile } from '@motorcycle/stream'

const source = // -2-4-5-6-8->
//                -2-4-|
const stream = takeWhile(even, source)

export { takeWhile } from '@most/core'

tap<A>(f: (value: A) => any, stream: Stream<A>): Stream<A>

Creates a new stream that upon each event performs a side-effect.

import { tap, drain } from '@motorcycle/stream'

const logStream = tap(console.log, stream)

drain(logStream)

export { tap } from '@most/core'

throttle<A>(ms: number, stream: Stream<A>): Stream<A>

Limit the rate of events to at most one per a number of milliseconds.

In contrast to debounce, throttle simply drops events that occur "too often", whereas debounce waits for a "quiet period".

import { throttle } from '@motorcycle/stream'

const source = // -abcd---abcd--->
//                -a-c----a-c---->
const stream = throttle(2, source)

export { throttle } from '@most/core'

throwError(err: Error): Stream<never>

Create a stream in the error state. This can be useful for functions that need to return a stream, but need to signal an error.

import { throwError, chain, now } from '@motorcycle/stream'

const f = (x: Maybe<number>): Stream<number> => isNothing(x)
 ? throwError(new Error('cannot be given Nothing'))
 : now(fromJust(x)) 

const stream = chain(f, maybe$)

export { throwError } from '@most/core'

until<A>(endSignal: Stream<any>, stream: Stream<A>): Stream<A>

Keep all events in one stream until the first event occurs in another.

import { until } from '@motorcycle/stream'

const source =     // --1-2-3-4-5-6-7-8->
const endSignal =  // ---------z-------->
//                    --1-2-3-4|
const stream = until(endSingal, source)

export { until } from '@most/core'

withArrayValues<A>(array: Array<A>, stream: Stream<any>): Stream<A>

Creates a new stream by associating event times with values from an array. The resulting stream will end when all array values have been used or when the underlying stream ends.

import { withArrayValues, periodic, observe } from '@motorcycle/stream'

const stream = withArrayValues([ 1, 2, 3 ], periodic(100))

observe(console.log, stream)
// 1 -- time 0
// 2 -- time 100
// 3 -- time 200

export { withArrayValues } from '@most/core'

zip<A, B, C>(f: (a: A, b: B) => C, a$: Stream<A>, b$: Stream<B>): Stream<C>

Applies a function to corresponding pairs of events from the input streams.

import { zip, observe } from '@motorcycle/stream'

const tuple = (x, y) => [x, y]

const a$ = // --1----3-------5------6----|
const b$ = // --------2--3--------4------|
//         // --------[3,2]--[5,3]--[6,4]|
const stream = zip(tuple, a$, b$)

observe(console.log, stream)
// [3, 2]
// [5, 3]
// [6, 4]

export { zip } from '@most/core'

zipArray<A, B, C>(f: (a: A, b: B) => C, streams: [Stream<A>, Stream<B>]): Stream<C>

Applies a function to corresponding pairs of events from the input streams.

import { zipArray, observe } from '@motorcycle/stream'

const tuple = (x, y) => [x, y]

const a$ = // --1----3-------5------6----|
const b$ = // --------2--3--------4------|
//         // --------[3,2]--[5,3]--[6,4]|
const stream = zipArray(tuple [a$, b$])

observe(console.log, stream)
// [3, 2]
// [5, 3]
// [6, 4]

export { zipArray } from '@most/core'

zipArrayValues<A, B, C>(f: (arrayValue: A, streamValue: Stream<B>) => C, array: Array<A>, stream: Stream<B>): Stream<C>

Creates a new stream by applying a function with a value at increasing index of an array and the latest event value from a stream. The resulting stream will end when all array values have been used or as soon as the underlying stream ends.

import { zipArrayValues, now, concat, observe } from '@motorcycle/stream'

const f = (x, y) => x + y

const array = [ 100, 200 ]
const stream = concat(now(1), now(2))

observe(console.log, zipArrayValues(f, array, stream))
// 101
// 202

export { zipArrayValues } from '@most/core'