Skip to content

Commit

Permalink
detect network silence during startup
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Jan 28, 2025
1 parent e8a2911 commit 66f6015
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
proto::base_node as proto,
};

const NUM_ROUNDS_NETWORK_SILENCE: u16 = 4;
const NUM_ROUNDS_NETWORK_SILENCE: u16 = 3;

pub(super) struct ChainMetadataService {
liveness: LivenessHandle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
#[allow(clippy::enum_glob_use)]
use self::{BaseNodeState::*, StateEvent::*, SyncStatus::Lagging};
match (state, event) {
(Starting(s), Initialized) => Listening(s.into()),
(Starting(s), Initialized(network_silence)) => Listening(s.into(), network_silence),
(
Listening(_),
Listening(..),
FallenBehind(Lagging {
local: local_metadata,
sync_peers,
Expand All @@ -163,14 +163,14 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
},
(HeaderSync(s), Continue | NetworkSilence) => {
db.clear_disable_add_block_flag();
Listening(s.into())
Listening(s.into(), false)
},
(HeaderSync(s), HeadersSynchronized(..)) => DecideNextSync(s.into()),

(DecideNextSync(_), ProceedToHorizonSync(peers)) => HorizonStateSync(peers.into()),
(DecideNextSync(s), Continue) => {
db.clear_disable_add_block_flag();
Listening(s.into())
Listening(s.into(), false)
},
(HorizonStateSync(s), HorizonStateSynchronized) => BlockSync(s.into()),
(HorizonStateSync(s), HorizonStateSyncFailure) => {
Expand All @@ -181,14 +181,14 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
(DecideNextSync(_), ProceedToBlockSync(peers)) => BlockSync(peers.into()),
(BlockSync(s), BlocksSynchronized) => {
db.clear_disable_add_block_flag();
Listening(s.into())
Listening(s.into(), false)
},
(BlockSync(s), BlockSyncFailed) => {
db.clear_disable_add_block_flag();
Waiting(s.into())
},

(Waiting(s), Continue) => Listening(s.into()),
(Waiting(s), Continue) => Listening(s.into(), false),
(_, FatalError(s)) => Shutdown(states::Shutdown::with_reason(s)),
(_, UserQuit) => Shutdown(states::Shutdown::with_reason("Shutdown initiated by user".to_string())),
(s, e) => {
Expand Down Expand Up @@ -278,7 +278,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
DecideNextSync(s) => s.next_event(shared_state).await,
HorizonStateSync(s) => s.next_event(shared_state).await,
BlockSync(s) => s.next_event(shared_state).await,
Listening(s) => s.next_event(shared_state).await,
Listening(s, network_silence) => s.next_event(shared_state, *network_silence).await,
Waiting(s) => s.next_event().await,
Shutdown(_) => unreachable!("called get_next_state_event while in Shutdown state"),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ pub enum BaseNodeState {
HorizonStateSync(HorizonStateSync),
BlockSync(BlockSync),
// The best network chain metadata
Listening(Listening),
Listening(Listening, bool),
// We're in a paused state, and will return to Listening after a timeout
Waiting(Waiting),
Shutdown(Shutdown),
}

#[derive(Debug, Clone, PartialEq)]
pub enum StateEvent {
Initialized,
Initialized(bool), // Initialized with or without network silence
HeadersSynchronized(SyncPeer, AttemptSyncResult),
HeaderSyncFailed(String),
ProceedToHorizonSync(Vec<SyncPeer>),
Expand Down Expand Up @@ -143,7 +143,7 @@ impl Display for StateEvent {
#[allow(clippy::enum_glob_use)]
use StateEvent::*;
match self {
Initialized => write!(f, "Initialized"),
Initialized(..) => write!(f, "Initialized"),
BlocksSynchronized => write!(f, "Synchronised Blocks"),
HeadersSynchronized(peer, result) => write!(f, "Headers Synchronized from peer `{}` ({:?})", peer, result),
HeaderSyncFailed(err) => write!(f, "Header Synchronization Failed ({})", err),
Expand Down Expand Up @@ -171,7 +171,7 @@ impl Display for BaseNodeState {
DecideNextSync(_) => "Deciding next sync",
HorizonStateSync(_) => "Synchronizing horizon state",
BlockSync(_) => "Synchronizing blocks",
Listening(_) => "Listening",
Listening(..) => "Listening",
Shutdown(_) => "Shutting down",
Waiting(_) => "Waiting",
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,39 +123,48 @@ impl Listening {
Default::default()
}

/// Marks this `Listening` state as synced and publishes the matching state info.
///
/// When `is_synced` is already `true` this is a no-op. Otherwise `is_synced` is set, the
/// `initial_delay_count` is reset to zero, and `shared` receives a `StateInfo::Listening` built
/// from `ListeningInfo::new(true, 0, shared.config.initial_sync_peer_count)`.
fn set_synced_response<B: BlockchainBackend + 'static>(&mut self, shared: &mut BaseNodeStateMachine<B>) {
    // Already flagged as synced: nothing to update or publish.
    if self.is_synced {
        return;
    }

    self.is_synced = true;
    self.initial_delay_count = 0;

    let synced_info = ListeningInfo::new(true, 0, shared.config.initial_sync_peer_count);
    shared.set_state_info(StateInfo::Listening(synced_info));
}

#[allow(clippy::too_many_lines)]
pub async fn next_event<B: BlockchainBackend + 'static>(
&mut self,
shared: &mut BaseNodeStateMachine<B>,
network_silence: bool,
) -> StateEvent {
info!(target: LOG_TARGET, "Listening for chain metadata updates");
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(
self.is_synced,
self.initial_delay_count,
shared.config.initial_sync_peer_count,
)));
if network_silence {
self.set_synced_response(shared);
warn!(
target: LOG_TARGET,
"Initial sync achieved based on event 'NetworkSilence'; this may not be true if the entire \
network in general is slow to respond to pings"
);
} else {
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(
self.is_synced,
self.initial_delay_count,
shared.config.initial_sync_peer_count,
)));
}
let mut time_since_better_block = None;
let mut initial_sync_counter = 0;
let mut initial_sync_peer_list = Vec::new();
loop {
let metadata_event = shared.metadata_event_stream.recv().await;
match metadata_event.as_ref().map(|v| v.deref()) {
Ok(ChainMetadataEvent::NetworkSilence) => {
self.set_synced_response(shared);
debug!("NetworkSilence event received");
if !self.is_synced {
self.is_synced = true;
self.initial_delay_count = 0;
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(
true,
0,
shared.config.initial_sync_peer_count,
)));
warn!(
target: LOG_TARGET,
"Initial sync achieved based on event 'NetworkSilence'; this may not be true if the entire \
network in general is slow to respond to pings"
);
}
},
Ok(ChainMetadataEvent::PeerChainMetadataReceived(peer_metadata)) => {
// if we are not yet synced, we wait for the initial delay of ping/pongs, so let's propagate the
Expand Down Expand Up @@ -251,13 +260,7 @@ impl Listening {
}

if !self.is_synced && sync_mode.is_up_to_date() {
self.is_synced = true;
self.initial_delay_count = 0;
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(
true,
0,
shared.config.initial_sync_peer_count,
)));
self.set_synced_response(shared);
debug!(target: LOG_TARGET, "Initial sync achieved");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//

use std::ops::Deref;

use log::*;
use tokio::sync::broadcast;

use crate::{
base_node::state_machine_service::{
states::{listening::Listening, StateEvent},
BaseNodeStateMachine,
base_node::{
chain_metadata_service::ChainMetadataEvent,
state_machine_service::{
states::{listening::Listening, StateEvent},
BaseNodeStateMachine,
},
},
chain_storage::BlockchainBackend,
};
Expand All @@ -37,9 +43,38 @@ const LOG_TARGET: &str = "c::bn::state_machine_service::states::starting_state";
pub struct Starting;

impl Starting {
// NOTE(review): this span appears to be a rendered diff hunk. The two consecutive `next_event`
// signatures look like the removed and the added version of the same method, and the bare
// `StateEvent::Initialized` line below looks like the removed body; confirm against the
// repository before treating this as compilable source.
//
/// Waits on `shared.metadata_event_stream` and returns the event that ends the starting state:
/// - `StateEvent::Initialized(true)` once three `NetworkSilence` events have been counted,
/// - `StateEvent::Initialized(false)` as soon as a `PeerChainMetadataReceived` event arrives,
/// - `StateEvent::UserQuit` if the broadcast receiver reports `Closed`.
pub async fn next_event<B: BlockchainBackend>(&mut self, _shared: &BaseNodeStateMachine<B>) -> StateEvent {
pub async fn next_event<B: BlockchainBackend>(&mut self, shared: &mut BaseNodeStateMachine<B>) -> StateEvent {
info!(target: LOG_TARGET, "Starting node.");
StateEvent::Initialized

// Number of `NetworkSilence` events seen so far in this call.
let mut network_silence_count = 0;
loop {
let metadata_event = shared.metadata_event_stream.recv().await;
// Borrow the received value and deref it so the arms can match on `ChainMetadataEvent` directly.
match metadata_event.as_ref().map(|v| v.deref()) {
Ok(ChainMetadataEvent::NetworkSilence) => {
network_silence_count += 1;
// NOTE(review): unlike the other log calls in this method, this one passes no `target: LOG_TARGET`.
debug!("NetworkSilence event received ({})", network_silence_count);
// NOTE(review): the threshold is a bare literal here; presumably it is meant to track
// `NUM_ROUNDS_NETWORK_SILENCE` in the chain metadata service - confirm.
if network_silence_count >= 3 {
return StateEvent::Initialized(true);
}
},
Ok(ChainMetadataEvent::PeerChainMetadataReceived(_)) => {
// Any peer metadata ends the wait; the payload itself is ignored here.
return StateEvent::Initialized(false);
},
Err(broadcast::error::RecvError::Lagged(n)) => {
// Lagging is only logged; the loop keeps receiving.
debug!(target: LOG_TARGET, "Metadata event subscriber lagged by {} item(s)", n);
},
Err(broadcast::error::RecvError::Closed) => {
debug!(target: LOG_TARGET, "Metadata event subscriber closed");
break;
},
}
}

// Reached only via the `Closed` arm above.
// NOTE(review): the message mentions "timeout streams", but this loop reads only the metadata event stream.
debug!(
target: LOG_TARGET,
"Event listener is complete because liveness metadata and timeout streams were closed"
);
StateEvent::UserQuit
}
}

Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/tests/helpers/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ pub async fn wait_for_is_peer_banned(this_node: &NodeInterfaces, peer_node_id: &
/// Condensed format of the state machine state for display
pub fn state_event(event: &StateEvent) -> String {
match event {
StateEvent::Initialized => "Initialized".to_string(),
StateEvent::Initialized(_) => "Initialized".to_string(),
StateEvent::HeadersSynchronized(_, _) => "HeadersSynchronized".to_string(),
StateEvent::HeaderSyncFailed(_) => "HeaderSyncFailed".to_string(),
StateEvent::ProceedToHorizonSync(_) => "ProceedToHorizonSync".to_string(),
Expand Down
9 changes: 6 additions & 3 deletions base_layer/core/tests/tests/node_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ async fn test_listening_lagging() {
);
wait_until_online(&[&alice_node, &bob_node]).await;

let await_event_task = task::spawn(async move { Listening::new().next_event(&mut alice_state_machine).await });
let await_event_task =
task::spawn(async move { Listening::new().next_event(&mut alice_state_machine, false).await });

let bob_db = bob_node.blockchain_db;
let mut bob_local_nci = bob_node.local_nci;
Expand Down Expand Up @@ -252,7 +253,8 @@ async fn test_listening_initial_fallen_behind() {
);

assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 0);
let await_event_task = task::spawn(async move { Listening::new().next_event(&mut alice_state_machine).await });
let await_event_task =
task::spawn(async move { Listening::new().next_event(&mut alice_state_machine, false).await });

let next_event = time::timeout(Duration::from_secs(10), await_event_task)
.await
Expand Down Expand Up @@ -321,7 +323,8 @@ async fn test_event_channel() {
.expect("Could not publish metadata");
}
let event = state_change_event_subscriber.recv().await;
assert_eq!(*event.unwrap(), StateEvent::Initialized);
let event = event.unwrap();
unpack_enum!(StateEvent::Initialized(_) = &*event);
let event = state_change_event_subscriber.recv().await;
let event = event.unwrap();
unpack_enum!(StateEvent::FallenBehind(_) = &*event);
Expand Down

0 comments on commit 66f6015

Please sign in to comment.