ydb-qdrant-indexer
High-level indexing helper for ydb-qdrant.
This package takes plain-text documents, calls an external embedding model, and writes the resulting vectors into YDB collections via the ydb-qdrant programmatic client. It also keeps a small YDB table with indexing metadata (documents, versions, chunk counts, and status).
When to use it
- You already use ydb-qdrant as a Qdrant-compatible vector store backed by YDB.
- You have an external embedding model (OpenAI, local, etc.) and want a reusable ingestion pipeline:
  - Chunk documents with overlap.
  - Batch embed chunks.
  - Upsert vectors into a collection.
  - Track which documents are pending/indexed/failed.
Install
From npm (library usage):
```bash
npm install ydb-qdrant ydb-qdrant-indexer
```

Environment and credentials follow the same conventions as ydb-qdrant:
- YDB_ENDPOINT, YDB_DATABASE.
- One of YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS, YDB_METADATA_CREDENTIALS, YDB_ACCESS_TOKEN_CREDENTIALS, or YDB_ANONYMOUS_CREDENTIALS.
Core concepts
Embedding model
The indexer does not call any specific provider directly. Instead it expects an EmbeddingModel implementation:
- info.modelId: opaque model identifier (for payloads/observability).
- info.dimension: vector dimension; used to configure the target collection.
- embedDocuments(texts: string[]): Promise<number[][]>: batch embedding call.
- Optional embedQuery(text: string): for query-time use (not required if you only index).
This package provides createEmbeddingModel(adapter, options?) to adapt any concrete provider:
- adapter.modelId / adapter.dimension describe the model.
- adapter.embed(texts) performs the actual API call.
- options.maxBatchSize controls how many texts are sent to embedDocuments at once.
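For reference, the shape the indexer consumes looks roughly like this. This is a sketch inferred from the descriptions above, not the package's published type declarations:

```ts
// Sketch of the EmbeddingModel shape the indexer expects, inferred from the
// descriptions above; the actual exported types may differ in detail.
interface EmbeddingModelInfo {
  modelId: string;   // opaque identifier, recorded in point payloads
  dimension: number; // vector dimension, used to configure the collection
}

interface EmbeddingModel {
  info: EmbeddingModelInfo;
  embedDocuments(texts: string[]): Promise<number[][]>; // batch embedding call
  embedQuery?(text: string): Promise<number[]>;         // optional, query-time only
}
```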
Metadata store
Indexing metadata is stored in a separate YDB table (default name: qdrant_index_documents) via YdbMetadataStore:
- Keys: tenant, collection, docId.
- Columns: version, status (pending | indexed | failed), chunkCount, sourceUri, error, createdAt, updatedAt.
- Helper: createYdbMetadataStore({ endpoint?, database?, connectionString?, tableName? }).
You can swap in your own implementation by providing a custom MetadataStore that implements:
- ensureReady(): create/validate tables.
- upsertDocument(record).
- getDocument(tenant, collection, docId).
- updateStatus(tenant, collection, docId, status, chunkCount?, error?).
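As an illustration, a custom in-memory store might look roughly like the sketch below. It is based only on the method list above; the DocumentRecord shape and exact signatures are assumptions, not the package's types:

```ts
// In-memory MetadataStore sketch. DocumentRecord and the method signatures are
// assumptions reconstructed from the method list above.
type DocumentStatus = "pending" | "indexed" | "failed";

interface DocumentRecord {
  tenant: string;
  collection: string;
  docId: string;
  version: string;
  status: DocumentStatus;
  chunkCount?: number;
  sourceUri?: string;
  error?: string;
}

class InMemoryMetadataStore {
  private records = new Map<string, DocumentRecord>();

  private key(tenant: string, collection: string, docId: string): string {
    return `${tenant}/${collection}/${docId}`;
  }

  async ensureReady(): Promise<void> {
    // Nothing to create or validate for an in-memory store.
  }

  async upsertDocument(record: DocumentRecord): Promise<void> {
    this.records.set(this.key(record.tenant, record.collection, record.docId), record);
  }

  async getDocument(tenant: string, collection: string, docId: string): Promise<DocumentRecord | undefined> {
    return this.records.get(this.key(tenant, collection, docId));
  }

  async updateStatus(
    tenant: string,
    collection: string,
    docId: string,
    status: DocumentStatus,
    chunkCount?: number,
    error?: string,
  ): Promise<void> {
    const existing = this.records.get(this.key(tenant, collection, docId));
    if (existing) {
      this.records.set(this.key(tenant, collection, docId), { ...existing, status, chunkCount, error });
    }
  }
}
```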
Chunking
Documents are represented as IndexDocumentInput:
- id: string: stable document identifier.
- text: string: full plain-text content.
- Optional: sourceUri, metadata (arbitrary record), version (user-supplied version string).
Chunking behaviour:
- Sliding window over characters with configurable overlap (ChunkingOptions).
- Defaults (DEFAULT_CHUNKING_OPTIONS): maxCharacters = 2000, overlapCharacters = 200.
- Helper: chunkDocuments(documents, options?) → DocumentChunk[].
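For example, chunking with smaller windows than the defaults might look like this. This sketch assumes chunkDocuments is importable from the package root, as the helper list above suggests:

```ts
import { chunkDocuments } from "ydb-qdrant-indexer";

const longText = "…some long plain-text content…";

// Override the 2000/200 character defaults with smaller windows.
const chunks = chunkDocuments(
  [{ id: "doc-1", text: longText }],
  { maxCharacters: 500, overlapCharacters: 50 },
);

// Each chunk keeps its document id and character offsets, which the indexer
// later uses to build point ids and payload fields.
console.log(chunks.length);
```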
Indexer client
The main entrypoint is createIndexerClient(), which returns an IndexerClient with a single method:
- indexDocuments(request: IndexDocumentsRequest): Promise<void>.
IndexDocumentsRequest fields:
- tenant?: logical tenant id (defaults to "default").
- collection: target collection name (Qdrant-compatible).
- distance: one of "Cosine" | "Euclid" | "Dot" | "Manhattan".
- vectorSize: embedding dimension for this collection.
- documents: array of IndexDocumentInput.
- embeddingModel: EmbeddingModel or EmbeddingModelWithOptions.
- metadataStore: implementation of MetadataStore (typically createYdbMetadataStore()).
- clientOptions?: forwarded to createYdbQdrantClient (endpoint, database, auth, default tenant, etc.).
- maxUpsertBatchSize?: points per upsertPoints call (default: 64).
- chunking?: overrides default chunking options.
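Put together as a type, the request has roughly the following shape. This is a sketch reconstructed from the field list above, not the package's published declaration, and it assumes the referenced types are exported from the package root:

```ts
import type {
  ChunkingOptions,
  EmbeddingModel,
  EmbeddingModelWithOptions,
  IndexDocumentInput,
  MetadataStore,
} from "ydb-qdrant-indexer";

// Rough shape of IndexDocumentsRequest, reconstructed from the field list above.
interface IndexDocumentsRequestSketch {
  tenant?: string;                                     // defaults to "default"
  collection: string;
  distance: "Cosine" | "Euclid" | "Dot" | "Manhattan";
  vectorSize: number;
  documents: IndexDocumentInput[];
  embeddingModel: EmbeddingModel | EmbeddingModelWithOptions;
  metadataStore: MetadataStore;
  clientOptions?: Record<string, unknown>;             // forwarded to createYdbQdrantClient
  maxUpsertBatchSize?: number;                         // default: 64
  chunking?: ChunkingOptions;
}
```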
What indexDocuments does
For each call:
1. Ensure collection
   - Uses createYdbQdrantClient from ydb-qdrant.
   - Calls getCollection(collection):
     - If the collection exists, validates size and distance against vectorSize / distance from the request.
     - If it does not exist (404), creates it with the requested vector size/distance and data_type: "float".
2. Chunk documents and upsert metadata
   - Splits each document into overlapping chunks via chunkDocuments.
   - Computes chunkCount per document.
   - Computes a version for each document: uses doc.version when provided, otherwise a deterministic SHA-256 hash of doc.text (sketched below this step).
   - Calls metadataStore.upsertDocument for each document with status "pending".
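Conceptually, the default version is just a content hash of the document text, along these lines. This is a sketch using Node's crypto module; the indexer's exact hashing and encoding may differ:

```ts
import { createHash } from "node:crypto";

// Deterministic content hash: unchanged text yields the same version string
// across runs, so re-indexing identical content is detectable.
function contentVersion(text: string): string {
  return createHash("sha256").update(text, "utf8").digest("hex");
}

const doc = { id: "doc-1", text: "Some text to index..." };
const version = contentVersion(doc.text); // used only when doc.version is not supplied
```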
3. Embed and upsert points
   - Batches chunks into groups of size embeddingModel.options.maxBatchSize (or 32 by default).
   - Calls embeddingModel.embedDocuments(texts) for each batch.
   - For each chunk, builds (see the sketch below this step):
     - Deterministic point id: "{docId}:{chunkIndex}".
     - Vector: the embedding returned by the model.
     - Payload object including:
       - doc_id, chunk_index, chunk_start, chunk_end, source_uri.
       - embedding_model (from embeddingModel.info.modelId).
       - Any metadata from the original document, merged in without overriding these reserved keys.
   - Splits points into batches of maxUpsertBatchSize and calls upsertPoints(collection, { points }) via the ydb-qdrant client.
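For intuition, assembling one point from a chunk and its embedding corresponds roughly to the sketch below. The chunk shape is an assumption; the payload keys mirror the description above, with document metadata spread first so the reserved keys cannot be overridden:

```ts
// Sketch of point assembly. ChunkSketch is an assumed shape, not the package's
// DocumentChunk type; the payload keys follow the description above.
interface ChunkSketch {
  docId: string;
  chunkIndex: number;
  start: number;
  end: number;
  sourceUri?: string;
  metadata?: Record<string, unknown>;
}

function buildPoint(chunk: ChunkSketch, vector: number[], modelId: string) {
  return {
    id: `${chunk.docId}:${chunk.chunkIndex}`, // deterministic, so re-indexing overwrites
    vector,
    payload: {
      ...chunk.metadata, // document metadata first, so the reserved keys below win
      doc_id: chunk.docId,
      chunk_index: chunk.chunkIndex,
      chunk_start: chunk.start,
      chunk_end: chunk.end,
      source_uri: chunk.sourceUri,
      embedding_model: modelId,
    },
  };
}
```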
4. Update document status
   - On success: for each document, calls metadataStore.updateStatus(..., "indexed", chunkCount).
   - On failure at any point in embedding or upsert:
     - Marks all documents as "failed", attaching the error message via metadataStore.updateStatus(..., "failed", chunkCount, errorMessage).
     - Rethrows the original error.
Minimal usage example (TypeScript)
This example sketches how to wire an external embedding API, the metadata store, and the indexer client together. It omits details such as the HTTP client implementation for brevity.
```ts
import {
  createEmbeddingModel,
  createIndexerClient,
  createYdbMetadataStore,
  type EmbeddingModelAdapter,
} from "ydb-qdrant-indexer";
const adapter: EmbeddingModelAdapter = {
modelId: "my-embeddings-001",
dimension: 768,
async embed(texts) {
// Call your embedding provider here and return number[][]
return texts.map(() => new Array(768).fill(0));
},
};
const embeddingModel = createEmbeddingModel(adapter, { maxBatchSize: 16 });
const metadataStore = createYdbMetadataStore({
endpoint: process.env.YDB_ENDPOINT,
database: process.env.YDB_DATABASE,
});
const indexer = createIndexerClient();
await indexer.indexDocuments({
tenant: "my-tenant",
collection: "documents",
distance: "Cosine",
vectorSize: 768,
documents: [
{
id: "doc-1",
text: "Some text to index...",
sourceUri: "file:///docs/doc-1.txt",
metadata: { language: "en" },
},
],
embeddingModel,
metadataStore,
clientOptions: {
// forwarded to createYdbQdrantClient
endpoint: process.env.YDB_ENDPOINT,
database: process.env.YDB_DATABASE,
},
});
```

Notes and guarantees
- Idempotent upserts: point ids are deterministic (docId:chunkIndex), so re-running indexing for the same document content overwrites points instead of duplicating them.
- Document versioning: version is stored in the metadata table so you can compare current vs. previous content and decide whether to reindex.
- Metadata independence: the metadata table is separate from qdrant_all_points, so you can query indexing status without touching the main points table.
- Embedding provider agnostic: any model that can be adapted to EmbeddingModelAdapter can be used (cloud API, on-prem model server, etc.).
