From edc0d96ab967a8b544c19e378a2c4292f2a23ff2 Mon Sep 17 00:00:00 2001 From: Jure Bajic Date: Fri, 26 May 2023 11:03:52 +0200 Subject: [PATCH] refactor: Clean flat state via range (#9109) Use range deletion in `clean_state` when removing values from `FlatState` column. Addresses https://github.com/near/nearcore/issues/8332 --- chain/chain/src/chain.rs | 2 +- chain/chain/src/test_utils/kv_runtime.rs | 6 +--- chain/chain/src/types.rs | 6 +--- core/store/src/flat/manager.rs | 15 +++----- core/store/src/flat/storage.rs | 34 ++++--------------- core/store/src/flat/store_helper.rs | 14 ++++++-- .../src/tests/client/flat_storage.rs | 24 ++----------- nearcore/src/runtime/mod.rs | 9 ++--- tools/flat-storage/src/commands.rs | 2 +- 9 files changed, 31 insertions(+), 81 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 5f06a54d013..7d2fd9451dc 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -3241,7 +3241,7 @@ impl Chain { // Before working with state parts, remove existing flat storage data. let epoch_id = self.get_block_header(&sync_hash)?.epoch_id().clone(); let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?; - self.runtime_adapter.remove_flat_storage_for_shard(shard_uid, &epoch_id)?; + self.runtime_adapter.remove_flat_storage_for_shard(shard_uid)?; let shard_state_header = self.get_state_header(shard_id, sync_hash)?; let state_root = shard_state_header.chunk_prev_state_root(); diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index 8b58ae0688e..8fe5813c6f3 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -988,11 +988,7 @@ impl RuntimeAdapter for KeyValueRuntime { panic!("Flat storage state can't be created for shard {shard_uid} because KeyValueRuntime doesn't support this"); } - fn remove_flat_storage_for_shard( - &self, - _shard_uid: ShardUId, - _epoch_id: &EpochId, - ) -> Result<(), Error> { + fn remove_flat_storage_for_shard(&self, _shard_uid: ShardUId) -> Result<(), Error> { Ok(()) } diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index 4d31bafbccb..72ff96b5fc9 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -318,11 +318,7 @@ pub trait RuntimeAdapter: Send + Sync { /// Removes flat storage state for shard, if it exists. /// Used to clear old flat storage data from disk and memory before syncing to newer state. - fn remove_flat_storage_for_shard( - &self, - shard_uid: ShardUId, - epoch_id: &EpochId, - ) -> Result<(), Error>; + fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), Error>; fn set_flat_storage_for_genesis( &self, diff --git a/core/store/src/flat/manager.rs b/core/store/src/flat/manager.rs index 32d14b2efa8..dbb4082de12 100644 --- a/core/store/src/flat/manager.rs +++ b/core/store/src/flat/manager.rs @@ -3,7 +3,7 @@ use crate::flat::{ }; use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; -use near_primitives::shard_layout::{ShardLayout, ShardUId}; +use near_primitives::shard_layout::ShardUId; use near_primitives::types::BlockHeight; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -121,18 +121,11 @@ impl FlatStorageManager { flat_storages.get(&shard_uid).cloned() } - pub fn remove_flat_storage_for_shard( - &self, - shard_uid: ShardUId, - shard_layout: ShardLayout, - ) -> Result<(), StorageError> { + pub fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), StorageError> { let mut flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); - match flat_storages.remove(&shard_uid) { - None => {} - Some(flat_storage) => { - flat_storage.clear_state(shard_layout)?; - } + if let Some(flat_store) = flat_storages.remove(&shard_uid) { + flat_store.clear_state()?; } Ok(()) diff --git a/core/store/src/flat/storage.rs b/core/store/src/flat/storage.rs index 19fdab641ea..dafd1c53aed 100644 --- a/core/store/src/flat/storage.rs +++ b/core/store/src/flat/storage.rs @@ -3,13 +3,13 @@ use std::sync::{Arc, RwLock}; use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; -use near_primitives::shard_layout::{ShardLayout, ShardUId}; +use near_primitives::shard_layout::ShardUId; use near_primitives::state::FlatStateValue; -use tracing::{debug, info, warn}; +use tracing::{debug, warn}; use crate::flat::delta::CachedFlatStateChanges; use crate::flat::{FlatStorageReadyStatus, FlatStorageStatus}; -use crate::{DBCol, Store, StoreUpdate}; +use crate::{Store, StoreUpdate}; use super::delta::{CachedFlatStateDelta, FlatStateDelta}; use super::metrics::FlatStorageMetrics; @@ -299,33 +299,13 @@ impl FlatStorage { } /// Clears all State key-value pairs from flat storage. - pub fn clear_state(&self, _shard_layout: ShardLayout) -> Result<(), StorageError> { + pub fn clear_state(&self) -> Result<(), StorageError> { let guard = self.0.write().expect(super::POISONED_LOCK_ERR); - let shard_id = guard.shard_uid.shard_id(); - - // Removes all items belonging to the shard one by one. - // Note that it does not work for resharding. - // TODO (#7327): call it just after we stopped tracking a shard. - // TODO (#7327): remove FlatStateChanges. Consider custom serialization of keys to remove them by - // prefix. - // TODO (#7327): support range deletions which are much faster than naive deletions. For that, we - // can delete ranges of keys like - // [ [0]+boundary_accounts(shard_id) .. [0]+boundary_accounts(shard_id+1) ), etc. - // We should also take fixed accounts into account. - let mut store_update = guard.store.store_update(); - let mut removed_items = 0; - for item in guard.store.iter(DBCol::FlatState) { - let (key, _) = - item.map_err(|e| StorageError::StorageInconsistentState(e.to_string()))?; - - if store_helper::key_belongs_to_shard(&key, &guard.shard_uid)? { - removed_items += 1; - store_update.delete(DBCol::FlatState, &key); - } - } - info!(target: "store", %shard_id, %removed_items, "Removing old items from flat storage"); + let mut store_update = guard.store.store_update(); + store_helper::remove_all_flat_state_values(&mut store_update, guard.shard_uid); store_helper::remove_all_deltas(&mut store_update, guard.shard_uid); + store_helper::set_flat_storage_status( &mut store_update, guard.shard_uid, diff --git a/core/store/src/flat/store_helper.rs b/core/store/src/flat/store_helper.rs index 4328d8ab93a..2dc655b6d3a 100644 --- a/core/store/src/flat/store_helper.rs +++ b/core/store/src/flat/store_helper.rs @@ -59,11 +59,19 @@ pub fn remove_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, block_h store_update.delete(DBCol::FlatStateDeltaMetadata, &key); } -pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) { +fn remove_range_by_shard_uid(store_update: &mut StoreUpdate, shard_uid: ShardUId, col: DBCol) { let key_from = shard_uid.to_bytes(); let key_to = ShardUId::next_shard_prefix(&key_from); - store_update.delete_range(DBCol::FlatStateChanges, &key_from, &key_to); - store_update.delete_range(DBCol::FlatStateDeltaMetadata, &key_from, &key_to); + store_update.delete_range(col, &key_from, &key_to); +} + +pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) { + remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateChanges); + remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateDeltaMetadata); +} + +pub fn remove_all_flat_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) { + remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatState); } pub(crate) fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec { diff --git a/integration-tests/src/tests/client/flat_storage.rs b/integration-tests/src/tests/client/flat_storage.rs index 1a788ece71e..cb5ef12dc27 100644 --- a/integration-tests/src/tests/client/flat_storage.rs +++ b/integration-tests/src/tests/client/flat_storage.rs @@ -159,13 +159,7 @@ fn test_flat_storage_creation_sanity() { ); } - let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap(); - let epoch_id = env.clients[0].chain.epoch_manager.get_epoch_id(&block_hash).unwrap(); - env.clients[0] - .chain - .runtime_adapter - .remove_flat_storage_for_shard(shard_uid, &epoch_id) - .unwrap(); + env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uid).unwrap(); } // Create new chain and runtime using the same store. It should produce next blocks normally, but now it should @@ -247,13 +241,7 @@ fn test_flat_storage_creation_two_shards() { ); } - let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap(); - let epoch_id = env.clients[0].chain.epoch_manager.get_epoch_id(&block_hash).unwrap(); - env.clients[0] - .chain - .runtime_adapter - .remove_flat_storage_for_shard(shard_uids[0], &epoch_id) - .unwrap(); + env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uids[0]).unwrap(); } // Check that flat storage is not ready for shard 0 but ready for shard 1. @@ -393,13 +381,7 @@ fn test_catchup_succeeds_even_if_no_new_blocks() { env.produce_block(0, height); } // Remove flat storage. - let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap(); - let epoch_id = env.clients[0].chain.epoch_manager.get_epoch_id(&block_hash).unwrap(); - env.clients[0] - .chain - .runtime_adapter - .remove_flat_storage_for_shard(shard_uid, &epoch_id) - .unwrap(); + env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uid).unwrap(); } let mut env = setup_env(&genesis, store.clone()); assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_none()); diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index 3fcaeed85b3..062e0ee6b0e 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -753,14 +753,9 @@ impl RuntimeAdapter for NightshadeRuntime { self.flat_storage_manager.add_flat_storage_for_shard(shard_uid, flat_storage); } - fn remove_flat_storage_for_shard( - &self, - shard_uid: ShardUId, - epoch_id: &EpochId, - ) -> Result<(), Error> { - let shard_layout = self.epoch_manager.get_shard_layout(epoch_id)?; + fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), Error> { self.flat_storage_manager - .remove_flat_storage_for_shard(shard_uid, shard_layout) + .remove_flat_storage_for_shard(shard_uid) .map_err(Error::StorageError)?; Ok(()) } diff --git a/tools/flat-storage/src/commands.rs b/tools/flat-storage/src/commands.rs index d0c5ac7b0c8..9519e90a632 100644 --- a/tools/flat-storage/src/commands.rs +++ b/tools/flat-storage/src/commands.rs @@ -175,7 +175,7 @@ impl FlatStorageCommand { let shard_uid = epoch_manager.shard_id_to_uid(reset_cmd.shard_id, &tip.epoch_id)?; rw_hot_runtime.create_flat_storage_for_shard(shard_uid); - rw_hot_runtime.remove_flat_storage_for_shard(shard_uid, &tip.epoch_id)?; + rw_hot_runtime.remove_flat_storage_for_shard(shard_uid)?; } SubCommand::Init(init_cmd) => { let (_, epoch_manager, rw_hot_runtime, rw_chain_store, rw_hot_store) = Self::get_db(