Skip to content

Commit

Permalink
refactor: Client.socket now returns a ZManaged
Browse files Browse the repository at this point in the history
  • Loading branch information
tusharmath committed May 15, 2022
1 parent 3dd3cb5 commit ff39a63
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 23 deletions.
4 changes: 2 additions & 2 deletions example/src/main/scala/example/WebSocketSimpleClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ object WebSocketSimpleClient extends zio.App {
// Setup client envs
val env = EventLoopGroup.auto() ++ ChannelFactory.auto

val url = "ws://localhost:8090/subscriptions"
val url = "ws://ws.vi-server.org/mirror"

val app = Socket
.collect[WebSocketFrame] {
Expand All @@ -21,6 +21,6 @@ object WebSocketSimpleClient extends zio.App {
.connect(url)

override def run(args: List[String]): URIO[ZEnv, ExitCode] = {
app.exitCode.provideCustomLayer(env)
app.useForever.exitCode.provideCustomLayer(env)
}
}
16 changes: 12 additions & 4 deletions zio-http/src/main/scala/zhttp/http/Response.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package zhttp.http

import io.netty.buffer.Unpooled
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.HttpVersion.HTTP_1_1
import io.netty.handler.codec.http.{FullHttpResponse, HttpHeaderNames, HttpResponse}
import zhttp.html._
import zhttp.http.headers.HeaderExtension
import zhttp.service.ChannelFuture
import zhttp.socket.{IsWebSocket, Socket, SocketApp}
import zio.{UIO, ZIO}
import zio.{Task, UIO, ZIO}

import java.io.{PrintWriter, StringWriter}
import java.io.{IOException, PrintWriter, StringWriter}

final case class Response private (
status: Status,
Expand All @@ -18,6 +20,11 @@ final case class Response private (
) extends HeaderExtension[Response]
with HttpDataExtension[Response] { self =>

private[zhttp] def close: Task[Unit] = self.attribute.channel match {
case Some(channel) => ChannelFuture.unit(channel.close())
case None => ZIO.fail(new IOException("Channel context isn't available"))
}

/**
* Encodes the Response into a Netty HttpResponse. Sets default headers such
* as `content-length`. For performance reasons, it is possible that it uses a
Expand Down Expand Up @@ -107,11 +114,11 @@ final case class Response private (
}

object Response {
private[zhttp] def unsafeFromJResponse(jRes: FullHttpResponse): Response = {
private[zhttp] def unsafeFromJResponse(ctx: ChannelHandlerContext, jRes: FullHttpResponse): Response = {
val status = Status.fromHttpResponseStatus(jRes.status())
val headers = Headers.decode(jRes.headers())
val data = HttpData.fromByteBuf(Unpooled.copiedBuffer(jRes.content()))
Response(status, headers, data)
Response(status, headers, data, attribute = Attribute(channel = Some(ctx)))
}

def apply[R, E](
Expand Down Expand Up @@ -235,6 +242,7 @@ object Response {
memoize: Boolean = false,
serverTime: Boolean = false,
encoded: Option[(Response, HttpResponse)] = None,
channel: Option[ChannelHandlerContext] = None,
) { self =>
def withEncodedResponse(jResponse: HttpResponse, response: Response): Attribute =
self.copy(encoded = Some(response -> jResponse))
Expand Down
14 changes: 7 additions & 7 deletions zio-http/src/main/scala/zhttp/service/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import zhttp.service.Client.Config
import zhttp.service.client.ClientSSLHandler.ClientSSLOptions
import zhttp.service.client.{ClientInboundHandler, ClientSSLHandler}
import zhttp.socket.{Socket, SocketApp}
import zio.{Promise, Task, ZIO}
import zio.{Promise, Task, ZIO, ZManaged}

import java.net.{InetSocketAddress, URI}

Expand All @@ -38,8 +38,8 @@ final case class Client[R](rtm: HttpRuntime[R], cf: JChannelFactory[Channel], el
headers: Headers = Headers.empty,
socketApp: SocketApp[R],
sslOptions: ClientSSLOptions = ClientSSLOptions.DefaultSSL,
): ZIO[R, Throwable, Response] = for {
env <- ZIO.environment[R]
): ZManaged[R, Throwable, Response] = for {
env <- ZManaged.environment[R]
res <- request(
Request(
version = Version.Http_1_1,
Expand All @@ -48,7 +48,7 @@ final case class Client[R](rtm: HttpRuntime[R], cf: JChannelFactory[Channel], el
headers,
),
clientConfig = Client.Config(socketApp = Some(socketApp.provideEnvironment(env)), ssl = Some(sslOptions)),
)
).toManaged(_.close.orDie)
} yield res

/**
Expand Down Expand Up @@ -159,10 +159,10 @@ object Client {
app: SocketApp[R],
headers: Headers = Headers.empty,
sslOptions: ClientSSLOptions = ClientSSLOptions.DefaultSSL,
): ZIO[R with EventLoopGroup with ChannelFactory, Throwable, Response] = {
): ZManaged[R with EventLoopGroup with ChannelFactory, Throwable, Response] = {
for {
clt <- make[R]
uri <- ZIO.fromEither(URL.fromString(url))
clt <- make[R].toManaged_
uri <- ZIO.fromEither(URL.fromString(url)).toManaged_
res <- clt.socket(uri, headers, app, sslOptions)
} yield res
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ final class ClientInboundHandler[R](
msg.touch("handlers.ClientInboundHandler-channelRead0")
// NOTE: The promise is made uninterruptible to be able to complete the promise in a error situation.
// It allows to avoid loosing the message from pipeline in case the channel pipeline is closed due to an error.
zExec.unsafeRunUninterruptible(ctx)(promise.succeed(Response.unsafeFromJResponse(msg)))
zExec.unsafeRunUninterruptible(ctx)(promise.succeed(Response.unsafeFromJResponse(ctx, msg)))

if (isWebSocket) {
ctx.fireChannelRead(msg.retain())
Expand Down
4 changes: 2 additions & 2 deletions zio-http/src/main/scala/zhttp/socket/Socket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import zhttp.service.{ChannelFactory, EventLoopGroup}
import zio.clock.Clock
import zio.duration.Duration
import zio.stream.ZStream
import zio.{Cause, NeedsEnv, ZIO}
import zio.{Cause, NeedsEnv, ZIO, ZManaged}

sealed trait Socket[-R, +E, -A, +B] { self =>
import Socket._
Expand All @@ -31,7 +31,7 @@ sealed trait Socket[-R, +E, -A, +B] { self =>

def connect(url: String)(implicit
ev: IsWebSocket[R, E, A, B],
): ZIO[R with EventLoopGroup with ChannelFactory, Throwable, Response] =
): ZManaged[R with EventLoopGroup with ChannelFactory, Throwable, Response] =
self.toSocketApp.connect(url)

def contramap[Z](za: Z => A): Socket[R, E, Z, B] = Socket.FCMap(self, za)
Expand Down
4 changes: 2 additions & 2 deletions zio-http/src/main/scala/zhttp/socket/SocketApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import zhttp.service.{ChannelFactory, Client, EventLoopGroup}
import zhttp.socket.SocketApp.Handle.{WithEffect, WithSocket}
import zhttp.socket.SocketApp.{Connection, Handle}
import zio.stream.ZStream
import zio.{NeedsEnv, ZIO}
import zio.{NeedsEnv, ZIO, ZManaged}

import java.net.SocketAddress

Expand All @@ -23,7 +23,7 @@ final case class SocketApp[-R](
* Creates a socket connection on the provided URL. Typically used to connect
* as a client.
*/
def connect(url: String): ZIO[R with EventLoopGroup with ChannelFactory, Throwable, Response] =
def connect(url: String): ZManaged[R with EventLoopGroup with ChannelFactory, Throwable, Response] =
Client.socket(url, self)

/**
Expand Down
12 changes: 7 additions & 5 deletions zio-http/src/test/scala/zhttp/internal/HttpRunnableSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ abstract class HttpRunnableSpec extends DefaultRunnableSpec { self =>
id <- Http.fromZIO(DynamicServer.deploy(app))
url <- Http.fromZIO(DynamicServer.wsURL)
response <- Http.fromFunctionZIO[SocketApp[HttpEnv]] { app =>
Client.socket(
url = url,
headers = Headers(DynamicServer.APP_ID, id),
app = app,
)
Client
.socket(
url = url,
headers = Headers(DynamicServer.APP_ID, id),
app = app,
)
.useNow
}
} yield response
}
Expand Down

3 comments on commit ff39a63

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 Performance Benchmark:

Concurrency: 256
Requests/sec: 942795.64

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 Performance Benchmark:

Concurrency: 256
Requests/sec: 944438.75

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 Performance Benchmark:

Concurrency: 256
Requests/sec: 929416.53

Please sign in to comment.