Fix task handling bugs, so peers are more likely to be available (#3191)
* Tweak crawler timings so peers are more likely to be available

* Tweak min peer connection interval so we try all peers

* Let other tasks run between fanouts, so we're more likely to choose different peers

* Let other tasks run between retries, so we're more likely to choose different peers

* Let other tasks run after peer crawler DemandDrop

This makes it more likely that peers will become ready.
teor2345 authored Dec 19, 2021
1 parent cb21321 commit 6cbd7dc
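
The common thread in these fixes is cooperative scheduling. On a busy tokio runtime, a loop that fires several requests without awaiting a real suspension point can starve the background tasks that mark peers as ready, so every request lands on the same peer. The diffs below apply the same fix in the candidate set, the retry policy, and the mempool crawler; here is a minimal standalone sketch of the pattern, with a stand-in request function rather than Zebra's actual PeerSet API:

use tokio::task::yield_now;

// Stand-in for readying the peer service and sending one request;
// not Zebra's actual PeerSet API.
async fn send_request_to_a_ready_peer(attempt: usize) {
    println!("request {} goes to whichever peer is ready now", attempt);
}

async fn fanout(fanout_limit: usize) {
    for attempt in 0..fanout_limit {
        if attempt > 0 {
            // Without this yield, the tasks that mark other peers as
            // ready may never run between requests, so every attempt
            // can land on the same peer.
            yield_now().await;
        }
        send_request_to_a_ready_peer(attempt).await;
    }
}

#[tokio::main]
async fn main() {
    fanout(3).await;
}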
Showing 13 changed files with 534 additions and 415 deletions.
2 changes: 1 addition & 1 deletion zebra-chain/src/parameters/network_upgrade.rs
@@ -138,7 +138,7 @@ pub(crate) const CONSENSUS_BRANCH_IDS: &[(NetworkUpgrade, ConsensusBranchId)] =
 const PRE_BLOSSOM_POW_TARGET_SPACING: i64 = 150;

 /// The target block spacing after Blossom activation.
-const POST_BLOSSOM_POW_TARGET_SPACING: i64 = 75;
+pub const POST_BLOSSOM_POW_TARGET_SPACING: i64 = 75;

 /// The averaging window for difficulty threshold arithmetic mean calculations.
 ///
2 changes: 1 addition & 1 deletion zebra-network/src/config.rs
@@ -242,7 +242,7 @@ impl Default for Config {
             network: Network::Mainnet,
             initial_mainnet_peers: mainnet_peers,
             initial_testnet_peers: testnet_peers,
-            crawl_new_peer_interval: Duration::from_secs(60),
+            crawl_new_peer_interval: constants::DEFAULT_CRAWL_NEW_PEER_INTERVAL,

             // The default peerset target size should be large enough to ensure
             // nodes have a reliable set of peers. But it should also be limited
44 changes: 40 additions & 4 deletions zebra-network/src/constants.rs
@@ -64,7 +64,21 @@ pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(4);
 /// This avoids explicit synchronization, but relies on the peer
 /// connector actually setting up channels and these heartbeats in a
 /// specific manner that matches up with this math.
-pub const MIN_PEER_RECONNECTION_DELAY: Duration = Duration::from_secs(60 + 20 + 20 + 20);
+pub const MIN_PEER_RECONNECTION_DELAY: Duration = Duration::from_secs(59 + 20 + 20 + 20);
+
+/// The default peer address crawler interval.
+///
+/// This should be at least [`HANDSHAKE_TIMEOUT`](constants::HANDSHAKE_TIMEOUT)
+/// lower than all other crawler intervals.
+///
+/// This makes the following sequence of events more likely:
+/// 1. a peer address crawl,
+/// 2. new peer connections,
+/// 3. peer requests from other crawlers.
+///
+/// Using a prime number makes sure that peer address crawls
+/// don't synchronise with other crawls.
+pub const DEFAULT_CRAWL_NEW_PEER_INTERVAL: Duration = Duration::from_secs(61);

 /// The maximum duration since a peer was last seen to consider it reachable.
 ///
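
Why primes: two periodic tasks with periods a and b seconds coincide every lcm(a, b) seconds. The old round numbers re-aligned constantly, while the new intervals (59s heartbeat, 61s peer crawl, 73s mempool crawl) are pairwise coprime, so all three coincide only every 59 × 61 × 73 = 262,727 seconds, roughly three days. A standalone check of that arithmetic, not part of the commit:

// Standalone check, not part of the commit.
fn gcd(a: u64, b: u64) -> u64 {
    if b == 0 { a } else { gcd(b, a % b) }
}

fn lcm(a: u64, b: u64) -> u64 {
    a / gcd(a, b) * b
}

fn main() {
    // Old round numbers: heartbeat (60s) and mempool crawl (75s)
    // re-aligned every 5 minutes.
    assert_eq!(lcm(60, 75), 300);
    // New primes: 59s heartbeat and 61s peer crawl only align
    // every 3599s, about an hour.
    assert_eq!(lcm(59, 61), 3599);
    // All three new intervals coincide every 262_727s, about 3 days.
    assert_eq!(lcm(lcm(59, 61), 73), 262_727);
}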
@@ -89,23 +103,27 @@ pub const MAX_RECENT_PEER_AGE: Duration32 = Duration32::from_days(3);

 /// Regular interval for sending keepalive `Ping` messages to each
 /// connected peer.
-pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60);
+///
+/// Using a prime number makes sure that heartbeats don't synchronise with crawls.
+pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(59);

 /// The minimum time between successive calls to [`CandidateSet::next()`][Self::next].
 ///
 /// ## Security
 ///
 /// Zebra resists distributed denial of service attacks by making sure that new peer connections
 /// are initiated at least `MIN_PEER_CONNECTION_INTERVAL` apart.
-pub const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100);
+pub const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(25);

 /// The minimum time between successive calls to [`CandidateSet::update()`][Self::update].
 ///
+/// Using a prime number makes sure that peer address crawls don't synchronise with other crawls.
+///
 /// ## Security
 ///
 /// Zebra resists distributed denial of service attacks by making sure that requests for more
 /// peer addresses are sent at least `MIN_PEER_GET_ADDR_INTERVAL` apart.
-pub const MIN_PEER_GET_ADDR_INTERVAL: Duration = Duration::from_secs(30);
+pub const MIN_PEER_GET_ADDR_INTERVAL: Duration = Duration::from_secs(31);

 /// The combined timeout for all the requests in [`CandidateSet::update()`][Self::update].
 ///
@@ -255,6 +273,8 @@ pub mod magics {
 #[cfg(test)]
 mod tests {

+    use std::convert::TryFrom;
+
     use super::*;

     /// This assures that the `Duration` value we are computing for
@@ -287,6 +307,22 @@ mod tests {

         assert!(EWMA_DECAY_TIME_NANOS > request_timeout_nanos,
             "The EWMA decay time should be higher than the request timeout, so timed out peers are penalised by the EWMA.");
+
+        assert!(
+            u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32")
+                * MIN_PEER_CONNECTION_INTERVAL
+                < MIN_PEER_RECONNECTION_DELAY,
+            "each peer should get at least one connection attempt in each connection interval",
+        );
+
+        assert!(
+            MIN_PEER_RECONNECTION_DELAY.as_secs()
+                / (u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32")
+                    * MIN_PEER_CONNECTION_INTERVAL)
+                    .as_secs()
+                <= 2,
+            "each peer should only have a few connection attempts in each connection interval",
+        );
     }

     /// Make sure that peer age limits are consistent with each other.
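Plugging in the new values shows why these asserts hold: MIN_PEER_RECONNECTION_DELAY is 59 + 20 + 20 + 20 = 119 seconds, and at 25ms per connection attempt the address book would need more than 119s / 25ms = 4760 entries before the first assert failed. The value of MAX_ADDRS_IN_ADDRESS_BOOK is not shown in this diff, so the standalone sketch below substitutes an illustrative 4000:

use std::time::Duration;

// Values from this diff. MAX_ADDRS is an illustrative stand-in, since
// MAX_ADDRS_IN_ADDRESS_BOOK's value is not shown here.
const MIN_PEER_RECONNECTION_DELAY: Duration = Duration::from_secs(59 + 20 + 20 + 20);
const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(25);
const MAX_ADDRS: u32 = 4000;

fn main() {
    // 4000 addresses * 25ms = 100s of connection attempts, which fits
    // inside the 119s reconnection delay: every peer gets one attempt.
    assert!(MAX_ADDRS * MIN_PEER_CONNECTION_INTERVAL < MIN_PEER_RECONNECTION_DELAY);

    // 119s / 100s = 1 (integer division), so no peer gets more than a
    // couple of attempts per reconnection delay.
    let attempts = MIN_PEER_RECONNECTION_DELAY.as_secs()
        / (MAX_ADDRS * MIN_PEER_CONNECTION_INTERVAL).as_secs();
    assert!(attempts <= 2);
}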
2 changes: 1 addition & 1 deletion zebra-network/src/lib.rs
@@ -73,7 +73,7 @@ pub use crate::{
     meta_addr::PeerAddrState,
     peer::{HandshakeError, PeerError, SharedPeerError},
     peer_set::init,
-    policies::{RetryErrors, RetryLimit},
+    policies::RetryLimit,
     protocol::internal::{Request, Response},
 };

11 changes: 8 additions & 3 deletions zebra-network/src/peer_set/candidate_set.rs
@@ -259,9 +259,14 @@ where
         let mut more_peers = None;

         // Launch requests
-        //
-        // TODO: launch each fanout in its own task (might require tokio 1.6)
-        for _ in 0..fanout_limit {
+        for attempt in 0..fanout_limit {
+            if attempt > 0 {
+                // Let other tasks run, so we're more likely to choose a different peer.
+                //
+                // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
+                tokio::task::yield_now().await;
+            }
+
             let peer_service = self.peer_service.ready().await?;
             responses.push(peer_service.call(Request::Peers));
         }
1 change: 0 additions & 1 deletion zebra-network/src/peer_set/initialize.rs
@@ -696,7 +696,6 @@ where
                     // congested it can generate a lot of demand signal very
                     // rapidly.
                     trace!("too many open connections or in-flight handshakes, dropping demand signal");
-                    continue;
                 }
                 DemandHandshake { candidate } => {
                     // Increment the connection count before we spawn the connection.
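Removing the `continue` matters because of what it skipped: the rest of the crawler's loop body, including its await points. A schematic sketch of the control-flow change, with stand-in names rather than the actual initialize.rs types:

// Schematic only: the names are stand-ins, not zebra-network's actual
// types; the real loop lives in peer_set/initialize.rs.
enum Demand {
    Drop,
    Handshake,
}

async fn crawler(demands: Vec<Demand>) {
    for demand in demands {
        match demand {
            Demand::Drop => {
                // A `continue;` here used to jump straight to the next
                // demand, skipping the yield below. Under congestion this
                // arm fires rapidly, so it starved other tasks.
            }
            Demand::Handshake => {
                // spawn a connection attempt here
            }
        }
        // With `continue` gone, this await point is reached after
        // DemandDrop too, letting the tasks that ready peers run.
        tokio::task::yield_now().await;
    }
}

#[tokio::main]
async fn main() {
    crawler(vec![Demand::Drop, Demand::Handshake]).await;
}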
45 changes: 19 additions & 26 deletions zebra-network/src/policies.rs
@@ -1,4 +1,6 @@
-use futures::future;
+use std::pin::Pin;
+
+use futures::{Future, FutureExt};
 use tower::retry::Policy;

 /// A very basic retry policy with a limited number of retry attempts.
@@ -19,14 +21,26 @@ impl RetryLimit {
 }

 impl<Req: Clone + std::fmt::Debug, Res, E: std::fmt::Debug> Policy<Req, Res, E> for RetryLimit {
-    type Future = future::Ready<Self>;
+    type Future = Pin<Box<dyn Future<Output = Self> + Send + 'static>>;

     fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
         if let Err(e) = result {
             if self.remaining_tries > 0 {
                 tracing::debug!(?req, ?e, remaining_tries = self.remaining_tries, "retrying");
-                Some(future::ready(RetryLimit {
-                    remaining_tries: self.remaining_tries - 1,
-                }))
+
+                let remaining_tries = self.remaining_tries - 1;
+
+                Some(
+                    async move {
+                        // Let other tasks run, so we're more likely to choose a different peer,
+                        // and so that any notfound inv entries win the race to the PeerSet.
+                        //
+                        // TODO: move syncer retries into the PeerSet,
+                        // so we always choose different peers (#3235)
+                        tokio::task::yield_now().await;
+                        RetryLimit { remaining_tries }
+                    }
+                    .boxed(),
+                )
             } else {
                 None
             }
@@ -39,24 +53,3 @@ impl<Req: Clone + std::fmt::Debug, Res, E: std::fmt::Debug> Policy<Req, Res, E>
         Some(req.clone())
     }
 }
-
-/// A very basic retry policy that always retries failed requests.
-///
-/// XXX remove this when https://github.com/tower-rs/tower/pull/414 lands.
-#[derive(Clone, Debug)]
-pub struct RetryErrors;
-
-impl<Req: Clone, Res, E> Policy<Req, Res, E> for RetryErrors {
-    type Future = future::Ready<Self>;
-    fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
-        if result.is_err() {
-            Some(future::ready(RetryErrors))
-        } else {
-            None
-        }
-    }
-
-    fn clone_request(&self, req: &Req) -> Option<Req> {
-        Some(req.clone())
-    }
-}
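
The `type Future` change is what makes the in-policy yield possible: a `future::Ready` is already complete when `retry` returns it, so polling it can never suspend, while a boxed `async move` block is a genuine suspension point. A self-contained sketch of the difference, using only the futures and tokio crates rather than Zebra's types:

use std::pin::Pin;

use futures::{future, Future, FutureExt};

// Before: the policy future is already complete, so polling it can
// never suspend and hand control back to the scheduler.
fn ready_policy() -> future::Ready<u32> {
    future::ready(1)
}

// After: a boxed async block can await, so the retry can let other
// tasks run before it resolves.
fn yielding_policy(remaining_tries: u32) -> Pin<Box<dyn Future<Output = u32> + Send + 'static>> {
    async move {
        tokio::task::yield_now().await;
        remaining_tries
    }
    .boxed()
}

#[tokio::main]
async fn main() {
    assert_eq!(ready_policy().await, 1);
    assert_eq!(yielding_policy(2).await, 2);
}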
17 changes: 15 additions & 2 deletions zebrad/src/components/mempool/crawler.rs
@@ -69,7 +69,13 @@
 const FANOUT: usize = 3;

 /// The delay between crawl events.
-const RATE_LIMIT_DELAY: Duration = Duration::from_secs(75);
+///
+/// This should be less than the target block interval,
+/// so that we crawl peer mempools at least once per block.
+///
+/// Using a prime number makes sure that mempool crawler fanouts
+/// don't synchronise with other crawls.
+const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);

 /// The time to wait for a peer response.
 ///
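
The 73-second delay sits just under the 75-second post-Blossom target block spacing that this commit exports from zebra-chain, so each expected block interval includes at least one mempool crawl. A sketch of that relationship as a compile-time check, with both constants copied locally so it stands alone:

use std::time::Duration;

// Copied values: 75 is POST_BLOSSOM_POW_TARGET_SPACING from zebra-chain,
// 73 is RATE_LIMIT_DELAY from this file.
const POST_BLOSSOM_POW_TARGET_SPACING: u64 = 75;
const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);

// Crawl peer mempools at least once per expected block.
const _: () = assert!(RATE_LIMIT_DELAY.as_secs() < POST_BLOSSOM_POW_TARGET_SPACING);

fn main() {}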
@@ -194,7 +200,14 @@

         let mut requests = FuturesUnordered::new();
         // get readiness for one peer at a time, to avoid peer set contention
-        for _ in 0..FANOUT {
+        for attempt in 0..FANOUT {
+            if attempt > 0 {
+                // Let other tasks run, so we're more likely to choose a different peer.
+                //
+                // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
+                tokio::task::yield_now().await;
+            }
+
             let mut peer_set = peer_set.clone();
             // end the task on permanent peer set errors
             let peer_set = peer_set.ready().await?;