diff --git a/Cargo.lock b/Cargo.lock index 75812e226f1d..ebcedfa92fb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5644,6 +5644,12 @@ dependencies = [ "quick-error", ] +[[package]] +name = "retain_mut" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086" + [[package]] name = "reth" version = "0.1.0-alpha.13" @@ -6290,7 +6296,7 @@ dependencies = [ "ph", "rand 0.8.5", "serde", - "sucds 0.8.1", + "sucds", "tempfile", "thiserror", "tracing", @@ -6351,6 +6357,7 @@ dependencies = [ "alloy-primitives", "alloy-rlp", "alloy-trie", + "anyhow", "arbitrary", "assert_matches", "byteorder", @@ -6377,13 +6384,14 @@ dependencies = [ "reth-rpc-types", "revm", "revm-primitives", + "roaring", "secp256k1 0.27.0", "serde", "serde_json", "sha2", "smallvec", "strum", - "sucds 0.6.0", + "sucds", "tempfile", "test-fuzz", "thiserror", @@ -6942,6 +6950,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "roaring" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6106b5cf8587f5834158895e9715a3c6c9716c8aefab57f1f7680917191c7873" +dependencies = [ + "bytemuck", + "byteorder", + "retain_mut", +] + [[package]] name = "rolling-file" version = "0.2.0" @@ -7789,15 +7808,6 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" -[[package]] -name = "sucds" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64accd20141dfbef67ad83c51d588146cff7810616e1bda35a975be369059533" -dependencies = [ - "anyhow", -] - [[package]] name = "sucds" version = "0.8.1" diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs index 9a13b273ed2b..c70119c605ac 100644 --- a/bin/reth/src/commands/stage/run.rs +++ b/bin/reth/src/commands/stage/run.rs @@ -18,14 +18,14 @@ use reth_config::Config; use reth_db::init_db; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_primitives::ChainSpec; -use reth_provider::{ProviderFactory, StageCheckpointReader}; +use reth_provider::{ProviderFactory, StageCheckpointReader, StageCheckpointWriter}; use reth_stages::{ stages::{ AccountHashingStage, BodyStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, TransactionLookupStage, }, - ExecInput, Stage, StageExt, UnwindInput, + ExecInput, ExecOutput, Stage, StageExt, UnwindInput, UnwindOutput, }; use std::{any::Any, net::SocketAddr, path::PathBuf, sync::Arc}; use tracing::*; @@ -102,6 +102,10 @@ pub struct Command { // e.g. query the DB size, or any table data. #[arg(long, short)] commit: bool, + + /// Save stage checkpoints + #[arg(long)] + checkpoints: bool, } impl Command { @@ -244,8 +248,12 @@ impl Command { if !self.skip_unwind { while unwind.checkpoint.block_number > self.from { - let unwind_output = unwind_stage.unwind(&provider_rw, unwind)?; - unwind.checkpoint = unwind_output.checkpoint; + let UnwindOutput { checkpoint } = unwind_stage.unwind(&provider_rw, unwind)?; + unwind.checkpoint = checkpoint; + + if self.checkpoints { + provider_rw.save_stage_checkpoint(unwind_stage.id(), checkpoint)?; + } if self.commit { provider_rw.commit()?; @@ -261,16 +269,19 @@ impl Command { loop { exec_stage.execute_ready(input).await?; - let output = exec_stage.execute(&provider_rw, input)?; + let ExecOutput { checkpoint, done } = exec_stage.execute(&provider_rw, input)?; - input.checkpoint = Some(output.checkpoint); + input.checkpoint = Some(checkpoint); + if self.checkpoints { + provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?; + } if self.commit { provider_rw.commit()?; provider_rw = factory.provider_rw()?; } - if output.done { + if done { break } } diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs index ff4392fa2312..7b0ae868cab2 100644 --- a/crates/interfaces/src/test_utils/generators.rs +++ b/crates/interfaces/src/test_utils/generators.rs @@ -324,12 +324,9 @@ pub fn random_eoa_account(rng: &mut R) -> (Address, Account) { } /// Generate random Externally Owned Accounts -pub fn random_eoa_account_range( - rng: &mut R, - acc_range: Range, -) -> Vec<(Address, Account)> { - let mut accounts = Vec::with_capacity(acc_range.end.saturating_sub(acc_range.start) as usize); - for _ in acc_range { +pub fn random_eoa_accounts(rng: &mut R, accounts_num: usize) -> Vec<(Address, Account)> { + let mut accounts = Vec::with_capacity(accounts_num); + for _ in 0..accounts_num { accounts.push(random_eoa_account(rng)) } accounts diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml index 42b0b4e99d3e..4f22af52e79c 100644 --- a/crates/primitives/Cargo.toml +++ b/crates/primitives/Cargo.toml @@ -49,11 +49,11 @@ rayon.workspace = true serde.workspace = true serde_json.workspace = true sha2 = "0.10.7" -sucds = "~0.6" tempfile.workspace = true thiserror.workspace = true zstd = { version = "0.12", features = ["experimental"] } ahash.workspace = true +roaring = "0.10.2" # `test-utils` feature hash-db = { version = "~0.15", optional = true } @@ -82,6 +82,9 @@ triehash = "0.8" hash-db = "~0.15" plain_hasher = "0.2" +sucds = "0.8.1" +anyhow = "1.0.75" + # necessary so we don't hit a "undeclared 'std'": # https://github.com/paradigmxyz/reth/pull/177#discussion_r1021172198 criterion.workspace = true @@ -122,3 +125,7 @@ harness = false [[bench]] name = "nibbles" harness = false + +[[bench]] +name = "integer_list" +harness = false \ No newline at end of file diff --git a/crates/primitives/benches/integer_list.rs b/crates/primitives/benches/integer_list.rs new file mode 100644 index 000000000000..c07dbaa9d0e7 --- /dev/null +++ b/crates/primitives/benches/integer_list.rs @@ -0,0 +1,250 @@ +#![allow(missing_docs)] +use criterion::{black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion}; +use rand::prelude::*; + +pub fn new_pre_sorted(c: &mut Criterion) { + let mut group = c.benchmark_group("new_pre_sorted"); + + for delta in [1, 100, 1000, 10000] { + let integers_usize = generate_integers(2000, delta); + assert_eq!(integers_usize.len(), 2000); + + let integers_u64 = integers_usize.iter().map(|v| *v as u64).collect::>(); + assert_eq!(integers_u64.len(), 2000); + + group.bench_function(BenchmarkId::new("Elias-Fano", delta), |b| { + b.iter(|| elias_fano::IntegerList::new_pre_sorted(black_box(&integers_usize))); + }); + + group.bench_function(BenchmarkId::new("Roaring Bitmaps", delta), |b| { + b.iter(|| reth_primitives::IntegerList::new_pre_sorted(black_box(&integers_u64))); + }); + } +} + +pub fn rank_select(c: &mut Criterion) { + let mut group = c.benchmark_group("rank + select"); + + for delta in [1, 100, 1000, 10000] { + let integers_usize = generate_integers(2000, delta); + assert_eq!(integers_usize.len(), 2000); + + let integers_u64 = integers_usize.iter().map(|v| *v as u64).collect::>(); + assert_eq!(integers_u64.len(), 2000); + + group.bench_function(BenchmarkId::new("Elias-Fano", delta), |b| { + b.iter_batched( + || { + let (index, element) = + integers_usize.iter().enumerate().choose(&mut thread_rng()).unwrap(); + (elias_fano::IntegerList::new_pre_sorted(&integers_usize).0, index, *element) + }, + |(list, index, element)| { + let list = list.enable_rank(); + list.rank(element); + list.select(index); + }, + BatchSize::PerIteration, + ); + }); + + group.bench_function(BenchmarkId::new("Roaring Bitmaps", delta), |b| { + b.iter_batched( + || { + let (index, element) = + integers_u64.iter().enumerate().choose(&mut thread_rng()).unwrap(); + ( + reth_primitives::IntegerList::new_pre_sorted(&integers_u64), + index as u64, + *element, + ) + }, + |(list, index, element)| { + list.rank(element); + list.select(index); + }, + BatchSize::PerIteration, + ); + }); + } +} + +fn generate_integers(n: usize, delta: usize) -> Vec { + (0..n).fold(Vec::new(), |mut vec, _| { + vec.push(vec.last().map_or(0, |last| { + last + thread_rng().gen_range(delta - delta / 2..=delta + delta / 2) + })); + vec + }) +} + +criterion_group! { + name = benches; + config = Criterion::default(); + targets = new_pre_sorted, rank_select +} +criterion_main!(benches); + +/// Implementation from https://github.com/paradigmxyz/reth/blob/cda5d4e7c53ccc898b7725eb5d3b46c35e4da7f8/crates/primitives/src/integer_list.rs +/// adapted to work with `sucds = "0.8.1"` +#[allow(unused, unreachable_pub)] +mod elias_fano { + use std::{fmt, ops::Deref}; + use sucds::{mii_sequences::EliasFano, Serializable}; + + #[derive(Clone, PartialEq, Eq, Default)] + pub struct IntegerList(pub EliasFano); + + impl Deref for IntegerList { + type Target = EliasFano; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl fmt::Debug for IntegerList { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let vec: Vec = self.0.iter(0).collect(); + write!(f, "IntegerList {:?}", vec) + } + } + + impl IntegerList { + /// Creates an IntegerList from a list of integers. `usize` is safe to use since + /// [`sucds::EliasFano`] restricts its compilation to 64bits. + /// + /// # Returns + /// + /// Returns an error if the list is empty or not pre-sorted. + pub fn new>(list: T) -> Result { + let mut builder = EliasFanoBuilder::new( + list.as_ref().iter().max().map_or(0, |max| max + 1), + list.as_ref().len(), + )?; + builder.extend(list.as_ref().iter().copied()); + Ok(Self(builder.build())) + } + + // Creates an IntegerList from a pre-sorted list of integers. `usize` is safe to use since + /// [`sucds::EliasFano`] restricts its compilation to 64bits. + /// + /// # Panics + /// + /// Panics if the list is empty or not pre-sorted. + pub fn new_pre_sorted>(list: T) -> Self { + Self::new(list).expect("IntegerList must be pre-sorted and non-empty.") + } + + /// Serializes a [`IntegerList`] into a sequence of bytes. + pub fn to_bytes(&self) -> Vec { + let mut vec = Vec::with_capacity(self.0.size_in_bytes()); + self.0.serialize_into(&mut vec).expect("not able to encode integer list."); + vec + } + + /// Serializes a [`IntegerList`] into a sequence of bytes. + pub fn to_mut_bytes(&self, buf: &mut B) { + let len = self.0.size_in_bytes(); + let mut vec = Vec::with_capacity(len); + self.0.serialize_into(&mut vec).unwrap(); + buf.put_slice(vec.as_slice()); + } + + /// Deserializes a sequence of bytes into a proper [`IntegerList`]. + pub fn from_bytes(data: &[u8]) -> Result { + Ok(Self( + EliasFano::deserialize_from(data).map_err(|_| EliasFanoError::FailedDeserialize)?, + )) + } + } + + macro_rules! impl_uint { + ($($w:tt),+) => { + $( + impl From> for IntegerList { + fn from(v: Vec<$w>) -> Self { + let v: Vec = v.iter().map(|v| *v as usize).collect(); + Self::new(v.as_slice()).expect("could not create list.") + } + } + )+ + }; + } + + impl_uint!(usize, u64, u32, u8, u16); + + impl Serialize for IntegerList { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let vec = self.0.iter(0).collect::>(); + let mut seq = serializer.serialize_seq(Some(self.len()))?; + for e in vec { + seq.serialize_element(&e)?; + } + seq.end() + } + } + + struct IntegerListVisitor; + impl<'de> Visitor<'de> for IntegerListVisitor { + type Value = IntegerList; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("a usize array") + } + + fn visit_seq(self, mut seq: E) -> Result + where + E: SeqAccess<'de>, + { + let mut list = Vec::new(); + while let Some(item) = seq.next_element()? { + list.push(item); + } + + IntegerList::new(list) + .map_err(|_| serde::de::Error::invalid_value(Unexpected::Seq, &self)) + } + } + + impl<'de> Deserialize<'de> for IntegerList { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_byte_buf(IntegerListVisitor) + } + } + + #[cfg(any(test, feature = "arbitrary"))] + use arbitrary::{Arbitrary, Unstructured}; + use serde::{ + de::{SeqAccess, Unexpected, Visitor}, + ser::SerializeSeq, + Deserialize, Deserializer, Serialize, Serializer, + }; + use sucds::mii_sequences::EliasFanoBuilder; + + #[cfg(any(test, feature = "arbitrary"))] + impl<'a> Arbitrary<'a> for IntegerList { + fn arbitrary(u: &mut Unstructured<'a>) -> Result { + let mut nums: Vec = Vec::arbitrary(u)?; + nums.sort(); + Self::new(&nums).map_err(|_| arbitrary::Error::IncorrectFormat) + } + } + + /// Primitives error type. + #[derive(Debug, thiserror::Error)] + pub enum EliasFanoError { + /// The provided input is invalid. + #[error(transparent)] + InvalidInput(#[from] anyhow::Error), + /// Failed to deserialize data into type. + #[error("failed to deserialize data into type")] + FailedDeserialize, + } +} diff --git a/crates/primitives/src/integer_list.rs b/crates/primitives/src/integer_list.rs index 90f53a27d5f2..4d128c87dac4 100644 --- a/crates/primitives/src/integer_list.rs +++ b/crates/primitives/src/integer_list.rs @@ -1,18 +1,19 @@ +use bytes::BufMut; +use roaring::RoaringTreemap; use serde::{ de::{SeqAccess, Unexpected, Visitor}, ser::SerializeSeq, Deserialize, Deserializer, Serialize, Serializer, }; use std::{fmt, ops::Deref}; -use sucds::{EliasFano, Searial}; -/// Uses EliasFano to hold a list of integers. It provides really good compression with the +/// Uses Roaring Bitmaps to hold a list of integers. It provides really good compression with the /// capability to access its elements without decoding it. -#[derive(Clone, PartialEq, Eq, Default)] -pub struct IntegerList(pub EliasFano); +#[derive(Clone, PartialEq, Default)] +pub struct IntegerList(pub RoaringTreemap); impl Deref for IntegerList { - type Target = EliasFano; + type Target = RoaringTreemap; fn deref(&self) -> &Self::Target { &self.0 @@ -21,53 +22,54 @@ impl Deref for IntegerList { impl fmt::Debug for IntegerList { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let vec: Vec = self.0.iter(0).collect(); + let vec: Vec = self.0.iter().collect(); write!(f, "IntegerList {:?}", vec) } } impl IntegerList { - /// Creates an IntegerList from a list of integers. `usize` is safe to use since - /// [`sucds::EliasFano`] restricts its compilation to 64bits. + /// Creates an IntegerList from a list of integers. /// /// # Returns /// /// Returns an error if the list is empty or not pre-sorted. - pub fn new>(list: T) -> Result { - Ok(Self(EliasFano::from_ints(list.as_ref()).map_err(|_| EliasFanoError::InvalidInput)?)) + pub fn new>(list: T) -> Result { + Ok(Self( + RoaringTreemap::from_sorted_iter(list.as_ref().iter().copied()) + .map_err(|_| RoaringBitmapError::InvalidInput)?, + )) } - // Creates an IntegerList from a pre-sorted list of integers. `usize` is safe to use since - /// [`sucds::EliasFano`] restricts its compilation to 64bits. + // Creates an IntegerList from a pre-sorted list of integers. /// /// # Panics /// /// Panics if the list is empty or not pre-sorted. - pub fn new_pre_sorted>(list: T) -> Self { + pub fn new_pre_sorted>(list: T) -> Self { Self( - EliasFano::from_ints(list.as_ref()) - .expect("IntegerList must be pre-sorted and non-empty."), + RoaringTreemap::from_sorted_iter(list.as_ref().iter().copied()) + .expect("IntegerList must be pre-sorted and non-empty"), ) } /// Serializes a [`IntegerList`] into a sequence of bytes. pub fn to_bytes(&self) -> Vec { - let mut vec = Vec::with_capacity(self.0.size_in_bytes()); - self.0.serialize_into(&mut vec).expect("not able to encode integer list."); + let mut vec = Vec::with_capacity(self.0.serialized_size()); + self.0.serialize_into(&mut vec).expect("not able to encode IntegerList"); vec } /// Serializes a [`IntegerList`] into a sequence of bytes. pub fn to_mut_bytes(&self, buf: &mut B) { - let len = self.0.size_in_bytes(); - let mut vec = Vec::with_capacity(len); - self.0.serialize_into(&mut vec).unwrap(); - buf.put_slice(vec.as_slice()); + self.0.serialize_into(buf.writer()).unwrap(); } /// Deserializes a sequence of bytes into a proper [`IntegerList`]. - pub fn from_bytes(data: &[u8]) -> Result { - Ok(Self(EliasFano::deserialize_from(data).map_err(|_| EliasFanoError::FailedDeserialize)?)) + pub fn from_bytes(data: &[u8]) -> Result { + Ok(Self( + RoaringTreemap::deserialize_from(data) + .map_err(|_| RoaringBitmapError::FailedToDeserialize)?, + )) } } @@ -76,8 +78,7 @@ macro_rules! impl_uint { $( impl From> for IntegerList { fn from(v: Vec<$w>) -> Self { - let v: Vec = v.iter().map(|v| *v as usize).collect(); - Self(EliasFano::from_ints(v.as_slice()).expect("could not create list.")) + Self::new_pre_sorted(v.iter().map(|v| *v as u64).collect::>()) } } )+ @@ -91,8 +92,8 @@ impl Serialize for IntegerList { where S: Serializer, { - let vec = self.0.iter(0).collect::>(); - let mut seq = serializer.serialize_seq(Some(self.len()))?; + let vec = self.0.iter().collect::>(); + let mut seq = serializer.serialize_seq(Some(self.len() as usize))?; for e in vec { seq.serialize_element(&e)?; } @@ -136,21 +137,21 @@ use arbitrary::{Arbitrary, Unstructured}; #[cfg(any(test, feature = "arbitrary"))] impl<'a> Arbitrary<'a> for IntegerList { fn arbitrary(u: &mut Unstructured<'a>) -> Result { - let mut nums: Vec = Vec::arbitrary(u)?; + let mut nums: Vec = Vec::arbitrary(u)?; nums.sort(); - Ok(Self(EliasFano::from_ints(&nums).map_err(|_| arbitrary::Error::IncorrectFormat)?)) + Self::new(nums).map_err(|_| arbitrary::Error::IncorrectFormat) } } /// Primitives error type. #[derive(Debug, thiserror::Error)] -pub enum EliasFanoError { +pub enum RoaringBitmapError { /// The provided input is invalid. #[error("the provided input is invalid")] InvalidInput, /// Failed to deserialize data into type. #[error("failed to deserialize data into type")] - FailedDeserialize, + FailedToDeserialize, } #[cfg(test)] @@ -161,7 +162,7 @@ mod test { fn test_integer_list() { let original_list = [1, 2, 3]; let ef_list = IntegerList::new(original_list).unwrap(); - assert_eq!(ef_list.iter(0).collect::>(), original_list); + assert_eq!(ef_list.iter().collect::>(), original_list); } #[test] diff --git a/crates/prune/src/segments/account_history.rs b/crates/prune/src/segments/account_history.rs index bfebad1a95c0..592552109c28 100644 --- a/crates/prune/src/segments/account_history.rs +++ b/crates/prune/src/segments/account_history.rs @@ -86,7 +86,7 @@ mod tests { use reth_db::{tables, BlockNumberList}; use reth_interfaces::test_utils::{ generators, - generators::{random_block_range, random_changeset_range, random_eoa_account_range}, + generators::{random_block_range, random_changeset_range, random_eoa_accounts}, }; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256}; use reth_provider::PruneCheckpointReader; @@ -101,8 +101,7 @@ mod tests { let blocks = random_block_range(&mut rng, 1..=5000, B256::ZERO, 0..1); db.insert_blocks(blocks.iter(), None).expect("insert blocks"); - let accounts = - random_eoa_account_range(&mut rng, 0..2).into_iter().collect::>(); + let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::>(); let (changesets, _) = random_changeset_range( &mut rng, @@ -213,8 +212,8 @@ mod tests { .filter(|(key, _)| key.highest_block_number > last_pruned_block_number) .map(|(key, blocks)| { let new_blocks = blocks - .iter(0) - .skip_while(|block| *block <= last_pruned_block_number as usize) + .iter() + .skip_while(|block| *block <= last_pruned_block_number) .collect::>(); (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) }) diff --git a/crates/prune/src/segments/history.rs b/crates/prune/src/segments/history.rs index 4836eeb84154..0f712e79e12b 100644 --- a/crates/prune/src/segments/history.rs +++ b/crates/prune/src/segments/history.rs @@ -54,11 +54,11 @@ where // contain the target block number, as it's in this shard. else { let new_blocks = - blocks.iter(0).skip_while(|block| *block <= to_block as usize).collect::>(); + blocks.iter().skip_while(|block| *block <= to_block).collect::>(); // If there were blocks less than or equal to the target one // (so the shard has changed), update the shard. - if blocks.len() != new_blocks.len() { + if blocks.len() as usize != new_blocks.len() { // If there are no more blocks in this shard, we need to remove it, as empty // shards are not allowed. if new_blocks.is_empty() { diff --git a/crates/prune/src/segments/storage_history.rs b/crates/prune/src/segments/storage_history.rs index 45713760c7da..286c5695e6d4 100644 --- a/crates/prune/src/segments/storage_history.rs +++ b/crates/prune/src/segments/storage_history.rs @@ -90,7 +90,7 @@ mod tests { use reth_db::{tables, BlockNumberList}; use reth_interfaces::test_utils::{ generators, - generators::{random_block_range, random_changeset_range, random_eoa_account_range}, + generators::{random_block_range, random_changeset_range, random_eoa_accounts}, }; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256}; use reth_provider::PruneCheckpointReader; @@ -105,14 +105,13 @@ mod tests { let blocks = random_block_range(&mut rng, 0..=5000, B256::ZERO, 0..1); db.insert_blocks(blocks.iter(), None).expect("insert blocks"); - let accounts = - random_eoa_account_range(&mut rng, 0..2).into_iter().collect::>(); + let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::>(); let (changesets, _) = random_changeset_range( &mut rng, blocks.iter(), accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), - 2..3, + 1..2, 1..2, ); db.insert_changesets(changesets.clone(), None).expect("insert changesets"); @@ -144,7 +143,7 @@ mod tests { .get_prune_checkpoint(PruneSegment::StorageHistory) .unwrap(), to_block, - delete_limit: 2000, + delete_limit: 1000, }; let segment = StorageHistory::new(prune_mode); @@ -219,8 +218,8 @@ mod tests { .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number) .map(|(key, blocks)| { let new_blocks = blocks - .iter(0) - .skip_while(|block| *block <= last_pruned_block_number as usize) + .iter() + .skip_while(|block| *block <= last_pruned_block_number) .collect::>(); (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) }) @@ -242,8 +241,8 @@ mod tests { ); }; - test_prune(998, 1, (false, 1000)); - test_prune(998, 2, (true, 998)); - test_prune(1400, 3, (true, 804)); + test_prune(998, 1, (false, 500)); + test_prune(998, 2, (true, 499)); + test_prune(1200, 3, (true, 202)); } } diff --git a/crates/stages/benches/setup/account_hashing.rs b/crates/stages/benches/setup/account_hashing.rs index 497dce2787f4..569481c2a410 100644 --- a/crates/stages/benches/setup/account_hashing.rs +++ b/crates/stages/benches/setup/account_hashing.rs @@ -56,7 +56,7 @@ fn find_stage_range(db: &Path) -> StageRange { } fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) { - let opts = SeedOpts { blocks: 0..=num_blocks, accounts: 0..100_000, txs: 100..150 }; + let opts = SeedOpts { blocks: 0..=num_blocks, accounts: 100_000, txs: 100..150 }; let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("account-hashing-bench"); diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index f6322cc50d3f..e7a1ef4bca46 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -11,7 +11,7 @@ use reth_interfaces::test_utils::{ generators, generators::{ random_block_range, random_changeset_range, random_contract_account_range, - random_eoa_account_range, + random_eoa_accounts, }, }; use reth_primitives::{fs, Account, Address, SealedBlock, B256, U256}; @@ -108,7 +108,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { let db = TestStageDB::new(&path); let accounts: BTreeMap = concat([ - random_eoa_account_range(&mut rng, 0..n_eoa), + random_eoa_accounts(&mut rng, n_eoa), random_contract_account_range(&mut rng, &mut (0..n_contract)), ]) .into_iter() diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index ffe5acdb9183..bdd03acc5c11 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -65,8 +65,8 @@ impl Default for AccountHashingStage { pub struct SeedOpts { /// The range of blocks to be generated pub blocks: RangeInclusive, - /// The range of accounts to be generated - pub accounts: Range, + /// The number of accounts to be generated + pub accounts: usize, /// The range of transactions to be generated per block. pub txs: Range, } @@ -85,7 +85,7 @@ impl AccountHashingStage { use reth_db::models::AccountBeforeTx; use reth_interfaces::test_utils::{ generators, - generators::{random_block_range, random_eoa_account_range}, + generators::{random_block_range, random_eoa_accounts}, }; use reth_primitives::{Account, B256, U256}; use reth_provider::BlockWriter; @@ -97,7 +97,7 @@ impl AccountHashingStage { for block in blocks { provider.insert_block(block.try_seal_with_senders().unwrap(), None).unwrap(); } - let mut accounts = random_eoa_account_range(&mut rng, opts.accounts); + let mut accounts = random_eoa_accounts(&mut rng, opts.accounts); { // Account State generator let mut account_cursor = @@ -533,7 +533,7 @@ mod tests { let provider = self.db.factory.provider_rw()?; let res = Ok(AccountHashingStage::seed( &provider, - SeedOpts { blocks: 1..=input.target(), accounts: 0..10, txs: 0..3 }, + SeedOpts { blocks: 1..=input.target(), accounts: 10, txs: 0..3 }, ) .unwrap()); provider.commit().expect("failed to commit"); diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 355a63a7d5c2..c4d9abcffc66 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -127,6 +127,9 @@ mod tests { const ADDRESS: Address = address!("0000000000000000000000000000000000000001"); + const LAST_BLOCK_IN_FULL_SHARD: BlockNumber = NUM_OF_INDICES_IN_SHARD as BlockNumber; + const MAX_BLOCK: BlockNumber = NUM_OF_INDICES_IN_SHARD as BlockNumber + 2; + fn acc() -> AccountBeforeTx { AccountBeforeTx { address: ADDRESS, info: None } } @@ -136,17 +139,17 @@ mod tests { ShardedKey { key: ADDRESS, highest_block_number: shard_index } } - fn list(list: &[usize]) -> BlockNumberList { + fn list(list: &[u64]) -> BlockNumberList { BlockNumberList::new(list).unwrap() } fn cast( table: Vec<(ShardedKey
, BlockNumberList)>, - ) -> BTreeMap, Vec> { + ) -> BTreeMap, Vec> { table .into_iter() .map(|(k, v)| { - let v = v.iter(0).collect(); + let v = v.iter().collect(); (k, v) }) .collect() @@ -155,33 +158,29 @@ mod tests { fn partial_setup(db: &TestStageDB) { // setup db.commit(|tx| { - // we just need first and last - tx.put::( - 0, - StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, - ) - .unwrap(); - - tx.put::( - 5, - StoredBlockBodyIndices { tx_count: 5, ..Default::default() }, - ) - .unwrap(); - - // setup changeset that are going to be applied to history index - tx.put::(4, acc()).unwrap(); - tx.put::(5, acc()).unwrap(); + for block in 0..=MAX_BLOCK { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + // setup changeset that is going to be applied to history index + tx.put::(block, acc())?; + } Ok(()) }) .unwrap() } - fn run(db: &TestStageDB, run_to: u64) { - let input = ExecInput { target: Some(run_to), ..Default::default() }; + fn run(db: &TestStageDB, run_to: u64, input_checkpoint: Option) { + let input = ExecInput { + target: Some(run_to), + checkpoint: input_checkpoint + .map(|block_number| StageCheckpoint { block_number, stage_checkpoint: None }), + }; let mut stage = IndexAccountHistoryStage::default(); let provider = db.factory.provider_rw().unwrap(); let out = stage.execute(&provider, input).unwrap(); - assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true }); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(run_to), done: true }); provider.commit().unwrap(); } @@ -207,14 +206,14 @@ mod tests { partial_setup(&db); // run - run(&db, 5); + run(&db, 3, None); // verify let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![4, 5])])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3])])); // unwind - unwind(&db, 5, 0); + unwind(&db, 3, 0); // verify initial state let table = db.table::().unwrap(); @@ -235,25 +234,26 @@ mod tests { .unwrap(); // run - run(&db, 5); + run(&db, 5, Some(3)); // verify let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3, 4, 5]),])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3, 4, 5])])); // unwind - unwind(&db, 5, 0); + unwind(&db, 5, 3); // verify initial state let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3]),])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3])])); } #[tokio::test] async fn insert_index_to_full_shard() { // init let db = TestStageDB::default(); - let full_list = vec![3; NUM_OF_INDICES_IN_SHARD]; + let full_list = (1..=LAST_BLOCK_IN_FULL_SHARD).collect::>(); + assert_eq!(full_list.len(), NUM_OF_INDICES_IN_SHARD); // setup partial_setup(&db); @@ -264,17 +264,20 @@ mod tests { .unwrap(); // run - run(&db, 5); + run(&db, LAST_BLOCK_IN_FULL_SHARD + 2, Some(LAST_BLOCK_IN_FULL_SHARD)); // verify let table = cast(db.table::().unwrap()); assert_eq!( table, - BTreeMap::from([(shard(3), full_list.clone()), (shard(u64::MAX), vec![4, 5])]) + BTreeMap::from([ + (shard(LAST_BLOCK_IN_FULL_SHARD), full_list.clone()), + (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1, LAST_BLOCK_IN_FULL_SHARD + 2]) + ]) ); // unwind - unwind(&db, 5, 0); + unwind(&db, LAST_BLOCK_IN_FULL_SHARD + 2, LAST_BLOCK_IN_FULL_SHARD); // verify initial state let table = cast(db.table::().unwrap()); @@ -285,33 +288,33 @@ mod tests { async fn insert_index_to_fill_shard() { // init let db = TestStageDB::default(); - let mut close_full_list = vec![1; NUM_OF_INDICES_IN_SHARD - 2]; + let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 2).collect::>(); // setup partial_setup(&db); db.commit(|tx| { - tx.put::(shard(u64::MAX), list(&close_full_list)).unwrap(); + tx.put::(shard(u64::MAX), list(&almost_full_list)).unwrap(); Ok(()) }) .unwrap(); // run - run(&db, 5); + run(&db, LAST_BLOCK_IN_FULL_SHARD, Some(LAST_BLOCK_IN_FULL_SHARD - 2)); // verify - close_full_list.push(4); - close_full_list.push(5); + almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD - 1); + almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD); let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list.clone()),])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list.clone())])); // unwind - unwind(&db, 5, 0); + unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 2); // verify initial state - close_full_list.pop(); - close_full_list.pop(); + almost_full_list.pop(); + almost_full_list.pop(); let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list),])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)])); // verify initial state } @@ -320,53 +323,60 @@ mod tests { async fn insert_index_second_half_shard() { // init let db = TestStageDB::default(); - let mut close_full_list = vec![1; NUM_OF_INDICES_IN_SHARD - 1]; + let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 1).collect::>(); // setup partial_setup(&db); db.commit(|tx| { - tx.put::(shard(u64::MAX), list(&close_full_list)).unwrap(); + tx.put::(shard(u64::MAX), list(&almost_full_list)).unwrap(); Ok(()) }) .unwrap(); // run - run(&db, 5); + run(&db, LAST_BLOCK_IN_FULL_SHARD + 1, Some(LAST_BLOCK_IN_FULL_SHARD - 1)); // verify - close_full_list.push(4); + almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD); let table = cast(db.table::().unwrap()); assert_eq!( table, - BTreeMap::from([(shard(4), close_full_list.clone()), (shard(u64::MAX), vec![5])]) + BTreeMap::from([ + (shard(LAST_BLOCK_IN_FULL_SHARD), almost_full_list.clone()), + (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1]) + ]) ); // unwind - unwind(&db, 5, 0); + unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 1); // verify initial state - close_full_list.pop(); + almost_full_list.pop(); let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list),])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)])); } #[tokio::test] async fn insert_index_to_third_shard() { // init let db = TestStageDB::default(); - let full_list = vec![1; NUM_OF_INDICES_IN_SHARD]; + let full_list = (1..=LAST_BLOCK_IN_FULL_SHARD).collect::>(); // setup partial_setup(&db); db.commit(|tx| { tx.put::(shard(1), list(&full_list)).unwrap(); tx.put::(shard(2), list(&full_list)).unwrap(); - tx.put::(shard(u64::MAX), list(&[2, 3])).unwrap(); + tx.put::( + shard(u64::MAX), + list(&[LAST_BLOCK_IN_FULL_SHARD + 1]), + ) + .unwrap(); Ok(()) }) .unwrap(); - run(&db, 5); + run(&db, LAST_BLOCK_IN_FULL_SHARD + 2, Some(LAST_BLOCK_IN_FULL_SHARD + 1)); // verify let table = cast(db.table::().unwrap()); @@ -375,12 +385,12 @@ mod tests { BTreeMap::from([ (shard(1), full_list.clone()), (shard(2), full_list.clone()), - (shard(u64::MAX), vec![2, 3, 4, 5]) + (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1, LAST_BLOCK_IN_FULL_SHARD + 2]) ]) ); // unwind - unwind(&db, 5, 0); + unwind(&db, LAST_BLOCK_IN_FULL_SHARD + 2, LAST_BLOCK_IN_FULL_SHARD + 1); // verify initial state let table = cast(db.table::().unwrap()); @@ -389,7 +399,7 @@ mod tests { BTreeMap::from([ (shard(1), full_list.clone()), (shard(2), full_list.clone()), - (shard(u64::MAX), vec![2, 3]) + (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1]) ]) ); } @@ -487,7 +497,7 @@ mod tests { let blocks = random_block_range(&mut rng, start..=end, B256::ZERO, 0..3); - let (transitions, _) = random_changeset_range( + let (changesets, _) = random_changeset_range( &mut rng, blocks.iter(), accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), @@ -496,7 +506,7 @@ mod tests { ); // add block changeset from block 1. - self.db.insert_changesets(transitions, Some(start))?; + self.db.insert_changesets(changesets, Some(start))?; Ok(()) } @@ -541,8 +551,8 @@ mod tests { .iter() .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD) .into_iter() - .map(|chunks| chunks.map(|i| *i as usize).collect::>()) - .collect::>(); + .map(|chunks| chunks.copied().collect::>()) + .collect::>>(); let last_chunk = chunks.pop(); chunks.into_iter().for_each(|list| { @@ -551,16 +561,13 @@ mod tests { address, *list.last().expect("Chuck does not return empty list") as BlockNumber, - ) as ShardedKey
, + ), list, ); }); if let Some(last_list) = last_chunk { - result.insert( - ShardedKey::new(address, u64::MAX) as ShardedKey
, - last_list, - ); + result.insert(ShardedKey::new(address, u64::MAX), last_list); }; } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index c189a90c320b..c8c4a40a8220 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -129,13 +129,16 @@ mod tests { const STORAGE_KEY: B256 = b256!("0000000000000000000000000000000000000000000000000000000000000001"); + const LAST_BLOCK_IN_FULL_SHARD: BlockNumber = NUM_OF_INDICES_IN_SHARD as BlockNumber; + const MAX_BLOCK: BlockNumber = NUM_OF_INDICES_IN_SHARD as BlockNumber + 2; + fn storage(key: B256) -> StorageEntry { // Value is not used in indexing stage. StorageEntry { key, value: U256::ZERO } } - fn trns(transition_id: u64) -> BlockNumberAddress { - BlockNumberAddress((transition_id, ADDRESS)) + fn block_number_address(block_number: u64) -> BlockNumberAddress { + BlockNumberAddress((block_number, ADDRESS)) } /// Shard for account @@ -146,17 +149,17 @@ mod tests { } } - fn list(list: &[usize]) -> BlockNumberList { + fn list(list: &[u64]) -> BlockNumberList { BlockNumberList::new(list).unwrap() } fn cast( table: Vec<(StorageShardedKey, BlockNumberList)>, - ) -> BTreeMap> { + ) -> BTreeMap> { table .into_iter() .map(|(k, v)| { - let v = v.iter(0).collect(); + let v = v.iter().collect(); (k, v) }) .collect() @@ -165,33 +168,32 @@ mod tests { fn partial_setup(db: &TestStageDB) { // setup db.commit(|tx| { - // we just need first and last - tx.put::( - 0, - StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, - ) - .unwrap(); - - tx.put::( - 5, - StoredBlockBodyIndices { tx_count: 5, ..Default::default() }, - ) - .unwrap(); - - // setup changeset that are going to be applied to history index - tx.put::(trns(4), storage(STORAGE_KEY)).unwrap(); - tx.put::(trns(5), storage(STORAGE_KEY)).unwrap(); + for block in 0..=MAX_BLOCK { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + // setup changeset that is going to be applied to history index + tx.put::( + block_number_address(block), + storage(STORAGE_KEY), + )?; + } Ok(()) }) .unwrap() } - fn run(db: &TestStageDB, run_to: u64) { - let input = ExecInput { target: Some(run_to), ..Default::default() }; + fn run(db: &TestStageDB, run_to: u64, input_checkpoint: Option) { + let input = ExecInput { + target: Some(run_to), + checkpoint: input_checkpoint + .map(|block_number| StageCheckpoint { block_number, stage_checkpoint: None }), + }; let mut stage = IndexStorageHistoryStage::default(); let provider = db.factory.provider_rw().unwrap(); let out = stage.execute(&provider, input).unwrap(); - assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true }); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(run_to), done: true }); provider.commit().unwrap(); } @@ -217,11 +219,11 @@ mod tests { partial_setup(&db); // run - run(&db, 5); + run(&db, 3, None); // verify let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![4, 5]),])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3])])); // unwind unwind(&db, 5, 0); @@ -245,28 +247,26 @@ mod tests { .unwrap(); // run - run(&db, 5); + run(&db, 5, Some(3)); // verify let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3, 4, 5]),])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3, 4, 5])])); // unwind - unwind(&db, 5, 0); + unwind(&db, 5, 3); // verify initial state let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3]),])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![1, 2, 3])])); } #[tokio::test] async fn insert_index_to_full_shard() { // init let db = TestStageDB::default(); - let _input = ExecInput { target: Some(5), ..Default::default() }; - // change does not matter only that account is present in changeset. - let full_list = vec![3; NUM_OF_INDICES_IN_SHARD]; + let full_list = (1..=LAST_BLOCK_IN_FULL_SHARD).collect::>(); // setup partial_setup(&db); @@ -277,17 +277,20 @@ mod tests { .unwrap(); // run - run(&db, 5); + run(&db, LAST_BLOCK_IN_FULL_SHARD + 2, Some(LAST_BLOCK_IN_FULL_SHARD)); // verify let table = cast(db.table::().unwrap()); assert_eq!( table, - BTreeMap::from([(shard(3), full_list.clone()), (shard(u64::MAX), vec![4, 5])]) + BTreeMap::from([ + (shard(LAST_BLOCK_IN_FULL_SHARD), full_list.clone()), + (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1, LAST_BLOCK_IN_FULL_SHARD + 2]) + ]) ); // unwind - unwind(&db, 5, 0); + unwind(&db, LAST_BLOCK_IN_FULL_SHARD + 2, LAST_BLOCK_IN_FULL_SHARD); // verify initial state let table = cast(db.table::().unwrap()); @@ -298,33 +301,33 @@ mod tests { async fn insert_index_to_fill_shard() { // init let db = TestStageDB::default(); - let mut close_full_list = vec![1; NUM_OF_INDICES_IN_SHARD - 2]; + let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 2).collect::>(); // setup partial_setup(&db); db.commit(|tx| { - tx.put::(shard(u64::MAX), list(&close_full_list)).unwrap(); + tx.put::(shard(u64::MAX), list(&almost_full_list)).unwrap(); Ok(()) }) .unwrap(); // run - run(&db, 5); + run(&db, LAST_BLOCK_IN_FULL_SHARD, Some(LAST_BLOCK_IN_FULL_SHARD - 2)); // verify - close_full_list.push(4); - close_full_list.push(5); + almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD - 1); + almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD); let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list.clone()),])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list.clone())])); // unwind - unwind(&db, 5, 0); + unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 2); // verify initial state - close_full_list.pop(); - close_full_list.pop(); + almost_full_list.pop(); + almost_full_list.pop(); let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list),])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)])); // verify initial state } @@ -333,7 +336,7 @@ mod tests { async fn insert_index_second_half_shard() { // init let db = TestStageDB::default(); - let mut close_full_list = vec![1; NUM_OF_INDICES_IN_SHARD - 1]; + let mut close_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 1).collect::>(); // setup partial_setup(&db); @@ -344,42 +347,49 @@ mod tests { .unwrap(); // run - run(&db, 5); + run(&db, LAST_BLOCK_IN_FULL_SHARD + 1, Some(LAST_BLOCK_IN_FULL_SHARD - 1)); // verify - close_full_list.push(4); + close_full_list.push(LAST_BLOCK_IN_FULL_SHARD); let table = cast(db.table::().unwrap()); assert_eq!( table, - BTreeMap::from([(shard(4), close_full_list.clone()), (shard(u64::MAX), vec![5])]) + BTreeMap::from([ + (shard(LAST_BLOCK_IN_FULL_SHARD), close_full_list.clone()), + (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1]) + ]) ); // unwind - unwind(&db, 5, 0); + unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 1); // verify initial state close_full_list.pop(); let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list),])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list)])); } #[tokio::test] async fn insert_index_to_third_shard() { // init let db = TestStageDB::default(); - let full_list = vec![1; NUM_OF_INDICES_IN_SHARD]; + let full_list = (1..=LAST_BLOCK_IN_FULL_SHARD).collect::>(); // setup partial_setup(&db); db.commit(|tx| { tx.put::(shard(1), list(&full_list)).unwrap(); tx.put::(shard(2), list(&full_list)).unwrap(); - tx.put::(shard(u64::MAX), list(&[2, 3])).unwrap(); + tx.put::( + shard(u64::MAX), + list(&[LAST_BLOCK_IN_FULL_SHARD + 1]), + ) + .unwrap(); Ok(()) }) .unwrap(); - run(&db, 5); + run(&db, LAST_BLOCK_IN_FULL_SHARD + 2, Some(LAST_BLOCK_IN_FULL_SHARD + 1)); // verify let table = cast(db.table::().unwrap()); @@ -388,12 +398,12 @@ mod tests { BTreeMap::from([ (shard(1), full_list.clone()), (shard(2), full_list.clone()), - (shard(u64::MAX), vec![2, 3, 4, 5]) + (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1, LAST_BLOCK_IN_FULL_SHARD + 2]) ]) ); // unwind - unwind(&db, 5, 0); + unwind(&db, LAST_BLOCK_IN_FULL_SHARD + 2, LAST_BLOCK_IN_FULL_SHARD + 1); // verify initial state let table = cast(db.table::().unwrap()); @@ -402,7 +412,7 @@ mod tests { BTreeMap::from([ (shard(1), full_list.clone()), (shard(2), full_list.clone()), - (shard(u64::MAX), vec![2, 3]) + (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1]) ]) ); } @@ -428,9 +438,12 @@ mod tests { .unwrap(); // setup changeset that are going to be applied to history index - tx.put::(trns(20), storage(STORAGE_KEY)).unwrap(); - tx.put::(trns(36), storage(STORAGE_KEY)).unwrap(); - tx.put::(trns(100), storage(STORAGE_KEY)).unwrap(); + tx.put::(block_number_address(20), storage(STORAGE_KEY)) + .unwrap(); + tx.put::(block_number_address(36), storage(STORAGE_KEY)) + .unwrap(); + tx.put::(block_number_address(100), storage(STORAGE_KEY)) + .unwrap(); Ok(()) }) .unwrap(); @@ -448,7 +461,7 @@ mod tests { // verify let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![36, 100]),])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![36, 100])])); // unwind unwind(&db, 20000, 0); @@ -500,16 +513,16 @@ mod tests { let blocks = random_block_range(&mut rng, start..=end, B256::ZERO, 0..3); - let (transitions, _) = random_changeset_range( + let (changesets, _) = random_changeset_range( &mut rng, blocks.iter(), accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), 0..3, - 0..256, + 0..u64::MAX, ); // add block changeset from block 1. - self.db.insert_changesets(transitions, Some(start))?; + self.db.insert_changesets(changesets, Some(start))?; Ok(()) } @@ -558,8 +571,8 @@ mod tests { .iter() .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD) .into_iter() - .map(|chunks| chunks.map(|i| *i as usize).collect::>()) - .collect::>(); + .map(|chunks| chunks.copied().collect::>()) + .collect::>>(); let last_chunk = chunks.pop(); chunks.into_iter().for_each(|list| { diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 6532ea6fd2b4..e2ac204c7e7a 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -179,7 +179,7 @@ fn unwind_history_shards( start_key: T::Key, block_number: BlockNumber, mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool, -) -> ProviderResult> +) -> ProviderResult> where T: Table, T::Key: AsRef>, @@ -195,15 +195,15 @@ where // Check the first item. // If it is greater or eq to the block number, delete it. - let first = list.iter(0).next().expect("List can't be empty"); - if first >= block_number as usize { + let first = list.iter().next().expect("List can't be empty"); + if first >= block_number { item = cursor.prev()?; continue; } else if block_number <= sharded_key.as_ref().highest_block_number { // Filter out all elements greater than block number. - return Ok(list.iter(0).take_while(|i| *i < block_number as usize).collect::>()); + return Ok(list.iter().take_while(|i| *i < block_number).collect::>()); } else { - return Ok(list.iter(0).collect::>()); + return Ok(list.iter().collect::>()); } } @@ -912,7 +912,7 @@ impl DatabaseProvider { if let Some((shard_key, list)) = shard { // delete old shard so new one can be inserted. self.tx.delete::(shard_key, None)?; - let list = list.iter(0).map(|i| i as u64).collect::>(); + let list = list.iter().collect::>(); return Ok(list); } Ok(Vec::new()) @@ -941,13 +941,13 @@ impl DatabaseProvider { let chunks = indices .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD) .into_iter() - .map(|chunks| chunks.map(|i| *i as usize).collect::>()) - .collect::>(); + .map(|chunks| chunks.copied().collect()) + .collect::>>(); let mut chunks = chunks.into_iter().peekable(); while let Some(list) = chunks.next() { let highest_block_number = if chunks.peek().is_some() { - *list.last().expect("`chunks` does not return empty list") as u64 + *list.last().expect("`chunks` does not return empty list") } else { // Insert last list with u64::MAX u64::MAX diff --git a/crates/storage/provider/src/providers/state/historical.rs b/crates/storage/provider/src/providers/state/historical.rs index 85460498d393..b29322812506 100644 --- a/crates/storage/provider/src/providers/state/historical.rs +++ b/crates/storage/provider/src/providers/state/historical.rs @@ -15,6 +15,7 @@ use reth_primitives::{ trie::AccountProof, Account, Address, BlockNumber, Bytecode, StorageKey, StorageValue, B256, }; use reth_trie::updates::TrieUpdates; +use std::fmt::Debug; /// State provider for a given block number which takes a tx reference. /// @@ -110,10 +111,16 @@ impl<'b, TX: DbTx> HistoricalStateProviderRef<'b, TX> { // index, the first chunk for the next key will be returned so we filter out chunks that // have a different key. if let Some(chunk) = cursor.seek(key)?.filter(|(key, _)| key_filter(key)).map(|x| x.1 .0) { - let chunk = chunk.enable_rank(); + // Get the rank of the first entry before or equal to our block. + let mut rank = chunk.rank(self.block_number); - // Get the rank of the first entry after our block. - let rank = chunk.rank(self.block_number as usize); + // Adjust the rank, so that we have the rank of the first entry strictly before our + // block (not equal to it). + if rank.checked_sub(1).and_then(|rank| chunk.select(rank)) == Some(self.block_number) { + rank -= 1 + }; + + let block_number = chunk.select(rank); // If our block is before the first entry in the index chunk and this first entry // doesn't equal to our block, it might be before the first write ever. To check, we @@ -122,20 +129,21 @@ impl<'b, TX: DbTx> HistoricalStateProviderRef<'b, TX> { // short-circuit) and when it passes we save a full seek into the changeset/plain state // table. if rank == 0 && - chunk.select(rank) as u64 != self.block_number && + block_number != Some(self.block_number) && !cursor.prev()?.is_some_and(|(key, _)| key_filter(&key)) { - if lowest_available_block_number.is_some() { + if let (Some(_), Some(block_number)) = (lowest_available_block_number, block_number) + { // The key may have been written, but due to pruning we may not have changesets // and history, so we need to make a changeset lookup. - Ok(HistoryInfo::InChangeset(chunk.select(rank) as u64)) + Ok(HistoryInfo::InChangeset(block_number)) } else { // The key is written to, but only after our block. Ok(HistoryInfo::NotYetWritten) } - } else if rank < chunk.len() { + } else if let Some(block_number) = block_number { // The chunk contains an entry for a write after our block, return it. - Ok(HistoryInfo::InChangeset(chunk.select(rank) as u64)) + Ok(HistoryInfo::InChangeset(block_number)) } else { // The chunk does not contain an entry for a write after our block. This can only // happen if this is the last chunk and so we need to look in the plain state.