Skip to content

Commit

Permalink
backmerge
Browse files Browse the repository at this point in the history
  • Loading branch information
amitksingh1490 committed Feb 9, 2022
2 parents 9f4acc5 + a408174 commit f3bb049
Show file tree
Hide file tree
Showing 16 changed files with 158 additions and 35 deletions.
Empty file.
Empty file.
32 changes: 32 additions & 0 deletions docs/website/docs/examples/advanced-examples/concrete-entity.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Concrete Enity
```scala
import zhttp.http._
import zhttp.service.Server
import zio._

/**
* Example to build app on concrete entity
*/
object ConcreteEntity extends ZIOAppDefault {
// Request
case class CreateUser(name: String)

// Response
case class UserCreated(id: Long)

val user: Http[Any, Nothing, CreateUser, UserCreated] =
Http.collect[CreateUser] { case CreateUser(_) =>
UserCreated(2)
}

val app: HttpApp[Any, Nothing] =
user
.contramap[Request](req => CreateUser(req.path.toString)) // Http[Any, Nothing, Request, UserCreated]
.map(userCreated => Response.text(userCreated.id.toString)) // Http[Any, Nothing, Request, Response]

// Run it like any simple app
val run =
Server.start(8090, app)
}

```
Empty file.
34 changes: 34 additions & 0 deletions docs/website/docs/examples/advanced-examples/stream-response.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Streaming Response

```scala
import zhttp.http._
import zhttp.service.Server
import zio.stream.ZStream
import zio._

/**
* Example to encode content using a ZStream
*/
object StreamingResponse extends ZIOAppDefault {
// Starting the server (for more advanced startup configuration checkout `HelloWorldAdvanced`)
override def run = Server.start(8090, app.silent)

// Create a message as a Chunk[Byte]
val message = Chunk.fromArray("Hello world !\r\n".getBytes(HTTP_CHARSET))
// Use `Http.collect` to match on route
val app: HttpApp[Any, Nothing] = Http.collect[Request] {

// Simple (non-stream) based route
case Method.GET -> !! / "health" => Response.ok

// ZStream powered response
case Method.GET -> !! / "stream" =>
Response(
status = Status.OK,
headers = Headers.contentLength(message.length.toLong),
data = HttpData.fromStream(ZStream.fromChunk(message)), // Encoding content using a ZStream
)

}
}
```
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
73 changes: 41 additions & 32 deletions zio-http/src/main/scala/zhttp/service/HttpRuntime.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package zhttp.service

import io.netty.channel.{ChannelHandlerContext, EventLoopGroup => JEventLoopGroup}
import io.netty.util.concurrent.{EventExecutor, Future}
import zio.{Executor, Exit, Runtime, URIO, ZIO}
import io.netty.util.concurrent.{EventExecutor, Future, GenericFutureListener}
import zio._

import scala.collection.mutable
import scala.concurrent.{ExecutionContext => JExecutionContext}
Expand All @@ -16,50 +16,77 @@ import scala.jdk.CollectionConverters._
final class HttpRuntime[+R](strategy: HttpRuntime.Strategy[R]) {

def unsafeRun(ctx: ChannelHandlerContext)(program: ZIO[R, Throwable, Any]): Unit = {
val rtm = strategy.getRuntime(ctx)

val rtm = strategy.runtime(ctx)

// Close the connection if the program fails
// When connection closes, interrupt the program

rtm
.unsafeRunAsyncWith(for {
fiber <- program.fork
_ <- ZIO.attempt {
ctx.channel().closeFuture.addListener((_: Future[_ <: Void]) => rtm.unsafeRunAsync(fiber.interrupt): Unit)
close <- UIO {
val close = closeListener(rtm, fiber)
ctx.channel().closeFuture.addListener(close)
close
}
_ <- fiber.join
_ <- UIO(ctx.channel().closeFuture().removeListener(close))
} yield ()) {
case Exit.Success(_) => ()
case Exit.Failure(cause) =>
cause.failureOption match {
case None => ()
case Some(_) => System.err.println(cause.prettyPrint)
case Some(_) => java.lang.System.err.println(cause.prettyPrint)
}
ctx.close()
if (ctx.channel().isOpen) ctx.close()
}
}

def unsafeRunUninterruptible(ctx: ChannelHandlerContext)(program: ZIO[R, Throwable, Any]): Unit = {
val rtm = strategy.getRuntime(ctx)
val rtm = strategy.runtime(ctx)
rtm
.unsafeRunAsyncWith(program) {
case Exit.Success(_) => ()
case Exit.Failure(cause) =>
cause.failureOption match {
case None => ()
case Some(_) => System.err.println(cause.prettyPrint)
case Some(_) => java.lang.System.err.println(cause.prettyPrint)
}
ctx.close()
}
}

private def closeListener(rtm: Runtime[Any], fiber: Fiber.Runtime[_, _]): GenericFutureListener[Future[_ >: Void]] =
(_: Future[_ >: Void]) => rtm.unsafeRunAsync(fiber.interrupt): Unit
}

object HttpRuntime {
def dedicated[R](group: JEventLoopGroup): URIO[R, HttpRuntime[R]] =
Strategy.dedicated(group).map(runtime => new HttpRuntime[R](runtime))

def default[R]: URIO[R, HttpRuntime[R]] =
Strategy.default().map(runtime => new HttpRuntime[R](runtime))

def sticky[R](group: JEventLoopGroup): URIO[R, HttpRuntime[R]] =
Strategy.sticky(group).map(runtime => new HttpRuntime[R](runtime))

sealed trait Strategy[R] {
def getRuntime(ctx: ChannelHandlerContext): Runtime[R]
def runtime(ctx: ChannelHandlerContext): Runtime[R]
}

object Strategy {

def dedicated[R](group: JEventLoopGroup): ZIO[R, Nothing, Strategy[R]] =
ZIO.runtime[R].map(runtime => Dedicated(runtime, group))

def default[R](): ZIO[R, Nothing, Strategy[R]] =
ZIO.runtime[R].map(runtime => Default(runtime))

def sticky[R](group: JEventLoopGroup): ZIO[R, Nothing, Strategy[R]] =
ZIO.runtime[R].map(runtime => Group(runtime, group))

case class Default[R](runtime: Runtime[R]) extends Strategy[R] {
override def getRuntime(ctx: ChannelHandlerContext): Runtime[R] = runtime
override def runtime(ctx: ChannelHandlerContext): Runtime[R] = runtime
}

case class Dedicated[R](runtime: Runtime[R], group: JEventLoopGroup) extends Strategy[R] {
Expand All @@ -69,7 +96,7 @@ object HttpRuntime {
}
}

override def getRuntime(ctx: ChannelHandlerContext): Runtime[R] = localRuntime
override def runtime(ctx: ChannelHandlerContext): Runtime[R] = localRuntime
}

case class Group[R](runtime: Runtime[R], group: JEventLoopGroup) extends Strategy[R] {
Expand All @@ -85,26 +112,8 @@ object HttpRuntime {
map
}

override def getRuntime(ctx: ChannelHandlerContext): Runtime[R] =
override def runtime(ctx: ChannelHandlerContext): Runtime[R] =
localRuntime.getOrElse(ctx.executor(), runtime)
}

def sticky[R](group: JEventLoopGroup): ZIO[R, Nothing, Strategy[R]] =
ZIO.runtime[R].map(runtime => Group(runtime, group))

def default[R](): ZIO[R, Nothing, Strategy[R]] =
ZIO.runtime[R].map(runtime => Default(runtime))

def dedicated[R](group: JEventLoopGroup): ZIO[R, Nothing, Strategy[R]] =
ZIO.runtime[R].map(runtime => Dedicated(runtime, group))
}

def sticky[R](group: JEventLoopGroup): URIO[R, HttpRuntime[R]] =
Strategy.sticky(group).map(runtime => new HttpRuntime[R](runtime))

def dedicated[R](group: JEventLoopGroup): URIO[R, HttpRuntime[R]] =
Strategy.dedicated(group).map(runtime => new HttpRuntime[R](runtime))

def default[R]: URIO[R, HttpRuntime[R]] =
Strategy.default().map(runtime => new HttpRuntime[R](runtime))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

2 changes: 1 addition & 1 deletion zio-http/src/test/scala/zhttp/http/MiddlewareSpec.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package zhttp.http

import zio._
import zio.test.Assertion._
import zio.test.{DefaultRunnableSpec, TestClock, TestConsole, assert, assertM}
import zio._

object MiddlewareSpec extends DefaultRunnableSpec with HExitAssertion {
def spec = suite("Middleware") {
Expand Down
4 changes: 3 additions & 1 deletion zio-http/src/test/scala/zhttp/service/KeepAliveSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import io.netty.handler.codec.http.HttpHeaderValues
import zhttp.http.{HeaderNames, Headers, Http, Version}
import zhttp.internal.{DynamicServer, HttpRunnableSpec}
import zhttp.service.server._
import zio._
import zio.test.Assertion.{equalTo, isNone, isSome}
import zio.test.TestAspect.timeout
import zio.test.assertM

object KeepAliveSpec extends HttpRunnableSpec {
Expand Down Expand Up @@ -43,7 +45,7 @@ object KeepAliveSpec extends HttpRunnableSpec {
override def spec = {
suite("ServerConfigSpec") {
appKeepAliveEnabled.as(List(keepAliveSpec)).useNow
}.provideCustomLayerShared(env)
}.provideCustomLayerShared(env) @@ timeout(30.seconds)
}

}
14 changes: 13 additions & 1 deletion zio-http/src/test/scala/zhttp/service/ServerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,19 @@ object ServerSpec extends HttpRunnableSpec {
}
} +
test("500 response") {
checkAll(HttpGen.method) { method =>
val methodGenWithoutHEAD: Gen[Any, Method] = Gen.fromIterable(
List(
Method.OPTIONS,
Method.GET,
Method.POST,
Method.PUT,
Method.PATCH,
Method.DELETE,
Method.TRACE,
Method.CONNECT,
),
)
checkAll(methodGenWithoutHEAD) { method =>
val actual = status(method, !! / "HExitFailure")
assertM(actual)(equalTo(Status.INTERNAL_SERVER_ERROR))
}
Expand Down
33 changes: 33 additions & 0 deletions zio-http/src/test/scala/zhttp/service/WebSocketServerSpec.scala
Original file line number Diff line number Diff line change
@@ -1 +1,34 @@
package zhttp.service

import zhttp.http.Status
import zhttp.internal.{DynamicServer, HttpRunnableSpec}
import zhttp.service.server._
import zhttp.socket.{Socket, WebSocketFrame}
import zio._
import zio.test.Assertion.equalTo
import zio.test.TestAspect.timeout
import zio.test._

object WebSocketServerSpec extends HttpRunnableSpec {

private val env =
EventLoopGroup.nio() ++ ServerChannelFactory.nio ++ DynamicServer.live ++ ChannelFactory.nio
private val app = serve { DynamicServer.app }

override def spec = suite("Server") {
app.as(List(websocketSpec)).useNow
}.provideCustomLayerShared(env) @@ timeout(10 seconds)

def websocketSpec = suite("WebSocket Server") {
suite("connections") {
test("Multiple websocket upgrades") {
val app = Socket.succeed(WebSocketFrame.text("BAR")).toHttp.deployWS
val codes = ZIO
.foreach(1 to 1024)(_ => app(Socket.empty.toSocketApp).map(_.status))
.map(_.count(_ == Status.SWITCHING_PROTOCOLS))

assertM(codes)(equalTo(1024))
}
}
}
}

1 comment on commit f3bb049

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 Performance Benchmark:

Concurrency: 256
Requests/sec: 927900.55

Please sign in to comment.