From 7d48d444a5825780b5d77cfb0414b41c7dc46495 Mon Sep 17 00:00:00 2001 From: Xun Li Date: Mon, 19 Aug 2024 09:43:46 -0700 Subject: [PATCH] [LocalExecution] Remove local execution loop from FN --- .../sui-core/src/transaction_orchestrator.rs | 95 +------------------ 1 file changed, 2 insertions(+), 93 deletions(-) diff --git a/crates/sui-core/src/transaction_orchestrator.rs b/crates/sui-core/src/transaction_orchestrator.rs index 1db608f9e7b6c..aca0cf06b4b79 100644 --- a/crates/sui-core/src/transaction_orchestrator.rs +++ b/crates/sui-core/src/transaction_orchestrator.rs @@ -17,7 +17,7 @@ use futures::future::{select, Either, Future}; use futures::FutureExt; use mysten_common::sync::notify_read::NotifyRead; use mysten_metrics::histogram::{Histogram, HistogramVec}; -use mysten_metrics::{spawn_logged_monitored_task, spawn_monitored_task}; +use mysten_metrics::spawn_logged_monitored_task; use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX}; use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge}; use prometheus::{ @@ -41,11 +41,9 @@ use sui_types::quorum_driver_types::{ }; use sui_types::sui_system_state::SuiSystemState; use sui_types::transaction::VerifiedTransaction; -use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; -use tokio::task::JoinHandle; use tokio::time::timeout; -use tracing::{debug, error, error_span, info, instrument, warn, Instrument}; +use tracing::{debug, error_span, info, instrument, warn, Instrument}; // How long to wait for local execution (including parents) before a timeout // is returned to client. @@ -56,7 +54,6 @@ const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(30); pub struct TransactiondOrchestrator { quorum_driver_handler: Arc>, validator_state: Arc, - _local_executor_handle: JoinHandle<()>, pending_tx_log: Arc, notifier: Arc>, metrics: Arc, @@ -110,30 +107,14 @@ where .start(), ); - let effects_receiver = quorum_driver_handler.subscribe_to_effects(); - let state_clone = validator_state.clone(); let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry)); - let metrics_clone = metrics.clone(); let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new( parent_path.join("fullnode_pending_transactions"), )); - let pending_tx_log_clone = pending_tx_log.clone(); - let _local_executor_handle = { - spawn_monitored_task!(async move { - Self::loop_execute_finalized_tx_locally( - state_clone, - effects_receiver, - pending_tx_log_clone, - metrics_clone, - ) - .await; - }) - }; Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone()); Self { quorum_driver_handler, validator_state, - _local_executor_handle, pending_tx_log, notifier, metrics, @@ -427,78 +408,6 @@ where } } - async fn loop_execute_finalized_tx_locally( - validator_state: Arc, - mut effects_receiver: Receiver, - pending_transaction_log: Arc, - metrics: Arc, - ) { - loop { - match effects_receiver.recv().await { - Ok(Ok((transaction, QuorumDriverResponse { effects_cert, .. }))) => { - let tx_digest = transaction.digest(); - if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) { - panic!( - "Failed to finish transaction {tx_digest} in pending transaction log: {err}" - ); - } - - if transaction.contains_shared_object() { - // Do not locally execute transactions with shared objects, as this can - // cause forks until MVCC is merged. - continue; - } - - let epoch_store = validator_state.load_epoch_store_one_call_per_task(); - - // This is a redundant verification, but SignatureVerifier will cache the - // previous result. - let transaction = match epoch_store.verify_transaction(transaction) { - Ok(transaction) => transaction, - Err(err) => { - // This should be impossible, since we verified the transaction - // before sending it to quorum driver. - error!( - ?err, - "Transaction signature failed to verify after quorum driver execution." - ); - continue; - } - }; - - let executable_tx = VerifiedExecutableTransaction::new_from_quorum_execution( - transaction, - effects_cert.executed_epoch(), - ); - - let _ = Self::execute_finalized_tx_locally_with_timeout( - &validator_state, - &epoch_store, - &executable_tx, - &effects_cert, - &metrics, - ) - .await; - } - Ok(Err((tx_digest, _err))) => { - if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) { - error!( - ?tx_digest, - "Failed to finish transaction in pending transaction log: {err}" - ); - } - } - Err(RecvError::Closed) => { - error!("Sender of effects subscriber queue has been dropped!"); - return; - } - Err(RecvError::Lagged(skipped_count)) => { - warn!("Skipped {skipped_count} transasctions in effects subscriber queue."); - } - } - } - } - pub fn quorum_driver(&self) -> &Arc> { &self.quorum_driver_handler }