Skip to content

Commit

Permalink
Only enable the mempool crawler after synchronization reaches the chain tip (#2667)
Browse files Browse the repository at this point in the history

* Store a `SyncStatus` handle in the `Crawler`

The helper type will make it easier to determine if the crawler is
enabled or not.

* Pause crawler if mempool is disabled

Implement waiting until the mempool becomes enabled, so that the crawler
does not run while the mempool is disabled.

If the `MempoolStatus` helper is unable to determine if the mempool is
enabled, stop the crawler task entirely.

* Update test to consider when crawler is paused

Change the mempool crawler test so that it's a proptest that tests
different chain sync. lengths. This leads to different scenarios with
the crawler pausing and resuming.

Co-authored-by: teor <teor@riseup.net>
  • Loading branch information
jvff and teor2345 authored Aug 31, 2021
1 parent 2dac0dd commit 8bff71e
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 25 deletions.
4 changes: 2 additions & 2 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ impl StartCmd {

info!("initializing syncer");
// TODO: use sync_status to activate the mempool (#2592)
let (syncer, _sync_status) =
let (syncer, sync_status) =
ChainSync::new(&config, peer_set.clone(), state, chain_verifier);

select! {
result = syncer.sync().fuse() => result,
_ = mempool::Crawler::spawn(peer_set).fuse() => {
_ = mempool::Crawler::spawn(peer_set, sync_status).fuse() => {
unreachable!("The mempool crawler only stops if it panics");
}
}
Expand Down
19 changes: 11 additions & 8 deletions zebrad/src/components/mempool/crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use tower::{timeout::Timeout, BoxError, Service, ServiceExt};

use zebra_network::{Request, Response};

use super::super::sync::SyncStatus;

#[cfg(test)]
mod tests;

Expand All @@ -31,6 +33,7 @@ const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);
/// The mempool transaction crawler.
pub struct Crawler<S> {
peer_set: Mutex<Timeout<S>>,
status: SyncStatus,
}

impl<S> Crawler<S>
Expand All @@ -39,26 +42,26 @@ where
S::Future: Send,
{
/// Spawn an asynchronous task to run the mempool crawler.
pub fn spawn(peer_set: S) -> JoinHandle<Result<(), BoxError>> {
pub fn spawn(peer_set: S, status: SyncStatus) -> JoinHandle<Result<(), BoxError>> {
let crawler = Crawler {
peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)),
status,
};

tokio::spawn(crawler.run())
}

/// Periodically crawl peers for transactions to include in the mempool.
pub async fn run(self) -> Result<(), BoxError> {
loop {
self.wait_until_enabled().await;
///
/// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when
/// Zebra is shutting down.
pub async fn run(mut self) -> Result<(), BoxError> {
while self.status.wait_until_close_to_tip().await.is_ok() {
self.crawl_transactions().await?;
sleep(RATE_LIMIT_DELAY).await;
}
}

/// Wait until the mempool is enabled.
async fn wait_until_enabled(&self) {
// TODO: Check if synchronizing up to chain tip has finished (#2603).
Ok(())
}

/// Crawl peers for transactions.
Expand Down
60 changes: 45 additions & 15 deletions zebrad/src/components/mempool/crawler/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::time::Duration;

use proptest::prelude::*;
use tokio::{
sync::mpsc::{self, UnboundedReceiver},
time::{self, timeout},
Expand All @@ -8,7 +9,7 @@ use tower::{buffer::Buffer, util::BoxService, BoxError};

use zebra_network::{Request, Response};

use super::{Crawler, FANOUT, RATE_LIMIT_DELAY};
use super::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY};

/// The number of iterations to crawl while testing.
///
Expand All @@ -21,31 +22,60 @@ const CRAWL_ITERATIONS: usize = 4;
/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test.
/// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for
/// at least `MAX_REQUEST_DELAY * CRAWL_ITERATIONS`.
const MAX_REQUEST_DELAY: Duration = Duration::from_millis(250);
const MAX_REQUEST_DELAY: Duration = Duration::from_millis(25);

/// The amount of time to advance beyond the expected instant that the crawler wakes up.
const ERROR_MARGIN: Duration = Duration::from_millis(100);

#[tokio::test]
async fn crawler_requests_for_transaction_ids() {
let (peer_set, mut requests) = mock_peer_set();
proptest! {
#[test]
fn crawler_requests_for_transaction_ids(mut sync_lengths in any::<Vec<usize>>()) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");
let _guard = runtime.enter();

Crawler::spawn(peer_set);
// Add a dummy last element, so that all of the original values are used.
sync_lengths.push(0);

time::pause();
runtime.block_on(async move {
let (peer_set, mut requests) = mock_peer_set();
let (sync_status, mut recent_sync_lengths) = SyncStatus::new();

for _ in 0..CRAWL_ITERATIONS {
for _ in 0..FANOUT {
let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;
time::pause();

assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds))));
}
Crawler::spawn(peer_set, sync_status.clone());

for sync_length in sync_lengths {
let mempool_is_enabled = sync_status.is_close_to_tip();

for _ in 0..CRAWL_ITERATIONS {
for _ in 0..FANOUT {
let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;

if mempool_is_enabled {
prop_assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds))));
} else {
prop_assert!(request.is_err());
}
}

let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;

prop_assert!(extra_request.is_err());

let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;
time::sleep(RATE_LIMIT_DELAY + ERROR_MARGIN).await;
}

assert!(extra_request.is_err());
// Applying the update event at the end of the test iteration means that the first
// iteration runs with an empty recent sync. lengths vector. A dummy element is
// appended to the events so that all of the original values are applied.
recent_sync_lengths.push_extend_tips_length(sync_length);
}

time::advance(RATE_LIMIT_DELAY + ERROR_MARGIN).await;
Ok(())
})?;
}
}

Expand Down

0 comments on commit 8bff71e

Please sign in to comment.