@unrdf/federation
v26.5.5
UNRDF Federation - Distributed RDF Query with RAFT Consensus and Multi-Master Replication
Peer Discovery and Distributed Query Execution
Federate RDF graphs across multiple peers. Discover peers, route queries, and execute distributed operations with automatic failover.
Quick Start
See QUICKSTART-FEDERATION.md for a 5-minute guide to production federated queries.
One-command demo:
```bash
node examples/production-federation.mjs
```
Features
Peer Management
- ✅ Peer discovery (DNS-SD, mDNS)
- ✅ Dynamic peer registration/removal
- ✅ Peer metadata and configuration
- ✅ Connection pooling
- ✅ Automatic reconnection
Query Execution
- ✅ Remote SPARQL execution
- ✅ Multiple query strategies (broadcast, selective, failover)
- ✅ Query routing and optimization
- ✅ Result aggregation
- ✅ Timeout configuration
- ✅ Retry logic
Health Monitoring
- ✅ Peer health monitoring
- ✅ Automatic health checks
- ✅ Health scores (0-100)
- ✅ Fallback peer selection
- ✅ Degraded peer handling
Statistics & Monitoring
- ✅ Query statistics tracking
- ✅ Error rate monitoring
- ✅ Performance metrics
- ✅ Per-peer statistics
- ✅ Response time tracking
- ✅ OpenTelemetry integration (metrics, traces, spans)
Installation
```bash
pnpm add @unrdf/federation
```
Usage
Basic Federation
```javascript
import { createCoordinator } from '@unrdf/federation'

// Create a federation coordinator with initial peers
const coordinator = createCoordinator({
  peers: [
    {
      id: 'dbpedia',
      endpoint: 'https://dbpedia.org/sparql',
      metadata: { description: 'DBpedia SPARQL endpoint' }
    },
    {
      id: 'wikidata',
      endpoint: 'https://query.wikidata.org/sparql',
      metadata: { description: 'Wikidata Query Service' }
    }
  ],
  strategy: 'broadcast', // Query all peers
  timeout: 10000 // 10-second timeout
})

// Execute a federated query (top-level await requires an ES module, e.g. a .mjs file)
const result = await coordinator.query(`
  SELECT DISTINCT ?type WHERE {
    ?s a ?type .
  } LIMIT 10
`)

console.log(`Results: ${result.results.length}`)
console.log(`Success: ${result.successCount}/${result.totalPeers}`)
```
Dynamic Peer Management
```javascript
// Add a peer dynamically
await coordinator.addPeer(
  'local',
  'http://localhost:3030/dataset/sparql',
  { description: 'Local Fuseki instance' }
)

// List registered peers
const peers = coordinator.listPeers()
peers.forEach(peer => {
  console.log(`${peer.id}: ${peer.endpoint} (${peer.status})`)
})

// Remove a peer
const removed = coordinator.removePeer('local')
console.log(`Removed: ${removed}`)
```
Query Strategies
Broadcast Strategy
Query ALL peers, aggregate results:
```javascript
const result = await coordinator.query(sparqlQuery, {
  strategy: 'broadcast',
  timeout: 10000
})
// result.peerResults contains results from each peer
// result.results contains aggregated/merged results
```
Use when:
- Need comprehensive results from all sources
- Want to compare data across peers
- High availability is critical
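Conceptually, broadcast fans the query out to every peer in parallel and merges whatever comes back, so a single failing peer only reduces coverage. The sketch below illustrates that idea with Promise.allSettled. It is not the package's implementation: the peer shape ({ id, run }) is hypothetical, the returned field names simply mirror the documented query() result, and treating success as "at least one peer answered" is an assumption.

```javascript
// Hypothetical broadcast sketch, not the @unrdf/federation implementation.
// Each peer is assumed to look like { id, run: async (query) => rows }.
async function broadcast(peers, query) {
  const started = Date.now()
  // Query every peer in parallel. allSettled never rejects, so one
  // failing peer cannot abort the whole broadcast.
  const settled = await Promise.allSettled(peers.map((peer) => peer.run(query)))

  const peerResults = settled.map((outcome, i) => ({
    peerId: peers[i].id,
    success: outcome.status === 'fulfilled',
    rows: outcome.status === 'fulfilled' ? outcome.value : [],
    error: outcome.status === 'rejected' ? String(outcome.reason) : undefined
  }))
  const successCount = peerResults.filter((r) => r.success).length

  return {
    success: successCount > 0, // assumption: any successful peer counts
    // Naive aggregation: concatenate rows from successful peers (no dedup).
    results: peerResults.flatMap((r) => r.rows),
    successCount,
    failureCount: peers.length - successCount,
    totalDuration: Date.now() - started,
    peerResults
  }
}

// Usage with two stub peers, one of which fails.
broadcast([
  { id: 'a', run: async () => [{ type: 'Person' }] },
  { id: 'b', run: async () => { throw new Error('ECONNREFUSED') } }
], 'SELECT * WHERE { ?s ?p ?o } LIMIT 1').then((r) => {
  console.log(r.successCount, r.failureCount) // 1 1
})
```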
Selective Strategy
Query HEALTHY peers only, skip degraded/unreachable:
```javascript
const result = await coordinator.query(sparqlQuery, {
  strategy: 'selective',
  timeout: 5000
})
```
Use when:
- Want lower latency by skipping slow or failing peers
- Can tolerate partial results
- Network conditions vary
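Selective routing can be pictured as a filter placed in front of a broadcast. The sketch below assumes each peer carries a status field like the one listPeers() exposes and a 0-100 health score; the 'healthy' status value, the score field, and the minScore threshold of 80 are hypothetical, not documented defaults.

```javascript
// Hypothetical sketch: choose which peers a 'selective' query would hit.
// Assumes peers shaped like { id, status, score } with score in 0-100.
function selectHealthyPeers(peers, minScore = 80) {
  return peers
    .filter((p) => p.status === 'healthy' && p.score >= minScore)
    // Best-scoring peers first, so a caller could cap the fan-out.
    .sort((a, b) => b.score - a.score)
}

const eligible = selectHealthyPeers([
  { id: 'dbpedia', status: 'healthy', score: 95 },
  { id: 'wikidata', status: 'degraded', score: 55 },
  { id: 'local', status: 'healthy', score: 88 }
])
console.log(eligible.map((p) => p.id)) // [ 'dbpedia', 'local' ]
```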
Failover Strategy
Query ONE peer, try next on failure:
```javascript
const result = await coordinator.query(sparqlQuery, {
  strategy: 'failover',
  timeout: 3000
})
```
Use when:
- Need single authoritative source
- Want minimal network usage
- Peers have redundant data
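Failover tries peers one at a time and stops at the first success. The sketch below is a minimal illustration, assuming the peers are already ordered by preference and each exposes a hypothetical run(query) function; the per-attempt timeout is implemented with Promise.race. The package may order peers or enforce timeouts differently.

```javascript
// Hypothetical failover sketch, not the package's implementation.
// Peers are assumed to be pre-ordered and shaped like { id, run: async (query) => rows }.
function withTimeout(promise, ms) {
  let timer
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`timed out after ${ms}ms`)), ms)
  })
  // Whichever settles first wins; always clear the timer afterwards.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer))
}

async function failover(peers, query, timeoutMs = 3000) {
  const errors = []
  for (const peer of peers) {
    try {
      const rows = await withTimeout(peer.run(query), timeoutMs)
      return { success: true, peerId: peer.id, results: rows, errors }
    } catch (err) {
      // Record the failure and move on to the next peer.
      errors.push({ peerId: peer.id, message: err.message })
    }
  }
  return { success: false, peerId: null, results: [], errors }
}

// Usage: the primary never answers, so the backup serves the query.
failover([
  { id: 'primary', run: () => new Promise(() => {}) },
  { id: 'backup', run: async () => [{ ok: true }] }
], 'ASK {}', 100).then((r) => console.log(r.peerId)) // backup
```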
Health Monitoring
```javascript
// Run a health check
const health = await coordinator.healthCheck()
console.log(`Healthy: ${health.healthyPeers}/${health.totalPeers}`)
console.log(`Degraded: ${health.degradedPeers}`)
console.log(`Unreachable: ${health.unreachablePeers}`)

// Query a specific peer
const peerResult = await coordinator.queryPeer('dbpedia', sparqlQuery)
console.log(`Success: ${peerResult.success}`)
console.log(`Duration: ${peerResult.duration}ms`)
```
Statistics
```javascript
// Get federation statistics
const stats = coordinator.getStats()
console.log(`Total queries: ${stats.totalQueries}`)
console.log(`Total errors: ${stats.totalErrors}`)
console.log(`Error rate: ${(stats.errorRate * 100).toFixed(2)}%`)
```
Architecture
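The @unrdf/federation package provides the distributed-systems layer for RDF data: peer management, strategy-based query routing, health monitoring, and, in V6, RAFT consensus and multi-master replication.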
High-Level Components
```mermaid
graph TD
    Client[SPARQL Client] --> Coord[Federation Coordinator]

    subgraph "Core Coordination"
        Coord --> PM[Peer Manager]
        Coord --> DQE[Distributed Query Engine]
        Coord --> HM[Health Monitor]
    end

    subgraph "Advanced V6 Features"
        Coord --> CM[Consensus Manager / RAFT]
        Coord --> DRM[Data Replication Manager]
        Coord --> AE[Advanced Federation Engine]
    end

    PM --> P1[Peer A]
    PM --> P2[Peer B]
    PM --> P3[Peer C]

    DQE --> Strategy{Strategy}
    Strategy --> Broadcast
    Strategy --> Selective
    Strategy --> Failover
```
Detailed Architecture
- Federation Coordinator: The central orchestrator managing all sub-components and providing the public API.
- Peer Manager: Handles registration, metadata, and connection pooling for remote endpoints.
- Distributed Query Engine: Routes SPARQL queries across peers using pluggable strategies and aggregates results.
- Consensus Manager (V6): Implements RAFT consensus for consistent state across the federation.
- Data Replication Manager (V6): Manages multi-master replication and conflict resolution between nodes.
- Health Monitor: Periodically probes peers and calculates health scores (0-100) for intelligent routing.
- Statistics Tracker: Provides real-time metrics and observability via OpenTelemetry.
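This README does not specify how the 0-100 health score is computed. One plausible scheme, shown purely as an assumption, blends the success ratio of recent probes with how close their average latency comes to the timeout, then maps each score onto the healthy/degraded/unreachable buckets that healthCheck() reports. The weights (0.7/0.3), the thresholds, and all function names are hypothetical.

```javascript
// Hypothetical health scoring; weights and thresholds are illustrative only.
// probes: recent health-check results, e.g. [{ ok: true, latencyMs: 120 }, ...]
function healthScore(probes, timeoutMs = 10000) {
  const okProbes = probes.filter((p) => p.ok)
  if (okProbes.length === 0) return 0 // no successful probe: unreachable
  const successRatio = okProbes.length / probes.length
  const avgLatency = okProbes.reduce((sum, p) => sum + p.latencyMs, 0) / okProbes.length
  // 1.0 for instant responses, 0.0 at or beyond the timeout.
  const speed = Math.max(0, 1 - avgLatency / timeoutMs)
  return Math.round(100 * (0.7 * successRatio + 0.3 * speed))
}

function classify(score) {
  if (score >= 80) return 'healthy'
  if (score > 0) return 'degraded'
  return 'unreachable'
}

// Summarize per-peer scores in the same shape healthCheck() documents.
function summarize(scores) {
  const statuses = scores.map(classify)
  const count = (status) => statuses.filter((s) => s === status).length
  return {
    totalPeers: statuses.length,
    healthyPeers: count('healthy'),
    degradedPeers: count('degraded'),
    unreachablePeers: count('unreachable')
  }
}

const scores = [
  healthScore([{ ok: true, latencyMs: 100 }, { ok: true, latencyMs: 100 }]),
  healthScore([{ ok: true, latencyMs: 5000 }, { ok: false }]),
  healthScore([{ ok: false }, { ok: false }])
]
console.log(scores.map(classify), summarize(scores))
```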
Query Flow
- Request: Coordinator receives a SPARQL query from the client.
- Routing: The Strategy determines which peers are eligible based on health and metadata.
- Execution: Queries are dispatched in parallel to the selected peers.
- Aggregation: Results are merged, deduplicated, and normalized.
- Feedback: Execution metrics are recorded and health scores are updated.
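The aggregation step merges and deduplicates rows from several peers. A minimal sketch, assuming each peer returns the standard SPARQL 1.1 Query Results JSON format (head.vars plus results.bindings): every row is normalized into a comparison key built from its bound terms, and repeated keys are dropped. Keying on type, value, language tag, and datatype is one simple deduplication choice, not necessarily the one the package uses, and the function names are hypothetical.

```javascript
// Merge SPARQL 1.1 Query Results JSON documents from several peers and drop
// duplicate rows. A sketch, assuming every peer returns application/sparql-results+json.
function rowKey(row, vars) {
  // Same variables in the same order; unbound variables (e.g. from OPTIONAL) become null.
  return JSON.stringify(vars.map((v) => {
    const term = row[v]
    return term ? [term.type, term.value, term['xml:lang'] ?? null, term.datatype ?? null] : null
  }))
}

function mergeSparqlResults(documents) {
  // Union of all variables, in first-seen order.
  const vars = [...new Set(documents.flatMap((doc) => doc.head.vars))]
  const seen = new Set()
  const bindings = []
  for (const doc of documents) {
    for (const row of doc.results.bindings) {
      const key = rowKey(row, vars)
      if (!seen.has(key)) {
        seen.add(key)
        bindings.push(row)
      }
    }
  }
  return { head: { vars }, results: { bindings } }
}

const fromA = { head: { vars: ['type'] }, results: { bindings: [
  { type: { type: 'uri', value: 'http://xmlns.com/foaf/0.1/Person' } }
] } }
const fromB = { head: { vars: ['type'] }, results: { bindings: [
  { type: { type: 'uri', value: 'http://xmlns.com/foaf/0.1/Person' } },
  { type: { type: 'uri', value: 'http://schema.org/Place' } }
] } }
console.log(mergeSparqlResults([fromA, fromB]).results.bindings.length) // 2
```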
API Reference
createCoordinator(config)
Creates a federation coordinator.
Parameters:
- config.peers (Array): Initial peers, each an object with:
  - id (string): Unique peer identifier
  - endpoint (string): SPARQL endpoint URL
  - metadata (object, optional): Peer metadata
- config.strategy (string): Default query strategy ('broadcast', 'selective', or 'failover')
- config.timeout (number): Default timeout in milliseconds
- config.healthCheckInterval (number, optional): Health check interval in milliseconds (default: 60000)
- config.retryAttempts (number, optional): Retry attempts for failed queries (default: 3)
- config.retryDelay (number, optional): Delay between retries in milliseconds (default: 1000)
- config.observability (object, optional): OpenTelemetry settings such as serviceName and version
Returns: Coordinator instance
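The retryAttempts and retryDelay options suggest a retry loop around each peer request. This README does not say whether retryAttempts includes the first try, so the sketch below assumes it counts additional attempts after the first failure, with a fixed delay between them. The helper names are hypothetical; treat this as an illustration, not the coordinator's code.

```javascript
// Hypothetical retry helper mirroring retryAttempts/retryDelay.
// Assumption: retryAttempts = extra tries after the first failure.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

async function withRetry(fn, { retryAttempts = 3, retryDelay = 1000 } = {}) {
  let lastError
  for (let attempt = 0; attempt <= retryAttempts; attempt++) {
    try {
      return await fn(attempt)
    } catch (err) {
      lastError = err
      // Wait before the next try, but not after the final one.
      if (attempt < retryAttempts) await sleep(retryDelay)
    }
  }
  throw lastError
}

// Usage: a flaky call that succeeds on its third invocation.
let calls = 0
withRetry(async () => {
  calls++
  if (calls < 3) throw new Error('transient')
  return 'ok'
}, { retryAttempts: 3, retryDelay: 10 }).then((v) => console.log(v, calls)) // ok 3
```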
Coordinator Methods
addPeer(id, endpoint, metadata?)
Adds a new peer to the federation.
removePeer(id)
Removes a peer from the federation.
listPeers()
Returns an array of all registered peers (each with id, endpoint, and status).
query(sparqlQuery, options?)
Executes a federated SPARQL query.
Options:
- strategy: Query strategy ('broadcast', 'selective', or 'failover')
- timeout: Query timeout in milliseconds
Returns: Query result object with:
- success: boolean
- results: Array of bindings
- successCount: Number of successful peers
- failureCount: Number of failed peers
- totalDuration: Total query duration
- peerResults: Per-peer result details
queryPeer(peerId, sparqlQuery)
Queries a single peer by ID. The result includes success (boolean) and duration (milliseconds).
healthCheck()
Runs health checks on all peers.
Returns: Health status object with:
- totalPeers: Total number of peers
- healthyPeers: Number of healthy peers
- degradedPeers: Number of degraded peers
- unreachablePeers: Number of unreachable peers
getStats()
Returns federation statistics.
Returns: Statistics object with:
- totalQueries: Total queries executed
- totalErrors: Total errors encountered
- errorRate: Error rate (0-1)
- totalDuration: Total query duration
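The documented getStats() fields can be reproduced with a small counter object. A minimal sketch, assuming errorRate is totalErrors divided by totalQueries (the README only states that it lies between 0 and 1) and that each peer request is recorded once; the class, its methods, and the per-peer map are hypothetical.

```javascript
// Hypothetical stats tracker producing the fields getStats() documents.
class FederationStats {
  constructor() {
    this.totalQueries = 0
    this.totalErrors = 0
    this.totalDuration = 0
    this.perPeer = new Map() // peerId -> { queries, errors }
  }

  record(peerId, { ok, durationMs }) {
    this.totalQueries++
    this.totalDuration += durationMs
    if (!ok) this.totalErrors++
    const peer = this.perPeer.get(peerId) ?? { queries: 0, errors: 0 }
    peer.queries++
    if (!ok) peer.errors++
    this.perPeer.set(peerId, peer)
  }

  snapshot() {
    return {
      totalQueries: this.totalQueries,
      totalErrors: this.totalErrors,
      // Guard against division by zero before any query has run.
      errorRate: this.totalQueries === 0 ? 0 : this.totalErrors / this.totalQueries,
      totalDuration: this.totalDuration
    }
  }
}

const stats = new FederationStats()
stats.record('dbpedia', { ok: true, durationMs: 120 })
stats.record('wikidata', { ok: false, durationMs: 3000 })
const s = stats.snapshot()
console.log(`Error rate: ${(s.errorRate * 100).toFixed(2)}%`) // Error rate: 50.00%
```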
Use Cases
Multi-graph Queries
Query across multiple RDF sources:
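```javascript
const result = await coordinator.query(`
  PREFIX foaf: <http://xmlns.com/foaf/0.1/>
  PREFIX dbo: <http://dbpedia.org/ontology/>
  SELECT ?person ?name ?birthPlace WHERE {
    ?person foaf:name ?name .
    ?person dbo:birthPlace ?birthPlace .
  } LIMIT 100
`, { strategy: 'broadcast' })
```
Distributed Systems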
Coordinate RDF operations across distributed nodes:
```javascript
// Add organizational peers
await coordinator.addPeer('hr', 'http://hr.example.com/sparql')
await coordinator.addPeer('finance', 'http://finance.example.com/sparql')

// Query across departments (org: is an illustrative vocabulary)
const employees = await coordinator.query(`
  PREFIX org: <http://example.com/org#>
  SELECT ?emp ?dept ?salary WHERE {
    ?emp org:department ?dept .
    ?emp org:salary ?salary .
  }
`, { strategy: 'selective' })
```
Data Federation
Combine datasets from multiple sources:
```javascript
// Public knowledge bases
await coordinator.addPeer('dbpedia', 'https://dbpedia.org/sparql')
await coordinator.addPeer('wikidata', 'https://query.wikidata.org/sparql')

// Federated query across every registered peer (public and any private ones)
const combined = await coordinator.query(`
  PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
  PREFIX schema: <http://schema.org/>
  SELECT ?entity ?label ?description WHERE {
    ?entity rdfs:label ?label .
    OPTIONAL { ?entity schema:description ?description }
  } LIMIT 100
`, { strategy: 'broadcast' })
```
High Availability
Route to healthy peers automatically:
```javascript
// Fail over automatically if the primary is down
const result = await coordinator.query(sparqlQuery, {
  strategy: 'failover'
})
```
Performance Characteristics
| Metric | Value | Notes |
|--------|-------|-------|
| Query Latency | <100ms overhead | Federation coordination |
| Peer Timeout | 10s default | Configurable per query |
| Health Check | 60s interval | Background monitoring |
| Failover Time | <1s | Automatic on failure |
| Max Peers | No fixed limit | Bounded by available resources |
| Parallel Queries | One per selected peer | Concurrent execution |
Troubleshooting
Cannot connect to peer
Symptom: ECONNREFUSED or timeout errors
Solution:
- Check that the peer endpoint URL is correct
- Verify network connectivity: `curl <endpoint>`
- Check firewall/CORS settings
- Increase the timeout, e.g. `{ timeout: 30000 }`
Query returns empty results
Symptom: `result.results.length === 0` but no errors
Solution:
- Test the query on an individual peer: `coordinator.queryPeer('peer-id', query)`
- Check SPARQL syntax, including PREFIX declarations the endpoint does not predefine
- Verify that the peer has data
- Check that the peer endpoint accepts HTTP POST
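To check whether a peer accepts HTTP POST independently of the coordinator, you can send the query yourself using the SPARQL 1.1 Protocol's direct POST form (Content-Type: application/sparql-query). The hypothetical helper below only builds the arguments for the global fetch available in Node.js 18+, and uses AbortSignal.timeout for the deadline; it is a diagnostic sketch, not part of the package API.

```javascript
// Build fetch() arguments for a SPARQL 1.1 Protocol "query via POST directly".
function sparqlPostRequest(endpoint, query, timeoutMs = 10000) {
  return [
    endpoint,
    {
      method: 'POST',
      headers: {
        'Content-Type': 'application/sparql-query',
        Accept: 'application/sparql-results+json'
      },
      body: query,
      // Abort the request if the peer does not answer in time.
      signal: AbortSignal.timeout(timeoutMs)
    }
  ]
}

// Usage (performs a real network request, so it is left commented out):
// const res = await fetch(...sparqlPostRequest('http://localhost:3030/dataset/sparql', 'ASK {}'))
// console.log(res.status, await res.json())
```

A 405 or 415 status from this request would suggest the endpoint rejects direct POST queries.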
Slow query performance
Symptom: Queries taking >10 seconds
Solution:
- Use `strategy: 'selective'` to skip unhealthy peers
- Reduce query complexity
- Add a LIMIT clause
- Query a specific peer instead of broadcasting
Too many failed peers
Symptom: health.unreachablePeers > health.healthyPeers
Solution:
- Run a health check: `await coordinator.healthCheck()`
- Remove unreachable peers with `coordinator.removePeer(id)`
- Check that peer endpoints are accessible
- Review peer configuration
Examples
Complete examples are available in the examples/ directory:
- basic.mjs - Basic federation usage (15 min)
- production-federation.mjs - Production-ready example with all features (20 min)
- distributed-queries/ - Advanced distributed query patterns
- peer-discovery/ - Peer discovery and management
New to federation? Start with QUICKSTART-FEDERATION.md.
Observability
The federation package includes built-in OpenTelemetry instrumentation for production monitoring:
```javascript
import { createCoordinator } from '@unrdf/federation'

const coordinator = createCoordinator({
  peers: [/* peer configs, as in Basic Federation */],
  observability: {
    serviceName: 'my-federation',
    version: '[VERSION]'
  }
})

// Metrics tracked automatically:
// - federation.queries (counter)
// - federation.errors (counter)
// - federation.query_duration (histogram)
// - federation.peer_health (up/down counter)
// - federation.concurrent_queries (gauge)
```
Note: Metrics and traces are recorded through @opentelemetry/api. That package is only an API, so register an OpenTelemetry SDK in your application to export the data to an OTEL-compatible backend (for example, Prometheus for metrics or Jaeger for traces).
Dependencies
- @unrdf/core: RDF substrate
- @unrdf/hooks: Policy enforcement
- @opentelemetry/api: Observability instrumentation
SLA Requirements
Strict SLA for Federated Queries:
- Coordination Latency: <100ms overhead per query (excluding network)
- Health Detection: <1s to detect and route around a failed peer
- Success Rate: >99% success rate for healthy peer sets
- Error Rate: <0.1% internal coordinator error rate
Enforcement:
- Automatic failover when latency exceeds 1.5x average
- Circuit breaking for repeatedly failing endpoints
- Real-time SLA monitoring via OpenTelemetry metrics
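Circuit breaking and latency-triggered failover are listed above without further detail. The sketch below shows a conventional closed/open/half-open circuit breaker plus the 1.5x-average latency check. The failure threshold, the cool-down, and every name are assumptions rather than the package's API, and the clock is injectable so the behavior can be tested deterministically.

```javascript
// Conventional circuit breaker sketch; parameters are hypothetical defaults.
class CircuitBreaker {
  constructor({ failureThreshold = 5, cooldownMs = 30000, now = Date.now } = {}) {
    this.failureThreshold = failureThreshold
    this.cooldownMs = cooldownMs
    this.now = now
    this.failures = 0
    this.openedAt = null // null means the circuit is closed
  }

  get state() {
    if (this.openedAt === null) return 'closed'
    // After the cool-down, allow a trial request (half-open).
    return this.now() - this.openedAt >= this.cooldownMs ? 'half-open' : 'open'
  }

  canRequest() {
    return this.state !== 'open'
  }

  onSuccess() {
    this.failures = 0
    this.openedAt = null
  }

  onFailure() {
    this.failures++
    // A failed trial while half-open, or too many failures, (re)opens the circuit.
    if (this.state === 'half-open' || this.failures >= this.failureThreshold) {
      this.openedAt = this.now()
    }
  }
}

// SLA latency rule: a response is too slow when it exceeds 1.5x the running
// average for that peer (how the average is maintained is an assumption).
function exceedsLatencyBudget(latencyMs, averageMs, factor = 1.5) {
  return averageMs > 0 && latencyMs > factor * averageMs
}

// Usage: three consecutive failures open the circuit.
const breaker = new CircuitBreaker({ failureThreshold: 3, cooldownMs: 5000 })
for (let i = 0; i < 3; i++) breaker.onFailure()
console.log(breaker.state, breaker.canRequest()) // open false
```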
Browser Compatibility
- Node.js: 18+ ✅
- Browser: Not yet supported (planned)
VOC Usage
- VOC-1: Knowledge Agent (discover agent peers)
- VOC-2: Sync Agent (peer discovery and sync)
- VOC-5: Data Engineer (federate data sources)
Documentation
Complete documentation is organized using the Diataxis framework:
- Quick Start: QUICKSTART-FEDERATION.md - Get started in 5 minutes
- Examples: examples/ - Code examples
- API Reference: This README - Complete API documentation
- User Guide: See monorepo examples/ - Federation patterns
Development
```bash
# Run tests
pnpm test

# Run tests in watch mode
pnpm test:watch

# Lint
pnpm lint

# Format
pnpm format
```
Requirements
- Node.js >= [VERSION]
- pnpm >= [VERSION]
- Network access to SPARQL endpoints
License
MIT
