@axxel/event-bus
v1.0.7
Published
Axxel Kafka Event Bus SDK
Keywords
Readme
@axxel/event-bus
A lightweight SDK for producing and consuming real-time events via Amazon MSK (Kafka) for the Axxel ecosystem.
This library provides a simple TypeScript interface for publishing and subscribing to:
token-prices— token price updates across multiple chainswallet-transactions— wallet buy/sell/transfer activity
🚀 Installation
npm install @axxel/event-bus
# or
yarn add @axxel/event-bus⚙️ Setup
Make sure your environment variables point to your Kafka (MSK) cluster:
KAFKA_BROKER="b-1.mskcluster.amazonaws.com:9094,b-2.mskcluster.amazonaws.com:9094"If using local Kafka (for testing):
KAFKA_BROKER="localhost:9092"🔌 Quick Start
1️⃣ Producing messages
import {
produceTokenPrice,
produceWalletTransaction,
ensureTokenPriceEvent,
ensureWalletTransactionEvent,
} from '@axxel/event-bus';
await produceTokenPrice({
chainId: 1,
tokenAddress: '0xC02a...',
blockNumber: 21345678,
price: {
USD: { price: 3250.42, exchange: 'uniswap-v3', liquidity: 1250000 },
},
priceUsd: 3250.42,
fdv: 1234567890,
symbol: 'WETH',
updatedAt: Math.floor(Date.now() / 1000),
});
await produceWalletTransaction({
chainId: 8453,
walletAddress: '0xabc...',
type: 'BUY',
pool: '0xpool...',
txHash: '0xdeadbeef...',
baseAmount: '10.5',
quoteAmount: '32500.42',
baseTokenAddress: '0x123...',
baseTokenSymbol: 'WETH',
quoteTokenAddress: '0x456...',
quoteTokenSymbol: 'USDC',
blockTimestamp: Math.floor(Date.now() / 1000),
liquidity: 250000,
marketcap: 500000000,
totalSupply: 1000000,
});
// Optional: validate and normalize before producing
const safePricePayload = ensureTokenPriceEvent({
chainId: 1,
tokenAddress: '0xC02a...',
blockNumber: 21345678,
price: new Map([
['USD', { price: 3250.42, exchange: 'uniswap-v3', liquidity: 1250000 }],
]),
priceUsd: 3250.42,
fdv: 1234567890,
});
await produceTokenPrice(safePricePayload);2️⃣ Consuming messages
import {
startTokenPriceConsumer,
startWalletTransactionConsumer,
} from '@axxel/event-bus';
await startTokenPriceConsumer('limit-orders', async (priceEvent) => {
const usdInfo =
priceEvent.price instanceof Map
? priceEvent.price.get('USD')
: priceEvent.price['USD'];
console.log(
`📈 ${priceEvent.symbol ?? priceEvent.tokenAddress} @ $${
usdInfo?.price ?? 'n/a'
} (chain ${priceEvent.chainId})`,
);
});
await startWalletTransactionConsumer('wallet-tracker', async (txEvent) => {
console.log(
`💸 ${txEvent.walletAddress} ${txEvent.type} ${txEvent.baseAmount} ${txEvent.baseTokenSymbol}/${txEvent.quoteTokenSymbol}`,
);
});Each service should use a unique consumer group ID (limit-orders, wallet-tracker, ai-engine, etc.)
Multiple instances with the same group ID will automatically load-balance partitions.
🧠 Topics Overview
| Topic | Description | Partition Key |
| --------------------- | ------------------------------------------ | ----------------------- |
| token-prices | Live token price updates across all chains | chainId:tokenAddress |
| wallet-transactions | Wallet buy/sell/transfer events | chainId:walletAddress |
🧱 Message Schemas
TokenPriceEvent
type PriceInfo = { price: number; exchange: string; liquidity: number };
interface TokenPriceEvent {
chainId: number;
tokenAddress: string;
blockNumber: number;
price: Record<string, PriceInfo> | Map<string, PriceInfo>;
priceUsd?: number | null;
fdv?: number | null;
symbol?: string | null;
updatedAt?: number;
}WalletTransactionEvent
interface WalletTransactionEvent {
chainId: number;
walletAddress: string;
type: string;
pool?: string | null;
txHash: string;
baseAmount: string;
quoteAmount: string;
baseTokenAddress: string;
baseTokenSymbol: string;
quoteTokenAddress: string;
quoteTokenSymbol: string;
blockTimestamp: number;
liquidity?: number | null;
marketcap?: number | null;
totalSupply?: number | null;
}Runtime validation helpers
Shared Zod schemas are exported to normalize inputs consistently:
import {
ensureTokenPriceEvent,
ensureWalletTransactionEvent,
} from '@axxel/event-bus';
const normalizedPrice = ensureTokenPriceEvent(rawPricePayload);
const normalizedTx = ensureWalletTransactionEvent(rawTxPayload);ensureTokenPriceEvent accepts either a Record or Map for price, and both helpers lowercase EVM addresses / tx hashes while rejecting malformed numeric fields.
⚙️ Environment Variables
| Variable | Description | Example |
| --------------------- | ------------------------------------- | ------------------ |
| KAFKA_BROKER | Comma-separated list of Kafka brokers | "localhost:9092" |
| KAFKA_SSL | Enable SSL (for MSK) | "true" |
| KAFKA_SASL_USERNAME | SASL username (if using auth) | "user" |
| KAFKA_SASL_PASSWORD | SASL password | "pass" |
🧩 Advanced Usage
You can also create your own consumers or producers using the exported kafka client:
import { kafka } from '@axxel/event-bus';
const producer = kafka.producer();
await producer.connect();
// custom logic...🧭 Example Architecture
Axxel Services
│
├── Trading Bot → produce token-prices, wallet-transactions
├── AI Engine → consume token-prices & wallet-transactions, produce ai-signals
└── Analytics → consume everything📄 License
MIT © Axxel
