From accca1fc2134a865b650a8e23469f89b3669ae75 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Sun, 25 Jun 2023 15:13:28 +0800 Subject: [PATCH 1/7] style: rm unnecessary mod prefix --- tokio/src/sync/oneshot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 0ff3ea28177..af3cc854f88 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -801,7 +801,7 @@ impl Sender { if state.is_closed() { coop.made_progress(); - return Poll::Ready(()); + return Ready(()); } if state.is_tx_task_set() { From daa1b00c72237b1c79bbb6ea88cf399199f86a5c Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Wed, 28 Jun 2023 11:29:24 +0800 Subject: [PATCH 2/7] mpsc cache line optimize The origin benchmark result: $ cargo bench --bench sync_mpsc running 10 tests test contention_bounded ... bench: 1,008,359 ns/iter (+/- 412,814) test contention_bounded_full ... bench: 1,427,243 ns/iter (+/- 500,287) test contention_unbounded ... bench: 845,013 ns/iter (+/- 394,673) test create_100_000_medium ... bench: 182 ns/iter (+/- 1) test create_100_medium ... bench: 182 ns/iter (+/- 1) test create_1_medium ... bench: 181 ns/iter (+/- 2) test send_large ... bench: 16,525 ns/iter (+/- 329) test send_medium ... bench: 628 ns/iter (+/- 5) test uncontented_bounded ... bench: 478,514 ns/iter (+/- 1,923) test uncontented_unbounded ... bench: 303,990 ns/iter (+/- 1,607) test result: ok. 0 passed; 0 failed; 0 ignored; 10 measured The current benchmark result: $ cargo bench --bench sync_mpsc running 10 tests test contention_bounded ... bench: 606,516 ns/iter (+/- 402,326) test contention_bounded_full ... bench: 727,239 ns/iter (+/- 340,756) test contention_unbounded ... bench: 760,523 ns/iter (+/- 482,628) test create_100_000_medium ... bench: 315 ns/iter (+/- 5) test create_100_medium ... bench: 317 ns/iter (+/- 6) test create_1_medium ... bench: 315 ns/iter (+/- 5) test send_large ... bench: 16,166 ns/iter (+/- 516) test send_medium ... bench: 695 ns/iter (+/- 6) test uncontented_bounded ... bench: 456,975 ns/iter (+/- 18,969) test uncontented_unbounded ... bench: 306,282 ns/iter (+/- 3,058) test result: ok. 0 passed; 0 failed; 0 ignored; 10 measured --- tokio/src/sync/mpsc/chan.rs | 21 ++++----- tokio/src/util/cacheline.rs | 90 +++++++++++++++++++++++++++++++++++++ tokio/src/util/mod.rs | 2 + 3 files changed, 103 insertions(+), 10 deletions(-) create mode 100644 tokio/src/util/cacheline.rs diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 6f87715dd34..11d9be42e91 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -12,6 +12,7 @@ use std::process; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; +use crate::util::cacheline::CachePadded; /// Channel sender. pub(crate) struct Tx { @@ -46,18 +47,18 @@ pub(crate) trait Semaphore { } pub(super) struct Chan { + /// Handle to the push half of the lock-free list. + tx: CachePadded>, + + /// Receiver waker. Notified when a value is pushed into the channel. + rx_waker: CachePadded, + /// Notifies all tasks listening for the receiver being dropped. notify_rx_closed: Notify, - /// Handle to the push half of the lock-free list. - tx: list::Tx, - /// Coordinates access to channel's capacity. semaphore: S, - /// Receiver waker. Notified when a value is pushed into the channel. - rx_waker: AtomicWaker, - /// Tracks the number of outstanding sender handles. /// /// When this drops to zero, the send half of the channel is closed. @@ -73,9 +74,9 @@ where { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Chan") - .field("tx", &self.tx) + .field("tx", &*self.tx) .field("semaphore", &self.semaphore) - .field("rx_waker", &self.rx_waker) + .field("rx_waker", &*self.rx_waker) .field("tx_count", &self.tx_count) .field("rx_fields", &"...") .finish() @@ -108,9 +109,9 @@ pub(crate) fn channel(semaphore: S) -> (Tx, Rx) { let chan = Arc::new(Chan { notify_rx_closed: Notify::new(), - tx, + tx : CachePadded::new(tx), semaphore, - rx_waker: AtomicWaker::new(), + rx_waker: CachePadded::new(AtomicWaker::new()), tx_count: AtomicUsize::new(1), rx_fields: UnsafeCell::new(RxFields { list: rx, diff --git a/tokio/src/util/cacheline.rs b/tokio/src/util/cacheline.rs new file mode 100644 index 00000000000..87dca19026f --- /dev/null +++ b/tokio/src/util/cacheline.rs @@ -0,0 +1,90 @@ +use std::ops::{Deref, DerefMut}; + +/// Pads and aligns a value to the length of a cache line. +#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)] +// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache +// lines at a time, so we have to align to 128 bytes rather than 64. +// +// Sources: +// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf +// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 +// +// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size. +// +// Sources: +// - https://www.mono-project.com/news/2016/09/12/arm64-icache/ +// +// powerpc64 has 128-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 +#[cfg_attr( +any(target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64",), +repr(align(128)) +)] +// arm, mips, mips64, and riscv64 have 32-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 +#[cfg_attr( +any( +target_arch = "arm", +target_arch = "mips", +target_arch = "mips64", +target_arch = "riscv64", +), +repr(align(32)) +)] +// s390x has 256-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 +#[cfg_attr(target_arch = "s390x", repr(align(256)))] +// x86 and wasm have 64-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 +// +// All others are assumed to have 64-byte cache line size. +#[cfg_attr( +not(any( +target_arch = "x86_64", +target_arch = "aarch64", +target_arch = "powerpc64", +target_arch = "arm", +target_arch = "mips", +target_arch = "mips64", +target_arch = "riscv64", +target_arch = "s390x", +)), +repr(align(64)) +)] +pub(crate) struct CachePadded { + value: T, +} + +impl CachePadded { + /// Pads and aligns a value to the length of a cache line. + pub(crate) fn new(value: T) -> CachePadded { + CachePadded:: { value } + } +} + +impl Deref for CachePadded { + type Target = T; + + fn deref(&self) -> &T { + &self.value + } +} + +impl DerefMut for CachePadded { + fn deref_mut(&mut self) -> &mut T { + &mut self.value + } +} diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 6a7d4b1037c..432cef017c7 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -81,3 +81,5 @@ pub(crate) mod error; pub(crate) mod memchr; pub(crate) mod markers; + +pub(crate) mod cacheline; \ No newline at end of file From 8f9a008979b34e001337535add9d553ef6944fea Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Wed, 28 Jun 2023 11:51:25 +0800 Subject: [PATCH 3/7] rfmt cacheline.rs --- .idea/tokio-2.8.2-coderead.iml | 4 + .idea/vcs.xml | 6 ++ .idea/workspace.xml | 153 +++++++++++++++++++++++++++++++++ tokio/src/util/cacheline.rs | 44 +++++----- 4 files changed, 187 insertions(+), 20 deletions(-) create mode 100644 .idea/tokio-2.8.2-coderead.iml create mode 100644 .idea/vcs.xml create mode 100644 .idea/workspace.xml diff --git a/.idea/tokio-2.8.2-coderead.iml b/.idea/tokio-2.8.2-coderead.iml new file mode 100644 index 00000000000..7ee078df7b0 --- /dev/null +++ b/.idea/tokio-2.8.2-coderead.iml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 00000000000..35eb1ddfbbc --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 00000000000..b5cf8c8bf38 --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,153 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + true + + \ No newline at end of file diff --git a/tokio/src/util/cacheline.rs b/tokio/src/util/cacheline.rs index 87dca19026f..274c5fdab66 100644 --- a/tokio/src/util/cacheline.rs +++ b/tokio/src/util/cacheline.rs @@ -19,8 +19,12 @@ use std::ops::{Deref, DerefMut}; // Sources: // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 #[cfg_attr( -any(target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64",), -repr(align(128)) + any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + ), + repr(align(128)) )] // arm, mips, mips64, and riscv64 have 32-byte cache line size. // @@ -31,13 +35,13 @@ repr(align(128)) // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 #[cfg_attr( -any( -target_arch = "arm", -target_arch = "mips", -target_arch = "mips64", -target_arch = "riscv64", -), -repr(align(32)) + any( + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + ), + repr(align(32)) )] // s390x has 256-byte cache line size. // @@ -52,17 +56,17 @@ repr(align(32)) // // All others are assumed to have 64-byte cache line size. #[cfg_attr( -not(any( -target_arch = "x86_64", -target_arch = "aarch64", -target_arch = "powerpc64", -target_arch = "arm", -target_arch = "mips", -target_arch = "mips64", -target_arch = "riscv64", -target_arch = "s390x", -)), -repr(align(64)) + not(any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + target_arch = "s390x", + )), + repr(align(64)) )] pub(crate) struct CachePadded { value: T, From c7a7b405854a28413299c94dafafc53b9e40e46c Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Wed, 28 Jun 2023 11:52:24 +0800 Subject: [PATCH 4/7] ci --- .idea/tokio-2.8.2-coderead.iml | 4 - .idea/vcs.xml | 6 -- .idea/workspace.xml | 153 --------------------------------- 3 files changed, 163 deletions(-) delete mode 100644 .idea/tokio-2.8.2-coderead.iml delete mode 100644 .idea/vcs.xml delete mode 100644 .idea/workspace.xml diff --git a/.idea/tokio-2.8.2-coderead.iml b/.idea/tokio-2.8.2-coderead.iml deleted file mode 100644 index 7ee078df7b0..00000000000 --- a/.idea/tokio-2.8.2-coderead.iml +++ /dev/null @@ -1,4 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 35eb1ddfbbc..00000000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml deleted file mode 100644 index b5cf8c8bf38..00000000000 --- a/.idea/workspace.xml +++ /dev/null @@ -1,153 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - true - - \ No newline at end of file From ca6127f33c9fbd27a9ee8081a167ceec933c60d7 Mon Sep 17 00:00:00 2001 From: spongecaptain Date: Thu, 29 Jun 2023 10:59:44 +0800 Subject: [PATCH 5/7] chan.rs format --- tokio/src/sync/mpsc/chan.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 11d9be42e91..f24bb03fb4f 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -6,13 +6,13 @@ use crate::runtime::park::CachedParkThread; use crate::sync::mpsc::error::TryRecvError; use crate::sync::mpsc::{bounded, list, unbounded}; use crate::sync::notify::Notify; +use crate::util::cacheline::CachePadded; use std::fmt; use std::process; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; -use crate::util::cacheline::CachePadded; /// Channel sender. pub(crate) struct Tx { @@ -109,7 +109,7 @@ pub(crate) fn channel(semaphore: S) -> (Tx, Rx) { let chan = Arc::new(Chan { notify_rx_closed: Notify::new(), - tx : CachePadded::new(tx), + tx: CachePadded::new(tx), semaphore, rx_waker: CachePadded::new(AtomicWaker::new()), tx_count: AtomicUsize::new(1), From dad16243652122a6b6aac085cee4a69b16815d6c Mon Sep 17 00:00:00 2001 From: spongecaptain Date: Thu, 29 Jun 2023 11:05:18 +0800 Subject: [PATCH 6/7] src/util/mod.rs rustfmt --- tokio/src/util/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 432cef017c7..568881c6641 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -82,4 +82,4 @@ pub(crate) mod memchr; pub(crate) mod markers; -pub(crate) mod cacheline; \ No newline at end of file +pub(crate) mod cacheline; From 1fa2805e1929082cc9ce68e7b9fd5b0100b585d0 Mon Sep 17 00:00:00 2001 From: spongecaptain Date: Sun, 30 Jul 2023 09:07:51 +0800 Subject: [PATCH 7/7] enable sync future for CachePadded --- tokio/src/util/cacheline.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/src/util/cacheline.rs b/tokio/src/util/cacheline.rs index 274c5fdab66..64fd5ccad33 100644 --- a/tokio/src/util/cacheline.rs +++ b/tokio/src/util/cacheline.rs @@ -1,3 +1,4 @@ +#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] use std::ops::{Deref, DerefMut}; /// Pads and aligns a value to the length of a cache line.