diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs
index 76d2a1f31fe09..a8e89b9e27690 100644
--- a/crates/sui-core/src/authority.rs
+++ b/crates/sui-core/src/authority.rs
@@ -136,7 +136,6 @@ use crate::authority::authority_store_pruner::{
 };
 use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
 use crate::authority::epoch_start_configuration::EpochStartConfiguration;
-use crate::checkpoints::checkpoint_executor::CheckpointExecutor;
 use crate::checkpoints::CheckpointStore;
 use crate::consensus_adapter::ConsensusAdapter;
 use crate::epoch::committee_store::CommitteeStore;
@@ -2838,7 +2837,6 @@ impl AuthorityState {
         supported_protocol_versions: SupportedProtocolVersions,
         new_committee: Committee,
         epoch_start_configuration: EpochStartConfiguration,
-        checkpoint_executor: &CheckpointExecutor,
         accumulator: Arc<StateAccumulator>,
         expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
     ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
@@ -2857,12 +2855,7 @@ impl AuthorityState {
             .await?;
         self.get_reconfig_api()
             .clear_state_end_of_epoch(&execution_lock);
-        self.check_system_consistency(
-            cur_epoch_store,
-            checkpoint_executor,
-            accumulator,
-            expensive_safety_check_config,
-        );
+        self.check_system_consistency(cur_epoch_store, accumulator, expensive_safety_check_config);
         self.maybe_reaccumulate_state_hash(
             cur_epoch_store,
             epoch_start_configuration
@@ -2955,7 +2948,6 @@ impl AuthorityState {
     fn check_system_consistency(
         &self,
         cur_epoch_store: &AuthorityPerEpochStore,
-        checkpoint_executor: &CheckpointExecutor,
         accumulator: Arc<StateAccumulator>,
         expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
     ) {
@@ -2986,7 +2978,6 @@ impl AuthorityState {
             cur_epoch_store.epoch()
         );
         self.expensive_check_is_consistent_state(
-            checkpoint_executor,
             accumulator,
             cur_epoch_store,
             cfg!(debug_assertions), // panic in debug mode only
@@ -3003,7 +2994,6 @@ impl AuthorityState {
     fn expensive_check_is_consistent_state(
         &self,
-        checkpoint_executor: &CheckpointExecutor,
         accumulator: Arc<StateAccumulator>,
         cur_epoch_store: &AuthorityPerEpochStore,
         panic: bool,
@@ -3041,7 +3031,7 @@ impl AuthorityState {
         }

         if !panic {
-            checkpoint_executor.set_inconsistent_state(is_inconsistent);
+            accumulator.set_inconsistent_state(is_inconsistent);
         }
     }
diff --git a/crates/sui-core/src/authority/authority_per_epoch_store.rs b/crates/sui-core/src/authority/authority_per_epoch_store.rs
index 4f0ca326b23fe..e6e7315ad7224 100644
--- a/crates/sui-core/src/authority/authority_per_epoch_store.rs
+++ b/crates/sui-core/src/authority/authority_per_epoch_store.rs
@@ -726,6 +726,7 @@ impl AuthorityPerEpochStore {
             epoch_id
         );
         let epoch_start_configuration = Arc::new(epoch_start_configuration);
+        info!("epoch flags: {:?}", epoch_start_configuration.flags());
         metrics.current_epoch.set(epoch_id as i64);
         metrics
             .current_voting_right
diff --git a/crates/sui-core/src/authority/authority_test_utils.rs b/crates/sui-core/src/authority/authority_test_utils.rs
index 74bd74b49f9b9..335872122d324 100644
--- a/crates/sui-core/src/authority/authority_test_utils.rs
+++ b/crates/sui-core/src/authority/authority_test_utils.rs
@@ -88,7 +88,8 @@ pub async fn execute_certificate_with_execution_error(
     // for testing and regression detection.
     // We must do this before sending to consensus, otherwise consensus may already
     // lead to transaction execution and state change.
-    let state_acc = StateAccumulator::new(authority.get_accumulator_store().clone(), &epoch_store);
+    let state_acc =
+        StateAccumulator::new_for_tests(authority.get_accumulator_store().clone(), &epoch_store);
     let include_wrapped_tombstone = !authority
         .epoch_store_for_testing()
         .protocol_config()
diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs
index 33b82682416ac..c51c4c6e61a8f 100644
--- a/crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs
+++ b/crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs
@@ -20,7 +20,6 @@ pub struct CheckpointExecutorMetrics {
     pub checkpoint_transaction_count: Histogram,
     pub checkpoint_contents_age_ms: Histogram,
     pub last_executed_checkpoint_age_ms: Histogram,
-    pub accumulator_inconsistent_state: IntGauge,
 }

 impl CheckpointExecutorMetrics {
@@ -87,12 +86,6 @@ impl CheckpointExecutorMetrics {
                 "Age of the last executed checkpoint",
                 registry
             ),
-            accumulator_inconsistent_state: register_int_gauge_with_registry!(
-                "accumulator_inconsistent_state",
-                "1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch",
-                registry,
-            )
-            .unwrap(),
         };
         Arc::new(this)
     }
diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
index b269c4a753f92..201774ed46776 100644
--- a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
+++ b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
@@ -29,7 +29,6 @@ use either::Either;
 use futures::stream::FuturesOrdered;
 use itertools::izip;
 use mysten_metrics::spawn_monitored_task;
-use prometheus::Registry;
 use sui_config::node::{CheckpointExecutorConfig, RunWithRange};
 use sui_macros::{fail_point, fail_point_async};
 use sui_types::accumulator::Accumulator;
@@ -68,7 +67,8 @@ use crate::{
 };

 mod data_ingestion_handler;
-mod metrics;
+pub mod metrics;
+
 #[cfg(test)]
 pub(crate) mod tests;
@@ -157,7 +157,7 @@ impl CheckpointExecutor {
         state: Arc<AuthorityState>,
         accumulator: Arc<StateAccumulator>,
         config: CheckpointExecutorConfig,
-        prometheus_registry: &Registry,
+        metrics: Arc<CheckpointExecutorMetrics>,
     ) -> Self {
         Self {
             mailbox,
@@ -168,7 +168,7 @@ impl CheckpointExecutor {
             tx_manager: state.transaction_manager().clone(),
             accumulator,
             config,
-            metrics: CheckpointExecutorMetrics::new(prometheus_registry),
+            metrics,
         }
     }
@@ -178,17 +178,14 @@ impl CheckpointExecutor {
         state: Arc<AuthorityState>,
         accumulator: Arc<StateAccumulator>,
     ) -> Self {
-        Self {
+        Self::new(
             mailbox,
-            state: state.clone(),
             checkpoint_store,
-            object_cache_reader: state.get_object_cache_reader().clone(),
-            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
-            tx_manager: state.transaction_manager().clone(),
+            state,
             accumulator,
-            config: Default::default(),
-            metrics: CheckpointExecutorMetrics::new_for_tests(),
-        }
+            Default::default(),
+            CheckpointExecutorMetrics::new_for_tests(),
+        )
     }

     /// Ensure that all checkpoints in the current epoch will be executed.
@@ -358,12 +355,6 @@ impl CheckpointExecutor {
         }
     }

-    pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
-        self.metrics
-            .accumulator_inconsistent_state
-            .set(is_inconsistent_state as i64);
-    }
-
     fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
         // Ensure that we are not skipping checkpoints at any point
         let seq = *checkpoint.sequence_number();
diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs
index c156a1676879f..4efa9a57714b2 100644
--- a/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs
+++ b/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs
@@ -234,7 +234,6 @@ pub async fn test_checkpoint_executor_cross_epoch() {
                 EpochFlag::default_flags_for_new_epoch(&authority_state.config),
             )
             .unwrap(),
-            &executor,
             accumulator,
             &ExpensiveSafetyCheckConfig::default(),
         )
@@ -394,7 +393,8 @@ async fn init_executor_test(
         broadcast::channel(buffer_size);
     let epoch_store = state.epoch_store_for_testing();

-    let accumulator = StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store);
+    let accumulator =
+        StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store);
     let accumulator = Arc::new(accumulator);

     let executor = CheckpointExecutor::new_for_tests(
diff --git a/crates/sui-core/src/checkpoints/mod.rs b/crates/sui-core/src/checkpoints/mod.rs
index a0437a0ff6e77..8f329e4ea2789 100644
--- a/crates/sui-core/src/checkpoints/mod.rs
+++ b/crates/sui-core/src/checkpoints/mod.rs
@@ -41,6 +41,7 @@ use std::fs::File;
 use std::io::Write;
 use std::path::Path;
 use std::sync::Arc;
+use std::sync::Weak;
 use std::time::Duration;
 use sui_protocol_config::ProtocolVersion;
 use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
@@ -859,7 +860,7 @@ pub struct CheckpointBuilder {
     notify: Arc,
     notify_aggregator: Arc,
     effects_store: Arc,
-    accumulator: Arc<StateAccumulator>,
+    accumulator: Weak<StateAccumulator>,
     output: Box,
     exit: watch::Receiver<()>,
     metrics: Arc,
@@ -897,7 +898,7 @@ impl CheckpointBuilder {
         epoch_store: Arc<AuthorityPerEpochStore>,
         notify: Arc,
         effects_store: Arc,
-        accumulator: Arc<StateAccumulator>,
+        accumulator: Weak<StateAccumulator>,
         output: Box,
         exit: watch::Receiver<()>,
         notify_aggregator: Arc,
@@ -1432,19 +1433,24 @@ impl CheckpointBuilder {
         let committee = system_state_obj.get_current_epoch_committee().committee;

         // This must happen after the call to augment_epoch_last_checkpoint,
-        // otherwise we will not capture the change_epoch tx
-        let acc = self.accumulator.accumulate_checkpoint(
-            effects.clone(),
-            sequence_number,
-            &self.epoch_store,
-        )?;
-        self.accumulator
-            .accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
-            .await?;
-        let root_state_digest = self
-            .accumulator
-            .digest_epoch(self.epoch_store.clone(), sequence_number)
-            .await?;
+        // otherwise we will not capture the change_epoch tx.
+        let root_state_digest = {
+            let state_acc = self
+                .accumulator
+                .upgrade()
+                .expect("No checkpoints should be getting built after local configuration");
+            let acc = state_acc.accumulate_checkpoint(
+                effects.clone(),
+                sequence_number,
+                &self.epoch_store,
+            )?;
+            state_acc
+                .accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
+                .await?;
+            state_acc
+                .digest_epoch(self.epoch_store.clone(), sequence_number)
+                .await?
+        };

         self.metrics.highest_accumulated_epoch.set(epoch as i64);
         info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
@@ -2213,7 +2219,7 @@ impl CheckpointService {
         checkpoint_store: Arc<CheckpointStore>,
         epoch_store: Arc<AuthorityPerEpochStore>,
         effects_store: Arc,
-        accumulator: Arc<StateAccumulator>,
+        accumulator: Weak<StateAccumulator>,
         checkpoint_output: Box,
         certified_checkpoint_output: Box,
         metrics: Arc,
@@ -2503,15 +2509,17 @@ mod tests {
         let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
         let epoch_store = state.epoch_store_for_testing();

-        let accumulator =
-            StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store);
+        let accumulator = Arc::new(StateAccumulator::new_for_tests(
+            state.get_accumulator_store().clone(),
+            &epoch_store,
+        ));

         let (checkpoint_service, _exit) = CheckpointService::spawn(
             state.clone(),
             checkpoint_store,
             epoch_store.clone(),
             store,
-            Arc::new(accumulator),
+            Arc::downgrade(&accumulator),
             Box::new(output),
             Box::new(certified_output),
             CheckpointMetrics::new_for_tests(),
diff --git a/crates/sui-core/src/state_accumulator.rs b/crates/sui-core/src/state_accumulator.rs
index 0cc6f6c9d4e3c..f67f150839721 100644
--- a/crates/sui-core/src/state_accumulator.rs
+++ b/crates/sui-core/src/state_accumulator.rs
@@ -3,6 +3,7 @@
 use itertools::Itertools;
 use mysten_metrics::monitored_scope;
+use prometheus::{register_int_gauge_with_registry, IntGauge, Registry};
 use serde::Serialize;
 use sui_protocol_config::ProtocolConfig;
 use sui_types::base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber};
@@ -25,6 +26,24 @@
 use sui_types::messages_checkpoint::{CheckpointSequenceNumber, ECMHLiveObjectSetDigest};

 use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
 use crate::authority::authority_store_tables::LiveObject;

+pub struct StateAccumulatorMetrics {
+    inconsistent_state: IntGauge,
+}
+
+impl StateAccumulatorMetrics {
+    pub fn new(registry: &Registry) -> Arc<Self> {
+        let this = Self {
+            inconsistent_state: register_int_gauge_with_registry!(
+                "accumulator_inconsistent_state",
+                "1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch",
+                registry
+            )
+            .unwrap(),
+        };
+        Arc::new(this)
+    }
+}
+
 pub enum StateAccumulator {
     V1(StateAccumulatorV1),
     V2(StateAccumulatorV2),
@@ -32,10 +51,12 @@
 }

 pub struct StateAccumulatorV1 {
     store: Arc<dyn AccumulatorStore>,
+    metrics: Arc<StateAccumulatorMetrics>,
 }

 pub struct StateAccumulatorV2 {
     store: Arc<dyn AccumulatorStore>,
+    metrics: Arc<StateAccumulatorMetrics>,
 }

 pub trait AccumulatorStore: ObjectStore + Send + Sync {
@@ -366,16 +387,44 @@ impl StateAccumulator {
     pub fn new(
         store: Arc<dyn AccumulatorStore>,
         epoch_store: &Arc<AuthorityPerEpochStore>,
+        metrics: Arc<StateAccumulatorMetrics>,
     ) -> Self {
         if cfg!(msim) {
             if epoch_store.state_accumulator_v2_enabled() {
-                return StateAccumulator::V2(StateAccumulatorV2::new(store));
+                return StateAccumulator::V2(StateAccumulatorV2::new(store, metrics));
             } else {
-                return StateAccumulator::V1(StateAccumulatorV1::new(store));
+                return StateAccumulator::V1(StateAccumulatorV1::new(store, metrics));
             }
         }

-        StateAccumulator::V1(StateAccumulatorV1::new(store))
+        StateAccumulator::V1(StateAccumulatorV1::new(store, metrics))
+    }
+
+    pub fn new_for_tests(
+        store: Arc<dyn AccumulatorStore>,
+        epoch_store: &Arc<AuthorityPerEpochStore>,
+    ) -> Self {
+        Self::new(
+            store,
+            epoch_store,
+            StateAccumulatorMetrics::new(&Registry::new()),
+        )
+    }
+
+    pub fn metrics(&self) -> Arc<StateAccumulatorMetrics> {
+        match self {
+            StateAccumulator::V1(impl_v1) => impl_v1.metrics.clone(),
+            StateAccumulator::V2(impl_v2) => impl_v2.metrics.clone(),
+        }
+    }
+
+    pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
+        match self {
+            StateAccumulator::V1(impl_v1) => &impl_v1.metrics,
+            StateAccumulator::V2(impl_v2) => &impl_v2.metrics,
+        }
+        .inconsistent_state
+        .set(is_inconsistent_state as i64);
+    }

     /// Accumulates the effects of a single checkpoint and persists the accumulator.
@@ -528,8 +577,8 @@ impl StateAccumulator {
 }

 impl StateAccumulatorV1 {
-    pub fn new(store: Arc<dyn AccumulatorStore>) -> Self {
-        Self { store }
+    pub fn new(store: Arc<dyn AccumulatorStore>, metrics: Arc<StateAccumulatorMetrics>) -> Self {
+        Self { store, metrics }
     }

     /// Unions all checkpoint accumulators at the end of the epoch to generate the
@@ -618,8 +667,8 @@ impl StateAccumulatorV1 {
 }

 impl StateAccumulatorV2 {
-    pub fn new(store: Arc<dyn AccumulatorStore>) -> Self {
-        Self { store }
+    pub fn new(store: Arc<dyn AccumulatorStore>, metrics: Arc<StateAccumulatorMetrics>) -> Self {
+        Self { store, metrics }
     }

     pub async fn accumulate_running_root(
@@ -629,6 +678,10 @@ impl StateAccumulatorV2 {
         checkpoint_acc: Option<Accumulator>,
     ) -> SuiResult {
         let _scope = monitored_scope("AccumulateRunningRoot");
+        tracing::info!(
+            "accumulating running root for checkpoint {}",
+            checkpoint_seq_num
+        );

         // For the last checkpoint of the epoch, this function will be called once by the
         // checkpoint builder, and again by checkpoint executor.
diff --git a/crates/sui-core/src/test_utils.rs b/crates/sui-core/src/test_utils.rs
index 3c2b8a6ae5ef3..d9ccdd467af78 100644
--- a/crates/sui-core/src/test_utils.rs
+++ b/crates/sui-core/src/test_utils.rs
@@ -80,7 +80,8 @@ pub async fn send_and_confirm_transaction(
     //
     // We also check the incremental effects of the transaction on the live object set against StateAccumulator
     // for testing and regression detection
-    let state_acc = StateAccumulator::new(authority.get_accumulator_store().clone(), &epoch_store);
+    let state_acc =
+        StateAccumulator::new_for_tests(authority.get_accumulator_store().clone(), &epoch_store);
     let include_wrapped_tombstone = !authority
         .epoch_store_for_testing()
         .protocol_config()
diff --git a/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs b/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs
index 2fb2873ee63d9..5f6e52ebfd15e 100644
--- a/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs
+++ b/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs
@@ -70,7 +70,10 @@ async fn send_transactions(
 pub fn checkpoint_service_for_testing(state: Arc<AuthorityState>) -> Arc<CheckpointService> {
     let (output, _result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
     let epoch_store = state.epoch_store_for_testing();
-    let accumulator = StateAccumulator::new(state.get_accumulator_store().clone(), &epoch_store);
+    let accumulator = Arc::new(StateAccumulator::new_for_tests(
+        state.get_accumulator_store().clone(),
+        &epoch_store,
+    ));
     let (certified_output, _certified_result) = mpsc::channel::(10);

     let (checkpoint_service, _) = CheckpointService::spawn(
@@ -78,7 +81,7 @@ pub fn checkpoint_service_for_testing(state: Arc) -> Arc= 2 || !initial_flags_nodes.insert(current_node) {
+            return None;
+        }
+
+        // start with no flags set
+        Some(Vec::<EpochFlag>::new())
+    });
+
+    let test_cluster = TestClusterBuilder::new()
+        .with_epoch_duration_ms(30000)
+        .build()
+        .await;
+
+    let mut any_empty = false;
+    for node in test_cluster.all_node_handles() {
+        any_empty = any_empty
+            || node.with(|node| {
+                node.state()
+                    .epoch_store_for_testing()
+                    .epoch_start_config()
+                    .flags()
+                    .is_empty()
+            });
+    }
+    assert!(any_empty);
+
+    test_cluster.wait_for_epoch_all_nodes(1).await;
+
+    let mut any_empty = false;
+    for node in test_cluster.all_node_handles() {
+        any_empty = any_empty
+            || node.with(|node| {
+                node.state()
+                    .epoch_store_for_testing()
+                    .epoch_start_config()
+                    .flags()
+                    .is_empty()
+            });
+    }
+    assert!(!any_empty);
+
+    sleep(Duration::from_secs(15)).await;
+
+    test_cluster.stop_all_validators().await;
+    test_cluster.start_all_validators().await;
+
+    test_cluster.wait_for_epoch_all_nodes(2).await;
+}
+
 #[cfg(msim)]
 #[sim_test]
 async fn safe_mode_reconfig_test() {
diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs
index 1070caa2431e7..85510eb78274f 100644
--- a/crates/sui-node/src/lib.rs
+++ b/crates/sui-node/src/lib.rs
@@ -20,7 +20,7 @@ use std::path::PathBuf;
 use std::str::FromStr;
 #[cfg(msim)]
 use std::sync::atomic::Ordering;
-use std::sync::Arc;
+use std::sync::{Arc, Weak};
 use std::time::Duration;
 use sui_core::authority::epoch_start_configuration::EpochFlag;
 use sui_core::authority::RandomnessRoundReceiver;
@@ -29,6 +29,7 @@ use sui_core::consensus_adapter::SubmitToConsensus;
 use sui_core::consensus_manager::ConsensusClient;
 use sui_core::epoch::randomness::RandomnessManager;
 use sui_core::execution_cache::build_execution_cache;
+use sui_core::state_accumulator::StateAccumulatorMetrics;
 use sui_core::storage::RestReadStore;
 use sui_core::traffic_controller::metrics::TrafficControllerMetrics;
 use sui_json_rpc::bridge_api::BridgeReadApi;
@@ -67,6 +68,7 @@ use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
 use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
 use sui_core::authority_aggregator::AuthorityAggregator;
 use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
+use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
 use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
 use sui_core::checkpoints::{
     CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
@@ -225,7 +227,7 @@ pub struct SuiNode {
     state_sync_handle: state_sync::Handle,
     randomness_handle: randomness::Handle,
     checkpoint_store: Arc<CheckpointStore>,
-    accumulator: Arc<StateAccumulator>,
+    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
     connection_monitor_status: Arc,

     /// Broadcast channel to send the starting system state for the next epoch.
@@ -714,6 +716,7 @@ impl SuiNode {
         let accumulator = Arc::new(StateAccumulator::new(
             cache_traits.accumulator_store.clone(),
             &epoch_store,
+            StateAccumulatorMetrics::new(&prometheus_registry),
         ));

         let authority_names_to_peer_ids = epoch_store
@@ -750,7 +753,7 @@ impl SuiNode {
                 checkpoint_store.clone(),
                 state_sync_handle.clone(),
                 randomness_handle.clone(),
-                accumulator.clone(),
+                Arc::downgrade(&accumulator),
                 connection_monitor_status.clone(),
                 &registry_service,
                 sui_node_metrics.clone(),
@@ -780,7 +783,7 @@ impl SuiNode {
             state_sync_handle,
             randomness_handle,
             checkpoint_store,
-            accumulator,
+            accumulator: Mutex::new(Some(accumulator)),
             end_of_epoch_channel,
             connection_monitor_status,
             trusted_peer_change_tx,
@@ -1112,7 +1115,7 @@ impl SuiNode {
         checkpoint_store: Arc<CheckpointStore>,
         state_sync_handle: state_sync::Handle,
         randomness_handle: randomness::Handle,
-        accumulator: Arc<StateAccumulator>,
+        accumulator: Weak<StateAccumulator>,
         connection_monitor_status: Arc,
         registry_service: &RegistryService,
         sui_node_metrics: Arc,
@@ -1202,7 +1205,7 @@ impl SuiNode {
         randomness_handle: randomness::Handle,
         consensus_manager: ConsensusManager,
         consensus_epoch_data_remover: EpochDataRemover,
-        accumulator: Arc<StateAccumulator>,
+        accumulator: Weak<StateAccumulator>,
         validator_server_handle: JoinHandle>,
         validator_overload_monitor_handle: Option>,
         checkpoint_metrics: Arc,
@@ -1306,7 +1309,7 @@ impl SuiNode {
         epoch_store: Arc<AuthorityPerEpochStore>,
         state: Arc<AuthorityState>,
         state_sync_handle: state_sync::Handle,
-        accumulator: Arc<StateAccumulator>,
+        accumulator: Weak<StateAccumulator>,
         checkpoint_metrics: Arc,
     ) -> (Arc, watch::Sender<()>) {
         let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
@@ -1456,17 +1459,23 @@ impl SuiNode {
     /// This function awaits the completion of checkpoint execution of the current epoch,
     /// after which it iniitiates reconfiguration of the entire system.
     pub async fn monitor_reconfiguration(self: Arc<Self>) -> Result<()> {
-        let mut checkpoint_executor = CheckpointExecutor::new(
-            self.state_sync_handle.subscribe_to_synced_checkpoints(),
-            self.checkpoint_store.clone(),
-            self.state.clone(),
-            self.accumulator.clone(),
-            self.config.checkpoint_executor_config.clone(),
-            &self.registry_service.default_registry(),
-        );
+        let checkpoint_executor_metrics =
+            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

-        let run_with_range = self.config.run_with_range;
         loop {
+            let mut accumulator_guard = self.accumulator.lock().await;
+            let accumulator = accumulator_guard.take().unwrap();
+            let mut checkpoint_executor = CheckpointExecutor::new(
+                self.state_sync_handle.subscribe_to_synced_checkpoints(),
+                self.checkpoint_store.clone(),
+                self.state.clone(),
+                accumulator.clone(),
+                self.config.checkpoint_executor_config.clone(),
+                checkpoint_executor_metrics.clone(),
+            );
+
+            let run_with_range = self.config.run_with_range;
+
             let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

             // Advertise capabilities to committee, if we are a validator.
@@ -1495,6 +1504,7 @@
             let stop_condition = checkpoint_executor
                 .run_epoch(cur_epoch_store.clone(), run_with_range)
                 .await;
+            drop(checkpoint_executor);

             if stop_condition == StopReason::RunWithRangeCondition {
                 SuiNode::shutdown(&self).await;
@@ -1565,6 +1575,7 @@
             // The following code handles 4 different cases, depending on whether the node
             // was a validator in the previous epoch, and whether the node is a validator
             // in the new epoch.
+
             let new_validator_components = if let Some(ValidatorComponents {
                 validator_server_handle,
                 validator_overload_monitor_handle,
@@ -1588,10 +1599,23 @@ impl SuiNode {
                         &cur_epoch_store,
                         next_epoch_committee.clone(),
                         new_epoch_start_state,
-                        &checkpoint_executor,
+                        accumulator.clone(),
                     )
                     .await;

+                // No other components should be holding a strong reference to state accumulator
+                // at this point. Confirm here before we swap in the new accumulator.
+                let accumulator_metrics = Arc::into_inner(accumulator)
+                    .expect("Accumulator should have no other references at this point")
+                    .metrics();
+                let new_accumulator = Arc::new(StateAccumulator::new(
+                    self.state.get_accumulator_store().clone(),
+                    &new_epoch_store,
+                    accumulator_metrics,
+                ));
+                let weak_accumulator = Arc::downgrade(&new_accumulator);
+                *accumulator_guard = Some(new_accumulator);
+
                 consensus_epoch_data_remover
                     .remove_old_data(next_epoch - 1)
                     .await;
@@ -1609,7 +1633,7 @@ impl SuiNode {
                         self.randomness_handle.clone(),
                         consensus_manager,
                         consensus_epoch_data_remover,
-                        self.accumulator.clone(),
+                        weak_accumulator,
                         validator_server_handle,
                         validator_overload_monitor_handle,
                         checkpoint_metrics,
@@ -1629,10 +1653,23 @@ impl SuiNode {
                         &cur_epoch_store,
                         next_epoch_committee.clone(),
                         new_epoch_start_state,
-                        &checkpoint_executor,
+                        accumulator.clone(),
                     )
                     .await;

+                // No other components should be holding a strong reference to state accumulator
+                // at this point. Confirm here before we swap in the new accumulator.
+                let accumulator_metrics = Arc::into_inner(accumulator)
+                    .expect("Accumulator should have no other references at this point")
+                    .metrics();
+                let new_accumulator = Arc::new(StateAccumulator::new(
+                    self.state.get_accumulator_store().clone(),
+                    &new_epoch_store,
+                    accumulator_metrics,
+                ));
+                let weak_accumulator = Arc::downgrade(&new_accumulator);
+                *accumulator_guard = Some(new_accumulator);
+
                 if self.state.is_validator(&new_epoch_store) {
                     info!("Promoting the node from fullnode to validator, starting grpc server");

@@ -1645,7 +1682,7 @@ impl SuiNode {
                         self.checkpoint_store.clone(),
                         self.state_sync_handle.clone(),
                         self.randomness_handle.clone(),
-                        self.accumulator.clone(),
+                        weak_accumulator,
                         self.connection_monitor_status.clone(),
                         &self.registry_service,
                         self.metrics.clone(),
@@ -1694,7 +1731,7 @@ impl SuiNode {
         cur_epoch_store: &AuthorityPerEpochStore,
         next_epoch_committee: Committee,
         next_epoch_start_system_state: EpochStartSystemState,
-        checkpoint_executor: &CheckpointExecutor,
+        accumulator: Arc<StateAccumulator>,
     ) -> Arc<AuthorityPerEpochStore> {
         let next_epoch = next_epoch_committee.epoch();
@@ -1719,8 +1756,7 @@ impl SuiNode {
                 self.config.supported_protocol_versions.unwrap(),
                 next_epoch_committee,
                 epoch_start_configuration,
-                checkpoint_executor,
-                self.accumulator.clone(),
+                accumulator,
                 &self.config.expensive_safety_check_config,
             )
             .await
diff --git a/crates/sui-single-node-benchmark/src/single_node.rs b/crates/sui-single-node-benchmark/src/single_node.rs
index d020ce6d4ffa4..5b87afcfff846 100644
--- a/crates/sui-single-node-benchmark/src/single_node.rs
+++ b/crates/sui-single-node-benchmark/src/single_node.rs
@@ -278,7 +278,7 @@ impl SingleValidator {
             ckpt_receiver,
             validator.get_checkpoint_store().clone(),
             validator.clone(),
-            Arc::new(StateAccumulator::new(
+            Arc::new(StateAccumulator::new_for_tests(
                 validator.get_accumulator_store().clone(),
                 self.get_epoch_store(),
             )),
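The sketch below is not part of the patch; it is a minimal illustration of the ownership hand-off the patch adopts, under simplifying assumptions: toy stand-ins (`Node`, an integer field instead of a Prometheus `IntGauge`, `std::sync::Mutex` instead of the tokio mutex), and hypothetical helper names (`weak_handle`, `reconfigure`). The idea it demonstrates is the one in the diff: the node holds the only strong `Arc<StateAccumulator>`, long-lived services receive `Weak` handles, and at each epoch change `Arc::into_inner` proves exclusive ownership before a fresh accumulator reusing the same metrics object is swapped in.

```rust
use std::sync::{Arc, Mutex, Weak};

/// Stand-in for StateAccumulatorMetrics; the real type wraps a Prometheus IntGauge.
struct StateAccumulatorMetrics {
    inconsistent_state: Mutex<i64>,
}

/// Stand-in for StateAccumulator: one instance per epoch, metrics shared across epochs.
struct StateAccumulator {
    epoch: u64,
    metrics: Arc<StateAccumulatorMetrics>,
}

/// Stand-in for the node: the only holder of a strong reference.
struct Node {
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
}

impl Node {
    /// Long-lived services get only a Weak handle, so they cannot keep an
    /// old epoch's accumulator alive.
    fn weak_handle(&self) -> Weak<StateAccumulator> {
        Arc::downgrade(
            self.accumulator
                .lock()
                .unwrap()
                .as_ref()
                .expect("accumulator is present between reconfigurations"),
        )
    }

    /// Epoch change: take the strong handle out, prove it is the last one with
    /// Arc::into_inner, then swap in a fresh accumulator that reuses the metrics.
    fn reconfigure(&self, next_epoch: u64) {
        let mut guard = self.accumulator.lock().unwrap();
        let old = guard.take().expect("accumulator taken once per epoch");
        let old = Arc::into_inner(old)
            .expect("no other component should hold a strong reference at reconfiguration");
        let metrics = old.metrics;
        *guard = Some(Arc::new(StateAccumulator {
            epoch: next_epoch,
            metrics,
        }));
    }
}

fn main() {
    let metrics = Arc::new(StateAccumulatorMetrics {
        inconsistent_state: Mutex::new(0),
    });
    let node = Node {
        accumulator: Mutex::new(Some(Arc::new(StateAccumulator {
            epoch: 0,
            metrics: metrics.clone(),
        }))),
    };

    // During the epoch, a Weak handle can be upgraded and used.
    let weak = node.weak_handle();
    assert!(weak.upgrade().is_some());

    node.reconfigure(1);

    // After reconfiguration the old accumulator is gone; a stale Weak handle
    // fails to upgrade instead of resurrecting the previous epoch's state.
    assert!(weak.upgrade().is_none());

    let guard = node.accumulator.lock().unwrap();
    let new_acc = guard.as_ref().unwrap();
    assert_eq!(new_acc.epoch, 1);
    // The metrics object is carried over, so the gauge keeps its registration.
    assert!(Arc::ptr_eq(&metrics, &new_acc.metrics));
    assert_eq!(*new_acc.metrics.inconsistent_state.lock().unwrap(), 0);
}
```

The same failure modes the patch relies on show up here: a component that wrongly retains a strong `Arc` makes `Arc::into_inner` return `None` (the patch's `expect` panics), and a checkpoint builder still running after reconfiguration sees its `Weak::upgrade()` return `None` (the patch's "No checkpoints should be getting built" `expect` fires).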