
Reduce contention in broadcast channel #6284

Closed
wants to merge 19 commits

Conversation

vnetserg
Contributor

Motivation

Broadcast channel performance is suboptimal when there are a lot of threads that frequently subscribe for new values, since they all have to contend for the same mutex. See #5465

Solution

The proposed change is to use an atomic linked list to store waiters. This way the tail mutex can be replaced with an RwLock, and adding new subscribers only requires a read lock. Justifying safety becomes trickier, though; I did my best in the comments.
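For illustration, a rough sketch of the idea (simplified names, not the PR's actual code): receivers registering a waiter take the read lock and push onto an atomic head pointer with a CAS loop, while operations that remove or drain waiters take the write lock and therefore get exclusive access to the whole list.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::RwLock;

struct Waiter {
    next: AtomicPtr<Waiter>,
    // waker, queued flag, etc. omitted
}

struct Tail {
    /// Head of the waiter list. Pushing only needs shared (`read`) access
    /// because it goes through a CAS; unlinking requires `write` access.
    waiters: AtomicPtr<Waiter>,
    // positions, closed flag, etc. omitted
}

struct Shared {
    tail: RwLock<Tail>,
}

impl Shared {
    /// Register a waiter; many receivers can do this concurrently.
    fn push_waiter(&self, waiter: *mut Waiter) {
        let tail = self.tail.read().unwrap();
        let mut head = tail.waiters.load(Ordering::Acquire);
        loop {
            // Point the new node at the current head, then try to become
            // the new head; retry if another thread won the race.
            unsafe { (*waiter).next.store(head, Ordering::Relaxed) };
            match tail.waiters.compare_exchange_weak(
                head,
                waiter,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return,
                Err(actual) => head = actual,
            }
        }
    }

    /// Drain the list; the write lock excludes all concurrent pushes.
    fn drain_waiters(&self) {
        let mut tail = self.tail.write().unwrap();
        let mut cur = *tail.waiters.get_mut();
        *tail.waiters.get_mut() = ptr::null_mut();
        while !cur.is_null() {
            // wake the waiter here (details omitted)
            cur = unsafe { *(*cur).next.get_mut() };
        }
    }
}
```

The PR itself works on Tokio's intrusive doubly linked list, so individual waiters can also be unlinked (under the write lock) when a Recv future is dropped; the singly linked sketch above only shows the push/drain shape.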

The PR also includes a benchmark that on my machine shows a dramatic improvement in high contention scenarios:

contention/10           time:   [3.5144 ms 3.5248 ms 3.5358 ms]
                        change: [-2.6135% -1.9438% -1.2888%] (p = 0.00 < 0.05)

contention/100          time:   [13.183 ms 13.209 ms 13.237 ms]
                        change: [-36.660% -36.443% -36.215%] (p = 0.00 < 0.05)

contention/500          time:   [48.847 ms 49.682 ms 50.539 ms]
                        change: [-53.426% -52.584% -51.764%] (p = 0.00 < 0.05)

contention/1000         time:   [117.23 ms 118.28 ms 119.34 ms]
                        change: [-37.337% -36.527% -35.757%] (p = 0.00 < 0.05)

Implement atomic linked list that allows pushing
waiters concurrently, which reduces contention.

Fixes: tokio-rs#5465
@github-actions bot added the R-loom-sync (Run loom sync tests on this PR) label on Jan 14, 2024
@Darksonn
Contributor

What implementation are you using for making your linked list atomic? Is it a well-known implementation strategy, or something you came up with on your own?

@Darksonn added the A-tokio (Area: The main tokio crate) and M-sync (Module: tokio/sync) labels on Jan 14, 2024
@Darksonn
Contributor

It seems like the loom tests are not running on this PR. I guess GitHub must have changed something that broke our CI config:

loom-sync:
  name: loom tokio::sync
  # base_ref is null when it's not a pull request
  if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-sync') || (github.base_ref == null))
  runs-on: ubuntu-latest
  steps:
    - uses: actions/checkout@v4
    - name: Install Rust ${{ env.rust_stable }}
      uses: dtolnay/rust-toolchain@master
      with:
        toolchain: ${{ env.rust_stable }}
    - uses: Swatinem/rust-cache@v2
    - name: run tests
      run: cargo test --lib --release --features full -- --nocapture sync::tests
      working-directory: tokio

Successfully passing the tokio::sync loom tests will be a prerequisite for merging something like this.

@Darksonn
Contributor

Hmm, looks like it started running after I merged master into your branch.

@vnetserg
Contributor Author

What implementation are you using for making your linked list atomic? Is it a well-known implementation strategy, or something you came up with on your own?

I came up with it on my own, although the idea behind it is quite simple, so I'm sure it has been implemented by someone already. On the other hand, it is a rather special case of an atomic list: it allows adding elements atomically, but removing them still has to be done with some outside synchronization (in the case of this PR, a write lock). I can research whether there are popular implementations of this kind of atomic list, if that would help.

tokio/src/util/linked_list.rs (outdated review thread; resolved)
Comment on lines 424 to 432
/// Atomically adds an element first in the list.
/// This method can be called concurrently from multiple threads.
///
/// # Safety
///
/// The caller must ensure that:
/// - `val` is not pushed concurrently by multiple threads,
/// - `val` is not already part of some list.
pub(crate) unsafe fn push_front(&self, val: L::Handle) {
Contributor

This is an interesting implementation. It looks like it could be correct. I'll have to think more about it.

tokio/src/sync/broadcast.rs (two outdated review threads; resolved)
@carllerche
Member

carllerche commented Jan 16, 2024

What implementation are you using for making your linked list atomic? Is it a well-known implementation strategy, or something you came up with on your own?

I came up with it on my own, although the idea behind it is quite simple, so I'm sure it has been implemented by someone already. On the other hand, it is a rather special case of an atomic list: it allows adding elements atomically, but removing them still has to be done with some outside synchronization (in the case of this PR, a write lock). I can research whether there are popular implementations of this kind of atomic list, if that would help.

Here are two basic MPSC atomic linked list implementations:

They are different because intrusive lists have additional logic to handle ABA issues, as nodes can be removed and re-inserted concurrently with a separate insert. The provided atomic LL implementation is generic over the pointer type, which allows it to be used in an intrusive way. I haven't gotten to the broadcast channel changes yet, so it is possible that the broadcast channel uses it correctly, but I see the atomic LL implementation with generic handles as a hazard.

@@ -310,7 +311,7 @@ struct Shared<T> {
     mask: usize,

     /// Tail of the queue. Includes the rx wait list.
-    tail: Mutex<Tail>,
+    tail: RwLock<Tail>,
Member

RwLock implementations tend to be heavier than Mutex. In this case, it looks like all reads are of numbers. Another option is to make these cells AtomicUsize (or AtomicU64) and require writers to these cells to hold the tail mutex. Reads can do the atomic read directly.
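A hedged sketch of the pattern described here (invented names, not the actual broadcast internals): the numeric fields become atomics that are only stored while the tail mutex is held, so readers can load them without locking.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

struct Tail {
    // waiter list and anything else that still needs exclusive access
}

struct Shared {
    tail: Mutex<Tail>,
    /// Written only while `tail` is held, so stores never race with each
    /// other; readers may load it directly without taking the lock.
    pos: AtomicU64,
}

impl Shared {
    fn publish(&self) {
        let _tail = self.tail.lock().unwrap();
        // ... write the value into the ring buffer ...
        self.pos.fetch_add(1, Ordering::Release);
    }

    fn current_pos(&self) -> u64 {
        // Lock-free read of the publication counter.
        self.pos.load(Ordering::Acquire)
    }
}
```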

Contributor Author

Are you saying that we could make Tail fields atomic? That would spare us the need to take a lock in some situations, but the main contention source is adding waiters to the list, which would still have to be done with a lock.

@carllerche
Member

So, it seems like the main thing here is to take a "regular" DLL and make the push operation concurrent with other push operations but not with a pop operation. This is done by guarding the entire list with a single RwLock.


/// An atomic intrusive linked list. It allows pushing new nodes concurrently.
/// Removing nodes still requires an exclusive reference.
pub(crate) struct AtomicLinkedList<L, T> {
Member

If you stick with this strategy, I would rename this ConcurrentPushLinkedList or something explicit. Also, update the docs to be very clear that synchronization is required for all operations except concurrent push.

Member

Yeah, I'm inclined to agree with Carl --- the name "AtomicLinkedList" suggests that all links are atomic...

Member

also, it might be worth investigating whether there are performance benefits to using this list type in other places in tokio::sync. i think there are other synchronization primitives that currently force all tasks pushing to their wait lists to serialize themselves with a Mutex, which could potentially benefit from this type. but, i would save that for a separate branch.

Contributor Author

also, it might be worth investigating whether there are performance benefits to using this list type in other places in tokio::sync.

I was eyeing sync::Notify: it has a mutex that guards a list of waiters. #5464 reduced contention on the watch channel by utilizing 8 instances of Notify instead of one. Looks like the perfect candidate.
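For reference, a rough sketch of the shape #5464 gave the watch channel (names invented here): waiters spread across several Notify instances so that registration hits different internal locks, at the cost of notification having to touch every shard.

```rust
use tokio::sync::Notify;

const SHARDS: usize = 8;

/// Hypothetical sharded notifier: each waiter parks on one of several
/// Notify instances, spreading waiter registration over several locks.
struct ShardedNotify {
    shards: Vec<Notify>,
}

impl ShardedNotify {
    fn new() -> Self {
        Self {
            shards: (0..SHARDS).map(|_| Notify::new()).collect(),
        }
    }

    /// A waiter picks a shard (any reasonably uniform index works).
    async fn notified(&self, seed: usize) {
        self.shards[seed % SHARDS].notified().await;
    }

    /// Notifying has to visit every shard.
    fn notify_waiters(&self) {
        for shard in &self.shards {
            shard.notify_waiters();
        }
    }
}
```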

Contributor

Yes, there are definitely other places that could benefit from something similar. I'm also wondering if the timer could have a similar optimization. Some people experience significant contention in the timer.

However, timers have more contention on cancellation due to timeouts usually being cancelled...

@carllerche
Member

I think I am following. I think the LL implementation works, but the docs need to be updated to very clearly state the requirements for using it.

Also, if you want to explore more tweaks to the code, another option might be a two-lock strategy (similar to the Michael & Scott queue). In this strategy, the head and tail of the linked list are guarded by two separate locks. For this to work, you need a sentinel node. This might be interesting because RwLocks need to worry about fairness. If you decouple the head & tail locks, then push & pop no longer contend.

To remove a node, you would then have to acquire a write lock on both locks. If we assume cancellation is a lower-priority operation, this might work out to be better.
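For reference, a minimal sketch of the classic Michael & Scott two-lock queue with a sentinel node (not Tokio's intrusive list; as noted above, removing an arbitrary node, e.g. on cancellation, would need both locks):

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Mutex;

struct Node<T> {
    value: Option<T>, // None only for the sentinel
    next: AtomicPtr<Node<T>>,
}

impl<T> Node<T> {
    fn alloc(value: Option<T>) -> *mut Node<T> {
        Box::into_raw(Box::new(Node {
            value,
            next: AtomicPtr::new(ptr::null_mut()),
        }))
    }
}

/// Pushers only take `tail`, poppers only take `head`, so pushes and pops
/// never contend with each other. (Drop of leftover nodes omitted.)
pub struct TwoLockQueue<T> {
    head: Mutex<*mut Node<T>>, // always points at the current sentinel
    tail: Mutex<*mut Node<T>>, // always points at the last node
}

unsafe impl<T: Send> Send for TwoLockQueue<T> {}
unsafe impl<T: Send> Sync for TwoLockQueue<T> {}

impl<T> TwoLockQueue<T> {
    pub fn new() -> Self {
        let sentinel = Node::alloc(None);
        Self {
            head: Mutex::new(sentinel),
            tail: Mutex::new(sentinel),
        }
    }

    pub fn push(&self, value: T) {
        let node = Node::alloc(Some(value));
        let mut tail = self.tail.lock().unwrap();
        // Link the new node after the current last node, then advance tail.
        unsafe { (**tail).next.store(node, Ordering::Release) };
        *tail = node;
    }

    pub fn pop(&self) -> Option<T> {
        let mut head = self.head.lock().unwrap();
        let sentinel = *head;
        let next = unsafe { (*sentinel).next.load(Ordering::Acquire) };
        if next.is_null() {
            return None; // empty: head and tail both point at the sentinel
        }
        // The first real node becomes the new sentinel; free the old one.
        let value = unsafe { (*next).value.take() };
        *head = next;
        drop(unsafe { Box::from_raw(sentinel) });
        value
    }
}
```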


}

#[cfg(test)]
#[cfg(not(loom))]
Member

it would be nice to have loom tests for this, potentially...
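For what it's worth, a rough shape such a loom test could take, assuming a hypothetical safe test wrapper TestList (invented name) exposing new, push_front, and len:

```rust
#[cfg(all(test, loom))]
mod loom_tests {
    use loom::sync::Arc;
    use loom::thread;

    // `TestList` stands in for a safe test wrapper around the
    // concurrent-push list; the name and API are hypothetical.
    use super::TestList;

    #[test]
    fn concurrent_push_front() {
        loom::model(|| {
            let list = Arc::new(TestList::new());

            let handles: Vec<_> = (0..2)
                .map(|i| {
                    let list = list.clone();
                    thread::spawn(move || list.push_front(i))
                })
                .collect();

            for handle in handles {
                handle.join().unwrap();
            }

            // Both pushes must be visible, regardless of interleaving.
            assert_eq!(list.len(), 2);
        });
    }
}
```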


@vnetserg
Contributor Author

Also, if you want to explore more tweaks to the code, another option might be a two-lock strategy (similar to the Michael & Scott queue). In this strategy, the head and tail of the linked list are guarded by two separate locks. For this to work, you need a sentinel node. This might be interesting because RwLocks need to worry about fairness. If you decouple the head & tail locks, then push & pop no longer contend.

That's an interesting strategy, I will read up on it. However, according to my experiments, the main contention comes from two places: pushing into the waiters list in Receiver::recv_ref and removing from the waiters list in Recv::drop. With the two-lock strategy, recv_ref would contend for the head lock, and Recv::drop might contend for both locks (although on the happy path it doesn't need locks). In the current implementation, the happy path is that recv_ref only takes a read lock and Recv::drop takes no lock at all.

The tradeoff between the current approach and the two-lock approach, as I see it, is that in the current implementation writers contend with each other and with readers, but readers don't contend with each other. In the two-lock approach, writers contend among themselves and readers among themselves. It looks like if we end up in a situation where a lot of readers have to insert themselves into the waiters list, then writers are apparently slower than readers, so having readers not contend with each other is more beneficial than having them not contend with writers. What do you think?

@Darksonn
Contributor

If we're comparing to alternate approaches, then the existing sharding solution used by the watch channel shouldn't be forgotten. Just because the CAS operation is atomic doesn't mean that it doesn't cause contention.

@vnetserg
Contributor Author

If we're comparing to alternate approaches, then the existing sharding solution used by the watch channel shouldn't be forgotten. Just because the CAS operation is atomic doesn't mean that it doesn't cause contention.

This is true, sharding is a very valid option. Initially I wanted to implement it here, but I gave it up because of the complexity of the broadcast channel's interactions with the tail lock.

On the other hand, even with atomic pushes we can always fall back to sharding. If one list with atomic pushes performs better than one mutex-protected list, then a sharded comparison will probably also favor lists with atomic pushes (under high enough contention, that is; a mutex is probably faster when contention is low enough).

@carllerche
Member

That's an interesting strategy, I will read up on it. However, according to my experiments, the main contention comes from two places: pushing into the waiters list in Receiver::recv_ref and removing from the waiters list in Recv::drop.

Thanks for clarifying. I'm going to dig into the details a bit because the pattern of using a linked list for waiters is used everywhere in Tokio, so a change here could be generally applicable. For example, Readiness in scheduled_io.rs, used for all async I/O ops, follows the same pattern.

I went back to the original implementation to refresh my memory and Recv::drop does acquire the lock to read queued. It looks like the way you solved that was by making queued atomic so you can read it in drop without the lock. If queued is false, then don't acquire the lock. This makes a lot of sense. It also seems to be unrelated to making the push operation concurrent.
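A hedged sketch of that fast path (field and type names simplified; assuming `queued` is only written while the tail lock is held): `Recv::drop` checks the atomic flag before deciding whether to take the lock at all.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct WaitList {
    // intrusive list of waiters
}

struct Waiter {
    /// True while the waiter is linked into the wait list. Only written
    /// while holding the tail lock; read lock-free in `Recv::drop`.
    queued: AtomicBool,
    // waker, pointers, etc. omitted
}

struct Recv<'a> {
    waiter: &'a Waiter,
    tail: &'a Mutex<WaitList>,
}

impl Drop for Recv<'_> {
    fn drop(&mut self) {
        // Fast path: if the waiter was already unlinked (the common case
        // when a value arrived), there is nothing to do and no lock needed.
        if !self.waiter.queued.load(Ordering::Acquire) {
            return;
        }
        // Slow path: take the lock, re-check under it, and unlink.
        let _list = self.tail.lock().unwrap();
        if self.waiter.queued.swap(false, Ordering::Relaxed) {
            // remove `self.waiter` from `_list` here (details omitted)
        }
    }
}
```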

How much of the gain you observe is due to making queued an atomic vs the other changes? I don't think many other places in Tokio would benefit from making push concurrent, but I am not 100% sure about that.

Also, thinking more about the two-lock queue, I don't think it would support a doubly linked list.

@vnetserg
Contributor Author

How much of the gain you observe is due to making queued an atomic vs the other changes?

That's a good question, I made a separate branch to test it. The benchmark shows the following on my machine:

  • All the current changes vs master:
contention/10           time:   [3.4879 ms 3.4964 ms 3.5052 ms]
                        change: [-5.2124% -4.8615% -4.5131%] (p = 0.00 < 0.05)

contention/100          time:   [11.896 ms 11.938 ms 11.983 ms]
                        change: [-36.770% -36.518% -36.276%] (p = 0.00 < 0.05)

contention/500          time:   [39.120 ms 39.326 ms 39.565 ms]
                        change: [-63.954% -63.624% -63.294%] (p = 0.00 < 0.05)

contention/1000         time:   [76.746 ms 77.944 ms 78.971 ms]
                        change: [-59.626% -58.912% -58.183%] (p = 0.00 < 0.05)
  • Atomic waiter.queued vs master:
contention/10           time:   [3.6555 ms 3.6723 ms 3.6895 ms]
                        change: [+1.6649% +2.3708% +3.0364%] (p = 0.00 < 0.05)

contention/100          time:   [18.266 ms 18.300 ms 18.336 ms]
                        change: [-13.471% -13.245% -13.032%] (p = 0.00 < 0.05)

contention/500          time:   [75.161 ms 75.410 ms 75.631 ms]
                        change: [-22.168% -21.858% -21.588%] (p = 0.00 < 0.05)

contention/1000         time:   [126.48 ms 130.42 ms 134.38 ms]
                        change: [-43.144% -41.486% -39.650%] (p = 0.00 < 0.05)

The smallest benchmark was probably subject to noise, but all in all, it looks like this change alone explains somewhere between a third and a half of the performance improvement.

@vnetserg
Contributor Author

vnetserg commented Jan 18, 2024

I don't think many other places in Tokio would benefit from making push concurrent, but I am not 100% sure about that.

At the very least, I think sync::Notify is worth a shot. It has a mutex-guarded LL and I think it is safe to assume that, in a typical use-case, there are more subscriptions than notifications.

@Darksonn
Contributor

I discussed the linked list you proposed with Paul E. McKenney. He agrees that the concurrent insertion is correct, but he recommended that we go for sharding instead: "In my experience, sharding is almost always way better than tweaking locking and atomic operations. Shard first, tweak later, and even then only if necessary."

Using multiple Notify instances is one way, but there's also the linked list from #6001. We could use a hash of the address of the node as index for the sharded list.
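A rough sketch of that sharding idea (invented names; a Vec stands in for the intrusive list from #6001): the waiter's address picks the shard, so concurrent registrations usually take different locks.

```rust
use std::sync::Mutex;

const NUM_SHARDS: usize = 8; // some small power of two

struct Waiter {
    // waker, queued flag, etc.
}

/// Hypothetical sharded wait list: waiters are spread over several
/// independently locked lists based on their address.
struct ShardedWaitList {
    shards: [Mutex<Vec<*const Waiter>>; NUM_SHARDS],
}

impl ShardedWaitList {
    fn shard_for(&self, waiter: *const Waiter) -> &Mutex<Vec<*const Waiter>> {
        // Discard a few low address bits (often zero due to alignment) and
        // use the rest as the shard index.
        let index = (waiter as usize >> 4) % NUM_SHARDS;
        &self.shards[index]
    }

    fn register(&self, waiter: *const Waiter) {
        self.shard_for(waiter).lock().unwrap().push(waiter);
    }

    /// Waking everyone has to visit every shard.
    fn notify_all(&self) {
        for shard in &self.shards {
            let waiters = std::mem::take(&mut *shard.lock().unwrap());
            for _waiter in waiters {
                // wake each waiter here (details omitted)
            }
        }
    }
}
```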

@vnetserg
Contributor Author

vnetserg commented Jan 19, 2024

Ok, I will look into sharding, probably better as a separate PR.

Do you think it is worth creating a PR to make queued AtomicBool as a low-hanging 15-40% boost?

@Darksonn
Contributor

Yes, that sounds simple. I think that would be a good PR.
