Skip to content

Commit

Permalink
fix non-terminating dispatcher due to running tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman committed Jan 18, 2024
1 parent 99e8dbf commit c4e98a8
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,14 @@ object AsyncAwaitDsl {

// format: off
val tree = q"""
final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[$effect], callback: _root_.cats.effect.cpsinternal.AsyncAwaitDsl.AwaitCallback[$effect]) extends _root_.cats.effect.cpsinternal.AsyncAwaitStateMachine(dispatcher, callback)(${F}) {
final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[$effect], callback: _root_.cats.effect.cpsinternal.AsyncAwaitDsl.AwaitCallback[$effect], stopSignal: ${appliedType(effect, typeOf[Unit])}) extends _root_.cats.effect.cpsinternal.AsyncAwaitStateMachine(dispatcher, callback, stopSignal)(${F}) {
${mark(q"""override def apply(tr$$async: _root_.cats.effect.cpsinternal.AsyncAwaitDsl.AwaitOutcome[$effect]): _root_.scala.Unit = $body""")}
}
${F}.flatten {
_root_.cats.effect.std.Dispatcher.sequential[$effect].use { dispatcher =>
${F}.async[$name#FF[AnyRef]](cb => ${F}.delay { new $name(dispatcher, cb).start(); Some(${F}.unit) })
_root_.cats.effect.Deferred[$name#FF, Unit].flatMap { deferred =>
${F}.async[$name#FF[AnyRef]](cb => ${F}.delay { val i = new $name(dispatcher, cb, deferred.get); i.start(); Some(deferred.complete(()).void) })
}
}
}.asInstanceOf[${c.macroApplication.tpe}]
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import cats.syntax.all._

abstract class AsyncAwaitStateMachine[F[_]](
dispatcher: Dispatcher[F],
callback: AsyncAwaitDsl.AwaitCallback[F])(
callback: AsyncAwaitDsl.AwaitCallback[F],
stopSignal: F[Unit])(
implicit F: Async[F])
extends Function1[AsyncAwaitDsl.AwaitOutcome[F], Unit] {

Expand Down Expand Up @@ -61,20 +62,20 @@ abstract class AsyncAwaitStateMachine[F[_]](
// as inspecting the Succeeded outcome using dispatcher is risky on algebraic sums,
// such as OptionT, EitherT, ...
var awaitedValue: Option[AnyRef] = None
F.uncancelable { poll =>
F.race(stopSignal, F.uncancelable { poll =>
poll(summary *> f)
.flatTap(r => F.delay { awaitedValue = Some(r) })
.start
.flatMap(fiber => poll(fiber.join).onCancel(fiber.cancel))
}.flatMap {
case Canceled() => F.delay(this(Left(F.canceled.asInstanceOf[F[AnyRef]])))
case Errored(e) => F.delay(this(Left(F.raiseError(e))))
case Succeeded(awaitOutcome) =>
awaitedValue match {
case Some(v) => F.delay(this(Right(awaitOutcome.void -> v)))
case None => F.delay(this(Left(awaitOutcome)))
}
}
}.flatMap {
case Canceled() => F.delay(this (Left(F.canceled.asInstanceOf[F[AnyRef]])))
case Errored(e) => F.delay(this (Left(F.raiseError(e))))
case Succeeded(awaitOutcome) =>
awaitedValue match {
case Some(v) => F.delay(this (Right(awaitOutcome.void -> v)))
case None => F.delay(this (Left(awaitOutcome)))
}
})
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class AsyncAwaitSpec extends Specification with CatsEffect {
ref <- Ref[IO].of(0)
defer <- Deferred[IO, Unit]
fiber <- async[IO] {
(defer.complete(()) *> IO.never[Unit]).await
defer.complete(()).await
IO.never[Unit].await
ref.update(_ + 1).await
}.start
_ <- defer.get
Expand Down

0 comments on commit c4e98a8

Please sign in to comment.