From 9f330d28ff2cdb9c1e582c6f9444a592e28b946f Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 15 Feb 2021 11:31:59 +1000 Subject: [PATCH 1/2] Revert "Stop using CallAllUnordered in peer_set::add_initial_peers (#1705)" This reverts commit 241c7ad849817f108297ecaff739ebe676badbb6. --- zebra-network/src/peer_set/initialize.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 5e6a939ea45..0d5e8afab0a 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -194,10 +194,9 @@ where S::Future: Send + 'static, { info!(?initial_peers, "Connecting to initial peer set"); - let mut handshakes = initial_peers - .iter() - .map(|request| connector.clone().oneshot(*request)) - .collect::>(); + use tower::util::CallAllUnordered; + let addr_stream = futures::stream::iter(initial_peers.into_iter()); + let mut handshakes = CallAllUnordered::new(connector, addr_stream); while let Some(handshake_result) = handshakes.next().await { tx.send(handshake_result).await?; From 9bf0b70915b3bd170fe5f38ebedfcda76da205be Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 15 Feb 2021 11:43:49 +1000 Subject: [PATCH 2/2] Add a correctness comment to justify the revert --- zebra-network/src/peer_set/initialize.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 0d5e8afab0a..4da309dcd9e 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -194,6 +194,12 @@ where S::Future: Send + 'static, { info!(?initial_peers, "Connecting to initial peer set"); + // ## Correctness: + // + // Each `CallAll` can hold one `Buffer` or `Batch` reservation for + // an indefinite period. We can use `CallAllUnordered` without filling + // the underlying `Inbound` buffer, because we immediately drive this + // single `CallAll` to completion, and handshakes have a short timeout. use tower::util::CallAllUnordered; let addr_stream = futures::stream::iter(initial_peers.into_iter()); let mut handshakes = CallAllUnordered::new(connector, addr_stream);