Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement: Merge server and client Responses #1111

Merged
merged 2 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions zio-http/src/main/scala/zhttp/http/Http.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ sealed trait Http[-R, +E, -A, +B] extends (A => ZIO[R, Option[E], B]) { self =>
/**
* Extracts body
*/
final def body(implicit eb: IsResponse[B], ee: E <:< Throwable): Http[R, Throwable, A, Chunk[Byte]] =
final def body(implicit eb: B <:< Response, ee: E <:< Throwable): Http[R, Throwable, A, Chunk[Byte]] =
self.bodyAsByteBuf.mapZIO(buf => Task(Chunk.fromArray(ByteBufUtil.getBytes(buf))))

/**
* Extracts body as a string
*/
final def bodyAsString(implicit eb: IsResponse[B], ee: E <:< Throwable): Http[R, Throwable, A, String] =
final def bodyAsString(implicit eb: B <:< Response, ee: E <:< Throwable): Http[R, Throwable, A, String] =
self.bodyAsByteBuf.mapZIO(bytes => Task(bytes.toString(HTTP_CHARSET)))

/**
Expand Down Expand Up @@ -135,13 +135,13 @@ sealed trait Http[-R, +E, -A, +B] extends (A => ZIO[R, Option[E], B]) { self =>
/**
* Extracts content-length from the response if available
*/
final def contentLength(implicit eb: IsResponse[B]): Http[R, E, A, Option[Long]] =
final def contentLength(implicit eb: B <:< Response): Http[R, E, A, Option[Long]] =
headers.map(_.contentLength)

/**
* Extracts the value of ContentType header
*/
final def contentType(implicit eb: IsResponse[B]): Http[R, E, A, Option[CharSequence]] =
final def contentType(implicit eb: B <:< Response): Http[R, E, A, Option[CharSequence]] =
headerValue(HttpHeaderNames.CONTENT_TYPE)

/**
Expand Down Expand Up @@ -211,13 +211,13 @@ sealed trait Http[-R, +E, -A, +B] extends (A => ZIO[R, Option[E], B]) { self =>
/**
* Extracts the value of the provided header name.
*/
final def headerValue(name: CharSequence)(implicit eb: IsResponse[B]): Http[R, E, A, Option[CharSequence]] =
final def headerValue(name: CharSequence)(implicit eb: B <:< Response): Http[R, E, A, Option[CharSequence]] =
headers.map(_.headerValue(name))

/**
* Extracts the `Headers` from the type `B` if possible
*/
final def headers(implicit eb: IsResponse[B]): Http[R, E, A, Headers] = self.map(eb.headers)
final def headers(implicit eb: B <:< Response): Http[R, E, A, Headers] = self.map(_.headers)

/**
* Transforms the output of the http app
Expand Down Expand Up @@ -294,7 +294,7 @@ sealed trait Http[-R, +E, -A, +B] extends (A => ZIO[R, Option[E], B]) { self =>
/**
* Extracts `Status` from the type `B` is possible.
*/
final def status(implicit ev: IsResponse[B]): Http[R, E, A, Status] = self.map(ev.status)
final def status(implicit ev: B <:< Response): Http[R, E, A, Status] = self.map(_.status)

/**
* Returns an Http that peeks at the success of this Http.
Expand Down Expand Up @@ -382,10 +382,10 @@ sealed trait Http[-R, +E, -A, +B] extends (A => ZIO[R, Option[E], B]) { self =>
* Extracts body as a ByteBuf
*/
private[zhttp] final def bodyAsByteBuf(implicit
eb: IsResponse[B],
eb: B <:< Response,
ee: E <:< Throwable,
): Http[R, Throwable, A, ByteBuf] =
self.widen[Throwable, B].mapZIO(eb.bodyAsByteBuf)
self.widen[Throwable, B].mapZIO(_.bodyAsByteBuf)

/**
* Evaluates the app and returns an HExit that can be resolved further
Expand Down
25 changes: 0 additions & 25 deletions zio-http/src/main/scala/zhttp/http/IsResponse.scala

This file was deleted.

16 changes: 14 additions & 2 deletions zio-http/src/main/scala/zhttp/http/Response.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package zhttp.http

import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.buffer.{ByteBuf, ByteBufUtil, Unpooled}
import io.netty.handler.codec.http.HttpVersion.HTTP_1_1
import io.netty.handler.codec.http.{HttpHeaderNames, HttpResponse}
import io.netty.handler.codec.http.{FullHttpResponse, HttpHeaderNames, HttpResponse}
import zhttp.html._
import zhttp.http.headers.HeaderExtension
import zhttp.socket.{IsWebSocket, Socket, SocketApp}
Expand Down Expand Up @@ -59,11 +59,16 @@ final case class Response private (
*/
def withServerTime: Response = self.copy(attribute = self.attribute.withServerTime)

final def bodyAsByteArray: Task[Array[Byte]] =
bodyAsByteBuf.flatMap(buf => Task(ByteBufUtil.getBytes(buf)))

/**
* Extracts the body as ByteBuf
*/
private[zhttp] def bodyAsByteBuf: Task[ByteBuf] = self.data.toByteBuf

def bodyAsString: Task[String] = bodyAsByteArray.map(new String(_, charset))

/**
* Encodes the Response into a Netty HttpResponse. Sets default headers such
* as `content-length`. For performance reasons, it is possible that it uses a
Expand Down Expand Up @@ -219,6 +224,13 @@ object Response {
headers = Headers(HeaderNames.contentType, HeaderValues.textPlain),
)

private[zhttp] def unsafeFromJResponse(jRes: FullHttpResponse): Response = {
val status = Status.fromHttpResponseStatus(jRes.status())
val headers = Headers.decode(jRes.headers())
val data = HttpData.fromByteBuf(Unpooled.copiedBuffer(jRes.content()))
Response(status, headers, data)
}

/**
* Attribute holds meta data for the backend
*/
Expand Down
42 changes: 10 additions & 32 deletions zio-http/src/main/scala/zhttp/service/Client.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package zhttp.service

import io.netty.bootstrap.Bootstrap
import io.netty.buffer.{ByteBuf, ByteBufUtil, Unpooled}
import io.netty.buffer.ByteBuf
import io.netty.channel.{
Channel,
ChannelFactory => JChannelFactory,
Expand All @@ -15,20 +15,20 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler
import zhttp.http._
import zhttp.http.headers.HeaderExtension
import zhttp.service
import zhttp.service.Client.{ClientRequest, ClientResponse}
import zhttp.service.Client.ClientRequest
import zhttp.service.client.ClientSSLHandler.ClientSSLOptions
import zhttp.service.client.{ClientInboundHandler, ClientSSLHandler}
import zhttp.socket.{Socket, SocketApp}
import zio.{Chunk, Promise, Task, ZIO}
import zio.{Promise, Task, ZIO}

import java.net.{InetAddress, InetSocketAddress, URI}

final case class Client[R](rtm: HttpRuntime[R], cf: JChannelFactory[Channel], el: JEventLoopGroup)
extends HttpMessageCodec {

def request(request: Client.ClientRequest): Task[Client.ClientResponse] =
def request(request: Client.ClientRequest): Task[Response] =
for {
promise <- Promise.make[Throwable, Client.ClientResponse]
promise <- Promise.make[Throwable, Response]
jReq <- encode(request)
_ <- ChannelFuture
.unit(unsafeRequest(request, jReq, promise))
Expand All @@ -41,7 +41,7 @@ final case class Client[R](rtm: HttpRuntime[R], cf: JChannelFactory[Channel], el
headers: Headers = Headers.empty,
socketApp: SocketApp[R],
sslOptions: ClientSSLOptions = ClientSSLOptions.DefaultSSL,
): ZIO[R, Throwable, ClientResponse] = for {
): ZIO[R, Throwable, Response] = for {
env <- ZIO.environment[R]
res <- request(
ClientRequest(
Expand All @@ -59,7 +59,7 @@ final case class Client[R](rtm: HttpRuntime[R], cf: JChannelFactory[Channel], el
private def unsafeRequest(
req: ClientRequest,
jReq: FullHttpRequest,
promise: Promise[Throwable, ClientResponse],
promise: Promise[Throwable, Response],
): JChannelFuture = {

try {
Expand Down Expand Up @@ -137,15 +137,15 @@ object Client {
headers: Headers = Headers.empty,
content: HttpData = HttpData.empty,
ssl: ClientSSLOptions = ClientSSLOptions.DefaultSSL,
): ZIO[EventLoopGroup with ChannelFactory, Throwable, ClientResponse] =
): ZIO[EventLoopGroup with ChannelFactory, Throwable, Response] =
for {
uri <- ZIO.fromEither(URL.fromString(url))
res <- request(ClientRequest(uri, method, headers, content, attribute = Attribute(ssl = Some(ssl))))
} yield res

def request(
request: ClientRequest,
): ZIO[EventLoopGroup with ChannelFactory, Throwable, ClientResponse] =
): ZIO[EventLoopGroup with ChannelFactory, Throwable, Response] =
for {
clt <- make[Any]
res <- clt.request(request)
Expand All @@ -156,7 +156,7 @@ object Client {
app: SocketApp[R],
headers: Headers = Headers.empty,
sslOptions: ClientSSLOptions = ClientSSLOptions.DefaultSSL,
): ZIO[R with EventLoopGroup with ChannelFactory, Throwable, ClientResponse] = {
): ZIO[R with EventLoopGroup with ChannelFactory, Throwable, Response] = {
for {
clt <- make[R]
uri <- ZIO.fromEither(URL.fromString(url))
Expand Down Expand Up @@ -193,33 +193,11 @@ object Client {
private[zhttp] def bodyAsByteBuf: Task[ByteBuf] = data.toByteBuf
}

final case class ClientResponse(status: Status, headers: Headers, private[zhttp] val buffer: ByteBuf)
extends HeaderExtension[ClientResponse] {
self =>

def body: Task[Chunk[Byte]] = Task(Chunk.fromArray(ByteBufUtil.getBytes(buffer)))

def bodyAsByteBuf: Task[ByteBuf] = Task(buffer)

def bodyAsString: Task[String] = Task(buffer.toString(self.charset))

override def updateHeaders(update: Headers => Headers): ClientResponse = self.copy(headers = update(headers))
}

case class Attribute(socketApp: Option[SocketApp[Any]] = None, ssl: Option[ClientSSLOptions] = None) { self =>
def withSSL(ssl: ClientSSLOptions): Attribute = self.copy(ssl = Some(ssl))
def withSocketApp(socketApp: SocketApp[Any]): Attribute = self.copy(socketApp = Some(socketApp))
}

object ClientResponse {
private[zhttp] def unsafeFromJResponse(jRes: FullHttpResponse): ClientResponse = {
val status = Status.fromHttpResponseStatus(jRes.status())
val headers = Headers.decode(jRes.headers())
val content = Unpooled.copiedBuffer(jRes.content())
Client.ClientResponse(status, headers, content)
}
}

object Attribute {
def empty: Attribute = Attribute()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package zhttp.service.client

import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
import io.netty.handler.codec.http.{FullHttpRequest, FullHttpResponse}
import zhttp.service.Client.ClientResponse
import zhttp.http.Response
import zhttp.service.HttpRuntime
import zio.Promise

Expand All @@ -12,7 +12,7 @@ import zio.Promise
final class ClientInboundHandler[R](
zExec: HttpRuntime[R],
jReq: FullHttpRequest,
promise: Promise[Throwable, ClientResponse],
promise: Promise[Throwable, Response],
isWebSocket: Boolean,
) extends SimpleChannelInboundHandler[FullHttpResponse](true) {

Expand All @@ -28,7 +28,7 @@ final class ClientInboundHandler[R](
override def channelRead0(ctx: ChannelHandlerContext, msg: FullHttpResponse): Unit = {
msg.touch("handlers.ClientInboundHandler-channelRead0")

zExec.unsafeRun(ctx)(promise.succeed(ClientResponse.unsafeFromJResponse(msg)))
zExec.unsafeRun(ctx)(promise.succeed(Response.unsafeFromJResponse(msg)))
if (isWebSocket) {
ctx.fireChannelRead(msg.retain())
ctx.pipeline().remove(ctx.name()): Unit
Expand Down
4 changes: 2 additions & 2 deletions zio-http/src/main/scala/zhttp/socket/Socket.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package zhttp.socket

import zhttp.http.{Http, Response}
import zhttp.service.{ChannelFactory, Client, EventLoopGroup}
import zhttp.service.{ChannelFactory, EventLoopGroup}
import zio.stream.ZStream
import zio.{Cause, NeedsEnv, ZIO}

Expand All @@ -27,7 +27,7 @@ sealed trait Socket[-R, +E, -A, +B] { self =>

def connect(url: String)(implicit
ev: IsWebSocket[R, E, A, B],
): ZIO[R with EventLoopGroup with ChannelFactory, Throwable, Client.ClientResponse] =
): ZIO[R with EventLoopGroup with ChannelFactory, Throwable, Response] =
self.toSocketApp.connect(url)

def contramap[Z](za: Z => A): Socket[R, E, Z, B] = Socket.FCMap(self, za)
Expand Down
2 changes: 1 addition & 1 deletion zio-http/src/main/scala/zhttp/socket/SocketApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ final case class SocketApp[-R](
* Creates a socket connection on the provided URL. Typically used to connect
* as a client.
*/
def connect(url: String): ZIO[R with EventLoopGroup with ChannelFactory, Throwable, Client.ClientResponse] =
def connect(url: String): ZIO[R with EventLoopGroup with ChannelFactory, Throwable, Response] =
Client.socket(url, self)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import zhttp.http.URL.Location
import zhttp.http._
import zhttp.internal.DynamicServer.HttpEnv
import zhttp.internal.HttpRunnableSpec.HttpTestClient
import zhttp.service.Client.{ClientRequest, ClientResponse}
import zhttp.service.Client.ClientRequest
import zhttp.service._
import zhttp.service.client.ClientSSLHandler.ClientSSLOptions
import zhttp.socket.SocketApp
Expand Down Expand Up @@ -56,7 +56,7 @@ abstract class HttpRunnableSpec extends DefaultRunnableSpec { self =>
* while writing tests. It also allows us to simply pass a request in the
* end, to execute, and resolve it with a response, like a normal HttpApp.
*/
def deploy: HttpTestClient[Any, ClientRequest, ClientResponse] =
def deploy: HttpTestClient[Any, ClientRequest, Response] =
for {
port <- Http.fromZIO(DynamicServer.port)
id <- Http.fromZIO(DynamicServer.deploy(app))
Expand All @@ -69,7 +69,7 @@ abstract class HttpRunnableSpec extends DefaultRunnableSpec { self =>
}
} yield response

def deployWS: HttpTestClient[Any, SocketApp[Any], ClientResponse] =
def deployWS: HttpTestClient[Any, SocketApp[Any], Response] =
for {
id <- Http.fromZIO(DynamicServer.deploy(app))
url <- Http.fromZIO(DynamicServer.wsURL)
Expand Down