diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index e84677e6007fff..aeededab8ee784 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -25,8 +25,8 @@ use { log::*, solana_program_runtime::timings::ExecuteTimings, solana_sdk::{ + clock::Slot, hash::Hash, - slot_history::Slot, transaction::{Result, SanitizedTransaction, TransactionError}, }, std::{ @@ -34,6 +34,7 @@ use { mem, ops::Deref, sync::{Arc, RwLock}, + thread, }, }; #[cfg(feature = "dev-context-only-utils")] @@ -623,7 +624,7 @@ impl BankWithSchedulerInner { "wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...", bank.slot(), reason, - std::thread::current(), + thread::current(), ); let mut scheduler = scheduler.write().unwrap(); @@ -656,7 +657,7 @@ impl BankWithSchedulerInner { reason, was_noop, result_with_timings.as_ref().map(|(result, _)| result), - std::thread::current(), + thread::current(), ); trace!( "wait_for_scheduler_termination(result_with_timings: {:?})", @@ -667,7 +668,7 @@ impl BankWithSchedulerInner { } fn drop_scheduler(&self) { - if std::thread::panicking() { + if thread::panicking() { error!( "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already panicking...", self.bank.slot(), diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 1ea470aade791e..231ca8843f9421 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -940,7 +940,7 @@ impl, TH: TaskHandler> ThreadManager { // 4. the handler thread processes the dispatched task. // 5. the handler thread reply back to the scheduler thread as an executed task. // 6. the scheduler thread post-processes the executed task. - let scheduler_main_loop = || { + let scheduler_main_loop = { let handler_count = self.pool.handler_count; let session_result_sender = self.session_result_sender.clone(); // Taking new_task_receiver here is important to ensure there's a single receiver. In @@ -1006,7 +1006,7 @@ impl, TH: TaskHandler> ThreadManager { }; // The following loop maintains and updates ResultWithTimings as its - // externally-provieded mutable state for each session in this way: + // externally-provided mutable state for each session in this way: // // 1. Initial result_with_timing is propagated implicitly by the moved variable. // 2. Subsequent result_with_timings are propagated explicitly from @@ -1062,9 +1062,8 @@ impl, TH: TaskHandler> ThreadManager { Ok(NewTaskPayload::CloseSubchannel) => { session_ending = true; } - Ok(NewTaskPayload::OpenSubchannel(_context_and_result_with_timings)) => { - unreachable!(); - } + Ok(NewTaskPayload::OpenSubchannel(_context_and_result_with_timings)) => + unreachable!(), Err(RecvError) => { // Mostly likely is that this scheduler is dropped for pruned blocks of // abandoned forks... @@ -1087,17 +1086,16 @@ impl, TH: TaskHandler> ThreadManager { is_finished = session_ending && state_machine.has_no_active_task(); } - if session_ending { - state_machine.reinitialize(); - session_result_sender - .send(std::mem::replace( - &mut result_with_timings, - initialized_result_with_timings(), - )) - .expect("always outlived receiver"); - session_ending = false; - } + // Finalize the current session after asserting it's explicitly requested so. + assert!(session_ending); + // Send result first because this is blocking the replay code-path. + session_result_sender + .send(result_with_timings) + .expect("always outlived receiver"); + state_machine.reinitialize(); + session_ending = false; + // Prepare for the new session. match new_task_receiver.recv() { Ok(NewTaskPayload::OpenSubchannel(( new_context, @@ -1111,13 +1109,13 @@ impl, TH: TaskHandler> ThreadManager { .unwrap(); result_with_timings = new_result_with_timings; } - Ok(_) => { - unreachable!(); - } Err(_) => { - // This unusual condition must be triggered by ThreadManager::drop(); + // This unusual condition must be triggered by ThreadManager::drop(). + // Initialize result_with_timings with a harmless value... + result_with_timings = initialized_result_with_timings(); break 'nonaborted_main_loop; } + Ok(_) => unreachable!(), } } @@ -1212,7 +1210,7 @@ impl, TH: TaskHandler> ThreadManager { self.scheduler_thread = Some( thread::Builder::new() .name("solScheduler".to_owned()) - .spawn_tracked(scheduler_main_loop()) + .spawn_tracked(scheduler_main_loop) .unwrap(), );