Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement inlining for small values in flat storage #8988

Merged
merged 1 commit into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ use assert_matches::assert_matches;
use crossbeam_channel::{unbounded, Receiver, Sender};
use near_chain_primitives::Error;
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::ValueRef;
use near_primitives::state_part::PartId;
use near_primitives::types::{AccountId, BlockHeight, StateRoot};
use near_store::flat::{
store_helper, BlockInfo, FetchingStateStatus, FlatStateChanges, FlatStorageCreationMetrics,
FlatStorageCreationStatus, FlatStorageReadyStatus, FlatStorageStatus, NUM_PARTS_IN_ONE_STEP,
STATE_PART_MEMORY_LIMIT,
store_helper, BlockInfo, FetchingStateStatus, FlatStateChanges, FlatStateValue,
FlatStorageCreationMetrics, FlatStorageCreationStatus, FlatStorageReadyStatus,
FlatStorageStatus, NUM_PARTS_IN_ONE_STEP, STATE_PART_MEMORY_LIMIT,
};
use near_store::Store;
use near_store::{Trie, TrieDBStorage, TrieTraversalItem};
Expand Down Expand Up @@ -104,9 +103,13 @@ impl FlatStorageShardCreator {
{
if let Some(key) = key {
let value = trie.storage.retrieve_raw_bytes(&hash).unwrap();
let value_ref = ValueRef::new(&value);
store_helper::set_ref(&mut store_update, shard_uid, key, Some(value_ref))
.expect("Failed to put value in FlatState");
store_helper::set_flat_state_value(
&mut store_update,
shard_uid,
key,
Some(FlatStateValue::value_ref(&value)),
)
.expect("Failed to put value in FlatState");
num_items += 1;
}
}
Expand Down
7 changes: 3 additions & 4 deletions core/store/src/flat/chunk_view.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use near_primitives::hash::CryptoHash;
use near_primitives::state::ValueRef;

use crate::Store;

use super::FlatStorage;
use super::{FlatStateValue, FlatStorage};

/// Struct for getting value references from the flat storage, corresponding
/// to some block defined in `blocks_to_head`.
Expand Down Expand Up @@ -39,7 +38,7 @@ impl FlatStorageChunkView {
/// they are stored in `DBCol::State`. Also the separation is done so we
/// could charge users for the value length before loading the value.
// TODO (#7327): consider inlining small values, so we could use only one db access.
pub fn get_ref(&self, key: &[u8]) -> Result<Option<ValueRef>, crate::StorageError> {
self.flat_storage.get_ref(&self.block_hash, key)
pub fn get_value(&self, key: &[u8]) -> Result<Option<FlatStateValue>, crate::StorageError> {
self.flat_storage.get_value(&self.block_hash, key)
}
}
73 changes: 44 additions & 29 deletions core/store/src/flat/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use near_primitives::types::RawStateChangesWithTrieKey;
use std::collections::HashMap;
use std::sync::Arc;

use super::{store_helper, BlockInfo};
use super::types::INLINE_DISK_VALUE_THRESHOLD;
use super::{store_helper, BlockInfo, FlatStateValue};
use crate::{CryptoHash, StoreUpdate};

#[derive(Debug)]
Expand All @@ -34,14 +35,14 @@ impl KeyForFlatStateDelta {
res
}
}
/// Delta of the state for some shard and block, stores mapping from keys to value refs or None, if key was removed in
/// this block.
/// Delta of the state for some shard and block, stores mapping from keys to values
/// or None, if key was removed in this block.
#[derive(BorshSerialize, BorshDeserialize, Clone, Default, PartialEq, Eq)]
pub struct FlatStateChanges(pub(crate) HashMap<Vec<u8>, Option<ValueRef>>);
pub struct FlatStateChanges(pub(crate) HashMap<Vec<u8>, Option<FlatStateValue>>);

impl<T> From<T> for FlatStateChanges
where
T: IntoIterator<Item = (Vec<u8>, Option<ValueRef>)>,
T: IntoIterator<Item = (Vec<u8>, Option<FlatStateValue>)>,
{
fn from(iter: T) -> Self {
Self(HashMap::from_iter(iter))
Expand All @@ -57,13 +58,17 @@ impl std::fmt::Debug for FlatStateChanges {
}

impl FlatStateChanges {
/// Returns `Some(Option<ValueRef>)` from delta for the given key. If key is not present, returns None.
pub fn get(&self, key: &[u8]) -> Option<Option<ValueRef>> {
/// Returns `Some(Option<FlatStateValue>)` from delta for the given key. If key is not present, returns None.
pub fn get(&self, key: &[u8]) -> Option<Option<FlatStateValue>> {
self.0.get(key).cloned()
}

/// Inserts a key-value pair to delta.
pub fn insert(&mut self, key: Vec<u8>, value: Option<ValueRef>) -> Option<Option<ValueRef>> {
pub fn insert(
&mut self,
key: Vec<u8>,
value: Option<FlatStateValue>,
) -> Option<Option<FlatStateValue>> {
self.0.insert(key, value)
}

Expand All @@ -88,20 +93,23 @@ impl FlatStateChanges {
.last()
.expect("Committed entry should have at least one change")
.data;
match last_change {
Some(value) => {
delta.insert(key, Some(near_primitives::state::ValueRef::new(value)))
let flat_state_value = last_change.as_ref().map(|value| {
if value.len() <= INLINE_DISK_VALUE_THRESHOLD {
FlatStateValue::inlined(value)
} else {
FlatStateValue::value_ref(value)
}
None => delta.insert(key, None),
};
});
delta.insert(key, flat_state_value);
}
Self(delta)
}

/// Applies delta to the flat state.
pub fn apply_to_flat_state(self, store_update: &mut StoreUpdate, shard_uid: ShardUId) {
for (key, value) in self.0.into_iter() {
store_helper::set_ref(store_update, shard_uid, key, value).expect("Borsh cannot fail");
store_helper::set_flat_state_value(store_update, shard_uid, key, value)
.expect("Borsh cannot fail");
}
}
}
Expand All @@ -117,7 +125,13 @@ pub struct CachedFlatStateDelta {

impl From<FlatStateChanges> for CachedFlatStateChanges {
fn from(delta: FlatStateChanges) -> Self {
Self(delta.0.into_iter().map(|(key, value)| (hash(&key), value)).collect())
Self(
delta
.0
.into_iter()
.map(|(key, value)| (hash(&key), value.map(|v| v.to_value_ref())))
.collect(),
)
}
}

Expand All @@ -144,8 +158,9 @@ impl CachedFlatStateChanges {

#[cfg(test)]
mod tests {
use crate::flat::FlatStateValue;

use super::FlatStateChanges;
use near_primitives::state::ValueRef;
use near_primitives::trie_key::TrieKey;
use near_primitives::types::{RawStateChange, RawStateChangesWithTrieKey, StateChangeCause};

Expand Down Expand Up @@ -208,17 +223,17 @@ mod tests {
let flat_state_changes = FlatStateChanges::from_state_changes(&state_changes);
assert_eq!(
flat_state_changes.get(&alice_trie_key.to_vec()),
Some(Some(ValueRef::new(&[3, 4])))
Some(Some(FlatStateValue::inlined(&[3, 4])))
);
assert_eq!(flat_state_changes.get(&bob_trie_key.to_vec()), Some(None));
assert_eq!(flat_state_changes.get(&carol_trie_key.to_vec()), None);
assert_eq!(
flat_state_changes.get(&delayed_trie_key.to_vec()),
Some(Some(ValueRef::new(&[1])))
Some(Some(FlatStateValue::inlined(&[1])))
);
assert_eq!(
flat_state_changes.get(&delayed_receipt_trie_key.to_vec()),
Some(Some(ValueRef::new(&[2])))
Some(Some(FlatStateValue::inlined(&[2])))
);
}

Expand All @@ -227,23 +242,23 @@ mod tests {
#[test]
fn flat_state_changes_merge() {
let mut changes = FlatStateChanges::from([
(vec![1], Some(ValueRef::new(&[4]))),
(vec![2], Some(ValueRef::new(&[5]))),
(vec![1], Some(FlatStateValue::value_ref(&[4]))),
(vec![2], Some(FlatStateValue::value_ref(&[5]))),
(vec![3], None),
(vec![4], Some(ValueRef::new(&[6]))),
(vec![4], Some(FlatStateValue::value_ref(&[6]))),
]);
let changes_new = FlatStateChanges::from([
(vec![2], Some(ValueRef::new(&[7]))),
(vec![3], Some(ValueRef::new(&[8]))),
(vec![2], Some(FlatStateValue::value_ref(&[7]))),
(vec![3], Some(FlatStateValue::value_ref(&[8]))),
(vec![4], None),
(vec![5], Some(ValueRef::new(&[9]))),
(vec![5], Some(FlatStateValue::value_ref(&[9]))),
]);
changes.merge(changes_new);

assert_eq!(changes.get(&[1]), Some(Some(ValueRef::new(&[4]))));
assert_eq!(changes.get(&[2]), Some(Some(ValueRef::new(&[7]))));
assert_eq!(changes.get(&[3]), Some(Some(ValueRef::new(&[8]))));
assert_eq!(changes.get(&[1]), Some(Some(FlatStateValue::value_ref(&[4]))));
assert_eq!(changes.get(&[2]), Some(Some(FlatStateValue::value_ref(&[7]))));
assert_eq!(changes.get(&[3]), Some(Some(FlatStateValue::value_ref(&[8]))));
assert_eq!(changes.get(&[4]), Some(None));
assert_eq!(changes.get(&[5]), Some(Some(ValueRef::new(&[9]))));
assert_eq!(changes.get(&[5]), Some(Some(FlatStateValue::value_ref(&[9]))));
}
}
2 changes: 1 addition & 1 deletion core/store/src/flat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub use manager::FlatStorageManager;
pub use metrics::FlatStorageCreationMetrics;
pub use storage::FlatStorage;
pub use types::{
BlockInfo, FetchingStateStatus, FlatStorageCreationStatus, FlatStorageError,
BlockInfo, FetchingStateStatus, FlatStateValue, FlatStorageCreationStatus, FlatStorageError,
FlatStorageReadyStatus, FlatStorageStatus,
};

Expand Down
65 changes: 37 additions & 28 deletions core/store/src/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::sync::{Arc, RwLock};
use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::state::ValueRef;
use tracing::{debug, info, warn};

use crate::flat::delta::CachedFlatStateChanges;
Expand All @@ -14,7 +13,7 @@ use crate::{Store, StoreUpdate};

use super::delta::{CachedFlatStateDelta, FlatStateDelta};
use super::metrics::FlatStorageMetrics;
use super::types::FlatStorageError;
use super::types::{FlatStateValue, FlatStorageError};
use super::{store_helper, BlockInfo};

/// FlatStorage stores information on which blocks flat storage current supports key lookups on.
Expand Down Expand Up @@ -177,11 +176,11 @@ impl FlatStorage {
guard.get_blocks_to_head(target_block_hash)
}

pub fn get_ref(
pub fn get_value(
&self,
block_hash: &CryptoHash,
key: &[u8],
) -> Result<Option<ValueRef>, crate::StorageError> {
) -> Result<Option<FlatStateValue>, crate::StorageError> {
let guard = self.0.read().expect(super::POISONED_LOCK_ERR);
let blocks_to_head =
guard.get_blocks_to_head(block_hash).map_err(|e| StorageError::from(e))?;
Expand All @@ -190,14 +189,14 @@ impl FlatStorage {
let changes = guard.get_block_changes(block_hash)?;
match changes.get(key) {
Some(value_ref) => {
return Ok(value_ref);
return Ok(value_ref.map(|value_ref| FlatStateValue::Ref(value_ref)));
}
None => {}
};
}

let value_ref = store_helper::get_ref(&guard.store, guard.shard_uid, key)?;
Ok(value_ref)
let value = store_helper::get_flat_state_value(&guard.store, guard.shard_uid, key)?;
Ok(value)
}

/// Update the head of the flat storage, including updating the flat state in memory and on disk
Expand Down Expand Up @@ -331,13 +330,12 @@ mod tests {
use crate::flat::delta::{FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata};
use crate::flat::manager::FlatStorageManager;
use crate::flat::storage::FlatStorage;
use crate::flat::types::{BlockInfo, FlatStorageError};
use crate::flat::types::{BlockInfo, FlatStateValue, FlatStorageError};
use crate::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus};
use crate::test_utils::create_test_store;
use crate::StorageError;
use borsh::BorshSerialize;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::state::ValueRef;
use near_primitives::types::BlockHeight;

use assert_matches::assert_matches;
Expand Down Expand Up @@ -551,11 +549,19 @@ mod tests {
shard_uid,
FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }),
);
store_helper::set_ref(&mut store_update, shard_uid, vec![1], Some(ValueRef::new(&[0])))
.unwrap();
store_helper::set_flat_state_value(
&mut store_update,
shard_uid,
vec![1],
Some(FlatStateValue::value_ref(&[0])),
)
.unwrap();
for i in 1..10 {
let delta = FlatStateDelta {
changes: FlatStateChanges::from([(vec![1], Some(ValueRef::new(&[i as u8])))]),
changes: FlatStateChanges::from([(
vec![1],
Some(FlatStateValue::value_ref(&[i as u8])),
)]),
metadata: FlatStateDeltaMetadata { block: chain.get_block(i) },
};
store_helper::set_delta(&mut store_update, shard_uid, &delta).unwrap();
Expand All @@ -574,7 +580,10 @@ mod tests {
assert_eq!(blocks.len(), i as usize);
let chunk_view =
flat_storage_manager.chunk_view(shard_uid, Some(block_hash), false).unwrap();
assert_eq!(chunk_view.get_ref(&[1]).unwrap(), Some(ValueRef::new(&[i as u8])));
assert_eq!(
chunk_view.get_value(&[1]).unwrap(),
Some(FlatStateValue::value_ref(&[i as u8]))
);
}

// 3. Create a new block that deletes &[1] and add a new value &[2]
Expand All @@ -584,7 +593,7 @@ mod tests {
.add_delta(FlatStateDelta {
changes: FlatStateChanges::from([
(vec![1], None),
(vec![2], Some(ValueRef::new(&[1]))),
(vec![2], Some(FlatStateValue::value_ref(&[1]))),
]),
metadata: FlatStateDeltaMetadata { block: chain.get_block_info(&hash) },
})
Expand All @@ -601,10 +610,10 @@ mod tests {
let chunk_view1 = flat_storage_manager
.chunk_view(shard_uid, Some(chain.get_block_hash(4)), false)
.unwrap();
assert_eq!(chunk_view0.get_ref(&[1]).unwrap(), None);
assert_eq!(chunk_view0.get_ref(&[2]).unwrap(), Some(ValueRef::new(&[1])));
assert_eq!(chunk_view1.get_ref(&[1]).unwrap(), Some(ValueRef::new(&[4])));
assert_eq!(chunk_view1.get_ref(&[2]).unwrap(), None);
assert_eq!(chunk_view0.get_value(&[1]).unwrap(), None);
assert_eq!(chunk_view0.get_value(&[2]).unwrap(), Some(FlatStateValue::value_ref(&[1])));
assert_eq!(chunk_view1.get_value(&[1]).unwrap(), Some(FlatStateValue::value_ref(&[4])));
assert_eq!(chunk_view1.get_value(&[2]).unwrap(), None);
assert_matches!(
store_helper::get_delta_changes(&store, shard_uid, chain.get_block_hash(5)).unwrap(),
Some(_)
Expand All @@ -618,15 +627,15 @@ mod tests {
// and chunk_view1 returns an error. Also check that DBCol::FlatState is updated correctly
flat_storage.update_flat_head(&chain.get_block_hash(5)).unwrap();
assert_eq!(
store_helper::get_ref(&store, shard_uid, &[1]).unwrap(),
Some(ValueRef::new(&[5]))
store_helper::get_flat_state_value(&store, shard_uid, &[1]).unwrap(),
Some(FlatStateValue::value_ref(&[5]))
);
let blocks = flat_storage.get_blocks_to_head(&chain.get_block_hash(10)).unwrap();
assert_eq!(blocks.len(), 5);
assert_eq!(chunk_view0.get_ref(&[1]).unwrap(), None);
assert_eq!(chunk_view0.get_ref(&[2]).unwrap(), Some(ValueRef::new(&[1])));
assert_eq!(chunk_view0.get_value(&[1]).unwrap(), None);
assert_eq!(chunk_view0.get_value(&[2]).unwrap(), Some(FlatStateValue::value_ref(&[1])));
assert_matches!(
chunk_view1.get_ref(&[1]),
chunk_view1.get_value(&[1]),
Err(StorageError::FlatStorageBlockNotSupported(_))
);
assert_matches!(
Expand All @@ -643,13 +652,13 @@ mod tests {
flat_storage.update_flat_head(&chain.get_block_hash(10)).unwrap();
let blocks = flat_storage.get_blocks_to_head(&chain.get_block_hash(10)).unwrap();
assert_eq!(blocks.len(), 0);
assert_eq!(store_helper::get_ref(&store, shard_uid, &[1]).unwrap(), None);
assert_eq!(store_helper::get_flat_state_value(&store, shard_uid, &[1]).unwrap(), None);
assert_eq!(
store_helper::get_ref(&store, shard_uid, &[2]).unwrap(),
Some(ValueRef::new(&[1]))
store_helper::get_flat_state_value(&store, shard_uid, &[2]).unwrap(),
Some(FlatStateValue::value_ref(&[1]))
);
assert_eq!(chunk_view0.get_ref(&[1]).unwrap(), None);
assert_eq!(chunk_view0.get_ref(&[2]).unwrap(), Some(ValueRef::new(&[1])));
assert_eq!(chunk_view0.get_value(&[1]).unwrap(), None);
assert_eq!(chunk_view0.get_value(&[2]).unwrap(), Some(FlatStateValue::value_ref(&[1])));
assert_matches!(
store_helper::get_delta_changes(&store, shard_uid, chain.get_block_hash(10)).unwrap(),
None
Expand Down
Loading