@andrejs1979/realtime
v1.0.0
Real-time streaming and WebSocket support for NoSQL
NoSQL Real-time Layer
The NoSQL Real-time Layer adds real-time delivery to NoSQL applications: WebSocket connections, Server-Sent Events (SSE), document change streams, vector search streaming, time series monitoring, and multi-protocol support.
Features
Core Components
- WebSocket Server: High-performance WebSocket connections using Cloudflare Durable Objects
- SSE Fallback: Server-Sent Events for browsers without WebSocket support
- Connection Management: Intelligent connection pooling, authentication, and scaling
- Subscription System: Query-based, collection-level, and geographic subscriptions
- Performance Optimization: Message batching, backpressure handling, and adaptive throttling
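As an illustration of the message batching mentioned above, here is a minimal sketch of a size-based batcher. It is not taken from the package: `MessageBatcher`, `maxBatchSize`, and the `send` callback are hypothetical names, and a real implementation would likely also flush on a timer to bound latency.

```typescript
// Hypothetical helper; not part of the package's documented API.
class MessageBatcher<T> {
  private buffer: T[] = [];
  private readonly maxBatchSize: number;
  private readonly send: (batch: T[]) => void;

  constructor(maxBatchSize: number, send: (batch: T[]) => void) {
    this.maxBatchSize = maxBatchSize;
    this.send = send;
  }

  // Queue one message; flush immediately once the batch is full.
  push(message: T): void {
    this.buffer.push(message);
    if (this.buffer.length >= this.maxBatchSize) {
      this.flush();
    }
  }

  // Send whatever is buffered. Calling this on an empty buffer is a no-op.
  flush(): void {
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    this.send(batch);
  }
}
```

Sending one frame per batch instead of one per message trades a little latency for fewer writes on the connection.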
Real-time Features
- Document Change Streams: Live document changes with insert/update/delete notifications
- Vector Search Streaming: Real-time vector similarity search results with adaptive thresholds
- Time Series Streaming: Live metrics with aggregations, anomaly detection, and alerts
- Collaborative Editing: Real-time operational transforms for collaborative applications
- Geographic Subscriptions: Location-based data filtering and notifications
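To make the geographic filtering above more concrete, the following sketch shows one way a circle filter could be evaluated against a document's location using the haversine formula. The filter shape mirrors the circle filter in the Geographic Subscriptions example in this README; treating `coordinates` as `[longitude, latitude]` and `radius` as meters is an assumption, and `haversineMeters` and `matchesCircle` are hypothetical helpers, not package exports.

```typescript
// Assumption: coordinates are [longitude, latitude] and radius is in meters.
type LonLat = [number, number];

interface CircleFilter {
  type: 'circle';
  coordinates: LonLat;
  radius: number;
}

const EARTH_RADIUS_M = 6371000; // mean Earth radius

// Great-circle distance between two points, in meters.
function haversineMeters(a: LonLat, b: LonLat): number {
  const toRad = (deg: number): number => (deg * Math.PI) / 180;
  const dLat = toRad(b[1] - a[1]);
  const dLon = toRad(b[0] - a[0]);
  const h =
    Math.sin(dLat / 2) ** 2 +
    Math.cos(toRad(a[1])) * Math.cos(toRad(b[1])) * Math.sin(dLon / 2) ** 2;
  return 2 * EARTH_RADIUS_M * Math.asin(Math.sqrt(h));
}

// True when the point lies inside (or on the edge of) the circle.
function matchesCircle(filter: CircleFilter, point: LonLat): boolean {
  return haversineMeters(filter.coordinates, point) <= filter.radius;
}
```

A server could run a check like this on each changed document before notifying a geographic subscriber.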
Protocol Support
- NoSQL Protocol: Custom high-performance binary protocol with compression
- Socket.IO Compatibility: Support for Socket.IO clients
- MQTT Support: IoT device connectivity with MQTT protocol
- Multi-protocol Routing: Automatic protocol detection and routing
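The README does not describe how protocol detection works. As a rough sketch, a router could inspect standard request signals: the `Upgrade: websocket` header, the `mqtt` WebSocket subprotocol used by MQTT over WebSocket, the `/socket.io/` path and `EIO` query parameter sent by Socket.IO clients, and an `Accept: text/event-stream` header for SSE. `detectProtocol` and `IncomingRequest` below are hypothetical and only illustrate the idea.

```typescript
type Protocol = 'websocket' | 'mqtt' | 'socket.io' | 'sse' | 'http';

// Hypothetical, simplified request shape; header names are lower-cased.
interface IncomingRequest {
  path: string;
  query: { [name: string]: string | undefined };
  headers: { [name: string]: string | undefined };
}

function detectProtocol(req: IncomingRequest): Protocol {
  const upgrade = (req.headers['upgrade'] ?? '').toLowerCase();
  const subprotocols = (req.headers['sec-websocket-protocol'] ?? '')
    .split(',')
    .map((s) => s.trim().toLowerCase());

  // Socket.IO clients use the /socket.io/ path by default and send an EIO version.
  if (req.path.startsWith('/socket.io/') || req.query['EIO'] !== undefined) {
    return 'socket.io';
  }
  if (upgrade === 'websocket') {
    // MQTT over WebSocket negotiates the "mqtt" subprotocol.
    return subprotocols.includes('mqtt') ? 'mqtt' : 'websocket';
  }
  if ((req.headers['accept'] ?? '').includes('text/event-stream')) {
    return 'sse';
  }
  return 'http';
}
```

Checking Socket.IO first matters because its WebSocket transport also sends `Upgrade: websocket`.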
Quick Start
Basic WebSocket Connection
import { RealtimeManager } from '@nosql/realtime';
const realtimeManager = new RealtimeManager(env);
// Handle WebSocket connections
app.get('/ws', async (request) => {
  return realtimeManager.handleRequest(request);
});
Document Change Streams
import { DocumentChangeStreams } from '@nosql/realtime';
const documentStreams = new DocumentChangeStreams(documentStorage, state, env);
// Subscribe to document changes
const subscription = await documentStreams.subscribe({
  collection: 'users',
  filter: { status: 'active' },
  fullDocument: 'updateLookup'
}, (change) => {
  console.log('Document changed:', change.operationType);
  console.log('Full document:', change.fullDocument);
});
Vector Search Streaming
import { VectorSearchStreams } from '@nosql/realtime';
const vectorStreams = new VectorSearchStreams(vectorEngine, state, env);
// Subscribe to vector similarity updates
const vectorSub = await vectorStreams.subscribe({
  queryVector: [0.1, 0.2, 0.3, /* ... */],
  threshold: 0.8,
  limit: 10,
  adaptiveThreshold: true
}, (results) => {
  results.forEach(result => {
    console.log(`Similar item: ${result.id} (${result.similarity})`);
  });
});
Time Series Monitoring
import { TimeSeriesStreams } from '@nosql/realtime';
const timeSeriesStreams = new TimeSeriesStreams(state, env);
// Monitor metrics with alerts
const metricSub = await timeSeriesStreams.subscribe({
  metrics: ['cpu.usage', 'memory.usage'],
  aggregations: {
    functions: ['avg', 'max', 'p95'],
    window: '1m'
  },
  anomalyDetection: {
    enabled: true,
    method: 'statistical',
    sensitivity: 0.8
  },
  alerts: {
    conditions: [{
      id: 'high-cpu',
      metric: 'cpu.usage',
      operator: 'gt',
      threshold: 80,
      severity: 'high'
    }]
  }
}, (results) => {
  results.forEach(result => {
    if (result.changeType === 'alert') {
      console.log('🚨 ALERT:', result);
    } else if (result.anomaly) {
      console.log('⚠️ ANOMALY:', result);
    } else {
      console.log('📊 Metric:', result);
    }
  });
});
Client SDK Integration
JavaScript/TypeScript
import { NoSQLClient } from '@nosql/sdk';
const client = new NoSQLClient({
  apiUrl: 'https://your-app.workers.dev',
  apiKey: 'your-api-key'
});
const collection = client.collection('users');
const realtime = collection.realtime();
// Subscribe to document changes
const subscription = await realtime.subscribe({
  filter: { status: 'active' },
  events: ['insert', 'update'],
  fullDocument: 'updateLookup'
}, (event) => {
  console.log('Document changed:', event);
});
// Watch vector similarities
// (userEmbedding is an embedding vector defined elsewhere in your application)
const vectorWatch = await realtime.watchVectorSimilarity({
  vector: userEmbedding,
  threshold: 0.8,
  limit: 10
}, (results) => {
  console.log('Similar users:', results);
});
// Monitor time series
// (updateDashboard is your own rendering function)
const metricsWatch = await realtime.watchTimeSeries({
  metrics: ['page_views', 'conversion_rate'],
  aggregations: {
    functions: ['sum', 'avg'],
    window: '5m'
  }
}, (data) => {
  updateDashboard(data);
});
React Integration
import { useEffect, useState } from 'react';
import { useRealtime } from '@nosql/react';
function UserList() {
  const { isConnected, subscribe } = useRealtime({
    url: 'https://your-app.workers.dev'
  });
  const [users, setUsers] = useState([]);
  useEffect(() => {
    if (!isConnected) return;
    const subscription = subscribe({
      collection: 'users',
      filter: { status: 'active' }
    }, (event) => {
      setUsers(prev => {
        // Handle real-time updates
        switch (event.operationType) {
          case 'insert':
            return [...prev, event.fullDocument];
          case 'update':
            return prev.map(user =>
              user._id === event.documentKey._id
                ? event.fullDocument
                : user
            );
          case 'delete':
            return prev.filter(user => user._id !== event.documentKey._id);
          default:
            return prev;
        }
      });
    });
    // Unsubscribe when the component unmounts or the connection changes
    return () => subscription.unsubscribe();
  }, [isConnected, subscribe]);
  return (
    <div>
      <h2>Active Users ({users.length})</h2>
      {users.map(user => (
        <div key={user._id}>{user.name}</div>
      ))}
    </div>
  );
}
Advanced Features
Room-based Communication
// Connect to a specific room
const request = new Request('ws://localhost:8787/ws?room=chat-123', {
  headers: {
    'Upgrade': 'websocket',
    'X-Client-Id': 'user-456'
  }
});
// Broadcast to room members
await webSocketDO.broadcastToRoom('chat-123', {
  type: 'message',
  data: { text: 'Hello room!', user: 'Alice' }
});
Geographic Subscriptions
// Subscribe to events in a geographic area
const geoSub = await documentStreams.subscribe({
  collection: 'events',
  filter: {
    location: {
      type: 'circle',
      coordinates: [-122.4194, 37.7749], // San Francisco
      radius: 1000 // 1km radius
    }
  }
}, (event) => {
  console.log('Local event:', event.fullDocument);
});
Performance Monitoring
import { BackpressureHandler } from '@nosql/realtime';
const backpressureHandler = new BackpressureHandler({
  maxBufferSize: 1024 * 1024, // 1MB
  warningThreshold: 0.7,
  criticalThreshold: 0.9,
  adaptiveThrottling: true
});
// Monitor connection health
const shouldThrottle = await backpressureHandler.shouldThrottle(connectionId);
if (shouldThrottle) {
  await backpressureHandler.applyThrottle(connectionId);
}
MCP Integration
The real-time layer provides MCP tools for AI agents:
import { MCPRealtimeTools } from '@nosql/realtime';
const mcpTools = new MCPRealtimeTools(realtimeManager, {
  enableStreaming: true,
  maxConcurrentStreams: 10,
  streamTimeout: 300000
});
// AI agents can use these tools:
// - subscribe_realtime
// - stream_query_results
// - watch_vector_similarities
// - monitor_metrics
// - get_realtime_stats
Deployment
Cloudflare Workers Configuration
# wrangler.toml
[durable_objects]
bindings = [
  { name = "WEBSOCKET_DO", class_name = "WebSocketDurableObject" },
  { name = "SSE_DO", class_name = "SSEServerDurableObject" }
]
[vars]
ALLOW_ANONYMOUS_WS = "false"
MAX_CONNECTIONS_PER_DO = "10000"
ENABLE_COMPRESSION = "true"
Environment Variables
# Required
EDGEVECTOR_API_KEY=your-api-key
EDGEVECTOR_DATABASE_URL=your-database-url
# Optional
WEBSOCKET_HEARTBEAT_INTERVAL=30000
SSE_HEARTBEAT_INTERVAL=30000
MAX_CONCURRENT_STREAMS=100
ENABLE_BACKPRESSURE=true
ENABLE_MESSAGE_BATCHING=true
Performance Characteristics
- 10K+ concurrent connections per edge location
- Sub-10ms event delivery latency from edge to client
- Automatic reconnection with exponential backoff
- Message batching for improved throughput
- Adaptive throttling to prevent overload
- Circuit breakers for connection protection
- Compression support for reduced bandwidth
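The reconnection behavior listed above is not specified further here. A common way to implement exponential backoff is sketched below, assuming a doubling delay with a cap and "full jitter" so that many clients do not reconnect in lockstep. The function names and the 500 ms / 30 s defaults are illustrative assumptions, not values taken from the package.

```typescript
// Upper bound on the wait before reconnect attempt `attempt` (0-based):
// base * 2^attempt, capped at maxMs.
function backoffCeilingMs(attempt: number, baseMs = 500, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Full jitter: pick a delay uniformly in [0, ceiling).
// `random` is injectable so the function can be tested deterministically.
function backoffDelayMs(
  attempt: number,
  baseMs = 500,
  maxMs = 30000,
  random: () => number = Math.random,
): number {
  return Math.floor(random() * backoffCeilingMs(attempt, baseMs, maxMs));
}
```

A client would typically call `backoffDelayMs(attempt)` after each failed connection, wait that long, and reset `attempt` to 0 once a connection succeeds.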
Security
- Connection authentication via JWT or API keys
- Rate limiting per connection and subscription
- IP allowlisting for restricted access
- Encrypted connections (WSS/HTTPS only)
- Authorization checks for subscription access
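Per-connection rate limiting, as listed above, is often implemented with a token bucket. The sketch below shows that technique under stated assumptions; it is not necessarily how this package enforces limits. `TokenBucket`, `allowMessage`, and the capacity of 5 with a refill of 1 token per second are hypothetical.

```typescript
// Hypothetical token bucket; time is passed in so behavior is deterministic.
class TokenBucket {
  private tokens: number;
  private lastRefillMs: number;
  private readonly capacity: number;
  private readonly refillPerSecond: number;

  constructor(capacity: number, refillPerSecond: number, nowMs: number) {
    this.capacity = capacity;
    this.refillPerSecond = refillPerSecond;
    this.tokens = capacity; // start full, allowing an initial burst
    this.lastRefillMs = nowMs;
  }

  // Refill based on elapsed time, then try to spend one token.
  tryConsume(nowMs: number): boolean {
    const elapsedSec = Math.max(0, nowMs - this.lastRefillMs) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.refillPerSecond);
    this.lastRefillMs = Math.max(this.lastRefillMs, nowMs);
    if (this.tokens < 1) return false;
    this.tokens -= 1;
    return true;
  }
}

// One bucket per connection id (illustrative limits: burst of 5, 1 message/second).
const buckets = new Map<string, TokenBucket>();

function allowMessage(connectionId: string, nowMs: number): boolean {
  let bucket = buckets.get(connectionId);
  if (!bucket) {
    bucket = new TokenBucket(5, 1, nowMs);
    buckets.set(connectionId, bucket);
  }
  return bucket.tryConsume(nowMs);
}
```

The same structure could be keyed by subscription id instead of connection id to limit individual subscriptions.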
Monitoring and Observability
// Get real-time statistics
const stats = realtimeManager.getFlowControlManager()
  .getBackpressureHandler()
  .getSystemStats();
console.log({
  totalConnections: stats.totalConnections,
  throttledConnections: stats.throttledConnections,
  averageLatency: stats.averageLatency,
  averageErrorRate: stats.averageErrorRate
});
Examples
See the /examples directory for complete implementation examples:
- Basic Usage: Simple WebSocket and SSE connections
- Collaborative Editing: Real-time document collaboration
- IoT Monitoring: Time series data from IoT devices
- Live Dashboard: Real-time analytics dashboard
- Geographic Events: Location-based notifications
API Reference
Classes
- RealtimeManager: Main orchestrator for real-time features
- WebSocketServer: WebSocket connection handling
- SSEServer: Server-Sent Events implementation
- DocumentChangeStreams: Document change notifications
- VectorSearchStreams: Vector similarity streaming
- TimeSeriesStreams: Time series data streaming
- SubscriptionManager: Subscription lifecycle management
- ConnectionManager: Connection authentication and queries
- BackpressureHandler: Flow control and performance optimization
Interfaces
- RealtimeMessage: WebSocket/SSE message format
- SubscriptionOptions: Subscription configuration
- StreamOptions: Data streaming configuration
- PerformanceMetrics: System performance metrics
- ConnectionMetrics: Individual connection metrics
Contributing
See the main NoSQL contributing guidelines for information about contributing to the real-time layer.
License
This project is licensed under the MIT License - see the LICENSE file for details.
