
Optimize Mutex & AtomicCell #3346

Merged — 12 commits merged into typelevel:series/3.x from improve-mutex on Feb 7, 2023
Conversation

@BalmungSan (Contributor) commented on Jan 5, 2023


This PR adds two alternative implementations of Mutex & AtomicCell based on Async rather than just Concurrent; these implementations should be more efficient.
The change is binary- and source-compatible, and users don't need to do anything special to get the new implementations: if the underlying data type they use supports Async (e.g. IO), they get the optimized versions automatically.
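
For illustration, here is a minimal sketch of what this looks like from the caller's side, using the public cats.effect.std.Mutex API (nothing changes in user code):

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Mutex

object Example extends IOApp.Simple {
  def run: IO[Unit] =
    Mutex[IO].flatMap { mutex =>
      // IO is Async, so this Mutex is the optimized callback-based one;
      // for an F[_] that is only Concurrent, the very same call would
      // transparently return the Concurrent-based implementation.
      mutex.lock.surround(IO.println("inside the critical section"))
    }
}
```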


Benchmark results

Configuration

  • forks: 2
  • threads: 1
  • warm-up iterations: 10
  • iterations: 10
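
In JMH terms, this configuration corresponds roughly to the following annotations (a sketch; the actual benchmark classes in the PR may differ):

```scala
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._

@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(2)
@Threads(1)
@Warmup(iterations = 10)
@Measurement(iterations = 10)
class MutexBenchmarkSketch {
  // JMH runs each benchmark once per combination of these parameters.
  @Param(Array("10", "50", "100"))
  var fibers: Int = _

  @Param(Array("1000"))
  var iterations: Int = _

  @Benchmark
  def happyPath(): Unit = {
    // would run `iterations` lock/unlock cycles across `fibers` fibers
  }
}
```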

Mutex

Concurrent

| Benchmark | (fibers) | (iterations) | Mode | Cnt | Score | Error | Units |
|---|---|---|---|---|---|---|---|
| MutexBenchmark.happyPathConcurrent | 10 | 1000 | thrpt | 20 | 192.475 | ± 2.296 | ops/s |
| MutexBenchmark.happyPathConcurrent | 50 | 1000 | thrpt | 20 | 41.280 | ± 0.486 | ops/s |
| MutexBenchmark.happyPathConcurrent | 100 | 1000 | thrpt | 20 | 19.415 | ± 0.289 | ops/s |
| MutexBenchmark.highContentionConcurrent | 10 | 1000 | thrpt | 20 | 26.426 | ± 0.330 | ops/s |
| MutexBenchmark.highContentionConcurrent | 50 | 1000 | thrpt | 20 | 6.963 | ± 0.204 | ops/s |
| MutexBenchmark.highContentionConcurrent | 100 | 1000 | thrpt | 20 | 3.750 | ± 0.055 | ops/s |
| MutexBenchmark.cancellationConcurrent | 10 | 1000 | thrpt | 20 | 20.768 | ± 1.628 | ops/s |
| MutexBenchmark.cancellationConcurrent | 50 | 1000 | thrpt | 20 | 6.835 | ± 0.065 | ops/s |
| MutexBenchmark.cancellationConcurrent | 100 | 1000 | thrpt | 20 | 3.653 | ± 0.128 | ops/s |

Async

| Benchmark | (fibers) | (iterations) | Mode | Cnt | Score | Error | Units |
|---|---|---|---|---|---|---|---|
| MutexBenchmark.happyPathAsync | 10 | 1000 | thrpt | 20 | 212.839 | ± 1.069 | ops/s |
| MutexBenchmark.happyPathAsync | 50 | 1000 | thrpt | 20 | 46.925 | ± 1.366 | ops/s |
| MutexBenchmark.happyPathAsync | 100 | 1000 | thrpt | 20 | 23.266 | ± 0.127 | ops/s |
| MutexBenchmark.highContentionAsync | 10 | 1000 | thrpt | 20 | 33.913 | ± 2.316 | ops/s |
| MutexBenchmark.highContentionAsync | 50 | 1000 | thrpt | 20 | 8.227 | ± 0.082 | ops/s |
| MutexBenchmark.highContentionAsync | 100 | 1000 | thrpt | 20 | 4.577 | ± 0.160 | ops/s |
| MutexBenchmark.cancellationAsync | 10 | 1000 | thrpt | 20 | 24.807 | ± 0.694 | ops/s |
| MutexBenchmark.cancellationAsync | 50 | 1000 | thrpt | 20 | 8.832 | ± 0.086 | ops/s |
| MutexBenchmark.cancellationAsync | 100 | 1000 | thrpt | 20 | 5.034 | ± 0.027 | ops/s |

AtomicCell (using the respective Mutex)

Concurrent

| Benchmark | (fibers) | (iterations) | Mode | Cnt | Score | Error | Units |
|---|---|---|---|---|---|---|---|
| AtomicCellBenchmark.happyPathConcurrent | 10 | 1000 | thrpt | 20 | 164.129 | ± 0.711 | ops/s |
| AtomicCellBenchmark.happyPathConcurrent | 50 | 1000 | thrpt | 20 | 34.864 | ± 0.628 | ops/s |
| AtomicCellBenchmark.happyPathConcurrent | 100 | 1000 | thrpt | 20 | 17.276 | ± 0.024 | ops/s |
| AtomicCellBenchmark.highContentionConcurrent | 10 | 1000 | thrpt | 20 | 27.247 | ± 1.409 | ops/s |
| AtomicCellBenchmark.highContentionConcurrent | 50 | 1000 | thrpt | 20 | 6.315 | ± 0.073 | ops/s |
| AtomicCellBenchmark.highContentionConcurrent | 100 | 1000 | thrpt | 20 | 2.893 | ± 0.034 | ops/s |
| AtomicCellBenchmark.cancellationConcurrent | 10 | 1000 | thrpt | 20 | 20.799 | ± 0.854 | ops/s |
| AtomicCellBenchmark.cancellationConcurrent | 50 | 1000 | thrpt | 20 | 7.363 | ± 0.195 | ops/s |
| AtomicCellBenchmark.cancellationConcurrent | 100 | 1000 | thrpt | 20 | 3.723 | ± 0.025 | ops/s |

Async

| Benchmark | (fibers) | (iterations) | Mode | Cnt | Score | Error | Units |
|---|---|---|---|---|---|---|---|
| AtomicCellBenchmark.happyPathAsync | 10 | 1000 | thrpt | 20 | 192.392 | ± 0.191 | ops/s |
| AtomicCellBenchmark.happyPathAsync | 50 | 1000 | thrpt | 20 | 40.638 | ± 0.139 | ops/s |
| AtomicCellBenchmark.happyPathAsync | 100 | 1000 | thrpt | 20 | 18.958 | ± 0.139 | ops/s |
| AtomicCellBenchmark.highContentionAsync | 10 | 1000 | thrpt | 20 | 29.978 | ± 0.128 | ops/s |
| AtomicCellBenchmark.highContentionAsync | 50 | 1000 | thrpt | 20 | 8.026 | ± 0.366 | ops/s |
| AtomicCellBenchmark.highContentionAsync | 100 | 1000 | thrpt | 20 | 4.632 | ± 0.045 | ops/s |
| AtomicCellBenchmark.cancellationAsync | 10 | 1000 | thrpt | 20 | 23.468 | ± 0.547 | ops/s |
| AtomicCellBenchmark.cancellationAsync | 50 | 1000 | thrpt | 20 | 8.633 | ± 0.073 | ops/s |
| AtomicCellBenchmark.cancellationAsync | 100 | 1000 | thrpt | 20 | 5.680 | ± 0.101 | ops/s |

Conclusions

IMHO the improvements are noticeable, being around a 20% throughput increase in most situations.
Also, the AtomicCell results are roughly equivalent to (though still slightly better than) the Mutex ones, which suggests that most of the gain comes from the Mutex itself; still, I think the Async-based AtomicCell is worth it.


AtomicCellBenchmarkResults.txt
MutexBenchmarkResults.txt

@BalmungSan force-pushed the improve-mutex branch 2 times, most recently from e38fe68 to f0827a1 on January 5, 2023 21:47
@armanbilge (Member) commented:
> I said "improve" but I am not totally sure this version is better than just delegating to Semaphore

Here's an idea that I think could be a material improvement. What if we extracted the `UnsafeUnbounded` data structure introduced for the async Queue (`final class UnsafeUnbounded[A]`) into a common place, so that we can use it here?

We can use the Semaphore-based implementation for the Concurrent constraint, but use a runtime check for Async to upgrade to the UnsafeUnbounded implementation.
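
A rough sketch of that runtime-check pattern (the trait and constructor names below are stand-ins, not the actual code):

```scala
import cats.effect.kernel.{Async, Concurrent}

trait Lock[F[_]] // stand-in for the real Mutex trait

object Lock {
  // Hypothetical constructors for the two implementations:
  private def asyncImpl[F[_]](implicit F: Async[F]): F[Lock[F]] =
    F.pure(new Lock[F] {})
  private def concurrentImpl[F[_]](implicit F: Concurrent[F]): F[Lock[F]] =
    F.pure(new Lock[F] {})

  def apply[F[_]](implicit F: Concurrent[F]): F[Lock[F]] =
    F match {
      // Runtime check: if F is actually Async, upgrade to the optimized version.
      case async: Async[F] => asyncImpl(async)
      // Otherwise, fall back to the Concurrent-only implementation.
      case _ => concurrentImpl
    }
}
```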

@armanbilge (Member) commented:

> We can use the Semaphore-based implementation for the Concurrent constraint

Or the implementation proposed in this PR, which is good too :)


Btw: I believe this PR can be targeted at 3.4.x, since it's just internal optimizations (no new APIs)

@BalmungSan force-pushed the improve-mutex branch 5 times, most recently from 07be7bf to 9dd1b92 on January 7, 2023 03:10
Comment on lines 101 to 176
```scala
// Cancels a Fiber waiting for the Mutex.
private def cancel(thisCB: CB, thisCell: LockCell, previousCell: LockCell): F[Unit] =
  F.delay {
    // If we are canceled,
    // we first check whether the state still contains our own cell;
    // if so, we swap it with the previousCell.
    // This ensures any subsequent attempt to acquire the Mutex
    // will register its callback on the appropriate cell.
    // Additionally, it confirms there is no Fiber
    // currently waiting for us.
    if (!state.compareAndSet(thisCell, previousCell)) {
      // Otherwise,
      // it means we have a Fiber waiting for us.
      // Thus, we need to tell the previous cell
      // to awake that Fiber instead.
      var nextCB = thisCell.get()
      while (nextCB eq null) {
        // There is a tiny window during which
        // the next cell has already acquired our cell
        // but hasn't registered its callback yet.
        // Thus, we spin until it does.
        nextCB = thisCell.get()
      }
      if (!previousCell.compareAndSet(thisCB, nextCB)) {
        // However, if the previous cell had already completed,
        // then the Mutex is free and we can awake the waiting Fiber ourselves.
        if (nextCB ne null) nextCB.apply(RUnit)
      }
    }
  }

// Awaits until the Mutex is free.
private def await(thisCell: LockCell): F[Unit] =
  F.asyncCheckAttempt[Unit] { thisCB =>
    F.delay {
      val previousCell = state.getAndSet(thisCell)

      if (previousCell eq null) {
        // If the previous cell was null,
        // then the Mutex is free.
        RUnit.asInstanceOf[Either[Option[F[Unit]], Unit]]
      } else {
        // Otherwise,
        // we check again that the previous cell hasn't been completed yet;
        // if not, we tell the previous cell to awake us when it finishes.
        if (!previousCell.compareAndSet(null, thisCB)) {
          // If it was already completed,
          // then the Mutex is free.
          RUnit.asInstanceOf[Either[Option[F[Unit]], Unit]]
        } else {
          Left(Some(cancel(thisCB, thisCell, previousCell)))
        }
      }
    }
  }

// Acquires the Mutex.
private def acquire(poll: Poll[F]): F[LockCell] =
  F.delay(new AtomicReference[CB]()).flatMap { thisCell =>
    poll(await(thisCell).map(_ => thisCell))
  }

// Releases the Mutex.
private def release(thisCell: LockCell): F[Unit] =
  F.delay {
    // If the state still contains our own cell,
    // then it means nobody was waiting for the Mutex,
    // and thus it can be put back into the free state.
    if (!state.compareAndSet(thisCell, null)) {
      // Otherwise,
      // our cell is probably not empty;
      // we must awake whatever Fiber is waiting for us.
      val nextCB = thisCell.getAndSet(Sentinel)
      if (nextCB ne null) nextCB.apply(RUnit)
    }
  }
```
@BalmungSan (Contributor, Author) commented:

The idea of this new implementation is to save the Deferred instantiation, based on the assumption that at any point in time a Fiber waits for at most one other Fiber, and is awaited by at most one other Fiber.

Concretely, we have (see the sketch after this list):

  • state, a mutable reference to the last Cell in the chain.
  • thisCell, the chain Cell of the Fiber trying to acquire the Mutex.
  • previousCell, the previous Cell in the chain; possibly the one currently holding the Mutex.
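
Assuming definitions along these lines (a sketch mirroring the names used in the diff above; in the PR they live inside the Async-based Mutex):

```scala
import java.util.concurrent.atomic.AtomicReference

object MutexStateSketch {
  // Callback used to awake a Fiber blocked on the Mutex.
  type CB = Either[Throwable, Unit] => Unit
  // Each Fiber in the chain owns one cell; the cell holds the callback
  // of the Fiber waiting on it, or null while nobody is waiting yet.
  type LockCell = AtomicReference[CB]
  // Always points at the last cell in the chain; null means the Mutex is free.
  val state = new AtomicReference[LockCell]()
  // Value handed to a callback when the lock is passed on.
  val RUnit: Either[Throwable, Unit] = Right(())
  // Sentinel stored in a released cell, marking it as completed.
  val Sentinel: CB = _ => ()
}
```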

Acquire:
First, we create our thisCell and atomically swap it into the state, retrieving the previousCell.
We then use F.asyncCheckAttempt to check whether the Mutex is currently in use. If it is, we register our callback in the previousCell and wait for it; otherwise, we can just acquire it.

Release:
We take the callback that was registered in our Cell and invoke it, accounting for the possibility that nobody is actually waiting for us.

Cancelation:
In case we were canceled while waiting for the Mutex:

  • Situation A: No one is waiting for us.
    Thus, we only need to ensure that any Fiber that follows awaits the previous cell; it will know how to handle the case where that cell is already released.
  • Situation B: Someone is waiting for us.
    Then we need to unregister our callback from the previous cell and replace it with the callback of the Fiber waiting for us; we must double-check whether the Mutex was released in the meantime and, if so, notify whoever is waiting for us directly.

I believe this allocates the bare minimum and is concurrency-safe.


PS: Thanks a lot to Arman (@armanbilge) for brainstorming with me on this implementation :)

@BalmungSan force-pushed the improve-mutex branch 2 times, most recently from 5b8b9d8 to 1c1b674 on January 7, 2023 14:45
@BalmungSan (Contributor, Author) commented:
@djspiewak @armanbilge before running the benchmarks I want to confirm with both of you whether you think they are appropriate.

@BalmungSan BalmungSan changed the title Improve Mutex implementation Optimize Mutex & AtomicCell Feb 5, 2023
@djspiewak (Member) previously approved these changes on Feb 5, 2023 and left a comment:
Nice work!

@djspiewak (Member) commented:

Let's see the benchmark results but overall lgtm

@BalmungSan requested review from djspiewak and armanbilge and removed the requests for armanbilge and djspiewak on February 7, 2023 14:51
@BalmungSan (Contributor, Author) commented:

@djspiewak @armanbilge benchmark results added to the description of the PR! :D

@djspiewak djspiewak closed this Feb 7, 2023
@djspiewak djspiewak reopened this Feb 7, 2023
@djspiewak (Member) commented:

Benchmarks look compelling! Thank you!

@djspiewak djspiewak merged commit 2f3ed2a into typelevel:series/3.x Feb 7, 2023
@BalmungSan BalmungSan deleted the improve-mutex branch February 7, 2023 23:46