@kk-007/market-data-engine
v1.0.0
Broker-agnostic market data engine for real-time tick ingestion, OHLC candle generation, and indicator computation
market-data-engine
Broker-agnostic market data engine for the Indian stock market. Ingests real-time ticks via built-in WebSocket clients, builds OHLCV candles, computes indicators, and maintains a universal instrument registry — all backed by PostgreSQL.
Built for algo-trading systems, charting backends, and market data pipelines that need to work across multiple brokers without vendor lock-in.
Why
If you're building a trading system in India, you'll hit these problems:
- Every broker has a different data format. Angel One's instrument master looks nothing like Zerodha's. Their WebSocket tick formats differ too.
- You need candles from raw ticks. Brokers give you last-traded-price. You need 1m, 5m, 15m OHLCV candles.
- Indicators need candle history. Computing MA/EMA means first solving candle storage and retrieval.
- Switching brokers means rewriting everything. Your data layer shouldn't care which broker fed it.
This package solves all of that — provide your broker credentials, and the engine handles instrument sync, WebSocket connections, tick ingestion, candle building, and indicator computation.
Who is this for
- Algo traders building automated trading systems on Angel One, Zerodha, or both
- Fintech developers building charting or analytics backends for Indian markets
- Quantitative analysts who need reliable candle and indicator data from multiple sources
- Anyone tired of writing broker-specific data pipelines from scratch
Features
- Built-in broker support — Angel One (SmartAPI) and Zerodha (Kite Connect) with HTTP instrument sync and WebSocket tick streaming
- Real-time candle building — ticks automatically aggregated into configurable OHLCV timeframes (1m, 5m, 15m, or any custom interval)
- Duplicate tick handling — out-of-order and duplicate ticks are automatically rejected
- On-demand indicators — MA and EMA computed from stored candles
- Universal instrument registry — deterministic instrument IDs across brokers with token mapping
- Versioned schema migrations — safe, transactional database migrations
- Separate sync/socket credentials — use different API keys for instrument sync vs WebSocket
- PostgreSQL storage — battle-tested, with batch inserts and connection pooling
- Idempotent operations — safe to replay ticks, re-sync instruments
- Auto-reconnect — WebSocket reconnects with exponential backoff
- TypeScript-first — full type safety with exported interfaces and declaration maps
- Custom adapter support — extend with your own brokers beyond the built-in ones
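As a rough illustration of what the on-demand MA/EMA computation involves, here is a standalone sketch over an oldest-first array of closing prices. This is illustrative only, not the package's internal code; the EMA is seeded with the SMA of the first `period` closes, which is a common convention but an assumption here.

```typescript
// Simple moving average: one value per full window of `period` closes.
function sma(closes: number[], period: number): number[] {
  const out: number[] = [];
  for (let i = period - 1; i < closes.length; i++) {
    const window = closes.slice(i - period + 1, i + 1);
    out.push(window.reduce((a, b) => a + b, 0) / period);
  }
  return out;
}

// Exponential moving average, seeded with the SMA of the first `period`
// closes, then smoothed forward with k = 2 / (period + 1).
function ema(closes: number[], period: number): number[] {
  if (closes.length < period) return [];
  const k = 2 / (period + 1);
  let prev = closes.slice(0, period).reduce((a, b) => a + b, 0) / period;
  const out: number[] = [prev];
  for (let i = period; i < closes.length; i++) {
    prev = closes[i] * k + prev * (1 - k);
    out.push(prev);
  }
  return out;
}

console.log(sma([1, 2, 3, 4, 5], 3)); // [2, 3, 4]
```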
Install
```bash
npm install market-data-engine
```

Peer requirements: PostgreSQL 14+, Node.js 18+
Quick Start
1. Set up PostgreSQL
Using Docker (quickest):
```bash
docker run -d --name marketdata-pg \
  -e POSTGRES_USER=marketdata \
  -e POSTGRES_PASSWORD=marketdata \
  -e POSTGRES_DB=marketdata \
  -p 5432:5432 \
  postgres:16-alpine
```

2. Create the engine
```ts
import { createEngine } from 'market-data-engine';

const engine = createEngine({
  dbUrl: 'postgresql://marketdata:marketdata@localhost:5432/marketdata',
  timeframes: [60, 300, 900], // 1m, 5m, 15m candles
  brokers: {
    angel: {
      apiKey: 'your-angel-api-key',
      clientId: 'your-client-id',
      jwtToken: 'your-jwt-token',
      feedToken: 'your-feed-token',
    },
    zerodha: {
      apiKey: 'your-kite-api-key',
      accessToken: 'your-access-token',
    },
  },
});
```

3. Run migrations
```ts
await engine.migrate(); // Creates tables, applies pending migrations
```

4. Sync instruments
The package fetches instrument lists from the broker APIs automatically:
```ts
await engine.syncInstruments('ANGEL');
await engine.syncInstruments('ZERODHA');
```

5. Connect WebSocket and subscribe
```ts
// Connect to all configured brokers
await engine.connect();

// Subscribe to instruments — engine resolves broker tokens from DB
await engine.subscribe({
  instrumentIds: ['NSE:RELIANCE', 'NSE:TCS', 'NFO:NIFTY:2024-04-25:24500:CE'],
  broker: 'ANGEL',
});

// Ticks auto-ingest into the candle builder. Optionally listen:
engine.onTick((tick) => {
  console.log(`${tick.instrumentId} @ ${tick.price}`);
});

engine.onError((err, broker) => {
  console.error(`[${broker}]`, err.message);
});
```

6. Query candles
```ts
const candles = await engine.getCandles({
  instrumentId: 'NSE:RELIANCE',
  timeframe: 60, // 1-minute candles
  limit: 100,
});
```

7. Compute indicators
```ts
const ma = await engine.getMA({
  instrumentId: 'NSE:RELIANCE',
  timeframe: 300, // on 5m candles
  period: 20,
  limit: 50,
});

const ema = await engine.getEMA({
  instrumentId: 'NSE:RELIANCE',
  timeframe: 300,
  period: 20,
  limit: 50,
});
```

8. Shutdown gracefully
```ts
await engine.shutdown(); // Disconnects WS, flushes candles, closes DB pool
```

Angel One Authentication
The package provides a login helper to get session tokens:
```ts
import { angelLogin, createEngine } from 'market-data-engine';

const tokens = await angelLogin({
  apiKey: 'your-api-key',
  clientId: 'your-client-id',
  password: 'your-password',
  totp: '123456', // Current TOTP code
});

// Use the returned tokens in your engine config
const engine = createEngine({
  dbUrl: '...',
  timeframes: [60],
  brokers: {
    angel: {
      apiKey: 'your-api-key',
      clientId: 'your-client-id',
      jwtToken: tokens.jwtToken,
      feedToken: tokens.feedToken,
    },
  },
});
```

For Zerodha, complete the Kite Connect OAuth flow to obtain your `access_token`, then pass it directly to the config.
Separate Sync/Socket Credentials
Use different credentials for instrument sync (HTTP) vs tick streaming (WebSocket):
```ts
import { createEngine } from 'market-data-engine';

const engine = createEngine({
  dbUrl: '...',
  timeframes: [60],
  brokers: {
    angel: {
      sync: {
        apiKey: 'sync-key',
        clientId: 'sync-client',
        jwtToken: 'sync-jwt',
        feedToken: 'sync-feed',
      },
      socket: {
        apiKey: 'socket-key',
        clientId: 'socket-client',
        jwtToken: 'socket-jwt',
        feedToken: 'socket-feed',
      },
    },
  },
});
```

If you pass a plain config (not `{ sync, socket }`), the same credentials are used for both.
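One way such a config might be normalized internally can be sketched as follows. The `AngelCredentials` shape mirrors the fields shown above, but `resolveCredentials` and its return shape are hypothetical names for illustration, not the package's actual API.

```typescript
// Credential fields as shown in the Angel One config examples.
interface AngelCredentials {
  apiKey: string;
  clientId: string;
  jwtToken: string;
  feedToken: string;
}

// A broker entry is either plain credentials or a { sync, socket } pair.
type AngelConfig =
  | AngelCredentials
  | { sync: AngelCredentials; socket: AngelCredentials };

// Hypothetical normalizer: plain configs are reused for both roles.
function resolveCredentials(cfg: AngelConfig): {
  sync: AngelCredentials;
  socket: AngelCredentials;
} {
  if ('sync' in cfg && 'socket' in cfg) {
    return { sync: cfg.sync, socket: cfg.socket };
  }
  return { sync: cfg, socket: cfg };
}
```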
API Reference
createEngine(config)
Creates the engine instance.
| Config | Type | Description |
|---|---|---|
| dbUrl | string | PostgreSQL connection string |
| timeframes | number[] | Candle intervals in seconds (e.g. [60, 300, 900]) |
| brokers | BrokersConfig? | Built-in broker credentials (Angel, Zerodha) |
| adapters | BrokerAdapter[]? | Custom adapters for unsupported brokers |
| flushBatchSize | number? | Max candles per batch insert (default: 500) |
engine.ingestTick(tick)
Manually ingest a tick. Duplicate and out-of-order ticks are automatically rejected.
```ts
interface Tick {
  instrumentId: string; // e.g. "NSE:RELIANCE"
  price: number;
  timestamp: number; // epoch ms
  volume?: number;
}
```

engine.syncInstruments(brokerName)
Sync the full instrument list from a broker. The package fetches from the broker's API, normalizes records, generates deterministic IDs, and stores broker token mappings.
engine.getCandles(params)
Query stored OHLCV candles. Returns Candle[] in descending timestamp order.
```ts
interface CandleQueryParams {
  instrumentId: string;
  timeframe: number;  // seconds
  from?: number;      // epoch ms, inclusive
  to?: number;        // epoch ms, inclusive
  limit?: number;     // default 500
  minVolume?: number; // filter out low-volume candles
}
```

engine.getMA(params) / engine.getEMA(params)
Compute indicators on-demand from stored candles.
```ts
interface IndicatorQueryParams {
  instrumentId: string;
  timeframe: number;
  period: number; // e.g. 20 for MA(20)
  limit?: number; // default 100
}
```

engine.connect(broker?)
Connect WebSocket for a specific broker or all configured brokers. Pre-loads broker token → instrument ID mappings from the database.
engine.disconnect(broker?)
Disconnect WebSocket for a specific broker or all.
engine.subscribe(params)
Subscribe to real-time ticks. The engine resolves instrument IDs to broker tokens automatically from DB mappings.
```ts
await engine.subscribe({
  instrumentIds: ['NSE:RELIANCE', 'NSE:TCS'],
  broker: 'ANGEL',
});
```

engine.unsubscribe(params)
Unsubscribe from instruments. Same params as subscribe.
engine.onTick(callback)
Register a callback for every incoming WebSocket tick (after instrument ID resolution). Ticks are also auto-ingested into the candle builder.
engine.onError(callback)
Register a callback for WebSocket errors. Receives (error: Error, broker: string).
engine.migrate()
Run all pending database migrations. Returns an array of newly applied version strings. Migrations run in transactions and roll back on failure.
engine.getMigrationStatus()
Returns a list of applied migrations with timestamps.
engine.shutdown()
Disconnects all WebSockets, flushes in-memory candles to DB, and closes the connection pool.
angelLogin(params)
Login to Angel One SmartAPI. Returns { jwtToken, refreshToken, feedToken }.
generateInstrumentId(instrument)
Generate a deterministic instrument ID from its attributes.
bucketTimestamp(timestampMs, timeframeSec)
Compute the candle bucket start timestamp for a given tick.
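The helper's behaviour can be sketched as follows. This is an illustrative implementation based on the bucketing rule described in "How Candle Building Works" below, with the ms/seconds unit conversion made explicit; the package's actual code may differ.

```typescript
// Floor an epoch-ms tick timestamp to the start of its candle bucket,
// for a timeframe given in seconds (as elsewhere in this README).
function bucketTimestamp(timestampMs: number, timeframeSec: number): number {
  const timeframeMs = timeframeSec * 1000;
  return Math.floor(timestampMs / timeframeMs) * timeframeMs;
}

// A tick at 10:04:37 falls into the 10:00:00 bucket for 5m (300s) candles.
console.log(
  bucketTimestamp(Date.UTC(2024, 0, 1, 10, 4, 37), 300) ===
    Date.UTC(2024, 0, 1, 10, 0, 0)
); // true
```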
Instrument ID Format
Instruments get deterministic, human-readable IDs:
| Type | Format | Example |
|---|---|---|
| Equity | EXCHANGE:SYMBOL | NSE:RELIANCE |
| Futures | EXCHANGE:SYMBOL:EXPIRY | NFO:NIFTY:2024-04-25 |
| Options | EXCHANGE:SYMBOL:EXPIRY:STRIKE:TYPE | NFO:NIFTY:2024-04-25:24500:CE |
The same instrument from different brokers maps to the same ID — that's how the engine stays broker-agnostic.
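A sketch of how such deterministic IDs can be derived from normalized attributes, matching the formats in the table above. The field names follow the `normalizeInstrument` example below, but the type codes and branching are assumptions for illustration, not the package's actual `generateInstrumentId`.

```typescript
// Normalized instrument attributes (field names assumed).
interface InstrumentAttrs {
  exchange: string;      // e.g. "NSE", "NFO"
  tradingsymbol: string; // e.g. "RELIANCE", "NIFTY"
  type: 'EQ' | 'FUT' | 'CE' | 'PE';
  expiry?: string;       // ISO date, e.g. "2024-04-25"
  strike?: number;       // e.g. 24500
}

// Join only the attributes that apply to the instrument type, so the same
// instrument yields the same ID regardless of which broker supplied it.
function generateInstrumentId(i: InstrumentAttrs): string {
  const parts = [i.exchange, i.tradingsymbol];
  if (i.type !== 'EQ' && i.expiry) parts.push(i.expiry);
  if ((i.type === 'CE' || i.type === 'PE') && i.strike !== undefined) {
    parts.push(String(i.strike), i.type);
  }
  return parts.join(':');
}
```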
Writing a Custom Adapter
For brokers not built-in, implement the BrokerAdapter interface and pass via adapters:
```ts
import { BrokerAdapter, createEngine } from 'market-data-engine';

const myAdapter: BrokerAdapter = {
  name: 'MY_BROKER',

  async *fetchInstruments() {
    const data = await fetchFromMyBrokerAPI();
    for (const item of data) {
      yield item;
    }
  },

  normalizeInstrument(raw) {
    return {
      id: '', // Auto-generated by the engine
      exchange: raw.exchange,
      tradingsymbol: raw.symbol,
      type: raw.type,
      expiry: raw.expiry || undefined,
      strike: raw.strike || undefined,
      underlying: raw.underlying || undefined,
    };
  },

  getBrokerToken(raw) {
    return raw.token;
  },
};

const engine = createEngine({
  dbUrl: '...',
  timeframes: [60],
  adapters: [myAdapter],
});
```

How Candle Building Works
The engine maintains in-memory partial candles. Each tick is bucketed:

```
bucket = floor(timestamp / timeframe) * timeframe
```

Within a bucket, OHLCV is updated:
- Open — first tick's price
- High — max of all ticks
- Low — min of all ticks
- Close — last tick's price
- Volume — sum of all ticks
When a tick falls into a new bucket, the previous candle is flushed to PostgreSQL. Completed candles are batch-inserted for performance. Duplicate ticks (same timestamp + same price) and out-of-order ticks are automatically rejected.
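The lifecycle above can be sketched with a minimal in-memory builder for one instrument and one timeframe. This is illustrative only: the package's real builder also handles batching, persistence, and the finer duplicate rule (same timestamp + same price), which is simplified here to rejecting any non-increasing timestamp.

```typescript
interface Candle {
  start: number; // bucket start, epoch ms
  open: number;
  high: number;
  low: number;
  close: number;
  volume: number;
}

// Minimal single-instrument candle builder (simplified dedup rule).
class CandleBuilder {
  private current: Candle | null = null;
  private lastTs = -Infinity;
  readonly completed: Candle[] = [];

  constructor(private timeframeMs: number) {}

  ingest(timestampMs: number, price: number, volume = 0): void {
    // Reject out-of-order and duplicate-timestamp ticks.
    if (timestampMs <= this.lastTs) return;
    this.lastTs = timestampMs;

    const bucket =
      Math.floor(timestampMs / this.timeframeMs) * this.timeframeMs;

    // Tick landed in a new bucket: flush the finished candle.
    if (this.current && bucket !== this.current.start) {
      this.completed.push(this.current);
      this.current = null;
    }
    if (!this.current) {
      // First tick of the bucket seeds all four prices.
      this.current = { start: bucket, open: price, high: price, low: price, close: price, volume };
      return;
    }
    this.current.high = Math.max(this.current.high, price);
    this.current.low = Math.min(this.current.low, price);
    this.current.close = price;
    this.current.volume += volume;
  }
}
```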
Database
Migrations
The package uses versioned, transactional migrations. Call engine.migrate() on startup — it creates tables on first run and applies new migrations on updates. Migration files live in src/db/migrations/.
Schema
Four tables:
- `schema_migrations` — tracks applied migration versions
- `instruments` — universal instrument registry
- `instrument_mappings` — broker-specific token to instrument ID mapping
- `candles` — OHLCV data, keyed by `(instrument_id, timeframe, timestamp)`
All writes use ON CONFLICT ... DO UPDATE — safe to replay, safe to restart.
Architecture
```
Broker API (HTTP)          Broker WebSocket
        |                         |
        v                         v
    Adapter                 SocketAdapter
(instrument sync)          (tick streaming)
        |                         |
        +------------+------------+
                     |
                     v
                  Engine
                     |
             +-------+-------+
             |               |
             v               v
      CandleBuilder      Reverse Map
      (in-memory        (brokerToken
       OHLCV agg)      -> instrumentId)
             |
             v
        PostgreSQL
             |
             v
         Query API
(getCandles, getMA, getEMA)
```

Where to Use This
- As a library in your trading bot — `npm install market-data-engine` and call the API directly
- As the data layer in a charting backend — connect WebSockets, serve candles via your REST API
- As a standalone data collector — run a process that syncs instruments and streams ticks into PostgreSQL
- In a microservice architecture — one service ingests ticks, others query candles and indicators from the shared database
License
MIT
