@ydbjs/topic
v6.0.7
Published
YDB Topics client for publish-subscribe messaging. Provides at-least-once delivery, exactly-once publishing, FIFO guarantees, and scalable message processing for unstructured data.
Maintainers
Readme
@ydbjs/topic
Читать на английском: README.md
Высокоуровневые, типобезопасные клиенты для YDB Topics (стриминговые очереди сообщений) на JavaScript/TypeScript. Поддерживаются эффективное потоковое чтение/запись, управление сессиями партиций, коммиты оффсетов, сжатие и транзакции.
Возможности
- Потоковый reader и writer с async‑итерацией
- Хуки жизненного цикла сессий партиций и коммит оффсетов
- Подключаемые кодеки сжатия (RAW, GZIP, ZSTD; можно свои)
- Чтение/запись с привязкой к транзакциям
- Полные типы TypeScript
Установка
npm install @ydbjs/topicТребуется Node.js >= 20.19.
Быстрый старт
Два варианта использования:
- Через верхнеуровневый клиент
topic(driver) - Через фабрики (
@ydbjs/topic/reader,@ydbjs/topic/writer)
Через верхнеуровневый клиент
import { Driver } from '@ydbjs/core'
import { topic } from '@ydbjs/topic'
const driver = new Driver(process.env['YDB_CONNECTION_STRING']!)
await driver.ready()
const t = topic(driver)
// Reader
await using reader = t.createReader({ topic: '/Root/my-topic', consumer: 'my-consumer' })
for await (const batch of reader.read()) {
for (const msg of batch) console.log(new TextDecoder().decode(msg.payload))
await reader.commit(batch)
}
// Writer
await using writer = t.createWriter({ topic: '/Root/my-topic', producer: 'my-producer' })
writer.write(new TextEncoder().encode('Hello, YDB!'))
await writer.flush()Через фабрики
import { Driver } from '@ydbjs/core'
import { createTopicReader, createTopicTxReader } from '@ydbjs/topic/reader'
import { createTopicWriter, createTopicTxWriter } from '@ydbjs/topic/writer'
const driver = new Driver(process.env['YDB_CONNECTION_STRING']!)
await driver.ready()
await using reader = createTopicReader(driver, { topic: '/Root/my-topic', consumer: 'my-consumer' })
await using writer = createTopicWriter(driver, { topic: '/Root/my-topic', producer: 'my-producer' })Reader
Опции
topic:string | TopicReaderSource | TopicReaderSource[]— путь или источники с фильтрамиconsumer:string— имя консюмераcodecMap?:Map<Codec | number, CompressionCodec>— свои кодеки для распаковкиmaxBufferBytes?:bigint— лимит внутреннего буфера (по умолчанию ~4 МБ)updateTokenIntervalMs?:number— период обновления токена (по умолчанию 60000)onPartitionSessionStart?— настройка оффсетов при старте сессииonPartitionSessionStop?— хук на остановку сессииonCommittedOffset?— уведомление об ack коммита оффсетов
TopicReaderSource поддерживает фильтры партиций и временные селекторы:
const source = {
path: '/Root/my-topic',
partitionIds: [0n, 1n],
maxLag: '5m',
readFrom: new Date(Date.now() - 60_000),
}Чтение и коммиты
const t = topic(driver)
await using reader = t.createReader({ topic: source, consumer: 'svc-a' })
for await (const batch of reader.read({ limit: 100, waitMs: 1000 })) {
if (!batch.length) continue
for (const m of batch) doSomething(m)
// Вариант A: простой — await commit
await reader.commit(batch)
// Вариант B: быстрый — fire‑and‑forget
// void reader.commit(batch)
}Перформанс‑заметка: await commit() в горячем пути снижает пропускную способность. Для высоких нагрузок используйте fire‑and‑forget плюс onCommittedOffset.
Writer
Опции
topic:stringtx?:TX— транзакция для записиproducer?:string— id продюсера (по умолчанию генерируется)codec?:CompressionCodec— сжатие (RAW/GZIP/ZSTD или своё)maxBufferBytes?:bigint— лимит буфера (по умолчанию 256 МБ)maxInflightCount?:number— максимум сообщений «в полёте» (по умолчанию 1000)flushIntervalMs?:number— периодический флаш (по умолчанию 10 мс)updateTokenIntervalMs?:number— период обновления токена (по умолчанию 60000)retryConfig?(signal)— настройка ретраев соединенияonAck?(seqNo, status)— колбэк подтверждений
Запись
const t = topic(driver)
await using writer = t.createWriter({ topic: '/Root/my-topic', producer: 'json-producer' })
const payload = new TextEncoder().encode(JSON.stringify({ foo: 'bar', ts: Date.now() }))
const seqNo = writer.write(payload)
await writer.flush()write() принимает только Uint8Array — строки/объекты кодируйте самостоятельно.
Транзакции
Запускайте чтение/запись внутри обработчика транзакций @ydbjs/query и передавайте tx, который он выдаёт. Не используйте using/явное закрытие — ресурсы управляются хуками транзакции.
- Reader:
createTopicTxReader(tx, driver, { topic, consumer })илиt.createTxReader(tx, { ... }). Offsets будут учтены через updateOffsetsInTransaction на коммите. - Writer:
createTopicTxWriter(tx, driver, { topic, ... })илиt.createTxWriter(tx, { ... }). Writer дождётся флаша перед коммитом.
import { query } from '@ydbjs/query'
import { createTopicTxReader } from '@ydbjs/topic/reader'
import { createTopicTxWriter } from '@ydbjs/topic/writer'
const qc = query(driver)
await qc.transaction(async (tx, signal) => {
const reader = createTopicTxReader(tx, driver, { topic: '/Root/my-topic', consumer: 'svc-a' })
for await (const batch of reader.read({ signal })) {
// обработка...
}
const writer = createTopicTxWriter(tx, driver, { topic: '/Root/my-topic', producer: 'p1' })
writer.write(new TextEncoder().encode('message'))
// writer дождётся flush в onCommit, ручное закрытие не требуется
})Примечание: объект tx предоставляет слой Query; интеграция с Topic выполняется автоматически внутри клиентов.
Свои кодеки
Reader: через codecMap, Writer: передайте объект CompressionCodec.
import { Codec } from '@ydbjs/api/topic'
import * as zlib from 'node:zlib'
const MyGzip = {
codec: Codec.GZIP,
compress: (p: Uint8Array) => zlib.gzipSync(p),
decompress: (p: Uint8Array) => zlib.gunzipSync(p),
}
await using reader = createTopicReader(driver, {
topic: '/Root/custom',
consumer: 'c1',
codecMap: new Map([[Codec.GZIP, MyGzip]]),
})
await using writer = createTopicWriter(driver, {
topic: '/Root/custom',
producer: 'p1',
codec: MyGzip,
})Экспортируемые модули
@ydbjs/topic:topic(driver)и типы@ydbjs/topic/reader:createTopicReader,createTopicTxReaderи типы@ydbjs/topic/writer:createTopicWriter,createTopicTxWriterи типы@ydbjs/topic/writer2: экспериментальный state‑machine writer
Лицензия
Apache-2.0
