diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs
index ad8d127b8d6e5b..e28973d86f76ef 100644
--- a/unified-scheduler-pool/src/lib.rs
+++ b/unified-scheduler-pool/src/lib.rs
@@ -10,7 +10,7 @@
 
 use {
     assert_matches::assert_matches,
-    crossbeam_channel::{select, unbounded, Receiver, Sender},
+    crossbeam_channel::{select, unbounded, Receiver, SendError, Sender},
     log::*,
     solana_ledger::blockstore_processor::{
         execute_batch, TransactionBatchWithIndexes, TransactionStatusSender,
@@ -247,36 +247,138 @@ type ExecutedTaskPayload = SubchanneledPayload<Box<ExecutedTask>, ()>;
 // minimum at the cost of a single heap allocation per switching for the sake of Box-ing the Self
 // type to avoid infinite mem::size_of() due to the recursive type structure. Needless to say, such
 // an allocation can be amortized to be negligible.
-enum ChainedChannel<P1, P2> {
-    Payload(P1),
-    PayloadAndChannel(Box<dyn WithChannelAndPayload<P1, P2>>),
-}
+mod chained_channel {
+    use super::*;
 
-trait WithChannelAndPayload<P1, P2>: Send + Sync {
-    fn payload_and_channel(self: Box<Self>) -> PayloadAndChannelInner<P1, P2>;
-}
+    // hide variants by putting this inside newtype
+    enum ChainedChannelPrivate<P, C> {
+        Payload(P),
+        ContextAndChannel(Box<dyn WithContextAndPayload<P, C>>),
+    }
 
-type PayloadAndChannelInner<P1, P2> = (P2, Receiver<ChainedChannel<P1, P2>>);
+    pub(super) struct ChainedChannel<P, C>(ChainedChannelPrivate<P, C>);
 
-struct PayloadAndChannelWrapper<P1, P2>(PayloadAndChannelInner<P1, P2>);
+    trait WithContextAndPayload<P, C>: Send + Sync {
+        fn context_and_channel(self: Box<Self>) -> ContextAndChannelInner<P, C>;
+    }
 
-impl<P1, P2> WithChannelAndPayload<P1, P2> for PayloadAndChannelWrapper<P1, P2>
-where
-    P1: Send + Sync,
-    P2: Send + Sync,
-{
-    fn payload_and_channel(self: Box<Self>) -> PayloadAndChannelInner<P1, P2> {
-        self.0
+    type ContextAndChannelInner<P, C> = (C, Receiver<ChainedChannel<P, C>>);
+
+    struct ContextAndChannelWrapper<P, C>(ContextAndChannelInner<P, C>);
+
+    impl<P, C> WithContextAndPayload<P, C> for ContextAndChannelWrapper<P, C>
+    where
+        P: Send + Sync,
+        C: Send + Sync,
+    {
+        fn context_and_channel(self: Box<Self>) -> ContextAndChannelInner<P, C> {
+            self.0
+        }
     }
-}
 
-impl<P1, P2> ChainedChannel<P1, P2>
-where
-    P1: Send + Sync + 'static,
-    P2: Send + Sync + 'static,
-{
-    fn chain_to_new_channel(payload: P2, receiver: Receiver<Self>) -> Self {
-        Self::PayloadAndChannel(Box::new(PayloadAndChannelWrapper((payload, receiver))))
+    impl<P, C> ChainedChannel<P, C>
+    where
+        P: Send + Sync + 'static,
+        C: Send + Sync + 'static,
+    {
+        fn chain_to_new_channel(context: C, receiver: Receiver<Self>) -> Self {
+            Self(ChainedChannelPrivate::ContextAndChannel(Box::new(
+                ContextAndChannelWrapper((context, receiver)),
+            )))
+        }
+    }
+
+    pub(super) struct ChainedChannelSender<P, C> {
+        sender: Sender<ChainedChannel<P, C>>,
+    }
+
+    impl<P, C> ChainedChannelSender<P, C>
+    where
+        P: Send + Sync + 'static,
+        C: Send + Sync + 'static + Clone,
+    {
+        fn new(sender: Sender<ChainedChannel<P, C>>) -> Self {
+            Self { sender }
+        }
+
+        pub(super) fn send_payload(
+            &self,
+            payload: P,
+        ) -> std::result::Result<(), SendError<ChainedChannel<P, C>>> {
+            self.sender
+                .send(ChainedChannel(ChainedChannelPrivate::Payload(payload)))
+        }
+
+        pub(super) fn send_chained_channel(
+            &mut self,
+            context: C,
+            count: usize,
+        ) -> std::result::Result<(), SendError<ChainedChannel<P, C>>> {
+            let (chained_sender, chained_receiver) = crossbeam_channel::unbounded();
+            for _ in 0..count {
+                self.sender.send(ChainedChannel::chain_to_new_channel(
+                    context.clone(),
+                    chained_receiver.clone(),
+                ))?
+            }
+            self.sender = chained_sender;
+            Ok(())
+        }
+    }
+
+    pub(super) struct ChainedChannelReceiver<P, C> {
+        receiver: Receiver<ChainedChannel<P, C>>,
+        context: C,
+    }
+
+    impl<P, C: Clone> Clone for ChainedChannelReceiver<P, C> {
+        fn clone(&self) -> Self {
+            Self {
+                receiver: self.receiver.clone(),
+                context: self.context.clone(),
+            }
+        }
+    }
+
+    impl<P, C: Clone> ChainedChannelReceiver<P, C> {
+        fn new(receiver: Receiver<ChainedChannel<P, C>>, initial_context: C) -> Self {
+            Self {
+                receiver,
+                context: initial_context,
+            }
+        }
+
+        pub(super) fn context(&self) -> &C {
+            &self.context
+        }
+
+        pub(super) fn for_select(&self) -> &Receiver<ChainedChannel<P, C>> {
+            &self.receiver
+        }
+
+        pub(super) fn after_select(&mut self, message: ChainedChannel<P, C>) -> Option<P> {
+            match message.0 {
+                ChainedChannelPrivate::Payload(payload) => Some(payload),
+                ChainedChannelPrivate::ContextAndChannel(new_context_and_channel) => {
+                    (self.context, self.receiver) = new_context_and_channel.context_and_channel();
+                    None
+                }
+            }
+        }
+    }
+
+    pub(super) fn unbounded<P, C>(
+        initial_context: C,
+    ) -> (ChainedChannelSender<P, C>, ChainedChannelReceiver<P, C>)
+    where
+        P: Send + Sync + 'static,
+        C: Send + Sync + 'static + Clone,
+    {
+        let (sender, receiver) = crossbeam_channel::unbounded();
+        (
+            ChainedChannelSender::new(sender),
+            ChainedChannelReceiver::new(receiver, initial_context),
+        )
     }
 }
 
@@ -369,23 +471,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         );
     }
 
-    fn propagate_context_to_handler_threads(
-        runnable_task_sender: &mut Sender<ChainedChannel<Task, SchedulingContext>>,
-        context: SchedulingContext,
-        handler_count: usize,
-    ) {
-        let (next_sessioned_task_sender, runnable_task_receiver) = unbounded();
-        for _ in 0..handler_count {
-            runnable_task_sender
-                .send(ChainedChannel::chain_to_new_channel(
-                    context.clone(),
-                    runnable_task_receiver.clone(),
-                ))
-                .unwrap();
-        }
-        *runnable_task_sender = next_sessioned_task_sender;
-    }
-
     fn take_session_result_with_timings(&mut self) -> ResultWithTimings {
         self.session_result_with_timings.take().unwrap()
     }
@@ -399,8 +484,8 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
     }
 
     fn start_threads(&mut self, context: &SchedulingContext) {
-        let (runnable_task_sender, runnable_task_receiver) =
-            unbounded::<ChainedChannel<Task, SchedulingContext>>();
+        let (mut runnable_task_sender, runnable_task_receiver) =
+            chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
         let (executed_task_sender, executed_task_receiver) = unbounded::<ExecutedTaskPayload>();
         let (finished_task_sender, finished_task_receiver) = unbounded::<Box<ExecutedTask>>();
         let (accumulated_result_sender, accumulated_result_receiver) =
@@ -422,7 +507,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
             let handler_count = self.handler_count;
             let session_result_sender = self.session_result_sender.clone();
            let new_task_receiver = self.new_task_receiver.clone();
-            let mut runnable_task_sender = runnable_task_sender.clone();
 
             let mut session_ending = false;
             let mut active_task_count: usize = 0;
@@ -481,17 +565,16 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                                 // be resolved in the case of single-threaded FIFO like this.
                                 active_task_count = active_task_count.checked_add(1).unwrap();
                                 runnable_task_sender
-                                    .send(ChainedChannel::Payload(task))
+                                    .send_payload(task)
                                     .unwrap();
                             }
                             NewTaskPayload::OpenSubchannel(context) => {
                                 // signal about new SchedulingContext to both handler and
                                 // accumulator threads
-                                Self::propagate_context_to_handler_threads(
-                                    &mut runnable_task_sender,
+                                runnable_task_sender.send_chained_channel(
                                     context,
                                     handler_count
-                                );
+                                ).unwrap();
                                 executed_task_sender
                                     .send(ExecutedTaskPayload::OpenSubchannel(()))
                                     .unwrap();
@@ -528,28 +611,25 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         let handler_main_loop = || {
             let pool = self.pool.clone();
-            let mut bank = context.bank().clone();
             let mut runnable_task_receiver = runnable_task_receiver.clone();
             let finished_task_sender = finished_task_sender.clone();
 
             move || loop {
                 let (task, sender) = select! {
-                    recv(runnable_task_receiver) -> message => {
-                        match message.unwrap() {
-                            ChainedChannel::Payload(task) => {
-                                (task, &finished_task_sender)
-                            }
-                            ChainedChannel::PayloadAndChannel(new_channel) => {
-                                let new_context;
-                                (new_context, runnable_task_receiver) = new_channel.payload_and_channel();
-                                bank = new_context.bank().clone();
-                                continue;
-                            }
+                    recv(runnable_task_receiver.for_select()) -> message => {
+                        if let Some(task) = runnable_task_receiver.after_select(message.unwrap()) {
+                            (task, &finished_task_sender)
+                        } else {
+                            continue;
                        }
                    },
                };
 
                let mut task = ExecutedTask::new_boxed(task);
-                Self::execute_task_with_handler(&bank, &mut task, &pool.handler_context);
+                Self::execute_task_with_handler(
+                    runnable_task_receiver.context().bank(),
+                    &mut task,
+                    &pool.handler_context,
+                );
                sender.send(task).unwrap();
            }
        };
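For reference, the idea behind the patch's `chained_channel` module is that the payload channel carries two kinds of messages: ordinary payloads, and a "switch to this new context and channel" message that is broadcast once per handler before the sender itself moves to the fresh channel. The sketch below is a standalone illustration of that pattern, not the patch's code: it uses concrete types (`u64` payload, `String` context) in place of the generic `P`/`C` parameters and trait-object plumbing, and the type and method names are illustrative only.

```rust
// Minimal sketch of the chained-channel pattern, assuming crossbeam-channel as a
// dependency. Concrete types stand in for the patch's generic payload/context.
use crossbeam_channel::{unbounded, Receiver, Sender};

enum Message {
    // A unit of work for a handler.
    Payload(u64),
    // New context plus the receiver that every handler must switch to; one copy
    // is sent per handler, which "chains" them all onto the next channel.
    ContextAndChannel(String, Receiver<Message>),
}

struct ChainedSender {
    sender: Sender<Message>,
}

impl ChainedSender {
    fn send_payload(&self, payload: u64) {
        self.sender.send(Message::Payload(payload)).unwrap();
    }

    // Broadcast the new context and a fresh receiver to `count` handlers, then
    // start writing to the fresh channel ourselves.
    fn send_chained_channel(&mut self, context: String, count: usize) {
        let (next_sender, next_receiver) = unbounded();
        for _ in 0..count {
            self.sender
                .send(Message::ContextAndChannel(
                    context.clone(),
                    next_receiver.clone(),
                ))
                .unwrap();
        }
        self.sender = next_sender;
    }
}

struct ChainedReceiver {
    receiver: Receiver<Message>,
    context: String,
}

impl ChainedReceiver {
    // Some(payload) for work; None after silently switching to the next channel
    // generation and adopting its context (mirrors after_select() returning None).
    fn recv_payload(&mut self) -> Option<u64> {
        match self.receiver.recv().unwrap() {
            Message::Payload(payload) => Some(payload),
            Message::ContextAndChannel(context, receiver) => {
                self.context = context;
                self.receiver = receiver;
                None
            }
        }
    }
}

fn main() {
    let (sender, receiver) = unbounded();
    let mut tx = ChainedSender { sender };
    let mut rx = ChainedReceiver {
        receiver,
        context: "ctx-0".to_string(),
    };

    tx.send_payload(1);
    tx.send_chained_channel("ctx-1".to_string(), 1); // a single handler in this sketch
    tx.send_payload(2); // lands on the new channel generation

    assert_eq!(rx.recv_payload(), Some(1));
    assert_eq!(rx.recv_payload(), None); // switched to ctx-1 here
    assert_eq!(rx.context, "ctx-1");
    assert_eq!(rx.recv_payload(), Some(2));
}
```

Hiding the enum variants behind a newtype, as the patch does with `ChainedChannelPrivate`, keeps callers from constructing or matching the channel-switch message directly, so the only way to rotate handlers onto a new context is through `send_chained_channel()` and `after_select()`.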