Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only enable the mempool crawler after synchronization reaches the chain tip #2667

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ impl StartCmd {

info!("initializing syncer");
// TODO: use sync_status to activate the mempool (#2592)
let (syncer, _sync_status) =
let (syncer, sync_status) =
ChainSync::new(&config, peer_set.clone(), state, chain_verifier);

select! {
result = syncer.sync().fuse() => result,
_ = mempool::Crawler::spawn(peer_set).fuse() => {
_ = mempool::Crawler::spawn(peer_set, sync_status).fuse() => {
unreachable!("The mempool crawler only stops if it panics");
}
}
Expand Down
19 changes: 11 additions & 8 deletions zebrad/src/components/mempool/crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use tower::{timeout::Timeout, BoxError, Service, ServiceExt};

use zebra_network::{Request, Response};

use super::super::sync::SyncStatus;

#[cfg(test)]
mod tests;

Expand All @@ -31,6 +33,7 @@ const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);
/// The mempool transaction crawler.
pub struct Crawler<S> {
peer_set: Mutex<Timeout<S>>,
status: SyncStatus,
}

impl<S> Crawler<S>
Expand All @@ -39,26 +42,26 @@ where
S::Future: Send,
{
/// Spawn an asynchronous task to run the mempool crawler.
pub fn spawn(peer_set: S) -> JoinHandle<Result<(), BoxError>> {
pub fn spawn(peer_set: S, status: SyncStatus) -> JoinHandle<Result<(), BoxError>> {
let crawler = Crawler {
peer_set: Mutex::new(Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT)),
status,
};

tokio::spawn(crawler.run())
}

/// Periodically crawl peers for transactions to include in the mempool.
pub async fn run(self) -> Result<(), BoxError> {
loop {
self.wait_until_enabled().await;
///
/// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when
/// Zebra is shutting down.
pub async fn run(mut self) -> Result<(), BoxError> {
while self.status.wait_until_close_to_tip().await.is_ok() {
self.crawl_transactions().await?;
sleep(RATE_LIMIT_DELAY).await;
}
}

/// Wait until the mempool is enabled.
async fn wait_until_enabled(&self) {
// TODO: Check if synchronizing up to chain tip has finished (#2603).
Ok(())
}

/// Crawl peers for transactions.
Expand Down
60 changes: 45 additions & 15 deletions zebrad/src/components/mempool/crawler/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::time::Duration;

use proptest::prelude::*;
use tokio::{
sync::mpsc::{self, UnboundedReceiver},
time::{self, timeout},
Expand All @@ -8,7 +9,7 @@ use tower::{buffer::Buffer, util::BoxService, BoxError};

use zebra_network::{Request, Response};

use super::{Crawler, FANOUT, RATE_LIMIT_DELAY};
use super::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY};

/// The number of iterations to crawl while testing.
///
Expand All @@ -21,31 +22,60 @@ const CRAWL_ITERATIONS: usize = 4;
/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test.
/// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for
/// at least `MAX_REQUEST_DELAY * CRAWL_ITERATIONS`.
const MAX_REQUEST_DELAY: Duration = Duration::from_millis(250);
const MAX_REQUEST_DELAY: Duration = Duration::from_millis(25);
teor2345 marked this conversation as resolved.
Show resolved Hide resolved

/// The amount of time to advance beyond the expected instant that the crawler wakes up.
const ERROR_MARGIN: Duration = Duration::from_millis(100);

#[tokio::test]
async fn crawler_requests_for_transaction_ids() {
let (peer_set, mut requests) = mock_peer_set();
proptest! {
#[test]
fn crawler_requests_for_transaction_ids(mut sync_lengths in any::<Vec<usize>>()) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");
let _guard = runtime.enter();

Crawler::spawn(peer_set);
// Add a dummy last element, so that all of the original values are used.
sync_lengths.push(0);

time::pause();
runtime.block_on(async move {
let (peer_set, mut requests) = mock_peer_set();
let (sync_status, mut recent_sync_lengths) = SyncStatus::new();

for _ in 0..CRAWL_ITERATIONS {
for _ in 0..FANOUT {
let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;
time::pause();

assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds))));
}
Crawler::spawn(peer_set, sync_status.clone());

for sync_length in sync_lengths {
let mempool_is_enabled = sync_status.is_close_to_tip();

for _ in 0..CRAWL_ITERATIONS {
for _ in 0..FANOUT {
let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;

if mempool_is_enabled {
prop_assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds))));
} else {
prop_assert!(request.is_err());
}
}

let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;

prop_assert!(extra_request.is_err());

let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;
time::sleep(RATE_LIMIT_DELAY + ERROR_MARGIN).await;
}

assert!(extra_request.is_err());
// Applying the update event at the end of the test iteration means that the first
// iteration runs with an empty recent sync. lengths vector. A dummy element is
// appended to the events so that all of the original values are applied.
recent_sync_lengths.push_extend_tips_length(sync_length);
}

time::advance(RATE_LIMIT_DELAY + ERROR_MARGIN).await;
Ok(())
})?;
}
}

Expand Down