
v5.0.3

@unchainedshop/connector-sdk

SDK for building ETL connectors that sync data into the Unchained Commerce engine via its Bulk Import API.

The SDK manages the full Extract-Transform-Load lifecycle: fetching remote data into a local MongoDB staging database, deduplicating events against previously submitted hashes, chunking and streaming uploads to the Unchained Bulk Import endpoint, and tracking each run in a journal for incremental (differential) syncs.

Installation

npm install @unchainedshop/connector-sdk mongodb

mongodb is a peer dependency (^7.0.0).

Quick Start

import { Connector } from "@unchainedshop/connector-sdk";
import type { BulkImportEvent } from "@unchainedshop/connector-sdk";

const connector = new Connector({
  unchainedEndpoint: process.env.UNCHAINED_ENDPOINT,
  unchainedSecret: process.env.UNCHAINED_SECRET,
});

try {
  await connector.init();

  // 1. Extract — fetch remote data into local MongoDB collections
  await connector.extract(fetchProducts, "products");
  await connector.extract(fetchAssortments, "assortments", { replace: true });

  // 2. Transform — build events from extracted data
  const events: BulkImportEvent[] = buildEventsFromDB(connector.db);

  // 3. Prepare — deduplicate updates, emit deletes (streamed via async generator)
  const prepared = connector.prepareEvents(events, { emitDeletes: "PRODUCT" });

  // 4. Load — submit to Unchained in 10K-event chunks
  const ids = await connector.load(prepared);

  await connector.reportSuccess(ids);
} catch {
  await connector.reportFailure("LOAD");
} finally {
  await connector.dispose();
}

Environment Variables

| Variable | Description | Required |
| --- | --- | --- |
| UNCHAINED_ENDPOINT | Unchained engine base URL (e.g. https://engine.example.com) | Yes (or pass via ConnectorOptions) |
| UNCHAINED_SECRET | API secret used for Bearer admin:<secret> authentication | Yes (or pass via ConnectorOptions) |
| MONGO_URL | MongoDB connection string for the local staging database | Yes (or pass via ConnectorOptions) |
| UNCHAINED_WORKER_ID | Worker identifier reported to the work queue (default: "connector") | No |
| UNCHAINED_CONNECTOR_TIMEOUT | Timeout in minutes for runFromExternalWork (default: 30) | No |
| NODE_ENV | Set to "development" to auto-start an in-memory MongoDB via mongodb-memory-server | No |
| COCKPIT_ENDPOINT | Cockpit CMS endpoint (for remotes.cockpit) | No |
| COCKPIT_API_KEY | Cockpit CMS API key | No |
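For local development, the required variables can be sketched as a shell fragment (all values below are placeholders):

```shell
# Required unless passed via ConnectorOptions
export UNCHAINED_ENDPOINT="https://engine.example.com"
export UNCHAINED_SECRET="replace-me"
export MONGO_URL="mongodb://localhost:27017/connector-staging"

# Optional
export UNCHAINED_WORKER_ID="connector"
export UNCHAINED_CONNECTOR_TIMEOUT="30"   # minutes
```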

API

Connector

The main class that manages the full ETL lifecycle. All accessors (db, unchained, journalEntry) throw if init() has not been called.

Constructor

new Connector(options?: ConnectorOptions)

interface ConnectorOptions {
  mongoUrl?: string;                        // Falls back to MONGO_URL env
  unchainedEndpoint?: string;               // Falls back to UNCHAINED_ENDPOINT env
  unchainedSecret?: string;                 // Falls back to UNCHAINED_SECRET env
  customHeaders?: Record<string, string>;   // Extra headers on all Unchained API requests
  reset?: boolean;                          // Drop journal + submitted events on init
}

All options fall back to their corresponding environment variables when omitted.

Connector.fromExisting(options)

Static factory that creates a Connector from already-initialized MongoDB and Unchained clients. The connector will not close the MongoDB connection on dispose() since it does not own it.

const connector = await Connector.fromExisting({
  mongo: existingMongoClient,
  unchainedAPI: existingUnchainedAPI,
  reset: false,
});

connector.init(): Promise<this>

Connects to MongoDB, initializes the Unchained API client, creates database indexes, and starts a new journal entry. Returns this for chaining:

const connector = await new Connector().init();

connector.dispose(): Promise<void>

Closes the MongoDB connection (if the connector owns it) and releases all internal references.

connector.db: Db

The MongoDB Db instance for the staging database. Use this to query extracted data when building transform logic.

connector.unchained: UnchainedAPI

The Unchained API client. See UnchainedAPI for the full interface.

connector.journalEntry: WithId<JournalEntry>

The current journal entry document for this run. Contains started, since (start of the sync window for differential syncs), differential (whether a previous successful run exists), and status.
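Transform logic can branch on the journal entry to decide between a full and an incremental fetch. A self-contained sketch, with the entry's shape inlined for illustration (the SDK exports the authoritative JournalEntry type):

```typescript
// Assumed minimal shape, matching the fields described above
interface JournalEntryLike {
  started: Date;
  since?: Date;          // start of the sync window (differential runs)
  differential: boolean; // true when a previous successful run exists
  status: string;
}

// Decide the fetch window: incremental when a prior successful run exists
function syncWindowStart(entry: JournalEntryLike): Date | null {
  return entry.differential && entry.since ? entry.since : null;
}

const fullRun: JournalEntryLike = {
  started: new Date(),
  differential: false,
  status: "INITIAL",
};
syncWindowStart(fullRun); // null → perform a full sync
```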


Extract

connector.extract(fetchFn, collectionName, options?): Promise<number>

Calls fetchFn to retrieve an array of documents and stores them in the named MongoDB collection. Returns the number of inserted documents.

// Simple insertion
await connector.extract(
  () => fetch("https://api.example.com/products").then((r) => r.json()),
  "products",
);

// Upsert by _id (for incremental updates)
await connector.extract(fetchProducts, "products", { replace: true });

// Delete matching docs before inserting
await connector.extract(fetchProducts, "products", {
  deleteFilter: { category: "old" },
});

| Option | Type | Description |
| --- | --- | --- |
| replace | boolean | Upsert documents by _id instead of plain insertMany. Use when re-extracting data that may already exist. |
| deleteFilter | Record<string, unknown> | Delete matching documents from the collection before inserting the new data. |
| loadFromArchive | boolean | If true and archivePath is set, attempt to load data from a gzip archive instead of calling fetchFn. Falls back to fetchFn if no archive exists. |
| storeInArchive | boolean | If true and archivePath is set, save fetched data to a gzip archive after extraction. |
| archivePath | string | Directory path for archive files (default: "./offline-archives"). |
| dateFields | string[] | Field names to deserialize as Date objects when loading from archive (JSON serializes dates as strings). |

The fetchFn receives a RequestInit object and must return Promise<P[]> where P extends BulkImportPayload.
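A minimal fetchFn sketch (the endpoint is hypothetical, and the payload type is inlined here for illustration; the SDK exports BulkImportPayload). The init argument lets the SDK pass through request options such as headers or abort signals:

```typescript
// Inlined for illustration — import from the SDK in real code
interface BulkImportPayload {
  _id: string;
  [key: string]: unknown;
}

interface ProductPayload extends BulkImportPayload {
  title: string;
}

// fetchFn: receives a RequestInit, resolves to an array of payload documents
const fetchProducts = async (init: RequestInit): Promise<ProductPayload[]> => {
  const res = await fetch("https://api.example.com/products", init); // hypothetical endpoint
  if (!res.ok) throw new Error(`Fetch failed: ${res.status}`);
  return res.json();
};
```

Usage: `await connector.extract(fetchProducts, "products", { replace: true });`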


Transform

connector.prepareEvents(newEvents, options?): AsyncGenerator<HashedBulkImportEvent>

An async generator that deduplicates update events against previously submitted hashes and optionally emits REMOVE operations for entities that are no longer present in the source data. Because it yields events one at a time, no intermediate arrays need to be allocated in memory.

const prepared = connector.prepareEvents(events, {
  emitDeletes: "PRODUCT",
  excludeKeys: (key) => key === "updatedAt",
});

| Option | Type | Description |
| --- | --- | --- |
| excludeKeys | (key: string) => boolean | Predicate that returns true for keys to strip before hashing. Use this to ignore volatile fields that change every sync but don't represent meaningful updates. |
| emitDeletes | boolean \| string | Emit REMOVE operations for entities that were previously submitted but are missing from newEvents. Pass a string (e.g. "PRODUCT") to scope deletes to one entity type, or true for all entity types. |

Default excluded keys: Authorization, updated, created, updatedAt, createdAt, modified, modifiedAt, published.

How deduplication works:

  1. Queries the unchained_submitted_events collection for the last meaningful (non-REMOVE) event per entity:payloadId pair
  2. Hashes each incoming event's payload (with excluded keys stripped) using object-code
  3. Skips UPDATE events whose hash matches the previously submitted hash
  4. If emitDeletes is enabled, queries all previously submitted entity IDs and yields REMOVE events for any that are absent from the new event set

The input newEvents can be a plain array or an AsyncIterable<BulkImportEvent>.
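Because the input may be any AsyncIterable, the transform step can itself be an async generator, so events stream end-to-end without materializing a large array. A self-contained sketch (event type inlined for illustration):

```typescript
// Inlined for illustration — import from the SDK in real code
interface BulkImportEvent {
  entity: string;
  operation: "CREATE" | "UPDATE" | "REMOVE";
  payload: { _id: string; [key: string]: unknown };
}

// Stream UPDATE events from source rows one at a time
async function* transformProducts(
  rows: Array<{ externalId: string; name: string }>,
): AsyncGenerator<BulkImportEvent> {
  for (const row of rows) {
    yield {
      entity: "PRODUCT",
      operation: "UPDATE",
      payload: { _id: row.externalId, title: row.name },
    };
  }
}

// Usage: const prepared = connector.prepareEvents(transformProducts(rows));
```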


Load

connector.load(events, submitOptions?): Promise<ObjectId[]>

Submits events to the Unchained Bulk Import API and records them in the unchained_submitted_events collection for future deduplication. Returns the MongoDB ObjectIds of the submitted event records.

Events are processed in chunks of 10,000. Each chunk is serialized to a temporary file and streamed to the API (unless disableStream is set). After all chunks are submitted, duplicate submitted events are garbage-collected.

Accepts a plain array or an AsyncIterable (the generator returned by prepareEvents).

const ids = await connector.load(prepared, {
  shouldUpsert: true,
  skipCacheInvalidation: true,
});

interface SubmitOptions {
  shouldUpsert?: boolean;           // CREATE upserts if an entity with the same _id already exists
  shouldUpsertOnUpdate?: boolean;   // UPDATE creates the entity if no entity with _id exists
  skipCacheInvalidation?: boolean;  // Skip cache invalidation after import
  disableStream?: boolean;          // Send JSON body instead of streaming from a temp file
}

Journal & Reporting

The journal tracks each ETL run. On init(), a new journal entry is created with status INITIAL. The since field is set to the started timestamp of the last successful (COMPLETE) run, enabling differential/incremental syncs.

connector.reportSuccess(eventIds?): Promise<void>

Records a COMPLETE status on the current journal entry. Optionally pass the ObjectId[] returned by load() to store them on the journal entry.

const ids = await connector.load(prepared);
await connector.reportSuccess(ids);

connector.reportFailure(stage?): Promise<void>

Records a failure status. Pass "EXTRACT", "TRANSFORM", or "LOAD" to indicate which stage failed (defaults to "LOAD").

try {
  await connector.extract(fetchFn, "products");
} catch {
  await connector.reportFailure("EXTRACT");
}

enum CompletionStatus {
  INITIAL = "INITIAL",
  COMPLETE = "COMPLETE",
  FAILED_EXTRACT = "FAILED_EXTRACT",
  FAILED_TRANSFORM = "FAILED_TRANSFORM",
  FAILED_LOAD = "FAILED_LOAD",
}

Existing Entity IDs

connector.getExistingEntityIds(entity?): Promise<Set<string>>

Returns a Set of payload _id values for all previously submitted entities (excluding those whose last operation was REMOVE). Optionally filter by entity type.

const existingProductIds = await connector.getExistingEntityIds("PRODUCT");
if (!existingProductIds.has("some-product-id")) {
  // This product has never been synced before
}

Archives

Utility methods for offline development and caching extracted data as gzip-compressed JSON files.

connector.loadFromArchive<T>(archiveName, options?): Promise<T | null>

Loads and decompresses a {archivePath}/{archiveName}.json.gz file. Returns the parsed data or null if the file doesn't exist.

const products = await connector.loadFromArchive<Product[]>("products", {
  archivePath: "./offline-archives",
  dateFields: ["updatedAt", "createdAt"],
});

| Option | Type | Description |
| --- | --- | --- |
| archivePath | string | Directory for archive files (default: "./offline-archives") |
| dateFields | string[] | Field names to convert from strings back to Date objects |

connector.storeToArchive(archiveName, data, options?): Promise<void>

Serializes data as JSON, gzip-compresses it, and writes it to {archivePath}/{archiveName}.json.gz. Creates the directory if it doesn't exist.

await connector.storeToArchive("products", fetchedProducts, {
  archivePath: "./offline-archives",
});

Work Queue

runFromExternalWork(mainFn, options): Promise<void>

Orchestrates an ETL run driven by the Unchained work queue. This is designed for connectors that run as long-lived workers polling for work items.

The function:

  1. Allocates work of the given workType from the Unchained work queue
  2. If no work is available, cleans up any dead/zombie workers and returns
  3. Runs mainFn with a timeout (configurable via UNCHAINED_CONNECTOR_TIMEOUT, default: 30 minutes)
  4. Reports success or failure back to the Unchained engine

import {
  Connector,
  remotes,
  runFromExternalWork,
} from "@unchainedshop/connector-sdk";

// Initialize an Unchained API client for work-queue polling
const unchainedAPI = remotes.unchained.default();

async function syncProducts(options) {
  const connector = new Connector();
  try {
    await connector.init();
    // ... ETL logic: extract, prepareEvents, load ...
    await connector.reportSuccess();
    return connector.journalEntry;
  } finally {
    await connector.dispose();
  }
}

await runFromExternalWork(syncProducts, {
  workType: "SYNC_PRODUCTS",
  unchainedAPI,
});

type RunFromExternalWorkOptions<T> = {
  workType: string;        // The work type to allocate from the queue
  unchainedAPI: UnchainedAPI;  // Initialized Unchained API client
} & T;  // Additional options are passed through to mainFn

The mainFn must return a JournalEntry. If journalEntry.status === "COMPLETE", the work is reported as successful; otherwise it's reported as failed.

timeout(minutes): Promise<void>

Returns a promise that rejects after the given number of minutes. Used internally by runFromExternalWork but exported for custom timeout logic.

import { timeout } from "@unchainedshop/connector-sdk";

await Promise.race([
  longRunningOperation(),
  timeout(10),
]);

Remotes

Low-level API clients available via the remotes namespace. These are used internally by the Connector class but can be used directly for advanced use cases.

import { remotes } from "@unchainedshop/connector-sdk";

remotes.mongodb

MongoDB client initialization and in-memory server management.

initMongoDBClient(options?): Promise<MongoClient>

Creates and returns a connected MongoClient. Connection settings: 100s connect timeout, 1h socket timeout, ignoreUndefined: true.

const mongo = await remotes.mongodb.initMongoDBClient({
  mongoUrl: "mongodb://localhost:27017/connector",
});

resolveURI(options?): Promise<string>

Resolves the MongoDB connection URI. Checks (in order):

  1. options.mongoUrl
  2. MONGO_URL environment variable
  3. If NODE_ENV=development, auto-starts an in-memory MongoDB server

start(): Promise<{ uri, port, dbPath, dbName }>

Starts a MongoMemoryServer instance. Useful for development and testing.

stop(): Promise<void>

Stops the in-memory MongoDB server if one was started.

remotes.unchained

remotes.unchained.default(options?): UnchainedAPI

Creates an Unchained API client. All requests use Bearer admin:<secret> authentication with a 5-minute default timeout (15 minutes for streaming uploads).

const api = remotes.unchained.default({
  unchainedEndpoint: "https://engine.example.com",
  unchainedSecret: "my-secret",
  customHeaders: { "X-Custom": "value" },
});

UnchainedAPI methods:

| Method | Description |
| --- | --- |
| submitEventsAsStream(fileName, submitOptions?) | Stream a temp file to the /bulk-import endpoint (15-min timeout) |
| submitEvents(body, submitOptions?) | POST a JSON body to the /bulk-import endpoint |
| allocateWork(workType) | Allocate the next available work item from the queue via GraphQL. Returns the work object, null if none available, or false on connection failure. |
| finishWork(workId, { success, result, error? }) | Report work completion/failure back to the engine via GraphQL |
| findDeadWork(workType) | Find work items with ALLOCATED status for the current worker (zombie detection) |
| addExternalWork(workType, input) | Add a new work item to the queue via GraphQL |
| submitErrorMessage(title, content) | Send an error report as a MESSAGE work item |
| unchainedFetch(path, params, fetchOptions) | Low-level HTTP wrapper with auth headers. Params are appended as URL search params. |

remotes.cockpit

Cockpit CMS client for fetching content.

remotes.cockpit.default(options): CockpitAPI

const cockpit = remotes.cockpit.default({
  endpoint: "https://cockpit.example.com",
  apiKey: "my-api-key",
});

CockpitAPI methods:

| Method | Description |
| --- | --- |
| fetchEntries(entity, locale?, { filter? }) | Fetch entries from a Cockpit collection. Locale defaults to "default" (mapped from "de"). |
| findLocaleTranslation(projectName, locale?) | Fetch localization data for a Lokalize project. Locale defaults to "de". |
| fetchPageById(page, id, params?) | Fetch a specific page by ID. |


Utilities

normalizePrice(textualPrice?: string | number): number

Converts a price value (string or number) to an integer in the smallest currency unit (cents). Handles thousands separators (commas, apostrophes, backticks) in string inputs.

import { normalizePrice } from "@unchainedshop/connector-sdk";

normalizePrice("2,299.00"); // 229900
normalizePrice(22.99);      // 2299
normalizePrice(undefined);  // NaN
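The documented behavior can be approximated as follows — a sketch of the described conversion, not the library's actual implementation:

```typescript
// Approximation of normalizePrice: strip thousands separators, scale to cents
function normalizePriceSketch(input?: string | number): number {
  if (input === undefined) return NaN;
  const value =
    typeof input === "number"
      ? input
      : Number(String(input).replace(/[,'`]/g, "")); // commas, apostrophes, backticks
  return Math.round(value * 100);
}

normalizePriceSketch("2,299.00"); // 229900
```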

Types

All types are exported for use in consumer code:

import type {
  BulkImportEvent,
  HashedBulkImportEvent,
  BulkImportPayload,
  BulkImportEntity,
  BulkImportOperation,
  SubmitOptions,
  ConnectorOptions,
  JournalEntry,
  RunFromExternalWorkOptions,
  UnchainedAPI,
  Db,
  MongoClient,
} from "@unchainedshop/connector-sdk";

BulkImportEvent<P>

interface BulkImportEvent<P extends BulkImportPayload = BulkImportPayload> {
  entity: BulkImportEntity;
  operation: BulkImportOperation;
  payload: P;
}

BulkImportPayload

interface BulkImportPayload {
  _id: string;
  [key: string]: unknown;
}

BulkImportEntity

One of "PRODUCT", "ASSORTMENT", "FILTER", "ENROLLMENT", "ORDER", "USER", "QUOTATION", or any custom string.

BulkImportOperation

"CREATE" | "UPDATE" | "REMOVE"

HashedBulkImportEvent<P>

Extends BulkImportEvent with a hash: string field used for deduplication.
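Based on the description above, its shape can be sketched as follows (types inlined for illustration; the SDK's exported declarations are authoritative):

```typescript
interface BulkImportPayload {
  _id: string;
  [key: string]: unknown;
}

interface BulkImportEvent<P extends BulkImportPayload = BulkImportPayload> {
  entity: string;
  operation: "CREATE" | "UPDATE" | "REMOVE";
  payload: P;
}

interface HashedBulkImportEvent<P extends BulkImportPayload = BulkImportPayload>
  extends BulkImportEvent<P> {
  hash: string; // payload hash used for deduplication
}

const example: HashedBulkImportEvent = {
  entity: "PRODUCT",
  operation: "UPDATE",
  payload: { _id: "p1" },
  hash: "abc123",
};
```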


MongoDB Collections

The connector uses two internal collections in the staging database:

| Collection | Purpose |
| --- | --- |
| journal | Tracks each ETL run with status, timestamps, and event IDs. Indexed on { status: 1, started: -1 }. |
| unchained_submitted_events | Records every event submitted to Unchained with its payload hash for deduplication. Indexed on { emitted: 1, _id: 1 }, { payload._id: 1, entity: 1 }, { payload._id: 1 }, and { entity: 1 }. |

Extracted data is stored in user-defined collections (the collectionName parameter to extract()).


Complete Example: Work-Queue-Driven Connector

import {
  Connector,
  runFromExternalWork,
  remotes,
} from "@unchainedshop/connector-sdk";
import type { BulkImportEvent } from "@unchainedshop/connector-sdk";

// Initialize an Unchained API client for the work queue
const unchainedAPI = remotes.unchained.default();

async function syncProducts() {
  const connector = new Connector();

  try {
    await connector.init();

    // Use the journal's "since" for incremental fetches
    const since = connector.journalEntry.since;

    // Extract
    await connector.extract(
      () => fetchProductsUpdatedSince(since),
      "products",
      { replace: true },
    );

    // Transform
    const events: BulkImportEvent[] = [];
    const products = await connector.db
      .collection("products")
      .find()
      .toArray();

    for (const product of products) {
      events.push({
        entity: "PRODUCT",
        operation: "UPDATE",
        payload: {
          _id: product.externalId,
          title: product.name,
          price: product.price,
        },
      });
    }

    // Prepare (deduplicate + emit deletes)
    const prepared = connector.prepareEvents(events, {
      emitDeletes: "PRODUCT",
    });

    // Load
    const ids = await connector.load(prepared);
    await connector.reportSuccess(ids);

    return connector.journalEntry;
  } catch (e) {
    await connector.reportFailure("LOAD");
    throw e;
  } finally {
    await connector.dispose();
  }
}

// Poll the work queue in a loop
while (true) {
  await runFromExternalWork(syncProducts, {
    workType: "SYNC_PRODUCTS",
    unchainedAPI,
  });
  await new Promise((resolve) => setTimeout(resolve, 60_000));
}

License

ISC