# @rebarxyz/pipeline

v0.0.28
Simplified data pipeline framework for Rebar. Build data sources with minimal code.
## Features
- ✅ Automatic batching - All updates are bundled into a single transaction
- ✅ Change detection - Only submits when values actually change
- ✅ Multiple schedules - Timer-based, block-based, or event-based
- ✅ Multi-asset support - Track multiple assets/pools/tokens with one definition
- ✅ Type-safe - Full TypeScript support
- ✅ Minimal boilerplate - Focus on your data logic, not infrastructure
## Installation

```bash
npm install @rebarxyz/pipeline
```

## Quick Start
### Simple Timer-Based Data Source

```ts
import { RebarClient } from '@rebarxyz/sdk';
import { SimplePipeline, defineSource } from '@rebarxyz/pipeline';

const businessMetrics = defineSource({
  name: 'Daily Business Metrics',
  outputs: {
    daily_revenue: 'float',
    active_users: 'uint256'
  },
  schedule: 'every 1 hour',
  async fetch() {
    const db = await connectToDatabase();
    return {
      daily_revenue: await getRevenue(db),
      active_users: await getActiveUsers(db)
    };
  }
});

const client = new RebarClient(url, wallet, owner);
const pipeline = new SimplePipeline(client, owner, [businessMetrics]);
await pipeline.start();
```

### Block-Based Data Source
```ts
const ethGas = defineSource({
  name: 'Ethereum Gas Metrics',
  outputs: {
    gas_limit: 'uint256',
    gas_used: 'uint256',
    base_fee: 'uint256'
  },
  schedule: 'every block',
  async fetch({ trigger }) {
    if (trigger.type !== 'block') return null;
    return {
      gas_limit: trigger.block.gasLimit.toString(),
      gas_used: trigger.block.gasUsed.toString(),
      base_fee: (trigger.block.baseFeePerGas ?? 0n).toString()
    };
  }
});

const pipeline = new SimplePipeline(client, owner, [ethGas], {
  ethRpcUrl: process.env.ETH_RPC_URL, // HTTP endpoint
  ethWsUrl: process.env.ETH_WS_URL    // WebSocket endpoint (optional but recommended)
});
await pipeline.start();
```

### Multi-Asset Protocol Tracking
```ts
import { defineMultiAssetSource } from '@rebarxyz/pipeline';

const TOKENS = {
  weth: { address: '0x...', name: 'WETH' },
  usdc: { address: '0x...', name: 'USDC' }
};

const aavePools = defineMultiAssetSource({
  name: 'Aave V3 Pools',
  assets: TOKENS,
  outputsPerAsset: (token, key) => ({
    [`${key}_supply_apr`]: {
      type: 'float',
      name: `${token.name} Supply APR`,
      unit: 'percent'
    }
  }),
  schedule: 'every block',
  async fetchAsset(token, key, { clients }) {
    const apr = await getAaveAPR(clients.ethereum, token.address);
    return { [`${key}_supply_apr`]: apr };
  }
});
```

## What the Framework Does Automatically
- Creates data nodes - On first run, all outputs are created on Rebar
- Reuses existing nodes - Loads inventory to avoid duplicate creation
- Schedules updates - Sets up timers, block watchers, or event listeners
- Detects changes - Compares new values to current state
- Batches transactions - Combines all updates into a single transaction
- Handles errors - Continues running even if individual updates fail
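The change-detection and batching steps above can be sketched as a plain diff over the previous and newly fetched values. This is an illustration only, not the framework's internal code; `diffChanged` is a hypothetical helper:

```typescript
// Hypothetical sketch: compare newly fetched values against the last
// submitted state and keep only the keys that actually changed.
type Values = Record<string, string | number>;

function diffChanged(prev: Values, next: Values): Values {
  const changed: Values = {};
  for (const [key, value] of Object.entries(next)) {
    if (prev[key] !== value) {
      changed[key] = value; // new or updated value goes into the batch
    }
  }
  return changed;
}

// Only the changed keys would be bundled into one batched transaction.
const batch = diffChanged(
  { daily_revenue: 1200.5, active_users: 42 },
  { daily_revenue: 1200.5, active_users: 43 }
);
console.log(batch); // { active_users: 43 }
```

If the diff comes back empty, no transaction is submitted at all.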
## Inventory Management
The framework uses inventory files to track created data nodes, making deployments idempotent and preventing duplicate node creation.
### Basic Usage
```ts
import { SimplePipeline, loadInventory, saveInventory } from '@rebarxyz/pipeline';

// Load the existing inventory (returns null if it doesn't exist)
const inventory = await loadInventory('./data/inventory.json');

// Create the pipeline with the existing inventory
const pipeline = new SimplePipeline(client, owner, sources, {
  existingInventory: inventory ?? undefined,
  ethRpcUrl: process.env.ETH_RPC_URL,
  ethWsUrl: process.env.ETH_WS_URL
});
await pipeline.start();

// Save the inventory after the first run
if (!inventory) {
  await saveInventory('./data/inventory.json', pipeline.getInventory());
}
```

### How It Works
- First run: No inventory exists → creates all nodes → saves inventory
- Subsequent runs: Loads inventory → reuses existing nodes → no creation costs
- Adding sources: Missing nodes detected → creates only new nodes → updates inventory
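The create-or-reuse decision is essentially a set difference between the outputs a source declares and the keys already present in the inventory. A minimal sketch (`missingNodes` is a hypothetical helper, not the library's API):

```typescript
// Hypothetical sketch: given declared output keys and an inventory map,
// return the keys that still need nodes created on Rebar.
function missingNodes(
  declared: string[],
  inventory: Record<string, { address: string }>
): string[] {
  return declared.filter((key) => !(key in inventory));
}

const toCreate = missingNodes(
  ['daily_revenue', 'active_users', 'new_metric'],
  {
    daily_revenue: { address: 'rebar1...' },
    active_users: { address: 'rebar1...' }
  }
);
console.log(toCreate); // ['new_metric']
```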
The inventory is a simple JSON file mapping output keys to addresses:

```json
{
  "daily_revenue": {
    "address": "rebar1...",
    "owner": "rebar1..."
  },
  "active_users": {
    "address": "rebar1...",
    "owner": "rebar1..."
  }
}
```

## API Reference
### defineSource(config)

Create a simple data source.

Config:

- `name` - Human-readable name
- `outputs` - Map of output keys to types or detailed configs
- `schedule` - When to update (`'every 1 hour'`, `'every block'`, or event config)
- `fetch` - Function that returns data
- `onError` - Optional error handler
### defineMultiAssetSource(config)

Create a multi-asset data source (for tracking protocols with multiple assets/pools).

Config:

- `name` - Human-readable name
- `assets` - Map of assets to track
- `outputsPerAsset` - Function that generates outputs for each asset
- `schedule` - When to update
- `fetchAsset` - Function that fetches data for one asset
- `onAssetError` - Optional per-asset error handler
### SimplePipeline

Main orchestrator that manages sources.

Methods:

- `start()` - Create nodes and begin scheduling
- `stop()` - Stop all timers and watchers
- `addSource(source)` - Add a data source
- `removeSource(id)` - Remove a data source
- `getInventory()` - Get all created data nodes
- `getSourceState(id)` - Get the state of a specific source
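In a long-running service, `start()` and `stop()` are typically paired with signal handling so schedules shut down cleanly. A usage sketch, assuming `client`, `owner`, and `sources` are set up as in the Quick Start:

```ts
const pipeline = new SimplePipeline(client, owner, sources);
await pipeline.start();

// Stop all timers and block watchers before the process exits.
process.on('SIGINT', async () => {
  await pipeline.stop();
  process.exit(0);
});
```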
### SetupHelper

Utility for one-time setup of functions and calculations.

Methods:

- `createFunction(config)` - Create a function node
- `createCalculation(config)` - Create a calculation node
- `saveAddresses(path, addresses)` - Save addresses to a JSON file
- `loadAddresses(path)` - Load addresses from a JSON file
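The address helpers amount to a JSON round trip on disk. A rough stand-alone approximation of what `saveAddresses`/`loadAddresses` might do (not the library's actual implementation; `rebar1example` is a made-up sample address):

```typescript
import * as fs from 'fs';
import * as path from 'path';

// Hypothetical stand-ins for SetupHelper's persistence helpers:
// store a name -> address map as pretty-printed JSON.
function saveAddresses(file: string, addresses: Record<string, string>): void {
  fs.mkdirSync(path.dirname(file), { recursive: true });
  fs.writeFileSync(file, JSON.stringify(addresses, null, 2));
}

function loadAddresses(file: string): Record<string, string> | null {
  if (!fs.existsSync(file)) return null; // no addresses saved yet
  return JSON.parse(fs.readFileSync(file, 'utf8'));
}

saveAddresses('./data/addresses.json', { apr_to_apy: 'rebar1example' });
console.log(loadAddresses('./data/addresses.json'));
```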
## Examples

See the /examples directory for complete examples:

- `fed-macro.ts` - Fetching economic data from the FRED API
- `eth-gas.ts` - Block-based Ethereum gas metrics
- `aave-pools.ts` - Multi-asset protocol tracking
## Advanced Usage

### Using Functions in Calculations
```ts
import { SetupHelper } from '@rebarxyz/pipeline';

const setup = new SetupHelper(client, owner);

// Create a utility function
const aprToApy = await setup.createFunction({
  name: 'APR to APY',
  calculationType: 'starlark',
  calculation: `
def apr_to_apy(apr, compounds_per_year):
    return pow((1 + apr / compounds_per_year), compounds_per_year) - 1
`
});
```
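The formula above computes APY = (1 + APR/n)^n − 1. A quick TypeScript equivalent of the Starlark helper, handy for sanity-checking results locally (illustrative only):

```typescript
// TypeScript equivalent of the apr_to_apy Starlark function above.
function aprToApy(apr: number, compoundsPerYear: number): number {
  return Math.pow(1 + apr / compoundsPerYear, compoundsPerYear) - 1;
}

// 5% APR compounded monthly
console.log(aprToApy(0.05, 12).toFixed(5)); // 0.05116
```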
```ts
// Reference the function in a data source
const source = defineSource({
  name: 'Staking Metrics',
  functions: {
    apr_to_apy: aprToApy.address
  },
  // ... rest of config
});
```

### Conditional Updates
Return `null` from `fetch()` to skip an update:

```ts
async fetch({ trigger, currentValues }) {
  // Only update if significant change
  const newValue = await getValue();
  if (Math.abs(newValue - currentValues.myMetric) < 0.01) {
    return null; // Skip update
  }
  return { myMetric: newValue };
}
```

### Error Handling
```ts
const source = defineSource({
  // ... config
  onError(error) {
    console.error('Failed:', error);
    // Send alert, log to monitoring service, etc.
  }
});
```

## License
MIT
