Skip to content

Commit

Permalink
[pipeline] better retry for execution error
Browse files Browse the repository at this point in the history
  • Loading branch information
Zekun Li authored and zekun000 committed Jun 11, 2024
1 parent 142c5c3 commit 3762d21
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 23 deletions.
7 changes: 7 additions & 0 deletions consensus/src/block_preparer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
use aptos_consensus_types::block::Block;
use aptos_executor_types::ExecutorResult;
use aptos_types::transaction::SignedTransaction;
use fail::fail_point;
use std::sync::Arc;

pub struct BlockPreparer {
Expand All @@ -36,6 +37,12 @@ impl BlockPreparer {
}

pub async fn prepare_block(&self, block: &Block) -> ExecutorResult<Vec<SignedTransaction>> {
fail_point!("consensus::prepare_block", |_| {
use aptos_executor_types::ExecutorError;
use std::{thread, time::Duration};
thread::sleep(Duration::from_millis(10));
Err(ExecutorError::CouldNotGetData)
});
let (txns, max_txns_from_block_to_execute) =
self.payload_manager.get_transactions(block).await?;
let txn_filter = self.txn_filter.clone();
Expand Down
93 changes: 70 additions & 23 deletions consensus/src/pipeline/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ pub struct BufferManager {

block_rx: UnboundedReceiver<OrderedBlocks>,
reset_rx: UnboundedReceiver<ResetRequest>,

// self channel to retry execution schedule phase
execution_schedule_retry_tx: UnboundedSender<()>,
execution_schedule_retry_rx: UnboundedReceiver<()>,

stop: bool,

epoch_state: Arc<EpochState>,
Expand Down Expand Up @@ -172,6 +177,9 @@ impl BufferManager {
let rb_backoff_policy = ExponentialBackoff::from_millis(2)
.factor(50)
.max_delay(Duration::from_secs(5));

let (tx, rx) = unbounded();

Self {
author,

Expand Down Expand Up @@ -203,6 +211,10 @@ impl BufferManager {

block_rx,
reset_rx,

execution_schedule_retry_tx: tx,
execution_schedule_retry_rx: rx,

stop: false,

epoch_state,
Expand Down Expand Up @@ -294,33 +306,27 @@ impl BufferManager {

/// Set the execution root to the first not executed item (Ordered) and send execution request
/// Set to None if not exist
async fn advance_execution_root(&mut self) {
/// Return Some(block_id) if the block needs to be scheduled for retry
fn advance_execution_root(&mut self) -> Option<HashValue> {
let cursor = self.execution_root;
self.execution_root = self
.buffer
.find_elem_from(cursor.or_else(|| *self.buffer.head_cursor()), |item| {
item.is_ordered()
});
info!(
"Advance execution root from {:?} to {:?}",
cursor, self.execution_root
);
if self.execution_root.is_some() && cursor == self.execution_root {
// Schedule retry.
// NOTE: probably should schedule retry for all ordered blocks, but since execution error
// is not expected nor retryable in reality, I'd rather remove retrying or do it more
// properly than complicating it here.
let ordered_blocks = self.buffer.get(&self.execution_root).get_blocks().clone();
let request = self.create_new_request(ExecutionRequest {
ordered_blocks,
lifetime_guard: self.create_new_request(()),
});
let sender = self.execution_schedule_phase_tx.clone();
Self::spawn_retry_request(sender, request, Duration::from_millis(100));
self.execution_root
} else {
info!(
"Advance execution root from {:?} to {:?}",
cursor, self.execution_root
);
// Otherwise do nothing, because the execution wait phase is driven by the response of
// the execution schedule phase, which is in turn fed as soon as the ordered blocks
// come in.
None
}
// Otherwise do nothing, because the execution wait phase is driven by the response of
// the execution schedule phase, which is in turn fed as soon as the ordered blocks
// come in.
}

/// Set the signing root to the first not signed item (Executed) and send execution request
Expand Down Expand Up @@ -465,6 +471,28 @@ impl BufferManager {
.expect("Failed to send execution wait request.");
}

async fn retry_schedule_phase(&mut self) {
let mut cursor = self.execution_root;
let mut count = 0;
while cursor.is_some() {
let ordered_blocks = self.buffer.get(&cursor).get_blocks().clone();
let request = self.create_new_request(ExecutionRequest {
ordered_blocks,
lifetime_guard: self.create_new_request(()),
});
count += 1;
self.execution_schedule_phase_tx
.send(request)
.await
.expect("Failed to send execution schedule request.");
cursor = self.buffer.get_next(&cursor);
}
info!(
"Reschedule {} execution requests from {:?}",
count, self.execution_root
);
}

/// If the response is successful, advance the item to Executed, otherwise panic (TODO fix).
async fn process_execution_response(&mut self, response: ExecutionResponse) {
let ExecutionResponse { block_id, inner } = response;
Expand All @@ -477,11 +505,15 @@ impl BufferManager {
let executed_blocks = match inner {
Ok(result) => result,
Err(ExecutorError::CouldNotGetData) => {
warn!("Execution error - CouldNotGetData");
warn!("Execution error - CouldNotGetData {}", block_id);
return;
},
Err(ExecutorError::BlockNotFound(block_id)) => {
warn!("Execution error BlockNotFound {}", block_id);
return;
},
Err(e) => {
error!("Execution error {:?}", e);
error!("Execution error {:?} for {}", e, block_id);
return;
},
};
Expand Down Expand Up @@ -771,7 +803,7 @@ impl BufferManager {
monitor!("buffer_manager_process_ordered", {
self.process_ordered_blocks(blocks).await;
if self.execution_root.is_none() {
self.advance_execution_root().await;
self.advance_execution_root();
}});
},
reset_event = self.reset_rx.select_next_some() => {
Expand All @@ -784,12 +816,27 @@ impl BufferManager {
})},
response = self.execution_wait_phase_rx.select_next_some() => {
monitor!("buffer_manager_process_execution_wait_response", {
let response_block_id = response.block_id;
self.process_execution_response(response).await;
self.advance_execution_root().await;
if let Some(block_id) = self.advance_execution_root() {
// if the response is for the current execution root, retry the schedule phase
if response_block_id == block_id {
let mut tx = self.execution_schedule_retry_tx.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
// buffer manager can be dropped at the point of sending retry
let _ = tx.send(()).await;
});
}
}
if self.signing_root.is_none() {
self.advance_signing_root().await;
}});
},
_ = self.execution_schedule_retry_rx.select_next_some() => {
monitor!("buffer_manager_process_execution_schedule_retry",
self.retry_schedule_phase().await);
},
response = self.signing_phase_rx.select_next_some() => {
monitor!("buffer_manager_process_signing_response", {
self.process_signing_response(response).await;
Expand All @@ -801,7 +848,7 @@ impl BufferManager {
if let Some(aggregated_block_id) = self.process_commit_message(rpc_request) {
self.advance_head(aggregated_block_id).await;
if self.execution_root.is_none() {
self.advance_execution_root().await;
self.advance_execution_root();
}
if self.signing_root.is_none() {
self.advance_signing_root().await;
Expand Down
41 changes: 41 additions & 0 deletions testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,47 @@ async fn test_ordered_only_cert() {
.unwrap();
}

#[tokio::test]
async fn test_execution_retry() {
let num_validators = 4;

let mut swarm = create_swarm(num_validators, 1).await;

test_consensus_fault_tolerance(
&mut swarm,
3,
5.0,
1,
Box::new(FailPointFailureInjection::new(Box::new(move |cycle, _| {
(
vec![(
cycle % num_validators,
"consensus::prepare_block".to_string(),
format!("{}%return", 50),
)],
true,
)
}))),
Box::new(move |_, _, executed_rounds, executed_transactions, _, _| {
assert!(
executed_transactions >= 4,
"no progress with active consensus, only {} transactions",
executed_transactions
);
assert!(
executed_rounds >= 1,
"no progress with active consensus, only {} rounds",
executed_rounds
);
Ok(())
}),
true,
false,
)
.await
.unwrap();
}

#[tokio::test]
async fn test_fault_tolerance_of_network_send() {
// Randomly increase network failure rate, until network halts, and check that it comes back afterwards.
Expand Down

0 comments on commit 3762d21

Please sign in to comment.