Skip to content

Commit

Permalink
[BlockSTM] Better naming for execution task (#8694)
Browse files Browse the repository at this point in the history
[BlockSTM] Better naming for execution task
  • Loading branch information
danielxiangzl authored Jun 16, 2023
1 parent 695bbac commit 903c684
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 67 deletions.
8 changes: 6 additions & 2 deletions aptos-move/aptos-vm-types/src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,13 @@ impl VMOutput {
}

// Try to materialize deltas and add them to the write set.
let (change_set, gas_used, status) = self.unpack_with_fee_statement();
let (change_set, fee_statement, status) = self.unpack_with_fee_statement();
let materialized_change_set = change_set.try_materialize(state_view)?;
Ok(VMOutput::new(materialized_change_set, gas_used, status))
Ok(VMOutput::new(
materialized_change_set,
fee_statement,
status,
))
}

/// Converts VMOutput into TransactionOutput which can be used by storage
Expand Down
34 changes: 18 additions & 16 deletions aptos-move/block-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
TASK_VALIDATE_SECONDS, VM_INIT_SECONDS, WORK_WITH_TASK_SECONDS,
},
errors::*,
scheduler::{DependencyStatus, Scheduler, SchedulerTask, Wave},
scheduler::{DependencyStatus, ExecutionTaskType, Scheduler, SchedulerTask, Wave},
task::{ExecutionStatus, ExecutorTask, Transaction, TransactionOutput},
txn_last_input_output::TxnLastInputOutput,
view::{LatestView, MVHashMapView},
Expand Down Expand Up @@ -432,11 +432,11 @@ where

// Remark: When early halting the BlockSTM, we have to make sure the current / new tasks
// will be properly handled by the threads. For instance, it is possible that the committing
// thread holds an execution task from the last iteration, and then early halts the BlockSTM
// due to a txn execution abort. In this case, we cannot reset the scheduler_task of the
// committing thread (to be Done), otherwise some other pending thread waiting for the execution
// will be pending on read forever (since the halt logic let the execution task to wake up such
// pending task).
// thread holds an execution task of ExecutionTaskType::Wakeup(DependencyCondvar) for some
// other thread pending on the dependency conditional variable from the last iteration. If
// the committing thread early halts BlockSTM and resets its scheduler_task to be Done, the
// pending thread will be pending on read forever. In other words, we rely on the committing
// thread to wake up the pending execution thread, if the committing thread holds the Wakeup task.
}
}

Expand Down Expand Up @@ -539,16 +539,18 @@ where
versioned_cache,
scheduler,
),
SchedulerTask::ExecutionTask(version_to_execute, None) => self.execute(
version_to_execute,
block,
last_input_output,
versioned_cache,
scheduler,
&executor,
base_view,
),
SchedulerTask::ExecutionTask(_, Some(condvar)) => {
SchedulerTask::ExecutionTask(version_to_execute, ExecutionTaskType::Execution) => {
self.execute(
version_to_execute,
block,
last_input_output,
versioned_cache,
scheduler,
&executor,
base_view,
)
},
SchedulerTask::ExecutionTask(_, ExecutionTaskType::Wakeup(condvar)) => {
let (lock, cvar) = &*condvar;
// Mark dependency resolved.
*lock.lock() = DependencyStatus::Resolved;
Expand Down
83 changes: 51 additions & 32 deletions aptos-move/block-executor/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,22 @@ pub enum DependencyResult {
ExecutionHalted,
}

/// Two types of execution tasks: Execution and Wakeup.
/// Execution is a normal execution task, Wakeup is a task that just wakes up a suspended execution.
/// See explanations for the ExecutionStatus below.
#[derive(Debug, Clone)]
pub enum ExecutionTaskType {
Execution,
Wakeup(DependencyCondvar),
}

/// A holder for potential task returned from the Scheduler. ExecutionTask and ValidationTask
/// each contain a version of transaction that must be executed or validated, respectively.
/// NoTask holds no task (similar None if we wrapped tasks in Option), and Done implies that
/// there are no more tasks and the scheduler is done.
#[derive(Debug)]
pub enum SchedulerTask {
ExecutionTask(Version, Option<DependencyCondvar>),
ExecutionTask(Version, ExecutionTaskType),
ValidationTask(Version, Wave),
NoTask,
Done,
Expand All @@ -56,21 +65,24 @@ pub enum SchedulerTask {
/// 'execution status' as 'status'. Each status contains the latest incarnation number,
/// where incarnation = i means it is the i-th execution instance of the transaction.
///
/// 'ReadyToExecute' means that the corresponding incarnation should be executed and the scheduler
/// 'Ready' means that the corresponding incarnation should be executed and the scheduler
/// must eventually create a corresponding execution task. The scheduler ensures that exactly one
/// execution task gets created, changing the status to 'Executing' in the process. If a dependency
/// condition variable is set, then an execution of a prior incarnation is waiting on it with
/// a read dependency resolved (when dependency was encountered, the status changed to Suspended,
/// and suspended changed to ReadyToExecute when the dependency finished its execution). In this case
/// the caller need not create a new execution task, but just notify the suspended execution.
/// execution task gets created, changing the status to 'Executing' in the process. 'Ready' status
/// contains an ExecutionTaskType, which is either Execution or Wakeup. If it is Execution, then
/// the scheduler creates an execution task for the corresponding incarnation. If it is Wakeup,
/// a dependency condition variable is set in ExecutionTaskType::Wakeup(DependencyCondvar): an execution
/// of a prior incarnation is waiting on it with a read dependency resolved (when dependency was
/// encountered, the status changed to Suspended, and suspended changed to Ready when the dependency
/// finished its execution). In this case the caller need not create a new execution task, but
/// just notify the suspended execution via the dependency condition variable.
///
/// 'Executing' status of an incarnation turns into 'Executed' if the execution task finishes, or
/// if a dependency is encountered, it becomes 'ReadyToExecute(incarnation + 1)' once the
/// if a dependency is encountered, it becomes 'Ready(incarnation + 1)' once the
/// dependency is resolved. An 'Executed' status allows creation of validation tasks for the
/// corresponding incarnation, and a validation failure leads to an abort. The scheduler ensures
/// that there is exactly one abort, changing the status to 'Aborting' in the process. Once the
/// thread that successfully aborted performs everything that's required, it sets the status
/// to 'ReadyToExecute(incarnation + 1)', allowing the scheduler to create an execution
/// to 'Ready(incarnation + 1)', allowing the scheduler to create an execution
/// task for the next incarnation of the transaction.
///
/// 'ExecutionHalted' is a transaction status marking that parallel execution is halted, due to
Expand All @@ -95,7 +107,7 @@ pub enum SchedulerTask {
///
#[derive(Debug)]
enum ExecutionStatus {
ReadyToExecute(Incarnation, Option<DependencyCondvar>),
Ready(Incarnation, ExecutionTaskType),
Executing(Incarnation),
Suspended(Incarnation, DependencyCondvar),
Executed(Incarnation),
Expand All @@ -108,7 +120,7 @@ impl PartialEq for ExecutionStatus {
fn eq(&self, other: &Self) -> bool {
use ExecutionStatus::*;
match (self, other) {
(&ReadyToExecute(ref a, _), &ReadyToExecute(ref b, _))
(&Ready(ref a, _), &Ready(ref b, _))
| (&Executing(ref a), &Executing(ref b))
| (&Suspended(ref a, _), &Suspended(ref b, _))
| (&Executed(ref a), &Executed(ref b))
Expand Down Expand Up @@ -208,7 +220,7 @@ pub struct Scheduler {
// validation/execution preferences stick to the worker threads).
/// A shared index that tracks the minimum of all transaction indices that require execution.
/// The threads increment the index and attempt to create an execution task for the corresponding
/// transaction, if the status of the txn is 'ReadyToExecute'. This implements a counting-based
/// transaction, if the status of the txn is 'Ready'. This implements a counting-based
/// concurrent ordered set. It is reduced as necessary when transactions become ready to be
/// executed, in particular, when execution finishes and dependencies are resolved.
execution_idx: AtomicU32,
Expand Down Expand Up @@ -242,7 +254,7 @@ impl Scheduler {
txn_status: (0..num_txns)
.map(|_| {
CachePadded::new((
RwLock::new(ExecutionStatus::ReadyToExecute(0, None)),
RwLock::new(ExecutionStatus::Ready(0, ExecutionTaskType::Execution)),
RwLock::new(ValidationStatus::new()),
))
})
Expand Down Expand Up @@ -368,10 +380,10 @@ impl Scheduler {
{
return SchedulerTask::ValidationTask(version_to_validate, wave);
}
} else if let Some((version_to_execute, maybe_condvar)) =
} else if let Some((version_to_execute, execution_task_type)) =
self.try_execute_next_version()
{
return SchedulerTask::ExecutionTask(version_to_execute, maybe_condvar);
return SchedulerTask::ExecutionTask(version_to_execute, execution_task_type);
}
}
}
Expand Down Expand Up @@ -468,7 +480,7 @@ impl Scheduler {
let min_dep = txn_deps
.into_iter()
.map(|dep| {
// Mark the status of dependencies as 'ReadyToExecute' since dependency on
// Mark the status of dependencies as 'Ready' since dependency on
// transaction txn_idx is now resolved.
self.resume(dep);

Expand Down Expand Up @@ -536,8 +548,11 @@ impl Scheduler {
// re-execution task back to the caller. If incarnation fails, there is
// nothing to do, as another thread must have succeeded to incarnate and
// obtain the task for re-execution.
if let Some((new_incarnation, maybe_condvar)) = self.try_incarnate(txn_idx) {
return SchedulerTask::ExecutionTask((txn_idx, new_incarnation), maybe_condvar);
if let Some((new_incarnation, execution_task_type)) = self.try_incarnate(txn_idx) {
return SchedulerTask::ExecutionTask(
(txn_idx, new_incarnation),
execution_task_type,
);
}
}

Expand Down Expand Up @@ -573,10 +588,10 @@ impl Scheduler {
pub fn resolve_condvar(&self, txn_idx: TxnIndex) {
let mut status = self.txn_status[txn_idx as usize].0.write();
{
// Only transactions with status Suspended or ReadyToExecute may have the condition variable of pending threads.
// Only transactions with status Suspended or Ready may have the condition variable of pending threads.
match &*status {
ExecutionStatus::Suspended(_, condvar)
| ExecutionStatus::ReadyToExecute(_, Some(condvar)) => {
| ExecutionStatus::Ready(_, ExecutionTaskType::Wakeup(condvar)) => {
let (lock, cvar) = &*(condvar.clone());
// Mark parallel execution halted due to reasons like module r/w intersection.
*lock.lock() = DependencyStatus::ExecutionHalted;
Expand Down Expand Up @@ -639,11 +654,11 @@ impl Scheduler {
}

/// Try and incarnate a transaction. Only possible when the status is
/// ReadyToExecute(incarnation), in which case Some(incarnation) is returned and the
/// Ready(incarnation), in which case Some(incarnation) is returned and the
/// status is (atomically, due to the mutex) updated to Executing(incarnation).
/// An unsuccessful incarnation returns None. Since incarnation numbers never decrease
/// for each transaction, incarnate function may not succeed more than once per version.
fn try_incarnate(&self, txn_idx: TxnIndex) -> Option<(Incarnation, Option<DependencyCondvar>)> {
fn try_incarnate(&self, txn_idx: TxnIndex) -> Option<(Incarnation, ExecutionTaskType)> {
if txn_idx >= self.num_txns {
return None;
}
Expand All @@ -652,8 +667,8 @@ impl Scheduler {
// However, it is likely an overkill (and overhead to actually upgrade),
// while unlikely there would be much contention on a specific index lock.
let mut status = self.txn_status[txn_idx as usize].0.write();
if let ExecutionStatus::ReadyToExecute(incarnation, maybe_condvar) = &*status {
let ret = (*incarnation, maybe_condvar.clone());
if let ExecutionStatus::Ready(incarnation, execution_task_type) = &*status {
let ret: (u32, ExecutionTaskType) = (*incarnation, (*execution_task_type).clone());
*status = ExecutionStatus::Executing(*incarnation);
Some(ret)
} else {
Expand Down Expand Up @@ -695,7 +710,7 @@ impl Scheduler {
let status = self.txn_status[txn_idx as usize].0.read();
matches!(
*status,
ExecutionStatus::ReadyToExecute(0, _)
ExecutionStatus::Ready(0, _)
| ExecutionStatus::Executing(0)
| ExecutionStatus::Suspended(0, _)
)
Expand Down Expand Up @@ -744,11 +759,11 @@ impl Scheduler {
/// Grab an index to try and execute next (by fetch-and-incrementing execution_idx).
/// - If the index is out of bounds, return None (and invoke a check of whether
/// all txns can be committed).
/// - If the transaction is ready for execution (ReadyToExecute state), attempt
/// - If the transaction is ready for execution (Ready state), attempt
/// to create the next incarnation (should happen exactly once), and if successful,
/// return the version to the caller for the corresponding ExecutionTask.
/// - Otherwise, return None.
fn try_execute_next_version(&self) -> Option<(Version, Option<DependencyCondvar>)> {
fn try_execute_next_version(&self) -> Option<(Version, ExecutionTaskType)> {
let idx_to_execute = self.execution_idx.fetch_add(1, Ordering::SeqCst);

if idx_to_execute >= self.num_txns {
Expand All @@ -758,7 +773,9 @@ impl Scheduler {
// If successfully incarnated (changed status from ready to executing),
// return version for execution task, otherwise None.
self.try_incarnate(idx_to_execute)
.map(|(incarnation, maybe_condvar)| ((idx_to_execute, incarnation), maybe_condvar))
.map(|(incarnation, execution_task_type)| {
((idx_to_execute, incarnation), execution_task_type)
})
}

/// Put a transaction in a suspended state, with a condition variable that can be
Expand All @@ -767,7 +784,6 @@ impl Scheduler {
/// Return false when the execution is halted.
fn suspend(&self, txn_idx: TxnIndex, dep_condvar: DependencyCondvar) -> bool {
let mut status = self.txn_status[txn_idx as usize].0.write();

match *status {
ExecutionStatus::Executing(incarnation) => {
*status = ExecutionStatus::Suspended(incarnation, dep_condvar);
Expand All @@ -778,7 +794,7 @@ impl Scheduler {
}
}

/// When a dependency is resolved, mark the transaction as ReadyToExecute with an
/// When a dependency is resolved, mark the transaction as Ready with an
/// incremented incarnation number.
/// The caller must ensure that the transaction is in the Suspended state.
fn resume(&self, txn_idx: TxnIndex) {
Expand All @@ -789,7 +805,10 @@ impl Scheduler {
}

if let ExecutionStatus::Suspended(incarnation, dep_condvar) = &*status {
*status = ExecutionStatus::ReadyToExecute(*incarnation, Some(dep_condvar.clone()));
*status = ExecutionStatus::Ready(
*incarnation,
ExecutionTaskType::Wakeup(dep_condvar.clone()),
);
} else {
unreachable!();
}
Expand Down Expand Up @@ -819,7 +838,7 @@ impl Scheduler {

// Only makes sense when the current status is 'Aborting'.
debug_assert!(*status == ExecutionStatus::Aborting(incarnation));
*status = ExecutionStatus::ReadyToExecute(incarnation + 1, None);
*status = ExecutionStatus::Ready(incarnation + 1, ExecutionTaskType::Execution);
}

/// Checks whether the done marker is set. The marker can only be set by 'try_commit'.
Expand Down
Loading

0 comments on commit 903c684

Please sign in to comment.