From eef6a12ff423eb0218fdd14e4733381bde31b8a1 Mon Sep 17 00:00:00 2001 From: Aleksandr Logunov Date: Wed, 8 Mar 2023 17:00:48 +0400 Subject: [PATCH] refactor: use shard uid in flat state keys (#8685) Prepend `ShardUId` to flat state keys, as motivated in https://github.com/near/nearcore/issues/8670. Ideally we need to replace `shard_id` with `shard_uid` everywhere, including other DB keys and `RuntimeAdapter` flat storage methods. But this change would be too involved, so let's do it incrementally. The biggest structural consequence is that `FlatStorage` now stores `shard_uid`, which makes sense, and later it will be used for writing deltas as well. It's not a big deal, but because of that changes are scattered throughout the code and may look scary. ## Testing * `test_flat_storage_iter` checks that new `key_belongs_to_shard` impl is aligned with shard layout; also we can see that the change takes place in this failing test: https://buildkite.com/nearprotocol/nearcore/builds/25676#0186bc8f-12a2-410a-b74c-e326f3b963fa * https://nayduck.near.org/#/run/2898 --- chain/chain/src/chain.rs | 7 +- chain/chain/src/flat_storage_creator.rs | 19 +-- chain/chain/src/test_utils/kv_runtime.rs | 4 +- chain/chain/src/types.rs | 2 +- core/store/src/columns.rs | 2 +- core/store/src/flat/delta.rs | 5 +- core/store/src/flat/storage.rs | 112 +++++++++++------- core/store/src/flat/store_helper.rs | 60 +++++++--- genesis-tools/genesis-populate/src/lib.rs | 2 +- .../src/tests/client/flat_storage.rs | 13 +- nearcore/src/runtime/mod.rs | 11 +- .../src/estimator_context.rs | 17 ++- runtime/runtime/src/genesis.rs | 2 +- tools/flat-storage/src/commands.rs | 11 +- 14 files changed, 157 insertions(+), 110 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 28944c759e7..db2ac16a01a 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -3187,10 +3187,13 @@ impl Chain { // So we don't have to add the storage state for shard in such case. // TODO(8438) - add additional test scenarios for this case. if *block_hash != CryptoHash::default() { - let block_height = self.get_block_header(block_hash)?.height(); + let block_header = self.get_block_header(block_hash)?; + let epoch_id = block_header.epoch_id(); + let shard_uid = self.runtime_adapter.shard_id_to_uid(shard_id, epoch_id)?; + let block_height = block_header.height(); self.runtime_adapter.create_flat_storage_for_shard( - shard_id, + shard_uid, block_height, self.store(), ); diff --git a/chain/chain/src/flat_storage_creator.rs b/chain/chain/src/flat_storage_creator.rs index 9a3d7f1961b..6a8cb8f83bf 100644 --- a/chain/chain/src/flat_storage_creator.rs +++ b/chain/chain/src/flat_storage_creator.rs @@ -22,7 +22,6 @@ use near_store::flat::{ store_helper, FetchingStateStatus, FlatStateDelta, FlatStorageCreationStatus, NUM_PARTS_IN_ONE_STEP, STATE_PART_MEMORY_LIMIT, }; -use near_store::migrations::BatchedStoreUpdate; use near_store::{Store, FLAT_STORAGE_HEAD_HEIGHT}; use near_store::{Trie, TrieDBStorage, TrieTraversalItem}; use std::collections::HashMap; @@ -127,7 +126,7 @@ impl FlatStorageShardCreator { debug!(target: "store", "Preload state part from {hex_path_begin}"); let mut trie_iter = trie.iter().unwrap(); - let mut store_update = BatchedStoreUpdate::new(&store, 10_000_000); + let mut store_update = store.store_update(); let mut num_items = 0; for TrieTraversalItem { hash, key } in trie_iter.visit_nodes_interval(&path_begin, &path_end).unwrap() @@ -137,15 +136,13 @@ impl FlatStorageShardCreator { Some(key) => { let value = trie.storage.retrieve_raw_bytes(&hash).unwrap(); let value_ref = ValueRef::new(&value); - store_update - .set_ser(store_helper::FlatStateColumn::State.to_db_col(), &key, &value_ref) + store_helper::set_ref(&mut store_update, shard_uid, key, Some(value_ref)) .expect("Failed to put value in FlatState"); - num_items += 1; } } } - store_update.finish().unwrap(); + store_update.commit().unwrap(); let processed_parts = progress.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; @@ -341,12 +338,14 @@ impl FlatStorageShardCreator { if (old_flat_head != &flat_head) || (flat_head == chain_final_head.last_block_hash) { // If flat head changes, save all changes to store. + let epoch_id = self.runtime_adapter.get_epoch_id(&flat_head)?; + let shard_uid = self.runtime_adapter.shard_id_to_uid(shard_id, &epoch_id)?; let old_height = chain_store.get_block_height(&old_flat_head).unwrap(); let height = chain_store.get_block_height(&flat_head).unwrap(); debug!(target: "chain", %shard_id, %old_flat_head, %old_height, %flat_head, %height, "Catching up flat head"); self.metrics.flat_head_height.set(height as i64); let mut store_update = self.runtime_adapter.store().store_update(); - merged_delta.apply_to_flat_state(&mut store_update); + merged_delta.apply_to_flat_state(&mut store_update, shard_uid); if flat_head == chain_final_head.last_block_hash { // If we reached chain final head, we can finish catchup and finally create flat storage. @@ -357,7 +356,7 @@ impl FlatStorageShardCreator { store_helper::set_flat_head(&mut store_update, shard_id, &flat_head); store_update.commit()?; self.runtime_adapter.create_flat_storage_for_shard( - shard_id, + shard_uid, chain_store.head().unwrap().height, chain_store, ); @@ -406,8 +405,10 @@ impl FlatStorageCreator { let status = runtime_adapter.get_flat_storage_creation_status(shard_id); match status { FlatStorageCreationStatus::Ready => { + let shard_uid = + runtime_adapter.shard_id_to_uid(shard_id, &chain_head.epoch_id)?; runtime_adapter.create_flat_storage_for_shard( - shard_id, + shard_uid, chain_head.height, chain_store, ); diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index 04f19e39f85..cdfc4134fba 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -839,11 +839,11 @@ impl RuntimeAdapter for KeyValueRuntime { fn create_flat_storage_for_shard( &self, - shard_id: ShardId, + shard_uid: ShardUId, _latest_block_height: BlockHeight, _chain_access: &dyn ChainAccessForFlatStorage, ) { - panic!("Flat storage state can't be created for shard {shard_id} because KeyValueRuntime doesn't support this"); + panic!("Flat storage state can't be created for shard {shard_uid} because KeyValueRuntime doesn't support this"); } fn remove_flat_storage_for_shard( diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index a6bfbb65860..b8155c65249 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -307,7 +307,7 @@ pub trait RuntimeAdapter: Send + Sync { /// TODO (#7327): consider returning flat storage creation errors here fn create_flat_storage_for_shard( &self, - shard_id: ShardId, + shard_uid: ShardUId, latest_block_height: BlockHeight, chain_access: &dyn ChainAccessForFlatStorage, ); diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs index 07451d94015..0480176e0c0 100644 --- a/core/store/src/columns.rs +++ b/core/store/src/columns.rs @@ -472,7 +472,7 @@ impl DBCol { DBCol::StateChangesForSplitStates => &[DBKeyType::BlockHash, DBKeyType::ShardId], DBCol::TransactionResultForBlock => &[DBKeyType::OutcomeId, DBKeyType::BlockHash], #[cfg(feature = "protocol_feature_flat_state")] - DBCol::FlatState => &[DBKeyType::TrieKey], + DBCol::FlatState => &[DBKeyType::ShardUId, DBKeyType::TrieKey], #[cfg(feature = "protocol_feature_flat_state")] DBCol::FlatStateDeltas => &[DBKeyType::ShardId, DBKeyType::BlockHash], #[cfg(feature = "protocol_feature_flat_state")] diff --git a/core/store/src/flat/delta.rs b/core/store/src/flat/delta.rs index 7da74736f6b..2797a43c49c 100644 --- a/core/store/src/flat/delta.rs +++ b/core/store/src/flat/delta.rs @@ -1,6 +1,7 @@ use borsh::{BorshDeserialize, BorshSerialize}; use near_primitives::hash::hash; +use near_primitives::shard_layout::ShardUId; use near_primitives::state::ValueRef; use near_primitives::types::{RawStateChangesWithTrieKey, ShardId}; use std::collections::HashMap; @@ -72,9 +73,9 @@ impl FlatStateDelta { } /// Applies delta to the flat state. - pub fn apply_to_flat_state(self, store_update: &mut StoreUpdate) { + pub fn apply_to_flat_state(self, store_update: &mut StoreUpdate, shard_uid: ShardUId) { for (key, value) in self.0.into_iter() { - store_helper::set_ref(store_update, key, value).expect("Borsh cannot fail"); + store_helper::set_ref(store_update, shard_uid, key, value).expect("Borsh cannot fail"); } } } diff --git a/core/store/src/flat/storage.rs b/core/store/src/flat/storage.rs index 1ffb43f9d7c..08b72370eaa 100644 --- a/core/store/src/flat/storage.rs +++ b/core/store/src/flat/storage.rs @@ -5,9 +5,9 @@ use lru::LruCache; use near_o11y::metrics::IntGauge; use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; -use near_primitives::shard_layout::ShardLayout; +use near_primitives::shard_layout::{ShardLayout, ShardUId}; use near_primitives::state::ValueRef; -use near_primitives::types::{BlockHeight, ShardId}; +use near_primitives::types::BlockHeight; use tracing::info; use crate::flat::delta::CachedFlatStateDelta; @@ -40,8 +40,8 @@ pub struct FlatStorage(pub(crate) Arc>); // after the `flat_head` block successfully. pub(crate) struct FlatStorageInner { store: Store, - /// Id of the shard which state is accessed by this flat storage. - shard_id: ShardId, + /// UId of the shard which state is accessed by this flat storage. + shard_uid: ShardUId, /// The block for which we store the key value pairs of the state after it is applied. /// For non catchup mode, it should be the last final block. flat_head: CryptoHash, @@ -97,12 +97,12 @@ impl FlatStorageInner { &self, target_block_hash: &CryptoHash, ) -> Result, FlatStorageError> { - let shard_id = &self.shard_id; + let shard_uid = &self.shard_uid; let flat_head = &self.flat_head; let flat_head_info = self .blocks .get(flat_head) - .expect(&format!("Inconsistent flat storage state for shard {shard_id}: head {flat_head} not found in cached blocks")); + .expect(&format!("Inconsistent flat storage state for shard {shard_uid}: head {flat_head} not found in cached blocks")); let mut block_hash = *target_block_hash; let mut blocks = vec![]; @@ -152,13 +152,14 @@ impl FlatStorage { /// including those on forks into the returned FlatStorage. pub fn new( store: Store, - shard_id: ShardId, + shard_uid: ShardUId, latest_block_height: BlockHeight, // Unfortunately we don't have access to ChainStore inside this file because of package // dependencies, so we pass these functions in to access chain info chain_access: &dyn ChainAccessForFlatStorage, cache_capacity: usize, ) -> Self { + let shard_id = shard_uid.shard_id(); let flat_head = store_helper::get_flat_head(&store, shard_id) .unwrap_or_else(|| panic!("Cannot read flat head for shard {} from storage", shard_id)); let flat_head_info = chain_access.get_block_info(&flat_head); @@ -225,7 +226,7 @@ impl FlatStorage { Self(Arc::new(RwLock::new(FlatStorageInner { store, - shard_id, + shard_uid, flat_head, blocks, deltas, @@ -269,7 +270,7 @@ impl FlatStorage { return Ok(value_ref); } - let value_ref = store_helper::get_ref(&guard.store, key)?; + let value_ref = store_helper::get_ref(&guard.store, guard.shard_uid, key)?; guard.put_value_ref_to_cache(key.to_vec(), value_ref.clone()); Ok(value_ref) } @@ -279,17 +280,18 @@ impl FlatStorage { /// returns an error. pub fn update_flat_head(&self, new_head: &CryptoHash) -> Result<(), FlatStorageError> { let mut guard = self.0.write().expect(crate::flat::POISONED_LOCK_ERR); + let shard_id = guard.shard_uid.shard_id(); let blocks = guard.get_blocks_to_head(new_head)?; for block in blocks.into_iter().rev() { let mut store_update = StoreUpdate::new(guard.store.storage.clone()); // We unwrap here because flat storage is locked and we could retrieve path from old to new head, so delta // must exist. - let delta = store_helper::get_delta(&guard.store, guard.shard_id, block)?.unwrap(); + let delta = store_helper::get_delta(&guard.store, shard_id, block)?.unwrap(); for (key, value) in delta.0.iter() { guard.put_value_ref_to_cache(key.clone(), value.clone()); } - delta.apply_to_flat_state(&mut store_update); - store_helper::set_flat_head(&mut store_update, guard.shard_id, &block); + delta.apply_to_flat_state(&mut store_update, guard.shard_uid); + store_helper::set_flat_head(&mut store_update, shard_id, &block); // Remove old blocks and deltas from disk and memory. // Do it for each head update separately to ensure that old data is removed properly if node was @@ -311,7 +313,7 @@ impl FlatStorage { for hash in hashes_to_remove { // It is fine to remove all deltas in single store update, because memory overhead of `DeleteRange` // operation is low. - store_helper::remove_delta(&mut store_update, guard.shard_id, hash); + store_helper::remove_delta(&mut store_update, shard_id, hash); match guard.deltas.remove(&hash) { Some(delta) => { guard.metrics.cached_deltas.dec(); @@ -335,7 +337,6 @@ impl FlatStorage { store_update.commit().unwrap(); } - let shard_id = guard.shard_id; guard.flat_head = *new_head; let flat_head_height = guard .blocks @@ -360,14 +361,14 @@ impl FlatStorage { block: BlockInfo, ) -> Result { let mut guard = self.0.write().expect(super::POISONED_LOCK_ERR); - let shard_id = guard.shard_id; + let shard_id = guard.shard_uid.shard_id(); let block_height = block.height; info!(target: "chain", %shard_id, %block_hash, %block_height, "Adding block to flat storage"); if !guard.blocks.contains_key(&block.prev_hash) { return Err(guard.create_block_not_supported_error(block_hash)); } let mut store_update = StoreUpdate::new(guard.store.storage.clone()); - store_helper::set_delta(&mut store_update, guard.shard_id, *block_hash, &delta)?; + store_helper::set_delta(&mut store_update, shard_id, *block_hash, &delta)?; let cached_delta: CachedFlatStateDelta = delta.into(); guard.metrics.cached_deltas.inc(); guard.metrics.cached_deltas_num_items.add(cached_delta.len() as i64); @@ -381,7 +382,7 @@ impl FlatStorage { /// Clears all State key-value pairs from flat storage. pub fn clear_state(&self, shard_layout: ShardLayout) -> Result<(), StorageError> { let guard = self.0.write().expect(super::POISONED_LOCK_ERR); - let shard_id = guard.shard_id; + let shard_id = guard.shard_uid.shard_id(); // Removes all items belonging to the shard one by one. // Note that it does not work for resharding. @@ -431,6 +432,7 @@ mod tests { }; use assert_matches::assert_matches; + use near_primitives::shard_layout::ShardUId; use std::collections::HashMap; struct MockChain { @@ -640,9 +642,11 @@ mod tests { fn block_not_supported_errors() { // Create a chain with two forks. Set flat head to be at block 0. let chain = MockChain::chain_with_two_forks(5); + let shard_uid = ShardUId::single_shard(); + let shard_id = shard_uid.shard_id(); let store = create_test_store(); let mut store_update = store.store_update(); - store_helper::set_flat_head(&mut store_update, 0, &chain.get_block_hash(0)); + store_helper::set_flat_head(&mut store_update, shard_id, &chain.get_block_hash(0)); for i in 1..5 { store_helper::set_delta( &mut store_update, @@ -654,10 +658,10 @@ mod tests { } store_update.commit().unwrap(); - let flat_storage = FlatStorage::new(store.clone(), 0, 4, &chain, 0); + let flat_storage = FlatStorage::new(store.clone(), shard_uid, 4, &chain, 0); let flat_storage_manager = FlatStorageManager::new(store.clone()); - flat_storage_manager.add_flat_storage_for_shard(0, flat_storage); - let flat_storage = flat_storage_manager.get_flat_storage_for_shard(0).unwrap(); + flat_storage_manager.add_flat_storage_for_shard(shard_id, flat_storage); + let flat_storage = flat_storage_manager.get_flat_storage_for_shard(shard_id).unwrap(); // Check that flat head can be moved to block 1. let flat_head_hash = chain.get_block_hash(1); @@ -686,9 +690,11 @@ mod tests { fn skipped_heights() { // Create a linear chain where some heights are skipped. let chain = MockChain::linear_chain_with_skips(5); + let shard_uid = ShardUId::single_shard(); + let shard_id = shard_uid.shard_id(); let store = create_test_store(); let mut store_update = store.store_update(); - store_helper::set_flat_head(&mut store_update, 0, &chain.get_block_hash(0)); + store_helper::set_flat_head(&mut store_update, shard_id, &chain.get_block_hash(0)); for i in 1..5 { store_helper::set_delta( &mut store_update, @@ -701,10 +707,10 @@ mod tests { store_update.commit().unwrap(); // Check that flat storage state is created correctly for chain which has skipped heights. - let flat_storage = FlatStorage::new(store.clone(), 0, 8, &chain, 0); + let flat_storage = FlatStorage::new(store.clone(), shard_uid, 8, &chain, 0); let flat_storage_manager = FlatStorageManager::new(store.clone()); - flat_storage_manager.add_flat_storage_for_shard(0, flat_storage); - let flat_storage = flat_storage_manager.get_flat_storage_for_shard(0).unwrap(); + flat_storage_manager.add_flat_storage_for_shard(shard_id, flat_storage); + let flat_storage = flat_storage_manager.get_flat_storage_for_shard(shard_id).unwrap(); // Check that flat head can be moved to block 8. let flat_head_hash = chain.get_block_hash(8); @@ -719,14 +725,17 @@ mod tests { // 1. Create a chain with 10 blocks with no forks. Set flat head to be at block 0. // Block i sets value for key &[1] to &[i]. let mut chain = MockChain::linear_chain(10); + let shard_uid = ShardUId::single_shard(); + let shard_id = shard_uid.shard_id(); let store = create_test_store(); let mut store_update = store.store_update(); - store_helper::set_flat_head(&mut store_update, 0, &chain.get_block_hash(0)); - store_helper::set_ref(&mut store_update, vec![1], Some(ValueRef::new(&[0]))).unwrap(); + store_helper::set_flat_head(&mut store_update, shard_id, &chain.get_block_hash(0)); + store_helper::set_ref(&mut store_update, shard_uid, vec![1], Some(ValueRef::new(&[0]))) + .unwrap(); for i in 1..10 { store_helper::set_delta( &mut store_update, - 0, + shard_id, chain.get_block_hash(i), &FlatStateDelta::from([(vec![1], Some(ValueRef::new(&[i as u8])))]), ) @@ -734,9 +743,9 @@ mod tests { } store_update.commit().unwrap(); - let flat_storage = FlatStorage::new(store.clone(), 0, 9, &chain, cache_capacity); + let flat_storage = FlatStorage::new(store.clone(), shard_uid, 9, &chain, cache_capacity); let flat_storage_manager = FlatStorageManager::new(store.clone()); - flat_storage_manager.add_flat_storage_for_shard(0, flat_storage); + flat_storage_manager.add_flat_storage_for_shard(shard_id, flat_storage); let flat_storage = flat_storage_manager.get_flat_storage_for_shard(0).unwrap(); // 2. Check that the chunk_view at block i reads the value of key &[1] as &[i] @@ -744,7 +753,8 @@ mod tests { let block_hash = chain.get_block_hash(i); let blocks = flat_storage.get_blocks_to_head(&block_hash).unwrap(); assert_eq!(blocks.len(), i as usize); - let chunk_view = flat_storage_manager.chunk_view(0, Some(block_hash), false).unwrap(); + let chunk_view = + flat_storage_manager.chunk_view(shard_id, Some(block_hash), false).unwrap(); assert_eq!(chunk_view.get_ref(&[1]).unwrap(), Some(ValueRef::new(&[i as u8]))); } @@ -764,27 +774,32 @@ mod tests { // Verify that they return the correct values let blocks = flat_storage.get_blocks_to_head(&chain.get_block_hash(10)).unwrap(); assert_eq!(blocks.len(), 10); - let chunk_view0 = - flat_storage_manager.chunk_view(0, Some(chain.get_block_hash(10)), false).unwrap(); - let chunk_view1 = - flat_storage_manager.chunk_view(0, Some(chain.get_block_hash(4)), false).unwrap(); + let chunk_view0 = flat_storage_manager + .chunk_view(shard_id, Some(chain.get_block_hash(10)), false) + .unwrap(); + let chunk_view1 = flat_storage_manager + .chunk_view(shard_id, Some(chain.get_block_hash(4)), false) + .unwrap(); assert_eq!(chunk_view0.get_ref(&[1]).unwrap(), None); assert_eq!(chunk_view0.get_ref(&[2]).unwrap(), Some(ValueRef::new(&[1]))); assert_eq!(chunk_view1.get_ref(&[1]).unwrap(), Some(ValueRef::new(&[4]))); assert_eq!(chunk_view1.get_ref(&[2]).unwrap(), None); assert_matches!( - store_helper::get_delta(&store, 0, chain.get_block_hash(5)).unwrap(), + store_helper::get_delta(&store, shard_id, chain.get_block_hash(5)).unwrap(), Some(_) ); assert_matches!( - store_helper::get_delta(&store, 0, chain.get_block_hash(10)).unwrap(), + store_helper::get_delta(&store, shard_id, chain.get_block_hash(10)).unwrap(), Some(_) ); // 5. Move the flat head to block 5, verify that chunk_view0 still returns the same values // and chunk_view1 returns an error. Also check that DBCol::FlatState is updated correctly flat_storage.update_flat_head(&chain.get_block_hash(5)).unwrap(); - assert_eq!(store_helper::get_ref(&store, &[1]).unwrap(), Some(ValueRef::new(&[5]))); + assert_eq!( + store_helper::get_ref(&store, shard_uid, &[1]).unwrap(), + Some(ValueRef::new(&[5])) + ); let blocks = flat_storage.get_blocks_to_head(&chain.get_block_hash(10)).unwrap(); assert_eq!(blocks.len(), 5); assert_eq!(chunk_view0.get_ref(&[1]).unwrap(), None); @@ -801,12 +816,15 @@ mod tests { flat_storage.update_flat_head(&chain.get_block_hash(10)).unwrap(); let blocks = flat_storage.get_blocks_to_head(&chain.get_block_hash(10)).unwrap(); assert_eq!(blocks.len(), 0); - assert_eq!(store_helper::get_ref(&store, &[1]).unwrap(), None); - assert_eq!(store_helper::get_ref(&store, &[2]).unwrap(), Some(ValueRef::new(&[1]))); + assert_eq!(store_helper::get_ref(&store, shard_uid, &[1]).unwrap(), None); + assert_eq!( + store_helper::get_ref(&store, shard_uid, &[2]).unwrap(), + Some(ValueRef::new(&[1])) + ); assert_eq!(chunk_view0.get_ref(&[1]).unwrap(), None); assert_eq!(chunk_view0.get_ref(&[2]).unwrap(), Some(ValueRef::new(&[1]))); assert_matches!( - store_helper::get_delta(&store, 0, chain.get_block_hash(10)).unwrap(), + store_helper::get_delta(&store, shard_id, chain.get_block_hash(10)).unwrap(), None ); } @@ -826,8 +844,10 @@ mod tests { // 1. Create a simple chain and add single key-value deltas for 3 consecutive blocks. let chain = MockChain::linear_chain(4); let store = create_test_store(); + let shard_uid = ShardUId::single_shard(); + let shard_id = shard_uid.shard_id(); let mut store_update = store.store_update(); - store_helper::set_flat_head(&mut store_update, 0, &chain.get_block_hash(0)); + store_helper::set_flat_head(&mut store_update, shard_id, &chain.get_block_hash(0)); let mut deltas: Vec<(BlockHeight, Vec, Option)> = vec![ (1, vec![1], Some(ValueRef::new(&[1 as u8]))), @@ -837,7 +857,7 @@ mod tests { for (height, key, value) in deltas.drain(..) { store_helper::set_delta( &mut store_update, - 0, + shard_id, chain.get_block_hash(height), &FlatStateDelta::from([(key, value)]), ) @@ -846,10 +866,10 @@ mod tests { store_update.commit().unwrap(); // 2. Create flat storage and apply 3 blocks to it. - let flat_storage = FlatStorage::new(store.clone(), 0, 3, &chain, 2); + let flat_storage = FlatStorage::new(store.clone(), shard_uid, 3, &chain, 2); let flat_storage_manager = FlatStorageManager::new(store.clone()); - flat_storage_manager.add_flat_storage_for_shard(0, flat_storage); - let flat_storage = flat_storage_manager.get_flat_storage_for_shard(0).unwrap(); + flat_storage_manager.add_flat_storage_for_shard(shard_id, flat_storage); + let flat_storage = flat_storage_manager.get_flat_storage_for_shard(shard_id).unwrap(); flat_storage.update_flat_head(&chain.get_block_hash(3)).unwrap(); { diff --git a/core/store/src/flat/store_helper.rs b/core/store/src/flat/store_helper.rs index e83eac07385..3442f529320 100644 --- a/core/store/src/flat/store_helper.rs +++ b/core/store/src/flat/store_helper.rs @@ -8,9 +8,8 @@ use borsh::{BorshDeserialize, BorshSerialize}; use byteorder::ReadBytesExt; use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; -use near_primitives::shard_layout::{account_id_to_shard_id, ShardLayout}; +use near_primitives::shard_layout::{ShardLayout, ShardUId}; use near_primitives::state::ValueRef; -use near_primitives::trie_key::trie_key_parsers::parse_account_id_from_raw_key; use near_primitives::types::ShardId; /// Prefixes determining type of flat storage creation status stored in DB. @@ -97,9 +96,36 @@ pub fn remove_flat_head(store_update: &mut StoreUpdate, shard_id: ShardId) { store_update.delete(FlatStateColumn::Misc.to_db_col(), &flat_head_key(shard_id)); } -pub(crate) fn get_ref(store: &Store, key: &[u8]) -> Result, FlatStorageError> { +fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec { + let mut buffer = vec![]; + buffer.extend_from_slice(&shard_uid.to_bytes()); + buffer.extend_from_slice(key); + buffer +} + +fn decode_flat_state_db_key(key: &Box<[u8]>) -> Result<(ShardUId, Vec), StorageError> { + if key.len() < 8 { + return Err(StorageError::StorageInconsistentState(format!( + "Found key in flat storage with length < 8: {key:?}" + ))); + } + let (shard_uid_bytes, trie_key) = key.split_at(8); + let shard_uid = shard_uid_bytes.try_into().map_err(|_| { + StorageError::StorageInconsistentState(format!( + "Incorrect raw shard uid: {shard_uid_bytes:?}" + )) + })?; + Ok((shard_uid, trie_key.to_vec())) +} + +pub(crate) fn get_ref( + store: &Store, + shard_uid: ShardUId, + key: &[u8], +) -> Result, FlatStorageError> { + let db_key = encode_flat_state_db_key(shard_uid, key); let raw_ref = store - .get(FlatStateColumn::State.to_db_col(), key) + .get(FlatStateColumn::State.to_db_col(), &db_key) .map_err(|_| FlatStorageError::StorageInternalError)?; if let Some(raw_ref) = raw_ref { let bytes = @@ -110,16 +136,19 @@ pub(crate) fn get_ref(store: &Store, key: &[u8]) -> Result, Fla } } -pub(crate) fn set_ref( +// TODO(#8577): make pub(crate) once flat storage creator is moved inside `flat` module. +pub fn set_ref( store_update: &mut StoreUpdate, + shard_uid: ShardUId, key: Vec, value: Option, ) -> Result<(), FlatStorageError> { + let db_key = encode_flat_state_db_key(shard_uid, &key); match value { Some(value) => store_update - .set_ser(FlatStateColumn::State.to_db_col(), &key, &value) + .set_ser(FlatStateColumn::State.to_db_col(), &db_key, &value) .map_err(|_| FlatStorageError::StorageInternalError), - None => Ok(store_update.delete(FlatStateColumn::State.to_db_col(), &key)), + None => Ok(store_update.delete(FlatStateColumn::State.to_db_col(), &db_key)), } } @@ -197,17 +226,18 @@ pub fn remove_flat_storage_creation_status(store_update: &mut StoreUpdate, shard } /// Iterate over flat storage entries for a given shard. -/// It reads data only from the 'main' column - which represents the state as of final head.k +/// It reads data only from the 'main' column - which represents the state as of final head. /// /// WARNING: flat storage keeps changing, so the results might be inconsistent, unless you're running /// this method on the shapshot of the data. +// TODO(#8676): Support non-trivial ranges and maybe pass `shard_uid` as key prefix. pub fn iter_flat_state_entries<'a>( shard_layout: ShardLayout, shard_id: u64, store: &'a Store, from: Option<&'a Vec>, to: Option<&'a Vec>, -) -> impl Iterator, Box<[u8]>)> + 'a { +) -> impl Iterator, Box<[u8]>)> + 'a { store .iter_range( FlatStateColumn::State.to_db_col(), @@ -220,7 +250,8 @@ pub fn iter_flat_state_entries<'a>( // to see if this element belongs to this shard. if let Ok(key_in_shard) = key_belongs_to_shard(&key, &shard_layout, shard_id) { if key_in_shard { - return Some((key, value)); + let (_, trie_key) = decode_flat_state_db_key(&key).unwrap(); + return Some((trie_key, value)); } } } @@ -235,11 +266,6 @@ pub fn key_belongs_to_shard( shard_layout: &ShardLayout, shard_id: u64, ) -> Result { - let account_id = parse_account_id_from_raw_key(key) - .map_err(|e| StorageError::StorageInconsistentState(e.to_string()))? - .ok_or(StorageError::FlatStorageError(format!( - "Failed to find account id in flat storage key {:?}", - key - )))?; - Ok(account_id_to_shard_id(&account_id, &shard_layout) == shard_id) + let (key_shard_uid, _) = decode_flat_state_db_key(key)?; + Ok(key_shard_uid.version == shard_layout.version() && key_shard_uid.shard_id as u64 == shard_id) } diff --git a/genesis-tools/genesis-populate/src/lib.rs b/genesis-tools/genesis-populate/src/lib.rs index 8b02deda500..98c6df00bef 100644 --- a/genesis-tools/genesis-populate/src/lib.rs +++ b/genesis-tools/genesis-populate/src/lib.rs @@ -183,7 +183,7 @@ impl GenesisBuilder { let root = tries.apply_all(&trie_changes, shard_uid, &mut store_update); if cfg!(feature = "protocol_feature_flat_state") { near_store::flat::FlatStateDelta::from_state_changes(&state_changes) - .apply_to_flat_state(&mut store_update); + .apply_to_flat_state(&mut store_update, shard_uid); } store_update.commit()?; diff --git a/integration-tests/src/tests/client/flat_storage.rs b/integration-tests/src/tests/client/flat_storage.rs index 8b80e67180b..4d80f75d624 100644 --- a/integration-tests/src/tests/client/flat_storage.rs +++ b/integration-tests/src/tests/client/flat_storage.rs @@ -260,6 +260,7 @@ fn test_flat_storage_creation_start_from_state_part() { let genesis = Genesis::test(accounts, 1); let store = create_test_store(); let shard_layout = ShardLayout::v0_single_shard(); + let shard_uid = shard_layout.get_shard_uids()[0]; // Process some blocks with flat storage. // Split state into two parts and return trie keys corresponding to each part. @@ -318,7 +319,7 @@ fn test_flat_storage_creation_start_from_state_part() { let flat_head = store_helper::get_flat_head(&store, 0).unwrap(); let mut store_update = store.store_update(); for key in trie_keys[1].iter() { - store_update.delete(store_helper::FlatStateColumn::State.to_db_col(), key); + store_helper::set_ref(&mut store_update, shard_uid, key.clone(), None).unwrap(); } store_helper::remove_flat_head(&mut store_update, 0); store_helper::set_flat_storage_creation_status( @@ -440,9 +441,8 @@ fn test_flat_storage_iter() { near_primitives::trie_key::TrieKey::Account { account_id: "test0".parse().unwrap() } - .to_vec() - .as_slice(), - items.get(0).unwrap().0.as_ref() + .to_vec(), + items.get(0).unwrap().0 ); } 1 => { @@ -456,9 +456,8 @@ fn test_flat_storage_iter() { near_primitives::trie_key::TrieKey::Account { account_id: "near".parse().unwrap() } - .to_vec() - .as_slice(), - items.get(0).unwrap().0.as_ref() + .to_vec(), + items.get(0).unwrap().0 ); } _ => { diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index f364b639065..009ccb33fcb 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -743,19 +743,19 @@ impl RuntimeAdapter for NightshadeRuntime { // TODO (#7327): consider passing flat storage errors here to handle them gracefully fn create_flat_storage_for_shard( &self, - shard_id: ShardId, + shard_uid: ShardUId, latest_block_height: BlockHeight, chain_access: &dyn ChainAccessForFlatStorage, ) { let cache_capacity = self.tries.flat_state_cache_capacity() as usize; let flat_storage = FlatStorage::new( self.store.clone(), - shard_id, + shard_uid, latest_block_height, chain_access, cache_capacity, ); - self.flat_storage_manager.add_flat_storage_for_shard(shard_id, flat_storage); + self.flat_storage_manager.add_flat_storage_for_shard(shard_uid.shard_id(), flat_storage); } fn remove_flat_storage_for_shard( @@ -1376,7 +1376,7 @@ impl RuntimeAdapter for NightshadeRuntime { tries.apply_all(&trie_changes, shard_uid, &mut store_update); if cfg!(feature = "protocol_feature_flat_state") { debug!(target: "chain", %shard_id, "Inserting {} values to flat storage", flat_state_delta.len()); - flat_state_delta.apply_to_flat_state(&mut store_update); + flat_state_delta.apply_to_flat_state(&mut store_update, shard_uid); } self.precompile_contracts(epoch_id, contract_codes)?; Ok(store_update.commit()?) @@ -1821,7 +1821,8 @@ mod test { runtime.get_flat_storage_creation_status(shard_id), FlatStorageCreationStatus::Ready ); - runtime.create_flat_storage_for_shard(shard_id, 0, &mock_chain); + let shard_uid = runtime.shard_id_to_uid(shard_id, &EpochId::default()).unwrap(); + runtime.create_flat_storage_for_shard(shard_uid, 0, &mock_chain); } } diff --git a/runtime/runtime-params-estimator/src/estimator_context.rs b/runtime/runtime-params-estimator/src/estimator_context.rs index 975141a8f9f..647b5a4dc2c 100644 --- a/runtime/runtime-params-estimator/src/estimator_context.rs +++ b/runtime/runtime-params-estimator/src/estimator_context.rs @@ -169,19 +169,19 @@ impl<'c> EstimatorContext<'c> { } } - let shard_id = ShardUId::single_shard().shard_id(); + let shard_uid = ShardUId::single_shard(); // Set up flat head to be equal to the latest block height let mut store_update = store.store_update(); - store_helper::set_flat_head(&mut store_update, shard_id, &FLAT_STATE_HEAD); + store_helper::set_flat_head(&mut store_update, shard_uid.shard_id(), &FLAT_STATE_HEAD); store_update.commit().expect("failed to set flat head"); let flat_storage = FlatStorage::new( store, - shard_id, + shard_uid, BLOCK_HEIGHT, &ChainAccess {}, cache_capacity as usize, ); - flat_storage_manager.add_flat_storage_for_shard(shard_id, flat_storage); + flat_storage_manager.add_flat_storage_for_shard(shard_uid.shard_id(), flat_storage); flat_storage_manager } } @@ -306,14 +306,11 @@ impl Testbed<'_> { .unwrap(); let mut store_update = self.tries.store_update(); - self.root = self.tries.apply_all( - &apply_result.trie_changes, - ShardUId::single_shard(), - &mut store_update, - ); + let shard_uid = ShardUId::single_shard(); + self.root = self.tries.apply_all(&apply_result.trie_changes, shard_uid, &mut store_update); if cfg!(feature = "protocol_feature_flat_state") { near_store::flat::FlatStateDelta::from_state_changes(&apply_result.state_changes) - .apply_to_flat_state(&mut store_update); + .apply_to_flat_state(&mut store_update, shard_uid); } store_update.commit().unwrap(); self.apply_state.block_height += 1; diff --git a/runtime/runtime/src/genesis.rs b/runtime/runtime/src/genesis.rs index f490097a401..537cd22ed56 100644 --- a/runtime/runtime/src/genesis.rs +++ b/runtime/runtime/src/genesis.rs @@ -99,7 +99,7 @@ impl GenesisStateApplier { *current_state_root = tries.apply_all(&trie_changes, shard_uid, &mut store_update); if cfg!(feature = "protocol_feature_flat_state") { FlatStateDelta::from_state_changes(&state_changes) - .apply_to_flat_state(&mut store_update); + .apply_to_flat_state(&mut store_update, shard_uid); } drop(state_changes); // silence compiler when not protocol_feature_flat_state store_update.commit().expect("Store update failed on genesis initialization"); diff --git a/tools/flat-storage/src/commands.rs b/tools/flat-storage/src/commands.rs index 11636f90bc1..8c823b69121 100644 --- a/tools/flat-storage/src/commands.rs +++ b/tools/flat-storage/src/commands.rs @@ -126,8 +126,10 @@ impl FlatStorageCommand { let tip = rw_chain_store.final_head().unwrap(); // TODO: there should be a method that 'loads' the current flat storage state based on Storage. + let shard_uid = + rw_hot_runtime.shard_id_to_uid(reset_cmd.shard_id, &tip.epoch_id)?; rw_hot_runtime.create_flat_storage_for_shard( - reset_cmd.shard_id, + shard_uid, tip.height, &rw_chain_store, ); @@ -195,11 +197,8 @@ impl FlatStorageCommand { println!("Verifying using the {:?} as state_root", state_root); let tip = chain_store.final_head().unwrap(); - hot_runtime.create_flat_storage_for_shard( - verify_cmd.shard_id, - tip.height, - &chain_store, - ); + let shard_uid = hot_runtime.shard_id_to_uid(verify_cmd.shard_id, &tip.epoch_id)?; + hot_runtime.create_flat_storage_for_shard(shard_uid, tip.height, &chain_store); let trie = hot_runtime .get_view_trie_for_shard(verify_cmd.shard_id, &head_hash, *state_root)