
node-es-transformer

v1.2.2

Published

Stream-based library for ingesting and transforming large data files (NDJSON/CSV/Parquet/Arrow IPC) into Elasticsearch indices. Supports ES 8.x and 9.x with cross-version reindexing.


node-es-transformer

Stream-based library for ingesting and transforming large data files (NDJSON/CSV/Parquet/Arrow IPC) into Elasticsearch indices.

Quick Start

npm install node-es-transformer
const transformer = require('node-es-transformer');

// Ingest a large JSON file (wrapped in an async IIFE, since
// top-level await is not available in CommonJS modules)
(async () => {
  await transformer({
    fileName: 'data.json',
    targetIndexName: 'my-index',
    mappings: {
      properties: {
        '@timestamp': { type: 'date' },
        'message': { type: 'text' }
      }
    }
  });
})();

See Usage for more examples.

Why Use This?

If you need to ingest large NDJSON/CSV/Parquet/Arrow IPC files (tens of gigabytes) into Elasticsearch without running out of memory, this is the tool for you. Other solutions often exhaust the JS heap, hammer Elasticsearch with too many concurrent requests, time out, or try to do everything in a single bulk request.

When to use this:

  • Large file ingestion (20-30 GB tested)
  • Custom JavaScript transformations
  • Cross-version migration (ES 8.x → 9.x)
  • Developer-friendly Node.js workflow


Features

  • Streaming and buffering: Files are read using streams and Elasticsearch ingestion uses buffered bulk indexing. Handles very large files (20-30 GB tested) without running out of memory.
  • High throughput: Up to 20k documents/second on a single machine (2.9 GHz Intel Core i7, 16GB RAM, SSD), depending on document size. See PERFORMANCE.md for benchmarks and tuning guidance.
  • Wildcard support: Ingest multiple files matching a pattern (e.g., logs/*.json).
  • Flexible sources: Read from files, Elasticsearch indices, or Node.js streams.
  • Reindexing with transforms: Fetch documents from existing indices and transform them using JavaScript.
  • Document splitting: Transform one source document into multiple target documents (e.g., tweets → hashtags).
  • Cross-version support: Seamlessly reindex between Elasticsearch 8.x and 9.x.
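The document-splitting feature can be illustrated with a small transform that returns an array. This is a sketch only; the field names (`text`, `id`) are hypothetical and not part of the library's API:

```javascript
// Hypothetical transform: one tweet document becomes one target
// document per hashtag. Returning an array splits the source
// document into multiple target documents; returning null skips it.
function splitByHashtag(tweet) {
  const hashtags = tweet.text.match(/#\w+/g) || [];
  if (hashtags.length === 0) return null; // skip documents without hashtags
  return hashtags.map(hashtag => ({ hashtag, tweet_id: tweet.id }));
}

// Passed as the `transform` option, e.g.:
// transformer({ fileName: 'tweets.json', targetIndexName: 'hashtags-index', transform: splitByHashtag });
```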

Version Compatibility

| node-es-transformer     | Elasticsearch Client | Elasticsearch Server | Node.js |
| ----------------------- | -------------------- | -------------------- | ------- |
| 1.0.0+                  | 8.x and 9.x          | 8.x and 9.x          | 22+     |
| 1.0.0-beta7 and earlier | 8.x                  | 8.x                  | 18-20   |

Multi-Version Support: Starting with v1.0.0, the library supports both Elasticsearch 8.x and 9.x through automatic version detection and client aliasing. This enables seamless reindexing between major versions (e.g., migrating from ES 8.x to 9.x). All functionality is tested in CI against multiple ES versions including cross-version reindexing scenarios.

Upgrading? See MIGRATION.md for upgrade guidance from beta versions to v1.0.0.

Installation

npm install node-es-transformer
# or
yarn add node-es-transformer

Usage

Read NDJSON from a file

const transformer = require('node-es-transformer');

transformer({
  fileName: 'filename.json',
  targetIndexName: 'my-index',
  mappings: {
    properties: {
      '@timestamp': {
        type: 'date'
      },
      'first_name': {
        type: 'keyword'
      },
      'last_name': {
        type: 'keyword'
      },
      'full_name': {
        type: 'keyword'
      }
    }
  },
  transform(line) {
    return {
      ...line,
      full_name: `${line.first_name} ${line.last_name}`
    }
  }
});

Read CSV from a file

const transformer = require('node-es-transformer');

transformer({
  fileName: 'users.csv',
  sourceFormat: 'csv',
  targetIndexName: 'users-index',
  mappings: {
    properties: {
      id: { type: 'integer' },
      first_name: { type: 'keyword' },
      last_name: { type: 'keyword' },
      full_name: { type: 'keyword' },
    },
  },
  transform(row) {
    return {
      ...row,
      id: Number(row.id),
      full_name: `${row.first_name} ${row.last_name}`,
    };
  },
});

Read Parquet from a file

const transformer = require('node-es-transformer');

transformer({
  fileName: 'users.parquet',
  sourceFormat: 'parquet',
  targetIndexName: 'users-index',
  mappings: {
    properties: {
      id: { type: 'integer' },
      first_name: { type: 'keyword' },
      last_name: { type: 'keyword' },
      full_name: { type: 'keyword' },
    },
  },
  transform(row) {
    return {
      ...row,
      id: Number(row.id),
      full_name: `${row.first_name} ${row.last_name}`,
    };
  },
});

Read Arrow IPC from a file

const transformer = require('node-es-transformer');

transformer({
  fileName: 'users.arrow',
  sourceFormat: 'arrow',
  targetIndexName: 'users-index',
  mappings: {
    properties: {
      id: { type: 'integer' },
      first_name: { type: 'keyword' },
      last_name: { type: 'keyword' },
    },
  },
  transform(row) {
    return {
      ...row,
      id: Number(row.id),
    };
  },
});

Infer mappings from CSV sample

const transformer = require('node-es-transformer');

transformer({
  fileName: 'users.csv',
  sourceFormat: 'csv',
  targetIndexName: 'users-index',
  inferMappings: true,
  inferMappingsOptions: {
    sampleBytes: 200000,
    lines_to_sample: 2000,
  },
});

Read from another index

const transformer = require('node-es-transformer');

transformer({
  sourceIndexName: 'my-source-index',
  targetIndexName: 'my-target-index',
  // optional, if you skip mappings, they will be fetched from the source index.
  mappings: {
    properties: {
      '@timestamp': {
        type: 'date'
      },
      'first_name': {
        type: 'keyword'
      },
      'last_name': {
        type: 'keyword'
      },
      'full_name': {
        type: 'keyword'
      }
    }
  },
  transform(doc) {
    return {
      ...doc,
      full_name: `${doc.first_name} ${doc.last_name}`
    }
  }
});

Reindex from Elasticsearch 8.x to 9.x

The library automatically detects the Elasticsearch version and uses the appropriate client. This enables seamless reindexing between major versions:

const transformer = require('node-es-transformer');

// Auto-detection (recommended)
transformer({
  sourceClientConfig: {
    node: 'https://es8-cluster.example.com:9200',
    auth: { apiKey: 'your-es8-api-key' },
  },
  targetClientConfig: {
    node: 'https://es9-cluster.example.com:9200',
    auth: { apiKey: 'your-es9-api-key' },
  },
  sourceIndexName: 'my-source-index',
  targetIndexName: 'my-target-index',
  transform(doc) {
    // Optional transformation during reindexing
    return doc;
  },
});

// Explicit version specification (if auto-detection fails)
transformer({
  sourceClientConfig: {
    /* ... */
  },
  targetClientConfig: {
    /* ... */
  },
  sourceClientVersion: 8, // Force ES 8.x client
  targetClientVersion: 9, // Force ES 9.x client
  sourceIndexName: 'my-source-index',
  targetIndexName: 'my-target-index',
});

// Using pre-instantiated clients (advanced)
const { Client: Client8 } = require('es8');
const { Client: Client9 } = require('es9');

const sourceClient = new Client8({
  node: 'https://es8-cluster.example.com:9200',
});
const targetClient = new Client9({
  node: 'https://es9-cluster.example.com:9200',
});

transformer({
  sourceClient,
  targetClient,
  sourceIndexName: 'my-source-index',
  targetIndexName: 'my-target-index',
});

Note: To use pre-instantiated clients with different ES versions, install both client versions:

npm install es9@npm:@elastic/elasticsearch@^9.2.0
npm install es8@npm:@elastic/elasticsearch@^8.17.0

API Reference

Configuration Options

All options are passed to the main transformer() function.

Required Options

  • targetIndexName (string): The target Elasticsearch index where documents will be indexed.

Source Options

Choose one of these sources:

  • fileName (string): Source filename to ingest. Supports wildcards (e.g., logs/*.json, data/*.csv, data/*.parquet, data/*.arrow).
  • sourceIndexName (string): Source Elasticsearch index to reindex from.
  • stream (Readable): Node.js readable stream to ingest from.
  • sourceFormat ('ndjson' | 'csv' | 'parquet' | 'arrow'): Format for file/stream sources. Default: 'ndjson'.
    • arrow expects Arrow IPC file/stream payloads.
    • parquet stream sources are currently buffered in memory before row iteration (file sources remain streaming by row cursor).
    • parquet supports ZSTD-compressed files when running on Node.js 22+ (uses the built-in zlib zstd implementation).
    • parquet INT64 values are normalized for JSON: safe-range values become numbers, larger values become strings.
  • csvOptions (object): CSV parser options (delimiter, quote, columns, etc.) used when sourceFormat: 'csv'.
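As a sketch of how these options combine for a non-default CSV layout; the delimiter and column names here are illustrative, and the exact `csvOptions` keys depend on the underlying CSV parser:

```javascript
const transformer = require('node-es-transformer');

// Sketch: semicolon-delimited CSV without a header row, with
// column names supplied explicitly via csvOptions.
transformer({
  fileName: 'users.csv',
  sourceFormat: 'csv',
  targetIndexName: 'users-index',
  csvOptions: {
    delimiter: ';',
    columns: ['id', 'first_name', 'last_name'],
  },
  transform(row) {
    return { ...row, id: Number(row.id) }; // CSV values arrive as strings
  },
});
```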

Client Configuration

  • sourceClient (Client): Pre-instantiated Elasticsearch client for source operations. If provided, sourceClientConfig is ignored.
  • targetClient (Client): Pre-instantiated Elasticsearch client for target operations. If not provided, uses sourceClient or creates from config.
  • sourceClientConfig (object): Elasticsearch client configuration for source. Default: { node: 'http://localhost:9200' }. Ignored if sourceClient is provided.
  • targetClientConfig (object): Elasticsearch client configuration for target. If not provided, uses sourceClientConfig. Ignored if targetClient is provided.
  • sourceClientVersion (8 | 9): Force specific ES client version for source. Auto-detected if not specified.
  • targetClientVersion (8 | 9): Force specific ES client version for target. Auto-detected if not specified.

Index Configuration

  • mappings (object): Elasticsearch document mappings for target index. If reindexing and not provided, mappings are copied from source index.
  • mappingsOverride (boolean): When reindexing, apply mappings on top of source index mappings. Default: false.
  • inferMappings (boolean): Infer mappings for fileName sources via /_text_structure/find_structure. Supported for sourceFormat: 'ndjson' and sourceFormat: 'csv' only. Ignored when mappings is provided. If inference returns ingest_pipeline, it is created as <targetIndexName>-inferred-pipeline and applied as the index default pipeline (unless pipeline is explicitly set). Default: false.
  • inferMappingsOptions (object): Options for /_text_structure/find_structure (for example sampleBytes, lines_to_sample, delimiter, quote, has_header_row, timeout).
  • deleteIndex (boolean): Delete target index if it exists before starting. Default: false.
  • indexMappingTotalFieldsLimit (number): Field limit for target index (index.mapping.total_fields.limit setting).
  • pipeline (string): Elasticsearch ingest pipeline name to use during indexing.

When inferMappings is enabled, the target cluster must allow /_text_structure/find_structure (cluster privilege: monitor_text_structure). If inferred ingest pipelines are used, the target cluster must also allow creating ingest pipelines (_ingest/pipeline).
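A sketch combining the index options above; the pipeline name is a placeholder for an ingest pipeline that already exists on the target cluster:

```javascript
const transformer = require('node-es-transformer');

// Sketch: recreate the target index on every run, raise the
// mapping field limit, and apply an existing ingest pipeline.
transformer({
  fileName: 'data.json',
  targetIndexName: 'my-index',
  deleteIndex: true,                  // drop the index first if it exists
  indexMappingTotalFieldsLimit: 2000, // index.mapping.total_fields.limit
  pipeline: 'my-ingest-pipeline',     // placeholder pipeline name
});
```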

Performance Options

  • bufferSize (number): Buffer size threshold in kilobytes for bulk indexing. Default: 5120 (5 MB).
  • searchSize (number): Number of documents to fetch per search request when reindexing. Default: 100.
  • populatedFields (boolean): Detect which fields are actually populated in documents. Useful for optimizing indices with many mapped but unused fields. Default: false.
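A tuning sketch, assuming a larger bulk buffer means fewer, bigger bulk requests; the numbers are illustrative, not recommendations:

```javascript
const transformer = require('node-es-transformer');

// Sketch: 10 MB bulk buffer and larger search pages for reindexing.
// Sensible values depend on document size and cluster capacity.
transformer({
  sourceIndexName: 'my-source-index',
  targetIndexName: 'my-target-index',
  bufferSize: 10240, // KB, i.e. 10 MB per bulk request
  searchSize: 500,   // documents fetched per search request
});
```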

Processing Options

  • transform (function): Callback to transform documents. Signature: (doc, context?) => doc | doc[] | null | undefined.
    • Return transformed document
    • Return array of documents to split one source into multiple targets
    • Return null/undefined to skip document
  • query (object): Elasticsearch DSL query to filter source documents.
  • splitRegex (RegExp): Line split regex for file/stream sources when sourceFormat is 'ndjson'. Default: /\n/.
  • skipHeader (boolean): Header skipping for file/stream sources.
    • NDJSON: skips the first non-empty line
    • CSV: skips the first data line only when csvOptions.columns does not consume headers
    • Parquet/Arrow: ignored
    • Default: false
    • Applies only to fileName/stream sources
  • verbose (boolean): Enable verbose logging and progress bars when using the built-in logger. Default: true.
  • logger (object): Optional custom Pino-compatible logger. If omitted, the library creates an internal Pino logger (name: node-es-transformer) and uses LOG_LEVEL (if set) or info/error based on verbose.
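The three transform return shapes can be exercised as a plain function before wiring it into transformer(); the field names here (`status`, `items`) are hypothetical:

```javascript
// Sketch of the three return shapes a transform callback may use.
function transformDoc(doc) {
  if (doc.status === 'deleted') return null; // null/undefined skips the document
  if (Array.isArray(doc.items)) {
    // An array splits one source document into several target documents.
    return doc.items.map(item => ({ ...item, parent_id: doc.id }));
  }
  // A plain object indexes a single (possibly modified) document.
  return { ...doc, indexed_at: new Date().toISOString() };
}
```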

Return Value

The transformer() function returns a Promise that resolves to an object with:

  • events (EventEmitter): Event emitter for monitoring progress.
    • 'queued': Document added to queue
    • 'indexed': Document successfully indexed
    • 'complete': All documents processed
    • 'error': Error occurred
Example: monitoring completion and errors with a Pino logger (wrapped in an async IIFE, since top-level await is not available in CommonJS):

const pino = require('pino');
const logger = pino({ name: 'my-app', level: process.env.LOG_LEVEL || 'info' });

(async () => {
  const result = await transformer({
    /* options */
  });

  result.events.on('complete', () => {
    logger.info('Ingestion complete');
  });

  result.events.on('error', err => {
    logger.error({ err }, 'Ingestion failed');
  });
})();

TypeScript Support

Full TypeScript definitions are included. Import types for type-safe configuration:

import transformer, { TransformerOptions } from 'node-es-transformer';

const options: TransformerOptions = {
  fileName: 'data.json',
  targetIndexName: 'my-index',
};

See examples/typescript-example.ts for more examples.

Documentation

Error Handling

Always handle errors when using the library:

const pino = require('pino');
const logger = pino({ name: 'my-app', level: process.env.LOG_LEVEL || 'info' });

transformer({
  /* options */
})
  .then(() => logger.info('Success'))
  .catch(err => logger.error({ err }, 'Transformer failed'));

// Or with async/await (inside an async function)
(async () => {
  try {
    await transformer({
      /* options */
    });
    logger.info('Success');
  } catch (err) {
    logger.error({ err }, 'Transformer failed');
  }
})();

More Examples

See the examples/ directory for practical code samples covering:

  • Basic file ingestion
  • Reindexing with transformations
  • Cross-version migration (ES 8.x → 9.x)
  • Document splitting
  • Wildcard file processing
  • Stream-based ingestion

Contributing

Contributions are welcome! Before starting work on a PR, please open an issue to discuss your proposed changes.

Support

This is a single-person best-effort project. While I aim to address issues and maintain the library, response times may vary. See VERSIONING.md for details on API stability and support expectations.


License

Apache 2.0