npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

abortable-rx

v1.0.9

Published

Drop-in replacements for RxJS Observable methods and operators that work with AbortSignal

Downloads

633

Readme

abortable-rx

npm build codecov dependencies license code style: prettier semantic-release

Drop-in replacements for RxJS Observable methods and operators that work with AbortSignal. Enables easy interop between Observable code and Promise-returning functions, without losing the cancellation capabilities of RxJS.

Why?

Some operations are imperative by nature and easier to express in imperative code with async/await. Expressing these operations that need control flow with functional RxJS operators or Subjects results in unreadable and unmaintainable code. In addition, it is confusing to have async functions that only have one or no result, but return an Observable, as it is unclear how many times it will emit. RxJS has great interop with Promises, however, it doesn't provide an easy mechanism to propagate cancellation to promise-returning functions like the native fetch API. This micro library provides that mechanism.

Installation

npm install abortable-rx

Included

Observable factories

  • defer<T>(factory: (signal: AbortSignal) => ObservableInput<T>): Observable<T>
    Easiest way to wrap an abortable async function into a Promise. The factory is called every time the Observable is subscribed to, and the AbortSignal is aborted on unsubscription.
  • create<T>(subscribe?: (subscriber: Subscriber<T>, signal: AbortSignal) => TeardownLogic): Observable<T> Creates an Observable just like RxJS create, but exposes an AbortSignal in addition to the subscriber

Observable consumers

  • toPromise<T>(observable: Observable<T>, signal?: AbortSignal): Promise<T> Returns a Promise that resolves with the last emission of the given Observable, rejects if the Observable errors or rejects with an AbortError when the AbortSignal is aborted.
  • forEach<T>(source: Observable<T>, next: (value: T) => void, signal?: AbortSignal): Promise<void> Calls next for every emission and returns a Promise that resolves when the Observable completed, rejects if the Observable errors or rejects with an AbortError when the AbortSignal is aborted.

Observable operators

  • switchMap<T, R>(project: (value: T, index: number, abortSignal: AbortSignal) => ObservableInput<R>): OperatorFunction<T, R> Like RxJS switchMap, but passes an AbortSignal that is aborted when the source emits another element.
  • concatMap<T, R>(project: (value: T, index: number, abortSignal: AbortSignal) => ObservableInput<R>): OperatorFunction<T, R> Like RxJS concatMap, but passes an AbortSignal that is aborted when the returned Observable is unsubscribed from.
  • mergeMap<T, R>(project: (value: T, index: number, abortSignal: AbortSignal) => ObservableInput<R>): OperatorFunction<T, R> Like RxJS mergeMap, but passes an AbortSignal that is aborted when the returned Observable is unsubscribed from.

📖 Full API documentation

Handling AbortError

forEach and toPromise will reject the Promise with an Error if the signal is aborted. This is so calling code does not continue execution and gets a chance to cleanup with finally. You can handle this error (usually at the top level) by checking if error.name === 'AbortError' in a catch block.

If the functions you pass to defer, switchMap, etc. throw AbortError, you don't have to worry about catching it. The Promises are always converted to Observables internally, and that Observable is always unsubscribed from first, then the AbortSignal is aborted. After an Observable is unsubscribed from, all further emissions or errors are ignored, so you don't have to worry about the error terminating your Observable chain.

Example

Using fetch inside switchMap

import { fromEvent } from 'rxjs'
import { switchMap } from 'abortable-rx/operators'

fromEvent(input, 'value')
  .pipe(switchMap(async (event, i, signal) => {
    const resp = await fetch(`api/suggestions?value=${event.target.value}`, { signal })
    if (!resp.ok) {
      throw new Error(resp.statusText)
    }
    return await resp.json()
  })
  .subscribe(displaySuggestions)

Using toPromise to wait for an event to happen once

import { toPromise } from 'abortable-rx'

class ClientConnection {
  private events: Observable<Event>

  async sync(signal?: AbortSignal): Promise<void> {
    await this.scheduleSync('immediatly', signal)
    const stream = this.events.pipe(
      filter(event => event.type === 'SYNC_COMPLETED'),
      take(1)
    )
    await toPromise(stream, signal)
  }
}

Polling

import { fromEvent } from 'rxjs'
import { switchMap } from 'rxjs/operators'
import { defer } from 'abortable-rx'

fromEvent(repoDropdown, 'change')
  .pipe(switchMap(event =>
    concat(
      ['Loading...'],
      defer(async signal => {
        while (true) {
          const resp = await fetch(`api/repo/${event.target.value}`, { signal })
          if (!resp.ok) {
            throw new Error(resp.statusText)
          }
          const repo = await resp.json()
          if (repo.cloneInProgress) {
            await new Promise(resolve => setTimeout(resolve, 1000))
            continue
          }
          return repo.filesCount
        }
      })
    )
  }))
  .subscribe(content => {
    fileCount.textContent = content
  })

Support

AbortSignal is supported by all modern browsers, but there is a polyfill available if you need it.