Skip to content

Commit

Permalink
refactor: Clean flat state via range (near#9109)
Browse files Browse the repository at this point in the history
Use range deletion in `clean_state` when removing values from `FlatState` column. Addresses near#8332
  • Loading branch information
Jure Bajic authored and nikurt committed Jun 8, 2023
1 parent 40adfcc commit 98b86f7
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 94 deletions.
2 changes: 1 addition & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3241,7 +3241,7 @@ impl Chain {
// Before working with state parts, remove existing flat storage data.
let epoch_id = self.get_block_header(&sync_hash)?.epoch_id().clone();
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;
self.runtime_adapter.remove_flat_storage_for_shard(shard_uid, &epoch_id)?;
self.runtime_adapter.remove_flat_storage_for_shard(shard_uid)?;

let shard_state_header = self.get_state_header(shard_id, sync_hash)?;
let state_root = shard_state_header.chunk_prev_state_root();
Expand Down
27 changes: 27 additions & 0 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ use crate::BlockHeader;

use near_primitives::epoch_manager::ShardConfig;

use near_store::flat::{FlatStorage, FlatStorageStatus};

use super::ValidatorSchedule;

/// Simple key value runtime for tests.
Expand Down Expand Up @@ -974,6 +976,31 @@ impl RuntimeAdapter for KeyValueRuntime {
))
}

fn get_flat_storage_for_shard(&self, _shard_uid: ShardUId) -> Option<FlatStorage> {
None
}

fn get_flat_storage_status(&self, _shard_uid: ShardUId) -> FlatStorageStatus {
FlatStorageStatus::Disabled
}

fn create_flat_storage_for_shard(&self, shard_uid: ShardUId) {
panic!("Flat storage state can't be created for shard {shard_uid} because KeyValueRuntime doesn't support this");
}

fn remove_flat_storage_for_shard(&self, _shard_uid: ShardUId) -> Result<(), Error> {
Ok(())
}

fn set_flat_storage_for_genesis(
&self,
_genesis_block: &CryptoHash,
_genesis_block_height: BlockHeight,
_genesis_epoch_id: &EpochId,
) -> Result<StoreUpdate, Error> {
Ok(self.store.store_update())
}

fn validate_tx(
&self,
_gas_price: Balance,
Expand Down
6 changes: 1 addition & 5 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,7 @@ pub trait RuntimeAdapter: Send + Sync {

/// Removes flat storage state for shard, if it exists.
/// Used to clear old flat storage data from disk and memory before syncing to newer state.
fn remove_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
epoch_id: &EpochId,
) -> Result<(), Error>;
fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), Error>;

fn set_flat_storage_for_genesis(
&self,
Expand Down
15 changes: 4 additions & 11 deletions core/store/src/flat/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::flat::{
};
use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::shard_layout::ShardUId;
use near_primitives::types::BlockHeight;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -121,18 +121,11 @@ impl FlatStorageManager {
flat_storages.get(&shard_uid).cloned()
}

pub fn remove_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
shard_layout: ShardLayout,
) -> Result<(), StorageError> {
pub fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), StorageError> {
let mut flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR);

match flat_storages.remove(&shard_uid) {
None => {}
Some(flat_storage) => {
flat_storage.clear_state(shard_layout)?;
}
if let Some(flat_store) = flat_storages.remove(&shard_uid) {
flat_store.clear_state()?;
}

Ok(())
Expand Down
34 changes: 7 additions & 27 deletions core/store/src/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::sync::{Arc, RwLock};

use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::FlatStateValue;
use tracing::{debug, info, warn};
use tracing::{debug, warn};

use crate::flat::delta::CachedFlatStateChanges;
use crate::flat::{FlatStorageReadyStatus, FlatStorageStatus};
use crate::{DBCol, Store, StoreUpdate};
use crate::{Store, StoreUpdate};

use super::delta::{CachedFlatStateDelta, FlatStateDelta};
use super::metrics::FlatStorageMetrics;
Expand Down Expand Up @@ -299,33 +299,13 @@ impl FlatStorage {
}

/// Clears all State key-value pairs from flat storage.
pub fn clear_state(&self, _shard_layout: ShardLayout) -> Result<(), StorageError> {
pub fn clear_state(&self) -> Result<(), StorageError> {
let guard = self.0.write().expect(super::POISONED_LOCK_ERR);
let shard_id = guard.shard_uid.shard_id();

// Removes all items belonging to the shard one by one.
// Note that it does not work for resharding.
// TODO (#7327): call it just after we stopped tracking a shard.
// TODO (#7327): remove FlatStateChanges. Consider custom serialization of keys to remove them by
// prefix.
// TODO (#7327): support range deletions which are much faster than naive deletions. For that, we
// can delete ranges of keys like
// [ [0]+boundary_accounts(shard_id) .. [0]+boundary_accounts(shard_id+1) ), etc.
// We should also take fixed accounts into account.
let mut store_update = guard.store.store_update();
let mut removed_items = 0;
for item in guard.store.iter(DBCol::FlatState) {
let (key, _) =
item.map_err(|e| StorageError::StorageInconsistentState(e.to_string()))?;

if store_helper::key_belongs_to_shard(&key, &guard.shard_uid)? {
removed_items += 1;
store_update.delete(DBCol::FlatState, &key);
}
}
info!(target: "store", %shard_id, %removed_items, "Removing old items from flat storage");

let mut store_update = guard.store.store_update();
store_helper::remove_all_flat_state_values(&mut store_update, guard.shard_uid);
store_helper::remove_all_deltas(&mut store_update, guard.shard_uid);

store_helper::set_flat_storage_status(
&mut store_update,
guard.shard_uid,
Expand Down
14 changes: 11 additions & 3 deletions core/store/src/flat/store_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,19 @@ pub fn remove_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, block_h
store_update.delete(DBCol::FlatStateDeltaMetadata, &key);
}

pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
fn remove_range_by_shard_uid(store_update: &mut StoreUpdate, shard_uid: ShardUId, col: DBCol) {
let key_from = shard_uid.to_bytes();
let key_to = ShardUId::next_shard_prefix(&key_from);
store_update.delete_range(DBCol::FlatStateChanges, &key_from, &key_to);
store_update.delete_range(DBCol::FlatStateDeltaMetadata, &key_from, &key_to);
store_update.delete_range(col, &key_from, &key_to);
}

pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateChanges);
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateDeltaMetadata);
}

pub fn remove_all_flat_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatState);
}

pub(crate) fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec<u8> {
Expand Down
68 changes: 29 additions & 39 deletions integration-tests/src/tests/client/flat_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::types::AccountId;
use near_primitives_core::types::BlockHeight;
use near_store::flat::{
store_helper, FetchingStateStatus, FlatStorageCreationStatus, FlatStorageManager,
FlatStorageReadyStatus, FlatStorageStatus, NUM_PARTS_IN_ONE_STEP,
store_helper, FetchingStateStatus, FlatStorageCreationStatus, FlatStorageReadyStatus,
FlatStorageStatus, NUM_PARTS_IN_ONE_STEP,
};
use near_store::test_utils::create_test_store;
use near_store::{KeyLookupMode, Store, TrieTraversalItem};
Expand Down Expand Up @@ -49,14 +49,14 @@ fn wait_for_flat_storage_creation(
) -> BlockHeight {
let store = env.clients[0].runtime_adapter.store().clone();
let mut next_height = start_height;
let mut prev_status = store_helper::get_flat_storage_status(&store, shard_uid).unwrap();
let mut prev_status = store_helper::get_flat_storage_status(&store, shard_uid);
while next_height < start_height + CREATION_TIMEOUT {
if produce_blocks {
env.produce_block(0, next_height);
}
env.clients[0].run_flat_storage_creation_step().unwrap();

let status = store_helper::get_flat_storage_status(&store, shard_uid).unwrap();
let status = store_helper::get_flat_storage_status(&store, shard_uid);
// Check validity of state transition for flat storage creation.
match &prev_status {
FlatStorageStatus::Empty => assert_matches!(
Expand Down Expand Up @@ -98,14 +98,13 @@ fn wait_for_flat_storage_creation(

thread::sleep(Duration::from_secs(1));
}
let flat_storage_manager = get_flat_storage_manager(&env);
let status = flat_storage_manager.get_flat_storage_status(shard_uid);
let status = store_helper::get_flat_storage_status(&store, shard_uid);
assert_matches!(
status,
FlatStorageStatus::Ready(_),
"Client couldn't create flat storage until block {next_height}, status: {status:?}"
);
assert!(flat_storage_manager.get_flat_storage_for_shard(shard_uid).is_some());
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_some());

// We don't expect any forks in the chain after flat storage head, so the number of
// deltas stored on DB should be exactly 2, as there are only 2 blocks after
Expand Down Expand Up @@ -138,7 +137,7 @@ fn test_flat_storage_creation_sanity() {
let expected_flat_storage_head =
env.clients[0].chain.get_block_hash_by_height(flat_head_height).unwrap();
let status = store_helper::get_flat_storage_status(&store, shard_uid);
if let Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head })) = status {
if let FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }) = status {
assert_eq!(flat_head.hash, expected_flat_storage_head);
assert_eq!(flat_head.height, flat_head_height);
} else {
Expand All @@ -160,7 +159,7 @@ fn test_flat_storage_creation_sanity() {
);
}

get_flat_storage_manager(&env).remove_flat_storage_for_shard(shard_uid).unwrap();
env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uid).unwrap();
}

// Create new chain and runtime using the same store. It should produce next blocks normally, but now it should
Expand All @@ -169,18 +168,15 @@ fn test_flat_storage_creation_sanity() {
for height in START_HEIGHT..START_HEIGHT + 2 {
env.produce_block(0, height);
}
assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uid).is_none());
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_none());

assert_eq!(
store_helper::get_flat_storage_status(&store, shard_uid),
Ok(FlatStorageStatus::Empty)
);
assert_eq!(store_helper::get_flat_storage_status(&store, shard_uid), FlatStorageStatus::Empty);
assert!(!env.clients[0].run_flat_storage_creation_step().unwrap());
// At first, flat storage state should start saving deltas. Deltas for all newly processed blocks should be saved to
// disk.
assert_eq!(
store_helper::get_flat_storage_status(&store, shard_uid),
Ok(FlatStorageStatus::Creation(FlatStorageCreationStatus::SavingDeltas))
FlatStorageStatus::Creation(FlatStorageCreationStatus::SavingDeltas)
);
// Introduce fork block to check that deltas for it will be GC-d later.
let fork_block = env.clients[0].produce_block(START_HEIGHT + 2).unwrap().unwrap();
Expand Down Expand Up @@ -208,14 +204,14 @@ fn test_flat_storage_creation_sanity() {
let final_block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT).unwrap();
assert_eq!(
store_helper::get_flat_storage_status(&store, shard_uid),
Ok(FlatStorageStatus::Creation(FlatStorageCreationStatus::FetchingState(
FlatStorageStatus::Creation(FlatStorageCreationStatus::FetchingState(
FetchingStateStatus {
block_hash: final_block_hash,
part_id: 0,
num_parts_in_step: NUM_PARTS_IN_ONE_STEP,
num_parts: 1,
}
)))
))
);

wait_for_flat_storage_creation(&mut env, START_HEIGHT + 5, shard_uid, true);
Expand All @@ -241,24 +237,24 @@ fn test_flat_storage_creation_two_shards() {
for &shard_uid in &shard_uids {
assert_matches!(
store_helper::get_flat_storage_status(&store, shard_uid),
Ok(FlatStorageStatus::Ready(_))
FlatStorageStatus::Ready(_)
);
}

get_flat_storage_manager(&env).remove_flat_storage_for_shard(shard_uids[0]).unwrap();
env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uids[0]).unwrap();
}

// Check that flat storage is not ready for shard 0 but ready for shard 1.
let mut env = setup_env(&genesis, store.clone());
assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uids[0]).is_none());
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uids[0]).is_none());
assert_matches!(
store_helper::get_flat_storage_status(&store, shard_uids[0]),
Ok(FlatStorageStatus::Empty)
FlatStorageStatus::Empty
);
assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uids[1]).is_some());
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uids[1]).is_some());
assert_matches!(
store_helper::get_flat_storage_status(&store, shard_uids[1]),
Ok(FlatStorageStatus::Ready(_))
FlatStorageStatus::Ready(_)
);

wait_for_flat_storage_creation(&mut env, START_HEIGHT, shard_uids[0], true);
Expand Down Expand Up @@ -287,7 +283,7 @@ fn test_flat_storage_creation_start_from_state_part() {

assert_matches!(
store_helper::get_flat_storage_status(&store, shard_uid),
Ok(FlatStorageStatus::Ready(_))
FlatStorageStatus::Ready(_)
);

let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap();
Expand Down Expand Up @@ -320,14 +316,15 @@ fn test_flat_storage_creation_start_from_state_part() {
// Remove keys of part 1 from the flat state.
// Manually set flat storage creation status to the step when it should start from fetching part 1.
let status = store_helper::get_flat_storage_status(&store, shard_uid);
let flat_head = if let Ok(FlatStorageStatus::Ready(ready_status)) = status {
let flat_head = if let FlatStorageStatus::Ready(ready_status) = status {
ready_status.flat_head.hash
} else {
panic!("expected FlatStorageStatus::Ready, got: {status:?}");
};
let mut store_update = store.store_update();
for key in trie_keys[1].iter() {
store_helper::set_flat_state_value(&mut store_update, shard_uid, key.clone(), None);
store_helper::set_flat_state_value(&mut store_update, shard_uid, key.clone(), None)
.unwrap();
}
store_helper::set_flat_storage_status(
&mut store_update,
Expand All @@ -345,7 +342,7 @@ fn test_flat_storage_creation_start_from_state_part() {

// Re-create runtime, check that flat storage is not created yet.
let mut env = setup_env(&genesis, store);
assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uid).is_none());
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_none());

// Run chain for a couple of blocks and check that flat storage for shard 0 is eventually created.
let next_height = wait_for_flat_storage_creation(&mut env, START_HEIGHT, shard_uid, true);
Expand Down Expand Up @@ -384,14 +381,11 @@ fn test_catchup_succeeds_even_if_no_new_blocks() {
env.produce_block(0, height);
}
// Remove flat storage.
get_flat_storage_manager(&env).remove_flat_storage_for_shard(shard_uid).unwrap();
env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uid).unwrap();
}
let mut env = setup_env(&genesis, store.clone());
assert!(get_flat_storage_manager(&env).get_flat_storage_for_shard(shard_uid).is_none());
assert_eq!(
store_helper::get_flat_storage_status(&store, shard_uid),
Ok(FlatStorageStatus::Empty)
);
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_none());
assert_eq!(store_helper::get_flat_storage_status(&store, shard_uid), FlatStorageStatus::Empty);
// Create 3 more blocks (so that the deltas are generated) - and assume that no new blocks are received.
// In the future, we should also support the scenario where no new blocks are created.

Expand Down Expand Up @@ -439,7 +433,7 @@ fn test_flat_storage_iter() {
account_id: "test0".parse().unwrap()
}
.to_vec(),
items[0].as_ref().unwrap().0.to_vec()
items.get(0).unwrap().0
);
}
1 => {
Expand All @@ -454,7 +448,7 @@ fn test_flat_storage_iter() {
account_id: "near".parse().unwrap()
}
.to_vec(),
items[0].as_ref().unwrap().0.to_vec()
items.get(0).unwrap().0
);
}
_ => {
Expand Down Expand Up @@ -513,7 +507,3 @@ fn test_not_supported_block() {
// For the second result chunk view is valid, so result is Ok.
assert_matches!(get_ref_results[1], Ok(Some(_)));
}

fn get_flat_storage_manager(env: &TestEnv) -> FlatStorageManager {
env.clients[0].chain.runtime_adapter.get_flat_storage_manager().unwrap()
}
Loading

0 comments on commit 98b86f7

Please sign in to comment.