diff --git a/zio-http/src/main/scala/zhttp/service/HttpRuntime.scala b/zio-http/src/main/scala/zhttp/service/HttpRuntime.scala index 7b8e6d8732..2d3e0888ab 100644 --- a/zio-http/src/main/scala/zhttp/service/HttpRuntime.scala +++ b/zio-http/src/main/scala/zhttp/service/HttpRuntime.scala @@ -14,21 +14,21 @@ import scala.jdk.CollectionConverters._ * cancel the execution when the channel closes. */ final class HttpRuntime[+R](strategy: HttpRuntime.Strategy[R]) { - def unsafeRun(ctx: ChannelHandlerContext)(program: ZIO[R, Throwable, Any]): Unit = { val rtm = strategy.getRuntime(ctx) // Close the connection if the program fails // When connection closes, interrupt the program - def closeListener(fiber: Fiber.Runtime[_, _]): GenericFutureListener[Future[_ >: Void]] = - (_: Future[_ >: Void]) => rtm.unsafeRunAsync_(fiber.interrupt): Unit rtm .unsafeRunAsync(for { fiber <- program.fork - close <- UIO(closeListener(fiber)) - _ <- UIO(ctx.channel().closeFuture.addListener(close)) + close <- UIO { + val close = closeListener(rtm, fiber) + ctx.channel().closeFuture.addListener(close) + close + } _ <- fiber.join _ <- UIO(ctx.channel().closeFuture().removeListener(close)) } yield ()) { @@ -41,6 +41,9 @@ final class HttpRuntime[+R](strategy: HttpRuntime.Strategy[R]) { if (ctx.channel().isOpen) ctx.close() } } + + private def closeListener(rtm: Runtime[Any], fiber: Fiber.Runtime[_, _]): GenericFutureListener[Future[_ >: Void]] = + (_: Future[_ >: Void]) => rtm.unsafeRunAsync_(fiber.interrupt): Unit } object HttpRuntime {