Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear mempool at a network upgrade #2773

Merged
merged 6 commits into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion zebra-state/src/service/chain_tip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl ChainTipChange {
///
/// If a lot of blocks are committed at the same time,
/// the change will skip some blocks, and return a [`Reset`].
pub async fn tip_change(&mut self) -> Result<TipAction, watch::error::RecvError> {
pub async fn wait_for_tip_change(&mut self) -> Result<TipAction, watch::error::RecvError> {
let block = self.tip_block_change().await?;

let action = self.action(block.clone());
Expand All @@ -320,6 +320,27 @@ impl ChainTipChange {
Ok(action)
}

/// Returns:
/// - `Some(`[`TipAction`]`)` if there has been a change since the last time the method was called.
/// - `None` if there has been no change.
///
/// See [`wait_for_tip_change`] for details.
pub fn last_tip_change(&mut self) -> Option<TipAction> {
// Obtain the tip block.
let block = self.best_tip_block()?;

// Ignore an unchanged tip.
if Some(block.hash) == self.last_change_hash {
return None;
}

let action = self.action(block.clone());

self.last_change_hash = Some(block.hash);

Some(action)
}

/// Return an action based on `block` and the last change we returned.
fn action(&self, block: ChainTipBlock) -> TipAction {
// check for an edge case that's dealt with by other code
Expand Down
2 changes: 1 addition & 1 deletion zebra-state/src/service/chain_tip/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ proptest! {

prop_assert_eq!(
chain_tip_change
.tip_change()
.wait_for_tip_change()
.now_or_never()
.transpose()
.expect("watch sender is not dropped"),
Expand Down
6 changes: 3 additions & 3 deletions zebra-state/src/service/chain_tip/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn chain_tip_change_is_initially_not_ready() {
ChainTipSender::new(None, Mainnet);

let first = chain_tip_change
.tip_change()
.wait_for_tip_change()
.now_or_never()
.transpose()
.expect("watch sender is not dropped");
Expand All @@ -49,7 +49,7 @@ fn chain_tip_change_is_initially_not_ready() {

// try again, just to be sure
let first = chain_tip_change
.tip_change()
.wait_for_tip_change()
.now_or_never()
.transpose()
.expect("watch sender is not dropped");
Expand All @@ -60,7 +60,7 @@ fn chain_tip_change_is_initially_not_ready() {
#[allow(clippy::redundant_clone)]
let first_clone = chain_tip_change
.clone()
.tip_change()
.wait_for_tip_change()
.now_or_never()
.transpose()
.expect("watch sender is not dropped");
Expand Down
29 changes: 4 additions & 25 deletions zebra-state/src/service/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{convert::TryInto, env, sync::Arc};

use futures::{stream::FuturesUnordered, FutureExt};
use futures::stream::FuturesUnordered;
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};

use zebra_chain::{
Expand Down Expand Up @@ -300,14 +300,7 @@ proptest! {
let (mut state_service, latest_chain_tip, mut chain_tip_change) = StateService::new(Config::ephemeral(), network);

prop_assert_eq!(latest_chain_tip.best_tip_height(), None);
prop_assert_eq!(
chain_tip_change
.tip_change()
.now_or_never()
.transpose()
.expect("watch sender is not dropped"),
None
);
prop_assert_eq!(chain_tip_change.last_tip_change(), None);

for block in finalized_blocks {
let expected_block = block.clone();
Expand All @@ -323,14 +316,7 @@ proptest! {
state_service.queue_and_commit_finalized(block);

prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height));
prop_assert_eq!(
chain_tip_change
.tip_change()
.now_or_never()
.transpose()
.expect("watch sender is not dropped"),
Some(expected_action)
);
prop_assert_eq!(chain_tip_change.last_tip_change(), Some(expected_action));
}

for block in non_finalized_blocks {
Expand All @@ -346,14 +332,7 @@ proptest! {
state_service.queue_and_commit_non_finalized(block);

prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height));
prop_assert_eq!(
chain_tip_change
.tip_change()
.now_or_never()
.transpose()
.expect("watch sender is not dropped"),
Some(expected_action)
);
prop_assert_eq!(chain_tip_change.last_tip_change(), Some(expected_action));
}
}

Expand Down
7 changes: 6 additions & 1 deletion zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use zebra_chain::{
use zebra_consensus::{error::TransactionError, transaction};
use zebra_network as zn;
use zebra_state as zs;
use zs::ChainTipChange;
use zebra_state::{ChainTipChange, TipAction};

pub use crate::BoxError;

Expand Down Expand Up @@ -142,6 +142,11 @@ impl Service<Request> for Mempool {
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Clear the mempool if there has been a chain tip reset.
if let Some(TipAction::Reset { .. }) = self.chain_tip_change.last_tip_change() {
self.storage.clear();
}

// Clean up completed download tasks and add to mempool if successful
while let Poll::Ready(Some(r)) = self.tx_downloads.as_mut().poll_next(cx) {
if let Ok(tx) = r {
Expand Down
6 changes: 6 additions & 0 deletions zebrad/src/components/mempool/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,10 @@ impl Storage {
.filter(|tx| self.rejected.contains_key(tx))
.collect()
}

/// Clears the whole mempool storage.
pub fn clear(&mut self) {
self.verified.clear();
self.rejected.clear();
}
}