# @unchainedshop/connector-sdk

v5.0.3
SDK for building ETL connectors that sync data into the Unchained Commerce engine via its Bulk Import API.
The SDK manages the full Extract-Transform-Load lifecycle: fetching remote data into a local MongoDB staging database, deduplicating events against previously submitted hashes, chunking and streaming uploads to the Unchained Bulk Import endpoint, and tracking each run in a journal for incremental (differential) syncs.
## Installation
```sh
npm install @unchainedshop/connector-sdk mongodb
```

`mongodb` is a peer dependency (`^7.0.0`).
## Quick Start
```ts
import { Connector } from "@unchainedshop/connector-sdk";
import type { BulkImportEvent } from "@unchainedshop/connector-sdk";

const connector = new Connector({
  unchainedEndpoint: process.env.UNCHAINED_ENDPOINT,
  unchainedSecret: process.env.UNCHAINED_SECRET,
});

try {
  await connector.init();

  // 1. Extract — fetch remote data into local MongoDB collections
  await connector.extract(fetchProducts, "products");
  await connector.extract(fetchAssortments, "assortments", { replace: true });

  // 2. Transform — build events from extracted data
  const events: BulkImportEvent[] = buildEventsFromDB(connector.db);

  // 3. Prepare — deduplicate updates, emit deletes (streamed via async generator)
  const prepared = connector.prepareEvents(events, { emitDeletes: "PRODUCT" });

  // 4. Load — submit to Unchained in 10K-event chunks
  const ids = await connector.load(prepared);
  await connector.reportSuccess(ids);
} catch {
  await connector.reportFailure("LOAD");
} finally {
  await connector.dispose();
}
```

## Environment Variables
| Variable | Description | Required |
| --- | --- | --- |
| UNCHAINED_ENDPOINT | Unchained engine base URL (e.g. https://engine.example.com) | Yes (or pass via ConnectorOptions) |
| UNCHAINED_SECRET | API secret used for Bearer admin:<secret> authentication | Yes (or pass via ConnectorOptions) |
| MONGO_URL | MongoDB connection string for the local staging database | Yes (or pass via ConnectorOptions) |
| UNCHAINED_WORKER_ID | Worker identifier reported to the work queue (default: "connector") | No |
| UNCHAINED_CONNECTOR_TIMEOUT | Timeout in minutes for runFromExternalWork (default: 30) | No |
| NODE_ENV | Set to "development" to auto-start an in-memory MongoDB via mongodb-memory-server | No |
| COCKPIT_ENDPOINT | Cockpit CMS endpoint (for remotes.cockpit) | No |
| COCKPIT_API_KEY | Cockpit CMS API key | No |
## API

### Connector

The main class that manages the full ETL lifecycle. All accessors (`db`, `unchained`, `journalEntry`) throw if `init()` has not been called.

#### Constructor
```ts
new Connector(options?: ConnectorOptions)
```

```ts
interface ConnectorOptions {
  mongoUrl?: string;          // Falls back to MONGO_URL env
  unchainedEndpoint?: string; // Falls back to UNCHAINED_ENDPOINT env
  unchainedSecret?: string;   // Falls back to UNCHAINED_SECRET env
  customHeaders?: Record<string, string>; // Extra headers on all Unchained API requests
  reset?: boolean;            // Drop journal + submitted events on init
}
```

All options fall back to their corresponding environment variables when omitted.
#### `Connector.fromExisting(options)`

Static factory that creates a Connector from already-initialized MongoDB and Unchained clients. The connector will not close the MongoDB connection on `dispose()` since it does not own it.

```ts
const connector = await Connector.fromExisting({
  mongo: existingMongoClient,
  unchainedAPI: existingUnchainedAPI,
  reset: false,
});
```

#### `connector.init(): Promise<this>`
Connects to MongoDB, initializes the Unchained API client, creates database indexes, and starts a new journal entry. Returns this for chaining:
```ts
const connector = await new Connector().init();
```

#### `connector.dispose(): Promise<void>`
Closes the MongoDB connection (if the connector owns it) and releases all internal references.
#### `connector.db: Db`
The MongoDB Db instance for the staging database. Use this to query extracted data when building transform logic.
#### `connector.unchained: UnchainedAPI`
The Unchained API client. See UnchainedAPI for the full interface.
#### `connector.journalEntry: WithId<JournalEntry>`

The current journal entry document for this run. Contains `started`, `since` (start of the sync window for differential syncs), `differential` (whether a previous successful run exists), and `status`.
### Extract

#### `connector.extract(fetchFn, collectionName, options?): Promise<number>`
Calls fetchFn to retrieve an array of documents and stores them in the named MongoDB collection. Returns the number of inserted documents.
```ts
// Simple insertion
await connector.extract(
  () => fetch("https://api.example.com/products").then((r) => r.json()),
  "products",
);

// Upsert by _id (for incremental updates)
await connector.extract(fetchProducts, "products", { replace: true });

// Delete matching docs before inserting
await connector.extract(fetchProducts, "products", {
  deleteFilter: { category: "old" },
});
```

| Option | Type | Description |
| --- | --- | --- |
| replace | boolean | Upsert documents by _id instead of plain insertMany. Use when re-extracting data that may already exist. |
| deleteFilter | Record<string, unknown> | Delete matching documents from the collection before inserting the new data. |
| loadFromArchive | boolean | If true and archivePath is set, attempt to load data from a gzip archive instead of calling fetchFn. Falls back to fetchFn if no archive exists. |
| storeInArchive | boolean | If true and archivePath is set, save fetched data to a gzip archive after extraction. |
| archivePath | string | Directory path for archive files (default: "./offline-archives"). |
| dateFields | string[] | Field names to deserialize as Date objects when loading from archive (JSON serializes dates as strings). |
The `fetchFn` receives a `RequestInit` object and must return `Promise<P[]>` where `P extends BulkImportPayload`.
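To illustrate this contract, here is a hypothetical fetchFn factory; `makeFetchJSON` and the injected `fetchImpl` are not SDK exports, they simply make the sketch self-contained and testable:

```ts
// Hypothetical payload shape for illustration
interface ProductPayload {
  _id: string;
  [key: string]: unknown;
}

// A fetchFn factory: accepts the RequestInit that extract() passes in and
// resolves to an array of payloads. Injecting the fetch implementation
// keeps the function unit-testable.
const makeFetchJSON =
  (fetchImpl: typeof fetch, url: string) =>
  async (init?: RequestInit): Promise<ProductPayload[]> => {
    const res = await fetchImpl(url, init);
    if (!res.ok) throw new Error(`HTTP ${res.status} from ${url}`);
    return (await res.json()) as ProductPayload[];
  };
```

The result can be passed straight to `extract`, e.g. `connector.extract(makeFetchJSON(fetch, url), "products")`.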
### Transform

#### `connector.prepareEvents(newEvents, options?): AsyncGenerator<HashedBulkImportEvent>`
An async generator that deduplicates update events against previously submitted hashes and optionally emits REMOVE operations for entities that are no longer present in the source data. Because it yields events one at a time, no intermediate arrays need to be allocated in memory.
```ts
const prepared = connector.prepareEvents(events, {
  emitDeletes: "PRODUCT",
  excludeKeys: (key) => key === "updatedAt",
});
```

| Option | Type | Description |
| --- | --- | --- |
| excludeKeys | (key: string) => boolean | Predicate that returns true for keys to strip before hashing. Use this to ignore volatile fields that change every sync but don't represent meaningful updates. |
| emitDeletes | boolean \| string | Emit REMOVE operations for entities that were previously submitted but are missing from newEvents. Pass a string (e.g. "PRODUCT") to scope deletes to one entity type, or true for all entity types. |
Default excluded keys: `Authorization`, `updated`, `created`, `updatedAt`, `createdAt`, `modified`, `modifiedAt`, `published`.
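Whether a custom `excludeKeys` predicate replaces or augments these defaults is not specified here; if you rely on both, a combined predicate makes the behavior explicit. This is a sketch — `withDefaultExclusions` is not an SDK export:

```ts
// The documented default exclusions, as a lookup set
const DEFAULT_EXCLUDED = new Set([
  "Authorization", "updated", "created", "updatedAt",
  "createdAt", "modified", "modifiedAt", "published",
]);

// Build a predicate that strips a key if it is a default exclusion
// OR matches the caller's custom rule.
const withDefaultExclusions =
  (custom: (key: string) => boolean) =>
  (key: string): boolean =>
    DEFAULT_EXCLUDED.has(key) || custom(key);
```

Usage: `connector.prepareEvents(events, { excludeKeys: withDefaultExclusions((k) => k === "syncedAt") })`.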
How deduplication works:

- Queries the `unchained_submitted_events` collection for the last meaningful (non-`REMOVE`) event per `entity:payloadId` pair
- Hashes each incoming event's payload (with excluded keys stripped) using `object-code`
- Skips `UPDATE` events whose hash matches the previously submitted hash
- If `emitDeletes` is enabled, queries all previously submitted entity IDs and yields `REMOVE` events for any that are absent from the new event set
The input `newEvents` can be a plain array or an `AsyncIterable<BulkImportEvent>`.
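The skip logic can be illustrated with a simplified in-memory version. Note the assumptions: the real SDK hashes with `object-code` and reads previous hashes from MongoDB, while this sketch substitutes a sorted-key JSON string and a `Map`:

```ts
interface Event {
  entity: string;
  operation: "CREATE" | "UPDATE" | "REMOVE";
  payload: { _id: string; [key: string]: unknown };
}

// Stand-in for the object-code hash: stringify with sorted keys,
// dropping excluded fields first so volatile keys don't affect the result.
const hashPayload = (
  payload: Record<string, unknown>,
  exclude: (k: string) => boolean,
) =>
  JSON.stringify(
    Object.keys(payload)
      .filter((k) => !exclude(k))
      .sort()
      .map((k) => [k, payload[k]]),
  );

// Yield only events whose payload hash differs from the last submitted one.
async function* dedupe(
  events: Iterable<Event>,
  previousHashes: Map<string, string>, // key: `${entity}:${payload._id}`
  exclude: (k: string) => boolean = (k) => k === "updatedAt",
) {
  for (const event of events) {
    const hash = hashPayload(event.payload, exclude);
    const key = `${event.entity}:${event.payload._id}`;
    // An UPDATE with an unchanged hash carries no new information — skip it.
    if (event.operation === "UPDATE" && previousHashes.get(key) === hash) continue;
    yield { ...event, hash };
  }
}
```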
### Load

#### `connector.load(events, submitOptions?): Promise<ObjectId[]>`
Submits events to the Unchained Bulk Import API and records them in the `unchained_submitted_events` collection for future deduplication. Returns the MongoDB `ObjectId`s of the submitted event records.
Events are processed in chunks of 10,000. Each chunk is serialized to a temporary file and streamed to the API (unless `disableStream` is set). After all chunks are submitted, duplicate submitted events are garbage-collected.
Accepts a plain array or an `AsyncIterable` (the generator returned by `prepareEvents`).
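The chunking behavior is easy to picture as a small async generator (illustrative only; `chunk` is not an SDK export):

```ts
// Batch any (async) iterable into arrays of at most `size` items,
// mirroring the 10,000-event chunks that load() submits.
async function* chunk<T>(
  source: Iterable<T> | AsyncIterable<T>,
  size = 10_000,
): AsyncGenerator<T[]> {
  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  // Flush the final, possibly smaller batch
  if (batch.length > 0) yield batch;
}
```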
```ts
const ids = await connector.load(prepared, {
  shouldUpsert: true,
  skipCacheInvalidation: true,
});
```

```ts
interface SubmitOptions {
  shouldUpsert?: boolean;          // CREATE upserts if an entity with the same _id already exists
  shouldUpsertOnUpdate?: boolean;  // UPDATE creates the entity if no entity with _id exists
  skipCacheInvalidation?: boolean; // Skip cache invalidation after import
  disableStream?: boolean;         // Send JSON body instead of streaming from a temp file
}
```

### Journal & Reporting
The journal tracks each ETL run. On `init()`, a new journal entry is created with status `INITIAL`. The `since` field is set to the `started` timestamp of the last successful (`COMPLETE`) run, enabling differential/incremental syncs.
#### `connector.reportSuccess(eventIds?): Promise<void>`

Records a `COMPLETE` status on the current journal entry. Optionally pass the `ObjectId[]` returned by `load()` to store them on the journal entry.
```ts
const ids = await connector.load(prepared);
await connector.reportSuccess(ids);
```

#### `connector.reportFailure(stage?): Promise<void>`

Records a failure status. Pass `"EXTRACT"`, `"TRANSFORM"`, or `"LOAD"` to indicate which stage failed (defaults to `"LOAD"`).
```ts
try {
  await connector.extract(fetchFn, "products");
} catch {
  await connector.reportFailure("EXTRACT");
}
```

```ts
enum CompletionStatus {
  INITIAL = "INITIAL",
  COMPLETE = "COMPLETE",
  FAILED_EXTRACT = "FAILED_EXTRACT",
  FAILED_TRANSFORM = "FAILED_TRANSFORM",
  FAILED_LOAD = "FAILED_LOAD",
}
```

### Existing Entity IDs
#### `connector.getExistingEntityIds(entity?): Promise<Set<string>>`

Returns a Set of payload `_id` values for all previously submitted entities (excluding those whose last operation was `REMOVE`). Optionally filter by entity type.
```ts
const existingProductIds = await connector.getExistingEntityIds("PRODUCT");
if (!existingProductIds.has("some-product-id")) {
  // This product has never been synced before
}
```

### Archives
Utility methods for offline development and caching extracted data as gzip-compressed JSON files.
#### `connector.loadFromArchive<T>(archiveName, options?): Promise<T | null>`

Loads and decompresses a `{archivePath}/{archiveName}.json.gz` file. Returns the parsed data, or `null` if the file doesn't exist.
```ts
const products = await connector.loadFromArchive<Product[]>("products", {
  archivePath: "./offline-archives",
  dateFields: ["updatedAt", "createdAt"],
});
```

| Option | Type | Description |
| --- | --- | --- |
| archivePath | string | Directory for archive files (default: "./offline-archives") |
| dateFields | string[] | Field names to convert from strings back to Date objects |
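Since archives are plain gzip-compressed JSON, a round-trip can be sketched with Node's built-ins. These helpers are illustrative, not the SDK's implementation; the date revival mirrors the `dateFields` option:

```ts
import { gzipSync, gunzipSync } from "node:zlib";

// Compress data the way an archive file stores it...
const toArchiveBytes = (data: unknown): Buffer =>
  gzipSync(Buffer.from(JSON.stringify(data), "utf8"));

// ...and read it back, reviving selected fields as Dates
// (JSON serializes Date objects as ISO strings).
const fromArchiveBytes = <T>(bytes: Buffer, dateFields: string[] = []): T => {
  const parsed = JSON.parse(gunzipSync(bytes).toString("utf8"));
  const revive = (doc: Record<string, unknown>) => {
    for (const field of dateFields)
      if (typeof doc[field] === "string") doc[field] = new Date(doc[field] as string);
    return doc;
  };
  return (Array.isArray(parsed) ? parsed.map(revive) : parsed) as T;
};
```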
#### `connector.storeToArchive(archiveName, data, options?): Promise<void>`

Serializes data as JSON, gzip-compresses it, and writes it to `{archivePath}/{archiveName}.json.gz`. Creates the directory if it doesn't exist.
```ts
await connector.storeToArchive("products", fetchedProducts, {
  archivePath: "./offline-archives",
});
```

## Work Queue
#### `runFromExternalWork(mainFn, options): Promise<void>`
Orchestrates an ETL run driven by the Unchained work queue. This is designed for connectors that run as long-lived workers polling for work items.
The function:

- Allocates work of the given `workType` from the Unchained work queue
- If no work is available, cleans up any dead/zombie workers and returns
- Runs `mainFn` with a timeout (configurable via `UNCHAINED_CONNECTOR_TIMEOUT`, default: 30 minutes)
- Reports success or failure back to the Unchained engine
```ts
import { Connector, remotes, runFromExternalWork } from "@unchainedshop/connector-sdk";

async function syncProducts(options) {
  const connector = new Connector();
  try {
    await connector.init();
    // ... ETL logic producing `ids` via connector.load(...) ...
    await connector.reportSuccess(ids);
    return connector.journalEntry;
  } finally {
    await connector.dispose();
  }
}

await runFromExternalWork(syncProducts, {
  workType: "SYNC_PRODUCTS",
  unchainedAPI: remotes.unchained.default(),
});
```

```ts
type RunFromExternalWorkOptions<T> = {
  workType: string;           // The work type to allocate from the queue
  unchainedAPI: UnchainedAPI; // Initialized Unchained API client
} & T;                        // Additional options are passed through to mainFn
```

The `mainFn` must return a `JournalEntry`. If `journalEntry.status === "COMPLETE"`, the work is reported as successful; otherwise it's reported as failed.
#### `timeout(minutes): Promise<void>`

Returns a promise that rejects after the given number of minutes. Used internally by `runFromExternalWork`, but exported for custom timeout logic.
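Such a helper can be approximated in a few lines (a sketch under the stated semantics, not the SDK's source; `rejectAfterMinutes` is a hypothetical name):

```ts
// Reject after the given number of minutes; fractional values work too.
const rejectAfterMinutes = (minutes: number): Promise<never> =>
  new Promise((_, reject) =>
    setTimeout(
      () => reject(new Error(`Timed out after ${minutes} minute(s)`)),
      minutes * 60_000,
    ),
  );
```

Racing it against a long-running promise turns the rejection into a hard deadline.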
```ts
import { timeout } from "@unchainedshop/connector-sdk";

await Promise.race([
  longRunningOperation(),
  timeout(10),
]);
```

## Remotes
Low-level API clients available via the `remotes` namespace. These are used internally by the Connector class but can be used directly for advanced use cases.

```ts
import { remotes } from "@unchainedshop/connector-sdk";
```

### remotes.mongodb
MongoDB client initialization and in-memory server management.
#### `initMongoDBClient(options?): Promise<MongoClient>`

Creates and returns a connected `MongoClient`. Connection settings: 100s connect timeout, 1h socket timeout, `ignoreUndefined: true`.
```ts
const mongo = await remotes.mongodb.initMongoDBClient({
  mongoUrl: "mongodb://localhost:27017/connector",
});
```

#### `resolveURI(options?): Promise<string>`

Resolves the MongoDB connection URI. Checks (in order):

- `options.mongoUrl`
- The `MONGO_URL` environment variable
- If `NODE_ENV=development`, auto-starts an in-memory MongoDB server
#### `start(): Promise<{ uri, port, dbPath, dbName }>`

Starts a `MongoMemoryServer` instance. Useful for development and testing.
#### `stop(): Promise<void>`
Stops the in-memory MongoDB server if one was started.
### remotes.unchained

#### `remotes.unchained.default(options?): UnchainedAPI`

Creates an Unchained API client. All requests use `Bearer admin:<secret>` authentication with a 5-minute default timeout (15 minutes for streaming uploads).
```ts
const api = remotes.unchained.default({
  unchainedEndpoint: "https://engine.example.com",
  unchainedSecret: "my-secret",
  customHeaders: { "X-Custom": "value" },
});
```

UnchainedAPI methods:
| Method | Description |
| --- | --- |
| submitEventsAsStream(fileName, submitOptions?) | Stream a temp file to the /bulk-import endpoint (15-min timeout) |
| submitEvents(body, submitOptions?) | POST a JSON body to the /bulk-import endpoint |
| allocateWork(workType) | Allocate the next available work item from the queue via GraphQL. Returns the work object, null if none available, or false on connection failure. |
| finishWork(workId, { success, result, error? }) | Report work completion/failure back to the engine via GraphQL |
| findDeadWork(workType) | Find work items with ALLOCATED status for the current worker (zombie detection) |
| addExternalWork(workType, input) | Add a new work item to the queue via GraphQL |
| submitErrorMessage(title, content) | Send an error report as a MESSAGE work item |
| unchainedFetch(path, params, fetchOptions) | Low-level HTTP wrapper with auth headers. Params are appended as URL search params. |
### remotes.cockpit

Cockpit CMS client for fetching content.

#### `remotes.cockpit.default(options): CockpitAPI`
```ts
const cockpit = remotes.cockpit.default({
  endpoint: "https://cockpit.example.com",
  apiKey: "my-api-key",
});
```

CockpitAPI methods:
| Method | Description |
| --- | --- |
| fetchEntries(entity, locale?, { filter? }) | Fetch entries from a Cockpit collection. Locale defaults to "default" (mapped from "de"). |
| findLocaleTranslation(projectName, locale?) | Fetch localization data for a Lokalize project. Locale defaults to "de". |
| fetchPageById(page, id, params?) | Fetch a specific page by ID. |
## Utilities

### `normalizePrice(textualPrice?: string | number): number`
Converts a price value (string or number) to an integer in the smallest currency unit (cents). Handles thousands separators (commas, apostrophes, backticks) in string inputs.
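A hedged re-implementation of the documented behavior might look like this (`toSmallestUnit` is illustrative and may differ from the library's actual parsing rules):

```ts
// Strip the documented thousands separators (",", "'", "`"), parse,
// and round to the smallest currency unit. An illustration of the
// documented behavior, not the library's actual source.
const toSmallestUnit = (textualPrice?: string | number): number => {
  if (textualPrice === undefined) return NaN;
  const value =
    typeof textualPrice === "number"
      ? textualPrice
      : Number.parseFloat(textualPrice.replace(/[,'`]/g, ""));
  // Math.round avoids float artifacts like 22.99 * 100 === 2298.999...
  return Math.round(value * 100);
};
```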
```ts
import { normalizePrice } from "@unchainedshop/connector-sdk";

normalizePrice("2,299.00"); // 229900
normalizePrice(22.99);      // 2299
normalizePrice(undefined);  // NaN
```

## Types
All types are exported for use in consumer code:
```ts
import type {
  BulkImportEvent,
  HashedBulkImportEvent,
  BulkImportPayload,
  BulkImportEntity,
  BulkImportOperation,
  SubmitOptions,
  ConnectorOptions,
  JournalEntry,
  RunFromExternalWorkOptions,
  UnchainedAPI,
  Db,
  MongoClient,
} from "@unchainedshop/connector-sdk";
```

### `BulkImportEvent<P>`
```ts
interface BulkImportEvent<P extends BulkImportPayload = BulkImportPayload> {
  entity: BulkImportEntity;
  operation: BulkImportOperation;
  payload: P;
}
```

### `BulkImportPayload`
```ts
interface BulkImportPayload {
  _id: string;
  [key: string]: unknown;
}
```

### `BulkImportEntity`
One of `"PRODUCT"`, `"ASSORTMENT"`, `"FILTER"`, `"ENROLLMENT"`, `"ORDER"`, `"USER"`, `"QUOTATION"`, or any custom string.

### `BulkImportOperation`

`"CREATE" | "UPDATE" | "REMOVE"`
### `HashedBulkImportEvent<P>`

Extends `BulkImportEvent` with a `hash: string` field used for deduplication.
## MongoDB Collections
The connector uses two internal collections in the staging database:
| Collection | Purpose |
| --- | --- |
| journal | Tracks each ETL run with status, timestamps, and event IDs. Indexed on { status: 1, started: -1 }. |
| unchained_submitted_events | Records every event submitted to Unchained with its payload hash for deduplication. Indexed on { emitted: 1, _id: 1 }, { payload._id: 1, entity: 1 }, { payload._id: 1 }, and { entity: 1 }. |
Extracted data is stored in user-defined collections (the collectionName parameter to extract()).
## Complete Example: Work-Queue-Driven Connector
```ts
import {
  Connector,
  runFromExternalWork,
  remotes,
} from "@unchainedshop/connector-sdk";
import type { BulkImportEvent } from "@unchainedshop/connector-sdk";

// Initialize an Unchained API client for the work queue
const unchainedAPI = remotes.unchained.default();

async function syncProducts() {
  const connector = new Connector();
  try {
    await connector.init();

    // Use the journal's "since" for incremental fetches
    const since = connector.journalEntry.since;

    // Extract
    await connector.extract(
      () => fetchProductsUpdatedSince(since),
      "products",
      { replace: true },
    );

    // Transform
    const events: BulkImportEvent[] = [];
    const products = await connector.db
      .collection("products")
      .find()
      .toArray();
    for (const product of products) {
      events.push({
        entity: "PRODUCT",
        operation: "UPDATE",
        payload: {
          _id: product.externalId,
          title: product.name,
          price: product.price,
        },
      });
    }

    // Prepare (deduplicate + emit deletes)
    const prepared = connector.prepareEvents(events, {
      emitDeletes: "PRODUCT",
    });

    // Load
    const ids = await connector.load(prepared);
    await connector.reportSuccess(ids);
    return connector.journalEntry;
  } catch (e) {
    await connector.reportFailure("LOAD");
    throw e;
  } finally {
    await connector.dispose();
  }
}

// Poll the work queue in a loop
while (true) {
  await runFromExternalWork(syncProducts, {
    workType: "SYNC_PRODUCTS",
    unchainedAPI,
  });
  await new Promise((resolve) => setTimeout(resolve, 60_000));
}
```

## License
ISC
