Skip to content

Commit

Permalink
feat: implement inlining for small values in flat storage (#8988)
Browse files Browse the repository at this point in the history
Part of #8243.

This PR implements an MVP of value inlining for state sync:
* add `FlatStateValue::Inlined` variant to store inlined values as part of `FlatState` and `FlatStateChanges` on disk.
* change flat storage API to return `FlatStateValue` instead of `ValueRef`.

The following will be implemented separately:
* Migration for existing `FlatState` values. This is required for state sync, but it is quite involved, so we decided to handle it separately.
* Inlining for cached flat state deltas: for now we keep those as `ValueRef`.
* Using inlined values for transaction processing: for now we convert inlined values to `ValueRef`.
  • Loading branch information
pugachAG authored May 3, 2023
1 parent 132b52a commit 22c1466
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 86 deletions.
17 changes: 10 additions & 7 deletions chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ use assert_matches::assert_matches;
use crossbeam_channel::{unbounded, Receiver, Sender};
use near_chain_primitives::Error;
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::ValueRef;
use near_primitives::state_part::PartId;
use near_primitives::types::{AccountId, BlockHeight, StateRoot};
use near_store::flat::{
store_helper, BlockInfo, FetchingStateStatus, FlatStateChanges, FlatStorageCreationMetrics,
FlatStorageCreationStatus, FlatStorageReadyStatus, FlatStorageStatus, NUM_PARTS_IN_ONE_STEP,
STATE_PART_MEMORY_LIMIT,
store_helper, BlockInfo, FetchingStateStatus, FlatStateChanges, FlatStateValue,
FlatStorageCreationMetrics, FlatStorageCreationStatus, FlatStorageReadyStatus,
FlatStorageStatus, NUM_PARTS_IN_ONE_STEP, STATE_PART_MEMORY_LIMIT,
};
use near_store::Store;
use near_store::{Trie, TrieDBStorage, TrieTraversalItem};
Expand Down Expand Up @@ -104,9 +103,13 @@ impl FlatStorageShardCreator {
{
if let Some(key) = key {
let value = trie.storage.retrieve_raw_bytes(&hash).unwrap();
let value_ref = ValueRef::new(&value);
store_helper::set_ref(&mut store_update, shard_uid, key, Some(value_ref))
.expect("Failed to put value in FlatState");
store_helper::set_flat_state_value(
&mut store_update,
shard_uid,
key,
Some(FlatStateValue::value_ref(&value)),
)
.expect("Failed to put value in FlatState");
num_items += 1;
}
}
Expand Down
7 changes: 3 additions & 4 deletions core/store/src/flat/chunk_view.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use near_primitives::hash::CryptoHash;
use near_primitives::state::ValueRef;

use crate::Store;

use super::FlatStorage;
use super::{FlatStateValue, FlatStorage};

/// Struct for getting value references from the flat storage, corresponding
/// to some block defined in `blocks_to_head`.
Expand Down Expand Up @@ -39,7 +38,7 @@ impl FlatStorageChunkView {
/// they are stored in `DBCol::State`. Also the separation is done so we
/// could charge users for the value length before loading the value.
// TODO (#7327): consider inlining small values, so we could use only one db access.
pub fn get_ref(&self, key: &[u8]) -> Result<Option<ValueRef>, crate::StorageError> {
self.flat_storage.get_ref(&self.block_hash, key)
pub fn get_value(&self, key: &[u8]) -> Result<Option<FlatStateValue>, crate::StorageError> {
self.flat_storage.get_value(&self.block_hash, key)
}
}
73 changes: 44 additions & 29 deletions core/store/src/flat/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use near_primitives::types::RawStateChangesWithTrieKey;
use std::collections::HashMap;
use std::sync::Arc;

use super::{store_helper, BlockInfo};
use super::types::INLINE_DISK_VALUE_THRESHOLD;
use super::{store_helper, BlockInfo, FlatStateValue};
use crate::{CryptoHash, StoreUpdate};

#[derive(Debug)]
Expand All @@ -34,14 +35,14 @@ impl KeyForFlatStateDelta {
res
}
}
/// Delta of the state for some shard and block, stores mapping from keys to value refs or None, if key was removed in
/// this block.
/// Delta of the state for some shard and block, stores mapping from keys to values
/// or None, if key was removed in this block.
#[derive(BorshSerialize, BorshDeserialize, Clone, Default, PartialEq, Eq)]
pub struct FlatStateChanges(pub(crate) HashMap<Vec<u8>, Option<ValueRef>>);
pub struct FlatStateChanges(pub(crate) HashMap<Vec<u8>, Option<FlatStateValue>>);

impl<T> From<T> for FlatStateChanges
where
T: IntoIterator<Item = (Vec<u8>, Option<ValueRef>)>,
T: IntoIterator<Item = (Vec<u8>, Option<FlatStateValue>)>,
{
fn from(iter: T) -> Self {
Self(HashMap::from_iter(iter))
Expand All @@ -57,13 +58,17 @@ impl std::fmt::Debug for FlatStateChanges {
}

impl FlatStateChanges {
/// Returns `Some(Option<ValueRef>)` from delta for the given key. If key is not present, returns None.
pub fn get(&self, key: &[u8]) -> Option<Option<ValueRef>> {
/// Returns `Some(Option<FlatStateValue>)` from delta for the given key. If key is not present, returns None.
pub fn get(&self, key: &[u8]) -> Option<Option<FlatStateValue>> {
self.0.get(key).cloned()
}

/// Inserts a key-value pair to delta.
pub fn insert(&mut self, key: Vec<u8>, value: Option<ValueRef>) -> Option<Option<ValueRef>> {
pub fn insert(
&mut self,
key: Vec<u8>,
value: Option<FlatStateValue>,
) -> Option<Option<FlatStateValue>> {
self.0.insert(key, value)
}

Expand All @@ -88,20 +93,23 @@ impl FlatStateChanges {
.last()
.expect("Committed entry should have at least one change")
.data;
match last_change {
Some(value) => {
delta.insert(key, Some(near_primitives::state::ValueRef::new(value)))
let flat_state_value = last_change.as_ref().map(|value| {
if value.len() <= INLINE_DISK_VALUE_THRESHOLD {
FlatStateValue::inlined(value)
} else {
FlatStateValue::value_ref(value)
}
None => delta.insert(key, None),
};
});
delta.insert(key, flat_state_value);
}
Self(delta)
}

/// Applies delta to the flat state.
pub fn apply_to_flat_state(self, store_update: &mut StoreUpdate, shard_uid: ShardUId) {
for (key, value) in self.0.into_iter() {
store_helper::set_ref(store_update, shard_uid, key, value).expect("Borsh cannot fail");
store_helper::set_flat_state_value(store_update, shard_uid, key, value)
.expect("Borsh cannot fail");
}
}
}
Expand All @@ -117,7 +125,13 @@ pub struct CachedFlatStateDelta {

impl From<FlatStateChanges> for CachedFlatStateChanges {
fn from(delta: FlatStateChanges) -> Self {
Self(delta.0.into_iter().map(|(key, value)| (hash(&key), value)).collect())
Self(
delta
.0
.into_iter()
.map(|(key, value)| (hash(&key), value.map(|v| v.to_value_ref())))
.collect(),
)
}
}

Expand All @@ -144,8 +158,9 @@ impl CachedFlatStateChanges {

#[cfg(test)]
mod tests {
use crate::flat::FlatStateValue;

use super::FlatStateChanges;
use near_primitives::state::ValueRef;
use near_primitives::trie_key::TrieKey;
use near_primitives::types::{RawStateChange, RawStateChangesWithTrieKey, StateChangeCause};

Expand Down Expand Up @@ -208,17 +223,17 @@ mod tests {
let flat_state_changes = FlatStateChanges::from_state_changes(&state_changes);
assert_eq!(
flat_state_changes.get(&alice_trie_key.to_vec()),
Some(Some(ValueRef::new(&[3, 4])))
Some(Some(FlatStateValue::inlined(&[3, 4])))
);
assert_eq!(flat_state_changes.get(&bob_trie_key.to_vec()), Some(None));
assert_eq!(flat_state_changes.get(&carol_trie_key.to_vec()), None);
assert_eq!(
flat_state_changes.get(&delayed_trie_key.to_vec()),
Some(Some(ValueRef::new(&[1])))
Some(Some(FlatStateValue::inlined(&[1])))
);
assert_eq!(
flat_state_changes.get(&delayed_receipt_trie_key.to_vec()),
Some(Some(ValueRef::new(&[2])))
Some(Some(FlatStateValue::inlined(&[2])))
);
}

Expand All @@ -227,23 +242,23 @@ mod tests {
#[test]
fn flat_state_changes_merge() {
let mut changes = FlatStateChanges::from([
(vec![1], Some(ValueRef::new(&[4]))),
(vec![2], Some(ValueRef::new(&[5]))),
(vec![1], Some(FlatStateValue::value_ref(&[4]))),
(vec![2], Some(FlatStateValue::value_ref(&[5]))),
(vec![3], None),
(vec![4], Some(ValueRef::new(&[6]))),
(vec![4], Some(FlatStateValue::value_ref(&[6]))),
]);
let changes_new = FlatStateChanges::from([
(vec![2], Some(ValueRef::new(&[7]))),
(vec![3], Some(ValueRef::new(&[8]))),
(vec![2], Some(FlatStateValue::value_ref(&[7]))),
(vec![3], Some(FlatStateValue::value_ref(&[8]))),
(vec![4], None),
(vec![5], Some(ValueRef::new(&[9]))),
(vec![5], Some(FlatStateValue::value_ref(&[9]))),
]);
changes.merge(changes_new);

assert_eq!(changes.get(&[1]), Some(Some(ValueRef::new(&[4]))));
assert_eq!(changes.get(&[2]), Some(Some(ValueRef::new(&[7]))));
assert_eq!(changes.get(&[3]), Some(Some(ValueRef::new(&[8]))));
assert_eq!(changes.get(&[1]), Some(Some(FlatStateValue::value_ref(&[4]))));
assert_eq!(changes.get(&[2]), Some(Some(FlatStateValue::value_ref(&[7]))));
assert_eq!(changes.get(&[3]), Some(Some(FlatStateValue::value_ref(&[8]))));
assert_eq!(changes.get(&[4]), Some(None));
assert_eq!(changes.get(&[5]), Some(Some(ValueRef::new(&[9]))));
assert_eq!(changes.get(&[5]), Some(Some(FlatStateValue::value_ref(&[9]))));
}
}
2 changes: 1 addition & 1 deletion core/store/src/flat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub use manager::FlatStorageManager;
pub use metrics::FlatStorageCreationMetrics;
pub use storage::FlatStorage;
pub use types::{
BlockInfo, FetchingStateStatus, FlatStorageCreationStatus, FlatStorageError,
BlockInfo, FetchingStateStatus, FlatStateValue, FlatStorageCreationStatus, FlatStorageError,
FlatStorageReadyStatus, FlatStorageStatus,
};

Expand Down
65 changes: 37 additions & 28 deletions core/store/src/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::sync::{Arc, RwLock};
use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
use near_primitives::state::ValueRef;
use tracing::{debug, info, warn};

use crate::flat::delta::CachedFlatStateChanges;
Expand All @@ -14,7 +13,7 @@ use crate::{Store, StoreUpdate};

use super::delta::{CachedFlatStateDelta, FlatStateDelta};
use super::metrics::FlatStorageMetrics;
use super::types::FlatStorageError;
use super::types::{FlatStateValue, FlatStorageError};
use super::{store_helper, BlockInfo};

/// FlatStorage stores information on which blocks flat storage currently supports key lookups on.
Expand Down Expand Up @@ -177,11 +176,11 @@ impl FlatStorage {
guard.get_blocks_to_head(target_block_hash)
}

pub fn get_ref(
pub fn get_value(
&self,
block_hash: &CryptoHash,
key: &[u8],
) -> Result<Option<ValueRef>, crate::StorageError> {
) -> Result<Option<FlatStateValue>, crate::StorageError> {
let guard = self.0.read().expect(super::POISONED_LOCK_ERR);
let blocks_to_head =
guard.get_blocks_to_head(block_hash).map_err(|e| StorageError::from(e))?;
Expand All @@ -190,14 +189,14 @@ impl FlatStorage {
let changes = guard.get_block_changes(block_hash)?;
match changes.get(key) {
Some(value_ref) => {
return Ok(value_ref);
return Ok(value_ref.map(|value_ref| FlatStateValue::Ref(value_ref)));
}
None => {}
};
}

let value_ref = store_helper::get_ref(&guard.store, guard.shard_uid, key)?;
Ok(value_ref)
let value = store_helper::get_flat_state_value(&guard.store, guard.shard_uid, key)?;
Ok(value)
}

/// Update the head of the flat storage, including updating the flat state in memory and on disk
Expand Down Expand Up @@ -331,13 +330,12 @@ mod tests {
use crate::flat::delta::{FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata};
use crate::flat::manager::FlatStorageManager;
use crate::flat::storage::FlatStorage;
use crate::flat::types::{BlockInfo, FlatStorageError};
use crate::flat::types::{BlockInfo, FlatStateValue, FlatStorageError};
use crate::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus};
use crate::test_utils::create_test_store;
use crate::StorageError;
use borsh::BorshSerialize;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::state::ValueRef;
use near_primitives::types::BlockHeight;

use assert_matches::assert_matches;
Expand Down Expand Up @@ -551,11 +549,19 @@ mod tests {
shard_uid,
FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }),
);
store_helper::set_ref(&mut store_update, shard_uid, vec![1], Some(ValueRef::new(&[0])))
.unwrap();
store_helper::set_flat_state_value(
&mut store_update,
shard_uid,
vec![1],
Some(FlatStateValue::value_ref(&[0])),
)
.unwrap();
for i in 1..10 {
let delta = FlatStateDelta {
changes: FlatStateChanges::from([(vec![1], Some(ValueRef::new(&[i as u8])))]),
changes: FlatStateChanges::from([(
vec![1],
Some(FlatStateValue::value_ref(&[i as u8])),
)]),
metadata: FlatStateDeltaMetadata { block: chain.get_block(i) },
};
store_helper::set_delta(&mut store_update, shard_uid, &delta).unwrap();
Expand All @@ -574,7 +580,10 @@ mod tests {
assert_eq!(blocks.len(), i as usize);
let chunk_view =
flat_storage_manager.chunk_view(shard_uid, Some(block_hash), false).unwrap();
assert_eq!(chunk_view.get_ref(&[1]).unwrap(), Some(ValueRef::new(&[i as u8])));
assert_eq!(
chunk_view.get_value(&[1]).unwrap(),
Some(FlatStateValue::value_ref(&[i as u8]))
);
}

// 3. Create a new block that deletes &[1] and adds a new value &[2]
Expand All @@ -584,7 +593,7 @@ mod tests {
.add_delta(FlatStateDelta {
changes: FlatStateChanges::from([
(vec![1], None),
(vec![2], Some(ValueRef::new(&[1]))),
(vec![2], Some(FlatStateValue::value_ref(&[1]))),
]),
metadata: FlatStateDeltaMetadata { block: chain.get_block_info(&hash) },
})
Expand All @@ -601,10 +610,10 @@ mod tests {
let chunk_view1 = flat_storage_manager
.chunk_view(shard_uid, Some(chain.get_block_hash(4)), false)
.unwrap();
assert_eq!(chunk_view0.get_ref(&[1]).unwrap(), None);
assert_eq!(chunk_view0.get_ref(&[2]).unwrap(), Some(ValueRef::new(&[1])));
assert_eq!(chunk_view1.get_ref(&[1]).unwrap(), Some(ValueRef::new(&[4])));
assert_eq!(chunk_view1.get_ref(&[2]).unwrap(), None);
assert_eq!(chunk_view0.get_value(&[1]).unwrap(), None);
assert_eq!(chunk_view0.get_value(&[2]).unwrap(), Some(FlatStateValue::value_ref(&[1])));
assert_eq!(chunk_view1.get_value(&[1]).unwrap(), Some(FlatStateValue::value_ref(&[4])));
assert_eq!(chunk_view1.get_value(&[2]).unwrap(), None);
assert_matches!(
store_helper::get_delta_changes(&store, shard_uid, chain.get_block_hash(5)).unwrap(),
Some(_)
Expand All @@ -618,15 +627,15 @@ mod tests {
// and chunk_view1 returns an error. Also check that DBCol::FlatState is updated correctly
flat_storage.update_flat_head(&chain.get_block_hash(5)).unwrap();
assert_eq!(
store_helper::get_ref(&store, shard_uid, &[1]).unwrap(),
Some(ValueRef::new(&[5]))
store_helper::get_flat_state_value(&store, shard_uid, &[1]).unwrap(),
Some(FlatStateValue::value_ref(&[5]))
);
let blocks = flat_storage.get_blocks_to_head(&chain.get_block_hash(10)).unwrap();
assert_eq!(blocks.len(), 5);
assert_eq!(chunk_view0.get_ref(&[1]).unwrap(), None);
assert_eq!(chunk_view0.get_ref(&[2]).unwrap(), Some(ValueRef::new(&[1])));
assert_eq!(chunk_view0.get_value(&[1]).unwrap(), None);
assert_eq!(chunk_view0.get_value(&[2]).unwrap(), Some(FlatStateValue::value_ref(&[1])));
assert_matches!(
chunk_view1.get_ref(&[1]),
chunk_view1.get_value(&[1]),
Err(StorageError::FlatStorageBlockNotSupported(_))
);
assert_matches!(
Expand All @@ -643,13 +652,13 @@ mod tests {
flat_storage.update_flat_head(&chain.get_block_hash(10)).unwrap();
let blocks = flat_storage.get_blocks_to_head(&chain.get_block_hash(10)).unwrap();
assert_eq!(blocks.len(), 0);
assert_eq!(store_helper::get_ref(&store, shard_uid, &[1]).unwrap(), None);
assert_eq!(store_helper::get_flat_state_value(&store, shard_uid, &[1]).unwrap(), None);
assert_eq!(
store_helper::get_ref(&store, shard_uid, &[2]).unwrap(),
Some(ValueRef::new(&[1]))
store_helper::get_flat_state_value(&store, shard_uid, &[2]).unwrap(),
Some(FlatStateValue::value_ref(&[1]))
);
assert_eq!(chunk_view0.get_ref(&[1]).unwrap(), None);
assert_eq!(chunk_view0.get_ref(&[2]).unwrap(), Some(ValueRef::new(&[1])));
assert_eq!(chunk_view0.get_value(&[1]).unwrap(), None);
assert_eq!(chunk_view0.get_value(&[2]).unwrap(), Some(FlatStateValue::value_ref(&[1])));
assert_matches!(
store_helper::get_delta_changes(&store, shard_uid, chain.get_block_hash(10)).unwrap(),
None
Expand Down
Loading

0 comments on commit 22c1466

Please sign in to comment.