diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 5f06a54d013..7d2fd9451dc 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -3241,7 +3241,7 @@ impl Chain { // Before working with state parts, remove existing flat storage data. let epoch_id = self.get_block_header(&sync_hash)?.epoch_id().clone(); let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?; - self.runtime_adapter.remove_flat_storage_for_shard(shard_uid, &epoch_id)?; + self.runtime_adapter.remove_flat_storage_for_shard(shard_uid)?; let shard_state_header = self.get_state_header(shard_id, sync_hash)?; let state_root = shard_state_header.chunk_prev_state_root(); diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index 1c4983b032a..8fe5813c6f3 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -52,6 +52,8 @@ use crate::BlockHeader; use near_primitives::epoch_manager::ShardConfig; +use near_store::flat::{FlatStorage, FlatStorageStatus}; + use super::ValidatorSchedule; /// Simple key value runtime for tests. @@ -974,6 +976,31 @@ impl RuntimeAdapter for KeyValueRuntime { )) } + fn get_flat_storage_for_shard(&self, _shard_uid: ShardUId) -> Option { + None + } + + fn get_flat_storage_status(&self, _shard_uid: ShardUId) -> FlatStorageStatus { + FlatStorageStatus::Disabled + } + + fn create_flat_storage_for_shard(&self, shard_uid: ShardUId) { + panic!("Flat storage state can't be created for shard {shard_uid} because KeyValueRuntime doesn't support this"); + } + + fn remove_flat_storage_for_shard(&self, _shard_uid: ShardUId) -> Result<(), Error> { + Ok(()) + } + + fn set_flat_storage_for_genesis( + &self, + _genesis_block: &CryptoHash, + _genesis_block_height: BlockHeight, + _genesis_epoch_id: &EpochId, + ) -> Result { + Ok(self.store.store_update()) + } + fn validate_tx( &self, _gas_price: Balance, diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index 4d31bafbccb..72ff96b5fc9 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -318,11 +318,7 @@ pub trait RuntimeAdapter: Send + Sync { /// Removes flat storage state for shard, if it exists. /// Used to clear old flat storage data from disk and memory before syncing to newer state. - fn remove_flat_storage_for_shard( - &self, - shard_uid: ShardUId, - epoch_id: &EpochId, - ) -> Result<(), Error>; + fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), Error>; fn set_flat_storage_for_genesis( &self, diff --git a/core/store/src/flat/manager.rs b/core/store/src/flat/manager.rs index 32d14b2efa8..dbb4082de12 100644 --- a/core/store/src/flat/manager.rs +++ b/core/store/src/flat/manager.rs @@ -3,7 +3,7 @@ use crate::flat::{ }; use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; -use near_primitives::shard_layout::{ShardLayout, ShardUId}; +use near_primitives::shard_layout::ShardUId; use near_primitives::types::BlockHeight; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -121,18 +121,11 @@ impl FlatStorageManager { flat_storages.get(&shard_uid).cloned() } - pub fn remove_flat_storage_for_shard( - &self, - shard_uid: ShardUId, - shard_layout: ShardLayout, - ) -> Result<(), StorageError> { + pub fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), StorageError> { let mut flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); - match flat_storages.remove(&shard_uid) { - None => {} - Some(flat_storage) => { - flat_storage.clear_state(shard_layout)?; - } + if let Some(flat_store) = flat_storages.remove(&shard_uid) { + flat_store.clear_state()?; } Ok(()) diff --git a/core/store/src/flat/storage.rs b/core/store/src/flat/storage.rs index 19fdab641ea..dafd1c53aed 100644 --- a/core/store/src/flat/storage.rs +++ b/core/store/src/flat/storage.rs @@ -3,13 +3,13 @@ use std::sync::{Arc, RwLock}; use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; -use near_primitives::shard_layout::{ShardLayout, ShardUId}; +use near_primitives::shard_layout::ShardUId; use near_primitives::state::FlatStateValue; -use tracing::{debug, info, warn}; +use tracing::{debug, warn}; use crate::flat::delta::CachedFlatStateChanges; use crate::flat::{FlatStorageReadyStatus, FlatStorageStatus}; -use crate::{DBCol, Store, StoreUpdate}; +use crate::{Store, StoreUpdate}; use super::delta::{CachedFlatStateDelta, FlatStateDelta}; use super::metrics::FlatStorageMetrics; @@ -299,33 +299,13 @@ impl FlatStorage { } /// Clears all State key-value pairs from flat storage. - pub fn clear_state(&self, _shard_layout: ShardLayout) -> Result<(), StorageError> { + pub fn clear_state(&self) -> Result<(), StorageError> { let guard = self.0.write().expect(super::POISONED_LOCK_ERR); - let shard_id = guard.shard_uid.shard_id(); - - // Removes all items belonging to the shard one by one. - // Note that it does not work for resharding. - // TODO (#7327): call it just after we stopped tracking a shard. - // TODO (#7327): remove FlatStateChanges. Consider custom serialization of keys to remove them by - // prefix. - // TODO (#7327): support range deletions which are much faster than naive deletions. For that, we - // can delete ranges of keys like - // [ [0]+boundary_accounts(shard_id) .. [0]+boundary_accounts(shard_id+1) ), etc. - // We should also take fixed accounts into account. - let mut store_update = guard.store.store_update(); - let mut removed_items = 0; - for item in guard.store.iter(DBCol::FlatState) { - let (key, _) = - item.map_err(|e| StorageError::StorageInconsistentState(e.to_string()))?; - - if store_helper::key_belongs_to_shard(&key, &guard.shard_uid)? { - removed_items += 1; - store_update.delete(DBCol::FlatState, &key); - } - } - info!(target: "store", %shard_id, %removed_items, "Removing old items from flat storage"); + let mut store_update = guard.store.store_update(); + store_helper::remove_all_flat_state_values(&mut store_update, guard.shard_uid); store_helper::remove_all_deltas(&mut store_update, guard.shard_uid); + store_helper::set_flat_storage_status( &mut store_update, guard.shard_uid, diff --git a/core/store/src/flat/store_helper.rs b/core/store/src/flat/store_helper.rs index 4328d8ab93a..2dc655b6d3a 100644 --- a/core/store/src/flat/store_helper.rs +++ b/core/store/src/flat/store_helper.rs @@ -59,11 +59,19 @@ pub fn remove_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, block_h store_update.delete(DBCol::FlatStateDeltaMetadata, &key); } -pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) { +fn remove_range_by_shard_uid(store_update: &mut StoreUpdate, shard_uid: ShardUId, col: DBCol) { let key_from = shard_uid.to_bytes(); let key_to = ShardUId::next_shard_prefix(&key_from); - store_update.delete_range(DBCol::FlatStateChanges, &key_from, &key_to); - store_update.delete_range(DBCol::FlatStateDeltaMetadata, &key_from, &key_to); + store_update.delete_range(col, &key_from, &key_to); +} + +pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) { + remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateChanges); + remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateDeltaMetadata); +} + +pub fn remove_all_flat_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) { + remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatState); } pub(crate) fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec { diff --git a/integration-tests/src/tests/client/flat_storage.rs b/integration-tests/src/tests/client/flat_storage.rs index 05910fd8bc8..cb5ef12dc27 100644 --- a/integration-tests/src/tests/client/flat_storage.rs +++ b/integration-tests/src/tests/client/flat_storage.rs @@ -9,8 +9,8 @@ use near_primitives::shard_layout::{ShardLayout, ShardUId}; use near_primitives::types::AccountId; use near_primitives_core::types::BlockHeight; use near_store::flat::{ - store_helper, FetchingStateStatus, FlatStorageCreationStatus, FlatStorageManager, - FlatStorageReadyStatus, FlatStorageStatus, NUM_PARTS_IN_ONE_STEP, + store_helper, FetchingStateStatus, FlatStorageCreationStatus, FlatStorageReadyStatus, + FlatStorageStatus, NUM_PARTS_IN_ONE_STEP, }; use near_store::test_utils::create_test_store; use near_store::{KeyLookupMode, Store, TrieTraversalItem}; @@ -49,14 +49,14 @@ fn wait_for_flat_storage_creation( ) -> BlockHeight { let store = env.clients[0].runtime_adapter.store().clone(); let mut next_height = start_height; - let mut prev_status = store_helper::get_flat_storage_status(&store, shard_uid).unwrap(); + let mut prev_status = store_helper::get_flat_storage_status(&store, shard_uid); while next_height < start_height + CREATION_TIMEOUT { if produce_blocks { env.produce_block(0, next_height); } env.clients[0].run_flat_storage_creation_step().unwrap(); - let status = store_helper::get_flat_storage_status(&store, shard_uid).unwrap(); + let status = store_helper::get_flat_storage_status(&store, shard_uid); // Check validity of state transition for flat storage creation. match &prev_status { FlatStorageStatus::Empty => assert_matches!( @@ -98,14 +98,13 @@ fn wait_for_flat_storage_creation( thread::sleep(Duration::from_secs(1)); } - let flat_storage_manager = get_flat_storage_manager(&env); - let status = flat_storage_manager.get_flat_storage_status(shard_uid); + let status = store_helper::get_flat_storage_status(&store, shard_uid); assert_matches!( status, FlatStorageStatus::Ready(_), "Client couldn't create flat storage until block {next_height}, status: {status:?}" ); - assert!(flat_storage_manager.get_flat_storage_for_shard(shard_uid).is_some()); + assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_some()); // We don't expect any forks in the chain after flat storage head, so the number of // deltas stored on DB should be exactly 2, as there are only 2 blocks after @@ -138,7 +137,7 @@ fn test_flat_storage_creation_sanity() { let expected_flat_storage_head = env.clients[0].chain.get_block_hash_by_height(flat_head_height).unwrap(); let status = store_helper::get_flat_storage_status(&store, shard_uid); - if let Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head })) = status { + if let FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }) = status { assert_eq!(flat_head.hash, expected_flat_storage_head); assert_eq!(flat_head.height, flat_head_height); } else { @@ -160,7 +159,7 @@ fn test_flat_storage_creation_sanity() { ); } - get_flat_storage_manager(&env).remove_flat_storage_for_shard(shard_uid).unwrap(); + env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uid).unwrap(); } // Create new chain and runtime using the same store. It should produce next blocks normally, but now it should @@ -169,18 +168,15 @@ fn test_flat_storage_creation_sanity() { for height in START_HEIGHT..START_HEIGHT + 2 { env.produce_block(0, height); } - assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uid).is_none()); + assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_none()); - assert_eq!( - store_helper::get_flat_storage_status(&store, shard_uid), - Ok(FlatStorageStatus::Empty) - ); + assert_eq!(store_helper::get_flat_storage_status(&store, shard_uid), FlatStorageStatus::Empty); assert!(!env.clients[0].run_flat_storage_creation_step().unwrap()); // At first, flat storage state should start saving deltas. Deltas for all newly processed blocks should be saved to // disk. assert_eq!( store_helper::get_flat_storage_status(&store, shard_uid), - Ok(FlatStorageStatus::Creation(FlatStorageCreationStatus::SavingDeltas)) + FlatStorageStatus::Creation(FlatStorageCreationStatus::SavingDeltas) ); // Introduce fork block to check that deltas for it will be GC-d later. let fork_block = env.clients[0].produce_block(START_HEIGHT + 2).unwrap().unwrap(); @@ -208,14 +204,14 @@ fn test_flat_storage_creation_sanity() { let final_block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT).unwrap(); assert_eq!( store_helper::get_flat_storage_status(&store, shard_uid), - Ok(FlatStorageStatus::Creation(FlatStorageCreationStatus::FetchingState( + FlatStorageStatus::Creation(FlatStorageCreationStatus::FetchingState( FetchingStateStatus { block_hash: final_block_hash, part_id: 0, num_parts_in_step: NUM_PARTS_IN_ONE_STEP, num_parts: 1, } - ))) + )) ); wait_for_flat_storage_creation(&mut env, START_HEIGHT + 5, shard_uid, true); @@ -241,24 +237,24 @@ fn test_flat_storage_creation_two_shards() { for &shard_uid in &shard_uids { assert_matches!( store_helper::get_flat_storage_status(&store, shard_uid), - Ok(FlatStorageStatus::Ready(_)) + FlatStorageStatus::Ready(_) ); } - get_flat_storage_manager(&env).remove_flat_storage_for_shard(shard_uids[0]).unwrap(); + env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uids[0]).unwrap(); } // Check that flat storage is not ready for shard 0 but ready for shard 1. let mut env = setup_env(&genesis, store.clone()); - assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uids[0]).is_none()); + assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uids[0]).is_none()); assert_matches!( store_helper::get_flat_storage_status(&store, shard_uids[0]), - Ok(FlatStorageStatus::Empty) + FlatStorageStatus::Empty ); - assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uids[1]).is_some()); + assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uids[1]).is_some()); assert_matches!( store_helper::get_flat_storage_status(&store, shard_uids[1]), - Ok(FlatStorageStatus::Ready(_)) + FlatStorageStatus::Ready(_) ); wait_for_flat_storage_creation(&mut env, START_HEIGHT, shard_uids[0], true); @@ -287,7 +283,7 @@ fn test_flat_storage_creation_start_from_state_part() { assert_matches!( store_helper::get_flat_storage_status(&store, shard_uid), - Ok(FlatStorageStatus::Ready(_)) + FlatStorageStatus::Ready(_) ); let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap(); @@ -320,14 +316,15 @@ fn test_flat_storage_creation_start_from_state_part() { // Remove keys of part 1 from the flat state. // Manually set flat storage creation status to the step when it should start from fetching part 1. let status = store_helper::get_flat_storage_status(&store, shard_uid); - let flat_head = if let Ok(FlatStorageStatus::Ready(ready_status)) = status { + let flat_head = if let FlatStorageStatus::Ready(ready_status) = status { ready_status.flat_head.hash } else { panic!("expected FlatStorageStatus::Ready, got: {status:?}"); }; let mut store_update = store.store_update(); for key in trie_keys[1].iter() { - store_helper::set_flat_state_value(&mut store_update, shard_uid, key.clone(), None); + store_helper::set_flat_state_value(&mut store_update, shard_uid, key.clone(), None) + .unwrap(); } store_helper::set_flat_storage_status( &mut store_update, @@ -345,7 +342,7 @@ fn test_flat_storage_creation_start_from_state_part() { // Re-create runtime, check that flat storage is not created yet. let mut env = setup_env(&genesis, store); - assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uid).is_none()); + assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_none()); // Run chain for a couple of blocks and check that flat storage for shard 0 is eventually created. let next_height = wait_for_flat_storage_creation(&mut env, START_HEIGHT, shard_uid, true); @@ -384,14 +381,11 @@ fn test_catchup_succeeds_even_if_no_new_blocks() { env.produce_block(0, height); } // Remove flat storage. - get_flat_storage_manager(&env).remove_flat_storage_for_shard(shard_uid).unwrap(); + env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uid).unwrap(); } let mut env = setup_env(&genesis, store.clone()); - assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uid).is_none()); - assert_eq!( - store_helper::get_flat_storage_status(&store, shard_uid), - Ok(FlatStorageStatus::Empty) - ); + assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_none()); + assert_eq!(store_helper::get_flat_storage_status(&store, shard_uid), FlatStorageStatus::Empty); // Create 3 more blocks (so that the deltas are generated) - and assume that no new blocks are received. // In the future, we should also support the scenario where no new blocks are created. @@ -439,7 +433,7 @@ fn test_flat_storage_iter() { account_id: "test0".parse().unwrap() } .to_vec(), - items[0].as_ref().unwrap().0.to_vec() + items.get(0).unwrap().0 ); } 1 => { @@ -454,7 +448,7 @@ fn test_flat_storage_iter() { account_id: "near".parse().unwrap() } .to_vec(), - items[0].as_ref().unwrap().0.to_vec() + items.get(0).unwrap().0 ); } _ => { @@ -513,7 +507,3 @@ fn test_not_supported_block() { // For the second result chunk view is valid, so result is Ok. assert_matches!(get_ref_results[1], Ok(Some(_))); } - -fn get_flat_storage_manager(env: &TestEnv) -> FlatStorageManager { - env.clients[0].chain.runtime_adapter.get_flat_storage_manager().unwrap() -} diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index 3fcaeed85b3..062e0ee6b0e 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -753,14 +753,9 @@ impl RuntimeAdapter for NightshadeRuntime { self.flat_storage_manager.add_flat_storage_for_shard(shard_uid, flat_storage); } - fn remove_flat_storage_for_shard( - &self, - shard_uid: ShardUId, - epoch_id: &EpochId, - ) -> Result<(), Error> { - let shard_layout = self.epoch_manager.get_shard_layout(epoch_id)?; + fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), Error> { self.flat_storage_manager - .remove_flat_storage_for_shard(shard_uid, shard_layout) + .remove_flat_storage_for_shard(shard_uid) .map_err(Error::StorageError)?; Ok(()) } diff --git a/tools/flat-storage/src/commands.rs b/tools/flat-storage/src/commands.rs index d0c5ac7b0c8..9519e90a632 100644 --- a/tools/flat-storage/src/commands.rs +++ b/tools/flat-storage/src/commands.rs @@ -175,7 +175,7 @@ impl FlatStorageCommand { let shard_uid = epoch_manager.shard_id_to_uid(reset_cmd.shard_id, &tip.epoch_id)?; rw_hot_runtime.create_flat_storage_for_shard(shard_uid); - rw_hot_runtime.remove_flat_storage_for_shard(shard_uid, &tip.epoch_id)?; + rw_hot_runtime.remove_flat_storage_for_shard(shard_uid)?; } SubCommand::Init(init_cmd) => { let (_, epoch_manager, rw_hot_runtime, rw_chain_store, rw_hot_store) = Self::get_db(