~/textes/rust-onchain-token-indexer
Indexer un registre de jetons on-chain en Rust
Une blockchain te remet un journal de transferts en ajout seul, sous trois formes différentes, derrière un RPC à débit limité qui réécrit discrètement sa propre histoire récente. Replier ça en un registre de soldes correct, c'est tout le travail, et la plupart des façons de se tromper ne se manifestent pas avant des semaines.
Un ami a fait l'art. J'ai écrit le backend, pendant les années où les NFT étaient en vogue. L'art n'est pas mon histoire. Le backend l'est, parce qu'il devait être correct, et la plupart des façons de se tromper ne s'annoncent pas avant des semaines.
La question à laquelle il répondait était simple : qui détient quoi, à n'importe quel bloc ? La chaîne le sait déjà, mais elle ne stocke pas les soldes. Elle stocke un journal d'événements de transfert en ajout seul, et la seule façon de connaître un solde, c'est de rejouer chaque transfert qui a jamais touché une adresse et de les additionner.
Ce rejeu, replié une fois et maintenu à jour, c'est un indexeur. Tu peux en louer un. Je voulais posséder la réponse, alors j'ai écrit le mien. Ça ressemble à un script de fin de semaine, et le chemin heureux l'est. Les parties qui ont pris du vrai temps, ce sont celles qui n'apparaissent pas avant d'être quelques millions de blocs plus loin : la chaîne parle trois dialectes de transfert, le RPC te limite le débit, et les blocs les plus récents ne sont pas définitifs.
La chaîne parle trois dialectes
Un transfert de jeton, ce n'est pas un seul événement. ERC-721 émet Transfer(from, to, tokenId). ERC-1155 émet TransferSingle(operator, from, to, id, value) et, pour les mouvements en lot, TransferBatch(operator, from, to, ids[], values[]). Si tu indexes une collection mixte, ce que j'ai fait, tu gères les trois ou ton registre est faux d'une manière que tu ne remarqueras pas avant des semaines.
Chaque événement s'identifie par son premier topic, le hash keccak de sa signature. Ce sont des constantes, alors je les ai figées en octets bruts plutôt que de hacher des chaînes à l'exécution :
const TRANSFER_TOPIC: H256 = H256([ // ERC-721 Transfer(address,address,uint256)
0xdd, 0xf2, 0x52, 0xad, 0x1b, 0xe2, 0xc8, 0x9b, 0x69, 0xc2, 0xb0, 0x68, 0xfc, 0x37, 0x8d, 0xaa,
0x95, 0x2b, 0xa7, 0xf1, 0x63, 0xc4, 0xa1, 0x16, 0x28, 0xf5, 0x5a, 0x4d, 0xf5, 0x23, 0xb3, 0xef,
]);
const TRANSFER_SINGLE_TOPIC: H256 = H256([ /* ERC-1155 TransferSingle */ ]);
const TRANSFER_BATCH_TOPIC: H256 = H256([ /* ERC-1155 TransferBatch */ ]);Le décodage a une subtilité que les gens manquent. Les paramètres indexés vivent dans les topics du log, pas dans son data. Donc from et to sortent directement de topics[1] et topics[2], mais id et value ne sont pas indexés, alors il faut les décoder en ABI à partir du blob de données. ERC-721 met son tokenId dans un topic et n'a aucun champ de valeur, alors je le normalise à une quantité de un. On dispatche sur le premier topic et on décode en conséquence :
let event = if log.topics[0] == TRANSFER_TOPIC {
self.erc721_to_dbevent(&log, contract)? // value is always 1
} else if log.topics[0] == TRANSFER_SINGLE_TOPIC {
self.erc1155_to_single_dbevent(&log, contract)?
} else if log.topics[0] == TRANSFER_BATCH_TOPIC {
self.erc1155_to_batch_dbevent(&log, contract)?
} else {
continue; // not a transfer we asked for
};TransferBatch est celui qui justifie son existence : ids et values sont des tableaux parallèles, et l'événement n'est valide que s'ils ont la même longueur. J'en ai fait un invariant du type d'événement lui-même, pour qu'un lot mal formé ne puisse pas devenir une ligne de base de données :
pub fn new(/* ... */ ids: Vec<U256>, values: Vec<U256> /* ... */)
-> Result<Self, &'static str>
{
if ids.len() != values.len() {
return Err("ids and values must be the same length");
}
Ok(Event { /* ... */ })
}Le RPC te limite le débit, alors découpe et étrangle
eth_getLogs ne retourne pas une plage de blocs illimitée. Demande-en trop et tu reçois une erreur, ou pire, un résultat tronqué qui ressemble à un succès. Alors la plage du dernier bloc traité jusqu'à la tête de chaîne se découpe en morceaux de taille fixe, une requête chacun :
let chunks: Vec<(usize, usize)> = (start_block..current_block)
.step_by(self.chain.chunk_size)
.map(|start| (start, std::cmp::min(start + self.chain.chunk_size - 1, current_block)))
.collect();Là tu as des centaines de requêtes indépendantes et la tentation de toutes les tirer d'un coup. C'est ce que j'ai fait, et le RPC m'a limité le débit immédiatement. La solution, c'est d'éventer mais de plafonner combien sont en vol avec un sémaphore, et de donner à chaque morceau son propre backoff exponentiel pour qu'un seul 429 ne coule pas la passe :
let semaphore = Arc::new(Semaphore::new(80));
let mut tasks = FuturesUnordered::new();
for (chunk_start, chunk_end) in chunks {
let permit = semaphore.clone();
tasks.push(async move {
let _permit = permit.acquire_owned().await.unwrap(); // released on drop
let mut delay = INITIAL_RETRY_DELAY; // 2s
loop {
match web3.eth().logs(filter.clone()).await {
Ok(logs) => return decode(logs),
Err(_) if attempts < MAX_RETRY_COUNT => { // 5
sleep(delay).await;
delay *= 2; // 2s, 4s, 8s, 16s, 32s
attempts += 1;
}
Err(e) => panic!("gave up after {MAX_RETRY_COUNT}: {e:?}"),
}
}
});
}Relire mon propre vieux code
Le sémaphore est réglé à 80. Le commentaire à côté, dans le code commité, dit « limit to 2 concurrent tasks ». J'ai changé le chiffre et pas le commentaire. Un commentaire est une affirmation, le code est la vérité. Le panic sur épuisement des essais est aussi de la première passe : un indexeur de longue durée devrait journaliser et sauter un morceau, pas abandonner. Les deux sont le genre d'aspérité qu'on lime à la deuxième passe.
Les blocs récents ne sont pas définitifs
C'est la partie qui sépare un indexeur qui fait une belle démo de celui que tu peux laisser tourner. La tête de chaîne n'est pas définitive. Des blocs à quelques niveaux de profondeur peuvent être réorganisés puis remplacés, ce qui veut dire qu'un événement que tu as indexé hier pourrait ne plus être sur la chaîne canonique aujourd'hui. Si tu ne fais qu'ajouter, ton registre se remplit lentement de transferts qui n'ont plus eu lieu.
Deux choses s'en défendent. D'abord, ne fais pas confiance à la tête exacte : je prends le bloc courant moins deux, et à chaque passe je regarde en arrière et je rebalaie le dernier morceau au lieu de repartir à blanc d'où je me suis arrêté.
Ok(block) => return Ok(usize::try_from(block)? - 2), // step back from the unstable head
let look_back_start_block = if current_block <= self.last_processed_block + 2000 {
self.last_processed_block.saturating_sub(2000) // re-scan a full chunk for reorgs
} else {
self.last_processed_block
};Ensuite, et tout le design repose là-dessus : réindexer une plage de blocs doit être idempotent. Au lieu de comparer la chaîne avec la base de données et de rapiécer les événements un à un, ce qui fait plus de code et plus de bogues, je supprime toute la fenêtre et je la réécris à partir de ce que la chaîne dit en ce moment, le tout dans une seule transaction :
let transaction = client.transaction().await?;
for contract in &chain.contracts {
transaction.execute(
"DELETE FROM events WHERE contract_id = $1 AND block_number >= $2 AND block_number <= $3",
&[&contract_id, &(from_block as i32), &(to_block as i32)],
).await?;
for event in new_events {
transaction.execute(
"INSERT INTO events (contract_id, operator, from_address, to_address, \
ids, values, block_number, transaction_hash) VALUES ($1,$2,$3,$4,$5,$6,$7,$8)",
&[/* ... */],
).await?;
}
transaction.execute(
"UPDATE contracts SET last_processed_block = $1 WHERE id = $2",
&[&(to_block as i32), &contract_id],
).await?;
}
transaction.commit().await?; // all or nothingUne réorg se soigne d'elle-même : à la passe suivante, la fenêtre rebalayée est supprimée et réécrite à partir de la vérité canonique, et last_processed_block n'avance qu'une fois le commit posé. L'indexeur ingère entièrement une plage ou laisse la base de données exactement comme elle était. Il n'y a aucun état où la moitié d'une plage de blocs est écrite, et cet état à moitié écrit est exactement ce qui produit un solde faux d'un transfert et impossible à déboguer plus tard.
Ennuyeux à dessein
La reprise découle de la même table. Chaque contrat porte son propre last_processed_block, et une chaîne reprend à partir du minimum parmi ses contrats, donc ajouter un nouveau contrat ne fait que le remblayer à partir de son propre bloc de départ à la passe suivante, sans déranger les autres.
La seule décision de stockage qui m'a surpris, c'est qu'un uint256 n'entre dans aucun entier que Postgres possède. Les ids et les valeurs de jetons sont sur 256 bits, et une colonne 64 bits déborde au premier gros porteur. Je stocke les tableaux d'ids et de valeurs sous forme de chaînes décimales JSON et je fais l'arithmétique en U256 dans l'application, où le système de types comprend la largeur. Ce choix a sa propre histoire, et c'est en bonne partie pourquoi j'ai réécrit ça de Python vers Rust au départ.
La version astucieuse de tout ça, celle qui rapièce chirurgicalement seulement les événements qui ont changé, c'est celle qui casse. Supprime la fenêtre et réécris-la. C'est ennuyeux, c'est idempotent, et pour un registre c'est la seule propriété qui compte.
Le code est sur GitHub.