Skip to content

Commit

Permalink
[BlockSTM] Remove Commit Lock (aptos-labs#8665)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielxiangzl authored and xbtmatt committed Jul 25, 2023
1 parent d32fd68 commit 659ab28
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
31 changes: 28 additions & 3 deletions aptos-move/block-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,31 @@ use std::{
},
};

struct CommitGuard<'a> {
post_commit_txs: &'a Vec<Sender<u32>>,
worker_idx: usize,
txn_idx: u32,
}

impl<'a> CommitGuard<'a> {
fn new(post_commit_txs: &'a Vec<Sender<u32>>, worker_idx: usize, txn_idx: u32) -> Self {
Self {
post_commit_txs,
worker_idx,
txn_idx,
}
}
}

impl<'a> Drop for CommitGuard<'a> {
fn drop(&mut self) {
// Send the committed txn to the Worker thread.
self.post_commit_txs[self.worker_idx]
.send(self.txn_idx)
.expect("Worker must be available");
}
}

#[derive(Debug)]
enum CommitRole {
Coordinator(Vec<Sender<TxnIndex>>),
Expand Down Expand Up @@ -230,9 +255,9 @@ where
last_input_output: &TxnLastInputOutput<T::Key, E::Output, E::Error>,
) {
while let Some(txn_idx) = scheduler.try_commit() {
post_commit_txs[*worker_idx]
.send(txn_idx)
.expect("Worker must be available");
// Create a CommitGuard to ensure Coordinator sends the committed txn index to Worker.
let _commit_guard: CommitGuard =
CommitGuard::new(post_commit_txs, *worker_idx, txn_idx);
// Iterate round robin over workers to do commit_hook.
*worker_idx = (*worker_idx + 1) % post_commit_txs.len();

Expand Down
7 changes: 0 additions & 7 deletions aptos-move/block-executor/src/txn_last_input_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::{
task::{ExecutionStatus, Transaction, TransactionOutput},
};
use anyhow::anyhow;
use aptos_infallible::Mutex;
use aptos_mvhashmap::types::{Incarnation, TxnIndex, Version};
use aptos_types::{access_path::AccessPath, executable::ModulePath, write_set::WriteOp};
use arc_swap::ArcSwapOption;
Expand Down Expand Up @@ -130,8 +129,6 @@ pub struct TxnLastInputOutput<K, T: TransactionOutput, E: Debug> {
module_reads: DashSet<AccessPath>,

module_read_write_intersection: AtomicBool,

commit_locks: Vec<Mutex<()>>, // Shared locks to prevent race during commit
}

impl<K: ModulePath, T: TransactionOutput, E: Debug + Send + Clone> TxnLastInputOutput<K, T, E> {
Expand All @@ -146,7 +143,6 @@ impl<K: ModulePath, T: TransactionOutput, E: Debug + Send + Clone> TxnLastInputO
module_writes: DashSet::new(),
module_reads: DashSet::new(),
module_read_write_intersection: AtomicBool::new(false),
commit_locks: (0..num_txns).map(|_| Mutex::new(())).collect(),
}
}

Expand Down Expand Up @@ -234,7 +230,6 @@ impl<K: ModulePath, T: TransactionOutput, E: Debug + Send + Clone> TxnLastInputO
}

pub fn update_to_skip_rest(&self, txn_idx: TxnIndex) {
let _lock = self.commit_locks[txn_idx as usize].lock();
if let ExecutionStatus::Success(output) = self.take_output(txn_idx) {
self.outputs[txn_idx as usize].store(Some(Arc::new(TxnOutput {
output_status: ExecutionStatus::SkipRest(output),
Expand Down Expand Up @@ -268,7 +263,6 @@ impl<K: ModulePath, T: TransactionOutput, E: Debug + Send + Clone> TxnLastInputO
usize,
Box<dyn Iterator<Item = <<T as TransactionOutput>::Txn as Transaction>::Key>>,
) {
let _lock = self.commit_locks[txn_idx as usize].lock();
let ret: (
usize,
Box<dyn Iterator<Item = <<T as TransactionOutput>::Txn as Transaction>::Key>>,
Expand Down Expand Up @@ -298,7 +292,6 @@ impl<K: ModulePath, T: TransactionOutput, E: Debug + Send + Clone> TxnLastInputO
txn_idx: TxnIndex,
delta_writes: Vec<(<<T as TransactionOutput>::Txn as Transaction>::Key, WriteOp)>,
) {
let _lock = self.commit_locks[txn_idx as usize].lock();
match &self.outputs[txn_idx as usize]
.load_full()
.expect("Output must exist")
Expand Down

0 comments on commit 659ab28

Please sign in to comment.