Skip to content

Commit

Permalink
Refactor: Merge client and server Request (#1125)
Browse files Browse the repository at this point in the history
* introduce `Incoming` and `Outgoing` inHttpData

* streaming support

* benchmark disable objectAggregator

* cleanup

* refactor

* cleanup + PR comments

* cleanup + PR comments

* cleanup + PR comments

* refactor: rename variable

* memory leak

* refactor: Handler now extends ChannelInboundHandlerAdapter

* refactor: remove unused methods from UnsafeChannel

* remove bodyAsCharSequenceStream operator

* refactor: remove unnecessary methods on HttpData

* refactor: re-implement `bodyAsStream`

* refactor: remove unsafe modification of pipeline from HttpData

* refactor: rename HttpData types

* fix 2.12 build

* refactor: remove type param

* PR comment

* PR comment

* refaector: simplify releaseRequest

* refactor: reorder methods in ServerResponseHandler

* refactor: make methods final

* refactor: rename HttpData traits

* add `bodyAsByteArray` and derive `body` and `bodyAsString` from it.

* add test: should throw error for HttpData.Incoming

* Introduce `useAggregator` method on settings and use it everywhere

* remove sharable from `ServerResponseHandler`

* Update zio-http/src/main/scala/zhttp/http/Request.scala

* refactor: remove unnecessary pattern matching

* throw exception on unknown message type

* simplify test

* refactor: change order of ContentHandler. Move it before the RequestHandler

* test: update test structure

* refactor: move pattern match logic to WebSocketUpgrade

* revert addBefore Change because of degrade in performance (#1089)

* fix static server issue with streaming

* Introduce version in Request

* Merge Client and Server Request

* Delete `Request.make`

* Gen refactor

* rename files

* make function private

* rename attribute

* rename attribute

Co-authored-by: Tushar Mathur <tusharmath@gmail.com>
  • Loading branch information
d11-amitsingh and tusharmath authored Mar 9, 2022
1 parent c1d98ea commit 94be6b9
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 79 deletions.
74 changes: 24 additions & 50 deletions zio-http/src/main/scala/zhttp/service/Client.scala
Original file line number Diff line number Diff line change
@@ -1,37 +1,34 @@
package zhttp.service

import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel.{
Channel,
ChannelFactory => JChannelFactory,
ChannelFuture => JChannelFuture,
ChannelHandlerContext,
ChannelInitializer,
EventLoopGroup => JEventLoopGroup,
}
import io.netty.handler.codec.http._
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler
import zhttp.http._
import zhttp.http.headers.HeaderExtension
import zhttp.service
import zhttp.service.Client.ClientRequest
import zhttp.service.Client.Config
import zhttp.service.client.ClientSSLHandler.ClientSSLOptions
import zhttp.service.client.{ClientInboundHandler, ClientSSLHandler}
import zhttp.socket.{Socket, SocketApp}
import zio.{Promise, Task, ZIO}

import java.net.{InetAddress, InetSocketAddress, URI}
import java.net.{InetSocketAddress, URI}

final case class Client[R](rtm: HttpRuntime[R], cf: JChannelFactory[Channel], el: JEventLoopGroup)
extends HttpMessageCodec {

def request(request: Client.ClientRequest): Task[Response] =
private[zhttp] def request(request: Request, clientConfig: Config): Task[Response] =
for {
promise <- Promise.make[Throwable, Response]
jReq <- encode(request)
_ <- ChannelFuture
.unit(unsafeRequest(request, jReq, promise))
.unit(unsafeRequest(request, clientConfig, jReq, promise))
.catchAll(cause => promise.fail(cause))
res <- promise.await
} yield res
Expand All @@ -44,20 +41,22 @@ final case class Client[R](rtm: HttpRuntime[R], cf: JChannelFactory[Channel], el
): ZIO[R, Throwable, Response] = for {
env <- ZIO.environment[R]
res <- request(
ClientRequest(
url,
Request(
version = Version.Http_1_1,
Method.GET,
url,
headers,
attribute = Client.Attribute(socketApp = Some(socketApp.provideEnvironment(env)), ssl = Some(sslOptions)),
),
clientConfig = Client.Config(socketApp = Some(socketApp.provideEnvironment(env)), ssl = Some(sslOptions)),
)
} yield res

/**
* It handles both - Websocket and HTTP requests.
*/
private def unsafeRequest(
req: ClientRequest,
req: Request,
clientConfig: Config,
jReq: FullHttpRequest,
promise: Promise[Throwable, Response],
): JChannelFuture = {
Expand All @@ -77,7 +76,7 @@ final case class Client[R](rtm: HttpRuntime[R], cf: JChannelFactory[Channel], el
override def initChannel(ch: Channel): Unit = {

val pipeline = ch.pipeline()
val sslOption: ClientSSLOptions = req.attribute.ssl.getOrElse(ClientSSLOptions.DefaultSSL)
val sslOption: ClientSSLOptions = clientConfig.ssl.getOrElse(ClientSSLOptions.DefaultSSL)

// If a https or wss request is made we need to add the ssl handler at the starting of the pipeline.
if (isSSL) pipeline.addLast(SSL_HANDLER, ClientSSLHandler.ssl(sslOption).newHandler(ch.alloc, host, port))
Expand All @@ -95,7 +94,7 @@ final case class Client[R](rtm: HttpRuntime[R], cf: JChannelFactory[Channel], el
// Add WebSocketHandlers if it's a `ws` or `wss` request
if (isWebSocket) {
val headers = req.headers.encode
val app = req.attribute.socketApp.getOrElse(Socket.empty.toSocketApp)
val app = clientConfig.socketApp.getOrElse(Socket.empty.toSocketApp)
val config = app.protocol.clientBuilder
.customHeaders(headers)
.webSocketUri(req.url.encode)
Expand Down Expand Up @@ -140,15 +139,19 @@ object Client {
): ZIO[EventLoopGroup with ChannelFactory, Throwable, Response] =
for {
uri <- ZIO.fromEither(URL.fromString(url))
res <- request(ClientRequest(uri, method, headers, content, attribute = Attribute(ssl = Some(ssl))))
res <- request(
Request(Version.Http_1_1, method, uri, headers, data = content),
clientConfig = Config(ssl = Some(ssl)),
)
} yield res

def request(
request: ClientRequest,
request: Request,
clientConfig: Config,
): ZIO[EventLoopGroup with ChannelFactory, Throwable, Response] =
for {
clt <- make[Any]
res <- clt.request(request)
res <- clt.request(request, clientConfig)
} yield res

def socket[R](
Expand All @@ -164,41 +167,12 @@ object Client {
} yield res
}

final case class ClientRequest(
url: URL,
method: Method = Method.GET,
headers: Headers = Headers.empty,
private[zhttp] val data: HttpData = HttpData.empty,
private[zhttp] val version: Version = Version.Http_1_1,
private[zhttp] val attribute: Attribute = Attribute.empty,
private val channelContext: ChannelHandlerContext = null,
) extends HeaderExtension[ClientRequest] {
self =>

def bodyAsString: Task[String] = bodyAsByteBuf.map(_.toString(headers.charset))

def remoteAddress: Option[InetAddress] = {
if (channelContext != null && channelContext.channel().remoteAddress().isInstanceOf[InetSocketAddress])
Some(channelContext.channel().remoteAddress().asInstanceOf[InetSocketAddress].getAddress)
else
None
}

/**
* Updates the headers using the provided function
*/
override def updateHeaders(update: Headers => Headers): ClientRequest =
self.copy(headers = update(self.headers))

private[zhttp] def bodyAsByteBuf: Task[ByteBuf] = data.toByteBuf
}

case class Attribute(socketApp: Option[SocketApp[Any]] = None, ssl: Option[ClientSSLOptions] = None) { self =>
def withSSL(ssl: ClientSSLOptions): Attribute = self.copy(ssl = Some(ssl))
def withSocketApp(socketApp: SocketApp[Any]): Attribute = self.copy(socketApp = Some(socketApp))
case class Config(socketApp: Option[SocketApp[Any]] = None, ssl: Option[ClientSSLOptions] = None) { self =>
def withSSL(ssl: ClientSSLOptions): Config = self.copy(ssl = Some(ssl))
def withSocketApp(socketApp: SocketApp[Any]): Config = self.copy(socketApp = Some(socketApp))
}

object Attribute {
def empty: Attribute = Attribute()
object Config {
def empty: Config = Config()
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package zhttp.service

import io.netty.handler.codec.http.{DefaultFullHttpRequest, FullHttpRequest, HttpHeaderNames}
import zhttp.http.Request
import zio.Task

trait EncodeClientRequest {
trait EncodeRequest {

/**
* Converts client params to JFullHttpRequest
*/
def encode(req: Client.ClientRequest): Task[FullHttpRequest] =
def encode(req: Request): Task[FullHttpRequest] =
req.bodyAsByteBuf.map { content =>
val method = req.method.toJava
val jVersion = req.version.toJava
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package zhttp.service

trait HttpMessageCodec extends EncodeClientRequest {}
trait HttpMessageCodec extends EncodeRequest {}
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,27 @@ package zhttp.http

import io.netty.handler.codec.http.HttpHeaderNames
import zhttp.internal.HttpGen
import zhttp.service.{Client, EncodeClientRequest}
import zhttp.service.EncodeRequest
import zio.random.Random
import zio.test.Assertion._
import zio.test._

object EncodeClientRequestSpec extends DefaultRunnableSpec with EncodeClientRequest {
object EncodeRequestSpec extends DefaultRunnableSpec with EncodeRequest {

val anyClientParam: Gen[Random with Sized, Client.ClientRequest] = HttpGen.clientRequest(
val anyClientParam: Gen[Random with Sized, Request] = HttpGen.requestGen(
HttpGen.httpData(
Gen.listOf(Gen.alphaNumericString),
),
)

val clientParamWithAbsoluteUrl = HttpGen.clientRequest(
val clientParamWithAbsoluteUrl = HttpGen.requestGen(
dataGen = HttpGen.httpData(
Gen.listOf(Gen.alphaNumericString),
),
urlGen = HttpGen.genAbsoluteURL,
)

def clientParamWithFiniteData(size: Int): Gen[Random with Sized, Client.ClientRequest] = HttpGen.clientRequest(
def clientParamWithFiniteData(size: Int): Gen[Random with Sized, Request] = HttpGen.requestGen(
for {
content <- Gen.alphaNumericStringBounded(size, size)
data <- Gen.fromIterable(List(HttpData.fromString(content)))
Expand Down
14 changes: 6 additions & 8 deletions zio-http/src/test/scala/zhttp/http/GetBodyAsStringSpec.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package zhttp.http

import zhttp.service.Client
import zio.Chunk
import zio.test.Assertion._
import zio.test._
Expand All @@ -18,20 +17,19 @@ object GetBodyAsStringSpec extends DefaultRunnableSpec {
testM("should map bytes according to charset given") {

checkM(charsetGen) { charset =>
val request = Client
.ClientRequest(
URL(!!),
headers = Headers.contentType(s"text/html; charset=$charset"),
data = HttpData.BinaryChunk(Chunk.fromArray("abc".getBytes(charset))),
)
val request = Request(
url = URL(!!),
headers = Headers.contentType(s"text/html; charset=$charset"),
data = HttpData.BinaryChunk(Chunk.fromArray("abc".getBytes(charset))),
)

val encoded = request.bodyAsString
val expected = new String(Chunk.fromArray("abc".getBytes(charset)).toArray, charset)
assertM(encoded)(equalTo(expected))
}
} +
testM("should map bytes to default utf-8 if no charset given") {
val request = Client.ClientRequest(URL(!!), data = HttpData.BinaryChunk(Chunk.fromArray("abc".getBytes())))
val request = Request(url = URL(!!), data = HttpData.BinaryChunk(Chunk.fromArray("abc".getBytes())))
val encoded = request.bodyAsString
val expected = new String(Chunk.fromArray("abc".getBytes()).toArray, HTTP_CHARSET)
assertM(encoded)(equalTo(expected))
Expand Down
19 changes: 11 additions & 8 deletions zio-http/src/test/scala/zhttp/internal/HttpGen.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import io.netty.buffer.Unpooled
import zhttp.http.Scheme.{HTTP, HTTPS, WS, WSS}
import zhttp.http.URL.Location
import zhttp.http._
import zhttp.service.Client.ClientRequest
import zio.random.Random
import zio.stream.ZStream
import zio.test.{Gen, Sized}
Expand All @@ -13,28 +12,32 @@ import zio.{Chunk, ZIO}
import java.io.File

object HttpGen {
def clientParamsForFileHttpData(): Gen[Random with Sized, ClientRequest] = {
def clientParamsForFileHttpData(): Gen[Random with Sized, Request] = {
for {
file <- Gen.fromEffect(ZIO.succeed(new File(getClass.getResource("/TestFile.txt").getPath)))
method <- HttpGen.method
url <- HttpGen.url
headers <- Gen.listOf(HttpGen.header).map(Headers(_))
} yield ClientRequest(url, method, headers, HttpData.fromFile(file))
version <- httpVersion
} yield Request(version, method, url, headers, data = HttpData.fromFile(file))
}

def clientRequest[R](
def requestGen[R](
dataGen: Gen[R, HttpData],
methodGen: Gen[R, Method] = HttpGen.method,
urlGen: Gen[Random with Sized, URL] = HttpGen.url,
headerGen: Gen[Random with Sized, Header] = HttpGen.header,
): Gen[R with Random with Sized, ClientRequest] =
): Gen[R with Random with Sized, Request] =
for {
method <- methodGen
url <- urlGen
headers <- Gen.listOf(headerGen).map(Headers(_))
data <- dataGen
version <- Gen.fromIterable(List(Version.Http_1_0, Version.Http_1_1))
} yield ClientRequest(url, method, headers, data, version)
version <- httpVersion
} yield Request(version, method, url, headers, data = data)

def httpVersion: Gen[Random with Sized, Version] =
Gen.fromIterable(List(Version.Http_1_0, Version.Http_1_1))

def cookies: Gen[Random with Sized, Cookie] = for {
name <- Gen.anyString
Expand Down Expand Up @@ -123,7 +126,7 @@ object HttpGen {
}

def request: Gen[Random with Sized, Request] = for {
version <- Gen.fromIterable(List(Version.Http_1_0, Version.Http_1_1))
version <- httpVersion
method <- HttpGen.method
url <- HttpGen.url
headers <- Gen.listOf(HttpGen.header).map(Headers(_))
Expand Down
11 changes: 6 additions & 5 deletions zio-http/src/test/scala/zhttp/internal/HttpRunnableSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import zhttp.http.URL.Location
import zhttp.http._
import zhttp.internal.DynamicServer.HttpEnv
import zhttp.internal.HttpRunnableSpec.HttpTestClient
import zhttp.service.Client.ClientRequest
import zhttp.service.Client.Config
import zhttp.service._
import zhttp.service.client.ClientSSLHandler.ClientSSLOptions
import zhttp.socket.SocketApp
Expand All @@ -20,7 +20,7 @@ import zio.{Has, ZIO, ZManaged}
*/
abstract class HttpRunnableSpec extends DefaultRunnableSpec { self =>

implicit class RunnableClientHttpSyntax[R, A](app: Http[R, Throwable, Client.ClientRequest, A]) {
implicit class RunnableClientHttpSyntax[R, A](app: Http[R, Throwable, Request, A]) {

/**
* Runs the deployed Http app by making a real http request to it. The
Expand All @@ -34,7 +34,7 @@ abstract class HttpRunnableSpec extends DefaultRunnableSpec { self =>
version: Version = Version.Http_1_1,
): ZIO[R, Throwable, A] =
app(
Client.ClientRequest(
Request(
url = URL(path), // url set here is overridden later via `deploy` method
method = method,
headers = headers,
Expand All @@ -56,15 +56,16 @@ abstract class HttpRunnableSpec extends DefaultRunnableSpec { self =>
* while writing tests. It also allows us to simply pass a request in the
* end, to execute, and resolve it with a response, like a normal HttpApp.
*/
def deploy: HttpTestClient[Any, ClientRequest, Response] =
def deploy: HttpTestClient[Any, Request, Response] =
for {
port <- Http.fromZIO(DynamicServer.port)
id <- Http.fromZIO(DynamicServer.deploy(app))
response <- Http.fromFunctionZIO[Client.ClientRequest] { params =>
response <- Http.fromFunctionZIO[Request] { params =>
Client.request(
params
.addHeader(DynamicServer.APP_ID, id)
.copy(url = URL(params.url.path, Location.Absolute(Scheme.HTTP, "localhost", port))),
Config.empty,
)
}
} yield response
Expand Down

0 comments on commit 94be6b9

Please sign in to comment.