Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable FlatState values inlining migration in the background #9093

Merged
merged 1 commit into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2568,7 +2568,8 @@ impl<'a> ChainStoreUpdate<'a> {
| DBCol::FlatState
| DBCol::FlatStateChanges
| DBCol::FlatStateDeltaMetadata
| DBCol::FlatStorageStatus => {
| DBCol::FlatStorageStatus
| DBCol::Misc => {
unreachable!();
}
}
Expand Down
4 changes: 4 additions & 0 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,10 @@ impl RuntimeAdapter for KeyValueRuntime {
.get_trie_for_shard(ShardUId { version: 0, shard_id: shard_id as u32 }, state_root))
}

/// `KeyValueRuntime` does not provide a flat storage manager, so this always returns `None`.
fn get_flat_storage_manager(&self) -> Option<near_store::flat::FlatStorageManager> {
None
}

fn get_view_trie_for_shard(
&self,
shard_id: ShardId,
Expand Down
3 changes: 3 additions & 0 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use borsh::{BorshDeserialize, BorshSerialize};
use chrono::DateTime;
use chrono::Utc;
use near_primitives::sandbox::state_patch::SandboxStatePatch;
use near_store::flat::FlatStorageManager;
use num_rational::Rational32;

use crate::metrics;
Expand Down Expand Up @@ -299,6 +300,8 @@ pub trait RuntimeAdapter: Send + Sync {
state_root: StateRoot,
) -> Result<Trie, Error>;

/// Returns the flat storage manager of this runtime, or `None` if the implementation
/// does not provide one.
fn get_flat_storage_manager(&self) -> Option<FlatStorageManager>;

fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option<FlatStorage>;

fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus;
Expand Down
9 changes: 8 additions & 1 deletion core/store/src/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum DBCol {
/// - *Rows*: single row `"VERSION"`
/// - *Content type*: The version of the database (u32), serialized as JSON.
DbVersion,
/// Column that store Misc cells.
/// Column that stores miscellaneous block-related cells.
/// - *Rows*: multiple, for example `"GENESIS_JSON_HASH"`, `"HEAD_KEY"`, `"LATEST_KNOWN_KEY"` etc.
/// - *Content type*: cell specific.
BlockMisc,
Expand Down Expand Up @@ -273,6 +273,11 @@ pub enum DBCol {
/// - *Rows*: `shard_uid`
/// - *Column type*: `FlatStorageStatus`
FlatStorageStatus,
/// Column to persist pieces of miscellaneous small data. Should only be used to store
/// constant or small (for example per-shard) amount of data.
/// - *Rows*: arbitrary string, see `crate::db::FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY` for example
/// - *Column type*: arbitrary bytes
Misc,
}

/// Defines different logical parts of a db key.
Expand Down Expand Up @@ -422,6 +427,7 @@ impl DBCol {

// TODO
DBCol::ChallengedBlocks => false,
DBCol::Misc => false,
// BlockToCatchup is only needed while syncing and it is not immutable.
DBCol::BlocksToCatchup => false,
// BlockRefCount is only needed when handling forks and it is not immutable.
Expand Down Expand Up @@ -479,6 +485,7 @@ impl DBCol {
match self {
DBCol::DbVersion => &[DBKeyType::StringLiteral],
DBCol::BlockMisc => &[DBKeyType::StringLiteral],
DBCol::Misc => &[DBKeyType::StringLiteral],
DBCol::Block => &[DBKeyType::BlockHash],
DBCol::BlockHeader => &[DBKeyType::BlockHash],
DBCol::BlockHeight => &[DBKeyType::BlockHeight],
Expand Down
5 changes: 5 additions & 0 deletions core/store/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use self::splitdb::SplitDB;
pub use self::slice::DBSlice;
pub use self::testdb::TestDB;

// `DBCol::BlockMisc` keys
pub const HEAD_KEY: &[u8; 4] = b"HEAD";
pub const TAIL_KEY: &[u8; 4] = b"TAIL";
pub const CHUNK_TAIL_KEY: &[u8; 10] = b"CHUNK_TAIL";
Expand All @@ -32,6 +33,10 @@ pub const GENESIS_JSON_HASH_KEY: &[u8; 17] = b"GENESIS_JSON_HASH";
pub const GENESIS_STATE_ROOTS_KEY: &[u8; 19] = b"GENESIS_STATE_ROOTS";
pub const COLD_HEAD_KEY: &[u8; 9] = b"COLD_HEAD";

// `DBCol::Misc` keys
/// Key in `DBCol::Misc` under which the FlatState values inlining migration status is
/// read and written.
pub const FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY: &[u8] =
b"FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS";

#[derive(Default, Debug)]
pub struct DBTransaction {
pub(crate) ops: Vec<DBOp>,
Expand Down
85 changes: 79 additions & 6 deletions core/store/src/flat/inlining_migration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread::JoinHandle;

use borsh::{BorshDeserialize, BorshSerialize};
use crossbeam::channel;
Expand All @@ -7,16 +10,70 @@ use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use tracing::{debug, info};

use crate::flat::store_helper::set_flat_state_values_inlining_migration_status;
use crate::metrics::flat_state_metrics::inlining_migration::{
FLAT_STATE_PAUSED_DURATION, INLINED_COUNT, INLINED_TOTAL_VALUES_SIZE, PROCESSED_COUNT,
PROCESSED_TOTAL_VALUES_SIZE, SKIPPED_COUNT,
};
use crate::{DBCol, Store, TrieDBStorage, TrieStorage};

use super::store_helper::decode_flat_state_db_key;
use super::types::INLINE_DISK_VALUE_THRESHOLD;
use super::store_helper::{
decode_flat_state_db_key, get_flat_state_values_inlining_migration_status,
};
use super::types::{FlatStateValuesInliningMigrationStatus, INLINE_DISK_VALUE_THRESHOLD};
use super::{FlatStateValue, FlatStorageManager};

/// Handle to the background thread that runs the FlatState values inlining migration.
///
/// Created by `start_background_migration`; call `stop` to ask the thread to finish and
/// wait for it to exit.
pub struct FlatStateValuesInliningMigrationHandle {
/// Join handle of the spawned migration thread.
handle: JoinHandle<()>,
/// Flag shared with the migration thread; `stop` sets it to `false` to request
/// interruption of the migration.
keep_running: Arc<AtomicBool>,
}

/// Batch size (number of `DBCol::FlatState` entries per batch) that the background
/// migration passes to `inline_flat_state_values`.
const BACKGROUND_MIGRATION_BATCH_SIZE: usize = 50_000;

impl FlatStateValuesInliningMigrationHandle {
    /// Spawns a thread that runs the FlatState values inlining migration in the background
    /// and returns a handle that can later be used to stop it.
    ///
    /// `read_state_threads` is forwarded to `inline_flat_state_values`; the batch size is
    /// fixed to `BACKGROUND_MIGRATION_BATCH_SIZE`.
    ///
    /// # Panics
    ///
    /// The spawned thread (not the caller) panics if the migration status cannot be read
    /// from or written to `store`.
    pub fn start_background_migration(
        store: Store,
        flat_storage_manager: FlatStorageManager,
        read_state_threads: usize,
    ) -> Self {
        let keep_running = Arc::new(AtomicBool::new(true));
        // The worker thread gets its own reference to the flag; the handle keeps the other.
        let worker_flag = Arc::clone(&keep_running);
        let handle = std::thread::spawn(move || {
            Self::run_migration(&store, &flat_storage_manager, &worker_flag, read_state_threads)
        });
        Self { handle, keep_running }
    }

    /// Body of the background thread: runs the migration unless it is already finished and
    /// records progress in the store.
    fn run_migration(
        store: &Store,
        flat_storage_manager: &FlatStorageManager,
        keep_running: &AtomicBool,
        read_state_threads: usize,
    ) {
        let status = get_flat_state_values_inlining_migration_status(store)
            .expect("failed to read fs migration status");
        info!(target: "store", ?status, "Read FlatState values inlining migration status");
        if status == FlatStateValuesInliningMigrationStatus::Finished {
            // Nothing left to do: a previous run already completed the migration.
            return;
        }

        set_flat_state_values_inlining_migration_status(
            store,
            FlatStateValuesInliningMigrationStatus::InProgress,
        )
        .expect("failed to set fs migration in-progress status");

        let completed = inline_flat_state_values(
            store.clone(),
            flat_storage_manager,
            keep_running,
            read_state_threads,
            BACKGROUND_MIGRATION_BATCH_SIZE,
        );
        if !completed {
            // Interrupted: leave the status as in-progress so a later run picks it up again.
            return;
        }

        set_flat_state_values_inlining_migration_status(
            store,
            FlatStateValuesInliningMigrationStatus::Finished,
        )
        .expect("failed to set fs migration finished status");
    }

    /// Requests the background migration to stop and blocks until its thread has exited.
    ///
    /// # Panics
    ///
    /// Panics if joining the migration thread fails.
    pub fn stop(self) {
        // The flag is only polled by the worker between batches, so a relaxed store suffices
        // to deliver the request; `join` below waits for the thread to actually exit.
        self.keep_running.store(false, std::sync::atomic::Ordering::Relaxed);
        self.handle.join().expect("join should not fail here");
    }
}

struct ReadValueRequest {
shard_uid: ShardUId,
value_hash: CryptoHash,
Expand All @@ -33,7 +90,7 @@ struct StateValueReader {
pending_requests: usize,
value_request_send: channel::Sender<ReadValueRequest>,
value_response_recv: channel::Receiver<ReadValueResponse>,
join_handles: Vec<std::thread::JoinHandle<()>>,
join_handles: Vec<JoinHandle<()>>,
}

impl StateValueReader {
Expand Down Expand Up @@ -110,16 +167,23 @@ impl StateValueReader {
pub fn inline_flat_state_values(
store: Store,
flat_storage_manager: &FlatStorageManager,
keep_running: &AtomicBool,
read_state_threads: usize,
batch_size: usize,
) {
) -> bool {
info!(target: "store", %read_state_threads, %batch_size, "Starting FlatState value inlining migration");
let migration_start = std::time::Instant::now();
let mut value_reader = StateValueReader::new(store.clone(), read_state_threads);
let mut inlined_total_count = 0;
let mut interrupted = false;
for (batch_index, batch) in
store.iter(DBCol::FlatState).chunks(batch_size).into_iter().enumerate()
{
if !keep_running.load(std::sync::atomic::Ordering::Relaxed) {
info!(target: "store", %batch_index, "FlatState value inlining migration was interrupted");
interrupted = true;
break;
}
let (mut min_key, mut max_key) = (None, None);
for entry in batch {
PROCESSED_COUNT.inc();
Expand Down Expand Up @@ -204,7 +268,8 @@ pub fn inline_flat_state_values(
}
value_reader.close();
let migration_elapsed = migration_start.elapsed();
info!(target: "store", %inlined_total_count, ?migration_elapsed, "Finished FlatState value inlining migration");
info!(target: "store", %inlined_total_count, ?migration_elapsed, %interrupted, "Finished FlatState value inlining migration");
!interrupted
}

fn log_skipped(reason: &str, err: impl std::error::Error) {
Expand All @@ -214,6 +279,8 @@ fn log_skipped(reason: &str, err: impl std::error::Error) {

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicBool;

use borsh::{BorshDeserialize, BorshSerialize};
use near_primitives::hash::hash;
use near_primitives::shard_layout::ShardLayout;
Expand Down Expand Up @@ -243,7 +310,13 @@ mod tests {
}
store_update.commit().unwrap();
}
inline_flat_state_values(store.clone(), &FlatStorageManager::new(store.clone()), 2, 4);
inline_flat_state_values(
store.clone(),
&FlatStorageManager::new(store.clone()),
&AtomicBool::new(true),
2,
4,
);
assert_eq!(
store
.iter(DBCol::FlatState)
Expand Down
2 changes: 1 addition & 1 deletion core/store/src/flat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mod types;

pub use chunk_view::FlatStorageChunkView;
pub use delta::{FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata};
pub use inlining_migration::inline_flat_state_values;
pub use inlining_migration::{inline_flat_state_values, FlatStateValuesInliningMigrationHandle};
pub use manager::FlatStorageManager;
pub use metrics::FlatStorageCreationMetrics;
pub use storage::FlatStorage;
Expand Down
23 changes: 22 additions & 1 deletion core/store/src/flat/store_helper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This file contains helper functions for accessing flat storage data in DB
//! TODO(#8577): remove this file and move functions to the corresponding structs

use crate::db::FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY;
use crate::flat::delta::{FlatStateChanges, KeyForFlatStateDelta};
use crate::flat::types::FlatStorageError;
use crate::{DBCol, Store, StoreUpdate};
Expand All @@ -9,7 +10,7 @@ use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{ShardLayout, ShardUId};

use super::delta::{FlatStateDelta, FlatStateDeltaMetadata};
use super::types::{FlatStateValue, FlatStorageStatus};
use super::types::{FlatStateValue, FlatStateValuesInliningMigrationStatus, FlatStorageStatus};

/// Prefixes for keys in `FlatStateMisc` DB column.
pub const FLAT_STATE_HEAD_KEY_PREFIX: &[u8; 4] = b"HEAD";
Expand Down Expand Up @@ -88,6 +89,26 @@ pub(crate) fn decode_flat_state_db_key(
Ok((shard_uid, trie_key.to_vec()))
}

/// Reads the FlatState values inlining migration status from `DBCol::Misc`.
///
/// Yields `FlatStateValuesInliningMigrationStatus::Empty` when the store has no status
/// recorded under `FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY`.
///
/// # Errors
///
/// Any error from the store read is reported as `FlatStorageError::StorageInternalError`.
pub fn get_flat_state_values_inlining_migration_status(
    store: &Store,
) -> Result<FlatStateValuesInliningMigrationStatus, FlatStorageError> {
    match store.get_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY) {
        Ok(stored) => Ok(stored.unwrap_or(FlatStateValuesInliningMigrationStatus::Empty)),
        Err(_) => Err(FlatStorageError::StorageInternalError),
    }
}

/// Persists `status` as the FlatState values inlining migration status in `DBCol::Misc`,
/// committing the write immediately.
///
/// # Errors
///
/// A failure to serialize the value or to commit the update is reported as
/// `FlatStorageError::StorageInternalError`.
pub fn set_flat_state_values_inlining_migration_status(
    store: &Store,
    status: FlatStateValuesInliningMigrationStatus,
) -> Result<(), FlatStorageError> {
    let mut update = store.store_update();
    let staged =
        update.set_ser(DBCol::Misc, FLAT_STATE_VALUES_INLINING_MIGRATION_STATUS_KEY, &status);
    if staged.is_err() {
        return Err(FlatStorageError::StorageInternalError);
    }
    match update.commit() {
        Ok(()) => Ok(()),
        Err(_) => Err(FlatStorageError::StorageInternalError),
    }
}

pub(crate) fn get_flat_state_value(
store: &Store,
shard_uid: ShardUId,
Expand Down
7 changes: 7 additions & 0 deletions core/store/src/flat/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ impl From<FlatStorageError> for StorageError {
}
}

/// Progress marker of the FlatState values inlining migration, persisted in the store.
#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)]
pub enum FlatStateValuesInliningMigrationStatus {
/// No status is recorded; this is what the status getter yields when the store has no
/// stored value.
Empty,
/// Written by the background migration right before it starts inlining values.
InProgress,
/// Written once the migration ran to completion without being interrupted; the
/// background migration returns immediately when it reads this status.
Finished,
}

#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Eq)]
pub enum FlatStorageStatus {
/// Flat Storage is not supported.
Expand Down
18 changes: 18 additions & 0 deletions nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ use cold_storage::ColdStoreLoopHandle;
use near_async::actix::AddrWithAutoSpanContextExt;
use near_async::messaging::{IntoSender, LateBoundSender};
use near_async::time;
use near_chain::types::RuntimeAdapter;
use near_chain::{Chain, ChainGenesis};
use near_chunks::shards_manager_actor::start_shards_manager;
use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor};
use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig};
use near_epoch_manager::EpochManager;
use near_network::PeerManagerActor;
use near_primitives::block::GenesisId;
use near_store::flat::FlatStateValuesInliningMigrationHandle;
use near_store::metadata::DbKind;
use near_store::metrics::spawn_db_metrics_loop;
use near_store::{DBCol, Mode, NodeStorage, Store, StoreOpenerError};
Expand Down Expand Up @@ -193,6 +195,9 @@ pub struct NearNode {
pub cold_store_loop_handle: Option<ColdStoreLoopHandle>,
/// Contains handles to background threads that may be dumping state to S3.
pub state_sync_dump_handle: Option<StateSyncDumpHandle>,
/// A handle to control background flat state values inlining migration.
/// Needed temporarily, will be removed after the migration is completed.
pub flat_state_migration_handle: Option<FlatStateValuesInliningMigrationHandle>,
}

pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
Expand Down Expand Up @@ -303,6 +308,18 @@ pub fn start_with_config_and_synchronization(
);
shards_manager_adapter.bind(shards_manager_actor);

let flat_state_migration_handle =
if let Some(flat_storage_manager) = runtime.get_flat_storage_manager() {
let handle = FlatStateValuesInliningMigrationHandle::start_background_migration(
storage.get_hot_store(),
flat_storage_manager,
config.client_config.client_background_migration_threads,
);
Some(handle)
} else {
None
};

let state_sync_dump_handle = spawn_state_sync_dump(
&config.client_config,
chain_genesis,
Expand Down Expand Up @@ -366,6 +383,7 @@ pub fn start_with_config_and_synchronization(
arbiters,
cold_store_loop_handle,
state_sync_dump_handle,
flat_state_migration_handle,
})
}

Expand Down
4 changes: 4 additions & 0 deletions nearcore/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,10 @@ impl RuntimeAdapter for NightshadeRuntime {
Ok(self.tries.get_view_trie_for_shard(shard_uid, state_root))
}

/// Returns a clone of the flat storage manager held by this runtime; never `None`.
fn get_flat_storage_manager(&self) -> Option<FlatStorageManager> {
Some(self.flat_storage_manager.clone())
}

fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option<FlatStorage> {
self.flat_storage_manager.get_flat_storage_for_shard(shard_uid)
}
Expand Down
4 changes: 4 additions & 0 deletions neard/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ impl RunCmd {
rpc_servers,
cold_store_loop_handle,
state_sync_dump_handle,
flat_state_migration_handle,
..
} = nearcore::start_with_config_and_synchronization(
home_dir,
Expand All @@ -544,6 +545,9 @@ impl RunCmd {
if let Some(handle) = state_sync_dump_handle {
handle.stop()
}
if let Some(handle) = flat_state_migration_handle {
handle.stop();
}
futures::future::join_all(rpc_servers.iter().map(|(name, server)| async move {
server.stop(true).await;
debug!(target: "neard", "{} server stopped", name);
Expand Down
Loading