Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change async_ to be uncancelable #3205

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ ThisBuild / git.gitUncommittedChanges := {
}
}

ThisBuild / tlBaseVersion := "3.4"
ThisBuild / tlBaseVersion := "3.5"
ThisBuild / tlUntaggedAreSnapshots := false

ThisBuild / organization := "org.typelevel"
Expand Down
4 changes: 2 additions & 2 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
G.uncancelable { poll =>
lift(k(resume)) flatMap {
case Some(fin) => G.onCancel(poll(get), lift(fin))
case None => poll(get)
case None => get
}
}
}
Expand Down Expand Up @@ -1237,7 +1237,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
def async_[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = {
val body = new Cont[IO, A, A] {
def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) =>
G.uncancelable { poll => lift(IO.delay(k(resume))).flatMap(_ => poll(get)) }
G.uncancelable(_ => lift(IO.delay(k(resume))).flatMap(_ => get))
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private final class IOFiber[A](
val handle = registerListener(oc => cb(Right(oc)))

if (handle == null)
None /* we were already invoked, so no `CallbackStack` needs to be managed */
Some(IO.unit) /* we were already invoked, so no `CallbackStack` needs to be managed */
else
Some(IO(handle.clearCurrent()))
}
Expand Down
4 changes: 2 additions & 2 deletions kernel/shared/src/main/scala/cats/effect/kernel/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
lift(k(resume)) flatMap {
case Right(a) => G.pure(a)
case Left(Some(fin)) => G.onCancel(poll(get), lift(fin))
case Left(None) => poll(get)
case Left(None) => get
}
}
}
Expand Down Expand Up @@ -156,7 +156,7 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
* Polymorphic so it can be used in situations where an arbitrary effect is expected eg
* [[Fiber.joinWithNever]]
*/
def never[A]: F[A] = async(_ => pure(none[F[Unit]]))
def never[A]: F[A] = async(_ => pure(Some(unit)))

/**
* Shift execution of the effect `fa` to the execution context `ec`. Execution is shifted back
Expand Down
19 changes: 9 additions & 10 deletions kernel/shared/src/main/scala/cats/effect/kernel/Resource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -996,23 +996,22 @@ object Resource extends ResourceFOInstances0 with ResourceHOInstances0 with Reso
val nt2 = new (Resource[F, *] ~> D) {
def apply[A](rfa: Resource[F, A]) =
Kleisli { r =>
nt(rfa.allocatedCase) flatMap {
case (a, fin) =>
r.update(f => (ec: ExitCase) => f(ec) !> (F.unit >> fin(ec))).as(a)
G uncancelable { poll =>
poll(nt(rfa.allocatedCase)) flatMap {
case (a, fin) =>
r.update(f => (ec: ExitCase) => f(ec) !> (F.unit >> fin(ec))).as(a)
}
}
}
}

for {
r <- nt(F.ref((_: ExitCase) => F.unit).map(_.mapK(nt)))

a <- G.guaranteeCase(body[D].apply(cb, Kleisli.liftF(ga), nt2).run(r)) {
nt(F.ref((_: ExitCase) => F.unit).map(_.mapK(nt))) flatMap { r =>
G.guaranteeCase(
(body[D].apply(cb, Kleisli.liftF(ga), nt2).run(r), r.get).tupled) {
case Outcome.Succeeded(_) => G.unit
case oc => r.get.flatMap(fin => nt(fin(ExitCase.fromOutcome(oc))))
}

fin <- r.get
} yield (a, fin)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ trait AsyncLaws[F[_]] extends GenTemporalLaws[F, Throwable] with SyncLaws[F] {
F.async[A](k => F.delay(k(Left(e))) >> F.pure(Some(fu))) <-> F.raiseError(e)

def neverIsDerivedFromAsync[A] =
F.never[A] <-> F.async[A](_ => F.pure(None))
F.never[A] <-> F.async[A](_ => F.pure(Some(F.unit)))

def executionContextCommutativity[A](fa: F[A]) =
(fa *> F.executionContext) <-> (F.executionContext <* fa)
Expand Down
2 changes: 1 addition & 1 deletion tests/shared/src/test/scala/cats/effect/ResourceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,7 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline {
"Resource[IO, *]",
AsyncTests[Resource[IO, *]].async[Int, Int, Int](10.millis)
) /*(Parameters(seed =
Some(Seed.fromBase64("75d9nzLIEobZ3mfn0DvzUkMv-Jt7o7IyQyIvjqwkeVJ=").get)))*/
Some(Seed.fromBase64("0FaZxJyh_xN_NL3i_y7bNaLpaWuhO9qUPXmfxxgLIIN=").get)))*/
}

{
Expand Down