diff --git a/Cargo.lock b/Cargo.lock index a03e8386a92..c9af7cd3f68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5956,7 +5956,7 @@ dependencies = [ [[package]] name = "web3" version = "0.19.0-graph" -source = "git+https://github.com/graphprotocol/rust-web3?branch=krishna/eth-get-code#6fa509d152eb81ad578af9da958407474951b0d8" +source = "git+https://github.com/graphprotocol/rust-web3?branch=graph-patches-onto-0.18#d9ba4ff5bffd99b019afa15d5187dbab1e978021" dependencies = [ "arrayvec 0.7.4", "base64 0.13.1", diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index a30ac7c43b8..347c4ae09c3 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -75,6 +75,7 @@ pub struct EthereumAdapter { metrics: Arc, supports_eip_1898: bool, call_only: bool, + supports_block_receipts: bool, } impl CheapClone for EthereumAdapter { @@ -86,6 +87,7 @@ impl CheapClone for EthereumAdapter { metrics: self.metrics.cheap_clone(), supports_eip_1898: self.supports_eip_1898, call_only: self.call_only, + supports_block_receipts: self.supports_block_receipts, } } } @@ -105,6 +107,10 @@ impl EthereumAdapter { ) -> Self { let web3 = Arc::new(Web3::new(transport)); + // Check if the provider supports `getBlockReceipts` method. + let supports_block_receipts = + Self::check_block_receipt_support(web3.clone(), &provider, &logger).await; + // Use the client version to check if it is ganache. For compatibility with unit tests, be // are lenient with errors, defaulting to false. let is_ganache = web3 @@ -121,9 +127,36 @@ impl EthereumAdapter { metrics: provider_metrics, supports_eip_1898: supports_eip_1898 && !is_ganache, call_only, + supports_block_receipts: supports_block_receipts, } } + async fn check_block_receipt_support( + web3: Arc>, + provider: &str, + logger: &Logger, + ) -> bool { + info!(logger, "Checking if provider supports getBlockReceipts"; "provider" => provider); + + // Fetch block receipts from the provider for the latest block. + let block_receipts_result = web3 + .eth() + .block_receipts(BlockId::Number(Web3BlockNumber::Latest)) + .await; + + // Determine if the provider supports block receipts based on the fetched result. + let supports_block_receipts = block_receipts_result + .map(|receipts_option| { + // Ensure the result contains non-empty receipts + receipts_option.map_or(false, |receipts| !receipts.is_empty()) + }) + .unwrap_or(false); // Default to false if there's an error in fetching receipts. + + info!(logger, "Checked if provider supports eth_getBlockReceipts"; "provider" => provider, "supports_block_receipts" => supports_block_receipts); + + supports_block_receipts + } + async fn traces( self, logger: Logger, @@ -1277,25 +1310,14 @@ impl EthereumAdapterTrait for EthereumAdapter { }))); } let hashes: Vec<_> = block.transactions.iter().map(|txn| txn.hash).collect(); - let receipts_future = if ENV_VARS.fetch_receipts_in_batches { - // Deprecated batching retrieval of transaction receipts. - fetch_transaction_receipts_in_batch_with_retry(web3, hashes, block_hash, logger).boxed() - } else { - let hash_stream = graph::tokio_stream::iter(hashes); - let receipt_stream = graph::tokio_stream::StreamExt::map(hash_stream, move |tx_hash| { - fetch_transaction_receipt_with_retry( - web3.cheap_clone(), - tx_hash, - block_hash, - logger.cheap_clone(), - ) - }) - .buffered(ENV_VARS.block_ingestor_max_concurrent_json_rpc_calls); - graph::tokio_stream::StreamExt::collect::< - Result>, IngestorError>, - >(receipt_stream) - .boxed() - }; + let receipts_future = fetch_receipts_with_retry( + web3, + hashes, + block_hash, + logger, + self.supports_block_receipts, + ) + .boxed(); let block_future = futures03::TryFutureExt::map_ok(receipts_future, move |transaction_receipts| { @@ -2156,6 +2178,96 @@ async fn fetch_transaction_receipts_in_batch( Ok(collected) } +// Fetches transaction receipts with retries. This function acts as a dispatcher +// based on whether block receipts are supported or individual transaction receipts +// need to be fetched. +async fn fetch_receipts_with_retry( + web3: Arc>, + hashes: Vec, + block_hash: H256, + logger: Logger, + supports_block_receipts: bool, +) -> Result>, IngestorError> { + if supports_block_receipts { + return fetch_block_receipts_with_retry(web3, hashes, block_hash, logger).await; + } + fetch_individual_receipts_with_retry(web3, hashes, block_hash, logger).await +} + +// Fetches receipts for each transaction in the block individually. +async fn fetch_individual_receipts_with_retry( + web3: Arc>, + hashes: Vec, + block_hash: H256, + logger: Logger, +) -> Result>, IngestorError> { + if ENV_VARS.fetch_receipts_in_batches { + return fetch_transaction_receipts_in_batch_with_retry(web3, hashes, block_hash, logger) + .await; + } + + // Use a stream to fetch receipts individually + let hash_stream = graph::tokio_stream::iter(hashes); + let receipt_stream = hash_stream + .map(move |tx_hash| { + fetch_transaction_receipt_with_retry( + web3.cheap_clone(), + tx_hash, + block_hash, + logger.cheap_clone(), + ) + }) + .buffered(ENV_VARS.block_ingestor_max_concurrent_json_rpc_calls); + + graph::tokio_stream::StreamExt::collect::>, IngestorError>>( + receipt_stream, + ) + .await +} + +/// Fetches transaction receipts of all transactions in a block with `eth_getBlockReceipts` call. +async fn fetch_block_receipts_with_retry( + web3: Arc>, + hashes: Vec, + block_hash: H256, + logger: Logger, +) -> Result>, IngestorError> { + let logger = logger.cheap_clone(); + let retry_log_message = format!("eth_getBlockReceipts RPC call for block {:?}", block_hash); + + // Perform the retry operation + let receipts_option = retry(retry_log_message, &logger) + .limit(ENV_VARS.request_retries) + .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .run(move || web3.eth().block_receipts(block_hash.into()).boxed()) + .await + .map_err(|_timeout| -> IngestorError { anyhow!(block_hash).into() })?; + + // Check if receipts are available, and transform them if they are + match receipts_option { + Some(receipts) => { + // Create a HashSet from the transaction hashes of the receipts + let receipt_hashes_set: HashSet<_> = + receipts.iter().map(|r| r.transaction_hash).collect(); + + // Check if the set contains all the hashes and has the same length as the hashes vec + if hashes.len() == receipt_hashes_set.len() + && hashes.iter().all(|hash| receipt_hashes_set.contains(hash)) + { + let transformed_receipts = receipts.into_iter().map(Arc::new).collect(); + Ok(transformed_receipts) + } else { + // If there's a mismatch in numbers or a missing hash, return an error + Err(IngestorError::BlockReceiptsMismatched(block_hash)) + } + } + None => { + // If no receipts are found, return an error + Err(IngestorError::BlockReceiptsUnavailable(block_hash)) + } + } +} + /// Retries fetching a single transaction receipt. async fn fetch_transaction_receipt_with_retry( web3: Arc>, diff --git a/graph/Cargo.toml b/graph/Cargo.toml index 163718297fc..d637a9fc27a 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -86,7 +86,7 @@ defer = "0.2" # Our fork contains patches to make some fields optional for Celo and Fantom compatibility. # Without the "arbitrary_precision" feature, we get the error `data did not match any variant of untagged enum Response`. -web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "krishna/eth-get-code", features = [ +web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "graph-patches-onto-0.18", features = [ "arbitrary_precision", ] } serde_plain = "1.0.2" diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index 193b3766bb8..68776c70ce5 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -226,6 +226,14 @@ pub enum IngestorError { #[error("Receipt for tx {1:?} unavailable, block was likely uncled (block hash = {0:?})")] ReceiptUnavailable(H256, H256), + /// The Ethereum node does not know about this block for some reason + #[error("Transaction receipts for block (block hash = {0:?}) is unavailable")] + BlockReceiptsUnavailable(H256), + + /// The Ethereum node does not know about this block for some reason + #[error("Received confliciting block receipts for block (block hash = {0:?})")] + BlockReceiptsMismatched(H256), + /// An unexpected error occurred. #[error("Ingestor error: {0:#}")] Unknown(#[from] Error),