diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 55892a287ac..dfe77898a77 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -107,7 +107,6 @@ impl EthereumAdapter { ) -> Self { let web3 = Arc::new(Web3::new(transport)); - // Check if the chain supports `getBlockReceipts` method. let supports_block_receipts = web3 .eth() @@ -2146,6 +2145,9 @@ async fn fetch_transaction_receipts_in_batch( Ok(collected) } +// Fetches transaction receipts with retries. This function acts as a dispatcher +// based on whether block receipts are supported or individual transaction receipts +// need to be fetched. async fn fetch_receipts_with_retry( web3: Arc>, hashes: Vec, @@ -2153,33 +2155,41 @@ async fn fetch_receipts_with_retry( logger: Logger, supports_block_receipts: bool, ) -> Result>, IngestorError> { - let receipts_future = if supports_block_receipts { - fetch_block_receipts_with_retry(web3, block_hash, logger).await - } else { - let receipts_future = if ENV_VARS.fetch_receipts_in_batches { - // Deprecated batching retrieval of transaction receipts. - fetch_transaction_receipts_in_batch_with_retry(web3, hashes, block_hash, logger).await - } else { - let hash_stream = graph::tokio_stream::iter(hashes); - let receipt_stream = graph::tokio_stream::StreamExt::map(hash_stream, move |tx_hash| { - fetch_transaction_receipt_with_retry( - web3.cheap_clone(), - tx_hash, - block_hash, - logger.cheap_clone(), - ) - }) - .buffered(ENV_VARS.block_ingestor_max_concurrent_json_rpc_calls); - graph::tokio_stream::StreamExt::collect::< - Result>, IngestorError>, - >(receipt_stream) - .await - }; + if supports_block_receipts { + return fetch_block_receipts_with_retry(web3, block_hash, logger).await; + } + fetch_individual_receipts_with_retry(web3, hashes, block_hash, logger).await +} - receipts_future - }; +// Fetches receipts for each transaction in the block individually. +async fn fetch_individual_receipts_with_retry( + web3: Arc>, + hashes: Vec, + block_hash: H256, + logger: Logger, +) -> Result>, IngestorError> { + if ENV_VARS.fetch_receipts_in_batches { + return fetch_transaction_receipts_in_batch_with_retry(web3, hashes, block_hash, logger) + .await; + } - receipts_future + // Use a stream to fetch receipts individually + let hash_stream = graph::tokio_stream::iter(hashes); + let receipt_stream = hash_stream + .map(move |tx_hash| { + fetch_transaction_receipt_with_retry( + web3.cheap_clone(), + tx_hash, + block_hash, + logger.cheap_clone(), + ) + }) + .buffered(ENV_VARS.block_ingestor_max_concurrent_json_rpc_calls); + + graph::tokio_stream::StreamExt::collect::>, IngestorError>>( + receipt_stream, + ) + .await } /// Fetches transaction receipts of all transactions in a block with `eth_getBlockReceipts` call.