Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel download and verify tasks when the mempool is deactivated #2764

Merged
merged 14 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion zebrad/src/components/inbound/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn mempool_requests_for_transactions() {
let (peer_set, _) = mock_peer_set();
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let address_book = Arc::new(std::sync::Mutex::new(address_book));
let (sync_status, _recent_syncs) = SyncStatus::new();
let (sync_status, mut recent_syncs) = SyncStatus::new();

let (state, _, _) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
Expand Down Expand Up @@ -61,6 +61,9 @@ async fn mempool_requests_for_transactions() {
// We can't expect or unwrap because the returned Result does not implement Debug
assert!(r.is_ok());

// Pretend we're close to tip to enable the mempool
SyncStatus::sync_close_to_tip(&mut recent_syncs);

// Test `Request::MempoolTransactionIds`
let request = inbound_service
.clone()
Expand Down
23 changes: 23 additions & 0 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ pub struct Mempool {
/// Allows checking if we are near the tip to enable/disable the mempool.
#[allow(dead_code)]
sync_status: SyncStatus,

/// Indicates whether the mempool is enabled or not.
enabled: bool,
}

impl Mempool {
Expand All @@ -102,6 +105,7 @@ impl Mempool {
storage: Default::default(),
tx_downloads,
sync_status,
enabled: false,
}
}

Expand All @@ -111,6 +115,12 @@ impl Mempool {
&mut self.storage
}

/// Get the transaction downloader of the mempool for testing purposes.
#[cfg(test)]
pub fn tx_downloads(&self) -> &Pin<Box<InboundTxDownloads>> {
&self.tx_downloads
}

/// Check if transaction should be downloaded and/or verified.
///
/// If it is already in the mempool (or in its rejected list)
Expand All @@ -134,6 +144,16 @@ impl Service<Request> for Mempool {
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let is_close_to_tip = self.sync_status.is_close_to_tip();
if self.enabled && !is_close_to_tip {
// Disable mempool
self.tx_downloads.cancel_all();
self.enabled = false;
} else if !self.enabled && is_close_to_tip {
// Enable mempool
self.enabled = true;
}

// Clean up completed download tasks and add to mempool if successful
while let Poll::Ready(Some(r)) = self.tx_downloads.as_mut().poll_next(cx) {
if let Ok(tx) = r {
Expand All @@ -146,6 +166,9 @@ impl Service<Request> for Mempool {

#[instrument(name = "mempool", skip(self, req))]
fn call(&mut self, req: Request) -> Self::Future {
if !self.enabled {
return async move { Err(MempoolError::Disabled.into()) }.boxed();
}
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
match req {
Request::TransactionIds => {
let res = self.storage.tx_ids();
Expand Down
22 changes: 22 additions & 0 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,28 @@ where
Ok(())
}

/// Cancel all running tasks and reset the downloader state.
// Note: copied from zebrad/src/components/sync/downloads.rs
pub fn cancel_all(&mut self) {
// Replace the pending task list with an empty one and drop it.
let _ = std::mem::take(&mut self.pending);
// Signal cancellation to all running tasks.
// Since we already dropped the JoinHandles above, they should
// fail silently.
for (_hash, cancel) in self.cancel_handles.drain() {
let _ = cancel.send(());
}
assert!(self.pending.is_empty());
assert!(self.cancel_handles.is_empty());
}

/// Get the number of currently in-flight download tasks.
// Note: copied from zebrad/src/components/sync/downloads.rs
#[allow(dead_code)]
pub fn in_flight(&self) -> usize {
self.pending.len()
}

/// Check if transaction is already in the state.
async fn transaction_in_state(state: &mut ZS, txid: UnminedTxId) -> Result<(), BoxError> {
// Check if the transaction is already in the state.
Expand Down
3 changes: 3 additions & 0 deletions zebrad/src/components/mempool/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ pub enum MempoolError {
/// The queue's capacity is [`super::downloads::MAX_INBOUND_CONCURRENCY`].
#[error("transaction dropped because the queue is full")]
FullQueue,

#[error("mempool is disabled since synchronization did not reach the tip")]
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
Disabled,
}
145 changes: 141 additions & 4 deletions zebrad/src/components/mempool/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn mempool_service_basic() -> Result<(), Report> {
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let (peer_set, _) = mock_peer_set();
let (sync_status, _recent_syncs) = SyncStatus::new();
let (sync_status, mut recent_syncs) = SyncStatus::new();

let (state, _, _) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
Expand All @@ -37,6 +37,9 @@ async fn mempool_service_basic() -> Result<(), Report> {
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage.insert(genesis_transactions.1[0].clone())?;

// Pretend we're close to tip to enable the mempool
SyncStatus::sync_close_to_tip(&mut recent_syncs);

// Test `Request::TransactionIds`
let response = service
.ready_and()
Expand Down Expand Up @@ -75,14 +78,18 @@ async fn mempool_service_basic() -> Result<(), Report> {

// Insert more transactions into the mempool storage.
// This will cause the genesis transaction to be moved into rejected.
let more_transactions = unmined_transactions_in_blocks(10, network);
for tx in more_transactions.1.iter().skip(1) {
let (count, more_transactions) = unmined_transactions_in_blocks(10, network);
// Skip the first (used before) and the last (will be used later)
for tx in more_transactions.iter().skip(1).take(count - 2) {
service.storage.insert(tx.clone())?;
}

// Test `Request::RejectedTransactionIds`
let response = service
.oneshot(Request::RejectedTransactionIds(
.ready_and()
.await
.unwrap()
.call(Request::RejectedTransactionIds(
genesis_transactions_hash_set,
))
.await
Expand All @@ -94,5 +101,135 @@ async fn mempool_service_basic() -> Result<(), Report> {

assert_eq!(rejected_ids, genesis_transaction_ids);

// Test `Request::Queue`
// Use the ID of the last transaction in the list
let txid = more_transactions.last().unwrap().id;
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(service.tx_downloads().in_flight(), 1);

Ok(())
}

#[tokio::test]
async fn mempool_service_disabled() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let (peer_set, _) = mock_peer_set();
let (sync_status, mut recent_syncs) = SyncStatus::new();

let (state, _, _) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;

// get the genesis block transactions from the Zcash blockchain.
let genesis_transactions = unmined_transactions_in_blocks(0, network);
// Start the mempool service
let mut service = Mempool::new(
network,
peer_set,
state_service.clone(),
tx_verifier,
sync_status,
);
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage.insert(genesis_transactions.1[0].clone())?;

// Test if mempool is disabled (it should start disabled)
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await;
assert_eq!(
*response
.expect_err("mempool should return an error")
.downcast_ref::<MempoolError>()
.expect("error must be MempoolError"),
MempoolError::Disabled,
"error must be MempoolError::Disabled"
);

// Pretend we're close to tip to enable the mempool
SyncStatus::sync_close_to_tip(&mut recent_syncs);

// Test if the mempool answers correctly (i.e. is enabled)
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
let _genesis_transaction_ids = match response {
Response::TransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};

let (_count, more_transactions) = unmined_transactions_in_blocks(1, network);

// Queue a transaction for download
// Use the ID of the last transaction in the list
let txid = more_transactions.last().unwrap().id;
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(
service.tx_downloads().in_flight(),
1,
"Transaction must be queued for download"
);

// Pretend we're far from the tip to disable the mempool
SyncStatus::sync_far_from_tip(&mut recent_syncs);

// Test if mempool is disabled again
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await;
assert_eq!(
*response
.expect_err("mempool should return an error")
.downcast_ref::<MempoolError>()
.expect("error must be MempoolError"),
MempoolError::Disabled,
"error must be MempoolError::Disabled"
);

assert_eq!(
service.tx_downloads().in_flight(),
0,
"Transaction download should have been cancelled"
);

Ok(())
}
18 changes: 18 additions & 0 deletions zebrad/src/components/sync/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,22 @@ impl SyncStatus {
// average sync length falls below the threshold.
avg < Self::MIN_DIST_FROM_TIP
}

/// Feed the given [`RecentSyncLengths`] it order to make the matching
/// [`SyncStatus`] report that it's close to the tip.
#[cfg(test)]
pub(crate) fn sync_close_to_tip(recent_syncs: &mut RecentSyncLengths) {
for _ in 0..RecentSyncLengths::MAX_RECENT_LENGTHS {
recent_syncs.push_extend_tips_length(1);
}
}

/// Feed the given [`RecentSyncLengths`] it order to make the matching
/// [`SyncStatus`] report that it's not close to the tip.
#[cfg(test)]
pub(crate) fn sync_far_from_tip(recent_syncs: &mut RecentSyncLengths) {
for _ in 0..RecentSyncLengths::MAX_RECENT_LENGTHS {
recent_syncs.push_extend_tips_length(Self::MIN_DIST_FROM_TIP * 10);
}
}
}