From 23f8c65543a4ff36e7c22699202bb0b7c0c9cf20 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 11 Jun 2024 02:49:00 +0900 Subject: [PATCH] Return back stale out-of-pool scheduler by timeout --- runtime/src/bank_forks.rs | 4 +- runtime/src/installed_scheduler_pool.rs | 224 ++++++++++++++++++++---- unified-scheduler-pool/src/lib.rs | 137 ++++++++++++--- 3 files changed, 305 insertions(+), 60 deletions(-) diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index e373c7ab5dcb30..264974fca536cb 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -232,7 +232,9 @@ impl BankForks { let bank = if let Some(scheduler_pool) = &self.scheduler_pool { let context = SchedulingContext::new(bank.clone()); let scheduler = scheduler_pool.take_scheduler(context); - BankWithScheduler::new(bank, Some(scheduler)) + let bank_with_scheduler = BankWithScheduler::new(bank, Some(scheduler)); + scheduler_pool.register_timeout_listener(bank_with_scheduler.create_timeout_listener()); + bank_with_scheduler } else { BankWithScheduler::new_without_scheduler(bank) }; diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index c30d8fc7596ccd..bb8389bead93db 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -30,7 +30,8 @@ use { transaction::{Result, SanitizedTransaction, TransactionError}, }, std::{ - fmt::Debug, + fmt::{self, Debug}, + mem, ops::Deref, sync::{Arc, RwLock}, }, @@ -38,14 +39,50 @@ use { #[cfg(feature = "dev-context-only-utils")] use {mockall::automock, qualifier_attr::qualifiers}; +pub fn initialized_result_with_timings() -> ResultWithTimings { + (Ok(()), ExecuteTimings::default()) +} + pub trait InstalledSchedulerPool: Send + Sync + Debug { - fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox; + fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox { + self.take_resumed_scheduler(context, initialized_result_with_timings()) + } + + fn take_resumed_scheduler( + &self, + context: SchedulingContext, + result_with_timings: ResultWithTimings, + ) -> InstalledSchedulerBox; + + fn register_timeout_listener(&self, timeout_listener: TimeoutListener); } #[derive(Debug)] pub struct SchedulerAborted; pub type ScheduleResult = std::result::Result<(), SchedulerAborted>; +pub struct TimeoutListener { + callback: Box, +} + +impl TimeoutListener { + pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self { + Self { + callback: Box::new(f), + } + } + + pub fn trigger(self, pool: InstalledSchedulerPoolArc) { + (self.callback)(pool); + } +} + +impl Debug for TimeoutListener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "TimeoutListener({self:p})") + } +} + #[cfg_attr(doc, aquamarine::aquamarine)] /// Schedules, executes, and commits transactions under encapsulated implementation /// @@ -250,6 +287,61 @@ impl WaitReason { } } +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +pub enum SchedulerStatus { + Unavailable, + Active(InstalledSchedulerBox), + Stale(InstalledSchedulerPoolArc, ResultWithTimings), +} + +impl SchedulerStatus { + fn new(scheduler: Option) -> Self { + match scheduler { + Some(scheduler) => SchedulerStatus::Active(scheduler), + None => SchedulerStatus::Unavailable, + } + } + + fn transition_from_stale_to_active( + &mut self, + f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox, + ) { + let Self::Stale(pool, 
result_with_timings) = mem::replace(self, Self::Unavailable) else { + panic!(); + }; + *self = Self::Active(f(pool, result_with_timings)); + } + + pub(crate) fn maybe_transition_from_active_to_stale( + &mut self, + f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings), + ) { + if !matches!(self, Self::Active(_scheduler)) { + return; + } + let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else { + panic!(); + }; + let (pool, result_with_timings) = f(scheduler); + *self = Self::Stale(pool, result_with_timings); + } + + pub(crate) fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox { + let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else { + panic!(); + }; + scheduler + } + + pub(crate) fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings { + let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else { + panic!(); + }; + result_with_timings + } +} + /// Very thin wrapper around Arc /// /// It brings type-safety against accidental mixing of bank and scheduler with different slots, @@ -277,7 +369,7 @@ pub struct BankWithSchedulerInner { bank: Arc, scheduler: InstalledSchedulerRwLock, } -pub type InstalledSchedulerRwLock = RwLock>; +pub type InstalledSchedulerRwLock = RwLock; impl BankWithScheduler { #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] @@ -292,7 +384,7 @@ impl BankWithScheduler { Self { inner: Arc::new(BankWithSchedulerInner { bank, - scheduler: RwLock::new(scheduler), + scheduler: RwLock::new(SchedulerStatus::new(scheduler)), }), } } @@ -321,7 +413,10 @@ impl BankWithScheduler { } pub fn has_installed_scheduler(&self) -> bool { - self.inner.scheduler.read().unwrap().is_some() + !matches!( + &*self.inner.scheduler.read().unwrap(), + SchedulerStatus::Unavailable + ) } /// Schedule the transaction as long as the scheduler hasn't been aborted. @@ -338,31 +433,31 @@ impl BankWithScheduler { transactions_with_indexes.len() ); - let scheduler_guard = self.inner.scheduler.read().unwrap(); - let scheduler = scheduler_guard.as_ref().unwrap(); - - for (sanitized_transaction, &index) in transactions_with_indexes { - if scheduler - .schedule_execution(&(sanitized_transaction, index)) - .is_err() - { - drop(scheduler_guard); - // This write lock isn't atomic with the above the read lock. So, another thread - // could have called .recover_error_after_abort() while we're literally stuck at - // the gaps of these locks (i.e. this comment in source code wise) under extreme - // race conditions. Thus, .recover_error_after_abort() is made idempotetnt for that - // consideration in mind. - // - // Lastly, this non-atomic nature is intentional for optimizing the fast code-path - let mut scheduler_guard = self.inner.scheduler.write().unwrap(); - let scheduler = scheduler_guard.as_mut().unwrap(); - return Err(scheduler.recover_error_after_abort()); + let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| { + for (sanitized_transaction, &index) in transactions_with_indexes { + scheduler.schedule_execution(&(sanitized_transaction, index))?; } + Ok(()) + }); + + if schedule_result.is_err() { + // This write lock isn't atomic with the above the read lock. So, another thread + // could have called .recover_error_after_abort() while we're literally stuck at + // the gaps of these locks (i.e. this comment in source code wise) under extreme + // race conditions. 
Thus, .recover_error_after_abort() is made idempotetnt for that + // consideration in mind. + // + // Lastly, this non-atomic nature is intentional for optimizing the fast code-path + return Err(self.inner.retrieve_error_after_schedule_failure()); } Ok(()) } + pub(crate) fn create_timeout_listener(&self) -> TimeoutListener { + self.inner.do_create_timeout_listener() + } + // take needless &mut only to communicate its semantic mutability to humans... #[cfg(feature = "dev-context-only-utils")] pub fn drop_scheduler(&mut self) { @@ -391,11 +486,72 @@ impl BankWithScheduler { } pub const fn no_scheduler_available() -> InstalledSchedulerRwLock { - RwLock::new(None) + RwLock::new(SchedulerStatus::Unavailable) } } impl BankWithSchedulerInner { + fn with_active_scheduler( + self: &Arc, + f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult, + ) -> ScheduleResult { + let scheduler = self.scheduler.read().unwrap(); + match &*scheduler { + SchedulerStatus::Active(scheduler) => { + // This is the fast path, needing single read-lock most of time. + f(scheduler) + } + SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => { + Err(SchedulerAborted) + } + SchedulerStatus::Stale(_pool, _result_with_timings) => { + drop(scheduler); + + let context = SchedulingContext::new(self.bank.clone()); + let mut scheduler = self.scheduler.write().unwrap(); + scheduler.transition_from_stale_to_active(|scheduler_pool, result_with_timings| { + scheduler_pool.register_timeout_listener(self.do_create_timeout_listener()); + scheduler_pool.take_resumed_scheduler(context, result_with_timings) + }); + drop(scheduler); + + self.with_active_scheduler(f) + } + SchedulerStatus::Unavailable => panic!(), + } + } + + fn do_create_timeout_listener(self: &Arc) -> TimeoutListener { + let weak_bank = Arc::downgrade(self); + TimeoutListener::new(move |scheduler_pool| { + let Some(bank) = weak_bank.upgrade() else { + return; + }; + + let Ok(mut scheduler) = bank.scheduler.write() else { + return; + }; + + scheduler.maybe_transition_from_active_to_stale(|scheduler| { + let (result_with_timings, uninstalled_scheduler) = + scheduler.wait_for_termination(false); + uninstalled_scheduler.return_to_pool(); + (scheduler_pool, result_with_timings) + }) + }) + } + + fn retrieve_error_after_schedule_failure(&self) -> TransactionError { + let mut scheduler = self.scheduler.write().unwrap(); + match &mut *scheduler { + SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(), + SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => { + result.clone().unwrap_err() + } + _ => panic!(), + } + } + #[must_use] fn wait_for_completed_scheduler_from_drop(&self) -> Option { Self::wait_for_scheduler_termination( @@ -419,18 +575,24 @@ impl BankWithSchedulerInner { ); let mut scheduler = scheduler.write().unwrap(); - let (was_noop, result_with_timings) = - if let Some(scheduler) = scheduler.as_mut().filter(|_| reason.is_paused()) { + let (was_noop, result_with_timings) = match &mut *scheduler { + SchedulerStatus::Active(scheduler) if reason.is_paused() => { scheduler.pause_for_recent_blockhash(); (false, None) - } else if let Some(scheduler) = scheduler.take() { + } + SchedulerStatus::Active(_scheduler) => { + let scheduler = scheduler.transition_from_active_to_unavailable(); let (result_with_timings, uninstalled_scheduler) = scheduler.wait_for_termination(reason.is_dropped()); uninstalled_scheduler.return_to_pool(); (false, Some(result_with_timings)) - } else { - (true, None) - }; + } + 
SchedulerStatus::Stale(_pool, _result_with_timings) => { + let result_with_timings = scheduler.transition_from_stale_to_unavailable(); + (true, Some(result_with_timings)) + } + SchedulerStatus::Unavailable => (true, None), + }; debug!( "wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at {:?}...", bank.slot(), diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index f8f1a9ec756abd..ea272a6ee6aadf 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -29,9 +29,10 @@ use { solana_runtime::{ bank::Bank, installed_scheduler_pool::{ - InstalledScheduler, InstalledSchedulerBox, InstalledSchedulerPool, - InstalledSchedulerPoolArc, ResultWithTimings, ScheduleResult, SchedulerAborted, - SchedulerId, SchedulingContext, UninstalledScheduler, UninstalledSchedulerBox, + initialized_result_with_timings, InstalledScheduler, InstalledSchedulerBox, + InstalledSchedulerPool, InstalledSchedulerPoolArc, ResultWithTimings, ScheduleResult, + SchedulerAborted, SchedulerId, SchedulingContext, TimeoutListener, + UninstalledScheduler, UninstalledSchedulerBox, }, prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender, @@ -78,6 +79,7 @@ type AtomicSchedulerId = AtomicU64; pub struct SchedulerPool, TH: TaskHandler> { scheduler_inners: Mutex>, trashed_scheduler_inners: Mutex>, + timeout_listeners: Mutex>, handler_count: usize, handler_context: HandlerContext, // weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to @@ -109,6 +111,7 @@ pub type DefaultSchedulerPool = const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10); const DEFAULT_MAX_POOLING_DURATION: Duration = Duration::from_secs(180); +const DEFAULT_TIMEOUT_DURATION: Duration = Duration::from_secs(12); // Rough estimate of max UsageQueueLoader size in bytes: // UsageFromTask * UsageQueue's capacity * DEFAULT_MAX_USAGE_QUEUE_COUNT // 16 bytes * 128 items * 262_144 entries == 512 MiB @@ -149,6 +152,7 @@ where DEFAULT_POOL_CLEANER_INTERVAL, DEFAULT_MAX_POOLING_DURATION, DEFAULT_MAX_USAGE_QUEUE_COUNT, + DEFAULT_TIMEOUT_DURATION, ) } @@ -161,6 +165,7 @@ where pool_cleaner_interval: Duration, max_pooling_duration: Duration, max_usage_queue_count: usize, + timeout_duration: Duration, ) -> Arc { let handler_count = handler_count.unwrap_or(Self::default_handler_count()); assert!(handler_count >= 1); @@ -168,6 +173,7 @@ where let scheduler_pool = Arc::new_cyclic(|weak_self| Self { scheduler_inners: Mutex::default(), trashed_scheduler_inners: Mutex::default(), + timeout_listeners: Mutex::default(), handler_count, handler_context: HandlerContext { log_messages_bytes_limit, @@ -191,9 +197,9 @@ where break; }; - let idle_inner_count = { - let now = Instant::now(); + let now = Instant::now(); + let idle_inner_count = { // Pre-allocate rather large capacity to avoid reallocation inside the lock. let mut idle_inners = Vec::with_capacity(128); @@ -232,6 +238,23 @@ where trashed_inner_count }; + // Pre-allocate rather large capacity to avoid reallocation inside the lock. 
+ let mut expired_listeners = Vec::with_capacity(128); + let Ok(mut timeout_listeners) = scheduler_pool.timeout_listeners.lock() else { + break; + }; + #[allow(unstable_name_collisions)] + expired_listeners.extend(timeout_listeners.extract_if( + |(_callback, registered_at)| { + now.duration_since(*registered_at) > timeout_duration + }, + )); + drop(timeout_listeners); + + for (timeout_listener, _registered_at) in expired_listeners { + timeout_listener.trigger(scheduler_pool.clone()); + } + info!( "Scheduler pool cleaner: dropped {} idle inners, {} trashed inners", idle_inner_count, trashed_inner_count, @@ -299,14 +322,23 @@ where } } + #[cfg(test)] fn do_take_scheduler(&self, context: SchedulingContext) -> S { + self.do_take_resumed_scheduler(context, initialized_result_with_timings()) + } + + fn do_take_resumed_scheduler( + &self, + context: SchedulingContext, + result_with_timings: ResultWithTimings, + ) -> S { // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been // returned recently if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop() { - S::from_inner(inner, context) + S::from_inner(inner, context, result_with_timings) } else { - S::spawn(self.self_arc(), context) + S::spawn(self.self_arc(), context, result_with_timings) } } @@ -352,8 +384,19 @@ where S: SpawnableScheduler, TH: TaskHandler, { - fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox { - Box::new(self.do_take_scheduler(context)) + fn take_resumed_scheduler( + &self, + context: SchedulingContext, + result_with_timings: ResultWithTimings, + ) -> InstalledSchedulerBox { + Box::new(self.do_take_resumed_scheduler(context, result_with_timings)) + } + + fn register_timeout_listener(&self, timeout_listener: TimeoutListener) { + self.timeout_listeners + .lock() + .unwrap() + .push((timeout_listener, Instant::now())); } } @@ -427,7 +470,7 @@ enum SubchanneledPayload { CloseSubchannel, } -type NewTaskPayload = SubchanneledPayload; +type NewTaskPayload = SubchanneledPayload; // A tiny generic message type to synchronize multiple threads everytime some contextual data needs // to be switched (ie. SchedulingContext), just using a single communication channel. 
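The pool cleaner loop above now also sweeps timeout listeners: on each pass it pulls out every entry registered more than timeout_duration ago (DEFAULT_TIMEOUT_DURATION, 12 seconds) and fires it only after the listener mutex has been dropped, which is what eventually demotes an idle per-bank scheduler to Stale. Below is a minimal stand-alone sketch of that expiry step, assuming a stable-Rust partition in place of the patch's extract_if; Listener and flush_expired are illustrative names, not items from the patch.

use std::time::{Duration, Instant};

// Illustrative stand-in; the real pool stores (TimeoutListener, Instant) pairs.
struct Listener(&'static str);

impl Listener {
    fn trigger(self) {
        println!("triggered: {}", self.0);
    }
}

// Stable-Rust equivalent of the cleaner's extract_if step: split off entries
// registered longer ago than `timeout`, keeping the fresh ones in the vector.
fn flush_expired(
    listeners: &mut Vec<(Listener, Instant)>,
    now: Instant,
    timeout: Duration,
) -> Vec<(Listener, Instant)> {
    let (expired, fresh): (Vec<_>, Vec<_>) = std::mem::take(listeners)
        .into_iter()
        .partition(|(_listener, registered_at)| now.duration_since(*registered_at) > timeout);
    *listeners = fresh;
    expired
}

fn main() {
    let now = Instant::now();
    let old = now.checked_sub(Duration::from_secs(20)).unwrap_or(now);
    let mut listeners = vec![(Listener("stale bank"), old), (Listener("fresh bank"), now)];
    // Like the cleaner thread, trigger expired listeners only after the
    // listener collection (a Mutex in the real pool) has been released.
    for (listener, _registered_at) in flush_expired(&mut listeners, now, Duration::from_secs(12)) {
        listener.trigger();
    }
}

Triggering outside the lock matters because a listener's callback can block in wait_for_termination(); releasing the mutex first keeps the cleaner from stalling concurrent register_timeout_listener() calls.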
@@ -610,10 +653,6 @@ fn disconnected() -> Receiver { crossbeam_channel::unbounded().1 } -fn initialized_result_with_timings() -> ResultWithTimings { - (Ok(()), ExecuteTimings::default()) -} - #[derive(Debug)] pub struct PooledScheduler { inner: PooledSchedulerInner, @@ -718,13 +757,18 @@ struct ThreadManager, TH: TaskHandler> { } impl PooledScheduler { - fn do_spawn(pool: Arc>, initial_context: SchedulingContext) -> Self { + fn do_spawn( + pool: Arc>, + initial_context: SchedulingContext, + result_with_timings: ResultWithTimings, + ) -> Self { Self::from_inner( PooledSchedulerInner:: { thread_manager: ThreadManager::new(pool), usage_queue_loader: UsageQueueLoader::default(), }, initial_context, + result_with_timings, ) } } @@ -963,11 +1007,15 @@ impl, TH: TaskHandler> ThreadManager { 'nonaborted_main_loop: loop { match new_task_receiver.recv() { - Ok(NewTaskPayload::OpenSubchannel(context)) => { + Ok(NewTaskPayload::OpenSubchannel(( + new_context, + new_result_with_timings, + ))) => { // signal about new SchedulingContext to handler threads runnable_task_sender - .send_chained_channel(context, handler_count) + .send_chained_channel(new_context, handler_count) .unwrap(); + result_with_timings = new_result_with_timings; } Ok(_) => { unreachable!(); @@ -1028,7 +1076,7 @@ impl, TH: TaskHandler> ThreadManager { Ok(NewTaskPayload::CloseSubchannel) => { session_ending = true; } - Ok(NewTaskPayload::OpenSubchannel(_context)) => { + Ok(NewTaskPayload::OpenSubchannel(_context_and_result_with_timings)) => { unreachable!(); } Err(RecvError) => { @@ -1263,10 +1311,17 @@ impl, TH: TaskHandler> ThreadManager { } } - fn start_session(&mut self, context: &SchedulingContext) { + fn start_session( + &mut self, + context: &SchedulingContext, + result_with_timings: ResultWithTimings, + ) { assert_matches!(self.session_result_with_timings, None); self.new_task_sender - .send(NewTaskPayload::OpenSubchannel(context.clone())) + .send(NewTaskPayload::OpenSubchannel(( + context.clone(), + result_with_timings, + ))) .expect("no new session after aborted"); } } @@ -1276,9 +1331,17 @@ pub trait SpawnableScheduler: InstalledScheduler { fn into_inner(self) -> (ResultWithTimings, Self::Inner); - fn from_inner(inner: Self::Inner, context: SchedulingContext) -> Self; - - fn spawn(pool: Arc>, initial_context: SchedulingContext) -> Self + fn from_inner( + inner: Self::Inner, + context: SchedulingContext, + result_with_timings: ResultWithTimings, + ) -> Self; + + fn spawn( + pool: Arc>, + initial_context: SchedulingContext, + result_with_timings: ResultWithTimings, + ) -> Self where Self: Sized; } @@ -1295,13 +1358,23 @@ impl SpawnableScheduler for PooledScheduler { (result_with_timings, self.inner) } - fn from_inner(mut inner: Self::Inner, context: SchedulingContext) -> Self { - inner.thread_manager.start_session(&context); + fn from_inner( + mut inner: Self::Inner, + context: SchedulingContext, + result_with_timings: ResultWithTimings, + ) -> Self { + inner + .thread_manager + .start_session(&context, result_with_timings); Self { inner, context } } - fn spawn(pool: Arc>, initial_context: SchedulingContext) -> Self { - let mut scheduler = Self::do_spawn(pool, initial_context); + fn spawn( + pool: Arc>, + initial_context: SchedulingContext, + result_with_timings: ResultWithTimings, + ) -> Self { + let mut scheduler = Self::do_spawn(pool, initial_context, result_with_timings); scheduler .inner .thread_manager @@ -1458,6 +1531,7 @@ mod tests { SHORTENED_POOL_CLEANER_INTERVAL, shortened_max_pooling_duration, 
DEFAULT_MAX_USAGE_QUEUE_COUNT, + DEFAULT_TIMEOUT_DURATION, ); let pool = pool_raw.clone(); let bank = Arc::new(Bank::default_for_tests()); @@ -1515,6 +1589,7 @@ mod tests { SHORTENED_POOL_CLEANER_INTERVAL, DEFAULT_MAX_POOLING_DURATION, REDUCED_MAX_USAGE_QUEUE_COUNT, + DEFAULT_TIMEOUT_DURATION, ); let pool = pool_raw.clone(); let bank = Arc::new(Bank::default_for_tests()); @@ -1920,6 +1995,7 @@ mod tests { SHORTENED_POOL_CLEANER_INTERVAL, DEFAULT_MAX_POOLING_DURATION, DEFAULT_MAX_USAGE_QUEUE_COUNT, + DEFAULT_TIMEOUT_DURATION, ); let pool = pool_raw.clone(); let context = SchedulingContext::new(bank.clone()); @@ -2419,13 +2495,18 @@ mod tests { unimplemented!(); } - fn from_inner(_inner: Self::Inner, _context: SchedulingContext) -> Self { + fn from_inner( + _inner: Self::Inner, + _context: SchedulingContext, + _result_with_timings: ResultWithTimings, + ) -> Self { unimplemented!(); } fn spawn( pool: Arc>, initial_context: SchedulingContext, + _result_with_timings: ResultWithTimings, ) -> Self { AsyncScheduler::( Mutex::new(initialized_result_with_timings()),
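Most of the new plumbing rests on the SchedulerStatus enum and its move-based transitions between Unavailable, Active, and Stale. The following is a compact, self-contained sketch of that state machine, assuming placeholder types (Scheduler, Pool, ScheduleOutcome) in place of InstalledSchedulerBox, InstalledSchedulerPoolArc, and ResultWithTimings.

use std::mem;

// Placeholder types so the sketch compiles on its own; in the patch these are
// InstalledSchedulerBox, InstalledSchedulerPoolArc, and ResultWithTimings.
type Scheduler = String;
type Pool = &'static str;
type ScheduleOutcome = Result<(), String>;

#[derive(Debug)]
enum Status {
    Unavailable,
    Active(Scheduler),
    Stale(Pool, ScheduleOutcome),
}

impl Status {
    // Mirrors maybe_transition_from_active_to_stale(): move the scheduler out,
    // let the caller wind it down, and stash the pool handle plus its result.
    fn active_to_stale(&mut self, f: impl FnOnce(Scheduler) -> (Pool, ScheduleOutcome)) {
        if !matches!(self, Status::Active(_)) {
            return;
        }
        let Status::Active(scheduler) = mem::replace(self, Status::Unavailable) else {
            unreachable!();
        };
        let (pool, outcome) = f(scheduler);
        *self = Status::Stale(pool, outcome);
    }

    // Mirrors transition_from_stale_to_active(): hand the stashed pool and
    // result back so the caller can resume a scheduler from the pool.
    fn stale_to_active(&mut self, f: impl FnOnce(Pool, ScheduleOutcome) -> Scheduler) {
        let Status::Stale(pool, outcome) = mem::replace(self, Status::Unavailable) else {
            panic!("not stale");
        };
        *self = Status::Active(f(pool, outcome));
    }
}

fn main() {
    let mut status = Status::Active("scheduler-1".to_string());

    // Timeout fires: wind down the idle scheduler but keep its result around.
    status.active_to_stale(|scheduler| {
        println!("returning {scheduler} to the pool");
        ("pool", Ok(()))
    });

    // A late transaction arrives for the same bank: resume with a fresh scheduler.
    status.stale_to_active(|pool, outcome| {
        println!("resuming from {pool}, carrying over {outcome:?}");
        "scheduler-2".to_string()
    });

    println!("final state: {status:?}");
}

Routing every transition through mem::replace(self, Unavailable) is what lets the boxed scheduler (or the stashed result) be moved out from behind a &mut reference, with Unavailable serving as the temporary placeholder until the successor state is installed.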