@thru/indexer

v0.2.7

A reusable blockchain indexing framework for building backends that index Thru chain data.

Features

  • Event Streams - Index historical, immutable event data
  • Account Streams - Track current on-chain account state with slot-aware upserts
  • Type-Safe Schema Builder - Fluent API with full TypeScript inference
  • Auto-Generated REST API - Hono + OpenAPI routes with pagination
  • Resumable Indexing - Checkpoint-based recovery after restarts
  • Drizzle ORM - PostgreSQL with type-safe queries and migrations

Installation

pnpm add @thru/indexer @thru/replay @thru/helpers @bufbuild/protobuf postgres drizzle-orm hono @hono/zod-openapi @hono/node-server
pnpm add -D drizzle-kit tsx typescript

Quick Start

1. Define an Event Stream

// src/streams/transfers.ts
import { create } from "@bufbuild/protobuf";
import { decodeAddress, encodeAddress, encodeSignature } from "@thru/helpers";
import { defineEventStream, t } from "@thru/indexer";
import { FilterSchema, FilterParamValueSchema, type Event } from "@thru/replay";
import { TokenEvent } from "../abi/token";

const TOKEN_PROGRAM = "taAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAKqq";

const transfers = defineEventStream({
  name: "transfers",
  description: "Token transfer events",

  schema: {
    id: t.text().primaryKey(),
    slot: t.bigint().notNull().index(),
    txnSignature: t.text().notNull(),
    source: t.text().notNull().index(),
    dest: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    indexedAt: t.timestamp().notNull().defaultNow(),
  },

  // Lazy filter for drizzle-kit compatibility
  filterFactory: () => {
    const programBytes = new Uint8Array(decodeAddress(TOKEN_PROGRAM));
    return create(FilterSchema, {
      expression: "event.program.value == params.address",
      params: {
        address: create(FilterParamValueSchema, {
          kind: { case: "bytesValue", value: programBytes },
        }),
      },
    });
  },

  // Parse raw event into table row (return null to skip)
  parse: (event: Event) => {
    const payload = event.payload;
    if (!payload || payload[0] !== 2) return null;

    const tokenEvent = TokenEvent.from_array(payload);
    const transfer = tokenEvent?.payload()?.asTransfer();
    if (!transfer) return null;

    return {
      id: event.eventId,
      slot: event.slot!,
      txnSignature: encodeSignature(event.transactionSignature?.value ?? new Uint8Array()),
      source: encodeAddress(new Uint8Array(transfer.source.get_bytes())),
      dest: encodeAddress(new Uint8Array(transfer.dest.get_bytes())),
      amount: transfer.amount,
      indexedAt: new Date(),
    };
  },

  api: { filters: ["source", "dest"] },
});

// Export table for Drizzle migrations
export const transferEvents = transfers.table;
export default transfers;
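
The parse contract used above (return a row, or null to skip the event) can be pictured with a small dependency-free sketch. RawEventSketch, TransferRowSketch, parseTransferSketch and parseBatchSketch are hypothetical names made up for this illustration, and the tag check only mirrors the payload[0] guard in the stream definition. It is not the package's internal code.

```typescript
// Hypothetical event and row shapes, for illustration only.
interface RawEventSketch {
  eventId: string;
  slot: bigint;
  payload: Uint8Array;
}

interface TransferRowSketch {
  id: string;
  slot: bigint;
}

const TRANSFER_TAG = 2; // mirrors the payload[0] check in the stream definition

// Returns a row for events tagged as transfers and null for everything else.
function parseTransferSketch(event: RawEventSketch): TransferRowSketch | null {
  if (event.payload.length === 0 || event.payload[0] !== TRANSFER_TAG) return null;
  return { id: event.eventId, slot: event.slot };
}

// Runs a parser over a batch and keeps only the rows it produced.
function parseBatchSketch<E, R>(events: E[], parse: (event: E) => R | null): R[] {
  const rows: R[] = [];
  for (const item of events) {
    const row = parse(item);
    if (row !== null) rows.push(row);
  }
  return rows;
}

const sampleEvents: RawEventSketch[] = [
  { eventId: "evt-1", slot: 10n, payload: new Uint8Array([2, 0, 0]) },
  { eventId: "evt-2", slot: 11n, payload: new Uint8Array([1, 0, 0]) }, // different tag: skipped
  { eventId: "evt-3", slot: 12n, payload: new Uint8Array([]) }, // empty payload: skipped
];

const parsedRows = parseBatchSketch(sampleEvents, parseTransferSketch);
console.log(parsedRows.map((row) => row.id)); // only "evt-1" survives
```

Returning null rather than throwing keeps unrelated events from the same program cheap to ignore, which is presumably why the stream definition uses that convention.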

2. Define an Account Stream

// src/account-streams/token-accounts.ts
import { decodeAddress, encodeAddress } from "@thru/helpers";
import { defineAccountStream, t } from "@thru/indexer";
import { TokenAccount } from "../abi/token";

const TOKEN_PROGRAM = "taAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAKqq";

const tokenAccounts = defineAccountStream({
  name: "token-accounts",
  description: "Token account balances",

  ownerProgramFactory: () => new Uint8Array(decodeAddress(TOKEN_PROGRAM)),
  expectedSize: 73,

  schema: {
    address: t.text().primaryKey(),
    mint: t.text().notNull().index(),
    owner: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    slot: t.bigint().notNull(),
    seq: t.bigint().notNull(),
    updatedAt: t.timestamp().notNull().defaultNow(),
  },

  parse: (account) => {
    if (account.data.length !== 73) return null;

    const parsed = TokenAccount.from_array(account.data);
    if (!parsed) return null;

    return {
      address: encodeAddress(account.address),
      mint: encodeAddress(new Uint8Array(parsed.mint.get_bytes())),
      owner: encodeAddress(new Uint8Array(parsed.owner.get_bytes())),
      amount: parsed.amount,
      slot: account.slot,
      seq: account.seq,
      updatedAt: new Date(),
    };
  },

  api: { filters: ["mint", "owner"], idField: "address" },
});

export const tokenAccountsTable = tokenAccounts.table;
export default tokenAccounts;
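
The README does not spell out what "slot-aware upserts" means. One plausible reading, assumed here, is that a stored account row is only overwritten when the incoming update carries a newer (slot, seq) pair. The sketch below models that rule with an in-memory Map; AccountRowSketch, isNewerUpdate and upsertAccountSketch are hypothetical names, and the real package writes to PostgreSQL instead.

```typescript
// Hypothetical row shape; the real columns come from the stream schema.
interface AccountRowSketch {
  address: string;
  amount: bigint;
  slot: bigint;
  seq: bigint;
}

// True when `incoming` is strictly newer than `current`, ordering by slot, then seq.
function isNewerUpdate(incoming: AccountRowSketch, current: AccountRowSketch): boolean {
  if (incoming.slot !== current.slot) return incoming.slot > current.slot;
  return incoming.seq > current.seq;
}

// In-memory stand-in for the accounts table, keyed by address.
// Returns true when the row was written and false when the update was stale.
function upsertAccountSketch(
  table: Map<string, AccountRowSketch>,
  incoming: AccountRowSketch,
): boolean {
  const current = table.get(incoming.address);
  if (current !== undefined && !isNewerUpdate(incoming, current)) {
    return false; // stale or duplicate update: keep the stored row
  }
  table.set(incoming.address, incoming);
  return true;
}

const accountTable = new Map<string, AccountRowSketch>();
const appliedFlags = [
  upsertAccountSketch(accountTable, { address: "acct1", amount: 100n, slot: 50n, seq: 1n }),
  upsertAccountSketch(accountTable, { address: "acct1", amount: 70n, slot: 49n, seq: 9n }), // older slot
  upsertAccountSketch(accountTable, { address: "acct1", amount: 120n, slot: 50n, seq: 2n }), // same slot, higher seq
];

console.log(appliedFlags); // [true, false, true]
```

In PostgreSQL the same guard is commonly expressed as INSERT ... ON CONFLICT ... DO UPDATE with a WHERE clause comparing the stored and incoming slot, though whether the package does exactly that is not stated in this README.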

3. Set Up Database Schema

// src/db/schema.ts
export { checkpointTable } from "@thru/indexer";
export { transferEvents } from "../streams/transfers";
export { tokenAccountsTable } from "../account-streams/token-accounts";

// drizzle.config.ts
import { defineConfig } from "drizzle-kit";

export default defineConfig({
  schema: "./src/db/schema.ts",
  out: "./drizzle",
  dialect: "postgresql",
  dbCredentials: {
    url: process.env.DATABASE_URL!,
  },
});

// src/db/index.ts
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";

const client = postgres(process.env.DATABASE_URL!);
export const db = drizzle(client);

4. Create Indexer

// src/indexer.ts
import { ChainClient } from "@thru/replay";
import { Indexer } from "@thru/indexer";
import { db } from "./db";
import transfers from "./streams/transfers";
import tokenAccounts from "./account-streams/token-accounts";

const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [transfers],
  accountStreams: [tokenAccounts],
  defaultStartSlot: 0n,
  safetyMargin: 64,
  pageSize: 512,
  logLevel: "info",
});

process.on("SIGINT", () => indexer.stop());
process.on("SIGTERM", () => indexer.stop());

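indexer
  .start()
  .then((result) => {
    console.log("Indexer finished:", result);
  })
  .catch((error) => {
    // Report failures instead of leaving an unhandled promise rejection
    console.error("Indexer failed:", error);
    process.exitCode = 1;
  });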
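
To make the checkpoint-based resume behaviour concrete, here is a minimal synchronous sketch. It assumes the checkpoint stores the last fully processed slot and that a restart continues from the slot after it; the README does not confirm either detail. MemoryCheckpoints and runIndexerOnce are hypothetical stand-ins, and the real indexer persists checkpoints in PostgreSQL via checkpointTable.

```typescript
// Hypothetical in-memory checkpoint store.
class MemoryCheckpoints {
  private slots = new Map<string, bigint>();

  get(stream: string): bigint | undefined {
    return this.slots.get(stream);
  }

  set(stream: string, slot: bigint): void {
    this.slots.set(stream, slot);
  }
}

// Processes slots page by page and saves a checkpoint after every page.
// `maxPages` simulates a shutdown after a fixed number of pages.
// Returns the number of slots handled in this run.
function runIndexerOnce(
  stream: string,
  checkpoints: MemoryCheckpoints,
  defaultStartSlot: bigint,
  endSlot: bigint, // inclusive
  pageSize: bigint,
  maxPages: number,
): number {
  const saved = checkpoints.get(stream);
  let next = saved === undefined ? defaultStartSlot : saved + 1n;
  let handled = 0;

  for (let page = 0; page < maxPages && next <= endSlot; page++) {
    const pageEnd = next + pageSize - 1n;
    const last = pageEnd < endSlot ? pageEnd : endSlot;
    handled += Number(last - next + 1n);
    checkpoints.set(stream, last); // record progress only after the page is done
    next = last + 1n;
  }
  return handled;
}

const checkpointStore = new MemoryCheckpoints();
const firstRun = runIndexerOnce("transfers", checkpointStore, 0n, 999n, 512n, 1); // interrupted after one page
const secondRun = runIndexerOnce("transfers", checkpointStore, 0n, 999n, 512n, 10); // resumes at slot 512

console.log(firstRun, secondRun); // 512 488
```

Because progress is saved per page, a restart repeats at most the page that was in flight rather than starting again from defaultStartSlot.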

5. Create API Server

// src/api.ts
import { serve } from "@hono/node-server";
import { OpenAPIHono } from "@hono/zod-openapi";
import { mountStreamRoutes } from "@thru/indexer";
import { db } from "./db";
import transfers from "./streams/transfers";
import tokenAccounts from "./account-streams/token-accounts";

const app = new OpenAPIHono();

mountStreamRoutes(app, {
  db,
  basePath: "/api/v1",
  eventStreams: [transfers],
  accountStreams: [tokenAccounts],
});

serve({ fetch: app.fetch, port: 3000 }, (info) => {
  console.log(`API server running on http://localhost:${info.port}`);
});
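
The generated routes are paginated, but this README does not document the cursor format. As a rough illustration of cursor-based pagination in general, the sketch below uses the last row id as the cursor. EventRowSketch, PageSketch and paginateRowsSketch are hypothetical, and the exported paginate and parseCursor helpers may behave differently.

```typescript
// Hypothetical row and page shapes.
interface EventRowSketch {
  id: string;
  slot: bigint;
}

interface PageSketch<T> {
  items: T[];
  nextCursor: string | null;
}

// Returns up to `limit` rows that sort after `cursor`.
// Assumes `sortedRows` is already ordered by id.
function paginateRowsSketch(
  sortedRows: EventRowSketch[],
  limit: number,
  cursor: string | null,
): PageSketch<EventRowSketch> {
  let startIndex = 0;
  if (cursor !== null) {
    const after: string = cursor;
    startIndex = sortedRows.findIndex((row) => row.id > after);
    if (startIndex === -1) return { items: [], nextCursor: null };
  }

  const items = sortedRows.slice(startIndex, startIndex + limit);
  const hasMore = startIndex + limit < sortedRows.length;
  const lastItem = items[items.length - 1];
  return { items, nextCursor: hasMore && lastItem !== undefined ? lastItem.id : null };
}

const allRows: EventRowSketch[] = ["a", "b", "c", "d", "e"].map((id, index) => ({
  id,
  slot: BigInt(index),
}));

const pageOne = paginateRowsSketch(allRows, 2, null);
const pageTwo = paginateRowsSketch(allRows, 2, pageOne.nextCursor);
const pageThree = paginateRowsSketch(allRows, 2, pageTwo.nextCursor);

console.log(pageOne.nextCursor, pageTwo.nextCursor, pageThree.nextCursor); // b d null
```

A client keeps requesting pages with the returned cursor until nextCursor is null.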

6. Run

# Generate and apply migrations
pnpm drizzle-kit generate
pnpm drizzle-kit migrate

# Start indexer
pnpm tsx src/indexer.ts

# Start API (separate terminal)
pnpm tsx src/api.ts

API Reference

Schema Builder

The t object provides a fluent API for defining columns:

import { t } from "@thru/indexer";

const schema = {
  id: t.text().primaryKey(),
  slot: t.bigint().notNull().index(),
  name: t.text(),                        // nullable by default
  count: t.integer().notNull(),
  active: t.boolean().notNull().default(true),
  createdAt: t.timestamp().notNull().defaultNow(),
  mintId: t.text().notNull().references(mintsTable, "id"),
};

Column Types:

  • t.text() - VARCHAR/TEXT
  • t.bigint() - BIGINT (for slots, amounts)
  • t.integer() - INTEGER
  • t.boolean() - BOOLEAN
  • t.timestamp() - TIMESTAMP WITH TIME ZONE

Modifiers:

  • .notNull() - NOT NULL constraint
  • .primaryKey() - Primary key (implies NOT NULL)
  • .index() - Create index
  • .unique() - Unique constraint
  • .default(value) - Default value
  • .defaultNow() - Default to current timestamp
  • .references(table, column) - Foreign key
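
For readers curious how a fluent builder can carry nullability into the inferred row type, here is a small sketch of the general technique. ColumnSketch, col, InferRowSketch and columnSql are hypothetical names; the real t builder and InferRow type in @thru/indexer may be implemented quite differently.

```typescript
// Minimal stand-in for a fluent column builder.
class ColumnSketch<T, Nullable extends boolean> {
  readonly valueType?: T; // type-level marker only; never assigned at runtime
  readonly sqlType: string;
  readonly nullable: Nullable;
  readonly isPrimaryKey: boolean;

  constructor(sqlType: string, nullable: Nullable, isPrimaryKey = false) {
    this.sqlType = sqlType;
    this.nullable = nullable;
    this.isPrimaryKey = isPrimaryKey;
  }

  // Each modifier returns a new column whose type parameters record the change.
  notNull(): ColumnSketch<T, false> {
    return new ColumnSketch<T, false>(this.sqlType, false, this.isPrimaryKey);
  }

  primaryKey(): ColumnSketch<T, false> {
    return new ColumnSketch<T, false>(this.sqlType, false, true);
  }
}

const col = {
  text: () => new ColumnSketch<string, true>("text", true),
  bigint: () => new ColumnSketch<bigint, true>("bigint", true),
};

// Maps a schema object to its row type: nullable columns accept null.
type InferRowSketch<S> = {
  [K in keyof S]: S[K] extends ColumnSketch<infer T, infer N>
    ? (N extends false ? T : T | null)
    : never;
};

// The same metadata can drive a column definition string.
function columnSql(columnName: string, column: ColumnSketch<unknown, boolean>): string {
  if (column.isPrimaryKey) return `${columnName} ${column.sqlType} PRIMARY KEY`;
  return `${columnName} ${column.sqlType}${column.nullable ? "" : " NOT NULL"}`;
}

const sketchSchema = {
  id: col.text().primaryKey(),
  slot: col.bigint().notNull(),
  memo: col.text(), // nullable by default
};

// Checked by the compiler as { id: string; slot: bigint; memo: string | null }
const sampleRow: InferRowSketch<typeof sketchSchema> = { id: "evt-1", slot: 42n, memo: null };

const columnSqlLines = Object.entries(sketchSchema).map(([columnName, column]) =>
  columnSql(columnName, column),
);
console.log(columnSqlLines); // ["id text PRIMARY KEY", "slot bigint NOT NULL", "memo text"]
```

The design choice worth noting is that every modifier returns a new object with updated type parameters, so the compiler can tell a nullable column from a non-null one without any manual annotations.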

Event Stream Options

defineEventStream({
  name: string;                    // Unique stream name
  description?: string;            // Human-readable description
  schema: { ... };                 // Column definitions
  filter?: Filter;                 // Direct CEL filter
  filterFactory?: () => Filter;    // Lazy filter (for drizzle-kit)
  parse: (event: Event) => Row | null;
  api?: {
    filters?: string[];            // Filterable columns
  };
  filterBatch?: (events, ctx) => Promise<events>;  // Pre-commit filter
  onCommit?: (batch, ctx) => Promise<void>;        // Post-commit hook
});

Account Stream Options

defineAccountStream({
  name: string;
  description?: string;
  ownerProgram?: Uint8Array;           // Direct program address
  ownerProgramFactory?: () => Uint8Array;  // Lazy (for drizzle-kit)
  expectedSize?: number;               // Filter by data size
  dataSizes?: number[];                // Multiple valid sizes
  schema: { ... };
  parse: (account: AccountState) => Row | null;
  api?: {
    filters?: string[];
    idField?: string;                  // Primary key field name
  };
});

Indexer Options

new Indexer({
  db: DatabaseClient;                  // Drizzle database client
  clientFactory: () => ChainClient;    // Factory for RPC connections
  eventStreams?: EventStream[];
  accountStreams?: AccountStream[];
  defaultStartSlot?: bigint;           // Start slot if no checkpoint
  safetyMargin?: number;               // Slots behind tip (default: 64)
  pageSize?: number;                   // Events per page (default: 512)
  logLevel?: "debug" | "info" | "warn" | "error";
  validateParse?: boolean;             // Validate parse output with Zod (dev mode)
});
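
The safetyMargin option is described only as "slots behind tip". A reasonable interpretation, assumed here, is that the indexer never reads past tip minus safetyMargin, presumably so that data near the tip that might still change is left alone. safeUpperSlot and nextSlotRange are hypothetical helpers that show the arithmetic.

```typescript
// Highest slot that is at least `safetyMargin` slots behind the chain tip,
// or null while the chain is still shorter than the margin.
function safeUpperSlot(tipSlot: bigint, safetyMargin: number): bigint | null {
  const upper = tipSlot - BigInt(safetyMargin);
  return upper >= 0n ? upper : null;
}

// Inclusive slot range to index next, given the last checkpointed slot
// (null when no checkpoint exists yet). Returns null when there is nothing to do.
function nextSlotRange(
  checkpoint: bigint | null,
  defaultStartSlot: bigint,
  tipSlot: bigint,
  safetyMargin = 64,
): [bigint, bigint] | null {
  const upper = safeUpperSlot(tipSlot, safetyMargin);
  if (upper === null) return null;
  const from = checkpoint === null ? defaultStartSlot : checkpoint + 1n;
  return from <= upper ? [from, upper] : null;
}

const freshRange = nextSlotRange(null, 0n, 1000n); // [0n, 936n]
const caughtUpRange = nextSlotRange(936n, 0n, 1000n); // null: already at the safe limit
const laterRange = nextSlotRange(936n, 0n, 1010n); // [937n, 946n]

console.log(freshRange, caughtUpRange, laterRange);
```

A larger margin trades freshness for safety: indexed data lags further behind the tip, but less of it is at risk of being revised.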

Hooks

filterBatch - Filter events before database commit:

filterBatch: async (events, { db }) => {
  // Only keep transfers involving registered users
  const users = await db.select().from(usersTable);
  const userAddresses = new Set(users.map(u => u.address));

  return events.filter(e =>
    userAddresses.has(e.source) || userAddresses.has(e.dest)
  );
}

onCommit - Side effects after commit:

onCommit: async (batch, { db }) => {
  // Queue notifications for transfer recipients
  await queueNotifications(db, batch.events);
}

Migrations

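The library uses Drizzle Kit for migrations. Table definitions are derived from your stream schemas (each stream exposes its table as stream.table), and Drizzle Kit creates the tables when you generate and apply migrations.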

# Generate migration from schema changes
pnpm drizzle-kit generate

# Apply migrations
pnpm drizzle-kit migrate

# Push schema directly (development)
pnpm drizzle-kit push

# Open Drizzle Studio
pnpm drizzle-kit studio

Why filterFactory / ownerProgramFactory?

Drizzle Kit imports your schema files to generate migrations. If those files load runtime config at import time, the import fails because that config is not available when Drizzle Kit runs:

// Breaks drizzle-kit (config not available at import time)
filter: create(FilterSchema, {
  params: { address: decodeAddress(loadConfig().TOKEN_PROGRAM) }
})

// Works (lazy loading, only called at runtime)
filterFactory: () => {
  const config = loadConfig();
  return create(FilterSchema, { ... });
}
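
The difference between the two forms comes down to when the config loader runs. The sketch below counts loader calls to show that a factory defers the work until the filter is actually requested. loadSketchConfig, defineStreamSketch and getFilter are hypothetical names, and caching the resolved filter is an extra assumption rather than documented behaviour.

```typescript
let configLoads = 0;

// Hypothetical config loader that would fail under drizzle-kit,
// because the runtime environment is not available there.
function loadSketchConfig(): { tokenProgram: string } {
  configLoads += 1;
  return { tokenProgram: "example-program-address" };
}

interface LazyStreamSketch<F> {
  streamName: string;
  getFilter(): F;
}

// Defining the stream stores the factory without calling it.
function defineStreamSketch<F>(streamName: string, filterFactory: () => F): LazyStreamSketch<F> {
  let cached: F | undefined;
  let resolved = false;
  return {
    streamName,
    getFilter(): F {
      if (!resolved) {
        cached = filterFactory(); // first runtime use: config is loaded now
        resolved = true;
      }
      return cached as F;
    },
  };
}

const lazyStream = defineStreamSketch("transfers", () => ({
  address: loadSketchConfig().tokenProgram,
}));
const loadsAfterDefine = configLoads; // 0: nothing ran at definition (import) time

const resolvedFilter = lazyStream.getFilter();
lazyStream.getFilter(); // second call reuses the cached filter
const loadsAfterUse = configLoads; // 1

console.log(loadsAfterDefine, loadsAfterUse, resolvedFilter.address);
```

Drizzle Kit only needs the table definitions, so with the factory form it can import the stream module without ever triggering the config load.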

Schema Helper

Use getSchemaExports() to collect all tables for your Drizzle schema file:

// src/db/schema.ts
import { getSchemaExports } from "@thru/indexer";
import transfers from "../streams/transfers";
import tokenAccounts from "../account-streams/token-accounts";

// Export all tables for Drizzle migrations
export const { checkpointTable, transfersTable, tokenAccountsTable } = getSchemaExports({
  eventStreams: [transfers],
  accountStreams: [tokenAccounts],
});
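
The destructured names above suggest that each table is exported under the camel-cased stream name plus a Table suffix ("token-accounts" becomes tokenAccountsTable). That rule is inferred from this one example and is not confirmed by the README. The sketch below shows the inferred mapping; tableExportName and collectTableExports are hypothetical helpers.

```typescript
// Converts a stream name such as "token-accounts" into "tokenAccountsTable".
// The naming rule is an assumption inferred from the example above.
function tableExportName(streamName: string): string {
  const camel = streamName.replace(/[-_]+([a-zA-Z0-9])/g, (_match, ch: string) =>
    ch.toUpperCase(),
  );
  return `${camel}Table`;
}

// Collects the tables of several streams into one object keyed by export name.
function collectTableExports<T>(
  streams: { streamName: string; table: T }[],
): Record<string, T> {
  const exportsByName: Record<string, T> = {};
  for (const stream of streams) {
    exportsByName[tableExportName(stream.streamName)] = stream.table;
  }
  return exportsByName;
}

const collected = collectTableExports([
  { streamName: "transfers", table: "transfers-table-placeholder" },
  { streamName: "token-accounts", table: "token-accounts-table-placeholder" },
]);

console.log(Object.keys(collected)); // ["transfersTable", "tokenAccountsTable"]
```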

Runtime Validation

Enable validateParse to validate parse function output at runtime using Zod schemas. This is useful during development to catch type mismatches early:

const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [transfers],
  validateParse: process.env.NODE_ENV !== "production",  // Enable in dev
});

When validation fails, the indexer logs detailed error messages:

[transfers] Stream "transfers" parse returned invalid data:
  - amount: Expected bigint, received number
  - source: Required
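
To show the kind of check that produces messages like these, here is a hand-rolled, dependency-free sketch. The package itself is described as using Zod, so this is only an approximation of the idea: ColumnSpecSketch and validateRowSketch are hypothetical, and the message wording is copied from the log above rather than generated by Zod.

```typescript
type ColumnKind = "text" | "bigint" | "integer" | "boolean" | "timestamp";

interface ColumnSpecSketch {
  kind: ColumnKind;
  notNull: boolean;
}

// Runtime label each column kind is expected to have.
const expectedLabel: Record<ColumnKind, string> = {
  text: "string",
  bigint: "bigint",
  integer: "number",
  boolean: "boolean",
  timestamp: "date",
};

// Describes a runtime value in the same vocabulary as expectedLabel.
function typeLabel(value: unknown): string {
  if (value instanceof Date) return "date";
  return typeof value;
}

// Returns one message per column whose value is missing or has the wrong type.
function validateRowSketch(
  spec: Record<string, ColumnSpecSketch>,
  row: Record<string, unknown>,
): string[] {
  const problems: string[] = [];
  for (const [column, { kind, notNull }] of Object.entries(spec)) {
    const value = row[column];
    if (value === undefined || value === null) {
      if (notNull) problems.push(`${column}: Required`);
      continue;
    }
    const actual = typeLabel(value);
    if (actual !== expectedLabel[kind]) {
      problems.push(`${column}: Expected ${expectedLabel[kind]}, received ${actual}`);
    }
  }
  return problems;
}

const transferSpec: Record<string, ColumnSpecSketch> = {
  id: { kind: "text", notNull: true },
  source: { kind: "text", notNull: true },
  amount: { kind: "bigint", notNull: true },
};

// `source` is missing and `amount` is a number instead of a bigint.
const validationIssues = validateRowSketch(transferSpec, { id: "evt-1", amount: 5 });
console.log(validationIssues); // ["source: Required", "amount: Expected bigint, received number"]
```

A number-versus-bigint mismatch like the one above is easy to introduce when decoding on-chain amounts, which is a good reason to keep validation on during development.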

Exports

// Schema
export { t, columnBuilder } from "@thru/indexer";
export type { ColumnDef, SchemaDefinition, InferRow, InferInsert } from "@thru/indexer";

// Validation (for development)
export { generateZodSchema, validateParsedData } from "@thru/indexer";

// Streams
export { defineEventStream, defineAccountStream } from "@thru/indexer";
export type { EventStream, AccountStream } from "@thru/indexer";

// Checkpoint
export { checkpointTable, getCheckpoint, updateCheckpoint, getSchemaExports } from "@thru/indexer";

// API
export { mountStreamRoutes, generateSchemas } from "@thru/indexer";
export { paginate, parseCursor, paginationQuerySchema } from "@thru/indexer";

// Runtime
export { Indexer } from "@thru/indexer";
export type { IndexerConfig, IndexerResult } from "@thru/indexer";

// Types
export type { ApiConfig, StreamBatch, HookContext } from "@thru/indexer";

Example Project Structure

my-indexer/
├── src/
│   ├── abi/                      # ABI type definitions
│   │   └── token.ts
│   ├── streams/                  # Event stream definitions
│   │   └── transfers.ts
│   ├── account-streams/          # Account stream definitions
│   │   └── token-accounts.ts
│   ├── db/
│   │   ├── index.ts              # Database client
│   │   └── schema.ts             # Drizzle schema exports
│   ├── indexer.ts                # Indexer entry point
│   └── api.ts                    # API entry point
├── drizzle/                      # Generated migrations
├── drizzle.config.ts
├── package.json
└── tsconfig.json