@skedulo/integration-sdk
v0.8.2
Runtime SDK for Skedulo integration data flows
Overview
Runtime SDK for Skedulo integration data flows. Integration tenant packages (e.g. hubspot-integration) use this SDK to execute field mappings, sync records, filter data, and generate previews.
Installation
This package is intended for use inside Skedulo tenant integration packages.
yarn add @skedulo/integration-sdk
Usage
Server-side (Node.js integration packages):
import type { DataFlow, Adapter, SyncResult } from '@skedulo/integration-sdk';
import {
  BaseAdapter,
  SyncService,
  SyncRunner,
  PreviewService,
  MetadataHandler,
  IdMappingService,
  SyncMode,
  AdapterRole,
  SyncStage,
} from '@skedulo/integration-sdk';
Browser/UI (types and constants only, no Node.js deps):
import type { FilterSpec, FilterOperator, SyncPipelineState } from '@skedulo/integration-sdk/browser';
import { FILTER_OPERATORS, SyncStage } from '@skedulo/integration-sdk/browser';
The /browser entry point re-exports all types and pure constants without pulling in PSS, FME, or any Node.js built-ins. Use it in browser bundles to avoid bundling server-side dependencies.
Writing an Adapter
Extend BaseAdapter rather than implementing Adapter directly:
import { BaseAdapter } from '@skedulo/integration-sdk';
import type { IntegrationContext, DataFlow, UpsertOutcome } from '@skedulo/integration-sdk';
export class MyAdapter extends BaseAdapter {
  // Required
  async upsert(ctx: IntegrationContext, flow: DataFlow, records: Record<string, unknown>[]): Promise<UpsertOutcome[]> { /* ... */ }
  async delete(ctx: IntegrationContext, flow: DataFlow, ids: string[]): Promise<void> { /* ... */ }
  async getObjects(ctx: IntegrationContext) { /* ... */ }
  async getFields(ctx: IntegrationContext, objectName: string) { /* ... */ }
  async fetchSampleRecords(ctx: IntegrationContext, objectName: string, properties: string[], limit: number) { /* ... */ }
  // Optional hooks: override only what you need
  async beforeMap(ctx: IntegrationContext): Promise<void> {
    if (this.isSource()) {
      // flatten raw records, add synthetic fields to ctx.pipeline.data[i].raw
    }
  }
  async afterUpsert(ctx: IntegrationContext): Promise<void> {
    if (this.isTarget()) {
      // write-back: store target IDs back in the source system
      const outcomes = ctx.pipeline?.outcomes ?? [];
    }
  }
  // Optional delta sync: implement one of these to enable per-record change detection
  getRecordTimestamp(raw: Record<string, unknown>): Date | null {
    const ts = raw['updatedAt'];
    if (!ts) return null;
    const d = new Date(String(ts));
    return isNaN(d.getTime()) ? null : d;
  }
}
See docs/ADAPTERS.md for the full method reference and lifecycle hook guide.
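As a rough illustration of how a timestamp hook such as getRecordTimestamp could drive per-record change detection, the sketch below filters raw records against a last-sync watermark. The selectChanged helper, the watermark parameter, and the choice to keep records without a usable timestamp are all assumptions made for this example; the SDK's own delta logic may work differently.

```typescript
type RawRecord = Record<string, unknown>;

// Same logic as the getRecordTimestamp hook above, written as a free function.
function getRecordTimestamp(raw: RawRecord): Date | null {
  const ts = raw['updatedAt'];
  if (!ts) return null;
  const d = new Date(String(ts));
  return isNaN(d.getTime()) ? null : d;
}

// Hypothetical helper: keep only records changed after `since`.
// Records with no usable timestamp are kept, on the assumption that
// re-syncing an unchanged record is safer than skipping a changed one.
function selectChanged(records: RawRecord[], since: Date): RawRecord[] {
  return records.filter((raw) => {
    const ts = getRecordTimestamp(raw);
    return ts === null || ts.getTime() > since.getTime();
  });
}

const since = new Date('2024-01-02T00:00:00Z');
const changed = selectChanged(
  [
    { id: 'a', updatedAt: '2024-01-01T12:00:00Z' }, // older than the watermark: skipped
    { id: 'b', updatedAt: '2024-01-03T08:30:00Z' }, // newer: kept
    { id: 'c', updatedAt: 'not-a-date' },           // unparseable: kept
  ],
  since,
);
console.log(changed.map((r) => r['id'])); // [ 'b', 'c' ]
```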
Dispatcher protocol
Tenant integration functions expose a single POST endpoint that forwards the request into BaseAdapterHandler.dispatch. The action to execute is selected via the sked-integration-action request header; the body is the raw action input, and the response is an ActionResult:
type ActionResult<T> =
  | { ok: true; result: T }
  | {
      ok: false;
      error: { code: string; message: string; details?: unknown; retryable?: boolean };
    };
retryable tells callers (PSS, IS, UI) whether a retry could succeed. It comes from the thrown error's ErrorCategory; see Errors below.
Server wiring
import { BaseAdapterHandler } from '@skedulo/integration-sdk';
import { MyAdapter } from './my-adapter';
const handler = new BaseAdapterHandler({ adapter: new MyAdapter() });
// Wire the SDK dispatcher to a single POST route. The framework call shape
// matches Skedulo's function-utilities runtime (body, headers, method, path,
// skedContext); use whatever your HTTP layer provides.
app.post('/', async (req, res) => {
  const { status, body } = await handler.dispatch(
    req.body,
    req.headers,
    req.method,
    req.path,
    req.skedContext,
  );
  res.status(status).json(body);
});
Client calls
The SDK does not ship a dispatch client — the wire protocol is simple enough that consumers (UI, cross-function calls) write a one-screen helper. Import the wire constants instead of hardcoding strings:
import { ACTION_HEADER, ACTIONS } from '@skedulo/integration-sdk';
// baseUrl and token are assumed to be in scope (supplied by the calling application).
async function dispatch<T = unknown>(action: string, input: unknown): Promise<T> {
  const res = await fetch(`${baseUrl}/`, {
    method: 'POST',
    headers: {
      'content-type': 'application/json',
      authorization: `Bearer ${token}`,
      [ACTION_HEADER]: action,
    },
    body: JSON.stringify(input ?? {}),
  });
  const envelope = await res.json();
  if (!envelope.ok) {
    // Surface code, details, and retryable on the thrown Error
    throw Object.assign(new Error(envelope.error.message), envelope.error);
  }
  return envelope.result;
}
const flow = await dispatch(ACTIONS.dataFlow.activate, { id: 42 });
ACTIONS is a frozen, dot-grouped map of all action names (ACTIONS.dataFlow.activate, ACTIONS.metadata.getObjects, …). ACTION_HEADER is the literal header name (sked-integration-action).
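Because the envelope carries retryable, a caller can layer a retry policy over a helper like the one above. The following is a minimal sketch of one such policy, not part of the SDK: nextDelayMs, dispatchWithRetry, the attempt limit, and the backoff values are hypothetical, and the ActionResult type is a local copy of the shape shown under Dispatcher protocol.

```typescript
// Local copy of the envelope shape from "Dispatcher protocol".
type ActionResult<T> =
  | { ok: true; result: T }
  | { ok: false; error: { code: string; message: string; details?: unknown; retryable?: boolean } };

// Hypothetical policy: exponential backoff, and only when the envelope
// explicitly says a retry could succeed. Returns null to give up.
function nextDelayMs<T>(
  envelope: ActionResult<T>,
  attempt: number, // 1-based number of the attempt that just finished
  maxAttempts = 3,
  baseMs = 200,
): number | null {
  if (envelope.ok) return null;
  if (envelope.error.retryable !== true) return null;
  if (attempt >= maxAttempts) return null;
  return baseMs * Math.pow(2, attempt - 1);
}

// `call` performs one POST and resolves with the parsed envelope.
async function dispatchWithRetry<T>(
  call: () => Promise<ActionResult<T>>,
  maxAttempts = 3,
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    const envelope = await call();
    if (envelope.ok) return envelope.result;
    const delay = nextDelayMs(envelope, attempt, maxAttempts);
    if (delay === null) {
      throw Object.assign(new Error(envelope.error.message), envelope.error);
    }
    await new Promise<void>((resolve) => setTimeout(resolve, delay));
  }
}
```

A wrapper like this is only safe for actions that tolerate being repeated, so a caller would presumably also check whatever idempotency information the action exposes before retrying.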
Authoring an action
Use defineAction to wire the standard middleware stack (metrics → logging → audit → requireAdmin) around the handler:
import { defineAction, ACTIONS, ConflictError } from '@skedulo/integration-sdk';
export const myAction = defineAction({
  name: ACTIONS.scheduledJob.create,
  parseInput: parseCreateInput,
  requireAdmin: true, // 403 ForbiddenError if caller isn't admin
  audit: true, // emits an audit log line on success
  idempotent: false, // surfaced to callers so PSS doesn't blind-retry
  handler: async (input, ctx) => {
    if (await exists(input.name)) {
      throw new ConflictError('JOB_ALREADY_EXISTS', `'${input.name}' already exists`);
    }
    return createJob(ctx, input);
  },
});
Every action receives ctx.requestId (correlation id) and ctx.metrics (counter/histogram sink). Inject a real metrics impl via BaseAdapterHandlerConfig.metrics; the default is a no-op.
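To make the ordering of the middleware stack concrete, here is a generic sketch of wrapping a handler in layers so that the first layer listed is the outermost one. It is not how defineAction is implemented: Handler, Middleware, compose, and layer are hypothetical names, and each layer only records when it is entered and exited.

```typescript
type Handler<I, O> = (input: I) => Promise<O>;
type Middleware<I, O> = (next: Handler<I, O>) => Handler<I, O>;

// Wrap right-to-left so the first middleware in the list runs first (outermost).
function compose<I, O>(middlewares: Middleware<I, O>[], handler: Handler<I, O>): Handler<I, O> {
  return middlewares.reduceRight<Handler<I, O>>((next, mw) => mw(next), handler);
}

const trace: string[] = [];

// Hypothetical layer that records entry and exit around the next handler.
const layer = (name: string): Middleware<string, string> => (next) => async (input) => {
  trace.push(`enter ${name}`);
  try {
    return await next(input);
  } finally {
    trace.push(`exit ${name}`);
  }
};

const wrapped = compose(
  [layer('metrics'), layer('logging'), layer('audit'), layer('requireAdmin')],
  async (input) => {
    trace.push('handler');
    return input.toUpperCase();
  },
);

const pending = wrapped('ping');
pending.then((out) => console.log(out, trace.join(' > ')));
```

In the printed trace, metrics is entered first and exited last, so it also observes failures raised by any inner layer.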
Errors
Throw a typed subclass instead of new ActionError(CODE, ...). The dispatcher maps the error's category to HTTP status and echoes retryable in the envelope.
| Subclass | Category | HTTP | Retryable |
|---------------------------|-----------------------|-----:|:---------:|
| InvalidInputError | invalid_input | 422 | no |
| ForbiddenError | forbidden | 403 | no |
| NotFoundError | not_found | 404 | no |
| ConflictError | conflict | 409 | no |
| PreconditionFailedError | precondition_failed | 412 | no |
| NotImplementedError | not_implemented | 501 | no |
| UpstreamTransientError | upstream_transient | 503 | yes |
| UpstreamPermanentError | upstream_permanent | 424 | no |
| bare ActionError | internal | 500 | no |
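The table above can be read as a lookup from category to HTTP status and retryability. The sketch below encodes it that way and builds the failure envelope from a categorized error. CATEGORY_TABLE, CategorizedError, and toResponse are hypothetical names for illustration; only the category strings, status codes, and retryable values come from the table.

```typescript
type ErrorCategory =
  | 'invalid_input'
  | 'forbidden'
  | 'not_found'
  | 'conflict'
  | 'precondition_failed'
  | 'not_implemented'
  | 'upstream_transient'
  | 'upstream_permanent'
  | 'internal';

// Values copied from the Errors table.
const CATEGORY_TABLE: Record<ErrorCategory, { status: number; retryable: boolean }> = {
  invalid_input: { status: 422, retryable: false },
  forbidden: { status: 403, retryable: false },
  not_found: { status: 404, retryable: false },
  conflict: { status: 409, retryable: false },
  precondition_failed: { status: 412, retryable: false },
  not_implemented: { status: 501, retryable: false },
  upstream_transient: { status: 503, retryable: true },
  upstream_permanent: { status: 424, retryable: false },
  internal: { status: 500, retryable: false },
};

// Hypothetical stand-in for a thrown, categorized error.
interface CategorizedError {
  code: string;
  message: string;
  category: ErrorCategory;
}

// Build the HTTP status and failure envelope for a categorized error.
function toResponse(err: CategorizedError) {
  const { status, retryable } = CATEGORY_TABLE[err.category];
  return {
    status,
    body: { ok: false as const, error: { code: err.code, message: err.message, retryable } },
  };
}

const conflict = toResponse({
  code: 'JOB_ALREADY_EXISTS',
  message: "'Nightly CRM' already exists",
  category: 'conflict',
});
console.log(conflict.status, conflict.body.error.retryable); // 409 false
```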
Sync model
There are two ways a data flow runs:
- Realtime (per-flow triggered action): the flow's triggerConfig carries sync_config: { strategy: 'realtime' }. Activation provisions one PSS triggered action; record changes dispatch event.on-skedulo (Skedulo → vendor) or event.on-vendor (vendor → Skedulo) immediately.
- Batch (integration-level scheduled jobs): the flow has no sync_config (triggerConfig is null). Operators group batch flows into ordered scheduled jobs at the integration level. Each job provisions one PSS scheduled webhook that fires sync.run-job, which runs the job's flow_ids sequentially (continue-on-error).
// Realtime flow: triggerConfig
{
  "sync_config": {
    "strategy": "realtime",
    "trigger_name": "sked-int-…", // set by the SDK on activate
    "trigger_type": "triggered_action"
  }
}
// Scheduled jobs: tenant_integration.config (an IntegrationConfig)
{
  "scheduled_jobs": [
    {
      "webhook_name": "sked-int-hubspot-job-nightly-crm", // identity + ES artifact name
      "name": "Nightly CRM",
      "interval_value": 5,
      "interval_unit": "minutes", // "minutes" (1–60) | "hours" (1–24)
      "flow_ids": [12, 8, 3], // ordered; unique across jobs (J5)
      "enabled": true
    }
  ]
}
Narrow a raw triggerConfig with asSyncConfig(triggerConfig) → SyncConfig | null (returns non-null only for realtime). Narrow a raw config with asIntegrationConfig(value) → IntegrationConfig (always returns { scheduled_jobs: [] } for null/garbage; drops malformed entries).
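The narrowing behavior described above can be approximated as follows. This is a sketch under assumptions, not the SDK's code: narrowSyncConfig and narrowIntegrationConfig are stand-ins for asSyncConfig and asIntegrationConfig, the interfaces are inferred from the JSON examples, and treating an out-of-range interval as malformed is a guess.

```typescript
interface SyncConfig { strategy: 'realtime'; trigger_name?: string; trigger_type?: string }
interface ScheduledJob {
  webhook_name: string;
  name: string;
  interval_value: number;
  interval_unit: 'minutes' | 'hours';
  flow_ids: number[];
  enabled: boolean;
}
interface IntegrationConfig { scheduled_jobs: ScheduledJob[] }

function isObject(v: unknown): v is Record<string, unknown> {
  return typeof v === 'object' && v !== null && !Array.isArray(v);
}

// Non-null only when sync_config.strategy is 'realtime'.
function narrowSyncConfig(triggerConfig: unknown): SyncConfig | null {
  if (!isObject(triggerConfig)) return null;
  const sc = triggerConfig['sync_config'];
  if (!isObject(sc) || sc['strategy'] !== 'realtime') return null;
  const out: SyncConfig = { strategy: 'realtime' };
  const name = sc['trigger_name'];
  const type = sc['trigger_type'];
  if (typeof name === 'string') out.trigger_name = name;
  if (typeof type === 'string') out.trigger_type = type;
  return out;
}

// Assumed definition of a well-formed job: correct field types plus the
// documented interval ranges (minutes 1-60, hours 1-24).
function isJob(v: unknown): v is ScheduledJob {
  if (!isObject(v)) return false;
  const unit = v['interval_unit'];
  const value = v['interval_value'];
  const ids = v['flow_ids'];
  const max = unit === 'minutes' ? 60 : unit === 'hours' ? 24 : 0;
  return (
    typeof v['webhook_name'] === 'string' &&
    typeof v['name'] === 'string' &&
    typeof value === 'number' && Number.isInteger(value) && value >= 1 && value <= max &&
    Array.isArray(ids) && ids.every((id) => typeof id === 'number') &&
    typeof v['enabled'] === 'boolean'
  );
}

// Never returns null: garbage yields an empty job list, bad entries are dropped.
function narrowIntegrationConfig(value: unknown): IntegrationConfig {
  const raw = isObject(value) ? value['scheduled_jobs'] : undefined;
  const list: unknown[] = Array.isArray(raw) ? raw : [];
  return { scheduled_jobs: list.filter(isJob) };
}
```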
Per-flow lifecycle (realtime only)
The SDK provisions the triggered action on dataFlow.activate and tears it down on dataFlow.deactivate, two-phase fail-closed, via the PSS ArtifactClient (ArtifactType.TRIGGERED_ACTION). Batch flows are status-only — no artifact, no provisioning. Each realtime artifact is named deterministically:
sked-int-<integrationType>-<sourceObject>-to-<targetObject>-flow<id>
Scheduled-job lifecycle
scheduled-job.create / .update / .delete (admin-gated) write the IntegrationConfig via the SDK's IntegrationConfigClient (config-first, two-phase fail-closed), then reconcile a PSS webhook named:
sked-int-<integrationType>-job-<slug(name)>
The webhook posts sync.run-job with the sked-scheduled-job-name header carrying the webhook_name. The SDK looks up the job in config and runs each flow in order; one flow failing does not stop the others.
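The continue-on-error behavior of a job run can be pictured as a sequential loop that records each flow's outcome instead of aborting. This is only a sketch of the pattern: runJob, FlowOutcome, and the runFlow callback are hypothetical, and the SDK's actual result shape is not shown in this README.

```typescript
interface FlowOutcome {
  flowId: number;
  ok: boolean;
  error?: string;
}

// Run flows strictly in order; a failure is recorded and the loop moves on.
async function runJob(
  flowIds: number[],
  runFlow: (flowId: number) => Promise<void>,
): Promise<FlowOutcome[]> {
  const outcomes: FlowOutcome[] = [];
  for (const flowId of flowIds) {
    try {
      await runFlow(flowId);
      outcomes.push({ flowId, ok: true });
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      outcomes.push({ flowId, ok: false, error: message });
    }
  }
  return outcomes;
}

// Flow 8 fails, yet flow 3 still runs afterwards.
const jobRun = runJob([12, 8, 3], async (flowId) => {
  if (flowId === 8) throw new Error('vendor timeout');
});
jobRun.then((outcomes) => console.log(outcomes));
```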
Artifact names act as idempotency keys and are capped (the SDK throws rather than truncating silently): 255 chars for per-flow realtime artifacts (system-derived) and 200 chars for scheduled-job webhooks (derived from the user-supplied job name, so a long display name is rejected up-front). The callback URL baked into each artifact is {{ SKEDULO_API_URL }}/function/<type>/<type> — the platform substitutes the tenant API host when the artifact fires.
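The two naming patterns and their caps could be expressed along these lines. The slug rule here is an assumption chosen to agree with the example above ("Nightly CRM" becomes nightly-crm); the SDK's real slug function and error type may differ, and every function name in the block is hypothetical.

```typescript
const REALTIME_NAME_MAX = 255; // per-flow realtime artifacts
const JOB_NAME_MAX = 200; // scheduled-job webhooks

// Assumed slug rule: lowercase, runs of non-alphanumerics become '-', edges trimmed.
function slug(name: string): string {
  return name.toLowerCase().replace(/[^a-z0-9]+/g, '-').replace(/^-+|-+$/g, '');
}

// Throw instead of truncating, so two long names can never collide silently.
function capped(name: string, max: number): string {
  if (name.length > max) {
    throw new Error(`Artifact name is ${name.length} chars; the limit is ${max}`);
  }
  return name;
}

function realtimeArtifactName(
  integrationType: string,
  sourceObject: string,
  targetObject: string,
  flowId: number,
): string {
  return capped(
    `sked-int-${integrationType}-${sourceObject}-to-${targetObject}-flow${flowId}`,
    REALTIME_NAME_MAX,
  );
}

function scheduledJobWebhookName(integrationType: string, jobName: string): string {
  return capped(`sked-int-${integrationType}-job-${slug(jobName)}`, JOB_NAME_MAX);
}

console.log(scheduledJobWebhookName('hubspot', 'Nightly CRM')); // sked-int-hubspot-job-nightly-crm
```

Because each name is a pure function of its inputs, provisioning the same flow or job twice targets the same artifact, which is what lets the name double as an idempotency key.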
See docs/DATA-FLOW-LIFECYCLE.md for the full activate/deactivate flow and the scheduled-job action surface.
Development
yarn build # Compile TypeScript → dist/
yarn typecheck # Type-check src/ and test/ without emitting output
yarn test # Run all tests
yarn test:coverage # Run tests with coverage
yarn format # Format all src/ and test/ files with Prettier
Documentation
| Doc | What it covers |
|---|---|
| docs/ADAPTERS.md | Adapter roles, lifecycle hooks, BaseAdapter, implementation guide |
| docs/DATA-FLOW-LIFECYCLE.md | activate/deactivate, sync_config shape, artifact provisioning |
| docs/SYNC-PIPELINE.md | Batch and event-driven sync, pipeline state, error capture |
| docs/ARCHITECTURE.md | Module map, design decisions, key concepts |
| docs/FIELD-MAPPINGS.md | Transform types, MappingConfig format, validation |
| docs/FILTERS.md | FilterSpec, operators, two-layer filter architecture |
| docs/TEMPLATES.md | Pre-built DataFlowTemplate and SampleAPI definitions |
| docs/PREVIEWS.md | PreviewService — dry-run mapping against sample records |
