@sochdb/sochdb
v0.4.3
SochDB Node.js SDK - AI-native embedded database with concurrent multi-process support
SochDB Node.js SDK
LLM-Optimized Embedded Database with Native Vector Search
Installation
npm install @sochdb/sochdb
Or from source:
cd sochdb-typescript-sdk
npm install
Architecture: Flexible Deployment
Tri-mode architecture: Embedded + Concurrent + Server (gRPC/IPC)
Choose the deployment mode that fits your needs.
SochDB Node.js SDK Documentation
LLM-Optimized Embedded Database with Native Vector Search
Table of Contents
- Quick Start
- Installation
- Features
- Architecture Overview
- Core Key-Value Operations
- Transactions (ACID with SSI)
- Query Builder
- Prefix Scanning
- SQL Operations
- Table Management & Index Policies
- Namespaces & Collections
- Priority Queues
- Vector Search
- Hybrid Search (Vector + BM25)
- Graph Operations
- Temporal Graph (Time-Travel)
- Semantic Cache
- Memory System
- Session Management
- Context Query Builder (LLM Optimization)
- Atomic Multi-Index Writes
- Recovery & WAL Management
- Checkpoints & Snapshots
- Compression & Storage
- Statistics & Monitoring
- Distributed Tracing
- Workflow & Run Tracking
- Server Mode (gRPC Client)
- IPC Client (Unix Sockets)
- Standalone VectorIndex
- Vector Utilities
- Data Formats (TOON/JSON/Columnar)
- Policy Service
- MCP (Model Context Protocol)
- Configuration Reference
- Error Handling
- Async Support
- Building & Development
- Complete Examples
- Migration Guide
1. Quick Start
Concurrent Embedded Mode
For web applications with multiple Node.js processes (PM2 cluster, multiple workers):
import { EmbeddedDatabase } from '@sochdb/sochdb';
import express from 'express';
// Open in concurrent mode - multiple processes can access simultaneously
const db = EmbeddedDatabase.openConcurrent('./web_db');
const app = express();
app.get('/user/:id', async (req, res) => {
// Multiple concurrent requests can read simultaneously (~100ns)
const data = await db.get(Buffer.from(`user:${req.params.id}`));
if (!data) {
res.status(404).json({ error: 'not found' });
return;
}
res.send(data);
});
app.post('/user/:id', async (req, res) => {
// Writes are automatically coordinated (~60µs amortized)
await db.put(Buffer.from(`user:${req.params.id}`), req.body);
res.json({ status: 'ok' });
});
// Check concurrent mode status
console.log(`Concurrent mode: ${db.isConcurrent}`); // true
// Start with PM2 cluster mode (multiple workers can access DB)
// pm2 start app.js -i max
app.listen(3000);
Performance
| Operation | Standard Mode | Concurrent Mode |
|-----------|---------------|-----------------|
| Read (single process) | ~100ns | ~100ns |
| Read (multi-process) | Blocked ❌ | ~100ns ✅ |
| Write | ~5ms (fsync) | ~60µs (amortized) |
| Max concurrent readers | 1 | 1024 |
PM2 Cluster Example
# Install PM2
npm install -g pm2
# Start with automatic worker scaling
pm2 start server.js -i max
# All workers can access the same database concurrently!
pm2 logs
PM2 Ecosystem File
// ecosystem.config.js
module.exports = {
apps: [{
name: 'api-server',
script: './server.js',
instances: 'max', // Scale across all CPU cores
exec_mode: 'cluster',
env: {
NODE_ENV: 'production',
DB_PATH: './shared_db' // All workers use same DB
}
}]
};
# Deploy with ecosystem file
pm2 start ecosystem.config.js
# Monitor all workers
pm2 monit
Docker Compose with PM2
version: '3.8'
services:
app:
build: .
environment:
- NODE_ENV=production
- INSTANCES=4 # 4 PM2 workers
volumes:
- ./data:/app/data # Shared database volume
ports:
- "3000:3000"
command: pm2-runtime start ecosystem.config.js
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: sochdb-app
spec:
replicas: 4 # 4 pods share the database
template:
spec:
containers:
- name: app
image: myapp:latest
volumeMounts:
- name: db-storage
mountPath: /app/data
volumes:
- name: db-storage
persistentVolumeClaim:
claimName: sochdb-pvc # Shared PVC with ReadWriteMany
Features
Memory System - LLM-Native Memory for AI Agents
Complete memory system with extraction, consolidation, and hybrid retrieval:
import {
EmbeddedDatabase,
ExtractionPipeline,
Consolidator,
HybridRetriever,
AllowedSet,
} from '@sochdb/sochdb';
const db = await EmbeddedDatabase.open('./memory_db');
// Extract entities and relations from text
const pipeline = ExtractionPipeline.fromDatabase(db, 'user_123', {
entityTypes: ['person', 'organization', 'location'],
minConfidence: 0.7,
});
const result = await pipeline.extractAndCommit(
'Alice works at Acme Corp',
myLLMExtractor // Your LLM integration
);
console.log(`Extracted ${result.entities.length} entities`);
// Consolidate facts with event sourcing
const consolidator = Consolidator.fromDatabase(db, 'user_123');
await consolidator.add({
fact: { subject: 'Alice', predicate: 'lives_in', object: 'SF' },
source: 'conversation_1',
confidence: 0.9,
});
const updated = await consolidator.consolidate();
const facts = await consolidator.getCanonicalFacts();
// Hybrid retrieval with RRF fusion
const retriever = HybridRetriever.fromDatabase(db, 'user_123', 'documents');
await retriever.indexDocuments(docs);
const results = await retriever.retrieve(
'machine learning papers',
queryEmbedding,
AllowedSet.fromNamespace('user_123')
);
Key Features:
- ✅ Extraction Pipeline: Compile LLM outputs into typed facts
- ✅ Event-Sourced Consolidation: Append-only with temporal updates
- ✅ Hybrid Retrieval: RRF fusion of vector + keyword search
- ✅ Namespace Isolation: Multi-tenant security with pre-filtering
- ✅ Schema Validation: Type checking and confidence thresholds
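The extraction example above passes a user-supplied myLLMExtractor into extractAndCommit without showing it. The following is a hypothetical sketch of such a callback; the argument and return shapes are assumptions for illustration, not the documented pipeline contract, so adapt them to your own LLM client.
// Hypothetical extractor callback (shapes are assumptions, not the SDK contract).
async function myLLMExtractor(text: string) {
  // Ask your LLM for structured entities/relations as JSON; callMyLLM is a placeholder.
  const raw = await callMyLLM(
    `Extract entities (person, organization, location) and their relations from:\n${text}\nReturn JSON.`
  );
  return JSON.parse(raw) as {
    entities: { name: string; type: string; confidence: number }[];
    relations: { subject: string; predicate: string; object: string; confidence: number }[];
  };
}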
Semantic Cache - LLM Response Caching
Vector similarity-based caching for LLM responses to reduce costs and latency:
import { EmbeddedDatabase, SemanticCache } from '@sochdb/sochdb';
const db = await EmbeddedDatabase.open('./mydb');
const cache = new SemanticCache(db, 'llm_responses');
// Store LLM response with embedding
await cache.put(
'What is machine learning?',
'Machine learning is a subset of AI...',
embedding, // 384-dim vector
3600, // TTL in seconds
{ model: 'gpt-4', tokens: 150 }
);
// Check cache before calling LLM
const hit = await cache.get(queryEmbedding, 0.85);
if (hit) {
console.log(`Cache HIT! Similarity: ${hit.score.toFixed(4)}`);
console.log(`Response: ${hit.value}`);
}
// Get statistics
const stats = await cache.stats();
console.log(`Hit rate: ${(stats.hitRate * 100).toFixed(1)}%`);
Key Benefits:
- ✅ Cosine similarity matching (0-1 threshold)
- ✅ TTL-based expiration
- ✅ Hit/miss statistics tracking
- ✅ Memory usage monitoring
- ✅ Automatic expired entry purging
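A common usage pattern, continuing the example above, is to wrap the LLM call in a cache lookup; embed and callLLM below are placeholders for your own embedding and LLM integrations.
// Check the semantic cache first; only call the LLM on a miss.
async function answer(query: string): Promise<string> {
  const queryEmbedding = await embed(query);        // placeholder: produce a 384-dim vector
  const hit = await cache.get(queryEmbedding, 0.9); // similarity threshold
  if (hit) return hit.value;                        // cache hit: skip the LLM call
  const response = await callLLM(query);            // placeholder: cache miss, call the LLM
  await cache.put(query, response, queryEmbedding, 86400, { model: 'gpt-4' });
  return response;
}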
Context Query Builder - Token-Aware LLM Context
Assemble LLM context with priority-based truncation and token budgeting:
import { ContextQueryBuilder, ContextOutputFormat, TruncationStrategy } from '@sochdb/sochdb';
const builder = new ContextQueryBuilder()
.withBudget(4096) // Token limit
.setFormat(ContextOutputFormat.TOON)
.setTruncation(TruncationStrategy.TAIL_DROP);
builder
.literal('SYSTEM', 0, 'You are a helpful AI assistant.')
.literal('USER_PROFILE', 1, 'User: Alice, Role: Engineer')
.literal('HISTORY', 2, 'Recent conversation context...')
.literal('KNOWLEDGE', 3, 'Retrieved documents...');
const result = builder.execute();
console.log(`Tokens: ${result.tokenCount}/${4096}`);
console.log(`Context:\n${result.text}`);
Key Benefits:
- ✅ Priority-based section ordering (lower = higher priority)
- ✅ Token budget enforcement
- ✅ Multiple truncation strategies (tail drop, head drop, proportional)
- ✅ Multiple output formats (TOON, JSON, Markdown)
- ✅ Token count estimation
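Priority and budget interact at execute() time: assuming sections are emitted in priority order, TAIL_DROP removes the lowest-priority content first when the budget is exceeded. A small sketch using only the methods shown above (longHistoryText and longDocsText are placeholder strings):
// With a deliberately small budget, low-priority sections get truncated first,
// while the priority-0 SYSTEM section is kept.
const tight = new ContextQueryBuilder()
  .withBudget(256)
  .setFormat(ContextOutputFormat.TOON)
  .setTruncation(TruncationStrategy.TAIL_DROP);
tight
  .literal('SYSTEM', 0, 'You are a helpful AI assistant.')
  .literal('HISTORY', 2, longHistoryText)
  .literal('KNOWLEDGE', 3, longDocsText);
const out = tight.execute();
console.log(`Kept ${out.tokenCount} of a 256-token budget`);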
Namespace API - Multi-Tenant Isolation
First-class namespace handles for secure multi-tenancy and data isolation:
import { Database, Namespace, Collection, DistanceMetric } from '@sochdb/sochdb';
const db = await Database.open('./mydb');
// Create isolated namespace for each tenant
const namespace = new Namespace(db, 'tenant_acme', {
name: 'tenant_acme',
displayName: 'ACME Corporation',
labels: { plan: 'enterprise', region: 'us-west' }
});
// Create vector collection
const collection = await namespace.createCollection({
name: 'documents',
dimension: 384,
metric: DistanceMetric.Cosine,
indexed: true
});
// Insert and search vectors
await collection.insert([1.0, 2.0, ...], { title: 'Doc 1' });
const results = await collection.search({ queryVector: [...], k: 10 });
Priority Queue API - Task Processing
Efficient priority queue with ordered-key storage (no O(N) blob rewrites):
import { Database, PriorityQueue } from '@sochdb/sochdb';
const db = await Database.open('./queue_db');
const queue = PriorityQueue.fromDatabase(db, 'tasks');
// Enqueue with priority (lower = higher urgency)
await queue.enqueue(1, Buffer.from('urgent task'), { type: 'payment' });
// Worker processes tasks
const task = await queue.dequeue('worker-1');
if (task) {
// Process task...
await queue.ack(task.taskId);
}
// Get statistics
const stats = await queue.stats();
console.log(`Pending: ${stats.pending}, Completed: ${stats.completed}`);
Key Benefits:
- ✅ O(log N) enqueue/dequeue with ordered scans
- ✅ Atomic claim protocol for concurrent workers
- ✅ Visibility timeout for crash recovery
- ✅ Dead letter queue for failed tasks
- ✅ Multiple queues per database
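A worker typically runs a polling loop and acknowledges only after successful processing; assuming unacknowledged tasks become visible again once the visibility timeout expires, tasks held by a crashed worker are simply redelivered. A minimal sketch reusing the queue from above (processTask is a placeholder):
// Dequeue, process, ack. A task that is never acked is redelivered later.
async function runWorker(workerId: string) {
  while (true) {
    const task = await queue.dequeue(workerId);
    if (!task) {
      await new Promise((r) => setTimeout(r, 100)); // queue empty: back off briefly
      continue;
    }
    try {
      await processTask(task);        // placeholder for your handler
      await queue.ack(task.taskId);   // mark completed
    } catch (err) {
      console.error(`Task ${task.taskId} failed; leaving it unacked for retry`, err);
    }
  }
}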
Architecture: Flexible Deployment
┌─────────────────────────────────────────────────────────────┐
│ DEPLOYMENT OPTIONS │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. EMBEDDED MODE (FFI) 2. SERVER MODE (gRPC) │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Node.js App │ │ Node.js App │ │
│ │ ├─ Database.open()│ │ ├─ SochDBClient() │ │
│ │ └─ Direct FFI │ │ └─ gRPC calls │ │
│ │ │ │ │ │ │ │
│ │ ▼ │ │ ▼ │ │
│ │ libsochdb_storage │ │ sochdb-grpc │ │
│ │ (Rust native) │ │ (Rust server) │ │
│ └─────────────────────┘ └─────────────────────┘ │
│ │
│ ✅ No server needed ✅ Multi-language │
│ ✅ Local files ✅ Centralized logic │
│ ✅ Simple deployment ✅ Production scale │
└─────────────────────────────────────────────────────────────┘
When to Use Each Mode
Embedded Mode (FFI):
- ✅ Local development and testing
- ✅ Jupyter notebooks and data science
- ✅ Single-process applications
- ✅ Edge deployments without network
- ✅ No server setup required
Server Mode (gRPC):
- ✅ Production deployments
- ✅ Multi-language teams (Python, Node.js, Go)
- ✅ Distributed systems
- ✅ Centralized business logic
- ✅ Horizontal scaling
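To make the choice concrete, here is a minimal mode-selection sketch. SOCHDB_GRPC_ADDR is an application-level convention invented for this example (the SDK does not read it), and the two handles expose different method surfaces, so this only illustrates how the startup wiring might look.
import { EmbeddedDatabase, SochDBClient } from '@sochdb/sochdb';
// Choose the deployment mode at startup.
const grpcAddr = process.env.SOCHDB_GRPC_ADDR;
const store = grpcAddr
  ? new SochDBClient({ address: grpcAddr })       // server mode (gRPC)
  : EmbeddedDatabase.openConcurrent('./data');    // embedded, multi-process safe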
System Requirements
For Concurrent Mode
- SochDB Core: Latest version
- Node.js: 14.0+ (18.0+ recommended)
- Native Library: libsochdb_storage.{dylib,so}
- FFI: Koffi (automatically installed)
Operating Systems:
- ✅ Linux (Ubuntu 20.04+, RHEL 8+)
- ✅ macOS (10.15+, both Intel and Apple Silicon)
- ⚠️ Windows (requires native builds)
File Descriptors:
- Default limit: 1024 (sufficient for most workloads)
- For high concurrency with PM2:
ulimit -n 4096
Memory:
- Standard mode: ~50MB base + data
- Concurrent mode: +4KB per concurrent reader slot (1024 slots = ~4MB overhead)
- PM2 cluster: Each worker has independent memory
Troubleshooting
"Database is locked" Error (Standard Mode)
Error: SQLITE_BUSY: database is locked
Solution: Use concurrent mode for multi-process access:
// ❌ Standard mode - PM2 cluster will fail
const db = new EmbeddedDatabase('./data.db');
// ✅ Concurrent mode - PM2 cluster works!
const db = EmbeddedDatabase.openConcurrent('./data.db');
Library Not Found Error
Error: Dynamic library 'libsochdb_storage.dylib' not found
macOS:
# Build and install library
cd /path/to/sochdb
cargo build --release
sudo cp target/release/libsochdb_storage.dylib /usr/local/lib/
Linux:
cd /path/to/sochdb
cargo build --release
sudo cp target/release/libsochdb_storage.so /usr/local/lib/
sudo ldconfig
Development Mode (no install):
export DYLD_LIBRARY_PATH=/path/to/sochdb/target/release # macOS
export LD_LIBRARY_PATH=/path/to/sochdb/target/release # Linux
PM2 Cluster Issues
Symptom: Workers crash with "database locked"
Solution: Ensure concurrent mode is used:
// ecosystem.config.js
module.exports = {
apps: [{
name: 'api',
script: './server.js',
instances: 4,
exec_mode: 'cluster',
env: {
USE_CONCURRENT_MODE: 'true' // Flag to use openConcurrent()
}
}]
};
// server.ts
const db = process.env.USE_CONCURRENT_MODE
? EmbeddedDatabase.openConcurrent('./db')
: new EmbeddedDatabase('./db');
console.log('Concurrent mode:', db.isConcurrent); // Should be true
Docker Volume Permissions
Symptom: EACCES: permission denied when opening database
Solution: Fix volume ownership:
FROM node:18
WORKDIR /app
# Create data directory with correct permissions
RUN mkdir -p /app/data && chown -R node:node /app
# Switch to non-root user
USER node
COPY --chown=node:node . .
RUN npm install
CMD ["npm", "start"]Performance Issues
Symptom: Concurrent reads slower than expected
Check 1 - Verify concurrent mode:
if (!db.isConcurrent) {
console.error('Database is not in concurrent mode!');
process.exit(1);
}
Check 2 - Monitor PM2 workers:
pm2 monit # Real-time monitoring
pm2 logs --lines 200 # Check for errors
Check 3 - Batch writes:
// ❌ Slow - individual writes
for (const item of items) {
await collection.insert(item);
}
// ✅ Fast - batch write
await collection.insertBatch(items);
🆕 Vector Search - Native HNSW
SochDB now includes native HNSW (Hierarchical Navigable Small World) vector search for sub-millisecond similarity search across millions of vectors.
Quick Start - Vector Search
import { HnswIndex } from '@sochdb/sochdb';
// Create HNSW index
const index = new HnswIndex({
dimension: 384, // Vector dimension
maxConnections: 16, // M parameter (default: 16)
efConstruction: 200, // Build quality (default: 200)
efSearch: 100 // Search quality (default: 100)
});
// Insert vectors (batch is 10-100× faster)
index.insertBatch(
['doc1', 'doc2', 'doc3'],
[[1.0, 2.0, ...], [3.0, 4.0, ...], [5.0, 6.0, ...]]
);
// Search for similar vectors
const results = index.search(queryVector, 10);
console.log(results);
// [{ id: 'doc1', distance: 0.15 }, { id: 'doc3', distance: 0.23 }, ...]
// Clean up
index.close();
Performance Comparison
| Implementation | 10K vectors | 100K vectors | 1M vectors |
|----------------|-------------|--------------|------------|
| Linear Scan (old) | ~50ms | ~500ms | ~5000ms |
| Native HNSW (new) | <0.5ms | <1ms | <1ms |
| Speedup | 100× | 500× | 5000× |
Three Ways to Use Vector Search
1. Direct HNSW API (Recommended for Production)
Best performance, full control:
import { HnswIndex } from '@sochdb/sochdb';
const index = new HnswIndex({ dimension: 1536 });
index.insertBatch(ids, embeddings);
const results = index.search(queryEmbedding, 10);
✅ Use when:
- You need maximum performance
- Working with large datasets (>10K vectors)
- Building RAG/AI applications
- Have existing embedding pipeline
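Recall and latency can also be tuned per query through the efSearch property listed in the API reference below: higher values explore more of the graph. A short sketch, reusing the index from above:
// efSearch trades latency for recall at query time; no index rebuild needed.
index.efSearch = 50;                          // faster, lower recall
const fast = index.search(queryEmbedding, 10);
index.efSearch = 400;                         // slower, higher recall
const thorough = index.search(queryEmbedding, 10);
console.log(`fast: ${fast[0]?.id}  thorough: ${thorough[0]?.id}`);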
2. Collection API (Simple, High-Level)
Convenient API with metadata support:
import { Database } from '@sochdb/sochdb';
const db = await Database.open('./mydb');
const ns = await db.createNamespace({ name: 'docs' });
const collection = await ns.createCollection({
name: 'embeddings',
dimension: 384,
indexed: true // Note: Currently uses linear search in embedded mode
});
await collection.insert([1.0, 2.0, ...], { title: 'Document 1' }, 'doc1');
const results = await collection.search({ queryVector: [...], k: 10 });
⚠️ Current Limitation: The Collection API uses O(n) linear search in embedded mode. For production use with more than 10K vectors, use one of the following (see the sketch after this list):
- Direct HNSW API (above), OR
- gRPC Server Mode (see below)
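One way to keep simple metadata handling while avoiding the linear scan is to maintain a side HnswIndex keyed by the same document IDs and store metadata through the plain key-value API. This is only a sketch under that assumption; the doc: key prefix is a convention made up for this example.
import { EmbeddedDatabase, HnswIndex } from '@sochdb/sochdb';
const db = EmbeddedDatabase.openConcurrent('./mydb');
const index = new HnswIndex({ dimension: 384 });
// Write path: vector goes into HNSW, metadata into key-value storage.
async function addDoc(id: string, vector: number[], meta: object) {
  index.insert(id, vector);
  await db.put(Buffer.from(`doc:${id}`), Buffer.from(JSON.stringify(meta)));
}
// Read path: ANN search first, then hydrate metadata for each hit.
async function searchDocs(queryVector: number[], k: number) {
  const hits = index.search(queryVector, k);
  return Promise.all(hits.map(async (h) => {
    const raw = await db.get(Buffer.from(`doc:${h.id}`));
    return { id: h.id, distance: h.distance, meta: raw ? JSON.parse(raw.toString()) : null };
  }));
}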
3. gRPC Server Mode (Production-Ready)
For distributed systems, multi-language support:
import { SochDBClient } from '@sochdb/sochdb';
// Start server: sochdb-grpc --port 50051
const client = new SochDBClient({ address: 'localhost:50051' });
// Create HNSW index
await client.createIndex('docs', {
dimension: 1536,
config: { m: 16, ef_construction: 200 },
metric: 'cosine'
});
// Insert and search
await client.insertBatch('docs', ids, vectors);
const results = await client.search('docs', queryVector, 10);
✅ Full HNSW support with:
- Native Rust implementation
- Persistence
- Distributed queries
- Multi-language clients
Migration from Linear Search
If you're using the Collection API with large datasets and experiencing slow search:
Before (slow):
// O(n) scan through all documents
const results = await collection.search({ queryVector, k: 10 });
After (fast) - Option 1: Use HnswIndex directly:
import { HnswIndex } from '@sochdb/sochdb';
const index = new HnswIndex({ dimension: 384 });
index.insertBatch(ids, vectors);
const results = index.search(queryVector, 10); // <1ms
After (fast) - Option 2: Use gRPC mode:
# Terminal 1: Start server
sochdb-grpc --port 50051
# Terminal 2: Use client
const client = new SochDBClient({ address: 'localhost:50051' });
await client.createIndex('docs', { dimension: 384 });
const results = await client.search('docs', queryVector, 10);
Complete Examples
- 06_native_vector_search.ts - Direct HNSW usage with benchmarks
- AI PDF Chatbot - LangChain RAG example
API Reference
// HnswIndex Configuration
interface HnswConfig {
dimension: number; // Required: vector dimension
maxConnections?: number; // M parameter (default: 16)
efConstruction?: number; // Build quality (default: 200)
efSearch?: number; // Search quality (default: 100)
}
// Search Result
interface SearchResult {
id: string; // Vector ID
distance: number; // Distance (lower = more similar)
}
// Main Methods
class HnswIndex {
constructor(config: HnswConfig)
insert(id: string, vector: number[]): void
insertBatch(ids: string[], vectors: number[][]): void
search(queryVector: number[], k: number, fast?: boolean): SearchResult[]
searchUltra(queryVector: number[], k: number): SearchResult[]
close(): void
// Properties
get length(): number // Number of vectors
get dimension(): number // Vector dimension
get efSearch(): number
set efSearch(value: number) // Adjust search quality
}
Roadmap
- Current: Direct HNSW FFI bindings
- Next: Collection API auto-uses HNSW in embedded mode
- Future: Persistent HNSW indexes with disk storage
SochDB Node.js SDK Documentation
LLM-Optimized Embedded Database with Native Vector Search
Table of Contents
- Quick Start
- Installation
- Features
- Architecture Overview
- Core Key-Value Operations
- Transactions (ACID with SSI)
- Query Builder
- Prefix Scanning
- SQL Operations
- Table Management & Index Policies
- Namespaces & Collections
- Priority Queues
- Vector Search
- Hybrid Search (Vector + BM25)
- Graph Operations
- Temporal Graph (Time-Travel)
- Semantic Cache
- Context Query Builder (LLM Optimization)
- Atomic Multi-Index Writes
- Recovery & WAL Management
- Checkpoints & Snapshots
- Compression & Storage
- Statistics & Monitoring
- Server Mode (gRPC Client)
- IPC Client (Unix Sockets)
- Error Handling
- Complete Examples
1. Quick Start
from sochdb import Database
# Open (or create) a database
db = Database.open("./my_database")
# Store and retrieve data
db.put(b"hello", b"world")
value = db.get(b"hello") # b"world"
# Use transactions for atomic operations
with db.transaction() as txn:
txn.put(b"key1", b"value1")
txn.put(b"key2", b"value2")
# Auto-commits on success, auto-rollbacks on exception
# Clean up
db.delete(b"hello")
db.close()
30-Second Overview:
- Key-Value: Fast reads/writes with get/put/delete
- Transactions: ACID with SSI isolation
- Vector Search: HNSW-based semantic search
- Hybrid Search: Combine vectors with BM25 keyword search
- Graph: Build and traverse knowledge graphs
- LLM-Optimized: TOON format uses 40-60% fewer tokens than JSON
2. Installation
npm install @sochdb/sochdb
Platform Support:
| Platform | Architecture | Status |
|----------|--------------|--------|
| Linux | x86_64, aarch64 | ✅ Full support |
| macOS | x86_64, arm64 | ✅ Full support |
| Windows | x86_64 | ✅ Full support |
Optional Dependencies:
# For async support
npm install @sochdb/sochdb[async]
# For server mode
npm install @sochdb/sochdb[grpc]
# Everything
npm install @sochdb/sochdb[all]
3. Architecture Overview
SochDB supports two deployment modes:
Embedded Mode (Default)
Direct Rust bindings via FFI. No server required.
from sochdb import Database
with Database.open("./mydb") as db:
db.put(b"key", b"value")
value = db.get(b"key")Best for: Local development, notebooks, single-process applications.
Server Mode (gRPC)
Thin client connecting to sochdb-grpc server.
from sochdb import SochDBClient
client = SochDBClient("localhost:50051")
client.put(b"key", b"value", namespace="default")
value = client.get(b"key", namespace="default")Best for: Production, multi-process, distributed systems.
Feature Comparison
| Feature | Embedded | Server |
|---------|----------|--------|
| Setup | npm install only | Server + client |
| Performance | Fastest (in-process) | Network overhead |
| Multi-process | ❌ | ✅ |
| Horizontal scaling | ❌ | ✅ |
| Vector search | ✅ | ✅ |
| Graph operations | ✅ | ✅ |
| Semantic cache | ✅ | ✅ |
| Context service | Limited | ✅ Full |
| MCP integration | ❌ | ✅ |
┌─────────────────────────────────────────────────────────────┐
│ DEPLOYMENT OPTIONS │
├─────────────────────────────────────────────────────────────┤
│ EMBEDDED MODE (FFI) SERVER MODE (gRPC) │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Node.js App │ │ Node.js App │ │
│ │ ├─ Database.open()│ │ ├─ SochDBClient() │ │
│ │ └─ Direct FFI │ │ └─ gRPC calls │ │
│ │ │ │ │ │ │ │
│ │ ▼ │ │ ▼ │ │
│ │ libsochdb_storage │ │ sochdb-grpc │ │
│ │ (Rust native) │ │ (Rust server) │ │
│ └─────────────────────┘ └─────────────────────┘ │
│ │
│ ✅ No server needed ✅ Multi-language │
│ ✅ Local files ✅ Centralized logic │
│ ✅ Simple deployment ✅ Production scale │
└─────────────────────────────────────────────────────────────┘
4. Core Key-Value Operations
All keys and values are bytes.
Basic Operations
from sochdb import Database
db = Database.open("./my_db")
# Store data
db.put(b"user:1", b"Alice")
db.put(b"user:2", b"Bob")
# Retrieve data
user = db.get(b"user:1") # Returns b"Alice" or None
# Check existence
exists = db.exists(b"user:1") # True
# Delete data
db.delete(b"user:1")
db.close()
Path-Based Keys (Hierarchical)
Organize data hierarchically with path-based access:
# Store with path (strings auto-converted to bytes internally)
db.put_path("users/alice/name", b"Alice Smith")
db.put_path("users/alice/email", b"[email protected]")
db.put_path("users/bob/name", b"Bob Jones")
# Retrieve by path
name = db.get_path("users/alice/name") # b"Alice Smith"
# Delete by path
db.delete_path("users/alice/email")
# List at path (like listing directory)
children = db.list_path("users/") # ["alice", "bob"]
With TTL (Time-To-Live)
# Store with expiration (seconds)
db.put(b"session:abc123", b"user_data", ttl_seconds=3600) # Expires in 1 hour
# TTL of 0 means no expiration
db.put(b"permanent_key", b"value", ttl_seconds=0)Batch Operations
# Batch put (more efficient than individual puts)
db.put_batch([
(b"key1", b"value1"),
(b"key2", b"value2"),
(b"key3", b"value3"),
])
# Batch get
values = db.get_batch([b"key1", b"key2", b"key3"])
# Returns: [b"value1", b"value2", b"value3"] (None for missing keys)
# Batch delete
db.delete_batch([b"key1", b"key2", b"key3"])Context Manager
with Database.open("./my_db") as db:
db.put(b"key", b"value")
# Automatically closes when exiting
5. Transactions (ACID with SSI)
SochDB provides full ACID transactions with Serializable Snapshot Isolation (SSI).
Context Manager Pattern (Recommended)
# Auto-commits on success, auto-rollbacks on exception
with db.transaction() as txn:
txn.put(b"accounts/alice", b"1000")
txn.put(b"accounts/bob", b"500")
# Read within transaction sees your writes
balance = txn.get(b"accounts/alice") # b"1000"
# If exception occurs, rolls back automatically
Closure Pattern (Rust-Style)
# Using with_transaction for automatic commit/rollback
def transfer_funds(txn):
alice = int(txn.get(b"accounts/alice") or b"0")
bob = int(txn.get(b"accounts/bob") or b"0")
txn.put(b"accounts/alice", str(alice - 100).encode())
txn.put(b"accounts/bob", str(bob + 100).encode())
return "Transfer complete"
result = db.with_transaction(transfer_funds)
Manual Transaction Control
txn = db.begin_transaction()
try:
txn.put(b"key1", b"value1")
txn.put(b"key2", b"value2")
commit_ts = txn.commit() # Returns HLC timestamp
print(f"Committed at: {commit_ts}")
except Exception as e:
txn.abort()
raise
Transaction Properties
txn = db.transaction()
print(f"Transaction ID: {txn.id}") # Unique identifier
print(f"Start timestamp: {txn.start_ts}") # HLC start time
print(f"Isolation: {txn.isolation}") # "serializable"SSI Conflict Handling
from sochdb import TransactionConflictError
MAX_RETRIES = 3
for attempt in range(MAX_RETRIES):
try:
with db.transaction() as txn:
# Read and modify
value = int(txn.get(b"counter") or b"0")
txn.put(b"counter", str(value + 1).encode())
break # Success
except TransactionConflictError:
if attempt == MAX_RETRIES - 1:
raise
# Retry on conflict
continue
All Transaction Operations
with db.transaction() as txn:
# Key-value
txn.put(key, value)
txn.get(key)
txn.delete(key)
txn.exists(key)
# Path-based
txn.put_path(path, value)
txn.get_path(path)
# Batch operations
txn.put_batch(pairs)
txn.get_batch(keys)
# Scanning
for k, v in txn.scan_prefix(b"prefix/"):
print(k, v)
# SQL (within transaction isolation)
result = txn.execute("SELECT * FROM users WHERE id = 1")Isolation Levels
from sochdb import IsolationLevel
# Default: Serializable (strongest)
with db.transaction(isolation=IsolationLevel.SERIALIZABLE) as txn:
pass
# Snapshot isolation (faster, allows some anomalies)
with db.transaction(isolation=IsolationLevel.SNAPSHOT) as txn:
pass
# Read committed (fastest, least isolation)
with db.transaction(isolation=IsolationLevel.READ_COMMITTED) as txn:
pass
6. Query Builder
Fluent API for building efficient queries with predicate pushdown.
Basic Query
# Query with prefix and limit
results = db.query("users/")
.limit(10)
.execute()
for key, value in results:
print(f"{key.decode()}: {value.decode()}")Filtered Query
from sochdb import CompareOp
# Query with filters
results = db.query("orders/")
.where("status", CompareOp.EQ, "pending")
.where("amount", CompareOp.GT, 100)
.order_by("created_at", descending=True)
.limit(50)
.offset(10)
.execute()
Column Selection
# Select specific fields only
results = db.query("users/")
.select(["name", "email"]) # Only fetch these columns
.where("active", CompareOp.EQ, True)
.execute()
Aggregate Queries
# Count
count = db.query("orders/")
.where("status", CompareOp.EQ, "completed")
.count()
# Sum (for numeric columns)
total = db.query("orders/")
.sum("amount")
# Group by
results = db.query("orders/")
.select(["status", "COUNT(*)", "SUM(amount)"])
.group_by("status")
.execute()
Query in Transaction
with db.transaction() as txn:
results = txn.query("users/")
.where("role", CompareOp.EQ, "admin")
.execute()
7. Prefix Scanning
Iterate over keys with common prefixes efficiently.
Safe Prefix Scan (Recommended)
# Requires minimum 2-byte prefix (prevents accidental full scans)
for key, value in db.scan_prefix(b"users/"):
print(f"{key.decode()}: {value.decode()}")
# Raises ValueError if prefix < 2 bytes
Unchecked Prefix Scan
# For internal operations needing empty/short prefixes
# WARNING: Can cause expensive full-database scans
for key, value in db.scan_prefix_unchecked(b""):
print(f"All keys: {key}")Batched Scanning (1000x Faster)
# Fetches 1000 results per FFI call instead of 1
# Performance: 10,000 results = 10 FFI calls vs 10,000 calls
for key, value in db.scan_batched(b"prefix/", batch_size=1000):
process(key, value)
Reverse Scan
# Scan in reverse order (newest first)
for key, value in db.scan_prefix(b"logs/", reverse=True):
print(key, value)
Range Scan
# Scan within a specific range
for key, value in db.scan_range(b"users/a", b"users/m"):
print(key, value) # All users from "a" to "m"
Streaming Large Results
# For very large result sets, use streaming to avoid memory issues
for batch in db.scan_stream(b"logs/", batch_size=10000):
for key, value in batch:
process(key, value)
# Memory is freed after processing each batch
8. SQL Operations
Execute SQL queries for familiar relational patterns.
Creating Tables
db.execute_sql("""
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE,
age INTEGER,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""")
db.execute_sql("""
CREATE TABLE posts (
id INTEGER PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
title TEXT NOT NULL,
content TEXT,
likes INTEGER DEFAULT 0
)
""")CRUD Operations
# Insert
db.execute_sql("""
INSERT INTO users (id, name, email, age)
VALUES (1, 'Alice', '[email protected]', 30)
""")
# Insert with parameters (prevents SQL injection)
db.execute_sql(
"INSERT INTO users (id, name, email, age) VALUES (?, ?, ?, ?)",
params=[2, "Bob", "[email protected]", 25]
)
# Select
result = db.execute_sql("SELECT * FROM users WHERE age > 25")
for row in result.rows:
print(row) # {'id': 1, 'name': 'Alice', ...}
# Update
db.execute_sql("UPDATE users SET email = '[email protected]' WHERE id = 1")
# Delete
db.execute_sql("DELETE FROM users WHERE id = 2")Upsert (Insert or Update)
# Insert or update on conflict
db.execute_sql("""
INSERT INTO users (id, name, email) VALUES (1, 'Alice', '[email protected]')
ON CONFLICT (id) DO UPDATE SET
name = excluded.name,
email = excluded.email
""")Query Results
from sochdb import SQLQueryResult
result = db.execute_sql("SELECT id, name FROM users")
print(f"Columns: {result.columns}") # ['id', 'name']
print(f"Row count: {len(result.rows)}")
print(f"Execution time: {result.execution_time_ms}ms")
for row in result.rows:
print(f"ID: {row['id']}, Name: {row['name']}")
# Convert to different formats
df = result.to_dataframe() # pandas DataFrame
json_data = result.to_json()
Index Management
# Create index
db.execute_sql("CREATE INDEX idx_users_email ON users(email)")
# Create unique index
db.execute_sql("CREATE UNIQUE INDEX idx_users_email ON users(email)")
# Drop index
db.execute_sql("DROP INDEX IF EXISTS idx_users_email")
# List indexes
indexes = db.list_indexes("users")Prepared Statements
# Prepare once, execute many times
stmt = db.prepare("SELECT * FROM users WHERE age > ? AND status = ?")
# Execute with different parameters
young_active = stmt.execute([25, "active"])
old_active = stmt.execute([50, "active"])
# Close when done
stmt.close()
Dialect Support
SochDB auto-detects SQL dialects:
# PostgreSQL style
db.execute_sql("INSERT INTO users VALUES (1, 'Alice') ON CONFLICT DO NOTHING")
# MySQL style
db.execute_sql("INSERT IGNORE INTO users VALUES (1, 'Alice')")
# SQLite style
db.execute_sql("INSERT OR IGNORE INTO users VALUES (1, 'Alice')")9. Table Management & Index Policies
Table Information
# Get table schema
schema = db.get_table_schema("users")
print(f"Columns: {schema.columns}")
print(f"Primary key: {schema.primary_key}")
print(f"Indexes: {schema.indexes}")
# List all tables
tables = db.list_tables()
# Drop table
db.execute_sql("DROP TABLE IF EXISTS old_table")Index Policies
Configure per-table indexing strategies for optimal performance:
# Policy constants
Database.INDEX_WRITE_OPTIMIZED # 0 - O(1) insert, O(N) scan
Database.INDEX_BALANCED # 1 - O(1) amortized insert, O(log K) scan
Database.INDEX_SCAN_OPTIMIZED # 2 - O(log N) insert, O(log N + K) scan
Database.INDEX_APPEND_ONLY # 3 - O(1) insert, O(N) scan (time-series)
# Set by constant
db.set_table_index_policy("logs", Database.INDEX_APPEND_ONLY)
# Set by string
db.set_table_index_policy("users", "scan_optimized")
# Get current policy
policy = db.get_table_index_policy("users")
print(f"Policy: {policy}") # "scan_optimized"Policy Selection Guide
| Policy | Insert | Scan | Best For |
|--------|--------|------|----------|
| write_optimized | O(1) | O(N) | High-write ingestion |
| balanced | O(1) amortized | O(log K) | General use (default) |
| scan_optimized | O(log N) | O(log N + K) | Analytics, read-heavy |
| append_only | O(1) | O(N) | Time-series, logs |
10. Namespaces & Multi-Tenancy
Organize data into logical namespaces for tenant isolation.
Creating Namespaces
from sochdb import NamespaceConfig
# Create namespace with metadata
ns = db.create_namespace(
name="tenant_123",
display_name="Acme Corp",
labels={"tier": "premium", "region": "us-east"}
)
# Simple creation
ns = db.create_namespace("tenant_456")Getting Namespaces
# Get existing namespace
ns = db.namespace("tenant_123")
# Get or create (idempotent)
ns = db.get_or_create_namespace("tenant_123")
# Check if exists
exists = db.namespace_exists("tenant_123")Context Manager for Scoped Operations
with db.use_namespace("tenant_123") as ns:
# All operations automatically scoped to tenant_123
collection = ns.collection("documents")
ns.put("config/key", b"value")
# No need to specify namespace in each call
Namespace Operations
# List all namespaces
namespaces = db.list_namespaces()
print(namespaces) # ['tenant_123', 'tenant_456']
# Get namespace info
info = db.namespace_info("tenant_123")
print(f"Created: {info['created_at']}")
print(f"Labels: {info['labels']}")
print(f"Size: {info['size_bytes']}")
# Update labels
db.update_namespace("tenant_123", labels={"tier": "enterprise"})
# Delete namespace (WARNING: deletes all data in namespace)
db.delete_namespace("old_tenant", force=True)Namespace-Scoped Key-Value
ns = db.namespace("tenant_123")
# Operations automatically prefixed with namespace
ns.put("users/alice", b"data") # Actually: tenant_123/users/alice
ns.get("users/alice")
ns.delete("users/alice")
# Scan within namespace
for key, value in ns.scan("users/"):
print(key, value) # Keys shown without namespace prefix
Cross-Namespace Operations
# Copy data between namespaces
db.copy_between_namespaces(
source_ns="tenant_123",
target_ns="tenant_456",
prefix="shared/"
)
11. Collections & Vector Search
Collections store documents with embeddings for semantic search using HNSW.
Collection Configuration
from sochdb import (
CollectionConfig,
DistanceMetric,
QuantizationType,
)
config = CollectionConfig(
name="documents",
dimension=384, # Embedding dimension (must match your model)
metric=DistanceMetric.COSINE, # COSINE, EUCLIDEAN, DOT_PRODUCT
m=16, # HNSW M parameter (connections per node)
ef_construction=100, # HNSW construction quality
ef_search=50, # HNSW search quality (higher = slower but better)
quantization=QuantizationType.NONE, # NONE, SCALAR (int8), PQ (product quantization)
enable_hybrid_search=False, # Enable BM25 + vector
content_field=None, # Field for BM25 indexing
)
Creating Collections
ns = db.namespace("default")
# With config object
collection = ns.create_collection(config)
# With parameters (simpler)
collection = ns.create_collection(
name="documents",
dimension=384,
metric=DistanceMetric.COSINE
)
# Get existing collection
collection = ns.collection("documents")Inserting Documents
# Single insert
collection.insert(
id="doc1",
vector=[0.1, 0.2, ...], # 384-dim float array
metadata={"title": "Introduction", "author": "Alice", "category": "tech"}
)
# Batch insert (more efficient for bulk loading)
collection.insert_batch(
ids=["doc1", "doc2", "doc3"],
vectors=[[...], [...], [...]], # List of vectors
metadata=[
{"title": "Doc 1"},
{"title": "Doc 2"},
{"title": "Doc 3"}
]
)
# Multi-vector insert (multiple vectors per document, e.g., chunks)
collection.insert_multi(
id="long_doc",
vectors=[[...], [...], [...]], # Multiple vectors for same doc
metadata={"title": "Long Document"}
)
Vector Search
from sochdb import SearchRequest
# Using SearchRequest (full control)
request = SearchRequest(
vector=[0.15, 0.25, ...], # Query vector
k=10, # Number of results
ef_search=100, # Search quality (overrides collection default)
filter={"author": "Alice"}, # Metadata filter
min_score=0.7, # Minimum similarity score
include_vectors=False, # Include vectors in results
include_metadata=True, # Include metadata in results
)
results = collection.search(request)
# Convenience method (simpler)
results = collection.vector_search(
vector=[0.15, 0.25, ...],
k=10,
filter={"author": "Alice"}
)
# Process results
for result in results:
print(f"ID: {result.id}")
print(f"Score: {result.score:.4f}") # Similarity score
print(f"Metadata: {result.metadata}")Metadata Filtering
# Equality
filter={"author": "Alice"}
# Comparison operators
filter={"age": {"$gt": 30}} # Greater than
filter={"age": {"$gte": 30}} # Greater than or equal
filter={"age": {"$lt": 30}} # Less than
filter={"age": {"$lte": 30}} # Less than or equal
filter={"author": {"$ne": "Alice"}} # Not equal
# Array operators
filter={"category": {"$in": ["tech", "science"]}} # In array
filter={"category": {"$nin": ["sports"]}} # Not in array
# Logical operators
filter={"$and": [{"author": "Alice"}, {"year": 2024}]}
filter={"$or": [{"category": "tech"}, {"category": "science"}]}
filter={"$not": {"author": "Bob"}}
# Nested filters
filter={
"$and": [
{"$or": [{"category": "tech"}, {"category": "science"}]},
{"year": {"$gte": 2020}}
]
}
Collection Management
# Get collection
collection = ns.get_collection("documents")
# or
collection = ns.collection("documents")
# List collections
collections = ns.list_collections()
# Collection info
info = collection.info()
print(f"Name: {info['name']}")
print(f"Dimension: {info['dimension']}")
print(f"Count: {info['count']}")
print(f"Metric: {info['metric']}")
print(f"Index size: {info['index_size_bytes']}")
# Delete collection
ns.delete_collection("old_collection")
# Individual document operations
doc = collection.get("doc1")
collection.delete("doc1")
collection.update("doc1", metadata={"category": "updated"})
count = collection.count()
Quantization for Memory Efficiency
# Scalar quantization (int8) - 4x memory reduction
config = CollectionConfig(
name="documents",
dimension=384,
quantization=QuantizationType.SCALAR
)
# Product quantization - 32x memory reduction
config = CollectionConfig(
name="documents",
dimension=768,
quantization=QuantizationType.PQ,
pq_num_subvectors=96, # 768/96 = 8 dimensions per subvector
pq_num_centroids=256 # 8-bit codes
)
12. Hybrid Search (Vector + BM25)
Combine vector similarity with keyword matching for best results.
Enable Hybrid Search
config = CollectionConfig(
name="articles",
dimension=384,
enable_hybrid_search=True, # Enable BM25 indexing
content_field="text" # Field to index for BM25
)
collection = ns.create_collection(config)
# Insert with text content
collection.insert(
id="article1",
vector=[...],
metadata={
"title": "Machine Learning Tutorial",
"text": "This tutorial covers the basics of machine learning...",
"category": "tech"
}
)
Keyword Search (BM25 Only)
results = collection.keyword_search(
query="machine learning tutorial",
k=10,
filter={"category": "tech"}
)
Hybrid Search (Vector + BM25)
# Combine vector and keyword search
results = collection.hybrid_search(
vector=[0.1, 0.2, ...], # Query embedding
text_query="machine learning", # Keyword query
k=10,
alpha=0.7, # 0.0 = pure keyword, 1.0 = pure vector, 0.5 = balanced
filter={"category": "tech"}
)
Full SearchRequest for Hybrid
request = SearchRequest(
vector=[0.1, 0.2, ...],
text_query="machine learning",
k=10,
alpha=0.7, # Blend factor
rrf_k=60.0, # RRF k parameter (Reciprocal Rank Fusion)
filter={"category": "tech"},
aggregate="max", # max | mean | first (for multi-vector docs)
as_of="2024-01-01T00:00:00Z", # Time-travel query
include_vectors=False,
include_metadata=True,
include_scores=True,
)
results = collection.search(request)
# Access detailed results
print(f"Query time: {results.query_time_ms}ms")
print(f"Total matches: {results.total_count}")
print(f"Vector results: {results.vector_results}") # Results from vector search
print(f"Keyword results: {results.keyword_results}") # Results from BM25
print(f"Fused results: {results.fused_results}") # Combined results13. Graph Operations
Build and query knowledge graphs.
Adding Nodes
# Add a node
db.add_node(
namespace="default",
node_id="alice",
node_type="person",
properties={"role": "engineer", "team": "ml", "level": "senior"}
)
db.add_node("default", "project_x", "project", {"status": "active", "priority": "high"})
db.add_node("default", "bob", "person", {"role": "manager", "team": "ml"})Adding Edges
# Add directed edge
db.add_edge(
namespace="default",
from_id="alice",
edge_type="works_on",
to_id="project_x",
properties={"role": "lead", "since": "2024-01"}
)
db.add_edge("default", "alice", "reports_to", "bob")
db.add_edge("default", "bob", "manages", "project_x")Graph Traversal
# BFS traversal from a starting node
nodes, edges = db.traverse(
namespace="default",
start_node="alice",
max_depth=3,
order="bfs" # "bfs" or "dfs"
)
for node in nodes:
print(f"Node: {node['id']} ({node['node_type']})")
print(f" Properties: {node['properties']}")
for edge in edges:
print(f"{edge['from_id']} --{edge['edge_type']}--> {edge['to_id']}")Filtered Traversal
# Traverse with filters
nodes, edges = db.traverse(
namespace="default",
start_node="alice",
max_depth=2,
edge_types=["works_on", "reports_to"], # Only follow these edge types
node_types=["person", "project"], # Only include these node types
node_filter={"team": "ml"} # Filter nodes by properties
)
Graph Queries
# Find shortest path
path = db.find_path(
namespace="default",
from_id="alice",
to_id="project_y",
max_depth=5
)
# Get neighbors
neighbors = db.get_neighbors(
namespace="default",
node_id="alice",
direction="outgoing" # "outgoing", "incoming", "both"
)
# Get specific edge
edge = db.get_edge("default", "alice", "works_on", "project_x")
# Delete node (and all connected edges)
db.delete_node("default", "old_node")
# Delete edge
db.delete_edge("default", "alice", "works_on", "project_old")14. Temporal Graph (Time-Travel)
Track state changes over time with temporal edges.
Adding Temporal Edges
import time
now = int(time.time() * 1000) # milliseconds since epoch
one_hour = 60 * 60 * 1000
# Record: Door was open from 10:00 to 11:00
db.add_temporal_edge(
namespace="smart_home",
from_id="door_front",
edge_type="STATE",
to_id="open",
valid_from=now - one_hour, # Start time (ms)
valid_until=now, # End time (ms)
properties={"sensor": "motion_1", "confidence": 0.95}
)
# Record: Light is currently on (no end time yet)
db.add_temporal_edge(
namespace="smart_home",
from_id="light_living",
edge_type="STATE",
to_id="on",
valid_from=now,
valid_until=0, # 0 = still valid (no end time)
properties={"brightness": "80%", "color": "warm"}
)
Time-Travel Queries
# Query modes:
# - "CURRENT": Edges valid right now
# - "POINT_IN_TIME": Edges valid at specific timestamp
# - "RANGE": All edges within a time range
# What is the current state?
edges = db.query_temporal_graph(
namespace="smart_home",
node_id="door_front",
mode="CURRENT",
edge_type="STATE"
)
current_state = edges[0]["to_id"] if edges else "unknown"
# Was the door open 1.5 hours ago?
edges = db.query_temporal_graph(
namespace="smart_home",
node_id="door_front",
mode="POINT_IN_TIME",
timestamp=now - int(1.5 * 60 * 60 * 1000)
)
was_open = any(e["to_id"] == "open" for e in edges)
# All state changes in last hour
edges = db.query_temporal_graph(
namespace="smart_home",
node_id="door_front",
mode="RANGE",
start_time=now - one_hour,
end_time=now
)
for edge in edges:
print(f"State: {edge['to_id']} from {edge['valid_from']} to {edge['valid_until']}")End a Temporal Edge
# Close the current "on" state
db.end_temporal_edge(
namespace="smart_home",
from_id="light_living",
edge_type="STATE",
to_id="on",
end_time=int(time.time() * 1000)
)
15. Semantic Cache
Cache LLM responses with similarity-based retrieval for cost savings.
Storing Cached Responses
# Store response with embedding
db.cache_put(
cache_name="llm_responses",
key="What is Python?", # Original query (for display/debugging)
value="Python is a high-level programming language...",
embedding=[0.1, 0.2, ...], # Query embedding (384-dim)
ttl_seconds=3600, # Expire in 1 hour (0 = no expiry)
metadata={"model": "claude-3", "tokens": 150}
)
Cache Lookup
# Check cache before calling LLM
cached = db.cache_get(
cache_name="llm_responses",
query_embedding=[0.12, 0.18, ...], # Embed the new query
threshold=0.85 # Cosine similarity threshold
)
if cached:
print(f"Cache HIT!")
print(f"Original query: {cached['key']}")
print(f"Response: {cached['value']}")
print(f"Similarity: {cached['score']:.4f}")
else:
print("Cache MISS - calling LLM...")
# Call LLM and cache the result
Cache Management
# Delete specific entry
db.cache_delete("llm_responses", key="What is Python?")
# Clear entire cache
db.cache_clear("llm_responses")
# Get cache statistics
stats = db.cache_stats("llm_responses")
print(f"Total entries: {stats['count']}")
print(f"Hit rate: {stats['hit_rate']:.2%}")
print(f"Memory usage: {stats['size_bytes']}")Full Usage Pattern
def get_llm_response(query: str, embed_fn, llm_fn):
"""Get response from cache or LLM."""
query_embedding = embed_fn(query)
# Try cache first
cached = db.cache_get(
cache_name="llm_responses",
query_embedding=query_embedding,
threshold=0.90
)
if cached:
return cached['value']
# Cache miss - call LLM
response = llm_fn(query)
# Store in cache
db.cache_put(
cache_name="llm_responses",
key=query,
value=response,
embedding=query_embedding,
ttl_seconds=86400 # 24 hours
)
return response
16. Context Query Builder (LLM Optimization)
Assemble LLM context with token budgeting and priority-based truncation.
Basic Context Query
from sochdb import ContextQueryBuilder, ContextFormat, TruncationStrategy
# Build context for LLM
context = ContextQueryBuilder() \
.for_session("session_123") \
.with_budget(4096) \
.format(ContextFormat.TOON) \
.literal("SYSTEM", priority=0, text="You are a helpful assistant.") \
.section("USER_PROFILE", priority=1) \
.get("user.profile.{name, preferences}") \
.done() \
.section("HISTORY", priority=2) \
.last(10, "messages") \
.where_eq("session_id", "session_123") \
.done() \
.section("KNOWLEDGE", priority=3) \
.search("documents", "$query_embedding", k=5) \
.done() \
.execute()
print(f"Token count: {context.token_count}")
print(f"Context:\n{context.text}")Section Types
| Type | Method | Description |
|------|--------|-------------|
| literal | .literal(name, priority, text) | Static text content |
| get | .get(path) | Fetch specific data by path |
| last | .last(n, table) | Most recent N records from table |
| search | .search(collection, embedding, k) | Vector similarity search |
| sql | .sql(query) | SQL query results |
Truncation Strategies
# Drop from end (keep beginning) - default
.truncation(TruncationStrategy.TAIL_DROP)
# Drop from beginning (keep end)
.truncation(TruncationStrategy.HEAD_DROP)
# Proportionally truncate across sections
.truncation(TruncationStrategy.PROPORTIONAL)
# Fail if budget exceeded
.truncation(TruncationStrategy.STRICT)
Variables and Bindings
from sochdb import ContextValue
context = ContextQueryBuilder() \
.for_session("session_123") \
.set_var("query_embedding", ContextValue.Embedding([0.1, 0.2, ...])) \
.set_var("user_id", ContextValue.String("user_456")) \
.section("KNOWLEDGE", priority=2) \
.search("documents", "$query_embedding", k=5) \
.done() \
.execute()
Output Formats
# TOON format (40-60% fewer tokens)
.format(ContextFormat.TOON)
# JSON format
.format(ContextFormat.JSON)
# Markdown format (human-readable)
.format(ContextFormat.MARKDOWN)
# Plain text
.format(ContextFormat.TEXT)
Session Management (Agent Context)
Stateful session management for agentic use cases with permissions, sandboxing, audit logging, and budget tracking.
Session Overview
Agent session abc123:
cwd: /agents/abc123
vars: $model = "gpt-4", $budget = 1000
permissions: fs:rw, db:rw, calc:*
audit: [read /data/users, write /agents/abc123/cache]
Creating Sessions
from sochdb import SessionManager, AgentContext
from datetime import timedelta
# Create session manager with idle timeout
session_mgr = SessionManager(idle_timeout=timedelta(hours=1))
# Create a new session
session = session_mgr.create_session("session_abc123")
# Get existing session
session = session_mgr.get_session("session_abc123")
# Get or create (idempotent)
session = session_mgr.get_or_create("session_abc123")
# Remove session
session_mgr.remove_session("session_abc123")
# Cleanup expired sessions
removed_count = session_mgr.cleanup_expired()
# Get active session count
count = session_mgr.session_count()
Agent Context
from sochdb import AgentContext, ContextValue
# Create agent context
ctx = AgentContext("session_abc123")
print(f"Session ID: {ctx.session_id}")
print(f"Working dir: {ctx.working_dir}") # /agents/session_abc123
# Create with custom working directory
ctx = AgentContext.with_working_dir("session_abc123", "/custom/path")
# Create with full permissions (trusted agents)
ctx = AgentContext.with_full_permissions("session_abc123")Session Variables
# Set variables
ctx.set_var("model", ContextValue.String("gpt-4"))
ctx.set_var("budget", ContextValue.Number(1000.0))
ctx.set_var("debug", ContextValue.Bool(True))
ctx.set_var("tags", ContextValue.List([
ContextValue.String("ml"),
ContextValue.String("production")
]))
# Get variables
model = ctx.get_var("model") # Returns ContextValue or None
budget = ctx.get_var("budget")
# Peek (read-only, no audit)
value = ctx.peek_var("model")
# Variable substitution in strings
text = ctx.substitute_vars("Using $model with budget $budget")
# Result: "Using gpt-4 with budget 1000"Context Value Types
from sochdb import ContextValue
# String
ContextValue.String("hello")
# Number (float)
ContextValue.Number(42.5)
# Boolean
ContextValue.Bool(True)
# List
ContextValue.List([
ContextValue.String("a"),
ContextValue.Number(1.0)
])
# Object (dict)
ContextValue.Object({
"key": ContextValue.String("value"),
"count": ContextValue.Number(10.0)
})
# Null
ContextValue.Null()
Permissions
from sochdb import (
AgentPermissions,
FsPermissions,
DbPermissions,
NetworkPermissions
)
# Configure permissions
ctx.permissions = AgentPermissions(
filesystem=FsPermissions(
read=True,
write=True,
mkdir=True,
delete=False,
allowed_paths=["/agents/session_abc123", "/shared/data"]
),
database=DbPermissions(
read=True,
write=True,
create=False,
drop=False,
allowed_tables=["user_*", "cache_*"] # Pattern matching
),
calculator=True,
network=NetworkPermissions(
http=True,
allowed_domains=["api.example.com", "*.internal.net"]
)
)
# Check permissions before operations
try:
ctx.check_fs_permission("/agents/session_abc123/data.json", AuditOperation.FS_READ)
# Permission granted
except ContextError as e:
print(f"Permission denied: {e}")
try:
ctx.check_db_permission("user_profiles", AuditOperation.DB_QUERY)
# Permission granted
except ContextError as e:
print(f"Permission denied: {e}")Budget Tracking
from sochdb import OperationBudget
# Configure budget limits
ctx.budget = OperationBudget(
max_tokens=100000, # Maximum tokens (input + output)
max_cost=5000, # Maximum cost in millicents ($50.00)
max_operations=10000 # Maximum operation count
)
# Consume budget (called automatically by operations)
try:
ctx.consume_budget(tokens=500, cost=10) # 500 tokens, $0.10
except ContextError as e:
if "Budget exceeded" in str(e):
print("Budget limit reached!")
# Check budget status
print(f"Tokens used: {ctx.budget.tokens_used}/{ctx.budget.max_tokens}")
print(f"Cost used: ${ctx.budget.cost_used / 100:.2f}/${ctx.budget.max_cost / 100:.2f}")
print(f"Operations: {ctx.budget.operations_used}/{ctx.budget.max_operations}")Session Transactions
# Begin transaction within session
ctx.begin_transaction(tx_id=12345)
# Create savepoint
ctx.savepoint("before_update")
# Record pending writes (for rollback)
ctx.record_pending_write(
resource_type=ResourceType.FILE,
resource_key="/agents/session_abc123/data.json",
original_value=b'{"old": "data"}'
)
# Commit transaction
ctx.commit_transaction()
# Or rollback
pending_writes = ctx.rollback_transaction()
for write in pending_writes:
print(f"Rolling back: {write.resource_key}")
# Restore original_value
Path Resolution
# Paths are resolved relative to working directory
ctx = AgentContext.with_working_dir("session_abc123", "/home/agent")
# Relative paths
resolved = ctx.resolve_path("data.json") # /home/agent/data.json
# Absolute paths pass through
resolved = ctx.resolve_path("/absolute/path") # /absolute/pathAudit Trail
# All operations are automatically logged
# Audit entry includes: timestamp, operation, resource, result, metadata
# Export audit log
audit_log = ctx.export_audit()
for entry in audit_log:
print(f"[{entry['timestamp']}] {entry['operation']}: {entry['resource']} -> {entry['result']}")
# Example output:
# [1705312345] var.set: model -> success
# [1705312346] fs.read: /data/config.json -> success
# [1705312347] db.query: users -> success
# [1705312348] fs.write: /forbidden/file -> denied:path not in allowed paths
Audit Operations
from sochdb import AuditOperation
# Filesystem operations
AuditOperation.FS_READ
AuditOperation.FS_WRITE
AuditOperation.FS_MKDIR
AuditOperation.FS_DELETE
AuditOperation.FS_LIST
# Database operations
AuditOperation.DB_QUERY
AuditOperation.DB_INSERT
AuditOperation.DB_UPDATE
AuditOperation.DB_DELETE
# Other operations
AuditOperation.CALCULATE
AuditOperation.VAR_SET
AuditOperation.VAR_GET
AuditOperation.TX_BEGIN
AuditOperation.TX_COMMIT
AuditOperation.TX_ROLLBACK
Tool Registry
from sochdb import ToolDefinition, ToolCallRecord
from datetime import datetime
# Register tools available to the agent
ctx.register_tool(ToolDefinition(
name="search_documents",
description="Search documents by semantic similarity",
parameters_schema='{"type": "object", "properties": {"query": {"type": "string"}}}',
requires_confirmation=False
))
ctx.register_tool(ToolDefinition(
name="delete_file",
description="Delete a file from the filesystem",
parameters_schema='{"type": "object", "properties": {"path": {"type": "string"}}}',
requires_confirmation=True # Requires user confirmation
))
# Record tool calls
ctx.record_tool_call(ToolCallRecord(
call_id="call_001",
tool_name="search_documents",
arguments='{"query": "machine learning"}',
result='[{"id": "doc1", "score": 0.95}]',
error=None,
timestamp=datetime.now()
))
# Access tool call history
for call in ctx.tool_calls:
print(f"{call.tool_name}: {call.result or call.error}")Session Lifecycle
# Check session age
age = ctx.age()
print(f"Session age: {age}")
# Check idle time
idle = ctx.idle_time()
print(f"Idle time: {idle}")
# Check if expired
if ctx.is_expired(idle_timeout=timedelta(hours=1)):
print("Session has expired!")Complete Session Example
from sochdb import (
SessionManager, AgentContext, ContextValue,
AgentPermissions, FsPermissions, DbPermissions,
OperationBudget, ToolDefinition, AuditOperation
)
from datetime import timedelta
# Initialize session manager
session_mgr = SessionManager(idle_timeout=timedelta(hours=2))
# Create session for an agent
session_id = "agent_session_12345"
ctx = session_mgr.get_or_create(session_id)
# Configure the agent
ctx.permissions = AgentPermissions(
filesystem=FsPermissions(
read=True,
write=True,
allowed_paths=[f"/agents/{session_id}", "/shared"]
),
database=DbPermissions(
read=True,
write=True,
allowed_tables=["documents", "cache_*"]
),
calculator=True
)
ctx.budget = OperationBudget(
max_tokens=50000,
max_cost=1000, # $10.00
max_operations=1000
)
# Set initial variables
ctx.set_var("model", ContextValue.String("claude-3-sonnet"))
ctx.set_var("temperature", ContextValue.Number(0.7))
# Register available tools
ctx.register_tool(ToolDefinition(
name="vector_search",
description="Search vectors by similarity",
parameters_schema='{"type": "object", "properties": {"query": {"type": "string"}, "k": {"type": "integer"}}}',
requires_confirmation=False
))
# Perform operations with permission checks
def safe