From 653912ef3a96f40206c8012a603c6e51ec0fbd2b Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Thu, 13 Jun 2024 17:08:10 +0900
Subject: [PATCH 1/7] Adjust replay-related metrics for unified scheduler

---
 core/src/replay_stage.rs           |  5 +-
 ledger/src/blockstore_processor.rs | 78 +++++++++++++++++++++++-------
 program-runtime/src/timings.rs     | 28 ++++++-----
 3 files changed, 82 insertions(+), 29 deletions(-)

diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index d547abc096d65d..be23be60f92bf5 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -3009,10 +3009,12 @@ impl ReplayStage {
                 .expect("Bank fork progress entry missing for completed bank");
             let replay_stats = bank_progress.replay_stats.clone();
 
+            let mut is_stats_from_completed_scheduler = false;
             if let Some((result, completed_execute_timings)) =
                 bank.wait_for_completed_scheduler()
             {
+                is_stats_from_completed_scheduler = true;
                 let metrics = ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads(
                     completed_execute_timings,
                 );
@@ -3020,7 +3022,7 @@ impl ReplayStage {
                     .write()
                     .unwrap()
                     .batch_execute
-                    .accumulate(metrics);
+                    .accumulate(metrics, is_stats_from_completed_scheduler);
 
                 if let Err(err) = result {
                     let root = bank_forks.read().unwrap().root();
@@ -3219,6 +3221,7 @@ impl ReplayStage {
                 r_replay_progress.num_entries,
                 r_replay_progress.num_shreds,
                 bank_complete_time.as_us(),
+                is_stats_from_completed_scheduler,
             );
             execute_timings.accumulate(&r_replay_stats.batch_execute.totals);
         } else {
diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index 30abc9bf6eaea6..d892a85f2bae5c 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -513,7 +513,8 @@ fn rebatch_and_execute_batches(
         prioritization_fee_cache,
     )?;
 
-    timing.accumulate(execute_batches_internal_metrics);
+    // Pass false because this code-path is never touched by unified scheduler.
+    timing.accumulate(execute_batches_internal_metrics, false);
 
     Ok(())
 }
@@ -1079,11 +1080,15 @@ pub struct ConfirmationTiming {
     /// and replay. As replay can run in parallel with the verification, this value can not be
     /// recovered from the `replay_elapsed` and/or `{poh,transaction}_verify_elapsed`. This
     /// includes failed cases, when `confirm_slot_entries` exits with an error. In microseconds.
+    /// When unified scheduler is enabled, replay excludes the transaction execution, only
+    /// accounting for task creation and submission to the scheduler.
     pub confirmation_elapsed: u64,
 
     /// Wall clock time used by the entry replay code. Does not include the PoH or the transaction
     /// signature/precompiles verification, but can overlap with the PoH and signature verification.
     /// In microseconds.
+    /// When unified scheduler is enabled, replay excludes the transaction execution, only
+    /// accounting for task creation and submission to the scheduler.
     pub replay_elapsed: u64,
 
     /// Wall clock times, used for the PoH verification of entries. In microseconds.
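The flag threaded through these call sites follows one convention: it starts out false and flips to true only when wait_for_completed_scheduler() actually hands back timings, which happens iff the unified scheduler ran the bank. Below is a minimal stand-alone sketch of that caller-side pattern; Bank, Timings, and the method body are simplified stand-ins for the real agave types, not their actual definitions.

// Sketch only: `Bank`, `Timings`, and `wait_for_completed_scheduler` are
// simplified stand-ins for the real agave types.
struct Timings {
    execute_us: u64,
}

struct Bank {
    scheduler_timings: Option<Timings>,
}

impl Bank {
    // Returns Some(_) iff a unified scheduler was attached to this bank.
    fn wait_for_completed_scheduler(&mut self) -> Option<Timings> {
        self.scheduler_timings.take()
    }
}

fn main() {
    let mut bank = Bank {
        scheduler_timings: Some(Timings { execute_us: 1_234 }),
    };
    let mut is_stats_from_completed_scheduler = false;
    let mut total_us: u64 = 0;
    if let Some(timings) = bank.wait_for_completed_scheduler() {
        // Reaching this branch proves the unified scheduler produced the stats.
        is_stats_from_completed_scheduler = true;
        total_us = total_us.saturating_add(timings.execute_us);
    }
    println!("from scheduler: {is_stats_from_completed_scheduler}, total: {total_us}us");
}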
@@ -1137,7 +1142,11 @@ pub struct BatchExecutionTiming { } impl BatchExecutionTiming { - pub fn accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) { + pub fn accumulate( + &mut self, + new_batch: ExecuteBatchesInternalMetrics, + is_unified_scheduler_enabled: bool, + ) { let Self { totals, wall_clock_us, @@ -1146,9 +1155,13 @@ impl BatchExecutionTiming { saturating_add_assign!(*wall_clock_us, new_batch.execute_batches_us); - use ExecuteTimingType::{NumExecuteBatches, TotalBatchesLen}; - totals.saturating_add_in_place(TotalBatchesLen, new_batch.total_batches_len); - totals.saturating_add_in_place(NumExecuteBatches, 1); + use ExecuteTimingType::NumExecuteBatches; + // These metrics aren't applicable for the unified scheduler + if !is_unified_scheduler_enabled { + use ExecuteTimingType::TotalBatchesLen; + totals.saturating_add_in_place(TotalBatchesLen, new_batch.total_batches_len); + totals.saturating_add_in_place(NumExecuteBatches, 1); + } for thread_times in new_batch.execution_timings_per_thread.values() { totals.accumulate(&thread_times.execute_timings); @@ -1161,9 +1174,12 @@ impl BatchExecutionTiming { if let Some(slowest) = slowest { slowest_thread.accumulate(slowest); - slowest_thread - .execute_timings - .saturating_add_in_place(NumExecuteBatches, 1); + // This metric isn't applicable for the unified scheduler + if !is_unified_scheduler_enabled { + slowest_thread + .execute_timings + .saturating_add_in_place(NumExecuteBatches, 1); + } }; } } @@ -1176,16 +1192,25 @@ pub struct ThreadExecuteTimings { } impl ThreadExecuteTimings { - pub fn report_stats(&self, slot: Slot) { + pub fn report_stats(&self, slot: Slot, is_unified_scheduler_enabled: bool) { + let (total_thread_us, total_transactions_executed) = if is_unified_scheduler_enabled { + (None, None) + } else { + ( + Some(self.total_thread_us as i64), + Some(self.total_transactions_executed as i64), + ) + }; + lazy! { datapoint_info!( "replay-slot-end-to-end-stats", ("slot", slot as i64, i64), - ("total_thread_us", self.total_thread_us as i64, i64), - ("total_transactions_executed", self.total_transactions_executed as i64, i64), + ("total_thread_us", total_thread_us, Option), + ("total_transactions_executed", total_transactions_executed, Option), // Everything inside the `eager!` block will be eagerly expanded before // evaluation of the rest of the surrounding macro. - eager!{report_execute_timings!(self.execute_timings)} + eager!{report_execute_timings!(self.execute_timings, is_unified_scheduler_enabled)} ); }; } @@ -1222,7 +1247,24 @@ impl ReplaySlotStats { num_entries: usize, num_shreds: u64, bank_complete_time_us: u64, + is_unified_scheduler_enabled: bool, ) { + let confirmation_elapsed = if is_unified_scheduler_enabled { + "confirmation_without_replay_us" + } else { + "confirmation_time_us" + }; + let replay_elapsed = if is_unified_scheduler_enabled { + "task_submission_us" + } else { + "replay_time" + }; + let execute_batches_us = if is_unified_scheduler_enabled { + None + } else { + Some(self.batch_execute.wall_clock_us as i64) + }; + lazy! 
{ datapoint_info!( "replay-slot-stats", @@ -1243,9 +1285,9 @@ impl ReplaySlotStats { self.transaction_verify_elapsed as i64, i64 ), - ("confirmation_time_us", self.confirmation_elapsed as i64, i64), - ("replay_time", self.replay_elapsed as i64, i64), - ("execute_batches_us", self.batch_execute.wall_clock_us as i64, i64), + (confirmation_elapsed, self.confirmation_elapsed as i64, i64), + (replay_elapsed, self.replay_elapsed as i64, i64), + ("execute_batches_us", execute_batches_us, Option), ( "replay_total_elapsed", self.started.elapsed().as_micros() as i64, @@ -1257,11 +1299,13 @@ impl ReplaySlotStats { ("total_shreds", num_shreds as i64, i64), // Everything inside the `eager!` block will be eagerly expanded before // evaluation of the rest of the surrounding macro. - eager!{report_execute_timings!(self.batch_execute.totals)} + eager!{report_execute_timings!(self.batch_execute.totals, is_unified_scheduler_enabled)} ); }; - self.batch_execute.slowest_thread.report_stats(slot); + self.batch_execute + .slowest_thread + .report_stats(slot, is_unified_scheduler_enabled); let mut per_pubkey_timings: Vec<_> = self .batch_execute diff --git a/program-runtime/src/timings.rs b/program-runtime/src/timings.rs index f1966ba00151e0..9a831deb3cf088 100644 --- a/program-runtime/src/timings.rs +++ b/program-runtime/src/timings.rs @@ -88,7 +88,7 @@ impl core::fmt::Debug for Metrics { eager_macro_rules! { $eager_1 #[macro_export] macro_rules! report_execute_timings { - ($self: expr) => { + ($self: expr, $is_unified_scheduler_enabled: expr) => { ( "validate_transactions_us", *$self @@ -149,19 +149,25 @@ eager_macro_rules! { $eager_1 ), ( "total_batches_len", - *$self - - .metrics - .index(ExecuteTimingType::TotalBatchesLen), - i64 + (if $is_unified_scheduler_enabled { + None + } else { + Some(*$self + .metrics + .index(ExecuteTimingType::TotalBatchesLen)) + }), + Option ), ( "num_execute_batches", - *$self - - .metrics - .index(ExecuteTimingType::NumExecuteBatches), - i64 + (if $is_unified_scheduler_enabled { + None + } else { + Some(*$self + .metrics + .index(ExecuteTimingType::NumExecuteBatches)) + }), + Option ), ( "update_transaction_statuses", From 7422d1f041d3674c0a5e5ddbddeb2d6b1bc5e7c8 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 18 Jun 2024 09:52:33 +0900 Subject: [PATCH 2/7] Fix grammar --- core/src/replay_stage.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index be23be60f92bf5..1914df68d1360c 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3009,12 +3009,12 @@ impl ReplayStage { .expect("Bank fork progress entry missing for completed bank"); let replay_stats = bank_progress.replay_stats.clone(); - let mut is_stats_from_completed_scheduler = false; + let mut are_stats_from_completed_scheduler = false; if let Some((result, completed_execute_timings)) = bank.wait_for_completed_scheduler() { - is_stats_from_completed_scheduler = true; + are_stats_from_completed_scheduler = true; let metrics = ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads( completed_execute_timings, ); @@ -3022,7 +3022,7 @@ impl ReplayStage { .write() .unwrap() .batch_execute - .accumulate(metrics, is_stats_from_completed_scheduler); + .accumulate(metrics, are_stats_from_completed_scheduler); if let Err(err) = result { let root = bank_forks.read().unwrap().root(); @@ -3221,7 +3221,7 @@ impl ReplayStage { r_replay_progress.num_entries, r_replay_progress.num_shreds, bank_complete_time.as_us(), - 
is_stats_from_completed_scheduler,
+                are_stats_from_completed_scheduler,
             );
             execute_timings.accumulate(&r_replay_stats.batch_execute.totals);
         } else {

From a7614c83e840a9837f88c41218fc472281a904c2 Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Tue, 18 Jun 2024 10:14:40 +0900
Subject: [PATCH 3/7] Don't compute slowest for unified scheduler

---
 ledger/src/blockstore_processor.rs | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index d892a85f2bae5c..db128f886ec21e 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -1167,20 +1167,20 @@ impl BatchExecutionTiming {
             totals.accumulate(&thread_times.execute_timings);
         }
 
-        let slowest = new_batch
-            .execution_timings_per_thread
-            .values()
-            .max_by_key(|thread_times| thread_times.total_thread_us);
-
-        if let Some(slowest) = slowest {
-            slowest_thread.accumulate(slowest);
-            // This metric isn't applicable for the unified scheduler
-            if !is_unified_scheduler_enabled {
+        // This metric isn't applicable for the unified scheduler
+        if !is_unified_scheduler_enabled {
+            let slowest = new_batch
+                .execution_timings_per_thread
+                .values()
+                .max_by_key(|thread_times| thread_times.total_thread_us);
+
+            if let Some(slowest) = slowest {
+                slowest_thread.accumulate(slowest);
                 slowest_thread
                     .execute_timings
                     .saturating_add_in_place(NumExecuteBatches, 1);
-            }
-        };
+            };
+        }
     }
 }

From f73462993dff9152aecd56a0ddadd679251a3a2c Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Tue, 18 Jun 2024 22:27:28 +0900
Subject: [PATCH 4/7] Rename to is_unified_scheduler_enabled

---
 core/src/replay_stage.rs | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 1914df68d1360c..920aaca9046b8e 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -3009,12 +3009,14 @@ impl ReplayStage {
                 .expect("Bank fork progress entry missing for completed bank");
             let replay_stats = bank_progress.replay_stats.clone();
 
-            let mut are_stats_from_completed_scheduler = false;
+            let mut is_unified_scheduler_enabled = false;
 
             if let Some((result, completed_execute_timings)) =
                 bank.wait_for_completed_scheduler()
             {
-                are_stats_from_completed_scheduler = true;
+                // It's guaranteed that wait_for_completed_scheduler() returns Some(_) iff the
+                // unified scheduler is enabled for the bank.
+ is_unified_scheduler_enabled = true; let metrics = ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads( completed_execute_timings, ); @@ -3022,7 +3024,7 @@ impl ReplayStage { .write() .unwrap() .batch_execute - .accumulate(metrics, are_stats_from_completed_scheduler); + .accumulate(metrics, is_unified_scheduler_enabled); if let Err(err) = result { let root = bank_forks.read().unwrap().root(); @@ -3221,7 +3223,7 @@ impl ReplayStage { r_replay_progress.num_entries, r_replay_progress.num_shreds, bank_complete_time.as_us(), - are_stats_from_completed_scheduler, + is_unified_scheduler_enabled, ); execute_timings.accumulate(&r_replay_stats.batch_execute.totals); } else { From 5dc01c2b5340315bed89295d9cf7130868c24fe1 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 18 Jun 2024 22:28:36 +0900 Subject: [PATCH 5/7] Hoist uses to top of file --- ledger/src/blockstore_processor.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index db128f886ec21e..35e287a2e2118e 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -80,6 +80,7 @@ use { time::{Duration, Instant}, }, thiserror::Error, + ExecuteTimingType::{NumExecuteBatches, TotalBatchesLen}, }; pub struct TransactionBatchWithIndexes<'a, 'b> { @@ -1155,10 +1156,8 @@ impl BatchExecutionTiming { saturating_add_assign!(*wall_clock_us, new_batch.execute_batches_us); - use ExecuteTimingType::NumExecuteBatches; // These metrics aren't applicable for the unified scheduler if !is_unified_scheduler_enabled { - use ExecuteTimingType::TotalBatchesLen; totals.saturating_add_in_place(TotalBatchesLen, new_batch.total_batches_len); totals.saturating_add_in_place(NumExecuteBatches, 1); } From 53b7442f3ae36da0a9730e2c00d002fb56f24094 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 18 Jun 2024 22:29:40 +0900 Subject: [PATCH 6/7] Conditionally disable replay-slot-end-to-end-stats --- ledger/src/blockstore_processor.rs | 52 +++++++++++++++++------------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 35e287a2e2118e..c12339cd253e9a 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -1135,11 +1135,20 @@ pub struct BatchExecutionTiming { /// Wall clock time used by the transaction execution part of pipeline. /// [`ConfirmationTiming::replay_elapsed`] includes this time. In microseconds. - pub wall_clock_us: u64, + wall_clock_us: u64, /// Time used to execute transactions, via `execute_batch()`, in the thread that consumed the - /// most time. - pub slowest_thread: ThreadExecuteTimings, + /// most time (in terms of total_thread_us) among rayon threads. Note that the slowest thread + /// is determined each time a given group of batches is newly processed. So, this is a coarse + /// approximation of wall-time single-threaded linearized metrics, discarding all metrics other + /// than the arbitrary set of batches mixed with various transactions, which replayed slowest + /// as a whole for each rayon processing session, also after blockstore_processor's rebatching. 
+    ///
+    /// When unified scheduler is enabled, this field isn't maintained, because it's not batched at
+    /// all and thus execution is fairly evenly distributed across its worker threads at the
+    /// granularity of individual transactions, meaning single-threaded metrics can reliably
+    /// be derived from dividing replay-slot-stats by the number of threads.
+    slowest_thread: ThreadExecuteTimings,
 }
 
 impl BatchExecutionTiming {
@@ -1154,10 +1163,10 @@ impl BatchExecutionTiming {
             slowest_thread,
         } = self;
 
-        saturating_add_assign!(*wall_clock_us, new_batch.execute_batches_us);
-
-        // These metrics aren't applicable for the unified scheduler
+        // These metric fields aren't applicable for the unified scheduler
         if !is_unified_scheduler_enabled {
+            saturating_add_assign!(*wall_clock_us, new_batch.execute_batches_us);
+
             totals.saturating_add_in_place(TotalBatchesLen, new_batch.total_batches_len);
             totals.saturating_add_in_place(NumExecuteBatches, 1);
         }
@@ -1166,7 +1175,8 @@ impl BatchExecutionTiming {
             totals.accumulate(&thread_times.execute_timings);
         }
 
-        // This metric isn't applicable for the unified scheduler
+        // This whole metric (replay-slot-end-to-end-stats) isn't applicable for the unified
+        // scheduler.
         if !is_unified_scheduler_enabled {
             let slowest = new_batch
@@ -1191,25 +1201,17 @@ pub struct ThreadExecuteTimings {
 }
 
 impl ThreadExecuteTimings {
-    pub fn report_stats(&self, slot: Slot, is_unified_scheduler_enabled: bool) {
-        let (total_thread_us, total_transactions_executed) = if is_unified_scheduler_enabled {
-            (None, None)
-        } else {
-            (
-                Some(self.total_thread_us as i64),
-                Some(self.total_transactions_executed as i64),
-            )
-        };
-
+    pub fn report_stats(&self, slot: Slot) {
         lazy! {
             datapoint_info!(
                 "replay-slot-end-to-end-stats",
                 ("slot", slot as i64, i64),
-                ("total_thread_us", total_thread_us, Option),
-                ("total_transactions_executed", total_transactions_executed, Option),
+                ("total_thread_us", self.total_thread_us as i64, i64),
+                ("total_transactions_executed", self.total_transactions_executed as i64, i64),
                 // Everything inside the `eager!` block will be eagerly expanded before
                 // evaluation of the rest of the surrounding macro.
-                eager!{report_execute_timings!(self.execute_timings, is_unified_scheduler_enabled)}
+                // Pass false because this code-path is never touched by unified scheduler.
+                eager!{report_execute_timings!(self.execute_timings, false)}
             );
         };
     }
@@ -1302,9 +1304,13 @@ impl ReplaySlotStats {
             );
         };
 
-        self.batch_execute
-            .slowest_thread
-            .report_stats(slot, is_unified_scheduler_enabled);
+        // Skip reporting replay-slot-end-to-end-stats entirely if unified scheduler is enabled,
+        // because the whole metric is only meaningful for rayon-based worker threads.
+        //
+        // See slowest_thread doc comment for details.
+        if !is_unified_scheduler_enabled {
+            self.batch_execute.slowest_thread.report_stats(slot);
+        }
 
         let mut per_pubkey_timings: Vec<_> = self
             .batch_execute

From fab274b6b4cdf02d4316880d4a19b42ac8aee6e0 Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Wed, 19 Jun 2024 09:28:55 +0900
Subject: [PATCH 7/7] Remove the misleading "fairly balanced" text

---
 ledger/src/blockstore_processor.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index c12339cd253e9a..84deb781806768 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -1145,9 +1145,7 @@ pub struct BatchExecutionTiming {
     /// as a whole for each rayon processing session, also after blockstore_processor's rebatching.
     ///
     /// When unified scheduler is enabled, this field isn't maintained, because it's not batched at
-    /// all and thus execution is fairly evenly distributed across its worker threads at the
-    /// granularity of individual transactions, meaning single-threaded metrics can reliably
-    /// be derived from dividing replay-slot-stats by the number of threads.
+    /// all.
     slowest_thread: ThreadExecuteTimings,
 }
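The net effect of the series is that scheduler-dependent counters are reported as Option values, so fields that don't apply under the unified scheduler are dropped from the datapoint instead of showing up as misleading zeros. Below is a minimal sketch of that gating idea; report_field is a hypothetical stand-in for the datapoint_info!/Option plumbing, not the real macro.

// Hypothetical stand-in for the Option handling in datapoint reporting:
// a None value means the field is skipped entirely, not written as 0.
fn report_field(name: &str, value: Option<i64>) {
    if let Some(v) = value {
        println!("{name}={v}i");
    }
}

fn report_batch_metrics(
    total_batches_len: u64,
    num_execute_batches: u64,
    is_unified_scheduler_enabled: bool,
) {
    // Batch-shaped counters only make sense for the rayon-batched replay
    // path; under the unified scheduler they are suppressed as None.
    let gate = |v: u64| (!is_unified_scheduler_enabled).then_some(v as i64);
    report_field("total_batches_len", gate(total_batches_len));
    report_field("num_execute_batches", gate(num_execute_batches));
}

fn main() {
    report_batch_metrics(42, 7, false); // emits both fields
    report_batch_metrics(42, 7, true); // emits neither field
}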