From dee28e31babd720bc2afbb6f372d0afcd0e6e6e9 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 2 Oct 2024 12:05:27 +0200 Subject: [PATCH 1/8] add and use get_in_memory_or_storage_by_tx_range --- crates/chain-state/src/in_memory.rs | 5 + .../src/providers/blockchain_provider.rs | 121 +++++++++++++++++- 2 files changed, 123 insertions(+), 3 deletions(-) diff --git a/crates/chain-state/src/in_memory.rs b/crates/chain-state/src/in_memory.rs index 03ce660c6057..f2a73d27fa21 100644 --- a/crates/chain-state/src/in_memory.rs +++ b/crates/chain-state/src/in_memory.rs @@ -618,6 +618,11 @@ impl BlockState { self.block.clone() } + /// Returns a reference to the executed block that determines the state. + pub const fn block_ref(&self) -> &ExecutedBlock { + &self.block + } + /// Returns the block with senders for the state. pub fn block_with_senders(&self) -> BlockWithSenders { let block = self.block.block().clone(); diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 100646b0ed21..bcf10fa0a421 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -240,6 +240,92 @@ impl BlockchainProvider2 { Ok(self.canonical_in_memory_state.state_provider_from_state(state, latest_historical)) } + /// Fetches data from either in-memory state or persistent storage for a range of transactions. + /// + /// * `fetch_from_db`: has a [`DatabaseProviderRO`] and the storage specific range. + /// * `fetch_from_block_state`: has the number of elements that should be fetched from + /// [`BlockState`]. + fn get_in_memory_or_storage_by_tx_range( + &self, + range: impl RangeBounds, + fetch_from_db: S, + fetch_from_block_state: M, + ) -> ProviderResult> + where + S: FnOnce( + DatabaseProviderRO, + RangeInclusive, + ) -> ProviderResult>, + M: Fn(usize, Arc) -> ProviderResult>, + { + let in_mem_chain = self.canonical_in_memory_state.canonical_chain().collect::>(); + let provider = self.database.provider()?; + + // Get the last block number stored in the storage which does NOT overlap with in-memory + // chain. + let mut last_database_block_number = provider.last_block_number()?; + if let Some(lowest_in_mem_block) = in_mem_chain.last() { + if lowest_in_mem_block.number() <= last_database_block_number { + last_database_block_number = lowest_in_mem_block.number().saturating_sub(1); + } + } + + // Get the next tx number for the last block stored in the storage, which marks the start of + // the in-memory state. + let last_block_body_index = provider + .block_body_indices(last_database_block_number)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?; + let mut in_memory_tx_num = last_block_body_index.next_tx_num(); + + let (start, end) = self.convert_range_bounds(range, || { + in_mem_chain + .iter() + .map(|b| b.block_ref().block().body.transactions.len() as u64) + .sum::() + + last_block_body_index.last_tx_num() + }); + let mut tx_range = start..=end; + + // If the range is entirely before the first in-memory transaction number, fetch from + // storage + if *tx_range.end() < in_memory_tx_num { + return fetch_from_db(provider, tx_range); + } + + let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize); + + // If the range spans storage and memory, get elements from storage first. + if *tx_range.start() < in_memory_tx_num { + // Determine the range that needs to be fetched from storage. + let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1); + + // Set the remaining transaction range for in-memory + tx_range = in_memory_tx_num..=*tx_range.end(); + + items.extend(fetch_from_db(provider, db_range)?); + } + + // Iterate from the lowest block to the highest in-memory chain + for block_state in in_mem_chain.into_iter().rev() { + let block_tx_count = block_state.block_ref().block().body.transactions.len(); + let remaining = (tx_range.end() - tx_range.start() + 1) as usize; + + items.extend(fetch_from_block_state(remaining.min(block_tx_count), block_state)?); + + in_memory_tx_num += block_tx_count as u64; + + // Break if the range has been fully processed + if in_memory_tx_num > *tx_range.end() { + break + } + + // Set updated range + tx_range = (tx_range.start() + block_tx_count as u64)..=*tx_range.end(); + } + + Ok(items) + } + /// Fetches data from either in-memory state or persistent storage by transaction /// [`HashOrNumber`]. fn get_in_memory_or_storage_by_tx( @@ -805,14 +891,35 @@ impl TransactionsProvider for BlockchainProvider2 { &self, range: impl RangeBounds, ) -> ProviderResult> { - self.database.transactions_by_tx_range(range) + self.get_in_memory_or_storage_by_tx_range( + range, + |db_provider, db_range| db_provider.transactions_by_tx_range(db_range), + |num_items, block_state| { + Ok(block_state + .block_ref() + .block() + .body + .transactions + .iter() + .take(num_items) + .cloned() + .map(Into::into) + .collect()) + }, + ) } fn senders_by_tx_range( &self, range: impl RangeBounds, ) -> ProviderResult> { - self.database.senders_by_tx_range(range) + self.get_in_memory_or_storage_by_tx_range( + range, + |db_provider, db_range| db_provider.senders_by_tx_range(db_range), + |num_items, block_state| { + Ok(block_state.block_ref().senders.iter().take(num_items).copied().collect()) + }, + ) } fn transaction_sender(&self, id: TxNumber) -> ProviderResult> { @@ -878,7 +985,15 @@ impl ReceiptProvider for BlockchainProvider2 { &self, range: impl RangeBounds, ) -> ProviderResult> { - self.database.receipts_by_tx_range(range) + self.get_in_memory_or_storage_by_tx_range( + range, + |db_provider, db_range| db_provider.receipts_by_tx_range(db_range), + |num_items, block_state| { + let mut receipts = block_state.executed_block_receipts(); + receipts.truncate(num_items); + Ok(receipts) + }, + ) } } From 5d334ed0021feae75b9cb4fbf46954638d86a573 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 2 Oct 2024 12:26:48 +0200 Subject: [PATCH 2/8] verify end range is not smaller than start --- .../provider/src/providers/blockchain_provider.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index bcf10fa0a421..132e68858927 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -167,6 +167,10 @@ impl BlockchainProvider2 { .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default()) }); + if start > end { + return Ok(vec![]) + } + // Split range into storage_range and in-memory range. If the in-memory range is not // necessary drop it early. // @@ -284,6 +288,11 @@ impl BlockchainProvider2 { .sum::() + last_block_body_index.last_tx_num() }); + + if start > end { + return Ok(vec![]) + } + let mut tx_range = start..=end; // If the range is entirely before the first in-memory transaction number, fetch from From 408ada8ce6f4553e15b83af593bc4b36cc5d3a5a Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 2 Oct 2024 12:33:24 +0200 Subject: [PATCH 3/8] fmt --- crates/storage/provider/src/providers/blockchain_provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 132e68858927..58c93835da4e 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -288,7 +288,7 @@ impl BlockchainProvider2 { .sum::() + last_block_body_index.last_tx_num() }); - + if start > end { return Ok(vec![]) } From 21893e22dd99cf6e90b1839cdf811550bf99604c Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 2 Oct 2024 13:30:51 +0200 Subject: [PATCH 4/8] add tests --- .../src/providers/blockchain_provider.rs | 79 +++++++++++-------- 1 file changed, 46 insertions(+), 33 deletions(-) diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 58c93835da4e..64eed98c7a21 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -247,8 +247,8 @@ impl BlockchainProvider2 { /// Fetches data from either in-memory state or persistent storage for a range of transactions. /// /// * `fetch_from_db`: has a [`DatabaseProviderRO`] and the storage specific range. - /// * `fetch_from_block_state`: has the number of elements that should be fetched from - /// [`BlockState`]. + /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from + /// [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block. fn get_in_memory_or_storage_by_tx_range( &self, range: impl RangeBounds, @@ -260,7 +260,7 @@ impl BlockchainProvider2 { DatabaseProviderRO, RangeInclusive, ) -> ProviderResult>, - M: Fn(usize, Arc) -> ProviderResult>, + M: Fn(RangeInclusive, Arc) -> ProviderResult>, { let in_mem_chain = self.canonical_in_memory_state.canonical_chain().collect::>(); let provider = self.database.provider()?; @@ -319,7 +319,13 @@ impl BlockchainProvider2 { let block_tx_count = block_state.block_ref().block().body.transactions.len(); let remaining = (tx_range.end() - tx_range.start() + 1) as usize; - items.extend(fetch_from_block_state(remaining.min(block_tx_count), block_state)?); + // This should only be more than 0 in the first iteration, in case of a partial range + let skip = (tx_range.start() - in_memory_tx_num) as usize; + + items.extend(fetch_from_block_state( + skip..=(remaining.min(block_tx_count) - 1), + block_state, + )?); in_memory_tx_num += block_tx_count as u64; @@ -329,7 +335,7 @@ impl BlockchainProvider2 { } // Set updated range - tx_range = (tx_range.start() + block_tx_count as u64)..=*tx_range.end(); + tx_range = in_memory_tx_num..=*tx_range.end(); } Ok(items) @@ -903,14 +909,9 @@ impl TransactionsProvider for BlockchainProvider2 { self.get_in_memory_or_storage_by_tx_range( range, |db_provider, db_range| db_provider.transactions_by_tx_range(db_range), - |num_items, block_state| { - Ok(block_state - .block_ref() - .block() - .body - .transactions + |index_range, block_state| { + Ok(block_state.block_ref().block().body.transactions[index_range] .iter() - .take(num_items) .cloned() .map(Into::into) .collect()) @@ -925,8 +926,8 @@ impl TransactionsProvider for BlockchainProvider2 { self.get_in_memory_or_storage_by_tx_range( range, |db_provider, db_range| db_provider.senders_by_tx_range(db_range), - |num_items, block_state| { - Ok(block_state.block_ref().senders.iter().take(num_items).copied().collect()) + |index_range, block_state| { + Ok(block_state.block_ref().senders[index_range].iter().copied().collect()) }, ) } @@ -997,10 +998,8 @@ impl ReceiptProvider for BlockchainProvider2 { self.get_in_memory_or_storage_by_tx_range( range, |db_provider, db_range| db_provider.receipts_by_tx_range(db_range), - |num_items, block_state| { - let mut receipts = block_state.executed_block_receipts(); - receipts.truncate(num_items); - Ok(receipts) + |index_range, block_state| { + Ok(block_state.executed_block_receipts().drain(index_range).collect()) }, ) } @@ -4188,34 +4187,48 @@ mod tests { #[test] fn test_senders_by_tx_range() -> eyre::Result<()> { let mut rng = generators::rng(); - let (provider, database_blocks, _, _) = provider_with_random_blocks( + let (provider, database_blocks, in_memory_blocks, _) = provider_with_random_blocks( &mut rng, TEST_BLOCKS_COUNT, - 0, + TEST_BLOCKS_COUNT, BlockRangeParams { tx_count: TEST_TRANSACTIONS_COUNT..TEST_TRANSACTIONS_COUNT, ..Default::default() }, )?; - // Define a valid transaction range within the database - let start_tx_num = 0; - let end_tx_num = 1; + let db_tx_count = + database_blocks.iter().map(|b| b.body.transactions.len()).sum::() as u64; + let in_mem_tx_count = + in_memory_blocks.iter().map(|b| b.body.transactions.len()).sum::() as u64; - // Retrieve the senders for this transaction number range - let result = provider.senders_by_tx_range(start_tx_num..=end_tx_num)?; + let db_range = 0..=(db_tx_count - 1); + let in_mem_range = db_tx_count..=(in_mem_tx_count + db_range.end()); - // Ensure the sender addresses match the expected addresses in the database - assert_eq!(result.len(), 2); + // Retrieve the senders for the whole database range + let database_senders = + database_blocks.iter().flat_map(|b| b.senders().unwrap()).collect::>(); + assert_eq!(provider.senders_by_tx_range(db_range)?, database_senders); + + // Retrieve the senders for the whole in-memory range + let in_memory_senders = + in_memory_blocks.iter().flat_map(|b| b.senders().unwrap()).collect::>(); + assert_eq!(provider.senders_by_tx_range(in_mem_range.clone())?, in_memory_senders); + + // Retrieve the senders for a partial in-memory range assert_eq!( - result[0], - database_blocks[0].senders().unwrap()[0], - "The sender address should match the expected sender address" + &provider.senders_by_tx_range(in_mem_range.start() + 1..=in_mem_range.end() - 1)?, + &in_memory_senders[1..in_memory_senders.len() - 1] ); + + // Retrieve the senders for a range that spans database and in-memory assert_eq!( - result[1], - database_blocks[0].senders().unwrap()[1], - "The sender address should match the expected sender address" + provider.senders_by_tx_range(in_mem_range.start() - 2..=in_mem_range.end() - 1)?, + database_senders[database_senders.len() - 2..] + .iter() + .chain(&in_memory_senders[..in_memory_senders.len() - 1]) + .copied() + .collect::>() ); // Define an empty range that should return no sender addresses From 76224ec6809c57c2bd5b3507e9c8375dfdb1fefd Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 2 Oct 2024 13:32:36 +0200 Subject: [PATCH 5/8] use to_vec --- crates/storage/provider/src/providers/blockchain_provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 64eed98c7a21..b488bf877e40 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -927,7 +927,7 @@ impl TransactionsProvider for BlockchainProvider2 { range, |db_provider, db_range| db_provider.senders_by_tx_range(db_range), |index_range, block_state| { - Ok(block_state.block_ref().senders[index_range].iter().copied().collect()) + Ok(block_state.block_ref().senders[index_range].to_vec()) }, ) } From b9f9765b200f598faf04b1626caf87c8f6ee616d Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 2 Oct 2024 13:35:24 +0200 Subject: [PATCH 6/8] fmt --- crates/storage/provider/src/providers/blockchain_provider.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index b488bf877e40..dbf0ce6cf2ae 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -926,9 +926,7 @@ impl TransactionsProvider for BlockchainProvider2 { self.get_in_memory_or_storage_by_tx_range( range, |db_provider, db_range| db_provider.senders_by_tx_range(db_range), - |index_range, block_state| { - Ok(block_state.block_ref().senders[index_range].to_vec()) - }, + |index_range, block_state| Ok(block_state.block_ref().senders[index_range].to_vec()), ) } From 29369ac61e1ed04c2af7435b6a2f8374e58263cb Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 2 Oct 2024 19:32:34 +0200 Subject: [PATCH 7/8] Update crates/storage/provider/src/providers/blockchain_provider.rs --- crates/storage/provider/src/providers/blockchain_provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index dbf0ce6cf2ae..87f07dba707f 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -278,7 +278,7 @@ impl BlockchainProvider2 { // the in-memory state. let last_block_body_index = provider .block_body_indices(last_database_block_number)? - .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?; + .ok_or_else( || ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?; let mut in_memory_tx_num = last_block_body_index.next_tx_num(); let (start, end) = self.convert_range_bounds(range, || { From 81aae3724d177d10ff8cb26709c05139b1fd3870 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 2 Oct 2024 19:32:58 +0200 Subject: [PATCH 8/8] rustfmt --- crates/storage/provider/src/providers/blockchain_provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 87f07dba707f..af65ab22e928 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -278,7 +278,7 @@ impl BlockchainProvider2 { // the in-memory state. let last_block_body_index = provider .block_body_indices(last_database_block_number)? - .ok_or_else( || ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?; + .ok_or_else(|| ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?; let mut in_memory_tx_num = last_block_body_index.next_tx_num(); let (start, end) = self.convert_range_bounds(range, || {