Skip to content

Commit

Permalink
Re-Add Concurrent based Mutex implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
BalmungSan committed Jan 7, 2023
1 parent d9d70ba commit 1c1b674
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,8 @@ lazy val std = crossProject(JSPlatform, JVMPlatform, NativePlatform)
"cats.effect.std.Queue#DroppingQueue.onOfferNoCapacity"),
// introduced by #3346
// private stuff
ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.std.Mutex#Impl.this")
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.std.Mutex$Impl")
)
)
.jsSettings(
Expand Down
33 changes: 24 additions & 9 deletions std/shared/src/main/scala/cats/effect/std/Mutex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,38 @@ object Mutex {
/**
* Creates a new `Mutex`.
*/
def apply[F[_]](implicit F: Async[F]): F[Mutex[F]] =
F.delay(
new AtomicReference[LockCell]()
).map(state => new Impl[F](state))
def apply[F[_]](implicit F: Concurrent[F]): F[Mutex[F]] =
F match {
case ff: Async[F] =>
async[F](ff)

case _ =>
concurrent[F](F)
}

def async[F[_]](implicit F: Async[F]): F[Mutex[F]] =
in[F, F](F, F)

def concurrent[F[_]](implicit F: Concurrent[F]): F[Mutex[F]] =
Semaphore[F](n = 1).map(sem => new ConcurrentImpl[F](sem))

/**
* Creates a new `Mutex`. Like `apply` but initializes state using another effect constructor.
*/
def in[F[_], G[_]](implicit F: Sync[F], G: Async[G]): F[Mutex[G]] =
F.delay(
new AtomicReference[LockCell]()
).map(state => new Impl[G](state))
).map(state => new AsyncImpl[G](state)(G))

private final class ConcurrentImpl[F[_]](sem: Semaphore[F]) extends Mutex[F] {
override final val lock: Resource[F, Unit] =
sem.permit

override def mapK[G[_]](f: F ~> G)(implicit G: MonadCancel[G, _]): Mutex[G] =
new ConcurrentImpl(sem.mapK(f))
}

// TODO: In case in a future cats-effect provides a way to identify fibers,
// then this implementation can be made reentrant.
// Or, we may also provide an alternative implementation using LiftIO + IOLocal
private final class Impl[F[_]](state: AtomicReference[LockCell])(implicit F: Async[F])
private final class AsyncImpl[F[_]](state: AtomicReference[LockCell])(implicit F: Async[F])
extends Mutex[F] {
// Cancels a Fiber waiting for the Mutex.
private def cancel(thisCB: CB, thisCell: LockCell, previousCell: LockCell): F[Unit] =
Expand Down
8 changes: 6 additions & 2 deletions tests/shared/src/test/scala/cats/effect/std/MutexSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ import org.specs2.specification.core.Fragments
import scala.concurrent.duration._

final class MutexSpec extends BaseSpec {
"Mutex" should {
tests(Mutex[IO])
"ConcurrentMutex" should {
tests(Mutex.concurrent[IO])
}

"AsyncMutex" should {
tests(Mutex.async[IO])
}

"Mutex with dual constructors" should {
Expand Down

0 comments on commit 1c1b674

Please sign in to comment.