From 6c8f05100014828b41471826085d4593d52bda7e Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Sun, 23 Jan 2022 11:18:49 +0530 Subject: [PATCH 1/5] Use ZIO response --- example/src/main/scala/example/PlainTextBenchmarkServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/src/main/scala/example/PlainTextBenchmarkServer.scala b/example/src/main/scala/example/PlainTextBenchmarkServer.scala index 33af49d087..d2ec87a93e 100644 --- a/example/src/main/scala/example/PlainTextBenchmarkServer.scala +++ b/example/src/main/scala/example/PlainTextBenchmarkServer.scala @@ -28,7 +28,7 @@ object Main extends App { .exitCode } - private def app(response: Response) = Http.response(response) + private def app(response: Response) = Http.responseZIO(UIO(response)) private def server(response: Response) = Server.app(app(response)) ++ From e32af378a80fc51c4178c296c772cff867694e56 Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Wed, 26 Jan 2022 12:49:17 +0530 Subject: [PATCH 2/5] fix: improve cancellation performance --- .../scala/zhttp/service/HttpRuntime.scala | 55 +++++++++++-------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/zio-http/src/main/scala/zhttp/service/HttpRuntime.scala b/zio-http/src/main/scala/zhttp/service/HttpRuntime.scala index 05b67735e2..7b8e6d8732 100644 --- a/zio-http/src/main/scala/zhttp/service/HttpRuntime.scala +++ b/zio-http/src/main/scala/zhttp/service/HttpRuntime.scala @@ -1,9 +1,9 @@ package zhttp.service import io.netty.channel.{ChannelHandlerContext, EventLoopGroup => JEventLoopGroup} -import io.netty.util.concurrent.{EventExecutor, Future} +import io.netty.util.concurrent.{EventExecutor, Future, GenericFutureListener} +import zio._ import zio.internal.Executor -import zio.{Exit, Runtime, URIO, ZIO} import scala.collection.mutable import scala.concurrent.{ExecutionContext => JExecutionContext} @@ -16,14 +16,21 @@ import scala.jdk.CollectionConverters._ final class HttpRuntime[+R](strategy: HttpRuntime.Strategy[R]) { def unsafeRun(ctx: ChannelHandlerContext)(program: ZIO[R, Throwable, Any]): Unit = { + val rtm = strategy.getRuntime(ctx) + + // Close the connection if the program fails + // When connection closes, interrupt the program + def closeListener(fiber: Fiber.Runtime[_, _]): GenericFutureListener[Future[_ >: Void]] = + (_: Future[_ >: Void]) => rtm.unsafeRunAsync_(fiber.interrupt): Unit + rtm .unsafeRunAsync(for { fiber <- program.fork - _ <- ZIO.effect { - ctx.channel().closeFuture.addListener((_: Future[_ <: Void]) => rtm.unsafeRunAsync_(fiber.interrupt): Unit) - } + close <- UIO(closeListener(fiber)) + _ <- UIO(ctx.channel().closeFuture.addListener(close)) _ <- fiber.join + _ <- UIO(ctx.channel().closeFuture().removeListener(close)) } yield ()) { case Exit.Success(_) => () case Exit.Failure(cause) => @@ -31,18 +38,36 @@ final class HttpRuntime[+R](strategy: HttpRuntime.Strategy[R]) { case None => () case Some(_) => System.err.println(cause.prettyPrint) } - ctx.close() + if (ctx.channel().isOpen) ctx.close() } } } object HttpRuntime { + def dedicated[R](group: JEventLoopGroup): URIO[R, HttpRuntime[R]] = + Strategy.dedicated(group).map(runtime => new HttpRuntime[R](runtime)) + + def default[R]: URIO[R, HttpRuntime[R]] = + Strategy.default().map(runtime => new HttpRuntime[R](runtime)) + + def sticky[R](group: JEventLoopGroup): URIO[R, HttpRuntime[R]] = + Strategy.sticky(group).map(runtime => new HttpRuntime[R](runtime)) + sealed trait Strategy[R] { def getRuntime(ctx: ChannelHandlerContext): Runtime[R] } object Strategy { + def dedicated[R](group: JEventLoopGroup): ZIO[R, Nothing, Strategy[R]] = + ZIO.runtime[R].map(runtime => Dedicated(runtime, group)) + + def default[R](): ZIO[R, Nothing, Strategy[R]] = + ZIO.runtime[R].map(runtime => Default(runtime)) + + def sticky[R](group: JEventLoopGroup): ZIO[R, Nothing, Strategy[R]] = + ZIO.runtime[R].map(runtime => Group(runtime, group)) + case class Default[R](runtime: Runtime[R]) extends Strategy[R] { override def getRuntime(ctx: ChannelHandlerContext): Runtime[R] = runtime } @@ -73,23 +98,5 @@ object HttpRuntime { override def getRuntime(ctx: ChannelHandlerContext): Runtime[R] = localRuntime.getOrElse(ctx.executor(), runtime) } - - def sticky[R](group: JEventLoopGroup): ZIO[R, Nothing, Strategy[R]] = - ZIO.runtime[R].map(runtime => Group(runtime, group)) - - def default[R](): ZIO[R, Nothing, Strategy[R]] = - ZIO.runtime[R].map(runtime => Default(runtime)) - - def dedicated[R](group: JEventLoopGroup): ZIO[R, Nothing, Strategy[R]] = - ZIO.runtime[R].map(runtime => Dedicated(runtime, group)) } - - def sticky[R](group: JEventLoopGroup): URIO[R, HttpRuntime[R]] = - Strategy.sticky(group).map(runtime => new HttpRuntime[R](runtime)) - - def dedicated[R](group: JEventLoopGroup): URIO[R, HttpRuntime[R]] = - Strategy.dedicated(group).map(runtime => new HttpRuntime[R](runtime)) - - def default[R]: URIO[R, HttpRuntime[R]] = - Strategy.default().map(runtime => new HttpRuntime[R](runtime)) } From 8a9a67a2fa865bf718c3659d4fac255d9323519f Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Wed, 26 Jan 2022 13:03:10 +0530 Subject: [PATCH 3/5] refactor: use sticky server --- zio-http/src/main/scala/zhttp/service/Server.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-http/src/main/scala/zhttp/service/Server.scala b/zio-http/src/main/scala/zhttp/service/Server.scala index cc2137a754..711f957734 100644 --- a/zio-http/src/main/scala/zhttp/service/Server.scala +++ b/zio-http/src/main/scala/zhttp/service/Server.scala @@ -215,7 +215,7 @@ object Server { for { channelFactory <- ZManaged.access[ServerChannelFactory](_.get) eventLoopGroup <- ZManaged.access[EventLoopGroup](_.get) - zExec <- HttpRuntime.default[R].toManaged_ + zExec <- HttpRuntime.sticky[R](eventLoopGroup).toManaged_ reqHandler = settings.app.compile(zExec, settings) respHandler = ServerResponseHandler(zExec, settings, ServerTimeGenerator.make) init = ServerChannelInitializer(zExec, settings, reqHandler, respHandler) From 94e6884d7fe3372b9613260b7fa25bc0abf34961 Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Wed, 26 Jan 2022 13:15:17 +0530 Subject: [PATCH 4/5] refactor: reduce allocations --- .../src/main/scala/zhttp/service/HttpRuntime.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/zio-http/src/main/scala/zhttp/service/HttpRuntime.scala b/zio-http/src/main/scala/zhttp/service/HttpRuntime.scala index 7b8e6d8732..2d3e0888ab 100644 --- a/zio-http/src/main/scala/zhttp/service/HttpRuntime.scala +++ b/zio-http/src/main/scala/zhttp/service/HttpRuntime.scala @@ -14,21 +14,21 @@ import scala.jdk.CollectionConverters._ * cancel the execution when the channel closes. */ final class HttpRuntime[+R](strategy: HttpRuntime.Strategy[R]) { - def unsafeRun(ctx: ChannelHandlerContext)(program: ZIO[R, Throwable, Any]): Unit = { val rtm = strategy.getRuntime(ctx) // Close the connection if the program fails // When connection closes, interrupt the program - def closeListener(fiber: Fiber.Runtime[_, _]): GenericFutureListener[Future[_ >: Void]] = - (_: Future[_ >: Void]) => rtm.unsafeRunAsync_(fiber.interrupt): Unit rtm .unsafeRunAsync(for { fiber <- program.fork - close <- UIO(closeListener(fiber)) - _ <- UIO(ctx.channel().closeFuture.addListener(close)) + close <- UIO { + val close = closeListener(rtm, fiber) + ctx.channel().closeFuture.addListener(close) + close + } _ <- fiber.join _ <- UIO(ctx.channel().closeFuture().removeListener(close)) } yield ()) { @@ -41,6 +41,9 @@ final class HttpRuntime[+R](strategy: HttpRuntime.Strategy[R]) { if (ctx.channel().isOpen) ctx.close() } } + + private def closeListener(rtm: Runtime[Any], fiber: Fiber.Runtime[_, _]): GenericFutureListener[Future[_ >: Void]] = + (_: Future[_ >: Void]) => rtm.unsafeRunAsync_(fiber.interrupt): Unit } object HttpRuntime { From 0e595167531d5a745e6610a2418cbb9aece3ff7f Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Wed, 26 Jan 2022 13:23:02 +0530 Subject: [PATCH 5/5] revert example --- build.sbt | 2 +- example/src/main/scala/example/PlainTextBenchmarkServer.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 36490ac34b..ebaa132997 100644 --- a/build.sbt +++ b/build.sbt @@ -124,6 +124,6 @@ lazy val zhttpTest = (project in file("zio-http-test")) lazy val example = (project in file("./example")) .settings(stdSettings("example")) .settings(publishSetting(false)) - .settings(runSettings("example.FileStreaming")) + .settings(runSettings("example.Main")) .settings(libraryDependencies ++= Seq(`jwt-core`)) .dependsOn(zhttp) diff --git a/example/src/main/scala/example/PlainTextBenchmarkServer.scala b/example/src/main/scala/example/PlainTextBenchmarkServer.scala index d2ec87a93e..33af49d087 100644 --- a/example/src/main/scala/example/PlainTextBenchmarkServer.scala +++ b/example/src/main/scala/example/PlainTextBenchmarkServer.scala @@ -28,7 +28,7 @@ object Main extends App { .exitCode } - private def app(response: Response) = Http.responseZIO(UIO(response)) + private def app(response: Response) = Http.response(response) private def server(response: Response) = Server.app(app(response)) ++