Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

client/network: Make NetworkService::set_priority_group async #7352

Merged
1 commit merged into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/authority-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"]
prost-build = "0.6.1"

[dependencies]
async-trait = "0.1"
bytes = "0.5.0"
codec = { package = "parity-scale-codec", default-features = false, version = "1.3.4" }
derive_more = "0.99.2"
Expand Down
15 changes: 9 additions & 6 deletions client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use futures::{FutureExt, Stream, StreamExt, stream::Fuse};
use futures_timer::Delay;

use addr_cache::AddrCache;
use async_trait::async_trait;
use codec::Decode;
use either::Either;
use libp2p::{core::multiaddr, multihash::Multihash};
Expand Down Expand Up @@ -267,7 +268,7 @@ where
},
// Set peerset priority group to a new random set of addresses.
_ = self.priority_group_set_interval.next().fuse() => {
if let Err(e) = self.set_priority_group() {
if let Err(e) = self.set_priority_group().await {
error!(
target: LOG_TARGET,
"Failed to set priority group: {:?}", e,
Expand Down Expand Up @@ -629,7 +630,7 @@ where

/// Set the peer set 'authority' priority group to a new random set of
/// [`Multiaddr`]s.
fn set_priority_group(&self) -> Result<()> {
async fn set_priority_group(&self) -> Result<()> {
let addresses = self.addr_cache.get_random_subset();

if addresses.is_empty() {
Expand All @@ -653,7 +654,7 @@ where
.set_priority_group(
AUTHORITIES_PRIORITY_GROUP_NAME.to_string(),
addresses.into_iter().collect(),
)
).await
.map_err(Error::SettingPeersetPriorityGroup)?;

Ok(())
Expand All @@ -663,9 +664,10 @@ where
/// NetworkProvider provides [`Worker`] with all necessary hooks into the
/// underlying Substrate networking. Using this trait abstraction instead of [`NetworkService`]
/// directly is necessary to unit test [`Worker`].
#[async_trait]
pub trait NetworkProvider: NetworkStateInfo {
/// Modify a peerset priority group.
fn set_priority_group(
async fn set_priority_group(
&self,
group_id: String,
peers: HashSet<libp2p::Multiaddr>,
Expand All @@ -678,17 +680,18 @@ pub trait NetworkProvider: NetworkStateInfo {
fn get_value(&self, key: &libp2p::kad::record::Key);
}

#[async_trait::async_trait]
impl<B, H> NetworkProvider for sc_network::NetworkService<B, H>
where
B: BlockT + 'static,
H: ExHashT,
{
fn set_priority_group(
async fn set_priority_group(
&self,
group_id: String,
peers: HashSet<libp2p::Multiaddr>,
) -> std::result::Result<(), String> {
self.set_priority_group(group_id, peers)
self.set_priority_group(group_id, peers).await
}
fn put_value(&self, key: libp2p::kad::record::Key, value: Vec<u8>) {
self.put_value(key, value)
Expand Down
8 changes: 5 additions & 3 deletions client/authority-discovery/src/worker/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::worker::schema;

use std::{iter::FromIterator, sync::{Arc, Mutex}, task::Poll};

use async_trait::async_trait;
use futures::channel::mpsc::{self, channel};
use futures::executor::{block_on, LocalPool};
use futures::future::FutureExt;
Expand Down Expand Up @@ -213,8 +214,9 @@ impl Default for TestNetwork {
}
}

#[async_trait]
impl NetworkProvider for TestNetwork {
fn set_priority_group(
async fn set_priority_group(
&self,
group_id: String,
peers: HashSet<Multiaddr>,
Expand Down Expand Up @@ -424,7 +426,7 @@ fn publish_discover_cycle() {
// Make authority discovery handle the event.
worker.handle_dht_event(dht_event).await;

worker.set_priority_group().unwrap();
worker.set_priority_group().await.unwrap();

// Expect authority discovery to set the priority set.
assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1);
Expand Down Expand Up @@ -623,7 +625,7 @@ fn never_add_own_address_to_priority_group() {
sentry_worker.start_new_lookups();

sentry_worker.handle_dht_value_found_event(vec![dht_event]).unwrap();
sentry_worker.set_priority_group().unwrap();
block_on(sentry_worker.set_priority_group()).unwrap();

assert_eq!(
sentry_network.set_priority_group_call.lock().unwrap().len(), 1,
Expand Down
5 changes: 4 additions & 1 deletion client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
///
/// Returns an `Err` if one of the given addresses is invalid or contains an
/// invalid peer ID (which includes the local peer ID).
pub fn set_priority_group(&self, group_id: String, peers: HashSet<Multiaddr>) -> Result<(), String> {
//
// NOTE: even though this function is currently sync, it's marked as async for
// future-proofing, see https://github.com/paritytech/substrate/pull/7247#discussion_r502263451.
pub async fn set_priority_group(&self, group_id: String, peers: HashSet<Multiaddr>) -> Result<(), String> {
let peers = self.split_multiaddr_and_peer_id(peers)?;

let peer_ids = peers.iter().map(|(peer_id, _addr)| peer_id.clone()).collect();
Expand Down