@nxtedition/rxjs
v2.1.7
RxJS utilities and custom operators.
Install
npm install @nxtedition/rxjs
Requires rxjs@^7 as a peer dependency.
API
cached(fn, options?, keySelector?)
Higher-order function that wraps an observable-returning function with a keyed, reference-counted cache. Subscribers sharing the same key reuse one upstream subscription. When all subscribers unsubscribe, the entry stays alive for maxAge ms before being cleaned up.
import { cached } from '@nxtedition/rxjs'
const cachedFetch = cached((id: string) => fetchRecord$(id), { maxAge: 5000 })
// Two subscribers to the same key share one upstream subscription
cachedFetch('abc').subscribe(console.log)
cachedFetch('abc').subscribe(console.log) // reuses existing subscription
Options (second argument):
- Pass a number to set maxAge directly: cached(fn, 5000)
- Pass null or undefined to use the default (maxAge: 1000)
- Pass an object with:
  - maxAge: Time in ms to keep idle entries before eviction (default: 1000). Pass null or 0 for immediate cleanup.
  - bufferSize: Number of values to replay to new subscribers (default: 1). Pass 0 to disable replay.
  - keySelector: Function to derive the cache key from arguments (default: first argument).
keySelector (third argument): Overrides options.keySelector if both are provided.
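The argument handling described above can be sketched as a small normalization step. This is an illustration only: normalizeCachedOptions, CachedOptions and ResolvedOptions are hypothetical names, not exports of @nxtedition/rxjs, and the library may resolve its arguments differently internally. The defaults are the ones listed above.

```typescript
// Hypothetical sketch: normalizeCachedOptions is NOT exported by @nxtedition/rxjs.
type KeySelector = (...args: any[]) => unknown

interface CachedOptions {
  maxAge?: number | null
  bufferSize?: number
  keySelector?: KeySelector
}

interface ResolvedOptions {
  maxAge: number
  bufferSize: number
  keySelector: KeySelector
}

// Documented default: the cache key is the first argument.
const firstArgument: KeySelector = (...args) => args[0]

function normalizeCachedOptions(
  options?: number | CachedOptions | null,
  keySelector?: KeySelector,
): ResolvedOptions {
  // A bare number is shorthand for { maxAge: number };
  // null or undefined selects the documented defaults.
  const opts: CachedOptions =
    typeof options === 'number' ? { maxAge: options } : (options ?? {})

  return {
    // Inside an object, maxAge: null means immediate cleanup, same as 0.
    maxAge: opts.maxAge === undefined ? 1000 : (opts.maxAge ?? 0),
    bufferSize: opts.bufferSize ?? 1,
    // The third argument takes precedence over options.keySelector.
    keySelector: keySelector ?? opts.keySelector ?? firstArgument,
  }
}
```

Under these assumptions, normalizeCachedOptions(5000) and normalizeCachedOptions({ maxAge: 5000 }) resolve to the same settings, while normalizeCachedOptions(null) falls back to a maxAge of 1000.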
auditMap(project): OperatorFunction<T, R>
Operator that subscribes to one inner observable at a time. When a new value arrives while an inner subscription is active, it stores the pending value and aborts the previous inner observable (via AbortSignal). Once the inner observable completes, it processes the most recent pending value.
The project function receives the source value and a context object with:
- signal: An AbortSignal that fires when a newer value supersedes this one.
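To make the scheduling rule concrete, here is a minimal model of it in plain TypeScript, with promises standing in for inner observables. LatestOnlyRunner, push and Project are hypothetical names chosen for this sketch; it is not the operator's implementation and it leaves out emission of results.

```typescript
// Hypothetical model, not the library's implementation: plain promises stand in
// for inner observables so the scheduling rule can be read in isolation.
type Project<T> = (value: T, context: { signal: AbortSignal }) => Promise<unknown>

class LatestOnlyRunner<T> {
  private project: Project<T>
  private active: AbortController | null = null
  private pending: { value: T } | null = null

  constructor(project: Project<T>) {
    this.project = project
  }

  push(value: T): void {
    if (this.active) {
      // Busy: keep only the newest value and tell the running task it is superseded.
      this.pending = { value }
      this.active.abort()
      return
    }
    this.run(value)
  }

  private run(value: T): void {
    const controller = new AbortController()
    this.active = controller
    const settle = (): void => {
      this.active = null
      const next = this.pending
      this.pending = null
      // Only the most recent pending value is processed; older ones were overwritten.
      if (next) this.run(next.value)
    }
    // Move on whether the task finished normally or rejected after being aborted.
    this.project(value, { signal: controller.signal }).then(settle, settle)
  }
}
```

In this model, pushing 1, 2 and 3 in quick succession starts the task for 1, aborts it, skips 2 entirely, and starts the task for 3 once the first one settles.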
import { auditMap } from '@nxtedition/rxjs'
source$.pipe(auditMap((value, { signal }) => fetch(`/api/${value}`, { signal })))
combineMap(project, equals?): OperatorFunction<T[], R[]>
Operator that takes an array of keys and subscribes to an inner observable for each key. Emits an array of the latest values from all inner observables whenever any of them update.
Reuses existing subscriptions when keys are reordered or partially updated. The optional equals function controls key identity (defaults to ===).
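The reuse rule can be illustrated with a small reconciliation function over keyed entries. This is a sketch under assumptions: reuseByKey and Entry are hypothetical names, the subscription is an opaque value, and the operator's real bookkeeping may differ.

```typescript
// Hypothetical sketch: reuseByKey is an illustration, not an export of @nxtedition/rxjs.
interface Entry<K, S> {
  key: K
  subscription: S
}

function reuseByKey<K, S>(
  previous: Entry<K, S>[],
  keys: K[],
  subscribe: (key: K) => S,
  unsubscribe: (subscription: S) => void,
  equals: (a: K, b: K) => boolean = (a, b) => a === b,
): Entry<K, S>[] {
  // Work on a copy so the caller's array is left untouched.
  const pool = previous.slice()
  const next = keys.map((key) => {
    const index = pool.findIndex((entry) => equals(entry.key, key))
    if (index === -1) {
      // New key: open a fresh inner subscription.
      return { key, subscription: subscribe(key) }
    }
    // Known key: take the existing entry out of the pool and keep it.
    return pool.splice(index, 1)[0]
  })
  // Whatever is left in the pool is no longer requested.
  for (const entry of pool) unsubscribe(entry.subscription)
  return next
}
```

With this approach, going from keys a, b to keys b, c, a subscribes only for c; the entries for a and b are carried over in their new positions.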
import { combineMap } from '@nxtedition/rxjs'
keys$.pipe(combineMap((key) => fetchData$(key)))
firstValueFrom(source, config?): Promise<T | D>
Enhanced version of rxjs.firstValueFrom with AbortSignal and timeout support.
import { firstValueFrom } from '@nxtedition/rxjs'
const value = await firstValueFrom(source$, {
  signal: controller.signal,
  timeout: 5000,
  defaultValue: null,
})
lastValueFrom(source, config?): Promise<T | D>
Enhanced version of rxjs.lastValueFrom with AbortSignal and timeout support.
import { lastValueFrom } from '@nxtedition/rxjs'
const value = await lastValueFrom(source$, {
  signal: controller.signal,
  timeout: 5000,
  defaultValue: null,
})
withAbortSignal(signal): OperatorFunction<T, T>
Operator that errors the observable with an AbortError when the given AbortSignal fires.
import { withAbortSignal } from '@nxtedition/rxjs'
source$.pipe(withAbortSignal(controller.signal))
retry(config?): OperatorFunction<T, T>
Fork of the RxJS retry operator with exponential backoff (capped at 60s) and an emitOnRetry hook.
import { retry } from '@nxtedition/rxjs'
source$.pipe(
  retry({
    count: 5,
    resetOnSuccess: true,
    emitOnRetry: (err, attempt) => ({ status: 'retrying', attempt }),
  }),
)
Config:
- count: Max retries (default: Infinity)
- delay: A number (ms), a function returning an ObservableInput, or null for immediate retry. Default: exponential backoff.
- resetOnSuccess: Reset the retry counter on each successful emission (default: false)
- emitOnRetry: Function called on each retry; its return value is emitted to subscribers.
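For a sense of what a capped exponential backoff looks like, here is a minimal sketch. Only the 60 s cap comes from the description of retry; the 1000 ms base, the doubling factor, and the name backoffDelay are assumptions made for illustration, and the operator's actual schedule may differ.

```typescript
// Hypothetical sketch: the 1000 ms base and the doubling factor are assumptions;
// only the 60 s cap comes from the description of retry.
function backoffDelay(attempt: number, baseMs = 1000, capMs = 60000): number {
  // attempt is 1-based: 1 -> base, 2 -> 2 * base, 3 -> 4 * base, ...
  const uncapped = baseMs * 2 ** (attempt - 1)
  return Math.min(uncapped, capMs)
}

const schedule = [1, 2, 3, 4, 5, 6, 7, 8].map((attempt) => backoffDelay(attempt))
// schedule: [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000]
```

Under these assumed parameters the cap takes effect from the seventh attempt onward, so a long outage settles into one retry per minute.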
AbortError
Error class thrown by withAbortSignal, firstValueFrom, and lastValueFrom on abort.
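Callers usually want to tell an abort apart from a genuine failure. The sketch below shows one way to do that by checking the code and name fields. StandInAbortError is a local stand-in written for this example (its default message is an assumption), and isAbortError is a hypothetical helper; neither is part of @nxtedition/rxjs.

```typescript
// Stand-in for illustration only: mirrors the code and name documented for the
// library's AbortError, but is not the library's class.
class StandInAbortError extends Error {
  readonly code = 'ABORT_ERR'

  constructor(message = 'The operation was aborted') {
    super(message)
    this.name = 'AbortError'
  }
}

// Hypothetical helper: checks the two documented fields rather than using
// instanceof, so it does not depend on which class created the error.
function isAbortError(err: unknown): boolean {
  return (
    typeof err === 'object' &&
    err !== null &&
    (err as { name?: unknown }).name === 'AbortError' &&
    (err as { code?: unknown }).code === 'ABORT_ERR'
  )
}
```

A catch block around firstValueFrom or lastValueFrom could then treat isAbortError(err) as a cancellation and rethrow everything else.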
import { AbortError } from '@nxtedition/rxjs'
const err = new AbortError('custom message')
err.code // 'ABORT_ERR'
err.name // 'AbortError'
License
UNLICENSED
