openkbs-pulse
v2.0.1
Real-time WebSocket SDK for OpenKBS
OpenKBS Elastic Pulse - Real-time WebSocket SDK
Complete guide for building real-time applications with OpenKBS Elastic Pulse.
Overview
Elastic Pulse is a real-time WebSocket pub/sub service that enables instant communication between clients connected to your OpenKBS application. It supports:
- Channel-based messaging - Organize messages into logical channels
- Presence tracking - Know who's online in real-time
- Auto-reconnection - Handles network interruptions gracefully
- Multi-region - US (us-east-1) and EU (eu-central-1) endpoints
Architecture
┌─────────────────────────────────────────────────────────────────────┐
│                         Client Applications                         │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐               │
│  │   Browser    │  │   Browser    │  │   Browser    │               │
│  │  (pulse.js)  │  │  (pulse.js)  │  │  (pulse.js)  │               │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘               │
└─────────┼─────────────────┼─────────────────┼───────────────────────┘
          │ WebSocket       │ WebSocket       │ WebSocket
          ▼                 ▼                 ▼
┌─────────────────────────────────────────────────────────────────────┐
│                        API Gateway WebSocket                        │
│                      wss://pulse.vpc1.{us|eu}                       │
│                          │                                          │
│                          ▼                                          │
│              ┌────────────────────────┐                             │
│              │  lambda-elastic-pulse  │                             │
│              │  (Connection Handler)  │                             │
│              └───────────┬────────────┘                             │
└──────────────────────────┼──────────────────────────────────────────┘
                           │
          ┌────────────────┼────────────────┐
          ▼                ▼                ▼
   ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
   │   DynamoDB   │ │  lambda-kb   │ │   Your App   │
   │ Connections  │ │  (Publish)   │ │  (Backend)   │
   └──────────────┘ └──────────────┘ └──────────────┘
Installation
npm install openkbs-pulse
Or via CDN:
<script src="https://unpkg.com/openkbs-pulse/pulse.js"></script>
Frontend SDK (Browser)
Quick Start
import Pulse from 'openkbs-pulse';
// Initialize with token from your backend
const pulse = new Pulse({
kbId: 'your-kb-id',
token: 'jwt-token-from-backend',
debug: true // Optional: enable console logging
});
// Subscribe to a channel
const channel = pulse.channel('chat');
// Listen for messages
channel.subscribe('new_message', (data) => {
console.log('New message:', data);
});
// Publish a message
channel.publish('new_message', {
text: 'Hello world!',
user: 'John'
});
Connection Management
| State | Description |
|-------|-------------|
| connecting | Establishing WebSocket connection |
| connected | Connected and ready |
| disconnected | Not connected |
| reconnecting | Lost connection, auto-reconnecting |
| failed | Connection attempt failed |
// Monitor connection state
pulse.onStateChange((state) => {
console.log('Connection state:', state);
});
// Check if connected
if (pulse.isConnected) {
// Safe to send messages
}
// Manually disconnect
pulse.disconnect();
// Reconnect
pulse.connect();
Channels
const channel = pulse.channel('notifications');
// Subscribe to specific event
channel.subscribe('alert', (data) => {
showNotification(data.message);
});
// Subscribe to ALL events on channel
channel.subscribe((data, rawMessage) => {
console.log('Any event:', data);
});
// Unsubscribe returns a cleanup function
const unsubscribe = channel.subscribe('alert', handler);
unsubscribe(); // Stop listening
// Unsubscribe from entire channel
channel.unsubscribe();
Presence
Track who's online in real-time:
const channel = pulse.channel('room-123');
// Get notified when member list changes
channel.presence.subscribe((members) => {
console.log('Online members:', members.length);
members.forEach(m => console.log(m.userId, m.connectionId));
});
// Get notified when someone joins
channel.presence.onEnter((member) => {
console.log('User joined:', member.userId);
});
// Get notified when someone leaves
channel.presence.onLeave((member) => {
console.log('User left:', member.userId);
});
// Enter presence with custom data
channel.presence.enter({
name: 'John Doe',
avatar: 'https://...',
status: 'online'
});
// Update your presence data
channel.presence.update({
status: 'away'
});
// Leave presence
channel.presence.leave();
// Get current members
const members = channel.presence.members;
const count = channel.presence.count;
Full Frontend Example
import Pulse from 'openkbs-pulse';
// Get token from your backend
const { token, endpoint } = await fetch('/api/pulse-token').then(r => r.json());
const pulse = new Pulse({
kbId: 'my-app-kb-id',
token,
endpoint, // Optional: auto-detected from token
debug: true
});
// Chat channel
const chat = pulse.channel('chat');
// Listen for new messages
chat.subscribe('message', (data) => {
appendMessage(data.user, data.text, data.timestamp);
});
// Track typing indicators
chat.subscribe('typing', (data) => {
showTypingIndicator(data.user);
});
// Track online users
chat.presence.subscribe((members) => {
updateOnlineUsers(members);
});
chat.presence.enter({ name: currentUser.name });
// Send a message
function sendMessage(text) {
chat.publish('message', {
user: currentUser.name,
text,
timestamp: Date.now()
});
}
// Send typing indicator
function onTyping() {
chat.publish('typing', { user: currentUser.name });
}
Server SDK (Node.js / Elastic Functions)
Quick Start
import pulse from 'openkbs-pulse/server';
// Generate token for client authentication
const tokenData = await pulse.getToken(kbId, apiKey, userId);
// Returns: { token, endpoint, region, expiresIn }
// Publish message to channel (server-side)
await pulse.publish('notifications', 'new_order', {
orderId: '12345',
total: 99.99
}, { kbId, apiKey });
// Get channel presence
const { count, members } = await pulse.presence('chat', { kbId, apiKey });
API Reference
pulse.getToken(kbId, apiKey, userId)
Generate a JWT token for client WebSocket authentication.
const { token, endpoint, region, expiresIn } = await pulse.getToken(
'your-kb-id',
'your-api-key',
'user-123' // User identifier
);
// Send to client
res.json({ token, endpoint });
pulse.publish(channel, event, data, options)
Publish a message from your server to all channel subscribers.
// Basic usage
await pulse.publish('orders', 'new_order', {
id: 'ord-123',
items: ['item1', 'item2']
}, { kbId, apiKey });
// The SDK reads kbId/apiKey from:
// 1. options parameter (explicit)
// 2. Environment variables: OPENKBS_KB_ID, OPENKBS_API_KEY
pulse.presence(channel, options)
Get current presence information for a channel.
const { count, members } = await pulse.presence('chat', { kbId, apiKey });
console.log(`${count} users online`);
members.forEach(m => {
console.log(m.userId, m.connectionId, m.connectedAt);
});
Complete Example: Photo Gallery
A real-time photo sharing application demonstrating all features.
Backend (Elastic Function)
// functions/api/index.mjs
import pg from 'pg';
import { S3Client, PutObjectCommand, DeleteObjectCommand } from '@aws-sdk/client-s3';
import pulse from 'openkbs-pulse/server';
const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });
const s3 = new S3Client({ region: 'eu-central-1' });
export const handler = async (event) => {
const { action, ...data } = JSON.parse(event.body || '{}');
const kbId = event.kbId;
const apiKey = event.kbData?.AK;
switch (action) {
case 'list':
// Return all photos
const result = await pool.query('SELECT * FROM photos ORDER BY created_at DESC');
return { statusCode: 200, body: JSON.stringify(result.rows) };
case 'upload':
// Upload photo to S3
const buffer = Buffer.from(data.content, 'base64');
await s3.send(new PutObjectCommand({
Bucket: process.env.STORAGE_BUCKET,
Key: `media/${data.filename}`,
Body: buffer
}));
// Save to database
const photo = await pool.query(
'INSERT INTO photos (filename, caption, uploaded_by) VALUES ($1, $2, $3) RETURNING *',
[data.filename, data.caption, data.uploadedBy]
);
// Broadcast to all connected clients
await pulse.publish('gallery', 'photo_added', photo.rows[0], { kbId, apiKey });
return { statusCode: 200, body: JSON.stringify(photo.rows[0]) };
case 'delete':
// Delete from S3 and database...
await pulse.publish('gallery', 'photo_deleted', { id: data.id }, { kbId, apiKey });
return { statusCode: 200, body: JSON.stringify({ success: true }) };
case 'getToken':
// Generate Pulse token for client
const tokenData = await pulse.getToken(kbId, apiKey, data.userId);
return { statusCode: 200, body: JSON.stringify(tokenData) };
default:
return { statusCode: 400, body: JSON.stringify({ error: 'Invalid action' }) };
}
};
Frontend (React)
// src/App.jsx
import { useState, useEffect, useRef } from 'react';
// UploadForm and PhotoCard are application components that are not shown here.
const API_URL = 'https://your-app.com/api';
const KB_ID = 'your-kb-id';
// Simple Pulse client
class PulseClient {
constructor(kbId, token, endpoint) {
this.kbId = kbId;
this.token = token;
this.endpoint = endpoint;
this.ws = null;
this.listeners = {};
this.presenceListeners = [];
}
connect() {
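// Encode query values so special characters cannot break the URL
const url = `${this.endpoint}?kbId=${encodeURIComponent(this.kbId)}&token=${encodeURIComponent(this.token)}&channel=gallery`;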
this.ws = new WebSocket(url);
this.ws.onopen = () => {
this.emit('connect');
this.ws.send(JSON.stringify({ action: 'presence', channel: 'gallery' }));
};
this.ws.onclose = () => this.emit('disconnect');
this.ws.onmessage = (e) => {
const msg = JSON.parse(e.data);
if (msg.type === 'presence') {
this.presenceListeners.forEach(fn => fn(msg.count));
} else if (msg.type === 'message' && msg.data) {
const eventType = msg.data.type || msg.data.event;
if (eventType) this.emit(eventType, msg.data);
}
};
}
on(event, fn) {
if (!this.listeners[event]) this.listeners[event] = [];
this.listeners[event].push(fn);
}
emit(event, data) {
(this.listeners[event] || []).forEach(fn => fn(data));
}
onPresence(fn) {
this.presenceListeners.push(fn);
}
close() {
if (this.ws) this.ws.close();
}
}
export default function App() {
const [photos, setPhotos] = useState([]);
const [connected, setConnected] = useState(false);
const [viewers, setViewers] = useState(0);
const pulseRef = useRef(null);
useEffect(() => {
// Load initial photos
fetch(API_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ action: 'list' })
})
.then(r => r.json())
.then(setPhotos);
// Connect to Pulse
fetch(API_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ action: 'getToken', userId: 'user_' + Math.random().toString(36).slice(2, 8) })
})
.then(r => r.json())
.then(({ token, endpoint }) => {
const pulse = new PulseClient(KB_ID, token, endpoint);
pulseRef.current = pulse;
pulse.on('connect', () => setConnected(true));
pulse.on('disconnect', () => setConnected(false));
// Real-time updates
pulse.on('photo_added', (data) => {
setPhotos(prev => {
if (prev.some(p => p.id === data.id)) return prev;
return [data, ...prev];
});
});
pulse.on('photo_deleted', (data) => {
setPhotos(prev => prev.filter(p => p.id !== data.id));
});
pulse.onPresence(setViewers);
pulse.connect();
});
return () => pulseRef.current?.close();
}, []);
const handleUpload = async (photoData) => {
await fetch(API_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ action: 'upload', ...photoData })
});
// Photo will appear via real-time update
};
return (
<div>
<header>
<span className={connected ? 'online' : 'offline'}></span>
{viewers} viewers online
</header>
<UploadForm onUpload={handleUpload} />
<div className="gallery">
{photos.map(photo => (
<PhotoCard key={photo.id} photo={photo} />
))}
</div>
</div>
);
}
Message Format
Client → Server
// Subscribe to channel
{ "action": "subscribe", "channel": "chat" }
// Publish message
{ "action": "publish", "channel": "chat", "event": "message", "data": { ... } }
// Request presence
{ "action": "presence", "channel": "chat" }
// Enter presence
{ "action": "presence_enter", "channel": "chat", "data": { "name": "John" } }
// Leave presence
{ "action": "presence_leave", "channel": "chat" }
// Update presence
{ "action": "presence_update", "channel": "chat", "data": { "status": "away" } }
// Ping (heartbeat)
{ "action": "ping" }
Server → Client
// Message broadcast
{
"type": "message",
"channel": "chat",
"data": { "type": "new_message", "text": "Hello", "user": "John" },
"timestamp": 1704067200000
}
// Presence sync (full member list)
{
"type": "presence",
"action": "sync",
"channel": "chat",
"count": 5,
"members": [
{ "userId": "user1", "connectionId": "abc123", "connectedAt": 1704067200000 }
]
}
// Presence enter
{
"type": "presence",
"action": "enter",
"channel": "chat",
"member": { "userId": "user2", "connectionId": "def456", "data": { "name": "Jane" } }
}
// Presence leave
{
"type": "presence",
"action": "leave",
"channel": "chat",
"member": { "userId": "user2", "connectionId": "def456" }
}
// Pong (heartbeat response)
{ "type": "pong", "timestamp": 1704067200000 }
// Error
{ "type": "error", "error": "Error message" }
Enabling Elastic Pulse
Enable via OpenKBS CLI:
openkbs pulse enable --region eu-central-1
Or via API:
// Call from authenticated context
await fetch('https://kb.openkbs.com', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
action: 'enableElasticPulse',
kbId: 'your-kb-id',
apiKey: 'your-api-key'
})
});
Pricing
- €5 per million messages (~$5.50 USD)
- Billed probabilistically (1 in 1000 messages sampled)
- Includes connection management and presence
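The pricing rules above reduce to simple arithmetic. The sketch below estimates a bill from a message count. The function names are hypothetical, and the reading of sampled billing (each sampled message charged as 1,000 messages, so the expected total stays the same) is an assumption rather than something this README states.

```javascript
// Figures taken from the Pricing list; everything else is illustrative.
const EUR_PER_MILLION_MESSAGES = 5;
const SAMPLE_RATE = 1000; // 1 in 1000 messages is sampled for billing

// Expected cost in EUR for a given number of messages.
function estimateCostEur(messageCount) {
  return (messageCount * EUR_PER_MILLION_MESSAGES) / 1_000_000;
}

// Assumption: each sampled message is billed as SAMPLE_RATE messages.
function costPerSampledMessageEur() {
  return (SAMPLE_RATE * EUR_PER_MILLION_MESSAGES) / 1_000_000;
}

console.log(estimateCostEur(250_000));   // 1.25
console.log(costPerSampledMessageEur()); // 0.005
```

Because billing is sampled, a real invoice for a small number of messages may differ from this expected value.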
Limits
| Resource | Limit |
|----------|-------|
| Message size | 32 KB |
| Presence members returned | 100 (count is accurate) |
| Connection TTL | 24 hours |
| Reconnect backoff | 1s → 2s → 5s → 10s → 30s max |
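Two of these limits are easy to mirror in client code. The sketch below reproduces the reconnect schedule from the table (useful for a hand-written client such as the PulseClient in the gallery example, since the SDK reconnects on its own) and checks a payload against the message size limit before publishing. The helper names are hypothetical. It also assumes that 32 KB means 32 × 1024 bytes and that the limit applies to the JSON-serialized data, neither of which is confirmed here.

```javascript
// Reconnect schedule from the Limits table, in milliseconds.
const BACKOFF_STEPS_MS = [1000, 2000, 5000, 10000, 30000];

// Delay before reconnect attempt `attempt` (0-based), capped at the last step.
function reconnectDelayMs(attempt) {
  const index = Math.min(Math.max(attempt, 0), BACKOFF_STEPS_MS.length - 1);
  return BACKOFF_STEPS_MS[index];
}

// Assumption: 32 KB = 32 * 1024 bytes, measured on the serialized payload.
const MAX_MESSAGE_BYTES = 32 * 1024;

// UTF-8 byte length of the JSON-serialized payload.
function payloadBytes(data) {
  return new TextEncoder().encode(JSON.stringify(data)).length;
}

function fitsMessageLimit(data) {
  return payloadBytes(data) <= MAX_MESSAGE_BYTES;
}

console.log(reconnectDelayMs(0), reconnectDelayMs(7)); // 1000 30000
console.log(fitsMessageLimit({ text: 'Hello world!' })); // true
```

Measuring bytes rather than string length matters for non-ASCII text, where one character can take several bytes.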
Pulse Constructor Options
new Pulse(options)
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| kbId | string | required | Your Knowledge Base ID |
| token | string | required | JWT token from backend |
| region | string | 'us-east-1' | us-east-1 or eu-central-1 |
| endpoint | string | auto | Custom WebSocket endpoint |
| channel | string | 'default' | Default channel |
| debug | boolean | false | Enable console logging |
| autoConnect | boolean | true | Connect on instantiation |
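To check a configuration before handing it to the constructor, the defaults in the table can be mirrored in a small helper. This is an illustration of the table only, not the SDK's own code: `resolvePulseOptions` and `PULSE_DEFAULTS` are hypothetical names, and `endpoint` is left unset because its "auto" behaviour is internal to the SDK.

```javascript
// Defaults copied from the options table above (illustrative, not SDK source).
const PULSE_DEFAULTS = {
  region: 'us-east-1',
  channel: 'default',
  debug: false,
  autoConnect: true
};

// Validates the required options and fills in the documented defaults.
function resolvePulseOptions(options) {
  for (const key of ['kbId', 'token']) {
    if (typeof options[key] !== 'string' || options[key] === '') {
      throw new TypeError(`Pulse option "${key}" is required`);
    }
  }
  // Explicit options win over defaults.
  return { ...PULSE_DEFAULTS, ...options };
}

console.log(resolvePulseOptions({ kbId: 'kb', token: 't', region: 'eu-central-1' }));
```

The resolved object could then be passed to `new Pulse(...)`, which fails fast on a missing `kbId` or `token` instead of surfacing later as a connection error.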
Best Practices
- Always handle disconnections - Use onStateChange to update the UI
- Deduplicate messages - Check for existing items before adding
- Use specific channels - Don't put everything in one channel
- Clean up on unmount - Call pulse.disconnect() when the component unmounts
- Leave presence on page unload - Call channel.presence.leave()
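The deduplication advice can be sketched as two pure list helpers, in the same spirit as the `setPhotos` updates in the gallery example. The names `addUnique` and `removeById` are hypothetical, and the sketch assumes every item carries a unique `id` field.

```javascript
// Returns a new list with `item` first, or the same list if its id is already present.
function addUnique(items, item) {
  return items.some((existing) => existing.id === item.id)
    ? items
    : [item, ...items];
}

// Returns a new list without the item that has the given id.
function removeById(items, id) {
  return items.filter((existing) => existing.id !== id);
}

let photos = [{ id: 1, caption: 'Sunset' }];
photos = addUnique(photos, { id: 2, caption: 'Harbour' });
photos = addUnique(photos, { id: 2, caption: 'Harbour' }); // duplicate, ignored
console.log(photos.map((p) => p.id)); // [ 2, 1 ]
```

A subscriber could call `addUnique` inside its `photo_added` handler, so that an item already added from an HTTP response is not shown twice when the broadcast arrives.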
TypeScript Support
Type definitions included:
import Pulse from 'openkbs-pulse';
interface Message {
text: string;
user: string;
timestamp: number;
}
const pulse = new Pulse({ kbId: 'xxx', token: 'yyy' });
const channel = pulse.channel('chat');
channel.subscribe('message', (data: Message) => {
console.log(data.text);
});
Troubleshooting
Connection fails with 401
- Token expired - get a new token from your backend
- Invalid kbId/token combination
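One way to rule out an expired token before connecting is to read its expiry on the client. The sketch below assumes the Pulse token is a standard three-part JWT whose payload carries the registered `exp` claim (seconds since the Unix epoch); this README does not confirm the token's contents, and the helper names are hypothetical. It only decodes the payload and does not verify the signature.

```javascript
// Decodes the (unverified) payload of a JWT. Assumes an ASCII JSON payload.
function decodeJwtPayload(token) {
  const part = token.split('.')[1];
  if (!part) throw new Error('Not a JWT');
  // Convert base64url to base64 and restore padding for atob().
  const b64 = part.replace(/-/g, '+').replace(/_/g, '/');
  const padded = b64 + '='.repeat((4 - (b64.length % 4)) % 4);
  return JSON.parse(atob(padded));
}

// True if the token's `exp` claim is in the past, allowing for clock skew.
// Returns false when there is no numeric `exp` claim, since expiry is unknown.
function isTokenExpired(token, nowMs = Date.now(), skewSeconds = 30) {
  const { exp } = decodeJwtPayload(token);
  if (typeof exp !== 'number') return false;
  return nowMs / 1000 >= exp - skewSeconds;
}
```

If `isTokenExpired(token)` returns true, request a fresh token from your backend before creating or reconnecting the client.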
Connection fails with 403
- Elastic Pulse not enabled for this KB
- kbId in token doesn't match connection kbId
Messages not received
- Check channel name matches between publisher and subscriber
- Verify WebSocket is connected (pulse.isConnected)
- Enable debug mode: new Pulse({ ..., debug: true })
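When debugging a hand-written WebSocket client, it can help to classify every raw frame before any handler runs. The sketch below follows the Server → Client shapes listed under Message Format; `describeFrame` is a hypothetical helper, not part of the SDK.

```javascript
// Summarizes a raw WebSocket frame according to the documented message shapes.
function describeFrame(raw) {
  let msg;
  try {
    msg = JSON.parse(raw);
  } catch {
    return { kind: 'invalid', detail: 'not JSON' };
  }
  if (msg === null || typeof msg !== 'object') {
    return { kind: 'invalid', detail: 'not an object' };
  }
  switch (msg.type) {
    case 'message': {
      // The event name travels inside data, as in the broadcast example.
      const event = msg.data ? msg.data.type || msg.data.event : undefined;
      return { kind: 'message', channel: msg.channel, event: event || null };
    }
    case 'presence':
      return { kind: 'presence', channel: msg.channel, action: msg.action, count: msg.count };
    case 'pong':
      return { kind: 'pong' };
    case 'error':
      return { kind: 'error', detail: msg.error };
    default:
      return { kind: 'unknown', detail: String(msg.type) };
  }
}

console.log(describeFrame('{"type":"error","error":"Error message"}'));
```

Logging `describeFrame(e.data)` from `ws.onmessage` shows whether frames arrive on an unexpected channel or without an event name, the two most likely reasons a subscriber stays silent.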
Presence count incorrect
- Stale connections are cleaned up automatically (410 Gone)
- TTL is 24 hours for inactive connections
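A count that looks wrong can also come from the 100-member cap in the Limits table: `members` may be truncated while `count` stays accurate. A minimal sketch of a summary that keeps the two apart, using a hypothetical `summarizePresence` helper over the `{ count, members }` shape returned by `pulse.presence`:

```javascript
// Separates the accurate total from the (possibly truncated) member list.
function summarizePresence({ count, members }) {
  const listed = members.length;
  return {
    total: count,                             // accurate per the Limits table
    listed,                                   // at most 100 entries are returned
    unlisted: Math.max(count - listed, 0)     // members online but not in the list
  };
}

const sample = { count: 3, members: [{ userId: 'user1' }, { userId: 'user2' }] };
console.log(summarizePresence(sample)); // { total: 3, listed: 2, unlisted: 1 }
```

For display, prefer `count` (or `channel.presence.count` on the client) over `members.length` when a channel may hold more than 100 users.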
