Skip to content

Commit

Permalink
feat: enable FlatState values inlining migration in the background (#…
Browse files Browse the repository at this point in the history
…9093)

Part of #8243.

This PR enables the migration added in #9037 to be executed in the background on the running node.
It supports graceful stop when the node is shut down. The implementation is heavily inspired by state sync background dumping to S3.

This PR also introduces a new column `DBCol::Misc`. For now it only stores the status of the migration, but it can hold any small pieces of data, similar to `DBCol::BlockMisc`.

This PR also exposes `FlatStorageManager` through `RuntimeAdapter`. This is the first step towards removing all other flat-storage-related methods from `RuntimeAdapter`, since the manager can be used directly instead.

Tested by manually running a node and checking metrics and log messages. Afterwards, flat storage was checked with the `flat-storage verify` command.
  • Loading branch information
pugachAG authored May 25, 2023
1 parent c9f0bb5 commit 11ab161
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 10 deletions.
3 changes: 2 additions & 1 deletion chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2568,7 +2568,8 @@ impl<'a> ChainStoreUpdate<'a> {
| DBCol::FlatState
| DBCol::FlatStateChanges
| DBCol::FlatStateDeltaMetadata
| DBCol::FlatStorageStatus => {
| DBCol::FlatStorageStatus
| DBCol::Misc => {
unreachable!();
}
}
Expand Down
4 changes: 4 additions & 0 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,10 @@ impl RuntimeAdapter for KeyValueRuntime {
.get_trie_for_shard(ShardUId { version: 0, shard_id: shard_id as u32 }, state_root))
}

/// This runtime does not hand out a flat storage manager: always returns `None`.
fn get_flat_storage_manager(&self) -> Option<near_store::flat::FlatStorageManager> {
None
}

fn get_view_trie_for_shard(
&self,
shard_id: ShardId,
Expand Down
3 changes: 3 additions & 0 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use borsh::{BorshDeserialize, BorshSerialize};
use chrono::DateTime;
use chrono::Utc;
use near_primitives::sandbox::state_patch::SandboxStatePatch;
use near_store::flat::FlatStorageManager;
use num_rational::Rational32;

use crate::metrics;
Expand Down Expand Up @@ -299,6 +300,8 @@ pub trait RuntimeAdapter: Send + Sync {
state_root: StateRoot,
) -> Result<Trie, Error>;

/// Returns the flat storage manager of this runtime, or `None` if the
/// implementation does not provide one.
fn get_flat_storage_manager(&self) -> Option<FlatStorageManager>;

fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option<FlatStorage>;

fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus;
Expand Down
9 changes: 8 additions & 1 deletion core/store/src/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum DBCol {
/// - *Rows*: single row `"VERSION"`
/// - *Content type*: The version of the database (u32), serialized as JSON.
DbVersion,
/// Column that store Misc cells.
/// Column that stores miscellaneous block-related cells.
/// - *Rows*: multiple, for example `"GENESIS_JSON_HASH"`, `"HEAD_KEY"`, `"LATEST_KNOWN_KEY"` etc.
/// - *Content type*: cell specific.
BlockMisc,
Expand Down Expand Up @@ -273,6 +273,11 @@ pub enum DBCol {
/// - *Rows*: `shard_uid`
/// - *Column type*: `FlatStorageStatus`
FlatStorageStatus,
/// Column to persist pieces of miscellaneous small data. Should only be used to store
/// constant or small (for example per-shard) amount of data.
/// - *Rows*: arbitrary string, see `crate::db::FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY` for example
/// - *Column type*: arbitrary bytes
Misc,
}

/// Defines different logical parts of a db key.
Expand Down Expand Up @@ -422,6 +427,7 @@ impl DBCol {

// TODO
DBCol::ChallengedBlocks => false,
DBCol::Misc => false,
// BlockToCatchup is only needed while syncing and it is not immutable.
DBCol::BlocksToCatchup => false,
// BlockRefCount is only needed when handling forks and it is not immutable.
Expand Down Expand Up @@ -479,6 +485,7 @@ impl DBCol {
match self {
DBCol::DbVersion => &[DBKeyType::StringLiteral],
DBCol::BlockMisc => &[DBKeyType::StringLiteral],
DBCol::Misc => &[DBKeyType::StringLiteral],
DBCol::Block => &[DBKeyType::BlockHash],
DBCol::BlockHeader => &[DBKeyType::BlockHash],
DBCol::BlockHeight => &[DBKeyType::BlockHeight],
Expand Down
5 changes: 5 additions & 0 deletions core/store/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use self::splitdb::SplitDB;
pub use self::slice::DBSlice;
pub use self::testdb::TestDB;

// `DBCol::BlockMisc` keys
pub const HEAD_KEY: &[u8; 4] = b"HEAD";
pub const TAIL_KEY: &[u8; 4] = b"TAIL";
pub const CHUNK_TAIL_KEY: &[u8; 10] = b"CHUNK_TAIL";
Expand All @@ -32,6 +33,10 @@ pub const GENESIS_JSON_HASH_KEY: &[u8; 17] = b"GENESIS_JSON_HASH";
pub const GENESIS_STATE_ROOTS_KEY: &[u8; 19] = b"GENESIS_STATE_ROOTS";
pub const COLD_HEAD_KEY: &[u8; 9] = b"COLD_HEAD";

// `DBCol::Misc` keys
/// Key in `DBCol::Misc` under which the status of the FlatState values
/// inlining migration is stored.
pub const FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY: &[u8] =
b"FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS";

#[derive(Default, Debug)]
pub struct DBTransaction {
pub(crate) ops: Vec<DBOp>,
Expand Down
85 changes: 79 additions & 6 deletions core/store/src/flat/inlining_migration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread::JoinHandle;

use borsh::{BorshDeserialize, BorshSerialize};
use crossbeam::channel;
Expand All @@ -7,16 +10,70 @@ use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use tracing::{debug, info};

use crate::flat::store_helper::set_flat_state_values_inlining_migration_status;
use crate::metrics::flat_state_metrics::inlining_migration::{
FLAT_STATE_PAUSED_DURATION, INLINED_COUNT, INLINED_TOTAL_VALUES_SIZE, PROCESSED_COUNT,
PROCESSED_TOTAL_VALUES_SIZE, SKIPPED_COUNT,
};
use crate::{DBCol, Store, TrieDBStorage, TrieStorage};

use super::store_helper::decode_flat_state_db_key;
use super::types::INLINE_DISK_VALUE_THRESHOLD;
use super::store_helper::{
decode_flat_state_db_key, get_flat_state_values_inlining_migration_status,
};
use super::types::{FlatStateValuesInliningMigrationStatus, INLINE_DISK_VALUE_THRESHOLD};
use super::{FlatStateValue, FlatStorageManager};

/// Handle to the background thread that runs the FlatState values inlining
/// migration. Created by `start_background_migration`, consumed by `stop`.
pub struct FlatStateValuesInliningMigrationHandle {
// Join handle of the spawned migration thread; joined in `stop`.
handle: JoinHandle<()>,
// Flag shared with the migration thread. Starts as `true`; `stop` sets it
// to `false`, which makes the migration break out before its next batch.
keep_running: Arc<AtomicBool>,
}

/// Batch size the background migration passes to `inline_flat_state_values`,
/// i.e. how many `DBCol::FlatState` entries are grouped into one batch.
const BACKGROUND_MIGRATION_BATCH_SIZE: usize = 50_000;

impl FlatStateValuesInliningMigrationHandle {
    /// Spawns a thread that runs the FlatState values inlining migration and
    /// returns a handle that can later be used to stop it.
    ///
    /// `read_state_threads` is forwarded unchanged to
    /// `inline_flat_state_values`.
    ///
    /// The spawned thread panics if the migration status cannot be read from
    /// or written to `store`.
    pub fn start_background_migration(
        store: Store,
        flat_storage_manager: FlatStorageManager,
        read_state_threads: usize,
    ) -> Self {
        let keep_running = Arc::new(AtomicBool::new(true));
        // The thread owns its own reference to the flag; the handle keeps the other.
        let thread_flag = Arc::clone(&keep_running);
        let handle = std::thread::spawn(move || {
            Self::run_migration(store, flat_storage_manager, &thread_flag, read_state_threads)
        });
        Self { handle, keep_running }
    }

    /// Body of the background thread: skips the work when the persisted
    /// status is `Finished`, otherwise records `InProgress`, runs the
    /// migration and records `Finished` only if it ran to completion.
    fn run_migration(
        store: Store,
        flat_storage_manager: FlatStorageManager,
        keep_running: &AtomicBool,
        read_state_threads: usize,
    ) {
        let status = get_flat_state_values_inlining_migration_status(&store)
            .expect("failed to read fs migration status");
        info!(target: "store", ?status, "Read FlatState values inlining migration status");
        if status == FlatStateValuesInliningMigrationStatus::Finished {
            return;
        }

        set_flat_state_values_inlining_migration_status(
            &store,
            FlatStateValuesInliningMigrationStatus::InProgress,
        )
        .expect("failed to set fs migration in-progress status");

        let completed = inline_flat_state_values(
            store.clone(),
            &flat_storage_manager,
            keep_running,
            read_state_threads,
            BACKGROUND_MIGRATION_BATCH_SIZE,
        );
        if !completed {
            // Interrupted: the status stays `InProgress`, so a later start
            // runs the migration again.
            return;
        }

        set_flat_state_values_inlining_migration_status(
            &store,
            FlatStateValuesInliningMigrationStatus::Finished,
        )
        .expect("failed to set fs migration finished status");
    }

    /// Asks the migration thread to stop and blocks until it has exited.
    ///
    /// Panics if the migration thread itself panicked.
    pub fn stop(self) {
        let Self { handle, keep_running } = self;
        keep_running.store(false, std::sync::atomic::Ordering::Relaxed);
        handle.join().expect("join should not fail here");
    }
}

struct ReadValueRequest {
shard_uid: ShardUId,
value_hash: CryptoHash,
Expand All @@ -33,7 +90,7 @@ struct StateValueReader {
pending_requests: usize,
value_request_send: channel::Sender<ReadValueRequest>,
value_response_recv: channel::Receiver<ReadValueResponse>,
join_handles: Vec<std::thread::JoinHandle<()>>,
join_handles: Vec<JoinHandle<()>>,
}

impl StateValueReader {
Expand Down Expand Up @@ -110,16 +167,23 @@ impl StateValueReader {
pub fn inline_flat_state_values(
store: Store,
flat_storage_manager: &FlatStorageManager,
keep_running: &AtomicBool,
read_state_threads: usize,
batch_size: usize,
) {
) -> bool {
info!(target: "store", %read_state_threads, %batch_size, "Starting FlatState value inlining migration");
let migration_start = std::time::Instant::now();
let mut value_reader = StateValueReader::new(store.clone(), read_state_threads);
let mut inlined_total_count = 0;
let mut interrupted = false;
for (batch_index, batch) in
store.iter(DBCol::FlatState).chunks(batch_size).into_iter().enumerate()
{
if !keep_running.load(std::sync::atomic::Ordering::Relaxed) {
info!(target: "store", %batch_index, "FlatState value inlining migration was interrupted");
interrupted = true;
break;
}
let (mut min_key, mut max_key) = (None, None);
for entry in batch {
PROCESSED_COUNT.inc();
Expand Down Expand Up @@ -204,7 +268,8 @@ pub fn inline_flat_state_values(
}
value_reader.close();
let migration_elapsed = migration_start.elapsed();
info!(target: "store", %inlined_total_count, ?migration_elapsed, "Finished FlatState value inlining migration");
info!(target: "store", %inlined_total_count, ?migration_elapsed, %interrupted, "Finished FlatState value inlining migration");
!interrupted
}

fn log_skipped(reason: &str, err: impl std::error::Error) {
Expand All @@ -214,6 +279,8 @@ fn log_skipped(reason: &str, err: impl std::error::Error) {

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicBool;

use borsh::{BorshDeserialize, BorshSerialize};
use near_primitives::hash::hash;
use near_primitives::shard_layout::ShardLayout;
Expand Down Expand Up @@ -243,7 +310,13 @@ mod tests {
}
store_update.commit().unwrap();
}
inline_flat_state_values(store.clone(), &FlatStorageManager::new(store.clone()), 2, 4);
inline_flat_state_values(
store.clone(),
&FlatStorageManager::new(store.clone()),
&AtomicBool::new(true),
2,
4,
);
assert_eq!(
store
.iter(DBCol::FlatState)
Expand Down
2 changes: 1 addition & 1 deletion core/store/src/flat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mod types;

pub use chunk_view::FlatStorageChunkView;
pub use delta::{FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata};
pub use inlining_migration::inline_flat_state_values;
pub use inlining_migration::{inline_flat_state_values, FlatStateValuesInliningMigrationHandle};
pub use manager::FlatStorageManager;
pub use metrics::FlatStorageCreationMetrics;
pub use storage::FlatStorage;
Expand Down
23 changes: 22 additions & 1 deletion core/store/src/flat/store_helper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This file contains helper functions for accessing flat storage data in DB
//! TODO(#8577): remove this file and move functions to the corresponding structs
use crate::db::FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY;
use crate::flat::delta::{FlatStateChanges, KeyForFlatStateDelta};
use crate::flat::types::FlatStorageError;
use crate::{DBCol, Store, StoreUpdate};
Expand All @@ -9,7 +10,7 @@ use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{ShardLayout, ShardUId};

use super::delta::{FlatStateDelta, FlatStateDeltaMetadata};
use super::types::{FlatStateValue, FlatStorageStatus};
use super::types::{FlatStateValue, FlatStateValuesInliningMigrationStatus, FlatStorageStatus};

/// Prefixes for keys in `FlatStateMisc` DB column.
pub const FLAT_STATE_HEAD_KEY_PREFIX: &[u8; 4] = b"HEAD";
Expand Down Expand Up @@ -88,6 +89,26 @@ pub(crate) fn decode_flat_state_db_key(
Ok((shard_uid, trie_key.to_vec()))
}

/// Reads the FlatState values inlining migration status from `DBCol::Misc`.
///
/// Returns `FlatStateValuesInliningMigrationStatus::Empty` when nothing is
/// stored under the status key. Any store error is reported as
/// `FlatStorageError::StorageInternalError`.
pub fn get_flat_state_values_inlining_migration_status(
    store: &Store,
) -> Result<FlatStateValuesInliningMigrationStatus, FlatStorageError> {
    let stored: Option<FlatStateValuesInliningMigrationStatus> = store
        .get_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY)
        .map_err(|_| FlatStorageError::StorageInternalError)?;
    Ok(stored.unwrap_or(FlatStateValuesInliningMigrationStatus::Empty))
}

/// Writes `status` under the migration status key in `DBCol::Misc` and
/// commits the update immediately.
///
/// A failure to serialize or to commit is reported as
/// `FlatStorageError::StorageInternalError`.
pub fn set_flat_state_values_inlining_migration_status(
    store: &Store,
    status: FlatStateValuesInliningMigrationStatus,
) -> Result<(), FlatStorageError> {
    let mut update = store.store_update();
    let staged =
        update.set_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY, &status);
    if staged.is_err() {
        return Err(FlatStorageError::StorageInternalError);
    }
    match update.commit() {
        Ok(()) => Ok(()),
        Err(_) => Err(FlatStorageError::StorageInternalError),
    }
}

pub(crate) fn get_flat_state_value(
store: &Store,
shard_uid: ShardUId,
Expand Down
7 changes: 7 additions & 0 deletions core/store/src/flat/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ impl From<FlatStorageError> for StorageError {
}
}

/// Status of the FlatState values inlining migration, as persisted in
/// `DBCol::Misc`.
#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)]
pub enum FlatStateValuesInliningMigrationStatus {
/// Reported when no status has been stored yet.
Empty,
/// Stored by the background migration right before it starts inlining values.
InProgress,
/// Stored once the migration has run to completion without being
/// interrupted; the background migration returns immediately when it reads
/// this status.
Finished,
}

#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)]
pub enum FlatStorageStatus {
/// Flat Storage is not supported.
Expand Down
18 changes: 18 additions & 0 deletions nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ use cold_storage::ColdStoreLoopHandle;
use near_async::actix::AddrWithAutoSpanContextExt;
use near_async::messaging::{IntoSender, LateBoundSender};
use near_async::time;
use near_chain::types::RuntimeAdapter;
use near_chain::{Chain, ChainGenesis};
use near_chunks::shards_manager_actor::start_shards_manager;
use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor};
use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig};
use near_epoch_manager::EpochManager;
use near_network::PeerManagerActor;
use near_primitives::block::GenesisId;
use near_store::flat::FlatStateValuesInliningMigrationHandle;
use near_store::metadata::DbKind;
use near_store::metrics::spawn_db_metrics_loop;
use near_store::{DBCol, Mode, NodeStorage, Store, StoreOpenerError};
Expand Down Expand Up @@ -193,6 +195,9 @@ pub struct NearNode {
pub cold_store_loop_handle: Option<ColdStoreLoopHandle>,
/// Contains handles to background threads that may be dumping state to S3.
pub state_sync_dump_handle: Option<StateSyncDumpHandle>,
/// A handle to control background flat state values inlining migration.
/// Needed temporarily, will be removed after the migration is completed.
pub flat_state_migration_handle: Option<FlatStateValuesInliningMigrationHandle>,
}

pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
Expand Down Expand Up @@ -303,6 +308,18 @@ pub fn start_with_config_and_synchronization(
);
shards_manager_adapter.bind(shards_manager_actor);

let flat_state_migration_handle =
if let Some(flat_storage_manager) = runtime.get_flat_storage_manager() {
let handle = FlatStateValuesInliningMigrationHandle::start_background_migration(
storage.get_hot_store(),
flat_storage_manager,
config.client_config.client_background_migration_threads,
);
Some(handle)
} else {
None
};

let state_sync_dump_handle = spawn_state_sync_dump(
&config.client_config,
chain_genesis,
Expand Down Expand Up @@ -366,6 +383,7 @@ pub fn start_with_config_and_synchronization(
arbiters,
cold_store_loop_handle,
state_sync_dump_handle,
flat_state_migration_handle,
})
}

Expand Down
4 changes: 4 additions & 0 deletions nearcore/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,10 @@ impl RuntimeAdapter for NightshadeRuntime {
Ok(self.tries.get_view_trie_for_shard(shard_uid, state_root))
}

/// Returns a clone of this runtime's flat storage manager; never `None`.
fn get_flat_storage_manager(&self) -> Option<FlatStorageManager> {
Some(self.flat_storage_manager.clone())
}

fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option<FlatStorage> {
self.flat_storage_manager.get_flat_storage_for_shard(shard_uid)
}
Expand Down
4 changes: 4 additions & 0 deletions neard/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ impl RunCmd {
rpc_servers,
cold_store_loop_handle,
state_sync_dump_handle,
flat_state_migration_handle,
..
} = nearcore::start_with_config_and_synchronization(
home_dir,
Expand All @@ -544,6 +545,9 @@ impl RunCmd {
if let Some(handle) = state_sync_dump_handle {
handle.stop()
}
if let Some(handle) = flat_state_migration_handle {
handle.stop();
}
futures::future::join_all(rpc_servers.iter().map(|(name, server)| async move {
server.stop(true).await;
debug!(target: "neard", "{} server stopped", name);
Expand Down
Loading

0 comments on commit 11ab161

Please sign in to comment.