Skip to content

Commit

Permalink
Support announcing to other peers sync status of local peer and take …
Browse files Browse the repository at this point in the history
…that into account when deciding if node is under major sync or not.

Add pending sync status to distinguish state where blocks are imported, but sync target is not yet identified due to no synced nodes known.
  • Loading branch information
nazar-pc committed Aug 12, 2024
1 parent 4670a2c commit 9b09c04
Show file tree
Hide file tree
Showing 17 changed files with 169 additions and 50 deletions.
1 change: 1 addition & 0 deletions cumulus/client/relay-chain-minimal-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ fn get_block_announce_proto_config<Network: NetworkBackend<Block, Hash>>(
best_number,
best_hash,
genesis_hash,
true,
))),
// NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement
// protocol is still hardcoded into the peerset.
Expand Down
9 changes: 9 additions & 0 deletions substrate/client/cli/src/params/network_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ pub struct NetworkParams {
verbatim_doc_comment
)]
pub network_backend: NetworkBackendType,

/// Parameter that allows node to forcefully assume it is synced, needed for network
/// bootstrapping only, as long as two synced nodes remain on the network at any time, this
/// doesn't need to be used.
///
/// `--dev` enables this option automatically.
#[clap(long)]
pub force_synced: bool,
}

impl NetworkParams {
Expand Down Expand Up @@ -285,6 +293,7 @@ impl NetworkParams {
sync_mode: self.sync.into(),
pause_sync: Arc::new(AtomicBool::new(false)),
network_backend: self.network_backend.into(),
force_synced: self.force_synced || is_dev,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions substrate/client/informant/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl<B: BlockT> InformantDisplay<B> {
(state.size as f32) / (1024f32 * 1024f32)
),
),
(SyncState::Pending, _, _) => ("⏳", "Pending".into(), "".into()),
(SyncState::Idle, _, _) => ("💤", "Idle".into(), "".into()),
(SyncState::Downloading { target }, _, _) =>
("⚙️ ", format!("Syncing{}", speed), format!(", target=#{target}")),
Expand Down
11 changes: 9 additions & 2 deletions substrate/client/network/common/src/sync/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ pub mod generic {
pub struct BlockAnnounce<H> {
/// New block header.
pub header: H,
/// Whether peer is synced.
pub is_synced: bool,
/// Block state. TODO: Remove `Option` and custom encoding when v4 becomes common.
pub state: Option<BlockState>,
/// Data associated with this block announcement, e.g. a candidate message.
Expand All @@ -202,6 +204,7 @@ pub mod generic {
impl<H: Encode> Encode for BlockAnnounce<H> {
fn encode_to<T: Output + ?Sized>(&self, dest: &mut T) {
self.header.encode_to(dest);
self.is_synced.encode_to(dest);
if let Some(state) = &self.state {
state.encode_to(dest);
}
Expand All @@ -215,8 +218,9 @@ pub mod generic {
fn decode<I: Input>(input: &mut I) -> Result<Self, codec::Error> {
let header = H::decode(input)?;
let state = BlockState::decode(input).ok();
let is_synced = bool::decode(input)?;
let data = Vec::decode(input).ok();
Ok(Self { header, state, data })
Ok(Self { header, is_synced, state, data })
}
}
}
Expand All @@ -232,6 +236,8 @@ pub struct BlockAnnouncesHandshake<B: BlockT> {
pub best_hash: B::Hash,
/// Genesis block hash.
pub genesis_hash: B::Hash,
/// Whether peer is synced.
pub is_synced: bool,
}

impl<B: BlockT> BlockAnnouncesHandshake<B> {
Expand All @@ -240,7 +246,8 @@ impl<B: BlockT> BlockAnnouncesHandshake<B> {
best_number: NumberFor<B>,
best_hash: B::Hash,
genesis_hash: B::Hash,
is_synced: bool,
) -> Self {
Self { genesis_hash, roles, best_number, best_hash }
Self { genesis_hash, roles, best_number, best_hash, is_synced }
}
}
6 changes: 6 additions & 0 deletions substrate/client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,11 @@ pub struct NetworkConfiguration {

/// Networking backend used for P2P communication.
pub network_backend: NetworkBackendType,

/// Parameter that allows node to forcefully assume it is synced, needed for network
/// bootstrapping only, as long as two synced nodes remain on the network at any time, this
/// doesn't need to be used.
pub force_synced: bool,
}

impl NetworkConfiguration {
Expand Down Expand Up @@ -710,6 +715,7 @@ impl NetworkConfiguration {
yamux_window_size: None,
ipfs_server: false,
network_backend: NetworkBackendType::Libp2p,
force_synced: false,
}
}

Expand Down
6 changes: 6 additions & 0 deletions substrate/client/network/src/protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ pub mod generic {
pub best_hash: Hash,
/// Genesis block hash.
pub genesis_hash: Hash,
/// Whether peer is synced.
pub is_synced: bool,
}

/// Status sent on connection.
Expand All @@ -153,6 +155,8 @@ pub mod generic {
pub best_hash: Hash,
/// Genesis block hash.
pub genesis_hash: Hash,
/// Whether peer is synced.
pub is_synced: bool,
/// DEPRECATED. Chain-specific status.
pub chain_status: Vec<u8>,
}
Expand All @@ -178,6 +182,7 @@ pub mod generic {
best_number,
best_hash,
genesis_hash,
is_synced,
} = compact;

Ok(Self {
Expand All @@ -187,6 +192,7 @@ pub mod generic {
best_number,
best_hash,
genesis_hash,
is_synced,
chain_status,
})
}
Expand Down
38 changes: 34 additions & 4 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Are we actively catching up with the chain?
is_major_syncing: Arc<AtomicBool>,

/// Parameter that allows node to forcefully assume it is synced, needed for network
/// bootstrapping only, as long as two synced nodes remain on the network at any time, this
/// doesn't need to be used.
force_synced: bool,

/// Network service.
network_service: service::network::NetworkServiceHandle,

Expand Down Expand Up @@ -316,6 +321,7 @@ where
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option<ProtocolName>,
peer_store_handle: Arc<dyn PeerStoreProvider>,
force_synced: bool,
) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
where
N: NetworkBackend<B, <B as BlockT>::Hash>,
Expand Down Expand Up @@ -404,7 +410,8 @@ where
&net_config.network_config.default_peers_set,
network_metrics,
Arc::clone(&peer_store_handle),
);
force_synced,
);

// Split warp sync params into warp sync config and a channel to retrieve target block
// header.
Expand Down Expand Up @@ -458,6 +465,7 @@ where
),
num_connected: num_connected.clone(),
is_major_syncing: is_major_syncing.clone(),
force_synced: net_config.network_config.force_synced,
service_rx,
genesis_hash,
important_peers,
Expand Down Expand Up @@ -495,6 +503,15 @@ where
))
}

fn is_synced(&self) -> bool {
if self.force_synced {
return true
}

!self.is_major_syncing.load(Ordering::Relaxed) &&
self.peers.iter().any(|(_peer_id, peer)| peer.info.is_synced)
}

/// Report Prometheus metrics.
pub fn report_metrics(&self) {
if let Some(metrics) = &self.metrics {
Expand All @@ -509,10 +526,12 @@ where
peer_id: &PeerId,
best_hash: B::Hash,
best_number: NumberFor<B>,
is_synced: bool,
) {
if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
peer.info.best_hash = best_hash;
peer.info.best_number = best_number;
peer.info.is_synced = is_synced;
}
}

Expand All @@ -524,10 +543,10 @@ where
match validation_result {
BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
if let Some((best_hash, best_number)) =
if let Some((best_hash, best_number, is_synced)) =
self.strategy.on_validated_block_announce(is_new_best, peer_id, &announce)
{
self.update_peer_info(&peer_id, best_hash, best_number);
self.update_peer_info(&peer_id, best_hash, best_number, is_synced);
}

if let Some(data) = announce.data {
Expand Down Expand Up @@ -601,6 +620,7 @@ where
return
}

let is_synced = self.is_synced();
let is_best = self.client.info().best_hash == hash;
log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");

Expand All @@ -614,6 +634,7 @@ where
log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
let message = BlockAnnounce {
header: header.clone(),
is_synced,
state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
data: Some(data.clone()),
};
Expand Down Expand Up @@ -796,6 +817,7 @@ where
number,
hash,
self.genesis_hash,
self.is_synced(),
)
.encode(),
);
Expand Down Expand Up @@ -1081,6 +1103,7 @@ where
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number,
is_synced: status.is_synced,
},
known_blocks: LruHashSet::new(
NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
Expand All @@ -1090,7 +1113,12 @@ where

// Only forward full peers to syncing strategy.
if status.roles.is_full() {
self.strategy.add_peer(peer_id, peer.info.best_hash, peer.info.best_number);
self.strategy.add_peer(
peer_id,
peer.info.best_hash,
peer.info.best_number,
peer.info.is_synced,
);
}

log::debug!(target: LOG_TARGET, "Connected {peer_id}");
Expand Down Expand Up @@ -1338,6 +1366,7 @@ where
set_config: &SetConfig,
metrics: NotificationMetrics,
peer_store_handle: Arc<dyn PeerStoreProvider>,
force_synced: bool,
) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
let block_announces_protocol = {
let genesis_hash = genesis_hash.as_ref();
Expand All @@ -1361,6 +1390,7 @@ where
best_number,
best_hash,
genesis_hash,
force_synced,
))),
set_config.clone(),
metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ mod tests {
best_hash: Hash::random(),
best_number: u64::arbitrary(g),
state: ArbitraryPeerSyncState::arbitrary(g).0,
is_synced: bool::arbitrary(g),
};
ArbitraryPeerSync(ps)
}
Expand Down
38 changes: 25 additions & 13 deletions substrate/client/network/sync/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ pub struct SyncingStrategy<B: BlockT, Client> {
chain_sync: Option<ChainSync<B, Client>>,
/// Connected peers and their best blocks used to seed a new strategy when switching to it in
/// [`SyncingStrategy::proceed_to_next`].
peer_best_blocks: HashMap<PeerId, (B::Hash, NumberFor<B>)>,
peer_best_blocks: HashMap<PeerId, (B::Hash, NumberFor<B>, bool)>,
}

impl<B: BlockT, Client> SyncingStrategy<B, Client>
Expand Down Expand Up @@ -230,12 +230,20 @@ where
}

/// Notify that a new peer has connected.
pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
self.peer_best_blocks.insert(peer_id, (best_hash, best_number));
pub fn add_peer(
&mut self,
peer_id: PeerId,
best_hash: B::Hash,
best_number: NumberFor<B>,
is_synced: bool,
) {
self.peer_best_blocks.insert(peer_id, (best_hash, best_number, is_synced));

self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
self.chain_sync
.as_mut()
.map(|s| s.add_peer(peer_id, best_hash, best_number, is_synced));
}

/// Notify that a peer has disconnected.
Expand All @@ -255,7 +263,7 @@ where
is_best: bool,
peer_id: PeerId,
announce: &BlockAnnounce<B::Header>,
) -> Option<(B::Hash, NumberFor<B>)> {
) -> Option<(B::Hash, NumberFor<B>, bool)> {
let new_best = if let Some(ref mut warp) = self.warp {
warp.on_validated_block_announce(is_best, peer_id, announce)
} else if let Some(ref mut state) = self.state {
Expand All @@ -265,7 +273,7 @@ where
} else {
error!(target: LOG_TARGET, "No syncing strategy is active.");
debug_assert!(false);
Some((announce.header.hash(), *announce.header.number()))
Some((announce.header.hash(), *announce.header.number(), false))
};

if let Some(new_best) = new_best {
Expand Down Expand Up @@ -540,7 +548,7 @@ where
false,
self.peer_best_blocks
.iter()
.map(|(peer_id, (_, best_number))| (*peer_id, *best_number)),
.map(|(peer_id, (_, best_number, _))| (*peer_id, *best_number)),
);

self.warp = None;
Expand All @@ -559,9 +567,11 @@ where
self.config.max_parallel_downloads,
self.config.max_blocks_per_request,
self.config.metrics_registry.clone(),
self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
(*peer_id, *best_hash, *best_number)
}),
self.peer_best_blocks.iter().map(
|(peer_id, (best_hash, best_number, is_synced))| {
(*peer_id, *best_hash, *best_number, *is_synced)
},
),
) {
Ok(chain_sync) => chain_sync,
Err(e) => {
Expand All @@ -588,9 +598,11 @@ where
self.config.max_parallel_downloads,
self.config.max_blocks_per_request,
self.config.metrics_registry.clone(),
self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
(*peer_id, *best_hash, *best_number)
}),
self.peer_best_blocks.iter().map(
|(peer_id, (best_hash, best_number, is_synced))| {
(*peer_id, *best_hash, *best_number, *is_synced)
},
),
) {
Ok(chain_sync) => chain_sync,
Err(e) => {
Expand Down
Loading

0 comments on commit 9b09c04

Please sign in to comment.