From eb6be0707d0726f25012bb38a8d4c36ab32ffbc2 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 18 May 2021 20:05:47 +0200 Subject: [PATCH 1/4] validator_discovery: less flexible, but simpler design --- .../availability-distribution/src/lib.rs | 14 +- .../src/pov_requester/mod.rs | 118 +--------- node/network/bridge/src/lib.rs | 2 - .../network/bridge/src/validator_discovery.rs | 213 ++---------------- .../collator-protocol/src/collator_side.rs | 30 +-- node/network/gossip-support/src/lib.rs | 14 +- node/subsystem/src/messages.rs | 9 +- 7 files changed, 40 insertions(+), 360 deletions(-) diff --git a/node/network/availability-distribution/src/lib.rs b/node/network/availability-distribution/src/lib.rs index bb2341ab3f8b..d2659d404394 100644 --- a/node/network/availability-distribution/src/lib.rs +++ b/node/network/availability-distribution/src/lib.rs @@ -90,7 +90,7 @@ impl AvailabilityDistributionSubsystem { Context: SubsystemContext + Sync + Send, { let mut requester = Requester::new(self.metrics.clone()).fuse(); - let mut pov_requester = PoVRequester::new(); + let pov_requester = PoVRequester::new(); loop { let action = { let mut subsystem_next = ctx.recv().fuse(); @@ -113,18 +113,6 @@ impl AvailabilityDistributionSubsystem { }; match message { FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => { - let result = pov_requester.update_connected_validators( - &mut ctx, - &mut self.runtime, - &update, - ).await; - if let Err(error) = result { - tracing::debug!( - target: LOG_TARGET, - ?error, - "PoVRequester::update_connected_validators", - ); - } log_error( requester.get_mut().update_fetching_heads(&mut ctx, &mut self.runtime, update).await, "Error in Requester::update_fetching_heads" diff --git a/node/network/availability-distribution/src/pov_requester/mod.rs b/node/network/availability-distribution/src/pov_requester/mod.rs index 5b0038cd7537..c4f71d0a5b5e 100644 --- a/node/network/availability-distribution/src/pov_requester/mod.rs +++ b/node/network/availability-distribution/src/pov_requester/mod.rs @@ -17,71 +17,31 @@ //! PoV requester takes care of requesting PoVs from validators of a backing group. use futures::{FutureExt, channel::oneshot, future::BoxFuture}; -use lru::LruCache; use polkadot_subsystem::jaeger; use polkadot_node_network_protocol::{ - peer_set::PeerSet, request_response::{OutgoingRequest, Recipient, request::{RequestError, Requests}, v1::{PoVFetchingRequest, PoVFetchingResponse}} }; use polkadot_primitives::v1::{ - AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, ValidatorIndex + CandidateHash, Hash, ValidatorIndex, }; use polkadot_node_primitives::PoV; use polkadot_subsystem::{ - ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf, + SubsystemContext, messages::{AllMessages, NetworkBridgeMessage, IfDisconnected} }; -use polkadot_node_subsystem_util::runtime::{RuntimeInfo, ValidatorInfo}; +use polkadot_node_subsystem_util::runtime::RuntimeInfo; use crate::error::{Fatal, NonFatal}; use crate::LOG_TARGET; -/// Number of sessions we want to keep in the LRU. -const NUM_SESSIONS: usize = 2; - -pub struct PoVRequester { - /// We only ever care about being connected to validators of at most two sessions. - /// - /// So we keep an LRU for managing connection requests of size 2. - /// Cache will contain `None` if we are not a validator in that session. - connected_validators: LruCache>>, -} +pub struct PoVRequester {} impl PoVRequester { /// Create a new requester for PoVs. 
pub fn new() -> Self { - Self { - connected_validators: LruCache::new(NUM_SESSIONS), - } - } - - /// Make sure we are connected to the right set of validators. - /// - /// On every `ActiveLeavesUpdate`, we check whether we are connected properly to our current - /// validator group. - pub async fn update_connected_validators( - &mut self, - ctx: &mut Context, - runtime: &mut RuntimeInfo, - update: &ActiveLeavesUpdate, - ) -> super::Result<()> - where - Context: SubsystemContext, - { - let activated = update.activated.iter().map(|ActivatedLeaf { hash: h, .. }| h); - let activated_sessions = - get_activated_sessions(ctx, runtime, activated).await?; - - for (parent, session_index) in activated_sessions { - if self.connected_validators.contains(&session_index) { - continue - } - let tx = connect_to_relevant_validators(ctx, runtime, parent, session_index).await?; - self.connected_validators.put(session_index, tx); - } - Ok(()) + Self {} } /// Start background worker for taking care of fetching the requested `PoV` from the network. @@ -170,74 +130,6 @@ async fn do_fetch_pov( } } -/// Get the session indeces for the given relay chain parents. -async fn get_activated_sessions(ctx: &mut Context, runtime: &mut RuntimeInfo, new_heads: impl Iterator) - -> super::Result> -where - Context: SubsystemContext, -{ - let mut sessions = Vec::new(); - for parent in new_heads { - sessions.push((*parent, runtime.get_session_index(ctx, *parent).await?)); - } - Ok(sessions.into_iter()) -} - -/// Connect to validators of our validator group. -async fn connect_to_relevant_validators( - ctx: &mut Context, - runtime: &mut RuntimeInfo, - parent: Hash, - session: SessionIndex -) - -> super::Result>> -where - Context: SubsystemContext, -{ - if let Some(validator_ids) = determine_relevant_validators(ctx, runtime, parent, session).await? { - let (tx, keep_alive) = oneshot::channel(); - ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators { - validator_ids, peer_set: PeerSet::Validation, keep_alive - })).await; - Ok(Some(tx)) - } else { - Ok(None) - } -} - -/// Get the validators in our validator group. -/// -/// Return: `None` if not a validator. -async fn determine_relevant_validators( - ctx: &mut Context, - runtime: &mut RuntimeInfo, - parent: Hash, - session: SessionIndex, -) - -> super::Result>> -where - Context: SubsystemContext, -{ - let info = runtime.get_session_info_by_index(ctx, parent, session).await?; - if let ValidatorInfo { - our_index: Some(our_index), - our_group: Some(our_group) - } = &info.validator_info { - - let indeces = info.session_info.validator_groups.get(our_group.0 as usize) - .expect("Our group got retrieved from that session info, it must exist. 
qed.") - .clone(); - Ok(Some( - indeces.into_iter() - .filter(|i| *i != *our_index) - .map(|i| info.session_info.discovery_keys[i.0 as usize].clone()) - .collect() - )) - } else { - Ok(None) - } -} - #[cfg(test)] mod tests { use assert_matches::assert_matches; diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index fcaf7f0f7eca..0a26de9fc757 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -509,7 +509,6 @@ where NetworkBridgeMessage::ConnectToValidators { validator_ids, peer_set, - keep_alive, } => { tracing::trace!( target: LOG_TARGET, @@ -522,7 +521,6 @@ where let (ns, ads) = validator_discovery.on_request( validator_ids, peer_set, - keep_alive, network_service, authority_discovery_service, ).await; diff --git a/node/network/bridge/src/validator_discovery.rs b/node/network/bridge/src/validator_discovery.rs index 74796757c3f9..01e078e81402 100644 --- a/node/network/bridge/src/validator_discovery.rs +++ b/node/network/bridge/src/validator_discovery.rs @@ -19,10 +19,9 @@ use crate::Network; use core::marker::PhantomData; -use std::collections::{HashSet, HashMap, hash_map}; +use std::collections::HashSet; use async_trait::async_trait; -use futures::channel::oneshot; use sc_network::multiaddr::Multiaddr; use sc_authority_discovery::Service as AuthorityDiscoveryService; @@ -52,51 +51,6 @@ impl AuthorityDiscovery for AuthorityDiscoveryService { } } -/// This struct tracks the state for one `ConnectToValidators` request. -struct NonRevokedConnectionRequestState { - requested: Vec, - keep_alive: oneshot::Receiver<()>, -} - -impl NonRevokedConnectionRequestState { - /// Create a new instance of `ConnectToValidatorsState`. - pub fn new( - requested: Vec, - keep_alive: oneshot::Receiver<()>, - ) -> Self { - Self { - requested, - keep_alive, - } - } - - /// Returns `true` if the request is revoked. - pub fn is_revoked(&mut self) -> bool { - self.keep_alive.try_recv().is_err() - } - - pub fn requested(&self) -> &[AuthorityDiscoveryId] { - self.requested.as_ref() - } -} - -/// Will be called by [`Service::on_request`] when a request was revoked. -/// -/// Takes the `map` of requested validators and the `id` of the validator that should be revoked. -/// -/// Returns `Some(id)` iff the request counter is `0`. -fn on_revoke(map: &mut HashMap, id: AuthorityDiscoveryId) -> Option { - if let hash_map::Entry::Occupied(mut entry) = map.entry(id) { - *entry.get_mut() = entry.get().saturating_sub(1); - if *entry.get() == 0 { - return Some(entry.remove_entry().0); - } - } - - None -} - - pub(super) struct Service { state: PerPeerSet, // PhantomData used to make the struct generic instead of having generic methods @@ -105,111 +59,64 @@ pub(super) struct Service { #[derive(Default)] struct StatePerPeerSet { - // The `u64` counts the number of pending non-revoked requests for this validator - // note: the validators in this map are not necessarily present - // in the `connected_validators` map. - // Invariant: the value > 0 for non-revoked requests. - requested_validators: HashMap, - non_revoked_discovery_requests: Vec, + previously_requested: HashSet, } impl Service { pub fn new() -> Self { Self { - state: PerPeerSet::default(), + state: Default::default(), _phantom: PhantomData, } } /// On a new connection request, a peer set update will be issued. /// It will ask the network to connect to the validators and not disconnect - /// from them at least until all the pending requests containing them are revoked. 
+ /// from them at least until the next request is issued for the same peer set. /// - /// This method will also clean up all previously revoked requests. + /// This method will also disconnect from previously connected validators not in the `validator_ids` set. /// it takes `network_service` and `authority_discovery_service` by value /// and returns them as a workaround for the Future: Send requirement imposed by async fn impl. pub async fn on_request( &mut self, validator_ids: Vec, peer_set: PeerSet, - keep_alive: oneshot::Receiver<()>, mut network_service: N, mut authority_discovery_service: AD, ) -> (N, AD) { - const MAX_ADDR_PER_PEER: usize = 3; - - let state = &mut self.state[peer_set]; - // Increment the counter of how many times the validators were requested. - validator_ids.iter().for_each(|id| *state.requested_validators.entry(id.clone()).or_default() += 1); - // collect multiaddress of validators - let mut multiaddr_to_add = HashSet::new(); - for authority in validator_ids.iter() { + let mut newly_requested = HashSet::new(); + for authority in validator_ids.into_iter() { let result = authority_discovery_service.get_addresses_by_authority_id(authority.clone()).await; if let Some(addresses) = result { - // We might have several `PeerId`s per `AuthorityId` - multiaddr_to_add.extend(addresses.into_iter().take(MAX_ADDR_PER_PEER)); + newly_requested.extend(addresses); } else { tracing::debug!(target: LOG_TARGET, "Authority Discovery couldn't resolve {:?}", authority); } } + let state = &mut self.state[peer_set]; // clean up revoked requests - let mut revoked_indices = Vec::new(); - let mut revoked_validators = Vec::new(); - for (i, maybe_revoked) in state.non_revoked_discovery_requests.iter_mut().enumerate() { - if maybe_revoked.is_revoked() { - for id in maybe_revoked.requested() { - if let Some(id) = on_revoke(&mut state.requested_validators, id.clone()) { - revoked_validators.push(id); - } - } - revoked_indices.push(i); - } - } - - // clean up revoked requests states - // - // note that the `.rev()` here is important to guarantee `swap_remove` - // doesn't invalidate unprocessed `revoked_indices` - for to_revoke in revoked_indices.into_iter().rev() { - drop(state.non_revoked_discovery_requests.swap_remove(to_revoke)); - } - - // multiaddresses to remove - let mut multiaddr_to_remove = HashSet::new(); - for id in revoked_validators.into_iter() { - let result = authority_discovery_service.get_addresses_by_authority_id(id.clone()).await; - if let Some(addresses) = result { - multiaddr_to_remove.extend(addresses.into_iter()); - } else { - tracing::debug!( - target: LOG_TARGET, - "Authority Discovery couldn't resolve {:?} on cleanup, a leak is possible", - id, - ); - } - } + let multiaddr_to_remove = state.previously_requested + .difference(&newly_requested) + .cloned() + .collect(); + state.previously_requested = newly_requested.clone(); // ask the network to connect to these nodes and not disconnect // from them until removed from the set if let Err(e) = network_service.add_to_peers_set( peer_set.into_protocol_name(), - multiaddr_to_add.clone(), + newly_requested, ).await { tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress"); } // the addresses are known to be valid let _ = network_service.remove_from_peers_set( peer_set.into_protocol_name(), - multiaddr_to_remove.clone() + multiaddr_to_remove ).await; - state.non_revoked_discovery_requests.push(NonRevokedConnectionRequestState::new( - validator_ids, - keep_alive, - )); - 
(network_service, authority_discovery_service) } } @@ -219,7 +126,7 @@ mod tests { use super::*; use crate::network::{Network, NetworkAction}; - use std::{borrow::Cow, pin::Pin}; + use std::{borrow::Cow, pin::Pin, collections::HashMap}; use futures::{sink::Sink, stream::BoxStream}; use sc_network::{Event as NetworkEvent, IfDisconnected}; use sp_keyring::Sr25519Keyring; @@ -317,26 +224,9 @@ mod tests { "/ip4/127.0.0.1/tcp/1236".parse().unwrap(), ] } - - #[test] - fn request_is_revoked_when_the_receiver_is_dropped() { - let (keep_alive_handle, keep_alive) = oneshot::channel(); - - let mut request = NonRevokedConnectionRequestState::new( - Vec::new(), - keep_alive, - ); - - assert!(!request.is_revoked()); - - drop(keep_alive_handle); - - assert!(request.is_revoked()); - } - // Test cleanup works. #[test] - fn requests_are_removed_on_revoke() { + fn old_multiaddrs_are_removed_on_new_request() { let mut service = new_service(); let (ns, ads) = new_network(); @@ -344,87 +234,22 @@ mod tests { let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect(); futures::executor::block_on(async move { - let (keep_alive_handle, keep_alive) = oneshot::channel(); - let (ns, ads) = service.on_request( vec![authority_ids[0].clone()], PeerSet::Validation, - keep_alive, ns, ads, ).await; - // revoke the request - drop(keep_alive_handle); - - let (_keep_alive_handle, keep_alive) = oneshot::channel(); - let _ = service.on_request( vec![authority_ids[1].clone()], PeerSet::Validation, - keep_alive, - ns, - ads, - ).await; - - let state = &service.state[PeerSet::Validation]; - assert_eq!(state.non_revoked_discovery_requests.len(), 1); - }); - } - - // More complex test with overlapping revoked requests - #[test] - fn revoking_requests_with_overlapping_validator_sets() { - let mut service = new_service(); - - let (ns, ads) = new_network(); - - let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect(); - - futures::executor::block_on(async move { - let (keep_alive_handle, keep_alive) = oneshot::channel(); - - let (ns, ads) = service.on_request( - vec![authority_ids[0].clone(), authority_ids[2].clone()], - PeerSet::Validation, - keep_alive, - ns, - ads, - ).await; - - // revoke the first request - drop(keep_alive_handle); - - let (keep_alive_handle, keep_alive) = oneshot::channel(); - - let (ns, ads) = service.on_request( - vec![authority_ids[0].clone(), authority_ids[1].clone()], - PeerSet::Validation, - keep_alive, - ns, - ads, - ).await; - - let state = &service.state[PeerSet::Validation]; - assert_eq!(state.non_revoked_discovery_requests.len(), 1); - assert_eq!(ns.peers_set.len(), 2); - - // revoke the second request - drop(keep_alive_handle); - - let (_keep_alive_handle, keep_alive) = oneshot::channel(); - - let (ns, _) = service.on_request( - vec![authority_ids[0].clone()], - PeerSet::Validation, - keep_alive, ns, ads, ).await; let state = &service.state[PeerSet::Validation]; - assert_eq!(state.non_revoked_discovery_requests.len(), 1); - assert_eq!(ns.peers_set.len(), 1); + assert_eq!(state.previously_requested.len(), 1); }); } } diff --git a/node/network/collator-protocol/src/collator_side.rs b/node/network/collator-protocol/src/collator_side.rs index 7a3a22dff8d9..edf9f340e4a2 100644 --- a/node/network/collator-protocol/src/collator_side.rs +++ b/node/network/collator-protocol/src/collator_side.rs @@ -217,9 +217,6 @@ struct State { /// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s by `PeerConnected` events. 
peer_ids: HashMap, - /// The connection handles to validators per group we are interested in. - connection_handles: HashMap>, - /// Metrics. metrics: Metrics, } @@ -240,7 +237,6 @@ impl State { collation_result_senders: Default::default(), our_validators_groups: Default::default(), peer_ids: Default::default(), - connection_handles: Default::default(), } } @@ -332,22 +328,15 @@ async fn distribute_collation( "Accepted collation, connecting to validators." ); - // Drop obsolete connections: - let new_groups: HashSet<_> = vec![current_validators.group, next_validators.group].into_iter().collect(); - state.connection_handles.retain(|k, _| new_groups.contains(k)); - let validator_group: HashSet<_> = current_validators.validators.iter().map(Clone::clone).collect(); // Issue a discovery request for the validators of the current group and the next group: connect_to_validators( ctx, - state, - current_validators, - ).await; - connect_to_validators( - ctx, - state, - next_validators, + current_validators.validators + .into_iter() + .chain(next_validators.validators.into_iter()) + .collect(), ).await; state.our_validators_groups.insert(relay_parent, validator_group.into()); @@ -461,19 +450,14 @@ async fn declare( /// Issue a connection request to a set of validators and /// revoke the previous connection request. -#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] async fn connect_to_validators( ctx: &mut impl SubsystemContext, - state: &mut State, - group: GroupValidators, + validator_ids: Vec, ) { - let (keep_alive_handle, keep_alive) = oneshot::channel(); - // Reconnect in all cases, as authority discovery cache might not have been fully populated - // last time: ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators { - validator_ids: group.validators, peer_set: PeerSet::Collation, keep_alive + validator_ids, peer_set: PeerSet::Collation, })).await; - state.connection_handles.insert(group.group, keep_alive_handle); } /// Advertise collation to the given `peer`. diff --git a/node/network/gossip-support/src/lib.rs b/node/network/gossip-support/src/lib.rs index adfc6344e17a..c71b275ee6cc 100644 --- a/node/network/gossip-support/src/lib.rs +++ b/node/network/gossip-support/src/lib.rs @@ -18,7 +18,7 @@ //! and issuing a connection request to the validators relevant to //! the gossiping subsystems on every new session. 
-use futures::{channel::oneshot, FutureExt as _}; +use futures::FutureExt as _; use polkadot_node_subsystem::{ messages::{ AllMessages, GossipSupportMessage, NetworkBridgeMessage, @@ -44,8 +44,6 @@ pub struct GossipSupport { #[derive(Default)] struct State { last_session_index: Option, - /// when we overwrite this, it automatically drops the previous request - _last_connection_request: Option>, } impl GossipSupport { @@ -125,18 +123,13 @@ pub async fn connect_to_authorities( ctx: &mut impl SubsystemContext, validator_ids: Vec, peer_set: PeerSet, -) -> oneshot::Sender<()> { - let (keep_alive_handle, keep_alive) = oneshot::channel(); - +) { ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::ConnectToValidators { validator_ids, peer_set, - keep_alive, } )).await; - - keep_alive_handle } impl State { @@ -162,14 +155,13 @@ impl State { ensure_i_am_an_authority(keystore, &authorities).await?; tracing::debug!(target: LOG_TARGET, num = ?authorities.len(), "Issuing a connection request"); - let keep_alive_handle = connect_to_authorities( + connect_to_authorities( ctx, authorities, PeerSet::Validation, ).await; self.last_session_index = Some(new_session); - self._last_connection_request = Some(keep_alive_handle); } } diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index a0d274ec6fd4..edd6de39857a 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -240,7 +240,11 @@ pub enum NetworkBridgeMessage { /// Connect to peers who represent the given `validator_ids`. /// /// Also ask the network to stay connected to these peers at least - /// until the request is revoked. + /// until a new request is issued. + /// + /// Because it overrides the previous request, it must be ensured + /// that `validator_ids` include all peers the subsystems + /// are interested in (per `PeerSet`). /// /// A caller can learn about validator connections by listening to the /// `PeerConnected` events from the network bridge. @@ -249,9 +253,6 @@ pub enum NetworkBridgeMessage { validator_ids: Vec, /// The underlying protocol to use for this request. peer_set: PeerSet, - /// A request is revoked by dropping the `keep_alive` sender. - /// The revokation takes place upon the next connection request. - keep_alive: oneshot::Receiver<()>, }, } From 6f524b67708067fd498a84f0382d8fea397110cd Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 18 May 2021 21:14:57 +0200 Subject: [PATCH 2/4] fix test --- node/network/collator-protocol/src/collator_side.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/node/network/collator-protocol/src/collator_side.rs b/node/network/collator-protocol/src/collator_side.rs index edf9f340e4a2..e01d41aa2b71 100644 --- a/node/network/collator-protocol/src/collator_side.rs +++ b/node/network/collator-protocol/src/collator_side.rs @@ -1261,14 +1261,6 @@ mod tests { } ) => {} ); - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ConnectToValidators { - .. 
- } - ) => {} - ); } DistributeCollation { From e4cff4054861f23f6cc2b5378afe008bafcc38e0 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Wed, 19 May 2021 14:06:42 +0200 Subject: [PATCH 3/4] remove unused struct --- .../availability-distribution/src/lib.rs | 6 +- .../src/pov_requester/mod.rs | 99 +++++++++---------- 2 files changed, 46 insertions(+), 59 deletions(-) diff --git a/node/network/availability-distribution/src/lib.rs b/node/network/availability-distribution/src/lib.rs index d2659d404394..65580ea41d88 100644 --- a/node/network/availability-distribution/src/lib.rs +++ b/node/network/availability-distribution/src/lib.rs @@ -36,7 +36,6 @@ use requester::Requester; /// Handing requests for PoVs during backing. mod pov_requester; -use pov_requester::PoVRequester; /// Responding to erasure chunk requests: mod responder; @@ -90,7 +89,6 @@ impl AvailabilityDistributionSubsystem { Context: SubsystemContext + Sync + Send, { let mut requester = Requester::new(self.metrics.clone()).fuse(); - let pov_requester = PoVRequester::new(); loop { let action = { let mut subsystem_next = ctx.recv().fuse(); @@ -142,7 +140,7 @@ impl AvailabilityDistributionSubsystem { }, } => { log_error( - pov_requester.fetch_pov( + pov_requester::fetch_pov( &mut ctx, &mut self.runtime, relay_parent, @@ -151,7 +149,7 @@ impl AvailabilityDistributionSubsystem { pov_hash, tx, ).await, - "PoVRequester::fetch_pov" + "pov_requester::fetch_pov" )?; } } diff --git a/node/network/availability-distribution/src/pov_requester/mod.rs b/node/network/availability-distribution/src/pov_requester/mod.rs index c4f71d0a5b5e..e5ee1656e0e4 100644 --- a/node/network/availability-distribution/src/pov_requester/mod.rs +++ b/node/network/availability-distribution/src/pov_requester/mod.rs @@ -36,59 +36,49 @@ use polkadot_node_subsystem_util::runtime::RuntimeInfo; use crate::error::{Fatal, NonFatal}; use crate::LOG_TARGET; -pub struct PoVRequester {} - -impl PoVRequester { - /// Create a new requester for PoVs. - pub fn new() -> Self { - Self {} - } - - /// Start background worker for taking care of fetching the requested `PoV` from the network. - pub async fn fetch_pov( - &self, - ctx: &mut Context, - runtime: &mut RuntimeInfo, - parent: Hash, - from_validator: ValidatorIndex, - candidate_hash: CandidateHash, - pov_hash: Hash, - tx: oneshot::Sender - ) -> super::Result<()> - where - Context: SubsystemContext, - { - let info = &runtime.get_session_info(ctx, parent).await?.session_info; - let authority_id = info.discovery_keys.get(from_validator.0 as usize) - .ok_or(NonFatal::InvalidValidatorIndex)? - .clone(); - let (req, pending_response) = OutgoingRequest::new( - Recipient::Authority(authority_id), - PoVFetchingRequest { - candidate_hash, - }, - ); - let full_req = Requests::PoVFetching(req); - - ctx.send_message( - AllMessages::NetworkBridge( - NetworkBridgeMessage::SendRequests( - vec![full_req], - // We are supposed to be connected to validators of our group via `PeerSet`, - // but at session boundaries that is kind of racy, in case a connection takes - // longer to get established, so we try to connect in any case. - IfDisconnected::TryConnect - ) - )).await; - - let span = jaeger::Span::new(candidate_hash, "fetch-pov") - .with_validator_index(from_validator) - .with_relay_parent(parent); - ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed()) - .await - .map_err(|e| Fatal::SpawnTask(e))?; - Ok(()) - } +/// Start background worker for taking care of fetching the requested `PoV` from the network. 
+pub async fn fetch_pov( + ctx: &mut Context, + runtime: &mut RuntimeInfo, + parent: Hash, + from_validator: ValidatorIndex, + candidate_hash: CandidateHash, + pov_hash: Hash, + tx: oneshot::Sender +) -> super::Result<()> +where + Context: SubsystemContext, +{ + let info = &runtime.get_session_info(ctx, parent).await?.session_info; + let authority_id = info.discovery_keys.get(from_validator.0 as usize) + .ok_or(NonFatal::InvalidValidatorIndex)? + .clone(); + let (req, pending_response) = OutgoingRequest::new( + Recipient::Authority(authority_id), + PoVFetchingRequest { + candidate_hash, + }, + ); + let full_req = Requests::PoVFetching(req); + + ctx.send_message( + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendRequests( + vec![full_req], + // We are supposed to be connected to validators of our group via `PeerSet`, + // but at session boundaries that is kind of racy, in case a connection takes + // longer to get established, so we try to connect in any case. + IfDisconnected::TryConnect + ) + )).await; + + let span = jaeger::Span::new(candidate_hash, "fetch-pov") + .with_validator_index(from_validator) + .with_relay_parent(parent); + ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed()) + .await + .map_err(|e| Fatal::SpawnTask(e))?; + Ok(()) } /// Future to be spawned for taking care of handling reception and sending of PoV. @@ -166,7 +156,6 @@ mod tests { } fn test_run(pov_hash: Hash, pov: PoV) { - let requester = PoVRequester::new(); let pool = TaskExecutor::new(); let (mut context, mut virtual_overseer) = test_helpers::make_subsystem_context::(pool.clone()); @@ -175,7 +164,7 @@ mod tests { let (tx, rx) = oneshot::channel(); let testee = async { - requester.fetch_pov( + fetch_pov( &mut context, &mut runtime, Hash::default(), From d6d4870865a63a3046f72c927c7579107a8c369f Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Wed, 19 May 2021 14:08:31 +0200 Subject: [PATCH 4/4] smol optimization --- node/network/bridge/src/validator_discovery.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/node/network/bridge/src/validator_discovery.rs b/node/network/bridge/src/validator_discovery.rs index 01e078e81402..63a968a6bca7 100644 --- a/node/network/bridge/src/validator_discovery.rs +++ b/node/network/bridge/src/validator_discovery.rs @@ -101,13 +101,16 @@ impl Service { .difference(&newly_requested) .cloned() .collect(); - state.previously_requested = newly_requested.clone(); + let multiaddr_to_add = newly_requested.difference(&state.previously_requested) + .cloned() + .collect(); + state.previously_requested = newly_requested; // ask the network to connect to these nodes and not disconnect // from them until removed from the set if let Err(e) = network_service.add_to_peers_set( peer_set.into_protocol_name(), - newly_requested, + multiaddr_to_add, ).await { tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress"); }
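
Note on the design introduced by patches 1 and 4: the bridge no longer tracks per-request
keep-alive channels; it keeps only the set of multiaddresses requested last time for each
peer set, and reconciles each new request against it with two set differences. Below is a
minimal, self-contained sketch of that reconciliation. `Multiaddr` is a stand-in alias for
`sc_network::multiaddr::Multiaddr`, and the `reconcile` helper is a hypothetical name that
condenses the body of `Service::on_request`; it is an illustration of the technique, not
the real implementation.

    use std::collections::HashSet;

    /// Stand-in for `sc_network::multiaddr::Multiaddr` (assumption for this sketch).
    type Multiaddr = String;

    /// Condensed model of `StatePerPeerSet` after this series: the only state
    /// kept per peer set is the set of addresses requested last time.
    #[derive(Default)]
    struct StatePerPeerSet {
        previously_requested: HashSet<Multiaddr>,
    }

    impl StatePerPeerSet {
        /// Reconcile a new request against the previous one, returning
        /// (addresses to add to the peer set, addresses to remove from it).
        /// This mirrors the two `difference` calls in `on_request`: addresses
        /// no longer requested are dropped, newly requested ones are added,
        /// and addresses present in both sets are left untouched.
        fn reconcile(
            &mut self,
            newly_requested: HashSet<Multiaddr>,
        ) -> (HashSet<Multiaddr>, HashSet<Multiaddr>) {
            let to_remove: HashSet<_> = self
                .previously_requested
                .difference(&newly_requested)
                .cloned()
                .collect();
            let to_add: HashSet<_> = newly_requested
                .difference(&self.previously_requested)
                .cloned()
                .collect();
            self.previously_requested = newly_requested;
            (to_add, to_remove)
        }
    }

    fn main() {
        let mut state = StatePerPeerSet::default();

        // First request: connect to A and B; nothing to remove yet.
        let (add, remove) = state.reconcile(HashSet::from(["A".to_string(), "B".to_string()]));
        assert_eq!(add.len(), 2);
        assert!(remove.is_empty());

        // Second request overrides the first: B stays, A is dropped, C is added.
        let (add, remove) = state.reconcile(HashSet::from(["B".to_string(), "C".to_string()]));
        assert_eq!(add, HashSet::from(["C".to_string()]));
        assert_eq!(remove, HashSet::from(["A".to_string()]));
    }

This is what makes the new design "less flexible, but simpler": overlapping requests from
different callers on the same peer set are no longer reference-counted, so the last request
simply wins, as the updated `ConnectToValidators` doc comment warns.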
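
The last-request-wins semantics also explain why the collator-side change in patch 1 folds
the current and next validator groups into a single `ConnectToValidators` message instead of
issuing two. The sketch below models that caller-side consequence with simplified stand-in
types; the real `connect_to_validators` in collator_side.rs takes a subsystem context and
sends the message rather than returning it, so treat this as a sketch under those
assumptions.

    /// Simplified stand-ins for the real polkadot-node types (assumptions).
    #[derive(Clone, Debug)]
    struct AuthorityDiscoveryId(&'static str);

    #[allow(dead_code)]
    #[derive(Clone, Copy, Debug)]
    enum PeerSet {
        Validation,
        Collation,
    }

    enum NetworkBridgeMessage {
        ConnectToValidators {
            validator_ids: Vec<AuthorityDiscoveryId>,
            peer_set: PeerSet,
        },
    }

    fn connect_to_validators(
        current_group: Vec<AuthorityDiscoveryId>,
        next_group: Vec<AuthorityDiscoveryId>,
    ) -> NetworkBridgeMessage {
        // One request carrying the union of both groups: under last-request-wins
        // semantics, issuing two separate requests would make the second one
        // disconnect the validators asked for by the first.
        NetworkBridgeMessage::ConnectToValidators {
            validator_ids: current_group.into_iter().chain(next_group).collect(),
            peer_set: PeerSet::Collation,
        }
    }

    fn main() {
        let msg = connect_to_validators(
            vec![AuthorityDiscoveryId("current-group-validator")],
            vec![AuthorityDiscoveryId("next-group-validator")],
        );
        match msg {
            NetworkBridgeMessage::ConnectToValidators { validator_ids, peer_set } => {
                println!("connect {:?} on {:?}", validator_ids, peer_set);
                assert_eq!(validator_ids.len(), 2);
            }
        }
    }

The same reasoning motivates dropping the `keep_alive` handles from gossip-support and the
collator protocol: there is no longer a per-request handle to hold on to, only the next
message for the same `PeerSet` to send.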