Skip to content

Commit

Permalink
[LocalExecution] Remove local execution loop from FN
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Aug 19, 2024
1 parent 8a8c6b1 commit 7d48d44
Showing 1 changed file with 2 additions and 93 deletions.
95 changes: 2 additions & 93 deletions crates/sui-core/src/transaction_orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use futures::future::{select, Either, Future};
use futures::FutureExt;
use mysten_common::sync::notify_read::NotifyRead;
use mysten_metrics::histogram::{Histogram, HistogramVec};
use mysten_metrics::{spawn_logged_monitored_task, spawn_monitored_task};
use mysten_metrics::spawn_logged_monitored_task;
use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
use prometheus::{
Expand All @@ -41,11 +41,9 @@ use sui_types::quorum_driver_types::{
};
use sui_types::sui_system_state::SuiSystemState;
use sui_types::transaction::VerifiedTransaction;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tracing::{debug, error, error_span, info, instrument, warn, Instrument};
use tracing::{debug, error_span, info, instrument, warn, Instrument};

// How long to wait for local execution (including parents) before a timeout
// is returned to client.
Expand All @@ -56,7 +54,6 @@ const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30);
pub struct TransactiondOrchestrator<A: Clone> {
quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
validator_state: Arc<AuthorityState>,
_local_executor_handle: JoinHandle<()>,
pending_tx_log: Arc<WritePathPendingTransactionLog>,
notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
metrics: Arc<TransactionOrchestratorMetrics>,
Expand Down Expand Up @@ -110,30 +107,14 @@ where
.start(),
);

let effects_receiver = quorum_driver_handler.subscribe_to_effects();
let state_clone = validator_state.clone();
let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
let metrics_clone = metrics.clone();
let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
parent_path.join("fullnode_pending_transactions"),
));
let pending_tx_log_clone = pending_tx_log.clone();
let _local_executor_handle = {
spawn_monitored_task!(async move {
Self::loop_execute_finalized_tx_locally(
state_clone,
effects_receiver,
pending_tx_log_clone,
metrics_clone,
)
.await;
})
};
Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
Self {
quorum_driver_handler,
validator_state,
_local_executor_handle,
pending_tx_log,
notifier,
metrics,
Expand Down Expand Up @@ -427,78 +408,6 @@ where
}
}

async fn loop_execute_finalized_tx_locally(
validator_state: Arc<AuthorityState>,
mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
pending_transaction_log: Arc<WritePathPendingTransactionLog>,
metrics: Arc<TransactionOrchestratorMetrics>,
) {
loop {
match effects_receiver.recv().await {
Ok(Ok((transaction, QuorumDriverResponse { effects_cert, .. }))) => {
let tx_digest = transaction.digest();
if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
panic!(
"Failed to finish transaction {tx_digest} in pending transaction log: {err}"
);
}

if transaction.contains_shared_object() {
// Do not locally execute transactions with shared objects, as this can
// cause forks until MVCC is merged.
continue;
}

let epoch_store = validator_state.load_epoch_store_one_call_per_task();

// This is a redundant verification, but SignatureVerifier will cache the
// previous result.
let transaction = match epoch_store.verify_transaction(transaction) {
Ok(transaction) => transaction,
Err(err) => {
// This should be impossible, since we verified the transaction
// before sending it to quorum driver.
error!(
?err,
"Transaction signature failed to verify after quorum driver execution."
);
continue;
}
};

let executable_tx = VerifiedExecutableTransaction::new_from_quorum_execution(
transaction,
effects_cert.executed_epoch(),
);

let _ = Self::execute_finalized_tx_locally_with_timeout(
&validator_state,
&epoch_store,
&executable_tx,
&effects_cert,
&metrics,
)
.await;
}
Ok(Err((tx_digest, _err))) => {
if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
error!(
?tx_digest,
"Failed to finish transaction in pending transaction log: {err}"
);
}
}
Err(RecvError::Closed) => {
error!("Sender of effects subscriber queue has been dropped!");
return;
}
Err(RecvError::Lagged(skipped_count)) => {
warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
}
}
}
}

pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
&self.quorum_driver_handler
}
Expand Down

0 comments on commit 7d48d44

Please sign in to comment.