From 1e6c5a98d19333188e97ade4fb369976c3a11ce0 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 19 Oct 2020 14:45:22 +0200 Subject: [PATCH] client/network: Make NetworkService::set_priority_group async As done with `NetworkService::{add_to,remove_from}_priority_group`, make `NetworkService::set_priority_group` async as well. This future-proofs the API should we ever decide to use a bounded channel between `NetworkService` and `NetworkWorker`. --- Cargo.lock | 1 + client/authority-discovery/Cargo.toml | 1 + client/authority-discovery/src/worker.rs | 15 +++++++++------ client/authority-discovery/src/worker/tests.rs | 8 +++++--- client/network/src/service.rs | 5 ++++- 5 files changed, 20 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 15408ba75af6c..2e02be5f65670 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6216,6 +6216,7 @@ dependencies = [ name = "sc-authority-discovery" version = "0.8.0" dependencies = [ + "async-trait", "bytes 0.5.6", "derive_more", "either", diff --git a/client/authority-discovery/Cargo.toml b/client/authority-discovery/Cargo.toml index 0e5c22380ded2..3b60136eacda6 100644 --- a/client/authority-discovery/Cargo.toml +++ b/client/authority-discovery/Cargo.toml @@ -17,6 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"] prost-build = "0.6.1" [dependencies] +async-trait = "0.1" bytes = "0.5.0" codec = { package = "parity-scale-codec", default-features = false, version = "1.3.4" } derive_more = "0.99.2" diff --git a/client/authority-discovery/src/worker.rs b/client/authority-discovery/src/worker.rs index ca8a1bdd6370d..f204b3adf9bb5 100644 --- a/client/authority-discovery/src/worker.rs +++ b/client/authority-discovery/src/worker.rs @@ -27,6 +27,7 @@ use futures::{FutureExt, Stream, StreamExt, stream::Fuse}; use futures_timer::Delay; use addr_cache::AddrCache; +use async_trait::async_trait; use codec::Decode; use either::Either; use libp2p::{core::multiaddr, multihash::Multihash}; @@ -267,7 +268,7 @@ where }, // Set peerset priority group to a new random set of addresses. _ = self.priority_group_set_interval.next().fuse() => { - if let Err(e) = self.set_priority_group() { + if let Err(e) = self.set_priority_group().await { error!( target: LOG_TARGET, "Failed to set priority group: {:?}", e, @@ -629,7 +630,7 @@ where /// Set the peer set 'authority' priority group to a new random set of /// [`Multiaddr`]s. - fn set_priority_group(&self) -> Result<()> { + async fn set_priority_group(&self) -> Result<()> { let addresses = self.addr_cache.get_random_subset(); if addresses.is_empty() { @@ -653,7 +654,7 @@ where .set_priority_group( AUTHORITIES_PRIORITY_GROUP_NAME.to_string(), addresses.into_iter().collect(), - ) + ).await .map_err(Error::SettingPeersetPriorityGroup)?; Ok(()) @@ -663,9 +664,10 @@ where /// NetworkProvider provides [`Worker`] with all necessary hooks into the /// underlying Substrate networking. Using this trait abstraction instead of [`NetworkService`] /// directly is necessary to unit test [`Worker`]. +#[async_trait] pub trait NetworkProvider: NetworkStateInfo { /// Modify a peerset priority group. - fn set_priority_group( + async fn set_priority_group( &self, group_id: String, peers: HashSet, @@ -678,17 +680,18 @@ pub trait NetworkProvider: NetworkStateInfo { fn get_value(&self, key: &libp2p::kad::record::Key); } +#[async_trait::async_trait] impl NetworkProvider for sc_network::NetworkService where B: BlockT + 'static, H: ExHashT, { - fn set_priority_group( + async fn set_priority_group( &self, group_id: String, peers: HashSet, ) -> std::result::Result<(), String> { - self.set_priority_group(group_id, peers) + self.set_priority_group(group_id, peers).await } fn put_value(&self, key: libp2p::kad::record::Key, value: Vec) { self.put_value(key, value) diff --git a/client/authority-discovery/src/worker/tests.rs b/client/authority-discovery/src/worker/tests.rs index cb1f8df8a822d..98177f45729db 100644 --- a/client/authority-discovery/src/worker/tests.rs +++ b/client/authority-discovery/src/worker/tests.rs @@ -20,6 +20,7 @@ use crate::worker::schema; use std::{iter::FromIterator, sync::{Arc, Mutex}, task::Poll}; +use async_trait::async_trait; use futures::channel::mpsc::{self, channel}; use futures::executor::{block_on, LocalPool}; use futures::future::FutureExt; @@ -213,8 +214,9 @@ impl Default for TestNetwork { } } +#[async_trait] impl NetworkProvider for TestNetwork { - fn set_priority_group( + async fn set_priority_group( &self, group_id: String, peers: HashSet, @@ -424,7 +426,7 @@ fn publish_discover_cycle() { // Make authority discovery handle the event. worker.handle_dht_event(dht_event).await; - worker.set_priority_group().unwrap(); + worker.set_priority_group().await.unwrap(); // Expect authority discovery to set the priority set. assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1); @@ -623,7 +625,7 @@ fn never_add_own_address_to_priority_group() { sentry_worker.start_new_lookups(); sentry_worker.handle_dht_value_found_event(vec![dht_event]).unwrap(); - sentry_worker.set_priority_group().unwrap(); + block_on(sentry_worker.set_priority_group()).unwrap(); assert_eq!( sentry_network.set_priority_group_call.lock().unwrap().len(), 1, diff --git a/client/network/src/service.rs b/client/network/src/service.rs index af123e94fac4b..acfd0281db2df 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -972,7 +972,10 @@ impl NetworkService { /// /// Returns an `Err` if one of the given addresses is invalid or contains an /// invalid peer ID (which includes the local peer ID). - pub fn set_priority_group(&self, group_id: String, peers: HashSet) -> Result<(), String> { + // + // NOTE: even though this function is currently sync, it's marked as async for + // future-proofing, see https://github.com/paritytech/substrate/pull/7247#discussion_r502263451. + pub async fn set_priority_group(&self, group_id: String, peers: HashSet) -> Result<(), String> { let peers = self.split_multiaddr_and_peer_id(peers)?; let peer_ids = peers.iter().map(|(peer_id, _addr)| peer_id.clone()).collect();