From 66f6015ee42ff5ce3b3501fbbd033f2f2e1b43e9 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Tue, 28 Jan 2025 14:38:09 +0200 Subject: [PATCH] detect network silence during startup --- .../chain_metadata_service/service.rs | 2 +- .../state_machine_service/state_machine.rs | 14 ++--- .../states/events_and_states.rs | 8 +-- .../state_machine_service/states/listening.rs | 55 ++++++++++--------- .../states/starting_state.rs | 47 ++++++++++++++-- base_layer/core/tests/helpers/sync.rs | 2 +- .../core/tests/tests/node_state_machine.rs | 9 ++- 7 files changed, 89 insertions(+), 48 deletions(-) diff --git a/base_layer/core/src/base_node/chain_metadata_service/service.rs b/base_layer/core/src/base_node/chain_metadata_service/service.rs index 48c3f94790..05da35ba00 100644 --- a/base_layer/core/src/base_node/chain_metadata_service/service.rs +++ b/base_layer/core/src/base_node/chain_metadata_service/service.rs @@ -40,7 +40,7 @@ use crate::{ proto::base_node as proto, }; -const NUM_ROUNDS_NETWORK_SILENCE: u16 = 4; +const NUM_ROUNDS_NETWORK_SILENCE: u16 = 3; pub(super) struct ChainMetadataService { liveness: LivenessHandle, diff --git a/base_layer/core/src/base_node/state_machine_service/state_machine.rs b/base_layer/core/src/base_node/state_machine_service/state_machine.rs index a279208c3d..dd55ecb9f1 100644 --- a/base_layer/core/src/base_node/state_machine_service/state_machine.rs +++ b/base_layer/core/src/base_node/state_machine_service/state_machine.rs @@ -145,9 +145,9 @@ impl BaseNodeStateMachine { #[allow(clippy::enum_glob_use)] use self::{BaseNodeState::*, StateEvent::*, SyncStatus::Lagging}; match (state, event) { - (Starting(s), Initialized) => Listening(s.into()), + (Starting(s), Initialized(network_silence)) => Listening(s.into(), network_silence), ( - Listening(_), + Listening(..), FallenBehind(Lagging { local: local_metadata, sync_peers, @@ -163,14 +163,14 @@ impl BaseNodeStateMachine { }, (HeaderSync(s), Continue | NetworkSilence) => { db.clear_disable_add_block_flag(); - Listening(s.into()) + Listening(s.into(), false) }, (HeaderSync(s), HeadersSynchronized(..)) => DecideNextSync(s.into()), (DecideNextSync(_), ProceedToHorizonSync(peers)) => HorizonStateSync(peers.into()), (DecideNextSync(s), Continue) => { db.clear_disable_add_block_flag(); - Listening(s.into()) + Listening(s.into(), false) }, (HorizonStateSync(s), HorizonStateSynchronized) => BlockSync(s.into()), (HorizonStateSync(s), HorizonStateSyncFailure) => { @@ -181,14 +181,14 @@ impl BaseNodeStateMachine { (DecideNextSync(_), ProceedToBlockSync(peers)) => BlockSync(peers.into()), (BlockSync(s), BlocksSynchronized) => { db.clear_disable_add_block_flag(); - Listening(s.into()) + Listening(s.into(), false) }, (BlockSync(s), BlockSyncFailed) => { db.clear_disable_add_block_flag(); Waiting(s.into()) }, - (Waiting(s), Continue) => Listening(s.into()), + (Waiting(s), Continue) => Listening(s.into(), false), (_, FatalError(s)) => Shutdown(states::Shutdown::with_reason(s)), (_, UserQuit) => Shutdown(states::Shutdown::with_reason("Shutdown initiated by user".to_string())), (s, e) => { @@ -278,7 +278,7 @@ impl BaseNodeStateMachine { DecideNextSync(s) => s.next_event(shared_state).await, HorizonStateSync(s) => s.next_event(shared_state).await, BlockSync(s) => s.next_event(shared_state).await, - Listening(s) => s.next_event(shared_state).await, + Listening(s, network_silence) => s.next_event(shared_state, *network_silence).await, Waiting(s) => s.next_event().await, Shutdown(_) => unreachable!("called get_next_state_event while in Shutdown state"), } diff --git a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs index fce4dc9a88..f6cdb2da1e 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs @@ -48,7 +48,7 @@ pub enum BaseNodeState { HorizonStateSync(HorizonStateSync), BlockSync(BlockSync), // The best network chain metadata - Listening(Listening), + Listening(Listening, bool), // We're in a paused state, and will return to Listening after a timeout Waiting(Waiting), Shutdown(Shutdown), @@ -56,7 +56,7 @@ pub enum BaseNodeState { #[derive(Debug, Clone, PartialEq)] pub enum StateEvent { - Initialized, + Initialized(bool), // Initialized with or without network silence HeadersSynchronized(SyncPeer, AttemptSyncResult), HeaderSyncFailed(String), ProceedToHorizonSync(Vec), @@ -143,7 +143,7 @@ impl Display for StateEvent { #[allow(clippy::enum_glob_use)] use StateEvent::*; match self { - Initialized => write!(f, "Initialized"), + Initialized(..) => write!(f, "Initialized"), BlocksSynchronized => write!(f, "Synchronised Blocks"), HeadersSynchronized(peer, result) => write!(f, "Headers Synchronized from peer `{}` ({:?})", peer, result), HeaderSyncFailed(err) => write!(f, "Header Synchronization Failed ({})", err), @@ -171,7 +171,7 @@ impl Display for BaseNodeState { DecideNextSync(_) => "Deciding next sync", HorizonStateSync(_) => "Synchronizing horizon state", BlockSync(_) => "Synchronizing blocks", - Listening(_) => "Listening", + Listening(..) => "Listening", Shutdown(_) => "Shutting down", Waiting(_) => "Waiting", }; diff --git a/base_layer/core/src/base_node/state_machine_service/states/listening.rs b/base_layer/core/src/base_node/state_machine_service/states/listening.rs index 6f915eaf3d..de5a2dccad 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/listening.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/listening.rs @@ -123,17 +123,39 @@ impl Listening { Default::default() } + fn set_synced_response(&mut self, shared: &mut BaseNodeStateMachine) { + if !self.is_synced { + self.is_synced = true; + self.initial_delay_count = 0; + shared.set_state_info(StateInfo::Listening(ListeningInfo::new( + true, + 0, + shared.config.initial_sync_peer_count, + ))); + } + } + #[allow(clippy::too_many_lines)] pub async fn next_event( &mut self, shared: &mut BaseNodeStateMachine, + network_silence: bool, ) -> StateEvent { info!(target: LOG_TARGET, "Listening for chain metadata updates"); - shared.set_state_info(StateInfo::Listening(ListeningInfo::new( - self.is_synced, - self.initial_delay_count, - shared.config.initial_sync_peer_count, - ))); + if network_silence { + self.set_synced_response(shared); + warn!( + target: LOG_TARGET, + "Initial sync achieved based on event 'NetworkSilence'; this may not be true if the entire \ + network in general is slow to respond to pings" + ); + } else { + shared.set_state_info(StateInfo::Listening(ListeningInfo::new( + self.is_synced, + self.initial_delay_count, + shared.config.initial_sync_peer_count, + ))); + } let mut time_since_better_block = None; let mut initial_sync_counter = 0; let mut initial_sync_peer_list = Vec::new(); @@ -141,21 +163,8 @@ impl Listening { let metadata_event = shared.metadata_event_stream.recv().await; match metadata_event.as_ref().map(|v| v.deref()) { Ok(ChainMetadataEvent::NetworkSilence) => { + self.set_synced_response(shared); debug!("NetworkSilence event received"); - if !self.is_synced { - self.is_synced = true; - self.initial_delay_count = 0; - shared.set_state_info(StateInfo::Listening(ListeningInfo::new( - true, - 0, - shared.config.initial_sync_peer_count, - ))); - warn!( - target: LOG_TARGET, - "Initial sync achieved based on event 'NetworkSilence'; this may not be true if the entire \ - network in general is slow to respond to pings" - ); - } }, Ok(ChainMetadataEvent::PeerChainMetadataReceived(peer_metadata)) => { // if we are not yet synced, we wait for the initial delay of ping/pongs, so let's propagate the @@ -251,13 +260,7 @@ impl Listening { } if !self.is_synced && sync_mode.is_up_to_date() { - self.is_synced = true; - self.initial_delay_count = 0; - shared.set_state_info(StateInfo::Listening(ListeningInfo::new( - true, - 0, - shared.config.initial_sync_peer_count, - ))); + self.set_synced_response(shared); debug!(target: LOG_TARGET, "Initial sync achieved"); } diff --git a/base_layer/core/src/base_node/state_machine_service/states/starting_state.rs b/base_layer/core/src/base_node/state_machine_service/states/starting_state.rs index 3b2155052e..a1fb2c085f 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/starting_state.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/starting_state.rs @@ -19,13 +19,19 @@ // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// + +use std::ops::Deref; + use log::*; +use tokio::sync::broadcast; use crate::{ - base_node::state_machine_service::{ - states::{listening::Listening, StateEvent}, - BaseNodeStateMachine, + base_node::{ + chain_metadata_service::ChainMetadataEvent, + state_machine_service::{ + states::{listening::Listening, StateEvent}, + BaseNodeStateMachine, + }, }, chain_storage::BlockchainBackend, }; @@ -37,9 +43,38 @@ const LOG_TARGET: &str = "c::bn::state_machine_service::states::starting_state"; pub struct Starting; impl Starting { - pub async fn next_event(&mut self, _shared: &BaseNodeStateMachine) -> StateEvent { + pub async fn next_event(&mut self, shared: &mut BaseNodeStateMachine) -> StateEvent { info!(target: LOG_TARGET, "Starting node."); - StateEvent::Initialized + + let mut network_silence_count = 0; + loop { + let metadata_event = shared.metadata_event_stream.recv().await; + match metadata_event.as_ref().map(|v| v.deref()) { + Ok(ChainMetadataEvent::NetworkSilence) => { + network_silence_count += 1; + debug!("NetworkSilence event received ({})", network_silence_count); + if network_silence_count >= 3 { + return StateEvent::Initialized(true); + } + }, + Ok(ChainMetadataEvent::PeerChainMetadataReceived(_)) => { + return StateEvent::Initialized(false); + }, + Err(broadcast::error::RecvError::Lagged(n)) => { + debug!(target: LOG_TARGET, "Metadata event subscriber lagged by {} item(s)", n); + }, + Err(broadcast::error::RecvError::Closed) => { + debug!(target: LOG_TARGET, "Metadata event subscriber closed"); + break; + }, + } + } + + debug!( + target: LOG_TARGET, + "Event listener is complete because liveness metadata and timeout streams were closed" + ); + StateEvent::UserQuit } } diff --git a/base_layer/core/tests/helpers/sync.rs b/base_layer/core/tests/helpers/sync.rs index 40099ce612..9b35cd66e0 100644 --- a/base_layer/core/tests/helpers/sync.rs +++ b/base_layer/core/tests/helpers/sync.rs @@ -383,7 +383,7 @@ pub async fn wait_for_is_peer_banned(this_node: &NodeInterfaces, peer_node_id: & /// Condensed format of the state machine state for display pub fn state_event(event: &StateEvent) -> String { match event { - StateEvent::Initialized => "Initialized".to_string(), + StateEvent::Initialized(_) => "Initialized".to_string(), StateEvent::HeadersSynchronized(_, _) => "HeadersSynchronized".to_string(), StateEvent::HeaderSyncFailed(_) => "HeaderSyncFailed".to_string(), StateEvent::ProceedToHorizonSync(_) => "ProceedToHorizonSync".to_string(), diff --git a/base_layer/core/tests/tests/node_state_machine.rs b/base_layer/core/tests/tests/node_state_machine.rs index 305fb883cc..9d85fbc5cb 100644 --- a/base_layer/core/tests/tests/node_state_machine.rs +++ b/base_layer/core/tests/tests/node_state_machine.rs @@ -118,7 +118,8 @@ async fn test_listening_lagging() { ); wait_until_online(&[&alice_node, &bob_node]).await; - let await_event_task = task::spawn(async move { Listening::new().next_event(&mut alice_state_machine).await }); + let await_event_task = + task::spawn(async move { Listening::new().next_event(&mut alice_state_machine, false).await }); let bob_db = bob_node.blockchain_db; let mut bob_local_nci = bob_node.local_nci; @@ -252,7 +253,8 @@ async fn test_listening_initial_fallen_behind() { ); assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 0); - let await_event_task = task::spawn(async move { Listening::new().next_event(&mut alice_state_machine).await }); + let await_event_task = + task::spawn(async move { Listening::new().next_event(&mut alice_state_machine, false).await }); let next_event = time::timeout(Duration::from_secs(10), await_event_task) .await @@ -321,7 +323,8 @@ async fn test_event_channel() { .expect("Could not publish metadata"); } let event = state_change_event_subscriber.recv().await; - assert_eq!(*event.unwrap(), StateEvent::Initialized); + let event = event.unwrap(); + unpack_enum!(StateEvent::Initialized(_) = &*event); let event = state_change_event_subscriber.recv().await; let event = event.unwrap(); unpack_enum!(StateEvent::FallenBehind(_) = &*event);