From 6c9aa0f49caaa75fa564a299439910a466c5d13d Mon Sep 17 00:00:00 2001 From: danielxiangzl Date: Thu, 15 Jun 2023 15:40:47 -0700 Subject: [PATCH 1/4] better naming for execution task --- aptos-move/block-executor/src/executor.rs | 15 ++++-- .../src/proptest_types/tests.rs | 3 +- aptos-move/block-executor/src/scheduler.rs | 48 +++++++++++-------- aptos-move/block-executor/src/view.rs | 5 +- 4 files changed, 46 insertions(+), 25 deletions(-) diff --git a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index 17bce0f8b4d12..d17e46f98076e 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -9,7 +9,7 @@ use crate::{ TASK_VALIDATE_SECONDS, VM_INIT_SECONDS, WORK_WITH_TASK_SECONDS, }, errors::*, - scheduler::{DependencyStatus, Scheduler, SchedulerTask, Wave}, + scheduler::{DependencyStatus, Scheduler, SchedulerTask, Wave, ReadyTaskType}, task::{ExecutionStatus, ExecutorTask, Transaction, TransactionOutput}, txn_last_input_output::TxnLastInputOutput, view::{LatestView, MVHashMapView}, @@ -34,7 +34,7 @@ use std::{ mpsc, mpsc::{Receiver, Sender}, Arc, - }, + }, thread, }; struct CommitGuard<'a> { @@ -255,6 +255,7 @@ where last_input_output: &TxnLastInputOutput, ) { while let Some(txn_idx) = scheduler.try_commit() { + println!("committed txn_idx: {}", txn_idx); // Create a CommitGuard to ensure Coordinator sends the committed txn index to Worker. let _commit_guard: CommitGuard = CommitGuard::new(post_commit_txs, *worker_idx, txn_idx); @@ -287,6 +288,8 @@ where counters::PARALLEL_PER_BLOCK_GAS.observe(*accumulated_gas as f64); counters::PARALLEL_PER_BLOCK_COMMITTED_TXNS.observe((txn_idx + 1) as f64); info!("[BlockSTM]: Parallel execution early halted due to Abort or SkipRest txn, {} txns committed.", txn_idx + 1); + println!("thread {:?} task: {:?}", thread::current().id(), *scheduler_task); + *scheduler_task = SchedulerTask::Done; break; }, }; @@ -302,6 +305,8 @@ where counters::PARALLEL_PER_BLOCK_COMMITTED_TXNS.observe((txn_idx + 1) as f64); counters::PARALLEL_EXCEED_PER_BLOCK_GAS_LIMIT_COUNT.inc(); info!("[BlockSTM]: Parallel execution early halted due to accumulated_gas {} >= PER_BLOCK_GAS_LIMIT {}, {} txns committed", *accumulated_gas, per_block_gas_limit, txn_idx); + println!("thread {:?} task: {:?}", thread::current().id(), *scheduler_task); + *scheduler_task = SchedulerTask::Done; break; } } @@ -414,7 +419,7 @@ where versioned_cache, scheduler, ), - SchedulerTask::ExecutionTask(version_to_execute, None) => self.execute( + SchedulerTask::ExecutionTask(version_to_execute, ReadyTaskType::Execution) => self.execute( version_to_execute, block, last_input_output, @@ -423,7 +428,7 @@ where &executor, base_view, ), - SchedulerTask::ExecutionTask(_, Some(condvar)) => { + SchedulerTask::ExecutionTask(_, ReadyTaskType::Wakeup(condvar)) => { let (lock, cvar) = &*condvar; // Mark dependency resolved. *lock.lock() = DependencyStatus::Resolved; @@ -450,6 +455,8 @@ where }, } } + let thread_id = thread::current().id(); + println!("Thread finished: {:?}", thread_id); } pub(crate) fn execute_transactions_parallel( diff --git a/aptos-move/block-executor/src/proptest_types/tests.rs b/aptos-move/block-executor/src/proptest_types/tests.rs index da4f45966c529..2eace5d1ef6d9 100644 --- a/aptos-move/block-executor/src/proptest_types/tests.rs +++ b/aptos-move/block-executor/src/proptest_types/tests.rs @@ -64,7 +64,8 @@ fn run_transactions( let executable_txns = ExecutableTransactions::Unsharded(transactions); - for _ in 0..num_repeat { + for i in 0..num_repeat { + println!("Running test case {} of {}", i + 1, num_repeat); let output = BlockExecutor::< Transaction, ValueType>, Task, ValueType>, diff --git a/aptos-move/block-executor/src/scheduler.rs b/aptos-move/block-executor/src/scheduler.rs index 4a0c12312c3a8..e53210e2ecd86 100644 --- a/aptos-move/block-executor/src/scheduler.rs +++ b/aptos-move/block-executor/src/scheduler.rs @@ -14,7 +14,7 @@ use std::{ sync::{ atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, Arc, Condvar, - }, + }, thread, }; const TXN_IDX_MASK: u64 = (1 << 32) - 1; @@ -45,7 +45,7 @@ pub enum DependencyResult { /// there are no more tasks and the scheduler is done. #[derive(Debug)] pub enum SchedulerTask { - ExecutionTask(Version, Option), + ExecutionTask(Version, ReadyTaskType), ValidationTask(Version, Wave), NoTask, Done, @@ -93,9 +93,16 @@ pub enum SchedulerTask { /// ↓ finish_abort | /// Aborting(i) ---------------------------------------------------------> Ready(i+1) --- /// +#[derive(Debug, Clone)] +pub enum ReadyTaskType { + Execution, + Wakeup(DependencyCondvar), +} + #[derive(Debug)] enum ExecutionStatus { - ReadyToExecute(Incarnation, Option), + // ReadyToExecute(Incarnation, Option), + Ready(Incarnation, ReadyTaskType), Executing(Incarnation), Suspended(Incarnation, DependencyCondvar), Executed(Incarnation), @@ -108,7 +115,7 @@ impl PartialEq for ExecutionStatus { fn eq(&self, other: &Self) -> bool { use ExecutionStatus::*; match (self, other) { - (&ReadyToExecute(ref a, _), &ReadyToExecute(ref b, _)) + (&Ready(ref a, _), &Ready(ref b, _)) | (&Executing(ref a), &Executing(ref b)) | (&Suspended(ref a, _), &Suspended(ref b, _)) | (&Executed(ref a), &Executed(ref b)) @@ -242,7 +249,7 @@ impl Scheduler { txn_status: (0..num_txns) .map(|_| { CachePadded::new(( - RwLock::new(ExecutionStatus::ReadyToExecute(0, None)), + RwLock::new(ExecutionStatus::Ready(0, ReadyTaskType::Execution)), RwLock::new(ValidationStatus::new()), )) }) @@ -368,10 +375,10 @@ impl Scheduler { { return SchedulerTask::ValidationTask(version_to_validate, wave); } - } else if let Some((version_to_execute, maybe_condvar)) = + } else if let Some((version_to_execute, task_type)) = self.try_execute_next_version() { - return SchedulerTask::ExecutionTask(version_to_execute, maybe_condvar); + return SchedulerTask::ExecutionTask(version_to_execute, task_type); } } } @@ -536,8 +543,8 @@ impl Scheduler { // re-execution task back to the caller. If incarnation fails, there is // nothing to do, as another thread must have succeeded to incarnate and // obtain the task for re-execution. - if let Some((new_incarnation, maybe_condvar)) = self.try_incarnate(txn_idx) { - return SchedulerTask::ExecutionTask((txn_idx, new_incarnation), maybe_condvar); + if let Some((new_incarnation, task_type)) = self.try_incarnate(txn_idx) { + return SchedulerTask::ExecutionTask((txn_idx, new_incarnation), task_type); } } @@ -573,10 +580,11 @@ impl Scheduler { pub fn resolve_condvar(&self, txn_idx: TxnIndex) { let mut status = self.txn_status[txn_idx as usize].0.write(); { + println!("resolving txn_idx: {}, status: {:?}", txn_idx, *status); // Only transactions with status Suspended or ReadyToExecute may have the condition variable of pending threads. match &*status { ExecutionStatus::Suspended(_, condvar) - | ExecutionStatus::ReadyToExecute(_, Some(condvar)) => { + | ExecutionStatus::Ready(_, ReadyTaskType::Wakeup(condvar)) => { let (lock, cvar) = &*(condvar.clone()); // Mark parallel execution halted due to reasons like module r/w intersection. *lock.lock() = DependencyStatus::ExecutionHalted; @@ -643,7 +651,7 @@ impl Scheduler { /// status is (atomically, due to the mutex) updated to Executing(incarnation). /// An unsuccessful incarnation returns None. Since incarnation numbers never decrease /// for each transaction, incarnate function may not succeed more than once per version. - fn try_incarnate(&self, txn_idx: TxnIndex) -> Option<(Incarnation, Option)> { + fn try_incarnate(&self, txn_idx: TxnIndex) -> Option<(Incarnation, ReadyTaskType)> { if txn_idx >= self.num_txns { return None; } @@ -652,8 +660,9 @@ impl Scheduler { // However, it is likely an overkill (and overhead to actually upgrade), // while unlikely there would be much contention on a specific index lock. let mut status = self.txn_status[txn_idx as usize].0.write(); - if let ExecutionStatus::ReadyToExecute(incarnation, maybe_condvar) = &*status { - let ret = (*incarnation, maybe_condvar.clone()); + if let ExecutionStatus::Ready(incarnation, task_type) = &*status { + let ret: (u32, ReadyTaskType) = (*incarnation, (*task_type).clone()); + println!("thread {:?} incarnating txn_idx: {}, status: {:?}", thread::current().id(), txn_idx, *status); *status = ExecutionStatus::Executing(*incarnation); Some(ret) } else { @@ -695,7 +704,7 @@ impl Scheduler { let status = self.txn_status[txn_idx as usize].0.read(); matches!( *status, - ExecutionStatus::ReadyToExecute(0, _) + ExecutionStatus::Ready(0, _) | ExecutionStatus::Executing(0) | ExecutionStatus::Suspended(0, _) ) @@ -748,7 +757,7 @@ impl Scheduler { /// to create the next incarnation (should happen exactly once), and if successful, /// return the version to the caller for the corresponding ExecutionTask. /// - Otherwise, return None. - fn try_execute_next_version(&self) -> Option<(Version, Option)> { + fn try_execute_next_version(&self) -> Option<(Version,ReadyTaskType)> { let idx_to_execute = self.execution_idx.fetch_add(1, Ordering::SeqCst); if idx_to_execute >= self.num_txns { @@ -758,7 +767,7 @@ impl Scheduler { // If successfully incarnated (changed status from ready to executing), // return version for execution task, otherwise None. self.try_incarnate(idx_to_execute) - .map(|(incarnation, maybe_condvar)| ((idx_to_execute, incarnation), maybe_condvar)) + .map(|(incarnation, task_type)| ((idx_to_execute, incarnation), task_type)) } /// Put a transaction in a suspended state, with a condition variable that can be @@ -767,10 +776,11 @@ impl Scheduler { /// Return false when the execution is halted. fn suspend(&self, txn_idx: TxnIndex, dep_condvar: DependencyCondvar) -> bool { let mut status = self.txn_status[txn_idx as usize].0.write(); - + println!("suspending txn {} status {:?}", txn_idx, *status); match *status { ExecutionStatus::Executing(incarnation) => { *status = ExecutionStatus::Suspended(incarnation, dep_condvar); + println!("suspended txn {} status {:?}", txn_idx, *status); true }, ExecutionStatus::ExecutionHalted => false, @@ -789,7 +799,7 @@ impl Scheduler { } if let ExecutionStatus::Suspended(incarnation, dep_condvar) = &*status { - *status = ExecutionStatus::ReadyToExecute(*incarnation, Some(dep_condvar.clone())); + *status = ExecutionStatus::Ready(*incarnation, ReadyTaskType::Wakeup(dep_condvar.clone())); } else { unreachable!(); } @@ -819,7 +829,7 @@ impl Scheduler { // Only makes sense when the current status is 'Aborting'. debug_assert!(*status == ExecutionStatus::Aborting(incarnation)); - *status = ExecutionStatus::ReadyToExecute(incarnation + 1, None); + *status = ExecutionStatus::Ready(incarnation + 1, ReadyTaskType::Execution); } /// Checks whether the done marker is set. The marker can only be set by 'try_commit'. diff --git a/aptos-move/block-executor/src/view.rs b/aptos-move/block-executor/src/view.rs index 1451c838d8ae6..7c560047e54a9 100644 --- a/aptos-move/block-executor/src/view.rs +++ b/aptos-move/block-executor/src/view.rs @@ -23,7 +23,7 @@ use aptos_types::{ write_set::TransactionWrite, }; use aptos_vm_logging::{log_schema::AdapterLogSchema, prelude::*}; -use std::{cell::RefCell, fmt::Debug, hash::Hash, sync::Arc}; +use std::{cell::RefCell, fmt::Debug, hash::Hash, sync::Arc, thread}; /// A struct that is always used by a single thread performing an execution task. The struct is /// passed to the VM and acts as a proxy to resolve reads first in the shared multi-version @@ -142,9 +142,12 @@ impl< // eventually finish and lead to unblocking txn_idx, contradiction. let (lock, cvar) = &*dep_condition; let mut dep_resolved = lock.lock(); + let thread_id = thread::current().id(); while let DependencyStatus::Unresolved = *dep_resolved { + println!("thread {:?} waiting! txn {} waiting on txn {}", thread_id, txn_idx, dep_idx); dep_resolved = cvar.wait(dep_resolved).unwrap(); } + println!("thread {:?} resolved! txn {} waiting on txn {}", thread_id, txn_idx, dep_idx); if let DependencyStatus::ExecutionHalted = *dep_resolved { return ReadResult::ExecutionHalted; } From ff1ebf10a9fce05be7d5605a477957bb8c535d26 Mon Sep 17 00:00:00 2001 From: danielxiangzl Date: Thu, 15 Jun 2023 16:44:13 -0700 Subject: [PATCH 2/4] better comments --- aptos-move/block-executor/src/executor.rs | 41 ++++----- .../src/proptest_types/tests.rs | 3 +- aptos-move/block-executor/src/scheduler.rs | 91 ++++++++++--------- .../block-executor/src/unit_tests/mod.rs | 34 +++---- aptos-move/block-executor/src/view.rs | 5 +- 5 files changed, 88 insertions(+), 86 deletions(-) diff --git a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index d17e46f98076e..fdb88bf11dc60 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -9,7 +9,7 @@ use crate::{ TASK_VALIDATE_SECONDS, VM_INIT_SECONDS, WORK_WITH_TASK_SECONDS, }, errors::*, - scheduler::{DependencyStatus, Scheduler, SchedulerTask, Wave, ReadyTaskType}, + scheduler::{DependencyStatus, ExecutionTaskType, Scheduler, SchedulerTask, Wave}, task::{ExecutionStatus, ExecutorTask, Transaction, TransactionOutput}, txn_last_input_output::TxnLastInputOutput, view::{LatestView, MVHashMapView}, @@ -34,7 +34,7 @@ use std::{ mpsc, mpsc::{Receiver, Sender}, Arc, - }, thread, + }, }; struct CommitGuard<'a> { @@ -255,7 +255,6 @@ where last_input_output: &TxnLastInputOutput, ) { while let Some(txn_idx) = scheduler.try_commit() { - println!("committed txn_idx: {}", txn_idx); // Create a CommitGuard to ensure Coordinator sends the committed txn index to Worker. let _commit_guard: CommitGuard = CommitGuard::new(post_commit_txs, *worker_idx, txn_idx); @@ -288,7 +287,6 @@ where counters::PARALLEL_PER_BLOCK_GAS.observe(*accumulated_gas as f64); counters::PARALLEL_PER_BLOCK_COMMITTED_TXNS.observe((txn_idx + 1) as f64); info!("[BlockSTM]: Parallel execution early halted due to Abort or SkipRest txn, {} txns committed.", txn_idx + 1); - println!("thread {:?} task: {:?}", thread::current().id(), *scheduler_task); *scheduler_task = SchedulerTask::Done; break; }, @@ -305,7 +303,6 @@ where counters::PARALLEL_PER_BLOCK_COMMITTED_TXNS.observe((txn_idx + 1) as f64); counters::PARALLEL_EXCEED_PER_BLOCK_GAS_LIMIT_COUNT.inc(); info!("[BlockSTM]: Parallel execution early halted due to accumulated_gas {} >= PER_BLOCK_GAS_LIMIT {}, {} txns committed", *accumulated_gas, per_block_gas_limit, txn_idx); - println!("thread {:?} task: {:?}", thread::current().id(), *scheduler_task); *scheduler_task = SchedulerTask::Done; break; } @@ -313,11 +310,11 @@ where // Remark: When early halting the BlockSTM, we have to make sure the current / new tasks // will be properly handled by the threads. For instance, it is possible that the committing - // thread holds an execution task from the last iteration, and then early halts the BlockSTM - // due to a txn execution abort. In this case, we cannot reset the scheduler_task of the - // committing thread (to be Done), otherwise some other pending thread waiting for the execution - // will be pending on read forever (since the halt logic let the execution task to wake up such - // pending task). + // thread holds an execution task of ExecutionTaskType::Wakeup(DependencyCondvar) for some + // other thread pending on the dependency conditional variable from the last iteration. If + // the committing thread early halts BlockSTM and resets its scheduler_task to be Done, the + // pending thread will be pending on read forever. In other words, we rely on the committing + // thread to wake up the pending execution thread, if the committing thread holds the Wakeup task. } } @@ -419,16 +416,18 @@ where versioned_cache, scheduler, ), - SchedulerTask::ExecutionTask(version_to_execute, ReadyTaskType::Execution) => self.execute( - version_to_execute, - block, - last_input_output, - versioned_cache, - scheduler, - &executor, - base_view, - ), - SchedulerTask::ExecutionTask(_, ReadyTaskType::Wakeup(condvar)) => { + SchedulerTask::ExecutionTask(version_to_execute, ExecutionTaskType::Execution) => { + self.execute( + version_to_execute, + block, + last_input_output, + versioned_cache, + scheduler, + &executor, + base_view, + ) + }, + SchedulerTask::ExecutionTask(_, ExecutionTaskType::Wakeup(condvar)) => { let (lock, cvar) = &*condvar; // Mark dependency resolved. *lock.lock() = DependencyStatus::Resolved; @@ -455,8 +454,6 @@ where }, } } - let thread_id = thread::current().id(); - println!("Thread finished: {:?}", thread_id); } pub(crate) fn execute_transactions_parallel( diff --git a/aptos-move/block-executor/src/proptest_types/tests.rs b/aptos-move/block-executor/src/proptest_types/tests.rs index 2eace5d1ef6d9..da4f45966c529 100644 --- a/aptos-move/block-executor/src/proptest_types/tests.rs +++ b/aptos-move/block-executor/src/proptest_types/tests.rs @@ -64,8 +64,7 @@ fn run_transactions( let executable_txns = ExecutableTransactions::Unsharded(transactions); - for i in 0..num_repeat { - println!("Running test case {} of {}", i + 1, num_repeat); + for _ in 0..num_repeat { let output = BlockExecutor::< Transaction, ValueType>, Task, ValueType>, diff --git a/aptos-move/block-executor/src/scheduler.rs b/aptos-move/block-executor/src/scheduler.rs index e53210e2ecd86..f13dcbd2bcc33 100644 --- a/aptos-move/block-executor/src/scheduler.rs +++ b/aptos-move/block-executor/src/scheduler.rs @@ -14,7 +14,7 @@ use std::{ sync::{ atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, Arc, Condvar, - }, thread, + }, }; const TXN_IDX_MASK: u64 = (1 << 32) - 1; @@ -39,13 +39,22 @@ pub enum DependencyResult { ExecutionHalted, } +/// Two types of execution tasks: Execution and Wakeup. +/// Execution is a normal execution task, Wakeup is a task that just wakes up a suspended execution. +/// See explanations for the ExecutionStatus below. +#[derive(Debug, Clone)] +pub enum ExecutionTaskType { + Execution, + Wakeup(DependencyCondvar), +} + /// A holder for potential task returned from the Scheduler. ExecutionTask and ValidationTask /// each contain a version of transaction that must be executed or validated, respectively. /// NoTask holds no task (similar None if we wrapped tasks in Option), and Done implies that /// there are no more tasks and the scheduler is done. #[derive(Debug)] pub enum SchedulerTask { - ExecutionTask(Version, ReadyTaskType), + ExecutionTask(Version, ExecutionTaskType), ValidationTask(Version, Wave), NoTask, Done, @@ -56,21 +65,24 @@ pub enum SchedulerTask { /// 'execution status' as 'status'. Each status contains the latest incarnation number, /// where incarnation = i means it is the i-th execution instance of the transaction. /// -/// 'ReadyToExecute' means that the corresponding incarnation should be executed and the scheduler +/// 'Ready' means that the corresponding incarnation should be executed and the scheduler /// must eventually create a corresponding execution task. The scheduler ensures that exactly one -/// execution task gets created, changing the status to 'Executing' in the process. If a dependency -/// condition variable is set, then an execution of a prior incarnation is waiting on it with -/// a read dependency resolved (when dependency was encountered, the status changed to Suspended, -/// and suspended changed to ReadyToExecute when the dependency finished its execution). In this case -/// the caller need not create a new execution task, but just notify the suspended execution. +/// execution task gets created, changing the status to 'Executing' in the process. 'Ready' status +/// contains an ExecutionTaskType, which is either Execution or Wakeup. If it is Execution, then +/// the scheduler creates an execution task for the corresponding incarnation. If it is Wakeup, +/// a dependency condition variable is set in ExecutionTaskType::Wakeup(DependencyCondvar): an execution +/// of a prior incarnation is waiting on it with a read dependency resolved (when dependency was +/// encountered, the status changed to Suspended, and suspended changed to Ready when the dependency +/// finished its execution). In this case the caller need not create a new execution task, but +/// just notify the suspended execution via the dependency condition variable. /// /// 'Executing' status of an incarnation turns into 'Executed' if the execution task finishes, or -/// if a dependency is encountered, it becomes 'ReadyToExecute(incarnation + 1)' once the +/// if a dependency is encountered, it becomes 'Ready(incarnation + 1)' once the /// dependency is resolved. An 'Executed' status allows creation of validation tasks for the /// corresponding incarnation, and a validation failure leads to an abort. The scheduler ensures /// that there is exactly one abort, changing the status to 'Aborting' in the process. Once the /// thread that successfully aborted performs everything that's required, it sets the status -/// to 'ReadyToExecute(incarnation + 1)', allowing the scheduler to create an execution +/// to 'Ready(incarnation + 1)', allowing the scheduler to create an execution /// task for the next incarnation of the transaction. /// /// 'ExecutionHalted' is a transaction status marking that parallel execution is halted, due to @@ -93,16 +105,9 @@ pub enum SchedulerTask { /// ↓ finish_abort | /// Aborting(i) ---------------------------------------------------------> Ready(i+1) --- /// -#[derive(Debug, Clone)] -pub enum ReadyTaskType { - Execution, - Wakeup(DependencyCondvar), -} - #[derive(Debug)] enum ExecutionStatus { - // ReadyToExecute(Incarnation, Option), - Ready(Incarnation, ReadyTaskType), + Ready(Incarnation, ExecutionTaskType), Executing(Incarnation), Suspended(Incarnation, DependencyCondvar), Executed(Incarnation), @@ -215,7 +220,7 @@ pub struct Scheduler { // validation/execution preferences stick to the worker threads). /// A shared index that tracks the minimum of all transaction indices that require execution. /// The threads increment the index and attempt to create an execution task for the corresponding - /// transaction, if the status of the txn is 'ReadyToExecute'. This implements a counting-based + /// transaction, if the status of the txn is 'Ready'. This implements a counting-based /// concurrent ordered set. It is reduced as necessary when transactions become ready to be /// executed, in particular, when execution finishes and dependencies are resolved. execution_idx: AtomicU32, @@ -249,7 +254,7 @@ impl Scheduler { txn_status: (0..num_txns) .map(|_| { CachePadded::new(( - RwLock::new(ExecutionStatus::Ready(0, ReadyTaskType::Execution)), + RwLock::new(ExecutionStatus::Ready(0, ExecutionTaskType::Execution)), RwLock::new(ValidationStatus::new()), )) }) @@ -375,10 +380,10 @@ impl Scheduler { { return SchedulerTask::ValidationTask(version_to_validate, wave); } - } else if let Some((version_to_execute, task_type)) = + } else if let Some((version_to_execute, execution_task_type)) = self.try_execute_next_version() { - return SchedulerTask::ExecutionTask(version_to_execute, task_type); + return SchedulerTask::ExecutionTask(version_to_execute, execution_task_type); } } } @@ -475,7 +480,7 @@ impl Scheduler { let min_dep = txn_deps .into_iter() .map(|dep| { - // Mark the status of dependencies as 'ReadyToExecute' since dependency on + // Mark the status of dependencies as 'Ready' since dependency on // transaction txn_idx is now resolved. self.resume(dep); @@ -543,8 +548,11 @@ impl Scheduler { // re-execution task back to the caller. If incarnation fails, there is // nothing to do, as another thread must have succeeded to incarnate and // obtain the task for re-execution. - if let Some((new_incarnation, task_type)) = self.try_incarnate(txn_idx) { - return SchedulerTask::ExecutionTask((txn_idx, new_incarnation), task_type); + if let Some((new_incarnation, execution_task_type)) = self.try_incarnate(txn_idx) { + return SchedulerTask::ExecutionTask( + (txn_idx, new_incarnation), + execution_task_type, + ); } } @@ -580,11 +588,10 @@ impl Scheduler { pub fn resolve_condvar(&self, txn_idx: TxnIndex) { let mut status = self.txn_status[txn_idx as usize].0.write(); { - println!("resolving txn_idx: {}, status: {:?}", txn_idx, *status); - // Only transactions with status Suspended or ReadyToExecute may have the condition variable of pending threads. + // Only transactions with status Suspended or Ready may have the condition variable of pending threads. match &*status { ExecutionStatus::Suspended(_, condvar) - | ExecutionStatus::Ready(_, ReadyTaskType::Wakeup(condvar)) => { + | ExecutionStatus::Ready(_, ExecutionTaskType::Wakeup(condvar)) => { let (lock, cvar) = &*(condvar.clone()); // Mark parallel execution halted due to reasons like module r/w intersection. *lock.lock() = DependencyStatus::ExecutionHalted; @@ -647,11 +654,11 @@ impl Scheduler { } /// Try and incarnate a transaction. Only possible when the status is - /// ReadyToExecute(incarnation), in which case Some(incarnation) is returned and the + /// Ready(incarnation), in which case Some(incarnation) is returned and the /// status is (atomically, due to the mutex) updated to Executing(incarnation). /// An unsuccessful incarnation returns None. Since incarnation numbers never decrease /// for each transaction, incarnate function may not succeed more than once per version. - fn try_incarnate(&self, txn_idx: TxnIndex) -> Option<(Incarnation, ReadyTaskType)> { + fn try_incarnate(&self, txn_idx: TxnIndex) -> Option<(Incarnation, ExecutionTaskType)> { if txn_idx >= self.num_txns { return None; } @@ -660,9 +667,8 @@ impl Scheduler { // However, it is likely an overkill (and overhead to actually upgrade), // while unlikely there would be much contention on a specific index lock. let mut status = self.txn_status[txn_idx as usize].0.write(); - if let ExecutionStatus::Ready(incarnation, task_type) = &*status { - let ret: (u32, ReadyTaskType) = (*incarnation, (*task_type).clone()); - println!("thread {:?} incarnating txn_idx: {}, status: {:?}", thread::current().id(), txn_idx, *status); + if let ExecutionStatus::Ready(incarnation, execution_task_type) = &*status { + let ret: (u32, ExecutionTaskType) = (*incarnation, (*execution_task_type).clone()); *status = ExecutionStatus::Executing(*incarnation); Some(ret) } else { @@ -753,11 +759,11 @@ impl Scheduler { /// Grab an index to try and execute next (by fetch-and-incrementing execution_idx). /// - If the index is out of bounds, return None (and invoke a check of whether /// all txns can be committed). - /// - If the transaction is ready for execution (ReadyToExecute state), attempt + /// - If the transaction is ready for execution (Ready state), attempt /// to create the next incarnation (should happen exactly once), and if successful, /// return the version to the caller for the corresponding ExecutionTask. /// - Otherwise, return None. - fn try_execute_next_version(&self) -> Option<(Version,ReadyTaskType)> { + fn try_execute_next_version(&self) -> Option<(Version, ExecutionTaskType)> { let idx_to_execute = self.execution_idx.fetch_add(1, Ordering::SeqCst); if idx_to_execute >= self.num_txns { @@ -767,7 +773,9 @@ impl Scheduler { // If successfully incarnated (changed status from ready to executing), // return version for execution task, otherwise None. self.try_incarnate(idx_to_execute) - .map(|(incarnation, task_type)| ((idx_to_execute, incarnation), task_type)) + .map(|(incarnation, execution_task_type)| { + ((idx_to_execute, incarnation), execution_task_type) + }) } /// Put a transaction in a suspended state, with a condition variable that can be @@ -776,11 +784,9 @@ impl Scheduler { /// Return false when the execution is halted. fn suspend(&self, txn_idx: TxnIndex, dep_condvar: DependencyCondvar) -> bool { let mut status = self.txn_status[txn_idx as usize].0.write(); - println!("suspending txn {} status {:?}", txn_idx, *status); match *status { ExecutionStatus::Executing(incarnation) => { *status = ExecutionStatus::Suspended(incarnation, dep_condvar); - println!("suspended txn {} status {:?}", txn_idx, *status); true }, ExecutionStatus::ExecutionHalted => false, @@ -788,7 +794,7 @@ impl Scheduler { } } - /// When a dependency is resolved, mark the transaction as ReadyToExecute with an + /// When a dependency is resolved, mark the transaction as Ready with an /// incremented incarnation number. /// The caller must ensure that the transaction is in the Suspended state. fn resume(&self, txn_idx: TxnIndex) { @@ -799,7 +805,10 @@ impl Scheduler { } if let ExecutionStatus::Suspended(incarnation, dep_condvar) = &*status { - *status = ExecutionStatus::Ready(*incarnation, ReadyTaskType::Wakeup(dep_condvar.clone())); + *status = ExecutionStatus::Ready( + *incarnation, + ExecutionTaskType::Wakeup(dep_condvar.clone()), + ); } else { unreachable!(); } @@ -829,7 +838,7 @@ impl Scheduler { // Only makes sense when the current status is 'Aborting'. debug_assert!(*status == ExecutionStatus::Aborting(incarnation)); - *status = ExecutionStatus::Ready(incarnation + 1, ReadyTaskType::Execution); + *status = ExecutionStatus::Ready(incarnation + 1, ExecutionTaskType::Execution); } /// Checks whether the done marker is set. The marker can only be set by 'try_commit'. diff --git a/aptos-move/block-executor/src/unit_tests/mod.rs b/aptos-move/block-executor/src/unit_tests/mod.rs index 60458ee9dae09..fc69a0bbd986e 100644 --- a/aptos-move/block-executor/src/unit_tests/mod.rs +++ b/aptos-move/block-executor/src/unit_tests/mod.rs @@ -5,7 +5,7 @@ use crate::{ executor::BlockExecutor, proptest_types::types::{DeltaDataView, ExpectedOutput, KeyType, Task, Transaction, ValueType}, - scheduler::{DependencyResult, Scheduler, SchedulerTask}, + scheduler::{DependencyResult, ExecutionTaskType, Scheduler, SchedulerTask}, }; use aptos_aggregator::delta_change_set::{delta_add, delta_sub, DeltaOp, DeltaUpdate}; use aptos_mvhashmap::types::TxnIndex; @@ -278,7 +278,7 @@ fn scheduler_tasks() { // No validation tasks. assert!(matches!( s.next_task(false), - SchedulerTask::ExecutionTask((j, 0), None) if i == j + SchedulerTask::ExecutionTask((j, 0), ExecutionTaskType::Execution) if i == j )); } @@ -310,16 +310,16 @@ fn scheduler_tasks() { assert!(matches!( s.finish_abort(4, 0), - SchedulerTask::ExecutionTask((4, 1), None) + SchedulerTask::ExecutionTask((4, 1), ExecutionTaskType::Execution) )); assert!(matches!( s.finish_abort(1, 0), - SchedulerTask::ExecutionTask((1, 1), None) + SchedulerTask::ExecutionTask((1, 1), ExecutionTaskType::Execution) )); // Validation index = 2, wave = 1. assert!(matches!( s.finish_abort(3, 0), - SchedulerTask::ExecutionTask((3, 1), None) + SchedulerTask::ExecutionTask((3, 1), ExecutionTaskType::Execution) )); assert!(matches!( @@ -369,7 +369,7 @@ fn scheduler_first_wave() { // Nothing to validate. assert!(matches!( s.next_task(false), - SchedulerTask::ExecutionTask((j, 0), None) if j == i + SchedulerTask::ExecutionTask((j, 0), ExecutionTaskType::Execution) if j == i )); } @@ -387,7 +387,7 @@ fn scheduler_first_wave() { )); assert!(matches!( s.next_task(false), - SchedulerTask::ExecutionTask((5, 0), None) + SchedulerTask::ExecutionTask((5, 0), ExecutionTaskType::Execution) )); // Since (1, 0) is not EXECUTED, no validation tasks, and execution index // is already at the limit, so no tasks immediately available. @@ -424,7 +424,7 @@ fn scheduler_dependency() { // Nothing to validate. assert!(matches!( s.next_task(false), - SchedulerTask::ExecutionTask((j, 0), None) if j == i + SchedulerTask::ExecutionTask((j, 0), ExecutionTaskType::Execution) if j == i )); } @@ -458,7 +458,7 @@ fn scheduler_dependency() { // resumed task doesn't bump incarnation assert!(matches!( s.next_task(false), - SchedulerTask::ExecutionTask((4, 0), Some(_)) + SchedulerTask::ExecutionTask((4, 0), ExecutionTaskType::Wakeup(_)) )); } @@ -471,7 +471,7 @@ fn incarnation_one_scheduler(num_txns: TxnIndex) -> Scheduler { // Get the first executions out of the way. assert!(matches!( s.next_task(false), - SchedulerTask::ExecutionTask((j, 0), None) if j == i + SchedulerTask::ExecutionTask((j, 0), ExecutionTaskType::Execution) if j == i )); assert!(matches!( s.finish_execution(i, 0, false), @@ -484,7 +484,7 @@ fn incarnation_one_scheduler(num_txns: TxnIndex) -> Scheduler { assert!(s.try_abort(i, 0)); assert!(matches!( s.finish_abort(i, 0), - SchedulerTask::ExecutionTask((j, 1), None) if i == j + SchedulerTask::ExecutionTask((j, 1), ExecutionTaskType::Execution) if i == j )); } s @@ -528,7 +528,7 @@ fn scheduler_incarnation() { assert!(matches!( s.finish_abort(2, 1), - SchedulerTask::ExecutionTask((2, 2), None) + SchedulerTask::ExecutionTask((2, 2), ExecutionTaskType::Execution) )); // wave = 2, validation index = 2. assert!(matches!( @@ -541,15 +541,15 @@ fn scheduler_incarnation() { assert!(matches!( s.next_task(false), - SchedulerTask::ExecutionTask((1, 1), Some(_)) + SchedulerTask::ExecutionTask((1, 1), ExecutionTaskType::Wakeup(_)) )); assert!(matches!( s.next_task(false), - SchedulerTask::ExecutionTask((3, 1), Some(_)) + SchedulerTask::ExecutionTask((3, 1), ExecutionTaskType::Wakeup(_)) )); assert!(matches!( s.next_task(false), - SchedulerTask::ExecutionTask((4, 2), None) + SchedulerTask::ExecutionTask((4, 2), ExecutionTaskType::Execution) )); // execution index = 5 @@ -585,7 +585,7 @@ fn scheduler_basic() { // Nothing to validate. assert!(matches!( s.next_task(false), - SchedulerTask::ExecutionTask((j, 0), None) if j == i + SchedulerTask::ExecutionTask((j, 0), ExecutionTaskType::Execution) if j == i )); } @@ -635,7 +635,7 @@ fn scheduler_drain_idx() { // Nothing to validate. assert!(matches!( s.next_task(false), - SchedulerTask::ExecutionTask((j, 0), None) if j == i + SchedulerTask::ExecutionTask((j, 0), ExecutionTaskType::Execution) if j == i )); } diff --git a/aptos-move/block-executor/src/view.rs b/aptos-move/block-executor/src/view.rs index 7c560047e54a9..1451c838d8ae6 100644 --- a/aptos-move/block-executor/src/view.rs +++ b/aptos-move/block-executor/src/view.rs @@ -23,7 +23,7 @@ use aptos_types::{ write_set::TransactionWrite, }; use aptos_vm_logging::{log_schema::AdapterLogSchema, prelude::*}; -use std::{cell::RefCell, fmt::Debug, hash::Hash, sync::Arc, thread}; +use std::{cell::RefCell, fmt::Debug, hash::Hash, sync::Arc}; /// A struct that is always used by a single thread performing an execution task. The struct is /// passed to the VM and acts as a proxy to resolve reads first in the shared multi-version @@ -142,12 +142,9 @@ impl< // eventually finish and lead to unblocking txn_idx, contradiction. let (lock, cvar) = &*dep_condition; let mut dep_resolved = lock.lock(); - let thread_id = thread::current().id(); while let DependencyStatus::Unresolved = *dep_resolved { - println!("thread {:?} waiting! txn {} waiting on txn {}", thread_id, txn_idx, dep_idx); dep_resolved = cvar.wait(dep_resolved).unwrap(); } - println!("thread {:?} resolved! txn {} waiting on txn {}", thread_id, txn_idx, dep_idx); if let DependencyStatus::ExecutionHalted = *dep_resolved { return ReadResult::ExecutionHalted; } From e47d8e6564002b6d18a2198c90eb8dd2f4cad642 Mon Sep 17 00:00:00 2001 From: danielxiangzl Date: Thu, 15 Jun 2023 16:46:23 -0700 Subject: [PATCH 3/4] nit --- aptos-move/aptos-vm-types/src/output.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aptos-move/aptos-vm-types/src/output.rs b/aptos-move/aptos-vm-types/src/output.rs index eb34dab0b24ec..05fa3e3d5144a 100644 --- a/aptos-move/aptos-vm-types/src/output.rs +++ b/aptos-move/aptos-vm-types/src/output.rs @@ -91,9 +91,9 @@ impl VMOutput { } // Try to materialize deltas and add them to the write set. - let (change_set, gas_used, status) = self.unpack_with_fee_statement(); + let (change_set, fee_statement, status) = self.unpack_with_fee_statement(); let materialized_change_set = change_set.try_materialize(state_view)?; - Ok(VMOutput::new(materialized_change_set, gas_used, status)) + Ok(VMOutput::new(materialized_change_set, fee_statement, status)) } /// Converts VMOutput into TransactionOutput which can be used by storage From ee23d70eb9947415b6285c27cee498eb5b7ad7fe Mon Sep 17 00:00:00 2001 From: danielxiangzl Date: Thu, 15 Jun 2023 16:55:41 -0700 Subject: [PATCH 4/4] lint --- aptos-move/aptos-vm-types/src/output.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/aptos-move/aptos-vm-types/src/output.rs b/aptos-move/aptos-vm-types/src/output.rs index 05fa3e3d5144a..5e40abf94dc67 100644 --- a/aptos-move/aptos-vm-types/src/output.rs +++ b/aptos-move/aptos-vm-types/src/output.rs @@ -93,7 +93,11 @@ impl VMOutput { // Try to materialize deltas and add them to the write set. let (change_set, fee_statement, status) = self.unpack_with_fee_statement(); let materialized_change_set = change_set.try_materialize(state_view)?; - Ok(VMOutput::new(materialized_change_set, fee_statement, status)) + Ok(VMOutput::new( + materialized_change_set, + fee_statement, + status, + )) } /// Converts VMOutput into TransactionOutput which can be used by storage