From 1c1b674fc2a0e308503bb30865204908eda8705c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Fri, 6 Jan 2023 22:48:08 -0500 Subject: [PATCH] Re-Add Concurrent based Mutex implementation --- build.sbt | 3 +- .../main/scala/cats/effect/std/Mutex.scala | 33 ++++++++++++++----- .../scala/cats/effect/std/MutexSpec.scala | 8 +++-- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/build.sbt b/build.sbt index 3285806586a..f8c193b657c 100644 --- a/build.sbt +++ b/build.sbt @@ -887,7 +887,8 @@ lazy val std = crossProject(JSPlatform, JVMPlatform, NativePlatform) "cats.effect.std.Queue#DroppingQueue.onOfferNoCapacity"), // introduced by #3346 // private stuff - ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.std.Mutex#Impl.this") + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.std.Mutex$Impl") ) ) .jsSettings( diff --git a/std/shared/src/main/scala/cats/effect/std/Mutex.scala b/std/shared/src/main/scala/cats/effect/std/Mutex.scala index 0c9e0d860f4..9a34974f507 100644 --- a/std/shared/src/main/scala/cats/effect/std/Mutex.scala +++ b/std/shared/src/main/scala/cats/effect/std/Mutex.scala @@ -65,10 +65,20 @@ object Mutex { /** * Creates a new `Mutex`. */ - def apply[F[_]](implicit F: Async[F]): F[Mutex[F]] = - F.delay( - new AtomicReference[LockCell]() - ).map(state => new Impl[F](state)) + def apply[F[_]](implicit F: Concurrent[F]): F[Mutex[F]] = + F match { + case ff: Async[F] => + async[F](ff) + + case _ => + concurrent[F](F) + } + + def async[F[_]](implicit F: Async[F]): F[Mutex[F]] = + in[F, F](F, F) + + def concurrent[F[_]](implicit F: Concurrent[F]): F[Mutex[F]] = + Semaphore[F](n = 1).map(sem => new ConcurrentImpl[F](sem)) /** * Creates a new `Mutex`. Like `apply` but initializes state using another effect constructor. @@ -76,12 +86,17 @@ object Mutex { def in[F[_], G[_]](implicit F: Sync[F], G: Async[G]): F[Mutex[G]] = F.delay( new AtomicReference[LockCell]() - ).map(state => new Impl[G](state)) + ).map(state => new AsyncImpl[G](state)(G)) + + private final class ConcurrentImpl[F[_]](sem: Semaphore[F]) extends Mutex[F] { + override final val lock: Resource[F, Unit] = + sem.permit + + override def mapK[G[_]](f: F ~> G)(implicit G: MonadCancel[G, _]): Mutex[G] = + new ConcurrentImpl(sem.mapK(f)) + } - // TODO: In case in a future cats-effect provides a way to identify fibers, - // then this implementation can be made reentrant. - // Or, we may also provide an alternative implementation using LiftIO + IOLocal - private final class Impl[F[_]](state: AtomicReference[LockCell])(implicit F: Async[F]) + private final class AsyncImpl[F[_]](state: AtomicReference[LockCell])(implicit F: Async[F]) extends Mutex[F] { // Cancels a Fiber waiting for the Mutex. private def cancel(thisCB: CB, thisCell: LockCell, previousCell: LockCell): F[Unit] = diff --git a/tests/shared/src/test/scala/cats/effect/std/MutexSpec.scala b/tests/shared/src/test/scala/cats/effect/std/MutexSpec.scala index 34e182fce49..5567ee821ab 100644 --- a/tests/shared/src/test/scala/cats/effect/std/MutexSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/MutexSpec.scala @@ -26,8 +26,12 @@ import org.specs2.specification.core.Fragments import scala.concurrent.duration._ final class MutexSpec extends BaseSpec { - "Mutex" should { - tests(Mutex[IO]) + "ConcurrentMutex" should { + tests(Mutex.concurrent[IO]) + } + + "AsyncMutex" should { + tests(Mutex.async[IO]) } "Mutex with dual constructors" should {