Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add get_in_memory_or_storage_by_tx_range #11414

Merged
merged 8 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/chain-state/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,11 @@ impl BlockState {
self.block.clone()
}

/// Returns a reference to the executed block that determines the state.
pub const fn block_ref(&self) -> &ExecutedBlock {
&self.block
}

/// Returns the block with senders for the state.
pub fn block_with_senders(&self) -> BlockWithSenders {
let block = self.block.block().clone();
Expand Down
171 changes: 153 additions & 18 deletions crates/storage/provider/src/providers/blockchain_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
.unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
});

if start > end {
return Ok(vec![])
}

// Split range into storage_range and in-memory range. If the in-memory range is not
// necessary drop it early.
//
Expand Down Expand Up @@ -240,6 +244,103 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
Ok(self.canonical_in_memory_state.state_provider_from_state(state, latest_historical))
}

/// Fetches data from either in-memory state or persistent storage for a range of transactions.
///
/// * `fetch_from_db`: has a [`DatabaseProviderRO`] and the storage specific range.
/// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
/// [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
fn get_in_memory_or_storage_by_tx_range<S, M, R>(
&self,
range: impl RangeBounds<BlockNumber>,
fetch_from_db: S,
fetch_from_block_state: M,
) -> ProviderResult<Vec<R>>
where
S: FnOnce(
DatabaseProviderRO<N::DB, N::ChainSpec>,
RangeInclusive<TxNumber>,
) -> ProviderResult<Vec<R>>,
M: Fn(RangeInclusive<usize>, Arc<BlockState>) -> ProviderResult<Vec<R>>,
{
let in_mem_chain = self.canonical_in_memory_state.canonical_chain().collect::<Vec<_>>();
let provider = self.database.provider()?;

// Get the last block number stored in the storage which does NOT overlap with in-memory
// chain.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in other words, this ensures that the chain on disk and memory don't overlap but are contiguous

let mut last_database_block_number = provider.last_block_number()?;
if let Some(lowest_in_mem_block) = in_mem_chain.last() {
if lowest_in_mem_block.number() <= last_database_block_number {
last_database_block_number = lowest_in_mem_block.number().saturating_sub(1);
}
}

// Get the next tx number for the last block stored in the storage, which marks the start of
// the in-memory state.
let last_block_body_index = provider
.block_body_indices(last_database_block_number)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
mattsse marked this conversation as resolved.
Show resolved Hide resolved
let mut in_memory_tx_num = last_block_body_index.next_tx_num();

let (start, end) = self.convert_range_bounds(range, || {
in_mem_chain
.iter()
.map(|b| b.block_ref().block().body.transactions.len() as u64)
.sum::<u64>() +
last_block_body_index.last_tx_num()
});

if start > end {
return Ok(vec![])
}

let mut tx_range = start..=end;

// If the range is entirely before the first in-memory transaction number, fetch from
// storage
if *tx_range.end() < in_memory_tx_num {
return fetch_from_db(provider, tx_range);
}

let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);

// If the range spans storage and memory, get elements from storage first.
if *tx_range.start() < in_memory_tx_num {
// Determine the range that needs to be fetched from storage.
let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);

// Set the remaining transaction range for in-memory
tx_range = in_memory_tx_num..=*tx_range.end();

items.extend(fetch_from_db(provider, db_range)?);
}

// Iterate from the lowest block to the highest in-memory chain
for block_state in in_mem_chain.into_iter().rev() {
let block_tx_count = block_state.block_ref().block().body.transactions.len();
let remaining = (tx_range.end() - tx_range.start() + 1) as usize;

// This should only be more than 0 in the first iteration, in case of a partial range
let skip = (tx_range.start() - in_memory_tx_num) as usize;

items.extend(fetch_from_block_state(
skip..=(remaining.min(block_tx_count) - 1),
block_state,
)?);

in_memory_tx_num += block_tx_count as u64;

// Break if the range has been fully processed
if in_memory_tx_num > *tx_range.end() {
break
}

// Set updated range
tx_range = in_memory_tx_num..=*tx_range.end();
}

Ok(items)
}

/// Fetches data from either in-memory state or persistent storage by transaction
/// [`HashOrNumber`].
fn get_in_memory_or_storage_by_tx<S, M, R>(
Expand Down Expand Up @@ -805,14 +906,28 @@ impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider2<N> {
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<TransactionSignedNoHash>> {
self.database.transactions_by_tx_range(range)
self.get_in_memory_or_storage_by_tx_range(
range,
|db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
|index_range, block_state| {
Ok(block_state.block_ref().block().body.transactions[index_range]
.iter()
.cloned()
.map(Into::into)
.collect())
},
)
}

/// Returns the sender addresses for the transactions in the given transaction number range.
///
/// The part of the range that lives in persistent storage is delegated to the database
/// provider's `senders_by_tx_range`; for every in-memory block touched by the range, the
/// requested slice of that block's `senders` is copied out.
fn senders_by_tx_range(
    &self,
    range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Address>> {
    self.get_in_memory_or_storage_by_tx_range(
        range,
        |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
        // `index_range` holds indices local to the block, so it can slice `senders` directly.
        |index_range, block_state| Ok(block_state.block_ref().senders[index_range].to_vec()),
    )
}

fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
Expand Down Expand Up @@ -878,7 +993,13 @@ impl<N: ProviderNodeTypes> ReceiptProvider for BlockchainProvider2<N> {
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Receipt>> {
self.database.receipts_by_tx_range(range)
self.get_in_memory_or_storage_by_tx_range(
range,
|db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
|index_range, block_state| {
Ok(block_state.executed_block_receipts().drain(index_range).collect())
},
)
}
}

Expand Down Expand Up @@ -4064,34 +4185,48 @@ mod tests {
#[test]
fn test_senders_by_tx_range() -> eyre::Result<()> {
let mut rng = generators::rng();
let (provider, database_blocks, _, _) = provider_with_random_blocks(
let (provider, database_blocks, in_memory_blocks, _) = provider_with_random_blocks(
&mut rng,
TEST_BLOCKS_COUNT,
0,
TEST_BLOCKS_COUNT,
BlockRangeParams {
tx_count: TEST_TRANSACTIONS_COUNT..TEST_TRANSACTIONS_COUNT,
..Default::default()
},
)?;

// Define a valid transaction range within the database
let start_tx_num = 0;
let end_tx_num = 1;
let db_tx_count =
database_blocks.iter().map(|b| b.body.transactions.len()).sum::<usize>() as u64;
let in_mem_tx_count =
in_memory_blocks.iter().map(|b| b.body.transactions.len()).sum::<usize>() as u64;

// Retrieve the senders for this transaction number range
let result = provider.senders_by_tx_range(start_tx_num..=end_tx_num)?;
let db_range = 0..=(db_tx_count - 1);
let in_mem_range = db_tx_count..=(in_mem_tx_count + db_range.end());

// Ensure the sender addresses match the expected addresses in the database
assert_eq!(result.len(), 2);
// Retrieve the senders for the whole database range
let database_senders =
database_blocks.iter().flat_map(|b| b.senders().unwrap()).collect::<Vec<_>>();
assert_eq!(provider.senders_by_tx_range(db_range)?, database_senders);

// Retrieve the senders for the whole in-memory range
let in_memory_senders =
in_memory_blocks.iter().flat_map(|b| b.senders().unwrap()).collect::<Vec<_>>();
assert_eq!(provider.senders_by_tx_range(in_mem_range.clone())?, in_memory_senders);

// Retrieve the senders for a partial in-memory range
assert_eq!(
result[0],
database_blocks[0].senders().unwrap()[0],
"The sender address should match the expected sender address"
&provider.senders_by_tx_range(in_mem_range.start() + 1..=in_mem_range.end() - 1)?,
&in_memory_senders[1..in_memory_senders.len() - 1]
);

// Retrieve the senders for a range that spans database and in-memory
assert_eq!(
result[1],
database_blocks[0].senders().unwrap()[1],
"The sender address should match the expected sender address"
provider.senders_by_tx_range(in_mem_range.start() - 2..=in_mem_range.end() - 1)?,
database_senders[database_senders.len() - 2..]
.iter()
.chain(&in_memory_senders[..in_memory_senders.len() - 1])
.copied()
.collect::<Vec<_>>()
);

// Define an empty range that should return no sender addresses
Expand Down
Loading