From b787d9e521fad1cb28a372637474ae4ec4986bf3 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 9 Oct 2024 22:45:40 +0900 Subject: [PATCH] perf(rpc): optimistically retrieve block if near the tip on `eth_getLogs` (#11582) --- crates/rpc/rpc-eth-types/src/logs_utils.rs | 67 +++++++++++------- crates/rpc/rpc/src/eth/filter.rs | 81 ++++++++++++---------- 2 files changed, 87 insertions(+), 61 deletions(-) diff --git a/crates/rpc/rpc-eth-types/src/logs_utils.rs b/crates/rpc/rpc-eth-types/src/logs_utils.rs index f26555bb70da..c64bbe055b79 100644 --- a/crates/rpc/rpc-eth-types/src/logs_utils.rs +++ b/crates/rpc/rpc-eth-types/src/logs_utils.rs @@ -6,7 +6,7 @@ use alloy_primitives::TxHash; use alloy_rpc_types::{FilteredParams, Log}; use reth_chainspec::ChainInfo; use reth_errors::ProviderError; -use reth_primitives::{BlockNumHash, Receipt}; +use reth_primitives::{BlockNumHash, Receipt, SealedBlock}; use reth_storage_api::BlockReader; /// Returns all matching of a block's receipts when the transaction hashes are known. @@ -45,11 +45,20 @@ where all_logs } +/// Helper enum to fetch a transaction either from a block or from the provider. +#[derive(Debug)] +pub enum ProviderOrBlock<'a, P: BlockReader> { + /// Provider + Provider(&'a P), + /// [`SealedBlock`] + Block(SealedBlock), +} + /// Appends all matching logs of a block's receipts. /// If the log matches, look up the corresponding transaction hash. -pub fn append_matching_block_logs( +pub fn append_matching_block_logs( all_logs: &mut Vec, - provider: impl BlockReader, + provider_or_block: ProviderOrBlock<'_, P>, filter: &FilteredParams, block_num_hash: BlockNumHash, receipts: &[Receipt], @@ -60,8 +69,8 @@ pub fn append_matching_block_logs( let mut log_index: u64 = 0; // Lazy loaded number of the first transaction in the block. - // This is useful for blocks with multiple matching logs because it prevents - // re-querying the block body indices. + // This is useful for blocks with multiple matching logs because it + // prevents re-querying the block body indices. let mut loaded_first_tx_num = None; // Iterate over receipts and append matching logs. @@ -71,27 +80,37 @@ pub fn append_matching_block_logs( for log in &receipt.logs { if log_matches_filter(block_num_hash, log, filter) { - let first_tx_num = match loaded_first_tx_num { - Some(num) => num, - None => { - let block_body_indices = - provider.block_body_indices(block_num_hash.number)?.ok_or( - ProviderError::BlockBodyIndicesNotFound(block_num_hash.number), - )?; - loaded_first_tx_num = Some(block_body_indices.first_tx_num); - block_body_indices.first_tx_num - } - }; - // if this is the first match in the receipt's logs, look up the transaction hash if transaction_hash.is_none() { - // This is safe because Transactions and Receipts have the same keys. - let transaction_id = first_tx_num + receipt_idx as u64; - let transaction = provider - .transaction_by_id(transaction_id)? - .ok_or_else(|| ProviderError::TransactionNotFound(transaction_id.into()))?; - - transaction_hash = Some(transaction.hash()); + transaction_hash = match &provider_or_block { + ProviderOrBlock::Block(block) => { + block.body.transactions.get(receipt_idx).map(|t| t.hash()) + } + ProviderOrBlock::Provider(provider) => { + let first_tx_num = match loaded_first_tx_num { + Some(num) => num, + None => { + let block_body_indices = provider + .block_body_indices(block_num_hash.number)? + .ok_or(ProviderError::BlockBodyIndicesNotFound( + block_num_hash.number, + ))?; + loaded_first_tx_num = Some(block_body_indices.first_tx_num); + block_body_indices.first_tx_num + } + }; + + // This is safe because Transactions and Receipts have the same + // keys. + let transaction_id = first_tx_num + receipt_idx as u64; + let transaction = + provider.transaction_by_id(transaction_id)?.ok_or_else(|| { + ProviderError::TransactionNotFound(transaction_id.into()) + })?; + + Some(transaction.hash()) + } + }; } let log = Log { diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index c5581f42a06c..9efecf3dae7f 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -19,11 +19,11 @@ use async_trait::async_trait; use jsonrpsee::{core::RpcResult, server::IdProvider}; use reth_chainspec::ChainInfo; use reth_node_api::EthApiTypes; -use reth_primitives::TransactionSignedEcRecovered; +use reth_primitives::{Receipt, SealedBlock, TransactionSignedEcRecovered}; use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError}; use reth_rpc_eth_api::{EthFilterApiServer, FullEthApiTypes, RpcTransaction, TransactionCompat}; use reth_rpc_eth_types::{ - logs_utils::{self, append_matching_block_logs}, + logs_utils::{self, append_matching_block_logs, ProviderOrBlock}, EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider, }; use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult}; @@ -376,29 +376,34 @@ where FilterBlockOption::AtBlockHash(block_hash) => { // for all matching logs in the block // get the block header with the hash - let block = self + let header = self .provider .header_by_hash_or_number(block_hash.into())? .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?; + let block_num_hash = BlockNumHash::new(header.number, block_hash); + // we also need to ensure that the receipts are available and return an error if // not, in case the block hash been reorged - let receipts = self - .eth_cache - .get_receipts(block_hash) + let (receipts, maybe_block) = self + .receipts_and_maybe_block( + &block_num_hash, + self.provider.chain_info()?.best_number, + ) .await? .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?; let mut all_logs = Vec::new(); - let filter = FilteredParams::new(Some(filter)); - logs_utils::append_matching_block_logs( + append_matching_block_logs( &mut all_logs, - &self.provider, - &filter, - (block_hash, block.number).into(), + maybe_block + .map(|b| ProviderOrBlock::Block(b)) + .unwrap_or_else(|| ProviderOrBlock::Provider(&self.provider)), + &FilteredParams::new(Some(filter)), + block_num_hash, &receipts, false, - block.timestamp, + header.timestamp, )?; Ok(all_logs) @@ -454,7 +459,6 @@ where chain_info: ChainInfo, ) -> Result, EthFilterError> { trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range"); - let best_number = chain_info.best_number; if to_block < from_block { return Err(EthFilterError::InvalidBlockRangeParams) @@ -467,27 +471,6 @@ where let mut all_logs = Vec::new(); let filter_params = FilteredParams::new(Some(filter.clone())); - if (to_block == best_number) && (from_block == best_number) { - // only one block to check and it's the current best block which we can fetch directly - // Note: In case of a reorg, the best block's hash might have changed, hence we only - // return early of we were able to fetch the best block's receipts - // perf: we're fetching the best block here which is expected to be cached - if let Some((block, receipts)) = - self.eth_cache.get_block_and_receipts(chain_info.best_hash).await? - { - logs_utils::append_matching_block_logs( - &mut all_logs, - &self.provider, - &filter_params, - chain_info.into(), - &receipts, - false, - block.header.timestamp, - )?; - } - return Ok(all_logs) - } - // derive bloom filters from filter input, so we can check headers for matching logs let address_filter = FilteredParams::address_filter(&filter.address); let topics_filter = FilteredParams::topics_filter(&filter.topics); @@ -514,12 +497,17 @@ where .ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?, }; - if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? { + let num_hash = BlockNumHash::new(header.number, block_hash); + if let Some((receipts, maybe_block)) = + self.receipts_and_maybe_block(&num_hash, chain_info.best_number).await? + { append_matching_block_logs( &mut all_logs, - &self.provider, + maybe_block + .map(|block| ProviderOrBlock::Block(block)) + .unwrap_or_else(|| ProviderOrBlock::Provider(&self.provider)), &filter_params, - BlockNumHash::new(header.number, block_hash), + num_hash, &receipts, false, header.timestamp, @@ -540,6 +528,25 @@ where Ok(all_logs) } + + /// Retrieves receipts and block from cache if near the tip (4 blocks), otherwise only receipts. + async fn receipts_and_maybe_block( + &self, + block_num_hash: &BlockNumHash, + best_number: u64, + ) -> Result>, Option)>, EthFilterError> { + // The last 4 blocks are most likely cached, so we can just fetch them + let cached_range = best_number.saturating_sub(4)..=best_number; + let receipts_block = if cached_range.contains(&block_num_hash.number) { + self.eth_cache + .get_block_and_receipts(block_num_hash.hash) + .await? + .map(|(b, r)| (r, Some(b))) + } else { + self.eth_cache.get_receipts(block_num_hash.hash).await?.map(|r| (r, None)) + }; + Ok(receipts_block) + } } /// All active filters