@kk-007/market-data-engine

v1.0.0

Broker-agnostic market data engine for real-time tick ingestion, OHLC candle generation, and indicator computation

market-data-engine

Broker-agnostic market data engine for the Indian stock market. Ingests real-time ticks via built-in WebSocket clients, builds OHLCV candles, computes indicators, and maintains a universal instrument registry — all backed by PostgreSQL.

Built for algo-trading systems, charting backends, and market data pipelines that need to work across multiple brokers without vendor lock-in.

Why

If you're building a trading system in India, you'll hit these problems:

  • Every broker has a different data format. Angel One's instrument master looks nothing like Zerodha's. Their WebSocket tick formats differ too.
  • You need candles from raw ticks. Brokers give you last-traded-price. You need 1m, 5m, 15m OHLCV candles.
  • Indicators need candle history. Computing MA/EMA means first solving candle storage and retrieval.
  • Switching brokers means rewriting everything. Your data layer shouldn't care which broker fed it.

This package solves all of that — provide your broker credentials, and the engine handles instrument sync, WebSocket connections, tick ingestion, candle building, and indicator computation.

Who is this for

  • Algo traders building automated trading systems on Angel One, Zerodha, or both
  • Fintech developers building charting or analytics backends for Indian markets
  • Quantitative analysts who need reliable candle and indicator data from multiple sources
  • Anyone tired of writing broker-specific data pipelines from scratch

Features

  • Built-in broker support — Angel One (SmartAPI) and Zerodha (Kite Connect) with HTTP instrument sync and WebSocket tick streaming
  • Real-time candle building — ticks automatically aggregated into configurable OHLCV timeframes (1m, 5m, 15m, or any custom interval)
  • Duplicate tick handling — out-of-order and duplicate ticks are automatically rejected
  • On-demand indicators — MA and EMA computed from stored candles
  • Universal instrument registry — deterministic instrument IDs across brokers with token mapping
  • Versioned schema migrations — safe, transactional database migrations
  • Separate sync/socket credentials — use different API keys for instrument sync vs WebSocket
  • PostgreSQL storage — battle-tested, with batch inserts and connection pooling
  • Idempotent operations — safe to replay ticks, re-sync instruments
  • Auto-reconnect — WebSocket reconnects with exponential backoff
  • TypeScript-first — full type safety with exported interfaces and declaration maps
  • Custom adapter support — extend with your own brokers beyond the built-in ones
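The README does not say how the auto-reconnect backoff is parameterised. As a rough illustration of exponential backoff in general, the sketch below computes the wait before each reconnect attempt. The helper name `backoffDelayMs`, the 1-second base delay, and the 30-second cap are assumptions made for this example and are not part of the package's API.

```typescript
// Hypothetical helper, not exported by market-data-engine.
// Returns the delay in milliseconds before reconnect attempt `attempt` (0-based).
function backoffDelayMs(
  attempt: number,
  baseMs: number = 1000,  // assumed initial delay
  maxMs: number = 30000,  // assumed upper bound
): number {
  // Double the delay on every attempt, then clamp it to the cap.
  const exponential = baseMs * Math.pow(2, attempt);
  return Math.min(exponential, maxMs);
}

const delays = [0, 1, 2, 3, 4, 5, 6].map((n) => backoffDelayMs(n));
console.log(delays); // [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

Capping the delay keeps a long outage from pushing the next attempt arbitrarily far into the future.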

Install

npm install market-data-engine

Requirements: PostgreSQL 14+, Node.js 18+

Quick Start

1. Set up PostgreSQL

Using Docker (quickest):

docker run -d --name marketdata-pg \
  -e POSTGRES_USER=marketdata \
  -e POSTGRES_PASSWORD=marketdata \
  -e POSTGRES_DB=marketdata \
  -p 5432:5432 \
  postgres:16-alpine

2. Create the engine

import { createEngine } from 'market-data-engine';

const engine = createEngine({
  dbUrl: 'postgresql://marketdata:marketdata@localhost:5432/marketdata',
  timeframes: [60, 300, 900], // 1m, 5m, 15m candles
  brokers: {
    angel: {
      apiKey: 'your-angel-api-key',
      clientId: 'your-client-id',
      jwtToken: 'your-jwt-token',
      feedToken: 'your-feed-token',
    },
    zerodha: {
      apiKey: 'your-kite-api-key',
      accessToken: 'your-access-token',
    },
  },
});

3. Run migrations

await engine.migrate(); // Creates tables, applies pending migrations

4. Sync instruments

The package fetches instrument lists from the broker APIs automatically:

await engine.syncInstruments('ANGEL');
await engine.syncInstruments('ZERODHA');

5. Connect WebSocket and subscribe

// Connect to all configured brokers
await engine.connect();

// Subscribe to instruments — engine resolves broker tokens from DB
await engine.subscribe({
  instrumentIds: ['NSE:RELIANCE', 'NSE:TCS', 'NFO:NIFTY:2024-04-25:24500:CE'],
  broker: 'ANGEL',
});

// Ticks auto-ingest into the candle builder. Optionally listen:
engine.onTick((tick) => {
  console.log(`${tick.instrumentId} @ ${tick.price}`);
});

engine.onError((err, broker) => {
  console.error(`[${broker}]`, err.message);
});

6. Query candles

const candles = await engine.getCandles({
  instrumentId: 'NSE:RELIANCE',
  timeframe: 60,    // 1-minute candles
  limit: 100,
});

7. Compute indicators

const ma = await engine.getMA({
  instrumentId: 'NSE:RELIANCE',
  timeframe: 300,   // on 5m candles
  period: 20,
  limit: 50,
});

const ema = await engine.getEMA({
  instrumentId: 'NSE:RELIANCE',
  timeframe: 300,
  period: 20,
  limit: 50,
});

8. Shutdown gracefully

await engine.shutdown(); // Disconnects WS, flushes candles, closes DB pool

Angel One Authentication

The package provides a login helper to get session tokens:

import { angelLogin } from 'market-data-engine';

const tokens = await angelLogin({
  apiKey: 'your-api-key',
  clientId: 'your-client-id',
  password: 'your-password',
  totp: '123456', // Current TOTP code
});

// Use the returned tokens in your engine config
const engine = createEngine({
  dbUrl: '...',
  timeframes: [60],
  brokers: {
    angel: {
      apiKey: 'your-api-key',
      clientId: 'your-client-id',
      jwtToken: tokens.jwtToken,
      feedToken: tokens.feedToken,
    },
  },
});

For Zerodha, complete the Kite Connect OAuth flow to obtain your access_token, then pass it directly to the config.

Separate Sync/Socket Credentials

Use different credentials for instrument sync (HTTP) vs tick streaming (WebSocket):

const engine = createEngine({
  dbUrl: '...',
  timeframes: [60],
  brokers: {
    angel: {
      sync: {
        apiKey: 'sync-key',
        clientId: 'sync-client',
        jwtToken: 'sync-jwt',
        feedToken: 'sync-feed',
      },
      socket: {
        apiKey: 'socket-key',
        clientId: 'socket-client',
        jwtToken: 'socket-jwt',
        feedToken: 'socket-feed',
      },
    },
  },
});

If you pass a plain config (not { sync, socket }), the same credentials are used for both.
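The fallback rule can be pictured as a small normalisation step. The sketch below is only an illustration: the type names and the `resolveCredentials` function are hypothetical, and the package's real config types are not shown in this README.

```typescript
// Hypothetical types mirroring the Angel config fields used in the examples above.
interface AngelCredentials {
  apiKey: string;
  clientId: string;
  jwtToken: string;
  feedToken: string;
}

interface SplitCredentials {
  sync: AngelCredentials;
  socket: AngelCredentials;
}

type AngelConfig = AngelCredentials | SplitCredentials;

// Normalise either config shape into an explicit { sync, socket } pair.
function resolveCredentials(cfg: AngelConfig): SplitCredentials {
  if ('sync' in cfg) {
    return cfg; // already split
  }
  return { sync: cfg, socket: cfg }; // plain config: reuse it for both roles
}

const plainConfig: AngelCredentials = {
  apiKey: 'key',
  clientId: 'client',
  jwtToken: 'jwt',
  feedToken: 'feed',
};
const resolved = resolveCredentials(plainConfig);
console.log(resolved.sync === resolved.socket); // true
```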

API Reference

createEngine(config)

Creates the engine instance.

| Config | Type | Description |
|---|---|---|
| dbUrl | string | PostgreSQL connection string |
| timeframes | number[] | Candle intervals in seconds (e.g. [60, 300, 900]) |
| brokers | BrokersConfig? | Built-in broker credentials (Angel, Zerodha) |
| adapters | BrokerAdapter[]? | Custom adapters for unsupported brokers |
| flushBatchSize | number? | Max candles per batch insert (default: 500) |

engine.ingestTick(tick)

Manually ingest a tick. Duplicate and out-of-order ticks are automatically rejected.

interface Tick {
  instrumentId: string;  // e.g. "NSE:RELIANCE"
  price: number;
  timestamp: number;     // epoch ms
  volume?: number;
}
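To make the rejection rule concrete, here is a minimal sketch of a per-instrument filter. It follows the definition given later in this README (a duplicate has the same timestamp and the same price), but `createTickFilter` is a hypothetical helper, and tracking only the last accepted tick per instrument is an assumption about how such a check could work, not a description of the engine's internals.

```typescript
interface Tick {
  instrumentId: string;
  price: number;
  timestamp: number; // epoch ms
  volume?: number;
}

// Hypothetical helper: returns a function that reports whether a tick should be accepted.
function createTickFilter(): (tick: Tick) => boolean {
  // Last accepted tick per instrument (assumed bookkeeping).
  const last = new Map<string, { timestamp: number; price: number }>();

  return function accept(tick: Tick): boolean {
    const prev = last.get(tick.instrumentId);
    if (prev) {
      // Out of order: older than the last accepted tick.
      if (tick.timestamp < prev.timestamp) return false;
      // Duplicate: same timestamp and same price.
      if (tick.timestamp === prev.timestamp && tick.price === prev.price) return false;
    }
    last.set(tick.instrumentId, { timestamp: tick.timestamp, price: tick.price });
    return true;
  };
}

const acceptTick = createTickFilter();
const filterResults = [
  acceptTick({ instrumentId: 'NSE:TCS', price: 100, timestamp: 1000 }), // first tick
  acceptTick({ instrumentId: 'NSE:TCS', price: 100, timestamp: 1000 }), // duplicate
  acceptTick({ instrumentId: 'NSE:TCS', price: 101, timestamp: 900 }),  // out of order
  acceptTick({ instrumentId: 'NSE:TCS', price: 101, timestamp: 2000 }), // newer tick
];
console.log(filterResults); // [true, false, false, true]
```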

engine.syncInstruments(brokerName)

Sync the full instrument list from a broker. The package fetches from the broker's API, normalizes records, generates deterministic IDs, and stores broker token mappings.

engine.getCandles(params)

Query stored OHLCV candles. Returns Candle[] in descending timestamp order.

interface CandleQueryParams {
  instrumentId: string;
  timeframe: number;     // seconds
  from?: number;         // epoch ms, inclusive
  to?: number;           // epoch ms, inclusive
  limit?: number;        // default 500
  minVolume?: number;    // filter out low-volume candles
}

engine.getMA(params) / engine.getEMA(params)

Compute indicators on-demand from stored candles.

interface IndicatorQueryParams {
  instrumentId: string;
  timeframe: number;
  period: number;        // e.g. 20 for MA(20)
  limit?: number;        // default 100
}
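For readers who want to see what these indicators compute, the sketch below implements a simple moving average and an exponential moving average over an array of closing prices ordered oldest first. It is a generic illustration: seeding the EMA with the SMA of the first `period` values and using the multiplier 2 / (period + 1) are common conventions assumed here, and the README does not state which variant the package uses. The function names are hypothetical.

```typescript
// Simple moving average: one output per full window of `period` closes.
function simpleMovingAverage(closes: number[], period: number): number[] {
  const out: number[] = [];
  let sum = 0;
  for (let i = 0; i < closes.length; i++) {
    sum += closes[i];
    if (i >= period) sum -= closes[i - period]; // drop the value leaving the window
    if (i >= period - 1) out.push(sum / period);
  }
  return out;
}

// Exponential moving average, seeded with the SMA of the first `period` closes (assumption).
function exponentialMovingAverage(closes: number[], period: number): number[] {
  if (closes.length < period) return [];
  const k = 2 / (period + 1); // smoothing multiplier
  let prev = closes.slice(0, period).reduce((a, b) => a + b, 0) / period;
  const out = [prev];
  for (let i = period; i < closes.length; i++) {
    prev = closes[i] * k + prev * (1 - k);
    out.push(prev);
  }
  return out;
}

const sampleCloses = [1, 2, 3, 4, 5];
console.log(simpleMovingAverage(sampleCloses, 3));      // [2, 3, 4]
console.log(exponentialMovingAverage(sampleCloses, 3)); // [2, 3, 4]
```

Because getCandles returns candles in descending timestamp order, closes taken from it would need to be reversed before being passed to functions like these.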

engine.connect(broker?)

Connect WebSocket for a specific broker or all configured brokers. Pre-loads broker token → instrument ID mappings from the database.
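A reverse mapping of this kind can be pictured as a plain lookup table. The sketch below is illustrative only: the row shape, the `broker:token` key format, and the function names are assumptions, and the token values are made up.

```typescript
// Assumed shape of a row loaded from the instrument_mappings table.
interface MappingRow {
  broker: string;
  brokerToken: string;
  instrumentId: string;
}

// Build a lookup keyed by "BROKER:token" (key format is an assumption).
function buildReverseMap(rows: MappingRow[]): Map<string, string> {
  const map = new Map<string, string>();
  for (const row of rows) {
    map.set(`${row.broker}:${row.brokerToken}`, row.instrumentId);
  }
  return map;
}

// Resolve an incoming broker token to the universal instrument ID, if known.
function resolveInstrumentId(
  map: Map<string, string>,
  broker: string,
  brokerToken: string,
): string | undefined {
  return map.get(`${broker}:${brokerToken}`);
}

// Made-up tokens for illustration.
const reverseMap = buildReverseMap([
  { broker: 'ANGEL', brokerToken: '1001', instrumentId: 'NSE:RELIANCE' },
  { broker: 'ZERODHA', brokerToken: '9001', instrumentId: 'NSE:RELIANCE' },
]);
console.log(resolveInstrumentId(reverseMap, 'ANGEL', '1001')); // NSE:RELIANCE
```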

engine.disconnect(broker?)

Disconnect WebSocket for a specific broker or all.

engine.subscribe(params)

Subscribe to real-time ticks. The engine resolves instrument IDs to broker tokens automatically from DB mappings.

await engine.subscribe({
  instrumentIds: ['NSE:RELIANCE', 'NSE:TCS'],
  broker: 'ANGEL',
});

engine.unsubscribe(params)

Unsubscribe from instruments. Same params as subscribe.

engine.onTick(callback)

Register a callback for every incoming WebSocket tick (after instrument ID resolution). Ticks are also auto-ingested into the candle builder.

engine.onError(callback)

Register a callback for WebSocket errors. Receives (error: Error, broker: string).

engine.migrate()

Run all pending database migrations. Returns an array of newly applied version strings. Migrations run in transactions and roll back on failure.

engine.getMigrationStatus()

Returns a list of applied migrations with timestamps.

engine.shutdown()

Disconnects all WebSockets, flushes in-memory candles to DB, and closes the connection pool.

angelLogin(params)

Login to Angel One SmartAPI. Returns { jwtToken, refreshToken, feedToken }.

generateInstrumentId(instrument)

Generate a deterministic instrument ID from its attributes.

bucketTimestamp(timestampMs, timeframeSec)

Compute the candle bucket start timestamp for a given tick.
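As a worked example of the arithmetic, the sketch below re-implements the bucketing with explicit units. `bucketStartMs` is a hypothetical stand-in, not the exported function, and returning the bucket start in epoch milliseconds is an assumption; the README does not say which unit bucketTimestamp returns.

```typescript
// Hypothetical re-implementation for illustration only.
// Takes a tick timestamp in epoch ms and a timeframe in seconds,
// returns the bucket start in epoch ms (assumed unit).
function bucketStartMs(timestampMs: number, timeframeSec: number): number {
  const timeframeMs = timeframeSec * 1000;
  return Math.floor(timestampMs / timeframeMs) * timeframeMs;
}

const sampleTickMs = Date.UTC(2024, 3, 25, 9, 17, 42, 500); // 2024-04-25T09:17:42.500Z
console.log(new Date(bucketStartMs(sampleTickMs, 60)).toISOString());  // 2024-04-25T09:17:00.000Z
console.log(new Date(bucketStartMs(sampleTickMs, 300)).toISOString()); // 2024-04-25T09:15:00.000Z
```

Buckets computed this way are aligned to the Unix epoch in UTC rather than to the start of a trading session.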

Instrument ID Format

Instruments get deterministic, human-readable IDs:

| Type | Format | Example |
|---|---|---|
| Equity | EXCHANGE:SYMBOL | NSE:RELIANCE |
| Futures | EXCHANGE:SYMBOL:EXPIRY | NFO:NIFTY:2024-04-25 |
| Options | EXCHANGE:SYMBOL:EXPIRY:STRIKE:TYPE | NFO:NIFTY:2024-04-25:24500:CE |

The same instrument from different brokers maps to the same ID — that's how the engine stays broker-agnostic.
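The three formats can be produced by a short string builder. The sketch below is a hypothetical illustration of the format, not the package's generateInstrumentId: the `InstrumentAttrs` shape and its field names are assumptions.

```typescript
type OptionType = 'CE' | 'PE';

// Assumed input shape; the package's real instrument type may differ.
interface InstrumentAttrs {
  exchange: string;        // e.g. "NSE", "NFO"
  symbol: string;          // e.g. "RELIANCE", "NIFTY"
  expiry?: string;         // ISO date, e.g. "2024-04-25"
  strike?: number;
  optionType?: OptionType;
}

// Join the attributes that are present, in the order used by the table above.
function buildInstrumentId(attrs: InstrumentAttrs): string {
  const parts: string[] = [attrs.exchange, attrs.symbol];
  if (attrs.expiry) {
    parts.push(attrs.expiry);
    // Strike and option type only apply to options.
    if (attrs.strike !== undefined && attrs.optionType) {
      parts.push(String(attrs.strike), attrs.optionType);
    }
  }
  return parts.map((p) => p.toUpperCase()).join(':');
}

console.log(buildInstrumentId({ exchange: 'NSE', symbol: 'RELIANCE' }));
// NSE:RELIANCE
console.log(buildInstrumentId({ exchange: 'NFO', symbol: 'NIFTY', expiry: '2024-04-25' }));
// NFO:NIFTY:2024-04-25
console.log(buildInstrumentId({ exchange: 'NFO', symbol: 'NIFTY', expiry: '2024-04-25', strike: 24500, optionType: 'CE' }));
// NFO:NIFTY:2024-04-25:24500:CE
```

Because the ID depends only on the instrument's attributes and not on any broker token, two brokers listing the same contract produce the same string.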

Writing a Custom Adapter

For brokers that are not built in, implement the BrokerAdapter interface and pass it via adapters:

import { BrokerAdapter, createEngine } from 'market-data-engine';

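const myAdapter: BrokerAdapter = {
  name: 'MY_BROKER',

  // Yield raw instrument records one at a time.
  // fetchFromMyBrokerAPI is a placeholder for your own HTTP call.
  async *fetchInstruments() {
    const data = await fetchFromMyBrokerAPI();
    for (const item of data) {
      yield item;
    }
  },

  // Map a raw broker record onto the engine's instrument shape.
  normalizeInstrument(raw) {
    return {
      id: '',  // Auto-generated by the engine
      exchange: raw.exchange,
      tradingsymbol: raw.symbol,
      type: raw.type,
      expiry: raw.expiry || undefined,
      strike: raw.strike || undefined,
      underlying: raw.underlying || undefined,
    };
  },

  // Return the broker-specific token used on the WebSocket feed.
  getBrokerToken(raw) {
    return raw.token;
  },
};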

const engine = createEngine({
  dbUrl: '...',
  timeframes: [60],
  adapters: [myAdapter],
});

How Candle Building Works

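The engine maintains in-memory partial candles. Each tick is assigned to a bucket, with timestamp and timeframe expressed in the same unit (tick timestamps are epoch milliseconds, timeframes are configured in seconds):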

bucket = floor(timestamp / timeframe) * timeframe

Within a bucket, OHLCV is updated:

  • Open — first tick's price
  • High — max of all ticks
  • Low — min of all ticks
  • Close — last tick's price
  • Volume — sum of all ticks

When a tick falls into a new bucket, the previous candle is flushed to PostgreSQL. Completed candles are batch-inserted for performance. Duplicate ticks (same timestamp + same price) and out-of-order ticks are automatically rejected.
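The aggregation described above can be sketched in a few lines. This is a simplified illustration under stated assumptions, not the engine's CandleBuilder: `createAggregator` and the `Candle` shape are hypothetical, ticks are assumed to arrive already filtered and in order, a missing volume is treated as 0, and completed candles are handed to a callback instead of being batch-inserted.

```typescript
interface Tick {
  instrumentId: string;
  price: number;
  timestamp: number; // epoch ms
  volume?: number;
}

// Assumed candle shape; the package's Candle type is not shown in this README.
interface Candle {
  instrumentId: string;
  timeframe: number; // seconds
  timestamp: number; // bucket start, epoch ms (assumption)
  open: number;
  high: number;
  low: number;
  close: number;
  volume: number;
}

// Returns an ingest function that keeps one partial candle per instrument
// and passes each completed candle to `onFlush`.
function createAggregator(timeframeSec: number, onFlush: (c: Candle) => void) {
  const partial = new Map<string, Candle>();
  const timeframeMs = timeframeSec * 1000;

  return function ingest(tick: Tick): void {
    const bucket = Math.floor(tick.timestamp / timeframeMs) * timeframeMs;
    const volume = tick.volume ?? 0;
    const current = partial.get(tick.instrumentId);

    if (current && current.timestamp === bucket) {
      // Same bucket: update high, low, close and accumulate volume.
      current.high = Math.max(current.high, tick.price);
      current.low = Math.min(current.low, tick.price);
      current.close = tick.price;
      current.volume += volume;
      return;
    }

    // New bucket: the previous candle (if any) is complete.
    if (current) onFlush(current);
    partial.set(tick.instrumentId, {
      instrumentId: tick.instrumentId,
      timeframe: timeframeSec,
      timestamp: bucket,
      open: tick.price,
      high: tick.price,
      low: tick.price,
      close: tick.price,
      volume,
    });
  };
}

const flushedCandles: Candle[] = [];
const ingestTick = createAggregator(60, (c) => flushedCandles.push(c));
const minuteStart = Date.UTC(2024, 3, 25, 3, 45, 0); // a minute boundary

ingestTick({ instrumentId: 'NSE:TCS', price: 100, timestamp: minuteStart + 1000, volume: 10 });
ingestTick({ instrumentId: 'NSE:TCS', price: 105, timestamp: minuteStart + 20000, volume: 5 });
ingestTick({ instrumentId: 'NSE:TCS', price: 98, timestamp: minuteStart + 59000, volume: 1 });
ingestTick({ instrumentId: 'NSE:TCS', price: 99, timestamp: minuteStart + 60000, volume: 2 }); // next bucket
console.log(flushedCandles[0]); // open 100, high 105, low 98, close 98, volume 16
```

Note that the last partial candle stays in memory until a later tick or an explicit flush arrives, which is why the README's shutdown step flushes candles before closing the pool.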

Database

Migrations

The package uses versioned, transactional migrations. Call engine.migrate() on startup — it creates tables on first run and applies new migrations on updates. Migration files live in src/db/migrations/.

Schema

Four tables:

  • schema_migrations — tracks applied migration versions
  • instruments — universal instrument registry
  • instrument_mappings — broker-specific token to instrument ID mapping
  • candles — OHLCV data, keyed by (instrument_id, timeframe, timestamp)

All writes use ON CONFLICT ... DO UPDATE — safe to replay, safe to restart.
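To show what such an idempotent write might look like for the candles table, the sketch below builds a parameterised upsert. Only the table name and the key (instrument_id, timeframe, timestamp) come from this README; the remaining column names, the `Candle` shape, and the `buildCandleUpsert` helper are assumptions. The `{ text, values }` result is shaped so it could be passed to a client such as node-postgres, though the README does not say which client the package uses.

```typescript
// Assumed candle shape for illustration.
interface Candle {
  instrumentId: string;
  timeframe: number;
  timestamp: number;
  open: number;
  high: number;
  low: number;
  close: number;
  volume: number;
}

// Hypothetical helper: returns a parameterised upsert for one candle.
function buildCandleUpsert(c: Candle): { text: string; values: (string | number)[] } {
  // Column names other than the key columns are assumptions.
  const text = [
    'INSERT INTO candles',
    '  (instrument_id, timeframe, "timestamp", open, high, low, close, volume)',
    'VALUES ($1, $2, $3, $4, $5, $6, $7, $8)',
    'ON CONFLICT (instrument_id, timeframe, "timestamp")',
    'DO UPDATE SET',
    '  open = EXCLUDED.open,',
    '  high = EXCLUDED.high,',
    '  low = EXCLUDED.low,',
    '  close = EXCLUDED.close,',
    '  volume = EXCLUDED.volume',
  ].join('\n');

  const values = [
    c.instrumentId, c.timeframe, c.timestamp,
    c.open, c.high, c.low, c.close, c.volume,
  ];
  return { text, values };
}

const upsert = buildCandleUpsert({
  instrumentId: 'NSE:RELIANCE',
  timeframe: 60,
  timestamp: 1714016700000,
  open: 100, high: 105, low: 98, close: 101, volume: 16,
});
console.log(upsert.text);
```

Running the same statement twice leaves a single row with the latest values, which is what makes replaying ticks or restarting the process safe.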

Architecture

Broker API (HTTP)         Broker WebSocket
      |                         |
      v                         v
  Adapter                  SocketAdapter
  (instrument sync)        (tick streaming)
      |                         |
      +----------+--------------+
                 |
                 v
              Engine
                 |
         +-------+-------+
         |               |
         v               v
   CandleBuilder    Reverse Map
   (in-memory       (brokerToken
    OHLCV agg)      -> instrumentId)
         |
         v
     PostgreSQL
         |
         v
     Query API
  (getCandles, getMA, getEMA)

Where to Use This

  • As a library in your trading bot: run npm install market-data-engine and call the API directly
  • As the data layer in a charting backend — connect WebSockets, serve candles via your REST API
  • As a standalone data collector — run a process that syncs instruments and streams ticks into PostgreSQL
  • In a microservice architecture — one service ingests ticks, others query candles and indicators from the shared database

License

MIT