Skip to content

Commit

Permalink
feat: Dump state of every epoch to S3 (near#8661)
Browse files Browse the repository at this point in the history
* Start a thread per shard to do the dumping
* AWS credentials are provided as environment variables: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
* In `config.json` specify both `config.state_sync.s3_bucket` and `config.state_sync.s3_region` to enable the new behavior.
* No changes to the behavior of the node if those options are not enabled in `config.json`.
* State is persisted to RocksDB such that restarts of the node are well handled.
* Some useful metrics are exported.
* The node assumes it's the only node in the this and all alternative universes that does the dumping.
* * Unclear how to use multiple nodes to complete the dump faster
* TODO: Speed this up by doing things in parallel: obtain parts, upload parts, set tags
* * Do we even need tags?
  • Loading branch information
nikurt committed Apr 6, 2023
1 parent 7ac2bbc commit 3988300
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 159 deletions.
1 change: 1 addition & 0 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ impl ChainStore {

/// Constructs key 'STATE_SYNC_DUMP:<ShardId>',
/// for example 'STATE_SYNC_DUMP:2' for shard_id=2.
/// Doesn't contain epoch_id, because only one dump process per shard is allowed.
fn state_sync_dump_progress_key(shard_id: ShardId) -> Vec<u8> {
let mut key = b"STATE_SYNC_DUMP:".to_vec();
key.extend(shard_id.to_le_bytes());
Expand Down
20 changes: 7 additions & 13 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,15 @@ pub struct ClientConfig {
pub client_background_migration_threads: usize,
/// Duration to perform background flat storage creation step.
pub flat_storage_creation_period: Duration,
/// Whether to enable dumping state of every epoch to S3.
pub state_dump_enabled: bool,
/// If enabled, will dump state of every epoch to external storage.
pub state_sync_dump_enabled: bool,
/// S3 bucket for storing state dumps.
pub state_sync_s3_bucket: String,
/// S3 region for storing state dumps.
pub state_sync_s3_region: String,
/// Discard the existing progress of dumping an epoch state to S3.
pub state_sync_dump_drop_state: Vec<ShardId>,
/// Whether to enable state sync from S3.
/// If disabled will perform state sync from the peers.
pub state_sync_from_s3_enabled: bool,
/// Number of parallel in-flight requests allowed per shard.
pub state_sync_num_s3_requests_per_shard: u64,
/// Restart dumping state of selected shards.
/// Use for troubleshooting of the state dumping process.
pub state_sync_restart_dump_for_shards: Vec<ShardId>,
}

impl ClientConfig {
Expand Down Expand Up @@ -248,12 +244,10 @@ impl ClientConfig {
enable_statistics_export: true,
client_background_migration_threads: 1,
flat_storage_creation_period: Duration::from_secs(1),
state_dump_enabled: false,
state_sync_from_s3_enabled: false,
state_sync_dump_enabled: false,
state_sync_s3_bucket: String::new(),
state_sync_s3_region: String::new(),
state_sync_dump_drop_state: vec![],
state_sync_num_s3_requests_per_shard: 100,
state_sync_restart_dump_for_shards: vec![],
}
}
}
13 changes: 12 additions & 1 deletion core/primitives/src/syncing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,18 +229,29 @@ pub fn get_num_state_parts(memory_usage: u64) -> u64 {
}

#[derive(BorshSerialize, BorshDeserialize, Debug, Clone)]
/// Represents the state of the state machine that dumps state.
/// Represents the progress of dumps state of a shard.
pub enum StateSyncDumpProgress {
/// Represents two cases:
/// * An epoch dump is complete
/// * The node is running its first epoch and there is nothing to dump.
AllDumped {
/// The dumped state corresponds to the state at the beginning of the specified epoch.
epoch_id: EpochId,
epoch_height: EpochHeight,
// Missing in case of a node running the first epoch.
num_parts: Option<u64>,
},
/// Represents the case of an epoch being partially dumped.
InProgress {
/// The dumped state corresponds to the state at the beginning of the specified epoch.
epoch_id: EpochId,
epoch_height: EpochHeight,
/// Block hash of the first block of the epoch.
/// The dumped state corresponds to the state before applying this block.
sync_hash: CryptoHash,
/// Root of the state being dumped.
state_root: StateRoot,
/// Progress made.
parts_dumped: u64,
num_parts: u64,
},
Expand Down
39 changes: 6 additions & 33 deletions nearcore/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,40 +701,22 @@ impl NearConfig {
enable_statistics_export: config.store.enable_statistics_export,
client_background_migration_threads: config.store.background_migration_threads,
flat_storage_creation_period: config.store.flat_storage_creation_period,
state_dump_enabled: config
state_sync_dump_enabled: config
.state_sync
.as_ref()
.map(|x| x.dump_enabled)
.flatten()
.unwrap_or(false),
.map_or(false, |x| x.dump_enabled.unwrap_or(false)),
state_sync_s3_bucket: config
.state_sync
.as_ref()
.map(|x| x.s3_bucket.clone())
.unwrap_or(String::new()),
.map_or(String::new(), |x| x.s3_bucket.clone()),
state_sync_s3_region: config
.state_sync
.as_ref()
.map(|x| x.s3_region.clone())
.unwrap_or(String::new()),
state_sync_dump_drop_state: config
.map_or(String::new(), |x| x.s3_region.clone()),
state_sync_restart_dump_for_shards: config
.state_sync
.as_ref()
.map(|x| x.drop_state_of_dump.clone())
.flatten()
.unwrap_or(vec![]),
state_sync_from_s3_enabled: config
.state_sync
.as_ref()
.map(|x| x.sync_from_s3_enabled)
.flatten()
.unwrap_or(false),
state_sync_num_s3_requests_per_shard: config
.state_sync
.as_ref()
.map(|x| x.num_s3_requests_per_shard)
.flatten()
.unwrap_or(100),
.map_or(vec![], |x| x.drop_state_of_dump.clone().unwrap_or(vec![])),
},
network_config: NetworkConfig::new(
config.network,
Expand Down Expand Up @@ -1561,21 +1543,12 @@ pub fn load_test_config(seed: &str, addr: tcp::ListenerAddr, genesis: Genesis) -
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
/// Options for dumping state to S3.
pub struct StateSyncConfig {
/// Location of state dumps on S3.
pub s3_bucket: String,
/// Region is very important on S3.
pub s3_region: String,
/// Whether a node should dump state of each epoch to the external storage.
#[serde(skip_serializing_if = "Option::is_none")]
pub dump_enabled: Option<bool>,
/// Use carefully in case a node that dumps state to the external storage gets in trouble.
#[serde(skip_serializing_if = "Option::is_none")]
pub drop_state_of_dump: Option<Vec<ShardId>>,
/// If enabled, will download state parts from external storage and not from the peers.
#[serde(skip_serializing_if = "Option::is_none")]
pub sync_from_s3_enabled: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub num_s3_requests_per_shard: Option<u64>,
}

#[test]
Expand Down
24 changes: 18 additions & 6 deletions nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::cold_storage::spawn_cold_store_loop;
pub use crate::config::{init_configs, load_config, load_test_config, NearConfig, NEAR_BASE};
pub use crate::runtime::NightshadeRuntime;
pub use crate::shard_tracker::TrackedConfig;
use crate::state_sync::{spawn_state_sync_dump, StateSyncDumpHandle};
use actix::{Actor, Addr};
use actix_rt::ArbiterHandle;
use actix_web;
Expand All @@ -12,17 +13,17 @@ use near_async::messaging::{IntoSender, LateBoundSender};
use near_chain::{Chain, ChainGenesis};
use near_chunks::shards_manager_actor::start_shards_manager;
use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor};
use near_primitives::time;

use near_network::PeerManagerActor;
use near_primitives::block::GenesisId;
use near_primitives::time;
use near_store::metadata::DbKind;
use near_store::{DBCol, Mode, NodeStorage, Store, StoreOpenerError};
use near_telemetry::TelemetryActor;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{info, trace};
use tracing::info;

pub mod append_only_map;
mod cold_storage;
pub mod config;
Expand All @@ -33,6 +34,7 @@ mod metrics;
pub mod migrations;
mod runtime;
mod shard_tracker;
mod state_sync;

pub fn get_default_home() -> PathBuf {
if let Ok(near_home) = std::env::var("NEAR_HOME") {
Expand Down Expand Up @@ -188,6 +190,8 @@ pub struct NearNode {
/// The cold_store_loop_handle will only be set if the cold store is configured.
/// It's a handle to a background thread that copies data from the hot store to the cold store.
pub cold_store_loop_handle: Option<ColdStoreLoopHandle>,
/// Contains handles to background threads that may be dumping state to S3.
pub state_sync_dump_handle: Option<StateSyncDumpHandle>,
}

pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
Expand Down Expand Up @@ -242,7 +246,7 @@ pub fn start_with_config_and_synchronization(
);
let (client_actor, client_arbiter_handle) = start_client(
config.client_config.clone(),
chain_genesis,
chain_genesis.clone(),
runtime.clone(),
node_id,
network_adapter.clone().into(),
Expand All @@ -255,7 +259,7 @@ pub fn start_with_config_and_synchronization(
);
client_adapter_for_shards_manager.bind(client_actor.clone().with_auto_span_context());
let (shards_manager_actor, shards_manager_arbiter_handle) = start_shards_manager(
runtime,
runtime.clone(),
network_adapter.as_sender(),
client_adapter_for_shards_manager.as_sender(),
config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()),
Expand All @@ -264,6 +268,13 @@ pub fn start_with_config_and_synchronization(
);
shards_manager_adapter.bind(shards_manager_actor);

let state_sync_dump_handle = spawn_state_sync_dump(
&config,
chain_genesis,
runtime,
config.network_config.node_id().public_key(),
)?;

#[allow(unused_mut)]
let mut rpc_servers = Vec::new();
let network_actor = PeerManagerActor::spawn(
Expand Down Expand Up @@ -304,14 +315,15 @@ pub fn start_with_config_and_synchronization(

rpc_servers.shrink_to_fit();

trace!(target: "diagnostic", key="log", "Starting NEAR node with diagnostic activated");
tracing::trace!(target: "diagnostic", key = "log", "Starting NEAR node with diagnostic activated");

Ok(NearNode {
client: client_actor,
view_client,
rpc_servers,
arbiters: vec![client_arbiter_handle, shards_manager_arbiter_handle],
cold_store_loop_handle,
state_sync_dump_handle,
})
}

Expand Down
Loading

0 comments on commit 3988300

Please sign in to comment.