Skip to content

Commit

Permalink
refactor: Clean flat state via range (#9109)
Browse files Browse the repository at this point in the history
Use range deletion in `clean_state` when removing values from `FlatState` column. Addresses #8332
  • Loading branch information
Jure Bajic authored and nikurt committed May 31, 2023
1 parent 5e123e7 commit edc0d96
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 81 deletions.
2 changes: 1 addition & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3241,7 +3241,7 @@ impl Chain {
// Before working with state parts, remove existing flat storage data.
let epoch_id = self.get_block_header(&sync_hash)?.epoch_id().clone();
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;
self.runtime_adapter.remove_flat_storage_for_shard(shard_uid, &epoch_id)?;
self.runtime_adapter.remove_flat_storage_for_shard(shard_uid)?;

let shard_state_header = self.get_state_header(shard_id, sync_hash)?;
let state_root = shard_state_header.chunk_prev_state_root();
Expand Down
6 changes: 1 addition & 5 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -988,11 +988,7 @@ impl RuntimeAdapter for KeyValueRuntime {
panic!("Flat storage state can't be created for shard {shard_uid} because KeyValueRuntime doesn't support this");
}

fn remove_flat_storage_for_shard(
&self,
_shard_uid: ShardUId,
_epoch_id: &EpochId,
) -> Result<(), Error> {
fn remove_flat_storage_for_shard(&self, _shard_uid: ShardUId) -> Result<(), Error> {
Ok(())
}

Expand Down
6 changes: 1 addition & 5 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,7 @@ pub trait RuntimeAdapter: Send + Sync {

/// Removes flat storage state for shard, if it exists.
/// Used to clear old flat storage data from disk and memory before syncing to newer state.
fn remove_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
epoch_id: &EpochId,
) -> Result<(), Error>;
fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), Error>;

fn set_flat_storage_for_genesis(
&self,
Expand Down
15 changes: 4 additions & 11 deletions core/store/src/flat/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::flat::{
};
use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::shard_layout::ShardUId;
use near_primitives::types::BlockHeight;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -121,18 +121,11 @@ impl FlatStorageManager {
flat_storages.get(&shard_uid).cloned()
}

pub fn remove_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
shard_layout: ShardLayout,
) -> Result<(), StorageError> {
pub fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), StorageError> {
let mut flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR);

match flat_storages.remove(&shard_uid) {
None => {}
Some(flat_storage) => {
flat_storage.clear_state(shard_layout)?;
}
if let Some(flat_store) = flat_storages.remove(&shard_uid) {
flat_store.clear_state()?;
}

Ok(())
Expand Down
34 changes: 7 additions & 27 deletions core/store/src/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::sync::{Arc, RwLock};

use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::FlatStateValue;
use tracing::{debug, info, warn};
use tracing::{debug, warn};

use crate::flat::delta::CachedFlatStateChanges;
use crate::flat::{FlatStorageReadyStatus, FlatStorageStatus};
use crate::{DBCol, Store, StoreUpdate};
use crate::{Store, StoreUpdate};

use super::delta::{CachedFlatStateDelta, FlatStateDelta};
use super::metrics::FlatStorageMetrics;
Expand Down Expand Up @@ -299,33 +299,13 @@ impl FlatStorage {
}

/// Clears all State key-value pairs from flat storage.
pub fn clear_state(&self, _shard_layout: ShardLayout) -> Result<(), StorageError> {
pub fn clear_state(&self) -> Result<(), StorageError> {
let guard = self.0.write().expect(super::POISONED_LOCK_ERR);
let shard_id = guard.shard_uid.shard_id();

// Removes all items belonging to the shard one by one.
// Note that it does not work for resharding.
// TODO (#7327): call it just after we stopped tracking a shard.
// TODO (#7327): remove FlatStateChanges. Consider custom serialization of keys to remove them by
// prefix.
// TODO (#7327): support range deletions which are much faster than naive deletions. For that, we
// can delete ranges of keys like
// [ [0]+boundary_accounts(shard_id) .. [0]+boundary_accounts(shard_id+1) ), etc.
// We should also take fixed accounts into account.
let mut store_update = guard.store.store_update();
let mut removed_items = 0;
for item in guard.store.iter(DBCol::FlatState) {
let (key, _) =
item.map_err(|e| StorageError::StorageInconsistentState(e.to_string()))?;

if store_helper::key_belongs_to_shard(&key, &guard.shard_uid)? {
removed_items += 1;
store_update.delete(DBCol::FlatState, &key);
}
}
info!(target: "store", %shard_id, %removed_items, "Removing old items from flat storage");

let mut store_update = guard.store.store_update();
store_helper::remove_all_flat_state_values(&mut store_update, guard.shard_uid);
store_helper::remove_all_deltas(&mut store_update, guard.shard_uid);

store_helper::set_flat_storage_status(
&mut store_update,
guard.shard_uid,
Expand Down
14 changes: 11 additions & 3 deletions core/store/src/flat/store_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,19 @@ pub fn remove_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, block_h
store_update.delete(DBCol::FlatStateDeltaMetadata, &key);
}

pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
fn remove_range_by_shard_uid(store_update: &mut StoreUpdate, shard_uid: ShardUId, col: DBCol) {
let key_from = shard_uid.to_bytes();
let key_to = ShardUId::next_shard_prefix(&key_from);
store_update.delete_range(DBCol::FlatStateChanges, &key_from, &key_to);
store_update.delete_range(DBCol::FlatStateDeltaMetadata, &key_from, &key_to);
store_update.delete_range(col, &key_from, &key_to);
}

pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateChanges);
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateDeltaMetadata);
}

pub fn remove_all_flat_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatState);
}

pub(crate) fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec<u8> {
Expand Down
24 changes: 3 additions & 21 deletions integration-tests/src/tests/client/flat_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,7 @@ fn test_flat_storage_creation_sanity() {
);
}

let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap();
let epoch_id = env.clients[0].chain.epoch_manager.get_epoch_id(&block_hash).unwrap();
env.clients[0]
.chain
.runtime_adapter
.remove_flat_storage_for_shard(shard_uid, &epoch_id)
.unwrap();
env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uid).unwrap();
}

// Create new chain and runtime using the same store. It should produce next blocks normally, but now it should
Expand Down Expand Up @@ -247,13 +241,7 @@ fn test_flat_storage_creation_two_shards() {
);
}

let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap();
let epoch_id = env.clients[0].chain.epoch_manager.get_epoch_id(&block_hash).unwrap();
env.clients[0]
.chain
.runtime_adapter
.remove_flat_storage_for_shard(shard_uids[0], &epoch_id)
.unwrap();
env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uids[0]).unwrap();
}

// Check that flat storage is not ready for shard 0 but ready for shard 1.
Expand Down Expand Up @@ -393,13 +381,7 @@ fn test_catchup_succeeds_even_if_no_new_blocks() {
env.produce_block(0, height);
}
// Remove flat storage.
let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap();
let epoch_id = env.clients[0].chain.epoch_manager.get_epoch_id(&block_hash).unwrap();
env.clients[0]
.chain
.runtime_adapter
.remove_flat_storage_for_shard(shard_uid, &epoch_id)
.unwrap();
env.clients[0].chain.runtime_adapter.remove_flat_storage_for_shard(shard_uid).unwrap();
}
let mut env = setup_env(&genesis, store.clone());
assert!(env.clients[0].runtime_adapter.get_flat_storage_for_shard(shard_uid).is_none());
Expand Down
9 changes: 2 additions & 7 deletions nearcore/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,14 +753,9 @@ impl RuntimeAdapter for NightshadeRuntime {
self.flat_storage_manager.add_flat_storage_for_shard(shard_uid, flat_storage);
}

fn remove_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
epoch_id: &EpochId,
) -> Result<(), Error> {
let shard_layout = self.epoch_manager.get_shard_layout(epoch_id)?;
fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), Error> {
self.flat_storage_manager
.remove_flat_storage_for_shard(shard_uid, shard_layout)
.remove_flat_storage_for_shard(shard_uid)
.map_err(Error::StorageError)?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion tools/flat-storage/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl FlatStorageCommand {
let shard_uid = epoch_manager.shard_id_to_uid(reset_cmd.shard_id, &tip.epoch_id)?;
rw_hot_runtime.create_flat_storage_for_shard(shard_uid);

rw_hot_runtime.remove_flat_storage_for_shard(shard_uid, &tip.epoch_id)?;
rw_hot_runtime.remove_flat_storage_for_shard(shard_uid)?;
}
SubCommand::Init(init_cmd) => {
let (_, epoch_manager, rw_hot_runtime, rw_chain_store, rw_hot_store) = Self::get_db(
Expand Down

0 comments on commit edc0d96

Please sign in to comment.