@alleninstitute/shift
v0.1.0
Allen Institute / shift
@alleninstitute/shift is a TypeScript library for defining, transforming, combining, and loading datasets from anywhere. It provides a composable, adapter-driven model for describing data pipelines in terms of typed datasets, then executing those pipelines in a consistent, error-safe way.
Level Of Support
No Support Guaranteed: While we welcome feedback and questions, the shift library is currently provided as-is with no guarantee of direct support, updates, or bug fixes.
Core Concepts
Datasets
A Dataset is a typed description of a data shape, independent of where or how that data is fetched. There are three kinds:
| Type | Description |
| -------------------- | --------------------------------------------------------------------------------------------------------------------------------- |
| SourceDataset | A dataset backed by an adapter (e.g. a GraphQL endpoint, REST API, or database). |
| TransformedDataset | Wraps another dataset and maps its output shape into a new one. |
| ComposedDataset | Joins two datasets — loads the left side first, then uses those results to drive the query on the right side, and merges the two. |
Adapters
A DatasetAdapter is an interface that adapters implement to connect datasets to real data sources. An adapter is responsible for the following:
- Providing a way to generate a SourceDataset. This is not an explicit requirement of the interface itself, as the process can differ widely from case to case, but SourceDatasets can only exist if adapters provide a way to produce them. For example, this could take the form of a function that takes a query-like description of the data to retrieve, which is then stored inside a SourceDataset-implementing object that can be loaded later.
- loadDataset(dataset, params): Executes the query for a given source dataset with the provided LoadParameters, returning an AsyncLoadResult<Loadset<Def>> (Def being the type defined for the input dataset's data shape).
- isEmpty(loadset): Determines whether a loaded result set is empty.
Adapters are kept separate from the shift library itself and live in their own packages. Upcoming first-party adapter packages may include @alleninstitute/shift-graphql and @alleninstitute/shift-rest.
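The adapter responsibilities described above can be sketched with a small in-memory example. This is only an illustration under stated assumptions: the types below are local stand-ins rather than shift's real types, createInMemoryAdapter and its table() helper are hypothetical names, and loadDataset is synchronous here for brevity, whereas the real interface returns an AsyncLoadResult.

```typescript
// Local stand-in types for illustration; shift's real types are richer.
type DataProcessingError = { internalMessage: string };
type LoadResult<T> = { ok: true; value: T } | { ok: false; error: DataProcessingError };
type Loadset<Def> = { data: Def; metadata: { totalCount: number } };
type LoadParameters = { filter?: { nameContains?: string } };

// Hypothetical SourceDataset: it only stores a description of what to fetch.
type InMemorySourceDataset = { kind: "source"; table: string };
type Row = { id: number; name: string };
type RowsDef = { rows: Row[] };

function createInMemoryAdapter(tables: Record<string, Row[]>) {
  return {
    // Adapter-specific way to produce a SourceDataset (not part of the interface).
    table(name: string): InMemorySourceDataset {
      return { kind: "source", table: name };
    },
    // Runs the stored "query" with the given parameters.
    loadDataset(dataset: InMemorySourceDataset, params: LoadParameters = {}): LoadResult<Loadset<RowsDef>> {
      const rows = tables[dataset.table];
      if (rows === undefined) {
        return { ok: false, error: { internalMessage: `Unknown table: ${dataset.table}` } };
      }
      const needle = params.filter?.nameContains;
      const matched = needle === undefined ? rows : rows.filter((r) => r.name.includes(needle));
      return { ok: true, value: { data: { rows: matched }, metadata: { totalCount: matched.length } } };
    },
    // Decides whether a loaded result set counts as empty.
    isEmpty(loadset: Loadset<RowsDef>): boolean {
      return loadset.data.rows.length === 0;
    },
  };
}

const adapter = createInMemoryAdapter({
  users: [
    { id: 1, name: "Ada" },
    { id: 2, name: "Grace" },
  ],
});
const loaded = adapter.loadDataset(adapter.table("users"), { filter: { nameContains: "Gr" } });
const missing = adapter.loadDataset(adapter.table("nope"));
```

Note that the dataset object carries no data of its own; it is only a description that the adapter later executes, and a failed lookup is returned as a value rather than thrown.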
Lifecycle Functions
The library exposes three primary lifecycle functions for building and executing dataset pipelines:
- transform(dataset, fn, isEmpty?): Wraps a dataset with a mapping function. The function receives the loaded output of the base dataset and returns a new shaped object. An optional isEmpty callback can override the default empty-check for the resulting loadset.
- compose(left, right, preparer, composer, isEmpty?): Composes two datasets. The preparer function receives the caller's LoadParameters and returns left/right load configurations. After loading left, the right-side config can derive its parameters from the left result. The composer function then merges both loadsets into the final output shape.
- load(dataset, params?): Executes the full dataset tree, recursively loading source datasets, applying transforms, and resolving compositions. Accepts optional LoadParameters (sort, filter, pagination, etc.). Returns an AsyncLoadResult<Loadset<Def>>.
Importantly, transform() and compose() do not execute a load. They are the two primary definition functions, alongside whatever definition functionality the adapter(s) in use provide. Their job is to define how data loaded from the SourceDataset is post-processed, and what shape the output takes, when load() is eventually called.
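The split between definition and execution can be illustrated with a miniature model. This is a sketch under stated assumptions, not shift's implementation: the Dataset type here is a simplified stand-in that stores a deferred run function, the transform and load functions are local and synchronous, and fetchCount exists only to make the timing visible.

```typescript
// Simplified stand-ins; the real library's types and async behavior differ.
type Params = { filter?: Record<string, unknown> };
type Loadset<D> = { data: D; totalCount: number };
type Dataset<D> = {
  kind: "source" | "transformed";
  run: (params: Params) => Loadset<D>; // deferred work, only invoked by load()
};

let fetchCount = 0; // counts how often the "backend" is actually hit

// Hypothetical source dataset; a real one would come from an adapter package.
const usersDataset: Dataset<{ users: { id: number; name: string }[] }> = {
  kind: "source",
  run: () => {
    fetchCount += 1;
    const users = [
      { id: 1, name: "Ada" },
      { id: 2, name: "Grace" },
    ];
    return { data: { users }, totalCount: users.length };
  },
};

// Definition step: returns a new description and does not touch the source.
function transform<A, B>(base: Dataset<A>, fn: (data: A) => B): Dataset<B> {
  return {
    kind: "transformed",
    run: (params) => {
      const loaded = base.run(params);
      return { data: fn(loaded.data), totalCount: loaded.totalCount };
    },
  };
}

// Execution step: walks the definition and performs the work.
function load<D>(dataset: Dataset<D>, params: Params = {}): Loadset<D> {
  return dataset.run(params);
}

const namesDataset = transform(usersDataset, (d) => ({ names: d.users.map((u) => u.name) }));
const fetchesAfterDefinition = fetchCount; // still 0: nothing has been loaded yet
const namesLoadset = load(namesDataset);
const fetchesAfterLoad = fetchCount; // 1: the source ran exactly once
```

Because definitions are plain values, the same dataset can be defined once and loaded many times with different parameters.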
Error Handling
Shift internally uses neverthrow for explicit, type-safe error handling. Loading a dataset returns a ResultAsync, empowering individual adapters to use field-level error propagation if desired. If Shift's built-in validation mechanisms are used for validating received data, these will automatically wrap all fields in Result, with errors propagated through the pipeline as DataProcessingError values rather than thrown exceptions. Result and ResultAsync from neverthrow are wrapped inside LoadResult and AsyncLoadResult, which automatically associate the DataProcessingError as the error type of the result.
DataProcessingError carries three fields:
- internalMessage: A developer-facing description of what went wrong.
- userMessage (optional): A message safe to show in a UI.
- details (optional): Arbitrary context for debugging.
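As a rough sketch of how the three fields might be consumed, the example below routes userMessage to the UI and keeps internalMessage and details for logs. The field names come from the list above; the field types, the describeError helper, and the generic fallback text are assumptions made for illustration.

```typescript
// Assumed shape: field names follow the README, the types are a guess.
type DataProcessingError = {
  internalMessage: string;
  userMessage?: string;
  details?: unknown;
};

// Hypothetical helper: separates what a user sees from what a developer logs.
function describeError(err: DataProcessingError): { display: string; log: string } {
  // Never leak the internal message to the UI; fall back to a generic text.
  const display = err.userMessage ?? "Something went wrong while loading data.";
  const log =
    err.details === undefined
      ? err.internalMessage
      : `${err.internalMessage} | details: ${JSON.stringify(err.details)}`;
  return { display, log };
}

const timeout: DataProcessingError = {
  internalMessage: "GET /users timed out after 30s",
  userMessage: "The user list is taking too long to load.",
  details: { attempt: 3 },
};
const described = describeError(timeout);
const bare = describeError({ internalMessage: "unexpected null in payload" });
```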
Result Helpers
- pass(value) / passAsync(value): Wraps a value in an Ok LoadResult / LoadResultAsync.
- fail(error) / failAsync(error): Wraps an error in an Err LoadResult / LoadResultAsync.
- asFailure(error): Converts an unknown thrown value into a DataProcessingError.
- coalesce(result, fallback): Returns the existing result, or an Ok wrapping fallback if the result is nullish.
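To make the roles of these helpers concrete, here is a minimal sketch that re-implements their described behavior on a hand-rolled result type. It does not use neverthrow or shift; the LoadResult shape, the function bodies, and the safeDivide example are all assumptions chosen to mirror the descriptions above.

```typescript
// Hand-rolled stand-ins; shift wraps neverthrow's Result instead.
type DataProcessingError = { internalMessage: string; userMessage?: string; details?: unknown };
type LoadResult<T> = { ok: true; value: T } | { ok: false; error: DataProcessingError };

function pass<T>(value: T): LoadResult<T> {
  return { ok: true, value };
}

function fail(error: DataProcessingError): LoadResult<never> {
  return { ok: false, error };
}

// Converts anything that was thrown into the structured error type.
function asFailure(thrown: unknown): DataProcessingError {
  if (thrown instanceof Error) {
    return { internalMessage: thrown.message, details: thrown };
  }
  return { internalMessage: String(thrown), details: thrown };
}

// Keeps an existing result; substitutes an Ok fallback only for null/undefined.
function coalesce<T>(result: LoadResult<T> | null | undefined, fallback: T): LoadResult<T> {
  return result ?? pass(fallback);
}

// Example: exceptions are caught at the boundary and turned into values.
function safeDivide(a: number, b: number): LoadResult<number> {
  try {
    if (b === 0) throw new RangeError("division by zero");
    return pass(a / b);
  } catch (e) {
    return fail(asFailure(e));
  }
}

const good = safeDivide(6, 3);
const bad = safeDivide(1, 0);
const defaulted = coalesce<number>(undefined, 0);
const kept = coalesce(bad, 0); // an existing Err is kept, not replaced
```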
Deep Result Utilities
When working with nested Result structures (common after granular validation), these helpers recursively traverse and unwrap values:
- deepUnwrap(result): Recursively unwraps nested Results. Returns the first Err encountered (fail-fast).
- deepUnwrapSoft(result): Like deepUnwrap, but replaces nested Err values with undefined instead of short-circuiting.
- deepReduce(result, initialValue, reducer): Traverses a Result value tree depth-first, calling a reducer at each node. Fail-fast on Err.
- deepReduceSoft(result, initialValue, reducer): Like deepReduce, but silently skips Err branches instead of short-circuiting.
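The difference between the fail-fast and soft variants is easiest to see on a small tree. The sketch below is an independent re-implementation of the described behavior on a tagged stand-in type, with string errors for brevity; it is not shift's code, and the real helpers operate on neverthrow Results with stronger typing.

```typescript
// Tagged stand-in for a Result; errors are plain strings in this sketch.
type Ok = { tag: "ok"; value: unknown };
type Err = { tag: "err"; error: string };
type Result = Ok | Err;

const ok = (value: unknown): Result => ({ tag: "ok", value });
const err = (error: string): Result => ({ tag: "err", error });

function isResult(v: unknown): v is Result {
  if (typeof v !== "object" || v === null) return false;
  const tag = (v as { tag?: unknown }).tag;
  return tag === "ok" || tag === "err";
}

// Fail-fast: the first Err found anywhere in the tree is returned as-is.
function deepUnwrap(node: unknown): Result {
  if (isResult(node)) {
    return node.tag === "err" ? node : deepUnwrap(node.value);
  }
  if (Array.isArray(node)) {
    const out: unknown[] = [];
    for (const item of node) {
      const r = deepUnwrap(item);
      if (r.tag === "err") return r;
      out.push(r.value);
    }
    return ok(out);
  }
  if (typeof node === "object" && node !== null) {
    const out: Record<string, unknown> = {};
    for (const [key, value] of Object.entries(node)) {
      const r = deepUnwrap(value);
      if (r.tag === "err") return r;
      out[key] = r.value;
    }
    return ok(out);
  }
  return ok(node); // plain scalar
}

// Soft: Err nodes become undefined and traversal continues.
function deepUnwrapSoft(node: unknown): unknown {
  if (isResult(node)) return node.tag === "err" ? undefined : deepUnwrapSoft(node.value);
  if (Array.isArray(node)) return node.map((item) => deepUnwrapSoft(item));
  if (typeof node === "object" && node !== null) {
    const out: Record<string, unknown> = {};
    for (const [key, value] of Object.entries(node)) out[key] = deepUnwrapSoft(value);
    return out;
  }
  return node;
}

// A record where one field failed validation.
const user = ok({ id: ok(7), name: ok("Ada"), email: err("email: invalid format") });
const strict = deepUnwrap(user); // the email Err
const soft = deepUnwrapSoft(user); // id and name survive, email is undefined
```

The fail-fast form suits cases where partial data is useless; the soft form suits UIs that can render whatever fields did validate.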
Load Parameters
LoadParameters is the adapter-agnostic interface for holding any parameters required for querying data. Each adapter is expected to internally convert these fields into its own format (e.g. GraphQL query variables) before performing the query.
type LoadParameters<SortInput, FilterInput, OtherVars> = {
  // Array of typed sort directives, based on an adapter-specific sort shape
  sort?: TypedSortState<SortInput>;
  // Adapter-specific filter shape
  filter?: FilterInput;
  // Filters that take precedence over any other filters specified, thus "scoping" the query
  scopeFilter?: FilterInput;
  // Cursor-based, offset-based, or none
  pagination?: Pagination;
  // Additional adapter-specific variables
  vars?: OtherVars;
};
Pagination
Three pagination strategies are supported:
| Class | Style | Fields |
| ------------------ | ----------------------------------- | --------------------------------- |
| CursorPagination | Cursor-based (e.g. GraphQL Relay) | direction, cursor, pageSize |
| OffsetPagination | Offset/limit | offset, pageSize |
| NoPagination | Single page/no specified pagination | (none) |
When composing datasets, the right-side dataset supports automatic multi-page loading (up to 20 pages). Pagination can be customized for the right-side dataset loads by passing a Pagination object in the preparer's right.pagination config variable. A custom pageReducer callback can also be supplied to control how pages are merged.
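The multi-page behavior described above amounts to a bounded loop with a reducer. The following is a simplified sketch of that idea, not shift's loader: the Page shape, loadAllPages, and fetchRows are hypothetical, pages hold a flat items array rather than keyed data, and the fetch is synchronous. Only the 20-page cap and the concatenating default come from the text.

```typescript
// Hypothetical page shape; shift's Loadset and pageInfo carry more fields.
type Page<T> = { items: T[]; hasNextPage: boolean; endCursor: string | null };
type FetchPage<T> = (cursor: string | null, pageSize: number) => Page<T>;

const MAX_PAGES = 20; // the page cap stated in the text

function loadAllPages<T>(
  fetchPage: FetchPage<T>,
  pageSize: number,
  // Default reducer: concatenate pages in order.
  pageReducer: (acc: T[], page: Page<T>) => T[] = (acc, page) => [...acc, ...page.items]
): { items: T[]; pagesLoaded: number; truncated: boolean } {
  let acc: T[] = [];
  let cursor: string | null = null;
  let pagesLoaded = 0;
  let hasNext = true;
  // Always move forward ("after" direction) until exhausted or capped.
  while (hasNext && pagesLoaded < MAX_PAGES) {
    const page: Page<T> = fetchPage(cursor, pageSize);
    acc = pageReducer(acc, page);
    pagesLoaded += 1;
    hasNext = page.hasNextPage;
    cursor = page.endCursor;
  }
  // truncated is true when the cap stopped the loop before the last page.
  return { items: acc, pagesLoaded, truncated: hasNext };
}

// Hypothetical backend: 130 numbered rows served through index-based cursors.
const rows = Array.from({ length: 130 }, (_, i) => i);
const fetchRows: FetchPage<number> = (cursor, pageSize) => {
  const start = cursor === null ? 0 : Number(cursor);
  const end = Math.min(start + pageSize, rows.length);
  return { items: rows.slice(start, end), hasNextPage: end < rows.length, endCursor: String(end) };
};

const all = loadAllPages(fetchRows, 50); // 3 pages cover all 130 rows
const capped = loadAllPages(fetchRows, 5); // the cap stops it at 20 pages, 100 rows
```

The second call shows why page size matters under a fixed page cap: a small page size can silently leave results unloaded, so a larger pageSize is worth setting when the right side is expected to be big.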
Loadsets
A Loadset is the output of a loaded dataset — the data plus metadata about the response:
type Loadset<Def> = {
  data: Def;
  metadata: LoadsetMetadata;
};
type LoadsetMetadata = {
  totalCount: number;
  pageInfo?: {
    hasNextPage: boolean;
    hasPreviousPage: boolean;
    startCursor: string | null;
    endCursor: string | null;
    pageStartOffset: number | undefined;
  };
};
Utility functions
- emptyLoadset(): Creates a Loadset with empty data and zeroed metadata.
- emptyLoadsetMetadata(): Creates default empty LoadsetMetadata.
- isLoadsetEmpty(dataset, loadset): Checks whether a loadset is empty, using the dataset's isEmpty callback if defined, otherwise falling back to metadata.totalCount === 0.
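The fallback rule for isLoadsetEmpty can be sketched in a few lines. This is an illustrative re-implementation of the described rule, with a hypothetical DatasetLike type standing in for shift's dataset types; it shows why a custom isEmpty callback matters when a backend's totalCount is unreliable.

```typescript
// Stand-in types for illustration only.
type LoadsetMetadata = { totalCount: number };
type Loadset<Def> = { data: Def; metadata: LoadsetMetadata };
type DatasetLike<Def> = { isEmpty?: (loadset: Loadset<Def>) => boolean };

// Prefer the dataset's own callback; otherwise trust metadata.totalCount.
function isLoadsetEmpty<Def>(dataset: DatasetLike<Def>, loadset: Loadset<Def>): boolean {
  return dataset.isEmpty ? dataset.isEmpty(loadset) : loadset.metadata.totalCount === 0;
}

type NamesData = { names: string[] };
const namesDataset: DatasetLike<NamesData> = {
  isEmpty: (ls) => ls.data.names.length === 0,
};
const plainDataset: DatasetLike<NamesData> = {};

// Hypothetical response from a backend that does not report counts.
const loadset: Loadset<NamesData> = { data: { names: ["Ada"] }, metadata: { totalCount: 0 } };

const viaCallback = isLoadsetEmpty(namesDataset, loadset); // callback inspects the data
const viaFallback = isLoadsetEmpty(plainDataset, loadset); // fallback only sees totalCount
```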
Validation
The validation module provides utilities for performing granular, per-field Zod validation of raw API responses. Rather than failing an entire response when one field is invalid, shift can represent each field's parse result individually as a LoadResult, allowing callers to handle partial data gracefully.
Workflow
Build a validation tree from a Zod schema:
const tree = buildValidationTree(myZodSchema);
This recursively processes the schema into a ZodValidationTree of scalar, object, and array nodes, preserving optional/nullable status at each level.
Parse raw data against the tree:
- parseObject(val, tree): Validates each field of an object individually, returning a record of per-field LoadResult values. This is typically what would be called on the full tree of data, unless the data is coming back as an array.
- parseArray(val, tree): Validates an array, returning a LoadResult<Array>.
- parseScalarWithSchema(val, schema): Validates a single scalar value.
Advanced features
- Fragment alternatives: Supports type-conditional inline fragments, such as those in GraphQL (... on TypeName). The __typename field in incoming data selects the correct validation branch.
- Loose mode: When loose is set to true, unexpected keys in the input are passed through un-validated rather than discarded.
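The idea of per-field validation, where one bad field does not invalidate its siblings, can be sketched without Zod. The example below swaps in hand-written validators in place of a Zod schema and a validation tree, so parseFields, Validator, and FieldResult are hypothetical names; shift's parseObject works from a ZodValidationTree and returns LoadResult values instead.

```typescript
// Hand-written stand-ins for a schema and a per-field result.
type FieldResult<T> = { ok: true; value: T } | { ok: false; error: string };
type Validator<T> = (val: unknown) => FieldResult<T>;

const str: Validator<string> = (v) =>
  typeof v === "string" ? { ok: true, value: v } : { ok: false, error: `expected string, got ${typeof v}` };

const num: Validator<number> = (v) =>
  typeof v === "number" ? { ok: true, value: v } : { ok: false, error: `expected number, got ${typeof v}` };

// Validates every field independently, so failures stay local to their field.
function parseFields(
  raw: Record<string, unknown>,
  shape: Record<string, Validator<unknown>>
): Record<string, FieldResult<unknown>> {
  const out: Record<string, FieldResult<unknown>> = {};
  for (const [key, validate] of Object.entries(shape)) {
    out[key] = validate(raw[key]);
  }
  return out;
}

// One malformed field ("age") arrives alongside two valid ones.
const raw = { id: 7, name: "Ada", age: "forty" };
const parsed = parseFields(raw, { id: num, name: str, age: num });

// Callers can keep the valid fields and report the rest.
const validFields = Object.entries(parsed)
  .filter(([, result]) => result.ok)
  .map(([key]) => key);
const invalidFields = Object.entries(parsed)
  .filter(([, result]) => !result.ok)
  .map(([key]) => key);
```

A whole-object validator would reject this response outright; the per-field form lets a UI render id and name while flagging age.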
Utilities
JSON Parsing
- safeParseJSON(val): Non-throwing JSON.parse that returns a LoadResult<unknown>.
- asyncSafeParseJSON(val): Async variant returning an AsyncLoadResult<unknown>.
Type Guards
- isNullish(val) / isNotFound(val): Checks for null or undefined.
- isObject(val): Checks for non-nullish objects.
- isRecord(val): Checks for plain key-value records.
- isStringKeyedRecord(val): Checks for records with only string keys.
- isPromise(val): Checks for promise-like objects.
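For readers unfamiliar with TypeScript type guards, the sketch below shows how guards of this kind are commonly written. These are independent implementations based only on the one-line descriptions above; the exact rules shift applies (for example, whether arrays or class instances count as records) are assumptions here.

```typescript
// A guard returns a boolean and narrows the static type at the call site.
function isNullish(val: unknown): val is null | undefined {
  return val === null || val === undefined;
}

function isObject(val: unknown): val is object {
  return typeof val === "object" && val !== null;
}

// Assumption: a "plain record" is an object literal or a null-prototype object.
function isRecord(val: unknown): val is Record<PropertyKey, unknown> {
  if (!isObject(val) || Array.isArray(val)) return false;
  const proto: unknown = Object.getPrototypeOf(val);
  return proto === Object.prototype || proto === null;
}

// Promise-like means "has a callable then", which also covers custom thenables.
function isPromise(val: unknown): val is PromiseLike<unknown> {
  return isObject(val) && typeof (val as { then?: unknown }).then === "function";
}

// Example: narrowing an unknown API payload before reading from it.
function readName(payload: unknown): string | undefined {
  if (!isRecord(payload)) return undefined;
  const name = payload["name"];
  return typeof name === "string" ? name : undefined;
}

const fromRecord = readName({ name: "Ada" });
const fromArray = readName(["Ada"]);
```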
Getting Started
Installation
pnpm add @alleninstitute/shift
Basic Usage
Note: In this example, the data returned from usersDataset is simplified by omitting the LoadResult layer (an adapter can do this by not wrapping the output Def in DefinitionResults when generating a SourceDataset). The later examples show how LoadResult is typically handled.
import { transform, load } from '@alleninstitute/shift';
// Assume `usersDataset` is a SourceDataset obtained from an adapter package
// (e.g. a GraphQL adapter that creates typed datasets from document nodes)
const activeNamesDataset = transform(usersDataset, (data) => ({
  names: data.users.map((u) => u.name),
}));
const result = await load(activeNamesDataset, { filter: { active: true } });
result.match(
  (loadset) => console.log(loadset.data.names),
  (err) => console.error(err.userMessage ?? err.internalMessage)
);
Composing Datasets
Note: In this example, the data returned from usersDataset and postsDataset is simplified by omitting the LoadResult layer (an adapter can do this by not wrapping the output Def in DefinitionResults when generating a SourceDataset). The later examples show how LoadResult is typically handled.
import { compose, load } from '@alleninstitute/shift';
// Compose: load users first, then fetch their posts and attach them inline
const usersWithPostsDataset = compose(
  usersDataset,
  postsDataset,
  // preparer: splits incoming params into left/right load configs
  (params) => ({
    left: { params },
    right: {
      getParams: (usersLoadset) => ({
        filter: { authorId: { in: usersLoadset.data.users.map((u) => u.id) } },
      }),
    },
  }),
  // composer: attach each user's posts directly onto the output user object
  (usersLoadset, postsLoadset) => ({
    users: usersLoadset.data.users.map((user) => ({
      ...user,
      posts: postsLoadset.data.items.filter((p) => p.authorId === user.id),
    })),
  })
);
const result = await load(usersWithPostsDataset, { filter: { active: true } });
Chaining Transforms
transform returns a Dataset, so transforms are freely chainable. Each step only sees the output shape of the previous one.
import { transform, load } from '@alleninstitute/shift';
// First transform: strip fields not needed downstream
const projectSummariesDataset = transform(projectsDataset, (data) => ({
  // data.projects is a Result wrapping an array, so map over the Result first
  summaries: data.projects.map((projects) =>
    projects.map((p) => ({ id: p.id, name: p.name, status: p.status }))
  ),
}));
// Second transform: partition by status
const partitionedProjectsDataset = transform(projectSummariesDataset, (data) => ({
  active: data.summaries.map((summaries) =>
    summaries.filter((p) => p.status.isOk() && p.status.value === 'active')
  ),
  archived: data.summaries.map((summaries) =>
    summaries.filter((p) => p.status.isOk() && p.status.value === 'archived')
  ),
}));
const result = await load(partitionedProjectsDataset, { filter: { teamId: 'team-42' } });
result.match(
  // active is still wrapped in a Result, so unwrap it before reading its length
  (loadset) => console.log(loadset.data.active.unwrapOr([]).length, 'active projects'),
  (err) => console.error(err.internalMessage)
);
Composing and Then Transforming
A ComposedDataset is itself a Dataset, so it can be wrapped in a transform to reshape the merged output.
import { compose, transform, load } from '@alleninstitute/shift';
// Compose: load teams, then fetch each team's members and attach them inline
const teamsWithMembersDataset = compose(
  teamsDataset,
  membersDataset,
  (params) => ({
    left: { params },
    right: {
      getParams: (teamsLoadset) => ({
        filter: {
          teamId: {
            in: teamsLoadset.data.teams
              .unwrapOr([])
              .map((t) => t.id.unwrapOr(null))
              .filter(Boolean),
          },
        },
      }),
    },
  }),
  (teamsLoadset, membersLoadset) => ({
    teams: teamsLoadset.data.teams.map((teams) =>
      teams.map((team) => ({
        ...team,
        // Keep members wrapped in a Result so downstream steps can unwrap it
        members: membersLoadset.data.members.map((members) =>
          members.filter((m) => team.id.isOk() && m.teamId.isOk() && m.teamId.value === team.id.value)
        ),
      }))
    ),
  })
);
// Transform the composed result to produce a flat leaderboard sorted by member count
const leaderboardDataset = transform(teamsWithMembersDataset, (data) => ({
  leaderboard: data.teams.map((teams) =>
    // Copy before sorting so the loaded array is not mutated
    [...teams]
      .sort((a, b) => b.members.unwrapOr([]).length - a.members.unwrapOr([]).length)
      .map((team, index) => ({
        rank: index + 1,
        teamName: team.name,
        memberCount: team.members.map((mems) => mems.length),
      }))
  ),
}));
const result = await load(leaderboardDataset, { filter: { active: true } });
Paginating the Right Side of a Compose
When the right-side dataset's results span multiple pages, the loader automatically paginates to include all available results, up to a maximum of 20 pages. By default, pages are merged by concatenating arrays at matching keys. A custom pageReducer can be supplied for full control over how pages are accumulated, and a custom Pagination object can be used to specify other settings, such as page size. (Note: currently, 'after' is the only direction value used by the right-side pagination logic; specifying 'before' has no effect.)
import { compose, load, CursorPagination } from '@alleninstitute/shift';
const specimensWithAllImagesDataset = compose(
  specimensDataset,
  imagesDataset,
  (params) => ({
    left: { params },
    right: {
      getParams: (specimensLoadset) => ({
        filter: {
          // Fetch all images for all specimens in the left result in one query
          specimenId: {
            // Unwrap the outer Result so the filter receives a plain array of IDs
            in: specimensLoadset.data.specimens
              .map((specimens) => specimens.map((s) => s.id.unwrapOr(null)).filter(Boolean))
              .unwrapOr([]),
          },
        },
        // Request pages of size 50; the loader keeps fetching until hasNextPage
        // is false or 20 pages are fetched
        pagination: new CursorPagination(undefined, undefined, 50),
      }),
      // Custom reducer: concatenate image arrays and carry forward the latest metadata
      pageReducer: (accumulated, page) => {
        if (page.data.images.isErr()) {
          return accumulated;
        }
        return {
          data: { images: [...accumulated.data.images, ...page.data.images.value] },
          metadata: {
            totalCount: accumulated.metadata.totalCount + page.metadata.totalCount,
            pageInfo: page.metadata.pageInfo,
          },
        };
      },
    },
  }),
  (specimensLoadset, imagesLoadset) => ({
    // Attach images to their matching specimen by specimenId
    specimens: specimensLoadset.data.specimens.map((specimens) =>
      specimens.map((specimen) => ({
        ...specimen,
        images: imagesLoadset.data.images.filter(
          (img) => specimen.id.isOk() && img.specimenId.isOk() && img.specimenId.value === specimen.id.value
        ),
      }))
    ),
  })
);
Conditionally Skipping the Right Side
If getParams returns null, the right load is skipped entirely and the composer receives an empty loadset for the right side. This is useful when the right query only makes sense if the left result contains data.
import { compose, load } from '@alleninstitute/shift';
const specimenWithAnnotationsDataset = compose(
  specimensDataset,
  annotationsDataset,
  (params) => ({
    left: { params },
    right: {
      getParams: (specimensLoadset) => {
        const specimens = specimensLoadset.data.specimens.unwrapOr([]);
        // No specimen found: skip fetching annotations entirely
        if (specimens.length === 0) {
          return null;
        }
        return {
          filter: { specimenId: { in: specimens.map((s) => s.id.unwrapOr(null)).filter(Boolean) } },
        };
      },
    },
  }),
  (specimensLoadset, annotationsLoadset) => ({
    specimens: specimensLoadset.data.specimens.map((specimens) =>
      specimens.map((s) => ({
        ...s,
        // The right loadset is empty when getParams returned null, so fall back to []
        annotations: (annotationsLoadset?.data.annotations?.unwrapOr([]) ?? []).filter(
          // Assumes each annotation carries a specimenId field, mirroring the filter above
          (a) => s.id.isOk() && a.specimenId.isOk() && a.specimenId.value === s.id.value
        ),
      }))
    ),
  })
);
const result = await load(specimenWithAnnotationsDataset, { vars: { id: 'spec-001' } });
Development
Prerequisites
Setup
pnpm install
Scripts
| Command | Description |
| -------------------- | ---------------------------------------- |
| pnpm lint | Run linter (OXLint) |
| pnpm build | Build the library to dist/ |
| pnpm build:watch | Build in watch mode |
| pnpm test | Run tests |
| pnpm test:watch | Run tests in watch mode |
| pnpm test:ci | Run tests once (CI mode) |
| pnpm test:coverage | Run tests with coverage report |
| pnpm typecheck | Type-check without emitting output |
| pnpm fmt | Format all files with OxFmt |
| pnpm fmt:check | Check formatting without writing |
| pnpm changelog | Generate CHANGELOG.md from git history |
Project Structure
src/
lib/
datasets/
input/ # transform() and compose() — dataset construction
loading/ # load() — recursive dataset execution, source delegation
output/ # Loadset types, deep Result utilities (deepUnwrap, etc.)
utils/
parsing/ # Safe JSON parsing utilities
typing/ # Type guards and type-level utilities
validation/ # Granular per-field Zod validation (buildValidationTree, parseObject, etc.)
presets/ # Planned first-party adapter presets (e.g. GraphQL)