From 36e3652873c697c8c2cc4fdb41c71da6e595e169 Mon Sep 17 00:00:00 2001 From: William Smith Date: Tue, 25 Jun 2024 12:10:30 -0700 Subject: [PATCH] [core] Make proper per epoch execution components (#18281) ## Description Make state accumulator a per-epoch component. This also requires that we make checkpoint executor a proper per-epoch component (previously it was only instantiated once, but `run_epoch` was called once per epoch) since it needs to have a reference to the fresh accumulator after reconfig. Now we actually drop it after the call to `run_epoch` returns. ## Test plan Passed against 120+ seeds: ``` ./scripts/simtest/seed-search.py simtest --test test_simulated_load_reconfig_with_crashes_and_delays ``` --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. 
- [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: --------- Co-authored-by: Mark Logan --- crates/sui-core/src/authority.rs | 14 +--- .../authority/authority_per_epoch_store.rs | 1 + .../src/authority/authority_test_utils.rs | 3 +- .../checkpoint_executor/metrics.rs | 7 -- .../checkpoints/checkpoint_executor/mod.rs | 27 ++---- .../checkpoints/checkpoint_executor/tests.rs | 4 +- crates/sui-core/src/checkpoints/mod.rs | 46 ++++++----- crates/sui-core/src/state_accumulator.rs | 67 +++++++++++++-- crates/sui-core/src/test_utils.rs | 3 +- .../src/unit_tests/narwhal_manager_tests.rs | 7 +- .../tests/reconfiguration_tests.rs | 66 +++++++++++++++ crates/sui-node/src/lib.rs | 82 +++++++++++++------ .../src/single_node.rs | 2 +- 13 files changed, 236 insertions(+), 93 deletions(-) diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 76d2a1f31fe09..a8e89b9e27690 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -136,7 +136,6 @@ use crate::authority::authority_store_pruner::{ }; use crate::authority::epoch_start_configuration::EpochStartConfigTrait; use crate::authority::epoch_start_configuration::EpochStartConfiguration; -use crate::checkpoints::checkpoint_executor::CheckpointExecutor; use crate::checkpoints::CheckpointStore; use crate::consensus_adapter::ConsensusAdapter; use crate::epoch::committee_store::CommitteeStore; @@ -2838,7 +2837,6 @@ impl AuthorityState { supported_protocol_versions: SupportedProtocolVersions, new_committee: Committee, epoch_start_configuration: EpochStartConfiguration, - checkpoint_executor: &CheckpointExecutor, accumulator: Arc, expensive_safety_check_config: &ExpensiveSafetyCheckConfig, ) -> SuiResult> { @@ -2857,12 +2855,7 @@ impl AuthorityState { .await?; self.get_reconfig_api() .clear_state_end_of_epoch(&execution_lock); - self.check_system_consistency( - cur_epoch_store, - 
checkpoint_executor, - accumulator, - expensive_safety_check_config, - ); + self.check_system_consistency(cur_epoch_store, accumulator, expensive_safety_check_config); self.maybe_reaccumulate_state_hash( cur_epoch_store, epoch_start_configuration @@ -2955,7 +2948,6 @@ impl AuthorityState { fn check_system_consistency( &self, cur_epoch_store: &AuthorityPerEpochStore, - checkpoint_executor: &CheckpointExecutor, accumulator: Arc, expensive_safety_check_config: &ExpensiveSafetyCheckConfig, ) { @@ -2986,7 +2978,6 @@ impl AuthorityState { cur_epoch_store.epoch() ); self.expensive_check_is_consistent_state( - checkpoint_executor, accumulator, cur_epoch_store, cfg!(debug_assertions), // panic in debug mode only @@ -3003,7 +2994,6 @@ impl AuthorityState { fn expensive_check_is_consistent_state( &self, - checkpoint_executor: &CheckpointExecutor, accumulator: Arc, cur_epoch_store: &AuthorityPerEpochStore, panic: bool, @@ -3041,7 +3031,7 @@ impl AuthorityState { } if !panic { - checkpoint_executor.set_inconsistent_state(is_inconsistent); + accumulator.set_inconsistent_state(is_inconsistent); } } diff --git a/crates/sui-core/src/authority/authority_per_epoch_store.rs b/crates/sui-core/src/authority/authority_per_epoch_store.rs index 4f0ca326b23fe..e6e7315ad7224 100644 --- a/crates/sui-core/src/authority/authority_per_epoch_store.rs +++ b/crates/sui-core/src/authority/authority_per_epoch_store.rs @@ -726,6 +726,7 @@ impl AuthorityPerEpochStore { epoch_id ); let epoch_start_configuration = Arc::new(epoch_start_configuration); + info!("epoch flags: {:?}", epoch_start_configuration.flags()); metrics.current_epoch.set(epoch_id as i64); metrics .current_voting_right diff --git a/crates/sui-core/src/authority/authority_test_utils.rs b/crates/sui-core/src/authority/authority_test_utils.rs index 74bd74b49f9b9..335872122d324 100644 --- a/crates/sui-core/src/authority/authority_test_utils.rs +++ b/crates/sui-core/src/authority/authority_test_utils.rs @@ -88,7 +88,8 @@ pub async fn 
execute_certificate_with_execution_error( // for testing and regression detection. // We must do this before sending to consensus, otherwise consensus may already // lead to transaction execution and state change. - let state_acc = StateAccumulator::new(authority.get_accumulator_store().clone(), &epoch_store); + let state_acc = + StateAccumulator::new_for_tests(authority.get_accumulator_store().clone(), &epoch_store); let include_wrapped_tombstone = !authority .epoch_store_for_testing() .protocol_config() diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs index 33b82682416ac..c51c4c6e61a8f 100644 --- a/crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs +++ b/crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs @@ -20,7 +20,6 @@ pub struct CheckpointExecutorMetrics { pub checkpoint_transaction_count: Histogram, pub checkpoint_contents_age_ms: Histogram, pub last_executed_checkpoint_age_ms: Histogram, - pub accumulator_inconsistent_state: IntGauge, } impl CheckpointExecutorMetrics { @@ -87,12 +86,6 @@ impl CheckpointExecutorMetrics { "Age of the last executed checkpoint", registry ), - accumulator_inconsistent_state: register_int_gauge_with_registry!( - "accumulator_inconsistent_state", - "1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch", - registry, - ) - .unwrap(), }; Arc::new(this) } diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs index b269c4a753f92..201774ed46776 100644 --- a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs +++ b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs @@ -29,7 +29,6 @@ use either::Either; use futures::stream::FuturesOrdered; use itertools::izip; use mysten_metrics::spawn_monitored_task; -use prometheus::Registry; use sui_config::node::{CheckpointExecutorConfig, 
RunWithRange}; use sui_macros::{fail_point, fail_point_async}; use sui_types::accumulator::Accumulator; @@ -68,7 +67,8 @@ use crate::{ }; mod data_ingestion_handler; -mod metrics; +pub mod metrics; + #[cfg(test)] pub(crate) mod tests; @@ -157,7 +157,7 @@ impl CheckpointExecutor { state: Arc, accumulator: Arc, config: CheckpointExecutorConfig, - prometheus_registry: &Registry, + metrics: Arc, ) -> Self { Self { mailbox, @@ -168,7 +168,7 @@ impl CheckpointExecutor { tx_manager: state.transaction_manager().clone(), accumulator, config, - metrics: CheckpointExecutorMetrics::new(prometheus_registry), + metrics, } } @@ -178,17 +178,14 @@ impl CheckpointExecutor { state: Arc, accumulator: Arc, ) -> Self { - Self { + Self::new( mailbox, - state: state.clone(), checkpoint_store, - object_cache_reader: state.get_object_cache_reader().clone(), - transaction_cache_reader: state.get_transaction_cache_reader().clone(), - tx_manager: state.transaction_manager().clone(), + state, accumulator, - config: Default::default(), - metrics: CheckpointExecutorMetrics::new_for_tests(), - } + Default::default(), + CheckpointExecutorMetrics::new_for_tests(), + ) } /// Ensure that all checkpoints in the current epoch will be executed. 
@@ -358,12 +355,6 @@ impl CheckpointExecutor { } } - pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) { - self.metrics - .accumulator_inconsistent_state - .set(is_inconsistent_state as i64); - } - fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) { // Ensure that we are not skipping checkpoints at any point let seq = *checkpoint.sequence_number(); diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs index c156a1676879f..4efa9a57714b2 100644 --- a/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs +++ b/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs @@ -234,7 +234,6 @@ pub async fn test_checkpoint_executor_cross_epoch() { EpochFlag::default_flags_for_new_epoch(&authority_state.config), ) .unwrap(), - &executor, accumulator, &ExpensiveSafetyCheckConfig::default(), ) @@ -394,7 +393,8 @@ async fn init_executor_test( broadcast::channel(buffer_size); let epoch_store = state.epoch_store_for_testing(); - let accumulator = StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store); + let accumulator = + StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store); let accumulator = Arc::new(accumulator); let executor = CheckpointExecutor::new_for_tests( diff --git a/crates/sui-core/src/checkpoints/mod.rs b/crates/sui-core/src/checkpoints/mod.rs index a0437a0ff6e77..8f329e4ea2789 100644 --- a/crates/sui-core/src/checkpoints/mod.rs +++ b/crates/sui-core/src/checkpoints/mod.rs @@ -41,6 +41,7 @@ use std::fs::File; use std::io::Write; use std::path::Path; use std::sync::Arc; +use std::sync::Weak; use std::time::Duration; use sui_protocol_config::ProtocolVersion; use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest}; @@ -859,7 +860,7 @@ pub struct CheckpointBuilder { notify: Arc, notify_aggregator: Arc, effects_store: Arc, - accumulator: Arc, + accumulator: Weak, output: Box, 
exit: watch::Receiver<()>, metrics: Arc, @@ -897,7 +898,7 @@ impl CheckpointBuilder { epoch_store: Arc, notify: Arc, effects_store: Arc, - accumulator: Arc, + accumulator: Weak, output: Box, exit: watch::Receiver<()>, notify_aggregator: Arc, @@ -1432,19 +1433,24 @@ impl CheckpointBuilder { let committee = system_state_obj.get_current_epoch_committee().committee; // This must happen after the call to augment_epoch_last_checkpoint, - // otherwise we will not capture the change_epoch tx - let acc = self.accumulator.accumulate_checkpoint( - effects.clone(), - sequence_number, - &self.epoch_store, - )?; - self.accumulator - .accumulate_running_root(&self.epoch_store, sequence_number, Some(acc)) - .await?; - let root_state_digest = self - .accumulator - .digest_epoch(self.epoch_store.clone(), sequence_number) - .await?; + // otherwise we will not capture the change_epoch tx. + let root_state_digest = { + let state_acc = self + .accumulator + .upgrade() + .expect("No checkpoints should be getting built after local configuration"); + let acc = state_acc.accumulate_checkpoint( + effects.clone(), + sequence_number, + &self.epoch_store, + )?; + state_acc + .accumulate_running_root(&self.epoch_store, sequence_number, Some(acc)) + .await?; + state_acc + .digest_epoch(self.epoch_store.clone(), sequence_number) + .await? 
+ }; self.metrics.highest_accumulated_epoch.set(epoch as i64); info!("Epoch {epoch} root state hash digest: {root_state_digest:?}"); @@ -2213,7 +2219,7 @@ impl CheckpointService { checkpoint_store: Arc, epoch_store: Arc, effects_store: Arc, - accumulator: Arc, + accumulator: Weak, checkpoint_output: Box, certified_checkpoint_output: Box, metrics: Arc, @@ -2503,15 +2509,17 @@ mod tests { let checkpoint_store = CheckpointStore::new(ckpt_dir.path()); let epoch_store = state.epoch_store_for_testing(); - let accumulator = - StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store); + let accumulator = Arc::new(StateAccumulator::new_for_tests( + state.get_accumulator_store().clone(), + &epoch_store, + )); let (checkpoint_service, _exit) = CheckpointService::spawn( state.clone(), checkpoint_store, epoch_store.clone(), store, - Arc::new(accumulator), + Arc::downgrade(&accumulator), Box::new(output), Box::new(certified_output), CheckpointMetrics::new_for_tests(), diff --git a/crates/sui-core/src/state_accumulator.rs b/crates/sui-core/src/state_accumulator.rs index 0cc6f6c9d4e3c..f67f150839721 100644 --- a/crates/sui-core/src/state_accumulator.rs +++ b/crates/sui-core/src/state_accumulator.rs @@ -3,6 +3,7 @@ use itertools::Itertools; use mysten_metrics::monitored_scope; +use prometheus::{register_int_gauge_with_registry, IntGauge, Registry}; use serde::Serialize; use sui_protocol_config::ProtocolConfig; use sui_types::base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber}; @@ -25,6 +26,24 @@ use sui_types::messages_checkpoint::{CheckpointSequenceNumber, ECMHLiveObjectSet use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore; use crate::authority::authority_store_tables::LiveObject; +pub struct StateAccumulatorMetrics { + inconsistent_state: IntGauge, +} + +impl StateAccumulatorMetrics { + pub fn new(registry: &Registry) -> Arc { + let this = Self { + inconsistent_state: register_int_gauge_with_registry!( + 
"accumulator_inconsistent_state", + "1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch", + registry + ) + .unwrap(), + }; + Arc::new(this) + } +} + pub enum StateAccumulator { V1(StateAccumulatorV1), V2(StateAccumulatorV2), @@ -32,10 +51,12 @@ pub enum StateAccumulator { pub struct StateAccumulatorV1 { store: Arc, + metrics: Arc, } pub struct StateAccumulatorV2 { store: Arc, + metrics: Arc, } pub trait AccumulatorStore: ObjectStore + Send + Sync { @@ -366,16 +387,44 @@ impl StateAccumulator { pub fn new( store: Arc, epoch_store: &Arc, + metrics: Arc, ) -> Self { if cfg!(msim) { if epoch_store.state_accumulator_v2_enabled() { - return StateAccumulator::V2(StateAccumulatorV2::new(store)); + return StateAccumulator::V2(StateAccumulatorV2::new(store, metrics)); } else { - return StateAccumulator::V1(StateAccumulatorV1::new(store)); + return StateAccumulator::V1(StateAccumulatorV1::new(store, metrics)); } } - StateAccumulator::V1(StateAccumulatorV1::new(store)) + StateAccumulator::V1(StateAccumulatorV1::new(store, metrics)) + } + + pub fn new_for_tests( + store: Arc, + epoch_store: &Arc, + ) -> Self { + Self::new( + store, + epoch_store, + StateAccumulatorMetrics::new(&Registry::new()), + ) + } + + pub fn metrics(&self) -> Arc { + match self { + StateAccumulator::V1(impl_v1) => impl_v1.metrics.clone(), + StateAccumulator::V2(impl_v2) => impl_v2.metrics.clone(), + } + } + + pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) { + match self { + StateAccumulator::V1(impl_v1) => &impl_v1.metrics, + StateAccumulator::V2(impl_v2) => &impl_v2.metrics, + } + .inconsistent_state + .set(is_inconsistent_state as i64); } /// Accumulates the effects of a single checkpoint and persists the accumulator. 
@@ -528,8 +577,8 @@ impl StateAccumulator { } impl StateAccumulatorV1 { - pub fn new(store: Arc) -> Self { - Self { store } + pub fn new(store: Arc, metrics: Arc) -> Self { + Self { store, metrics } } /// Unions all checkpoint accumulators at the end of the epoch to generate the @@ -618,8 +667,8 @@ impl StateAccumulatorV1 { } impl StateAccumulatorV2 { - pub fn new(store: Arc) -> Self { - Self { store } + pub fn new(store: Arc, metrics: Arc) -> Self { + Self { store, metrics } } pub async fn accumulate_running_root( @@ -629,6 +678,10 @@ impl StateAccumulatorV2 { checkpoint_acc: Option, ) -> SuiResult { let _scope = monitored_scope("AccumulateRunningRoot"); + tracing::info!( + "accumulating running root for checkpoint {}", + checkpoint_seq_num + ); // For the last checkpoint of the epoch, this function will be called once by the // checkpoint builder, and again by checkpoint executor. diff --git a/crates/sui-core/src/test_utils.rs b/crates/sui-core/src/test_utils.rs index 3c2b8a6ae5ef3..d9ccdd467af78 100644 --- a/crates/sui-core/src/test_utils.rs +++ b/crates/sui-core/src/test_utils.rs @@ -80,7 +80,8 @@ pub async fn send_and_confirm_transaction( // // We also check the incremental effects of the transaction on the live object set against StateAccumulator // for testing and regression detection - let state_acc = StateAccumulator::new(authority.get_accumulator_store().clone(), &epoch_store); + let state_acc = + StateAccumulator::new_for_tests(authority.get_accumulator_store().clone(), &epoch_store); let include_wrapped_tombstone = !authority .epoch_store_for_testing() .protocol_config() diff --git a/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs b/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs index 2fb2873ee63d9..5f6e52ebfd15e 100644 --- a/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs +++ b/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs @@ -70,7 +70,10 @@ async fn send_transactions( pub fn checkpoint_service_for_testing(state: Arc) 
-> Arc { let (output, _result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10); let epoch_store = state.epoch_store_for_testing(); - let accumulator = StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store); + let accumulator = Arc::new(StateAccumulator::new_for_tests( + state.get_accumulator_store().clone(), + &epoch_store, + )); let (certified_output, _certified_result) = mpsc::channel::(10); let (checkpoint_service, _) = CheckpointService::spawn( @@ -78,7 +81,7 @@ pub fn checkpoint_service_for_testing(state: Arc) -> Arc= 2 || !initial_flags_nodes.insert(current_node) { + return None; + } + + // start with no flags set + Some(Vec::::new()) + }); + + let test_cluster = TestClusterBuilder::new() + .with_epoch_duration_ms(30000) + .build() + .await; + + let mut any_empty = false; + for node in test_cluster.all_node_handles() { + any_empty = any_empty + || node.with(|node| { + node.state() + .epoch_store_for_testing() + .epoch_start_config() + .flags() + .is_empty() + }); + } + assert!(any_empty); + + test_cluster.wait_for_epoch_all_nodes(1).await; + + let mut any_empty = false; + for node in test_cluster.all_node_handles() { + any_empty = any_empty + || node.with(|node| { + node.state() + .epoch_store_for_testing() + .epoch_start_config() + .flags() + .is_empty() + }); + } + assert!(!any_empty); + + sleep(Duration::from_secs(15)).await; + + test_cluster.stop_all_validators().await; + test_cluster.start_all_validators().await; + + test_cluster.wait_for_epoch_all_nodes(2).await; +} + #[cfg(msim)] #[sim_test] async fn safe_mode_reconfig_test() { diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index 1070caa2431e7..85510eb78274f 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -20,7 +20,7 @@ use std::path::PathBuf; use std::str::FromStr; #[cfg(msim)] use std::sync::atomic::Ordering; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::time::Duration; use 
sui_core::authority::epoch_start_configuration::EpochFlag; use sui_core::authority::RandomnessRoundReceiver; @@ -29,6 +29,7 @@ use sui_core::consensus_adapter::SubmitToConsensus; use sui_core::consensus_manager::ConsensusClient; use sui_core::epoch::randomness::RandomnessManager; use sui_core::execution_cache::build_execution_cache; +use sui_core::state_accumulator::StateAccumulatorMetrics; use sui_core::storage::RestReadStore; use sui_core::traffic_controller::metrics::TrafficControllerMetrics; use sui_json_rpc::bridge_api::BridgeReadApi; @@ -67,6 +68,7 @@ use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait; use sui_core::authority::epoch_start_configuration::EpochStartConfiguration; use sui_core::authority_aggregator::AuthorityAggregator; use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics}; +use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics; use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason}; use sui_core::checkpoints::{ CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync, @@ -225,7 +227,7 @@ pub struct SuiNode { state_sync_handle: state_sync::Handle, randomness_handle: randomness::Handle, checkpoint_store: Arc, - accumulator: Arc, + accumulator: Mutex>>, connection_monitor_status: Arc, /// Broadcast channel to send the starting system state for the next epoch. 
@@ -714,6 +716,7 @@ impl SuiNode { let accumulator = Arc::new(StateAccumulator::new( cache_traits.accumulator_store.clone(), &epoch_store, + StateAccumulatorMetrics::new(&prometheus_registry), )); let authority_names_to_peer_ids = epoch_store @@ -750,7 +753,7 @@ impl SuiNode { checkpoint_store.clone(), state_sync_handle.clone(), randomness_handle.clone(), - accumulator.clone(), + Arc::downgrade(&accumulator), connection_monitor_status.clone(), ®istry_service, sui_node_metrics.clone(), @@ -780,7 +783,7 @@ impl SuiNode { state_sync_handle, randomness_handle, checkpoint_store, - accumulator, + accumulator: Mutex::new(Some(accumulator)), end_of_epoch_channel, connection_monitor_status, trusted_peer_change_tx, @@ -1112,7 +1115,7 @@ impl SuiNode { checkpoint_store: Arc, state_sync_handle: state_sync::Handle, randomness_handle: randomness::Handle, - accumulator: Arc, + accumulator: Weak, connection_monitor_status: Arc, registry_service: &RegistryService, sui_node_metrics: Arc, @@ -1202,7 +1205,7 @@ impl SuiNode { randomness_handle: randomness::Handle, consensus_manager: ConsensusManager, consensus_epoch_data_remover: EpochDataRemover, - accumulator: Arc, + accumulator: Weak, validator_server_handle: JoinHandle>, validator_overload_monitor_handle: Option>, checkpoint_metrics: Arc, @@ -1306,7 +1309,7 @@ impl SuiNode { epoch_store: Arc, state: Arc, state_sync_handle: state_sync::Handle, - accumulator: Arc, + accumulator: Weak, checkpoint_metrics: Arc, ) -> (Arc, watch::Sender<()>) { let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms(); @@ -1456,17 +1459,23 @@ impl SuiNode { /// This function awaits the completion of checkpoint execution of the current epoch, /// after which it iniitiates reconfiguration of the entire system. 
pub async fn monitor_reconfiguration(self: Arc) -> Result<()> { - let mut checkpoint_executor = CheckpointExecutor::new( - self.state_sync_handle.subscribe_to_synced_checkpoints(), - self.checkpoint_store.clone(), - self.state.clone(), - self.accumulator.clone(), - self.config.checkpoint_executor_config.clone(), - &self.registry_service.default_registry(), - ); + let checkpoint_executor_metrics = + CheckpointExecutorMetrics::new(&self.registry_service.default_registry()); - let run_with_range = self.config.run_with_range; loop { + let mut accumulator_guard = self.accumulator.lock().await; + let accumulator = accumulator_guard.take().unwrap(); + let mut checkpoint_executor = CheckpointExecutor::new( + self.state_sync_handle.subscribe_to_synced_checkpoints(), + self.checkpoint_store.clone(), + self.state.clone(), + accumulator.clone(), + self.config.checkpoint_executor_config.clone(), + checkpoint_executor_metrics.clone(), + ); + + let run_with_range = self.config.run_with_range; + let cur_epoch_store = self.state.load_epoch_store_one_call_per_task(); // Advertise capabilities to committee, if we are a validator. @@ -1495,6 +1504,7 @@ impl SuiNode { let stop_condition = checkpoint_executor .run_epoch(cur_epoch_store.clone(), run_with_range) .await; + drop(checkpoint_executor); if stop_condition == StopReason::RunWithRangeCondition { SuiNode::shutdown(&self).await; @@ -1565,6 +1575,7 @@ impl SuiNode { // The following code handles 4 different cases, depending on whether the node // was a validator in the previous epoch, and whether the node is a validator // in the new epoch. + let new_validator_components = if let Some(ValidatorComponents { validator_server_handle, validator_overload_monitor_handle, @@ -1588,10 +1599,23 @@ impl SuiNode { &cur_epoch_store, next_epoch_committee.clone(), new_epoch_start_state, - &checkpoint_executor, + accumulator.clone(), ) .await; + // No other components should be holding a strong reference to state accumulator + // at this point. 
Confirm here before we swap in the new accumulator. + let accumulator_metrics = Arc::into_inner(accumulator) + .expect("Accumulator should have no other references at this point") + .metrics(); + let new_accumulator = Arc::new(StateAccumulator::new( + self.state.get_accumulator_store().clone(), + &new_epoch_store, + accumulator_metrics, + )); + let weak_accumulator = Arc::downgrade(&new_accumulator); + *accumulator_guard = Some(new_accumulator); + consensus_epoch_data_remover .remove_old_data(next_epoch - 1) .await; @@ -1609,7 +1633,7 @@ impl SuiNode { self.randomness_handle.clone(), consensus_manager, consensus_epoch_data_remover, - self.accumulator.clone(), + weak_accumulator, validator_server_handle, validator_overload_monitor_handle, checkpoint_metrics, @@ -1629,10 +1653,23 @@ impl SuiNode { &cur_epoch_store, next_epoch_committee.clone(), new_epoch_start_state, - &checkpoint_executor, + accumulator.clone(), ) .await; + // No other components should be holding a strong reference to state accumulator + // at this point. Confirm here before we swap in the new accumulator. 
+ let accumulator_metrics = Arc::into_inner(accumulator) + .expect("Accumulator should have no other references at this point") + .metrics(); + let new_accumulator = Arc::new(StateAccumulator::new( + self.state.get_accumulator_store().clone(), + &new_epoch_store, + accumulator_metrics, + )); + let weak_accumulator = Arc::downgrade(&new_accumulator); + *accumulator_guard = Some(new_accumulator); + if self.state.is_validator(&new_epoch_store) { info!("Promoting the node from fullnode to validator, starting grpc server"); @@ -1645,7 +1682,7 @@ impl SuiNode { self.checkpoint_store.clone(), self.state_sync_handle.clone(), self.randomness_handle.clone(), - self.accumulator.clone(), + weak_accumulator, self.connection_monitor_status.clone(), &self.registry_service, self.metrics.clone(), @@ -1694,7 +1731,7 @@ impl SuiNode { cur_epoch_store: &AuthorityPerEpochStore, next_epoch_committee: Committee, next_epoch_start_system_state: EpochStartSystemState, - checkpoint_executor: &CheckpointExecutor, + accumulator: Arc, ) -> Arc { let next_epoch = next_epoch_committee.epoch(); @@ -1719,8 +1756,7 @@ impl SuiNode { self.config.supported_protocol_versions.unwrap(), next_epoch_committee, epoch_start_configuration, - checkpoint_executor, - self.accumulator.clone(), + accumulator, &self.config.expensive_safety_check_config, ) .await diff --git a/crates/sui-single-node-benchmark/src/single_node.rs b/crates/sui-single-node-benchmark/src/single_node.rs index d020ce6d4ffa4..5b87afcfff846 100644 --- a/crates/sui-single-node-benchmark/src/single_node.rs +++ b/crates/sui-single-node-benchmark/src/single_node.rs @@ -278,7 +278,7 @@ impl SingleValidator { ckpt_receiver, validator.get_checkpoint_store().clone(), validator.clone(), - Arc::new(StateAccumulator::new( + Arc::new(StateAccumulator::new_for_tests( validator.get_accumulator_store().clone(), self.get_epoch_store(), )),