Skip to content

Commit

Permalink
feat: construct state sync parts using flat storage (near#8927)
Browse files Browse the repository at this point in the history
Gennerate inner part of state part using flat storage using idea present in near#8984.

In short, if flat storage head corresponds to the state root for which we sync state, it is enough to read only boundary nodes, and inner trie part can be reconstructed using range of KV pairs from state. The main logic for that is contained in `Trie::get_trie_nodes_for_part_with_flat_storage`.

It requires couple of minor changes:
* now we allow creating "view" `Trie`s with flat storage as well. As before, we want to avoid creating non-view `Tries` because `TrieCache` accesses may be blocking for chunk processing
* `get_head_hash` and `shard_uid` methods for `FlatStorage` allowing to make correct range query to flat storage
* `FlatStateValue` moved to `primitives` to allow more general access

* prometheus metrics
* integration test checking that flat storage is used during normal block processing on client (or wait for near#9090)

https://nayduck.near.org/#/run/3023

Big sanity test `get_trie_nodes_for_part_with_flat_storage` covering all scenarios I could think of:
* results with/without flat storage must match
* result with incorrect flat storage must be an error
* result with flat storage and missing intermediate node should be still okay
  • Loading branch information
Longarithm authored and nikurt committed Jun 8, 2023
1 parent b5b14b7 commit 45fb617
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 530 deletions.
8 changes: 1 addition & 7 deletions core/store/src/flat/chunk_view.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::flat::store_helper;
use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::FlatStateValue;

use crate::Store;
Expand Down Expand Up @@ -50,15 +48,11 @@ impl FlatStorageChunkView {
&'a self,
from: Option<&[u8]>,
to: Option<&[u8]>,
) -> impl Iterator<Item = Result<(Vec<u8>, FlatStateValue), StorageError>> + 'a {
) -> impl Iterator<Item = (Vec<u8>, Box<[u8]>)> + 'a {
store_helper::iter_flat_state_entries(self.flat_storage.shard_uid(), &self.store, from, to)
}

pub fn get_head_hash(&self) -> CryptoHash {
self.flat_storage.get_head_hash()
}

pub fn shard_uid(&self) -> ShardUId {
self.flat_storage.shard_uid()
}
}
6 changes: 4 additions & 2 deletions core/store/src/flat/inlining_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crossbeam::channel;
use itertools::Itertools;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::FlatStateValue;
use tracing::{debug, info};

use crate::flat::store_helper::set_flat_state_values_inlining_migration_status;
Expand All @@ -21,7 +22,7 @@ use super::store_helper::{
decode_flat_state_db_key, get_flat_state_values_inlining_migration_status,
};
use super::types::{FlatStateValuesInliningMigrationStatus, INLINE_DISK_VALUE_THRESHOLD};
use super::{FlatStateValue, FlatStorageManager};
use super::FlatStorageManager;

pub struct FlatStateValuesInliningMigrationHandle {
handle: JoinHandle<()>,
Expand Down Expand Up @@ -286,10 +287,11 @@ mod tests {
use borsh::{BorshDeserialize, BorshSerialize};
use near_primitives::hash::hash;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::state::FlatStateValue;

use crate::flat::store_helper::encode_flat_state_db_key;
use crate::flat::types::INLINE_DISK_VALUE_THRESHOLD;
use crate::flat::{FlatStateValue, FlatStorageManager};
use crate::flat::FlatStorageManager;
use crate::{DBCol, NodeStorage, TrieCachingStorage};

use super::inline_flat_state_values;
Expand Down
30 changes: 16 additions & 14 deletions core/store/src/flat/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::flat::{
};
use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::types::BlockHeight;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -57,8 +57,8 @@ impl FlatStorageManager {

/// When a node starts from an empty database, this function must be called to ensure
/// information such as flat head is set up correctly in the database.
/// Note that this function is different from `create_flat_storage_for_shard`,
/// it must be called before `create_flat_storage_for_shard` if the node starts from
/// Note that this function is different from `add_flat_storage_for_shard`,
/// it must be called before `add_flat_storage_for_shard` if the node starts from
/// an empty database.
pub fn set_flat_storage_for_genesis(
&self,
Expand All @@ -78,24 +78,19 @@ impl FlatStorageManager {
);
}

/// Creates flat storage instance for shard `shard_id`. The function also checks that
/// Add a flat storage state for shard `shard_id`. The function also checks that
/// the shard's flat storage state hasn't been set before, otherwise it panics.
/// TODO (#7327): this behavior may change when we implement support for state sync
/// and resharding.
pub fn create_flat_storage_for_shard(&self, shard_uid: ShardUId) {
pub fn add_flat_storage_for_shard(&self, shard_uid: ShardUId, flat_storage: FlatStorage) {
let mut flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR);
let original_value =
flat_storages.insert(shard_uid, FlatStorage::new(self.0.store.clone(), shard_uid));
let original_value = flat_storages.insert(shard_uid, flat_storage);
// TODO (#7327): maybe we should propagate the error instead of assert here
// assert is fine now because this function is only called at construction time, but we
// will need to be more careful when we want to implement flat storage for resharding
assert!(original_value.is_none());
}

pub fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus {
store_helper::get_flat_storage_status(&self.0.store, shard_uid)
}

/// Creates `FlatStorageChunkView` to access state for `shard_uid` and block `block_hash`.
/// Note that:
/// * the state includes changes by the block `block_hash`;
Expand Down Expand Up @@ -126,11 +121,18 @@ impl FlatStorageManager {
flat_storages.get(&shard_uid).cloned()
}

pub fn remove_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Result<(), StorageError> {
pub fn remove_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
shard_layout: ShardLayout,
) -> Result<(), StorageError> {
let mut flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR);

if let Some(flat_store) = flat_storages.remove(&shard_uid) {
flat_store.clear_state()?;
match flat_storages.remove(&shard_uid) {
None => {}
Some(flat_storage) => {
flat_storage.clear_state(shard_layout)?;
}
}

Ok(())
Expand Down
75 changes: 43 additions & 32 deletions core/store/src/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ use std::sync::{Arc, RwLock};
use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::state::FlatStateValue;
use tracing::{debug, info, warn};

use crate::flat::delta::CachedFlatStateChanges;
use crate::flat::store_helper::FlatStateColumn;
use crate::flat::{FlatStorageReadyStatus, FlatStorageStatus};
use crate::{Store, StoreUpdate};
use crate::{DBCol, Store, StoreUpdate};

use super::delta::{CachedFlatStateDelta, FlatStateDelta};
use super::metrics::FlatStorageMetrics;
use super::types::{FlatStateValue, FlatStorageError};
use super::types::FlatStorageError;
use super::{store_helper, BlockInfo};

/// FlatStorage stores information on which blocks flat storage current supports key lookups on.
Expand Down Expand Up @@ -41,6 +41,9 @@ pub(crate) struct FlatStorageInner {
flat_head: BlockInfo,
/// Cached deltas for all blocks supported by this flat storage.
deltas: HashMap<CryptoHash, CachedFlatStateDelta>,
/// This flag enables skipping flat head moves, needed temporarily for FlatState
/// values inlining migration.
move_head_enabled: bool,
metrics: FlatStorageMetrics,
}

Expand Down Expand Up @@ -159,7 +162,14 @@ impl FlatStorage {
);
}

let inner = FlatStorageInner { store, shard_uid, flat_head, deltas, metrics };
let inner = FlatStorageInner {
store,
shard_uid,
flat_head,
deltas,
move_head_enabled: true,
metrics,
};
inner.update_delta_metrics();
Self(Arc::new(RwLock::new(inner)))
}
Expand Down Expand Up @@ -204,16 +214,11 @@ impl FlatStorage {
/// returns an error.
pub fn update_flat_head(&self, new_head: &CryptoHash) -> Result<(), FlatStorageError> {
let mut guard = self.0.write().expect(crate::flat::POISONED_LOCK_ERR);

if !guard.move_head_enabled {
return Ok(());
}
let shard_uid = guard.shard_uid;
let shard_id = shard_uid.shard_id();
let _span = tracing::debug_span!(
target: "chain",
"update_flat_head",
?new_head,
?shard_id
)
.entered();
let blocks = guard.get_blocks_to_head(new_head)?;
for block_hash in blocks.into_iter().rev() {
let mut store_update = StoreUpdate::new(guard.store.storage.clone());
Expand Down Expand Up @@ -274,13 +279,6 @@ impl FlatStorage {
let mut guard = self.0.write().expect(super::POISONED_LOCK_ERR);
let shard_uid = guard.shard_uid;
let shard_id = shard_uid.shard_id();
let _span = tracing::debug_span!(
target: "chain",
"add_delta",
block_height = delta.metadata.block.height,
?shard_id
)
.entered();
let block = &delta.metadata.block;
let block_hash = block.hash;
let block_height = block.height;
Expand All @@ -301,7 +299,7 @@ impl FlatStorage {
}

/// Clears all State key-value pairs from flat storage.
pub fn clear_state(&self, shard_layout: ShardLayout) -> Result<(), StorageError> {
pub fn clear_state(&self, _shard_layout: ShardLayout) -> Result<(), StorageError> {
let guard = self.0.write().expect(super::POISONED_LOCK_ERR);
let shard_id = guard.shard_uid.shard_id();

Expand All @@ -316,13 +314,13 @@ impl FlatStorage {
// We should also take fixed accounts into account.
let mut store_update = guard.store.store_update();
let mut removed_items = 0;
for item in guard.store.iter(FlatStateColumn::State.to_db_col()) {
for item in guard.store.iter(DBCol::FlatState) {
let (key, _) =
item.map_err(|e| StorageError::StorageInconsistentState(e.to_string()))?;

if store_helper::key_belongs_to_shard(&key, &shard_layout, shard_id)? {
if store_helper::key_belongs_to_shard(&key, &guard.shard_uid)? {
removed_items += 1;
store_update.delete(FlatStateColumn::State.to_db_col(), &key);
store_update.delete(DBCol::FlatState, &key);
}
}
info!(target: "store", %shard_id, %removed_items, "Removing old items from flat storage");
Expand All @@ -338,14 +336,29 @@ impl FlatStorage {

Ok(())
}

pub(crate) fn get_head_hash(&self) -> CryptoHash {
let guard = self.0.read().expect(super::POISONED_LOCK_ERR);
guard.flat_head.hash
}

pub(crate) fn shard_uid(&self) -> ShardUId {
let guard = self.0.read().expect(super::POISONED_LOCK_ERR);
guard.shard_uid
}

pub(crate) fn set_flat_head_update_mode(&self, enabled: bool) {
let mut guard = self.0.write().expect(crate::flat::POISONED_LOCK_ERR);
guard.move_head_enabled = enabled;
}
}

#[cfg(test)]
mod tests {
use crate::flat::delta::{FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata};
use crate::flat::manager::FlatStorageManager;
use crate::flat::storage::FlatStorage;
use crate::flat::types::{BlockInfo, FlatStateValue, FlatStorageError};
use crate::flat::types::{BlockInfo, FlatStorageError};
use crate::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus};
use crate::test_utils::create_test_store;
use crate::StorageError;
Expand All @@ -355,6 +368,7 @@ mod tests {

use assert_matches::assert_matches;
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::FlatStateValue;
use std::collections::HashMap;

struct MockChain {
Expand Down Expand Up @@ -593,8 +607,7 @@ mod tests {
let block_hash = chain.get_block_hash(i);
let blocks = flat_storage.get_blocks_to_head(&block_hash).unwrap();
assert_eq!(blocks.len(), i as usize);
let chunk_view =
flat_storage_manager.chunk_view(shard_uid, Some(block_hash), false).unwrap();
let chunk_view = flat_storage_manager.chunk_view(shard_uid, block_hash).unwrap();
assert_eq!(
chunk_view.get_value(&[1]).unwrap(),
Some(FlatStateValue::value_ref(&[i as u8]))
Expand All @@ -619,12 +632,10 @@ mod tests {
// Verify that they return the correct values
let blocks = flat_storage.get_blocks_to_head(&chain.get_block_hash(10)).unwrap();
assert_eq!(blocks.len(), 10);
let chunk_view0 = flat_storage_manager
.chunk_view(shard_uid, Some(chain.get_block_hash(10)), false)
.unwrap();
let chunk_view1 = flat_storage_manager
.chunk_view(shard_uid, Some(chain.get_block_hash(4)), false)
.unwrap();
let chunk_view0 =
flat_storage_manager.chunk_view(shard_uid, chain.get_block_hash(10)).unwrap();
let chunk_view1 =
flat_storage_manager.chunk_view(shard_uid, chain.get_block_hash(4)).unwrap();
assert_eq!(chunk_view0.get_value(&[1]).unwrap(), None);
assert_eq!(chunk_view0.get_value(&[2]).unwrap(), Some(FlatStateValue::value_ref(&[1])));
assert_eq!(chunk_view1.get_value(&[1]).unwrap(), Some(FlatStateValue::value_ref(&[4])));
Expand Down
17 changes: 6 additions & 11 deletions core/store/src/flat/store_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use crate::flat::types::FlatStorageError;
use crate::{DBCol, Store, StoreUpdate};
use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::FlatStateValue;

use super::delta::{FlatStateDelta, FlatStateDeltaMetadata};
use super::types::{FlatStateValue, FlatStateValuesInliningMigrationStatus, FlatStorageStatus};
use super::types::{FlatStateValuesInliningMigrationStatus, FlatStorageStatus};

/// Prefixes for keys in `FlatStateMisc` DB column.
pub const FLAT_STATE_HEAD_KEY_PREFIX: &[u8; 4] = b"HEAD";
Expand Down Expand Up @@ -72,9 +73,7 @@ pub(crate) fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec<u
buffer
}

pub(crate) fn decode_flat_state_db_key(
key: &Box<[u8]>,
) -> Result<(ShardUId, Vec<u8>), StorageError> {
pub(crate) fn decode_flat_state_db_key(key: &[u8]) -> Result<(ShardUId, Vec<u8>), StorageError> {
if key.len() < 8 {
return Err(StorageError::StorageInconsistentState(format!(
"Found key in flat storage with length < 8: {key:?}"
Expand Down Expand Up @@ -187,11 +186,7 @@ pub fn iter_flat_state_entries<'a>(

/// Currently all the data in flat storage is 'together' - so we have to parse the key,
/// to see if this element belongs to this shard.
pub fn key_belongs_to_shard(
key: &Box<[u8]>,
shard_layout: &ShardLayout,
shard_id: u64,
) -> Result<bool, StorageError> {
pub fn key_belongs_to_shard(key: &[u8], shard_uid: &ShardUId) -> Result<bool, StorageError> {
let (key_shard_uid, _) = decode_flat_state_db_key(key)?;
Ok(key_shard_uid.version == shard_layout.version() && key_shard_uid.shard_id as u64 == shard_id)
Ok(key_shard_uid == *shard_uid)
}
7 changes: 0 additions & 7 deletions core/store/src/flat/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ pub enum FlatStateValuesInliningMigrationStatus {
Finished,
}

#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)]
pub enum FlatStateValuesInliningMigrationStatus {
Empty,
InProgress,
Finished,
}

#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)]
pub enum FlatStorageStatus {
/// Flat Storage is not supported.
Expand Down
Loading

0 comments on commit 45fb617

Please sign in to comment.