second-bar-engine
v0.1.0
Published
Offline reconstruction engine: replays .jsonl.gz Polymarket logs and emits per-second derived rows for feature engineering
Readme
second-bar-engine
Офлайн-движок реконструкции состояния рынка из исторических логов Polymarket.
Читает .jsonl.gz файлы, проигрывает поток событий сквозным проходом и эмитит
посекундный гранулят (second-bars) с book-снимками, flow-разметкой
(place / cancel / fill), lifetime-метриками уровней и зеркальным
gap-индикатором. Output — gzipped JSONL, опциональная конвертация в
партиционированный Parquet.
Назначение: предоставить feature-слою бота детерминированный датасет для
обучения, который семантически совпадает с онлайн-видением book-keeper'а
(train/serve skew устранён через общий @kglozhkin/orderbook-reducer).
Содержание
- Когда использовать
- Быстрый старт
- Установка
- CLI: флаги и примеры
- Pipeline и формат входа
- Формат выхода (schema/v1.json)
- Конвертация в Parquet
- Производительность и оценки
- Нюансы и подводные камни
- Тесты
- Архитектура коротко
- Известные ограничения
Когда использовать
- Backtest-обучение моделей. Нужны исторические per-second фичи (best, bands, flow, mid, spread, gap), а не только сырой event-лог.
- Подготовка датасета.
.jsonl.gz→ парк-файл, дальше DuckDB / Polars / PyArrow / pandas. - Воспроизводимость. На одном и том же входе выход всегда побайтово один и тот же (детерминированный single-thread проход).
Когда НЕ использовать:
- Онлайн-применение: для live-стакана есть
book-keeper. - Бэктест с симуляцией исполнения: для этого есть
execution-simulator. - Просмотр сырых событий: для этого есть
replay-miner(NATS-проигрыватель).
Быстрый старт
cd second-bar-engine
yarn install
# Прогон одного часа логов с записью в JSONL.gz
yarn start:dev --out out/2026-05-23T06.jsonl.gz \
/path/to/events-2026-05-23T06.jsonl.gz
# Несколько часов подряд (одна непрерывная сессия движка)
yarn start:dev --out out/2026-05-23.jsonl.gz \
/path/to/events-2026-05-23T*.jsonl.gz
# Конвертация в партиционированный Parquet
python3 scripts/convert_to_parquet.py \
--in out/2026-05-23.jsonl.gz \
--out parquet/После этого parquet/date=2026-05-23/part-0.parquet готов к чтению любой
библиотекой (pyarrow, polars, duckdb).
Установка
Требования
- Node.js ≥ 20
- Yarn
- Python ≥ 3.9 +
pyarrow(только для конвертации в Parquet)
TypeScript-движок
cd second-bar-engine
yarn install
yarn build # компиляция в dist/Python-конвертер
pip install pyarrow
# или (изолированный venv)
python3 -m venv .venv && source .venv/bin/activate && pip install pyarrowСкрипт лежит в scripts/convert_to_parquet.py и
не имеет отдельного requirements.txt — это намеренно, схема и
конвертер версионируются вместе с движком (план §12.4).
CLI: флаги и примеры
Запуск:
yarn start:dev <file.jsonl.gz>... [флаги]
# или после yarn build:
yarn start <file.jsonl.gz>... [флаги]Позиционные аргументы
<file.jsonl.gz>... — один или более .jsonl.gz-файлов. Обрабатываются в
порядке передачи как одна непрерывная сессия (book и буферы движка
не сбрасываются между файлами). Сортировка файлов — ответственность
вызывающей стороны (либо передавать в правильном лексикографическом порядке,
либо использовать shell-glob events-YYYY-MM-DDTHH.jsonl.gz).
Флаги
| Флаг | Default | Описание |
|---|---|---|
| --out <path> | — (no-op) | Путь к выходному .jsonl.gz. Без флага движок работает «в холостую» (полезно для smoke-теста / сбора статистики). |
| --reorder-window <duration> | 500ms | Окно ReorderBuffer-сортировки по payload.ts. Принимает Nms / Ns / Nm. Эмпирический выбор: 500ms покрывает 99.9% инверсий по реальным логам. |
| --on-bad-line skip\|fail | skip | Поведение при битой JSON-строке во входе. skip идёт дальше с инкрементом bad_lines_count, fail останавливает прогон. |
| --tick-overrides <path> | — | JSON sidecar {"marketId": 0.01, ...} с per-market tick_size для рынков, где наблюдаемый detection не сработает (см. § Tick size). |
| --progress-every <N> | 100000 | Каждые N выпущенных событий печатать progress-строку в stderr. |
Примеры
Один файл, без вывода (smoke-тест):
yarn start:dev /path/to/events-2026-05-23T06.jsonl.gzПечатает финальную статистику (число событий, токенов, рынков,
flow-балансы, расхождение fill_size_sum vs trade_volume и т.п.).
Прогон недели одной командой:
yarn start:dev --out out/week-2026-05-23.jsonl.gz \
/path/to/events-2026-05-2[3-9]T*.jsonl.gzФайлы будут отсортированы шеллом лексикографически, что согласовано с
events-YYYY-MM-DDTHH.jsonl.gz-схемой имён.
Расширенное окно reorder (если в источнике большие паузы):
yarn start:dev --reorder-window 2s --out out/special.jsonl.gz log.jsonl.gzЖёсткий режим на битых строках (CI):
yarn start:dev --on-bad-line fail --out out/clean.jsonl.gz log.jsonl.gzС override tick на 0.01-рынках:
# tick-overrides.json:
# {"0xabc...": 0.01, "0xdef...": 0.01}
yarn start:dev --tick-overrides tick-overrides.json --out out/x.jsonl.gz log.jsonl.gzПолный pipeline (логи → JSONL → Parquet):
yarn start:dev --out out/day.jsonl.gz events-2026-05-23T*.jsonl.gz
python3 scripts/convert_to_parquet.py --in out/day.jsonl.gz --out parquet/Pipeline и формат входа
.jsonl.gz файлы
↓ gunzip + readline + JSON.parse
↓ pickEventTime (payload.ts, миллисекунды Unix)
ReorderBuffer (скользящее окно 500мс, stable sort по (ts, globalLineIdx))
↓ для каждого готового event'а:
Engine.beforeProcess(ts): ← TimeFence
5a. drain pending decrease-дельт с deadline ≤ ts → cancel
5b. expire trade-buffer (ts старше 50мс) → unmatched_trade_count
5c. ГЛОБАЛЬНЫЙ second-fence: закрываем openSecond у токенов
с second_ts < floor(ts/1000); снимаем state-snapshot
5d. эмитим inFlight-секунды с emit_after ≤ ts (safety-fence +50мс)
Engine.process(envelope):
- применяем snapshot/delta к OrderbookReducer
- reconciliation для decrease-дельт (50мс окно с trade-buffer)
EOF: финализируем всё → JsonlWriter.close()Входной формат — стандартные EventEnvelope от Polymarket-потоков:
market.orderbook.updated (snapshot/delta) и market.trade.created. Tip:
структура совместима с тем, что пишет data-miner и читает replay-miner.
Формат выхода (schema/v1.json)
Файл — gzipped NDJSON:
- Строка 1: header
{"_schema":"v1"} - Строки 2..N: по одному second-bar на строку, ключи и порядок — строго schema/v1.json.
Идентификация: token_id (string), market_id (string),
second_ts (int64, unix seconds).
State (snapshot книги на конец секунды): best_bid, best_ask,
tick_size, bid_px_L1..L3 / bid_sz_L1..L3 (3 верхних уровня),
аналогично для ask, bid_band_{0_2|2_5|5_10|10_20} (объём в слоях
{0–2%, 2–5%, 5–10%, 10–20%} относительно min(yes_price, 1 - yes_price)),
аналогично для ask, total_bid_depth / total_ask_depth,
n_bid_levels / n_ask_levels.
Flow (накопленный за секунду): trade — trade_count, trade_{buy,sell}_count,
trade_{buy,sell}_volume. Delta — delta_count, place_count / place_size_sum,
cancel_{full,partial}_count / cancel_size_sum, fill_{full,partial}_count /
fill_size_sum.
Quote lifetime: closed_levels_count, closed_levels_lifetime_sum
(сумма lifetime'ов уровней, умерших в эту секунду; в секундах).
Quality: is_gap (bool), unmatched_trade_count (int32).
Что НЕ пишется
- Пустые секунды (без активности —
delta_count=0 && trade_count=0 && unmatched_trade_count=0): не попадают в JSONL. Если ваш feature-слой требует регулярной частоты — добавьте их ресэмплером. is_forward_filled: маркер только feature-resampler'а, который добавляет пропущенные секунды. Движок таких строк не пишет.- Snapshot-only секунды: снапшот — административное событие источника (полная синхронизация книги), не активность участников рынка. Если за секунду пришёл ТОЛЬКО snapshot — flow=0 → не эмитится.
Семантика разметки place / cancel / fill
Алгоритм отложенной разметки decrease-дельт в окне ±50мс с трейдом (план §4):
| Условие | Разметка |
|---|---|
| трейд рядом + size→0 | fill_full |
| трейд рядом + 0 < size < old | fill_partial |
| НЕТ трейда + size→0 | cancel_full |
| НЕТ трейда + 0 < size < old | cancel_partial |
| появление уровня / увеличение | place (сразу, не ждёт) |
Окно матчинга — ±1 tick (per-token) в шкале priceKey =
round(price / tickSize). ±2 тика не ищем.
Несколько decrease на одной цене в окне: трейд матчит ту дельту,
у которой |Δts| минимальна; остальные дозревают как cancel. Диагностика
рассогласования — расхождение fill_size_sum vs trade_*_volume.
Tick size
- Default:
0.001для всех токенов. - Detection: как только в книге встречается цена с 3 знаками после
запятой (например,
0.909), фиксируетсяtickSize=0.001(подтверждение). - Override: для известных «толстых» рынков на 0.01 передавайте sidecar
--tick-overrides:{ "0xabc123...": 0.01, "0xdef456...": 0.01 } - Слепое пятно: detection НЕ умеет повысить tick до 0.01. Если рынок
0.01-tick без override — окно матчинга trade↔delta занижено, часть
fillуйдёт вcancel + unmatched_trade. Индикатор: расхождениеfill_size_sum − trade_*_volume. Чините через override.
Gap-детекция (is_gap)
YES.bestBid + NO.bestAsk должны давать ≈ 1.0 (зеркальность бинарного
рынка). Если |sum − 1| > MIRROR_TOLERANCE (default 0.002) — is_gap=true.
Также is_gap=true если у одного из токенов нет mirror'а или одной из
сторон книги (cold start).
ВАЖНО: на gap-секундах поля state (best_*, band_*, total_*,
n_*_levels) ненадёжны. Потребитель обязан фильтровать или специально
обрабатывать такие строки перед обучением.
Конвертация в Parquet
python3 scripts/convert_to_parquet.py \
--in out/day.jsonl.gz \
--out parquet/ \
[--compression zstd|snappy|gzip|none] # default zstd
[--row-group-size 100000]
[--schema-root /path/to/second-bar-engine]Конвертер:
- Читает
.jsonl.gz, проверяет header{"_schema":"v1"}. - Кастит типы строго по schema/v1.json.
- Партиционирует по
date=YYYY-MM-DD(UTC изsecond_ts). - Сортирует внутри партиции по
(token_id, second_ts)— это даёт predicate pushdown для запросов по диапазону токенов. - Пишет
out/date=YYYY-MM-DD/part-0.parquet.
Конвертер чист от бизнес-логики. Все вычисления — в TS-движке. Если в Python появилась логика помимо cast'а — это баг архитектуры (план §7.4).
Чтение Parquet
import polars as pl
df = pl.read_parquet("parquet/date=2026-05-23/part-0.parquet")
# или весь датасет:
df = pl.read_parquet("parquet/**/*.parquet")
# Фильтрация по токену + времени (быстро, predicate pushdown):
df = df.filter(
(pl.col("token_id") == "...") &
(pl.col("second_ts") >= 1779494400)
)
# Исключить gap-секунды перед обучением:
df = df.filter(~pl.col("is_gap"))# pandas через pyarrow
import pyarrow.parquet as pq
table = pq.read_table("parquet/", filters=[("token_id", "=", "...")])
df = table.to_pandas()Колонка date в партициях
При чтении через pq.read_table("parquet/") pyarrow автоматически добавит
синтетическую колонку date (Hive-style partition discovery). В самом
parquet-файле её НЕТ. Это нормально.
Производительность и оценки
Реальные замеры на одном часовом логе (event-rate ~454K событий/час, ~800 токенов / 400 рынков):
| Этап | Время | |---|---| | Чтение + парсинг + reorder | ~10–15c | | Reducer + reconciliation | ~5–10c | | Снятие state + bands + эмиссия | ~10–15c | | Итого, час логов | ~30c | | Объём JSONL.gz | ~22 MB (144K rows) | | Объём Parquet (zstd) | ~3 MB |
Экстраполяция: неделя ≈ 168 часов × 30с = ~85 минут. Грубая оценка плана была ~3–4 часа — реальность лучше. Single-thread, V8.
Распараллеливание невозможно (состояние сквозное через все файлы).
Если упрётесь в перф — профилируйте, узкое место скорее всего
JSON.parse (10–15 μs/event на вложенных bids/asks). simdjson-like
парсеры могут ускорить в 2–3x, но это уже преждевременная оптимизация.
Нюансы и подводные камни
1. Между файлами движок состояние НЕ сбрасывает
Когда вы передаёте несколько файлов, это одна непрерывная сессия. Книги, pending-дельты, trade-buffer, in-flight-секунды переносятся через границу файла. Это требование плана §8.2 и принципиальное отличие от «часовой обработки независимыми pass'ами».
Если хотите изолировать день — запустите движок отдельно для каждого дня.
2. Битый gzip-файл (известный реальный случай)
Из 6 имеющихся логов 1 (T08) оказался с битым gzip-stream. Reader падает
по умолчанию. Workaround — пропустить файл вручную (передать только
рабочие). Заметка для будущих сессий: добавить --continue-on-gzip-error
с обязательным помечанием is_gap для следующих секунд.
3. WINDOW=500мс — компромисс
Эмпирические данные (5 часов реальных логов, 6.4М событий):
- P99 инверсии payload.ts = 66мс
- P999 = 534мс
- max = 4459мс (~13 случаев/час)
При 500мс теряем ~0.0002% событий (хвост за окном). Эти единичные
инверсии могут спровоцировать неправильную разметку fill↔cancel, но
на масштабах обучения это шум. Если очень важно — увеличьте до 2s
ценой задержки эмиссии и расхода памяти reorder-буфера.
4. is_gap смешивает cold-start и runtime gap
Если нужно различить — используйте seconds_since_last_snapshot в feature-
слое (движок такой колонки сейчас не пишет; см. план §13.4). Cold-start
обычно случается на старте файла или после рестарта источника.
5. Сохранение birth_ts при снапшоте
OrderbookReducer.applySnapshot сохраняет birth_ts для уровней,
существовавших на той же цене, даже если size изменился. Это намеренно
(план §3.1, §13.5) — иначе на активных токенах со снапшотом каждые
минуты closed_levels_lifetime_sum систематически занижается.
Остаточный bias: если уровень был полностью снят и переоткрыт между двумя снапшотами с тем же price — мы трактуем его как «продолжает жить», завышая lifetime. Это известное огрубление.
6. Снапшот не инкрементирует flow
Снапшоты — административная пересинхронизация, а не активность участников.
Из них не создаются place / cancel / fill события. Если за секунду
пришёл только snapshot (нет дельт и трейдов) — row не эмитится.
7. tick_size на 0.01-рынках без override
Detection умеет только подтвердить 0.001, не повысить до 0.01. Без
--tick-overrides 0.01-рынки обрабатываются с заниженным окном матчинга
± 0.001 (вместо ± 0.01). Это даёт несовпадение fill_size_sum vs
trade_volume. Если ваша tail-вселенная содержит такие рынки — найдите их
по статистике расхождения и добавьте в overrides.
8. Глобальный second-fence двигается только входящими событиями
Если поток событий резко обрывается (EOF или большая пауза), последние
in-flight-секунды останутся не-эмитированными до явного engine.finalize().
CLI вызывает finalize() на EOF — эта часть автоматическая. В кастомных
пайплайнах помните.
9. Memory footprint
~800 токенов × ~50 уровней × 80 байт = ~3 MB чистых данных книг + V8 overhead. Реально ~50–100 MB resident size. Не лимит.
10. Schema-версионирование (v1)
Когда схема изменится — поднимается до v2.json, header в JSONL
становится {"_schema":"v2"}. Конвертер и движок оба упадут при
несовпадении версий. Это намеренная защита от молчаливого расхождения
колонок (план §7.5).
Тесты
yarn test # все 151 тестов (4 проекта в зависимостях)
yarn test:cov # с покрытием
yarn test --testPathPattern=engine.phase5 # отдельный фейз
yarn test --testPathPattern=parquet-roundtrip # round-trip JSONL↔ParquetСостав:
engine.test.ts— pipeline phases 1–4 (reducer, dispatch, deltas).engine.reconciliation.test.ts— все 6 клеток разметки fill/cancel, правило ближайшей по|Δts|, tick override, переключение tick.engine.phase5.test.ts— глобальный second-fence, tail-pause, safety-fence эмиссии, is_gap, empty seconds.reconcile/*.test.ts— PendingDeltaQueue, TradeBuffer, priceKey.reader/reorder-buffer.test.ts— tie-breaker по(ts, globalIdx).second/bands.test.ts— расчёт слоёв глубины.gap/mirror-check.test.ts— gap-детекция.output/jsonl-writer.test.ts— Schema, sanitize, NaN/Infinity, header.output/engine-writer-e2e.test.ts— E2E Engine → JSONL.gz.output/parquet-roundtrip.test.ts— Engine → JSONL → Python → Parquet → readback. Skip'ается безpyarrow.
Архитектура коротко
second-bar-engine/
├── src/
│ ├── main.ts # CLI entry
│ ├── cli/args.ts # парсинг флагов
│ ├── reader/
│ │ ├── jsonl-reader.ts # gunzip + JSON.parse async-gen
│ │ └── reorder-buffer.ts # 500ms окно стабильной сортировки
│ ├── dispatch/resolve-kind.ts # snapshot vs delta
│ ├── engine/
│ │ ├── engine.ts # orchestrator
│ │ ├── token-state.ts # per-token state + secondAccumOf
│ │ ├── market-index.ts # marketId → (yes, no) tokenId
│ │ ├── tick-detector.ts # per-token tick observation + override
│ │ └── close-second.ts # closeSecond / global fence / emit
│ ├── reconcile/
│ │ ├── engine.ts # central reconciliation
│ │ ├── pending-queue.ts # decrease-дельты ожидающие ±50ms окна
│ │ ├── trade-buffer.ts # трейды ожидающие decrease-дельты
│ │ └── price-key.ts # per-token tick-aware key
│ ├── second/
│ │ ├── accumulator.ts # SecondAccumulator + isEmpty
│ │ ├── snapshot.ts # computeStateFields (best, L1-L3, bands)
│ │ ├── bands.ts # 0-2/2-5/5-10/10-20% слои
│ │ └── row.ts # SecondRow + EmissionSink
│ ├── gap/mirror-check.ts # is_gap через MarketIndex
│ └── output/
│ └── jsonl-writer.ts # stream + gzip + schema header + validation
├── schema/
│ └── v1.json # ЕДИНЫЙ источник истины (TS+Python)
└── scripts/
├── convert_to_parquet.py # JSONL.gz → date=YYYY-MM-DD/part-0.parquet
└── verify-inversions.ts # анализ инверсий payload.ts в логахЗависимости:
@kglozhkin/orderbook-reducer— pure reducer (тот же, что используетbook-keeperв новом режиме). Закрывает train/serve skew.@kglozhkin/event-bus— только типы (EventEnvelope,OrderBookUpdatedPayload,TradeCreatedPayload).
Полная архитектура реконструкции описана в
engine-implementation-plan.md (1500 строк,
все архитектурные решения зафиксированы). Этот README — практическое
руководство.
Известные ограничения
- WINDOW=500мс не покрывает хвост инверсий >500мс (~13/час по реальным данным). Допустимая потеря для обучения.
- Tick detection умеет только подтвердить 0.001, не повысить до 0.01.
Workaround —
--tick-overrides. - Битые gzip-файлы падают reader'а. Workaround — пропустить вручную.
is_gapсмешивает cold-start и runtime gap. Доп. колонкаseconds_since_last_snapshot— TODO.is_forward_filledне пишется движком. Это намеренно — добавляется feature-resampler'ом.- Несколько decrease на одной цене в окне с одной агрегированной дельтой
−70на трейд30даёт ошибочноеfill_size_sum=70(план §13.11). Лечения нет, диагностика — расхождение метрик.
Связанные документы
- План реализации:
engine-implementation-plan.md - Схема second-bars:
second-bar-schema.md - Shared reducer:
orderbook-reducer/ - Workspace README:
../CLAUDE.md
