@botler/bim v3.9.38
Forever Canada operational utilities and scripts
Vector Store Documentation
Production‑oriented pgvector service with:
- Ingestion of precomputed embeddings
- Stats‑driven driver selection (single vs union branches)
- Adaptive ivfflat.probes
- Partial index lifecycle automation (create/drop thresholds)
- Re‑embedding for new models (additive)
- Structured decision logging
All core features are implemented and test‑verified. Remaining roadmap items (auto‑tune scripts, metrics export, advanced pruning) are optional.
1. Schema Bootstrap
If migrations did not already create tables, run once (idempotent):
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS vector_documents (
id TEXT PRIMARY KEY DEFAULT (gen_random_uuid()::text),
source_type TEXT, source_ref TEXT, title TEXT, metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS vector_chunks (
id TEXT PRIMARY KEY,
document_id TEXT REFERENCES vector_documents(id) ON DELETE CASCADE,
text_plain TEXT NOT NULL,
markdown_text TEXT,
page_ids TEXT[],
chunk_index INT,
token_count INT,
overlap_with_previous BOOLEAN,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS vector_embeddings (
chunk_id TEXT REFERENCES vector_chunks(id) ON DELETE CASCADE,
model TEXT NOT NULL,
embedding_dim INT NOT NULL,
embedding VECTOR(3072) NOT NULL,
jurisdiction_code TEXT,
organization_id TEXT,
website_host TEXT,
category_a TEXT,
category_b TEXT,
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (chunk_id, model)
);
-- Stats table powering driver & index decisions
CREATE TABLE IF NOT EXISTS vector_filter_value_stats (
id BIGSERIAL PRIMARY KEY,
dimension TEXT NOT NULL,
value TEXT NOT NULL,
row_count BIGINT DEFAULT 0,
last_query_at TIMESTAMPTZ,
has_index BOOLEAN DEFAULT FALSE,
UNIQUE (dimension, value)
);
Adjust VECTOR(3072) if using a different embedding dimension (must match VECTOR_DIM).
2. Environment Variables
Minimum:
- POSTGRES_URI: connection string (required)
- VECTOR_DIM=3072: required embedding length
Optional tuning (defaults shown):
- VECTOR_MAX_K=50: upper bound on requested k
- VECTOR_OVERSAMPLE_FACTOR=4: union oversampling multiplier
- VECTOR_MAX_UNION_VALUES=8: cap on union branches
- VECTOR_LOG_DECISIONS=0|1: emit JSON decision logs
- VECTOR_IVFFLAT_PROBES=0: fixed probes (0 = let adaptive decide)
- VECTOR_PROBES_THRESHOLD=2000: candidate budget threshold for adaptive probes
- VECTOR_MIN_PROBES=3, VECTOR_MAX_PROBES=10
- VECTOR_PARTIAL_INDEX_CREATE_THRESHOLD=15000
- VECTOR_PARTIAL_INDEX_DROP_THRESHOLD=2000
Set these in your .env; only the first two are mandatory to start.
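If you want a fail-fast check at process start, a small guard like the following can validate the mandatory variables and surface the optional knobs with their defaults. This is an illustrative sketch, not part of the package API:

```typescript
// Illustrative startup guard (not part of the package): fail fast if the
// mandatory variables are missing, and expose the optional knobs with defaults.
function readVectorStoreEnv() {
  const uri = process.env.POSTGRES_URI;
  if (!uri) throw new Error("POSTGRES_URI is required");

  const dim = parseInt(process.env.VECTOR_DIM || "", 10);
  if (!Number.isInteger(dim) || dim <= 0) {
    throw new Error("VECTOR_DIM must be a positive integer (e.g. 3072)");
  }

  return {
    uri,
    dim,
    maxK: parseInt(process.env.VECTOR_MAX_K || "50", 10),
    oversampleFactor: parseInt(process.env.VECTOR_OVERSAMPLE_FACTOR || "4", 10),
    logDecisions: process.env.VECTOR_LOG_DECISIONS === "1",
  };
}
```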
3. Creating a Connection & Service
import { parseDatabaseUri } from "./src/database/config/environment";
import { DatabaseConnection } from "./src/database/config/connection";
import { VectorStoreService } from "./src/database/services/vector-store.service";
async function initVectorStore() {
const uri = process.env.POSTGRES_URI!; // required
const conn = DatabaseConnection.getInstance(parseDatabaseUri(uri));
await conn.initialize();
const service = new VectorStoreService(conn); // default deterministic mock embedding provider used only for re-embedding tests
return { conn, service };
}
If you want automatic embedding generation during re‑embedding with a real model, use the built‑in OpenAI provider (or implement the EmbeddingProvider interface if you prefer another backend).
4. Ingestion Workflow
ingestPreprocessed (atomic) expects you to supply:
- A document metadata object
- An ordered array of chunks (each with an id and chunk_index)
- A parallel array of embedding rows (same order, no chunk_id yet)
- The model name
const dim = parseInt(process.env.VECTOR_DIM || "3072", 10);
const { service } = await initVectorStore();
const makeVec = (seed: number) =>
Array.from({ length: dim }, (_, i) => (i === 0 ? seed : 0));
await service.ingestPreprocessed({
document: {
source_type: "policy",
source_ref: "doc-42",
title: "Policy 42",
},
chunks: [
{
id: "doc42-c0",
document_id: "temp",
text_plain: "Opening paragraph",
chunk_index: 0,
},
{
id: "doc42-c1",
document_id: "temp",
text_plain: "Second paragraph",
chunk_index: 1,
},
] as any,
embeddings: [
{
embedding_dim: dim,
embedding: makeVec(1),
jurisdiction_code: "CA-AB",
organization_id: "ORG_X",
website_host: "example.org",
},
{
embedding_dim: dim,
embedding: makeVec(2),
jurisdiction_code: "CA-AB",
organization_id: "ORG_Y",
website_host: "example.org",
},
] as any,
model: "text-embedding-3-large",
});
This operation runs in a single database transaction. If any step fails, all inserts are rolled back, preventing partial state and orphan rows. Inserts are idempotent (ON CONFLICT DO NOTHING for chunks and embeddings), so re-running the same ingest converges without duplicates.
Stats are incrementally updated for any non-null jurisdiction_code, organization_id, and website_host values within the same transaction.
5. Querying
Driver selection rules:
- If exactly one value among any of (jurisdiction, organization, host) is provided → single driver path (fastest subset)
- Else choose a union dimension (one with >1 values) minimizing the sum of row_count stats; run parallel branches with oversampling → global dedupe (see the sketch after this list)
- Fall back to a global scan if no filter values are provided
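The sketch below is a literal transcription of these stated rules into code, for intuition only; the service's actual planner also consults vector_filter_value_stats and may choose differently (the decision log example in section 10 shows richer behaviour):

```typescript
// Hypothetical helper transcribing the documented selection rules; not the package's
// internal planner. rowCount is assumed to be looked up from vector_filter_value_stats.
type FilterDimension = "jurisdiction_code" | "organization_id" | "website_host";

function chooseDriver(
  filters: Record<FilterDimension, string[]>,
  rowCount: (dim: FilterDimension, value: string) => number
): { mode: "single" | "union" | "global"; dimension?: FilterDimension } {
  const dims = Object.keys(filters) as FilterDimension[];
  const nonEmpty = dims.filter((d) => filters[d].length > 0);
  const totalValues = nonEmpty.reduce((n, d) => n + filters[d].length, 0);

  if (totalValues === 0) return { mode: "global" };                 // no filters → global scan
  if (totalValues === 1) return { mode: "single", dimension: nonEmpty[0] };

  // Union path: pick the multi-valued dimension with the smallest summed row_count.
  const multi = nonEmpty.filter((d) => filters[d].length > 1);
  const pool = multi.length > 0 ? multi : nonEmpty;
  const best = pool
    .map((d) => ({ d, cost: filters[d].reduce((s, v) => s + rowCount(d, v), 0) }))
    .sort((a, b) => a.cost - b.cost)[0];
  return { mode: "union", dimension: best.d };
}
```
Example query: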
const queryVec = Array.from({ length: dim }, (_, i) => (i === 0 ? 1 : 0));
const results = await service.query({
model: "text-embedding-3-large",
jurisdiction_codes: ["CA-AB"],
organization_ids: ["ORG_X", "ORG_Y"],
website_hosts: ["example.org"],
k: 5,
embedding: queryVec,
});
console.log(results[0]);
Each result includes: chunk_id, document_id, model, distance, text, and filter dimensions.
Note: Queries may open a short transaction internally to set ivfflat.probes when adaptive tuning is active; queries never modify data.
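For orientation, a result row looks roughly like the interface below. This is an approximation derived from the field list above; the authoritative definition is the VectorRetrievalResult type exported by the package.

```typescript
// Approximate shape of a result row, based on the field list above.
// Consult the exported VectorRetrievalResult type for the exact definition.
interface ApproxRetrievalResult {
  chunk_id: string;
  document_id: string;
  model: string;
  distance: number;              // smaller = closer match
  text: string;                  // chunk text (likely text_plain)
  jurisdiction_code?: string | null;
  organization_id?: string | null;
  website_host?: string | null;
  category_a?: string | null;
  category_b?: string | null;
}
```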
5.1 One-liner: queryByText (recommended for consumers)
Importable from the package root for a single call flow. It embeds your input text(s) with OpenAI by default, then queries the vector store and returns per-input results.
Env required:
- POSTGRES_URI
- VECTOR_DIM (e.g., 3072)
- OPENAI_API_KEY
Optional env:
- VECTOR_MODEL_DEFAULT (defaults to text-embedding-3-large)
Usage:
import { queryByText } from "@botler/bim";
const out = await queryByText(
[
"What are the rules for petition signatures?",
"Where can I submit forms in Edmonton?",
],
{
jurisdiction_codes: ["CA-AB"],
website_hosts: ["www.alberta.ca"],
// category filters are supported
category_a_values: ["policy"],
},
{
k: 5,
// model: "text-embedding-3-large", // optional override
// probes: 5, // optional ivfflat.probes
concurrency: 4, // parallel embedding+query per input
}
);
// Shape: [{ text, results }]
for (const item of out) {
console.log(item.text);
console.log(item.results.slice(0, 2));
}
Contract:
- Input text can be a string or string[]
- Filters mirror VectorStoreService.query (jurisdiction, org, host, category_a/b)
- Options: { k=5, model, probes, concurrency=4 }
- Output: Array<{ text: string, results: VectorRetrievalResult[] }> in the same order as inputs
Notes:
- Connection is initialized lazily on first call and reused (singleton).
- If VECTOR_MODEL_DEFAULT is set, it's used when options.model isn't provided.
- VECTOR_LOG_DECISIONS=1 will include planner logs for underlying queries.
6. Re‑Embedding a Document
Add embeddings for a new model while preserving existing ones:
// docId obtained from earlier ingestion response or lookup by source_ref
await service.reembedDocument(docId, { model: "alt-embedding-mock" });
This reads existing chunks, generates vectors through the configured EmbeddingProvider, and inserts new (chunk_id, newModel) rows. No deletion or pruning yet.
7. Stats Management
Trigger an exact recount (use sparingly in large datasets):
await service.refreshFilterStats(); // all dimensions
await service.refreshFilterStats("website_host"); // single dimensionStats table fields:
row_count(approx via incremental increments; exact after refresh)last_query_at(touched during queries)has_index(maintained by index lifecycle logic)
8. Embedding Provider Options
Use the built‑in OpenAI provider for production embeddings, or supply a custom EmbeddingProvider.
OpenAI provider usage:
import { VectorStoreService } from "./src/database/services/vector-store.service";
import { createOpenAIEmbeddingProviderFromEnv } from "./src/database/embedding/openai/openai-embedding-provider";
// Env required:
// - OPENAI_API_KEY
// - VECTOR_DIM (e.g., 3072 for text-embedding-3-large)
// Optional tuning: OPENAI_EMBED_BATCH_SIZE, OPENAI_EMBED_MAX_TOKENS_PER_BATCH, etc.
const provider = createOpenAIEmbeddingProviderFromEnv();
const service = new VectorStoreService(conn, provider);
Custom provider template:
import { EmbeddingProvider } from "./src/database/embedding/embedding-provider";
class MyEmbeddingProvider implements EmbeddingProvider {
constructor(private dim = parseInt(process.env.VECTOR_DIM || "3072", 10)) {}
async embed(batch: string[], model: string): Promise<number[][]> {
// Return an array of vectors aligned with batch inputs
return batch.map(() => new Array(this.dim).fill(0));
}
}
9. Partial Index Lifecycle
ensureIndexesForFilters (internal, invoked where appropriate in future automation) decides per (dimension,value) whether to create or drop a partial IVFFLAT index using thresholds:
- Create when row_count >= VECTOR_PARTIAL_INDEX_CREATE_THRESHOLD and the index is absent
- Drop when row_count < VECTOR_PARTIAL_INDEX_DROP_THRESHOLD and the index is present
Names are sanitized: vec_<model>_<dimension>_<value> (truncated/safe chars). Creation attempts a fallback variant if WITH (lists=...) fails.
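A minimal sketch of the decision and naming logic described above, assuming the documented thresholds and name format (hypothetical helpers; ensureIndexesForFilters is the real implementation):

```typescript
// Lifecycle decision per (dimension, value), using the documented thresholds.
const CREATE_THRESHOLD = parseInt(process.env.VECTOR_PARTIAL_INDEX_CREATE_THRESHOLD || "15000", 10);
const DROP_THRESHOLD = parseInt(process.env.VECTOR_PARTIAL_INDEX_DROP_THRESHOLD || "2000", 10);

function partialIndexAction(rowCount: number, hasIndex: boolean): "create" | "drop" | "none" {
  if (!hasIndex && rowCount >= CREATE_THRESHOLD) return "create";
  if (hasIndex && rowCount < DROP_THRESHOLD) return "drop";
  return "none";
}

// Index name in the documented vec_<model>_<dimension>_<value> format, reduced to
// safe characters and truncated to Postgres's 63-character identifier limit.
function partialIndexName(model: string, dimension: string, value: string): string {
  const safe = (s: string) => s.toLowerCase().replace(/[^a-z0-9_]+/g, "_");
  return `vec_${safe(model)}_${safe(dimension)}_${safe(value)}`.slice(0, 63);
}
```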
Decision log events:
index_create, index_drop, index_create_error
10. Decision Logging
Enable with VECTOR_LOG_DECISIONS=1. Sample line:
{
"ts": "2025-09-10T00:21:36.664Z",
"component": "vector-store",
"event": "decision",
"mode": "single",
"driver_dimension": "website_host",
"driver_value": "rf.example",
"driver_row_count": 12,
"k": 2,
"probes": null,
"filters": {
"jurisdiction_codes": 1,
"organization_ids": 2,
"website_hosts": 1
}
}
Modes: single, union, union_branch_exec, global, index_create, index_drop, index_create_error.
11. Adaptive Probes
If no explicit probes is passed and VECTOR_IVFFLAT_PROBES=0, adaptive logic estimates a candidate budget (sum of per-branch limits) and scales ivfflat.probes between VECTOR_MIN_PROBES and VECTOR_MAX_PROBES once budget ≥ threshold.
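As a rough illustration of that behaviour (only the inputs, bounds, and threshold come from this documentation; the interpolation shape is an assumption, not the package's exact formula):

```typescript
// Rough sketch of adaptive probe selection. candidateBudget is the sum of
// per-branch limits; the scaling curve below is assumed, not taken from the source.
const PROBES_THRESHOLD = parseInt(process.env.VECTOR_PROBES_THRESHOLD || "2000", 10);
const MIN_PROBES = parseInt(process.env.VECTOR_MIN_PROBES || "3", 10);
const MAX_PROBES = parseInt(process.env.VECTOR_MAX_PROBES || "10", 10);

function adaptiveProbes(candidateBudget: number): number | null {
  if (candidateBudget < PROBES_THRESHOLD) return null; // below threshold: leave probes unset
  const ratio = Math.min(1, candidateBudget / (PROBES_THRESHOLD * 4)); // assumed scaling
  return Math.round(MIN_PROBES + (MAX_PROBES - MIN_PROBES) * ratio);
}
```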
Override per query: pass probes in query() input.
12. Error Handling
Common errors:
- Dimension mismatch → thrown if query embedding length != VECTOR_DIM
- Index creation failure → logged (index_create_error) but does not abort the query path
- Database query failure → rethrown as a generic AppError upstream (see connection wrapper)
Ingestion atomicity and idempotency:
- In case of process interruption or a mid-ingest error, the entire ingest is rolled back.
- Because inserts are idempotent, re-running the ingest after a failure will safely complete without creating duplicates.
Recommended: wrap external API embedding generation with retries + circuit breaker.
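A minimal sketch of such a wrapper, assuming you wrap any EmbeddingProvider (hypothetical helper, not part of the package; retry counts, backoff, and breaker thresholds are illustrative):

```typescript
import { EmbeddingProvider } from "./src/database/embedding/embedding-provider";

// Retry + circuit-breaker wrapper around any EmbeddingProvider.
class ResilientEmbeddingProvider implements EmbeddingProvider {
  private failures = 0;
  private openUntil = 0; // breaker rejects calls until this timestamp

  constructor(
    private inner: EmbeddingProvider,
    private maxRetries = 3,
    private breakerThreshold = 5,
    private breakerCooldownMs = 30_000
  ) {}

  async embed(batch: string[], model: string): Promise<number[][]> {
    if (Date.now() < this.openUntil) {
      throw new Error("Embedding circuit breaker is open; skipping call");
    }
    let lastError: unknown;
    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      try {
        const vectors = await this.inner.embed(batch, model);
        this.failures = 0; // success closes the breaker
        return vectors;
      } catch (err) {
        lastError = err;
        this.failures++;
        if (this.failures >= this.breakerThreshold) {
          this.openUntil = Date.now() + this.breakerCooldownMs;
          break;
        }
        // Exponential backoff before the next attempt.
        await new Promise((r) => setTimeout(r, 250 * 2 ** attempt));
      }
    }
    throw lastError;
  }
}
```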
13. Testing & Local Verification
Run full test suite:
npm test --silent
Integration tests cover ingestion, single/union retrieval, guardrails, re‑embedding, and index lifecycle decisions.
If you see a Jest forced-exit warning, ensure the DB pool closes (already handled in tests via afterAll).
14. Performance & Tuning Checklist
| Objective | Knobs |
| ------------------------------------ | --------------------------------------------------------------------- |
| Lower latency small filtered queries | Ensure single driver filters (supply exactly one value) |
| Balance recall for many values | Increase VECTOR_OVERSAMPLE_FACTOR (watch latency) |
| Reduce wasted work large unions | Lower VECTOR_MAX_UNION_VALUES or adapt thresholds |
| Probe depth tradeoff | Fix VECTOR_IVFFLAT_PROBES or rely on adaptive range |
| Heavy hot value | Allow partial index creation by lowering create threshold temporarily |
Baseline before tuning: capture p95 latency and recall (if you have ground‑truth sets) before changing any knobs.
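One quick way to capture a latency baseline, using the query API shown above (a sketch; the filter values, model name, and query vector are placeholders to be replaced by your real workload):

```typescript
// Rough p95 query latency for a fixed query vector and filter set.
async function measureP95(runs = 100): Promise<number> {
  const { service } = await initVectorStore();
  const dim = parseInt(process.env.VECTOR_DIM || "3072", 10);
  const queryVec = Array.from({ length: dim }, (_, i) => (i === 0 ? 1 : 0));
  const latencies: number[] = [];

  for (let i = 0; i < runs; i++) {
    const start = Date.now();
    await service.query({
      model: "text-embedding-3-large",
      jurisdiction_codes: ["CA-AB"],
      k: 5,
      embedding: queryVec,
    });
    latencies.push(Date.now() - start);
  }
  latencies.sort((a, b) => a - b);
  return latencies[Math.floor(latencies.length * 0.95)];
}
```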
15. Production Considerations
- Connection Pool: If high concurrency is expected, size pool based on CPU * 4 (rule of thumb) then measure.
- Vacuum & Analyze: Rely on autovacuum; manual
ANALYZEafter large bulk ingests can improve planner stats. - Backups: Embeddings are reproducible but stats table + document metadata should be backed up normally.
- Re‑Embedding Strategy: Keep old model for shadow comparisons; once confident, prune (future helper TBD).
- Observability: Stream decision logs to log pipeline; add application metrics (latency, candidate counts) later.
16. Quick End‑to‑End Example
async function example() {
const { service, conn } = await initVectorStore();
const dim = parseInt(process.env.VECTOR_DIM || "3072", 10);
const v = (x: number) =>
Array.from({ length: dim }, (_, i) => (i === 0 ? x : 0));
// Ingest
const ingest = await service.ingestPreprocessed({
document: { source_type: "manual", source_ref: "ex-1", title: "Example" },
chunks: [
{
id: "ex-c0",
document_id: "temp",
text_plain: "Example text",
chunk_index: 0,
},
] as any,
embeddings: [
{
embedding_dim: dim,
embedding: v(5),
jurisdiction_code: "CA-AB",
organization_id: "ORG1",
website_host: "ex.host",
},
] as any,
model: "text-embedding-3-large",
});
// Query
const res = await service.query({
model: "text-embedding-3-large",
jurisdiction_codes: ["CA-AB"],
organization_ids: ["ORG1"],
website_hosts: ["ex.host"],
k: 3,
embedding: v(5),
});
console.log(res);
// Re-embed in new model
await service.reembedDocument(ingest.document_id, {
model: "alt-embedding-mock",
});
await conn.close();
}
17. Roadmap (Non‑Blocking)
- Auto calibration script (collect latency/recall ⇒ suggest env)
- Metrics emitter (Prometheus / OTEL hooks)
- Old model pruning policy helper
- Hybrid lexical + vector phase
- Optional reranking with LLM or cross encoder
18. FAQ
Q: Do I need partial indexes now?
A: The lifecycle logic is present; create thresholds are high to avoid churn. Manually lower thresholds in staging to validate.
Q: Can I store embeddings for multiple dimensions?
A: All embeddings share a single vector column (dimension fixed via VECTOR_DIM). Different models are distinguished by the model column.
Q: How do I delete a document?
A: Delete from vector_documents; cascades remove chunks + embeddings.
Q: How big can k be?
A: Capped by VECTOR_MAX_K (default 50) to guard latency & memory.
19. Minimal Checklist to Go Live
- Set POSTGRES_URI & VECTOR_DIM.
- Run schema bootstrap (or migrations).
- Ingest at least one document with embeddings.
- Run a query & verify results.
- (Optional) Enable VECTOR_LOG_DECISIONS=1 briefly to observe planning.
- Set production thresholds for index lifecycle if there is large skew in value cardinalities.
20. Cleaning Up Data
If you seeded synthetic or test data (e.g. via the recall harness) and want to remove it:
Dry run (synthetic only):
npx tsx scripts/database/cleanup-vector-store.ts
Apply deletion of synthetic rows (identified by source_type='recall'):
npx tsx scripts/database/cleanup-vector-store.ts --apply
Full wipe of all vector store tables (irreversible), plus dropping partial indexes:
npx tsx scripts/database/cleanup-vector-store.ts --mode=all --apply --drop-partial-indexes
Pass --verbose for before/after counts. The script requires POSTGRES_URI.
Resumable ingestion
You can resume long-running ingestion without redoing preprocess:
- From preprocess (.pre.json):
npx tsx scripts/ingest-documents-into-vector-store/ingest-documents-into-vector-store.ts \
--input=scripts/ingest-documents-into-vector-store/catalog.alberta.json \
--resume=RAG \
--resume-input=scripts/ingest-documents-into-vector-store/processed/2023NBCAE-V1_National_Building_Code2023_Alberta_Edition.pdf.pre.json
- From embeddings (.embed.json) to store only:
npx tsx scripts/ingest-documents-into-vector-store/ingest-documents-into-vector-store.ts \
--input=scripts/ingest-documents-into-vector-store/catalog.alberta.json \
--resume=store \
--resume-input=scripts/ingest-documents-into-vector-store/processed/ab-001.embed.json
See scripts/ingest-documents-into-vector-store/README.md for full details.
End.
