npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@sochdb/sochdb

v0.4.3

Published

SochDB Node.js SDK - AI-native embedded database with concurrent multi-process support

Readme

SochDB Node.js SDK

LLM-Optimized Embedded Database with Native Vector Search


Installation

npm install @sochdb/sochdb

Or from source:

cd sochdb-typescript-sdk
npm install

Architecture: Flexible Deployment

Tri-mode architecture: Embedded + Concurrent + Server (gRPC/IPC)
Choose the deployment mode that fits your needs.


SochDB Node.js SDK Documentation

LLM-Optimized Embedded Database with Native Vector Search


Table of Contents

  1. Quick Start
  2. Installation
  3. Features
  4. Architecture Overview
  5. Core Key-Value Operations
  6. Transactions (ACID with SSI)
  7. Query Builder
  8. Prefix Scanning
  9. SQL Operations
  10. Table Management & Index Policies
  11. Namespaces & Collections
  12. Priority Queues
  13. Vector Search
  14. Hybrid Search (Vector + BM25)
  15. Graph Operations
  16. Temporal Graph (Time-Travel)
  17. Semantic Cache
  18. Memory System
  19. Session Management
  20. Context Query Builder (LLM Optimization)
  21. Atomic Multi-Index Writes
  22. Recovery & WAL Management
  23. Checkpoints & Snapshots
  24. Compression & Storage
  25. Statistics & Monitoring
  26. Distributed Tracing
  27. Workflow & Run Tracking
  28. Server Mode (gRPC Client)
  29. IPC Client (Unix Sockets)
  30. Standalone VectorIndex
  31. Vector Utilities
  32. Data Formats (TOON/JSON/Columnar)
  33. Policy Service
  34. MCP (Model Context Protocol)
  35. Configuration Reference
  36. Error Handling
  37. Async Support
  38. Building & Development
  39. Complete Examples
  40. Migration Guide

1. Quick Start

Concurrent Embedded Mode

For web applications with multiple Node.js processes (PM2 cluster, multiple workers):

import { EmbeddedDatabase } from '@sochdb/sochdb';
import express from 'express';

// Open in concurrent mode - multiple processes can access simultaneously
const db = EmbeddedDatabase.openConcurrent('./web_db');

const app = express();

app.get('/user/:id', async (req, res) => {
  // Multiple concurrent requests can read simultaneously (~100ns)
  const data = await db.get(Buffer.from(`user:${req.params.id}`));
  if (!data) {
    res.status(404).json({ error: 'not found' });
    return;
  }
  res.send(data);
});

app.post('/user/:id', async (req, res) => {
  // Writes are automatically coordinated (~60µs amortized)
  await db.put(Buffer.from(`user:${req.params.id}`), req.body);
  res.json({ status: 'ok' });
});

// Check concurrent mode status
console.log(`Concurrent mode: ${db.isConcurrent}`);  // true

// Start with PM2 cluster mode (multiple workers can access DB)
// pm2 start app.js -i max
app.listen(3000);

Performance

| Operation | Standard Mode | Concurrent Mode | |-----------|---------------|-----------------| | Read (single process) | ~100ns | ~100ns | | Read (multi-process) | Blocked ❌ | ~100ns ✅ | | Write | ~5ms (fsync) | ~60µs (amortized) | | Max concurrent readers | 1 | 1024 |

PM2 Cluster Example

# Install PM2
npm install -g pm2

# Start with automatic worker scaling
pm2 start server.js -i max

# All workers can access the same database concurrently!
pm2 logs

PM2 Ecosystem File

// ecosystem.config.js
module.exports = {
  apps: [{
    name: 'api-server',
    script: './server.js',
    instances: 'max',  // Scale across all CPU cores
    exec_mode: 'cluster',
    env: {
      NODE_ENV: 'production',
      DB_PATH: './shared_db'  // All workers use same DB
    }
  }]
};
# Deploy with ecosystem file
pm2 start ecosystem.config.js

# Monitor all workers
pm2 monit

Docker Compose with PM2

version: '3.8'
services:
  app:
    build: .
    environment:
      - NODE_ENV=production
      - INSTANCES=4  # 4 PM2 workers
    volumes:
      - ./data:/app/data  # Shared database volume
    ports:
      - "3000:3000"
    command: pm2-runtime start ecosystem.config.js

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: sochdb-app
spec:
  replicas: 4  # 4 pods share the database
  template:
    spec:
      containers:
      - name: app
        image: myapp:latest
        volumeMounts:
        - name: db-storage
          mountPath: /app/data
      volumes:
      - name: db-storage
        persistentVolumeClaim:
          claimName: sochdb-pvc  # Shared PVC with ReadWriteMany

Features

Memory System - LLM-Native Memory for AI Agents

Complete memory system with extraction, consolidation, and hybrid retrieval:

import {
  EmbeddedDatabase,
  ExtractionPipeline,
  Consolidator,
  HybridRetriever,
  AllowedSet,
} from '@sochdb/sochdb';

const db = await EmbeddedDatabase.open('./memory_db');

// Extract entities and relations from text
const pipeline = ExtractionPipeline.fromDatabase(db, 'user_123', {
  entityTypes: ['person', 'organization', 'location'],
  minConfidence: 0.7,
});

const result = await pipeline.extractAndCommit(
  'Alice works at Acme Corp',
  myLLMExtractor  // Your LLM integration
);
console.log(`Extracted ${result.entities.length} entities`);

// Consolidate facts with event sourcing
const consolidator = Consolidator.fromDatabase(db, 'user_123');
await consolidator.add({
  fact: { subject: 'Alice', predicate: 'lives_in', object: 'SF' },
  source: 'conversation_1',
  confidence: 0.9,
});

const updated = await consolidator.consolidate();
const facts = await consolidator.getCanonicalFacts();

// Hybrid retrieval with RRF fusion
const retriever = HybridRetriever.fromDatabase(db, 'user_123', 'documents');
await retriever.indexDocuments(docs);

const results = await retriever.retrieve(
  'machine learning papers',
  queryEmbedding,
  AllowedSet.fromNamespace('user_123')
);

→ See Full Example

Key Features:

  • ✅ Extraction Pipeline: Compile LLM outputs into typed facts
  • ✅ Event-Sourced Consolidation: Append-only with temporal updates
  • ✅ Hybrid Retrieval: RRF fusion of vector + keyword search
  • ✅ Namespace Isolation: Multi-tenant security with pre-filtering
  • ✅ Schema Validation: Type checking and confidence thresholds

Semantic Cache - LLM Response Caching

Vector similarity-based caching for LLM responses to reduce costs and latency:

import { EmbeddedDatabase, SemanticCache } from '@sochdb/sochdb';

const db = await EmbeddedDatabase.open('./mydb');
const cache = new SemanticCache(db, 'llm_responses');

// Store LLM response with embedding
await cache.put(
  'What is machine learning?',
  'Machine learning is a subset of AI...',
  embedding,  // 384-dim vector
  3600,       // TTL in seconds
  { model: 'gpt-4', tokens: 150 }
);

// Check cache before calling LLM
const hit = await cache.get(queryEmbedding, 0.85);
if (hit) {
  console.log(`Cache HIT! Similarity: ${hit.score.toFixed(4)}`);
  console.log(`Response: ${hit.value}`);
}

// Get statistics
const stats = await cache.stats();
console.log(`Hit rate: ${(stats.hitRate * 100).toFixed(1)}%`);

→ See Full Example

Key Benefits:

  • ✅ Cosine similarity matching (0-1 threshold)
  • ✅ TTL-based expiration
  • ✅ Hit/miss statistics tracking
  • ✅ Memory usage monitoring
  • ✅ Automatic expired entry purging

Context Query Builder - Token-Aware LLM Context

Assemble LLM context with priority-based truncation and token budgeting:

import { ContextQueryBuilder, ContextOutputFormat, TruncationStrategy } from '@sochdb/sochdb';

const builder = new ContextQueryBuilder()
  .withBudget(4096)  // Token limit
  .setFormat(ContextOutputFormat.TOON)
  .setTruncation(TruncationStrategy.TAIL_DROP);

builder
  .literal('SYSTEM', 0, 'You are a helpful AI assistant.')
  .literal('USER_PROFILE', 1, 'User: Alice, Role: Engineer')
  .literal('HISTORY', 2, 'Recent conversation context...')
  .literal('KNOWLEDGE', 3, 'Retrieved documents...');

const result = builder.execute();
console.log(`Tokens: ${result.tokenCount}/${4096}`);
console.log(`Context:\n${result.text}`);

→ See Full Example

Key Benefits:

  • ✅ Priority-based section ordering (lower = higher priority)
  • ✅ Token budget enforcement
  • ✅ Multiple truncation strategies (tail drop, head drop, proportional)
  • ✅ Multiple output formats (TOON, JSON, Markdown)
  • ✅ Token count estimation

Namespace API - Multi-Tenant Isolation

First-class namespace handles for secure multi-tenancy and data isolation:

import { Database, Namespace, Collection, DistanceMetric } from '@sochdb/sochdb';

const db = await Database.open('./mydb');

// Create isolated namespace for each tenant
const namespace = new Namespace(db, 'tenant_acme', {
  name: 'tenant_acme',
  displayName: 'ACME Corporation',
  labels: { plan: 'enterprise', region: 'us-west' }
});

// Create vector collection
const collection = await namespace.createCollection({
  name: 'documents',
  dimension: 384,
  metric: DistanceMetric.Cosine,
  indexed: true
});

// Insert and search vectors
await collection.insert([1.0, 2.0, ...], { title: 'Doc 1' });
const results = await collection.search({ queryVector: [...], k: 10 });

→ See Full Example

Priority Queue API - Task Processing

Efficient priority queue with ordered-key storage (no O(N) blob rewrites):

import { Database, PriorityQueue } from '@sochdb/sochdb';

const db = await Database.open('./queue_db');
const queue = PriorityQueue.fromDatabase(db, 'tasks');

// Enqueue with priority (lower = higher urgency)
await queue.enqueue(1, Buffer.from('urgent task'), { type: 'payment' });

// Worker processes tasks
const task = await queue.dequeue('worker-1');
if (task) {
  // Process task...
  await queue.ack(task.taskId);
}

// Get statistics
const stats = await queue.stats();
console.log(`Pending: ${stats.pending}, Completed: ${stats.completed}`);

→ See Full Example

Key Benefits:

  • ✅ O(log N) enqueue/dequeue with ordered scans
  • ✅ Atomic claim protocol for concurrent workers
  • ✅ Visibility timeout for crash recovery
  • ✅ Dead letter queue for failed tasks
  • ✅ Multiple queues per database

Architecture: Flexible Deployment

┌─────────────────────────────────────────────────────────────┐
│                    DEPLOYMENT OPTIONS                        │
├─────────────────────────────────────────────────────────────┤
│                                                               │
│  1. EMBEDDED MODE (FFI)          2. SERVER MODE (gRPC)      │
│  ┌─────────────────────┐         ┌─────────────────────┐   │
│  │   Node.js App        │         │   Node.js App        │   │
│  │   ├─ Database.open()│         │   ├─ SochDBClient() │   │
│  │   └─ Direct FFI     │         │   └─ gRPC calls     │   │
│  │         │           │         │         │           │   │
│  │         ▼           │         │         ▼           │   │
│  │   libsochdb_storage │         │   sochdb-grpc       │   │
│  │   (Rust native)     │         │   (Rust server)     │   │
│  └─────────────────────┘         └─────────────────────┘   │
│                                                               │
│  ✅ No server needed               ✅ Multi-language          │
│  ✅ Local files                    ✅ Centralized logic      │
│  ✅ Simple deployment              ✅ Production scale       │
└─────────────────────────────────────────────────────────────┘

When to Use Each Mode

Embedded Mode (FFI):

  • ✅ Local development and testing
  • ✅ Jupyter notebooks and data science
  • ✅ Single-process applications
  • ✅ Edge deployments without network
  • ✅ No server setup required

Server Mode (gRPC):

  • ✅ Production deployments
  • ✅ Multi-language teams (Python, Node.js, Go)
  • ✅ Distributed systems
  • ✅ Centralized business logic
  • ✅ Horizontal scaling


System Requirements

For Concurrent Mode

  • SochDB Core: Latest version
  • Node.js: 14.0+ (18.0+ recommended)
  • Native Library: libsochdb_storage.{dylib,so}
  • FFI: Koffi (automatically installed)

Operating Systems:

  • ✅ Linux (Ubuntu 20.04+, RHEL 8+)
  • ✅ macOS (10.15+, both Intel and Apple Silicon)
  • ⚠️ Windows (requires native builds)

File Descriptors:

  • Default limit: 1024 (sufficient for most workloads)
  • For high concurrency with PM2: ulimit -n 4096

Memory:

  • Standard mode: ~50MB base + data
  • Concurrent mode: +4KB per concurrent reader slot (1024 slots = ~4MB overhead)
  • PM2 cluster: Each worker has independent memory

Troubleshooting

"Database is locked" Error (Standard Mode)

Error: SQLITE_BUSY: database is locked

Solution: Use concurrent mode for multi-process access:

// ❌ Standard mode - PM2 cluster will fail
const db = new EmbeddedDatabase('./data.db');

// ✅ Concurrent mode - PM2 cluster works!
const db = EmbeddedDatabase.openConcurrent('./data.db');

Library Not Found Error

Error: Dynamic library 'libsochdb_storage.dylib' not found

macOS:

# Build and install library
cd /path/to/sochdb
cargo build --release
sudo cp target/release/libsochdb_storage.dylib /usr/local/lib/

Linux:

cd /path/to/sochdb
cargo build --release
sudo cp target/release/libsochdb_storage.so /usr/local/lib/
sudo ldconfig

Development Mode (no install):

export DYLD_LIBRARY_PATH=/path/to/sochdb/target/release  # macOS
export LD_LIBRARY_PATH=/path/to/sochdb/target/release    # Linux

PM2 Cluster Issues

Symptom: Workers crash with "database locked"

Solution: Ensure concurrent mode is used:

// ecosystem.config.js
module.exports = {
  apps: [{
    name: 'api',
    script: './server.js',
    instances: 4,
    exec_mode: 'cluster',
    env: {
      USE_CONCURRENT_MODE: 'true'  // Flag to use openConcurrent()
    }
  }]
};
// server.ts
const db = process.env.USE_CONCURRENT_MODE 
  ? EmbeddedDatabase.openConcurrent('./db')
  : new EmbeddedDatabase('./db');

console.log('Concurrent mode:', db.isConcurrent);  // Should be true

Docker Volume Permissions

Symptom: EACCES: permission denied when opening database

Solution: Fix volume ownership:

FROM node:18
WORKDIR /app

# Create data directory with correct permissions
RUN mkdir -p /app/data && chown -R node:node /app

# Switch to non-root user
USER node

COPY --chown=node:node . .
RUN npm install

CMD ["npm", "start"]

Performance Issues

Symptom: Concurrent reads slower than expected

Check 1 - Verify concurrent mode:

if (!db.isConcurrent) {
    console.error('Database is not in concurrent mode!');
    process.exit(1);
}

Check 2 - Monitor PM2 workers:

pm2 monit  # Real-time monitoring
pm2 logs --lines 200  # Check for errors

Check 3 - Batch writes:

// ❌ Slow - individual writes
for (const item of items) {
    await collection.insert(item);
}

// ✅ Fast - batch write
await collection.insertBatch(items);

🆕 Vector Search - Native HNSW

SochDB now includes native HNSW (Hierarchical Navigable Small World) vector search for sub-millisecond similarity search across millions of vectors.

Quick Start - Vector Search

import { HnswIndex } from '@sochdb/sochdb';

// Create HNSW index
const index = new HnswIndex({
  dimension: 384,           // Vector dimension
  maxConnections: 16,       // M parameter (default: 16)
  efConstruction: 200,      // Build quality (default: 200)
  efSearch: 100             // Search quality (default: 100)
});

// Insert vectors (batch is 10-100× faster)
index.insertBatch(
  ['doc1', 'doc2', 'doc3'],
  [[1.0, 2.0, ...], [3.0, 4.0, ...], [5.0, 6.0, ...]]
);

// Search for similar vectors
const results = index.search(queryVector, 10);
console.log(results);
// [{ id: 'doc1', distance: 0.15 }, { id: 'doc3', distance: 0.23 }, ...]

// Clean up
index.close();

Performance Comparison

| Implementation | 10K vectors | 100K vectors | 1M vectors | |----------------|-------------|--------------|------------| | Linear Scan (old) | ~50ms | ~500ms | ~5000ms | | Native HNSW (new) | <0.5ms | <1ms | <1ms | | Speedup | 100× | 500× | 5000× |

Two Ways to Use Vector Search

1. Direct HNSW API (Recommended for Production)

Best performance, full control:

import { HnswIndex } from '@sochdb/sochdb';

const index = new HnswIndex({ dimension: 1536 });
index.insertBatch(ids, embeddings);
const results = index.search(queryEmbedding, 10);

✅ Use when:

  • You need maximum performance
  • Working with large datasets (>10K vectors)
  • Building RAG/AI applications
  • Have existing embedding pipeline

2. Collection API (Simple, High-Level)

Convenient API with metadata support:

import { Database } from '@sochdb/sochdb';

const db = await Database.open('./mydb');
const ns = await db.createNamespace({ name: 'docs' });

const collection = await ns.createCollection({
  name: 'embeddings',
  dimension: 384,
  indexed: true  // Note: Currently uses linear search in embedded mode
});

await collection.insert([1.0, 2.0, ...], { title: 'Document 1' }, 'doc1');
const results = await collection.search({ queryVector: [...], k: 10 });

⚠️ Current Limitation: Collection API uses O(n) linear search in embedded mode. For production use with >10K vectors, use:

  • Direct HNSW API (above), OR
  • gRPC Server Mode (see below)

3. gRPC Server Mode (Production-Ready)

For distributed systems, multi-language support:

import { SochDBClient } from '@sochdb/sochdb';

// Start server: sochdb-grpc --port 50051
const client = new SochDBClient({ address: 'localhost:50051' });

// Create HNSW index
await client.createIndex('docs', {
  dimension: 1536,
  config: { m: 16, ef_construction: 200 },
  metric: 'cosine'
});

// Insert and search
await client.insertBatch('docs', ids, vectors);
const results = await client.search('docs', queryVector, 10);

✅ Full HNSW support with:

  • Native Rust implementation
  • Persistence
  • Distributed queries
  • Multi-language clients

Migration from Linear Search

If you're using the Collection API with large datasets and experiencing slow search:

Before (slow):

// O(n) scan through all documents
const results = await collection.search({ queryVector, k: 10 });

After (fast) - Option 1: Use HnswIndex directly:

import { HnswIndex } from '@sochdb/sochdb';

const index = new HnswIndex({ dimension: 384 });
index.insertBatch(ids, vectors);
const results = index.search(queryVector, 10); // <1ms

After (fast) - Option 2: Use gRPC mode:

# Terminal 1: Start server
sochdb-grpc --port 50051

# Terminal 2: Use client
const client = new SochDBClient({ address: 'localhost:50051' });
await client.createIndex('docs', { dimension: 384 });
const results = await client.search('docs', queryVector, 10);

Complete Examples

API Reference

// HnswIndex Configuration
interface HnswConfig {
  dimension: number;           // Required: vector dimension
  maxConnections?: number;     // M parameter (default: 16)
  efConstruction?: number;     // Build quality (default: 200)
  efSearch?: number;           // Search quality (default: 100)
}

// Search Result
interface SearchResult {
  id: string;                  // Vector ID
  distance: number;            // Distance (lower = more similar)
}

// Main Methods
class HnswIndex {
  constructor(config: HnswConfig)
  insert(id: string, vector: number[]): void
  insertBatch(ids: string[], vectors: number[][]): void
  search(queryVector: number[], k: number, fast?: boolean): SearchResult[]
  searchUltra(queryVector: number[], k: number): SearchResult[]
  close(): void
  
  // Properties
  get length(): number         // Number of vectors
  get dimension(): number      // Vector dimension
  get efSearch(): number
  set efSearch(value: number)  // Adjust search quality
}

Roadmap

  • Current: Direct HNSW FFI bindings
  • Next: Collection API auto-uses HNSW in embedded mode
  • Future: Persistent HNSW indexes with disk storage

SochDB Node.js SDK Documentation

LLM-Optimized Embedded Database with Native Vector Search


Table of Contents

  1. Quick Start
  2. Installation
  3. Features
  4. Architecture Overview
  5. Core Key-Value Operations
  6. Transactions (ACID with SSI)
  7. Query Builder
  8. Prefix Scanning
  9. SQL Operations
  10. Table Management & Index Policies
  11. Namespaces & Collections
  12. Priority Queues
  13. Vector Search
  14. Hybrid Search (Vector + BM25)
  15. Graph Operations
  16. Temporal Graph (Time-Travel)
  17. Semantic Cache
  18. Context Query Builder (LLM Optimization)
  19. Atomic Multi-Index Writes
  20. Recovery & WAL Management
  21. Checkpoints & Snapshots
  22. Compression & Storage
  23. Statistics & Monitoring
  24. Server Mode (gRPC Client)
  25. IPC Client (Unix Sockets)
  26. Error Handling
  27. Complete Examples

1. Quick Start

from sochdb import Database

# Open (or create) a database
db = Database.open("./my_database")

# Store and retrieve data
db.put(b"hello", b"world")
value = db.get(b"hello")  # b"world"

# Use transactions for atomic operations
with db.transaction() as txn:
    txn.put(b"key1", b"value1")
    txn.put(b"key2", b"value2")
    # Auto-commits on success, auto-rollbacks on exception

# Clean up
db.delete(b"hello")
db.close()

30-Second Overview:

  • Key-Value: Fast reads/writes with get/put/delete
  • Transactions: ACID with SSI isolation
  • Vector Search: HNSW-based semantic search
  • Hybrid Search: Combine vectors with BM25 keyword search
  • Graph: Build and traverse knowledge graphs
  • LLM-Optimized: TOON format uses 40-60% fewer tokens than JSON

2. Installation

npm install @sochdb/sochdb

Platform Support: | Platform | Architecture | Status | |----------|--------------|--------| | Linux | x86_64, aarch64 | ✅ Full support | | macOS | x86_64, arm64 | ✅ Full support | | Windows | x86_64 | ✅ Full support |

Optional Dependencies:

# For async support
npm install @sochdb/sochdb[async]

# For server mode
npm install @sochdb/sochdb[grpc]

# Everything
npm install @sochdb/sochdb[all]

3. Architecture Overview

SochDB supports two deployment modes:

Embedded Mode (Default)

Direct Rust bindings via FFI. No server required.

from sochdb import Database

with Database.open("./mydb") as db:
    db.put(b"key", b"value")
    value = db.get(b"key")

Best for: Local development, notebooks, single-process applications.

Server Mode (gRPC)

Thin client connecting to sochdb-grpc server.

from sochdb import SochDBClient

client = SochDBClient("localhost:50051")
client.put(b"key", b"value", namespace="default")
value = client.get(b"key", namespace="default")

Best for: Production, multi-process, distributed systems.

Feature Comparison

| Feature | Embedded | Server | |---------|----------|--------| | Setup | npm install only | Server + client | | Performance | Fastest (in-process) | Network overhead | | Multi-process | ❌ | ✅ | | Horizontal scaling | ❌ | ✅ | | Vector search | ✅ | ✅ | | Graph operations | ✅ | ✅ | | Semantic cache | ✅ | ✅ | | Context service | Limited | ✅ Full | | MCP integration | ❌ | ✅ |

┌─────────────────────────────────────────────────────────────┐
│                    DEPLOYMENT OPTIONS                        │
├─────────────────────────────────────────────────────────────┤
│  EMBEDDED MODE (FFI)             SERVER MODE (gRPC)         │
│  ┌─────────────────────┐         ┌─────────────────────┐   │
│  │   Node.js App        │         │   Node.js App        │   │
│  │   ├─ Database.open()│         │   ├─ SochDBClient() │   │
│  │   └─ Direct FFI     │         │   └─ gRPC calls     │   │
│  │         │           │         │         │           │   │
│  │         ▼           │         │         ▼           │   │
│  │   libsochdb_storage │         │   sochdb-grpc       │   │
│  │   (Rust native)     │         │   (Rust server)     │   │
│  └─────────────────────┘         └─────────────────────┘   │
│                                                              │
│  ✅ No server needed             ✅ Multi-language          │
│  ✅ Local files                  ✅ Centralized logic       │
│  ✅ Simple deployment            ✅ Production scale        │
└─────────────────────────────────────────────────────────────┘

4. Core Key-Value Operations

All keys and values are bytes.

Basic Operations

from sochdb import Database

db = Database.open("./my_db")

# Store data
db.put(b"user:1", b"Alice")
db.put(b"user:2", b"Bob")

# Retrieve data
user = db.get(b"user:1")  # Returns b"Alice" or None

# Check existence
exists = db.exists(b"user:1")  # True

# Delete data
db.delete(b"user:1")

db.close()

Path-Based Keys (Hierarchical)

Organize data hierarchically with path-based access:

# Store with path (strings auto-converted to bytes internally)
db.put_path("users/alice/name", b"Alice Smith")
db.put_path("users/alice/email", b"[email protected]")
db.put_path("users/bob/name", b"Bob Jones")

# Retrieve by path
name = db.get_path("users/alice/name")  # b"Alice Smith"

# Delete by path
db.delete_path("users/alice/email")

# List at path (like listing directory)
children = db.list_path("users/")  # ["alice", "bob"]

With TTL (Time-To-Live)

# Store with expiration (seconds)
db.put(b"session:abc123", b"user_data", ttl_seconds=3600)  # Expires in 1 hour

# TTL of 0 means no expiration
db.put(b"permanent_key", b"value", ttl_seconds=0)

Batch Operations

# Batch put (more efficient than individual puts)
db.put_batch([
    (b"key1", b"value1"),
    (b"key2", b"value2"),
    (b"key3", b"value3"),
])

# Batch get
values = db.get_batch([b"key1", b"key2", b"key3"])
# Returns: [b"value1", b"value2", b"value3"] (None for missing keys)

# Batch delete
db.delete_batch([b"key1", b"key2", b"key3"])

Context Manager

with Database.open("./my_db") as db:
    db.put(b"key", b"value")
    # Automatically closes when exiting

5. Transactions (ACID with SSI)

SochDB provides full ACID transactions with Serializable Snapshot Isolation (SSI).

Context Manager Pattern (Recommended)

# Auto-commits on success, auto-rollbacks on exception
with db.transaction() as txn:
    txn.put(b"accounts/alice", b"1000")
    txn.put(b"accounts/bob", b"500")
    
    # Read within transaction sees your writes
    balance = txn.get(b"accounts/alice")  # b"1000"
    
    # If exception occurs, rolls back automatically

Closure Pattern (Rust-Style)

# Using with_transaction for automatic commit/rollback
def transfer_funds(txn):
    alice = int(txn.get(b"accounts/alice") or b"0")
    bob = int(txn.get(b"accounts/bob") or b"0")
    
    txn.put(b"accounts/alice", str(alice - 100).encode())
    txn.put(b"accounts/bob", str(bob + 100).encode())
    
    return "Transfer complete"

result = db.with_transaction(transfer_funds)

Manual Transaction Control

txn = db.begin_transaction()
try:
    txn.put(b"key1", b"value1")
    txn.put(b"key2", b"value2")
    
    commit_ts = txn.commit()  # Returns HLC timestamp
    print(f"Committed at: {commit_ts}")
except Exception as e:
    txn.abort()
    raise

Transaction Properties

txn = db.transaction()
print(f"Transaction ID: {txn.id}")      # Unique identifier
print(f"Start timestamp: {txn.start_ts}")  # HLC start time
print(f"Isolation: {txn.isolation}")    # "serializable"

SSI Conflict Handling

from sochdb import TransactionConflictError

MAX_RETRIES = 3

for attempt in range(MAX_RETRIES):
    try:
        with db.transaction() as txn:
            # Read and modify
            value = int(txn.get(b"counter") or b"0")
            txn.put(b"counter", str(value + 1).encode())
        break  # Success
    except TransactionConflictError:
        if attempt == MAX_RETRIES - 1:
            raise
        # Retry on conflict
        continue

All Transaction Operations

with db.transaction() as txn:
    # Key-value
    txn.put(key, value)
    txn.get(key)
    txn.delete(key)
    txn.exists(key)
    
    # Path-based
    txn.put_path(path, value)
    txn.get_path(path)
    
    # Batch operations
    txn.put_batch(pairs)
    txn.get_batch(keys)
    
    # Scanning
    for k, v in txn.scan_prefix(b"prefix/"):
        print(k, v)
    
    # SQL (within transaction isolation)
    result = txn.execute("SELECT * FROM users WHERE id = 1")

Isolation Levels

from sochdb import IsolationLevel

# Default: Serializable (strongest)
with db.transaction(isolation=IsolationLevel.SERIALIZABLE) as txn:
    pass

# Snapshot isolation (faster, allows some anomalies)
with db.transaction(isolation=IsolationLevel.SNAPSHOT) as txn:
    pass

# Read committed (fastest, least isolation)
with db.transaction(isolation=IsolationLevel.READ_COMMITTED) as txn:
    pass

6. Query Builder

Fluent API for building efficient queries with predicate pushdown.

Basic Query

# Query with prefix and limit
results = db.query("users/")
    .limit(10)
    .execute()

for key, value in results:
    print(f"{key.decode()}: {value.decode()}")

Filtered Query

from sochdb import CompareOp

# Query with filters
results = db.query("orders/")
    .where("status", CompareOp.EQ, "pending")
    .where("amount", CompareOp.GT, 100)
    .order_by("created_at", descending=True)
    .limit(50)
    .offset(10)
    .execute()

Column Selection

# Select specific fields only
results = db.query("users/")
    .select(["name", "email"])  # Only fetch these columns
    .where("active", CompareOp.EQ, True)
    .execute()

Aggregate Queries

# Count
count = db.query("orders/")
    .where("status", CompareOp.EQ, "completed")
    .count()

# Sum (for numeric columns)
total = db.query("orders/")
    .sum("amount")

# Group by
results = db.query("orders/")
    .select(["status", "COUNT(*)", "SUM(amount)"])
    .group_by("status")
    .execute()

Query in Transaction

with db.transaction() as txn:
    results = txn.query("users/")
        .where("role", CompareOp.EQ, "admin")
        .execute()

7. Prefix Scanning

Iterate over keys with common prefixes efficiently.

Safe Prefix Scan (Recommended)

# Requires minimum 2-byte prefix (prevents accidental full scans)
for key, value in db.scan_prefix(b"users/"):
    print(f"{key.decode()}: {value.decode()}")

# Raises ValueError if prefix < 2 bytes

Unchecked Prefix Scan

# For internal operations needing empty/short prefixes
# WARNING: Can cause expensive full-database scans
for key, value in db.scan_prefix_unchecked(b""):
    print(f"All keys: {key}")

Batched Scanning (1000x Faster)

# Fetches 1000 results per FFI call instead of 1
# Performance: 10,000 results = 10 FFI calls vs 10,000 calls

for key, value in db.scan_batched(b"prefix/", batch_size=1000):
    process(key, value)

Reverse Scan

# Scan in reverse order (newest first)
for key, value in db.scan_prefix(b"logs/", reverse=True):
    print(key, value)

Range Scan

# Scan within a specific range
for key, value in db.scan_range(b"users/a", b"users/m"):
    print(key, value)  # All users from "a" to "m"

Streaming Large Results

# For very large result sets, use streaming to avoid memory issues
for batch in db.scan_stream(b"logs/", batch_size=10000):
    for key, value in batch:
        process(key, value)
    # Memory is freed after processing each batch

8. SQL Operations

Execute SQL queries for familiar relational patterns.

Creating Tables

db.execute_sql("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        email TEXT UNIQUE,
        age INTEGER,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP
    )
""")

db.execute_sql("""
    CREATE TABLE posts (
        id INTEGER PRIMARY KEY,
        user_id INTEGER REFERENCES users(id),
        title TEXT NOT NULL,
        content TEXT,
        likes INTEGER DEFAULT 0
    )
""")

CRUD Operations

# Insert
db.execute_sql("""
    INSERT INTO users (id, name, email, age) 
    VALUES (1, 'Alice', '[email protected]', 30)
""")

# Insert with parameters (prevents SQL injection)
db.execute_sql(
    "INSERT INTO users (id, name, email, age) VALUES (?, ?, ?, ?)",
    params=[2, "Bob", "[email protected]", 25]
)

# Select
result = db.execute_sql("SELECT * FROM users WHERE age > 25")
for row in result.rows:
    print(row)  # {'id': 1, 'name': 'Alice', ...}

# Update
db.execute_sql("UPDATE users SET email = '[email protected]' WHERE id = 1")

# Delete
db.execute_sql("DELETE FROM users WHERE id = 2")

Upsert (Insert or Update)

# Insert or update on conflict
db.execute_sql("""
    INSERT INTO users (id, name, email) VALUES (1, 'Alice', '[email protected]')
    ON CONFLICT (id) DO UPDATE SET 
        name = excluded.name,
        email = excluded.email
""")

Query Results

from sochdb import SQLQueryResult

result = db.execute_sql("SELECT id, name FROM users")

print(f"Columns: {result.columns}")      # ['id', 'name']
print(f"Row count: {len(result.rows)}")
print(f"Execution time: {result.execution_time_ms}ms")

for row in result.rows:
    print(f"ID: {row['id']}, Name: {row['name']}")

# Convert to different formats
df = result.to_dataframe()  # pandas DataFrame
json_data = result.to_json()

Index Management

# Create index
db.execute_sql("CREATE INDEX idx_users_email ON users(email)")

# Create unique index
db.execute_sql("CREATE UNIQUE INDEX idx_users_email ON users(email)")

# Drop index
db.execute_sql("DROP INDEX IF EXISTS idx_users_email")

# List indexes
indexes = db.list_indexes("users")

Prepared Statements

# Prepare once, execute many times
stmt = db.prepare("SELECT * FROM users WHERE age > ? AND status = ?")

# Execute with different parameters
young_active = stmt.execute([25, "active"])
old_active = stmt.execute([50, "active"])

# Close when done
stmt.close()

Dialect Support

SochDB auto-detects SQL dialects:

# PostgreSQL style
db.execute_sql("INSERT INTO users VALUES (1, 'Alice') ON CONFLICT DO NOTHING")

# MySQL style
db.execute_sql("INSERT IGNORE INTO users VALUES (1, 'Alice')")

# SQLite style  
db.execute_sql("INSERT OR IGNORE INTO users VALUES (1, 'Alice')")

9. Table Management & Index Policies

Table Information

# Get table schema
schema = db.get_table_schema("users")
print(f"Columns: {schema.columns}")
print(f"Primary key: {schema.primary_key}")
print(f"Indexes: {schema.indexes}")

# List all tables
tables = db.list_tables()

# Drop table
db.execute_sql("DROP TABLE IF EXISTS old_table")

Index Policies

Configure per-table indexing strategies for optimal performance:

# Policy constants
Database.INDEX_WRITE_OPTIMIZED  # 0 - O(1) insert, O(N) scan
Database.INDEX_BALANCED         # 1 - O(1) amortized insert, O(log K) scan
Database.INDEX_SCAN_OPTIMIZED   # 2 - O(log N) insert, O(log N + K) scan
Database.INDEX_APPEND_ONLY      # 3 - O(1) insert, O(N) scan (time-series)

# Set by constant
db.set_table_index_policy("logs", Database.INDEX_APPEND_ONLY)

# Set by string
db.set_table_index_policy("users", "scan_optimized")

# Get current policy
policy = db.get_table_index_policy("users")
print(f"Policy: {policy}")  # "scan_optimized"

Policy Selection Guide

| Policy | Insert | Scan | Best For | |--------|--------|------|----------| | write_optimized | O(1) | O(N) | High-write ingestion | | balanced | O(1) amortized | O(log K) | General use (default) | | scan_optimized | O(log N) | O(log N + K) | Analytics, read-heavy | | append_only | O(1) | O(N) | Time-series, logs |


10. Namespaces & Multi-Tenancy

Organize data into logical namespaces for tenant isolation.

Creating Namespaces

from sochdb import NamespaceConfig

# Create namespace with metadata
ns = db.create_namespace(
    name="tenant_123",
    display_name="Acme Corp",
    labels={"tier": "premium", "region": "us-east"}
)

# Simple creation
ns = db.create_namespace("tenant_456")

Getting Namespaces

# Get existing namespace
ns = db.namespace("tenant_123")

# Get or create (idempotent)
ns = db.get_or_create_namespace("tenant_123")

# Check if exists
exists = db.namespace_exists("tenant_123")

Context Manager for Scoped Operations

with db.use_namespace("tenant_123") as ns:
    # All operations automatically scoped to tenant_123
    collection = ns.collection("documents")
    ns.put("config/key", b"value")
    
    # No need to specify namespace in each call

Namespace Operations

# List all namespaces
namespaces = db.list_namespaces()
print(namespaces)  # ['tenant_123', 'tenant_456']

# Get namespace info
info = db.namespace_info("tenant_123")
print(f"Created: {info['created_at']}")
print(f"Labels: {info['labels']}")
print(f"Size: {info['size_bytes']}")

# Update labels
db.update_namespace("tenant_123", labels={"tier": "enterprise"})

# Delete namespace (WARNING: deletes all data in namespace)
db.delete_namespace("old_tenant", force=True)

Namespace-Scoped Key-Value

ns = db.namespace("tenant_123")

# Operations automatically prefixed with namespace
ns.put("users/alice", b"data")      # Actually: tenant_123/users/alice
ns.get("users/alice")
ns.delete("users/alice")

# Scan within namespace
for key, value in ns.scan("users/"):
    print(key, value)  # Keys shown without namespace prefix

Cross-Namespace Operations

# Copy data between namespaces
db.copy_between_namespaces(
    source_ns="tenant_123",
    target_ns="tenant_456",
    prefix="shared/"
)

11. Collections & Vector Search

Collections store documents with embeddings for semantic search using HNSW.

Collection Configuration

from sochdb import (
    CollectionConfig,
    DistanceMetric,
    QuantizationType,
)

config = CollectionConfig(
    name="documents",
    dimension=384,                          # Embedding dimension (must match your model)
    metric=DistanceMetric.COSINE,           # COSINE, EUCLIDEAN, DOT_PRODUCT
    m=16,                                   # HNSW M parameter (connections per node)
    ef_construction=100,                    # HNSW construction quality
    ef_search=50,                           # HNSW search quality (higher = slower but better)
    quantization=QuantizationType.NONE,     # NONE, SCALAR (int8), PQ (product quantization)
    enable_hybrid_search=False,             # Enable BM25 + vector
    content_field=None,                     # Field for BM25 indexing
)

Creating Collections

ns = db.namespace("default")

# With config object
collection = ns.create_collection(config)

# With parameters (simpler)
collection = ns.create_collection(
    name="documents",
    dimension=384,
    metric=DistanceMetric.COSINE
)

# Get existing collection
collection = ns.collection("documents")

Inserting Documents

# Single insert
collection.insert(
    id="doc1",
    vector=[0.1, 0.2, ...],  # 384-dim float array
    metadata={"title": "Introduction", "author": "Alice", "category": "tech"}
)

# Batch insert (more efficient for bulk loading)
collection.insert_batch(
    ids=["doc1", "doc2", "doc3"],
    vectors=[[...], [...], [...]],  # List of vectors
    metadata=[
        {"title": "Doc 1"},
        {"title": "Doc 2"},
        {"title": "Doc 3"}
    ]
)

# Multi-vector insert (multiple vectors per document, e.g., chunks)
collection.insert_multi(
    id="long_doc",
    vectors=[[...], [...], [...]],  # Multiple vectors for same doc
    metadata={"title": "Long Document"}
)

Vector Search

from sochdb import SearchRequest

# Using SearchRequest (full control)
request = SearchRequest(
    vector=[0.15, 0.25, ...],       # Query vector
    k=10,                           # Number of results
    ef_search=100,                  # Search quality (overrides collection default)
    filter={"author": "Alice"},     # Metadata filter
    min_score=0.7,                  # Minimum similarity score
    include_vectors=False,          # Include vectors in results
    include_metadata=True,          # Include metadata in results
)
results = collection.search(request)

# Convenience method (simpler)
results = collection.vector_search(
    vector=[0.15, 0.25, ...],
    k=10,
    filter={"author": "Alice"}
)

# Process results
for result in results:
    print(f"ID: {result.id}")
    print(f"Score: {result.score:.4f}")  # Similarity score
    print(f"Metadata: {result.metadata}")

Metadata Filtering

# Equality
filter={"author": "Alice"}

# Comparison operators
filter={"age": {"$gt": 30}}              # Greater than
filter={"age": {"$gte": 30}}             # Greater than or equal
filter={"age": {"$lt": 30}}              # Less than
filter={"age": {"$lte": 30}}             # Less than or equal
filter={"author": {"$ne": "Alice"}}      # Not equal

# Array operators
filter={"category": {"$in": ["tech", "science"]}}    # In array
filter={"category": {"$nin": ["sports"]}}            # Not in array

# Logical operators
filter={"$and": [{"author": "Alice"}, {"year": 2024}]}
filter={"$or": [{"category": "tech"}, {"category": "science"}]}
filter={"$not": {"author": "Bob"}}

# Nested filters
filter={
    "$and": [
        {"$or": [{"category": "tech"}, {"category": "science"}]},
        {"year": {"$gte": 2020}}
    ]
}

Collection Management

# Get collection
collection = ns.get_collection("documents")
# or
collection = ns.collection("documents")

# List collections
collections = ns.list_collections()

# Collection info
info = collection.info()
print(f"Name: {info['name']}")
print(f"Dimension: {info['dimension']}")
print(f"Count: {info['count']}")
print(f"Metric: {info['metric']}")
print(f"Index size: {info['index_size_bytes']}")

# Delete collection
ns.delete_collection("old_collection")

# Individual document operations
doc = collection.get("doc1")
collection.delete("doc1")
collection.update("doc1", metadata={"category": "updated"})
count = collection.count()

Quantization for Memory Efficiency

# Scalar quantization (int8) - 4x memory reduction
config = CollectionConfig(
    name="documents",
    dimension=384,
    quantization=QuantizationType.SCALAR
)

# Product quantization - 32x memory reduction
config = CollectionConfig(
    name="documents",
    dimension=768,
    quantization=QuantizationType.PQ,
    pq_num_subvectors=96,   # 768/96 = 8 dimensions per subvector
    pq_num_centroids=256    # 8-bit codes
)

12. Hybrid Search (Vector + BM25)

Combine vector similarity with keyword matching for best results.

Enable Hybrid Search

config = CollectionConfig(
    name="articles",
    dimension=384,
    enable_hybrid_search=True,      # Enable BM25 indexing
    content_field="text"            # Field to index for BM25
)
collection = ns.create_collection(config)

# Insert with text content
collection.insert(
    id="article1",
    vector=[...],
    metadata={
        "title": "Machine Learning Tutorial",
        "text": "This tutorial covers the basics of machine learning...",
        "category": "tech"
    }
)

Keyword Search (BM25 Only)

results = collection.keyword_search(
    query="machine learning tutorial",
    k=10,
    filter={"category": "tech"}
)

Hybrid Search (Vector + BM25)

# Combine vector and keyword search
results = collection.hybrid_search(
    vector=[0.1, 0.2, ...],        # Query embedding
    text_query="machine learning",  # Keyword query
    k=10,
    alpha=0.7,  # 0.0 = pure keyword, 1.0 = pure vector, 0.5 = balanced
    filter={"category": "tech"}
)

Full SearchRequest for Hybrid

request = SearchRequest(
    vector=[0.1, 0.2, ...],
    text_query="machine learning",
    k=10,
    alpha=0.7,                      # Blend factor
    rrf_k=60.0,                     # RRF k parameter (Reciprocal Rank Fusion)
    filter={"category": "tech"},
    aggregate="max",                # max | mean | first (for multi-vector docs)
    as_of="2024-01-01T00:00:00Z",   # Time-travel query
    include_vectors=False,
    include_metadata=True,
    include_scores=True,
)
results = collection.search(request)

# Access detailed results
print(f"Query time: {results.query_time_ms}ms")
print(f"Total matches: {results.total_count}")
print(f"Vector results: {results.vector_results}")    # Results from vector search
print(f"Keyword results: {results.keyword_results}")  # Results from BM25
print(f"Fused results: {results.fused_results}")      # Combined results

13. Graph Operations

Build and query knowledge graphs.

Adding Nodes

# Add a node
db.add_node(
    namespace="default",
    node_id="alice",
    node_type="person",
    properties={"role": "engineer", "team": "ml", "level": "senior"}
)

db.add_node("default", "project_x", "project", {"status": "active", "priority": "high"})
db.add_node("default", "bob", "person", {"role": "manager", "team": "ml"})

Adding Edges

# Add directed edge
db.add_edge(
    namespace="default",
    from_id="alice",
    edge_type="works_on",
    to_id="project_x",
    properties={"role": "lead", "since": "2024-01"}
)

db.add_edge("default", "alice", "reports_to", "bob")
db.add_edge("default", "bob", "manages", "project_x")

Graph Traversal

# BFS traversal from a starting node
nodes, edges = db.traverse(
    namespace="default",
    start_node="alice",
    max_depth=3,
    order="bfs"  # "bfs" or "dfs"
)

for node in nodes:
    print(f"Node: {node['id']} ({node['node_type']})")
    print(f"  Properties: {node['properties']}")
    
for edge in edges:
    print(f"{edge['from_id']} --{edge['edge_type']}--> {edge['to_id']}")

Filtered Traversal

# Traverse with filters
nodes, edges = db.traverse(
    namespace="default",
    start_node="alice",
    max_depth=2,
    edge_types=["works_on", "reports_to"],  # Only follow these edge types
    node_types=["person", "project"],        # Only include these node types
    node_filter={"team": "ml"}               # Filter nodes by properties
)

Graph Queries

# Find shortest path
path = db.find_path(
    namespace="default",
    from_id="alice",
    to_id="project_y",
    max_depth=5
)

# Get neighbors
neighbors = db.get_neighbors(
    namespace="default",
    node_id="alice",
    direction="outgoing"  # "outgoing", "incoming", "both"
)

# Get specific edge
edge = db.get_edge("default", "alice", "works_on", "project_x")

# Delete node (and all connected edges)
db.delete_node("default", "old_node")

# Delete edge
db.delete_edge("default", "alice", "works_on", "project_old")

14. Temporal Graph (Time-Travel)

Track state changes over time with temporal edges.

Adding Temporal Edges

import time

now = int(time.time() * 1000)  # milliseconds since epoch
one_hour = 60 * 60 * 1000

# Record: Door was open from 10:00 to 11:00
db.add_temporal_edge(
    namespace="smart_home",
    from_id="door_front",
    edge_type="STATE",
    to_id="open",
    valid_from=now - one_hour,  # Start time (ms)
    valid_until=now,             # End time (ms)
    properties={"sensor": "motion_1", "confidence": 0.95}
)

# Record: Light is currently on (no end time yet)
db.add_temporal_edge(
    namespace="smart_home",
    from_id="light_living",
    edge_type="STATE",
    to_id="on",
    valid_from=now,
    valid_until=0,  # 0 = still valid (no end time)
    properties={"brightness": "80%", "color": "warm"}
)

Time-Travel Queries

# Query modes:
# - "CURRENT": Edges valid right now
# - "POINT_IN_TIME": Edges valid at specific timestamp
# - "RANGE": All edges within a time range

# What is the current state?
edges = db.query_temporal_graph(
    namespace="smart_home",
    node_id="door_front",
    mode="CURRENT",
    edge_type="STATE"
)
current_state = edges[0]["to_id"] if edges else "unknown"

# Was the door open 1.5 hours ago?
edges = db.query_temporal_graph(
    namespace="smart_home",
    node_id="door_front",
    mode="POINT_IN_TIME",
    timestamp=now - int(1.5 * 60 * 60 * 1000)
)
was_open = any(e["to_id"] == "open" for e in edges)

# All state changes in last hour
edges = db.query_temporal_graph(
    namespace="smart_home",
    node_id="door_front",
    mode="RANGE",
    start_time=now - one_hour,
    end_time=now
)
for edge in edges:
    print(f"State: {edge['to_id']} from {edge['valid_from']} to {edge['valid_until']}")

End a Temporal Edge

# Close the current "on" state
db.end_temporal_edge(
    namespace="smart_home",
    from_id="light_living",
    edge_type="STATE",
    to_id="on",
    end_time=int(time.time() * 1000)
)

15. Semantic Cache

Cache LLM responses with similarity-based retrieval for cost savings.

Storing Cached Responses

# Store response with embedding
db.cache_put(
    cache_name="llm_responses",
    key="What is Python?",           # Original query (for display/debugging)
    value="Python is a high-level programming language...",
    embedding=[0.1, 0.2, ...],       # Query embedding (384-dim)
    ttl_seconds=3600,                # Expire in 1 hour (0 = no expiry)
    metadata={"model": "claude-3", "tokens": 150}
)

Cache Lookup

# Check cache before calling LLM
cached = db.cache_get(
    cache_name="llm_responses",
    query_embedding=[0.12, 0.18, ...],  # Embed the new query
    threshold=0.85                       # Cosine similarity threshold
)

if cached:
    print(f"Cache HIT!")
    print(f"Original query: {cached['key']}")
    print(f"Response: {cached['value']}")
    print(f"Similarity: {cached['score']:.4f}")
else:
    print("Cache MISS - calling LLM...")
    # Call LLM and cache the result

Cache Management

# Delete specific entry
db.cache_delete("llm_responses", key="What is Python?")

# Clear entire cache
db.cache_clear("llm_responses")

# Get cache statistics
stats = db.cache_stats("llm_responses")
print(f"Total entries: {stats['count']}")
print(f"Hit rate: {stats['hit_rate']:.2%}")
print(f"Memory usage: {stats['size_bytes']}")

Full Usage Pattern

def get_llm_response(query: str, embed_fn, llm_fn):
    """Get response from cache or LLM."""
    query_embedding = embed_fn(query)
    
    # Try cache first
    cached = db.cache_get(
        cache_name="llm_responses",
        query_embedding=query_embedding,
        threshold=0.90
    )
    
    if cached:
        return cached['value']
    
    # Cache miss - call LLM
    response = llm_fn(query)
    
    # Store in cache
    db.cache_put(
        cache_name="llm_responses",
        key=query,
        value=response,
        embedding=query_embedding,
        ttl_seconds=86400  # 24 hours
    )
    
    return response

16. Context Query Builder (LLM Optimization)

Assemble LLM context with token budgeting and priority-based truncation.

Basic Context Query

from sochdb import ContextQueryBuilder, ContextFormat, TruncationStrategy

# Build context for LLM
context = ContextQueryBuilder() \
    .for_session("session_123") \
    .with_budget(4096) \
    .format(ContextFormat.TOON) \
    .literal("SYSTEM", priority=0, text="You are a helpful assistant.") \
    .section("USER_PROFILE", priority=1) \
        .get("user.profile.{name, preferences}") \
        .done() \
    .section("HISTORY", priority=2) \
        .last(10, "messages") \
        .where_eq("session_id", "session_123") \
        .done() \
    .section("KNOWLEDGE", priority=3) \
        .search("documents", "$query_embedding", k=5) \
        .done() \
    .execute()

print(f"Token count: {context.token_count}")
print(f"Context:\n{context.text}")

Section Types

| Type | Method | Description | |------|--------|-------------| | literal | .literal(name, priority, text) | Static text content | | get | .get(path) | Fetch specific data by path | | last | .last(n, table) | Most recent N records from table | | search | .search(collection, embedding, k) | Vector similarity search | | sql | .sql(query) | SQL query results |

Truncation Strategies

# Drop from end (keep beginning) - default
.truncation(TruncationStrategy.TAIL_DROP)

# Drop from beginning (keep end)
.truncation(TruncationStrategy.HEAD_DROP)

# Proportionally truncate across sections
.truncation(TruncationStrategy.PROPORTIONAL)

# Fail if budget exceeded
.truncation(TruncationStrategy.STRICT)

Variables and Bindings

from sochdb import ContextValue

context = ContextQueryBuilder() \
    .for_session("session_123") \
    .set_var("query_embedding", ContextValue.Embedding([0.1, 0.2, ...])) \
    .set_var("user_id", ContextValue.String("user_456")) \
    .section("KNOWLEDGE", priority=2) \
        .search("documents", "$query_embedding", k=5) \
        .done() \
    .execute()

Output Formats

# TOON format (40-60% fewer tokens)
.format(ContextFormat.TOON)

# JSON format
.format(ContextFormat.JSON)

# Markdown format (human-readable)
.format(ContextFormat.MARKDOWN)

# Plain text
.format(ContextFormat.TEXT)

Session Management (Agent Context)

Stateful session management for agentic use cases with permissions, sandboxing, audit logging, and budget tracking.

Session Overview

Agent session abc123:
  cwd: /agents/abc123
  vars: $model = "gpt-4", $budget = 1000
  permissions: fs:rw, db:rw, calc:*
  audit: [read /data/users, write /agents/abc123/cache]

Creating Sessions

from sochdb import SessionManager, AgentContext
from datetime import timedelta

# Create session manager with idle timeout
session_mgr = SessionManager(idle_timeout=timedelta(hours=1))

# Create a new session
session = session_mgr.create_session("session_abc123")

# Get existing session
session = session_mgr.get_session("session_abc123")

# Get or create (idempotent)
session = session_mgr.get_or_create("session_abc123")

# Remove session
session_mgr.remove_session("session_abc123")

# Cleanup expired sessions
removed_count = session_mgr.cleanup_expired()

# Get active session count
count = session_mgr.session_count()

Agent Context

from sochdb import AgentContext, ContextValue

# Create agent context
ctx = AgentContext("session_abc123")
print(f"Session ID: {ctx.session_id}")
print(f"Working dir: {ctx.working_dir}")  # /agents/session_abc123

# Create with custom working directory
ctx = AgentContext.with_working_dir("session_abc123", "/custom/path")

# Create with full permissions (trusted agents)
ctx = AgentContext.with_full_permissions("session_abc123")

Session Variables

# Set variables
ctx.set_var("model", ContextValue.String("gpt-4"))
ctx.set_var("budget", ContextValue.Number(1000.0))
ctx.set_var("debug", ContextValue.Bool(True))
ctx.set_var("tags", ContextValue.List([
    ContextValue.String("ml"),
    ContextValue.String("production")
]))

# Get variables
model = ctx.get_var("model")  # Returns ContextValue or None
budget = ctx.get_var("budget")

# Peek (read-only, no audit)
value = ctx.peek_var("model")

# Variable substitution in strings
text = ctx.substitute_vars("Using $model with budget $budget")
# Result: "Using gpt-4 with budget 1000"

Context Value Types

from sochdb import ContextValue

# String
ContextValue.String("hello")

# Number (float)
ContextValue.Number(42.5)

# Boolean
ContextValue.Bool(True)

# List
ContextValue.List([
    ContextValue.String("a"),
    ContextValue.Number(1.0)
])

# Object (dict)
ContextValue.Object({
    "key": ContextValue.String("value"),
    "count": ContextValue.Number(10.0)
})

# Null
ContextValue.Null()

Permissions

from sochdb import (
    AgentPermissions,
    FsPermissions,
    DbPermissions,
    NetworkPermissions
)

# Configure permissions
ctx.permissions = AgentPermissions(
    filesystem=FsPermissions(
        read=True,
        write=True,
        mkdir=True,
        delete=False,
        allowed_paths=["/agents/session_abc123", "/shared/data"]
    ),
    database=DbPermissions(
        read=True,
        write=True,
        create=False,
        drop=False,
        allowed_tables=["user_*", "cache_*"]  # Pattern matching
    ),
    calculator=True,
    network=NetworkPermissions(
        http=True,
        allowed_domains=["api.example.com", "*.internal.net"]
    )
)

# Check permissions before operations
try:
    ctx.check_fs_permission("/agents/session_abc123/data.json", AuditOperation.FS_READ)
    # Permission granted
except ContextError as e:
    print(f"Permission denied: {e}")

try:
    ctx.check_db_permission("user_profiles", AuditOperation.DB_QUERY)
    # Permission granted
except ContextError as e:
    print(f"Permission denied: {e}")

Budget Tracking

from sochdb import OperationBudget

# Configure budget limits
ctx.budget = OperationBudget(
    max_tokens=100000,        # Maximum tokens (input + output)
    max_cost=5000,            # Maximum cost in millicents ($50.00)
    max_operations=10000      # Maximum operation count
)

# Consume budget (called automatically by operations)
try:
    ctx.consume_budget(tokens=500, cost=10)  # 500 tokens, $0.10
except ContextError as e:
    if "Budget exceeded" in str(e):
        print("Budget limit reached!")

# Check budget status
print(f"Tokens used: {ctx.budget.tokens_used}/{ctx.budget.max_tokens}")
print(f"Cost used: ${ctx.budget.cost_used / 100:.2f}/${ctx.budget.max_cost / 100:.2f}")
print(f"Operations: {ctx.budget.operations_used}/{ctx.budget.max_operations}")

Session Transactions

# Begin transaction within session
ctx.begin_transaction(tx_id=12345)

# Create savepoint
ctx.savepoint("before_update")

# Record pending writes (for rollback)
ctx.record_pending_write(
    resource_type=ResourceType.FILE,
    resource_key="/agents/session_abc123/data.json",
    original_value=b'{"old": "data"}'
)

# Commit transaction
ctx.commit_transaction()

# Or rollback
pending_writes = ctx.rollback_transaction()
for write in pending_writes:
    print(f"Rolling back: {write.resource_key}")
    # Restore original_value

Path Resolution

# Paths are resolved relative to working directory
ctx = AgentContext.with_working_dir("session_abc123", "/home/agent")

# Relative paths
resolved = ctx.resolve_path("data.json")  # /home/agent/data.json

# Absolute paths pass through
resolved = ctx.resolve_path("/absolute/path")  # /absolute/path

Audit Trail

# All operations are automatically logged
# Audit entry includes: timestamp, operation, resource, result, metadata

# Export audit log
audit_log = ctx.export_audit()
for entry in audit_log:
    print(f"[{entry['timestamp']}] {entry['operation']}: {entry['resource']} -> {entry['result']}")

# Example output:
# [1705312345] var.set: model -> success
# [1705312346] fs.read: /data/config.json -> success
# [1705312347] db.query: users -> success
# [1705312348] fs.write: /forbidden/file -> denied:path not in allowed paths

Audit Operations

from sochdb import AuditOperation

# Filesystem operations
AuditOperation.FS_READ
AuditOperation.FS_WRITE
AuditOperation.FS_MKDIR
AuditOperation.FS_DELETE
AuditOperation.FS_LIST

# Database operations
AuditOperation.DB_QUERY
AuditOperation.DB_INSERT
AuditOperation.DB_UPDATE
AuditOperation.DB_DELETE

# Other operations
AuditOperation.CALCULATE
AuditOperation.VAR_SET
AuditOperation.VAR_GET
AuditOperation.TX_BEGIN
AuditOperation.TX_COMMIT
AuditOperation.TX_ROLLBACK

Tool Registry

from sochdb import ToolDefinition, ToolCallRecord
from datetime import datetime

# Register tools available to the agent
ctx.register_tool(ToolDefinition(
    name="search_documents",
    description="Search documents by semantic similarity",
    parameters_schema='{"type": "object", "properties": {"query": {"type": "string"}}}',
    requires_confirmation=False
))

ctx.register_tool(ToolDefinition(
    name="delete_file",
    description="Delete a file from the filesystem",
    parameters_schema='{"type": "object", "properties": {"path": {"type": "string"}}}',
    requires_confirmation=True  # Requires user confirmation
))

# Record tool calls
ctx.record_tool_call(ToolCallRecord(
    call_id="call_001",
    tool_name="search_documents",
    arguments='{"query": "machine learning"}',
    result='[{"id": "doc1", "score": 0.95}]',
    error=None,
    timestamp=datetime.now()
))

# Access tool call history
for call in ctx.tool_calls:
    print(f"{call.tool_name}: {call.result or call.error}")

Session Lifecycle

# Check session age
age = ctx.age()
print(f"Session age: {age}")

# Check idle time
idle = ctx.idle_time()
print(f"Idle time: {idle}")

# Check if expired
if ctx.is_expired(idle_timeout=timedelta(hours=1)):
    print("Session has expired!")

Complete Session Example

from sochdb import (
    SessionManager, AgentContext, ContextValue,
    AgentPermissions, FsPermissions, DbPermissions,
    OperationBudget, ToolDefinition, AuditOperation
)
from datetime import timedelta

# Initialize session manager
session_mgr = SessionManager(idle_timeout=timedelta(hours=2))

# Create session for an agent
session_id = "agent_session_12345"
ctx = session_mgr.get_or_create(session_id)

# Configure the agent
ctx.permissions = AgentPermissions(
    filesystem=FsPermissions(
        read=True,
        write=True,
        allowed_paths=[f"/agents/{session_id}", "/shared"]
    ),
    database=DbPermissions(
        read=True,
        write=True,
        allowed_tables=["documents", "cache_*"]
    ),
    calculator=True
)

ctx.budget = OperationBudget(
    max_tokens=50000,
    max_cost=1000,  # $10.00
    max_operations=1000
)

# Set initial variables
ctx.set_var("model", ContextValue.String("claude-3-sonnet"))
ctx.set_var("temperature", ContextValue.Number(0.7))

# Register available tools
ctx.register_tool(ToolDefinition(
    name="vector_search",
    description="Search vectors by similarity",
    parameters_schema='{"type": "object", "properties": {"query": {"type": "string"}, "k": {"type": "integer"}}}',
    requires_confirmation=False
))

# Perform operations with permission checks
def safe