@sha3/polymarket-snapshot-collector
v1.2.0
Internal service that subscribes to `@sha3/polymarket-snapshot`, stores full snapshots in ClickHouse, and exposes a small HTTP API for market discovery, historical snapshot retrieval, and current in-memory state.
TL;DR
```bash
npm install
npm run check
npm run start
curl "http://localhost:3000/markets?asset=btc&window=5m"
curl "http://localhost:3000/markets"
```
Why
- Keep Polymarket snapshot persistence in one process with one storage model.
- Keep market metadata in `market` and store only time-varying snapshot fields in `snapshot`.
- Expose a simple internal API for market lists, stored snapshot playback, and current in-memory state.
Main Capabilities
- Creates the `market` and `snapshot` tables on startup when they do not exist.
- Subscribes internally to the configured asset and window pairs.
- Prevents duplicate writes for the same canonical snapshot identity with a short in-memory deduplication cache.
- Warns and skips conflicting duplicate payloads when the same canonical identity is received again with different data.
- Stores stable market metadata once in `market` instead of repeating it in every snapshot row.
- Exposes HTTP endpoints for market listing, snapshot retrieval, and current in-memory state.
Installation
```bash
npm install
```
Running Locally
```bash
npm run start
```
Defaults:
- HTTP bind: `0.0.0.0:3000`
- ClickHouse: `http://localhost:8123`
- ClickHouse user: `default`
- ClickHouse password: `default`
Because the service binds to `0.0.0.0`, other machines on the same internal network can reach it through the host machine's IP address.
To use the LAN ClickHouse instance instead, override `CLICKHOUSE_URL`:
```bash
CLICKHOUSE_URL=http://192.168.1.2:8123 npm run start
```
Usage
```ts
import { ServiceRuntime } from "@sha3/polymarket-snapshot-collector";

const serviceRuntime = ServiceRuntime.createDefault();
await serviceRuntime.startServer();
```
Examples
Build the HTTP server without binding a port:
```ts
import { ServiceRuntime } from "@sha3/polymarket-snapshot-collector";

const serviceRuntime = ServiceRuntime.createDefault();
const server = serviceRuntime.buildServer();
```
Stop the runtime cleanly:
```ts
import { ServiceRuntime } from "@sha3/polymarket-snapshot-collector";

const serviceRuntime = ServiceRuntime.createDefault();
await serviceRuntime.startServer();
await serviceRuntime.stop();
```
HTTP API
GET /
Returns:
```json
{
  "ok": true,
  "serviceName": "@sha3/polymarket-snapshot-collector"
}
```
GET /markets
Query params:
- `asset`: optional, one of `btc`, `eth`, `sol`, `xrp`
- `window`: optional, one of `5m`, `15m`
- `fromDate`: optional ISO timestamp

Example:
```bash
curl "http://192.168.1.10:3000/markets?asset=btc&window=5m&fromDate=2026-03-11T10:00:00.000Z"
curl "http://192.168.1.10:3000/markets?asset=btc"
curl "http://192.168.1.10:3000/markets"
```
Behavior notes:
- When no filters are provided, the endpoint returns all stored markets.
- When only `asset` is provided, markets for that asset are returned across all windows.
- When only `window` is provided, markets for that window are returned across all assets.
- When both are provided, both filters are applied.
- Each returned market includes `prevPriceToBeat`, which contains the latest previous `priceToBeat` closes for the same `asset`/`window`. The number of closes is set by `config.MARKET_RECENT_HISTORY_LIMIT`.
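A client on the internal network can build these query strings with the standard `URL` API. The sketch below uses a hypothetical helper: `buildMarketsUrl` and its filter type are not exported by this package. It sets only the parameters that are provided, which matches the filter behavior described above.

```typescript
// Hypothetical client-side helper; not part of the package's public API.
type MarketAsset = "btc" | "eth" | "sol" | "xrp";
type MarketWindow = "5m" | "15m";

type MarketFilters = {
  asset?: MarketAsset;
  window?: MarketWindow;
  fromDate?: Date;
};

// Builds a GET /markets URL, adding only the filters that were provided.
function buildMarketsUrl(baseUrl: string, filters: MarketFilters = {}): string {
  const url = new URL("/markets", baseUrl);
  if (filters.asset !== undefined) {
    url.searchParams.set("asset", filters.asset);
  }
  if (filters.window !== undefined) {
    url.searchParams.set("window", filters.window);
  }
  if (filters.fromDate !== undefined) {
    // The endpoint expects an ISO timestamp such as 2026-03-11T10:00:00.000Z.
    url.searchParams.set("fromDate", filters.fromDate.toISOString());
  }
  return url.toString();
}
```

On Node.js 20, the returned string can be passed directly to the global `fetch`. For example: `await fetch(buildMarketsUrl("http://192.168.1.10:3000", { asset: "btc" }))`.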
GET /markets/:slug/snapshots
Example:
```bash
curl "http://192.168.1.10:3000/markets/btc-5m-example/snapshots"
```
Behavior notes:
- Rows are returned in ascending `generatedAt` order.
- `5m` markets may contain at most `600` snapshots.
- `15m` markets may contain at most `1800` snapshots.
GET /state
Returns the current in-memory state for all supported asset/window pairs.
Behavior notes:
- Memory-backed, not reconstructed from ClickHouse.
- Always returns exactly `8` entries in `markets`.
- Entry order is stable: `btc/5m`, `btc/15m`, `eth/5m`, `eth/15m`, `sol/5m`, `sol/15m`, `xrp/5m`, `xrp/15m`.
- Entries with no active market still exist, with `market: null` and `latestSnapshot: null`.
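The stable order above is asset-major: both windows for `btc`, then both for `eth`, and so on. A consumer that indexes entries by position can check that assumption with a helper like the hypothetical one below. It rebuilds the documented order and compares it with a payload's `markets` array; the names are illustrative and not exported by the package.

```typescript
// Hypothetical client-side check; the asset and window orders are copied from this README.
const STATE_ASSETS = ["btc", "eth", "sol", "xrp"] as const;
const STATE_WINDOWS = ["5m", "15m"] as const;

type StateEntryKey = { asset: string; window: string };

// Rebuilds the documented order: asset-major, with 5m before 15m for each asset.
function expectedStateOrder(): string[] {
  const keys: string[] = [];
  for (const asset of STATE_ASSETS) {
    for (const window of STATE_WINDOWS) {
      keys.push(`${asset}/${window}`);
    }
  }
  return keys;
}

// True when a GET /state markets array has exactly 8 entries in the documented order.
function hasDocumentedOrder(entries: readonly StateEntryKey[]): boolean {
  const expected = expectedStateOrder();
  return (
    entries.length === expected.length &&
    entries.every((entry, index) => `${entry.asset}/${entry.window}` === expected[index])
  );
}
```

A consumer might call `hasDocumentedOrder(payload.markets)` once per response and fall back to matching on `asset`/`window` when it returns `false`.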
Example:
```bash
curl "http://192.168.1.10:3000/state"
```
Public API
ServiceRuntime
createDefault()
Builds the default runtime wiring.
Returns:
ServiceRuntime
buildServer()
Builds the Hono-based Node HTTP server without calling listen().
Returns:
Server
startServer()
Creates the schema, starts the collector, and starts listening on the configured host and port.
Returns:
Promise<Server>
stop()
Stops the HTTP server, disconnects the collector runtime, and closes the ClickHouse client.
Returns:
Promise<void>
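`stop()` is the natural hook for process signals in a long-running deployment. This is a minimal sketch, assuming the service runs as a plain Node.js process that should exit on `SIGINT` or `SIGTERM`. `registerGracefulShutdown` and `StoppableRuntime` are hypothetical names. Any object with a `stop(): Promise<void>` method, such as a `ServiceRuntime`, satisfies the type.

```typescript
// Hypothetical shape: anything with an async stop(), e.g. a ServiceRuntime instance.
type StoppableRuntime = { stop(): Promise<void> };

// Registers one-shot signal handlers that call runtime.stop() and then exit.
// Returns the shutdown function so it can also be invoked directly.
function registerGracefulShutdown(
  runtime: StoppableRuntime,
  signals: string[] = ["SIGINT", "SIGTERM"],
): () => Promise<void> {
  let stopping: Promise<void> | null = null;

  const shutdown = (): Promise<void> => {
    // Reuse the in-flight promise so a second signal does not call stop() twice.
    if (stopping === null) {
      stopping = runtime.stop();
    }
    return stopping;
  };

  for (const signal of signals) {
    process.once(signal, () => {
      shutdown().then(
        () => process.exit(0),
        (error: unknown) => {
          console.error("shutdown failed", error);
          process.exit(1);
        },
      );
    });
  }

  return shutdown;
}
```

With the package, the call would follow `await serviceRuntime.startServer();` as `registerGracefulShutdown(serviceRuntime);`. Memoizing the promise keeps a `SIGINT` followed by a `SIGTERM` from calling `stop()` twice.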
AppInfoPayload
```ts
type AppInfoPayload = { ok: true; serviceName: string };
```
MarketSummary
```ts
type MarketSummary = {
  slug: string;
  window: "5m" | "15m";
  asset: "btc" | "eth" | "sol" | "xrp";
  priceToBeat: number | null;
  marketStart: string;
  marketEnd: string;
  prevPriceToBeat: number[];
};
```
MarketListPayload
```ts
type MarketListPayload = { markets: MarketSummary[] };
```
MarketSnapshotsPayload
```ts
type MarketSnapshotsPayload = {
  slug: string;
  asset: "btc" | "eth" | "sol" | "xrp";
  window: "5m" | "15m";
  marketStart: string;
  marketEnd: string;
  snapshots: Snapshot[];
};
```
StatePayload
```ts
type StatePayload = {
  generatedAt: string;
  markets: Array<{
    asset: "btc" | "eth" | "sol" | "xrp";
    window: "5m" | "15m";
    market: MarketSummary | null;
    snapshotCount: number;
    latestSnapshot: {
      generatedAt: number;
      priceToBeat: number | null;
      upPrice: number | null;
      downPrice: number | null;
      chainlinkPrice: number | null;
      binancePrice: number | null;
      coinbasePrice: number | null;
      krakenPrice: number | null;
      okxPrice: number | null;
    } | null;
    marketDirection: "UP" | "DOWN" | "UNKNOWN";
    latestSnapshotAgeMs: number | null;
    isStale: boolean;
  }>;
};
```
Field notes:
- `generatedAt`: server-side timestamp for the state payload.
- `markets`: stable list with one entry per supported `asset`/`window` pair.
- `snapshotCount`: number of snapshots seen for the currently active market in that entry.
- `latestSnapshot`: summary of the latest successfully ingested snapshot for that entry.
- `marketDirection`: derived from `chainlinkPrice` versus `priceToBeat`.
- `latestSnapshotAgeMs`: age of the latest snapshot when the payload was produced.
- `isStale`: whether the entry has no latest snapshot or the latest snapshot's age exceeds the configured stale threshold (`config.STATE_STALE_AFTER_MS`).
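The field notes describe `marketDirection` and `isStale` only loosely. The sketch below spells out one plausible reading; it is not the service's code. The following assumptions are also labeled in the comments:
- A missing value, or a tie between `chainlinkPrice` and `priceToBeat`, maps to `UNKNOWN`.
- `generatedAt` is epoch milliseconds.
- The stale threshold defaults to the documented `config.STATE_STALE_AFTER_MS` value of `1000` ms.

```typescript
// Hypothetical sketch; field names follow StatePayload, the logic is an assumption.
type LatestSnapshotLike = {
  generatedAt: number; // assumed to be epoch milliseconds
  priceToBeat: number | null;
  chainlinkPrice: number | null;
};

type MarketDirection = "UP" | "DOWN" | "UNKNOWN";

// UP when Chainlink is above priceToBeat, DOWN when below.
// Assumption: a missing value or an exact tie yields UNKNOWN.
function deriveMarketDirection(snapshot: LatestSnapshotLike | null): MarketDirection {
  if (snapshot === null || snapshot.chainlinkPrice === null || snapshot.priceToBeat === null) {
    return "UNKNOWN";
  }
  if (snapshot.chainlinkPrice > snapshot.priceToBeat) {
    return "UP";
  }
  if (snapshot.chainlinkPrice < snapshot.priceToBeat) {
    return "DOWN";
  }
  return "UNKNOWN";
}

// An entry is stale when it has no snapshot or the snapshot is older than the threshold.
function deriveStaleness(
  snapshot: LatestSnapshotLike | null,
  nowMs: number,
  staleAfterMs = 1000, // documented default of config.STATE_STALE_AFTER_MS
): { latestSnapshotAgeMs: number | null; isStale: boolean } {
  if (snapshot === null) {
    return { latestSnapshotAgeMs: null, isStale: true };
  }
  const latestSnapshotAgeMs = nowMs - snapshot.generatedAt;
  return { latestSnapshotAgeMs, isStale: latestSnapshotAgeMs > staleAfterMs };
}
```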
Compatibility
- Node.js 20+
- ESM
- TypeScript with relative `.ts` imports enabled
Configuration
- `config.RESPONSE_CONTENT_TYPE`: response `content-type` header.
- `config.HTTP_HOST`: bind host for `startServer()`. Default is `0.0.0.0` so the API is reachable from other internal hosts.
- `config.DEFAULT_PORT`: bind port for `startServer()`.
- `config.SERVICE_NAME`: service name returned by `GET /`.
- `config.CLICKHOUSE_URL`: ClickHouse base URL.
- `config.CLICKHOUSE_DATABASE`: ClickHouse database name.
- `config.CLICKHOUSE_USERNAME`: ClickHouse username.
- `config.CLICKHOUSE_PASSWORD`: ClickHouse password.
- `config.CLICKHOUSE_MARKET_TABLE`: market table name.
- `config.CLICKHOUSE_SNAPSHOT_TABLE`: snapshot table name.
- `config.SNAPSHOT_INTERVAL_MS`: polling interval passed to `@sha3/polymarket-snapshot`.
- `config.STATE_STALE_AFTER_MS`: age threshold used to mark state entries as stale. Default `1000` ms.
- `config.MARKET_RECENT_HISTORY_LIMIT`: number of previous `priceToBeat` closes stored in `prevPriceToBeat` for each market. Default `3`.
- `config.SNAPSHOT_INSERT_BATCH_MAX_SIZE`: maximum number of snapshot rows grouped into one ClickHouse insert. Default `512`.
- `config.SUPPORTED_ASSETS`: subscribed assets.
- `config.SUPPORTED_WINDOWS`: subscribed windows.
- `config.SNAPSHOT_DEDUPLICATION_TTL_MS`: deduplication cache TTL in milliseconds.
- `config.SNAPSHOT_DEDUPLICATION_MAX_KEYS`: maximum number of keys held in the deduplication cache.
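`config.SNAPSHOT_INSERT_BATCH_MAX_SIZE` caps how many rows go into a single ClickHouse insert. The README does not describe how the collector forms batches, so the following is only a generic sketch of size-capped chunking. `toInsertBatches` is a hypothetical name, and the actual insert call is left out.

```typescript
// Hypothetical helper: splits rows into consecutive batches of at most maxBatchSize rows.
// The default mirrors the documented config.SNAPSHOT_INSERT_BATCH_MAX_SIZE value of 512.
function toInsertBatches<T>(rows: readonly T[], maxBatchSize = 512): T[][] {
  if (!Number.isInteger(maxBatchSize) || maxBatchSize < 1) {
    throw new RangeError("maxBatchSize must be a positive integer");
  }
  const batches: T[][] = [];
  for (let start = 0; start < rows.length; start += maxBatchSize) {
    // slice() clamps the end index, so the last batch may be shorter.
    batches.push(rows.slice(start, start + maxBatchSize));
  }
  return batches;
}
```

With the default of `512`, a backlog of 1,200 rows becomes three inserts of 512, 512, and 176 rows.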
Scripts
```bash
npm run standards:check
npm run lint
npm run format:check
npm run typecheck
npm run test
npm run check
npm run start
```
Structure
- `src/app/service-runtime.service.ts`: runtime orchestration
- `src/clickhouse/`: ClickHouse client and schema services
- `src/collector/`: snapshot collector runtime
- `src/http/`: HTTP server and request error mapping
- `src/market/`: market persistence repository
- `src/snapshot/`: snapshot persistence, query logic, deduplication, in-memory state store, and exported payload types
- `test/`: deterministic `node:test` coverage
Troubleshooting
ClickHouse connection failures
Verify `CLICKHOUSE_URL`, `CLICKHOUSE_USERNAME`, and `CLICKHOUSE_PASSWORD`. To use the LAN instance, set `CLICKHOUSE_URL=http://192.168.1.2:8123`.
API is not reachable from another machine
Verify that HTTP_HOST is still 0.0.0.0 and that the machine firewall allows inbound traffic on the configured port.
Duplicate snapshot writes
The canonical identity is market_slug + asset + window + generated_at. The service prevents duplicate writes in the ingestion path with a short in-memory deduplication cache and skips conflicting duplicate payloads with a warning.
ClickHouse MergeTree ordering keys are not unique constraints, so the storage engine does not enforce row uniqueness by itself. If duplicate rows already exist because data was inserted outside this service or before deduplication was in place, clean that data from ClickHouse before relying on the API responses.
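The ingestion-path deduplication described above can be approximated with a map keyed by the canonical identity and bounded by `config.SNAPSHOT_DEDUPLICATION_TTL_MS` and `config.SNAPSHOT_DEDUPLICATION_MAX_KEYS`. This is a hypothetical sketch, not the service's implementation. The class name, the `|` key separator, and the use of `JSON.stringify` as a payload fingerprint are all assumptions. The sketch distinguishes an exact repeat (skip silently) from a conflicting payload (skip with a warning).

```typescript
// Hypothetical sketch of a TTL + max-size cache keyed by the canonical identity
// market_slug + asset + window + generated_at.
type SnapshotIdentity = { marketSlug: string; asset: string; window: string; generatedAt: number };
type DedupResult = "new" | "duplicate" | "conflict";
type CacheEntry = { payload: string; expiresAt: number };

class SnapshotDedupCache {
  // A Map iterates in insertion order, so the first key is the oldest entry.
  private readonly entries = new Map<string, CacheEntry>();
  private readonly ttlMs: number;
  private readonly maxKeys: number;

  constructor(ttlMs: number, maxKeys: number) {
    this.ttlMs = ttlMs;
    this.maxKeys = maxKeys;
  }

  check(identity: SnapshotIdentity, payload: unknown, nowMs: number): DedupResult {
    const key = `${identity.marketSlug}|${identity.asset}|${identity.window}|${identity.generatedAt}`;
    // Assumption: JSON.stringify is a good-enough fingerprint (it is sensitive to key order).
    const serialized = JSON.stringify(payload);
    const existing = this.entries.get(key);
    if (existing !== undefined && existing.expiresAt > nowMs) {
      return existing.payload === serialized ? "duplicate" : "conflict";
    }
    // Missing or expired: (re)insert at the end of the insertion order.
    this.entries.delete(key);
    this.entries.set(key, { payload: serialized, expiresAt: nowMs + this.ttlMs });
    this.evict(nowMs);
    return "new";
  }

  // Drops expired entries, then trims the oldest keys until the size limit holds.
  // The full scan is O(n) per insert, which is acceptable for a sketch.
  private evict(nowMs: number): void {
    for (const [key, entry] of this.entries) {
      if (entry.expiresAt <= nowMs) {
        this.entries.delete(key);
      }
    }
    while (this.entries.size > this.maxKeys) {
      const oldest = this.entries.keys().next().value;
      if (oldest === undefined) {
        break;
      }
      this.entries.delete(oldest);
    }
  }
}
```

A caller would take one of three actions:
- On `"new"`, write the row.
- On `"duplicate"`, drop the row.
- On `"conflict"`, log a warning and drop the row.

Because the cache lives in memory, it cannot catch duplicates that arrive after a restart or after the TTL expires. That is another reason existing duplicate rows still need to be cleaned in ClickHouse.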
Snapshot count consistency failures
5m markets must stay at or below 600 rows and 15m markets must stay at or below 1800 rows.
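Both caps correspond to one snapshot every 500 ms across the full window (300,000 ms / 600 and 900,000 ms / 1,800). The README does not state the default `config.SNAPSHOT_INTERVAL_MS`, so treat that interval as an inference. The hedged sketch below is a client-side consistency check for a `GET /markets/:slug/snapshots` response. It assumes each snapshot exposes a numeric `generatedAt`, and `checkSnapshotPayload` is a hypothetical name.

```typescript
type MarketWindow = "5m" | "15m";

// Hypothetical minimal shape; assumes each snapshot exposes a numeric generatedAt.
type SnapshotLike = { generatedAt: number };

// Row caps documented in this README, plus the window lengths in milliseconds.
const MAX_SNAPSHOTS: Record<MarketWindow, number> = { "5m": 600, "15m": 1800 };
const WINDOW_MS: Record<MarketWindow, number> = { "5m": 5 * 60_000, "15m": 15 * 60_000 };

// Window length divided by the cap: the spacing the caps imply (500 ms for both windows).
function impliedIntervalMs(window: MarketWindow): number {
  return WINDOW_MS[window] / MAX_SNAPSHOTS[window];
}

// Returns a list of problems; an empty list means the payload passes both checks.
function checkSnapshotPayload(window: MarketWindow, snapshots: readonly SnapshotLike[]): string[] {
  const problems: string[] = [];
  const cap = MAX_SNAPSHOTS[window];
  if (snapshots.length > cap) {
    problems.push(`${snapshots.length} snapshots exceed the ${window} cap of ${cap}`);
  }
  let previousGeneratedAt = Number.NEGATIVE_INFINITY;
  for (const [index, snapshot] of snapshots.entries()) {
    // Rows must be ascending; this check only flags strict decreases.
    if (snapshot.generatedAt < previousGeneratedAt) {
      problems.push(`snapshot ${index} breaks ascending generatedAt order`);
    }
    previousGeneratedAt = snapshot.generatedAt;
  }
  return problems;
}
```

Running the check against each market returned by `GET /markets` is a quick way to spot the consistency failures described in this section.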
AI Workflow
- Read `AGENTS.md`, `ai/contract.json`, and the assistant adapter before editing code.
- Keep managed files read-only unless the task is a standards update.
- Run `npm run standards:check` and `npm run check` before finishing.
