@seqholdings/lattice
v0.1.0
TypeScript SDK for Lattice — data lineage + process orchestration
Install
pnpm add @seqholdings/lattice
Quick start
import { Lattice } from '@seqholdings/lattice'
const lattice = new Lattice({
  apiKey: process.env.LATTICE_API_KEY,
  baseUrl: 'https://atlas.seqholdings.com',
})
// List lineages.
const lineages = await lattice.lineages.list()
// Work with a specific lineage.
const graph = lattice.lineage(lineages[0].id)
// Search nodes/edges.
const hits = await graph.search.search('daily sync', { mode: 'smart' })
// Ask a question. Returns a stream with incremental answer + retrieved hits.
const stream = graph.search.ask('what does the daily sync process do?')
for await (const event of stream) {
  if (event.type === 'chunk') process.stdout.write(event.data.delta)
  if (event.type === 'hits') console.log('related nodes:', event.data.hits.length)
}
const { answer, hits: retrieved } = await stream.done()
Processes
// Create-or-get by (lineageId, name).
// Note: a top-level `const process` shadows Node's global `process`; rename it
// if this code shares a scope with process.env or process.stdout.
const process = await lattice.processes.upsert({
  lineageId: graph.id,
  name: 'daily-sync',
  description: 'Daily data sync from Abrigo',
})
// Start a run.
const { run } = await lattice.processes.startRun(process.id, {
  input: { batch_date: '2026-04-21' },
})
// Manually run a node inside the run.
const result = await lattice.processes.runNode(process.id, 'validate-data', {
  runId: run.id,
})
// Or push completion state from an external system.
await lattice.processes.completeNode(process.id, 'external-task', {
  runId: run.id,
  toState: 'completed',
  output: { rows: 1_523 },
})
// Advance the state machine (idempotent scheduler tick).
const advanced = await lattice.processes.advanceRun(process.id, run.id)
// Cancel.
await lattice.processes.cancelRun(process.id, run.id)
Process crons
A ProcessCron is a (cadence, effect) pair attached to a process. A cadence (e.g. interval) decides when to fire; an effect (e.g. start_run, code) decides what happens. Both kinds come from pluggable server-side registries: adding a new kind takes one file plus one registry line on the backend, and it then becomes available here automatically.
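To make the registry idea concrete, here is a minimal sketch of how a pair of kind-keyed registries could resolve a cron into a fire. Everything in it is hypothetical: the backend registries are not part of this SDK, and the Cadence and Effect signatures, the planFire helper, and the way the interval bucket key is derived are assumptions for illustration only.

```typescript
type Config = Record<string, unknown>

// Assumption: a cadence maps "now" to a bucket key; a new key means the cron is due.
type Cadence = (config: Config, nowMs: number) => string
// Assumption: an effect turns one bucket key into a description of what to do.
type Effect = (config: Config, bucketKey: string) => { action: string; key: string }

const cadences = new Map<string, Cadence>()
const effects = new Map<string, Effect>()

// "One registry line" per kind: register a handler under its kind name.
cadences.set('interval', (config, nowMs) => {
  const everySeconds = Number(config['every_seconds'])
  if (!Number.isFinite(everySeconds) || everySeconds <= 0) {
    throw new Error('interval cadence needs a positive every_seconds')
  }
  // All instants inside the same interval window share one bucket key.
  return String(Math.floor(nowMs / (everySeconds * 1000)))
})

effects.set('start_run', (_config, bucketKey) => ({ action: 'start_run', key: bucketKey }))

interface CronSpec {
  cadence: { kind: string; config: Config }
  effect: { kind: string; config: Config }
}

// Look up both handlers by kind and combine them into a single planned fire.
function planFire(cron: CronSpec, nowMs: number): { action: string; key: string } {
  const cadence = cadences.get(cron.cadence.kind)
  const effect = effects.get(cron.effect.kind)
  if (!cadence) throw new Error(`unknown cadence kind: ${cron.cadence.kind}`)
  if (!effect) throw new Error(`unknown effect kind: ${cron.effect.kind}`)
  return effect(cron.effect.config, cadence(cron.cadence.config, nowMs))
}
```

In this sketch, supporting a new kind means adding one more cadences.set or effects.set call; planFire itself does not change.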
// Fire a run every 5 minutes.
await lattice.processes.crons.create(process.id, {
  label: 'Five-minute reconciliation',
  cadence: { kind: 'interval', config: { every_seconds: 300 } },
  effect: { kind: 'start_run', config: {} },
})
// Sandboxed JS that polls an external source and fans out runs.
await lattice.processes.crons.create(process.id, {
  label: 'Abrigo new-loan poll',
  cadence: { kind: 'interval', config: { every_seconds: 60 } },
  effect: {
    kind: 'code',
    config: {
      code: `
        import { Lattice } from '@seqholdings/lattice'
        export async function main(input, ctx) {
          const lattice = new Lattice({
            apiKey: ctx.env.LATTICE_API_KEY,
            baseUrl: ctx.env.LATTICE_BASE_URL,
          })
          const { run } = await lattice.processes.startRun(input.processId, {
            runId: \`cron:\${input.cronId}:\${input.bucketKey}\`,
          })
          return { fire: true, key: input.bucketKey, runId: run.id }
        }
      `,
    },
  },
})
// Disable, then delete, a cron. Here `cron` is a previously created ProcessCron.
await lattice.processes.crons.update(process.id, cron.id, { enabled: false })
await lattice.processes.crons.delete(process.id, cron.id)
Idempotency is structural: every fire is a row in meta.process_cron_fires keyed on (cron_id, key). When an effect returns a single result without a key, the cadence's bucket key is used; when it returns an array to fan out, every item must carry an explicit key.
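The keying rule can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the backend's implementation: FireLog stands in for a table with a unique index on (cron_id, key), and the FireResult shape and applyResult helper are hypothetical names introduced only for this example.

```typescript
// Hypothetical shape of what an effect reports for one potential fire.
interface FireResult {
  fire: boolean
  key?: string
}

// In-memory stand-in for a table with a unique index on (cron_id, key).
class FireLog {
  private readonly seen = new Set<string>()

  // Returns true if the fire was recorded, false if (cronId, key) already exists.
  record(cronId: string, key: string): boolean {
    const rowKey = JSON.stringify([cronId, key])
    if (this.seen.has(rowKey)) return false
    this.seen.add(rowKey)
    return true
  }
}

// Applies one effect result and returns the keys that were newly recorded.
function applyResult(
  log: FireLog,
  cronId: string,
  bucketKey: string,
  result: FireResult | FireResult[],
): string[] {
  const accepted: string[] = []
  if (Array.isArray(result)) {
    // Fan-out: every item must name its own key, so validate before recording anything.
    for (const item of result) {
      if (item.key === undefined) {
        throw new Error('array fan-out requires an explicit key on every item')
      }
    }
    for (const item of result) {
      if (item.fire && item.key !== undefined && log.record(cronId, item.key)) {
        accepted.push(item.key)
      }
    }
    return accepted
  }
  if (result.fire) {
    // Single return: fall back to the cadence's bucket key when no key is given.
    const key = result.key ?? bucketKey
    if (log.record(cronId, key)) accepted.push(key)
  }
  return accepted
}
```

Replaying the same result for the same bucket records nothing the second time, which is the property that makes a repeated scheduler tick harmless in this model.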
Nodes (lineage)
const nodes = await graph.nodes.list({ platformId: 'databricks', limit: 50 })
await graph.nodes.update('my-node', { description: 'Primary customers table' })
// Invoke a notebook/job node and stream progress.
const initial = await graph.nodes.run('my-notebook', { date: '2026-04-21' })
if (initial.run_id) {
  const stream = graph.nodes.stream('my-notebook', initial.run_id)
  stream.on('event', (ev) => console.log(ev.type, ev.data))
  const final = await stream.done()
  console.log('final status:', final?.status)
}
Live events
const controller = new AbortController()
const stream = graph.events.stream({ signal: controller.signal })
for await (const event of stream) {
  if (event.event_type === 'monitor_opened') console.warn('alert:', event.detail)
}
Errors
Every error thrown by the SDK extends LatticeError. Use the static .isInstance(err) check instead of instanceof for cross-realm safety.
import { LatticeError, LatticeNotFoundError } from '@seqholdings/lattice'
try {
  await lattice.processes.get('missing')
} catch (err) {
  if (LatticeNotFoundError.isInstance(err)) console.log('Not found:', err.requestId)
  else if (LatticeError.isInstance(err)) console.error(err.statusCode, err.detail)
  else throw err
}
Status-mapped subclasses:
| Status | Class |
| --- | --- |
| 400 | LatticeBadRequestError |
| 401 | LatticeAuthenticationError |
| 403 | LatticePermissionError |
| 404 | LatticeNotFoundError |
| 409 | LatticeConflictError |
| 429 | LatticeRateLimitError |
| 5xx | LatticeServerError |
| network | LatticeConnectionError |
| timeout | LatticeTimeoutError |
| aborted | LatticeAbortError |
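For readers wondering how an isInstance check can stay reliable where instanceof does not, one common technique is to brand instances with symbols from the global registry. The sketch below shows that technique together with a status-to-class mapping in the spirit of the table above. It is not the SDK's source: the Sketch* classes, the brand naming scheme, and errorForStatus are all hypothetical, and the SDK may implement isInstance differently.

```typescript
// Symbol.for() reads the global symbol registry, so separate copies of this
// module (or separate realms) resolve the same description to the same symbol.
function brandFor(className: string): symbol {
  return Symbol.for(`sketch.lattice.error.${className}`)
}

class SketchError extends Error {
  readonly statusCode: number | undefined

  constructor(message: string, statusCode?: number) {
    super(message)
    this.name = new.target.name
    this.statusCode = statusCode
    // Stamp one brand per class in the prototype chain, so a subclass
    // instance also passes the base-class check.
    for (
      let proto = Object.getPrototypeOf(this);
      proto !== null && proto !== Error.prototype;
      proto = Object.getPrototypeOf(proto)
    ) {
      Object.defineProperty(this, brandFor(proto.constructor.name), { value: true })
    }
  }

  // Checks for the brand of the class it is called on, not the prototype chain.
  static isInstance(this: { name: string }, value: unknown): boolean {
    return (
      typeof value === 'object' &&
      value !== null &&
      Reflect.get(value, brandFor(this.name)) === true
    )
  }
}

class SketchNotFoundError extends SketchError {}
class SketchServerError extends SketchError {}

// Maps an HTTP status to the most specific sketch class (only two shown).
function errorForStatus(status: number, message: string): SketchError {
  if (status === 404) return new SketchNotFoundError(message, status)
  if (status >= 500 && status <= 599) return new SketchServerError(message, status)
  return new SketchError(message, status)
}
```

One trade-off of keying brands on class names is that a minifier that renames classes would change the brand, so a production version would more likely hard-code a stable string per class.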
Streaming
LatticeStream is an async iterable that also exposes .on('event' | 'finish' | 'error') callbacks and a .done() promise for the reduced final result.
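One way to picture the "reduced final result" is as a fold over the events the stream emits. The sketch below applies that idea to the chunk and hits events from the Quick start. The event shapes are copied from that example, while the Hit placeholder type and the reduceAskEvents helper are hypothetical; the SDK's own reduction may differ.

```typescript
// Placeholder for a retrieved node/edge; the real hit type is not shown in this README.
interface Hit {
  id: string
}

// Event shapes as used in the Quick start loop.
type AskEvent =
  | { type: 'chunk'; data: { delta: string } }
  | { type: 'hits'; data: { hits: Hit[] } }

interface AskResult {
  answer: string
  hits: Hit[]
}

// Folds a sequence of events into the final { answer, hits } value.
function reduceAskEvents(events: Iterable<AskEvent>): AskResult {
  let answer = ''
  let hits: Hit[] = []
  for (const event of events) {
    if (event.type === 'chunk') {
      // Each chunk carries only the newly generated text.
      answer += event.data.delta
    } else {
      // Assumption: a later hits event replaces earlier ones.
      hits = event.data.hits
    }
  }
  return { answer, hits }
}
```

Under this model, iterating with for await and awaiting .done() are two views of the same data: the loop sees each event as it arrives, and .done() resolves to the folded value once the stream ends.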
const stream = graph.nodes.stream(nodeId, runId)
stream.on('event', (e) => console.log(e.type))
stream.on('error', (err) => console.error(err))
for await (const event of stream) { /* ... */ }
const final = await stream.done()
Configuration
new Lattice({
  apiKey: async () => await getFreshToken(), // sync or async provider
  baseUrl: 'https://atlas.seqholdings.com',
  maxRetries: 2, // default
  timeout: 60_000, // default
  fetch: customFetch, // inject your own fetch (testing, proxying)
  defaultHeaders: { 'X-Correlation-Id': 'abc' },
})
The client automatically retries transient failures (HTTP 408, 409, 429, and 5xx responses, plus network errors) with exponential backoff and jitter, honouring Retry-After when present. Aborted requests are never retried.
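The retry policy described above can be sketched as two small functions: one that decides whether a status is retryable and one that picks the delay. This is an illustration, not the client's actual code: the function names, the 500 ms base, the 8 s cap, and the "full jitter" strategy are assumptions, since the README only states that backoff is exponential with jitter.

```typescript
const RETRYABLE_STATUSES = new Set([408, 409, 429])

// 408/409/429 and any 5xx are treated as transient.
function isRetryableStatus(status: number): boolean {
  return RETRYABLE_STATUSES.has(status) || (status >= 500 && status <= 599)
}

// Returns how long to wait before retry number `attempt` (0-based).
// `retryAfter` is the raw Retry-After header value, or null when absent.
// `random` is injectable so the jitter can be made deterministic in tests.
function retryDelayMs(
  attempt: number,
  retryAfter: string | null,
  random: () => number = Math.random,
): number {
  if (retryAfter !== null && retryAfter.trim() !== '') {
    // Retry-After may be a number of seconds...
    const seconds = Number(retryAfter)
    if (Number.isFinite(seconds) && seconds >= 0) return seconds * 1000
    // ...or an HTTP date.
    const dateMs = Date.parse(retryAfter)
    if (!Number.isNaN(dateMs)) return Math.max(0, dateMs - Date.now())
  }
  const baseMs = 500 // assumed base delay
  const capMs = 8_000 // assumed upper bound
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt)
  // Full jitter: pick uniformly in [0, ceiling) to spread out concurrent retries.
  return Math.floor(random() * ceiling)
}
```

With maxRetries: 2, a caller following this sketch would compute delays for attempts 0 and 1 and give up after the second retry fails.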
