# @c-tech/c-indexer-consumer

A TypeScript library for consuming blockchain events through Kafka, designed to work with blockchain ETL services. This package simplifies the process of registering blockchain event listeners and consuming them via Kafka.
## Features
- 🔗 Easy Event Registration: Register blockchain contract events with minimal configuration
- 📨 Kafka Integration: Built-in Kafka consumer management with kafkajs
- 🔧 Type-Safe: Full TypeScript support with comprehensive type definitions
- 🎯 Event Parsing: Automatic event signature parsing using ethers.js v6
- 🔄 Transform Scripts: Support for custom data transformation scripts
- ⚡ Efficient: Optimized for handling high-throughput blockchain events
## Installation

```bash
npm install @c-tech/c-indexer-consumer
```

or with bun:

```bash
bun add @c-tech/c-indexer-consumer
```

## Prerequisites

- A blockchain ETL service endpoint
- Kafka broker(s) configured and running
- Contract ABI for the events you want to monitor
## Usage

### Basic Example

```typescript
import { BlockWatcherRegister } from "@c-tech/c-indexer-consumer";
import { Kafka } from "kafkajs";

// Initialize Kafka
const kafka = new Kafka({
  clientId: "my-app",
  brokers: ["localhost:9092"],
});

// Create a BlockWatcherRegister instance
const watcher = new BlockWatcherRegister({
  etlBaseUrl: "https://your-etl-service.com",
  defaultGroupId: "my-consumer-group", // optional, default: "blockwatcher-group"
});

// Define your contract ABI
const contractABI = `[
  {
    "anonymous": false,
    "inputs": [
      {"indexed": true, "name": "from", "type": "address"},
      {"indexed": true, "name": "to", "type": "address"},
      {"indexed": false, "name": "value", "type": "uint256"}
    ],
    "name": "Transfer",
    "type": "event"
  }
]`;

// Register an event handler
await watcher.register(
  "0x1234567890123456789012345678901234567890", // contract address
  contractABI,
  "Transfer", // event name
  1, // chain ID (1 = Ethereum mainnet)
  async (payload) => {
    // payload is already parsed - contains the event data
    console.log("Transfer event:", payload);
  },
  "my-group-id" // optional groupId, uses defaultGroupId if not provided
);

// Start consuming Kafka messages
await watcher.startKafka(kafka, {
  groupId: "my-consumer-group", // optional, overrides defaultGroupId
  fromBeginning: true, // optional, default: true
});
```

### Advanced Usage with a Custom Kafka Topic
```typescript
import {
  BlockWatcherRegister,
  withKafkaTopic,
} from "@c-tech/c-indexer-consumer";

const watcher = new BlockWatcherRegister({
  etlBaseUrl: "https://your-etl-service.com",
});

// Register with a custom Kafka topic
await watcher.register(
  "0x1234567890123456789012345678901234567890",
  contractABI,
  "Transfer",
  1,
  async (payload, meta) => {
    console.log("Transfer event:", payload);
    console.log("Block number:", meta.block_number);
  },
  undefined, // groupId (optional)
  withKafkaTopic("my-custom-topic-name") // custom Kafka topic
);

await watcher.startKafka(kafka);
```

### Advanced Usage with Transform Scripts
```typescript
import {
  BlockWatcherRegister,
  withTransformScript,
  withTestData,
  withKafkaTopic,
} from "@c-tech/c-indexer-consumer";

const watcher = new BlockWatcherRegister({
  etlBaseUrl: "https://your-etl-service.com",
});

// Custom transformation script
const transformScript = `
export function transform(data, meta) {
  return {
    ...data,
    timestamp: Date.now(),
    chainId: meta.chain_id,
    blockNumber: meta.block_number
  };
}
`;

await watcher.register(
  "0x1234567890123456789012345678901234567890",
  contractABI,
  "Transfer",
  1,
  async (payload, meta) => {
    // Payload is already transformed by the script
    console.log("Transformed event:", payload);
    console.log("With metadata:", meta);
  },
  undefined, // groupId (optional)
  withTransformScript(transformScript),
  withTestData({ mockField: "test" }),
  withKafkaTopic("custom-transfer-topic") // custom Kafka topic (optional)
);

await watcher.startKafka(kafka, {
  groupId: "my-consumer-group",
  fromBeginning: true,
});
```

### Multiple Event Registration
```typescript
import { BlockWatcherRegister, type EtlMetaData } from "@c-tech/c-indexer-consumer";

const watcher = new BlockWatcherRegister({
  etlBaseUrl: "https://your-etl-service.com",
});

// Handler functions
const handleTransfer = async (payload: any, meta: EtlMetaData) => {
  console.log(`Transfer on chain ${meta.chain_id}:`, payload);
};

const handleApproval = async (payload: any) => {
  console.log("Approval event:", payload);
};

const handleSwap = async (payload: any, meta: EtlMetaData) => {
  console.log(`Swap at block ${meta.block_number}:`, payload);
};

// Register multiple events
await watcher.register(
  "0xContractA...",
  contractABI_A,
  "Transfer",
  1,
  handleTransfer
);

await watcher.register(
  "0xContractB...",
  contractABI_B,
  "Approval",
  1,
  handleApproval
);

await watcher.register(
  "0xContractC...",
  contractABI_C,
  "Swap",
  1,
  handleSwap
);

// Start consuming all registered events
await watcher.startKafka(kafka);
```

## API Reference
### BlockWatcherRegister

Main class for managing blockchain event registration and consumption.

#### Constructor

```typescript
new BlockWatcherRegister(args: BlockWatcherConfig)
```

**BlockWatcherConfig:**

```typescript
type BlockWatcherConfig = {
  etlBaseUrl: string;
  defaultGroupId?: string; // optional, default: "blockwatcher-group"
};
```

- `etlBaseUrl`: The base URL of your blockchain ETL service
- `defaultGroupId`: Default Kafka consumer group ID for all registered events
#### Methods

##### register()

Register a blockchain event to monitor.

```typescript
async register(
  contractAddress: string,
  contractABI: string,
  eventName: string,
  chainId: number,
  handler: KafkaConsumerMessageHandlerFunc,
  groupId?: string,
  ...options: RegisterOption[]
): Promise<void>
```

**Parameters:**

- `contractAddress`: The smart contract address
- `contractABI`: Contract ABI JSON string
- `eventName`: Name of the event to monitor (e.g., "Transfer", "Approval")
- `chainId`: Blockchain chain ID (1 for Ethereum mainnet, 137 for Polygon, etc.)
- `handler`: Callback function to handle incoming events
- `groupId`: Optional Kafka consumer group ID for this specific event (overrides `defaultGroupId`)
- `options`: Optional configuration (transform scripts, test data, custom Kafka topic)
##### startKafka()

Initialize the Kafka consumer and start processing events.

```typescript
async startKafka(
  kafka: Kafka,
  options?: {
    groupId?: string;
    fromBeginning?: boolean;
  }
): Promise<void>
```

**Parameters:**

- `kafka`: KafkaJS `Kafka` instance
- `options`: Optional configuration object
  - `groupId`: Consumer group ID (overrides `defaultGroupId`)
  - `fromBeginning`: Whether to read from the beginning of the topic (default: `true`)
## Types

### KafkaConsumerMessageHandlerFunc

The handler function can have multiple signatures:

```typescript
// Option 1: Just the payload
type Handler = (payload: any) => Promise<void> | void;

// Option 2: Payload + metadata
type Handler = (payload: any, meta: EtlMetaData) => Promise<void> | void;

// Option 3: Payload + raw Kafka message
type Handler = (payload: any, raw: KafkaMessage) => Promise<void> | void;
```

**Examples:**

```typescript
// Simple handler - just the payload
async (payload) => {
  console.log("Event data:", payload);
};

// Handler with metadata
async (payload, meta) => {
  console.log("Event data:", payload);
  console.log("Chain ID:", meta.chain_id);
  console.log("Block number:", meta.block_number);
  console.log("Transaction hash:", meta.tx_hash);
};

// Handler with raw message (for advanced use cases)
async (payload, rawMessage) => {
  console.log("Event data:", payload);
  console.log("Kafka headers:", rawMessage.headers);
  console.log("Kafka offset:", rawMessage.offset);
};
```

### EtlMetaData
Metadata extracted from blockchain events:

```typescript
type EtlMetaData = {
  chain_id?: number;
  _etl_event_log_entity_id?: number;
  address?: string;
  block_number?: number;
  block_hash?: string;
  tx_hash?: string;
  tx_index?: number;
  index?: number;
  topics?: string[];
  [k: string]: any;
};
```

### RegisterOption

```typescript
type RegisterOption = {
  transformScript?: string;
  testData?: Record<string, any>;
  kafkaTopic?: string;
};
```

## Helper Functions
### withTransformScript(script: string)

Create a `RegisterOption` with a custom transform script.

```typescript
const option = withTransformScript(`
  export function transform(data, meta) {
    return { ...data, processed: true };
  }
`);
```

### withTestData(testData: Record<string, any>)

Create a `RegisterOption` with test data.

```typescript
const option = withTestData({ mockField: "value" });
```

### withKafkaTopic(kafkaTopic: string)

Create a `RegisterOption` with a custom Kafka topic name.

```typescript
const option = withKafkaTopic("my-custom-topic");
```

**Note:** If not specified, the default topic naming convention is:

```
blockchain.{chainId}.{eventName}.{contractAddress}.{topicHash4Bytes}
```

## How It Works
**Event Registration**: When you register an event, the library:

- Parses the contract ABI using ethers.js
- Extracts the event signature and generates the topic hash
- Creates a unique Kafka topic name
- Sends a registration request to the ETL service
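The topic-derivation steps above can be sketched as follows. This is an illustration of the documented naming convention, not the library's actual code: the keccak-256 hash of the standard ERC-20 `Transfer(address,address,uint256)` signature (its well-known "topic0") is inlined as a constant so the sketch has no dependencies, and taking exactly the first 4 bytes of the hash is an assumption based on the `{topicHash4Bytes}` placeholder.

```typescript
// topic0 = keccak-256 of "Transfer(address,address,uint256)"; in ethers v6
// this would be id("Transfer(address,address,uint256)"). Inlined here so the
// sketch runs without dependencies.
const transferTopic0 =
  "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";

// Hypothetical helper: assemble the default topic name from its parts.
function defaultTopicName(
  chainId: number,
  eventName: string,
  contractAddress: string,
  topic0: string
): string {
  // First 4 bytes = first 8 hex characters after the "0x" prefix (assumption).
  const topicHash4Bytes = topic0.slice(2, 10);
  return `blockchain.${chainId}.${eventName}.${contractAddress}.${topicHash4Bytes}`;
}

console.log(
  defaultTopicName(
    1,
    "Transfer",
    "0x1234567890123456789012345678901234567890",
    transferTopic0
  )
);
// → blockchain.1.Transfer.0x1234567890123456789012345678901234567890.ddf252ad
```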
**Kafka Topic Naming**: By default, topics are automatically generated in the format:

```
blockchain.{chainId}.{eventName}.{contractAddress}.{topicHash4Bytes}
```

You can override this with the `withKafkaTopic()` option.

**Message Processing Flow**:

```
Kafka Message
      ↓
KafkaConsumerRegister (receives raw message)
      ↓
KafkaConsumerHandlerSingleFunc (wrapper layer)
  - Parses message.value JSON
  - Extracts payload
  - Extracts metadata (if needed)
  - Converts string numbers to actual numbers
      ↓
Your Handler Function (receives parsed data)
```

**Event Consumption**: The Kafka consumer subscribes to all registered topics and routes messages to the appropriate handlers.
**Automatic Parsing**: The library automatically:

- Parses the Kafka message JSON
- Extracts the `payload` field
- Extracts and converts metadata (`chain_id`, `block_number`, etc.)
- Calls your handler with the appropriate parameters based on its function signature
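The wrapper layer described above can be sketched roughly as follows. This is not the library's actual implementation: the message layout (a `payload` field with metadata fields alongside it) and the arity-based handler dispatch are assumptions inferred from the documented behavior.

```typescript
// Minimal local stand-ins for the documented types.
type EtlMetaData = { chain_id?: number; block_number?: number; [k: string]: any };
type Handler = (payload: any, meta?: EtlMetaData) => Promise<void> | void;

// Metadata keys that should be numeric (assumption based on EtlMetaData).
const NUMERIC_KEYS = ["chain_id", "block_number", "tx_index", "index"];

// Parse message.value JSON, split the payload from the metadata, and convert
// string-encoded numbers back to actual numbers.
function parseMessageValue(value: string): { payload: any; meta: EtlMetaData } {
  const { payload, ...meta } = JSON.parse(value);
  for (const key of NUMERIC_KEYS) {
    if (typeof meta[key] === "string") meta[key] = Number(meta[key]);
  }
  return { payload, meta };
}

// Dispatch to the handler based on how many parameters it declares:
// a two-parameter handler also receives the metadata.
async function dispatch(handler: Handler, value: string): Promise<void> {
  const { payload, meta } = parseMessageValue(value);
  if (handler.length >= 2) {
    await handler(payload, meta);
  } else {
    await handler(payload);
  }
}
```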
## Configuration

### Environment Variables

You can configure Kafka brokers and other settings through environment variables or directly in code:

```typescript
const kafka = new Kafka({
  clientId: process.env.KAFKA_CLIENT_ID || "blockchain-consumer",
  brokers: process.env.KAFKA_BROKERS?.split(",") || ["localhost:9092"],
});
```

## Error Handling
The library throws errors for:

- Invalid contract ABIs
- Non-existent event names
- ETL service connection failures
- Kafka connection issues

Always wrap registration and startup calls in try-catch blocks:

```typescript
try {
  await watcher.register(...);
  await watcher.startKafka(kafka);
} catch (error) {
  console.error("Error:", error.message);
}
```

## Dependencies
- `ethers` (^6.15.0): For ABI parsing and event signature generation
- `kafkajs` (^2.2.4): For Kafka consumer functionality

## License

MIT

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## Support

For issues and questions, please open an issue on the GitHub repository.
