From 4f67743299164464b06f423a522eb662d93c848b Mon Sep 17 00:00:00 2001 From: Bob <80072466+cwastche@users.noreply.github.com> Date: Thu, 19 Sep 2024 05:00:07 -0400 Subject: [PATCH] cleanup indexer transform function (#1226) * - Changed return statements to array filters - Moved object mutations from map to forEach * Address comments: - Add name constants for Collection - cleanup * Fix typing * Divide transform into helper functions for clarity * fix: fmt and type * fix: tests * fix: refactoring * fix: remove update store --------- Co-authored-by: Eugenio Paluello --- indexer/src/main.ts | 397 +++++++++++++++++--------------- indexer/src/types/interfaces.ts | 27 ++- indexer/src/types/storeItem.ts | 13 +- 3 files changed, 251 insertions(+), 186 deletions(-) diff --git a/indexer/src/main.ts b/indexer/src/main.ts index 46ab3cbd1..e2c027645 100644 --- a/indexer/src/main.ts +++ b/indexer/src/main.ts @@ -30,19 +30,25 @@ import { } from "./types/receipt.ts"; import { JsonRpcLog, toEthLog } from "./types/log.ts"; import { createTrieData, TrieData } from "./types/tries.ts"; -import { StoreItem } from "./types/storeItem.ts"; +import { Collection, StoreItem } from "./types/storeItem.ts"; // Starknet import { BlockHeader, Config, EventWithTransaction, hexToBytes, + JsonRpcTx, NetworkOptions, SinkOptions, TransactionWithReceipt, } from "./deps.ts"; // Eth import { Bloom, Trie } from "./deps.ts"; +import { + BlockInfo, + ProcessedEvent, + ProcessedTransaction, +} from "./types/interfaces.ts"; export const config: Config = { streamUrl: STREAM_URL, @@ -73,191 +79,45 @@ export default async function transform({ events: EventWithTransaction[]; transactions: TransactionWithReceipt[]; }) { - // Accumulate the gas used in the block in order to calculate the cumulative gas used. - // We increment it by the gas used in each transaction in the flatMap iteration. - let cumulativeGasUsed = 0n; - // An array containing the cumulative gas used up to that transaction, indexed by - // transaction index. This is used to later get the cumulative gas used for an out of - // resources transaction. - const cumulativeGasUsages: Array = []; - - const blockNumber = padString(toHexString(header.blockNumber), 8); - const isPendingBlock = padString(header.blockHash, 32) === NULL_BLOCK_HASH; - const blockHash = padString(header.blockHash, 32); + const blockInfo = createBlockInfo(header); + const store: Array = []; const blockLogsBloom = new Bloom(); - const transactionTrie = new Trie(); - const receiptTrie = new Trie(); - const store: Array = []; + const processedEvents = (events ?? []) + // Can be false if the transaction is not related to a specific instance of the Kakarot contract. + // This is typically the case if there are multiple Kakarot contracts on the same chain. + // Skip if the transaction_executed event contains "eth validation failed". + .filter((event) => + isKakarotTransaction(event.transaction) && + !ethValidationFailed(event.event) + ) + .map(processEvent(blockInfo)) + .filter((event): event is ProcessedEvent => event !== null); - const maybeTrieData: Array = (events ?? []).map( - ({ transaction, receipt, event }) => { - console.log( - "🔍 Processing transaction with Starknet hash: ", - transaction.meta.hash, - ); - // Can be false if the transaction is not related to a specific instance of the Kakarot contract. - // This is typically the case if there are multiple Kakarot contracts on the same chain. - const isKakarotTx = isKakarotTransaction(transaction); - if (!isKakarotTx) { - return null; - } - - // Skip if the transaction_executed event contains "eth validation failed". - if (ethValidationFailed(event)) { - return null; - } - - const typedEthTx = toTypedEthTx({ transaction }); - // Can be null if: - // 1. The transaction is missing calldata. - // 2. The transaction is a multi-call. - // 3. The length of the signature array is different from 5. - // 4. The chain id is not encoded in the v param of the signature for a - // Legacy transaction. - // 5. The deserialization of the transaction fails. - if (!typedEthTx) { - return null; - } - const ethTx = typedTransactionToEthTx({ - typedTransaction: typedEthTx, - receipt, - blockNumber, - blockHash, - isPendingBlock, - }); - // Can be null if: - // 1. The typed transaction if missing a signature param (v, r, s). - if (!ethTx) { - return null; - } + const { cumulativeGasUsed, cumulativeGasUsages } = + accumulateGasAndUpdateStore(processedEvents, store, blockLogsBloom); - // Can be null if: - // 1. The event is part of the defined ignored events (see IGNORED_KEYS). - // 2. The event has an invalid number of keys. - const ethLogs = receipt.events - .map((e) => { - return toEthLog({ - transaction: ethTx, - event: e, - blockNumber, - blockHash, - isPendingBlock, - }); - }) - .filter((e) => e !== null) as JsonRpcLog[]; - const ethLogsIndexed = ethLogs.map((log, index) => { - log.logIndex = index.toString(); - return log; - }); - - const ethReceipt = toEthReceipt({ - transaction: ethTx, - logs: ethLogsIndexed, - event, - cumulativeGasUsed, - blockNumber, - blockHash, - isPendingBlock, - }); - - cumulativeGasUsed += BigInt(ethReceipt.gasUsed); - // ethTx.transactionIndex can be null (if the block is pending) but - // Number(null) is 0 so this won't panic. - cumulativeGasUsages[Number(ethTx.transactionIndex)] = cumulativeGasUsed; - - // Add all the eth data to the store. - store.push({ collection: "transactions", data: { tx: ethTx } }); - store.push({ collection: "receipts", data: { receipt: ethReceipt } }); - ethLogs.forEach((ethLog) => { - store.push({ collection: "logs", data: { log: ethLog } }); - }); - - // Add the logs bloom of the receipt to the block logs bloom. - const receiptLogsBloom = new Bloom(hexToBytes(ethReceipt.logsBloom)); - blockLogsBloom.or(receiptLogsBloom); - - /// Return the trie data. - return createTrieData({ - transactionIndex: Number(ethTx.transactionIndex), - typedTransaction: typedEthTx, - receipt: ethReceipt, - }); - }, - ); + const trieData: Array = processedEvents + .map((event) => + createTrieData({ + transactionIndex: Number(event!.ethTx.transactionIndex), + typedTransaction: event!.typedEthTx, + receipt: event!.ethReceipt, + }) + ) + .filter((x) => x !== null); - // Filter out the null values for the trie data. - const trieData = maybeTrieData.filter((x) => x !== null) as Array; - - // Compute the blooms in an async manner. - await Promise.all( - trieData.map( - async ({ - encodedTransactionIndex, - encodedTransaction, - encodedReceipt, - }) => { - // Add the transaction to the transaction trie. - await transactionTrie.put(encodedTransactionIndex, encodedTransaction); - // Add the receipt to the receipt trie. - await receiptTrie.put(encodedTransactionIndex, encodedReceipt); - }, - ), - ); + const { transactionTrie, receiptTrie } = await computeBlooms(trieData); // Sort the cumulative gas uses by descending transaction index. cumulativeGasUsages.reverse(); - (transactions ?? []).forEach(({ transaction, receipt }) => { - if (isRevertedWithOutOfResources(receipt)) { - // Can be false if the transaction is not related to a specific instance of the Kakarot contract. - // This is typically the case if there are multiple Kakarot contracts on the same chain. - const isKakarotTx = isKakarotTransaction(transaction); - if (!isKakarotTx) { - return; - } - - const ethTx = toEthTx({ - transaction, - receipt, - blockNumber, - blockHash, - isPendingBlock, - }); - if (!ethTx) { - return; - } - - // Get the cumulative gas used for the reverted transaction. - // Example: - // const cumulativeGasUsages = [300n, undefined, undefined, 200n, undefined, 100n, undefined, undefined, 10n, undefined]; - // const ethTx = { transactionIndex: 5 }; - // const revertedTransactionCumulativeGasUsed = 100n; - const len = cumulativeGasUsages.length; - const revertedTransactionCumulativeGasUsed = - cumulativeGasUsages.find((gas, i) => { - return ( - Number(ethTx.transactionIndex) >= len - 1 - i && gas !== undefined - ); - }) ?? 0n; - - const ethReceipt = toRevertedOutOfResourcesReceipt({ - transaction: ethTx, - blockNumber, - blockHash, - cumulativeGasUsed: revertedTransactionCumulativeGasUsed, - isPendingBlock, - }); - - store.push({ collection: "transactions", data: { tx: ethTx } }); - store.push({ - collection: "receipts", - data: { - receipt: ethReceipt, - }, - }); - } - }); + const processedTransactions = processTransactions( + transactions, + blockInfo, + cumulativeGasUsages, + ); + updateStoreWithTransactions(store, processedTransactions); const ethHeader = await toEthHeader({ header: header, @@ -265,14 +125,189 @@ export default async function transform({ logsBloom: blockLogsBloom, receiptRoot: receiptTrie.root(), transactionRoot: transactionTrie.root(), - blockNumber, - blockHash, - isPendingBlock, + ...blockInfo, }); store.push({ - collection: "headers", + collection: Collection.Headers, data: { header: ethHeader }, }); return store; } + +function createBlockInfo(header: BlockHeader): BlockInfo { + const blockNumber = padString(toHexString(header.blockNumber), 8); + const blockHash = padString(header.blockHash, 32); + const isPendingBlock = blockHash === NULL_BLOCK_HASH; + return { blockNumber, blockHash, isPendingBlock }; +} + +function processEvent(blockInfo: BlockInfo) { + return (event: EventWithTransaction): ProcessedEvent | null => { + const typedEthTx = toTypedEthTx({ transaction: event.transaction }); + // Can be null if: + // 1. The transaction is missing calldata. + // 2. The transaction is a multi-call. + // 3. The length of the signature array is different from 5. + // 4. The chain id is not encoded in the v param of the signature for a + // Legacy transaction. + // 5. The deserialization of the transaction fails. + if (!typedEthTx) return null; + + const ethTx = typedTransactionToEthTx({ + typedTransaction: typedEthTx!, + receipt: event.receipt, + ...blockInfo, + }); + // Can be null if: + // The typed transaction is missing a signature param (v, r, s). + if (!ethTx) return null; + + const ethLogs = event.receipt.events + .map((e) => toEthLog({ transaction: ethTx, event: e, ...blockInfo })) + // Can be null if: + // 1. The event is part of the defined ignored events (see IGNORED_KEYS). + // 2. The event has an invalid number of keys. + .filter((e): e is JsonRpcLog => e !== null); + + const ethLogsIndexed = ethLogs.map((log, index) => ({ + ...log, + logIndex: index.toString(), + })); + + const ethReceipt = toEthReceipt({ + transaction: ethTx as JsonRpcTx, + logs: ethLogsIndexed, + event: event.event, + cumulativeGasUsed: 0n, // This will be updated later + ...blockInfo, + }); + + return { event, typedEthTx, ethTx, ethLogs: ethLogsIndexed, ethReceipt }; + }; +} + +function accumulateGasAndUpdateStore( + processedEvents: ProcessedEvent[], + store: Array, + blockLogsBloom: Bloom, +): { cumulativeGasUsed: bigint; cumulativeGasUsages: bigint[] } { + let cumulativeGasUsed = 0n; + const cumulativeGasUsages: bigint[] = []; + + processedEvents?.forEach((event, index) => { + cumulativeGasUsed += BigInt(event.ethReceipt.gasUsed); + cumulativeGasUsages[index] = cumulativeGasUsed; + + // Update the cumulative gas used in the receipt + event.ethReceipt.cumulativeGasUsed = `0x${cumulativeGasUsed.toString(16)}`; + + store.push(...[ + { + collection: Collection.Transactions, + data: { tx: event.ethTx }, + }, + { + collection: Collection.Receipts, + data: { receipt: event.ethReceipt }, + }, + ...event.ethLogs.map((log) => ({ + collection: Collection.Logs, + data: { log }, + })), + ]); + updateBlockLogsBloom(blockLogsBloom, event); + }); + + return { cumulativeGasUsed, cumulativeGasUsages }; +} + +function updateBlockLogsBloom(blockLogsBloom: Bloom, event: ProcessedEvent) { + const receiptLogsBloom = new Bloom(hexToBytes(event.ethReceipt.logsBloom)); + blockLogsBloom.or(receiptLogsBloom); +} + +async function computeBlooms( + trieData: Array, +): Promise<{ transactionTrie: Trie; receiptTrie: Trie }> { + const transactionTrie = new Trie(); + const receiptTrie = new Trie(); + + trieData.sort((a, b) => + Number(a.encodedTransactionIndex) - Number(b.encodedTransactionIndex) + ); + + for ( + const { encodedTransactionIndex, encodedTransaction, encodedReceipt } + of trieData + ) { + await transactionTrie.put(encodedTransactionIndex, encodedTransaction); + await receiptTrie.put(encodedTransactionIndex, encodedReceipt); + } + + return { transactionTrie, receiptTrie }; +} + +function processTransactions( + transactions: TransactionWithReceipt[], + blockInfo: BlockInfo, + cumulativeGasUsages: bigint[], +): ProcessedTransaction[] { + return (transactions ?? []) + .filter( + (tx) => + isRevertedWithOutOfResources(tx.receipt) && + isKakarotTransaction(tx.transaction), + ) + .map((tx) => createProcessedTransaction(tx, blockInfo, cumulativeGasUsages)) + .filter((tx): tx is ProcessedTransaction => tx !== null); +} + +function createProcessedTransaction( + tx: TransactionWithReceipt, + blockInfo: BlockInfo, + cumulativeGasUsages: bigint[], +): ProcessedTransaction | null { + if (!tx.transaction || !tx.receipt) return null; + + const ethTx = toEthTx({ + transaction: tx.transaction, + receipt: tx.receipt, + ...blockInfo, + }); + if (!ethTx) return null; + // Get the cumulative gas used for the reverted transaction. + // Example: + // const cumulativeGasUsages = [300n, undefined, undefined, 200n, undefined, 100n, undefined, undefined, 10n, undefined]; + // const ethTx = { transactionIndex: 5 }; + // const revertedTransactionCumulativeGasUsed = 100n; + const revertedTransactionCumulativeGasUsed = cumulativeGasUsages.find( + (gas, i) => + Number(ethTx.transactionIndex) >= cumulativeGasUsages.length - 1 - i && + gas, + ) ?? 0n; + + const ethReceipt = toRevertedOutOfResourcesReceipt({ + transaction: ethTx as JsonRpcTx, + cumulativeGasUsed: revertedTransactionCumulativeGasUsed, + ...blockInfo, + }); + + return { ethTx, ethReceipt }; +} + +function updateStoreWithTransactions( + store: Array, + processedTransactions: ProcessedTransaction[], +) { + processedTransactions.forEach(({ ethTx, ethReceipt }) => { + store.push({ + collection: Collection.Transactions, + data: { tx: ethTx as JsonRpcTx }, + }); + store.push({ + collection: Collection.Receipts, + data: { receipt: ethReceipt }, + }); + }); +} diff --git a/indexer/src/types/interfaces.ts b/indexer/src/types/interfaces.ts index 4080c9a44..3638d2a94 100644 --- a/indexer/src/types/interfaces.ts +++ b/indexer/src/types/interfaces.ts @@ -1,5 +1,9 @@ // Starknet -import { Transaction, TransactionReceipt } from "../deps.ts"; +import { + EventWithTransaction, + Transaction, + TransactionReceipt, +} from "../deps.ts"; // Eth import { @@ -8,6 +12,8 @@ import { PrefixedHexString, TypedTransaction, } from "../deps.ts"; +import { JsonRpcLog } from "./log.ts"; +import { JsonRpcReceipt } from "./receipt.ts"; /** * Represents a hexadecimal string with a `0x` prefix. @@ -81,3 +87,22 @@ export interface TransactionConversionInput { /** The index of the transaction in the block. */ index: string; } + +export interface BlockInfo { + blockNumber: string; + blockHash: string; + isPendingBlock: boolean; +} + +export interface ProcessedEvent { + event: EventWithTransaction; + typedEthTx: TypedTransaction; + ethTx: JsonRpcTx; + ethLogs: JsonRpcLog[]; + ethReceipt: JsonRpcReceipt; +} + +export interface ProcessedTransaction { + ethTx: JsonRpcTx; + ethReceipt: JsonRpcReceipt; +} diff --git a/indexer/src/types/storeItem.ts b/indexer/src/types/storeItem.ts index 891c82ffd..52e75a0cc 100644 --- a/indexer/src/types/storeItem.ts +++ b/indexer/src/types/storeItem.ts @@ -6,12 +6,17 @@ import { JsonRpcReceipt } from "./receipt.ts"; import { JsonRpcTx } from "../deps.ts"; import { JsonRpcBlock } from "./header.ts"; -type Collection = "transactions" | "logs" | "receipts" | "headers"; +export enum Collection { + Transactions = "transactions", + Logs = "logs", + Receipts = "receipts", + Headers = "headers", +} export type StoreItem = { collection: C; - data: C extends "transactions" ? { tx: JsonRpcTx } - : C extends "logs" ? { log: JsonRpcLog } - : C extends "receipts" ? { receipt: JsonRpcReceipt } + data: C extends Collection.Transactions ? { tx: JsonRpcTx } + : C extends Collection.Logs ? { log: JsonRpcLog } + : C extends Collection.Receipts ? { receipt: JsonRpcReceipt } : { header: JsonRpcBlock }; };