Skip to content

Commit

Permalink
feat: enable FlatState values inlining migration in the background (#9093)
Browse files Browse the repository at this point in the history

Part of #8243.

This PR enables the migration added in #9037 to be executed in the background on the running node.
It supports graceful stop when the node is shut down. The implementation is heavily inspired by state sync background dumping to S3.

This PR also introduces a new column `DBCol::Misc`. For now it only stores the status of the migration, but it can hold any small pieces of data, similar to `DBCol::BlockMisc`.

`FlatStorageManager` is exposed as part of `RuntimeAdapter` in this PR. This is the first step in removing all other flat-storage-related methods from `RuntimeAdapter`, since the manager can be used directly instead.

Tested by manually running a node and checking metrics and log messages. After that, flat storage was checked with the `flat-storage verify` command.
  • Loading branch information
pugachAG authored and nikurt committed Jun 13, 2023
1 parent eefb531 commit 83f2688
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 160 deletions.
25 changes: 25 additions & 0 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use near_primitives::version::{
MIN_PROTOCOL_VERSION_NEP_92_FIX,
};
use near_primitives::views::{QueryRequest, QueryResponse};
use near_store::flat::{FlatStorage, FlatStorageStatus};
use near_store::{PartialStorage, ShardTries, Store, StoreUpdate, Trie, WrappedTrieChanges};

pub use near_epoch_manager::EpochManagerAdapter;
Expand Down Expand Up @@ -306,6 +307,30 @@ pub trait RuntimeAdapter: Send + Sync {

fn get_flat_storage_manager(&self) -> Option<FlatStorageManager>;

fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option<FlatStorage>;

fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus;

/// Creates flat storage state for given shard, assuming that all flat storage data
/// is already stored in DB.
/// TODO (#7327): consider returning flat storage creation errors here
fn create_flat_storage_for_shard(&self, shard_uid: ShardUId);

/// Removes flat storage state for shard, if it exists.
/// Used to clear old flat storage data from disk and memory before syncing to newer state.
fn remove_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
epoch_id: &EpochId,
) -> Result<(), Error>;

fn set_flat_storage_for_genesis(
&self,
genesis_block: &CryptoHash,
genesis_block_height: BlockHeight,
genesis_epoch_id: &EpochId,
) -> Result<StoreUpdate, Error>;

/// Validates a given signed transaction.
/// If the state root is given, then the verification will use the account. Otherwise it will
/// only validate the transaction math, limits and signatures.
Expand Down
6 changes: 2 additions & 4 deletions core/store/src/flat/inlining_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crossbeam::channel;
use itertools::Itertools;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::FlatStateValue;
use tracing::{debug, info};

use crate::flat::store_helper::set_flat_state_values_inlining_migration_status;
Expand All @@ -22,7 +21,7 @@ use super::store_helper::{
decode_flat_state_db_key, get_flat_state_values_inlining_migration_status,
};
use super::types::{FlatStateValuesInliningMigrationStatus, INLINE_DISK_VALUE_THRESHOLD};
use super::FlatStorageManager;
use super::{FlatStateValue, FlatStorageManager};

pub struct FlatStateValuesInliningMigrationHandle {
handle: JoinHandle<()>,
Expand Down Expand Up @@ -285,11 +284,10 @@ mod tests {
use borsh::{BorshDeserialize, BorshSerialize};
use near_primitives::hash::hash;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::state::FlatStateValue;

use crate::flat::store_helper::encode_flat_state_db_key;
use crate::flat::types::INLINE_DISK_VALUE_THRESHOLD;
use crate::flat::FlatStorageManager;
use crate::flat::{FlatStateValue, FlatStorageManager};
use crate::{DBCol, NodeStorage, TrieCachingStorage};

use super::inline_flat_state_values;
Expand Down
211 changes: 69 additions & 142 deletions core/store/src/flat/store_helper.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,55 @@
//! This file contains helper functions for accessing flat storage data in DB
//! TODO(#8577): remove this file and move functions to the corresponding structs
use std::io;

use crate::db::FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY;
use crate::flat::delta::{FlatStateChanges, KeyForFlatStateDelta};
use crate::flat::types::FlatStorageError;
use crate::{DBCol, Store, StoreUpdate};
use borsh::BorshDeserialize;
use near_primitives::errors::StorageError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::FlatStateValue;
use near_primitives::shard_layout::{ShardLayout, ShardUId};

use super::delta::{FlatStateDelta, FlatStateDeltaMetadata};
use super::types::{
FlatStateIterator, FlatStateValuesInliningMigrationStatus, FlatStorageResult, FlatStorageStatus,
};
use super::types::{FlatStateValue, FlatStateValuesInliningMigrationStatus, FlatStorageStatus};

/// Prefixes for keys in `FlatStateMisc` DB column.
pub const FLAT_STATE_HEAD_KEY_PREFIX: &[u8; 4] = b"HEAD";
pub const FLAT_STATE_CREATION_STATUS_KEY_PREFIX: &[u8; 6] = b"STATUS";

pub fn get_delta_changes(
store: &Store,
shard_uid: ShardUId,
block_hash: CryptoHash,
) -> FlatStorageResult<Option<FlatStateChanges>> {
) -> Result<Option<FlatStateChanges>, FlatStorageError> {
let key = KeyForFlatStateDelta { shard_uid, block_hash };
store.get_ser::<FlatStateChanges>(DBCol::FlatStateChanges, &key.to_bytes()).map_err(|err| {
FlatStorageError::StorageInternalError(format!(
"failed to read delta changes for {key:?}: {err}"
))
})
Ok(store
.get_ser::<FlatStateChanges>(DBCol::FlatStateChanges, &key.to_bytes())
.map_err(|_| FlatStorageError::StorageInternalError)?)
}

pub fn get_all_deltas_metadata(
store: &Store,
shard_uid: ShardUId,
) -> FlatStorageResult<Vec<FlatStateDeltaMetadata>> {
) -> Result<Vec<FlatStateDeltaMetadata>, FlatStorageError> {
store
.iter_prefix_ser(DBCol::FlatStateDeltaMetadata, &shard_uid.to_bytes())
.map(|res| {
res.map(|(_, value)| value).map_err(|err| {
FlatStorageError::StorageInternalError(format!(
"failed to read delta metadata: {err}"
))
})
})
.map(|res| res.map(|(_, value)| value).map_err(|_| FlatStorageError::StorageInternalError))
.collect()
}

pub fn set_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, delta: &FlatStateDelta) {
pub fn set_delta(
store_update: &mut StoreUpdate,
shard_uid: ShardUId,
delta: &FlatStateDelta,
) -> Result<(), FlatStorageError> {
let key = KeyForFlatStateDelta { shard_uid, block_hash: delta.metadata.block.hash }.to_bytes();
store_update
.set_ser(DBCol::FlatStateChanges, &key, &delta.changes)
.expect("Borsh should not have failed here");
.map_err(|_| FlatStorageError::StorageInternalError)?;
store_update
.set_ser(DBCol::FlatStateDeltaMetadata, &key, &delta.metadata)
.expect("Borsh should not have failed here");
.map_err(|_| FlatStorageError::StorageInternalError)?;
Ok(())
}

pub fn remove_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, block_hash: CryptoHash) {
Expand All @@ -62,19 +58,11 @@ pub fn remove_delta(store_update: &mut StoreUpdate, shard_uid: ShardUId, block_h
store_update.delete(DBCol::FlatStateDeltaMetadata, &key);
}

fn remove_range_by_shard_uid(store_update: &mut StoreUpdate, shard_uid: ShardUId, col: DBCol) {
pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
let key_from = shard_uid.to_bytes();
let key_to = ShardUId::next_shard_prefix(&key_from);
store_update.delete_range(col, &key_from, &key_to);
}

pub fn remove_all_deltas(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateChanges);
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatStateDeltaMetadata);
}

pub fn remove_all_flat_state_values(store_update: &mut StoreUpdate, shard_uid: ShardUId) {
remove_range_by_shard_uid(store_update, shard_uid, DBCol::FlatState);
store_update.delete_range(DBCol::FlatStateChanges, &key_from, &key_to);
store_update.delete_range(DBCol::FlatStateDeltaMetadata, &key_from, &key_to);
}

pub(crate) fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec<u8> {
Expand All @@ -84,60 +72,50 @@ pub(crate) fn encode_flat_state_db_key(shard_uid: ShardUId, key: &[u8]) -> Vec<u
buffer
}

pub(crate) fn decode_flat_state_db_key(key: &[u8]) -> io::Result<(ShardUId, Vec<u8>)> {
pub(crate) fn decode_flat_state_db_key(
key: &Box<[u8]>,
) -> Result<(ShardUId, Vec<u8>), StorageError> {
if key.len() < 8 {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("expected FlatState key length to be at least 8: {key:?}"),
));
return Err(StorageError::StorageInconsistentState(format!(
"Found key in flat storage with length < 8: {key:?}"
)));
}
let (shard_uid_bytes, trie_key) = key.split_at(8);
let shard_uid = shard_uid_bytes.try_into().map_err(|err| {
io::Error::new(
io::ErrorKind::Other,
format!("failed to decode shard_uid as part of FlatState key: {err}"),
)
let shard_uid = shard_uid_bytes.try_into().map_err(|_| {
StorageError::StorageInconsistentState(format!(
"Incorrect raw shard uid: {shard_uid_bytes:?}"
))
})?;
Ok((shard_uid, trie_key.to_vec()))
}

pub fn get_flat_state_values_inlining_migration_status(
store: &Store,
) -> FlatStorageResult<FlatStateValuesInliningMigrationStatus> {
) -> Result<FlatStateValuesInliningMigrationStatus, FlatStorageError> {
store
.get_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY)
.map(|status| status.unwrap_or(FlatStateValuesInliningMigrationStatus::Empty))
.map_err(|err| {
FlatStorageError::StorageInternalError(format!(
"failed to read FlatState values inlining migration status: {err}"
))
})
.map_err(|_| FlatStorageError::StorageInternalError)
}

pub fn set_flat_state_values_inlining_migration_status(
store: &Store,
status: FlatStateValuesInliningMigrationStatus,
) -> FlatStorageResult<()> {
) -> Result<(), FlatStorageError> {
let mut store_update = store.store_update();
store_update
.set_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY, &status)
.expect("Borsh should not have failed here");
store_update.commit().map_err(|err| {
FlatStorageError::StorageInternalError(format!(
"failed to commit FlatState values inlining migration status: {err}"
))
})
.map_err(|_| FlatStorageError::StorageInternalError)?;
store_update.commit().map_err(|_| FlatStorageError::StorageInternalError)
}

pub(crate) fn get_flat_state_value(
store: &Store,
shard_uid: ShardUId,
key: &[u8],
) -> FlatStorageResult<Option<FlatStateValue>> {
) -> Result<Option<FlatStateValue>, FlatStorageError> {
let db_key = encode_flat_state_db_key(shard_uid, key);
store.get_ser(DBCol::FlatState, &db_key).map_err(|err| {
FlatStorageError::StorageInternalError(format!("failed to read FlatState value: {err}"))
})
store.get_ser(DBCol::FlatState, &db_key).map_err(|_| FlatStorageError::StorageInternalError)
}

// TODO(#8577): make pub(crate) once flat storage creator is moved inside `flat` module.
Expand All @@ -146,28 +124,21 @@ pub fn set_flat_state_value(
shard_uid: ShardUId,
key: Vec<u8>,
value: Option<FlatStateValue>,
) {
) -> Result<(), FlatStorageError> {
let db_key = encode_flat_state_db_key(shard_uid, &key);
match value {
Some(value) => store_update
.set_ser(DBCol::FlatState, &db_key, &value)
.expect("Borsh should not have failed here"),
None => store_update.delete(DBCol::FlatState, &db_key),
.map_err(|_| FlatStorageError::StorageInternalError),
None => Ok(store_update.delete(DBCol::FlatState, &db_key)),
}
}

pub fn get_flat_storage_status(
store: &Store,
shard_uid: ShardUId,
) -> FlatStorageResult<FlatStorageStatus> {
pub fn get_flat_storage_status(store: &Store, shard_uid: ShardUId) -> FlatStorageStatus {
store
.get_ser(DBCol::FlatStorageStatus, &shard_uid.to_bytes())
.map(|status| status.unwrap_or(FlatStorageStatus::Empty))
.map_err(|err| {
FlatStorageError::StorageInternalError(format!(
"failed to read flat storage status: {err}"
))
})
.expect("Error reading flat head from storage")
.unwrap_or(FlatStorageStatus::Empty)
}

pub fn set_flat_storage_status(
Expand All @@ -177,19 +148,19 @@ pub fn set_flat_storage_status(
) {
store_update
.set_ser(DBCol::FlatStorageStatus, &shard_uid.to_bytes(), &status)
.expect("Borsh should not have failed here")
.expect("Borsh should not fail")
}

/// Returns iterator over flat storage entries for a given shard and range of
/// state keys. `None` means that there is no bound in respective direction.
/// It reads data only from `FlatState` column which represents the state at
/// flat storage head. Reads only committed changes.
/// flat storage head.
pub fn iter_flat_state_entries<'a>(
shard_uid: ShardUId,
store: &'a Store,
from: Option<&[u8]>,
to: Option<&[u8]>,
) -> FlatStateIterator<'a> {
) -> impl Iterator<Item = (Vec<u8>, Box<[u8]>)> + 'a {
// If left direction is unbounded, encoded `shard_uid` serves as the
// smallest possible key in DB for the shard.
let db_key_from = match from {
Expand All @@ -203,68 +174,24 @@ pub fn iter_flat_state_entries<'a>(
Some(to) => encode_flat_state_db_key(shard_uid, to),
None => ShardUId::next_shard_prefix(&shard_uid.to_bytes()).to_vec(),
};
let iter =
store.iter_range(DBCol::FlatState, Some(&db_key_from), Some(&db_key_to)).map(|result| {
match result {
Ok((key, value)) => Ok((
decode_flat_state_db_key(&key)
.map_err(|err| {
FlatStorageError::StorageInternalError(format!(
"invalid FlatState key format: {err}"
))
})?
.1,
FlatStateValue::try_from_slice(&value).map_err(|err| {
FlatStorageError::StorageInternalError(format!(
"invalid FlatState value format: {err}"
))
})?,
)),
Err(err) => Err(FlatStorageError::StorageInternalError(format!(
"FlatState iterator error: {err}"
))),
store.iter_range(DBCol::FlatState, Some(&db_key_from), Some(&db_key_to)).filter_map(
move |result| {
if let Ok((key, value)) = result {
let (_, trie_key) = decode_flat_state_db_key(&key).unwrap();
return Some((trie_key, value));
}
});
Box::new(iter)
}

#[cfg(test)]
mod tests {
use crate::flat::store_helper::set_flat_state_value;
use crate::test_utils::create_test_store;
use near_primitives::shard_layout::ShardUId;
use near_primitives::state::FlatStateValue;

#[test]
fn iter_flat_state_entries() {
// Setup shards and store
let store = create_test_store();
let shard_uids = [0, 1, 2].map(|id| ShardUId { version: 0, shard_id: id });

for (i, shard_uid) in shard_uids.iter().enumerate() {
let mut store_update = store.store_update();
let key: Vec<u8> = vec![0, 1, i as u8];
let val: Vec<u8> = vec![0, 1, 2, i as u8];

// Add value to FlatState
set_flat_state_value(
&mut store_update,
*shard_uid,
key.clone(),
Some(FlatStateValue::inlined(&val)),
);

store_update.commit().unwrap();
}

for (i, shard_uid) in shard_uids.iter().enumerate() {
let entries: Vec<_> =
super::iter_flat_state_entries(*shard_uid, &store, None, None).collect();
assert_eq!(entries.len(), 1);
let key: Vec<u8> = vec![0, 1, i as u8];
let val: Vec<u8> = vec![0, 1, 2, i as u8];

assert_eq!(entries, vec![Ok((key, FlatStateValue::inlined(&val)))]);
}
}
return None;
},
)
}

/// Checks whether a flat state DB key belongs to the given shard.
///
/// Currently all the data in flat storage is 'together' - so we have to parse the key,
/// to see if this element belongs to this shard. The shard UID is decoded from the key
/// with `decode_flat_state_db_key`; the key belongs to the shard when the decoded
/// `version` equals `shard_layout.version()` and the decoded `shard_id` equals `shard_id`.
///
/// # Errors
///
/// Propagates the `StorageError` returned by `decode_flat_state_db_key` when the key
/// cannot be decoded.
pub fn key_belongs_to_shard(
key: &Box<[u8]>,
shard_layout: &ShardLayout,
shard_id: u64,
) -> Result<bool, StorageError> {
// Only the decoded shard UID is needed; the trie key part of the tuple is discarded.
let (key_shard_uid, _) = decode_flat_state_db_key(key)?;
Ok(key_shard_uid.version == shard_layout.version() && key_shard_uid.shard_id as u64 == shard_id)
}
Loading

0 comments on commit 83f2688

Please sign in to comment.