[BlockSTM] Better naming for execution task #8694

Merged 5 commits on Jun 16, 2023
8 changes: 6 additions & 2 deletions aptos-move/aptos-vm-types/src/output.rs
@@ -91,9 +91,13 @@ impl VMOutput {
}

// Try to materialize deltas and add them to the write set.
- let (change_set, gas_used, status) = self.unpack_with_fee_statement();
+ let (change_set, fee_statement, status) = self.unpack_with_fee_statement();
let materialized_change_set = change_set.try_materialize(state_view)?;
- Ok(VMOutput::new(materialized_change_set, gas_used, status))
+ Ok(VMOutput::new(
+ materialized_change_set,
+ fee_statement,
+ status,
+ ))
}

/// Converts VMOutput into TransactionOutput which can be used by storage
34 changes: 18 additions & 16 deletions aptos-move/block-executor/src/executor.rs
@@ -9,7 +9,7 @@ use crate::{
TASK_VALIDATE_SECONDS, VM_INIT_SECONDS, WORK_WITH_TASK_SECONDS,
},
errors::*,
- scheduler::{DependencyStatus, Scheduler, SchedulerTask, Wave},
+ scheduler::{DependencyStatus, ExecutionTaskType, Scheduler, SchedulerTask, Wave},
task::{ExecutionStatus, ExecutorTask, Transaction, TransactionOutput},
txn_last_input_output::TxnLastInputOutput,
view::{LatestView, MVHashMapView},
@@ -432,11 +432,11 @@ where

// Remark: When early halting the BlockSTM, we have to make sure the current / new tasks
// will be properly handled by the threads. For instance, it is possible that the committing
- // thread holds an execution task from the last iteration, and then early halts the BlockSTM
- // due to a txn execution abort. In this case, we cannot reset the scheduler_task of the
- // committing thread (to be Done), otherwise some other pending thread waiting for the execution
- // will be pending on read forever (since the halt logic let the execution task to wake up such
- // pending task).
+ // thread holds an execution task of ExecutionTaskType::Wakeup(DependencyCondvar) for some
+ // other thread pending on the dependency conditional variable from the last iteration. If
+ // the committing thread early halts BlockSTM and resets its scheduler_task to be Done, the
+ // pending thread will be pending on read forever. In other words, we rely on the committing
+ // thread to wake up the pending execution thread, if the committing thread holds the Wakeup task.
}
}

@@ -539,16 +539,18 @@ where
versioned_cache,
scheduler,
),
- SchedulerTask::ExecutionTask(version_to_execute, None) => self.execute(
- version_to_execute,
- block,
- last_input_output,
- versioned_cache,
- scheduler,
- &executor,
- base_view,
- ),
- SchedulerTask::ExecutionTask(_, Some(condvar)) => {
+ SchedulerTask::ExecutionTask(version_to_execute, ExecutionTaskType::Execution) => {
+ self.execute(
+ version_to_execute,
+ block,
+ last_input_output,
+ versioned_cache,
+ scheduler,
+ &executor,
+ base_view,
+ )
+ },
+ SchedulerTask::ExecutionTask(_, ExecutionTaskType::Wakeup(condvar)) => {
let (lock, cvar) = &*condvar;
// Mark dependency resolved.
*lock.lock() = DependencyStatus::Resolved;
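
Review note: the hunk above is cut off right after the dependency is marked resolved. The following is a minimal sketch of the whole wakeup handshake, assuming `DependencyCondvar` is an `Arc<(Mutex<DependencyStatus>, Condvar)>` and that `DependencyStatus` has an `Unresolved` variant. It uses `std::sync` primitives, whereas the diff calls `lock.lock()` without unwrapping, so the real lock type is different. `handle_task`, `wait_for_dependency` and `wakeup_roundtrip` are hypothetical helpers and not part of this PR.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Simplified stand-ins for the types named in the diff (assumptions; the real
// definitions live in scheduler.rs and may differ).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DependencyStatus {
    Unresolved, // assumed initial state, not shown in the diff
    Resolved,
    ExecutionHalted,
}

pub type DependencyCondvar = Arc<(Mutex<DependencyStatus>, Condvar)>;

#[derive(Debug, Clone)]
pub enum ExecutionTaskType {
    Execution,
    Wakeup(DependencyCondvar),
}

/// Hypothetical worker-side dispatch. Returns true when the caller should run
/// a real execution, and false when the task only woke up a suspended thread.
pub fn handle_task(task: ExecutionTaskType) -> bool {
    match task {
        ExecutionTaskType::Execution => true,
        ExecutionTaskType::Wakeup(condvar) => {
            let (lock, cvar) = &*condvar;
            // Mark the dependency resolved, then notify the waiting thread.
            *lock.lock().unwrap() = DependencyStatus::Resolved;
            cvar.notify_one();
            false
        }
    }
}

/// Hypothetical suspended side: block until the status leaves Unresolved.
pub fn wait_for_dependency(condvar: &DependencyCondvar) -> DependencyStatus {
    let (lock, cvar) = &**condvar;
    let mut status = lock.lock().unwrap();
    // Loop to tolerate spurious wakeups.
    while *status == DependencyStatus::Unresolved {
        status = cvar.wait(status).unwrap();
    }
    *status
}

/// Spawns a waiter, delivers a Wakeup task, and returns what the waiter saw.
pub fn wakeup_roundtrip() -> DependencyStatus {
    let condvar: DependencyCondvar =
        Arc::new((Mutex::new(DependencyStatus::Unresolved), Condvar::new()));
    let waiter = {
        let condvar = Arc::clone(&condvar);
        thread::spawn(move || wait_for_dependency(&condvar))
    };
    handle_task(ExecutionTaskType::Wakeup(condvar));
    waiter.join().unwrap()
}
```

Because the waiter re-checks the status under the lock, the sketch behaves the same whether the wakeup arrives before or after the waiter starts blocking. The named `Wakeup` variant makes the second match arm self-describing, which is the readability gain over `Some(condvar)`.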
83 changes: 51 additions & 32 deletions aptos-move/block-executor/src/scheduler.rs
@@ -39,13 +39,22 @@ pub enum DependencyResult {
ExecutionHalted,
}

+ /// Two types of execution tasks: Execution and Wakeup.
+ /// Execution is a normal execution task, Wakeup is a task that just wakes up a suspended execution.
+ /// See explanations for the ExecutionStatus below.
+ #[derive(Debug, Clone)]
+ pub enum ExecutionTaskType {
+ Execution,
+ Wakeup(DependencyCondvar),
+ }
+
/// A holder for potential task returned from the Scheduler. ExecutionTask and ValidationTask
/// each contain a version of transaction that must be executed or validated, respectively.
/// NoTask holds no task (similar to None if we wrapped tasks in Option), and Done implies that
/// there are no more tasks and the scheduler is done.
#[derive(Debug)]
pub enum SchedulerTask {
- ExecutionTask(Version, Option<DependencyCondvar>),
+ ExecutionTask(Version, ExecutionTaskType),
ValidationTask(Version, Wave),
NoTask,
Done,
@@ -56,21 +65,24 @@ pub enum SchedulerTask {
/// 'execution status' as 'status'. Each status contains the latest incarnation number,
/// where incarnation = i means it is the i-th execution instance of the transaction.
///
- /// 'ReadyToExecute' means that the corresponding incarnation should be executed and the scheduler
+ /// 'Ready' means that the corresponding incarnation should be executed and the scheduler
/// must eventually create a corresponding execution task. The scheduler ensures that exactly one
- /// execution task gets created, changing the status to 'Executing' in the process. If a dependency
- /// condition variable is set, then an execution of a prior incarnation is waiting on it with
- /// a read dependency resolved (when dependency was encountered, the status changed to Suspended,
- /// and suspended changed to ReadyToExecute when the dependency finished its execution). In this case
- /// the caller need not create a new execution task, but just notify the suspended execution.
+ /// execution task gets created, changing the status to 'Executing' in the process. 'Ready' status
+ /// contains an ExecutionTaskType, which is either Execution or Wakeup. If it is Execution, then
+ /// the scheduler creates an execution task for the corresponding incarnation. If it is Wakeup,
+ /// a dependency condition variable is set in ExecutionTaskType::Wakeup(DependencyCondvar): an execution
+ /// of a prior incarnation is waiting on it with a read dependency resolved (when dependency was
+ /// encountered, the status changed to Suspended, and suspended changed to Ready when the dependency
+ /// finished its execution). In this case the caller need not create a new execution task, but
+ /// just notify the suspended execution via the dependency condition variable.
///
/// 'Executing' status of an incarnation turns into 'Executed' if the execution task finishes, or
- /// if a dependency is encountered, it becomes 'ReadyToExecute(incarnation + 1)' once the
+ /// if a dependency is encountered, it becomes 'Ready(incarnation + 1)' once the
/// dependency is resolved. An 'Executed' status allows creation of validation tasks for the
/// corresponding incarnation, and a validation failure leads to an abort. The scheduler ensures
/// that there is exactly one abort, changing the status to 'Aborting' in the process. Once the
/// thread that successfully aborted performs everything that's required, it sets the status
- /// to 'ReadyToExecute(incarnation + 1)', allowing the scheduler to create an execution
+ /// to 'Ready(incarnation + 1)', allowing the scheduler to create an execution
/// task for the next incarnation of the transaction.
///
/// 'ExecutionHalted' is a transaction status marking that parallel execution is halted, due to
@@ -95,7 +107,7 @@ pub enum SchedulerTask {
///
#[derive(Debug)]
enum ExecutionStatus {
- ReadyToExecute(Incarnation, Option<DependencyCondvar>),
+ Ready(Incarnation, ExecutionTaskType),
Executing(Incarnation),
Suspended(Incarnation, DependencyCondvar),
Executed(Incarnation),
@@ -108,7 +120,7 @@ impl PartialEq for ExecutionStatus {
fn eq(&self, other: &Self) -> bool {
use ExecutionStatus::*;
match (self, other) {
- (&ReadyToExecute(ref a, _), &ReadyToExecute(ref b, _))
+ (&Ready(ref a, _), &Ready(ref b, _))
| (&Executing(ref a), &Executing(ref b))
| (&Suspended(ref a, _), &Suspended(ref b, _))
| (&Executed(ref a), &Executed(ref b))
@@ -208,7 +220,7 @@ pub struct Scheduler {
// validation/execution preferences stick to the worker threads).
/// A shared index that tracks the minimum of all transaction indices that require execution.
/// The threads increment the index and attempt to create an execution task for the corresponding
- /// transaction, if the status of the txn is 'ReadyToExecute'. This implements a counting-based
+ /// transaction, if the status of the txn is 'Ready'. This implements a counting-based
/// concurrent ordered set. It is reduced as necessary when transactions become ready to be
/// executed, in particular, when execution finishes and dependencies are resolved.
execution_idx: AtomicU32,
@@ -242,7 +254,7 @@ impl Scheduler {
txn_status: (0..num_txns)
.map(|_| {
CachePadded::new((
- RwLock::new(ExecutionStatus::ReadyToExecute(0, None)),
+ RwLock::new(ExecutionStatus::Ready(0, ExecutionTaskType::Execution)),
RwLock::new(ValidationStatus::new()),
))
})
@@ -368,10 +380,10 @@ impl Scheduler {
{
return SchedulerTask::ValidationTask(version_to_validate, wave);
}
- } else if let Some((version_to_execute, maybe_condvar)) =
+ } else if let Some((version_to_execute, execution_task_type)) =
self.try_execute_next_version()
{
- return SchedulerTask::ExecutionTask(version_to_execute, maybe_condvar);
+ return SchedulerTask::ExecutionTask(version_to_execute, execution_task_type);
}
}
}
@@ -468,7 +480,7 @@ impl Scheduler {
let min_dep = txn_deps
.into_iter()
.map(|dep| {
- // Mark the status of dependencies as 'ReadyToExecute' since dependency on
+ // Mark the status of dependencies as 'Ready' since dependency on
// transaction txn_idx is now resolved.
self.resume(dep);

@@ -536,8 +548,11 @@ impl Scheduler {
// re-execution task back to the caller. If incarnation fails, there is
// nothing to do, as another thread must have succeeded to incarnate and
// obtain the task for re-execution.
- if let Some((new_incarnation, maybe_condvar)) = self.try_incarnate(txn_idx) {
- return SchedulerTask::ExecutionTask((txn_idx, new_incarnation), maybe_condvar);
+ if let Some((new_incarnation, execution_task_type)) = self.try_incarnate(txn_idx) {
+ return SchedulerTask::ExecutionTask(
+ (txn_idx, new_incarnation),
+ execution_task_type,
+ );
}
}

@@ -573,10 +588,10 @@ impl Scheduler {
pub fn resolve_condvar(&self, txn_idx: TxnIndex) {
let mut status = self.txn_status[txn_idx as usize].0.write();
{
- // Only transactions with status Suspended or ReadyToExecute may have the condition variable of pending threads.
+ // Only transactions with status Suspended or Ready may have the condition variable of pending threads.
match &*status {
ExecutionStatus::Suspended(_, condvar)
- | ExecutionStatus::ReadyToExecute(_, Some(condvar)) => {
+ | ExecutionStatus::Ready(_, ExecutionTaskType::Wakeup(condvar)) => {
let (lock, cvar) = &*(condvar.clone());
// Mark parallel execution halted due to reasons like module r/w intersection.
*lock.lock() = DependencyStatus::ExecutionHalted;
@@ -639,11 +654,11 @@ impl Scheduler {
}

/// Try and incarnate a transaction. Only possible when the status is
- /// ReadyToExecute(incarnation), in which case Some(incarnation) is returned and the
+ /// Ready(incarnation), in which case Some(incarnation) is returned and the
/// status is (atomically, due to the mutex) updated to Executing(incarnation).
/// An unsuccessful incarnation returns None. Since incarnation numbers never decrease
/// for each transaction, incarnate function may not succeed more than once per version.
- fn try_incarnate(&self, txn_idx: TxnIndex) -> Option<(Incarnation, Option<DependencyCondvar>)> {
+ fn try_incarnate(&self, txn_idx: TxnIndex) -> Option<(Incarnation, ExecutionTaskType)> {
if txn_idx >= self.num_txns {
return None;
}
@@ -652,8 +667,8 @@ impl Scheduler {
// However, it is likely an overkill (and overhead to actually upgrade),
// while unlikely there would be much contention on a specific index lock.
let mut status = self.txn_status[txn_idx as usize].0.write();
- if let ExecutionStatus::ReadyToExecute(incarnation, maybe_condvar) = &*status {
- let ret = (*incarnation, maybe_condvar.clone());
+ if let ExecutionStatus::Ready(incarnation, execution_task_type) = &*status {
+ let ret: (u32, ExecutionTaskType) = (*incarnation, (*execution_task_type).clone());
*status = ExecutionStatus::Executing(*incarnation);
Some(ret)
} else {
@@ -695,7 +710,7 @@ impl Scheduler {
let status = self.txn_status[txn_idx as usize].0.read();
matches!(
*status,
- ExecutionStatus::ReadyToExecute(0, _)
+ ExecutionStatus::Ready(0, _)
| ExecutionStatus::Executing(0)
| ExecutionStatus::Suspended(0, _)
)
@@ -744,11 +759,11 @@ impl Scheduler {
/// Grab an index to try and execute next (by fetch-and-incrementing execution_idx).
/// - If the index is out of bounds, return None (and invoke a check of whether
/// all txns can be committed).
- /// - If the transaction is ready for execution (ReadyToExecute state), attempt
+ /// - If the transaction is ready for execution (Ready state), attempt
/// to create the next incarnation (should happen exactly once), and if successful,
/// return the version to the caller for the corresponding ExecutionTask.
/// - Otherwise, return None.
- fn try_execute_next_version(&self) -> Option<(Version, Option<DependencyCondvar>)> {
+ fn try_execute_next_version(&self) -> Option<(Version, ExecutionTaskType)> {
let idx_to_execute = self.execution_idx.fetch_add(1, Ordering::SeqCst);

if idx_to_execute >= self.num_txns {
@@ -758,7 +773,9 @@ impl Scheduler {
// If successfully incarnated (changed status from ready to executing),
// return version for execution task, otherwise None.
self.try_incarnate(idx_to_execute)
- .map(|(incarnation, maybe_condvar)| ((idx_to_execute, incarnation), maybe_condvar))
+ .map(|(incarnation, execution_task_type)| {
+ ((idx_to_execute, incarnation), execution_task_type)
+ })
}

/// Put a transaction in a suspended state, with a condition variable that can be
@@ -767,7 +784,6 @@ impl Scheduler {
/// Return false when the execution is halted.
fn suspend(&self, txn_idx: TxnIndex, dep_condvar: DependencyCondvar) -> bool {
let mut status = self.txn_status[txn_idx as usize].0.write();
-
match *status {
ExecutionStatus::Executing(incarnation) => {
*status = ExecutionStatus::Suspended(incarnation, dep_condvar);
@@ -778,7 +794,7 @@ impl Scheduler {
}
}

- /// When a dependency is resolved, mark the transaction as ReadyToExecute with an
+ /// When a dependency is resolved, mark the transaction as Ready with an
/// incremented incarnation number.
/// The caller must ensure that the transaction is in the Suspended state.
fn resume(&self, txn_idx: TxnIndex) {
@@ -789,7 +805,10 @@ impl Scheduler {
}

if let ExecutionStatus::Suspended(incarnation, dep_condvar) = &*status {
- *status = ExecutionStatus::ReadyToExecute(*incarnation, Some(dep_condvar.clone()));
+ *status = ExecutionStatus::Ready(
+ *incarnation,
+ ExecutionTaskType::Wakeup(dep_condvar.clone()),
+ );
} else {
unreachable!();
}
@@ -819,7 +838,7 @@ impl Scheduler {

// Only makes sense when the current status is 'Aborting'.
debug_assert!(*status == ExecutionStatus::Aborting(incarnation));
- *status = ExecutionStatus::ReadyToExecute(incarnation + 1, None);
+ *status = ExecutionStatus::Ready(incarnation + 1, ExecutionTaskType::Execution);
}

/// Checks whether the done marker is set. The marker can only be set by 'try_commit'.
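
Review note: the status transitions touched by this rename can be summarised in a small single-threaded sketch. It is a simplification made for illustration: the per-transaction `RwLock` is dropped, the `DependencyCondvar` payload of `Wakeup` and `Suspended` is omitted, and `TaskType`, `Status` and `TxnSlot` are hypothetical names. Following the code in the hunks above, `resume` keeps the incarnation number unchanged and `finish_abort` increments it.

```rust
/// Simplified stand-in for ExecutionTaskType (assumption: the condvar payload
/// of Wakeup is omitted to keep the sketch small).
#[derive(Debug, Clone, PartialEq)]
pub enum TaskType {
    Execution,
    Wakeup,
}

/// Simplified stand-in for ExecutionStatus after the rename.
#[derive(Debug, Clone, PartialEq)]
pub enum Status {
    Ready(u32, TaskType),
    Executing(u32),
    Suspended(u32),
    Executed(u32),
    Aborting(u32),
}

/// Hypothetical per-transaction slot; the real scheduler guards this with a lock.
pub struct TxnSlot {
    pub status: Status,
}

impl TxnSlot {
    pub fn new() -> Self {
        TxnSlot { status: Status::Ready(0, TaskType::Execution) }
    }

    /// Ready(i, t) -> Executing(i); returns the incarnation and task type.
    pub fn try_incarnate(&mut self) -> Option<(u32, TaskType)> {
        if let Status::Ready(incarnation, task_type) = &self.status {
            let ret = (*incarnation, task_type.clone());
            self.status = Status::Executing(ret.0);
            Some(ret)
        } else {
            None
        }
    }

    /// Executing(i) -> Suspended(i) when a read dependency is encountered.
    pub fn suspend(&mut self) -> bool {
        if let Status::Executing(i) = self.status {
            self.status = Status::Suspended(i);
            true
        } else {
            false
        }
    }

    /// Suspended(i) -> Ready(i, Wakeup) once the dependency is resolved.
    pub fn resume(&mut self) -> bool {
        if let Status::Suspended(i) = self.status {
            self.status = Status::Ready(i, TaskType::Wakeup);
            true
        } else {
            false
        }
    }

    /// Executing(i) -> Executed(i).
    pub fn finish_execution(&mut self) -> bool {
        if let Status::Executing(i) = self.status {
            self.status = Status::Executed(i);
            true
        } else {
            false
        }
    }

    /// Executed(i) -> Aborting(i) after a validation failure.
    pub fn try_abort(&mut self) -> bool {
        if let Status::Executed(i) = self.status {
            self.status = Status::Aborting(i);
            true
        } else {
            false
        }
    }

    /// Aborting(i) -> Ready(i + 1, Execution).
    pub fn finish_abort(&mut self) -> bool {
        if let Status::Aborting(i) = self.status {
            self.status = Status::Ready(i + 1, TaskType::Execution);
            true
        } else {
            false
        }
    }
}
```

Walking one slot through incarnate, suspend, resume, incarnate again, finish, abort and finish-abort ends at `Ready(1, TaskType::Execution)`, with the second `try_incarnate` handing back a `Wakeup` task rather than a fresh execution.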