Indexing an on-chain token ledger in Rust
A blockchain hands you an append-only log of transfers in three different shapes, behind a rate-limited RPC that quietly rewrites its own recent history. Folding that into a correct balance ledger is the whole job, and most of the ways to get it wrong don't show up for weeks.
A friend made the art; I wrote the backend, back when NFTs were hot. The art isn't my story. The backend is, because it had to be correct.
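The question it answered was simple: who holds what, at any block? The chain knows, but not in a form you can ask that question of. A token contract's storage answers narrow questions about the present, like who owns token 42 or how many of token 7 an address has. It isn't built to list every holder, or to say what anyone held a month ago. What the chain does keep is an append-only log of transfer events, and the only practical way to answer the question is to replay every transfer that ever touched an address and add it up.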
That replay, folded once and kept current, is an indexer. You can rent one. I wanted to own the answer, so I wrote my own. It sounds like a weekend script, and the happy path is. The parts that took real time were the ones that don't show up until you're a few million blocks in: the chain speaks three transfer dialects, the RPC rate-limits you, and the most recent blocks aren't final.
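The core of that replay is a fold over transfers in chain order: subtract from the sender, add to the receiver, and treat the zero address as the mint source and burn sink, as ERC-721 and ERC-1155 both do. Here is a minimal sketch, assuming transfers arrive already decoded and ordered; `Transfer` and `fold_balances` are hypothetical names, and `u128` stands in for the 256-bit amounts a real ledger needs:

```rust
use std::collections::HashMap;

type Address = [u8; 20];

/// The zero address: `from` on a mint, `to` on a burn.
const ZERO: Address = [0u8; 20];

/// One decoded transfer: `value` units of token `id` moved from `from` to `to`.
/// (Hypothetical type; `u128` stands in for U256 to keep the sketch crate-free.)
struct Transfer {
    from: Address,
    to: Address,
    id: u128,
    value: u128,
}

/// Replays transfers in chain order into (holder, token id) -> balance.
/// An underflow means an event was missed or replayed out of order, so it is
/// reported as an error instead of being clamped to zero.
fn fold_balances(transfers: &[Transfer]) -> Result<HashMap<(Address, u128), u128>, String> {
    let mut balances: HashMap<(Address, u128), u128> = HashMap::new();
    for t in transfers {
        if t.from != ZERO {
            let held = balances.entry((t.from, t.id)).or_insert(0);
            *held = held
                .checked_sub(t.value)
                .ok_or_else(|| format!("balance of token {} went negative", t.id))?;
        }
        if t.to != ZERO {
            let held = balances.entry((t.to, t.id)).or_insert(0);
            *held = held
                .checked_add(t.value)
                .ok_or_else(|| format!("balance of token {} overflowed", t.id))?;
        }
    }
    // A holder who sent everything away holds nothing; drop the empty entries.
    balances.retain(|_, v| *v > 0);
    Ok(balances)
}
```

Treating underflow as an error is deliberate: a correct replay can never take a balance negative, so a negative one means the replay is missing an event or applied one out of order.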
The chain speaks three dialects
A token transfer isn't one event. ERC-721 emits Transfer(from, to, tokenId). ERC-1155 emits TransferSingle(operator, from, to, id, value) and, for bulk moves, TransferBatch(operator, from, to, ids[], values[]). If you index a mixed collection, which I did, you handle all three or your ledger is wrong in ways you won't notice for weeks.
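Handling all three means reducing them to one shape a ledger can fold. One way to do that, sketched with hypothetical types (`TransferEvent`, `legs`) rather than the indexer's own, and with `u128` standing in for U256:

```rust
type Address = [u8; 20];

/// Hypothetical normalized form of the three transfer events.
#[allow(dead_code)] // `operator` is worth storing, but this sketch never reads it
enum TransferEvent {
    Erc721 { from: Address, to: Address, token_id: u128 },
    Single { operator: Address, from: Address, to: Address, id: u128, value: u128 },
    Batch { operator: Address, from: Address, to: Address, ids: Vec<u128>, values: Vec<u128> },
}

/// One (from, to, id, value) movement.
type Leg = (Address, Address, u128, u128);

impl TransferEvent {
    /// Flattens any dialect into the legs a balance fold consumes.
    fn legs(&self) -> Result<Vec<Leg>, &'static str> {
        match self {
            // ERC-721 carries no amount: each transfer moves exactly one token.
            TransferEvent::Erc721 { from, to, token_id } => Ok(vec![(*from, *to, *token_id, 1)]),
            TransferEvent::Single { from, to, id, value, .. } => {
                Ok(vec![(*from, *to, *id, *value)])
            }
            TransferEvent::Batch { from, to, ids, values, .. } => {
                // Parallel arrays: check before zipping, or a short side drops the tail.
                if ids.len() != values.len() {
                    return Err("ids and values must be the same length");
                }
                Ok(ids.iter().zip(values).map(|(id, value)| (*from, *to, *id, *value)).collect())
            }
        }
    }
}
```

The batch arm is where a length check earns its place: `zip` stops at the shorter iterator, so without the check a malformed batch would lose its tail silently.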
Each event identifies itself by its first topic, the keccak hash of its signature. Those are constants, so I pinned them as raw bytes instead of hashing strings at runtime:
const TRANSFER_TOPIC: H256 = H256([ // ERC-721 Transfer(address,address,uint256)
0xdd, 0xf2, 0x52, 0xad, 0x1b, 0xe2, 0xc8, 0x9b, 0x69, 0xc2, 0xb0, 0x68, 0xfc, 0x37, 0x8d, 0xaa,
0x95, 0x2b, 0xa7, 0xf1, 0x63, 0xc4, 0xa1, 0x16, 0x28, 0xf5, 0x5a, 0x4d, 0xf5, 0x23, 0xb3, 0xef,
]);
const TRANSFER_SINGLE_TOPIC: H256 = H256([ /* ERC-1155 TransferSingle */ ]);
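const TRANSFER_BATCH_TOPIC: H256 = H256([ /* ERC-1155 TransferBatch */ ]);
The decoding has one subtlety that people get wrong: indexed parameters live in the log's topics, not its data. ERC-721 indexes all three of its parameters, so from, to, and tokenId come straight out of topics[1], topics[2], and topics[3]; there's no value field at all, so I normalize it to a quantity of one. ERC-1155 indexes the operator as well, so its operator, from, and to sit in topics[1] through topics[3], while id and value aren't indexed and have to be ABI-decoded out of the data blob. (The Transfer topic is also ERC-20's, since the signature is identical; a standard ERC-721 log carries four topics to ERC-20's three.) Dispatch on the first topic and decode accordingly: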
let Some(topic0) = log.topics.first() else {
    continue; // anonymous event: no signature topic to dispatch on
};
let event = if *topic0 == TRANSFER_TOPIC {
    self.erc721_to_dbevent(&log, contract)? // value is always 1
} else if *topic0 == TRANSFER_SINGLE_TOPIC {
    self.erc1155_to_single_dbevent(&log, contract)?
} else if *topic0 == TRANSFER_BATCH_TOPIC {
    self.erc1155_to_batch_dbevent(&log, contract)?
} else {
    continue; // not a transfer we asked for
};
TransferBatch is the one that earns its keep: ids and values are parallel arrays, and the event is only valid if they're the same length. I made that an invariant of the event type itself, so a malformed batch can't become a database row:
pub fn new(/* ... */ ids: Vec<U256>, values: Vec<U256> /* ... */)
-> Result<Self, &'static str>
{
if ids.len() != values.len() {
return Err("ids and values must be the same length");
}
Ok(Event { /* ... */ })
}
The RPC rate-limits you, so chunk and throttle
eth_getLogs won't return an unbounded block range. Ask for too much and you get an error, or worse, a truncated result that looks like success. So the range from the last processed block to the chain head gets sliced into fixed chunks, one request each:
let chunks: Vec<(usize, usize)> = (start_block..=current_block) // ..= so current_block is covered even when the range divides evenly
    .step_by(self.chain.chunk_size)
    .map(|start| (start, std::cmp::min(start + self.chain.chunk_size - 1, current_block)))
    .collect();
Now you have hundreds of independent requests and the temptation to fire them all at once. I did, and the RPC rate-limited me immediately. The fix is to fan out but cap how many are in flight with a semaphore, and give each chunk its own exponential backoff so one 429 doesn't sink the run:
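use futures::stream::{FuturesUnordered, StreamExt};
use std::sync::Arc;
use tokio::{sync::Semaphore, time::sleep};
let semaphore = Arc::new(Semaphore::new(80));
let mut tasks = FuturesUnordered::new();
for (chunk_start, chunk_end) in chunks {
    let semaphore = semaphore.clone();
    let web3 = web3.clone();
    // hypothetical helper: builds the eth_getLogs filter for this inclusive range
    let filter = filter_for(chunk_start, chunk_end);
    tasks.push(async move {
        let _permit = semaphore.acquire_owned().await.unwrap(); // released on drop
        let mut delay = INITIAL_RETRY_DELAY; // 2s
        let mut attempts = 0;
        loop {
            match web3.eth().logs(filter.clone()).await {
                Ok(logs) => return decode(logs),
                Err(_) if attempts < MAX_RETRY_COUNT => { // 5
                    sleep(delay).await;
                    delay *= 2; // 2s, 4s, 8s, 16s, 32s
                    attempts += 1;
                }
                Err(e) => panic!("gave up after {MAX_RETRY_COUNT} retries: {e:?}"),
            }
        }
    });
}
// FuturesUnordered runs nothing on its own; polling it is what drives the requests
while let Some(chunk_events) = tasks.next().await {
    // collect chunk_events into this pass's results
}
Reading my own old code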
The semaphore is set to 80. The comment next to it, in the committed source, says "limit to 2 concurrent tasks." I changed the number and not the comment. A comment is a claim; the code is the truth. The panic on exhausted retries is also first-pass: a long-running indexer should log and skip a chunk, not abort. Both are the kind of edge you file down on the second pass.
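Here is what "log and skip" could look like, as a synchronous sketch with the same retry schedule as the indexer's (5 retries, starting at 2s and doubling). `retry_with_backoff` and `fetch_chunks` are hypothetical names, and `sleep` is passed in so the schedule can be tested without waiting; in the async indexer the equivalent step is `sleep(delay).await`.

```rust
use std::fmt::Debug;
use std::time::Duration;

/// Calls `op` until it succeeds or has failed `max_retries` more times after the
/// first try, doubling the delay between attempts. `sleep` is injected: a real
/// caller passes something that actually waits; a test passes a recorder.
fn retry_with_backoff<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    max_retries: u32,
    initial_delay: Duration,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut delay = initial_delay;
    let mut attempts = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(_) if attempts < max_retries => {
                sleep(delay);
                delay *= 2;
                attempts += 1;
            }
            // Out of retries: hand the error back instead of panicking.
            Err(e) => return Err(e),
        }
    }
}

/// Log-and-skip: fetches each inclusive block range; on final failure it logs,
/// records the range, and moves on. Returns the results and the ranges to retry.
fn fetch_chunks<T, E: Debug>(
    chunks: &[(u64, u64)],
    mut fetch: impl FnMut(u64, u64) -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
) -> (Vec<T>, Vec<(u64, u64)>) {
    let mut fetched = Vec::new();
    let mut failed = Vec::new();
    for &(start, end) in chunks {
        match retry_with_backoff(|| fetch(start, end), 5, Duration::from_secs(2), &mut sleep) {
            Ok(value) => fetched.push(value),
            Err(e) => {
                eprintln!("skipping blocks {start}..={end}: {e:?}");
                failed.push((start, end));
            }
        }
    }
    (fetched, failed)
}
```

Returning the failed ranges leaves the decision with the caller. One option is to keep last_processed_block behind the first failed chunk, so the next pass retries it.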
The recent blocks aren't final
This is the part that separates an indexer that demos well from one you can leave running. The head of the chain isn't final. Blocks a few deep can be reorganized away and replaced, which means an event you indexed a minute ago might not be on the canonical chain now. If you only ever append, your ledger slowly fills with transfers that no longer happened.
Two things defend against it. First, don't trust the exact head: I take the current block minus two, and once the indexer has caught up to the head, every pass looks back and re-scans the last chunk instead of starting clean from where it stopped.
Ok(block) => return Ok(usize::try_from(block)?.saturating_sub(2)), // step back from the unstable head; saturating so a young chain can't underflow
let look_back_start_block = if current_block <= self.last_processed_block + 2000 {
self.last_processed_block.saturating_sub(2000) // re-scan a full chunk for reorgs
} else {
self.last_processed_block
};
Second, and the whole design rests on this: re-indexing a block range has to be idempotent. Instead of diffing the chain against the database and patching individual events, which is more code and more bugs, I delete the entire window and rewrite it from what the chain says right now, all in one transaction:
let transaction = client.transaction().await?;
for contract in &chain.contracts {
transaction.execute(
"DELETE FROM events WHERE contract_id = $1 AND block_number >= $2 AND block_number <= $3",
&[&contract_id, &(from_block as i32), &(to_block as i32)],
).await?;
for event in new_events {
transaction.execute(
"INSERT INTO events (contract_id, operator, from_address, to_address, \
ids, values, block_number, transaction_hash) VALUES ($1,$2,$3,$4,$5,$6,$7,$8)",
&[/* ... */],
).await?;
}
transaction.execute(
"UPDATE contracts SET last_processed_block = $1 WHERE id = $2",
&[&(to_block as i32), &contract_id],
).await?;
}
transaction.commit().await?; // all or nothing
A reorg heals itself, as long as it's shallower than the look-back window: on the next pass, the rescanned window gets deleted and rewritten from the canonical chain, and last_processed_block is updated inside the same transaction, so it only advances when the commit lands. The indexer either fully ingests a range or leaves the database exactly as it was. There's no state where half a block range is written, and that half-written state is exactly what produces a balance that's wrong by one transfer and impossible to debug later.
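To see why delete-and-rewrite heals a reorg, here is an in-memory simulation. It's a sketch under loud assumptions: `Store` and `reindex_window` are hypothetical, a `BTreeMap` stands in for the events table, and clone-then-swap stands in for the Postgres transaction (an error midway drops the copy, which plays the part of the rollback).

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory stand-in for one contract's slice of the events table:
/// block number -> transfers recorded in that block.
#[derive(Debug, Clone, PartialEq, Default)]
struct Store {
    events: BTreeMap<u64, Vec<String>>,
    last_processed_block: u64,
}

impl Store {
    /// Rewrites blocks `from..=to` from what `chain` reports now. All work happens on
    /// a copy that replaces `self` only at the end, standing in for BEGIN ... COMMIT:
    /// an error partway through drops the copy, like a rollback.
    fn reindex_window(
        &mut self,
        from: u64,
        to: u64,
        chain: impl Fn(u64) -> Result<Vec<String>, String>,
    ) -> Result<(), String> {
        let mut next = self.clone();
        // DELETE FROM events WHERE block_number BETWEEN from AND to
        next.events.retain(|&block, _| block < from || block > to);
        // INSERT whatever the canonical chain holds for each block in the window
        for block in from..=to {
            let transfers = chain(block)?;
            if !transfers.is_empty() {
                next.events.insert(block, transfers);
            }
        }
        // UPDATE last_processed_block, then COMMIT by swapping the new state in
        next.last_processed_block = to;
        *self = next;
        Ok(())
    }
}
```

Running the same window twice leaves the store unchanged, and a failure partway through leaves it exactly as it was: the two properties the SQL version gets from DELETE and INSERT inside one transaction.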
Boring on purpose
Resumability falls out of the same table. Each contract carries its own last_processed_block, and a chain resumes from the minimum across its contracts, so adding a new contract just backfills from its own start block on the next pass without disturbing the others.
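The resume rule fits in one expression. A sketch, assuming (hypothetically) that a new contract's row is created with last_processed_block set to its own start block:

```rust
/// Hypothetical row of the contracts table. Assumption: a newly added contract is
/// inserted with `last_processed_block` set to its own start block.
struct Contract {
    last_processed_block: u64,
}

/// Where a chain's next pass starts: the furthest-behind contract decides.
/// Returns `None` for a chain with no contracts.
fn resume_block(contracts: &[Contract]) -> Option<u64> {
    contracts.iter().map(|c| c.last_processed_block).min()
}
```

The cost of taking the minimum is that contracts already further along get their windows re-indexed too. Because each window is deleted and rewritten, that repetition costs RPC calls but can't produce wrong rows.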
The one storage decision that surprised me was that a uint256 doesn't fit in any integer Postgres has. Token ids and values are 256-bit, and a 64-bit column overflows on the first whale. I store the id and value arrays as JSON decimal strings and do the arithmetic in U256 in the application, where the type system understands the width. That choice has its own story, and it's most of why I rewrote this from Python to Rust in the first place.
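A sketch of that column format, with `u128` standing in for U256 (the shape is the same at 256 bits) and hypothetical function names. Strings rather than JSON numbers are the safe encoding because many JSON consumers, JavaScript's included, decode numbers as 64-bit floats, which are exact only up to 2^53.

```rust
/// Encodes amounts as a JSON array of decimal strings, e.g. ["1","42"].
fn to_json_decimal_strings(values: &[u128]) -> String {
    let items: Vec<String> = values.iter().map(|v| format!("\"{v}\"")).collect();
    format!("[{}]", items.join(","))
}

/// Parses exactly the format above. Not a general JSON parser (serde_json is the
/// usual choice for that); just enough to round-trip this one column.
fn from_json_decimal_strings(json: &str) -> Result<Vec<u128>, String> {
    let inner = json
        .trim()
        .strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'))
        .ok_or("expected a JSON array")?;
    if inner.trim().is_empty() {
        return Ok(Vec::new());
    }
    inner
        .split(',')
        .map(|item| -> Result<u128, String> {
            // Each element must be a quoted decimal number, never a bare JSON number.
            let digits = item
                .trim()
                .strip_prefix('"')
                .and_then(|rest| rest.strip_suffix('"'))
                .ok_or_else(|| format!("expected a quoted number, got {item}"))?;
            digits.parse::<u128>().map_err(|e| format!("{digits}: {e}"))
        })
        .collect()
}
```

With a real 256-bit type such as ethereum-types' `U256`, `Display` already prints decimal and `from_dec_str` parses it back; check which radix a type's `FromStr` assumes before relying on `parse`.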
The clever version of this, the one that surgically patches only the events that changed, is the one that breaks. Delete the window and write it again. It's boring, it's idempotent, and for a ledger that's the only property that counts.
The code is on GitHub.