Skip to content

Commit

Permalink
refactor: use shard uid in flat state keys (near#8685)
Browse files Browse the repository at this point in the history
Prepend `ShardUId` to flat state keys, as motivated in near#8670.

Ideally we need to replace `shard_id` with `shard_uid` everywhere, including other DB keys and `RuntimeAdapter` flat storage methods. But this change would be too involved, so let's do it incrementally.

The biggest structural consequence is that `FlatStorage` now stores `shard_uid`, which makes sense, and later it will be used for writing deltas as well. It's not a big deal, but because of that changes are scattered throughout the code and may look scary.

## Testing

* `test_flat_storage_iter` checks that new `key_belongs_to_shard` impl is aligned with shard layout; also we can see that the change takes place in this failing test: https://buildkite.com/nearprotocol/nearcore/builds/25676#0186bc8f-12a2-410a-b74c-e326f3b963fa
* https://nayduck.near.org/#/run/2898
  • Loading branch information
Longarithm authored and nikurt committed Mar 15, 2023
1 parent 2b70392 commit eef6a12
Show file tree
Hide file tree
Showing 14 changed files with 157 additions and 110 deletions.
7 changes: 5 additions & 2 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3187,10 +3187,13 @@ impl Chain {
// So we don't have to add the storage state for shard in such case.
// TODO(8438) - add additional test scenarios for this case.
if *block_hash != CryptoHash::default() {
let block_height = self.get_block_header(block_hash)?.height();
let block_header = self.get_block_header(block_hash)?;
let epoch_id = block_header.epoch_id();
let shard_uid = self.runtime_adapter.shard_id_to_uid(shard_id, epoch_id)?;
let block_height = block_header.height();

self.runtime_adapter.create_flat_storage_for_shard(
shard_id,
shard_uid,
block_height,
self.store(),
);
Expand Down
19 changes: 10 additions & 9 deletions chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use near_store::flat::{
store_helper, FetchingStateStatus, FlatStateDelta, FlatStorageCreationStatus,
NUM_PARTS_IN_ONE_STEP, STATE_PART_MEMORY_LIMIT,
};
use near_store::migrations::BatchedStoreUpdate;
use near_store::{Store, FLAT_STORAGE_HEAD_HEIGHT};
use near_store::{Trie, TrieDBStorage, TrieTraversalItem};
use std::collections::HashMap;
Expand Down Expand Up @@ -127,7 +126,7 @@ impl FlatStorageShardCreator {
debug!(target: "store", "Preload state part from {hex_path_begin}");
let mut trie_iter = trie.iter().unwrap();

let mut store_update = BatchedStoreUpdate::new(&store, 10_000_000);
let mut store_update = store.store_update();
let mut num_items = 0;
for TrieTraversalItem { hash, key } in
trie_iter.visit_nodes_interval(&path_begin, &path_end).unwrap()
Expand All @@ -137,15 +136,13 @@ impl FlatStorageShardCreator {
Some(key) => {
let value = trie.storage.retrieve_raw_bytes(&hash).unwrap();
let value_ref = ValueRef::new(&value);
store_update
.set_ser(store_helper::FlatStateColumn::State.to_db_col(), &key, &value_ref)
store_helper::set_ref(&mut store_update, shard_uid, key, Some(value_ref))
.expect("Failed to put value in FlatState");

num_items += 1;
}
}
}
store_update.finish().unwrap();
store_update.commit().unwrap();

let processed_parts = progress.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;

Expand Down Expand Up @@ -341,12 +338,14 @@ impl FlatStorageShardCreator {
if (old_flat_head != &flat_head) || (flat_head == chain_final_head.last_block_hash)
{
// If flat head changes, save all changes to store.
let epoch_id = self.runtime_adapter.get_epoch_id(&flat_head)?;
let shard_uid = self.runtime_adapter.shard_id_to_uid(shard_id, &epoch_id)?;
let old_height = chain_store.get_block_height(&old_flat_head).unwrap();
let height = chain_store.get_block_height(&flat_head).unwrap();
debug!(target: "chain", %shard_id, %old_flat_head, %old_height, %flat_head, %height, "Catching up flat head");
self.metrics.flat_head_height.set(height as i64);
let mut store_update = self.runtime_adapter.store().store_update();
merged_delta.apply_to_flat_state(&mut store_update);
merged_delta.apply_to_flat_state(&mut store_update, shard_uid);

if flat_head == chain_final_head.last_block_hash {
// If we reached chain final head, we can finish catchup and finally create flat storage.
Expand All @@ -357,7 +356,7 @@ impl FlatStorageShardCreator {
store_helper::set_flat_head(&mut store_update, shard_id, &flat_head);
store_update.commit()?;
self.runtime_adapter.create_flat_storage_for_shard(
shard_id,
shard_uid,
chain_store.head().unwrap().height,
chain_store,
);
Expand Down Expand Up @@ -406,8 +405,10 @@ impl FlatStorageCreator {
let status = runtime_adapter.get_flat_storage_creation_status(shard_id);
match status {
FlatStorageCreationStatus::Ready => {
let shard_uid =
runtime_adapter.shard_id_to_uid(shard_id, &chain_head.epoch_id)?;
runtime_adapter.create_flat_storage_for_shard(
shard_id,
shard_uid,
chain_head.height,
chain_store,
);
Expand Down
4 changes: 2 additions & 2 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,11 +839,11 @@ impl RuntimeAdapter for KeyValueRuntime {

fn create_flat_storage_for_shard(
&self,
shard_id: ShardId,
shard_uid: ShardUId,
_latest_block_height: BlockHeight,
_chain_access: &dyn ChainAccessForFlatStorage,
) {
panic!("Flat storage state can't be created for shard {shard_id} because KeyValueRuntime doesn't support this");
panic!("Flat storage state can't be created for shard {shard_uid} because KeyValueRuntime doesn't support this");
}

fn remove_flat_storage_for_shard(
Expand Down
2 changes: 1 addition & 1 deletion chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ pub trait RuntimeAdapter: Send + Sync {
/// TODO (#7327): consider returning flat storage creation errors here
fn create_flat_storage_for_shard(
&self,
shard_id: ShardId,
shard_uid: ShardUId,
latest_block_height: BlockHeight,
chain_access: &dyn ChainAccessForFlatStorage,
);
Expand Down
2 changes: 1 addition & 1 deletion core/store/src/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ impl DBCol {
DBCol::StateChangesForSplitStates => &[DBKeyType::BlockHash, DBKeyType::ShardId],
DBCol::TransactionResultForBlock => &[DBKeyType::OutcomeId, DBKeyType::BlockHash],
#[cfg(feature = "protocol_feature_flat_state")]
DBCol::FlatState => &[DBKeyType::TrieKey],
DBCol::FlatState => &[DBKeyType::ShardUId, DBKeyType::TrieKey],
#[cfg(feature = "protocol_feature_flat_state")]
DBCol::FlatStateDeltas => &[DBKeyType::ShardId, DBKeyType::BlockHash],
#[cfg(feature = "protocol_feature_flat_state")]
Expand Down
5 changes: 3 additions & 2 deletions core/store/src/flat/delta.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use borsh::{BorshDeserialize, BorshSerialize};

use near_primitives::hash::hash;
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::ValueRef;
use near_primitives::types::{RawStateChangesWithTrieKey, ShardId};
use std::collections::HashMap;
Expand Down Expand Up @@ -72,9 +73,9 @@ impl FlatStateDelta {
}

/// Applies delta to the flat state.
pub fn apply_to_flat_state(self, store_update: &mut StoreUpdate) {
pub fn apply_to_flat_state(self, store_update: &mut StoreUpdate, shard_uid: ShardUId) {
for (key, value) in self.0.into_iter() {
store_helper::set_ref(store_update, key, value).expect("Borsh cannot fail");
store_helper::set_ref(store_update, shard_uid, key, value).expect("Borsh cannot fail");
}
}
}
Expand Down
Loading

0 comments on commit eef6a12

Please sign in to comment.