From 40a9851c82b4a669150800f84f026214e85921f4 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Mon, 24 Jun 2024 23:55:03 +0900 Subject: [PATCH] Avoid unneeded start_session() when spawning (#1815) * Avoid unneeded start_session() when spawning * Add comments --- unified-scheduler-pool/src/lib.rs | 113 ++++++++++++++++-------------- 1 file changed, 61 insertions(+), 52 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index ed7f1407a3c37f..1ea470aade791e 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -766,23 +766,6 @@ struct ThreadManager, TH: TaskHandler> { handler_threads: Vec>, } -impl PooledScheduler { - fn do_spawn( - pool: Arc>, - initial_context: SchedulingContext, - result_with_timings: ResultWithTimings, - ) -> Self { - Self::from_inner( - PooledSchedulerInner:: { - thread_manager: ThreadManager::new(pool), - usage_queue_loader: UsageQueueLoader::default(), - }, - initial_context, - result_with_timings, - ) - } -} - struct HandlerPanicked; type HandlerResult = std::result::Result, HandlerPanicked>; @@ -852,7 +835,15 @@ impl, TH: TaskHandler> ThreadManager { ); } - fn start_threads(&mut self, context: &SchedulingContext) { + // This method must take same set of session-related arguments as start_session() to avoid + // unneeded channel operations to minimize overhead. Starting threads incurs a very high cost + // already... Also, pre-creating threads isn't desirable as well to avoid `Option`-ed types + // for type safety. + fn start_threads( + &mut self, + context: SchedulingContext, + mut result_with_timings: ResultWithTimings, + ) { // Firstly, setup bi-directional messaging between the scheduler and handlers to pass // around tasks, by creating 2 channels (one for to-be-handled tasks from the scheduler to // the handlers and the other for finished tasks from the handlers to the scheduler). @@ -930,7 +921,7 @@ impl, TH: TaskHandler> ThreadManager { // prioritization further. Consequently, this also contributes to alleviate the known // heuristic's caveat for the first task of linearized runs, which is described above. let (mut runnable_task_sender, runnable_task_receiver) = - chained_channel::unbounded::(context.clone()); + chained_channel::unbounded::(context); // Create two handler-to-scheduler channels to prioritize the finishing of blocked tasks, // because it is more likely that a blocked task will have more blocked tasks behind it, // which should be scheduled while minimizing the delay to clear buffered linearized runs @@ -1013,29 +1004,14 @@ impl, TH: TaskHandler> ThreadManager { let mut state_machine = unsafe { SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() }; - let mut result_with_timings = initialized_result_with_timings(); + // The following loop maintains and updates ResultWithTimings as its + // externally-provieded mutable state for each session in this way: + // + // 1. Initial result_with_timing is propagated implicitly by the moved variable. + // 2. Subsequent result_with_timings are propagated explicitly from + // the new_task_receiver.recv() invocation located at the end of loop. 'nonaborted_main_loop: loop { - match new_task_receiver.recv() { - Ok(NewTaskPayload::OpenSubchannel(( - new_context, - new_result_with_timings, - ))) => { - // signal about new SchedulingContext to handler threads - runnable_task_sender - .send_chained_channel(new_context, handler_count) - .unwrap(); - result_with_timings = new_result_with_timings; - } - Ok(_) => { - unreachable!(); - } - Err(_) => { - // This unusual condition must be triggered by ThreadManager::drop(); - break 'nonaborted_main_loop; - } - } - let mut is_finished = false; while !is_finished { // ALL recv selectors are eager-evaluated ALWAYS by current crossbeam impl, @@ -1121,6 +1097,28 @@ impl, TH: TaskHandler> ThreadManager { .expect("always outlived receiver"); session_ending = false; } + + match new_task_receiver.recv() { + Ok(NewTaskPayload::OpenSubchannel(( + new_context, + new_result_with_timings, + ))) => { + // We just received subsequent (= not initial) session and about to + // enter into the preceding `while(!is_finished) {...}` loop again. + // Before that, propagate new SchedulingContext to handler threads + runnable_task_sender + .send_chained_channel(new_context, handler_count) + .unwrap(); + result_with_timings = new_result_with_timings; + } + Ok(_) => { + unreachable!(); + } + Err(_) => { + // This unusual condition must be triggered by ThreadManager::drop(); + break 'nonaborted_main_loop; + } + } } // There are several code-path reaching here out of the preceding unconditional @@ -1152,6 +1150,14 @@ impl, TH: TaskHandler> ThreadManager { let finished_blocked_task_sender = finished_blocked_task_sender.clone(); let finished_idle_task_sender = finished_idle_task_sender.clone(); + // The following loop maintains and updates SchedulingContext as its + // externally-provided state for each session in this way: + // + // 1. Initial context is propagated implicitly by the moved runnable_task_receiver, + // which is clone()-d just above for this particular thread. + // 2. Subsequent contexts are propagated explicitly inside `.after_select()` as part of + // `select_biased!`, which are sent from `.send_chained_channel()` in the scheduler + // thread for all-but-initial sessions. move || loop { let (task, sender) = select_biased! { recv(runnable_task_receiver.for_select()) -> message => { @@ -1327,13 +1333,14 @@ impl, TH: TaskHandler> ThreadManager { fn start_session( &mut self, - context: &SchedulingContext, + context: SchedulingContext, result_with_timings: ResultWithTimings, ) { + assert!(!self.are_threads_joined()); assert_matches!(self.session_result_with_timings, None); self.new_task_sender .send(NewTaskPayload::OpenSubchannel(( - context.clone(), + context, result_with_timings, ))) .expect("no new session after aborted"); @@ -1353,7 +1360,7 @@ pub trait SpawnableScheduler: InstalledScheduler { fn spawn( pool: Arc>, - initial_context: SchedulingContext, + context: SchedulingContext, result_with_timings: ResultWithTimings, ) -> Self where @@ -1379,21 +1386,23 @@ impl SpawnableScheduler for PooledScheduler { ) -> Self { inner .thread_manager - .start_session(&context, result_with_timings); + .start_session(context.clone(), result_with_timings); Self { inner, context } } fn spawn( pool: Arc>, - initial_context: SchedulingContext, + context: SchedulingContext, result_with_timings: ResultWithTimings, ) -> Self { - let mut scheduler = Self::do_spawn(pool, initial_context, result_with_timings); - scheduler - .inner + let mut inner = Self::Inner { + thread_manager: ThreadManager::new(pool), + usage_queue_loader: UsageQueueLoader::default(), + }; + inner .thread_manager - .start_threads(&scheduler.context); - scheduler + .start_threads(context.clone(), result_with_timings); + Self { inner, context } } } @@ -2775,13 +2784,13 @@ mod tests { fn spawn( pool: Arc>, - initial_context: SchedulingContext, + context: SchedulingContext, _result_with_timings: ResultWithTimings, ) -> Self { AsyncScheduler::( Mutex::new(initialized_result_with_timings()), Mutex::new(vec![]), - initial_context, + context, pool, ) }