@valian/react-query-observable
v1.3.3
Published
react query rxjs observable
Readme
@valian/react-query-observable
React Query options for RxJS observables. This package provides utilities to seamlessly integrate RxJS observables with TanStack Query (React Query), enabling reactive data fetching with automatic caching, background updates, error handling, and subscription lifecycle management.
Installation
pnpm add @valian/react-query-observablePeer Dependencies
@tanstack/react-query^5.80.0rxjs^7.8.0
Features
- 🔄 Reactive data fetching: Use RxJS Observables as data sources for TanStack Query
- 🧠 Smart caching: First emission resolves the query; later emissions update cache
- ♻️ Automatic subscription cleanup: Subscriptions are tied to query lifecycle
- 🛡️ Error handling: Pre-first-value errors reject the query; post-first-value errors surface to the runtime
- 🧭 Dynamic staleness:
staleTimeisInfinitywhile subscribed, otherwise0 - 📦 TypeScript first: Strong types for options and observable function
Quick Start
import { useQuery } from '@tanstack/react-query'
import { interval, map, take } from 'rxjs'
import { observableQueryOptions } from '@valian/react-query-observable'
function MyComponent() {
const { data, isLoading, error } = useQuery(
observableQueryOptions({
queryKey: ['ticker'],
observableFn: () => interval(1000).pipe(take(1), map((i) => ({ tick: i }))),
}),
)
if (isLoading) return <div>Loading…</div>
if (error) return <div>Error: {(error as Error).message}</div>
return <div>Tick: {data?.tick}</div>
}API
observableQueryOptions
Creates TanStack Query options from an RxJS observable function.
function observableQueryOptions<
TQueryFnData = unknown,
TError = DefaultError,
TData = TQueryFnData,
TQueryKey extends QueryKey = QueryKey,
>(options: ObservableQueryOptions<TQueryFnData, TError, TData, TQueryKey>)Parameters (selected)
observableFn(required):(ctx: QueryFunctionContext<TQueryKey>) => Observable<TQueryFnData>queryKey(required):TQueryKey- Any other standard Query Options are accepted except those managed automatically (see below).
Managed options
These are controlled internally and thus omitted from ObservableQueryOptions:
queryFn: generated fromobservableFnstaleTime:Infinitywhen there is an active subscription forqueryKey, else0retry:false- Refetch-related flags:
refetchInterval,refetchIntervalInBackground,refetchOnWindowFocus,refetchOnMount,refetchOnReconnect,retryOnMount
Other defaults
gcTime:10_000(can be overridden)
Behavior Details
- The first value emitted by the observable resolves the query promise.
- Subsequent emissions update the cached data via
client.setQueryDatafor the samequeryKey. - If the observable errors before the first value, the query promise rejects with that error.
- If the observable errors after the first value, the error is thrown in the subscription context (outside the original promise). Handle these via RxJS or global error handlers if needed.
- When the query is removed from the cache, the active subscription for the corresponding
queryKeyis automatically unsubscribed.
Advanced Examples
WebSocket stream
import { webSocket } from 'rxjs/webSocket'
import { map, retry } from 'rxjs/operators'
const { data } = useQuery(
observableQueryOptions({
queryKey: ['live-prices', symbol],
observableFn: () =>
webSocket<{ price: number }>(`wss://example.com/${symbol}`).pipe(
map((msg) => msg.price),
retry({ delay: 1000 }),
),
}),
)Combine multiple sources
import { combineLatest } from 'rxjs'
import { map } from 'rxjs/operators'
const { data } = useQuery(
observableQueryOptions({
queryKey: ['user-profile', userId],
observableFn: () =>
combineLatest([fetchUser(userId), fetchUserPosts(userId), fetchFollowers(userId)]).pipe(
map(([user, posts, followers]) => ({ user, posts, followers })),
),
}),
)TypeScript Usage
interface Todo {
id: number
title: string
completed: boolean
}
const { data } = useQuery(
observableQueryOptions<Todo[]>({
queryKey: ['todos'],
observableFn: () => fetchTodos$(),
}),
)
// data: Todo[] | undefinedLicense
MIT © Valian
