From 6cbd7dce43db96de7624936dd4a542da7ffa5231 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 20 Dec 2021 09:02:31 +1000 Subject: [PATCH] Fix task handling bugs, so peers are more likely to be available (#3191) * Tweak crawler timings so peers are more likely to be available * Tweak min peer connection interval so we try all peers * Let other tasks run between fanouts, so we're more likely to choose different peers * Let other tasks run between retries, so we're more likely to choose different peers * Let other tasks run after peer crawler DemandDrop This makes it more likely that peers will become ready. --- zebra-chain/src/parameters/network_upgrade.rs | 2 +- zebra-network/src/config.rs | 2 +- zebra-network/src/constants.rs | 44 ++- zebra-network/src/lib.rs | 2 +- zebra-network/src/peer_set/candidate_set.rs | 11 +- zebra-network/src/peer_set/initialize.rs | 1 - zebra-network/src/policies.rs | 45 +-- zebrad/src/components/mempool/crawler.rs | 17 +- .../src/components/mempool/crawler/tests.rs | 371 +----------------- .../components/mempool/crawler/tests/prop.rs | 371 ++++++++++++++++++ .../mempool/crawler/tests/timing.rs | 29 ++ zebrad/src/components/sync.rs | 22 +- zebrad/src/components/sync/tests/timing.rs | 32 +- 13 files changed, 534 insertions(+), 415 deletions(-) create mode 100644 zebrad/src/components/mempool/crawler/tests/prop.rs create mode 100644 zebrad/src/components/mempool/crawler/tests/timing.rs diff --git a/zebra-chain/src/parameters/network_upgrade.rs b/zebra-chain/src/parameters/network_upgrade.rs index 08a7c9eb482..4bfe3b366bf 100644 --- a/zebra-chain/src/parameters/network_upgrade.rs +++ b/zebra-chain/src/parameters/network_upgrade.rs @@ -138,7 +138,7 @@ pub(crate) const CONSENSUS_BRANCH_IDS: &[(NetworkUpgrade, ConsensusBranchId)] = const PRE_BLOSSOM_POW_TARGET_SPACING: i64 = 150; /// The target block spacing after Blossom activation. -const POST_BLOSSOM_POW_TARGET_SPACING: i64 = 75; +pub const POST_BLOSSOM_POW_TARGET_SPACING: i64 = 75; /// The averaging window for difficulty threshold arithmetic mean calculations. /// diff --git a/zebra-network/src/config.rs b/zebra-network/src/config.rs index fe00e6dcefa..71982241d18 100644 --- a/zebra-network/src/config.rs +++ b/zebra-network/src/config.rs @@ -242,7 +242,7 @@ impl Default for Config { network: Network::Mainnet, initial_mainnet_peers: mainnet_peers, initial_testnet_peers: testnet_peers, - crawl_new_peer_interval: Duration::from_secs(60), + crawl_new_peer_interval: constants::DEFAULT_CRAWL_NEW_PEER_INTERVAL, // The default peerset target size should be large enough to ensure // nodes have a reliable set of peers. But it should also be limited diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index 5abaf517b48..5b5c06455e1 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -64,7 +64,21 @@ pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(4); /// This avoids explicit synchronization, but relies on the peer /// connector actually setting up channels and these heartbeats in a /// specific manner that matches up with this math. -pub const MIN_PEER_RECONNECTION_DELAY: Duration = Duration::from_secs(60 + 20 + 20 + 20); +pub const MIN_PEER_RECONNECTION_DELAY: Duration = Duration::from_secs(59 + 20 + 20 + 20); + +/// The default peer address crawler interval. +/// +/// This should be at least [`HANDSHAKE_TIMEOUT`](constants::HANDSHAKE_TIMEOUT) +/// lower than all other crawler intervals. 
+/// +/// This makes the following sequence of events more likely: +/// 1. a peer address crawl, +/// 2. new peer connections, +/// 3. peer requests from other crawlers. +/// +/// Using a prime number makes sure that peer address crawls +/// don't synchronise with other crawls. +pub const DEFAULT_CRAWL_NEW_PEER_INTERVAL: Duration = Duration::from_secs(61); /// The maximum duration since a peer was last seen to consider it reachable. /// @@ -89,7 +103,9 @@ pub const MAX_RECENT_PEER_AGE: Duration32 = Duration32::from_days(3); /// Regular interval for sending keepalive `Ping` messages to each /// connected peer. -pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60); +/// +/// Using a prime number makes sure that heartbeats don't synchronise with crawls. +pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(59); /// The minimum time between successive calls to [`CandidateSet::next()`][Self::next]. /// @@ -97,15 +113,17 @@ pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60); /// /// Zebra resists distributed denial of service attacks by making sure that new peer connections /// are initiated at least `MIN_PEER_CONNECTION_INTERVAL` apart. -pub const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100); +pub const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(25); /// The minimum time between successive calls to [`CandidateSet::update()`][Self::update]. /// +/// Using a prime number makes sure that peer address crawls don't synchronise with other crawls. +/// /// ## Security /// /// Zebra resists distributed denial of service attacks by making sure that requests for more /// peer addresses are sent at least `MIN_PEER_GET_ADDR_INTERVAL` apart. -pub const MIN_PEER_GET_ADDR_INTERVAL: Duration = Duration::from_secs(30); +pub const MIN_PEER_GET_ADDR_INTERVAL: Duration = Duration::from_secs(31); /// The combined timeout for all the requests in [`CandidateSet::update()`][Self::update]. /// @@ -255,6 +273,8 @@ pub mod magics { #[cfg(test)] mod tests { + use std::convert::TryFrom; + use super::*; /// This assures that the `Duration` value we are computing for @@ -287,6 +307,22 @@ mod tests { assert!(EWMA_DECAY_TIME_NANOS > request_timeout_nanos, "The EWMA decay time should be higher than the request timeout, so timed out peers are penalised by the EWMA."); + + assert!( + u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32") + * MIN_PEER_CONNECTION_INTERVAL + < MIN_PEER_RECONNECTION_DELAY, + "each peer should get at least one connection attempt in each connection interval", + ); + + assert!( + MIN_PEER_RECONNECTION_DELAY.as_secs() + / (u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32") + * MIN_PEER_CONNECTION_INTERVAL) + .as_secs() + <= 2, + "each peer should only have a few connection attempts in each connection interval", + ); } /// Make sure that peer age limits are consistent with each other. 
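Why the prime-numbered intervals above help: two periodic timers with periods `p` and `q` seconds first fire together after `lcm(p, q)` seconds, and for distinct primes `lcm(p, q) = p * q`. A standalone sketch, not part of this patch, checking that property for the intervals this change tunes:

```rust
/// Greatest common divisor, via Euclid's algorithm.
fn gcd(a: u64, b: u64) -> u64 {
    if b == 0 {
        a
    } else {
        gcd(b, a % b)
    }
}

fn main() {
    // Intervals used by this patch, in seconds: GetAddr rate limit (31),
    // heartbeat (59), peer address crawl (61), syncer restart (67),
    // and mempool crawl (73).
    let intervals = [31u64, 59, 61, 67, 73];

    for (i, &p) in intervals.iter().enumerate() {
        for &q in &intervals[i + 1..] {
            // Distinct primes are coprime, so timers with these periods
            // only fire together every p * q seconds: for example, the
            // heartbeat and the address crawl coincide once every
            // 59 * 61 = 3599 seconds.
            assert_eq!(gcd(p, q), 1);
        }
    }
}
```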
diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs
index 0261501798b..82c8ceba11e 100644
--- a/zebra-network/src/lib.rs
+++ b/zebra-network/src/lib.rs
@@ -73,7 +73,7 @@ pub use crate::{
     meta_addr::PeerAddrState,
     peer::{HandshakeError, PeerError, SharedPeerError},
     peer_set::init,
-    policies::{RetryErrors, RetryLimit},
+    policies::RetryLimit,
     protocol::internal::{Request, Response},
 };

diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs
index ab1b2790200..fb54ff5cd3d 100644
--- a/zebra-network/src/peer_set/candidate_set.rs
+++ b/zebra-network/src/peer_set/candidate_set.rs
@@ -259,9 +259,14 @@ where
        let mut more_peers = None;

        // Launch requests
-        //
-        // TODO: launch each fanout in its own task (might require tokio 1.6)
-        for _ in 0..fanout_limit {
+        for attempt in 0..fanout_limit {
+            if attempt > 0 {
+                // Let other tasks run, so we're more likely to choose a different peer.
+                //
+                // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
+                tokio::task::yield_now().await;
+            }
+
            let peer_service = self.peer_service.ready().await?;
            responses.push(peer_service.call(Request::Peers));
        }
diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs
index 024b59a89a4..5fd8ecf5a98 100644
--- a/zebra-network/src/peer_set/initialize.rs
+++ b/zebra-network/src/peer_set/initialize.rs
@@ -696,7 +696,6 @@ where
                // congested it can generate a lot of demand signal very
                // rapidly.
                trace!("too many open connections or in-flight handshakes, dropping demand signal");
-                continue;
            }
            DemandHandshake { candidate } => {
                // Increment the connection count before we spawn the connection.
diff --git a/zebra-network/src/policies.rs b/zebra-network/src/policies.rs
index 98df5db54b6..7cb6c48dbc0 100644
--- a/zebra-network/src/policies.rs
+++ b/zebra-network/src/policies.rs
@@ -1,4 +1,6 @@
-use futures::future;
+use std::pin::Pin;
+
+use futures::{Future, FutureExt};
 use tower::retry::Policy;

 /// A very basic retry policy with a limited number of retry attempts.
@@ -19,14 +21,26 @@ impl RetryLimit {
 }

 impl<Req: Clone + std::fmt::Debug, Res, E: std::fmt::Debug> Policy<Req, Res, E> for RetryLimit {
-    type Future = future::Ready<Self>;
+    type Future = Pin<Box<dyn Future<Output = Self> + Send + 'static>>;
+
     fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
         if let Err(e) = result {
             if self.remaining_tries > 0 {
                 tracing::debug!(?req, ?e, remaining_tries = self.remaining_tries, "retrying");
-                Some(future::ready(RetryLimit {
-                    remaining_tries: self.remaining_tries - 1,
-                }))
+
+                let remaining_tries = self.remaining_tries - 1;
+
+                Some(
+                    async move {
+                        // Let other tasks run, so we're more likely to choose a different peer,
+                        // and so that any notfound inv entries win the race to the PeerSet.
+                        //
+                        // TODO: move syncer retries into the PeerSet,
+                        //       so we always choose different peers (#3235)
+                        tokio::task::yield_now().await;
+
+                        RetryLimit { remaining_tries }
+                    }
+                    .boxed(),
+                )
             } else {
                 None
             }
@@ -39,24 +53,3 @@ impl<Req: Clone + std::fmt::Debug, Res, E: std::fmt::Debug> Policy<Req, Res, E>
         Some(req.clone())
     }
 }
-
-/// A very basic retry policy that always retries failed requests.
-///
-/// XXX remove this when https://github.com/tower-rs/tower/pull/414 lands.
-#[derive(Clone, Debug)]
-pub struct RetryErrors;
-
-impl<Req: Clone, Res, E> Policy<Req, Res, E> for RetryErrors {
-    type Future = future::Ready<Self>;
-    fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
-        if result.is_err() {
-            Some(future::ready(RetryErrors))
-        } else {
-            None
-        }
-    }
-
-    fn clone_request(&self, req: &Req) -> Option<Req> {
-        Some(req.clone())
-    }
-}
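For context, here is how a `tower` retry policy like `RetryLimit` is typically attached to a service. This is a hedged sketch, not code from the patch: the always-failing `service_fn` stand-in is hypothetical, and it assumes the `tower` 0.4 `Retry` and `ServiceExt::ready` APIs that Zebra uses elsewhere:

```rust
use tower::{retry::Retry, service_fn, Service, ServiceExt};
use zebra_network::RetryLimit;

type BoxError = Box<dyn std::error::Error + Send + Sync>;

#[tokio::main]
async fn main() {
    // A stand-in service that always fails, so the policy retries
    // (yielding to the scheduler each time) and eventually gives up.
    let flaky = service_fn(|req: &'static str| async move {
        Err::<(), BoxError>(format!("no peer for {}", req).into())
    });

    // Allow up to 3 retries of each failed request.
    let mut retrying = Retry::new(RetryLimit::new(3), flaky);

    let result = retrying
        .ready()
        .await
        .expect("stand-in service is always ready")
        .call("block request")
        .await;

    // After the initial try plus 3 retries, the error reaches the caller.
    assert!(result.is_err());
}
```

Boxing the policy future (instead of `future::Ready`) is what lets the policy await `yield_now()` before each retry.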
diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs
index 20d93eacc11..1ffd4a0f277 100644
--- a/zebrad/src/components/mempool/crawler.rs
+++ b/zebrad/src/components/mempool/crawler.rs
@@ -69,7 +69,13 @@ mod tests;
 const FANOUT: usize = 3;

 /// The delay between crawl events.
-const RATE_LIMIT_DELAY: Duration = Duration::from_secs(75);
+///
+/// This should be less than the target block interval,
+/// so that we crawl peer mempools at least once per block.
+///
+/// Using a prime number makes sure that mempool crawler fanouts
+/// don't synchronise with other crawls.
+const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);

 /// The time to wait for a peer response.
 ///
@@ -194,7 +200,14 @@ where
        let mut requests = FuturesUnordered::new();
        // get readiness for one peer at a time, to avoid peer set contention
-        for _ in 0..FANOUT {
+        for attempt in 0..FANOUT {
+            if attempt > 0 {
+                // Let other tasks run, so we're more likely to choose a different peer.
+                //
+                // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
+                tokio::task::yield_now().await;
+            }
+
            let mut peer_set = peer_set.clone();
            // end the task on permanent peer set errors
            let peer_set = peer_set.ready().await?;
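The same ready-then-yield fanout now appears in the candidate set, the mempool crawler, and the syncer. A generic sketch of the shared shape, assuming only the `tower` and `futures` APIs (the helper itself is hypothetical, not something this patch adds):

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use tower::{Service, ServiceExt};

/// Send `fanout` copies of `request`, waiting for service readiness
/// one peer at a time, and yielding between attempts so a different
/// peer is more likely to be ready for each copy.
async fn fanout_request<S, Req>(
    service: &mut S,
    request: Req,
    fanout: usize,
) -> Result<Vec<S::Response>, S::Error>
where
    S: Service<Req>,
    Req: Clone,
{
    let mut in_flight = FuturesUnordered::new();

    for attempt in 0..fanout {
        if attempt > 0 {
            // Let other tasks run, so we're more likely to choose a different peer.
            tokio::task::yield_now().await;
        }

        let ready_service = service.ready().await?;
        in_flight.push(ready_service.call(request.clone()));
    }

    let mut responses = Vec::new();
    while let Some(response) = in_flight.next().await {
        responses.push(response?);
    }

    Ok(responses)
}
```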
diff --git a/zebrad/src/components/mempool/crawler/tests.rs b/zebrad/src/components/mempool/crawler/tests.rs
index 652a3a25aa9..16a759524df 100644
--- a/zebrad/src/components/mempool/crawler/tests.rs
+++ b/zebrad/src/components/mempool/crawler/tests.rs
@@ -1,369 +1,4 @@
-use std::{collections::HashSet, time::Duration};
+//! Tests for the mempool crawler.

-use proptest::{
-    collection::{hash_set, vec},
-    prelude::*,
-};
-use tokio::time;
-
-use zebra_chain::{parameters::Network, transaction::UnminedTxId};
-use zebra_network as zn;
-use zebra_state::ChainTipSender;
-use zebra_test::mock_service::{MockService, PropTestAssertion};
-
-use crate::components::{
-    mempool::{
-        self,
-        crawler::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY},
-        downloads::Gossip,
-        error::MempoolError,
-        Config,
-    },
-    sync::RecentSyncLengths,
-};
-
-/// The number of iterations to crawl while testing.
-///
-/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test.
-/// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for
-/// at least `CRAWL_ITERATIONS` times the timeout for receiving a request (see more information in
-/// [`MockServiceBuilder::with_max_request_delay`]).
-const CRAWL_ITERATIONS: usize = 4;
-
-/// The maximum number of transactions crawled from a mocked peer.
-const MAX_CRAWLED_TX: usize = 10;
-
-/// The amount of time to advance beyond the expected instant that the crawler wakes up.
-const ERROR_MARGIN: Duration = Duration::from_millis(100);
-
-/// A [`MockService`] representing the network service.
-type MockPeerSet = MockService<zn::Request, zn::Response, PropTestAssertion>;
-
-/// A [`MockService`] representing the mempool service.
-type MockMempool = MockService<mempool::Request, mempool::Response, PropTestAssertion>;
-
-proptest! {
-    /// Test if the crawler periodically crawls for transaction IDs.
-    ///
-    /// The crawler should periodically perform a fanned-out series of requests to obtain
-    /// transaction IDs from other peers. These requests should only be sent if the mempool is
-    /// enabled, i.e., if the block synchronizer is likely close to the chain tip.
-    #[test]
-    fn crawler_requests_for_transaction_ids(mut sync_lengths in any::<Vec<usize>>()) {
-        let runtime = zebra_test::init_async();
-
-        // Add a dummy last element, so that all of the original values are used.
-        sync_lengths.push(0);
-
-        runtime.block_on(async move {
-            let (mut peer_set, _mempool, sync_status, mut recent_sync_lengths, _chain_tip_sender) =
-                setup_crawler();
-
-            time::pause();
-
-            for sync_length in sync_lengths {
-                let mempool_is_enabled = sync_status.is_close_to_tip();
-
-                for _ in 0..CRAWL_ITERATIONS {
-                    for _ in 0..FANOUT {
-                        if mempool_is_enabled {
-                            respond_with_transaction_ids(&mut peer_set, HashSet::new()).await?;
-                        } else {
-                            peer_set.expect_no_requests().await?;
-                        }
-                    }
-
-                    peer_set.expect_no_requests().await?;
-
-                    time::sleep(RATE_LIMIT_DELAY + ERROR_MARGIN).await;
-                }
-
-                // Applying the update event at the end of the test iteration means that the first
-                // iteration runs with an empty recent sync lengths vector. A dummy element is
-                // appended to the events so that all of the original values are applied.
-                recent_sync_lengths.push_extend_tips_length(sync_length);
-            }
-
-            Ok::<(), TestCaseError>(())
-        })?;
-    }
-
-    /// Test if crawled transactions are forwarded to the [`Mempool`][mempool::Mempool] service.
-    ///
-    /// The transaction IDs sent by other peers to the crawler should be forwarded to the
-    /// [`Mempool`][mempool::Mempool] service so that they can be downloaded, verified and added to
-    /// the mempool.
-    #[test]
-    fn crawled_transactions_are_forwarded_to_downloader(
-        transaction_ids in hash_set(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
-    ) {
-        let runtime = zebra_test::init_async();
-
-        let transaction_id_count = transaction_ids.len();
-
-        runtime.block_on(async move {
-            let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths, _chain_tip_sender) =
-                setup_crawler();
-
-            time::pause();
-
-            // Mock end of chain sync to enable the mempool crawler.
-            SyncStatus::sync_close_to_tip(&mut recent_sync_lengths);
-
-            crawler_iteration(&mut peer_set, vec![transaction_ids.clone()]).await?;
-
-            respond_to_queue_request(
-                &mut mempool,
-                transaction_ids,
-                vec![Ok(()); transaction_id_count],
-            ).await?;
-
-            mempool.expect_no_requests().await?;
-
-            Ok::<(), TestCaseError>(())
-        })?;
-    }
-
-    /// Test if errors while forwarding transaction IDs do not stop the crawler.
-    ///
-    /// The crawler should continue operating normally if some transactions fail to download or
-    /// even if the mempool service fails to enqueue the transactions to be downloaded.
-    #[test]
-    fn transaction_id_forwarding_errors_dont_stop_the_crawler(
-        service_call_error in any::<MempoolError>(),
-        transaction_ids_for_call_failure in hash_set(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
-        transaction_ids_and_responses in
-            vec(any::<(UnminedTxId, Result<(), MempoolError>)>(), 1..MAX_CRAWLED_TX),
-        transaction_ids_for_return_to_normal in hash_set(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
-    ) {
-        let runtime = zebra_test::init_async();
-
-        // Make transaction_ids_and_responses unique
-        let unique_transaction_ids_and_responses: HashSet<UnminedTxId> = transaction_ids_and_responses.iter().map(|(id, _result)| id).copied().collect();
-        let transaction_ids_and_responses: Vec<(UnminedTxId, Result<(), MempoolError>)> = unique_transaction_ids_and_responses.iter().map(|unique_id| transaction_ids_and_responses.iter().find(|(id, _result)| id == unique_id).unwrap()).cloned().collect();
-
-        runtime.block_on(async move {
-            let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths, _chain_tip_sender) =
-                setup_crawler();
-
-            time::pause();
-
-            // Mock end of chain sync to enable the mempool crawler.
-            SyncStatus::sync_close_to_tip(&mut recent_sync_lengths);
-
-            // Prepare to simulate download errors.
-            let download_result_count = transaction_ids_and_responses.len();
-            let mut transaction_ids_for_download_errors = HashSet::with_capacity(download_result_count);
-            let mut download_result_list = Vec::with_capacity(download_result_count);
-
-            for (transaction_id, result) in transaction_ids_and_responses {
-                transaction_ids_for_download_errors.insert(transaction_id);
-                download_result_list.push(result);
-            }
-
-            // First crawl iteration:
-            // 1. Fails with a mempool call error
-            // 2. Some downloads fail
-            // Rest: no crawled transactions
-            crawler_iteration(
-                &mut peer_set,
-                vec![
-                    transaction_ids_for_call_failure.clone(),
-                    transaction_ids_for_download_errors.clone(),
-                ],
-            )
-            .await?;
-
-            // First test with an error returned from the Mempool service.
-            respond_to_queue_request_with_error(
-                &mut mempool,
-                transaction_ids_for_call_failure,
-                service_call_error,
-            ).await?;
-
-            // Then test a failure to download transactions.
-            respond_to_queue_request(
-                &mut mempool,
-                transaction_ids_for_download_errors,
-                download_result_list,
-            ).await?;
-
-            mempool.expect_no_requests().await?;
-
-            // Wait until next crawl iteration.
-            time::sleep(RATE_LIMIT_DELAY).await;
-
-            // Second crawl iteration:
-            // The mempool should continue crawling normally.
-            crawler_iteration(
-                &mut peer_set,
-                vec![transaction_ids_for_return_to_normal.clone()],
-            )
-            .await?;
-
-            let response_list = vec![Ok(()); transaction_ids_for_return_to_normal.len()];
-
-            respond_to_queue_request(
-                &mut mempool,
-                transaction_ids_for_return_to_normal,
-                response_list,
-            ).await?;
-
-            mempool.expect_no_requests().await?;
-
-            Ok::<(), TestCaseError>(())
-        })?;
-    }
-}
-
-/// Spawn a crawler instance using mock services.
-fn setup_crawler() -> (
-    MockPeerSet,
-    MockMempool,
-    SyncStatus,
-    RecentSyncLengths,
-    ChainTipSender,
-) {
-    let peer_set = MockService::build().for_prop_tests();
-    let mempool = MockService::build().for_prop_tests();
-    let (sync_status, recent_sync_lengths) = SyncStatus::new();
-
-    // the network should be irrelevant here
-    let (chain_tip_sender, _latest_chain_tip, chain_tip_change) =
-        ChainTipSender::new(None, Network::Mainnet);
-
-    Crawler::spawn(
-        &Config::default(),
-        peer_set.clone(),
-        mempool.clone(),
-        sync_status.clone(),
-        chain_tip_change,
-    );
-
-    (
-        peer_set,
-        mempool,
-        sync_status,
-        recent_sync_lengths,
-        chain_tip_sender,
-    )
-}
-
-/// Intercept a request for mempool transaction IDs and respond with the `transaction_ids` list.
-async fn respond_with_transaction_ids(
-    peer_set: &mut MockPeerSet,
-    transaction_ids: HashSet<UnminedTxId>,
-) -> Result<(), TestCaseError> {
-    peer_set
-        .expect_request(zn::Request::MempoolTransactionIds)
-        .await?
-        .respond(zn::Response::TransactionIds(
-            transaction_ids.into_iter().collect(),
-        ));
-
-    Ok(())
-}
-
-/// Intercept fanned-out requests for mempool transaction IDs and answer with the `responses`.
-///
-/// Each item in `responses` is a list of transaction IDs to send back to a single request.
-/// Therefore, each item represents the response sent by a peer in the network.
-///
-/// If there are fewer items in `responses` than the [`FANOUT`] number, then the remaining
-/// requests are answered with an empty list of transaction IDs.
-///
-/// # Panics
-///
-/// If `responses` contains more items than the [`FANOUT`] number.
-async fn crawler_iteration(
-    peer_set: &mut MockPeerSet,
-    responses: Vec<HashSet<UnminedTxId>>,
-) -> Result<(), TestCaseError> {
-    let empty_responses = FANOUT
-        .checked_sub(responses.len())
-        .expect("Too many responses to be sent in a single crawl iteration");
-
-    for response in responses {
-        respond_with_transaction_ids(peer_set, response).await?;
-    }
-
-    for _ in 0..empty_responses {
-        respond_with_transaction_ids(peer_set, HashSet::new()).await?;
-    }
-
-    peer_set.expect_no_requests().await?;
-
-    Ok(())
-}
-
-/// Intercept a request for the mempool to download and verify transactions.
-///
-/// The intercepted request will be verified to check if it has the `expected_transaction_ids`, and
-/// it will be answered with a list of results, one for each transaction requested to be
-/// downloaded.
-///
-/// # Panics
-///
-/// If `response` and `expected_transaction_ids` have different sizes.
-async fn respond_to_queue_request(
-    mempool: &mut MockMempool,
-    expected_transaction_ids: HashSet<UnminedTxId>,
-    response: Vec<Result<(), MempoolError>>,
-) -> Result<(), TestCaseError> {
-    mempool
-        .expect_request_that(|req| {
-            if let mempool::Request::Queue(req) = req {
-                let ids: HashSet<UnminedTxId> = req
-                    .iter()
-                    .filter_map(|gossip| {
-                        if let Gossip::Id(id) = gossip {
-                            Some(*id)
-                        } else {
-                            None
-                        }
-                    })
-                    .collect();
-                ids == expected_transaction_ids
-            } else {
-                false
-            }
-        })
-        .await?
-        .respond(mempool::Response::Queued(response));
-
-    Ok(())
-}
-
-/// Intercept a request for the mempool to download and verify transactions, and answer with an
-/// error.
-///
-/// The intercepted request will be verified to check if it has the `expected_transaction_ids`, and
-/// it will be answered with `error`, as if the service had an internal failure that prevented it
-/// from queuing the transactions for downloading.
-async fn respond_to_queue_request_with_error(
-    mempool: &mut MockMempool,
-    expected_transaction_ids: HashSet<UnminedTxId>,
-    error: MempoolError,
-) -> Result<(), TestCaseError> {
-    mempool
-        .expect_request_that(|req| {
-            if let mempool::Request::Queue(req) = req {
-                let ids: HashSet<UnminedTxId> = req
-                    .iter()
-                    .filter_map(|gossip| {
-                        if let Gossip::Id(id) = gossip {
-                            Some(*id)
-                        } else {
-                            None
-                        }
-                    })
-                    .collect();
-                ids == expected_transaction_ids
-            } else {
-                false
-            }
-        })
-        .await?
-        .respond(Err(error));
-
-    Ok(())
-}
+mod prop;
+mod timing;
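A side note on the manual "Make transaction_ids_and_responses unique" pass in the test above (and in `prop.rs` below): proptest can also generate unique keys by construction. A hypothetical simplification, not part of this patch, using `proptest::collection::hash_map` with plain stand-in types:

```rust
use proptest::{collection::hash_map, prelude::*};

proptest! {
    /// Generating the (ID, result) pairs as a map makes the IDs unique by
    /// construction, so no dedup pass is needed. Here `u64` stands in for
    /// `UnminedTxId`, and `bool` for the download result.
    #[test]
    fn ids_are_unique_by_construction(
        transaction_ids_and_responses in hash_map(any::<u64>(), any::<bool>(), 1..10),
    ) {
        let ids: Vec<u64> = transaction_ids_and_responses.keys().copied().collect();
        let results: Vec<bool> = transaction_ids_and_responses.values().copied().collect();

        // One result per unique transaction ID.
        prop_assert_eq!(ids.len(), results.len());
    }
}
```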
diff --git a/zebrad/src/components/mempool/crawler/tests/prop.rs b/zebrad/src/components/mempool/crawler/tests/prop.rs
new file mode 100644
index 00000000000..6eab653d39f
--- /dev/null
+++ b/zebrad/src/components/mempool/crawler/tests/prop.rs
@@ -0,0 +1,371 @@
+//! Randomised property tests for the mempool crawler.
+
+use std::{collections::HashSet, time::Duration};
+
+use proptest::{
+    collection::{hash_set, vec},
+    prelude::*,
+};
+use tokio::time;
+
+use zebra_chain::{parameters::Network, transaction::UnminedTxId};
+use zebra_network as zn;
+use zebra_state::ChainTipSender;
+use zebra_test::mock_service::{MockService, PropTestAssertion};
+
+use crate::components::{
+    mempool::{
+        self,
+        crawler::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY},
+        downloads::Gossip,
+        error::MempoolError,
+        Config,
+    },
+    sync::RecentSyncLengths,
+};
+
+/// The number of iterations to crawl while testing.
+///
+/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test.
+/// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for
+/// at least `CRAWL_ITERATIONS` times the timeout for receiving a request (see more information in
+/// [`MockServiceBuilder::with_max_request_delay`]).
+const CRAWL_ITERATIONS: usize = 4;
+
+/// The maximum number of transactions crawled from a mocked peer.
+const MAX_CRAWLED_TX: usize = 10;
+
+/// The amount of time to advance beyond the expected instant that the crawler wakes up.
+const ERROR_MARGIN: Duration = Duration::from_millis(100);
+
+/// A [`MockService`] representing the network service.
+type MockPeerSet = MockService<zn::Request, zn::Response, PropTestAssertion>;
+
+/// A [`MockService`] representing the mempool service.
+type MockMempool = MockService<mempool::Request, mempool::Response, PropTestAssertion>;
+
+proptest! {
+    /// Test if the crawler periodically crawls for transaction IDs.
+    ///
+    /// The crawler should periodically perform a fanned-out series of requests to obtain
+    /// transaction IDs from other peers. These requests should only be sent if the mempool is
+    /// enabled, i.e., if the block synchronizer is likely close to the chain tip.
+    #[test]
+    fn crawler_requests_for_transaction_ids(mut sync_lengths in any::<Vec<usize>>()) {
+        let runtime = zebra_test::init_async();
+
+        // Add a dummy last element, so that all of the original values are used.
+        sync_lengths.push(0);
+
+        runtime.block_on(async move {
+            let (mut peer_set, _mempool, sync_status, mut recent_sync_lengths, _chain_tip_sender) =
+                setup_crawler();
+
+            time::pause();
+
+            for sync_length in sync_lengths {
+                let mempool_is_enabled = sync_status.is_close_to_tip();
+
+                for _ in 0..CRAWL_ITERATIONS {
+                    for _ in 0..FANOUT {
+                        if mempool_is_enabled {
+                            respond_with_transaction_ids(&mut peer_set, HashSet::new()).await?;
+                        } else {
+                            peer_set.expect_no_requests().await?;
+                        }
+                    }
+
+                    peer_set.expect_no_requests().await?;
+
+                    time::sleep(RATE_LIMIT_DELAY + ERROR_MARGIN).await;
+                }
+
+                // Applying the update event at the end of the test iteration means that the first
+                // iteration runs with an empty recent sync lengths vector. A dummy element is
+                // appended to the events so that all of the original values are applied.
+                recent_sync_lengths.push_extend_tips_length(sync_length);
+            }
+
+            Ok::<(), TestCaseError>(())
+        })?;
+    }
+
+    /// Test if crawled transactions are forwarded to the [`Mempool`][mempool::Mempool] service.
+    ///
+    /// The transaction IDs sent by other peers to the crawler should be forwarded to the
+    /// [`Mempool`][mempool::Mempool] service so that they can be downloaded, verified and added to
+    /// the mempool.
+    #[test]
+    fn crawled_transactions_are_forwarded_to_downloader(
+        transaction_ids in hash_set(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
+    ) {
+        let runtime = zebra_test::init_async();
+
+        let transaction_id_count = transaction_ids.len();
+
+        runtime.block_on(async move {
+            let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths, _chain_tip_sender) =
+                setup_crawler();
+
+            time::pause();
+
+            // Mock end of chain sync to enable the mempool crawler.
+            SyncStatus::sync_close_to_tip(&mut recent_sync_lengths);
+
+            crawler_iteration(&mut peer_set, vec![transaction_ids.clone()]).await?;
+
+            respond_to_queue_request(
+                &mut mempool,
+                transaction_ids,
+                vec![Ok(()); transaction_id_count],
+            ).await?;
+
+            mempool.expect_no_requests().await?;
+
+            Ok::<(), TestCaseError>(())
+        })?;
+    }
+
+    /// Test if errors while forwarding transaction IDs do not stop the crawler.
+    ///
+    /// The crawler should continue operating normally if some transactions fail to download or
+    /// even if the mempool service fails to enqueue the transactions to be downloaded.
+    #[test]
+    fn transaction_id_forwarding_errors_dont_stop_the_crawler(
+        service_call_error in any::<MempoolError>(),
+        transaction_ids_for_call_failure in hash_set(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
+        transaction_ids_and_responses in
+            vec(any::<(UnminedTxId, Result<(), MempoolError>)>(), 1..MAX_CRAWLED_TX),
+        transaction_ids_for_return_to_normal in hash_set(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
+    ) {
+        let runtime = zebra_test::init_async();
+
+        // Make transaction_ids_and_responses unique
+        let unique_transaction_ids_and_responses: HashSet<UnminedTxId> = transaction_ids_and_responses.iter().map(|(id, _result)| id).copied().collect();
+        let transaction_ids_and_responses: Vec<(UnminedTxId, Result<(), MempoolError>)> = unique_transaction_ids_and_responses.iter().map(|unique_id| transaction_ids_and_responses.iter().find(|(id, _result)| id == unique_id).unwrap()).cloned().collect();
+
+        runtime.block_on(async move {
+            let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths, _chain_tip_sender) =
+                setup_crawler();
+
+            time::pause();
+
+            // Mock end of chain sync to enable the mempool crawler.
+            SyncStatus::sync_close_to_tip(&mut recent_sync_lengths);
+
+            // Prepare to simulate download errors.
+            let download_result_count = transaction_ids_and_responses.len();
+            let mut transaction_ids_for_download_errors = HashSet::with_capacity(download_result_count);
+            let mut download_result_list = Vec::with_capacity(download_result_count);
+
+            for (transaction_id, result) in transaction_ids_and_responses {
+                transaction_ids_for_download_errors.insert(transaction_id);
+                download_result_list.push(result);
+            }
+
+            // First crawl iteration:
+            // 1. Fails with a mempool call error
+            // 2. Some downloads fail
+            // Rest: no crawled transactions
+            crawler_iteration(
+                &mut peer_set,
+                vec![
+                    transaction_ids_for_call_failure.clone(),
+                    transaction_ids_for_download_errors.clone(),
+                ],
+            )
+            .await?;
+
+            // First test with an error returned from the Mempool service.
+            respond_to_queue_request_with_error(
+                &mut mempool,
+                transaction_ids_for_call_failure,
+                service_call_error,
+            ).await?;
+
+            // Then test a failure to download transactions.
+            respond_to_queue_request(
+                &mut mempool,
+                transaction_ids_for_download_errors,
+                download_result_list,
+            ).await?;
+
+            mempool.expect_no_requests().await?;
+
+            // Wait until next crawl iteration.
+            time::sleep(RATE_LIMIT_DELAY).await;
+
+            // Second crawl iteration:
+            // The mempool should continue crawling normally.
+            crawler_iteration(
+                &mut peer_set,
+                vec![transaction_ids_for_return_to_normal.clone()],
+            )
+            .await?;
+
+            let response_list = vec![Ok(()); transaction_ids_for_return_to_normal.len()];
+
+            respond_to_queue_request(
+                &mut mempool,
+                transaction_ids_for_return_to_normal,
+                response_list,
+            ).await?;
+
+            mempool.expect_no_requests().await?;
+
+            Ok::<(), TestCaseError>(())
+        })?;
+    }
+}
+
+/// Spawn a crawler instance using mock services.
+fn setup_crawler() -> (
+    MockPeerSet,
+    MockMempool,
+    SyncStatus,
+    RecentSyncLengths,
+    ChainTipSender,
+) {
+    let peer_set = MockService::build().for_prop_tests();
+    let mempool = MockService::build().for_prop_tests();
+    let (sync_status, recent_sync_lengths) = SyncStatus::new();
+
+    // the network should be irrelevant here
+    let (chain_tip_sender, _latest_chain_tip, chain_tip_change) =
+        ChainTipSender::new(None, Network::Mainnet);
+
+    Crawler::spawn(
+        &Config::default(),
+        peer_set.clone(),
+        mempool.clone(),
+        sync_status.clone(),
+        chain_tip_change,
+    );
+
+    (
+        peer_set,
+        mempool,
+        sync_status,
+        recent_sync_lengths,
+        chain_tip_sender,
+    )
+}
+
+/// Intercept a request for mempool transaction IDs and respond with the `transaction_ids` list.
+async fn respond_with_transaction_ids(
+    peer_set: &mut MockPeerSet,
+    transaction_ids: HashSet<UnminedTxId>,
+) -> Result<(), TestCaseError> {
+    peer_set
+        .expect_request(zn::Request::MempoolTransactionIds)
+        .await?
+        .respond(zn::Response::TransactionIds(
+            transaction_ids.into_iter().collect(),
+        ));
+
+    Ok(())
+}
+
+/// Intercept fanned-out requests for mempool transaction IDs and answer with the `responses`.
+///
+/// Each item in `responses` is a list of transaction IDs to send back to a single request.
+/// Therefore, each item represents the response sent by a peer in the network.
+///
+/// If there are fewer items in `responses` than the [`FANOUT`] number, then the remaining
+/// requests are answered with an empty list of transaction IDs.
+///
+/// # Panics
+///
+/// If `responses` contains more items than the [`FANOUT`] number.
+async fn crawler_iteration(
+    peer_set: &mut MockPeerSet,
+    responses: Vec<HashSet<UnminedTxId>>,
+) -> Result<(), TestCaseError> {
+    let empty_responses = FANOUT
+        .checked_sub(responses.len())
+        .expect("Too many responses to be sent in a single crawl iteration");
+
+    for response in responses {
+        respond_with_transaction_ids(peer_set, response).await?;
+    }
+
+    for _ in 0..empty_responses {
+        respond_with_transaction_ids(peer_set, HashSet::new()).await?;
+    }
+
+    peer_set.expect_no_requests().await?;
+
+    Ok(())
+}
+
+/// Intercept a request for the mempool to download and verify transactions.
+///
+/// The intercepted request will be verified to check if it has the `expected_transaction_ids`, and
+/// it will be answered with a list of results, one for each transaction requested to be
+/// downloaded.
+///
+/// # Panics
+///
+/// If `response` and `expected_transaction_ids` have different sizes.
+async fn respond_to_queue_request(
+    mempool: &mut MockMempool,
+    expected_transaction_ids: HashSet<UnminedTxId>,
+    response: Vec<Result<(), MempoolError>>,
+) -> Result<(), TestCaseError> {
+    mempool
+        .expect_request_that(|req| {
+            if let mempool::Request::Queue(req) = req {
+                let ids: HashSet<UnminedTxId> = req
+                    .iter()
+                    .filter_map(|gossip| {
+                        if let Gossip::Id(id) = gossip {
+                            Some(*id)
+                        } else {
+                            None
+                        }
+                    })
+                    .collect();
+                ids == expected_transaction_ids
+            } else {
+                false
+            }
+        })
+        .await?
+        .respond(mempool::Response::Queued(response));
+
+    Ok(())
+}
+
+/// Intercept a request for the mempool to download and verify transactions, and answer with an
+/// error.
+///
+/// The intercepted request will be verified to check if it has the `expected_transaction_ids`, and
+/// it will be answered with `error`, as if the service had an internal failure that prevented it
+/// from queuing the transactions for downloading.
+async fn respond_to_queue_request_with_error(
+    mempool: &mut MockMempool,
+    expected_transaction_ids: HashSet<UnminedTxId>,
+    error: MempoolError,
+) -> Result<(), TestCaseError> {
+    mempool
+        .expect_request_that(|req| {
+            if let mempool::Request::Queue(req) = req {
+                let ids: HashSet<UnminedTxId> = req
+                    .iter()
+                    .filter_map(|gossip| {
+                        if let Gossip::Id(id) = gossip {
+                            Some(*id)
+                        } else {
+                            None
+                        }
+                    })
+                    .collect();
+                ids == expected_transaction_ids
+            } else {
+                false
+            }
+        })
+        .await?
+        .respond(Err(error));
+
+    Ok(())
+}
diff --git a/zebrad/src/components/mempool/crawler/tests/timing.rs b/zebrad/src/components/mempool/crawler/tests/timing.rs
new file mode 100644
index 00000000000..381d6638806
--- /dev/null
+++ b/zebrad/src/components/mempool/crawler/tests/timing.rs
@@ -0,0 +1,29 @@
+//! Timing tests for the mempool crawler.
+
+use std::convert::TryInto;
+
+use zebra_chain::parameters::POST_BLOSSOM_POW_TARGET_SPACING;
+use zebra_network::constants::{DEFAULT_CRAWL_NEW_PEER_INTERVAL, HANDSHAKE_TIMEOUT};
+
+use crate::components::mempool::crawler::RATE_LIMIT_DELAY;
+
+#[test]
+fn ensure_timing_consistent() {
+    assert!(
+        RATE_LIMIT_DELAY.as_secs()
+            < POST_BLOSSOM_POW_TARGET_SPACING
+                .try_into()
+                .expect("not negative"),
+        "a mempool crawl should complete before most new blocks"
+    );
+
+    // The default peer crawler interval should be at least
+    // `HANDSHAKE_TIMEOUT` lower than all other crawler intervals.
+    //
+    // See `DEFAULT_CRAWL_NEW_PEER_INTERVAL` for details.
+    assert!(
+        DEFAULT_CRAWL_NEW_PEER_INTERVAL.as_secs() + HANDSHAKE_TIMEOUT.as_secs()
+            < RATE_LIMIT_DELAY.as_secs(),
+        "an address crawl and peer connections should complete before most mempool crawls"
+    );
+}
diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs
index 163363b97f1..d3e74a3b55b 100644
--- a/zebrad/src/components/sync.rs
+++ b/zebrad/src/components/sync.rs
@@ -156,12 +156,14 @@ pub(super) const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(180);
 /// This delay is particularly important on instances with slow or unreliable
 /// networks, and on testnet, which has a small number of slow peers.
 ///
+/// Using a prime number makes sure that syncer fanouts don't synchronise with other crawls.
+///
 /// ## Correctness
 ///
 /// If this delay is removed (or set too low), the syncer will
 /// sometimes get stuck in a failure loop, due to leftover downloads from
 /// previous sync runs.
-const SYNC_RESTART_DELAY: Duration = Duration::from_secs(61);
+const SYNC_RESTART_DELAY: Duration = Duration::from_secs(67);

 /// Controls how long we wait to retry a failed attempt to download
 /// and verify the genesis block.
@@ -433,7 +435,14 @@ where
        tracing::debug!(?block_locator, "got block locator");

        let mut requests = FuturesUnordered::new();
-        for _ in 0..FANOUT {
+        for attempt in 0..FANOUT {
+            if attempt > 0 {
+                // Let other tasks run, so we're more likely to choose a different peer.
+                //
+                // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
+                tokio::task::yield_now().await;
+            }
+
            requests.push(self.tip_network.ready().await.map_err(|e| eyre!(e))?.call(
                zn::Request::FindBlocks {
                    known_blocks: block_locator.clone(),
@@ -552,7 +561,14 @@ where
        for tip in tips {
            tracing::debug!(?tip, "asking peers to extend chain tip");
            let mut responses = FuturesUnordered::new();
-            for _ in 0..FANOUT {
+            for attempt in 0..FANOUT {
+                if attempt > 0 {
+                    // Let other tasks run, so we're more likely to choose a different peer.
+                    //
+                    // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
+                    tokio::task::yield_now().await;
+                }
+
                responses.push(self.tip_network.ready().await.map_err(|e| eyre!(e))?.call(
                    zn::Request::FindBlocks {
                        known_blocks: vec![tip.tip],
diff --git a/zebrad/src/components/sync/tests/timing.rs b/zebrad/src/components/sync/tests/timing.rs
index 0ca206f1295..6fd76b021a7 100644
--- a/zebrad/src/components/sync/tests/timing.rs
+++ b/zebrad/src/components/sync/tests/timing.rs
@@ -1,12 +1,16 @@
-use std::sync::{
-    atomic::{AtomicU8, Ordering},
-    Arc,
+use std::{
+    convert::TryInto,
+    sync::{
+        atomic::{AtomicU8, Ordering},
+        Arc,
+    },
 };

 use futures::future;
 use tokio::time::{timeout, Duration};

-use zebra_chain::parameters::Network;
+use zebra_chain::parameters::{Network, POST_BLOSSOM_POW_TARGET_SPACING};
+use zebra_network::constants::{DEFAULT_CRAWL_NEW_PEER_INTERVAL, HANDSHAKE_TIMEOUT};

 use super::super::*;
 use crate::config::ZebradConfig;
@@ -61,10 +65,28 @@ fn ensure_timeouts_consistent() {

     // This constraint makes genesis retries more likely to succeed
     assert!(
-        GENESIS_TIMEOUT_RETRY.as_secs() > zebra_network::constants::HANDSHAKE_TIMEOUT.as_secs()
+        GENESIS_TIMEOUT_RETRY.as_secs() > HANDSHAKE_TIMEOUT.as_secs()
            && GENESIS_TIMEOUT_RETRY.as_secs() < BLOCK_DOWNLOAD_TIMEOUT.as_secs(),
        "Genesis retries should wait for new peers, but they shouldn't wait too long"
    );
+
+    assert!(
+        SYNC_RESTART_DELAY.as_secs()
+            < POST_BLOSSOM_POW_TARGET_SPACING
+                .try_into()
+                .expect("not negative"),
+        "a syncer tip crawl should complete before most new blocks"
+    );
+
+    // The default peer crawler interval should be at least
+    // `HANDSHAKE_TIMEOUT` lower than all other crawler intervals.
+    //
+    // See `DEFAULT_CRAWL_NEW_PEER_INTERVAL` for details.
+    assert!(
+        DEFAULT_CRAWL_NEW_PEER_INTERVAL.as_secs() + HANDSHAKE_TIMEOUT.as_secs()
+            < SYNC_RESTART_DELAY.as_secs(),
+        "an address crawl and peer connections should complete before most syncer tip crawls"
+    );
 }

 /// Test that calls to [`ChainSync::request_genesis`] are rate limited.
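A final note on why `tokio::task::yield_now().await` is the right tool in these loops: an async task that never hits a pending await point gives the scheduler no chance to run other tasks, such as handshakes that would make new peers ready. A standalone sketch, not part of the patch, showing the yield letting a spawned task run on a single-threaded runtime (it assumes the tokio `rt` and `macros` features):

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let other_task_ran = Arc::new(AtomicBool::new(false));

    let flag = other_task_ran.clone();
    tokio::spawn(async move {
        // Stands in for "another peer becomes ready".
        flag.store(true, Ordering::SeqCst);
    });

    for attempt in 0..3 {
        if attempt > 0 {
            // Without this yield, the loop would finish all its attempts
            // before the spawned task was ever scheduled.
            tokio::task::yield_now().await;
        }
    }

    assert!(other_task_ran.load(Ordering::SeqCst));
}
```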