diff --git a/consensus/src/block_preparer.rs b/consensus/src/block_preparer.rs
index ca92f1b835e15..1c6a88965fc29 100644
--- a/consensus/src/block_preparer.rs
+++ b/consensus/src/block_preparer.rs
@@ -11,6 +11,7 @@ use crate::{
 use aptos_consensus_types::block::Block;
 use aptos_executor_types::ExecutorResult;
 use aptos_types::transaction::SignedTransaction;
+use fail::fail_point;
 use std::sync::Arc;
 
 pub struct BlockPreparer {
@@ -36,6 +37,12 @@ impl BlockPreparer {
     }
 
     pub async fn prepare_block(&self, block: &Block) -> ExecutorResult<Vec<SignedTransaction>> {
+        fail_point!("consensus::prepare_block", |_| {
+            use aptos_executor_types::ExecutorError;
+            use std::{thread, time::Duration};
+            thread::sleep(Duration::from_millis(10));
+            Err(ExecutorError::CouldNotGetData)
+        });
         let (txns, max_txns_from_block_to_execute) =
             self.payload_manager.get_transactions(block).await?;
         let txn_filter = self.txn_filter.clone();
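Note on the fail point above: it short-circuits prepare_block so that downstream consumers see ExecutorError::CouldNotGetData on injected calls, which is what exercises the new retry path in the buffer manager. Below is a minimal standalone sketch of how a fail point of this shape behaves, assuming the fail crate with its failpoints feature enabled; the function name and fail-point name are illustrative, not part of this patch:

use fail::fail_point;

// A fallible operation guarded by a fail point, mirroring the pattern in
// prepare_block: when the fail point is armed with a "return" action, the
// closure's value short-circuits the function.
fn fetch_data() -> Result<Vec<u8>, String> {
    fail_point!("example::fetch_data", |_| Err("injected failure".to_string()));
    Ok(vec![1, 2, 3])
}

fn main() {
    // "50%return" is the same actions-string syntax the smoke test uses:
    // the fail point fires on roughly half of the calls.
    fail::cfg("example::fetch_data", "50%return").unwrap();
    for _ in 0..4 {
        println!("{:?}", fetch_data());
    }
}

The smoke test at the end of this patch arms the consensus::prepare_block fail point with the same "50%return" actions string, one validator at a time.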
diff --git a/consensus/src/pipeline/buffer_manager.rs b/consensus/src/pipeline/buffer_manager.rs
index c01d960917f5b..347bf15d39d9d 100644
--- a/consensus/src/pipeline/buffer_manager.rs
+++ b/consensus/src/pipeline/buffer_manager.rs
@@ -118,6 +118,11 @@ pub struct BufferManager {
     block_rx: UnboundedReceiver<OrderedBlocks>,
     reset_rx: UnboundedReceiver<ResetRequest>,
+
+    // self channel to retry execution schedule phase
+    execution_schedule_retry_tx: UnboundedSender<()>,
+    execution_schedule_retry_rx: UnboundedReceiver<()>,
+
     stop: bool,
 
     epoch_state: Arc<EpochState>,
@@ -172,6 +177,9 @@ impl BufferManager {
         let rb_backoff_policy = ExponentialBackoff::from_millis(2)
             .factor(50)
             .max_delay(Duration::from_secs(5));
+
+        let (tx, rx) = unbounded();
+
         Self {
             author,
@@ -203,6 +211,10 @@ impl BufferManager {
             block_rx,
             reset_rx,
+
+            execution_schedule_retry_tx: tx,
+            execution_schedule_retry_rx: rx,
+
             stop: false,
 
             epoch_state,
@@ -294,33 +306,27 @@ impl BufferManager {
 
     /// Set the execution root to the first not executed item (Ordered) and send execution request
     /// Set to None if not exist
-    async fn advance_execution_root(&mut self) {
+    /// Return Some(block_id) if the block needs to be scheduled for retry
+    fn advance_execution_root(&mut self) -> Option<HashValue> {
         let cursor = self.execution_root;
         self.execution_root = self
             .buffer
             .find_elem_from(cursor.or_else(|| *self.buffer.head_cursor()), |item| {
                 item.is_ordered()
             });
-        info!(
-            "Advance execution root from {:?} to {:?}",
-            cursor, self.execution_root
-        );
         if self.execution_root.is_some() && cursor == self.execution_root {
             // Schedule retry.
-            // NOTE: probably should schedule retry for all ordered blocks, but since execution error
-            // is not expected nor retryable in reality, I'd rather remove retrying or do it more
-            // properly than complicating it here.
-            let ordered_blocks = self.buffer.get(&self.execution_root).get_blocks().clone();
-            let request = self.create_new_request(ExecutionRequest {
-                ordered_blocks,
-                lifetime_guard: self.create_new_request(()),
-            });
-            let sender = self.execution_schedule_phase_tx.clone();
-            Self::spawn_retry_request(sender, request, Duration::from_millis(100));
+            self.execution_root
+        } else {
+            info!(
+                "Advance execution root from {:?} to {:?}",
+                cursor, self.execution_root
+            );
+            // Otherwise do nothing, because the execution wait phase is driven by the response of
+            // the execution schedule phase, which is in turn fed as soon as the ordered blocks
+            // come in.
+            None
         }
-        // Otherwise do nothing, because the execution wait phase is driven by the response of
-        // the execution schedule phase, which is in turn fed as soon as the ordered blocks
-        // come in.
     }
 
     /// Set the signing root to the first not signed item (Executed) and send execution request
@@ -465,6 +471,28 @@ impl BufferManager {
             .expect("Failed to send execution wait request.");
     }
 
+    async fn retry_schedule_phase(&mut self) {
+        let mut cursor = self.execution_root;
+        let mut count = 0;
+        while cursor.is_some() {
+            let ordered_blocks = self.buffer.get(&cursor).get_blocks().clone();
+            let request = self.create_new_request(ExecutionRequest {
+                ordered_blocks,
+                lifetime_guard: self.create_new_request(()),
+            });
+            count += 1;
+            self.execution_schedule_phase_tx
+                .send(request)
+                .await
+                .expect("Failed to send execution schedule request.");
+            cursor = self.buffer.get_next(&cursor);
+        }
+        info!(
+            "Reschedule {} execution requests from {:?}",
+            count, self.execution_root
+        );
+    }
+
     /// If the response is successful, advance the item to Executed, otherwise panic (TODO fix).
     async fn process_execution_response(&mut self, response: ExecutionResponse) {
         let ExecutionResponse { block_id, inner } = response;
@@ -477,11 +505,15 @@ impl BufferManager {
         let executed_blocks = match inner {
             Ok(result) => result,
             Err(ExecutorError::CouldNotGetData) => {
-                warn!("Execution error - CouldNotGetData");
+                warn!("Execution error - CouldNotGetData {}", block_id);
+                return;
+            },
+            Err(ExecutorError::BlockNotFound(block_id)) => {
+                warn!("Execution error BlockNotFound {}", block_id);
                 return;
             },
             Err(e) => {
-                error!("Execution error {:?}", e);
+                error!("Execution error {:?} for {}", e, block_id);
                 return;
             },
         };
@@ -771,7 +803,7 @@ impl BufferManager {
                     monitor!("buffer_manager_process_ordered", {
                     self.process_ordered_blocks(blocks).await;
                     if self.execution_root.is_none() {
-                        self.advance_execution_root().await;
+                        self.advance_execution_root();
                     }});
                 },
                 reset_event = self.reset_rx.select_next_some() => {
@@ -784,12 +816,27 @@ impl BufferManager {
                     })},
                 response = self.execution_wait_phase_rx.select_next_some() => {
                     monitor!("buffer_manager_process_execution_wait_response", {
+                    let response_block_id = response.block_id;
                     self.process_execution_response(response).await;
-                    self.advance_execution_root().await;
+                    if let Some(block_id) = self.advance_execution_root() {
+                        // if the response is for the current execution root, retry the schedule phase
+                        if response_block_id == block_id {
+                            let mut tx = self.execution_schedule_retry_tx.clone();
+                            tokio::spawn(async move {
+                                tokio::time::sleep(Duration::from_millis(100)).await;
+                                // buffer manager can be dropped at the point of sending retry
+                                let _ = tx.send(()).await;
+                            });
+                        }
+                    }
                     if self.signing_root.is_none() {
                         self.advance_signing_root().await;
                     }});
                 },
+                _ = self.execution_schedule_retry_rx.select_next_some() => {
+                    monitor!("buffer_manager_process_execution_schedule_retry",
+                    self.retry_schedule_phase().await);
+                },
                 response = self.signing_phase_rx.select_next_some() => {
                     monitor!("buffer_manager_process_signing_response", {
                     self.process_signing_response(response).await;
@@ -801,7 +848,7 @@ impl BufferManager {
                     if let Some(aggregated_block_id) = self.process_commit_message(rpc_request) {
                         self.advance_head(aggregated_block_id).await;
                         if self.execution_root.is_none() {
-                            self.advance_execution_root().await;
+                            self.advance_execution_root();
                         }
                         if self.signing_root.is_none() {
                             self.advance_signing_root().await;
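Note on the retry wiring: the old code re-sent a single ExecutionRequest through spawn_retry_request, while the new code signals the manager's own event loop over a dedicated channel, and retry_schedule_phase then re-sends requests for every buffered block from the execution root onward. Below is a minimal sketch of that self-wakeup pattern, assuming tokio plus the futures mpsc channel the manager already uses; the names retry_tx and retry_rx are illustrative, not the BufferManager API:

use futures::{channel::mpsc::unbounded, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // The event loop owns both ends: the sender is cloned into delayed tasks,
    // the receiver is polled alongside the other event sources.
    let (retry_tx, mut retry_rx) = unbounded::<()>();

    // A failed phase schedules its own retry 100ms later. If the receiving
    // side is gone by then, the send fails and is deliberately ignored, as in
    // the `let _ = tx.send(()).await;` in the diff.
    let tx = retry_tx.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        let _ = tx.unbounded_send(());
    });

    // The loop wakes up and re-drives the stalled phase.
    if retry_rx.next().await.is_some() {
        println!("retrying execution schedule phase");
    }
}

Routing the delayed wake-up through the select loop, rather than sleeping inline, keeps the manager responsive to resets and commit messages while a retry is pending.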
diff --git a/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs b/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs
index 83718c4b2ab52..e8ff9d31564ca 100644
--- a/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs
+++ b/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs
@@ -232,6 +232,47 @@ async fn test_ordered_only_cert() {
         .unwrap();
 }
 
+#[tokio::test]
+async fn test_execution_retry() {
+    let num_validators = 4;
+
+    let mut swarm = create_swarm(num_validators, 1).await;
+
+    test_consensus_fault_tolerance(
+        &mut swarm,
+        3,
+        5.0,
+        1,
+        Box::new(FailPointFailureInjection::new(Box::new(move |cycle, _| {
+            (
+                vec![(
+                    cycle % num_validators,
+                    "consensus::prepare_block".to_string(),
+                    format!("{}%return", 50),
+                )],
+                true,
+            )
+        }))),
+        Box::new(move |_, _, executed_rounds, executed_transactions, _, _| {
+            assert!(
+                executed_transactions >= 4,
+                "no progress with active consensus, only {} transactions",
+                executed_transactions
+            );
+            assert!(
+                executed_rounds >= 1,
+                "no progress with active consensus, only {} rounds",
+                executed_rounds
+            );
+            Ok(())
+        }),
+        true,
+        false,
+    )
+    .await
+    .unwrap();
+}
+
 #[tokio::test]
 async fn test_fault_tolerance_of_network_send() {
     // Randomly increase network failure rate, until network halts, and check that it comes back afterwards.
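Note on the injection schedule: on each cycle the closure picks validator cycle % num_validators and arms its consensus::prepare_block fail point with "50%return", so roughly half of that validator's block preparations fail while the rest of the swarm stays healthy; the assertions then only require that consensus as a whole keeps making progress. A small standalone sketch of the rotation, where injection_for_cycle is a hypothetical helper mirroring the closure above:

// Which validator gets the fault on each cycle, and with what actions string,
// mirroring the closure passed to FailPointFailureInjection.
fn injection_for_cycle(cycle: usize, num_validators: usize) -> (usize, String, String) {
    (
        cycle % num_validators,
        "consensus::prepare_block".to_string(),
        format!("{}%return", 50),
    )
}

fn main() {
    for cycle in 0..6 {
        let (validator, fail_point, actions) = injection_for_cycle(cycle, 4);
        println!("cycle {}: validator {} gets {}={}", cycle, validator, fail_point, actions);
    }
}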