From faa40e01778163e689a04b681604f08ad9f998cf Mon Sep 17 00:00:00 2001 From: Janito Vaqueiro Ferreira Filho Date: Fri, 27 Aug 2021 02:05:58 +0000 Subject: [PATCH 1/3] Store a `SyncStatus` handle in the `Crawler` The helper type will make it easier to determine if the crawler is enabled or not. --- zebrad/src/commands/start.rs | 4 ++-- zebrad/src/components/mempool/crawler.rs | 6 +++++- zebrad/src/components/mempool/crawler/tests.rs | 10 ++++++++-- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 8a998445ff6..145a152995b 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -85,12 +85,12 @@ impl StartCmd { info!("initializing syncer"); // TODO: use sync_status to activate the mempool (#2592) - let (syncer, _sync_status) = + let (syncer, sync_status) = ChainSync::new(&config, peer_set.clone(), state, chain_verifier); select! { result = syncer.sync().fuse() => result, - _ = mempool::Crawler::spawn(peer_set).fuse() => { + _ = mempool::Crawler::spawn(peer_set, sync_status).fuse() => { unreachable!("The mempool crawler only stops if it panics"); } } diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs index 22b509a6590..e924d6382df 100644 --- a/zebrad/src/components/mempool/crawler.rs +++ b/zebrad/src/components/mempool/crawler.rs @@ -10,6 +10,8 @@ use tower::{timeout::Timeout, BoxError, Service, ServiceExt}; use zebra_network::{Request, Response}; +use super::super::sync::SyncStatus; + #[cfg(test)] mod tests; @@ -31,6 +33,7 @@ const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6); /// The mempool transaction crawler. pub struct Crawler { peer_set: Mutex>, + status: SyncStatus, } impl Crawler @@ -39,9 +42,10 @@ where S::Future: Send, { /// Spawn an asynchronous task to run the mempool crawler. - pub fn spawn(peer_set: S) -> JoinHandle> { + pub fn spawn(peer_set: S, status: SyncStatus) -> JoinHandle> { let crawler = Crawler { peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)), + status, }; tokio::spawn(crawler.run()) diff --git a/zebrad/src/components/mempool/crawler/tests.rs b/zebrad/src/components/mempool/crawler/tests.rs index 35b71d04233..afdf28944c4 100644 --- a/zebrad/src/components/mempool/crawler/tests.rs +++ b/zebrad/src/components/mempool/crawler/tests.rs @@ -8,7 +8,7 @@ use tower::{buffer::Buffer, util::BoxService, BoxError}; use zebra_network::{Request, Response}; -use super::{Crawler, FANOUT, RATE_LIMIT_DELAY}; +use super::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY}; /// The number of iterations to crawl while testing. /// @@ -30,7 +30,13 @@ const ERROR_MARGIN: Duration = Duration::from_millis(100); async fn crawler_requests_for_transaction_ids() { let (peer_set, mut requests) = mock_peer_set(); - Crawler::spawn(peer_set); + // Mock the latest sync length in a state that enables the mempool. + let (sync_status, mut recent_sync_lengths) = SyncStatus::new(); + for _ in 0..5 { + recent_sync_lengths.push_extend_tips_length(0); + } + + Crawler::spawn(peer_set, sync_status); time::pause(); From 8a1c81021a1cf579193d9ebe3fdcfc1bcb56c0ff Mon Sep 17 00:00:00 2001 From: Janito Vaqueiro Ferreira Filho Date: Thu, 26 Aug 2021 00:29:24 +0000 Subject: [PATCH 2/3] Pause crawler if mempool is disabled Implement waiting until the mempool becomes enabled, so that the crawler does not run while the mempool is disabled. If the `MempoolStatus` helper is unable to determine if the mempool is enabled, stop the crawler task entirely. --- zebrad/src/components/mempool/crawler.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs index e924d6382df..bb53da4c667 100644 --- a/zebrad/src/components/mempool/crawler.rs +++ b/zebrad/src/components/mempool/crawler.rs @@ -52,17 +52,16 @@ where } /// Periodically crawl peers for transactions to include in the mempool. - pub async fn run(self) -> Result<(), BoxError> { - loop { - self.wait_until_enabled().await; + /// + /// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when + /// Zebra is shutting down. + pub async fn run(mut self) -> Result<(), BoxError> { + while self.status.wait_until_close_to_tip().await.is_ok() { self.crawl_transactions().await?; sleep(RATE_LIMIT_DELAY).await; } - } - /// Wait until the mempool is enabled. - async fn wait_until_enabled(&self) { - // TODO: Check if synchronizing up to chain tip has finished (#2603). + Ok(()) } /// Crawl peers for transactions. From 0f935daf6b22012652df9cedea864a71827daebf Mon Sep 17 00:00:00 2001 From: Janito Vaqueiro Ferreira Filho Date: Mon, 30 Aug 2021 15:07:27 +0000 Subject: [PATCH 3/3] Update test to consider when crawler is paused Change the mempool crawler test so that it's a proptest that tests different chain sync. lengths. This leads to different scenarios with the crawler pausing and resuming. --- .../src/components/mempool/crawler/tests.rs | 62 +++++++++++++------ 1 file changed, 43 insertions(+), 19 deletions(-) diff --git a/zebrad/src/components/mempool/crawler/tests.rs b/zebrad/src/components/mempool/crawler/tests.rs index afdf28944c4..e9ea6dc1986 100644 --- a/zebrad/src/components/mempool/crawler/tests.rs +++ b/zebrad/src/components/mempool/crawler/tests.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use proptest::prelude::*; use tokio::{ sync::mpsc::{self, UnboundedReceiver}, time::{self, timeout}, @@ -21,37 +22,60 @@ const CRAWL_ITERATIONS: usize = 4; /// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test. /// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for /// at least `MAX_REQUEST_DELAY * CRAWL_ITERATIONS`. -const MAX_REQUEST_DELAY: Duration = Duration::from_millis(250); +const MAX_REQUEST_DELAY: Duration = Duration::from_millis(25); /// The amount of time to advance beyond the expected instant that the crawler wakes up. const ERROR_MARGIN: Duration = Duration::from_millis(100); -#[tokio::test] -async fn crawler_requests_for_transaction_ids() { - let (peer_set, mut requests) = mock_peer_set(); +proptest! { + #[test] + fn crawler_requests_for_transaction_ids(mut sync_lengths in any::>()) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); - // Mock the latest sync length in a state that enables the mempool. - let (sync_status, mut recent_sync_lengths) = SyncStatus::new(); - for _ in 0..5 { - recent_sync_lengths.push_extend_tips_length(0); - } + // Add a dummy last element, so that all of the original values are used. + sync_lengths.push(0); - Crawler::spawn(peer_set, sync_status); + runtime.block_on(async move { + let (peer_set, mut requests) = mock_peer_set(); + let (sync_status, mut recent_sync_lengths) = SyncStatus::new(); - time::pause(); + time::pause(); - for _ in 0..CRAWL_ITERATIONS { - for _ in 0..FANOUT { - let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; + Crawler::spawn(peer_set, sync_status.clone()); - assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds)))); - } + for sync_length in sync_lengths { + let mempool_is_enabled = sync_status.is_close_to_tip(); + + for _ in 0..CRAWL_ITERATIONS { + for _ in 0..FANOUT { + let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; + + if mempool_is_enabled { + prop_assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds)))); + } else { + prop_assert!(request.is_err()); + } + } + + let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; + + prop_assert!(extra_request.is_err()); - let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; + time::sleep(RATE_LIMIT_DELAY + ERROR_MARGIN).await; + } - assert!(extra_request.is_err()); + // Applying the update event at the end of the test iteration means that the first + // iteration runs with an empty recent sync. lengths vector. A dummy element is + // appended to the events so that all of the original values are applied. + recent_sync_lengths.push_extend_tips_length(sync_length); + } - time::advance(RATE_LIMIT_DELAY + ERROR_MARGIN).await; + Ok(()) + })?; } }