@healthcare-interoperability/fhir-ingestion-core
v1.0.0
FHIR R4 ingestion pipeline — validates resources and routes them to repositories from @healthcare-interoperability/fhir-storage-core. Single-resource, typed-bundle, and mixed-bundle entry points. Pluggable validation. Class-based for extensibility.
ESM. Node.js ≥ 20. Zero runtime dependencies.
This is not a FHIR transaction processor. It treats a Bundle as a list of resources to upsert. It does not honor `Bundle.entry.request.method`, does not resolve `urn:uuid:` placeholder references, and does not roll back across entries on partial failure. See "Limitations and extension points" below.
Install
```bash
npm install @healthcare-interoperability/fhir-ingestion-core
# peer dependency
npm install @healthcare-interoperability/fhir-storage-core
```
Usage
```js
import { FHIRIngestionPipeline } from '@healthcare-interoperability/fhir-ingestion-core';

// `repos`, `myValidator`, `resource`, `resources`, `bundle` and
// `integrationConfig` come from your application.
const pipeline = new FHIRIngestionPipeline({
  // Required: maps resourceType → repository instance (from fhir-storage-core).
  // Return null/undefined for unsupported types; the pipeline wraps that in
  // UnsupportedResourceTypeError. The resolver may also throw directly.
  repoResolver: async (type) => repos[type] ?? null,

  // Optional: async function returning OperationOutcome.issue[] or a full
  // OperationOutcome. Called only when `options.validate.external === true`.
  externalValidator: async (resource) => myValidator.validate(resource),
});

// Single resource: returns a WriteResult
const result = await pipeline.resource(resource, integrationConfig);

// Array of same-type resources: returns WriteResult[] (length 1)
const typedResults = await pipeline.typedResources('Patient', resources, integrationConfig);

// Array of mixed-type resources: groups by type, returns WriteResult[]
const mixedResults = await pipeline.mixedResources(resources, integrationConfig);

// Bundle whose entries are all one type: thin wrapper around typedResources
const typedBundleResults = await pipeline.typedBundle('Patient', bundle, integrationConfig);

// Bundle of mixed types: thin wrapper around mixedResources
const mixedBundleResults = await pipeline.mixedBundle(bundle, integrationConfig);
```
When to use which
- `resource()`: single resource. HTTP POST routes, queue workers processing one message at a time.
- `typedResources()` / `mixedResources()`: the primary "many resources" path. ETL jobs, scheduled syncs, batch imports, anything where you have an array of resources without a Bundle envelope.
- `typedBundle()` / `mixedBundle()`: when the input is already a FHIR `Bundle` (e.g. an HTTP request body). These are thin wrappers around the array methods.
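The usage comments say `mixedResources()` groups its input by `resourceType`, and the atomicity notes below say each type gets its own write. A minimal sketch of that grouping step, assuming a plain `Map` keyed by type that keeps each resource's original array index so errors can still point at the input position. `groupByResourceType` is a hypothetical helper, not part of this package:

```javascript
// Hypothetical sketch: group a mixed array by resourceType while remembering
// each resource's position in the input array.
function groupByResourceType(resources) {
  const groups = new Map(); // resourceType -> [{ index, resource }]
  resources.forEach((resource, index) => {
    const type = resource.resourceType;
    if (!groups.has(type)) groups.set(type, []);
    groups.get(type).push({ index, resource });
  });
  return groups; // a Map iterates in first-seen insertion order
}

const groups = groupByResourceType([
  { resourceType: 'Patient', id: 'p1' },
  { resourceType: 'Encounter', id: 'e1' },
  { resourceType: 'Patient', id: 'p2' },
]);
console.log([...groups.keys()]); // [ 'Patient', 'Encounter' ]
console.log(groups.get('Patient').map((g) => g.index)); // [ 0, 2 ]
```

Keeping the index next to each resource is what makes a per-entry error field such as `resourceIndex` possible after the input has been regrouped.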
Bundle methods return WriteResult[] — the shape fhir-response-builders accepts directly:
```js
import { FHIRResponseBuilder } from '@healthcare-interoperability/fhir-response-builders';

const responder = new FHIRResponseBuilder();

// Inside an Express route handler, where `res` is the Express response
const results = await pipeline.mixedBundle(bundle, integrationConfig);
res.status(200).json(responder.bundle(results));
```
Validation layers
Four layers, each independently togglable (except structural):
| Layer | When | Toggle | Default |
|---|---|---|---|
| Structural | Always | not togglable | always on |
| REQUIRED_FIELDS | After structural | validate.required | true |
| validateExtra | After required | validate.extra | true |
| External | Last | validate.external | false |
```js
// Skip required + extra; opt in to external
await pipeline.resource(resource, integrationConfig, {
  validate: { required: false, extra: false, external: true },
});
```
Order matters: cheap checks run first so a bad resource fails fast, before any expensive validator is invoked.
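The defaults in the table amount to a shallow merge of per-call flags over fixed values. This is roughly the job of the `_resolveValidateOpts` override point, but the sketch below is an assumption about its behavior, not its source. `DEFAULT_VALIDATE` and `resolveValidateOpts` are hypothetical names:

```javascript
// Hypothetical sketch: merge per-call flags over the documented defaults
// (required: true, extra: true, external: false).
const DEFAULT_VALIDATE = Object.freeze({ required: true, extra: true, external: false });

function resolveValidateOpts(options = {}) {
  // Defaults first, so any flag the caller supplies wins.
  return { ...DEFAULT_VALIDATE, ...(options.validate ?? {}) };
}

console.log(resolveValidateOpts());
// { required: true, extra: true, external: false }
console.log(resolveValidateOpts({ validate: { extra: false, external: true } }));
// { required: true, extra: false, external: true }
```

A subclass that wants external validation on by default would only need to change the default object.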
Structural validation
Always runs. Cannot be disabled via options (but can be replaced by overriding _runValidation). Checks:
- `resource` is an object
- `resource.resourceType` is a non-empty string
- `resource.id` is a non-empty string

Without these fields the storage layer cannot compute hashes, so there is no case in which skipping this layer would be useful.
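A minimal sketch of the three structural checks, assuming the thrown error is shaped like `InvalidResourceStructureError` in the Errors table below (status 400, code `structure`). `checkStructure` is a hypothetical stand-in, not the exported `validateStructure`. Treating arrays as invalid is also an assumption:

```javascript
// Hypothetical stand-in for the structural layer. Collects every problem,
// then throws one error carrying status/code like InvalidResourceStructureError.
function checkStructure(resource) {
  const problems = [];
  if (resource === null || typeof resource !== 'object' || Array.isArray(resource)) {
    problems.push('resource must be an object');
  } else {
    if (typeof resource.resourceType !== 'string' || resource.resourceType === '') {
      problems.push('resourceType must be a non-empty string');
    }
    if (typeof resource.id !== 'string' || resource.id === '') {
      problems.push('id must be a non-empty string');
    }
  }
  if (problems.length > 0) {
    const err = new Error(problems.join('; '));
    err.status = 400;
    err.code = 'structure';
    throw err;
  }
}

checkStructure({ resourceType: 'Patient', id: 'p1' }); // passes silently
try {
  checkStructure({ resourceType: 'Patient' });
} catch (err) {
  console.log(err.status, err.code, err.message); // 400 structure id must be a non-empty string
}
```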
REQUIRED_FIELDS
Reads repo.constructor.REQUIRED_FIELDS — an array of dot-paths to required fields:
```js
class EncounterRepository extends FHIRResourceRepository {
  static RESOURCE_TYPE = 'Encounter';
  static REQUIRED_FIELDS = ['status', 'class', 'subject.reference', 'period.start'];
}
```
Missing fields throw `MissingRequiredFieldsError`, with every missing path attached as `err.missing`.
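This layer can be modeled as two steps. First, resolve each dot-path against the resource. Second, collect every path that comes back empty, so the error reports all missing fields at once instead of stopping at the first. A sketch with hypothetical helpers `readPath` and `findMissing`. It assumes `null`, `undefined`, and `''` all count as missing, which may not match the exported `getByPath` exactly:

```javascript
// Hypothetical dot-path lookup: returns undefined as soon as a segment is missing.
function readPath(obj, path) {
  return path.split('.').reduce((node, key) => (node == null ? undefined : node[key]), obj);
}

// Collect all missing paths rather than failing on the first one.
function findMissing(resource, requiredPaths) {
  return requiredPaths.filter((path) => {
    const value = readPath(resource, path);
    return value === undefined || value === null || value === '';
  });
}

const encounter = {
  resourceType: 'Encounter',
  id: 'e1',
  status: 'finished',
  subject: { reference: 'Patient/p1' },
};
console.log(findMissing(encounter, ['status', 'class', 'subject.reference', 'period.start']));
// [ 'class', 'period.start' ]
```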
validateExtra (per-repo hook)
Static method on the repository class, called after REQUIRED_FIELDS:
```js
class PatientRepository extends FHIRResourceRepository {
  static RESOURCE_TYPE = 'Patient';

  // Return undefined/null/[] when the resource is acceptable.
  static validateExtra(resource) {
    const issues = [];
    // deceased[x] is a choice type: a Patient carries deceasedBoolean OR
    // deceasedDateTime, never both, so check the dateTime variant on its own.
    if (resource.deceasedDateTime && Date.parse(resource.deceasedDateTime) > Date.now()) {
      issues.push({
        severity: 'error',
        code: 'invariant',
        diagnostics: 'deceasedDateTime is in the future',
      });
    }
    return issues;
  }
}
```
Returns `undefined`, `null`, or `[]` on success and an `issue[]` on failure. Issues of any severity (warning or error) cause `ValidateExtraError` to be thrown; there is no severity gating at this layer. Use it for hard, repo-defined invariants, and use external validation for advisory checks.
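The return-value contract above fits in a small function. This sketch assumes the error is built as the Errors table describes (status 422, code `invariant`, issues on `err.issues`). `applyValidateExtraResult` is hypothetical, and rejecting non-array return values is an added assumption:

```javascript
// Hypothetical sketch: undefined/null/[] pass; any non-empty issue array
// throws regardless of severity (no gating at this layer).
function applyValidateExtraResult(result) {
  if (result == null) return;
  if (!Array.isArray(result)) {
    throw new TypeError('validateExtra must return an issue array, null or undefined');
  }
  if (result.length === 0) return;
  const err = new Error(`validateExtra reported ${result.length} issue(s)`);
  err.status = 422;
  err.code = 'invariant';
  err.issues = result;
  throw err;
}

applyValidateExtraResult([]); // ok
try {
  // A lone warning still throws.
  applyValidateExtraResult([{ severity: 'warning', code: 'invariant', diagnostics: 'advisory only' }]);
} catch (err) {
  console.log(err.status, err.issues.length); // 422 1
}
```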
External validation
Async function passed at pipeline creation (or per call):
```js
const pipeline = new FHIRIngestionPipeline({
  repoResolver,
  externalValidator: async (resource) => {
    // returns OperationOutcome.issue[] OR full OperationOutcome
    return await externalService.validate(resource);
  },
});

// Per-call override:
await pipeline.resource(resource, integrationConfig, {
  externalValidator: customValidatorForThisCall,
  validate: { external: true },
});
```
The validator must return either:
- `OperationOutcome.issue[]`: a bare issue array, or
- `{ resourceType: 'OperationOutcome', issue: [...] }`: a full resource.
Severity gating:
- `error` or `fatal` → throw `ExternalValidationError` with all issues attached.
- `warning` or `information` → dropped silently. If you want warnings logged, log them inside the validator function before returning.
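Normalizing the two accepted return shapes and applying the gate takes only a few lines. The sketch rests on two assumptions the README does not pin down. First, `err.code` comes from the first *blocking* issue (the Errors table only says "from first issue"). Second, `err.issues` holds every issue the validator returned. `normalizeIssues` and `gateExternalIssues` are hypothetical names:

```javascript
// Hypothetical sketch of external-result handling: normalize, then gate.
const BLOCKING = new Set(['error', 'fatal']);

function normalizeIssues(result) {
  if (Array.isArray(result)) return result;
  if (result && result.resourceType === 'OperationOutcome') return result.issue ?? [];
  throw new TypeError('externalValidator must return issue[] or an OperationOutcome');
}

function gateExternalIssues(result) {
  const issues = normalizeIssues(result);
  const blocking = issues.filter((issue) => BLOCKING.has(issue.severity));
  if (blocking.length === 0) return; // warning/information are ignored
  const err = new Error(blocking[0].diagnostics ?? 'External validation failed');
  err.status = 422;
  err.code = blocking[0].code;
  err.issues = issues;
  throw err;
}

gateExternalIssues([{ severity: 'warning', code: 'business-rule' }]); // no throw
try {
  gateExternalIssues({
    resourceType: 'OperationOutcome',
    issue: [
      { severity: 'information', code: 'informational' },
      { severity: 'error', code: 'value', diagnostics: 'birthDate is in the future' },
    ],
  });
} catch (err) {
  console.log(err.code, err.message); // value birthDate is in the future
}
```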
Subclassing
Override any underscore-prefixed method. The most useful override points:
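```js
import { FHIRIngestionPipeline } from '@healthcare-interoperability/fhir-ingestion-core';

// `metrics` stands for your own metrics client.
class MetricsPipeline extends FHIRIngestionPipeline {
  // Wrap validation with timing. This covers resource() only; the bulk
  // paths call _runRemainingValidation instead (see the table below).
  async _runValidation(resource, repo, options) {
    const start = Date.now();
    try {
      await super._runValidation(resource, repo, options);
      metrics.observe('validation.ok', { rt: resource.resourceType }, Date.now() - start);
    } catch (err) {
      metrics.observe('validation.fail', { rt: resource.resourceType, code: err.code }, Date.now() - start);
      throw err;
    }
  }
}
```
Override points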
| Method | When called | Common reasons to override |
|---|---|---|
| _runValidation(resource, repo, options) | Once per resource in resource() (includes structural). | Add layers, change order in the single-resource path. |
| _runRemainingValidation(resource, repo, options) | Once per resource in typedResources / mixedResources (structural already ran). | Mirror your _runValidation changes here so the bulk paths stay in sync. |
| _getRepo(resourceType) | Once per type per call. | Caching, aliases (Person → Patient), fallback handlers. |
| _resolveValidateOpts(options) | Once per call to resolve flag defaults. | Flip external to default-on, add new flags. |
| _validateBundleEnvelope(bundle) | Once per bundle call. | Accept envelope variants. |
| _extractBundleResources(bundle) | Once per bundle call. | Custom envelope formats (e.g. a wrapper resource with a different shape). |
| _attachResourceContext(err, index, fhirId) | On per-resource errors in array methods. | Attach trace IDs, request context. |
Important: if you override _runValidation and skip super._runValidation(...), structural validation is skipped along with everything else. That's almost never what you want — _prepareOps inside the storage layer will then throw with a less helpful error. Either always call super, or explicitly invoke the exported validateStructure(resource) yourself.
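The `_getRepo` row lists caching as a common reason to override. `repoResolver` is a plain async function, so you can get a similar effect without subclassing by wrapping the resolver before passing it to the constructor. A minimal sketch, assuming repository instances are safe to reuse across calls. `withRepoCache` is a hypothetical helper:

```javascript
// Hypothetical wrapper: memoize repoResolver lookups per resourceType.
// The promise itself is cached so concurrent lookups share one call;
// a rejected lookup is evicted so a later call can retry.
function withRepoCache(repoResolver) {
  const cache = new Map(); // resourceType -> Promise<repository | null>
  return (resourceType) => {
    if (!cache.has(resourceType)) {
      const pending = Promise.resolve().then(() => repoResolver(resourceType));
      pending.catch(() => cache.delete(resourceType));
      cache.set(resourceType, pending);
    }
    return cache.get(resourceType);
  };
}

// Usage with an in-memory stand-in for real repositories:
let lookups = 0;
const repos = { Patient: { name: 'patient-repo' } };
const cachedResolver = withRepoCache(async (type) => {
  lookups += 1;
  return repos[type] ?? null;
});

Promise.all([cachedResolver('Patient'), cachedResolver('Patient')]).then(([a, b]) => {
  console.log(a === b, lookups); // true 1
});
```

Pass `cachedResolver` as `repoResolver` when constructing the pipeline. Note that `null` results are cached too, so an unsupported type keeps resolving to `null` for the resolver's lifetime.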
Errors
All errors carry status (HTTP status code) and code (FHIR IssueType code) so an Express error handler can transform them into OperationOutcome responses without inspecting class names.
| Error class | status | code |
|---|---|---|
| InvalidResourceStructureError | 400 | structure |
| UnsupportedResourceTypeError | 422 | not-supported |
| ResourceTypeMismatchError | 422 | invariant |
| MissingRequiredFieldsError | 422 | required |
| ValidateExtraError | 422 | invariant |
| ExternalValidationError | 422 | (from first issue) |
| ExternalValidatorNotConfiguredError | 500 | exception |
All extend IngestionError. Bundle errors additionally carry bundleIndex and fhirId so the failing entry is traceable:
```js
// Inside an Express route handler; `log` is your logger, `next` is Express's callback
try {
  await pipeline.mixedBundle(bundle, integrationConfig);
} catch (err) {
  log.error({ bundleIndex: err.bundleIndex, fhirId: err.fhirId, code: err.code });
  next(err); // global handler shapes the OperationOutcome
}
```
Validation errors (`ValidateExtraError`, `ExternalValidationError`) additionally carry `err.issues`, an array of FHIR `OperationOutcome.issue` objects suitable for embedding directly in the response.
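Because every error carries `status` and `code`, the "global handler" can be a small pure function. A sketch that does not depend on this package: it prefers `err.issues` when present and otherwise builds one issue from `code` and the message. Errors without a status fall back to 500 / `exception` by assumption. `toOperationOutcome` is a hypothetical name, and the message text in the usage is invented:

```javascript
// Hypothetical mapper: any error with status/code → HTTP status + OperationOutcome body.
function toOperationOutcome(err) {
  const status = Number.isInteger(err.status) ? err.status : 500;
  const issue = Array.isArray(err.issues) && err.issues.length > 0
    ? err.issues
    : [{ severity: 'error', code: err.code ?? 'exception', diagnostics: err.message }];
  return { status, body: { resourceType: 'OperationOutcome', issue } };
}

// Example error with an invented message, shaped like MissingRequiredFieldsError.
const err = Object.assign(new Error('Missing required fields: period.start'), {
  status: 422,
  code: 'required',
});
console.log(JSON.stringify(toOperationOutcome(err)));
// {"status":422,"body":{"resourceType":"OperationOutcome","issue":[{"severity":"error","code":"required","diagnostics":"Missing required fields: period.start"}]}}
```

In Express this would sit in error middleware registered after your routes: `app.use((err, req, res, next) => { const { status, body } = toOperationOutcome(err); res.status(status).json(body); });`.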
Atomicity guarantees
Validation is atomic. If any resource in a bundle fails any validation layer, no writes occur for any resource in that bundle.
Writes are NOT cross-entry transactional. For mixedBundle, each type's bulkUpsert is its own write; a failure in the second bulkUpsert does not roll back the first. True atomicity would require MongoDB transactions, which bulkUpsert doesn't currently support.
For workflows requiring true atomicity (e.g. clinical safety contexts where partial writes are unacceptable), either:
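- Pre-validate everything (this pipeline already does), which prevents partial writes caused by validation failures but not those caused by write failures, or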
- Use single-resource ingestion in a loop inside a Mongo transaction you manage.
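Both guarantees (all-or-nothing validation, per-type writes without rollback) can be modeled in a few lines. This is a synchronous, in-memory sketch, whereas the real pipeline is async. `ingestGrouped`, `validate`, and `writeGroup` are hypothetical stand-ins, not package APIs:

```javascript
// Hypothetical model: phase 1 validates everything before any write;
// phase 2 writes one group per resourceType with no rollback.
function ingestGrouped(resources, validate, writeGroup) {
  resources.forEach((resource, index) => {
    try {
      validate(resource);
    } catch (err) {
      err.resourceIndex = index; // point at the failing input position
      err.fhirId = resource?.id;
      throw err; // nothing has been written yet
    }
  });
  const byType = new Map();
  for (const r of resources) {
    if (!byType.has(r.resourceType)) byType.set(r.resourceType, []);
    byType.get(r.resourceType).push(r);
  }
  // A failure in a later group leaves earlier groups written.
  return [...byType].map(([type, group]) => writeGroup(type, group));
}

const written = [];
const validate = (r) => { if (!r.id) throw new Error('missing id'); };
const writeGroup = (type, group) => {
  if (type === 'Encounter') throw new Error('write failed');
  written.push(type);
  return { type, count: group.length };
};

try {
  ingestGrouped([{ resourceType: 'Patient', id: 'p1' }, { resourceType: 'Encounter' }], validate, writeGroup);
} catch (err) {
  console.log(err.message, err.resourceIndex, written); // missing id 1 []
}
try {
  ingestGrouped([{ resourceType: 'Patient', id: 'p1' }, { resourceType: 'Encounter', id: 'e1' }], validate, writeGroup);
} catch (err) {
  console.log(err.message, written); // write failed [ 'Patient' ]
}
```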
Limitations and extension points
This module is intentionally narrow. Things it does NOT do — and where to plug them in:
Bundle.entry.request.method
Ignored. Every entry's resource is upserted. To support DELETE, PATCH, etc., override `mixedBundle` in a subclass or wrap the pipeline:
```js
class TransactionPipeline extends FHIRIngestionPipeline {
  async mixedBundle(bundle, integrationConfig, options) {
    // ... dispatch by entry.request.method to upsert/delete/patch ...
  }
}
```
urn:uuid: references
The reference resolver in storage-core throws on URN references. For real transaction Bundles where entries reference each other via urn:uuid: placeholders, you need a pre-pass that:
- Walks the Bundle and finds all `urn:uuid:` references.
- Assigns concrete `id` values to the resources those URNs point to.
- Rewrites every reference to use the concrete id.
That's a separate module ("transaction resolver"). Build it as a wrapper that runs before this pipeline.
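The three steps above can be sketched as a pure function over the Bundle. This illustration is not part of this package and makes several assumptions. Placeholders live in `entry.fullUrl`. Any `id` already present is kept. References are rewritten in `ResourceType/id` form. Unmatched `urn:uuid:` references are left untouched, so storage-core would still reject them. The sketch uses `structuredClone` and the global `crypto.randomUUID()`, both available in Node.js 20. `resolveUrnReferences` is a hypothetical name:

```javascript
// Hypothetical "transaction resolver" pre-pass, run before the pipeline.
function resolveUrnReferences(bundle, newId = () => globalThis.crypto.randomUUID()) {
  const out = structuredClone(bundle); // never mutate the caller's Bundle
  const urnToRef = new Map();

  // Steps 1-2: find urn:uuid: entries and give their resources concrete ids.
  for (const entry of out.entry ?? []) {
    if (typeof entry.fullUrl === 'string' && entry.fullUrl.startsWith('urn:uuid:') && entry.resource) {
      entry.resource.id ??= newId();
      urnToRef.set(entry.fullUrl, `${entry.resource.resourceType}/${entry.resource.id}`);
    }
  }

  // Step 3: rewrite every { reference: 'urn:uuid:...' } found anywhere in a resource.
  const rewrite = (node) => {
    if (Array.isArray(node)) {
      node.forEach(rewrite);
    } else if (node !== null && typeof node === 'object') {
      if (typeof node.reference === 'string' && urnToRef.has(node.reference)) {
        node.reference = urnToRef.get(node.reference);
      }
      Object.values(node).forEach(rewrite);
    }
  };
  for (const entry of out.entry ?? []) rewrite(entry.resource);
  return out;
}

let n = 0;
const resolved = resolveUrnReferences({
  resourceType: 'Bundle',
  type: 'transaction',
  entry: [
    { fullUrl: 'urn:uuid:61ebe359-bfdc-4613-8bf2-c5e300945f0a', resource: { resourceType: 'Patient' } },
    {
      fullUrl: 'urn:uuid:88f151c0-a954-468a-88bd-5ae15c08e059',
      resource: { resourceType: 'Encounter', subject: { reference: 'urn:uuid:61ebe359-bfdc-4613-8bf2-c5e300945f0a' } },
    },
  ],
}, () => `gen-${++n}`);
console.log(resolved.entry[1].resource.subject.reference); // Patient/gen-1
```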
Cross-entry atomicity
Use the per-entry resource() method in a loop inside your own Mongo transaction. Or wait for storage-core to add transactional bulk support.
Full FHIR R4 transaction semantics
Not in scope here. A full transaction processor would compose:
- Method dispatch (POST/PUT/DELETE/PATCH/GET handling)
- Conditional create (`If-None-Exist`) and conditional update (search-based matching)
- URN resolution
- Transactional atomicity
- Bundle response generation with per-entry status codes
This module is a building block of that. It is not that.
API reference
class FHIRIngestionPipeline
```ts
// Repository, WriteResult, Issue, OperationOutcome and ValidateOptions are
// referenced by name here; they are not defined in this README.
declare class FHIRIngestionPipeline {
  constructor(config: {
    repoResolver: (resourceType: string) => Promise<Repository | null>;
    externalValidator?: (resource: object) => Promise<Issue[] | OperationOutcome>;
  });

  // Single resource
  resource(resource: object, integrationConfig: object, options?: IngestOptions): Promise<WriteResult>;

  // Array forms (primary "many resources" path)
  typedResources(resourceType: string, resources: object[], integrationConfig: object, options?: IngestOptions): Promise<WriteResult[]>;
  mixedResources(resources: object[], integrationConfig: object, options?: IngestOptions): Promise<WriteResult[]>;

  // Bundle wrappers (envelope adapters around the array methods)
  typedBundle(resourceType: string, bundle: object, integrationConfig: object, options?: IngestOptions): Promise<WriteResult[]>;
  mixedBundle(bundle: object, integrationConfig: object, options?: IngestOptions): Promise<WriteResult[]>;

  // Override points (protected by convention)
  _runValidation(resource: object, repo: object, options: IngestOptions): Promise<void>;
  _runRemainingValidation(resource: object, repo: object, options: IngestOptions): Promise<void>;
  _getRepo(resourceType: string): Promise<object>;
  _resolveValidateOpts(options?: IngestOptions): ValidateOptions;
  _validateBundleEnvelope(bundle: object): void;
  _extractBundleResources(bundle: object): object[];
  _attachResourceContext(err: Error, index: number, fhirId: string | undefined): Error;
}

interface IngestOptions {
  validate?: {
    required?: boolean; // default true
    extra?: boolean;    // default true
    external?: boolean; // default false
  };
  externalValidator?: (resource: object) => Promise<Issue[] | OperationOutcome>;
}
```
Error context fields
Errors from the array methods (typedResources / mixedResources) carry:
- `resourceIndex`: position in the input array.
- `fhirId`: `resource.id`, if available.
Errors from the Bundle methods (`typedBundle` / `mixedBundle`) carry both index fields, plus `fhirId`:
- `resourceIndex`: same as above (from the underlying array method).
- `bundleIndex`: position in `Bundle.entry[]`. It has the same numeric value as `resourceIndex`, because Bundle entries map 1:1 to extracted resources.
The bundleIndex field is provided for callers who think in Bundle terms.
Exposed validation helpers
For consumers who want to assemble their own pipeline (without subclassing), the underlying functions are exported:
```js
import {
  validateStructure,
  validateRequired,
  validateExtra,
  validateExternal,
  getByPath,
} from '@healthcare-interoperability/fhir-ingestion-core';
```
Each of the four `validate*` functions throws on failure (see "Errors" above). This is useful if you want to compose validation without buying into the pipeline class.
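Because the validation helpers throw on failure, composing them comes down to an ordered, fail-fast loop. A sketch of that loop with stand-in layer functions. Only `validateStructure(resource)` has a documented signature, so the real helpers are not called here. `runLayers`, the layer objects, and the `err.layer` field are hypothetical:

```javascript
// Hypothetical composition: run enabled layers in order, stop at the first throw.
async function runLayers(resource, layers) {
  for (const { name, enabled = true, run } of layers) {
    if (!enabled) continue;
    try {
      await run(resource);
    } catch (err) {
      err.layer ??= name; // record which layer failed (illustrative field)
      throw err;
    }
  }
}

const layers = [
  { name: 'structure', run: (r) => { if (!r.id) throw new Error('id required'); } },
  { name: 'external', enabled: false, run: async () => { throw new Error('not reached'); } },
];

const ok = { resourceType: 'Patient', id: 'p1' };
const bad = { resourceType: 'Patient' };
runLayers(ok, layers)
  .then(() => console.log('valid')) // valid
  .then(() => runLayers(bad, layers))
  .catch((err) => console.log(err.layer, err.message)); // structure id required
```

Putting cheap layers first keeps the fail-fast ordering the "Validation layers" section recommends.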
Requirements
- Node.js ≥ 20
- `@healthcare-interoperability/fhir-storage-core` (provides the `Repository` shape expected by `repoResolver`)
License
MIT
