@c-tech/c-indexer-consumer

v1.0.8

A TypeScript library for consuming blockchain events through Kafka, designed to work with blockchain ETL services. This package simplifies the process of registering blockchain event listeners and consuming them via Kafka.

Features

  • 🔗 Easy Event Registration: Register blockchain contract events with minimal configuration
  • 📨 Kafka Integration: Built-in Kafka consumer management with kafkajs
  • 🔧 Type-Safe: Full TypeScript support with comprehensive type definitions
  • 🎯 Event Parsing: Automatic event signature parsing using ethers.js v6
  • 🔄 Transform Scripts: Support for custom data transformation scripts
  • ⚡ Efficient: Optimized for handling high-throughput blockchain events

Installation

npm install @c-tech/c-indexer-consumer

or with bun:

bun add @c-tech/c-indexer-consumer

Prerequisites

  • A blockchain ETL service endpoint
  • Kafka broker(s) configured and running
  • Contract ABI for the events you want to monitor

Usage

Basic Example

import {BlockWatcherRegister} from "@c-tech/c-indexer-consumer";
import {Kafka} from "kafkajs";

// Initialize Kafka
const kafka = new Kafka({
    clientId: "my-app",
    brokers: ["localhost:9092"],
});

// Create a BlockWatcherRegister instance
const watcher = new BlockWatcherRegister({
    etlBaseUrl: "https://your-etl-service.com",
    defaultGroupId: "my-consumer-group", // optional, default: "blockwatcher-group"
});

// Define your contract ABI
const contractABI = `[
  {
    "anonymous": false,
    "inputs": [
      {"indexed": true, "name": "from", "type": "address"},
      {"indexed": true, "name": "to", "type": "address"},
      {"indexed": false, "name": "value", "type": "uint256"}
    ],
    "name": "Transfer",
    "type": "event"
  }
]`;

// Register an event handler
await watcher.register(
    "0x1234567890123456789012345678901234567890", // contract address
    contractABI,
    "Transfer", // event name
    1, // chain ID (1 = Ethereum mainnet)
    async (payload) => {
        // payload is already parsed - contains the event data
        console.log("Transfer event:", payload);
    },
    "my-group-id" // optional groupId, uses defaultGroupId if not provided
);

// Start consuming Kafka messages
await watcher.startKafka(kafka, {
    groupId: "my-consumer-group", // optional, overrides defaultGroupId
    fromBeginning: true, // optional, default: true
});

Advanced Usage with Custom Kafka Topic

import {
    BlockWatcherRegister,
    withKafkaTopic
} from "@c-tech/c-indexer-consumer";

const watcher = new BlockWatcherRegister({
    etlBaseUrl: "https://your-etl-service.com",
});

// Register with a custom Kafka topic
// (kafka and contractABI are defined as in the Basic Example)
await watcher.register(
    "0x1234567890123456789012345678901234567890",
    contractABI,
    "Transfer",
    1,
    async (payload, meta) => {
        console.log("Transfer event:", payload);
        console.log("Block number:", meta.block_number);
    },
    undefined, // groupId (optional)
    withKafkaTopic("my-custom-topic-name") // custom Kafka topic
);

await watcher.startKafka(kafka);

Advanced Usage with Transform Scripts

import {
    BlockWatcherRegister,
    withTransformScript,
    withTestData,
    withKafkaTopic
} from "@c-tech/c-indexer-consumer";

const watcher = new BlockWatcherRegister({
    etlBaseUrl: "https://your-etl-service.com",
});

// Custom transformation script
const transformScript = `
export function transform(data, meta) {
  return {
    ...data,
    timestamp: Date.now(),
    chainId: meta.chain_id,
    blockNumber: meta.block_number
  };
}
`;

await watcher.register(
    "0x1234567890123456789012345678901234567890",
    contractABI,
    "Transfer",
    1,
    async (payload, meta) => {
        // Payload is already transformed by the script
        console.log("Transformed event:", payload);
        console.log("With metadata:", meta);
    },
    undefined, // groupId (optional)
    withTransformScript(transformScript),
    withTestData({mockField: "test"}),
    withKafkaTopic("custom-transfer-topic") // custom Kafka topic (optional)
);

await watcher.startKafka(kafka, {
    groupId: "my-consumer-group",
    fromBeginning: true,
});

Multiple Event Registration

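// Assumes EtlMetaData is exported by the package alongside BlockWatcherRegister
import {BlockWatcherRegister, type EtlMetaData} from "@c-tech/c-indexer-consumer";

const watcher = new BlockWatcherRegister({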
    etlBaseUrl: "https://your-etl-service.com",
});

// Handler functions
const handleTransfer = async (payload: any, meta: EtlMetaData) => {
    console.log(`Transfer on chain ${meta.chain_id}:`, payload);
};

const handleApproval = async (payload: any) => {
    console.log("Approval event:", payload);
};

const handleSwap = async (payload: any, meta: EtlMetaData) => {
    console.log(`Swap at block ${meta.block_number}:`, payload);
};

// Register multiple events
await watcher.register(
    "0xContractA...",
    contractABI_A,
    "Transfer",
    1,
    handleTransfer
);

await watcher.register(
    "0xContractB...",
    contractABI_B,
    "Approval",
    1,
    handleApproval
);

await watcher.register(
    "0xContractC...",
    contractABI_C,
    "Swap",
    1,
    handleSwap
);

// Start consuming all registered events
await watcher.startKafka(kafka);

API Reference

BlockWatcherRegister

Main class for managing blockchain event registration and consumption.

Constructor

new BlockWatcherRegister(args: BlockWatcherConfig)

BlockWatcherConfig:

type BlockWatcherConfig = {
    etlBaseUrl: string;
    defaultGroupId?: string; // optional, default: "blockwatcher-group"
}

  • etlBaseUrl: The base URL of your blockchain ETL service
  • defaultGroupId: Default Kafka consumer group ID for all registered events

Methods

register()

Register a blockchain event to monitor.

async register(
    contractAddress: string,
    contractABI: string,
    eventName: string,
    chainId: number,
    handler: KafkaConsumerMessageHandlerFunc,
    groupId?: string,
    ...options: RegisterOption[]
): Promise<void>

Parameters:

  • contractAddress: The smart contract address
  • contractABI: Contract ABI JSON string
  • eventName: Name of the event to monitor (e.g., "Transfer", "Approval")
  • chainId: Blockchain chain ID (1 for Ethereum mainnet, 137 for Polygon, etc.)
  • handler: Callback function to handle incoming events
  • groupId: Optional Kafka consumer group ID for this specific event (overrides defaultGroupId)
  • options: Optional configuration (transform script, test data, custom Kafka topic)
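
Because the library rejects invalid ABIs and unknown event names, it can help to check the arguments locally before calling register(). The following is a minimal sketch, not part of the package: eventSignature and looksLikeAddress are hypothetical helpers, and the signature builder assumes simple input types (tuple inputs would need their components expanded).

```typescript
// Subset of the JSON ABI format that this sketch reads.
type AbiInput = { name: string; type: string; indexed?: boolean };
type AbiEntry = { type: string; name?: string; inputs?: AbiInput[] };

// Hypothetical helper: returns the canonical event signature,
// e.g. "Transfer(address,address,uint256)", or throws if the event is absent.
// Assumption: inputs are simple types (no tuples).
function eventSignature(contractABI: string, eventName: string): string {
    const entries = JSON.parse(contractABI) as AbiEntry[];
    const event = entries.find((e) => e.type === "event" && e.name === eventName);
    if (!event) {
        throw new Error(`Event "${eventName}" not found in ABI`);
    }
    const types = (event.inputs ?? []).map((input) => input.type);
    return `${eventName}(${types.join(",")})`;
}

// Hypothetical helper: shape check only ("0x" + 40 hex characters),
// without checksum validation.
function looksLikeAddress(value: string): boolean {
    return /^0x[0-9a-fA-F]{40}$/.test(value);
}
```

Running both checks first gives a clearer local error message than a failed registration request would.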

startKafka()

Initialize Kafka consumer and start processing events.

async startKafka(
    kafka: Kafka,
    options?: {
        groupId?: string;
        fromBeginning?: boolean;
    }
): Promise<void>

Parameters:

  • kafka: KafkaJS Kafka instance
  • options: Optional configuration object
    • groupId: Consumer group ID (overrides defaultGroupId)
    • fromBeginning: Whether to read from the beginning of the topic (default: true)

Types

KafkaConsumerMessageHandlerFunc

The handler function can have multiple signatures:

// Option 1: Just payload
type Handler = (payload: any) => Promise<void> | void;

// Option 2: Payload + metadata
type Handler = (payload: any, meta: EtlMetaData) => Promise<void> | void;

// Option 3: Payload + raw Kafka message
type Handler = (payload: any, raw: KafkaMessage) => Promise<void> | void;

Examples:

// Simple handler - just the payload
async (payload) => {
    console.log("Event data:", payload);
}

// Handler with metadata
async (payload, meta) => {
    console.log("Event data:", payload);
    console.log("Chain ID:", meta.chain_id);
    console.log("Block number:", meta.block_number);
    console.log("Transaction hash:", meta.tx_hash);
}

// Handler with raw message (for advanced use cases)
async (payload, rawMessage) => {
    console.log("Event data:", payload);
    console.log("Kafka headers:", rawMessage.headers);
    console.log("Kafka offset:", rawMessage.offset);
}

EtlMetaData

Metadata extracted from blockchain events:

type EtlMetaData = {
    chain_id?: number;
    _etl_event_log_entity_id?: number;
    address?: string;
    block_number?: number;
    block_hash?: string;
    tx_hash?: string;
    tx_index?: number;
    index?: number;
    topics?: string[];
    [k: string]: any;
};
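
Every field of EtlMetaData is optional, so a handler should not assume that any of them is present. As a small sketch (describeEvent is a hypothetical helper, and the type is copied from the definition above so the block stands alone), defaults can be applied with the nullish coalescing operator:

```typescript
// Copied from the type definition above so this sketch is self-contained.
type EtlMetaData = {
    chain_id?: number;
    _etl_event_log_entity_id?: number;
    address?: string;
    block_number?: number;
    block_hash?: string;
    tx_hash?: string;
    tx_index?: number;
    index?: number;
    topics?: string[];
    [k: string]: any;
};

// Hypothetical helper: builds a log line and falls back to "unknown"
// for any field the ETL service did not supply.
function describeEvent(meta: EtlMetaData): string {
    const chain = meta.chain_id ?? "unknown";
    const block = meta.block_number ?? "unknown";
    const tx = meta.tx_hash ?? "unknown";
    return `chain=${chain} block=${block} tx=${tx}`;
}
```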

RegisterOption

type RegisterOption = {
    transformScript?: string;
    testData?: Record<string, any>;
    kafkaTopic?: string;
};
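
register() accepts several RegisterOption values as trailing arguments. The README does not say how they are combined, so the sketch below shows only one plausible reading: a shallow merge in which later options win. mergeOptions is a hypothetical helper, not a function exported by the package.

```typescript
// Copied from the type definition above so this sketch is self-contained.
type RegisterOption = {
    transformScript?: string;
    testData?: Record<string, any>;
    kafkaTopic?: string;
};

// Hypothetical helper: shallow-merges options left to right.
// Assumption: when two options set the same field, the later one wins.
function mergeOptions(...options: RegisterOption[]): RegisterOption {
    return Object.assign({}, ...options);
}

const merged = mergeOptions(
    { transformScript: "export function transform(data) { return data; }" },
    { kafkaTopic: "first-topic" },
    { kafkaTopic: "second-topic" }
);
console.log(merged.kafkaTopic);
```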

Helper Functions

withTransformScript(script: string)

Create a RegisterOption with a custom transform script.

const option = withTransformScript(`
  export function transform(data, meta) {
    return { ...data, processed: true };
  }
`);

withTestData(testData: Record<string, any>)

Create a RegisterOption with test data.

const option = withTestData({mockField: "value"});

withKafkaTopic(kafkaTopic: string)

Create a RegisterOption with a custom Kafka topic name.

const option = withKafkaTopic("my-custom-topic");

Note: If not specified, the default topic naming convention is:

blockchain.{chainId}.{eventName}.{contractAddress}.{topicHash4Bytes}
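
To make the naming convention concrete, here is a rough sketch of how such a name could be assembled. defaultTopicName is a hypothetical helper; the README does not specify the address casing or the exact form of {topicHash4Bytes}, so the lowercasing and the "first 8 hex characters without 0x" choice are both assumptions.

```typescript
// Hypothetical helper that mirrors the documented pattern.
// Assumptions (not confirmed by the README): the address is lowercased and
// {topicHash4Bytes} is the first 4 bytes (8 hex characters) of the event
// topic hash, without the "0x" prefix.
function defaultTopicName(
    chainId: number,
    eventName: string,
    contractAddress: string,
    topicHash: string
): string {
    const hex = topicHash.startsWith("0x") ? topicHash.slice(2) : topicHash;
    if (hex.length < 8) {
        throw new Error("topic hash must contain at least 4 bytes");
    }
    return [
        "blockchain",
        String(chainId),
        eventName,
        contractAddress.toLowerCase(),
        hex.slice(0, 8),
    ].join(".");
}

// keccak256("Transfer(address,address,uint256)"), the ERC-20 Transfer topic.
const transferTopic = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";
const topicName = defaultTopicName(
    1,
    "Transfer",
    "0x1234567890123456789012345678901234567890",
    transferTopic
);
console.log(topicName);
// → blockchain.1.Transfer.0x1234567890123456789012345678901234567890.ddf252ad
```

If the exact name matters (for example, for ACLs or topic pre-creation), check the topic that the library actually subscribes to rather than relying on this sketch.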

How It Works

  1. Event Registration: When you register an event, the library:

    • Parses the contract ABI using ethers.js
    • Extracts the event signature and generates the topic hash
    • Creates a unique Kafka topic name
    • Sends a registration request to the ETL service
  2. Kafka Topic Naming: By default, topics are automatically generated in the format:

    blockchain.{chainId}.{eventName}.{contractAddress}.{topicHash4Bytes}

    You can override this with the withKafkaTopic() option.

  3. Message Processing Flow:

    Kafka Message
        ↓
    KafkaConsumerRegister (receives raw message)
        ↓
    KafkaConsumerHandlerSingleFunc (wrapper layer)
        ↓
    - Parses message.value JSON
    - Extracts payload
    - Extracts metadata (if needed)
    - Converts string numbers to actual numbers
        ↓
    Your Handler Function (receives parsed data)
  4. Event Consumption: The Kafka consumer subscribes to all registered topics and routes messages to the appropriate handlers.

  5. Automatic Parsing: The library automatically:

    • Parses the Kafka message JSON
    • Extracts the payload field
    • Extracts and converts metadata (chain_id, block_number, etc.)
    • Calls your handler with the appropriate parameters based on function signature
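
The parsing steps above can be sketched roughly as follows. This is not the library's implementation: parseEnvelope and coerceNumeric are hypothetical names, and the envelope shape is an assumption (the README names a payload field, while the meta key is a guess). Only metadata values are coerced here, and only when they fit in a safe integer, so large values such as uint256 amounts stay strings.

```typescript
type ParsedEvent = { payload: unknown; meta: Record<string, unknown> };

// Converts strings holding plain decimal integers to numbers when the result
// is a safe integer; everything else is returned unchanged.
function coerceNumeric(value: unknown): unknown {
    if (typeof value === "string" && /^-?\d+$/.test(value)) {
        const n = Number(value);
        return Number.isSafeInteger(n) ? n : value;
    }
    return value;
}

// Assumed envelope: {"payload": {...}, "meta": {...}}.
// The "meta" key is hypothetical; the real message layout may differ.
function parseEnvelope(rawValue: string): ParsedEvent {
    const envelope = JSON.parse(rawValue) as {
        payload?: unknown;
        meta?: Record<string, unknown>;
    };
    const meta: Record<string, unknown> = {};
    for (const [key, value] of Object.entries(envelope.meta ?? {})) {
        meta[key] = coerceNumeric(value);
    }
    return { payload: envelope.payload, meta };
}
```

With kafkajs, message.value is a Buffer or null, so a caller would typically pass message.value.toString() after a null check.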

Configuration

Environment Variables

You can configure Kafka brokers and other settings through environment variables or directly in code:

const kafka = new Kafka({
    clientId: process.env.KAFKA_CLIENT_ID || "blockchain-consumer",
    brokers: process.env.KAFKA_BROKERS?.split(",") || ["localhost:9092"],
});
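
One caveat with the snippet above: if KAFKA_BROKERS is set but empty, "".split(",") returns [""], which is truthy, so the || fallback is skipped. A small helper avoids this and also tolerates spaces and trailing commas. parseBrokers is a hypothetical name used only for this sketch.

```typescript
// Hypothetical helper: turns a comma-separated broker list into an array,
// trimming whitespace and dropping empty entries. Falls back to the given
// default when nothing usable remains.
function parseBrokers(
    raw: string | undefined,
    fallback: string[] = ["localhost:9092"]
): string[] {
    const brokers = (raw ?? "")
        .split(",")
        .map((broker) => broker.trim())
        .filter((broker) => broker.length > 0);
    return brokers.length > 0 ? brokers : fallback;
}
```

It can then be used as brokers: parseBrokers(process.env.KAFKA_BROKERS) when constructing the Kafka client.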

Error Handling

The library throws errors for:

  • Invalid contract ABIs
  • Non-existent event names
  • ETL service connection failures
  • Kafka connection issues

Always wrap registration and startup calls in try-catch blocks:

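try {
    await watcher.register(...);
    await watcher.startKafka(kafka);
} catch (error) {
    // In TypeScript the caught value is `unknown`, so narrow it before reading .message
    console.error("Error:", error instanceof Error ? error.message : error);
}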
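
When several registrations run in sequence, it helps to know which step failed. The sketch below is one possible pattern, not something the package provides: toErrorMessage and runStep are hypothetical helpers that narrow the caught value and prefix the error with a label.

```typescript
// Hypothetical helper: extracts a readable message from any thrown value.
function toErrorMessage(error: unknown): string {
    if (error instanceof Error) {
        return error.message;
    }
    return String(error);
}

// Hypothetical helper: runs one async step and rethrows with a label so the
// failing call (registration, Kafka startup, ...) is easy to identify.
async function runStep<T>(label: string, step: () => Promise<T>): Promise<T> {
    try {
        return await step();
    } catch (error) {
        throw new Error(`${label} failed: ${toErrorMessage(error)}`);
    }
}
```

For example, the startup call could be wrapped as runStep("start Kafka", () => watcher.startKafka(kafka)), with one labeled step per register() call.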

Dependencies

  • ethers (^6.15.0): For ABI parsing and event signature generation
  • kafkajs (^2.2.4): For Kafka consumer functionality

License

MIT

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Support

For issues and questions, please open an issue on the GitHub repository.