@chainlink/data-streams-sdk
v1.2.1
Published
TypeScript SDK for Chainlink Data Streams
Readme
Chainlink Data Streams TypeScript SDK
TypeScript SDK for accessing Chainlink Data Streams with real-time streaming and historical data retrieval.
Table of Contents
- Features
- Installation
- Quick Start
- Configuration
- Examples
- API Reference
- Report Format
- High Availability Mode
- Error Handling
- Observability (Logs & Metrics)
- Testing
- Feed IDs
Features
- Real-time streaming via WebSocket connections
- High Availability mode with multiple connections and automatic failover
- Historical data access via REST API
- Automatic report decoding for all supported formats (V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13)
- Metrics for monitoring and observability
- Type-safe with full TypeScript support
- Event-driven architecture for complete developer control
Installation
npm install @chainlink/data-streams-sdkRequirements:
- Node.js >= 20.0.0
- TypeScript >= 5.3.x
- Valid Chainlink Data Streams credentials
Quick Start
Set your credentials:
Option 1 - Environment variables:
export API_KEY="your_api_key_here"
export USER_SECRET="your_user_secret_here"Option 2 - .env file:
# Create .env file from template
cp .env.example .env
# Edit .env with your credentials
API_KEY="your_api_key_here"
USER_SECRET="your_user_secret_here"Basic streaming:
import { createClient, LogLevel } from '@chainlink/data-streams-sdk';
const client = createClient({
apiKey: process.env.API_KEY,
userSecret: process.env.USER_SECRET,
endpoint: "https://api.dataengine.chain.link",
wsEndpoint: "wss://ws.dataengine.chain.link",
// Comment to disable SDK logging for debugging:
logging: {
logger: console,
logLevel: LogLevel.INFO
}
});
const feedID = '0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782';
const stream = client.createStream([feedID]);
stream.on('report', (report) => {
console.log('New report:', report);
});
stream.on('error', (error) => {
console.error('Stream error:', error);
});
await stream.connect();Configuration
Configuration Interface
interface Config {
// Required
apiKey: string; // API key for authentication
userSecret: string; // User secret for authentication
endpoint: string; // REST API URL
wsEndpoint: string; // WebSocket URL
// Optional - Request & Retry
timeout?: number; // Request timeout (default: 30000ms)
retryAttempts?: number; // Retry attempts (default: 3)
retryDelay?: number; // Retry delay (default: 1000ms)
// Optional - High Availability
haMode?: boolean; // Enable HA mode (default: false)
haConnectionTimeout?: number; // HA connection timeout (default: 10000ms)
connectionStatusCallback?: (isConnected: boolean, host: string, origin: string) => void;
// Optional - Logging
logging?: LoggingConfig; // See Logging Configuration section
}Basic Usage
const client = createClient({
apiKey: process.env.API_KEY,
userSecret: process.env.USER_SECRET,
endpoint: "https://api.dataengine.chain.link",
wsEndpoint: "wss://ws.dataengine.chain.link"
});High Availability Example
const haClient = createClient({
apiKey: process.env.API_KEY,
userSecret: process.env.USER_SECRET,
endpoint: "https://api.dataengine.chain.link", // Mainnet only
wsEndpoint: "wss://ws.dataengine.chain.link", // Single endpoint with origin discovery
haMode: true,
});Note: High Availability mode is only available on mainnet, not testnet.
Examples
Quick Commands:
# Real-time streaming
npx ts-node examples/stream-reports.ts 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
# High Availability streaming
npx ts-node examples/stream-reports.ts 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782 --ha
# Get latest report
npx ts-node examples/get-latest-report.ts 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
# List all available feeds
npx ts-node examples/list-feeds.tsComplete Examples:
See examples/README.md for detailed usage instructions, setup, and all available examples including:
- Streaming: Basic streaming, HA mode, metrics monitoring
- REST API: Latest reports, historical data, bulk operations, feed management
- Configuration: Logging setup, debugging, monitoring integration
API Reference
Streaming
// Create stream
const stream = client.createStream(feedIds, options?);
// Events
stream.on('report', (report) => { ... });
stream.on('error', (error) => { ... });
stream.on('disconnected', () => { ... });
stream.on('reconnecting', (info) => { ... });
// Control
await stream.connect();
await stream.close();
// Metrics
const metrics = stream.getMetrics();Stream Options
interface StreamOptions {
maxReconnectAttempts?: number; // Default: 5
// Base delay (in ms) for exponential backoff.
// Actual delay grows as: base * 2^(attempt-1) with jitter, capped at 10000ms.
// Default: 1000ms; user-provided values are clamped to the safe range [200ms, 10000ms].
reconnectInterval?: number;
}REST API
// Get feeds
const feeds = await client.listFeeds();
// Get latest report
const report = await client.getLatestReport(feedId);
// Get historical report
const report = await client.getReportByTimestamp(feedId, timestamp);
// Get report page
const reports = await client.getReportsPage(feedId, startTime, limit?);
// Get bulk reports
const reports = await client.getReportsBulk(feedIds, timestamp);Report Format
Quick Decoder Usage
import { decodeReport } from '@chainlink/data-streams-sdk';
const decoded = decodeReport(report.fullReport, report.feedID);Schema Auto-Detection
The SDK automatically detects and decodes all report versions based on Feed ID patterns:
- V2: Feed IDs starting with
0x0002 - V3: Feed IDs starting with
0x0003(Crypto Streams) - V4: Feed IDs starting with
0x0004(Real-World Assets) - V5: Feed IDs starting with
0x0005 - V6: Feed IDs starting with
0x0006(Multiple Price Values) - V7: Feed IDs starting with
0x0007 - V8: Feed IDs starting with
0x0008(Non-OTC RWA) - V9: Feed IDs starting with
0x0009(NAV Fund Data) - V10: Feed IDs starting with
0x000a(Tokenized Equity) - V11: Feed IDs starting with
0x000b(Deutsche Boerse) - V12: Feed IDs starting with
0x000c(Nav Fund Data + Next) - V13: Feed IDs starting with
0x000d(Best Bid/Ask)
Common Fields
All reports include standard metadata:
interface BaseFields {
version: "V2" | "V3" | "V4" | "V5" | "V6" | "V7" | "V8" | "V9" | "V10" | "V11" | "V12" | "V13";
nativeFee: bigint;
linkFee: bigint;
expiresAt: number;
feedID: string;
validFromTimestamp: number;
observationsTimestamp: number;
}Schema-Specific Fields
- V2/V3/V4:
price: bigint- Standard price data - V3:
bid: bigint, ask: bigint- Crypto bid/ask spreads - V4:
marketStatus: MarketStatus- Real-world asset market status - V5:
rate: bigint, timestamp: number, duration: number- Interest rate data with observation timestamp and duration - V6:
price: bigint, price2: bigint, price3: bigint, price4: bigint, price5: bigint- Multiple price values in a single payload - V7:
exchangeRate: bigint- Exchange rate data - V8:
midPrice: bigint, lastUpdateTimestamp: number, marketStatus: MarketStatus- Non-OTC RWA data - V9:
navPerShare: bigint, navDate: number, aum: bigint, ripcord: number- NAV fund data - V10:
price: bigint, lastUpdateTimestamp: number, marketStatus: MarketStatus, currentMultiplier: bigint, newMultiplier: bigint, activationDateTime: number, tokenizedPrice: bigint- Tokenized equity data - V11:
mid: bigint, LastSeenTimestampNs: number, bid: bigint, vidVolume: number, ask: bigint, askVolume: number, lastTradedPrice: bigint, marketStatus: MarketStatus- Deutsche Boerse - V12:
navPerShare: bigint, nextNavPerShare: bigint, navDate: number, ripcord: number- NAV fund data + Next - V13:
bestAsk: bigint, bestBid: bigint, askVolume: number, bidVolume: number, lastTradedPrice: bigint- Best Bid/Ask
For complete field definitions, see the documentation.
High Availability Mode
HA mode establishes multiple simultaneous connections for zero-downtime operation:
- Automatic failover between connections
- Report deduplication across connections
- Automatic origin discovery to find available endpoints
- Per-connection monitoring and statistics
const client = createClient({
// ...config
haMode: true,
wsEndpoint: "wss://ws.dataengine.chain.link", // Single endpoint (mainnet only)
});How it works: When haMode is true, the SDK automatically discovers multiple origin endpoints behind the single URL and establishes separate connections to each origin.
Connection monitoring: The optional connectionStatusCallback can be used to integrate with external monitoring systems. The SDK already provides comprehensive connection logs, so this callback is primarily useful for custom alerting or metrics collection. See examples/metrics-monitoring.ts for a complete implementation example.
Important: HA mode is only available on mainnet endpoints.
Error Handling
Error Types Overview
| Error Type | When Thrown | Key Properties |
|---------------------------------|---------------------------------------------|-----------------------------------------------|
| ValidationError | Invalid feed IDs, timestamps, parameters | message |
| AuthenticationError | Invalid credentials, HMAC failures | message |
| APIError | HTTP 4xx/5xx, network timeouts, rate limits | statusCode, message |
| ReportDecodingError | Corrupted report data, unsupported versions | message |
| WebSocketError | Connection failures, protocol errors | message |
| OriginDiscoveryError | HA discovery failures | cause, message |
| MultiConnectionError | All HA connections failed | message |
| PartialConnectionFailureError | Some HA connections failed | failedConnections, totalConnections |
| InsufficientConnectionsError | HA degraded performance | availableConnections, requiredConnections |
Usage Examples
import {
ValidationError,
AuthenticationError,
APIError,
ReportDecodingError,
WebSocketError,
OriginDiscoveryError,
MultiConnectionError
} from './src';
// REST API error handling
try {
const report = await client.getLatestReport(feedId);
} catch (error) {
if (error instanceof ValidationError) {
// Invalid feed ID or parameters
} else if (error instanceof AuthenticationError) {
// Check API credentials
} else if (error instanceof APIError) {
// Server error - check error.statusCode (429, 500, etc.)
} else if (error instanceof ReportDecodingError) {
// Corrupted or unsupported report format
}
}
// Streaming error handling
stream.on('error', (error) => {
if (error instanceof WebSocketError) {
// Connection issues - retry or fallback
} else if (error instanceof OriginDiscoveryError) {
// HA discovery failed - falls back to static config
} else if (error instanceof MultiConnectionError) {
// All HA connections failed - critical
}
});Catch-all error handling:
import { DataStreamsError } from './src';
try {
// Any SDK operation
} catch (error) {
if (error instanceof DataStreamsError) {
// Handles ANY SDK error (base class for all error types above)
console.log('SDK error:', error.message);
} else {
// Non-SDK error (network, system, etc.)
console.log('System error:', error);
}
}Observability (Logs & Metrics)
The SDK is designed to plug into your existing observability stack.
Logging (Pino/Winston/Console)
Pass your logger to the SDK and choose a verbosity level. For deep WS diagnostics, enable connection debug.
Quick Start
import { createClient, LogLevel } from '@chainlink/data-streams-sdk';
// Silent mode (default) - Zero overhead
const client = createClient({ /* ... config without logging */ });
// Basic console logging
const client = createClient({
// ... other config
logging: {
logger: {
info: console.log,
warn: console.warn,
error: console.error
}
}
});Using Pino (structured JSON):
import pino from 'pino';
import { createClient, LogLevel } from '@chainlink/data-streams-sdk';
const root = pino({ level: process.env.PINO_LEVEL || 'info' });
const sdk = root.child({ component: 'sdk' });
const client = createClient({
// ...config
logging: {
logger: {
info: sdk.info.bind(sdk),
warn: sdk.warn.bind(sdk),
error: sdk.error.bind(sdk),
debug: sdk.debug.bind(sdk),
},
logLevel: LogLevel.INFO,
// For very verbose WS diagnostics, set DEBUG + enableConnectionDebug
// logLevel: LogLevel.DEBUG,
// enableConnectionDebug: true,
},
});Command-line with pretty output:
PINO_LEVEL=info npx ts-node examples/metrics-monitoring.ts | npx pino-prettyLog Levels
Logging Configuration Options
interface LoggingConfig {
/** External logger functions (console, winston, pino, etc.) */
logger?: {
debug?: (message: string, ...args: any[]) => void;
info?: (message: string, ...args: any[]) => void;
warn?: (message: string, ...args: any[]) => void;
error?: (message: string, ...args: any[]) => void;
};
/** Minimum logging level - filters out lower priority logs */
logLevel?: LogLevel; // DEBUG (0) | INFO (1) | WARN (2) | ERROR (3)
/** Enable WebSocket ping/pong and connection state debugging logs */
enableConnectionDebug?: boolean;
}Compatible with: console, winston, pino, and any logger with debug/info/warn/error methods. See examples/logging-basic.ts for complete integration examples.
For debugging: Use LogLevel.DEBUG for full diagnostics and enableConnectionDebug: true to see WebSocket ping/pong messages and connection state transitions.
Origin tracking in HA mode shows which specific endpoint received each report.
Metrics (stream.getMetrics())
The stream.getMetrics() API provides a complete snapshot for dashboards and alerts:
const m = stream.getMetrics();
// m.accepted, m.deduplicated, m.totalReceived
// m.partialReconnects, m.fullReconnects
// m.activeConnections, m.configuredConnections
// m.originStatus: { [origin]: ConnectionStatus }Simple periodic print (example):
setInterval(() => {
const m = stream.getMetrics();
console.log(`accepted=${m.accepted} dedup=${m.deduplicated} active=${m.activeConnections}/${m.configuredConnections}`);
}, 30000);Refer to examples/metrics-monitoring.ts for a full metrics dashboard example.
Testing
npm test # All tests
npm run test:unit # Unit tests only
npm run test:integration # Integration tests onlyFeed IDs
For available feed IDs, see the official documentation.
