# @sha3/tick-collector

v0.2.0

Real-time market data collector for TensorFlow dataset generation.
It connects to `@sha3/crypto` and `@sha3/polymarket`, subscribes continuously to crypto and Polymarket 5m/15m markets, and writes incremental compressed event logs (`.ndjson.gz`) to disk.
## Why It Exists
Training pipelines need reproducible, chronologically ordered market snapshots. This service provides a long-running ingestion process that normalizes multiple live feeds into one stable append-only event format.
## TL;DR (60s)

```sh
npm install
npm run check
npm run start
```

Collector output is written to `data/` by default.
## Quick Start

```sh
npm run start
```

Stop with Ctrl+C (SIGINT) to trigger a graceful flush and shutdown.
## What It Collects

- Crypto feed events from `@sha3/crypto`: `crypto.price`, `crypto.trade`, `crypto.orderbook`, `crypto.status`
- Polymarket stream events from `@sha3/polymarket`: `polymarket.price`, `polymarket.book`
## Event Contract

Each persisted line is a `StoredEvent` JSON object:

```typescript
type StoredEvent = {
  eventId: string;
  source: "crypto" | "polymarket";
  eventType: string;
  ingestedAt: number; // canonical timestamp (always present)
  exchangeTs?: number; // upstream timestamp, present when available
  sequence: number;
  symbol?: string;
  provider?: string;
  marketType?: "5m" | "15m";
  marketStartAt?: number; // UTC epoch ms for market window start
  assetId?: string;
  payload: unknown; // raw event payload
};
```

`marketSlug` is intentionally not persisted. Non-applicable fields are omitted (not stored as `null`).
## JSON Line Types

Each line in `.ndjson.gz` is one `StoredEvent`. These are the concrete line categories:

- `crypto.price`: `source="crypto"`, `provider=<exchange|chainlink>`, `symbol=<btc|eth|sol|xrp>`, `payload.price`.
- `crypto.orderbook`: `source="crypto"`, `provider=<exchange>`, `symbol=<btc|eth|sol|xrp>`, `payload.bids/asks`.
- `crypto.trade`: `source="crypto"`, `provider=<exchange>`, `symbol=<btc|eth|sol|xrp>`, `payload.price/size`.
- `crypto.status`: `source="crypto"`, `provider=<provider>`, `payload.status/message`.
- `polymarket.price`: `source="polymarket"`, `symbol=<btc|eth|sol|xrp>`, `marketType=<5m|15m>`, `marketStartAt=<epoch-ms>`, `assetId=<token-id>`, `payload.price`.
- `polymarket.book`: `source="polymarket"`, `symbol=<btc|eth|sol|xrp>`, `marketType=<5m|15m>`, `marketStartAt=<epoch-ms>`, `assetId=<token-id>`, `payload.bids/asks`.
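As a sketch of how a consumer might branch on these categories, the snippet below restates the `StoredEvent` shape locally so it stands alone; the `summarize` helper and its output format are illustrative, not part of the package API.

```typescript
// Local restatement of the persisted event contract (see Event Contract above).
type StoredEvent = {
  eventId: string;
  source: "crypto" | "polymarket";
  eventType: string;
  ingestedAt: number;
  sequence: number;
  symbol?: string;
  provider?: string;
  marketType?: "5m" | "15m";
  marketStartAt?: number;
  assetId?: string;
  payload: unknown;
};

// Produce a one-line summary for a parsed journal line, using the
// fields that apply to each source.
export function summarize(event: StoredEvent): string {
  let summary = `${event.eventType} seq=${event.sequence}`;
  if (event.source === "crypto") {
    summary += ` provider=${event.provider ?? "?"} symbol=${event.symbol ?? "?"}`;
  } else {
    summary += ` symbol=${event.symbol ?? "?"} window=${event.marketType ?? "?"} asset=${event.assetId ?? "?"}`;
  }
  return summary;
}
```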
## Storage Layout

Flat folders by artifact type (time metadata is embedded in file names):

```
data/
  journal/part-XXXXXXXX-YYYYMMDD-HHMMSS-<ingestedAt>.ndjson.gz
  manifests/part-XXXXXXXX-YYYYMMDD-HHMMSS-<ingestedAt>.manifest.json
  manifests/part-XXXXXXXX-YYYYMMDD-HHMMSS-<ingestedAt>.index.json
```

The `<ingestedAt>` suffix is UTC epoch milliseconds for the part's start event.
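Because the time metadata lives in the file names, parts can be ordered chronologically without opening them. A minimal sketch (the regex assumes the exact naming pattern above; the helper name is ours):

```typescript
import { readdirSync } from "node:fs";

// Order journal parts chronologically by the <ingestedAt> suffix
// embedded in each file name.
export function listPartsInOrder(journalDir: string): string[] {
  const startMs = (name: string): number => {
    // Matches the trailing "-<ingestedAt>.ndjson.gz" suffix.
    const match = name.match(/-(\d+)\.ndjson\.gz$/);
    return match === null ? Number.MAX_SAFE_INTEGER : Number(match[1]);
  };
  return readdirSync(journalDir)
    .filter((name) => name.endsWith(".ndjson.gz"))
    .sort((a, b) => startMs(a) - startMs(b));
}
```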
Manifest fields:

- `file`
- `indexFile`
- `minIngestedAt`
- `maxIngestedAt`
- `eventCount`
- `sources`
- `eventTypes`
- `createdAt`
## Integration Guide (External Projects)

- Run this collector service in an environment with outbound websocket egress.
- Consume `.ndjson.gz` files from `data/journal` in chronological order.
- Use `ingestedAt` as the canonical ordering key in your training export step.
- Use manifest files to speed up incremental dataset scans.
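The consumption steps above can be sketched with Node's built-in gunzip and readline streaming. `readJournalPart` is our illustrative helper, not part of the package API; it assumes one JSON object per line, as described in the event contract.

```typescript
import { createReadStream } from "node:fs";
import { createGunzip } from "node:zlib";
import { createInterface } from "node:readline";

// Stream one .ndjson.gz journal part and yield each StoredEvent line
// as a parsed object, skipping blank lines.
export async function* readJournalPart(file: string): AsyncGenerator<Record<string, unknown>> {
  const lines = createInterface({
    input: createReadStream(file).pipe(createGunzip()),
    crlfDelay: Infinity,
  });
  for await (const line of lines) {
    if (line.trim().length > 0) {
      yield JSON.parse(line) as Record<string, unknown>; // one StoredEvent per line
    }
  }
}
```

Consuming parts in file-name order and ordering events by `ingestedAt` gives the chronological stream a training export needs.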
## Install + Import

```sh
npm install @sha3/tick-collector
```

```typescript
import { buildCollectorApp, PersistedEventStream } from "@sha3/tick-collector";
```

## Embedding In Another Service
```typescript
import { buildCollectorApp } from "@sha3/tick-collector";

const app = buildCollectorApp("./data");
await app.start();

process.on("SIGINT", () => {
  void app.stop();
});
```

## Public API Reference
Exports from `src/index.ts`:

- `buildCollectorApp(outputDir?: string): CollectorApp`
- `PersistedEventStream`
- `ReadDataPointOptions`
- `ReadDataPointRangeOptions`
- `MarketDataPoint`
`CollectorApp` public methods:

- `start(): Promise<void>`
- `stop(): Promise<void>`
`PersistedEventStream`:

- `new PersistedEventStream({ folder })`
- `read({ timestamp, symbol, marketType, sources?, maxDistanceMs?, orderbookLevels? }): Promise<MarketDataPoint | null>`
- `readRange({ startTimestamp, endTimestamp, stepMs, symbol, marketType, sources?, maxDistanceMs?, orderbookLevels? }): Promise<MarketDataPoint[]>`
## Method Parameters

`read(options)`:

- `timestamp`: target UTC epoch milliseconds for the datapoint snapshot.
- `symbol`: market asset symbol (`btc`, `eth`, `sol`, `xrp`).
- `marketType`: window type (`5m` or `15m`).
- `sources` (optional): overrides enabled read sources for this call.
- `maxDistanceMs` (optional): maximum allowed time gap between `timestamp` and any selected source event. Example: `30_000` means each selected event must be within 30 seconds of `timestamp`. If no suitable event is found within this distance, that field is returned as missing (`null` in datapoint output plus a `coverage.missingFields` entry).
- `orderbookLevels` (optional): max bid/ask levels kept per orderbook snapshot (top-N depth).

`readRange(options)`:

- `startTimestamp`: inclusive UTC epoch milliseconds range start.
- `endTimestamp`: inclusive UTC epoch milliseconds range end.
- `stepMs`: sampling interval in milliseconds between datapoints (`> 0`).
- `symbol`, `marketType`, `sources?`, `maxDistanceMs?`, `orderbookLevels?`: same meaning as in `read(options)`.
Example:

```typescript
import { PersistedEventStream } from "@sha3/tick-collector";

const stream = new PersistedEventStream({ folder: "./data" });

const datapoint = await stream.read({
  timestamp: Date.now(),
  symbol: "btc",
  marketType: "5m",
  sources: ["binance", "coinbase", "kraken", "okx", "chainlink", "polymarket"],
  maxDistanceMs: 30_000,
  orderbookLevels: 20
});

if (datapoint) {
  console.log(datapoint.polymarket.upPrice, datapoint.coverage.missingFields);
}

const range = await stream.readRange({
  startTimestamp: Date.now() - 60_000,
  endTimestamp: Date.now(),
  stepMs: 5_000,
  symbol: "btc",
  marketType: "5m",
  maxDistanceMs: 30_000
});

console.log("points", range.length);
```

## Configuration Reference (`src/config.ts`)
`CONFIG` is exported as a single default object.

- `CONFIG.COLLECTOR.symbols`: Polymarket discovery symbols and crypto symbols (`btc`/`eth`/`sol`/`xrp`).
- `CONFIG.COLLECTOR.windows`: Polymarket windows (`5m`, `15m`).
- `CONFIG.COLLECTOR.enabledSources`: active sources/providers (`binance`, `chainlink`, `polymarket`, etc.).
- `CONFIG.COLLECTOR.coalesceIntervalMs`: temporal coalescing window; only the last event per source/type/instrument is persisted per bucket.
- `CONFIG.COLLECTOR.outputDir`: base directory for journal/manifests.
- `CONFIG.COLLECTOR.flushIntervalMs`: flush interval for incremental gzip writes.
- `CONFIG.COLLECTOR.maxGzipPartBytes`: size threshold for gzip part rotation.
- `CONFIG.READER.defaultSources`: default sources for datapoint reads (same flat source format as `enabledSources`).
- `CONFIG.READER.maxDistanceMs`: maximum temporal distance allowed for nearest-event selection.
- `CONFIG.READER.orderbookLevels`: top-N bids/asks returned in datapoint orderbooks.
- `CONFIG.READER.tieBreakerPolicy`: tie-break rule for nearest events (`prefer-past`).
Example:

```typescript
enabledSources: ["binance", "chainlink", "polymarket"];
coalesceIntervalMs: 500;
```

## Compatibility
- Node.js 20+
- ESM runtime (`"type": "module"`)
- TypeScript strict mode
- Outbound websocket connectivity required
## Testing

```sh
npm run test
```

`npm run check` includes:
- lint
- format check
- typecheck
- test suite
- a live integration test that runs for ~30 seconds and requires all event types to be observed
## Scripts

- `npm run start`: run the collector CLI
- `npm run check`: full validation pipeline
- `npm run fix`: lint/prettier autofix
- `npm run typecheck`: TypeScript validation
- `npm run test`: run tests (includes live integration)
## AI Usage

Assistants must follow `AGENTS.md` and `ai/*.md` as blocking rules.
Mandatory highlights:
- Class-first architecture with constructor injection.
- One public class per file.
- Single-return policy.
- Braces in all control-flow blocks.
- Typed errors at boundaries.
- Update tests for every behavior change.
- Run `npm run check` before finalizing.
## Troubleshooting

- Websocket/network errors: verify outbound websocket access to crypto providers and Polymarket APIs.
- No output files: ensure the process stays up long enough to hit flush intervals and that `CONFIG.COLLECTOR.enabledSources` is not empty.
- Integration test failures: `test/collector/integration/live-collector.test.ts` depends on real-time upstream feeds and can fail during upstream outages.
