@yihuangdb/storage-object
v0.2.3
A Node.js storage object layer library using Redis OM
StorageObject API Documentation (Complete)
Quick Start
The simplest way to use StorageObject:
import { storage } from '@yihuangdb/storage-object';
// Create storage with simple schema (all fields are indexed by default)
const users = await storage('users', {
name: 'text', // Full-text search field
email: 'string', // Exact match field
age: 'number', // Numeric field
isActive: 'boolean'
});
// Create
const user = await users.create({
name: 'John Doe',
email: '[email protected]',
age: 30,
isActive: true
});
// Read
const found = await users.findById(user.entityId);
const johns = await users.find({ name: 'John' });
const all = await users.findAll();
// Update
await users.update(user.entityId, { age: 31 });
// Delete
await users.delete(user.entityId);
// Export/Import
const backup = await users.backup();
const result = await users.restore(backup);
Core API
StorageSystem
The main entry point for managing storage instances:
import { StorageSystem, StorageSchema } from '@yihuangdb/storage-object';
// Create a new storage
const users = await StorageSystem.create('users', {
name: 'text',
email: 'string',
age: 'number'
});
// Get existing storage
const users = await StorageSystem.get('users');
// Open storage (throws if doesn't exist)
const users = await StorageSystem.open('users');
// Get or create
const users = await StorageSystem.getOrCreate('users', schema);
// Check existence
if (await StorageSystem.exists('users')) {
// ...
}
// List all storages
const schemas = await StorageSystem.list();
const names = await StorageSystem.names();
// Delete storage
await StorageSystem.delete('users');
// Clear data without deleting schema
await StorageSystem.clear('users');
// System-wide export/import
const exported = await StorageSystem.exportAll('./backup');
const imported = await StorageSystem.importAll('./backup');
StorageSchema
Define schemas with validation and advanced options:
import { StorageSchema } from '@yihuangdb/storage-object';
// Simple definition
const schema = StorageSchema.define({
name: 'text',
email: 'string',
age: 'number'
});
// Advanced definition with validation
const schema = StorageSchema.define({
email: {
type: 'string',
indexed: true,
required: true,
validate: (value) => value.includes('@') || 'Invalid email'
},
age: {
type: 'number',
indexed: true,
validate: (value) => value >= 0 || 'Age must be positive'
},
role: {
type: 'string',
indexed: true,
default: 'user'
},
tags: {
type: 'string[]',
separator: ',',
indexed: true
},
createdAt: {
type: 'date',
indexed: true,
default: () => new Date()
}
});
// Validate data against schema
const result = schema.validate(data);
if (!result.valid) {
console.error('Validation errors:', result.errors);
}
// Schema operations
const fields = schema.getFields();
const indexedFields = schema.getIndexedFields();
// Schema evolution
const extended = schema.extend({ newField: 'string' });
const merged = schema1.merge(schema2);
const diff = schema1.diff(schema2);
const migrationPlan = oldSchema.migrateTo(newSchema);
Schema Definition
Field Types
- string - Exact match string field
- text - Full-text searchable string field
- number - Numeric field with range queries
- boolean - Boolean field (true/false)
- date - Date field stored as timestamp
- point - Geographic point field (lon,lat)
- string[] - Array of strings
- number[] - Array of numbers
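To illustrate the less common types, here is a minimal sketch of a schema that combines date, point, and array fields through the quick-start storage() helper shown above. The { longitude, latitude } shape used for the point value is an assumption carried over from Redis OM's convention and may differ in this library.
import { storage } from '@yihuangdb/storage-object';
// Hypothetical 'places' storage exercising date, point, and array field types
const places = await storage('places', {
name: 'text', // full-text searchable
openedAt: 'date', // stored as timestamp
location: 'point', // geographic point (lon,lat)
categories: 'string[]', // array of strings
ratings: 'number[]' // array of numbers
});
// Point value shape assumed from Redis OM ({ longitude, latitude })
const cafe = await places.create({
name: 'Corner Cafe',
openedAt: new Date('2021-06-01'),
location: { longitude: -122.42, latitude: 37.77 },
categories: ['coffee', 'breakfast'],
ratings: [4, 5]
});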
Field Options
{
type: 'string', // Field type (required)
indexed: true, // Enable indexing for searches
required: true, // Field is required
default: 'value', // Default value or function
validate: (v) => true, // Validation function (client-side only)
sortable: true, // Enable sorting (text only)
normalized: true, // Normalize text (text only)
separator: ',', // Array separator
description: 'Field desc' // Field documentation
}
Note: Validation functions run client-side in Node.js, not in Redis.
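Because validation runs in Node.js rather than in Redis, one option is to call schema.validate() yourself before writing, so bad input never reaches the database. A minimal sketch using only the schema and CRUD APIs documented here:
import { StorageSystem, StorageSchema } from '@yihuangdb/storage-object';
const userSchema = StorageSchema.define({
email: {
type: 'string',
indexed: true,
required: true,
validate: (value) => value.includes('@') || 'Invalid email'
}
});
const users = await StorageSystem.create('users', userSchema);
// Validate client-side before attempting the write
const input = { email: 'not-an-email' };
const check = userSchema.validate(input);
if (!check.valid) {
console.error('Rejected before write:', check.errors);
} else {
await users.create(input);
}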
CRUD Operations
Create Operations
// Single create
const user = await users.create({
name: 'John',
email: '[email protected]'
});
// Batch create
const created = await users.createMany([
{ name: 'John', email: '[email protected]' },
{ name: 'Jane', email: '[email protected]' }
]);
// Atomic batch create (all or nothing)
const result = await users.batchCreate(data, {
atomic: true,
chunkSize: 100
});
Read Operations
// Find by ID
const user = await users.findById('uuid-here');
// Find with query
const johns = await users.find({ name: 'John' });
// Find one
const admin = await users.findOne({ role: 'admin' });
// Find all
const all = await users.findAll();
// Find with options
const results = await users.find(
{ age: { $gte: 18 } },
{
limit: 10,
offset: 0,
sortBy: 'createdAt',
sortOrder: 'DESC'
}
);
// Count
const total = await users.count();
const adults = await users.count({ age: { $gte: 18 } });
// Check existence
const exists = await users.exists('user-id');
Update Operations
// Single update
const updated = await users.update('user-id', {
age: 31
});
// Update with optimistic locking
const updated = await users.update('user-id',
{ age: 31 },
{
version: user.__version,
retries: 3
}
);
// Update many
const results = await users.updateMany(
{ role: 'user' },
{ role: 'member' }
);
// Batch update
const result = await users.batchUpdate(updates, {
atomic: true
});
Delete Operations
// Single delete
await users.delete('user-id');
// Delete many
const deleted = await users.deleteMany({
isActive: false
});
// Batch delete
const result = await users.batchDelete(ids, {
atomic: true
});
// Clear all data
await users.clear();
Export/Import
Comprehensive export/import functionality at both storage and system levels.
Storage-Level Export/Import
// Export single storage
const metadata = await users.export('./users-backup.json');
console.log(`Exported ${metadata.exportedEntityCount} entities`);
// Export with options
const metadata = await users.export('./users.ndjson', {
exportFormat: 'ndjson', // json, ndjson, binary
compressOutput: true, // gzip compression
includeSchema: true, // include schema definition
includeData: true // include entity data
});
// Export incremental changes
const metadata = await users.export('./changes.json', {
incrementalExport: true,
fromStorageVersion: 100,
toStorageVersion: 200
});
// Import from backup
const result = await users.import('./users-backup.json');
console.log(`Imported ${result.importedEntityCount} entities`);
console.log(`Failed: ${result.failedEntityCount}`);
// Import with validation
const result = await users.import('./users.ndjson', {
validateSchemaVersion: true,
entityMergeStrategy: 'merge', // replace, merge, skip
continueOnError: true,
dryRun: false // test import without changes
});
// Quick backup/restore
const backupPath = await users.backup('before-migration');
// ... make changes ...
const result = await users.restore(backupPath);
Incremental Export/Import (Version-Based)
// Export incremental changes between versions
const metadata = await users.exportIncremental(
'./changes-100-200.json',
100, // fromVersion (exclusive)
200 // toVersion (inclusive, optional - defaults to current)
);
console.log(`Exported changes from v${metadata.fromStorageVersion} to v${metadata.toStorageVersion}`);
// Export with compression
const metadata = await users.exportIncremental(
'./changes.json.gz',
lastSyncVersion,
undefined, // Export to current version
{ compressOutput: true }
);
// Import incremental changes
const result = await users.importIncremental('./changes.json');
console.log(`Applied ${result.importedEntityCount} changes`);
console.log(`Now at version ${result.endStorageVersion}`);
// Import with validation
const result = await users.importIncremental('./changes.json', {
validateSchemaVersion: true,
continueOnError: true,
entityMergeStrategy: 'merge'
});
Storage Synchronization
// Sync two storage instances
const sourceStorage = await StorageSystem.get('users');
const targetStorage = await StorageSystem.get('users-replica');
// Sync changes from source to target
const syncResult = await sourceStorage.sync(targetStorage, {
fromVersion: lastSyncVersion, // Sync changes since this version
toVersion: undefined, // Sync up to current version
strategy: 'merge', // merge, replace, or skip
batchSize: 100 // Process in batches
});
console.log(`Synced ${syncResult.syncedCount} entities`);
console.log(`Failed: ${syncResult.failedCount}`);
console.log(`Source version: ${syncResult.sourceVersion}`);
console.log(`Target version: ${syncResult.targetVersion}`);
// Bidirectional sync
const sync1 = await storage1.sync(storage2, { fromVersion: lastSync1 });
const sync2 = await storage2.sync(storage1, { fromVersion: lastSync2 });
Version Management
// Enable change tracking (if not enabled at creation)
const version = await storage.enableChangeTrackingAsync();
console.log(`Change tracking enabled at version ${version}`);
// Get current storage version
const currentVersion = await storage.getCurrentStorageVersion();
console.log(`Storage is at version ${currentVersion}`);
// Get changes since a specific version
const changes = await storage.getChangesSinceVersion(100, 50); // Get 50 changes since v100
console.log(`Found ${changes.storageChanges.length} changes`);
// Get change summary between versions
const summary = await storage.getChangeSummary(100, 200);
console.log(`Changes: ${summary.created} created, ${summary.updated} updated, ${summary.deleted} deleted`);
console.log(`Total: ${summary.totalChanges} changes affecting ${summary.affectedEntityIds.length} entities`);
// Get version history
const history = await storage.getVersionHistory(10); // Last 10 versions
history.forEach(entry => {
console.log(`Version ${entry.version}: ${entry.changeCount} changes at ${new Date(entry.timestamp)}`);
});
System-Level Export/Import
// Export all storages
const result = await StorageSystem.exportAll('./backup', {
compressOutput: true,
includeSchema: true,
includeData: true,
schemasToExport: ['users', 'products'] // optional filter
});
console.log(`Exported ${result.totalExportedEntities} entities`);
console.log('Schemas:', result.exportedSchemas);
// Import all storages
const imported = await StorageSystem.importAll('./backup', {
validateSchemaVersion: true,
entityMergeStrategy: 'replace',
continueOnError: false,
schemasToImport: ['users'] // optional filter
});
console.log(`Imported ${imported.totalImportedEntities} entities`);
console.log(`Failed: ${imported.totalFailedEntities}`);
// Incremental export (changes only)
const lastVersions = new Map([
['users', 100],
['products', 50]
]);
const incremental = await StorageSystem.exportIncremental(
'./incremental',
lastVersions,
{ compressOutput: true }
);
// System backup/restore
const backupDir = await StorageSystem.systemBackup('full-backup');
console.log(`Backup created: ${backupDir}`);
const restored = await StorageSystem.systemRestore(backupDir);
console.log(`Restored ${restored.totalImportedEntities} entities`);
Export Formats
- JSON - Standard JSON format, human-readable
- NDJSON - Newline-delimited JSON, streaming-friendly
- Binary - Compact binary format for large datasets
Merge Strategies
- replace - Replace existing entities completely
- merge - Merge new fields with existing entities
- skip - Skip entities that already exist
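The sketch below combines an export format with a merge strategy using the documented options; 'skip' is chosen so that re-running the import leaves existing entities untouched (an idempotency pattern, not a library requirement).
// Export as compressed NDJSON for streaming-friendly backups
await users.export('./users.ndjson.gz', {
exportFormat: 'ndjson',
compressOutput: true,
includeSchema: true
});
// Re-import without touching entities that already exist
const result = await users.import('./users.ndjson.gz', {
entityMergeStrategy: 'skip',
continueOnError: true
});
console.log(`Imported ${result.importedEntityCount}, failed ${result.failedEntityCount}`);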
Advanced Features
Optimistic Locking
Prevent concurrent update conflicts:
const storage = await StorageSystem.create('users', schema, {
enableOptimisticLocking: true
});
// Updates include version checking
try {
await storage.update('id', data, {
version: entity.__version
});
} catch (error) {
if (error instanceof OptimisticLockError) {
// Handle concurrent modification
}
}
Change Tracking & Versioning
Comprehensive change tracking for audit trails and synchronization:
// Enable change tracking at creation
const storage = await StorageSystem.create('users', schema, {
enableChangeTracking: true
});
// Or enable later on existing storage
const version = await storage.enableChangeTrackingAsync();
// Version Operations
const currentVersion = await storage.getCurrentStorageVersion();
const changes = await storage.getChangesSinceVersion(100, 50); // limit 50
// Change Summary
const summary = await storage.getChangeSummary(100, 200);
console.log(`${summary.created} created, ${summary.updated} updated, ${summary.deleted} deleted`);
// Version History
const history = await storage.getVersionHistory(10);
history.forEach(v => console.log(`v${v.version}: ${v.changeCount} changes`));
// Incremental Operations
await storage.exportIncremental('./changes.json', lastVersion);
await storage.importIncremental('./changes.json');
// Synchronization
await sourceStorage.sync(targetStorage, {
fromVersion: lastSyncVersion,
strategy: 'merge'
});
Use Cases
Audit Trail
// Export daily changes for compliance
const startOfDay = await getVersionAtTimestamp(startTime);
const endOfDay = await storage.getCurrentStorageVersion();
await storage.exportIncremental(`./audit/${date}.json`, startOfDay, endOfDay);
Multi-Region Sync
// Periodic sync between regions
setInterval(async () => {
const lastSync = await getLastSyncVersion();
await usEast.sync(euWest, { fromVersion: lastSync });
}, 60000);
Real-time Replication
// Stream changes to replica
async function replicate() {
const changes = await primary.getChangesSinceVersion(lastVersion);
for (const change of changes.storageChanges) {
if (change.operation === 'c') await replica.create(...);
if (change.operation === 'u') await replica.update(...);
if (change.operation === 'd') await replica.delete(...);
}
}
Conflict Resolution
// Merge changes from multiple sources
await base.sync(branch1, { strategy: 'merge' });
const result = await base.sync(branch2, { strategy: 'merge', continueOnError: true });
if (result.errors) await handleConflicts(result.errors);
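Expanding on the real-time replication sketch above: the exact shape of each change record is not documented in this README, so the entityId and data fields used below are assumptions for illustration only.
// Hypothetical change-record fields (entityId, data) -- adjust to the actual StorageVersionBatch shape
async function applyChanges(fromVersion) {
const batch = await primary.getChangesSinceVersion(fromVersion, 100);
for (const change of batch.storageChanges) {
if (change.operation === 'c') await replica.create(change.data); // assumed field
if (change.operation === 'u') await replica.update(change.entityId, change.data); // assumed fields
if (change.operation === 'd') await replica.delete(change.entityId); // assumed field
}
return await primary.getCurrentStorageVersion();
}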
Batch Operations
High-performance batch operations:
// Atomic batch (all or nothing)
const result = await storage.batchCreate(items, {
atomic: true,
chunkSize: 1000
});
if (result.success) {
console.log(`Created ${result.successful} items`);
} else {
console.error(`Failed items:`, result.errors);
// Rollback handled automatically
}
Performance Monitoring
Track operation performance:
import { profiler } from '@yihuangdb/storage-object';
// Enable profiling
profiler.enable();
// Perform operations...
// Get metrics
const metrics = profiler.getMetrics();
console.log('Average create time:', metrics.create.avg);
// Get report
const report = profiler.getReport();
Schema Migration
Migrate between schema versions:
const oldSchema = StorageSchema.define({
firstName: 'string',
lastName: 'string'
});
const newSchema = StorageSchema.define({
fullName: 'string',
email: 'string'
});
// Plan migration
const plan = oldSchema.migrateTo(newSchema, {
transformers: {
fullName: (data) => `${data.firstName} ${data.lastName}`
}
});
// Execute migration
const result = await StorageSystem.migrate('users', newSchema, {
transform: plan.steps[0].transform,
validateData: true
});
TypeScript Support
Full TypeScript support with type inference:
interface User {
name: string;
email: string;
age: number;
isActive: boolean;
}
// Type-safe storage
const users = await StorageSystem.create<User>('users', {
name: 'text',
email: 'string',
age: 'number',
isActive: 'boolean'
});
// TypeScript knows the shape
const user = await users.create({
name: 'John',
email: '[email protected]',
age: 30,
isActive: true
});
// Type checking on updates
await users.update(user.entityId, {
age: 31 // ✓ Valid
// invalid: true // ✗ TypeScript error
});
Connection Management
Configure Redis connection:
// Global configuration
await StorageSystem.initialize({
redis: {
host: 'localhost',
port: 6379,
password: 'secret',
db: 0
},
connectionPool: {
maxSize: 20,
idleTimeout: 30000
},
defaultOptions: {
useJSON: true,
enableOptimisticLocking: true
}
});
// Or configure with function
StorageSystem.configure((config) => {
config.redis = {
host: process.env.REDIS_HOST || 'localhost'
};
});
Error Handling
import { OptimisticLockError } from '@yihuangdb/storage-object';
try {
await storage.update(id, data);
} catch (error) {
if (error instanceof OptimisticLockError) {
// Handle concurrent modification
console.log('Conflict detected, retrying...');
} else {
// Handle other errors
console.error('Update failed:', error);
}
}
Best Practices
Use the Quick Start API for simple cases
const users = await storage('users', { name: 'text', email: 'string' });
Define schemas separately for reuse
const userSchema = StorageSchema.define({ /* ... */ });
const users = await StorageSystem.create('users', userSchema);
Enable optimistic locking for concurrent environments
const storage = await StorageSystem.create('users', schema, { enableOptimisticLocking: true });
Use batch operations for bulk data
await storage.batchCreate(largeDataSet, { atomic: true });
Add validation to schemas
email: { type: 'string', validate: (v) => v.includes('@') || 'Invalid email' }
Use TypeScript interfaces for type safety
interface User { /* ... */ }
const users = await StorageSystem.create<User>('users', schema);
Regular backups
// Automatic daily backup
setInterval(async () => {
await storage.backup(`daily-${new Date().toISOString()}`);
}, 24 * 60 * 60 * 1000);
Clean up connections when done
await storage.disconnect();
Version Tracking API Reference
Core Version Methods
| Method | Description | Returns |
|--------|-------------|------|
| getCurrentStorageVersion() | Get current version number | number \| null |
| getChangesSinceVersion(from, limit?) | Get changes since version | StorageVersionBatch |
| getChangeSummary(from, to?) | Summarize changes between versions | ChangeSummary |
| getVersionHistory(limit?) | Get version history | Array<VersionEntry> |
| enableChangeTrackingAsync() | Enable version tracking | Promise<number> |
Export/Import Methods
| Method | Type | Description |
|--------|------|-------------|
| export(path, options?) | Full/Incremental | General export with options |
| exportIncremental(path, from, to?, options?) | Incremental | Export changes between versions |
| import(path, options?) | Full/Incremental | General import with options |
| importIncremental(path, options?) | Incremental | Import and apply changes |
| backup(name?) | Full | Quick compressed backup |
| restore(path) | Full | Quick restore from backup |
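A common pattern with backup() and restore() is to take a quick backup before a risky bulk change and roll back if it fails; a minimal sketch using only the methods listed above:
const backupPath = await users.backup('pre-bulk-update');
try {
await users.updateMany({ role: 'user' }, { role: 'member' });
} catch (error) {
console.error('Bulk update failed, restoring backup:', error);
await users.restore(backupPath);
}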
Synchronization
| Method | Description |
|--------|---------|
| sync(target, options?) | Synchronize with another storage instance |
Sync Options
interface SyncOptions {
fromVersion?: number; // Start version (default: 0)
toVersion?: number; // End version (default: current)
strategy?: 'replace' | 'merge' | 'skip'; // Merge strategy
batchSize?: number; // Batch size for processing
}
Complete Example
import { StorageSystem, StorageSchema } from '@yihuangdb/storage-object';
// Define typed schema
interface User {
name: string;
email: string;
age: number;
role: string;
tags: string[];
isActive: boolean;
createdAt: Date;
}
// Create schema with validation
const userSchema = StorageSchema.define<User>({
name: { type: 'text', indexed: true, required: true },
email: {
type: 'string',
indexed: true,
required: true,
validate: (email) => email.includes('@') || 'Invalid email'
},
age: {
type: 'number',
indexed: true,
validate: (age) => age >= 0 && age <= 150 || 'Invalid age'
},
role: {
type: 'string',
indexed: true,
default: 'user'
},
tags: { type: 'string[]', separator: ',' },
isActive: { type: 'boolean', indexed: true, default: true },
createdAt: { type: 'date', indexed: true, default: () => new Date() }
});
// Create storage with features
const users = await StorageSystem.create<User>('users', userSchema, {
enableOptimisticLocking: true,
enableChangeTracking: true, // Enable version tracking
useJSON: true
});
// Create users
const john = await users.create({
name: 'John Doe',
email: '[email protected]',
age: 30,
role: 'admin',
tags: ['vip', 'early-adopter']
});
// Track version after initial data
const v1 = await users.getCurrentStorageVersion();
// Query users
const admins = await users.find({ role: 'admin' });
const activeVips = await users.find({
isActive: true,
tags: ['vip']
});
// Backup before changes
const backupPath = await users.backup('before-role-migration');
// Batch update
await users.updateMany(
{ role: 'user' },
{ role: 'member' }
);
const v2 = await users.getCurrentStorageVersion();
// Get change summary
const summary = await users.getChangeSummary(v1, v2);
console.log(`Updated ${summary.updated} users from 'user' to 'member'`);
// Export incremental changes
await users.exportIncremental('./role-migration-changes.json', v1, v2);
// Create replica and sync
const replica = await StorageSystem.create<User>('users-replica', userSchema, {
enableChangeTracking: true
});
// Sync changes to replica
const syncResult = await users.sync(replica, {
fromVersion: 0,
strategy: 'replace'
});
console.log(`Synced ${syncResult.syncedCount} entities to replica`);
// Export full data
await users.export('./users-export.json', {
compressOutput: true,
includeSchema: true
});
// If something goes wrong, restore
// await users.restore(backupPath);
Incremental Sync Example
// Multi-region synchronization
const primary = await StorageSystem.create('users-primary', schema, {
enableChangeTracking: true
});
const replica = await StorageSystem.create('users-replica', schema, {
enableChangeTracking: true
});
// Initial data
const alice = await primary.create({ name: 'Alice', email: '[email protected]' });
const bob = await primary.create({ name: 'Bob', email: '[email protected]' });
const v1 = await primary.getCurrentStorageVersion();
// Make changes
await primary.update(alice.entityId, { role: 'admin' });
await primary.delete(bob.entityId);
await primary.create({ name: 'Charlie', email: '[email protected]' });
const v2 = await primary.getCurrentStorageVersion();
// Export only the changes
const changesPath = './changes-v1-v2.json';
await primary.exportIncremental(changesPath, v1, v2);
// Import changes to replica
await replica.importIncremental(changesPath);
// Or use direct sync
const syncResult = await primary.sync(replica, {
fromVersion: v1,
strategy: 'merge'
});
// Verify synchronization
const primaryCount = await primary.count();
const replicaCount = await replica.count();
console.log(`Sync complete: Primary=${primaryCount}, Replica=${replicaCount}`);
// Get version history
const history = await primary.getVersionHistory(5);
history.forEach(entry => {
console.log(`Version ${entry.version}: ${entry.changeCount} changes`);
});