@unrdf/federation
v26.4.8
UNRDF Federation - Distributed RDF Query with RAFT Consensus and Multi-Master Replication
@unrdf/federation
Peer Discovery and Distributed Query Execution
Federate RDF graphs across multiple peers. Discover peers, route queries, and execute distributed operations with automatic failover.
Quick Start
See QUICKSTART-FEDERATION.md for a 5-minute guide to production federated queries.
One-command demo:
node examples/production-federation.mjs
Features
Peer Management
- ✅ Peer discovery (DNS-SD, mDNS)
- ✅ Dynamic peer registration/removal
- ✅ Peer metadata and configuration
- ✅ Connection pooling
- ✅ Automatic reconnection
Query Execution
- ✅ Remote SPARQL execution
- ✅ Multiple query strategies (broadcast, selective, failover)
- ✅ Query routing and optimization
- ✅ Result aggregation
- ✅ Timeout configuration
- ✅ Retry logic
Health Monitoring
- ✅ Peer health monitoring
- ✅ Automatic health checks
- ✅ Health scores (0-100)
- ✅ Fallback peer selection
- ✅ Degraded peer handling
Statistics & Monitoring
- ✅ Query statistics tracking
- ✅ Error rate monitoring
- ✅ Performance metrics
- ✅ Per-peer statistics
- ✅ Response time tracking
- ✅ OpenTelemetry integration (metrics, traces, spans)
Installation
pnpm add @unrdf/federation
Usage
Basic Federation
import { createCoordinator } from '@unrdf/federation'
// Create a federation coordinator with initial peers
const coordinator = createCoordinator({
  peers: [
    {
      id: 'dbpedia',
      endpoint: 'https://dbpedia.org/sparql',
      metadata: { description: 'DBpedia SPARQL endpoint' }
    },
    {
      id: 'wikidata',
      endpoint: 'https://query.wikidata.org/sparql',
      metadata: { description: 'Wikidata Query Service' }
    }
  ],
  strategy: 'broadcast', // Query all peers
  timeout: 10000 // 10 second timeout
})
// Execute federated query
const result = await coordinator.query(`
  SELECT DISTINCT ?type WHERE {
    ?s a ?type .
  } LIMIT 10
`)
console.log(`Results: ${result.results.length}`)
console.log(`Success: ${result.successCount}/${result.totalPeers}`)
Dynamic Peer Management
// Add peer dynamically
await coordinator.addPeer(
  'local',
  'http://localhost:3030/dataset/sparql',
  { description: 'Local Fuseki instance' }
)
// List registered peers
const peers = coordinator.listPeers()
peers.forEach(peer => {
  console.log(`${peer.id}: ${peer.endpoint} (${peer.status})`)
})
// Remove peer
const removed = coordinator.removePeer('local')
console.log(`Removed: ${removed}`)
Query Strategies
Broadcast Strategy
Query ALL peers, aggregate results:
const result = await coordinator.query(sparqlQuery, {
  strategy: 'broadcast',
  timeout: 10000
})
// result.peerResults contains results from each peer
// result.results contains aggregated/merged results
Use when:
- Need comprehensive results from all sources
- Want to compare data across peers
- High availability is critical
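How the coordinator merges per-peer results is not specified in this README. As a rough illustration only, the sketch below concatenates binding rows from several peers and drops exact duplicates. `mergeBindings` and the row shape are assumptions made for this example, not part of the @unrdf/federation API.

```javascript
// Hypothetical helper, not part of @unrdf/federation: merge binding rows
// returned by several peers and drop exact duplicates.
function mergeBindings(peerResults) {
  const seen = new Set()
  const merged = []
  for (const { results = [] } of peerResults) {
    for (const row of results) {
      // Sort keys so rows with the same variables compare equal
      // regardless of property order.
      const key = JSON.stringify(Object.keys(row).sort().map(k => [k, row[k]]))
      if (!seen.has(key)) {
        seen.add(key)
        merged.push(row)
      }
    }
  }
  return merged
}

const merged = mergeBindings([
  { peerId: 'dbpedia', results: [{ type: 'ex:Person' }, { type: 'ex:Place' }] },
  { peerId: 'wikidata', results: [{ type: 'ex:Person' }] }
])
console.log(merged.length) // 2
```

Deduplicating on the full row is only one possible policy; a real aggregator might instead keep duplicates and tag each row with its source peer.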
Selective Strategy
Query HEALTHY peers only, skip degraded/unreachable:
const result = await coordinator.query(sparqlQuery, {
  strategy: 'selective',
  timeout: 5000
})
Use when:
- Want optimal performance
- Can tolerate partial results
- Network conditions vary
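Conceptually, the selective strategy amounts to a filter over the peer list before any query is sent. The sketch below shows that idea in isolation. The function name and the `'healthy'` status string are assumptions; check the values that `listPeers()` actually reports before relying on them.

```javascript
// Hypothetical sketch: keep only peers whose status marks them as healthy.
// The status strings are assumptions, not confirmed library values.
function selectHealthyPeers(peers, healthyStatuses = ['healthy']) {
  return peers.filter(peer => healthyStatuses.includes(peer.status))
}

const candidates = selectHealthyPeers([
  { id: 'dbpedia', status: 'healthy' },
  { id: 'wikidata', status: 'degraded' },
  { id: 'local', status: 'unreachable' }
])
console.log(candidates.map(peer => peer.id)) // [ 'dbpedia' ]
```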
Failover Strategy
Query ONE peer, try next on failure:
const result = await coordinator.query(sparqlQuery, {
  strategy: 'failover',
  timeout: 3000
})
Use when:
- Need single authoritative source
- Want minimal network usage
- Peers have redundant data
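The "try one peer, then the next" behaviour can be pictured as a simple loop. This is a minimal sketch under stated assumptions: `queryWithFailover` and `runQuery` are hypothetical names, and the library's own ordering and error handling may differ.

```javascript
// Hypothetical sketch of a failover loop: try peers in order and return the
// first successful result. `runQuery` stands in for whatever sends the query.
async function queryWithFailover(peers, runQuery) {
  const errors = []
  for (const peer of peers) {
    try {
      const results = await runQuery(peer)
      return { success: true, peerId: peer.id, results, errors }
    } catch (error) {
      // Remember the failure and move on to the next peer.
      errors.push({ peerId: peer.id, message: error.message })
    }
  }
  return { success: false, peerId: null, results: [], errors }
}

// Usage with a stubbed transport: the first peer fails, the second answers.
const stub = async peer => {
  if (peer.id === 'primary') throw new Error('ECONNREFUSED')
  return [{ s: 'ex:a' }]
}
queryWithFailover([{ id: 'primary' }, { id: 'backup' }], stub)
  .then(outcome => console.log(outcome.peerId, outcome.errors.length)) // backup 1
```

Because only one peer is contacted at a time, worst-case latency grows with the number of peers that fail before one answers, which is why a short per-peer timeout suits this strategy.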
Health Monitoring
// Run health check
const health = await coordinator.healthCheck()
console.log(`Healthy: ${health.healthyPeers}/${health.totalPeers}`)
console.log(`Degraded: ${health.degradedPeers}`)
console.log(`Unreachable: ${health.unreachablePeers}`)
// Query specific peer
const peerResult = await coordinator.queryPeer('dbpedia', sparqlQuery)
console.log(`Success: ${peerResult.success}`)
console.log(`Duration: ${peerResult.duration}ms`)
Statistics
// Get federation statistics
const stats = coordinator.getStats()
console.log(`Total queries: ${stats.totalQueries}`)
console.log(`Total errors: ${stats.totalErrors}`)
console.log(`Error rate: ${(stats.errorRate * 100).toFixed(2)}%`)
Architecture
Federation Coordinator
│
├── Peer Manager
│   ├── Peer Registration
│   ├── Health Tracking
│   ├── Metadata Management
│   └── Connection Pooling
│
├── Distributed Query Engine
│   ├── Query Routing
│   ├── Strategy Selection
│   │   ├── Broadcast (all peers)
│   │   ├── Selective (healthy only)
│   │   └── Failover (single with fallback)
│   ├── Parallel Execution
│   └── Result Aggregation
│
├── Health Monitor
│   ├── Periodic Health Checks
│   ├── Health Score Calculation
│   ├── Degradation Detection
│   └── Auto-Failover
│
└── Statistics Tracker
    ├── Query Metrics
    ├── Error Tracking
    ├── Performance Monitoring
    └── Per-Peer Statistics
Query Flow:
1. Coordinator receives SPARQL query
2. Strategy determines which peers to query
3. Queries executed in parallel across selected peers
4. Results aggregated and returned
5. Health scores updated
6. Statistics recorded
Peer Lifecycle
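The state diagram in this section can also be read as a transition table. The sketch below encodes only the edges drawn in the diagram. Event names such as `healthCheckFailed` and `continuedFailure` are invented for illustration, and the library may track peer state quite differently.

```javascript
// Hypothetical transition table for the peer lifecycle diagram.
// Event names are invented for this sketch.
const TRANSITIONS = {
  INITIAL: { addPeer: 'ACTIVE' },
  ACTIVE: { healthCheckFailed: 'DEGRADED' },
  DEGRADED: { recover: 'ACTIVE', continuedFailure: 'UNREACHABLE' },
  UNREACHABLE: { removePeer: 'REMOVED' },
  REMOVED: {}
}

function nextState(state, event) {
  const next = TRANSITIONS[state]?.[event]
  if (!next) throw new Error(`No transition from ${state} on ${event}`)
  return next
}

// A peer is added, fails a health check, then recovers.
let state = 'INITIAL'
for (const event of ['addPeer', 'healthCheckFailed', 'recover']) {
  state = nextState(state, event)
}
console.log(state) // ACTIVE
```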
┌─────────────┐
│   INITIAL   │
└──────┬──────┘
       │
       │ addPeer()
       ▼
┌─────────────┐
│   ACTIVE    │◄────────┐
└──────┬──────┘         │
       │                │
       │ healthCheck()  │
       ▼                │
┌─────────────┐         │
│  DEGRADED   │─────────┘
└──────┬──────┘  recover
       │
       │ continued failure
       ▼
┌─────────────┐
│ UNREACHABLE │
└──────┬──────┘
       │
       │ removePeer()
       ▼
┌─────────────┐
│   REMOVED   │
└─────────────┘
API Reference
createCoordinator(config)
Creates a federation coordinator.
Parameters:
- config.peers (Array): Initial peers
  - id (string): Unique peer identifier
  - endpoint (string): SPARQL endpoint URL
  - metadata (object, optional): Peer metadata
- config.strategy (string): Default query strategy ('broadcast', 'selective', 'failover')
- config.timeout (number): Default timeout in milliseconds
- config.healthCheckInterval (number, optional): Health check interval (default: 60000ms)
- config.retryAttempts (number, optional): Retry attempts for failed queries (default: 3)
- config.retryDelay (number, optional): Delay between retries (default: 1000ms)
Returns: Coordinator instance
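To make the `retryAttempts` and `retryDelay` options concrete, here is a minimal sketch of a fixed-delay retry wrapper. It assumes `retryAttempts` counts retries after an initial attempt, which this README does not confirm; `withRetry` is a hypothetical helper, and the library may use a different policy such as exponential backoff.

```javascript
// Hypothetical retry wrapper; the library's own retry behaviour may differ.
async function withRetry(operation, { retryAttempts = 3, retryDelay = 1000 } = {}) {
  let lastError
  // Assumption: one initial attempt plus up to `retryAttempts` retries.
  for (let attempt = 0; attempt <= retryAttempts; attempt++) {
    try {
      return await operation(attempt)
    } catch (error) {
      lastError = error
      if (attempt < retryAttempts) {
        // Wait a fixed delay before the next attempt.
        await new Promise(resolve => setTimeout(resolve, retryDelay))
      }
    }
  }
  throw lastError
}

// Usage: the operation fails twice, then succeeds on the third call.
let calls = 0
withRetry(async () => {
  calls += 1
  if (calls < 3) throw new Error('temporary failure')
  return 'ok'
}, { retryDelay: 10 }).then(value => console.log(value, calls)) // ok 3
```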
Coordinator Methods
addPeer(id, endpoint, metadata?)
Adds a new peer to the federation.
removePeer(id)
Removes a peer from the federation.
listPeers()
Returns array of all registered peers.
query(sparqlQuery, options?)
Executes a federated SPARQL query.
Options:
- strategy: Query strategy ('broadcast', 'selective', 'failover')
- timeout: Query timeout in milliseconds
Returns: Query result object with:
- success: boolean
- results: Array of bindings
- successCount: Number of successful peers
- failureCount: Number of failed peers
- totalDuration: Total query duration
- peerResults: Per-peer result details
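The relationship between `peerResults` and the summary fields can be sketched as follows. The entry shape (`peerId`, `success`, `results`, `duration`) and the rule that the overall `success` is true when at least one peer succeeds are assumptions. `totalDuration` is left out because this README does not say whether it is wall-clock time or a sum over peers.

```javascript
// Hypothetical sketch: derive the summary fields from per-peer results.
function summarize(peerResults) {
  const successful = peerResults.filter(r => r.success)
  return {
    // Assumption: the query counts as successful if any peer answered.
    success: successful.length > 0,
    results: successful.flatMap(r => r.results),
    successCount: successful.length,
    failureCount: peerResults.length - successful.length,
    peerResults
  }
}

const summary = summarize([
  { peerId: 'dbpedia', success: true, results: [{ type: 'ex:Person' }], duration: 120 },
  { peerId: 'wikidata', success: false, results: [], duration: 3000 }
])
console.log(`${summary.successCount}/${summary.successCount + summary.failureCount}`) // 1/2
```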
queryPeer(peerId, sparqlQuery)
Queries a specific peer.
healthCheck()
Runs health checks on all peers.
Returns: Health status object with:
- totalPeers: Total number of peers
- healthyPeers: Number of healthy peers
- degradedPeers: Number of degraded peers
- unreachablePeers: Number of unreachable peers
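One way to picture how 0-100 health scores could roll up into these counts is shown below. This is a sketch only: the thresholds are arbitrary placeholders chosen for the example, and `classify` and `summarizeHealth` are hypothetical helpers rather than library functions.

```javascript
// Hypothetical sketch: the thresholds are arbitrary placeholders,
// not the library's actual cut-offs.
function classify(score, { healthyMin = 70, degradedMin = 1 } = {}) {
  if (score >= healthyMin) return 'healthy'
  if (score >= degradedMin) return 'degraded'
  return 'unreachable'
}

function summarizeHealth(scoresByPeer) {
  const summary = { totalPeers: 0, healthyPeers: 0, degradedPeers: 0, unreachablePeers: 0 }
  for (const score of Object.values(scoresByPeer)) {
    summary.totalPeers += 1
    // Increment healthyPeers, degradedPeers, or unreachablePeers.
    summary[`${classify(score)}Peers`] += 1
  }
  return summary
}

// One peer lands in each category with the placeholder thresholds.
const health = summarizeHealth({ dbpedia: 95, wikidata: 40, local: 0 })
console.log(health)
```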
getStats()
Returns federation statistics.
Returns: Statistics object with:
- totalQueries: Total queries executed
- totalErrors: Total errors encountered
- errorRate: Error rate (0-1)
- totalDuration: Total query duration
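A small tracker that produces fields of this shape might look like the sketch below. It assumes `errorRate` is `totalErrors / totalQueries` and that `totalDuration` is a running sum in milliseconds; neither is confirmed here, and `createStats` is a hypothetical name.

```javascript
// Hypothetical statistics tracker, not the library's implementation.
function createStats() {
  let totalQueries = 0
  let totalErrors = 0
  let totalDuration = 0
  return {
    record({ success, duration }) {
      totalQueries += 1
      totalDuration += duration
      if (!success) totalErrors += 1
    },
    snapshot() {
      return {
        totalQueries,
        totalErrors,
        // Guard against division by zero before any query has run.
        errorRate: totalQueries === 0 ? 0 : totalErrors / totalQueries,
        totalDuration
      }
    }
  }
}

const tracker = createStats()
tracker.record({ success: true, duration: 120 })
tracker.record({ success: false, duration: 3000 })
console.log(tracker.snapshot().errorRate) // 0.5
```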
Use Cases
Multi-graph Queries
Query across multiple RDF sources:
const result = await coordinator.query(`
  PREFIX foaf: <http://xmlns.com/foaf/0.1/>
  PREFIX dbo: <http://dbpedia.org/ontology/>
  SELECT ?person ?name ?birthPlace WHERE {
    ?person foaf:name ?name .
    ?person dbo:birthPlace ?birthPlace .
  } LIMIT 100
`, { strategy: 'broadcast' })
Distributed Systems
Coordinate RDF operations across distributed nodes:
// Add organizational peers
await coordinator.addPeer('hr', 'http://hr.example.com/sparql')
await coordinator.addPeer('finance', 'http://finance.example.com/sparql')
// Query across departments
const employees = await coordinator.query(`
  SELECT ?emp ?dept ?salary WHERE {
    ?emp org:department ?dept .
    ?emp org:salary ?salary .
  }
`, { strategy: 'selective' })
Data Federation
Combine datasets from multiple sources:
// Public knowledge bases
await coordinator.addPeer('dbpedia', 'https://dbpedia.org/sparql')
await coordinator.addPeer('wikidata', 'https://query.wikidata.org/sparql')
// Federated query across public + private data
const combined = await coordinator.query(`
  PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
  PREFIX schema: <http://schema.org/>
  SELECT ?entity ?label ?description WHERE {
    ?entity rdfs:label ?label .
    OPTIONAL { ?entity schema:description ?description }
  }
`, { strategy: 'broadcast' })
High Availability
Route to healthy peers automatically:
// Failover automatically if primary is down
const result = await coordinator.query(sparqlQuery, {
  strategy: 'failover'
})
Performance Characteristics
| Metric | Value | Notes |
|--------|-------|-------|
| Query Latency | <100ms overhead | Federation coordination |
| Peer Timeout | 10s default | Configurable per query |
| Health Check | 60s interval | Background monitoring |
| Failover Time | <1s | Automatic on failure |
| Max Peers | Unlimited | Limited by resources |
| Parallel Queries | N peers | Concurrent execution |
Troubleshooting
Cannot connect to peer
Symptom: ECONNREFUSED or timeout errors
Solution:
- Check peer endpoint URL is correct
- Verify network connectivity: curl <endpoint>
- Check firewall/CORS settings
- Increase timeout: { timeout: 30000 }
Query returns empty results
Symptom: results.length === 0 but no errors
Solution:
- Test query on individual peer: coordinator.queryPeer('peer-id', query)
- Check SPARQL syntax
- Verify peer has data
- Check peer endpoint accepts HTTP POST
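To check by hand whether an endpoint accepts HTTP POST, one option is to send a query directly in the request body, as described by the SPARQL 1.1 Protocol. The sketch below builds such a request for the global `fetch` available in Node.js 18+. `buildSparqlPost` and `probe` are hypothetical helpers, and which POST form the library itself sends is not stated in this README.

```javascript
// Build fetch() options for a SPARQL 1.1 Protocol "POST directly" request:
// the query travels unencoded in the body with this content type.
function buildSparqlPost(query) {
  return {
    method: 'POST',
    headers: {
      'Content-Type': 'application/sparql-query',
      Accept: 'application/sparql-results+json'
    },
    body: query
  }
}

// Hypothetical probe: send a trivial ASK query and report the HTTP status.
async function probe(endpoint, query = 'ASK { ?s ?p ?o }') {
  const response = await fetch(endpoint, buildSparqlPost(query))
  return { status: response.status, ok: response.ok }
}

console.log(buildSparqlPost('ASK { ?s ?p ?o }').headers['Content-Type'])
```

Calling `probe(endpoint)` against the peer URL and getting a 405 or 415 status back would suggest the endpoint does not accept this form of POST.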
Slow query performance
Symptom: Queries taking >10 seconds
Solution:
- Use strategy: 'selective' to skip unhealthy peers
- Reduce query complexity
- Add LIMIT clause
- Query specific peer instead of broadcast
Too many failed peers
Symptom: health.unreachablePeers > health.healthyPeers
Solution:
- Run health check: await coordinator.healthCheck()
- Remove unreachable peers
- Check peer endpoints are accessible
- Review peer configuration
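The first two steps can be combined into a small cleanup routine. This sketch uses only methods documented in this README (`healthCheck`, `listPeers`, `removePeer`), but the `'unreachable'` status string and the truthy return value of `removePeer` are assumptions. `pruneUnreachable` is a hypothetical helper.

```javascript
// Sketch built on the documented coordinator methods. The status string
// 'unreachable' is an assumption; verify it against listPeers() output.
async function pruneUnreachable(coordinator, unreachableStatus = 'unreachable') {
  // Refresh peer statuses before deciding what to remove.
  await coordinator.healthCheck()
  const removed = []
  // Copy the list in case removePeer() mutates the underlying array.
  for (const peer of [...coordinator.listPeers()]) {
    if (peer.status === unreachableStatus && coordinator.removePeer(peer.id)) {
      removed.push(peer.id)
    }
  }
  return removed
}
```

With a real coordinator this would be called as `await pruneUnreachable(coordinator)`; logging the returned ids before removal is a safer first step in production.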
Examples
Complete examples are available in the examples/ directory:
- basic.mjs - Basic federation usage (15 min)
- production-federation.mjs - Production-ready example with all features (20 min)
- distributed-queries/ - Advanced distributed query patterns
- peer-discovery/ - Peer discovery and management
New to federation? Start with QUICKSTART-FEDERATION.md.
Observability
The federation package includes built-in OpenTelemetry instrumentation for production monitoring:
import { createCoordinator } from '@unrdf/federation'
const coordinator = createCoordinator({
  peers: [...],
  observability: {
    serviceName: 'my-federation',
    version: '1.0.0'
  }
})
// Metrics automatically tracked:
// - federation.queries (counter)
// - federation.errors (counter)
// - federation.query_duration (histogram)
// - federation.peer_health (up/down counter)
// - federation.concurrent_queries (gauge)
Note: Metrics are exported via @opentelemetry/api and can be collected by any OTEL-compatible backend (Prometheus, Jaeger, etc.).
Dependencies
- @unrdf/core - RDF substrate
- @unrdf/hooks - Policy enforcement
- @opentelemetry/api - Observability instrumentation
Browser Compatibility
- Node.js: 18+ ✅
- Browser: Not yet supported (planned)
VOC Usage
- VOC-1: Knowledge Agent (discover agent peers)
- VOC-2: Sync Agent (peer discovery and sync)
- VOC-5: Data Engineer (federate data sources)
Documentation
Complete documentation is organized using the Diátaxis framework:
- Quick Start: QUICKSTART-FEDERATION.md - Get started in 5 minutes
- Examples: examples/ - Code examples
- API Reference: This README - Complete API documentation
- User Guide: See monorepo examples/ - Federation patterns
Development
# Run tests
pnpm test
# Run tests in watch mode
pnpm test:watch
# Lint
pnpm lint
# Format
pnpm format
Requirements
- Node.js >= 18.0.0
- pnpm >= 8.0.0
- Network access to SPARQL endpoints
License
MIT
