Skip to content

Commit

Permalink
Strengthen SchemaIterator with IncompleteResult and tests (aptos-labs…
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse authored Jun 11, 2024
1 parent 35715ad commit 142c5c3
Show file tree
Hide file tree
Showing 35 changed files with 454 additions and 191 deletions.
6 changes: 2 additions & 4 deletions consensus/src/consensusdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ use anyhow::Result;
use aptos_consensus_types::{block::Block, quorum_cert::QuorumCert};
use aptos_crypto::HashValue;
use aptos_logger::prelude::*;
use aptos_schemadb::{
schema::Schema, Options, ReadOptions, SchemaBatch, DB, DEFAULT_COLUMN_FAMILY_NAME,
};
use aptos_schemadb::{schema::Schema, Options, SchemaBatch, DB, DEFAULT_COLUMN_FAMILY_NAME};
use aptos_storage_interface::AptosDbError;
pub use schema::{
block::BlockSchema,
Expand Down Expand Up @@ -202,7 +200,7 @@ impl ConsensusDB {
}

pub fn get_all<S: Schema>(&self) -> Result<Vec<(S::Key, S::Value)>, DbError> {
let mut iter = self.db.iter::<S>(ReadOptions::default())?;
let mut iter = self.db.iter::<S>()?;
iter.seek_to_first();
Ok(iter.collect::<Result<Vec<(S::Key, S::Value)>, AptosDbError>>()?)
}
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/quorum_store/quorum_store_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use anyhow::Result;
use aptos_consensus_types::proof_of_store::BatchId;
use aptos_crypto::HashValue;
use aptos_logger::prelude::*;
use aptos_schemadb::{Options, ReadOptions, SchemaBatch, DB};
use aptos_schemadb::{Options, SchemaBatch, DB};
use std::{collections::HashMap, path::Path, time::Instant};

pub trait QuorumStoreStorage: Sync + Send {
Expand Down Expand Up @@ -73,7 +73,7 @@ impl QuorumStoreStorage for QuorumStoreDB {
}

fn get_all_batches(&self) -> Result<HashMap<HashValue, PersistedValue>> {
let mut iter = self.db.iter::<BatchSchema>(ReadOptions::default())?;
let mut iter = self.db.iter::<BatchSchema>()?;
iter.seek_to_first();
iter.map(|res| res.map_err(Into::into))
.collect::<Result<HashMap<HashValue, PersistedValue>>>()
Expand All @@ -100,7 +100,7 @@ impl QuorumStoreStorage for QuorumStoreDB {
}

fn clean_and_get_batch_id(&self, current_epoch: u64) -> Result<Option<BatchId>, DbError> {
let mut iter = self.db.iter::<BatchIdSchema>(ReadOptions::default())?;
let mut iter = self.db.iter::<BatchIdSchema>()?;
iter.seek_to_first();
let epoch_batch_id = iter
.map(|res| res.map_err(Into::into))
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/rand/rand_gen/storage/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
};
use anyhow::Result;
use aptos_logger::info;
use aptos_schemadb::{schema::Schema, Options, ReadOptions, SchemaBatch, DB};
use aptos_schemadb::{schema::Schema, Options, SchemaBatch, DB};
use std::{path::Path, sync::Arc, time::Instant};

pub struct RandDb {
Expand Down Expand Up @@ -71,7 +71,7 @@ impl RandDb {
}

fn get_all<S: Schema>(&self) -> Result<Vec<(S::Key, S::Value)>, DbError> {
let mut iter = self.db.iter::<S>(ReadOptions::default())?;
let mut iter = self.db.iter::<S>()?;
iter.seek_to_first();
Ok(iter
.filter_map(|e| match e {
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/backup/restore_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl RestoreHandler {

pub fn get_in_progress_state_kv_snapshot_version(&self) -> Result<Option<Version>> {
let db = self.aptosdb.ledger_db.metadata_db_arc();
let mut iter = db.iter::<DbMetadataSchema>(Default::default())?;
let mut iter = db.iter::<DbMetadataSchema>()?;
iter.seek_to_first();
while let Some((k, _v)) = iter.next().transpose()? {
if let DbMetadataKey::StateSnapshotRestoreProgress(version) = k {
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/db/include/aptosdb_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ impl DbReader for AptosDB {
}

let db = self.ledger_db.metadata_db_arc();
let mut iter = db.rev_iter::<BlockInfoSchema>(ReadOptions::default())?;
let mut iter = db.rev_iter::<BlockInfoSchema>()?;
iter.seek_to_last();

let mut events = Vec::with_capacity(num_events);
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use aptos_experimental_runtimes::thread_manager::{optimal_min_len, THREAD_MANAGE
use aptos_logger::prelude::*;
use aptos_metrics_core::TimerHelper;
use aptos_resource_viewer::AptosValueAnnotator;
use aptos_schemadb::{ReadOptions, SchemaBatch};
use aptos_schemadb::SchemaBatch;
use aptos_scratchpad::SparseMerkleTree;
use aptos_storage_interface::{
cached_state_view::ShardedStateCache, db_ensure as ensure, db_other_bail as bail,
Expand Down
6 changes: 3 additions & 3 deletions storage/aptosdb/src/db_debugger/examine/print_db_versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
AptosDB,
};
use aptos_config::config::{RocksdbConfigs, StorageDirPaths};
use aptos_schemadb::{schema::Schema, ReadOptions, DB};
use aptos_schemadb::{schema::Schema, DB};
use aptos_storage_interface::Result;
use aptos_types::transaction::Version;
use clap::Parser;
Expand Down Expand Up @@ -133,7 +133,7 @@ impl Cmd {
{
let mut iter = ledger_db
.transaction_accumulator_db_raw()
.iter::<TransactionAccumulatorSchema>(ReadOptions::default())?;
.iter::<TransactionAccumulatorSchema>()?;
iter.seek_to_last();
let position = iter.next().transpose()?.map(|kv| kv.0);
let num_frozen_nodes = position.map(|p| p.to_postorder_index() + 1);
Expand All @@ -150,7 +150,7 @@ impl Cmd {
where
S: Schema<Key = Version>,
{
let mut iter = db.iter::<S>(ReadOptions::default())?;
let mut iter = db.iter::<S>()?;
iter.seek_to_last();
Ok(iter.next().transpose()?.map(|kv| kv.0))
}
Expand Down
17 changes: 2 additions & 15 deletions storage/aptosdb/src/db_debugger/state_kv/get_value.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{db_debugger::common::DbDir, schema::state_value::StateValueSchema};
use aptos_schemadb::ReadOptions;
use crate::db_debugger::common::DbDir;
use aptos_storage_interface::Result;
use aptos_types::{state_store::state_key::StateKey, transaction::Version};
use clap::Parser;
Expand Down Expand Up @@ -49,19 +48,7 @@ impl Cmd {
);
}

let mut read_opts = ReadOptions::default();
// We want `None` if the state_key changes in iteration.
read_opts.set_prefix_same_as_start(true);
let mut iter = db
.db_shard(key.get_shard_id())
.iter::<StateValueSchema>(read_opts)?;
iter.seek(&(key.clone(), self.version))?;
let res = iter
.next()
.transpose()?
.and_then(|((_, version), value_opt)| value_opt.map(|value| (version, value)));

match res {
match db.get_state_value_with_version_by_version(&key, self.version)? {
None => {
println!("{}", "Value not found.".to_string().yellow());
},
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/db_debugger/state_kv/scan_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl Cmd {
read_opts.set_prefix_same_as_start(true);
let mut iter = state_kv_db
.db_shard(key.get_shard_id())
.iter::<StateValueSchema>(read_opts)
.iter_with_opts::<StateValueSchema>(read_opts)
.unwrap();
iter.seek(&(key.clone(), key_version)).unwrap();
let (value_version, value) = iter
Expand Down
4 changes: 1 addition & 3 deletions storage/aptosdb/src/db_debugger/state_tree/get_leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ impl Cmd {
let db = Arc::new(self.db_dir.open_state_merkle_db()?);

let root_version = {
let mut iter = db
.metadata_db()
.rev_iter::<JellyfishMerkleNodeSchema>(Default::default())?;
let mut iter = db.metadata_db().rev_iter::<JellyfishMerkleNodeSchema>()?;
iter.seek_for_prev(&NodeKey::new_empty_path(self.before_version - 1))?;
iter.next().transpose()?.unwrap().0.version()
};
Expand Down
4 changes: 1 addition & 3 deletions storage/aptosdb/src/db_debugger/state_tree/get_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ impl Cmd {
);

let db = self.db_dir.open_state_merkle_db()?;
let mut iter = db
.metadata_db()
.rev_iter::<JellyfishMerkleNodeSchema>(Default::default())?;
let mut iter = db.metadata_db().rev_iter::<JellyfishMerkleNodeSchema>()?;

iter.seek_for_prev(&NodeKey::new_empty_path(self.before_version - 1))?;
let mut version = iter.next().transpose()?.unwrap().0.version();
Expand Down
39 changes: 19 additions & 20 deletions storage/aptosdb/src/db_debugger/truncate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
};
use aptos_config::config::{RocksdbConfigs, StorageDirPaths};
use aptos_jellyfish_merkle::node_type::NodeKey;
use aptos_schemadb::{ReadOptions, SchemaBatch, DB};
use aptos_schemadb::{SchemaBatch, DB};
use aptos_storage_interface::{db_ensure as ensure, AptosDbError, Result};
use aptos_types::transaction::Version;
use claims::assert_le;
Expand Down Expand Up @@ -193,8 +193,7 @@ impl Cmd {
if Self::root_exists_at_version(state_merkle_db, closest_version)? {
return Ok(Some(closest_version));
}
let mut iter =
ledger_metadata_db.iter::<EpochByVersionSchema>(ReadOptions::default())?;
let mut iter = ledger_metadata_db.iter::<EpochByVersionSchema>()?;
iter.seek_for_prev(&version)?;
match iter.next().transpose()? {
Some((closest_epoch_version, _)) => {
Expand Down Expand Up @@ -324,66 +323,66 @@ mod test {
let ledger_metadata_db = ledger_db.metadata_db_arc();

let num_frozen_nodes = num_frozen_nodes_in_accumulator(target_version + 1);
let mut iter = ledger_db.transaction_accumulator_db_raw().iter::<TransactionAccumulatorSchema>(ReadOptions::default()).unwrap();
let mut iter = ledger_db.transaction_accumulator_db_raw().iter::<TransactionAccumulatorSchema>().unwrap();
iter.seek_to_last();
let position = iter.next().transpose().unwrap().unwrap().0;
prop_assert_eq!(position.to_postorder_index() + 1, num_frozen_nodes);

let mut iter = ledger_db.transaction_info_db_raw().iter::<TransactionInfoSchema>(ReadOptions::default()).unwrap();
let mut iter = ledger_db.transaction_info_db_raw().iter::<TransactionInfoSchema>().unwrap();
iter.seek_to_last();
prop_assert_eq!(iter.next().transpose().unwrap().unwrap().0, target_version);

let mut iter = ledger_db.transaction_db_raw().iter::<TransactionSchema>(ReadOptions::default()).unwrap();
let mut iter = ledger_db.transaction_db_raw().iter::<TransactionSchema>().unwrap();
iter.seek_to_last();
prop_assert_eq!(iter.next().transpose().unwrap().unwrap().0, target_version);

let mut iter = ledger_metadata_db.iter::<VersionDataSchema>(ReadOptions::default()).unwrap();
let mut iter = ledger_metadata_db.iter::<VersionDataSchema>().unwrap();
iter.seek_to_last();
prop_assert!(iter.next().transpose().unwrap().unwrap().0 <= target_version);

let mut iter = ledger_db.write_set_db_raw().iter::<WriteSetSchema>(ReadOptions::default()).unwrap();
let mut iter = ledger_db.write_set_db_raw().iter::<WriteSetSchema>().unwrap();
iter.seek_to_last();
prop_assert_eq!(iter.next().transpose().unwrap().unwrap().0, target_version);

let mut iter = ledger_metadata_db.iter::<EpochByVersionSchema>(ReadOptions::default()).unwrap();
let mut iter = ledger_metadata_db.iter::<EpochByVersionSchema>().unwrap();
iter.seek_to_last();
let (version, epoch) = iter.next().transpose().unwrap().unwrap();
prop_assert!(version <= target_version);

let mut iter = ledger_metadata_db.iter::<LedgerInfoSchema>(ReadOptions::default()).unwrap();
let mut iter = ledger_metadata_db.iter::<LedgerInfoSchema>().unwrap();
iter.seek_to_last();
prop_assert_eq!(iter.next().transpose().unwrap().unwrap().0, epoch);


let mut iter = state_kv_db.metadata_db().iter::<StateValueSchema>(ReadOptions::default()).unwrap();
let mut iter = state_kv_db.metadata_db().iter::<StateValueSchema>().unwrap();
iter.seek_to_first();
for item in iter {
let ((_, version), _) = item.unwrap();
prop_assert!(version <= target_version);
}

let mut iter = state_kv_db.metadata_db().iter::<StaleStateValueIndexSchema>(ReadOptions::default()).unwrap();
let mut iter = state_kv_db.metadata_db().iter::<StaleStateValueIndexSchema>().unwrap();
iter.seek_to_first();
for item in iter {
let version = item.unwrap().0.stale_since_version;
prop_assert!(version <= target_version);
}

let mut iter = state_merkle_db.metadata_db().iter::<StaleNodeIndexSchema>(ReadOptions::default()).unwrap();
let mut iter = state_merkle_db.metadata_db().iter::<StaleNodeIndexSchema>().unwrap();
iter.seek_to_first();
for item in iter {
let version = item.unwrap().0.stale_since_version;
prop_assert!(version <= target_version);
}

let mut iter = state_merkle_db.metadata_db().iter::<StaleNodeIndexCrossEpochSchema>(ReadOptions::default()).unwrap();
let mut iter = state_merkle_db.metadata_db().iter::<StaleNodeIndexCrossEpochSchema>().unwrap();
iter.seek_to_first();
for item in iter {
let version = item.unwrap().0.stale_since_version;
prop_assert!(version <= target_version);
}

let mut iter = state_merkle_db.metadata_db().iter::<JellyfishMerkleNodeSchema>(ReadOptions::default()).unwrap();
let mut iter = state_merkle_db.metadata_db().iter::<JellyfishMerkleNodeSchema>().unwrap();
iter.seek_to_first();
for item in iter {
let version = item.unwrap().0.version();
Expand All @@ -393,34 +392,34 @@ mod test {
if sharding_config.enable_storage_sharding {
let state_merkle_db = Arc::new(state_merkle_db);
for i in 0..NUM_STATE_SHARDS as u8 {
let mut kv_shard_iter = state_kv_db.db_shard(i).iter::<StateValueSchema>(ReadOptions::default()).unwrap();
let mut kv_shard_iter = state_kv_db.db_shard(i).iter::<StateValueSchema>().unwrap();
kv_shard_iter.seek_to_first();
for item in kv_shard_iter {
let ((_, version), _) = item.unwrap();
prop_assert!(version <= target_version);
}

let value_index_shard_iter = state_kv_db.db_shard(i).iter::<StaleStateValueIndexSchema>(ReadOptions::default()).unwrap();
let value_index_shard_iter = state_kv_db.db_shard(i).iter::<StaleStateValueIndexSchema>().unwrap();
for item in value_index_shard_iter {
let version = item.unwrap().0.stale_since_version;
prop_assert!(version <= target_version);
}

let mut stale_node_ind_iter = state_merkle_db.db_shard(i).iter::<StaleNodeIndexSchema>(ReadOptions::default()).unwrap();
let mut stale_node_ind_iter = state_merkle_db.db_shard(i).iter::<StaleNodeIndexSchema>().unwrap();
stale_node_ind_iter.seek_to_first();
for item in stale_node_ind_iter {
let version = item.unwrap().0.stale_since_version;
prop_assert!(version <= target_version);
}

let mut jelly_iter = state_merkle_db.db_shard(i).iter::<JellyfishMerkleNodeSchema>(ReadOptions::default()).unwrap();
let mut jelly_iter = state_merkle_db.db_shard(i).iter::<JellyfishMerkleNodeSchema>().unwrap();
jelly_iter.seek_to_first();
for item in jelly_iter {
let version = item.unwrap().0.version();
prop_assert!(version <= target_version);
}

let mut cross_iter = state_merkle_db.db_shard(i).iter::<StaleNodeIndexCrossEpochSchema>(ReadOptions::default()).unwrap();
let mut cross_iter = state_merkle_db.db_shard(i).iter::<StaleNodeIndexCrossEpochSchema>().unwrap();
cross_iter.seek_to_first();
for item in cross_iter {
let version = item.unwrap().0.stale_since_version;
Expand Down
24 changes: 6 additions & 18 deletions storage/aptosdb/src/event_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ impl EventStore {
ledger_version: Version,
event_key: &EventKey,
) -> Result<Option<u64>> {
let mut iter = self
.event_db
.iter::<EventByVersionSchema>(ReadOptions::default())?;
let mut iter = self.event_db.iter::<EventByVersionSchema>()?;
iter.seek_for_prev(&(*event_key, ledger_version, u64::max_value()));

Ok(iter.next().transpose()?.and_then(
Expand Down Expand Up @@ -127,9 +125,7 @@ impl EventStore {
u64, // index among events for the same transaction
)>,
> {
let mut iter = self
.event_db
.iter::<EventByKeySchema>(ReadOptions::default())?;
let mut iter = self.event_db.iter::<EventByKeySchema>()?;
iter.seek(&(*event_key, start_seq_num))?;

let mut result = Vec::new();
Expand Down Expand Up @@ -183,9 +179,7 @@ impl EventStore {
u64, // sequence number
)>,
> {
let mut iter = self
.event_db
.iter::<EventByVersionSchema>(ReadOptions::default())?;
let mut iter = self.event_db.iter::<EventByVersionSchema>()?;
iter.seek_for_prev(&(*event_key, version, u64::MAX))?;

match iter.next().transpose()? {
Expand All @@ -211,9 +205,7 @@ impl EventStore {
u64, // sequence number
)>,
> {
let mut iter = self
.event_db
.iter::<EventByVersionSchema>(ReadOptions::default())?;
let mut iter = self.event_db.iter::<EventByVersionSchema>()?;
iter.seek(&(*event_key, version, 0))?;

match iter.next().transpose()? {
Expand All @@ -239,9 +231,7 @@ impl EventStore {
u64, // sequence number
)>,
> {
let mut iter = self
.event_db
.iter::<EventByVersionSchema>(ReadOptions::default())?;
let mut iter = self.event_db.iter::<EventByVersionSchema>()?;
iter.seek(&(*event_key, version + 1, 0))?;

match iter.next().transpose()? {
Expand Down Expand Up @@ -341,9 +331,7 @@ impl EventStore {
end: Version,
db_batch: &SchemaBatch,
) -> anyhow::Result<()> {
let mut iter = self
.event_db
.iter::<EventAccumulatorSchema>(Default::default())?;
let mut iter = self.event_db.iter::<EventAccumulatorSchema>()?;
iter.seek(&(begin, Position::from_inorder_index(0)))?;
while let Some(((version, position), _)) = iter.next().transpose()? {
if version >= end {
Expand Down
Loading

0 comments on commit 142c5c3

Please sign in to comment.