`@andrejs1979/storage` v1.0.0 - unified storage engine for NoSQL: a multi-paradigm database abstraction
# NoSQL Storage Engine
A high-performance, edge-native storage abstraction layer that unifies Cloudflare's storage services (D1, KV, R2, Durable Objects) into a single, intelligent API optimized for sub-10ms latency.
## Features
- Unified Storage API: Single interface for all storage operations across multiple backends
- Intelligent Routing: Automatic routing of operations to optimal storage backends
- Schema-Free Architecture: MongoDB-like flexibility without migration headaches
- Sub-10ms Latency: Optimized for edge performance with advanced caching and connection pooling
- Fault Tolerance: Built-in retry mechanisms, circuit breakers, and error recovery
- Vector Operations: Native support for high-dimensional vector storage and ANN search
- Time Series Support: Optimized storage and querying for time-stamped data
- Real-time Capabilities: Streaming updates and change data capture
## Architecture

```
┌─────────────────────────────────────┐
│           Storage Router            │
│   (Intelligent Operation Routing    │
│            & Caching)               │
└──────────────────┬──────────────────┘
                   │
      ┌───────┬────┴────┬─────────┐
      │       │         │         │
  ┌───▼───┐ ┌─▼─────┐ ┌─▼─────┐ ┌─▼───────────┐
  │  D1   │ │  KV   │ │  R2   │ │  Durable    │
  │(SQL + │ │(Docs +│ │(Blobs │ │  Objects    │
  │Vectors│ │Cache) │ │+ Cold)│ │(Stateful +  │
  │+ Hot  │ │       │ │       │ │ ANN Index)  │
  │Series)│ │       │ │       │ │             │
  └───────┘ └───────┘ └───────┘ └─────────────┘
```

## Quick Start
### Basic Usage

```typescript
import { createNoSQLStorage, createDefaultConfig } from './storage';

// Create the storage engine with all Cloudflare services
const config = createDefaultConfig(
  env.DB,         // D1 Database
  env.KV,         // KV Namespace
  env.BUCKET,     // R2 Bucket
  env.DURABLE_OBJ // Durable Object Namespace
);

const storage = createNoSQLStorage(config);
await storage.initialize();

// Document operations (MongoDB-like)
const doc = await storage.insertDocument('users', {
  _id: 'user123',
  name: 'John Doe',
  email: '[email protected]',
  preferences: { theme: 'dark' }
});

const users = await storage.findDocuments('users', {
  filter: { 'preferences.theme': 'dark' },
  limit: 10
});

// Vector operations
await storage.insertVector('embeddings', {
  id: 'doc123',
  vector: new Float32Array([0.1, 0.2, 0.3 /* ... */]),
  dimensions: 1536,
  metadata: { source: 'document', type: 'text' }
});

const similar = await storage.searchVectors('embeddings', {
  vector: queryVector,
  limit: 5,
  threshold: 0.8
});

// Time series operations
await storage.insertTimeSeriesPoint({
  metric: 'cpu.usage',
  timestamp: Date.now(),
  value: 85.2,
  tags: { host: 'server-1', region: 'us-east' },
  fields: { cores: 8 }
});
```

### Advanced Configuration
```typescript
import { createHighPerformanceConfig } from './storage';

// High-performance config tuned for sub-10ms latency
const hpConfig = createHighPerformanceConfig(
  env.DB, env.KV, env.BUCKET, env.DURABLE_OBJ
);

// Customize routing rules
hpConfig.routing.documentSizeThreshold = 256 * 1024; // 256KB
hpConfig.routing.vectorDimensionThreshold = 768;
hpConfig.routing.cacheRules.cacheTTL = 3600; // 1 hour

const storage = createNoSQLStorage(hpConfig);
```

## Storage Backends
### D1 Adapter (Structured Data + Hot Storage)
- Use Cases: Structured queries, recent documents, hot time series, small vectors
- Optimizations: WAL mode, connection pooling, prepared statements
- Latency Target: 2-5ms
- Capacity: Multiple 10GB shards

### KV Adapter (Document Cache + Warm Storage)
- Use Cases: Document caching, frequently accessed data, warm time series
- Optimizations: Automatic compression, intelligent TTL, batch operations
- Latency Target: 1-3ms
- Capacity: 25MB per value, unlimited keys

### R2 Adapter (Large Objects + Cold Storage)
- Use Cases: Large documents, high-dimensional vectors, archived time series
- Optimizations: Multipart uploads, range requests, metadata indexing
- Latency Target: 10-20ms
- Capacity: 5GB per single upload (larger objects via multipart), unlimited objects

### Durable Object Adapter (Stateful Operations)
- Use Cases: Vector indexes, distributed locks, real-time coordination
- Optimizations: Local state, hibernation support, event streaming
- Latency Target: 3-8ms
- Capacity: 128MB memory per object
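How the router chooses among these four backends is not spelled out in the API surface, but the routing thresholds from the Advanced Configuration section suggest a decision of roughly this shape. This is an illustrative sketch, not the library's actual router; `OperationProfile` and `pickBackend` are invented names:

```typescript
type Backend = 'd1' | 'kv' | 'r2' | 'durable-object';

interface RoutingThresholds {
  documentSizeThreshold: number;    // bytes; larger documents go to R2
  vectorDimensionThreshold: number; // larger vectors need the DO ANN index
}

interface OperationProfile {
  kind: 'document' | 'vector' | 'blob';
  sizeBytes?: number;
  dimensions?: number;
  hot?: boolean; // frequently accessed
}

function pickBackend(op: OperationProfile, t: RoutingThresholds): Backend {
  if (op.kind === 'blob') return 'r2';
  if (op.kind === 'vector') {
    // Small vectors fit in D1; high-dimensional ones go to the Durable Object index.
    return (op.dimensions ?? 0) > t.vectorDimensionThreshold ? 'durable-object' : 'd1';
  }
  // Documents: large payloads to R2, hot small documents to the KV cache, rest to D1.
  if ((op.sizeBytes ?? 0) > t.documentSizeThreshold) return 'r2';
  return op.hot ? 'kv' : 'd1';
}
```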
## Schema-Free Architecture

The storage engine implements a MongoDB-like schema-free approach:

```typescript
// No schema definition needed - just insert documents
await storage.insertDocument('products', {
  name: 'iPhone 15',
  price: 999,
  categories: ['electronics', 'phones'],
  specs: {
    storage: '128GB',
    color: 'blue',
    features: ['Face ID', '5G', 'Wireless Charging']
  }
});

// Add new fields at any time - no migrations
await storage.insertDocument('products', {
  name: 'MacBook Pro',
  price: 1999,
  categories: ['electronics', 'laptops'],
  warranty: '1 year',  // New field
  availability: {      // New nested object
    inStock: true,
    shipDate: '2024-01-15'
  }
});

// Indexes are created automatically based on query patterns
const laptops = await storage.findDocuments('products', {
  filter: {
    categories: 'laptops', // Auto-indexed if queried frequently
    'availability.inStock': true
  }
});
```

## Performance Optimizations
### Intelligent Routing

Operations are automatically routed to the most appropriate backend:

```typescript
// Small document → D1 (fast SQL queries)
await storage.insertDocument('sessions', { userId: 123, token: 'abc' });

// Large document → R2, with metadata kept in D1
await storage.insertDocument('files', {
  name: 'video.mp4',
  data: largeBuffer // Automatically stored in R2
});

// Frequent reads → KV cache
await storage.findDocuments('config', { filter: { env: 'prod' } }); // Cached after first read
```

### Connection Pooling
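The `ConnectionPool` used below caps concurrent D1 connections and queues callers when the pool is exhausted. A minimal sketch of those semantics (the real class in `./storage` adds acquire timeouts, a minimum pool size, and idle eviction; `SimplePool` here is purely illustrative):

```typescript
class SimplePool<T> {
  private idle: T[] = [];
  private total = 0;
  private waiters: ((conn: T) => void)[] = [];

  constructor(
    private createConn: () => T,
    private maxConnections: number
  ) {}

  async acquire(): Promise<T> {
    const conn = this.idle.pop();
    if (conn !== undefined) return conn;
    if (this.total < this.maxConnections) {
      this.total++;
      return this.createConn(); // grow the pool up to the cap
    }
    // Pool exhausted: wait until someone releases a connection.
    return new Promise<T>(resolve => this.waiters.push(resolve));
  }

  release(conn: T): void {
    const waiter = this.waiters.shift();
    if (waiter) waiter(conn);  // hand off directly to the next waiter
    else this.idle.push(conn); // otherwise return it to the idle set
  }
}
```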
```typescript
import { ConnectionPool, D1ConnectionFactory } from './storage';

const pool = new ConnectionPool(
  new D1ConnectionFactory(env.DB),
  {
    maxConnections: 20,
    minConnections: 5,
    acquireTimeoutMs: 5000,
    idleTimeoutMs: 300000 // 5 minutes
  }
);

const connection = await pool.acquire();
try {
  // Use connection...
} finally {
  await pool.release(connection); // always return the connection to the pool
}
```

## Error Handling & Retries
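Retry configs like the one produced by `DatabaseErrorHandler.createRetryConfig()` typically pair a retry budget with exponentially growing, capped delays. The exact policy is not documented here, so the following is an assumed shape (`backoffDelays` is an invented helper, not part of the library):

```typescript
function backoffDelays(
  attempts: number,
  baseMs: number,
  factor: number,
  capMs: number
): number[] {
  const delays: number[] = [];
  for (let i = 0; i < attempts; i++) {
    // Exponential growth, capped so a long outage doesn't stall callers indefinitely.
    delays.push(Math.min(baseMs * Math.pow(factor, i), capMs));
  }
  return delays;
}
```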
```typescript
import { RetryableOperation, DatabaseErrorHandler } from './storage';

const retryConfig = DatabaseErrorHandler.createRetryConfig();
const retryOp = new RetryableOperation(retryConfig);

// Add custom error handlers
retryOp.addErrorHandler(async (error, context) => {
  if (error.code === 'CUSTOM_ERROR' && context.attempt <= 2) {
    // Custom recovery logic
    return true;  // Retry
  }
  return false;   // Don't retry
});

const result = await retryOp.execute(async () => {
  return await storage.insertDocument('test', document);
}, { operation: 'insertDocument', backend: 'd1' });
```

## Vector Search
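The `metric: 'cosine'` and `threshold` options used below score vectors by cosine similarity (1.0 means identical direction, 0 means orthogonal). A standard reference implementation, for intuition:

```typescript
function cosineSimilarity(a: Float32Array, b: Float32Array): number {
  if (a.length !== b.length) throw new Error('dimension mismatch');
  let dot = 0, normA = 0, normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) return 0; // degenerate zero vector
  // Dot product normalized by the product of magnitudes.
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}
```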
### Basic Vector Operations

```typescript
// Insert vectors
await storage.insertVector('documents', {
  id: 'doc1',
  vector: embedding,
  dimensions: 1536,
  metadata: { title: 'AI Paper', category: 'research' }
});

// Semantic search
const results = await storage.searchVectors('documents', {
  vector: queryEmbedding,
  limit: 10,
  threshold: 0.7,
  metric: 'cosine'
});

// Hybrid search (vector similarity + metadata filtering)
const filtered = await storage.searchHybrid('documents', {
  vector: queryEmbedding,
  limit: 5
}, {
  category: 'research', // Document filter
  publishDate: { $gte: '2024-01-01' }
});
```

### Advanced Vector Index (Durable Objects)
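For context on what an ANN index buys you: without one, top-k search is a full linear scan over every stored vector, which is what makes large collections slow. A brute-force baseline, using plain number arrays for brevity (`StoredVector` and `bruteForceTopK` are illustrative names, not part of the library API):

```typescript
interface StoredVector { id: string; vector: number[]; }

function dot(a: number[], b: number[]): number {
  return a.reduce((s, x, i) => s + x * b[i], 0);
}

function bruteForceTopK(
  store: StoredVector[],
  query: number[],
  k: number
): { id: string; score: number }[] {
  return store
    .map(v => ({
      id: v.id,
      // Cosine similarity: dot product over the product of magnitudes.
      score: dot(v.vector, query) /
        ((Math.sqrt(dot(v.vector, v.vector)) * Math.sqrt(dot(query, query))) || 1),
    }))
    .sort((x, y) => y.score - x.score) // highest similarity first
    .slice(0, k);
}
```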
```typescript
// Build an ANN index for large vector collections
const indexId = await storage.durableObject.buildVectorIndex(
  'large_embeddings',
  1536,    // dimensions
  'cosine' // distance metric
);

// Search using the optimized index
const results = await storage.durableObject.searchVectorIndex(
  indexId,
  queryVector,
  10 // top-k results
);
```

## Time Series Support
### Data Ingestion

```typescript
// Single point
await storage.insertTimeSeriesPoint({
  metric: 'temperature',
  timestamp: Date.now(),
  value: 22.5,
  tags: { sensor: 'living_room', unit: 'celsius' }
});

// Batch ingestion for high throughput
const points = generateMetricPoints(); // Array of 10,000 points
await storage.insertTimeSeriesBatch(points);
```

### Querying & Aggregation
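Window aggregation like the `5m` windows used below amounts to bucketing points by `floor(timestamp / window)` and reducing each bucket. An illustrative sketch, not the engine's implementation:

```typescript
interface Point { timestamp: number; value: number; }

function aggregateByWindow(
  points: Point[],
  windowMs: number
): Map<number, { avg: number; max: number; count: number }> {
  // Group values by the start of the window that contains them.
  const buckets = new Map<number, number[]>();
  for (const p of points) {
    const bucketStart = Math.floor(p.timestamp / windowMs) * windowMs;
    const arr = buckets.get(bucketStart) ?? [];
    arr.push(p.value);
    buckets.set(bucketStart, arr);
  }
  // Reduce each bucket into the requested aggregates.
  const out = new Map<number, { avg: number; max: number; count: number }>();
  for (const [start, values] of buckets) {
    out.set(start, {
      avg: values.reduce((a, b) => a + b, 0) / values.length,
      max: Math.max(...values),
      count: values.length,
    });
  }
  return out;
}
```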
```typescript
// Time range query
const data = await storage.queryTimeSeries({
  metric: 'cpu.usage',
  timeRange: { start: '-1h', end: 'now' },
  tags: { host: 'web-server-1' }
});

// Aggregation query
const aggregated = await storage.aggregateTimeSeries({
  metric: 'api.latency',
  timeRange: { start: '-24h', end: 'now' },
  aggregation: {
    window: '5m',
    functions: ['avg', 'max', 'percentile'],
    percentile: 95
  }
});
```

### Tiered Storage
Data automatically moves through storage tiers:
- Hot (< 24h): D1 for fast queries
- Warm (1-7 days): KV with compression
- Cold (> 7 days): R2 with columnar format
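The tiering rule above, written as a pure function for clarity (the boundaries are the documented ones; the engine's actual background migration job is not shown in this README):

```typescript
type Tier = 'hot' | 'warm' | 'cold';

const HOUR_MS = 60 * 60 * 1000;
const DAY_MS = 24 * HOUR_MS;

function tierFor(pointTimestamp: number, now: number): Tier {
  const age = now - pointTimestamp;
  if (age < DAY_MS) return 'hot';       // < 24h  → D1
  if (age <= 7 * DAY_MS) return 'warm'; // 1-7 days → KV
  return 'cold';                        // > 7 days → R2
}
```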
## Real-time Features

### Change Streams

```typescript
// Watch for document changes
const changeStream = await storage.watchCollection('orders', {
  bufferSize: 1000,
  batchInterval: 100 // ms
});

for await (const change of changeStream) {
  console.log('Change detected:', change.type, change.document);
}
```

### Live Queries
```typescript
// Subscribe to query results that update in real time
const subscription = await storage.subscribe({
  collection: 'inventory',
  filter: { quantity: { $lt: 10 } }, // Low-stock items
  onChange: (results) => {
    updateDashboard(results);
  }
});
```

## Monitoring & Observability
### Performance Metrics

```typescript
const metrics = await storage.getMetrics();
console.log({
  averageLatency: metrics.averageLatency,
  throughput: metrics.throughput,
  errorRate: metrics.errorRate,
  cacheHitRate: metrics.cacheHitRate
});

// Per-backend metrics
const d1Metrics = await storage.d1.getMetrics();
const kvMetrics = await storage.kv.getMetrics();
```

### Health Checks
```typescript
import { HealthChecker } from './storage';

const healthChecker = new HealthChecker();

healthChecker.addHealthCheck('d1', async () => {
  const result = await storage.executeSQL('SELECT 1');
  return result.success;
}, 30000); // Check every 30 seconds

healthChecker.onHealthChange('d1', (healthy) => {
  if (!healthy) {
    console.warn('D1 database is unhealthy!');
    // Trigger alerts, failover, etc.
  }
});
```

## Best Practices
### 1. Optimize for Access Patterns

```typescript
// Frequently accessed small documents → D1 + KV cache
await storage.insertDocument('user_sessions', session);
await storage.findDocuments('user_sessions', {
  filter: { active: true },
  limit: 100
}); // Will be cached automatically

// Large files → R2, with metadata in D1
await storage.putBlob('uploads/video.mp4', videoBuffer, {
  'content-type': 'video/mp4',
  'x-user-id': 'user123'
});
```

### 2. Use Appropriate Data Models
```typescript
// Time series → specialized time series operations
await storage.insertTimeSeriesPoint(point);

// Vectors → vector operations with metadata
await storage.insertVector('embeddings', vectorData);

// Documents → schema-free document operations
await storage.insertDocument('content', document);
```

### 3. Leverage Batch Operations
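When an API has no native batch call (as with `insertVector` below), a plain `Promise.all` over thousands of items can overwhelm the backend. A hypothetical helper that bounds in-flight work by processing fixed-size chunks sequentially (`chunk` and `runChunked` are not library functions):

```typescript
function chunk<T>(items: T[], size: number): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

async function runChunked<T>(
  items: T[],
  size: number,
  worker: (item: T) => Promise<void>
): Promise<void> {
  for (const batch of chunk(items, size)) {
    // At most `size` operations in flight at any moment.
    await Promise.all(batch.map(worker));
  }
}
```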
```typescript
// Batch inserts for better performance
const documents = generateDocuments(1000);
await storage.insertMany('logs', documents);

// Batch vector operations
const vectors = generateVectors(500);
await Promise.all(
  vectors.map(v => storage.insertVector('embeddings', v))
);
```

### 4. Configure Appropriate Caching
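The `cacheTTL` and `documentCache` settings below control how long cached entries remain valid. As a mental model, a TTL cache behaves like this toy in-memory version (the real cache lives in KV; the injectable `clock` parameter here is purely for testability):

```typescript
class TtlCache<V> {
  private entries = new Map<string, { value: V; expiresAt: number }>();

  constructor(private ttlMs: number, private clock: () => number = Date.now) {}

  set(key: string, value: V): void {
    this.entries.set(key, { value, expiresAt: this.clock() + this.ttlMs });
  }

  get(key: string): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (this.clock() > entry.expiresAt) {
      this.entries.delete(key); // lazy expiry on read
      return undefined;
    }
    return entry.value;
  }
}
```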
```typescript
const config = createDefaultConfig(/* ... */);

// Adjust cache TTL based on data volatility
config.routing.cacheRules.cacheTTL = 3600;         // 1 hour for stable data
config.routing.cacheRules.documentCache = true;    // Enable document caching
config.routing.cacheRules.queryResultCache = true; // Cache query results
```

## API Reference
See the TypeScript definitions in `./interfaces/storage-engine.ts` for the complete API reference.
## Performance Benchmarks

Target performance on Cloudflare's edge network:

| Operation | Target Latency | Throughput |
|-----------|----------------|------------|
| Document Insert (small) | < 5ms | 10K ops/sec |
| Document Query (indexed) | < 3ms | 50K ops/sec |
| Vector Search (ANN) | < 10ms | 1K ops/sec |
| Time Series Insert | < 2ms | 100K points/sec |
| Blob Storage (< 1MB) | < 15ms | 1K ops/sec |
| Cache Hit | < 1ms | 100K ops/sec |
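To check your own workload against these targets, record per-operation latencies and look at a percentile rather than the mean. A nearest-rank percentile over collected samples (illustrative; timing collection itself is left to the caller):

```typescript
function percentile(samples: number[], p: number): number {
  if (samples.length === 0) throw new Error('no samples');
  const sorted = [...samples].sort((a, b) => a - b);
  // Nearest-rank method: ceil(p/100 * n), 1-indexed.
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(0, rank - 1)];
}
```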
## License
MIT License - see LICENSE file for details.
