# @morpho-org/viem-dlc
v0.0.4
A collection of flexible viem extensions with a focus on intelligent caching.
Provides composable transport wrappers for optimized eth_getLogs and eth_call handling with
caching, rate limiting, automatic request splitting, and oversized-log filtering.
## Installation
```sh
pnpm add @morpho-org/viem-dlc
```

Also available on the GitHub Package Registry.
## Transports

### cache
All-in-one caching transport for eth_getLogs and eth_call. Internally composes five layers:
oversized-log filtering (logsSieve), log enrichment (logsEnricher), rate limiting (rateLimiter),
request splitting (logsDivider), and caching. Requires a chain on the client so it can
namespace cache keys by chain ID.
```ts
import { createPublicClient, http } from 'viem'
import { mainnet } from 'viem/chains'
import { cache, createSimpleInvalidation } from '@morpho-org/viem-dlc/transports/cache'
import { LruStore } from '@morpho-org/viem-dlc/stores'

const transport = cache(http(rpcUrl), [
  {
    binSize: 10_000,
    store: new LruStore(100_000_000),
    invalidationStrategy: createSimpleInvalidation(),
  },
  {
    maxBlockRange: 100_000,
  },
  {
    retryCount: 3,
    retryDelay: 1_000,
    blockTimestamp: false,
  },
  {
    maxBytes: 8_192,
  },
  {
    maxRequestsPerSecond: 10,
    maxBurstRequests: 5,
    maxConcurrentRequests: 5,
  },
])

const client = createPublicClient({ chain: mainnet, transport })
```

The `binSize` determines cache entry granularity. Requests are aligned to bin boundaries to maximize cache hits. Smaller bins allow finer-grained invalidation but increase storage overhead. The logsDivider config's `alignTo` is automatically set to `binSize`.
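To make the alignment concrete, here is a minimal sketch of how a requested block range might be widened to bin boundaries. This is an illustration of the idea only, not the library's actual code; `alignToBins` and its return shape are hypothetical.

```ts
// Hypothetical sketch of bin alignment, not the library's implementation.
// A request for blocks [fromBlock, toBlock] is widened so that both ends
// fall on binSize boundaries, letting every bin be cached independently.
function alignToBins(fromBlock: bigint, toBlock: bigint, binSize: bigint) {
  const alignedFrom = (fromBlock / binSize) * binSize // bigint division floors
  const alignedTo = (toBlock / binSize) * binSize + binSize - 1n
  return { alignedFrom, alignedTo }
}

// With binSize = 10_000, a request for [18_000_123, 18_034_567] covers
// four whole bins: [18_000_000, 18_039_999].
const { alignedFrom, alignedTo } = alignToBins(18_000_123n, 18_034_567n, 10_000n)
console.log(alignedFrom, alignedTo) // 18000000n 18039999n
```

A later request for any sub-range of those bins can then be served entirely from cache.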
Two invalidation strategies are provided:

- `createSimpleInvalidation(minAgeMs?, maxAgeDays?, numHotBlocks?, avgInvalidationsPerRequest?)` — entries near the chain tip are always refetched; older entries are probabilistically invalidated based on age.
- `createExponentialInvalidation(alphaAge?, maxAgeDays?, alphaBlocks?, scaleBlocks?)` — exponential model with separate time and block-age factors.
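As a mental model of the simple strategy (the actual formula is not documented here; the linear ramp below is an assumption, and `shouldInvalidate` is a hypothetical helper):

```ts
// Toy model of age-based probabilistic invalidation, NOT the library's formula.
// Entries within numHotBlocks of the tip are always invalidated (refetched);
// older entries are invalidated with a probability that grows with their age.
function shouldInvalidate(
  entryBlock: bigint,
  tipBlock: bigint,
  entryAgeMs: number,
  opts = { minAgeMs: 60_000, maxAgeMs: 7 * 86_400_000, numHotBlocks: 32n },
): boolean {
  if (tipBlock - entryBlock <= opts.numHotBlocks) return true // hot: always refetch
  if (entryAgeMs < opts.minAgeMs) return false // too fresh to bother
  // Assumed: probability ramps linearly from 0 at minAgeMs to 1 at maxAgeMs.
  const p = Math.min(1, (entryAgeMs - opts.minAgeMs) / (opts.maxAgeMs - opts.minAgeMs))
  return Math.random() < p
}
```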
### logsDivider
Splits large eth_getLogs requests into smaller chunks with automatic retry, optional alignment,
internal rate/concurrency limiting via rateLimiter, log enrichment via logsEnricher, and
oversized-log filtering via logsSieve.
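The chunking step can be sketched in isolation like so. This is illustrative only; the package's own `divideBlockRange` helper (see Utilities) may behave differently, and `splitRange` is a hypothetical name:

```ts
// Illustrative sketch: split [fromBlock, toBlock] into chunks of at most
// maxBlockRange blocks, snapping chunk boundaries to multiples of alignTo.
// Not the library's actual implementation.
function splitRange(
  fromBlock: bigint,
  toBlock: bigint,
  maxBlockRange: bigint,
  alignTo: bigint = 1n,
): Array<{ fromBlock: bigint; toBlock: bigint }> {
  const chunks: Array<{ fromBlock: bigint; toBlock: bigint }> = []
  let start = fromBlock
  while (start <= toBlock) {
    // Candidate end: maxBlockRange blocks, snapped back so the next chunk
    // starts on an alignTo boundary.
    let end = start + maxBlockRange - 1n
    const snapped = ((end + 1n) / alignTo) * alignTo - 1n
    if (snapped >= start) end = snapped
    if (end > toBlock) end = toBlock
    chunks.push({ fromBlock: start, toBlock: end })
    start = end + 1n
  }
  return chunks
}

console.log(splitRange(0n, 250_000n, 100_000n, 10_000n).length) // 3
```

Each chunk is then issued as its own `eth_getLogs` request, with failures retried and results merged in order.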
```ts
import { createPublicClient, http } from 'viem'
import { logsDivider } from '@morpho-org/viem-dlc/transports'

const transport = logsDivider(http(rpcUrl), [
  {
    maxBlockRange: 100_000,
    alignTo: 10_000,
  },
  {
    retryCount: 3,
    retryDelay: 1_000,
    blockTimestamp: false,
  },
  {
    maxBytes: 8_192,
  },
  {
    maxRequestsPerSecond: 10,
    maxConcurrentRequests: 5,
  },
])

const client = createPublicClient({ transport })

const logs = await client.request({
  method: 'eth_getLogs',
  params: [
    filter,
    undefined,
    {
      onLogsResponse: ({ logs, fromBlock, toBlock }) => {
        /* progressive updates */
      },
    },
  ],
})
```

### logsEnricher
Enriches eth_getLogs responses with data that standard RPCs omit. Currently supports
populating blockTimestamp by fetching block headers. Logs whose block was reorged away
are silently dropped.
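Conceptually the enrichment fetches each distinct block header once and stamps its timestamp onto the matching logs. A standalone sketch (not the transport's code; `getBlock` here is a stand-in for an `eth_getBlockByHash` call):

```ts
// Sketch of blockTimestamp enrichment, illustrative only. Headers are
// fetched once per distinct block; logs whose header cannot be found
// (block reorged away) are silently dropped, mirroring the transport.
type SketchLog = { blockHash: string; blockTimestamp?: string }

async function enrichLogs(
  logs: SketchLog[],
  getBlock: (hash: string) => Promise<{ timestamp: string } | null>,
): Promise<SketchLog[]> {
  const hashes = [...new Set(logs.map((l) => l.blockHash))]
  const headers = new Map(
    await Promise.all(hashes.map(async (h) => [h, await getBlock(h)] as const)),
  )
  return logs.flatMap((log) => {
    const header = headers.get(log.blockHash)
    if (!header) return [] // block reorged away: drop the log
    return [{ ...log, blockTimestamp: header.timestamp }]
  })
}
```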
```ts
import { createPublicClient, http } from 'viem'
import { logsEnricher } from '@morpho-org/viem-dlc/transports'

const transport = logsEnricher(http(rpcUrl), [{
  retryCount: 3,
  retryDelay: 1_000,
  blockTimestamp: true,
}])

const client = createPublicClient({ transport })
```

### logsSieve
Filters eth_getLogs responses by estimated UTF-8 payload size. Any RpcLog whose serialized
size exceeds maxBytes is silently dropped. logsDivider(...) and cache(...) already
compose this transport by default; use logsSieve(...) directly when filtering is all you need.
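The size check amounts to serializing each log and measuring its UTF-8 byte length. A sketch of the idea (illustrative; the library's own `estimateUtf8Bytes` may differ in detail, and `sieveLogs` is a hypothetical name):

```ts
// Sketch of size-based log filtering, not the transport's actual code.
// Each log is serialized to JSON, measured in UTF-8 bytes, and dropped
// if it exceeds maxBytes.
function sieveLogs<T>(logs: T[], maxBytes: number): T[] {
  return logs.filter(
    (log) => new TextEncoder().encode(JSON.stringify(log)).length <= maxBytes,
  )
}

const small = { data: '0x00' }
const big = { data: '0x' + 'ff'.repeat(10_000) }
console.log(sieveLogs([small, big], 8_192).length) // 1
```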
```ts
import { createPublicClient, http } from 'viem'
import { logsSieve } from '@morpho-org/viem-dlc/transports'

const transport = logsSieve(http(rpcUrl), [{ maxBytes: 8_192 }])
const client = createPublicClient({ transport })
```

### rateLimiter
Token-bucket rate limiting with concurrency limiting and priority scheduling:
```ts
import { createPublicClient, http } from 'viem'
import { rateLimiter } from '@morpho-org/viem-dlc/transports'

const transport = rateLimiter(http(rpcUrl), [
  {
    maxRequestsPerSecond: 10,
    maxBurstRequests: 5,
    maxConcurrentRequests: 3,
  },
])

const client = createPublicClient({ transport })

await client.request({
  method: 'eth_getLogs',
  params: [
    filter,
    {
      __rateLimiter: true,
      priority: 0,
    },
  ],
})
```

## Stores
Key-value stores implementing the Store interface:
```ts
interface Store {
  get(key: string): MaybePromise<Buffer[] | null>
  set(key: string, value: Buffer[]): MaybePromise<void>
  delete(key: string): MaybePromise<void>
  flush(): MaybePromise<void>
}
```

| Store | Import | Description |
| --- | --- | --- |
| LruStore | @morpho-org/viem-dlc/stores | LRU cache with configurable byte-size limit |
| MemoryStore | @morpho-org/viem-dlc/stores | Simple in-memory Map (prefer LruStore) |
| HierarchicalStore | @morpho-org/viem-dlc/stores | Layered stores — reads fall through, writes fan out |
| DebouncedStore | @morpho-org/viem-dlc/stores | Batches writes with debounce + max staleness timeout |
| CompressedStore | @morpho-org/viem-dlc/stores | Transparent zstd compression (Node/Bun only) |
| UpstashStore | @morpho-org/viem-dlc/stores/upstash | Upstash Redis with automatic value sharding and atomic writes |
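As an illustration of the interface, a minimal in-memory implementation could look like the sketch below. This is not the package's `MemoryStore`, just a demonstration of the contract; prefer the built-in stores in practice.

```ts
// Minimal Store sketch backed by a Map, illustrative only.
type MaybePromise<T> = T | Promise<T>

class MapStore {
  private data = new Map<string, Buffer[]>()

  get(key: string): MaybePromise<Buffer[] | null> {
    return this.data.get(key) ?? null
  }
  set(key: string, value: Buffer[]): MaybePromise<void> {
    this.data.set(key, value)
  }
  delete(key: string): MaybePromise<void> {
    this.data.delete(key)
  }
  flush(): MaybePromise<void> {
    this.data.clear()
  }
}
```

Because every method may return a plain value or a promise, synchronous stores like this one compose freely with async ones (e.g. a remote Redis layer) behind the same interface.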
### Composing stores
Stores are designed to be layered. For example, createOptimizedUpstashStore (exported from
@morpho-org/viem-dlc/stores/upstash) returns a pre-composed stack:
```
LruStore (fast, in-process)
└─ DebouncedStore (coalesces writes)
   └─ UpstashStore (durable, remote)
```

```ts
import { createOptimizedUpstashStore } from '@morpho-org/viem-dlc/stores/upstash'

const store = createOptimizedUpstashStore({
  maxRequestBytes: 1_000_000,
  maxWritesPerSecond: 300,
})
```

## Actions
### getLogs2
Drop-in replacement for viem's getLogs that adds cache-layer search pre-filtering and
streaming reduce. Requires a client whose transport uses the cache() wrapper (i.e. whose
rpcSchema is CacheSchema).
search is a regex matched against raw NDJSON before parsing — use hex-encoded values
(address fragments, topic prefixes) to skip expensive JSON.parse calls on irrelevant batches.
reduce folds over decoded logs in order, keeping memory proportional to the accumulator
rather than the full result set.
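The idea behind the pre-filter can be shown standalone: run a cheap regex over the raw serialized text and only parse lines that could possibly match. The sketch below is illustrative (`prefilterNdjson` is a hypothetical helper, not the library's code):

```ts
// Sketch of regex pre-filtering over NDJSON-serialized batches.
// Lines that fail the cheap regex test are skipped entirely, so the
// expensive JSON.parse only runs on potentially relevant entries.
function prefilterNdjson(ndjson: string, search: RegExp): unknown[] {
  return ndjson
    .split('\n')
    .filter((line) => line.length > 0 && search.test(line)) // cheap reject
    .map((line) => JSON.parse(line)) // expensive parse, survivors only
}

const batch = [
  '{"topics":["0xdeadbeef"],"data":"0x01"}',
  '{"topics":["0xcafebabe"],"data":"0x02"}',
].join('\n')
console.log(prefilterNdjson(batch, /deadbeef/).length) // 1
```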
```ts
import { parseAbiItem } from 'viem'
import { getLogs2 } from '@morpho-org/viem-dlc/actions'

const logs = await getLogs2(client, {
  address: '0x...',
  event: parseAbiItem('event Transfer(address indexed, address indexed, uint256)'),
  fromBlock: 18_000_000n,
  toBlock: 19_000_000n,
  search: 'deadbeef',
  reduce: (acc, log) => {
    acc.push(log) // log.args is already decoded
    return acc
  },
})
```

### eth_call cachePolicy
Creates a stateOverride entry that tells the cache transport how to cache an eth_call.
Works with viem's built-in call, readContract, and multicall actions — no wrappers needed.
Automatically detects Multicall3 aggregate3 calls and caches individual sub-calls.
The first argument (blobKey) groups cached results into a named store entry (the blob is
extended by new results, not replaced). The second (ttl) sets the maximum age in milliseconds
before a cached result is considered stale.
```ts
import { cachePolicy } from '@morpho-org/viem-dlc/actions'

// readContract
const totalAssets = await client.readContract({
  address: '0x...',
  abi: vaultAbi,
  functionName: 'totalAssets',
  stateOverride: [cachePolicy('morpho-blue', 60_000)],
})

// multicall
const results = await client.multicall({
  contracts: [
    { address: '0x...', abi: vaultAbi, functionName: 'totalAssets' },
    { address: '0x...', abi: vaultAbi, functionName: 'totalSupply' },
  ],
  stateOverride: [cachePolicy('morpho-blue', 60_000)],
})
```

### getDeploymentBlockNumber
Finds the block at which a contract was deployed using binary search over getCode.
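The underlying binary search can be sketched independently of viem. Here `hasCode` is a hypothetical predicate standing in for a `getCode` call at a given block:

```ts
// Binary-search sketch: find the first block at which hasCode(block) is true.
// Assumes hasCode is monotone (false before deployment, true after), which
// holds for deployments absent self-destructs. Illustrative only.
async function findDeploymentBlock(
  hasCode: (block: bigint) => Promise<boolean>,
  latestBlock: bigint,
): Promise<bigint> {
  let lo = 0n
  let hi = latestBlock
  while (lo < hi) {
    const mid = (lo + hi) / 2n // bigint division floors
    if (await hasCode(mid)) hi = mid // code present: deployment is at or before mid
    else lo = mid + 1n // no code yet: deployment is after mid
  }
  return lo
}

// Toy chain where the contract appears at block 1_337:
const block = await findDeploymentBlock(async (b) => b >= 1_337n, 10_000n)
console.log(block) // 1337n
```

Each probe costs one RPC call, so locating a deployment among N blocks takes about log2(N) `getCode` requests.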
```ts
import { createPublicClient, http } from 'viem'
import { mainnet } from 'viem/chains'
import { getDeploymentBlockNumber } from '@morpho-org/viem-dlc/actions'

const client = createPublicClient({ chain: mainnet, transport: http() })

const block = await getDeploymentBlockNumber(client, {
  address: '0x...',
})
```

## Utilities
Exported from @morpho-org/viem-dlc/utils:
- `divideBlockRange`/`mergeBlockRanges`/`halveBlockRange` — block range manipulation
- `resolveBlockNumber`/`extractRangeFromFilter`/`isInBlockRange` — block number helpers
- `isErrorCausedByBlockRange` — detect RPC "block range too large" errors
- `createCoalescingMutex` — per-resource leader/follower batching
- `createTokenBucket`/`createRateLimit` — rate limiting primitives
- `cyrb64Hash` — fast string hashing
- `stringify`/`parse`/`estimateUtf8Bytes` — JSON serialization with bigint support
- `pick`/`omit` — object helpers
- `measureUtf8Bytes`/`shardString` — string utilities
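For instance, bigint-aware JSON serialization along the lines of `stringify`/`parse` can be sketched as follows. The "n"-suffix string encoding is an assumption for illustration; the package's actual wire format may differ:

```ts
// Sketch of JSON serialization with bigint support, illustrative only.
// Bigints are encoded as strings with an "n" suffix and revived on parse.
// Limitation of this sketch: a genuine string like "123n" would be
// mis-revived as a bigint.
function stringifyWithBigint(value: unknown): string {
  return JSON.stringify(value, (_key, v) =>
    typeof v === 'bigint' ? `${v}n` : v,
  )
}

function parseWithBigint(text: string): unknown {
  return JSON.parse(text, (_key, v) =>
    typeof v === 'string' && /^-?\d+n$/.test(v) ? BigInt(v.slice(0, -1)) : v,
  )
}

const roundTrip = parseWithBigint(stringifyWithBigint({ block: 18_000_000n }))
console.log(roundTrip) // { block: 18000000n }
```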
