@origints/core
v0.4.0
Origins core utilities.
Compiler-style declarative data planning system with explicit failure semantics and first-class provenance tracking.
Why
Traditional data extraction pipelines silently coerce types, swallow errors, and make it impossible to trace how a value was derived. When something goes wrong, you're left guessing.
Origins treats data extraction like compilation: build an immutable plan, execute it with full lineage tracking, and get structured failures instead of exceptions. Every transformation is recorded. Every failure is explicit.
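The plan-then-execute idea can be sketched in a few lines of plain TypeScript. This is an illustration of the concept only: Step, Plan, LineageEntry, buildPlan and execute are hypothetical names and do not reflect the actual @origints/core internals.

```typescript
// Illustrative only: hypothetical types, not the @origints/core API.
type Step = {
  readonly id: number
  readonly label: string
  readonly apply: (input: unknown) => unknown
}
type Plan = { readonly steps: readonly Step[] }
type LineageEntry = { stepId: number; label: string; input: unknown; output: unknown }

// Phase 1: build the plan. Nothing runs here; the result is frozen.
function buildPlan(steps: Array<Omit<Step, 'id'>>): Plan {
  const numbered = steps.map((s, i) => Object.freeze({ id: i + 1, ...s }))
  return Object.freeze({ steps: Object.freeze(numbered) })
}

// Phase 2: execute the plan, recording every transformation as a lineage entry.
function execute(plan: Plan, input: unknown): { value: unknown; lineage: LineageEntry[] } {
  const lineage: LineageEntry[] = []
  let current = input
  for (const step of plan.steps) {
    const output = step.apply(current)
    lineage.push({ stepId: step.id, label: step.label, input: current, output })
    current = output
  }
  return { value: current, lineage }
}

const demoPlan = buildPlan([
  { label: 'parse JSON', apply: raw => JSON.parse(raw as string) },
  { label: 'get name', apply: obj => (obj as { name: string }).name },
])
const demoRun = execute(demoPlan, '{"name":"Alice"}')
console.log(demoRun.value) // Alice
console.log(demoRun.lineage.map(e => `#${e.stepId} ${e.label}`)) // [ '#1 parse JSON', '#2 get name' ]
```

Because the plan is only data until execute is called, it can be inspected or logged before anything runs, and the lineage array answers "how was this value derived" after the fact.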
Features
- Two-phase architecture: plan then execute
- Immutable execution plans that can be inspected and serialized
- First-class provenance tracking with full lineage graphs
- Structured failure types (missing, type, format, constraint)
- Fail-fast execution with no silent coercions
- Schema validation via Standard Schema (Zod, etc.)
- Named schema definitions via $defName on emit properties and output transforms
- ForEach extraction with variable binding and dynamic predicates
- Plan pipeline for multi-step orchestration with cross-plan lineage
- Opt-in benchmarking with per-node and per-extraction timing
- Transform registry for decoupled execution logic
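To make "structured failure types" and "no silent coercions" concrete, here is a minimal, library-independent sketch. The FailureKind values come from the list above; the Failure and Extracted shapes and the extractNumber helper are assumptions made for illustration, and the real types may differ.

```typescript
// Hypothetical shapes for illustration; the real @origints/core types may differ.
type FailureKind = 'missing' | 'type' | 'format' | 'constraint'
type Failure = { kind: FailureKind; path: string; message: string }
type Extracted<T> = { ok: true; value: T } | { ok: false; failures: Failure[] }

// Extract a number without coercion: the string '180' is a type failure, not 180.
function extractNumber(data: Record<string, unknown>, key: string): Extracted<number> {
  if (!(key in data)) {
    return {
      ok: false,
      failures: [{ kind: 'missing', path: key, message: `no value at "${key}"` }],
    }
  }
  const raw = data[key]
  if (typeof raw !== 'number') {
    return {
      ok: false,
      failures: [{ kind: 'type', path: key, message: `expected number, got ${typeof raw}` }],
    }
  }
  return { ok: true, value: raw }
}

const sample = { age: 30, height: '180' }
const ageResult = extractNumber(sample, 'age')
const heightResult = extractNumber(sample, 'height')
const weightResult = extractNumber(sample, 'weight')
console.log(ageResult) // { ok: true, value: 30 }
console.log(heightResult.ok, weightResult.ok) // false false
```

Returning failures as data rather than throwing lets the caller branch on kind and report every problem in one place.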
Quick Start
npm install @origints/core
import { Planner, load, run } from '@origints/core'
const plan = new Planner()
.in(load({ name: 'Alice', age: 30 }))
.emit((out, $) => out.add('greeting', $.get('name').string()))
.compile()
const result = await run(plan)
if (result.ok) {
console.log(result.value)
// { greeting: 'Alice' }
}
Installation
- Supported platforms:
- macOS / Linux / Windows
- Runtime requirements:
- Node.js >= 18
- Package managers:
- npm, pnpm, yarn
npm install @origints/core
# or
pnpm add @origints/core
Usage
Inline data extraction
import { Planner, load, run } from '@origints/core'
const plan = new Planner()
.in(load({ name: 'Alice', age: 30, role: 'admin' }))
.emit((out, $) =>
out
.add('name', $.get('name').string())
.add('age', $.get('age').number())
.add('role', $.get('role').string())
)
.compile()
const result = await run(plan)
// result.value: { name: 'Alice', age: 30, role: 'admin' }
File extraction with transforms
// fs is needed for the readFile callback passed to run() below
import fs from 'node:fs'
import { Planner, loadFile, run, parseJson } from '@origints/core'
const plan = new Planner()
.in(loadFile('data.json'))
.mapIn(parseJson())
.emit((out, $) =>
out.add('id', $.get('id').number()).add('name', $.get('name').string())
)
.compile()
const result = await run(plan, {
readFile: path => fs.promises.readFile(path),
})
Nested path access
const plan = new Planner()
.in(load({ user: { profile: { email: '[email protected]' } } }))
.emit((out, $) =>
out.add('email', $.get('user').get('profile').get('email').string())
)
.compile()
Array mapping
const plan = new Planner()
.in(
load({
users: [
{ name: 'Alice', age: 30 },
{ name: 'Bob', age: 25 },
],
})
)
.emit((out, $) =>
out.add(
'names',
$.get('users').array(u => u.get('name').string())
)
)
.compile()
const result = await run(plan)
// result.value: { names: ['Alice', 'Bob'] }
Multiple inputs
const plan = new Planner()
.in(loadFile('config.json'))
.mapIn(parseJson())
.emit((out, $) => out.add('host', $.get('host').string()))
.in(loadFile('users.json'))
.mapIn(parseJson())
.emit((out, $) =>
out.add(
'users',
$.get('data').array(u => u.get('name').string())
)
)
.compile()
Literal values and conditional adds
const plan = new Planner()
.in(load({ name: 'Alice' }))
.emit((out, $) =>
out
.add('name', $.get('name').string())
.addLiteral('version', '1.0.0')
.addIfEmpty('nickname', $.get('alias').string())
)
.compile()
Post-processing
const plan = new Planner()
.in(load({ name: 'alice' }))
.emit((out, $) =>
out
.add('name', $.get('name').string())
.update('name', v => (v as string).toUpperCase(), 'Uppercase')
)
.compile()
const result = await run(plan)
// result.value: { name: 'ALICE' }
Optional extraction
Use optional() to gracefully handle missing or invalid data. Any extraction failure returns the default value instead of failing.
import { Planner, load, run, optional } from '@origints/core'
const plan = new Planner()
.in(load({ name: 'Alice' }))
.emit((out, $) =>
out
.add('name', $.get('name').string())
.add('nickname', optional($.get('nickname').string()))
.add('score', optional($.get('score').number(), 0))
)
.compile()
const result = await run(plan)
// result.value: { name: 'Alice', nickname: undefined, score: 0 }
Fallback chains
Use tryExtract() to try multiple extraction strategies in order. Returns the first success.
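The first-success rule can be illustrated without the library. The sketch below is plain TypeScript: Attempt and firstSuccess are hypothetical names, and the behavior when every strategy fails (here, the reasons are joined into one message) is an assumption rather than documented tryExtract() behavior. The library example follows it.

```typescript
// Hypothetical result shape for one extraction attempt.
type Attempt<T> = { ok: true; value: T } | { ok: false; reason: string }

// Run strategies in order and return the first success.
// If all of them fail, return the collected reasons (an assumption of this sketch).
function firstSuccess<T>(...strategies: Array<() => Attempt<T>>): Attempt<T> {
  const reasons: string[] = []
  for (const strategy of strategies) {
    const attempt = strategy()
    if (attempt.ok) return attempt
    reasons.push(attempt.reason)
  }
  return { ok: false, reason: reasons.join('; ') }
}

const rawPrice: unknown = '42.50'
const priceAttempt = firstSuccess<number | null>(
  // 1. Accept the value only if it is already a number
  () =>
    typeof rawPrice === 'number'
      ? { ok: true, value: rawPrice }
      : { ok: false, reason: 'not a number' },
  // 2. Otherwise parse a numeric string
  () =>
    typeof rawPrice === 'string' && !Number.isNaN(parseFloat(rawPrice))
      ? { ok: true, value: parseFloat(rawPrice) }
      : { ok: false, reason: 'not a numeric string' },
  // 3. Last resort: null
  () => ({ ok: true, value: null }),
)
console.log(priceAttempt) // { ok: true, value: 42.5 }
```

Later strategies are never invoked once one succeeds, so the order of the arguments encodes the preference.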
import {
Planner,
load,
run,
tryExtract,
mapSpec,
literal,
} from '@origints/core'
const plan = new Planner()
.in(load({ price: '42.50' }))
.emit((out, $) =>
out.add(
'price',
tryExtract(
// Try extracting as number directly
$.get('price').number(),
// Fall back to parsing a string as float
mapSpec(
$.get('price').string(),
v => parseFloat(v as string),
'parseFloat'
),
// Last resort: null
literal(null)
)
)
)
.compile()
const result = await run(plan)
// result.value: { price: 42.5 }
Value transforms
Use mapSpec() to transform extracted values at the spec level.
import { Planner, load, run, mapSpec } from '@origints/core'
const plan = new Planner()
.in(load({ date: '2024-01-15' }))
.emit((out, $) =>
out.add(
'year',
mapSpec(
$.get('date').string(),
v => new Date(v as string).getFullYear(),
'getYear'
)
)
)
.compile()
const result = await run(plan)
// result.value: { year: 2024 }
Guarded extraction
Use guard() to validate extracted values. Fails with 'constraint' kind if the predicate returns false.
import { Planner, load, run, guard, tryExtract, literal } from '@origints/core'
const plan = new Planner()
.in(load({ age: -5 }))
.emit((out, $) =>
out.add(
'age',
tryExtract(
guard(
$.get('age').number(),
v => (v as number) >= 0,
'Age must be non-negative'
),
literal(0)
)
)
)
.compile()
const result = await run(plan)
// result.value: { age: 0 } (guard failed, fell back to literal)
Merging plans
const configPlan = new Planner()
.in(loadFile('config.json'))
.mapIn(parseJson())
.emit((out, $) => out.add('host', $.get('host').string()))
.compile()
const usersPlan = new Planner()
.in(loadFile('users.json'))
.mapIn(parseJson())
.emit((out, $) => out.add('users', $.get('data').strings()))
.compile()
// Flat merge — combines outputs
const combined = Planner.merge(configPlan, usersPlan)
// Result type: { host: string } & { users: string[] }
// Named merge — nests under keys
const nested = Planner.mergeAs({ config: configPlan, users: usersPlan })
// Result type: { config: { host: string }, users: { users: string[] } }
Schema validation
import { Planner, load, run } from '@origints/core'
import { z } from 'zod'
const UserSchema = z.object({
name: z.string(),
age: z.number().min(0),
})
const plan = new Planner()
.in(load({ name: 'Bob', age: 25 }).validate(UserSchema))
.emit((out, $) =>
out.add('name', $.get('name').string()).add('age', $.get('age').number())
)
.compile()
const result = await run(plan)
Handling Results
The execution result is a discriminated union. All failures are captured as structured data—execution never throws exceptions for data errors.
const result = await run(plan)
if (result.ok) {
// .value is the typed output of your plan
console.log('Success:', result.value)
} else {
// .failures contains a list of all errors encountered
console.error('Execution failed:')
for (const failure of result.failures) {
console.error(`- [${failure.kind}] ${failure.message}`)
// Failures are linked to specific nodes in the plan
// failure.nodeId maps back to the plan AST
}
}
Inspecting Lineage
Origins tracks the provenance of every value. You can format this trace as a human-readable string or get the structured data (which is JSON-serializable) using formatLineage.
import { run, formatLineage, formatLineageAsString } from '@origints/core'
const result = await run(plan)
// 1. Log human-readable string trace to console
console.log(formatLineageAsString(result.lineage, plan.ast))
// 2. Get structured data (JSON serializable)
const trace = formatLineage(result.lineage, plan.ast)
console.log(JSON.stringify(trace, null, 2))
Benchmarking
Enable per-node execution timing to identify bottlenecks in your plan:
import { run, formatBenchmark } from '@origints/core'
const result = await run(plan, { benchmark: true })
if (result.benchmark) {
console.log(formatBenchmark(result.benchmark))
// Benchmark (245.3ms total)
// ────────────────────────────────────
// #1 source 12.1ms Direct input
// #2 transform 180.4ms Transform core:parseXlsx
// #3 emit 48.2ms Emit: companies, soiInvestments
// companies 32.1ms
// soiInvestments 16.1ms
// #4 mapOut 4.6ms MapOut: lookup
// lookup 4.6ms
//
// By phase:
// source 12.1ms (1 node)
// transform 180.4ms (1 node)
// emit 48.2ms (1 node)
// mapOut 4.6ms (1 node)
}
The benchmark includes:
- Per-node wall-clock timing
- Sub-timings for each extraction within emit nodes
- Sub-timings for each transform within mapOut nodes
- Phase-level aggregation (total time per node kind)
Benchmarking adds zero overhead when disabled — no performance.now() calls unless { benchmark: true } is passed.
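One way to get this "no clock reads unless enabled" property is to branch once on the option and keep the timing code out of the default path. The sketch below is an assumption about the general technique, not the library's implementation; runSteps, Timing and Clock are hypothetical names, and the clock is injectable (defaulting to Date.now rather than performance.now) so the example can count how often it is read.

```typescript
type Timing = { label: string; ms: number }
type Clock = () => number

// Run named steps; only read the clock when benchmarking is enabled.
function runSteps(
  steps: Array<{ label: string; run: () => void }>,
  options: { benchmark?: boolean; clock?: Clock } = {},
): { timings?: Timing[] } {
  const clock = options.clock ?? (() => Date.now())
  if (!options.benchmark) {
    for (const step of steps) step.run() // no clock reads on this path
    return {}
  }
  const timings: Timing[] = []
  for (const step of steps) {
    const start = clock()
    step.run()
    timings.push({ label: step.label, ms: clock() - start })
  }
  return { timings }
}

// A fake clock that counts how often it is read.
let clockReads = 0
const countingClock: Clock = () => ++clockReads

const demoSteps = [
  { label: 'source', run: () => {} },
  { label: 'emit', run: () => {} },
]
const timedOff = runSteps(demoSteps, { clock: countingClock })
const readsWhenOff = clockReads
const timedOn = runSteps(demoSteps, { benchmark: true, clock: countingClock })
console.log(readsWhenOff) // 0
console.log(clockReads) // 4 (two reads per step)
```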
Output transforms
.mapOut() applies structural transformations to the output after extraction. All path-based methods use callback selectors for type-safe path construction with IDE autocomplete.
import { Planner, load, run, sum } from '@origints/core'
const plan = new Planner()
.in(
load({
items: [
{ category: 'A', amount: 100 },
{ category: 'A', amount: 200 },
{ category: 'B', amount: 50 },
],
})
)
.emit((out, $) =>
out.add(
'items',
$.get('items').array(item => ({
kind: 'object',
properties: {
category: item.get('category').string(),
amount: item.get('amount').number(),
},
}))
)
)
.mapOut($ =>
$.groupBy(o => o.items, 'category')
.at(o => o.items)
.each()
.aggregate({
operations: [sum('amount', 'total')],
into: 'inline',
})
)
.compile()
const result = await run(plan)
// result.value = {
// items: {
// A: { items: [...], total: 300 },
// B: { items: [...], total: 50 },
// }
// }
Available transforms: groupBy, indexBy, aggregate, nest, unnest, sort, filter, rename, pick, omit, drop, lookup, joinBy, derive, pivot, apply.
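For readers unfamiliar with the groupBy plus aggregate combination used in the example above, the same reshaping can be written by hand in plain TypeScript. This is a sketch of the resulting data shape only; groupAndSum, Item and Group are hypothetical names and say nothing about how .mapOut() is implemented.

```typescript
type Item = { category: string; amount: number }
type Group = { items: Item[]; total: number }

// Group rows by category and keep a running sum next to each group's rows,
// mirroring groupBy(..., 'category') followed by sum('amount', 'total') inline.
function groupAndSum(items: Item[]): Record<string, Group> {
  const groups: Record<string, Group> = {}
  for (const item of items) {
    const group = (groups[item.category] ??= { items: [], total: 0 })
    group.items.push(item)
    group.total += item.amount
  }
  return groups
}

const grouped = groupAndSum([
  { category: 'A', amount: 100 },
  { category: 'A', amount: 200 },
  { category: 'B', amount: 50 },
])
console.log(grouped['A']?.total, grouped['B']?.total) // 300 50
```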
ForEach extraction
Iterate over runtime values and use each to drive another extraction with variable binding.
import { forEach, variableRef, literal, object } from '@origints/core'
const spec = forEach(
literal(['Alice', 'Bob', 'Charlie']),
'name',
object({
greeting: variableRef('name', { extract: 'string' }),
})
)
const result = executeSpec(spec, {})
// result.value: [{ greeting: 'Alice' }, { greeting: 'Bob' }, { greeting: 'Charlie' }]
Plan pipeline
Chain multiple plans sequentially with cross-plan lineage tracking.
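// Planner and load are used inside the second step; configPlan is a compiled plan defined elsewhere
import { PlanPipeline, Planner, load } from '@origints/core'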
const result = await new PlanPipeline()
.step('config', configPlan)
.step('extract', input => {
const { threshold } = input as { threshold: number }
return new Planner()
.in(load({ minValue: threshold }))
.emit((out, $) => out.add('min', $.get('minValue').number()))
.compile()
})
.run()
if (result.ok) {
console.log(result.value)
// Lineage spans all steps
console.log(result.pipelineLineage.steps.size)
}
Schema derivation
import { JsonSchema } from '@origints/core'
const schema = JsonSchema.output(plan, {
draft: '2020-12',
title: 'Report',
deduplicate: true,
})
Project Status
- Experimental — APIs may change
Non-Goals
- Not a general-purpose ETL framework
- Not optimized for streaming large datasets
- Not a schema definition language
License
MIT
