
@sushanth/toondb v0.3.6

ToonDB is an AI-native database with token-optimized output, O(|path|) lookups, built-in vector search, and durable transactions.

ToonDB Node.js SDK v0.3.6

Dual-mode architecture: Embedded (FFI) + Server (gRPC/IPC)
Choose the deployment mode that fits your needs.

Architecture: Flexible Deployment

┌─────────────────────────────────────────────────────────────┐
│                    DEPLOYMENT OPTIONS                        │
├─────────────────────────────────────────────────────────────┤
│                                                               │
│  1. EMBEDDED MODE (FFI)          2. SERVER MODE (gRPC)      │
│  ┌─────────────────────┐         ┌─────────────────────┐   │
│  │   Node.js App        │         │   Node.js App        │   │
│  │   ├─ Database.open()│         │   ├─ ToonDBClient() │   │
│  │   └─ Direct FFI     │         │   └─ gRPC calls     │   │
│  │         │           │         │         │           │   │
│  │         ▼           │         │         ▼           │   │
│  │   libtoondb_storage │         │   toondb-grpc       │   │
│  │   (Rust native)     │         │   (Rust server)     │   │
│  └─────────────────────┘         └─────────────────────┘   │
│                                                               │
│  ✅ No server needed               ✅ Multi-language          │
│  ✅ Local files                    ✅ Centralized logic      │
│  ✅ Simple deployment              ✅ Production scale       │
└─────────────────────────────────────────────────────────────┘

When to Use Each Mode

Embedded Mode (FFI):

  • ✅ Local development and testing
  • ✅ Jupyter notebooks and data science
  • ✅ Single-process applications
  • ✅ Edge deployments without network
  • ✅ No server setup required

Server Mode (gRPC):

  • ✅ Production deployments
  • ✅ Multi-language teams (Python, Node.js, Go)
  • ✅ Distributed systems
  • ✅ Centralized business logic
  • ✅ Horizontal scaling
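
If both modes are available, a minimal sketch of choosing one at startup (Database and ToonDBClient are the documented entry points; the TOONDB_SERVER environment variable is an assumption for illustration):

import os
from toondb import Database, ToonDBClient

def open_store():
    # Hypothetical helper: use server mode when an address is configured,
    # otherwise fall back to the embedded, in-process engine.
    addr = os.environ.get("TOONDB_SERVER")  # e.g. "localhost:50051"
    if addr:
        return ToonDBClient(addr)           # server mode (gRPC)
    return Database.open("./my_database")   # embedded mode (FFI)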

Installation

npm install @sushanth/toondb

Or from source:

cd toondb-typescript-sdk
npm install

ToonDB Node.js SDK Documentation

Version 0.3.6 | LLM-Optimized Embedded Database with Native Vector Search


Table of Contents

  1. Quick Start
  2. Installation
  3. Architecture Overview
  4. Core Key-Value Operations
  5. Transactions (ACID with SSI)
  6. Query Builder
  7. Prefix Scanning
  8. SQL Operations
  9. Table Management & Index Policies
  10. Namespaces & Multi-Tenancy
  11. Collections & Vector Search
  12. Hybrid Search (Vector + BM25)
  13. Graph Operations
  14. Temporal Graph (Time-Travel)
  15. Semantic Cache
  16. Context Query Builder (LLM Optimization) and Session
  17. Atomic Multi-Index Writes
  18. Recovery & WAL Management
  19. Checkpoints & Snapshots
  20. Compression & Storage
  21. Statistics & Monitoring
  22. Distributed Tracing
  23. Workflow & Run Tracking
  24. Server Mode (gRPC Client)
  25. IPC Client (Unix Sockets)
  26. Standalone VectorIndex
  27. Vector Utilities
  28. Data Formats (TOON/JSON/Columnar)
  29. Policy Service
  30. MCP (Model Context Protocol)
  31. Configuration Reference
  32. Error Handling
  33. Async Support
  34. Building & Development
  35. Complete Examples
  36. Migration Guide

1. Quick Start

from toondb import Database

# Open (or create) a database
db = Database.open("./my_database")

# Store and retrieve data
db.put(b"hello", b"world")
value = db.get(b"hello")  # b"world"

# Use transactions for atomic operations
with db.transaction() as txn:
    txn.put(b"key1", b"value1")
    txn.put(b"key2", b"value2")
    # Auto-commits on success, auto-rollbacks on exception

# Clean up
db.delete(b"hello")
db.close()

30-Second Overview:

  • Key-Value: Fast reads/writes with get/put/delete
  • Transactions: ACID with SSI isolation
  • Vector Search: HNSW-based semantic search
  • Hybrid Search: Combine vectors with BM25 keyword search
  • Graph: Build and traverse knowledge graphs
  • LLM-Optimized: TOON format uses 40-60% fewer tokens than JSON

2. Installation

npm install @sushanth/toondb

Platform Support:

| Platform | Architecture    | Status          |
|----------|-----------------|-----------------|
| Linux    | x86_64, aarch64 | ✅ Full support |
| macOS    | x86_64, arm64   | ✅ Full support |
| Windows  | x86_64          | ✅ Full support |

Optional Dependencies:

# For async support
npm install @sushanth/toondb[async]

# For server mode
npm install @sushanth/toondb[grpc]

# Everything
npm install @sushanth/toondb[all]

3. Architecture Overview

ToonDB supports two deployment modes:

Embedded Mode (Default)

Direct Rust bindings via FFI. No server required.

from toondb import Database

with Database.open("./mydb") as db:
    db.put(b"key", b"value")
    value = db.get(b"key")

Best for: Local development, notebooks, single-process applications.

Server Mode (gRPC)

Thin client connecting to toondb-grpc server.

from toondb import ToonDBClient

client = ToonDBClient("localhost:50051")
client.put(b"key", b"value", namespace="default")
value = client.get(b"key", namespace="default")

Best for: Production, multi-process, distributed systems.

Feature Comparison

| Feature            | Embedded             | Server           |
|--------------------|----------------------|------------------|
| Setup              | npm install only     | Server + client  |
| Performance        | Fastest (in-process) | Network overhead |
| Multi-process      | ❌                   | ✅               |
| Horizontal scaling | ❌                   | ✅               |
| Vector search      | ✅                   | ✅               |
| Graph operations   | ✅                   | ✅               |
| Semantic cache     | ✅                   | ✅               |
| Context service    | Limited              | ✅ Full          |
| MCP integration    | ❌                   | ✅               |


4. Core Key-Value Operations

All keys and values are bytes.
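
Because the engine only sees bytes, structured values need explicit serialization; a minimal sketch using JSON (the helper names are illustrative, not part of the SDK):

import json

def put_json(db, key: bytes, obj) -> None:
    # Serialize to UTF-8 JSON bytes before storing
    db.put(key, json.dumps(obj).encode("utf-8"))

def get_json(db, key: bytes):
    # Decode back to a Python object; None if the key is missing
    raw = db.get(key)
    return json.loads(raw) if raw is not None else None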

Basic Operations

from toondb import Database

db = Database.open("./my_db")

# Store data
db.put(b"user:1", b"Alice")
db.put(b"user:2", b"Bob")

# Retrieve data
user = db.get(b"user:1")  # Returns b"Alice" or None

# Check existence
exists = db.exists(b"user:1")  # True

# Delete data
db.delete(b"user:1")

db.close()

Path-Based Keys (Hierarchical)

Organize data hierarchically with path-based access:

# Store with path (strings auto-converted to bytes internally)
db.put_path("users/alice/name", b"Alice Smith")
db.put_path("users/alice/email", b"[email protected]")
db.put_path("users/bob/name", b"Bob Jones")

# Retrieve by path
name = db.get_path("users/alice/name")  # b"Alice Smith"

# Delete by path
db.delete_path("users/alice/email")

# List at path (like listing directory)
children = db.list_path("users/")  # ["alice", "bob"]

With TTL (Time-To-Live)

# Store with expiration (seconds)
db.put(b"session:abc123", b"user_data", ttl_seconds=3600)  # Expires in 1 hour

# TTL of 0 means no expiration
db.put(b"permanent_key", b"value", ttl_seconds=0)

Batch Operations

# Batch put (more efficient than individual puts)
db.put_batch([
    (b"key1", b"value1"),
    (b"key2", b"value2"),
    (b"key3", b"value3"),
])

# Batch get
values = db.get_batch([b"key1", b"key2", b"key3"])
# Returns: [b"value1", b"value2", b"value3"] (None for missing keys)

# Batch delete
db.delete_batch([b"key1", b"key2", b"key3"])

Context Manager

with Database.open("./my_db") as db:
    db.put(b"key", b"value")
    # Automatically closes when exiting

5. Transactions (ACID with SSI)

ToonDB provides full ACID transactions with Serializable Snapshot Isolation (SSI).

Context Manager Pattern (Recommended)

# Auto-commits on success, auto-rollbacks on exception
with db.transaction() as txn:
    txn.put(b"accounts/alice", b"1000")
    txn.put(b"accounts/bob", b"500")
    
    # Read within transaction sees your writes
    balance = txn.get(b"accounts/alice")  # b"1000"
    
    # If exception occurs, rolls back automatically

Closure Pattern (Rust-Style)

# Using with_transaction for automatic commit/rollback
def transfer_funds(txn):
    alice = int(txn.get(b"accounts/alice") or b"0")
    bob = int(txn.get(b"accounts/bob") or b"0")
    
    txn.put(b"accounts/alice", str(alice - 100).encode())
    txn.put(b"accounts/bob", str(bob + 100).encode())
    
    return "Transfer complete"

result = db.with_transaction(transfer_funds)

Manual Transaction Control

txn = db.begin_transaction()
try:
    txn.put(b"key1", b"value1")
    txn.put(b"key2", b"value2")
    
    commit_ts = txn.commit()  # Returns HLC timestamp
    print(f"Committed at: {commit_ts}")
except Exception as e:
    txn.abort()
    raise

Transaction Properties

txn = db.transaction()
print(f"Transaction ID: {txn.id}")      # Unique identifier
print(f"Start timestamp: {txn.start_ts}")  # HLC start time
print(f"Isolation: {txn.isolation}")    # "serializable"

SSI Conflict Handling

from toondb import TransactionConflictError

MAX_RETRIES = 3

for attempt in range(MAX_RETRIES):
    try:
        with db.transaction() as txn:
            # Read and modify
            value = int(txn.get(b"counter") or b"0")
            txn.put(b"counter", str(value + 1).encode())
        break  # Success
    except TransactionConflictError:
        if attempt == MAX_RETRIES - 1:
            raise
        # Retry on conflict
        continue

All Transaction Operations

with db.transaction() as txn:
    # Key-value
    txn.put(key, value)
    txn.get(key)
    txn.delete(key)
    txn.exists(key)
    
    # Path-based
    txn.put_path(path, value)
    txn.get_path(path)
    
    # Batch operations
    txn.put_batch(pairs)
    txn.get_batch(keys)
    
    # Scanning
    for k, v in txn.scan_prefix(b"prefix/"):
        print(k, v)
    
    # SQL (within transaction isolation)
    result = txn.execute("SELECT * FROM users WHERE id = 1")

Isolation Levels

from toondb import IsolationLevel

# Default: Serializable (strongest)
with db.transaction(isolation=IsolationLevel.SERIALIZABLE) as txn:
    pass

# Snapshot isolation (faster, allows some anomalies)
with db.transaction(isolation=IsolationLevel.SNAPSHOT) as txn:
    pass

# Read committed (fastest, least isolation)
with db.transaction(isolation=IsolationLevel.READ_COMMITTED) as txn:
    pass

6. Query Builder

Fluent API for building efficient queries with predicate pushdown: filters are evaluated inside the storage engine, so non-matching rows are never returned to the application.

Basic Query

# Query with prefix and limit
results = (db.query("users/")
    .limit(10)
    .execute())

for key, value in results:
    print(f"{key.decode()}: {value.decode()}")

Filtered Query

from toondb import CompareOp

# Query with filters
results = (db.query("orders/")
    .where("status", CompareOp.EQ, "pending")
    .where("amount", CompareOp.GT, 100)
    .order_by("created_at", descending=True)
    .limit(50)
    .offset(10)
    .execute())

Column Selection

# Select specific fields only
results = (db.query("users/")
    .select(["name", "email"])  # Only fetch these columns
    .where("active", CompareOp.EQ, True)
    .execute())

Aggregate Queries

# Count
count = (db.query("orders/")
    .where("status", CompareOp.EQ, "completed")
    .count())

# Sum (for numeric columns)
total = (db.query("orders/")
    .sum("amount"))

# Group by
results = (db.query("orders/")
    .select(["status", "COUNT(*)", "SUM(amount)"])
    .group_by("status")
    .execute())

Query in Transaction

with db.transaction() as txn:
    results = (txn.query("users/")
        .where("role", CompareOp.EQ, "admin")
        .execute())

7. Prefix Scanning

Iterate over keys with common prefixes efficiently.

Safe Prefix Scan (Recommended)

# Requires minimum 2-byte prefix (prevents accidental full scans)
for key, value in db.scan_prefix(b"users/"):
    print(f"{key.decode()}: {value.decode()}")

# Raises ValueError if prefix < 2 bytes

Unchecked Prefix Scan

# For internal operations needing empty/short prefixes
# WARNING: Can cause expensive full-database scans
for key, value in db.scan_prefix_unchecked(b""):
    print(f"All keys: {key}")

Batched Scanning (1000x Faster)

# Fetches 1000 results per FFI call instead of 1
# Performance: 10,000 results = 10 FFI calls vs 10,000 calls

for key, value in db.scan_batched(b"prefix/", batch_size=1000):
    process(key, value)

Reverse Scan

# Scan in reverse order (newest first)
for key, value in db.scan_prefix(b"logs/", reverse=True):
    print(key, value)

Range Scan

# Scan within a specific range
for key, value in db.scan_range(b"users/a", b"users/m"):
    print(key, value)  # All users from "a" to "m"

Streaming Large Results

# For very large result sets, use streaming to avoid memory issues
for batch in db.scan_stream(b"logs/", batch_size=10000):
    for key, value in batch:
        process(key, value)
    # Memory is freed after processing each batch

8. SQL Operations

Execute SQL queries for familiar relational patterns.

Creating Tables

db.execute_sql("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        email TEXT UNIQUE,
        age INTEGER,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP
    )
""")

db.execute_sql("""
    CREATE TABLE posts (
        id INTEGER PRIMARY KEY,
        user_id INTEGER REFERENCES users(id),
        title TEXT NOT NULL,
        content TEXT,
        likes INTEGER DEFAULT 0
    )
""")

CRUD Operations

# Insert
db.execute_sql("""
    INSERT INTO users (id, name, email, age) 
    VALUES (1, 'Alice', '[email protected]', 30)
""")

# Insert with parameters (prevents SQL injection)
db.execute_sql(
    "INSERT INTO users (id, name, email, age) VALUES (?, ?, ?, ?)",
    params=[2, "Bob", "[email protected]", 25]
)

# Select
result = db.execute_sql("SELECT * FROM users WHERE age > 25")
for row in result.rows:
    print(row)  # {'id': 1, 'name': 'Alice', ...}

# Update
db.execute_sql("UPDATE users SET email = '[email protected]' WHERE id = 1")

# Delete
db.execute_sql("DELETE FROM users WHERE id = 2")

Upsert (Insert or Update)

# Insert or update on conflict
db.execute_sql("""
    INSERT INTO users (id, name, email) VALUES (1, 'Alice', '[email protected]')
    ON CONFLICT (id) DO UPDATE SET 
        name = excluded.name,
        email = excluded.email
""")

Query Results

from toondb import SQLQueryResult

result = db.execute_sql("SELECT id, name FROM users")

print(f"Columns: {result.columns}")      # ['id', 'name']
print(f"Row count: {len(result.rows)}")
print(f"Execution time: {result.execution_time_ms}ms")

for row in result.rows:
    print(f"ID: {row['id']}, Name: {row['name']}")

# Convert to different formats
df = result.to_dataframe()  # pandas DataFrame
json_data = result.to_json()

Index Management

# Create index
db.execute_sql("CREATE INDEX idx_users_email ON users(email)")

# Create unique index (use a distinct name so it doesn't clash with the index above)
db.execute_sql("CREATE UNIQUE INDEX idx_users_email_unique ON users(email)")

# Drop index
db.execute_sql("DROP INDEX IF EXISTS idx_users_email")

# List indexes
indexes = db.list_indexes("users")

Prepared Statements

# Prepare once, execute many times
stmt = db.prepare("SELECT * FROM users WHERE age > ? AND status = ?")

# Execute with different parameters
young_active = stmt.execute([25, "active"])
old_active = stmt.execute([50, "active"])

# Close when done
stmt.close()

Dialect Support

ToonDB auto-detects SQL dialects:

# PostgreSQL style
db.execute_sql("INSERT INTO users VALUES (1, 'Alice') ON CONFLICT DO NOTHING")

# MySQL style
db.execute_sql("INSERT IGNORE INTO users VALUES (1, 'Alice')")

# SQLite style  
db.execute_sql("INSERT OR IGNORE INTO users VALUES (1, 'Alice')")

9. Table Management & Index Policies

Table Information

# Get table schema
schema = db.get_table_schema("users")
print(f"Columns: {schema.columns}")
print(f"Primary key: {schema.primary_key}")
print(f"Indexes: {schema.indexes}")

# List all tables
tables = db.list_tables()

# Drop table
db.execute_sql("DROP TABLE IF EXISTS old_table")

Index Policies

Configure per-table indexing strategies for optimal performance:

# Policy constants
Database.INDEX_WRITE_OPTIMIZED  # 0 - O(1) insert, O(N) scan
Database.INDEX_BALANCED         # 1 - O(1) amortized insert, O(log K) scan
Database.INDEX_SCAN_OPTIMIZED   # 2 - O(log N) insert, O(log N + K) scan
Database.INDEX_APPEND_ONLY      # 3 - O(1) insert, O(N) scan (time-series)

# Set by constant
db.set_table_index_policy("logs", Database.INDEX_APPEND_ONLY)

# Set by string
db.set_table_index_policy("users", "scan_optimized")

# Get current policy
policy = db.get_table_index_policy("users")
print(f"Policy: {policy}")  # "scan_optimized"

Policy Selection Guide

| Policy          | Insert         | Scan         | Best For              |
|-----------------|----------------|--------------|-----------------------|
| write_optimized | O(1)           | O(N)         | High-write ingestion  |
| balanced        | O(1) amortized | O(log K)     | General use (default) |
| scan_optimized  | O(log N)       | O(log N + K) | Analytics, read-heavy |
| append_only     | O(1)           | O(N)         | Time-series, logs     |


10. Namespaces & Multi-Tenancy

Organize data into logical namespaces for tenant isolation.

Creating Namespaces

from toondb import NamespaceConfig

# Create namespace with metadata
ns = db.create_namespace(
    name="tenant_123",
    display_name="Acme Corp",
    labels={"tier": "premium", "region": "us-east"}
)

# Simple creation
ns = db.create_namespace("tenant_456")

Getting Namespaces

# Get existing namespace
ns = db.namespace("tenant_123")

# Get or create (idempotent)
ns = db.get_or_create_namespace("tenant_123")

# Check if exists
exists = db.namespace_exists("tenant_123")

Context Manager for Scoped Operations

with db.use_namespace("tenant_123") as ns:
    # All operations automatically scoped to tenant_123
    collection = ns.collection("documents")
    ns.put("config/key", b"value")
    
    # No need to specify namespace in each call

Namespace Operations

# List all namespaces
namespaces = db.list_namespaces()
print(namespaces)  # ['tenant_123', 'tenant_456']

# Get namespace info
info = db.namespace_info("tenant_123")
print(f"Created: {info['created_at']}")
print(f"Labels: {info['labels']}")
print(f"Size: {info['size_bytes']}")

# Update labels
db.update_namespace("tenant_123", labels={"tier": "enterprise"})

# Delete namespace (WARNING: deletes all data in namespace)
db.delete_namespace("old_tenant", force=True)

Namespace-Scoped Key-Value

ns = db.namespace("tenant_123")

# Operations automatically prefixed with namespace
ns.put("users/alice", b"data")      # Actually: tenant_123/users/alice
ns.get("users/alice")
ns.delete("users/alice")

# Scan within namespace
for key, value in ns.scan("users/"):
    print(key, value)  # Keys shown without namespace prefix

Cross-Namespace Operations

# Copy data between namespaces
db.copy_between_namespaces(
    source_ns="tenant_123",
    target_ns="tenant_456",
    prefix="shared/"
)

11. Collections & Vector Search

Collections store documents with embeddings for semantic search using HNSW.

Collection Configuration

from toondb import (
    CollectionConfig,
    DistanceMetric,
    QuantizationType,
)

config = CollectionConfig(
    name="documents",
    dimension=384,                          # Embedding dimension (must match your model)
    metric=DistanceMetric.COSINE,           # COSINE, EUCLIDEAN, DOT_PRODUCT
    m=16,                                   # HNSW M parameter (connections per node)
    ef_construction=100,                    # HNSW construction quality
    ef_search=50,                           # HNSW search quality (higher = slower but better)
    quantization=QuantizationType.NONE,     # NONE, SCALAR (int8), PQ (product quantization)
    enable_hybrid_search=False,             # Enable BM25 + vector
    content_field=None,                     # Field for BM25 indexing
)

Creating Collections

ns = db.namespace("default")

# With config object
collection = ns.create_collection(config)

# With parameters (simpler)
collection = ns.create_collection(
    name="documents",
    dimension=384,
    metric=DistanceMetric.COSINE
)

# Get existing collection
collection = ns.collection("documents")

Inserting Documents

# Single insert
collection.insert(
    id="doc1",
    vector=[0.1, 0.2, ...],  # 384-dim float array
    metadata={"title": "Introduction", "author": "Alice", "category": "tech"}
)

# Batch insert (more efficient for bulk loading)
collection.insert_batch(
    ids=["doc1", "doc2", "doc3"],
    vectors=[[...], [...], [...]],  # List of vectors
    metadata=[
        {"title": "Doc 1"},
        {"title": "Doc 2"},
        {"title": "Doc 3"}
    ]
)

# Multi-vector insert (multiple vectors per document, e.g., chunks)
collection.insert_multi(
    id="long_doc",
    vectors=[[...], [...], [...]],  # Multiple vectors for same doc
    metadata={"title": "Long Document"}
)

Vector Search

from toondb import SearchRequest

# Using SearchRequest (full control)
request = SearchRequest(
    vector=[0.15, 0.25, ...],       # Query vector
    k=10,                           # Number of results
    ef_search=100,                  # Search quality (overrides collection default)
    filter={"author": "Alice"},     # Metadata filter
    min_score=0.7,                  # Minimum similarity score
    include_vectors=False,          # Include vectors in results
    include_metadata=True,          # Include metadata in results
)
results = collection.search(request)

# Convenience method (simpler)
results = collection.vector_search(
    vector=[0.15, 0.25, ...],
    k=10,
    filter={"author": "Alice"}
)

# Process results
for result in results:
    print(f"ID: {result.id}")
    print(f"Score: {result.score:.4f}")  # Similarity score
    print(f"Metadata: {result.metadata}")

Metadata Filtering

# Equality
filter={"author": "Alice"}

# Comparison operators
filter={"age": {"$gt": 30}}              # Greater than
filter={"age": {"$gte": 30}}             # Greater than or equal
filter={"age": {"$lt": 30}}              # Less than
filter={"age": {"$lte": 30}}             # Less than or equal
filter={"author": {"$ne": "Alice"}}      # Not equal

# Array operators
filter={"category": {"$in": ["tech", "science"]}}    # In array
filter={"category": {"$nin": ["sports"]}}            # Not in array

# Logical operators
filter={"$and": [{"author": "Alice"}, {"year": 2024}]}
filter={"$or": [{"category": "tech"}, {"category": "science"}]}
filter={"$not": {"author": "Bob"}}

# Nested filters
filter={
    "$and": [
        {"$or": [{"category": "tech"}, {"category": "science"}]},
        {"year": {"$gte": 2020}}
    ]
}

Collection Management

# Get collection
collection = ns.get_collection("documents")
# or
collection = ns.collection("documents")

# List collections
collections = ns.list_collections()

# Collection info
info = collection.info()
print(f"Name: {info['name']}")
print(f"Dimension: {info['dimension']}")
print(f"Count: {info['count']}")
print(f"Metric: {info['metric']}")
print(f"Index size: {info['index_size_bytes']}")

# Delete collection
ns.delete_collection("old_collection")

# Individual document operations
doc = collection.get("doc1")
collection.delete("doc1")
collection.update("doc1", metadata={"category": "updated"})
count = collection.count()

Quantization for Memory Efficiency

# Scalar quantization (int8) - 4x memory reduction
config = CollectionConfig(
    name="documents",
    dimension=384,
    quantization=QuantizationType.SCALAR
)

# Product quantization - 32x memory reduction
config = CollectionConfig(
    name="documents",
    dimension=768,
    quantization=QuantizationType.PQ,
    pq_num_subvectors=96,   # 768/96 = 8 dimensions per subvector
    pq_num_centroids=256    # 8-bit codes
)
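
For sizing intuition, a back-of-envelope calculation of the per-vector payload each setting implies (HNSW graph overhead excluded):

dim = 768
float32_bytes = dim * 4   # 3072 bytes with no quantization
scalar_bytes = dim * 1    # 768 bytes as int8 -> 4x reduction
pq_bytes = 96 * 1         # 96 one-byte codes (256 centroids = 8 bits) -> 32x reduction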

12. Hybrid Search (Vector + BM25)

Combine vector similarity with keyword matching for best results.

Enable Hybrid Search

config = CollectionConfig(
    name="articles",
    dimension=384,
    enable_hybrid_search=True,      # Enable BM25 indexing
    content_field="text"            # Field to index for BM25
)
collection = ns.create_collection(config)

# Insert with text content
collection.insert(
    id="article1",
    vector=[...],
    metadata={
        "title": "Machine Learning Tutorial",
        "text": "This tutorial covers the basics of machine learning...",
        "category": "tech"
    }
)

Keyword Search (BM25 Only)

results = collection.keyword_search(
    query="machine learning tutorial",
    k=10,
    filter={"category": "tech"}
)

Hybrid Search (Vector + BM25)

# Combine vector and keyword search
results = collection.hybrid_search(
    vector=[0.1, 0.2, ...],        # Query embedding
    text_query="machine learning",  # Keyword query
    k=10,
    alpha=0.7,  # 0.0 = pure keyword, 1.0 = pure vector, 0.5 = balanced
    filter={"category": "tech"}
)

Full SearchRequest for Hybrid

request = SearchRequest(
    vector=[0.1, 0.2, ...],
    text_query="machine learning",
    k=10,
    alpha=0.7,                      # Blend factor
    rrf_k=60.0,                     # RRF k parameter (Reciprocal Rank Fusion)
    filter={"category": "tech"},
    aggregate="max",                # max | mean | first (for multi-vector docs)
    as_of="2024-01-01T00:00:00Z",   # Time-travel query
    include_vectors=False,
    include_metadata=True,
    include_scores=True,
)
results = collection.search(request)

# Access detailed results
print(f"Query time: {results.query_time_ms}ms")
print(f"Total matches: {results.total_count}")
print(f"Vector results: {results.vector_results}")    # Results from vector search
print(f"Keyword results: {results.keyword_results}")  # Results from BM25
print(f"Fused results: {results.fused_results}")      # Combined results

13. Graph Operations

Build and query knowledge graphs.

Adding Nodes

# Add a node
db.add_node(
    namespace="default",
    node_id="alice",
    node_type="person",
    properties={"role": "engineer", "team": "ml", "level": "senior"}
)

db.add_node("default", "project_x", "project", {"status": "active", "priority": "high"})
db.add_node("default", "bob", "person", {"role": "manager", "team": "ml"})

Adding Edges

# Add directed edge
db.add_edge(
    namespace="default",
    from_id="alice",
    edge_type="works_on",
    to_id="project_x",
    properties={"role": "lead", "since": "2024-01"}
)

db.add_edge("default", "alice", "reports_to", "bob")
db.add_edge("default", "bob", "manages", "project_x")

Graph Traversal

# BFS traversal from a starting node
nodes, edges = db.traverse(
    namespace="default",
    start_node="alice",
    max_depth=3,
    order="bfs"  # "bfs" or "dfs"
)

for node in nodes:
    print(f"Node: {node['id']} ({node['node_type']})")
    print(f"  Properties: {node['properties']}")
    
for edge in edges:
    print(f"{edge['from_id']} --{edge['edge_type']}--> {edge['to_id']}")

Filtered Traversal

# Traverse with filters
nodes, edges = db.traverse(
    namespace="default",
    start_node="alice",
    max_depth=2,
    edge_types=["works_on", "reports_to"],  # Only follow these edge types
    node_types=["person", "project"],        # Only include these node types
    node_filter={"team": "ml"}               # Filter nodes by properties
)

Graph Queries

# Find shortest path
path = db.find_path(
    namespace="default",
    from_id="alice",
    to_id="project_y",
    max_depth=5
)

# Get neighbors
neighbors = db.get_neighbors(
    namespace="default",
    node_id="alice",
    direction="outgoing"  # "outgoing", "incoming", "both"
)

# Get specific edge
edge = db.get_edge("default", "alice", "works_on", "project_x")

# Delete node (and all connected edges)
db.delete_node("default", "old_node")

# Delete edge
db.delete_edge("default", "alice", "works_on", "project_old")

14. Temporal Graph (Time-Travel)

Track state changes over time with temporal edges.

Adding Temporal Edges

import time

now = int(time.time() * 1000)  # milliseconds since epoch
one_hour = 60 * 60 * 1000

# Record: Door was open from 10:00 to 11:00
db.add_temporal_edge(
    namespace="smart_home",
    from_id="door_front",
    edge_type="STATE",
    to_id="open",
    valid_from=now - one_hour,  # Start time (ms)
    valid_until=now,             # End time (ms)
    properties={"sensor": "motion_1", "confidence": 0.95}
)

# Record: Light is currently on (no end time yet)
db.add_temporal_edge(
    namespace="smart_home",
    from_id="light_living",
    edge_type="STATE",
    to_id="on",
    valid_from=now,
    valid_until=0,  # 0 = still valid (no end time)
    properties={"brightness": "80%", "color": "warm"}
)

Time-Travel Queries

# Query modes:
# - "CURRENT": Edges valid right now
# - "POINT_IN_TIME": Edges valid at specific timestamp
# - "RANGE": All edges within a time range

# What is the current state?
edges = db.query_temporal_graph(
    namespace="smart_home",
    node_id="door_front",
    mode="CURRENT",
    edge_type="STATE"
)
current_state = edges[0]["to_id"] if edges else "unknown"

# Was the door open 1.5 hours ago?
edges = db.query_temporal_graph(
    namespace="smart_home",
    node_id="door_front",
    mode="POINT_IN_TIME",
    timestamp=now - int(1.5 * 60 * 60 * 1000)
)
was_open = any(e["to_id"] == "open" for e in edges)

# All state changes in last hour
edges = db.query_temporal_graph(
    namespace="smart_home",
    node_id="door_front",
    mode="RANGE",
    start_time=now - one_hour,
    end_time=now
)
for edge in edges:
    print(f"State: {edge['to_id']} from {edge['valid_from']} to {edge['valid_until']}")

End a Temporal Edge

# Close the current "on" state
db.end_temporal_edge(
    namespace="smart_home",
    from_id="light_living",
    edge_type="STATE",
    to_id="on",
    end_time=int(time.time() * 1000)
)

15. Semantic Cache

Cache LLM responses with similarity-based retrieval for cost savings.

Storing Cached Responses

# Store response with embedding
db.cache_put(
    cache_name="llm_responses",
    key="What is Python?",           # Original query (for display/debugging)
    value="Python is a high-level programming language...",
    embedding=[0.1, 0.2, ...],       # Query embedding (384-dim)
    ttl_seconds=3600,                # Expire in 1 hour (0 = no expiry)
    metadata={"model": "claude-3", "tokens": 150}
)

Cache Lookup

# Check cache before calling LLM
cached = db.cache_get(
    cache_name="llm_responses",
    query_embedding=[0.12, 0.18, ...],  # Embed the new query
    threshold=0.85                       # Cosine similarity threshold
)

if cached:
    print(f"Cache HIT!")
    print(f"Original query: {cached['key']}")
    print(f"Response: {cached['value']}")
    print(f"Similarity: {cached['score']:.4f}")
else:
    print("Cache MISS - calling LLM...")
    # Call LLM and cache the result

Cache Management

# Delete specific entry
db.cache_delete("llm_responses", key="What is Python?")

# Clear entire cache
db.cache_clear("llm_responses")

# Get cache statistics
stats = db.cache_stats("llm_responses")
print(f"Total entries: {stats['count']}")
print(f"Hit rate: {stats['hit_rate']:.2%}")
print(f"Memory usage: {stats['size_bytes']}")

Full Usage Pattern

def get_llm_response(query: str, embed_fn, llm_fn):
    """Get response from cache or LLM."""
    query_embedding = embed_fn(query)
    
    # Try cache first
    cached = db.cache_get(
        cache_name="llm_responses",
        query_embedding=query_embedding,
        threshold=0.90
    )
    
    if cached:
        return cached['value']
    
    # Cache miss - call LLM
    response = llm_fn(query)
    
    # Store in cache
    db.cache_put(
        cache_name="llm_responses",
        key=query,
        value=response,
        embedding=query_embedding,
        ttl_seconds=86400  # 24 hours
    )
    
    return response

16. Context Query Builder (LLM Optimization) and Session

Assemble LLM context with token budgeting and priority-based truncation.

Basic Context Query

from toondb import ContextQueryBuilder, ContextFormat, TruncationStrategy

# Build context for LLM
context = ContextQueryBuilder() \
    .for_session("session_123") \
    .with_budget(4096) \
    .format(ContextFormat.TOON) \
    .literal("SYSTEM", priority=0, text="You are a helpful assistant.") \
    .section("USER_PROFILE", priority=1) \
        .get("user.profile.{name, preferences}") \
        .done() \
    .section("HISTORY", priority=2) \
        .last(10, "messages") \
        .where_eq("session_id", "session_123") \
        .done() \
    .section("KNOWLEDGE", priority=3) \
        .search("documents", "$query_embedding", k=5) \
        .done() \
    .execute()

print(f"Token count: {context.token_count}")
print(f"Context:\n{context.text}")

Section Types

| Type    | Method                            | Description                      |
|---------|-----------------------------------|----------------------------------|
| literal | .literal(name, priority, text)    | Static text content              |
| get     | .get(path)                        | Fetch specific data by path      |
| last    | .last(n, table)                   | Most recent N records from table |
| search  | .search(collection, embedding, k) | Vector similarity search         |
| sql     | .sql(query)                       | SQL query results                |
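
The sql section type has no example above; a minimal sketch, assuming .sql() accepts a query string as the table suggests (section and query names are illustrative):

context = ContextQueryBuilder() \
    .for_session("session_123") \
    .with_budget(2048) \
    .section("OPEN_TICKETS", priority=1) \
        .sql("SELECT id, title FROM tickets WHERE status = 'open'") \
        .done() \
    .execute()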

Truncation Strategies

# Drop from end (keep beginning) - default
.truncation(TruncationStrategy.TAIL_DROP)

# Drop from beginning (keep end)
.truncation(TruncationStrategy.HEAD_DROP)

# Proportionally truncate across sections
.truncation(TruncationStrategy.PROPORTIONAL)

# Fail if budget exceeded
.truncation(TruncationStrategy.STRICT)

Variables and Bindings

from toondb import ContextValue

context = ContextQueryBuilder() \
    .for_session("session_123") \
    .set_var("query_embedding", ContextValue.Embedding([0.1, 0.2, ...])) \
    .set_var("user_id", ContextValue.String("user_456")) \
    .section("KNOWLEDGE", priority=2) \
        .search("documents", "$query_embedding", k=5) \
        .done() \
    .execute()

Output Formats

# TOON format (40-60% fewer tokens)
.format(ContextFormat.TOON)

# JSON format
.format(ContextFormat.JSON)

# Markdown format (human-readable)
.format(ContextFormat.MARKDOWN)

# Plain text
.format(ContextFormat.TEXT)

Session Management (Agent Context)

Stateful session management for agentic use cases with permissions, sandboxing, audit logging, and budget tracking.

Session Overview

Agent session abc123:
  cwd: /agents/abc123
  vars: $model = "gpt-4", $budget = 1000
  permissions: fs:rw, db:rw, calc:*
  audit: [read /data/users, write /agents/abc123/cache]

Creating Sessions

from toondb import SessionManager, AgentContext
from datetime import timedelta

# Create session manager with idle timeout
session_mgr = SessionManager(idle_timeout=timedelta(hours=1))

# Create a new session
session = session_mgr.create_session("session_abc123")

# Get existing session
session = session_mgr.get_session("session_abc123")

# Get or create (idempotent)
session = session_mgr.get_or_create("session_abc123")

# Remove session
session_mgr.remove_session("session_abc123")

# Cleanup expired sessions
removed_count = session_mgr.cleanup_expired()

# Get active session count
count = session_mgr.session_count()

Agent Context

from toondb import AgentContext, ContextValue

# Create agent context
ctx = AgentContext("session_abc123")
print(f"Session ID: {ctx.session_id}")
print(f"Working dir: {ctx.working_dir}")  # /agents/session_abc123

# Create with custom working directory
ctx = AgentContext.with_working_dir("session_abc123", "/custom/path")

# Create with full permissions (trusted agents)
ctx = AgentContext.with_full_permissions("session_abc123")

Session Variables

# Set variables
ctx.set_var("model", ContextValue.String("gpt-4"))
ctx.set_var("budget", ContextValue.Number(1000.0))
ctx.set_var("debug", ContextValue.Bool(True))
ctx.set_var("tags", ContextValue.List([
    ContextValue.String("ml"),
    ContextValue.String("production")
]))

# Get variables
model = ctx.get_var("model")  # Returns ContextValue or None
budget = ctx.get_var("budget")

# Peek (read-only, no audit)
value = ctx.peek_var("model")

# Variable substitution in strings
text = ctx.substitute_vars("Using $model with budget $budget")
# Result: "Using gpt-4 with budget 1000"

Context Value Types

from toondb import ContextValue

# String
ContextValue.String("hello")

# Number (float)
ContextValue.Number(42.5)

# Boolean
ContextValue.Bool(True)

# List
ContextValue.List([
    ContextValue.String("a"),
    ContextValue.Number(1.0)
])

# Object (dict)
ContextValue.Object({
    "key": ContextValue.String("value"),
    "count": ContextValue.Number(10.0)
})

# Null
ContextValue.Null()

Permissions

from toondb import (
    AgentPermissions,
    FsPermissions,
    DbPermissions,
    NetworkPermissions,
    AuditOperation,
    ContextError,
)

# Configure permissions
ctx.permissions = AgentPermissions(
    filesystem=FsPermissions(
        read=True,
        write=True,
        mkdir=True,
        delete=False,
        allowed_paths=["/agents/session_abc123", "/shared/data"]
    ),
    database=DbPermissions(
        read=True,
        write=True,
        create=False,
        drop=False,
        allowed_tables=["user_*", "cache_*"]  # Pattern matching
    ),
    calculator=True,
    network=NetworkPermissions(
        http=True,
        allowed_domains=["api.example.com", "*.internal.net"]
    )
)

# Check permissions before operations
try:
    ctx.check_fs_permission("/agents/session_abc123/data.json", AuditOperation.FS_READ)
    # Permission granted
except ContextError as e:
    print(f"Permission denied: {e}")

try:
    ctx.check_db_permission("user_profiles", AuditOperation.DB_QUERY)
    # Permission granted
except ContextError as e:
    print(f"Permission denied: {e}")

Budget Tracking

from toondb import OperationBudget

# Configure budget limits
ctx.budget = OperationBudget(
    max_tokens=100000,        # Maximum tokens (input + output)
    max_cost=5000,            # Maximum cost in cents ($50.00)
    max_operations=10000      # Maximum operation count
)

# Consume budget (called automatically by operations)
try:
    ctx.consume_budget(tokens=500, cost=10)  # 500 tokens, $0.10
except ContextError as e:
    if "Budget exceeded" in str(e):
        print("Budget limit reached!")

# Check budget status
print(f"Tokens used: {ctx.budget.tokens_used}/{ctx.budget.max_tokens}")
print(f"Cost used: ${ctx.budget.cost_used / 100:.2f}/${ctx.budget.max_cost / 100:.2f}")
print(f"Operations: {ctx.budget.operations_used}/{ctx.budget.max_operations}")

Session Transactions

from toondb import ResourceType

# Begin transaction within session
ctx.begin_transaction(tx_id=12345)

# Create savepoint
ctx.savepoint("before_update")

# Record pending writes (for rollback)
ctx.record_pending_write(
    resource_type=ResourceType.FILE,
    resource_key="/agents/session_abc123/data.json",
    original_value=b'{"old": "data"}'
)

# Commit transaction
ctx.commit_transaction()

# Or rollback
pending_writes = ctx.rollback_transaction()
for write in pending_writes:
    print(f"Rolling back: {write.resource_key}")
    # Restore original_value

Path Resolution

# Paths are resolved relative to working directory
ctx = AgentContext.with_working_dir("session_abc123", "/home/agent")

# Relative paths
resolved = ctx.resolve_path("data.json")  # /home/agent/data.json

# Absolute paths pass through
resolved = ctx.resolve_path("/absolute/path")  # /absolute/path

Audit Trail

# All operations are automatically logged
# Audit entry includes: timestamp, operation, resource, result, metadata

# Export audit log
audit_log = ctx.export_audit()
for entry in audit_log:
    print(f"[{entry['timestamp']}] {entry['operation']}: {entry['resource']} -> {entry['result']}")

# Example output:
# [1705312345] var.set: model -> success
# [1705312346] fs.read: /data/config.json -> success
# [1705312347] db.query: users -> success
# [1705312348] fs.write: /forbidden/file -> denied:path not in allowed paths

Audit Operations

from toondb import AuditOperation

# Filesystem operations
AuditOperation.FS_READ
AuditOperation.FS_WRITE
AuditOperation.FS_MKDIR
AuditOperation.FS_DELETE
AuditOperation.FS_LIST

# Database operations
AuditOperation.DB_QUERY
AuditOperation.DB_INSERT
AuditOperation.DB_UPDATE
AuditOperation.DB_DELETE

# Other operations
AuditOperation.CALCULATE
AuditOperation.VAR_SET
AuditOperation.VAR_GET
AuditOperation.TX_BEGIN
AuditOperation.TX_COMMIT
AuditOperation.TX_ROLLBACK

Tool Registry

from toondb import ToolDefinition, ToolCallRecord
from datetime import datetime

# Register tools available to the agent
ctx.register_tool(ToolDefinition(
    name="search_documents",
    description="Search documents by semantic similarity",
    parameters_schema='{"type": "object", "properties": {"query": {"type": "string"}}}',
    requires_confirmation=False
))

ctx.register_tool(ToolDefinition(
    name="delete_file",
    description="Delete a file from the filesystem",
    parameters_schema='{"type": "object", "properties": {"path": {"type": "string"}}}',
    requires_confirmation=True  # Requires user confirmation
))

# Record tool calls
ctx.record_tool_call(ToolCallRecord(
    call_id="call_001",
    tool_name="search_documents",
    arguments='{"query": "machine learning"}',
    result='[{"id": "doc1", "score": 0.95}]',
    error=None,
    timestamp=datetime.now()
))

# Access tool call history
for call in ctx.tool_calls:
    print(f"{call.tool_name}: {call.result or call.error}")

Session Lifecycle

# Check session age
age = ctx.age()
print(f"Session age: {age}")

# Check idle time
idle = ctx.idle_time()
print(f"Idle time: {idle}")

# Check if expired
if ctx.is_expired(idle_timeout=timedelta(hours=1)):
    print("Session has expired!")

Complete Session Example

from toondb import (
    SessionManager, AgentContext, ContextValue,
    AgentPermissions, FsPermissions, DbPermissions,
    OperationBudget, ToolDefinition, AuditOperation
)
from datetime import timedelta

# Initialize session manager
session_mgr = SessionManager(idle_timeout=timedelta(hours=2))

# Create session for an agent
session_id = "agent_session_12345"
ctx = session_mgr.get_or_create(session_id)

# Configure the agent
ctx.permissions = AgentPermissions(
    filesystem=FsPermissions(
        read=True,
        write=True,
        allowed_paths=[f"/agents/{session_id}", "/shared"]
    ),
    database=DbPermissions(
        read=True,
        write=True,
        allowed_tables=["documents", "cache_*"]
    ),
    calculator=True
)

ctx.budget = OperationBudget(
    max_tokens=50000,
    max_cost=1000,  # $10.00
    max_operations=1000
)

# Set initial variables
ctx.set_var("model", ContextValue.String("claude-3-sonnet"))
ctx.set_var("temperature", ContextValue.Number(0.7))

# Register available tools
ctx.register_tool(ToolDefinition(
    name="vector_search",
    description="Search vectors by similarity",
    parameters_schema='{"type": "object", "properties": {"query": {"type": "string"}, "k": {"type": "integer"}}}',
    requires_confirmation=False
))

# Perform operations with permission checks
def safe_read_file(ctx: AgentContext, path: str) -> bytes:
    resolved = ctx.resolve_path(path)
    ctx.check_fs_permission(resolved, AuditOperation.FS_READ)
    ctx.consume_budget(tokens=100, cost=1)
    # ... actual file read ...
    return b"file contents"

def safe_db_query(ctx: AgentContext, table: str, query: str):
    ctx.check_db_permission(table, AuditOperation.DB_QUERY)
    ctx.consume_budget(tokens=500, cost=5)
    # ... actual query ...
    return []

# Use in transaction
ctx.begin_transaction(tx_id=1)
try:
    # Operations here...
    ctx.commit_transaction()
except Exception as e:
    ctx.rollback_transaction()
    raise

# Export audit trail for debugging/compliance
audit = ctx.export_audit()
print(f"Session performed {len(audit)} operations")

# Cleanup
session_mgr.cleanup_expired()

Session Errors

from toondb import ContextError

try:
    ctx.check_fs_permission("/forbidden", AuditOperation.FS_READ)
except ContextError as e:
    if e.is_permission_denied():
        print(f"Permission denied: {e.message}")
    elif e.is_variable_not_found():
        print(f"Variable not found: {e.variable_name}")
    elif e.is_budget_exceeded():
        print(f"Budget exceeded: {e.budget_type}")
    elif e.is_transaction_error():
        print(f"Transaction error: {e.message}")
    elif e.is_invalid_path():
        print(f"Invalid path: {e.path}")
    elif e.is_session_expired():
        print("Session has expired")

17. Atomic Multi-Index Writes

Ensure consistency across KV storage, vectors, and graphs with atomic operations.

Problem Without Atomicity

# Without atomic writes, a crash can leave:
# - Embedding exists but graph edges don't
# - KV data exists but embedding is missing
# - Partial graph relationships

Atomic Memory Writer

from toondb import AtomicMemoryWriter, MemoryOp

writer = AtomicMemoryWriter(db)

# Build atomic operation set
result = writer.write_atomic(
    memory_id="memory_123",
    ops=[
        # Store the blob/content
        MemoryOp.PutBlob(
            key=b"memories/memory_123/content",
            value=b"Meeting notes: discussed project timeline..."
        ),
        
        # Store the embedding
        MemoryOp.PutEmbedding(
            collection="memories",
            id="memory_123",
            embedding=[0.1, 0.2, ...],
            metadata={"type": "meeting", "date": "2024-01-15"}
        ),
        
        # Create graph nodes
        MemoryOp.CreateNode(
            namespace="default",
            node_id="memory_123",
            node_type="memory",
            properties={"importance": "high"}
        ),
        
        # Create graph edges
        MemoryOp.CreateEdge(
            namespace="default",
            from_id="memory_123",
            edge_type="relates_to",
            to_id="project_x",
            properties={}
        ),
    ]
)

print(f"Intent ID: {result.intent_id}")
print(f"Operations applied: {result.ops_applied}")
print(f"Status: {result.status}")  # "committed"

How It Works

1. Write intent(id, ops...) to WAL    ← Crash-safe
2. Apply ops one-by-one
3. Write commit(id) to WAL            ← All-or-nothing
4. Recovery replays incomplete intents
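
A conceptual sketch of that protocol (wal and storage are hypothetical objects, not ToonDB's actual implementation):

def write_atomic(wal, storage, intent_id, ops):
    wal.append(("intent", intent_id, ops))   # 1. durable intent before any change
    for op in ops:
        storage.apply(op)                    # 2. apply ops one-by-one
    wal.append(("commit", intent_id))        # 3. all-or-nothing commit marker

def recover(wal, storage):
    committed = {rec[1] for rec in wal if rec[0] == "commit"}
    for rec in wal:
        if rec[0] == "intent" and rec[1] not in committed:
            for op in rec[2]:                # 4. replay incomplete intents
                storage.apply(op)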

18. Recovery & WAL Management

ToonDB uses Write-Ahead Logging (WAL) for durability with automatic recovery.

Recovery Manager

from toondb import RecoveryManager

recovery = db.recovery()

# Check if recovery is needed
if recovery.needs_recovery():
    result = recovery.recover()
    print(f"Status: {result.status}")
    print(f"Replayed entries: {result.replayed_entries}")

WAL Verification

# Verify WAL integrity
result = recovery.verify_wal()

print(f"Valid: {result.is_valid}")
print(f"Total entries: {result.total_entries}")
print(f"Valid entries: {result.valid_entries}")
print(f"Corrupted: {result.corrupted_entries}")
print(f"Last valid LSN: {result.last_valid_lsn}")

if result.checksum_errors:
    for error in result.checksum_errors:
        print(f"Checksum error at LSN {error.lsn}: expected {error.expected}, got {error.actual}")

Force Checkpoint

# Force a checkpoint (flush memtable to disk)
result = recovery.checkpoint()

print(f"Checkpoint LSN: {result.checkpoint_lsn}")
print(f"Duration: {result.duration_ms}ms")

WAL Statistics

stats = recovery.wal_stats()

print(f"Total size: {stats.total_size_bytes} bytes")
print(f"Active size: {stats.active_size_bytes} bytes")
print(f"Archived size: {stats.archived_size_bytes} bytes")
print(f"Entry count: {stats.entry_count}")
print(f"Oldest LSN: {stats.oldest_entry_lsn}")
print(f"Newest LSN: {stats.newest_entry_lsn}")

WAL Truncation

# Truncate WAL after checkpoint (reclaim disk space)
result = recovery.truncate_wal(up_to_lsn=12345)

print(f"Truncated to LSN: {result.up_to_lsn}")
print(f"Bytes freed: {result.bytes_freed}")

Open with Auto-Recovery

from toondb import open_with_recovery

# Automatically recovers if needed
db = open_with_recovery("./my_database")

19. Checkpoints & Snapshots

Application Checkpoints

Save and restore application state for workflow interruption/resumption.

from toondb import CheckpointService

checkpoint_svc = db.checkpoint_service()

# Create a checkpoint
checkpoint_id = checkpoint_svc.create(
    name="workflow_step_3",
    state=serialized_state,  # bytes
    metadata={"step": "3", "user": "alice", "workflow": "data_pipeline"}
)

# Restore checkpoint
state = checkpoint_svc.restore(checkpoint_id)

# List checkpoints
checkpoints = checkpoint_svc.list()
for cp in checkpoints:
    print(f"{cp.name}: {cp.created_at}, {cp.state_size} bytes")

# Delete checkpoint
checkpoint_svc.delete(checkpoint_id)

Workflow Checkpointing

# Create a workflow run
run_id = checkpoint_svc.create_run(
    workflow="data_pipeline",
    params={"input_file": "data.csv", "batch_size": 1000}
)

# Save checkpoint at each node/step
checkpoint_svc.save_node_checkpoint(
    run_id=run_id,
    node_id="transform_step",
    state=step_state,
    metadata={"rows_processed": 5000}
)

# Load latest checkpoint for a node
checkpoint = checkpoint_svc.load_node_checkpoint(run_id, "transform_step")

# List all checkpoints for a run
node_checkpoints = checkpoint_svc.list_run_checkpoints(run_id)

Snapshot Reader (Point-in-Time)

# Create a consistent snapshot for reading
snapshot = db.snapshot()

# Read from snapshot (doesn't see newer writes)
value = snapshot.get(b"key")

# All reads within snapshot see consistent state
with db.snapshot() as snap:
    v1 = snap.get(b"key1")
    v2 = snap.get(b"key2")  # Same consistent view

# Meanwhile, writes continue in main DB
db.put(b"key1", b"new_value")  # Snapshot doesn't see this

20. Compression & Storage

Compression Settings

from toondb import CompressionType

db = Database.open("./my_db", config={
    # Compression for SST files
    "compression": CompressionType.LZ4,      # LZ4 (fast), ZSTD (better ratio), NONE
    "compression_level": 3,                   # ZSTD: 1-22, LZ4: ignored
    
    # Compression for WAL
    "wal_compression": CompressionType.NONE,  # Usually NONE for WAL (already sequential)
})

Compression Comparison

| Type | Ratio | Compress Speed | Decompress Speed | Use Case                     |
|------|-------|----------------|------------------|------------------------------|
| NONE | 1x    | N/A            | N/A              | Already compressed data      |
| LZ4  | ~2.5x | ~780 MB/s      | ~4500 MB/s       | General use (default)        |
| ZSTD | ~3.5x | ~520 MB/s      | ~1800 MB/s       | Cold storage, large datasets |

Storage Statistics

stats = db.storage_stats()

print(f"Data size: {stats.data_size_bytes}")
print(f"Index size: {stats.index_size_bytes}")
print(f"WAL size: {stats.wal_size_bytes}")
print(f"Compression ratio: {stats.compression_ratio:.2f}x")
print(f"SST files: {stats.sst_file_count}")
print(f"Levels: {stats.level_stats}")

Compaction Control

# Manual compaction (reclaim space, optimize reads)
db.compact()

# Compact specific level
db.compact_level(level=0)

# Get compaction stats
stats = db.compaction_stats()
print(f"Pending compactions: {stats.pending_compactions}")
print(f"Running compactions: {stats.running_compactions}")

21. Statistics & Monitoring

Database Statistics

stats = db.stats()

# Transaction stats
print(f"Active transactions: {stats.active_transactions}")
print(f"Committed transactions: {stats.committed_transactions}")
print(f"Aborted transactions: {stats.aborted_transactions}")
print(f"Conflict rate: {stats.conflict_rate:.2%}")

# Operation stats
print(f"Total reads: {stats.total_reads}")
print(f"Total writes: {stats.total_writes}")
print(f"Cache hit rate: {stats.cache_hit_rate:.2%}")

# Storage stats
print(f"Key count: {stats.key_count}")
print(f"Total data size: {stats.total_data_bytes}")

Token Statistics (LLM Optimization)

stats = db.token_stats()

print(f"TOON tokens emitted: {stats.toon_tokens_emitted}")
print(f"Equivalent JSON tokens: {stats.json_tokens_equivalent}")
print(f"Token savings: {stats.token_savings_percent:.1f}%")

Performance Metrics

metrics = db.performance_metrics()

# Latency percentiles
print(f"Read P50: {metrics.read_latency_p50_us}µs")
print(f"Read P99: {metrics.read_latency_p99_us}µs")
print(f"Write P50: {metrics.write_latency_p50_us}µs")
print(f"Write P99: {metrics.write_latency_p99_us}µs")

# Throughput
print(f"Reads/sec: {metrics.reads_per_second}")
print(f"Writes/sec: {metrics.writes_per_second}")

22. Distributed Tracing

Track operations for debugging and performance analysis.

Starting Traces

from toondb import TraceStore

traces = TraceStore(db)

# Start a trace run
run = traces.start_run(
    name="user_request",
    resource={"service": "api", "version": "1.0.0"}
)
trace_id = run.trace_id

Creating Spans

from toondb import SpanKind, SpanStatusCode, TraceStatus

# Start root span
root_span = traces.start_span(
    trace_id=trace_id,
    name="handle_request",
    parent_span_id=None,
    kind=SpanKind.SERVER
)

# Start child span
db_span = traces.start_span(
    trace_id=trace_id,
    name="database_query",
    parent_span_id=root_span.span_id,
    kind=SpanKind.CLIENT
)

# Add attributes
traces.set_span_attributes(trace_id, db_span.span_id, {
    "db.system": "toondb",
    "db.operation": "SELECT",
    "db.table": "users"
})

# End spans
traces.end_span(trace_id, db_span.span_id, SpanStatusCode.OK)
traces.end_span(trace_id, root_span.span_id, SpanStatusCode.OK)

# End the trace run
traces.end_run(trace_id, TraceStatus.COMPLETED)

Domain Events

# Log retrieval (for RAG debugging)
traces.log_retrieval(
    trace_id=trace_id,
    query="user query",
    results=[{"id": "doc1", "score": 0.95}],
    latency_ms=15
)

# Log LLM call
traces.log_llm_call(
    trace_id=trace_id,
    model="claude-3-sonnet",
    input_tokens=500,
    output_tokens=200,
    latency_ms=1200
)

23. Workflow & Run Tracking

Track long-running workflows with events and state.

Creating Workflow Runs

from toondb import WorkflowService, RunStatus

workflow_svc = db.workflow_service()

# Create a new run
run = workflow_svc.create_run(
    run_id="run_123",
    workflow="data_pipeline",
    params={"input": "data.csv", "output": "results.json"}
)

print(f"Run ID: {run.run_id}")
print(f"Status: {run.status}")
print(f"Created: {run.created_at}")

Appending Events

from toondb import WorkflowEvent, EventType

# Append events as workflow progresses
workflow_svc.append_event(WorkflowEvent(
    run_id="run_123",
    event_type=EventType.NODE_STARTED,
    node_id="extract",
    data={"input_file": "data.csv"}
))

workflow_svc.append_event(WorkflowEvent(
    run_id="run_123",
    event_type=EventType.NODE_COMPLETED,
    node_id="extract",
    data={"rows_extracted": 10000}
))

Querying Events

# Get all events for a run
events = workflow_svc.get_events("run_123")

# Get events since a sequence number
new_events = workflow_svc.get_events("run_123", since_seq=10, limit=100)

# Stream events (for real-time monitoring)
for event in workflow_svc.stream_events("run_123"):
    print(f"[{event.seq}] {event.event_type}: {event.node_id}")

Update Run Status

# Update status
workflow_svc.update_run_status("run_123", RunStatus.COMPLETED)

# Or mark as failed
workflow_svc.update_run_status("run_123", RunStatus.FAILED)

24. Server Mode (gRPC Client)

Full-featured client for distributed deployments.

Connection

from toondb import ToonDBClient

# Basic connection
client = ToonDBClient("localhost:50051")

# With TLS
client = ToonDBClient("localhost:50051", secure=True, ca_cert="ca.pem")

# With authentication
client = ToonDBClient("localhost:50051", api_key="your_api_key")

# Context manager
with ToonDBClient("localhost:50051") as client:
    client.put(b"key", b"value")

Key-Value Operations

# Put with TTL
client.put(b"key", b"value", namespace="default", ttl_seconds=3600)

# Get
value = client.get(b"key", namespace="default")

# Delete
client.delete(b"key", namespace="default")

# Batch operations
client.put_batch([
    (b"key1", b"value1"),
    (b"key2", b"value2"),
], namespace="default")

Vector Operations (Server Mode)

# Create index
client.create_index(
    name="embeddings",
    dimension=384,
    metric="cosine",
    m=16,
    ef_construction=200
)

# Insert vectors
client.insert_vectors(
    index_name="embeddings",
    ids=[1, 2, 3],
    vectors=[[...], [...], [...]]
)

# Search
results = client.search(
    index_name="embeddings",
    query=[0.1, 0.2, ...],
    k=10,
    ef_search=50
)

for result in results:
    print(f"ID: {result.id}, Distance: {result.distance}")

Collection Operations (Server Mode)

# Create collection
client.create_collection(
    name="documents",
    dimension=384,
    namespace="default",
    metric="cosine"
)

# Add documents
client.add_documents(
    collection_name="documents",
    documents=[
        {"id": "1", "content": "Hello", "embedding": [...], "metadata": {...}},
        {"id": "2", "content": "World", "embedding": [...], "metadata": {...}}
    ],
    namespace="default"
)

# Search
results = client.search_collection(
    collection_name="documents",
    query_vector=[...],
    k=10,
    namespace="default",
    filter={"author": "Alice"}
)

Context Service (Server Mode)

# Query context for LLM
context = client.query_context(
    session_id="session_123",
    sections=[
        {"name": "system", "priority": 0, "type": "literal", 
         "content": "You are a helpful assistant."},
        {"name": "history", "priority": 1, "type": "recent", 
         "table": "messages", "top_k": 10},
        {"name": "knowledge", "priority": 2, "type": "search", 
         "collection": "documents", "embedding": [...], "top_k": 5}
    ],
    token_limit=4096,
    format="toon"
)

print(context.text)
print(f"Tokens used: {context.token_count}")

25. IPC Client (Unix Sockets)

Local server communication via Unix sockets (lower latency than gRPC).

from toondb import IpcClient

# Connect
client = IpcClient.connect("/tmp/toondb.sock", timeout=30.0)

# Basic operations
client.put(b"key", b"value")
value = client.get(b"key")
client.delete(b"key")

# Path operations
client.put_path(["users", "alice"], b"data")
value = client.get_path(["users", "alice"])

# Query
result = client.query("users/", limit=100)

# Scan
results = client.scan("prefix/")

# Transactions
txn_id = client.begin_transaction()
# ... operations ...
commit_ts = client.commit(txn_id)
# or client.abort(txn_id)

# Admin
client.ping()
client.checkpoint()
stats = client.stats()

client.close()

26. Standalone VectorIndex

Direct HNSW index operations without collections.

from toondb import VectorIndex, VectorIndexConfig, DistanceMetric
import numpy as np

# Create index
config = VectorIndexConfig(
    dimension=384,
    metric=DistanceMetric.COSINE,
    m=16,
    ef_construction=200,
    ef_search=50,
    max_elements=100000
)
index = VectorIndex(config)

# Insert single vector
index.insert(id=1, vector=np.array([0.1, 0.2, ...], dtype=np.float32))

# Batch insert
ids = np.array([1, 2, 3], dtype=np.uint64)
vectors = np.array([[...], [...], [...]], dtype=np.float32)
count = index.insert_batch(ids, vectors)

# Fast batch insert (returns failures)
inserted, failed = index.insert_batch_fast(ids, vectors)

# Search
query = np.array([0.1, 0.2, ...], dtype=np.float32)
results = index.search(query, k=10, ef_search=100)

for id, distance in results:
    print(f"ID: {id}, Distance: {distance}")

# Properties
print(f"Size: {len(index)}")
print(f"Dimension: {index.dimension}")

# Save/load
index.save("./index.bin")
index = VectorIndex.load("./index.bin")

27. Vector Utilities

Standalone vector operations for preprocessing and analysis.

from toondb import vector

# Distance calculations
a = [1.0, 0.0, 0.0]
b = [0.707, 0.707, 0.0]

cosine_dist = vector.cosine_distance(a, b)
euclidean_dist = vector.euclidean_distance(a, b)
dot_product = vector.dot_product(a, b)

print(f"Cosine distance: {cosine_dist:.4f}")
print(f"Euclidean distance: {euclidean_dist:.4f}")
print(f"Dot product: {dot_product:.4f}")

# Normalize a vector
v = [3.0, 4.0]
normalized = vector.normalize(v)
print(f"Normalized: {normalized}")  # [0.6, 0.8]

# Batch normalize
vectors = [[3.0, 4.0], [1.0, 0.0]]
normalized_batch = vector.normalize_batch(vectors)

# Compute centroid
vectors = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
centroid = vector.centroid(vectors)

# Cosine similarity (1 - distance)
similarity = vector.cosine_similarity(a, b)

28. Data Formats (TOON/JSON/Columnar)

Wire Formats

from toondb import WireFormat

# Available formats
WireFormat.TOON      # Token-efficient (40-60% fewer tokens)
WireFormat.JSON      # Standard JSON
WireFormat.COLUMNAR  # Raw columnar for analytics

# Parse from string
fmt = WireFormat.from_string("toon")

# Convert between formats
data = {"users": [{"id": 1, "name": "Alice"}]}
toon_data = WireFormat.to_toon(data)
json_data = WireFormat.to_json(data)

TOON Format Benefits

TOON uses 40-60% fewer tokens than JSON:

# JSON (15 tokens)
{"users": [{"id": 1, "name": "Alice"}]}

# TOON (9 tokens)
users:
  - id: 1
    name: Alice

Context Formats

from toondb import ContextFormat

ContextFormat.TOON      # Token-efficient
ContextFormat.JSON      # Structured data
ContextFormat.MARKDOWN  # Human-readable

# Format capabilities
from toondb import FormatCapabilities

# Convert between formats
ctx_fmt = FormatCapabilities.wire_to_context(WireFormat.TOON)
wire_fmt = FormatCapabilities.context_to_wire(ContextFormat.JSON)

# Check round-trip support
if FormatCapabilities.supports_round_trip(WireFormat.TOON):
    print("Safe for decode(encode(x)) = x")

29. Policy Se