From 04b6f3806668b6ac351aa0ed0278593de4d82a76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=8F=E4=B8=80?= Date: Fri, 2 Jun 2023 12:00:35 +0800 Subject: [PATCH 1/2] rt(threaded): adjust transition_from_parked behavior after introducing disable_lifo_slot feature --- .../runtime/scheduler/multi_thread/queue.rs | 9 ++++--- .../runtime/scheduler/multi_thread/worker.rs | 27 ++++++++++++++----- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index 6444df88b8a..32c6357875c 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -105,9 +105,12 @@ pub(crate) fn local() -> (Steal, Local) { } impl Local { - /// Returns true if the queue has entries that can be stolen. - pub(crate) fn is_stealable(&self) -> bool { - !self.inner.is_empty() + /// Returns the number of entries in the queue + pub(crate) fn tasks_num(&self) -> (usize /* stealable */, usize /* total */) { + let n = self.inner.len() as usize; + // I'm not sure if it's really necessary to return a tuple, but based on has_tasks's comments, + // it seems that there may be tasks in queue that cannot be stolen? + (n, n) } /// How many tasks can be pushed into the queue diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 47ff86a5c3b..3e19a1ae98a 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -718,9 +718,7 @@ impl Context { // Place `park` back in `core` core.park = Some(park); - // If there are tasks available to steal, but this worker is not - // looking for tasks to steal, notify another worker. - if !core.is_searching && core.run_queue.is_stealable() { + if core.should_notify_others() { self.worker.handle.notify_parked_local(); } @@ -846,12 +844,29 @@ impl Core { worker.handle.transition_worker_from_searching(); } + fn has_tasks(&self) -> bool { + self.lifo_slot.is_some() || self.run_queue.has_tasks() + } + + fn should_notify_others(&self) -> bool { + // If there are tasks available to steal, but this worker is not + // looking for tasks to steal, notify another worker. + if self.is_searching { + return false; + } + let (stealable_tasks, total_tasks) = self.run_queue.tasks_num(); + if stealable_tasks == 0 { + return false; + } + self.lifo_slot.is_some() as usize + (total_tasks - stealable_tasks) > 0 + } + /// Prepares the worker state for parking. /// /// Returns true if the transition happened, false if there is work to do first. fn transition_to_parked(&mut self, worker: &Worker) -> bool { // Workers should not park if they have work to do - if self.lifo_slot.is_some() || self.run_queue.has_tasks() || self.is_traced { + if self.has_tasks() || self.is_traced { return false; } @@ -877,9 +892,9 @@ impl Core { /// Returns `true` if the transition happened. fn transition_from_parked(&mut self, worker: &Worker) -> bool { - // If a task is in the lifo slot, then we must unpark regardless of + // If a task is in the lifo slot/run queue, then we must unpark regardless of // being notified - if self.lifo_slot.is_some() { + if self.has_tasks() { // When a worker wakes, it should only transition to the "searching" // state when the wake originates from another worker *or* a new task // is pushed. We do *not* want the worker to transition to "searching" From f234563831d61c0841627ec4efbd685c7276e390 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=8F=E4=B8=80?= Date: Sat, 3 Jun 2023 10:31:10 +0800 Subject: [PATCH 2/2] Change task_num to len based on the annotation --- tokio/src/runtime/scheduler/multi_thread/queue.rs | 7 ++----- tokio/src/runtime/scheduler/multi_thread/worker.rs | 6 +----- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index 32c6357875c..dd66fa2dde1 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -106,11 +106,8 @@ pub(crate) fn local() -> (Steal, Local) { impl Local { /// Returns the number of entries in the queue - pub(crate) fn tasks_num(&self) -> (usize /* stealable */, usize /* total */) { - let n = self.inner.len() as usize; - // I'm not sure if it's really necessary to return a tuple, but based on has_tasks's comments, - // it seems that there may be tasks in queue that cannot be stolen? - (n, n) + pub(crate) fn len(&self) -> usize { + self.inner.len() as usize } /// How many tasks can be pushed into the queue diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 3e19a1ae98a..7fc335f5165 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -854,11 +854,7 @@ impl Core { if self.is_searching { return false; } - let (stealable_tasks, total_tasks) = self.run_queue.tasks_num(); - if stealable_tasks == 0 { - return false; - } - self.lifo_slot.is_some() as usize + (total_tasks - stealable_tasks) > 0 + self.lifo_slot.is_some() as usize + self.run_queue.len() > 1 } /// Prepares the worker state for parking.