Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel download and verify tasks when the mempool is deactivated #2764

Merged
merged 14 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ impl Service<zn::Request> for Inbound {
Poll::Ready(result)
}

/// Call the inbound service.
///
/// Errors indicate that the peer has done something wrong or unexpected,
/// and will cause callers to disconnect from the remote peer.
#[instrument(name = "inbound", skip(self, req))]
fn call(&mut self, req: zn::Request) -> Self::Future {
match req {
Expand Down
10 changes: 6 additions & 4 deletions zebrad/src/components/inbound/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ async fn mempool_requests_for_transactions() {
let peer_set = MockService::build().for_unit_tests();
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let address_book = Arc::new(std::sync::Mutex::new(address_book));
let (sync_status, _recent_syncs) = SyncStatus::new();
let (_state_service, _latest_chain_tip, chain_tip_change) =
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, _latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);

let (state, _, _) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);

let (block_verifier, transaction_verifier) =
Expand All @@ -48,6 +47,9 @@ async fn mempool_requests_for_transactions() {
chain_tip_change,
);

// Enable the mempool
let _ = mempool_service.enable(&mut recent_syncs).await;

let added_transactions = add_some_stuff_to_mempool(&mut mempool_service, network);
let added_transaction_ids: Vec<UnminedTxId> = added_transactions.iter().map(|t| t.id).collect();

Expand All @@ -65,7 +67,7 @@ async fn mempool_requests_for_transactions() {
block_verifier.clone(),
));

let r = setup_tx.send((peer_set_service, address_book, mempool));
let r = setup_tx.send((peer_set_service, address_book, mempool.clone()));
// We can't expect or unwrap because the returned Result does not implement Debug
assert!(r.is_ok());

Expand Down
244 changes: 187 additions & 57 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::{
collections::HashSet,
future::Future,
iter,
pin::Pin,
task::{Context, Poll},
};
Expand Down Expand Up @@ -38,15 +39,18 @@ use self::downloads::{
Downloads as TxDownloads, Gossip, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
};

#[cfg(test)]
use super::sync::RecentSyncLengths;
use super::sync::SyncStatus;

type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
type TxVerifier = Buffer<
type OutboundService = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
type StateService = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
type TxVerifierService = Buffer<
BoxService<transaction::Request, transaction::Response, TransactionError>,
transaction::Request,
>;
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
type InboundTxDownloads =
TxDownloads<Timeout<OutboundService>, Timeout<TxVerifierService>, StateService>;

#[derive(Debug)]
#[allow(dead_code)]
Expand All @@ -65,20 +69,33 @@ pub enum Response {
Queued(Vec<Result<(), MempoolError>>),
}

/// The state of the mempool.
///
/// Indicates wether it is enabled or disabled and, if enabled, contains
/// the necessary data to run it.
enum State {
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
/// The Mempool is disabled.
Disabled,
/// The Mempool is enabled.
Enabled {
/// The Mempool storage itself.
///
/// ##: Correctness: only components internal to the [`Mempool`] struct are allowed to
/// inject transactions into `storage`, as transactions must be verified beforehand.
storage: storage::Storage,
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
/// The transaction dowload and verify stream.
tx_downloads: Pin<Box<InboundTxDownloads>>,
},
}

/// Mempool async management and query service.
///
/// The mempool is the set of all verified transactions that this node is aware
/// of that have yet to be confirmed by the Zcash network. A transaction is
/// confirmed when it has been included in a block ('mined').
pub struct Mempool {
/// The Mempool storage itself.
///
/// ##: Correctness: only components internal to the [`Mempool`] struct are allowed to
/// inject transactions into `storage`, as transactions must be verified beforehand.
storage: storage::Storage,

/// The transaction dowload and verify stream.
tx_downloads: Pin<Box<InboundTxDownloads>>,
/// The state of the mempool.
state: State,

/// Allows checking if we are near the tip to enable/disable the mempool.
#[allow(dead_code)]
Expand All @@ -87,48 +104,120 @@ pub struct Mempool {
/// Allows the detection of chain tip resets.
#[allow(dead_code)]
chain_tip_change: ChainTipChange,

/// Handle to the outbound service.
/// Used to construct the transaction downloader.
outbound_service: OutboundService,

/// Handle to the state service.
/// Used to construct the transaction downloader.
state_service: StateService,

/// Handle to the transaction verifier service.
/// Used to construct the transaction downloader.
tx_verifier_service: TxVerifierService,
}

impl Mempool {
#[allow(dead_code)]
pub(crate) fn new(
_network: Network,
outbound: Outbound,
state: State,
tx_verifier: TxVerifier,
outbound_service: OutboundService,
state_service: StateService,
tx_verifier_service: TxVerifierService,
sync_status: SyncStatus,
chain_tip_change: ChainTipChange,
) -> Self {
let tx_downloads = Box::pin(TxDownloads::new(
Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT),
Timeout::new(tx_verifier, TRANSACTION_VERIFY_TIMEOUT),
state,
));

Mempool {
storage: Default::default(),
tx_downloads,
state: State::Disabled,
sync_status,
chain_tip_change,
outbound_service,
state_service,
tx_verifier_service,
}
}

/// Update the mempool state (enabled / disabled) depending on how close to
/// the tip is the synchronization, including side effects to state changes.
fn update_state(&mut self) {
// Update enabled / disabled state
let is_close_to_tip = self.sync_status.is_close_to_tip();
if is_close_to_tip && matches!(self.state, State::Disabled) {
let tx_downloads = Box::pin(TxDownloads::new(
Timeout::new(self.outbound_service.clone(), TRANSACTION_DOWNLOAD_TIMEOUT),
Timeout::new(self.tx_verifier_service.clone(), TRANSACTION_VERIFY_TIMEOUT),
self.state_service.clone(),
));
self.state = State::Enabled {
storage: Default::default(),
tx_downloads,
};
} else if !is_close_to_tip && matches!(self.state, State::Enabled { .. }) {
self.state = State::Disabled
}
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
}

/// Return wether the mempool is enabled or not.
conradoplg marked this conversation as resolved.
Show resolved Hide resolved
#[allow(dead_code)]
conradoplg marked this conversation as resolved.
Show resolved Hide resolved
pub fn enabled(&self) -> bool {
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
match self.state {
State::Disabled => false,
State::Enabled { .. } => true,
}
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
}

/// Get the storage field of the mempool for testing purposes.
#[cfg(test)]
pub fn storage(&mut self) -> &mut storage::Storage {
&mut self.storage
match &mut self.state {
State::Disabled => panic!("mempool must be enabled"),
State::Enabled { storage, .. } => storage,
}
}

/// Get the transaction downloader of the mempool for testing purposes.
#[cfg(test)]
pub fn tx_downloads(&self) -> &Pin<Box<InboundTxDownloads>> {
match &self.state {
State::Disabled => panic!("mempool must be enabled"),
State::Enabled { tx_downloads, .. } => tx_downloads,
}
}

/// Enable the mempool by pretending the synchronization is close to the tip.
#[cfg(test)]
pub async fn enable(&mut self, recent_syncs: &mut RecentSyncLengths) {
use tower::ServiceExt;
// Pretend we're close to tip
SyncStatus::sync_close_to_tip(recent_syncs);
// Wait for the mempool to make it enable itself
let _ = self.ready_and().await;
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
}

/// Disable the mempool by pretending the synchronization is far from the tip.
#[cfg(test)]
pub async fn disable(&mut self, recent_syncs: &mut RecentSyncLengths) {
use tower::ServiceExt;
// Pretend we're far from the tip
SyncStatus::sync_far_from_tip(recent_syncs);
// Wait for the mempool to make it enable itself
let _ = self.ready_and().await;
conradoplg marked this conversation as resolved.
Show resolved Hide resolved
}

/// Check if transaction should be downloaded and/or verified.
///
/// If it is already in the mempool (or in its rejected list)
/// then it shouldn't be downloaded/verified.
fn should_download_or_verify(&mut self, txid: UnminedTxId) -> Result<(), MempoolError> {
fn should_download_or_verify(
storage: &mut storage::Storage,
txid: UnminedTxId,
) -> Result<(), MempoolError> {
// Check if the transaction is already in the mempool.
if self.storage.contains(&txid) {
if storage.contains(&txid) {
return Err(MempoolError::InMempool);
}
if self.storage.contains_rejected(&txid) {
if storage.contains_rejected(&txid) {
return Err(MempoolError::Rejected);
}
Ok(())
Expand All @@ -142,43 +231,84 @@ impl Service<Request> for Mempool {
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Clean up completed download tasks and add to mempool if successful
while let Poll::Ready(Some(r)) = self.tx_downloads.as_mut().poll_next(cx) {
if let Ok(tx) = r {
// TODO: should we do something with the result?
let _ = self.storage.insert(tx);
self.update_state();

match &mut self.state {
State::Enabled {
storage,
tx_downloads,
} => {
// Clean up completed download tasks and add to mempool if successful
while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) {
if let Ok(tx) = r {
// TODO: should we do something with the result?
conradoplg marked this conversation as resolved.
Show resolved Hide resolved
let _ = storage.insert(tx);
}
}
}
State::Disabled => {
// When the mempool is disabled we still return that the service is ready.
// Otherwise, callers could block waiting for the mempool to be enabled,
// which may not be the desired behaviour.
}
}
Poll::Ready(Ok(()))
}

/// Call the mempool service.
///
/// Errors indicate that the peer has done something wrong or unexpected,
/// and will cause callers to disconnect from the remote peer.
#[instrument(name = "mempool", skip(self, req))]
fn call(&mut self, req: Request) -> Self::Future {
match req {
Request::TransactionIds => {
let res = self.storage.tx_ids();
async move { Ok(Response::TransactionIds(res)) }.boxed()
}
Request::TransactionsById(ids) => {
let rsp = Ok(self.storage.transactions(ids)).map(Response::Transactions);
async move { rsp }.boxed()
}
Request::RejectedTransactionIds(ids) => {
let rsp = Ok(self.storage.rejected_transactions(ids))
.map(Response::RejectedTransactionIds);
async move { rsp }.boxed()
}
Request::Queue(gossiped_txs) => {
let rsp: Vec<Result<(), MempoolError>> = gossiped_txs
.into_iter()
.map(|gossiped_tx| {
self.should_download_or_verify(gossiped_tx.id())?;
self.tx_downloads
.download_if_needed_and_verify(gossiped_tx)?;
Ok(())
})
.collect();
async move { Ok(Response::Queued(rsp)) }.boxed()
match &mut self.state {
State::Enabled {
storage,
tx_downloads,
} => match req {
Request::TransactionIds => {
let res = storage.tx_ids();
async move { Ok(Response::TransactionIds(res)) }.boxed()
}
Request::TransactionsById(ids) => {
let rsp = Ok(storage.transactions(ids)).map(Response::Transactions);
async move { rsp }.boxed()
}
Request::RejectedTransactionIds(ids) => {
let rsp = Ok(storage.rejected_transactions(ids))
.map(Response::RejectedTransactionIds);
async move { rsp }.boxed()
}
Request::Queue(gossiped_txs) => {
let rsp: Vec<Result<(), MempoolError>> = gossiped_txs
.into_iter()
.map(|gossiped_tx| {
Self::should_download_or_verify(storage, gossiped_tx.id())?;
tx_downloads.download_if_needed_and_verify(gossiped_tx)?;
Ok(())
})
.collect();
async move { Ok(Response::Queued(rsp)) }.boxed()
}
},
State::Disabled => {
// We can't return an error since that will cause a disconnection
// by the peer connection handler. Therefore, return successful
// empty responses.
let resp = match req {
Request::TransactionIds => Response::TransactionIds(Default::default()),
Request::TransactionsById(_) => Response::Transactions(Default::default()),
Request::RejectedTransactionIds(_) => {
Response::RejectedTransactionIds(Default::default())
}
// Special case; we can signal the error inside the response.
Request::Queue(gossiped_txs) => Response::Queued(
iter::repeat(Err(MempoolError::Disabled))
.take(gossiped_txs.len())
.collect(),
),
};
async move { Ok(resp) }.boxed()
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ where
Ok(())
}

/// Get the number of currently in-flight download tasks.
// Note: copied from zebrad/src/components/sync/downloads.rs
#[allow(dead_code)]
pub fn in_flight(&self) -> usize {
self.pending.len()
}

/// Check if transaction is already in the state.
async fn transaction_in_state(state: &mut ZS, txid: UnminedTxId) -> Result<(), BoxError> {
// Check if the transaction is already in the state.
Expand Down
3 changes: 3 additions & 0 deletions zebrad/src/components/mempool/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ pub enum MempoolError {
/// The queue's capacity is [`super::downloads::MAX_INBOUND_CONCURRENCY`].
#[error("transaction dropped because the queue is full")]
FullQueue,

#[error("mempool is disabled since synchronization is behind the chain tip")]
Disabled,
}
Loading