Skip to content

Commit

Permalink
Revert "Fix poll_ready usage in ChainVerifier (#1700)"
Browse files Browse the repository at this point in the history
This reverts commit 0723ac5.
  • Loading branch information
teor2345 authored Feb 15, 2021
1 parent 21dbf5c commit 7168068
Showing 1 changed file with 30 additions and 54 deletions.
84 changes: 30 additions & 54 deletions zebra-consensus/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,29 @@ use zebra_state as zs;

use crate::{
block::BlockVerifier,
checkpoint::{CheckpointList, CheckpointVerifier},
block::VerifyBlockError,
checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError},
BoxError, Config,
};

/// The bound for each verifier's buffer.
///
/// We choose the verifier buffer bound based on the maximum number of
/// concurrent verifier users, to avoid contention:
/// - the `ChainSync` component
/// - the `Inbound` service
/// - a miner component, which we might add in future, and
/// - 1 extra slot to avoid contention.
///
/// We deliberately add extra slots, because they only cost a small amount of
/// memory, but missing slots can significantly slow down Zebra.
const VERIFIER_BUFFER_BOUND: usize = 4;

/// The chain verifier routes requests to either the checkpoint verifier or the
/// block verifier, depending on the maximum checkpoint height.
struct ChainVerifier<S>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
// Normally, we erase the types on buffer-wrapped services.
// But if we did that here, the block and checkpoint services would be
// type-indistinguishable, risking future substitution errors.
block_verifier: Buffer<BlockVerifier<S>, Arc<block::Block>>,
checkpoint_verifier: Buffer<CheckpointVerifier<S>, Arc<block::Block>>,
block: BlockVerifier<S>,
checkpoint: CheckpointVerifier<S>,
max_checkpoint_height: block::Height,
}

#[derive(Debug, Display, Error)]
pub enum VerifyChainError {
/// block could not be checkpointed
Checkpoint(#[source] BoxError),
Checkpoint(#[source] VerifyCheckpointError),
/// block could not be verified
Block(#[source] BoxError),
Block(#[source] VerifyBlockError),
}

impl<S> Service<Arc<Block>> for ChainVerifier<S>
Expand All @@ -72,34 +57,30 @@ where
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Correctness:
//
// We can't call `poll_ready` on the block and checkpoint verifiers here,
// because each `poll_ready` must be followed by a `call`, and we don't
// know which verifier we're going to choose yet.
// See #1593 for details.
Poll::Ready(Ok(()))
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match (self.checkpoint.poll_ready(cx), self.block.poll_ready(cx)) {
// First, fail if either service fails.
(Poll::Ready(Err(e)), _) => Poll::Ready(Err(VerifyChainError::Checkpoint(e))),
(_, Poll::Ready(Err(e))) => Poll::Ready(Err(VerifyChainError::Block(e))),
// Second, we're unready if either service is unready.
(Poll::Pending, _) | (_, Poll::Pending) => Poll::Pending,
// Finally, we're ready if both services are ready and OK.
(Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
}
}

fn call(&mut self, block: Arc<Block>) -> Self::Future {
match block.coinbase_height() {
// Correctness:
//
// We use `ServiceExt::oneshot` to make sure every `poll_ready` has
// a matching `call`. See #1593 for details.
Some(height) if height <= self.max_checkpoint_height => self
.checkpoint_verifier
.clone()
.oneshot(block)
.checkpoint
.call(block)
.map_err(VerifyChainError::Checkpoint)
.boxed(),
// This also covers blocks with no height, which the block verifier
// will reject immediately.
_ => self
.block_verifier
.clone()
.oneshot(block)
.block
.call(block)
.map_err(VerifyChainError::Block)
.boxed(),
}
Expand All @@ -122,7 +103,7 @@ where
pub async fn init<S>(
config: Config,
network: Network,
state_service: S,
mut state_service: S,
) -> Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
Expand All @@ -137,13 +118,11 @@ where
.expect("hardcoded checkpoint list extends past sapling activation")
};

// Correctness:
//
// We use `ServiceExt::oneshot` to make sure every `poll_ready` has a
// matching `call`. See #1593 for details.
let tip = match state_service
.clone()
.oneshot(zs::Request::Tip)
.ready_and()
.await
.unwrap()
.call(zs::Request::Tip)
.await
.unwrap()
{
Expand All @@ -152,18 +131,15 @@ where
};
tracing::info!(?tip, ?max_checkpoint_height, "initializing chain verifier");

let block_verifier = BlockVerifier::new(network, state_service.clone());
let checkpoint_verifier = CheckpointVerifier::from_checkpoint_list(list, tip, state_service);

let block_verifier = Buffer::new(block_verifier, VERIFIER_BUFFER_BOUND);
let checkpoint_verifier = Buffer::new(checkpoint_verifier, VERIFIER_BUFFER_BOUND);
let block = BlockVerifier::new(network, state_service.clone());
let checkpoint = CheckpointVerifier::from_checkpoint_list(list, tip, state_service);

Buffer::new(
BoxService::new(ChainVerifier {
block_verifier,
checkpoint_verifier,
block,
checkpoint,
max_checkpoint_height,
}),
VERIFIER_BUFFER_BOUND,
3,
)
}

0 comments on commit 7168068

Please sign in to comment.