@chrisluyi/zustand-engine
v1.2.0
Published
A typed, plugin-driven data-fetching engine built on **Zustand**, **React Query**, and **Zod**. Descriptors declare _what_ to fetch and validate; the engine handles caching, native-store persistence, debounce/throttle, schema validation, transforms, slice
Readme
@chrisluyi/zustand-engine
A typed, plugin-driven data-fetching engine built on Zustand, React Query, and Zod. Descriptors declare what to fetch and validate; the engine handles caching, native-store persistence, debounce/throttle, schema validation, transforms, slice writes, and Redux sync — with a plugin bus for custom hooks at every stage.
Installation
bun add @chrisluyi/zustand-enginePeer dependencies: zustand (^5), @tanstack/react-query (^5), zod (^4), immer (^10 or ^11, optional — provided by zustand)
Quick start
import { WorkflowEngine } from '@chrisluyi/zustand-engine'
import { QueryClient } from '@tanstack/react-query'
import { z } from 'zod'
const queryClient = new QueryClient()
const engine = new WorkflowEngine({ queryClient, name: 'my-app' })
const UserSchema = z.object({ id: z.string(), name: z.string() })
engine.register({
key: 'user',
schema: UserSchema,
fetch: async ({ options }) => {
const res = await fetch(`/api/users/${options.params?.id}`)
return res.json()
},
})
const result = await engine.trigger('user', { params: { id: '42' } })
// result.status → 'success' | 'partialSuccess' | 'error'
// result.data → { id: string; name: string } | null
const slice = engine.store.getState().slices.user
// slice.data, slice.status, slice.isLoading, slice.lastUpdated, …Engine constructor
const engine = new WorkflowEngine({
queryClient, // required — @tanstack/react-query QueryClient
// Project-level identifier. Flows automatically to:
// - DevTools label (unless devtools.name overrides it)
// - Native cache storeKey prefix: `name:descriptorKey`
// - Console log prefix: `[name]` (when logging is enabled)
name: 'my-app',
// Enable Zustand DevTools (pass { name: 'MyEngine' } to label it explicitly)
devtools: true,
// Console-log every stage transition
logging: true,
// — or with options:
logging: { prefix: '[MyApp]', logLevel: 'debug' }, // logLevel: 'log' | 'debug' | 'info'
// Global native store integration (applied to every descriptor)
nativeCache: {
retrieve: (key) => store.get(key),
upsert: (key, data) => store.set(key, data),
},
// Global default for how Zod mismatches are handled
// 'warn' (default) | 'error' | 'passthrough'
zodMode: 'warn',
})Source descriptors
engine.register({
key: 'products',
schema: ProductsSchema,
// Fetch function — receives FetchContext { key, options, queryClient, meta }
fetch: async (ctx) => {
const res = await fetch('/api/products')
return res.json()
},
// Optional transform applied after Zod validation
transform: (data) => data.map(normalize),
// React Query cache options
queryOptions: { staleTime: 60_000, gcTime: 300_000, retry: 2 },
// Debounce — coalesce rapid callers into one fetch (ms)
debounce: 300,
// Throttle — first call executes immediately;
// subsequent calls within the window share that promise (ms)
throttle: 500,
// Merge — combine existing Zustand state with incoming data before writing
merge: (existing, incoming) => [...(existing ?? []), ...incoming],
// Sync writes to Redux after every successful pipeline run
syncToRedux: true,
// Per-descriptor native cache override (or false to disable entirely)
nativeCache: { storeKey: 'native_products_key' },
// Zod mismatch handling for this descriptor — overrides engine-level zodMode
zodMode: 'error',
// Per-descriptor plugins
plugins: [myPlugin],
})Zod validation mode
Controls what happens when the API response doesn't match the schema.
| Mode | Behaviour |
|------|-----------|
| 'warn' (default) | Strips invalid fields. data is partial. status: 'partialSuccess'. Per-field warnings surfaced in slice.warnings. |
| 'error' | Pipeline fails at the validate phase. status: 'error'. |
| 'passthrough' | Raw data passes through unchanged. Warnings surfaced. status: 'partialSuccess'. |
Set globally on the engine or per descriptor. Descriptor-level overrides the engine-level.
Split sources
A single API response can be fanned out to multiple Zustand slices, each with its own schema and optional transform:
engine.register({
key: 'dashboard',
schema: DashboardSchema,
fetch: fetchDashboard,
split: {
user: {
path: 'user', // JSON path within the response to extract
schema: UserSchema, // validated independently
transform: (d) => ({ ...d, fullName: `${d.first} ${d.last}` }),
},
permissions: {
path: 'permissions',
schema: PermSchema,
// optional per-child merge — for accumulation within that split key
merge: (existing, incoming) => [...(existing ?? []), ...incoming],
},
},
})
await engine.trigger('dashboard')
engine.store.getState().slices.user // → { data: User, status: 'success', … }
engine.store.getState().slices.permissions // → { data: string[], … }
engine.store.getState().slices.dashboard // → { data: { user, permissions }, … }Default path: when no path is specified for a split entry, the split record key itself is used as the extraction path. This means the response is expected to have a top-level key matching the entry name — the conventional POST-style BFF layout.
Auto-BFF behaviour: if a section key is absent from the API response (the key resolves to undefined and no explicit path or transform was given), that section is silently skipped. Other sections and their slices are unaffected.
Merge + split interaction: descriptor.merge is applied to the raw BFF response before splitting, so every child key receives data from the accumulated merged state. SplitEntry.merge handles per-child accumulation independently.
Triggering
// Basic
await engine.trigger('user')
// Force refresh — bypasses in-flight deduplication and native cache
await engine.trigger('user', { force: true })
// Dynamic params — merged into ctx.options.params inside the fetch function
await engine.trigger('products', { params: { category: 'electronics', page: 2 } })In-flight deduplication: concurrent calls with the same key + same params share one pipeline execution. Different params run as independent pipelines.
TriggerResult
type TriggerResult<T> =
| { status: 'success'; data: T; warnings: [] }
| { status: 'partialSuccess'; data: Partial<T>; warnings: ValidationWarning[] }
| { status: 'error'; data: null; error: TriggerError }trigger never throws. Errors are returned as status: 'error' results.
wrapQuery
Run the validate → transform → write pipeline on data you already have, without fetching:
const result = await engine.wrapQuery('user', rawApiData)Useful when data arrives via WebSocket, subscription, or server-side props.
Direct slice updates
Update a slice's data without fetching or validating:
// Immer-style updater
engine.updateSlice('user', (prev) => ({ ...prev, name: 'Alice' }))
// Batch multiple slices atomically
engine.batchUpdate({
user: (prev) => ({ ...prev, name: 'Alice' }),
profile: (prev) => ({ ...prev, bio: 'Updated' }),
})Slice shape
interface SourceState<T> {
data: T | null
status: 'idle' | 'success' | 'partialSuccess' | 'error'
isLoading: boolean
isFetching: boolean
isError: boolean
error: TriggerError | null // { phase, cause: Error }
isStale: boolean
lastUpdated: number | null
warnings: ValidationWarning[] // per-field Zod warnings
lastParamsKey: string | null // sorted-JSON fingerprint of last successful params
}TriggerError.phase is 'fetch' | 'validate' | 'transform' | 'write'.
Access slices from engine.store.getState().slices[key].
Native cache
The engine integrates any async key-value store as a two-tier cache:
- Native store — persists across sessions; checked before any network call
- API — fallback on miss
How native cache works
Store key: <engineName>:<descriptorKey> (e.g. "api-demo:user-by-id"). One entry per descriptor — not per params. The most recent successful response overwrites the previous entry.
Params fingerprint (lastParamsKey): a stable sorted-JSON string of options.params, stored on the Zustand slice after every successful write. On the next trigger call the engine compares the current params against lastParamsKey:
- Same params → cache is served (if data is found in the store)
- Different params → cache is bypassed, API is called, entry overwritten with new response
Restore flow: retrieve(storeKey) is called before any network request. On a hit, data is written to the Zustand slice and afterWrite fires with source: 'native-cache'. No beforeFetch or afterFetch events fire.
Upsert flow: after every successful pipeline run, upsert(storeKey, finalData) is called fire-and-forget with the post-merge slice value.
force: true bypasses both the native cache and the shouldFetch predicate.
shouldFetch skip vs native cache — event behaviour
These are two distinct "skip API" mechanisms with different observable behaviour:
| | shouldFetch returns false | Native cache hit |
|---|---|---|
| When it fires | Before loading state is set | After isLoading: true is set |
| Plugin events | None — pipeline never starts | afterWrite fires with source: 'native-cache' |
| Async I/O | No | Yes — awaits retrieve() |
| Use case | In-memory skip (data already in slice, same payload) | Persistent storage restore (MMKV, AsyncStorage, localStorage) |
| Typical pattern | Split/BFF descriptors with auto-derived shouldFetch | Single-slice descriptors with nativeCache config |
The two can be combined — and this is the recommended pattern for BFF split descriptors used across multiple WebView instances:
- Same engine session (e.g. same WebView):
shouldFetchreturnsfalse→ instant return, no I/O, zero events. - Fresh engine (e.g. new WebView, app restart):
shouldFetchreturnstrue(slices empty) → native cache checked → hit → each split slice hydrated from cache →afterWritefires withsource: 'native-cache'. No API call. - After cache restore:
splitSectionParamsis populated from the restore params so that subsequent same-session calls also get the instantshouldFetch → falsepath without hitting the native store again. - Params changed for one section:
shouldFetchreturnstrue→ native cache bypassed (params fingerprint mismatch) → API call for changed sections only.
Browser — localStorage / sessionStorage
import { createLocalStorageAdapter } from '@chrisluyi/zustand-engine'
// localStorage — persists across browser sessions
const browserCache = createLocalStorageAdapter({ prefix: 'myapp:' })
// sessionStorage — cleared when the tab closes
const sessionCache = createLocalStorageAdapter({
prefix: 'myapp:',
storage: window.sessionStorage,
})
const engine = new WorkflowEngine({ queryClient, nativeCache: browserCache.nativeCache })
// Cache invalidation utilities
browserCache.remove('user') // removes 'myapp:user' from storage
browserCache.clear() // removes all 'myapp:*' entriesReact Native / mobile — custom store
import { createNativeStoreAdapter } from '@chrisluyi/zustand-engine'
const native = createNativeStoreAdapter({
retrieve: (key) => AsyncStorage.getItem(key).then((v) => (v ? JSON.parse(v) : null)),
upsert: (key, data) => AsyncStorage.setItem(key, JSON.stringify(data)),
})
const engine = new WorkflowEngine({ queryClient, nativeCache: native.nativeCache })Per-descriptor overrides
// Override store key only
engine.register({ key: 'user', schema, fetch, nativeCache: { storeKey: 'native_user' } })
// Custom retrieve/upsert for this descriptor only
engine.register({ key: 'secure', schema, fetch, nativeCache: { retrieve: secureGet, upsert: securePut } })
// Disable entirely for this descriptor
engine.register({ key: 'volatile', schema, fetch, nativeCache: false })
// Disable via enabled flag (equivalent to false)
engine.register({ key: 'volatile', schema, fetch, nativeCache: { enabled: false } })Merge
Accumulate data across multiple trigger calls (pagination, partial updates):
engine.register({
key: 'feed',
schema: z.array(PostSchema),
fetch: (ctx) => fetchFeedPage(ctx.options.params?.cursor),
merge: (existing, incoming) => [...(existing ?? []), ...incoming],
})
await engine.trigger('feed', { params: { cursor: null } })
await engine.trigger('feed', { params: { cursor: 'page2Token' } })
// slices.feed.data now contains both pagesThe merged value is what gets written to the Zustand slice and persisted to native cache.
POST-style BFF with split
A common BFF pattern: one endpoint accepts a payload whose top-level keys are the sections you want back, and returns a response with the same key structure.
POST /bff { profile: { userId: '42' }, preference: {} }
← { profile: {...}, preference: {...} }Use split — the record keys are the section identifiers. No path config needed: by default each split entry extracts the response key that matches its own name.
engine.register({
key: 'bff-user-data',
schema: z.object({ // envelope — accepts any subset of sections
profile: ProfileSchema.optional(),
preference: PreferenceSchema.optional(),
notice: NoticeSchema.optional(),
}),
fetch: async (ctx) => {
// ctx.options.params keys = sections requested = POST body shape
const params = ctx.options.params ?? {}
const result: Record<string, unknown> = {}
for (const section of Object.keys(params)) {
result[section] = await fetchSection(section, params[section])
}
return result
},
split: {
// record key = extraction path = section identifier
profile: { schema: ProfileSchema },
preference: { schema: PreferenceSchema },
notice: { schema: NoticeSchema },
},
// shouldFetch is auto-derived from split (see below)
})Each section lands in its own Zustand slice (slices.profile, slices.preference, slices.notice). The parent slice (slices.bff-user-data) receives the merged combined object.
Auto-derived shouldFetch for split descriptors
When a descriptor has split defined and no explicit shouldFetch, the engine injects one automatically:
Rule: fetch only if at least one of the requested sections needs it.
A section "needs fetching" if any of these is true:
- Its Zustand slice has no data yet (
slices[sectionKey].data == null) - It has never been fetched with tracked params
- Its params payload has changed since the last successful fetch (fingerprint mismatch)
// Step 1 — cold start: all 3 sections have no data → fetch fires
await engine.trigger('bff-user-data', { params: { profile: {}, preference: {}, notice: {} } })
// Step 2 — all sections loaded, payload unchanged → shouldFetch returns false → instant return
await engine.trigger('bff-user-data', { params: { profile: {} } })
// Step 3 — profile payload changed: {} → { userId: '2' } → fingerprint mismatch → fetch fires
// only profile is sent in the request; preference and notice slices are untouched
await engine.trigger('bff-user-data', { params: { profile: { userId: '2' } } })
// Step 4 — force:true bypasses shouldFetch entirely
await engine.trigger('bff-user-data', { params: { profile: {} }, force: true })Partial responses are safe: if the API only returns a subset of the sections that were requested, the engine skips writing to slices whose key is absent from the response. Existing slice data is preserved.
Per-section param fingerprints are stored after every successful API call as a stable sorted-JSON string. The next call compares each requested section's current params against its stored fingerprint — changes in one section don't affect others.
Explicit shouldFetch takes precedence over the auto-derived one.
shouldFetch (custom)
Supply shouldFetch directly for non-split descriptors or fully custom logic:
shouldFetch?: (options: TriggerOptions, existing: unknown | null) => booleanexistingisnullon first load — the predicate must handle this (returningtrue= fetch)- Bypassed by
force: true - Short-circuits before loading state is set — no spinner flash, zero plugin events fired
Adapters
EAPI adapter
Wraps an axios-compatible HTTP client:
import { createEapiAdapter } from '@chrisluyi/zustand-engine'
const eapi = createEapiAdapter(httpClient, {
getHeaders: () => ({ Authorization: `Bearer ${getToken()}` }),
onResponse: (headers) => handleResponseHeaders(headers),
})
engine.register({
key: 'user',
schema: UserSchema,
// Static base config — ctx.options.params are merged in at call time
fetch: eapi.createFetch({ url: '/api/user', method: 'GET', params: { version: 2 } }),
})
// Direct call (outside a descriptor)
const data = await eapi.fetch({ url: '/api/ping' })Redux sync adapter
Dispatches to Redux after every successful pipeline write. Acts as a global plugin:
import { createReduxSyncAdapter } from '@chrisluyi/zustand-engine'
const reduxSync = createReduxSyncAdapter({
store: reduxStore,
// Action suffix for success writes. Default: 'set'
// Dispatches { type: 'user/set', payload: data }
defaultAction: 'set',
// Action suffix for error syncs. Default: 'error'
// Dispatches { type: 'user/error', payload: { phase, message } }
errorAction: 'error',
// Explicit action creators that override auto-mapping for specific keys
mapping: {
user: (data) => userActions.setUser(data),
},
// Keys to skip entirely
exclude: ['volatile'],
// Which result statuses trigger a dispatch. Default: ['success', 'partialSuccess']
syncOn: ['success', 'partialSuccess', 'error'],
})
engine.use(reduxSync)
// Dispatch directly to Redux without going through the pipeline
reduxSync.dispatch('user', userData)Per-descriptor opt-out:
engine.register({ key: 'local-only', schema, fetch, syncToRedux: false })Native store adapter
See Native cache above.
Plugins
A plugin implements any subset of WorkflowPlugin. Middleware hooks (before/after) receive a context and must return it. Observer hooks (afterWrite, onError, onPartialSuccess) are fire-and-forget side effects.
interface WorkflowPlugin {
name: string
// Middleware hooks — transform context and must return it
beforeFetch?: (ctx: FetchContext) => Promise<FetchContext>
afterFetch?: (ctx: ResponseContext) => Promise<ResponseContext>
beforeValidate?: (ctx: ResponseContext) => Promise<ResponseContext>
afterValidate?: (ctx: ValidatedContext) => Promise<ValidatedContext>
beforeTransform?: (ctx: ValidatedContext) => Promise<ValidatedContext>
afterTransform?: (ctx: TransformContext) => Promise<TransformContext>
beforeWrite?: (ctx: WriteContext) => Promise<WriteContext>
// Observer hooks — side-effects only
afterWrite?: (ctx: WriteContext) => Promise<void>
onError?: (ctx: ErrorContext) => Promise<void>
onPartialSuccess?:(ctx: PartialContext) => Promise<void>
}FetchContext.meta is a writable bag — set values in beforeFetch and read them back via ResponseContext.meta in afterFetch.
Register globally with engine.use(plugin) or per descriptor via descriptor.plugins. Per-descriptor plugins run in addition to (not instead of) global plugins.
Built-in: console log plugin
import { createConsoleLogPlugin } from '@chrisluyi/zustand-engine'
engine.use(createConsoleLogPlugin({
prefix: '[MyApp]', // default: '[ZustandEngine]'
logLevel: 'debug', // 'log' | 'debug' | 'info' — default: 'log'
}))
// Or enable via engine constructor shorthand:
new WorkflowEngine({ queryClient, logging: true })
new WorkflowEngine({ queryClient, logging: { prefix: '[MyApp]', logLevel: 'info' } })React hook
useEngineSlice subscribes a component to a single Zustand slice. The component re-renders only when that slice changes — updates to other slices are ignored.
import { useEngineSlice } from '@chrisluyi/zustand-engine'
function UserCard() {
const slice = useEngineSlice<User>(engine, 'user')
if (slice?.isLoading) return <Spinner />
if (slice?.isError) return <ErrorBanner message={slice.error?.cause.message} />
return <div>{slice?.data?.name}</div>
}The generic T narrows slice.data to T | null. Without it the type is unknown | null.
The engine argument is structurally typed as { store: EngineStore }, so both WorkflowEngine and TestWorkflowEngine instances are accepted.
DevTools
const engine = new WorkflowEngine({
queryClient,
name: 'my-app',
devtools: true,
// — or override the DevTools label explicitly:
devtools: { name: 'MyCustomLabel' },
})Every initSlice, setSlice, and patchSlice call appears in Redux DevTools with the action name (slices/user/set) and updated state.
Type inference
import type { InferDescriptorResult, InferSplitResult } from '@chrisluyi/zustand-engine'
// Direct descriptor → infers z.infer<Schema>
type UserResult = InferDescriptorResult<typeof userDescriptor>
// Split descriptor → infers { user: ..., permissions: ... }
type DashboardResult = InferDescriptorResult<typeof dashboardDescriptor>Error handling
trigger never throws. Errors are returned as a typed result:
const result = await engine.trigger('user')
if (result.status === 'error') {
console.error(result.error.phase, result.error.cause)
// phase: 'fetch' | 'validate' | 'transform' | 'write'
}For programmatic instanceof checks, a PipelineError class is exported:
import { PipelineError } from '@chrisluyi/zustand-engine'
if (err instanceof PipelineError) {
console.error(err.phase, err.key, err.cause)
}PipelineError.cause is an explicit class property so it resolves correctly on TypeScript targets below ES2022.
Testing
createTestEngine returns a WorkflowEngine that replaces every registered descriptor's fetch function with a configurable mock. Uses staleTime: 0 / gcTime: 0 so every trigger re-invokes the mock function.
import { createTestEngine } from '@chrisluyi/zustand-engine'
const engine = createTestEngine({
mocks: {
user: { response: { id: '1', name: 'Alice' } },
},
})
engine.register({ key: 'user', schema: UserSchema })
// ↑ the fetch function is automatically replaced by the mock
const result = await engine.trigger('user')
expect(result.status).toBe('success')
expect(result.data).toEqual({ id: '1', name: 'Alice' })Mock API
// Change mock response mid-test
engine.setMockResponse('user', { id: '2', name: 'Bob' })
// Simulate a network error
engine.setMockError('user', new Error('Network error'))
// Add artificial latency
engine.setMockDelay('user', 100) // ms
// Use a custom handler for dynamic responses
createTestEngine({
mocks: {
user: {
handler: async (ctx) => ({ id: ctx.options.params?.id, name: 'Dynamic' }),
},
},
})
// Clear all mocks and QueryClient cache
engine.resetMocks()Changelog
See CHANGELOG.md.
