Skip to content

Commit

Permalink
[Consensus] add missing blocks per authority metric (MystenLabs#19286)
Browse files Browse the repository at this point in the history
## Description 

This metric might provide us with a better idea on blocks from which
authorities are having propagation problems.

Also, decrement the synchronizer inflight metric properly.

## Test plan 

CI
PT

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
mwtian committed Sep 10, 2024
1 parent 90750b4 commit 055c20c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 9 deletions.
7 changes: 7 additions & 0 deletions consensus/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ pub(crate) struct NodeMetrics {
pub(crate) fetch_blocks_scheduler_inflight: IntGauge,
pub(crate) fetch_blocks_scheduler_skipped: IntCounterVec,
pub(crate) synchronizer_fetched_blocks_by_peer: IntCounterVec,
pub(crate) synchronizer_missing_blocks_by_authority: IntCounterVec,
pub(crate) synchronizer_fetched_blocks_by_authority: IntCounterVec,
pub(crate) invalid_blocks: IntCounterVec,
pub(crate) rejected_blocks: IntCounterVec,
Expand Down Expand Up @@ -330,6 +331,12 @@ impl NodeMetrics {
&["authority", "type"],
registry,
).unwrap(),
synchronizer_missing_blocks_by_authority: register_int_counter_vec_with_registry!(
"synchronizer_missing_blocks_by_authority",
"Number of missing blocks per block author, as observed by the synchronizer during periodic sync.",
&["authority"],
registry,
).unwrap(),
last_known_own_block_round: register_int_gauge_with_registry!(
"last_known_own_block_round",
"The highest round of our own block as this has been synced from peers during an amnesia recovery",
Expand Down
29 changes: 20 additions & 9 deletions consensus/core/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use mysten_metrics::{
monitored_scope,
};
use parking_lot::{Mutex, RwLock};
#[cfg(not(test))]
use rand::{prelude::SliceRandom, rngs::ThreadRng};
use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
use sui_macros::fail_point_async;
use tap::TapFallible;
use tokio::{
Expand Down Expand Up @@ -862,6 +861,7 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C

// Fetch blocks from peers
let results = Self::fetch_blocks_from_authorities(context.clone(), blocks_to_fetch.clone(), network_client, missing_blocks, core_dispatcher.clone(), dag_state).await;
context.metrics.node_metrics.fetch_blocks_scheduler_inflight.dec();
if results.is_empty() {
warn!("No results returned while requesting missing blocks");
return;
Expand All @@ -877,7 +877,6 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
}
}

context.metrics.node_metrics.fetch_blocks_scheduler_inflight.dec();
debug!("Total blocks requested to fetch: {}, total fetched: {}", total_requested, total_fetched);
}));
Ok(())
Expand Down Expand Up @@ -913,20 +912,32 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
.into_iter()
.take(MAX_PEERS * context.parameters.max_blocks_per_fetch)
.collect::<Vec<_>>();
let mut missing_blocks_per_authority = vec![0; context.committee.size()];
for block in &missing_blocks {
missing_blocks_per_authority[block.author] += 1;
}
for (missing, (_, authority)) in missing_blocks_per_authority
.into_iter()
.zip(context.committee.authorities())
{
context
.metrics
.node_metrics
.synchronizer_missing_blocks_by_authority
.with_label_values(&[&authority.hostname])
.inc_by(missing as u64);
}

#[allow(unused_mut)]
let mut peers = context
.committee
.authorities()
.filter_map(|(peer_index, _)| (peer_index != context.own_index).then_some(peer_index))
.collect::<Vec<_>>();

// TODO: probably inject the RNG to allow unit testing - this is a work around for now.
cfg_if::cfg_if! {
if #[cfg(not(test))] {
// Shuffle the peers
peers.shuffle(&mut ThreadRng::default());
}
if cfg!(not(test)) {
// Shuffle the peers
peers.shuffle(&mut ThreadRng::default());
}

let mut peers = peers.into_iter();
Expand Down

0 comments on commit 055c20c

Please sign in to comment.