
@lde/pipeline

v0.28.7

A framework for transforming large RDF datasets, primarily using [SPARQL](https://www.w3.org/TR/sparql11-query/) queries with TypeScript for the parts that are hard to express in SPARQL alone.

Downloads

2,849

Readme

Pipeline

  • SPARQL-native. Data transformations are plain SPARQL query files — portable, transparent, testable and version-controlled.
  • Composable. Executors are an interface: wrap a SPARQL executor with custom TypeScript to handle edge cases like date parsing or string normalisation (see Executor).
  • Extensible. A plugin system lets packages like @lde/pipeline-void (or your own plugins) hook into the pipeline lifecycle.

Components

A Pipeline consists of:

  • a Dataset Selector that selects which datasets to process
  • a Distribution Resolver that resolves each dataset to a usable SPARQL endpoint
  • one or more Stages, each consisting of:
    • an optional Item Selector that selects resources (as variable bindings) for fan-out
    • one or more Executors that generate triples

Dataset Selector

Selects datasets, either manually or by querying a DCAT Dataset Registry:

// From a registry
const selector = new RegistrySelector({
  registry: new Client(new URL('https://example.com/sparql')),
});

// Manual
const selector = new ManualDatasetSelection([dataset]);

Stage

A stage groups an item selector, one or more executors, and configuration:

new Stage({
  name: 'per-class',
  itemSelector: new SparqlItemSelector({
    query: 'SELECT DISTINCT ?class WHERE { ?s a ?class }',
  }),
  executors: executor,
  batchSize: 100,
  maxConcurrency: 5,
});

Batch size

batchSize (default: 10) controls how many variable bindings are passed to each executor call as a VALUES clause. It also sets the page size for the item selector's SPARQL requests, so that each paginated request fills exactly one executor batch.
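As a sketch of the mechanism (illustrative only — not the package's actual implementation), a batch of bindings is rendered into the executor's query as a VALUES clause:

```typescript
// Illustrative: render a batch of IRI bindings for one variable as a
// SPARQL VALUES clause, the way a batch is handed to an executor call.
function valuesClause(variable: string, iris: string[]): string {
  const rows = iris.map((iri) => `<${iri}>`).join(' ');
  return `VALUES ?${variable} { ${rows} }`;
}

console.log(valuesClause('class', ['http://example.org/A', 'http://example.org/B']));
// VALUES ?class { <http://example.org/A> <http://example.org/B> }
```

With batchSize: 10, each executor call would receive a VALUES clause of up to ten rows.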

Some SPARQL endpoints enforce different result limits for SELECT and CONSTRUCT queries. Since the selector uses SELECT and the executor uses CONSTRUCT, a LIMIT clause in the selector query overrides batchSize as the page size. Use this when the endpoint caps SELECT results below your desired batch size:

// Endpoint caps SELECT results at 500, but each CONSTRUCT can handle 1000 bindings.
new Stage({
  name: 'per-class',
  itemSelector: new SparqlItemSelector({
    query: 'SELECT DISTINCT ?class WHERE { ?s a ?class } LIMIT 500',
  }),
  executors: executor,
  batchSize: 1000, // Two SELECT pages fill one CONSTRUCT batch.
});

Concurrency

maxConcurrency (default: 10) limits the total number of concurrent SPARQL queries. Within each batch, all executors run in parallel; the number of concurrent batches is automatically reduced to ⌊maxConcurrency / executorCount⌋ so the total query pressure stays within the limit. For example, with maxConcurrency: 10 and two executors per stage, up to 5 batches run concurrently (10 SPARQL queries total).
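The arithmetic can be sketched as a small helper (batchConcurrency is an illustrative name, not part of the package API):

```typescript
// Illustrative helper: how many batches may run at once so that
// batches × executors never exceeds the stage's query limit.
function batchConcurrency(maxConcurrency: number, executorCount: number): number {
  return Math.max(1, Math.floor(maxConcurrency / executorCount));
}

console.log(batchConcurrency(10, 2)); // 5 batches → 10 concurrent queries
console.log(batchConcurrency(10, 3)); // 3 batches → at most 9 concurrent queries
```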

Item Selector

Selects resources from the distribution and fans out executor calls per batch of results. Implements the ItemSelector interface:

interface ItemSelector {
  select(distribution: Distribution, batchSize?: number): AsyncIterable<VariableBindings>;
}

The distribution is received at run time, so selectors don't need the endpoint URL at construction time. The batchSize parameter is set by the stage. Use SparqlItemSelector for SPARQL-based selection with automatic pagination:

new SparqlItemSelector({
  query: 'SELECT DISTINCT ?class WHERE { ?s a ?class }',
});

For dynamic queries that depend on the distribution, implement ItemSelector directly:

const itemSelector: ItemSelector = {
  select: (distribution, batchSize) => {
    const query = buildQuery(distribution);
    return new SparqlItemSelector({ query }).select(distribution, batchSize);
  },
};

Executor

Generates RDF triples. The built-in SparqlConstructExecutor runs a SPARQL CONSTRUCT query with template substitution and variable bindings:

const executor = new SparqlConstructExecutor({
  query: 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }',
});

When querying endpoints that return line-oriented formats like N-Triples (e.g. QLever), enable lineBuffer to work around an N3.js chunk-splitting bug that causes intermittent parse errors on large responses:

const executor = new SparqlConstructExecutor({
  query: 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }',
  lineBuffer: true,
});

SPARQL CONSTRUCT queries can produce duplicate triples — for example, constant triples (like ?dataset a edm:ProvidedCHO) are emitted for every solution row. Enable deduplicate to remove duplicates inline on the stream using a string-based identity set (inspired by Comunica's distinctConstruct):

const executor = new SparqlConstructExecutor({
  query: 'CONSTRUCT { ?s a edm:ProvidedCHO . ?s ?p ?o } WHERE { ?s ?p ?o }',
  deduplicate: true,
});

The dedup set is scoped to each execute() call, so memory stays bounded to the number of unique quads per batch. A standalone deduplicateQuads() function is also exported for use outside the executor.
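The string-based identity technique can be sketched as follows (a simplified quad shape for illustration; the real implementation operates on RDF/JS quads and may differ in detail):

```typescript
// Sketch of string-identity deduplication over a quad stream.
type SimpleQuad = { subject: string; predicate: string; object: string };

async function* dedupe(quads: AsyncIterable<SimpleQuad>): AsyncIterable<SimpleQuad> {
  // The set lives for one call only, so memory is bounded per batch.
  const seen = new Set<string>();
  for await (const quad of quads) {
    const key = `${quad.subject} ${quad.predicate} ${quad.object}`;
    if (!seen.has(key)) {
      seen.add(key);
      yield quad;
    }
  }
}
```

Within the pipeline, the same idea is applied per execute() call, which is why the memory bound is per batch rather than per pipeline run.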

Executor is an interface, so you can implement your own for logic that's hard to express in pure SPARQL — for example, cleaning up messy date notations or converting locale-specific dates to ISO 8601. The decorator pattern lets you wrap a SPARQL executor and post-process its quad stream in TypeScript:

import { DataFactory } from 'n3';
import type { Quad, Literal } from '@rdfjs/types';
import type { Dataset, Distribution } from '@lde/dataset';
import {
  type Executor,
  type ExecuteOptions,
  NotSupported,
} from '@lde/pipeline';

class TransformExecutor implements Executor {
  constructor(
    private readonly inner: Executor,
    private readonly transform: (
      quads: AsyncIterable<Quad>,
      dataset: Dataset,
    ) => AsyncIterable<Quad>,
  ) {}

  async execute(
    dataset: Dataset,
    distribution: Distribution,
    options?: ExecuteOptions,
  ): Promise<AsyncIterable<Quad> | NotSupported> {
    const result = await this.inner.execute(dataset, distribution, options);
    if (result instanceof NotSupported) return result;
    return this.transform(result, dataset);
  }
}

Then use it to wrap any SPARQL executor:

new Stage({
  name: 'dates',
  executors: new TransformExecutor(
    await SparqlConstructExecutor.fromFile('dates.rq'),
    async function* (quads) {
      for await (const quad of quads) {
        if (quad.object.termType === 'Literal' && isMessyDate(quad.object)) {
          const cleaned = DataFactory.literal(
            parseDutchDate(quad.object.value),
            DataFactory.namedNode('http://www.w3.org/2001/XMLSchema#date'),
          );
          yield DataFactory.quad(quad.subject, quad.predicate, cleaned);
        } else {
          yield quad;
        }
      }
    },
  ),
});

This keeps SPARQL doing the heavy lifting while TypeScript handles the edge cases. See @lde/pipeline-void's VocabularyExecutor for a real-world example of this pattern.

Validation

Stages can optionally validate their output quads against a Validator. Validation operates on the combined output of all executors per batch, not on individual quads or per-executor output. A batch produces a complete result set — a self-contained cluster of linked resources — that can be meaningfully matched against SHACL shapes. Even with a single executor, each batch is a complete unit; with multiple executors, shapes that reference triples from different executors are validated correctly.

Validating individual quads would be meaningless, since a single quad carries no structural context for shape matching. Validating the full pipeline output would also be problematic: because the pipeline streams results in batches, it doesn’t know where resource cluster boundaries fall. Batching the output could split a valid cluster across two batches, causing partial resources to fail validation even though the complete cluster is valid.

Quads are buffered, validated, and then written or discarded based on the onInvalid policy. When no validator is configured, quads stream directly with zero overhead.

import { ShaclValidator } from '@lde/pipeline-shacl-validator';

new Stage({
  name: 'transform',
  executors: await SparqlConstructExecutor.fromFile('transform.rq'),
  validation: {
    validator: new ShaclValidator({
      shapesFile: './shapes.ttl',
      reportDir: './validation',
    }),
    onInvalid: 'write', // 'write' (default) | 'skip' | 'halt'
  },
});

| onInvalid | Behaviour                                      |
| --------- | ---------------------------------------------- |
| 'write'   | Write quads even if validation fails (default) |
| 'skip'    | Discard the batch silently                     |
| 'halt'    | Throw an error, stopping the pipeline          |

Validator is an interface, so you can implement your own validation strategy. See @lde/pipeline-shacl-validator for the SHACL implementation.

Writer

Writes generated quads to a destination:

  • SparqlUpdateWriter — writes to a SPARQL endpoint via UPDATE queries
  • FileWriter — writes to local files

Plugins

Plugins hook into the pipeline lifecycle via the PipelinePlugin interface. Register them in the plugins array when constructing a Pipeline.

provenancePlugin()

Appends PROV-O provenance quads (prov:Entity, prov:Activity, prov:startedAtTime, prov:endedAtTime) to every stage’s output.

schemaOrgNormalizationPlugin()

Normalizes http://schema.org/ to https://schema.org/ in void:class and void:property quad objects, so downstream consumers can rely on a single canonical namespace. void:vocabulary quads are left unchanged so consumers can see which namespace the source dataset actually uses.

import { schemaOrgNormalizationPlugin, provenancePlugin } from '@lde/pipeline';

new Pipeline({
  // ...
  plugins: [schemaOrgNormalizationPlugin(), provenancePlugin()],
});

Usage

import {
  Pipeline,
  Stage,
  SparqlConstructExecutor,
  SparqlItemSelector,
  SparqlUpdateWriter,
  ManualDatasetSelection,
} from '@lde/pipeline';

const pipeline = new Pipeline({
  datasetSelector: new ManualDatasetSelection([dataset]),
  stages: [
    new Stage({
      name: 'per-class',
      itemSelector: new SparqlItemSelector({
        query: 'SELECT DISTINCT ?class WHERE { ?s a ?class }',
      }),
      executors: new SparqlConstructExecutor({
        query:
          'CONSTRUCT { ?class a <http://example.org/Class> } WHERE { ?s a ?class }',
      }),
    }),
  ],
  writers: new SparqlUpdateWriter({
    endpoint: new URL('http://localhost:7200/repositories/lde/statements'),
  }),
});

await pipeline.run();