From aa1750a423a4322e3dcef0b619882a550dbdc737 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Sun, 9 Apr 2023 09:55:25 +0200 Subject: [PATCH] Real support for streaming in Client (#2083) * Real support for streaming, first step * Leak fix, withDisabledStreaming * Fixes * Fixes and debug prints * Fixes and cleanup * Remove some low-level configuration from Server.Config * Remove isRead attribute * Readded FlushConsolidationHandler * Fix for 2.12 * Change position of flush consolidation handler --- project/Dependencies.scala | 2 +- .../example/PlainTextBenchmarkServer.scala | 7 +- .../example/SimpleEffectBenchmarkServer.scala | 5 +- .../scala/example/WebSocketSimpleClient.scala | 7 +- .../main/scala/zio/http/ClientDriver.scala | 1 - .../src/main/scala/zio/http/DnsResolver.scala | 16 ++ .../src/main/scala/zio/http/Request.scala | 16 ++ .../src/main/scala/zio/http/Response.scala | 15 ++ zio-http/src/main/scala/zio/http/Server.scala | 88 +++---- .../src/main/scala/zio/http/ZClient.scala | 48 +++- .../scala/zio/http/forms/StreamingForm.scala | 16 ++ .../scala/zio/http/html/EncodingState.scala | 16 ++ .../main/scala/zio/http/model/Header.scala | 16 ++ .../zio/http/netty/AsyncBodyReader.scala | 93 +++++++ .../main/scala/zio/http/netty/NettyBody.scala | 1 - .../zio/http/netty/NettyBodyWriter.scala | 16 +- .../scala/zio/http/netty/NettyResponse.scala | 24 +- .../netty/client/ClientInboundHandler.scala | 91 ++++--- .../ClientInboundStreamingHandler.scala | 102 -------- .../netty/client/ClientRequestEncoder.scala | 59 ----- .../client/ClientResponseStreamHandler.scala | 31 +-- .../http/netty/client/NettyClientDriver.scala | 67 +++-- .../netty/client/NettyRequestEncoder.scala | 66 +++++ .../WebSocketClientInboundHandler.scala | 58 +++++ .../main/scala/zio/http/netty/package.scala | 1 - .../netty/server/ServerAsyncBodyHandler.scala | 24 +- .../server/ServerChannelInitializer.scala | 33 ++- .../netty/server/ServerInboundHandler.scala | 33 +-- .../src/test/scala/zio/http/ClientSpec.scala | 2 +- .../scala/zio/http/ClientStreamingSpec.scala | 232 +++++++++++++++--- .../test/scala/zio/http/DnsResolverSpec.scala | 16 ++ .../zio/http/NettyMaxHeaderLengthSpec.scala | 16 ++ .../zio/http/RequestStreamingServerSpec.scala | 3 +- .../src/test/scala/zio/http/ServerSpec.scala | 3 +- .../zio/http/internal/HttpRunnableSpec.scala | 13 +- .../client/NettyConnectionPoolSpec.scala | 26 +- ...ec.scala => NettyRequestEncoderSpec.scala} | 3 +- 37 files changed, 799 insertions(+), 467 deletions(-) create mode 100644 zio-http/src/main/scala/zio/http/netty/AsyncBodyReader.scala delete mode 100644 zio-http/src/main/scala/zio/http/netty/client/ClientInboundStreamingHandler.scala delete mode 100644 zio-http/src/main/scala/zio/http/netty/client/ClientRequestEncoder.scala create mode 100644 zio-http/src/main/scala/zio/http/netty/client/NettyRequestEncoder.scala create mode 100644 zio-http/src/main/scala/zio/http/netty/client/WebSocketClientInboundHandler.scala rename zio-http/src/test/scala/zio/http/netty/client/{ClientRequestEncoderSpec.scala => NettyRequestEncoderSpec.scala} (97%) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index fed08df950..a1cd973c9b 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -6,7 +6,7 @@ object Dependencies { val NettyVersion = "4.1.91.Final" val NettyIncubatorVersion = "0.0.19.Final" val ScalaCompactCollectionVersion = "2.8.1" - val ZioVersion = "2.0.10+55-a63d1724-SNAPSHOT" + val ZioVersion = "2.0.11" val ZioSchemaVersion = "0.4.8" val SttpVersion = "3.3.18" diff --git a/zio-http-example/src/main/scala/example/PlainTextBenchmarkServer.scala b/zio-http-example/src/main/scala/example/PlainTextBenchmarkServer.scala index 2105e32f22..e56d493007 100644 --- a/zio-http-example/src/main/scala/example/PlainTextBenchmarkServer.scala +++ b/zio-http-example/src/main/scala/example/PlainTextBenchmarkServer.scala @@ -2,6 +2,7 @@ package example import zio._ +import zio.http.Server.RequestStreaming import zio.http._ import zio.http.model.Header import zio.http.netty.NettyConfig @@ -38,13 +39,11 @@ object PlainTextBenchmarkServer extends ZIOAppDefault { private def jsonApp(json: Response): HttpApp[Any, Nothing] = Handler.response(json).toHttp.whenPathEq(jsonPath) - val app = plainTextApp(frozenPlainTextResponse) ++ jsonApp(frozenJsonResponse) + val app: App[Any] = plainTextApp(frozenPlainTextResponse) ++ jsonApp(frozenJsonResponse) private val config = Server.Config.default .port(8080) - .consolidateFlush(true) - .flowControl(false) - .objectAggregator(-1) + .enableRequestStreaming private val nettyConfig = NettyConfig.default .leakDetection(LeakDetectionLevel.DISABLED) diff --git a/zio-http-example/src/main/scala/example/SimpleEffectBenchmarkServer.scala b/zio-http-example/src/main/scala/example/SimpleEffectBenchmarkServer.scala index 899b554cdf..a7ef3143ef 100644 --- a/zio-http-example/src/main/scala/example/SimpleEffectBenchmarkServer.scala +++ b/zio-http-example/src/main/scala/example/SimpleEffectBenchmarkServer.scala @@ -2,6 +2,7 @@ package example import zio._ +import zio.http.Server.RequestStreaming import zio.http._ import zio.http.model.{Header, Method} import zio.http.netty.NettyConfig @@ -38,9 +39,7 @@ object SimpleEffectBenchmarkServer extends ZIOAppDefault { private val config = Server.Config.default .port(8080) - .consolidateFlush(true) - .flowControl(false) - .objectAggregator(-1) + .enableRequestStreaming private val nettyConfig = NettyConfig.default .leakDetection(LeakDetectionLevel.DISABLED) diff --git a/zio-http-example/src/main/scala/example/WebSocketSimpleClient.scala b/zio-http-example/src/main/scala/example/WebSocketSimpleClient.scala index 01b8546508..613eeb6d44 100644 --- a/zio-http-example/src/main/scala/example/WebSocketSimpleClient.scala +++ b/zio-http-example/src/main/scala/example/WebSocketSimpleClient.scala @@ -29,9 +29,12 @@ object WebSocketSimpleClient extends ZIOAppDefault { ZIO.succeed(println("Goodbye!")) *> ch.writeAndFlush(WebSocketFrame.close(1000)) } - val app: ZIO[Any with Client with Scope, Throwable, Response] = + val app: ZIO[Client with Scope, Throwable, Response] = httpSocket.toSocketApp.connect(url) *> ZIO.never - val run = app.provide(Client.default, Scope.default) + val run: ZIO[ZIOAppArgs with Scope, Throwable, Any] = + ZIO.scoped { + app.provideSome[Scope](Client.default) + } } diff --git a/zio-http/src/main/scala/zio/http/ClientDriver.scala b/zio-http/src/main/scala/zio/http/ClientDriver.scala index e3c9008058..7379f835b4 100644 --- a/zio-http/src/main/scala/zio/http/ClientDriver.scala +++ b/zio-http/src/main/scala/zio/http/ClientDriver.scala @@ -31,7 +31,6 @@ trait ClientDriver { req: Request, onResponse: Promise[Throwable, Response], onComplete: Promise[Throwable, ChannelState], - useAggregator: Boolean, enableKeepAlive: Boolean, createSocketApp: () => SocketApp[Any], )(implicit trace: Trace): ZIO[Scope, Throwable, ChannelInterface] diff --git a/zio-http/src/main/scala/zio/http/DnsResolver.scala b/zio-http/src/main/scala/zio/http/DnsResolver.scala index f963ab3377..d5f235f8a4 100644 --- a/zio-http/src/main/scala/zio/http/DnsResolver.scala +++ b/zio-http/src/main/scala/zio/http/DnsResolver.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2021 - 2023 Sporta Technologies PVT LTD & the ZIO HTTP contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package zio.http import java.net.{InetAddress, UnknownHostException} diff --git a/zio-http/src/main/scala/zio/http/Request.scala b/zio-http/src/main/scala/zio/http/Request.scala index c8b42bdaa7..1e865f6661 100644 --- a/zio-http/src/main/scala/zio/http/Request.scala +++ b/zio-http/src/main/scala/zio/http/Request.scala @@ -18,6 +18,8 @@ package zio.http import java.net.InetAddress +import zio.ZIO + import zio.http.model._ import zio.http.model.headers._ @@ -35,11 +37,25 @@ final case class Request( */ def addTrailingSlash: Request = self.copy(url = self.url.addTrailingSlash) + /** + * Collects the potentially streaming body of the request into a single chunk. + */ + def collect: ZIO[Any, Throwable, Request] = + if (self.body.isComplete) ZIO.succeed(self) + else + self.body.asChunk.map { bytes => + self.copy(body = Body.fromChunk(bytes)) + } + /** * Drops trailing slash from the path. */ def dropTrailingSlash: Request = self.copy(url = self.url.dropTrailingSlash) + /** Consumes the streaming body fully and then drops it */ + def ignoreBody: ZIO[Any, Throwable, Request] = + self.collect.map(_.copy(body = Body.empty)) + def patch(p: Request.Patch): Request = Request( body, diff --git a/zio-http/src/main/scala/zio/http/Response.scala b/zio-http/src/main/scala/zio/http/Response.scala index 1b43badd3c..89f3a86d58 100644 --- a/zio-http/src/main/scala/zio/http/Response.scala +++ b/zio-http/src/main/scala/zio/http/Response.scala @@ -33,6 +33,17 @@ sealed trait Response extends HeaderOps[Response] { self => def body: Body + /** + * Collects the potentially streaming body of the response into a single + * chunk. + */ + def collect: ZIO[Any, Throwable, Response] = + if (self.body.isComplete) ZIO.succeed(self) + else + self.body.asChunk.map { bytes => + self.copy(body = Body.fromChunk(bytes)) + } + def copy( status: Status = self.status, headers: Headers = self.headers, @@ -86,6 +97,10 @@ sealed trait Response extends HeaderOps[Response] { self => case _ => None } + /** Consumes the streaming body fully and then drops it */ + final def ignoreBody: ZIO[Any, Throwable, Response] = + self.collect.map(_.copy(body = Body.empty)) + final def isWebSocket: Boolean = self match { case _: SocketAppResponse => self.status == Status.SwitchingProtocols case _ => false diff --git a/zio-http/src/main/scala/zio/http/Server.scala b/zio-http/src/main/scala/zio/http/Server.scala index 1e1448423c..de050ce3a7 100644 --- a/zio-http/src/main/scala/zio/http/Server.scala +++ b/zio-http/src/main/scala/zio/http/Server.scala @@ -49,15 +49,12 @@ object Server { address: InetSocketAddress, acceptContinue: Boolean, keepAlive: Boolean, - consolidateFlush: Boolean, - flowControl: Boolean, requestDecompression: Decompression, responseCompression: Option[ResponseCompressionConfig], - objectAggregator: Int, + requestStreaming: RequestStreaming, maxHeaderSize: Int, ) { self => - def useAggregator: Boolean = objectAggregator >= 0 /** * Configure the server to use HttpServerExpectContinueHandler to send a 100 @@ -83,17 +80,14 @@ object Server { def binding(inetSocketAddress: InetSocketAddress): Config = self.copy(address = inetSocketAddress) /** - * Configure the server to use FlushConsolidationHandler to control the - * flush operations in a more efficient way if enabled (@see FlushConsolidationHandler). + * Disables streaming of request bodies. Payloads larger than + * maxContentLength will be rejected */ - def consolidateFlush(enable: Boolean): Config = self.copy(consolidateFlush = enable) + def disableRequestStreaming(maxContentLength: Int): Config = + self.copy(requestStreaming = RequestStreaming.Disabled(maxContentLength)) - /** - * Configure the server to use netty FlowControlHandler if enable (@see FlowControlHandler). - */ - def flowControl(enable: Boolean): Config = self.copy(flowControl = enable) + /** Enables streaming request bodies */ + def enableRequestStreaming: Config = self.copy(requestStreaming = RequestStreaming.Enabled) /** * Configure the server to use netty's HttpServerKeepAliveHandler to close @@ -103,11 +97,10 @@ object Server { def keepAlive(enable: Boolean): Config = self.copy(keepAlive = enable) /** - * Configure the server to use HttpObjectAggregator with the specified max - * size of the aggregated content. + * Configure the server to use `maxHeaderSize` value when encode/decode + * headers. */ - def objectAggregator(maxRequestSize: Int = 1024 * 100): Config = - self.copy(objectAggregator = maxRequestSize) + def maxHeaderSize(headerSize: Int): Config = self.copy(maxHeaderSize = headerSize) /** * Configure the server to listen on an available open port @@ -119,14 +112,6 @@ object Server { */ def port(port: Int): Config = self.copy(address = new InetSocketAddress(port)) - /** - * Configure the server to use netty's HttpContentDecompressor to decompress - * Http requests (@see HttpContentDecompressor). - */ - def requestDecompression(isStrict: Boolean): Config = - self.copy(requestDecompression = if (isStrict) Decompression.Strict else Decompression.NonStrict) - /** * Configure the new server with netty's HttpContentCompressor to compress * Http responses (@see HttpContentDecompressor). */ - def ssl(sslConfig: SSLConfig): Config = self.copy(sslConfig = Some(sslConfig)) + def requestDecompression(isStrict: Boolean): Config = + self.copy(requestDecompression = if (isStrict) Decompression.Strict else Decompression.NonStrict) /** - * Configure the server to use `maxHeaderSize` value when encode/decode - * headers. + * Configure the server with the following ssl options. */ - def maxHeaderSize(headerSize: Int): Config = self.copy(maxHeaderSize = headerSize) + def ssl(sslConfig: SSLConfig): Config = self.copy(sslConfig = Some(sslConfig)) + + /** Enables or disables request body streaming */ + def withRequestStreaming(requestStreaming: RequestStreaming): Config = + self.copy(requestStreaming = requestStreaming) } object Config { @@ -154,11 +145,9 @@ object Server { zio.Config.int("binding-port").withDefault(Config.default.address.getPort) ++ zio.Config.boolean("accept-continue").withDefault(Config.default.acceptContinue) ++ zio.Config.boolean("keep-alive").withDefault(Config.default.keepAlive) ++ - zio.Config.boolean("consolidate-flush").withDefault(Config.default.consolidateFlush) ++ - zio.Config.boolean("flow-control").withDefault(Config.default.flowControl) ++ Decompression.config.nested("request-decompression").withDefault(Config.default.requestDecompression) ++ ResponseCompressionConfig.config.nested("response-compression").optional ++ - zio.Config.int("max-aggregated-request-size").withDefault(Config.default.objectAggregator) ++ + RequestStreaming.config.nested("request-streaming").withDefault(Config.default.requestStreaming) ++ zio.Config.int("max-header-size").withDefault(Config.default.maxHeaderSize) }.map { case ( @@ -167,11 +156,9 @@ object Server { port, acceptContinue, keepAlive, - consolidateFlush, - flowControl, requestDecompression, responseCompression, - objectAggregator, + requestStreaming, maxHeaderSize, ) => Config( @@ -179,11 +166,9 @@ object Server { address = new InetSocketAddress(host.getOrElse(Config.default.address.getHostName), port), acceptContinue = acceptContinue, keepAlive = keepAlive, - consolidateFlush = consolidateFlush, - flowControl = flowControl, requestDecompression = requestDecompression, responseCompression = responseCompression, - objectAggregator = objectAggregator, + requestStreaming = requestStreaming, maxHeaderSize = maxHeaderSize, ) } @@ -193,11 +178,9 @@ object Server { address = new InetSocketAddress(8080), acceptContinue = false, keepAlive = true, - consolidateFlush = false, - flowControl = true, requestDecompression = Decompression.No, responseCompression = None, - objectAggregator = 1024 * 100, + requestStreaming = RequestStreaming.Disabled(1024 * 100), maxHeaderSize = 8192, ) @@ -286,6 +269,27 @@ object Server { } } + sealed trait RequestStreaming + + object RequestStreaming { + + /** Enable streaming request bodies */ + case object Enabled extends RequestStreaming + + /** + * Disable streaming request bodies. Bodies larger than the configured + * maximum content length will be rejected. + */ + final case class Disabled(maximumContentLength: Int) extends RequestStreaming + + lazy val config: zio.Config[RequestStreaming] = + (zio.Config.boolean("enabled").withDefault(true) ++ + zio.Config.int("maximum-content-length").withDefault(1024 * 100)).map { + case (true, _) => Enabled + case (false, maxLength) => Disabled(maxLength) + } + } + def serve[R]( httpApp: App[R], )(implicit trace: Trace): URIO[R with Server, Nothing] = diff --git a/zio-http/src/main/scala/zio/http/ZClient.scala b/zio-http/src/main/scala/zio/http/ZClient.scala index f44ab09889..bc05acdd54 100644 --- a/zio-http/src/main/scala/zio/http/ZClient.scala +++ b/zio-http/src/main/scala/zio/http/ZClient.scala @@ -182,6 +182,36 @@ trait ZClient[-Env, -In, +Err, +Out] extends HeaderOps[ZClient[Env, In, Err, Out final def map[Out2](f: Out => Out2): ZClient[Env, In, Err, Out2] = mapZIO(out => ZIO.succeed(f(out))) + def mapError[Err2](f: Err => Err2): ZClient[Env, In, Err2, Out] = + new ZClient[Env, In, Err2, Out] { + override def headers: Headers = self.headers + override def method: Method = self.method + override def sslConfig: Option[ClientSSLConfig] = self.sslConfig + override def url: URL = self.url + override def version: Version = self.version + override def request( + version: Version, + method: Method, + url: URL, + headers: Headers, + body: In, + sslConfig: Option[ClientSSLConfig], + )(implicit trace: Trace): ZIO[Env, Err2, Out] = + self.request(version, method, url, headers, body, sslConfig).mapError(f) + + override def socket[Env1 <: Env]( + app: SocketApp[Env1], + headers: Headers, + hostOption: Option[String], + pathPrefix: Path, + portOption: Option[RuntimeFlags], + queries: QueryParams, + schemeOption: Option[Scheme], + version: Version, + )(implicit trace: Trace): ZIO[Env1 with Scope, Err2, Out] = + self.socket(app, headers, hostOption, pathPrefix, portOption, queries, schemeOption, version).mapError(f) + } + final def mapZIO[Env1 <: Env, Err1 >: Err, Out2](f: Out => ZIO[Env1, Err1, Out2]): ZClient[Env1, In, Err1, Out2] = new ZClient[Env1, In, Err1, Out2] { override def headers: Headers = self.headers @@ -463,6 +493,12 @@ trait ZClient[-Env, -In, +Err, +Out] extends HeaderOps[ZClient[Env, In, Err, Out version, self, ) + + def withDisabledStreaming(implicit + ev1: Out <:< Response, + ev2: Err <:< Throwable, + ): ZClient[Env, In, Throwable, Response] = + mapError(ev2).mapZIO(out => ev1(out).collect) } object ZClient { @@ -471,7 +507,6 @@ object ZClient { socketApp: Option[SocketApp[Any]], ssl: Option[ClientSSLConfig], proxy: Option[zio.http.Proxy], - useAggregator: Boolean, connectionPool: ConnectionPoolConfig, maxHeaderSize: Int, requestDecompression: Decompression, @@ -484,8 +519,6 @@ object ZClient { def proxy(proxy: zio.http.Proxy): Config = self.copy(proxy = Some(proxy)) - def useObjectAggregator(objectAggregator: Boolean): Config = self.copy(useAggregator = objectAggregator) - def withFixedConnectionPool(size: Int): Config = self.copy(connectionPool = ConnectionPoolConfig.Fixed(size)) @@ -507,15 +540,13 @@ object ZClient { ( ClientSSLConfig.config.nested("ssl").optional.withDefault(Config.default.ssl) ++ zio.http.Proxy.config.nested("proxy").optional.withDefault(Config.default.proxy) ++ - zio.Config.boolean("use-aggregator").withDefault(Config.default.useAggregator) ++ ConnectionPoolConfig.config.nested("connection-pool").withDefault(Config.default.connectionPool) ++ zio.Config.int("max-header-size").withDefault(Config.default.maxHeaderSize) ++ Decompression.config.nested("request-decompression").withDefault(Config.default.requestDecompression) - ).map { case (ssl, proxy, useAggregator, connectionPool, maxHeaderSize, requestDecompression) => + ).map { case (ssl, proxy, connectionPool, maxHeaderSize, requestDecompression) => default.copy( ssl = ssl, proxy = proxy, - useAggregator = useAggregator, connectionPool = connectionPool, maxHeaderSize = maxHeaderSize, requestDecompression = requestDecompression, @@ -526,7 +557,6 @@ object ZClient { socketApp = None, ssl = None, proxy = None, - useAggregator = true, connectionPool = ConnectionPoolConfig.Disabled, maxHeaderSize = 8192, requestDecompression = Decompression.No, @@ -575,8 +605,7 @@ object ZClient { } final class ClientLive private (config: Config, driver: ClientDriver, connectionPool: ConnectionPool[Any]) - extends Client - with ClientRequestEncoder { self => + extends Client { self => def this(driver: ClientDriver)(connectionPool: ConnectionPool[driver.Connection])(settings: Config) = this(settings, driver, connectionPool.asInstanceOf[ConnectionPool[Any]]) @@ -664,7 +693,6 @@ object ZClient { request, onResponse, onComplete, - clientConfig.useAggregator, connectionPool.enableKeepAlive, () => clientConfig.socketApp.getOrElse(SocketApp()), ) diff --git a/zio-http/src/main/scala/zio/http/forms/StreamingForm.scala b/zio-http/src/main/scala/zio/http/forms/StreamingForm.scala index e4d56cb678..393617ea7c 100644 --- a/zio-http/src/main/scala/zio/http/forms/StreamingForm.scala +++ b/zio-http/src/main/scala/zio/http/forms/StreamingForm.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2021 - 2023 Sporta Technologies PVT LTD & the ZIO HTTP contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package zio.http.forms import java.nio.charset.Charset diff --git a/zio-http/src/main/scala/zio/http/html/EncodingState.scala b/zio-http/src/main/scala/zio/http/html/EncodingState.scala index dd06ef8752..4d143cf572 100644 --- a/zio-http/src/main/scala/zio/http/html/EncodingState.scala +++ b/zio-http/src/main/scala/zio/http/html/EncodingState.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2021 - 2023 Sporta Technologies PVT LTD & the ZIO HTTP contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package zio.http.html private[html] sealed trait EncodingState { diff --git a/zio-http/src/main/scala/zio/http/model/Header.scala b/zio-http/src/main/scala/zio/http/model/Header.scala index a539da4564..d0a3d1fe49 100644 --- a/zio-http/src/main/scala/zio/http/model/Header.scala +++ b/zio-http/src/main/scala/zio/http/model/Header.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2021 - 2023 Sporta Technologies PVT LTD & the ZIO HTTP contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package zio.http.model import java.net.URI diff --git a/zio-http/src/main/scala/zio/http/netty/AsyncBodyReader.scala b/zio-http/src/main/scala/zio/http/netty/AsyncBodyReader.scala new file mode 100644 index 0000000000..c75b8a1c05 --- /dev/null +++ b/zio-http/src/main/scala/zio/http/netty/AsyncBodyReader.scala @@ -0,0 +1,93 @@ +/* + * Copyright 2021 - 2023 Sporta Technologies PVT LTD & the ZIO HTTP contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.http.netty + +import zio.{Chunk, ChunkBuilder, Trace, Unsafe} + +import zio.http.netty.AsyncBodyReader.State +import zio.http.netty.NettyBody.UnsafeAsync + +import io.netty.buffer.ByteBufUtil +import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler} +import io.netty.handler.codec.http.{HttpContent, LastHttpContent} + +abstract class AsyncBodyReader(implicit trace: Trace) extends SimpleChannelInboundHandler[HttpContent](true) { + + protected val unsafeClass: Unsafe = Unsafe.unsafe + + private var state: State = State.Buffering + private val buffer: ChunkBuilder[(Chunk[Byte], Boolean)] = ChunkBuilder.make[(Chunk[Byte], Boolean)]() + private var previousAutoRead: Boolean = false + private var ctx: ChannelHandlerContext = _ + + def connect(callback: UnsafeAsync): Unit = { + this.synchronized { + state match { + case State.Buffering => + state = State.Direct(callback) + buffer.result().foreach { case (chunk, isLast) => + callback(null, chunk, isLast) + } + ctx.read() + case State.Direct(_) => + throw new IllegalStateException("Cannot connect twice") + } + } + } + + override def handlerAdded(ctx: ChannelHandlerContext): Unit = { + previousAutoRead = ctx.channel().config().isAutoRead + ctx.channel().config().setAutoRead(false) + this.ctx = ctx + } + + override def handlerRemoved(ctx: ChannelHandlerContext): Unit = { + ctx.channel().config().setAutoRead(previousAutoRead) + } + + override def channelRead0( + ctx: ChannelHandlerContext, + msg: HttpContent, + ): Unit = { + val isLast = msg.isInstanceOf[LastHttpContent] + val chunk = Chunk.fromArray(ByteBufUtil.getBytes(msg.content())) + + this.synchronized { + state match { + case State.Buffering => + buffer += ((chunk, isLast)) + case State.Direct(callback) => + callback(ctx.channel(), chunk, isLast) + ctx.read() + } + } + + if (isLast) { + ctx.channel().pipeline().remove(this) + }: Unit + } +} + +object AsyncBodyReader { + sealed trait State + + object State { + case object Buffering extends State + + final case class Direct(callback: UnsafeAsync) extends State + } +} diff --git a/zio-http/src/main/scala/zio/http/netty/NettyBody.scala b/zio-http/src/main/scala/zio/http/netty/NettyBody.scala index 6452220e81..1c2dd7a9e9 100644 --- a/zio-http/src/main/scala/zio/http/netty/NettyBody.scala +++ b/zio-http/src/main/scala/zio/http/netty/NettyBody.scala @@ -123,7 +123,6 @@ object NettyBody extends BodyEncoding { case e: Throwable => emit(ZIO.fail(Option(e))) }, ) - .tap { case (ctx, _, isLast) => ZIO.attempt(ctx.read()).unless(isLast) } .takeUntil { case (_, _, isLast) => isLast } .map { case (_, msg, _) => msg } .flattenChunks diff --git a/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala b/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala index 6ae882594a..193cfda493 100644 --- a/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala +++ b/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala @@ -49,9 +49,7 @@ object NettyBodyWriter { ZIO.attempt { async { (ctx, msg, isLast) => ctx.writeAndFlush(msg) - val _ = - if (isLast) ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) - else ctx.read() + if (isLast) ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) } true } @@ -63,12 +61,14 @@ object NettyBodyWriter { case StreamBody(stream, _, _) => stream .runForeachChunk(chunk => - NettyFutureExecutor.executed( - ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(chunk.toArray))), - ), + NettyFutureExecutor.executed { + ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(chunk.toArray))) + }, ) - .flatMap { _ => - NettyFutureExecutor.executed(ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)).as(true) + .zipRight { + NettyFutureExecutor.executed { + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + }.as(true) } case ChunkBody(data, _, _) => ZIO.succeed { diff --git a/zio-http/src/main/scala/zio/http/netty/NettyResponse.scala b/zio-http/src/main/scala/zio/http/netty/NettyResponse.scala index 94b77cc45e..a4748c5853 100644 --- a/zio-http/src/main/scala/zio/http/netty/NettyResponse.scala +++ b/zio-http/src/main/scala/zio/http/netty/NettyResponse.scala @@ -16,13 +16,13 @@ package zio.http.netty -import zio.{Promise, Trace, Unsafe} +import zio.{Promise, Trace, Unsafe, ZIO} -import zio.http.Response import zio.http.Response.NativeResponse import zio.http.model.Header import zio.http.netty.client.{ChannelState, ClientResponseStreamHandler} import zio.http.netty.model.Conversions +import zio.http.{Body, Response} import io.netty.buffer.Unpooled import io.netty.channel.ChannelHandlerContext @@ -50,18 +50,30 @@ object NettyResponse { )(implicit unsafe: Unsafe, trace: Trace, - ): Response = { + ): ZIO[Any, Nothing, Response] = { val status = Conversions.statusFromNetty(jRes.status()) val headers = Conversions.headersFromNetty(jRes.headers()) - val data = NettyBody.fromAsync { callback => + + if (headers.get(Header.ContentLength).map(_.length).contains(0L)) { + onComplete + .succeed(ChannelState.Reusable) + .as( + new NativeResponse(Body.empty, headers, status, () => NettyFutureExecutor.executed(ctx.close())), + ) + } else { + val responseHandler = new ClientResponseStreamHandler(zExec, onComplete, keepAlive) ctx .pipeline() .addAfter( Names.ClientInboundHandler, Names.ClientStreamingBodyHandler, - new ClientResponseStreamHandler(callback, zExec, onComplete, keepAlive), + responseHandler, ): Unit + + val data = NettyBody.fromAsync { callback => + responseHandler.connect(callback) + } + ZIO.succeed(new NativeResponse(data, headers, status, () => NettyFutureExecutor.executed(ctx.close()))) } - new NativeResponse(data, headers, status, () => NettyFutureExecutor.executed(ctx.close())) } } diff --git a/zio-http/src/main/scala/zio/http/netty/client/ClientInboundHandler.scala b/zio-http/src/main/scala/zio/http/netty/client/ClientInboundHandler.scala index b3f71e4e82..bcc5ddf2c6 100644 --- a/zio-http/src/main/scala/zio/http/netty/client/ClientInboundHandler.scala +++ b/zio-http/src/main/scala/zio/http/netty/client/ClientInboundHandler.scala @@ -18,82 +18,73 @@ package zio.http.netty.client import zio._ -import zio.http.Response -import zio.http.netty.{NettyFutureExecutor, NettyResponse, NettyRuntime} +import zio.http.netty.{NettyBodyWriter, NettyResponse, NettyRuntime} +import zio.http.{Request, Response} import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler} -import io.netty.handler.codec.http.{FullHttpRequest, FullHttpResponse, HttpUtil} +import io.netty.handler.codec.http._ /** * Handles HTTP response */ final class ClientInboundHandler( - zExec: NettyRuntime, - jReq: FullHttpRequest, + rtm: NettyRuntime, + req: Request, + jReq: HttpRequest, onResponse: Promise[Throwable, Response], onComplete: Promise[Throwable, ChannelState], - isWebSocket: Boolean, enableKeepAlive: Boolean, )(implicit trace: Trace) - extends SimpleChannelInboundHandler[FullHttpResponse](true) { + extends SimpleChannelInboundHandler[HttpObject](false) { implicit private val unsafeClass: Unsafe = Unsafe.unsafe - override def channelActive(ctx: ChannelHandlerContext): Unit = { - if (isWebSocket) { - ctx.fireChannelActive() - () - } else { - sendRequest(ctx) - } + override def handlerAdded(ctx: ChannelHandlerContext): Unit = { + super.handlerAdded(ctx) } - private def sendRequest(ctx: ChannelHandlerContext): Unit = { - ctx.writeAndFlush(jReq) - () + override def channelActive(ctx: ChannelHandlerContext): Unit = { + sendRequest(ctx) } - override def channelRead0(ctx: ChannelHandlerContext, msg: FullHttpResponse): Unit = { - msg.touch("handlers.ClientInboundHandler-channelRead0") - // NOTE: The promise is made uninterruptible to be able to complete the promise in a error situation. - // It allows to avoid loosing the message from pipeline in case the channel pipeline is closed due to an error. - zExec.runUninterruptible(ctx, NettyRuntime.noopEnsuring) { - onResponse.succeed(NettyResponse.make(ctx, msg)) - }(unsafeClass, trace) + override def handlerRemoved(ctx: ChannelHandlerContext): Unit = super.handlerRemoved(ctx) - if (isWebSocket) { - ctx.fireChannelRead(msg.retain()) - ctx.pipeline().remove(ctx.name()): Unit + private def sendRequest(ctx: ChannelHandlerContext): Unit = { + jReq match { + case fullRequest: FullHttpRequest => + ctx.writeAndFlush(fullRequest) + case _: HttpRequest => + ctx.write(jReq) + rtm.run(ctx, NettyRuntime.noopEnsuring) { + NettyBodyWriter.write(req.body, ctx).unit + }(Unsafe.unsafe, trace) + ctx.flush(): Unit } + } - val shouldKeepAlive = enableKeepAlive && HttpUtil.isKeepAlive(msg) || isWebSocket + override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject): Unit = { + msg match { + case response: HttpResponse => + rtm.runUninterruptible(ctx, NettyRuntime.noopEnsuring) { + NettyResponse + .make( + ctx, + response, + rtm, + onComplete, + enableKeepAlive && HttpUtil.isKeepAlive(response), + ) + .flatMap(onResponse.succeed) + }(unsafeClass, trace) + case content: HttpContent => + ctx.fireChannelRead(content): Unit - if (!shouldKeepAlive) { - zExec.runUninterruptible(ctx, NettyRuntime.noopEnsuring)( - NettyFutureExecutor - .executed(ctx.close()) - .as(ChannelState.Invalid) - .exit - .flatMap(onComplete.done(_)), - )(unsafeClass, trace) - } else { - zExec.runUninterruptible(ctx, NettyRuntime.noopEnsuring)(onComplete.succeed(ChannelState.Reusable))( - unsafeClass, - trace, - ) + case err => throw new IllegalStateException(s"Client unexpected message type: $err") } - } override def exceptionCaught(ctx: ChannelHandlerContext, error: Throwable): Unit = { - zExec.runUninterruptible(ctx, NettyRuntime.noopEnsuring)( + rtm.runUninterruptible(ctx, NettyRuntime.noopEnsuring)( onResponse.fail(error) *> onComplete.fail(error), )(unsafeClass, trace) - releaseRequest() - } - - private def releaseRequest(): Unit = { - if (jReq.refCnt() > 0) { - jReq.release(jReq.refCnt()): Unit - } } } diff --git a/zio-http/src/main/scala/zio/http/netty/client/ClientInboundStreamingHandler.scala b/zio-http/src/main/scala/zio/http/netty/client/ClientInboundStreamingHandler.scala deleted file mode 100644 index a0dfe3bb05..0000000000 --- a/zio-http/src/main/scala/zio/http/netty/client/ClientInboundStreamingHandler.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright 2021 - 2023 Sporta Technologies PVT LTD & the ZIO HTTP contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package zio.http.netty.client - -import zio.{Promise, Trace, Unsafe} - -import zio.http.netty._ -import zio.http.netty.model.Conversions -import zio.http.{Request, Response} - -import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler} -import io.netty.handler.codec.http._ - -final class ClientInboundStreamingHandler( - val rtm: NettyRuntime, - req: Request, - onResponse: Promise[Throwable, Response], - onComplete: Promise[Throwable, ChannelState], - enableKeepAlive: Boolean, -)(implicit trace: Trace) - extends SimpleChannelInboundHandler[HttpObject](false) { - - private implicit val unsafeClass: Unsafe = Unsafe.unsafe - - override def channelActive(ctx: ChannelHandlerContext): Unit = { - writeRequest(req, ctx): Unit - } - - override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject): Unit = { - msg match { - case response: HttpResponse => - ctx.channel().config().setAutoRead(false) - rtm.runUninterruptible(ctx, NettyRuntime.noopEnsuring) { - onResponse - .succeed( - NettyResponse.make( - ctx, - response, - rtm, - onComplete, - enableKeepAlive && HttpUtil.isKeepAlive(response), - ), - ) - }(unsafeClass, trace) - case content: HttpContent => - ctx.fireChannelRead(content): Unit - - case err => throw new IllegalStateException(s"Client unexpected message type: ${err}") - } - } - - override def exceptionCaught(ctx: ChannelHandlerContext, error: Throwable): Unit = { - rtm.runUninterruptible(ctx, NettyRuntime.noopEnsuring)( - onResponse.fail(error) *> onComplete.fail(error), - )(unsafeClass, trace) - } - - private def encodeRequest(req: Request): HttpRequest = { - val method = Conversions.methodToNetty(req.method) - val jVersion = Versions.convertToZIOToNetty(req.version) - - // As per the spec, the path should contain only the relative path. - // Host and port information should be in the headers. - val path = req.url.relative.encode - - val encodedReqHeaders = Conversions.headersToNetty(req.headers) - - val headers = req.url.hostPort match { - case Some(value) => encodedReqHeaders.set(HttpHeaderNames.HOST, value) - case None => encodedReqHeaders - } - - val h = headers - .add(HttpHeaderNames.TRANSFER_ENCODING, "chunked") - .add(HttpHeaderNames.USER_AGENT, "zhttp-client") - - new DefaultHttpRequest(jVersion, method, path, h) - - } - - private def writeRequest(msg: Request, ctx: ChannelHandlerContext): Unit = { - ctx.write(encodeRequest(msg)) - rtm.run(ctx, NettyRuntime.noopEnsuring) { - NettyBodyWriter.write(msg.body, ctx).unit - }(Unsafe.unsafe, trace) - ctx.flush(): Unit - } -} diff --git a/zio-http/src/main/scala/zio/http/netty/client/ClientRequestEncoder.scala b/zio-http/src/main/scala/zio/http/netty/client/ClientRequestEncoder.scala deleted file mode 100644 index b0a7d1b9b2..0000000000 --- a/zio-http/src/main/scala/zio/http/netty/client/ClientRequestEncoder.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2021 - 2023 Sporta Technologies PVT LTD & the ZIO HTTP contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package zio.http.netty.client - -import zio.{Task, Trace} - -import zio.http.Request -import zio.http.netty._ -import zio.http.netty.model.Conversions - -import io.netty.buffer.Unpooled -import io.netty.handler.codec.http.{DefaultFullHttpRequest, FullHttpRequest, HttpHeaderNames} - -trait ClientRequestEncoder { - - /** - * Converts client params to JFullHttpRequest - */ - def encode(req: Request)(implicit trace: Trace): Task[FullHttpRequest] = - req.body.asChunk.map { chunk => - val content = Unpooled.wrappedBuffer(chunk.toArray) - val method = Conversions.methodToNetty(req.method) - val jVersion = Versions.convertToZIOToNetty(req.version) - - // As per the spec, the path should contain only the relative path. - // Host and port information should be in the headers. - val path = req.url.relative.encode - - val encodedReqHeaders = Conversions.headersToNetty(req.headers) - - val headers = req.url.hostPort match { - case Some(host) => encodedReqHeaders.set(HttpHeaderNames.HOST, host) - case _ => encodedReqHeaders - } - - val writerIndex = content.writerIndex() - headers.set(HttpHeaderNames.CONTENT_LENGTH, writerIndex.toString) - - // TODO: we should also add a default user-agent req header as some APIs might reject requests without it. - val jReq = new DefaultFullHttpRequest(jVersion, method, path, content) - jReq.headers().set(headers) - - jReq - } -} diff --git a/zio-http/src/main/scala/zio/http/netty/client/ClientResponseStreamHandler.scala b/zio-http/src/main/scala/zio/http/netty/client/ClientResponseStreamHandler.scala index f296d7526e..519ccae6f0 100644 --- a/zio-http/src/main/scala/zio/http/netty/client/ClientResponseStreamHandler.scala +++ b/zio-http/src/main/scala/zio/http/netty/client/ClientResponseStreamHandler.scala @@ -16,42 +16,35 @@ package zio.http.netty.client -import zio.{Chunk, Promise, Trace, Unsafe} +import zio.{Promise, Trace} -import zio.http.netty.NettyBody.UnsafeAsync -import zio.http.netty.{NettyFutureExecutor, NettyRuntime} +import zio.http.netty.{AsyncBodyReader, NettyFutureExecutor, NettyRuntime} -import io.netty.buffer.ByteBufUtil import io.netty.channel._ import io.netty.handler.codec.http.{HttpContent, LastHttpContent} final class ClientResponseStreamHandler( - val callback: UnsafeAsync, - zExec: NettyRuntime, + rtm: NettyRuntime, onComplete: Promise[Throwable, ChannelState], keepAlive: Boolean, )(implicit trace: Trace) - extends SimpleChannelInboundHandler[HttpContent](false) { self => - - private val unsafeClass: Unsafe = Unsafe.unsafe + extends AsyncBodyReader { self => override def channelRead0( ctx: ChannelHandlerContext, msg: HttpContent, ): Unit = { val isLast = msg.isInstanceOf[LastHttpContent] - val chunk = Chunk.fromArray(ByteBufUtil.getBytes(msg.content())) - callback(ctx.channel(), chunk, isLast) - if (isLast) { - ctx.channel().pipeline().remove(self) + super.channelRead0(ctx, msg) + if (isLast) { if (keepAlive) - zExec.runUninterruptible(ctx, NettyRuntime.noopEnsuring)(onComplete.succeed(ChannelState.Reusable))( + rtm.runUninterruptible(ctx, NettyRuntime.noopEnsuring)(onComplete.succeed(ChannelState.Reusable))( unsafeClass, trace, ) else { - zExec.runUninterruptible(ctx, NettyRuntime.noopEnsuring)( + rtm.runUninterruptible(ctx, NettyRuntime.noopEnsuring)( NettyFutureExecutor .executed(ctx.close()) .as(ChannelState.Invalid) @@ -59,14 +52,10 @@ final class ClientResponseStreamHandler( .flatMap(onComplete.done(_)), )(unsafeClass, trace) } - }: Unit - } - - override def handlerAdded(ctx: ChannelHandlerContext): Unit = { - ctx.read(): Unit + } } override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { - zExec.runUninterruptible(ctx, NettyRuntime.noopEnsuring)(onComplete.fail(cause))(unsafeClass, trace) + rtm.runUninterruptible(ctx, NettyRuntime.noopEnsuring)(onComplete.fail(cause))(unsafeClass, trace) } } diff --git a/zio-http/src/main/scala/zio/http/netty/client/NettyClientDriver.scala b/zio-http/src/main/scala/zio/http/netty/client/NettyClientDriver.scala index c7f5c084c2..52d420e597 100644 --- a/zio-http/src/main/scala/zio/http/netty/client/NettyClientDriver.scala +++ b/zio-http/src/main/scala/zio/http/netty/client/NettyClientDriver.scala @@ -28,73 +28,51 @@ import zio.http.netty.socket.NettySocketProtocol import zio.http.socket.SocketApp import io.netty.channel.{Channel, ChannelFactory, ChannelHandler, EventLoopGroup} -import io.netty.handler.codec.http.HttpObjectAggregator import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler -import io.netty.handler.flow.FlowControlHandler +import io.netty.handler.codec.http.{FullHttpRequest, HttpObjectAggregator} final case class NettyClientDriver private ( channelFactory: ChannelFactory[Channel], eventLoopGroup: EventLoopGroup, nettyRuntime: NettyRuntime, clientConfig: NettyConfig, -) extends ClientDriver - with ClientRequestEncoder { +) extends ClientDriver { override type Connection = Channel - def requestOnChannel( + override def requestOnChannel( channel: Channel, location: URL.Location.Absolute, req: Request, onResponse: Promise[Throwable, Response], onComplete: Promise[Throwable, ChannelState], - useAggregator: Boolean, enableKeepAlive: Boolean, createSocketApp: () => SocketApp[Any], )(implicit trace: Trace): ZIO[Scope, Throwable, ChannelInterface] = { - encode(req).flatMap { jReq => - Scope.addFinalizerExit { exit => + NettyRequestEncoder.encode(req).flatMap { jReq => + Scope.addFinalizer { ZIO.attempt { - if (jReq.refCnt() > 0) { - jReq.release(jReq.refCnt()): Unit + jReq match { + case fullRequest: FullHttpRequest => + if (fullRequest.refCnt() > 0) + fullRequest.release(fullRequest.refCnt()) + case _ => } - }.ignore.when(exit.isFailure) + }.ignore }.as { val pipeline = channel.pipeline() val toRemove: mutable.Set[ChannelHandler] = new mutable.HashSet[ChannelHandler]() - // ObjectAggregator is used to work with FullHttpRequests and FullHttpResponses - // This is also required to make WebSocketHandlers work - if (useAggregator) { + if (location.scheme.isWebSocket) { val httpObjectAggregator = new HttpObjectAggregator(Int.MaxValue) - val clientInbound = - new ClientInboundHandler( - nettyRuntime, - jReq, - onResponse, - onComplete, - location.scheme.isWebSocket, - enableKeepAlive, - ) + val inboundHandler = new WebSocketClientInboundHandler(nettyRuntime, onResponse, onComplete) + pipeline.addLast(Names.HttpObjectAggregator, httpObjectAggregator) - pipeline.addLast(Names.ClientInboundHandler, clientInbound) + pipeline.addLast(Names.ClientInboundHandler, inboundHandler) toRemove.add(httpObjectAggregator) - toRemove.add(clientInbound) - } else { - val flowControl = new FlowControlHandler() - val clientInbound = - new ClientInboundStreamingHandler(nettyRuntime, req, onResponse, onComplete, enableKeepAlive) - - pipeline.addLast(Names.FlowControlHandler, flowControl) - pipeline.addLast(Names.ClientInboundHandler, clientInbound) + toRemove.add(inboundHandler) - toRemove.add(flowControl) - toRemove.add(clientInbound) - } - - // Add WebSocketHandlers if it's a `ws` or `wss` request - if (location.scheme.isWebSocket) { val headers = Conversions.headersToNetty(req.headers) val app = createSocketApp() val config = NettySocketProtocol @@ -127,6 +105,19 @@ final case class NettyClientDriver private ( NettyFutureExecutor.executed(channel.disconnect()) } } else { + val clientInbound = + new ClientInboundHandler( + nettyRuntime, + req, + jReq, + onResponse, + onComplete, + enableKeepAlive, + ) + + pipeline.addLast(Names.ClientInboundHandler, clientInbound) + + toRemove.add(clientInbound) pipeline.fireChannelRegistered() pipeline.fireChannelActive() diff --git a/zio-http/src/main/scala/zio/http/netty/client/NettyRequestEncoder.scala b/zio-http/src/main/scala/zio/http/netty/client/NettyRequestEncoder.scala new file mode 100644 index 0000000000..cbd9b86d65 --- /dev/null +++ b/zio-http/src/main/scala/zio/http/netty/client/NettyRequestEncoder.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2021 - 2023 Sporta Technologies PVT LTD & the ZIO HTTP contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.http.netty.client + +import zio.{Task, Trace, ZIO} + +import zio.http.Request +import zio.http.netty._ +import zio.http.netty.model.Conversions + +import io.netty.buffer.Unpooled +import io.netty.handler.codec.http.{DefaultFullHttpRequest, DefaultHttpRequest, HttpHeaderNames, HttpRequest} + +private[zio] object NettyRequestEncoder { + + /** + * Converts a ZIO HTTP request to a Netty HTTP request. + */ + def encode(req: Request)(implicit trace: Trace): Task[HttpRequest] = { + val method = Conversions.methodToNetty(req.method) + val jVersion = Versions.convertToZIOToNetty(req.version) + + // As per the spec, the path should contain only the relative path. + // Host and port information should be in the headers. + val path = req.url.relative.encode + + val encodedReqHeaders = Conversions.headersToNetty(req.headers) + + val headers = req.url.hostPort match { + case Some(host) => encodedReqHeaders.set(HttpHeaderNames.HOST, host) + case _ => encodedReqHeaders + } + + if (req.body.isComplete) { + req.body.asChunk.map { chunk => + val content = Unpooled.wrappedBuffer(chunk.toArray) + + val writerIndex = content.writerIndex() + headers.set(HttpHeaderNames.CONTENT_LENGTH, writerIndex.toString) + + val jReq = new DefaultFullHttpRequest(jVersion, method, path, content) + jReq.headers().set(headers) + jReq + } + } else { + ZIO.attempt { + headers.set(HttpHeaderNames.TRANSFER_ENCODING, "chunked") + new DefaultHttpRequest(jVersion, method, path, headers) + } + } + } +} diff --git a/zio-http/src/main/scala/zio/http/netty/client/WebSocketClientInboundHandler.scala b/zio-http/src/main/scala/zio/http/netty/client/WebSocketClientInboundHandler.scala new file mode 100644 index 0000000000..b16d5c4296 --- /dev/null +++ b/zio-http/src/main/scala/zio/http/netty/client/WebSocketClientInboundHandler.scala @@ -0,0 +1,58 @@ +/* + * Copyright 2021 - 2023 Sporta Technologies PVT LTD & the ZIO HTTP contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.http.netty.client + +import zio.{Promise, Trace, Unsafe} + +import zio.http.Response +import zio.http.netty.{NettyResponse, NettyRuntime} + +import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler} +import io.netty.handler.codec.http.FullHttpResponse + +final class WebSocketClientInboundHandler( + rtm: NettyRuntime, + onResponse: Promise[Throwable, Response], + onComplete: Promise[Throwable, ChannelState], +)(implicit trace: Trace) + extends SimpleChannelInboundHandler[FullHttpResponse](true) { + implicit private val unsafeClass: Unsafe = Unsafe.unsafe + + override def channelActive(ctx: ChannelHandlerContext): Unit = { + ctx.fireChannelActive() + } + + override def channelRead0(ctx: ChannelHandlerContext, msg: FullHttpResponse): Unit = { + rtm.runUninterruptible(ctx, NettyRuntime.noopEnsuring) { + onResponse.succeed(NettyResponse.make(ctx, msg)) + }(unsafeClass, trace) + + ctx.fireChannelRead(msg.retain()) + ctx.pipeline().remove(ctx.name()): Unit + + rtm.runUninterruptible(ctx, NettyRuntime.noopEnsuring)(onComplete.succeed(ChannelState.Reusable))( + unsafeClass, + trace, + ) + } + + override def exceptionCaught(ctx: ChannelHandlerContext, error: Throwable): Unit = { + rtm.runUninterruptible(ctx, NettyRuntime.noopEnsuring)( + onResponse.fail(error) *> onComplete.fail(error), + )(unsafeClass, trace) + } +} diff --git a/zio-http/src/main/scala/zio/http/netty/package.scala b/zio-http/src/main/scala/zio/http/netty/package.scala index d6067a4149..19e152927e 100644 --- a/zio-http/src/main/scala/zio/http/netty/package.scala +++ b/zio-http/src/main/scala/zio/http/netty/package.scala @@ -37,7 +37,6 @@ package object netty { val WebSocketClientProtocolHandler = "WEB_SOCKET_CLIENT_PROTOCOL_HANDLER" val HttpRequestDecompression = "HTTP_REQUEST_DECOMPRESSION" val HttpResponseCompression = "HTTP_RESPONSE_COMPRESSION" - val LowLevelLogging = "LOW_LEVEL_LOGGING" val HttpContentHandler = "HTTP_CONTENT_HANDLER" val HttpRequestDecoder = "HTTP_REQUEST_DECODER" val HttpResponseEncoder = "HTTP_RESPONSE_ENCODER" diff --git a/zio-http/src/main/scala/zio/http/netty/server/ServerAsyncBodyHandler.scala b/zio-http/src/main/scala/zio/http/netty/server/ServerAsyncBodyHandler.scala index a8ebcadc0c..0b74930913 100644 --- a/zio-http/src/main/scala/zio/http/netty/server/ServerAsyncBodyHandler.scala +++ b/zio-http/src/main/scala/zio/http/netty/server/ServerAsyncBodyHandler.scala @@ -16,26 +16,6 @@ package zio.http.netty.server -import zio.Chunk +import zio.http.netty.AsyncBodyReader -import zio.http.netty.NettyBody - -import io.netty.buffer.ByteBufUtil -import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler} -import io.netty.handler.codec.http.{HttpContent, LastHttpContent} - -private[zio] final class ServerAsyncBodyHandler(val async: NettyBody.UnsafeAsync) - extends SimpleChannelInboundHandler[HttpContent](true) { - self => - - override def channelRead0(ctx: ChannelHandlerContext, msg: HttpContent): Unit = { - val isLast = msg.isInstanceOf[LastHttpContent] - val chunk = Chunk.fromArray(ByteBufUtil.getBytes(msg.content())) - async(ctx.channel(), chunk, isLast) - if (isLast) ctx.channel().pipeline().remove(self): Unit - } - - override def handlerAdded(ctx: ChannelHandlerContext): Unit = { - ctx.read(): Unit - } -} +private[zio] final class ServerAsyncBodyHandler extends AsyncBodyReader {} diff --git a/zio-http/src/main/scala/zio/http/netty/server/ServerChannelInitializer.scala b/zio-http/src/main/scala/zio/http/netty/server/ServerChannelInitializer.scala index 816981619d..09dae6fd3d 100644 --- a/zio-http/src/main/scala/zio/http/netty/server/ServerChannelInitializer.scala +++ b/zio-http/src/main/scala/zio/http/netty/server/ServerChannelInitializer.scala @@ -19,6 +19,7 @@ package zio.http.netty.server import zio._ import zio.http.Server +import zio.http.Server.RequestStreaming import zio.http.netty.Names import zio.http.netty.model.Conversions @@ -26,7 +27,6 @@ import io.netty.channel.ChannelHandler.Sharable import io.netty.channel._ import io.netty.handler.codec.http.HttpObjectDecoder.{DEFAULT_MAX_CHUNK_SIZE, DEFAULT_MAX_INITIAL_LINE_LENGTH} import io.netty.handler.codec.http._ -import io.netty.handler.flow.FlowControlHandler import io.netty.handler.flush.FlushConsolidationHandler /** @@ -68,9 +68,11 @@ private[zio] final case class ServerChannelInitializer( }) // ObjectAggregator - // Always add ObjectAggregator - if (cfg.useAggregator) - pipeline.addLast(Names.HttpObjectAggregator, new HttpObjectAggregator(cfg.objectAggregator)) + cfg.requestStreaming match { + case RequestStreaming.Enabled => + case RequestStreaming.Disabled(maximumContentLength) => + pipeline.addLast(Names.HttpObjectAggregator, new HttpObjectAggregator(maximumContentLength)) + } // ExpectContinueHandler // Add expect continue handler is settings is true @@ -80,19 +82,11 @@ private[zio] final case class ServerChannelInitializer( // Add Keep-Alive handler is settings is true if (cfg.keepAlive) pipeline.addLast(Names.HttpKeepAliveHandler, new HttpServerKeepAliveHandler) - // FlowControlHandler - // Required because HttpObjectDecoder fires an HttpRequest that is immediately followed by a LastHttpContent event. - // For reference: https://netty.io/4.1/api/io/netty/handler/flow/FlowControlHandler.html - if (cfg.flowControl) pipeline.addLast(Names.FlowControlHandler, new FlowControlHandler()) - - // FlushConsolidationHandler - // Flushing content is done in batches. Can potentially improve performance. - if (cfg.consolidateFlush) pipeline.addLast(Names.HttpServerFlushConsolidation, new FlushConsolidationHandler) + pipeline.addLast(Names.HttpServerFlushConsolidation, new FlushConsolidationHandler()) // RequestHandler // Always add ZIO Http Request Handler pipeline.addLast(Names.HttpRequestHandler, reqHandler) - // TODO: find a different approach if (cfg.channelInitializer != null) { cfg.channelInitializer(pipeline) } () } @@ -101,10 +95,11 @@ private[zio] final case class ServerChannelInitializer( object ServerChannelInitializer { implicit val trace: Trace = Trace.empty - val layer = ZLayer.fromZIO { - for { - cfg <- ZIO.service[Server.Config] - handler <- ZIO.service[SimpleChannelInboundHandler[HttpObject]] - } yield ServerChannelInitializer(cfg, handler) - } + val layer: ZLayer[SimpleChannelInboundHandler[HttpObject] with Server.Config, Nothing, ServerChannelInitializer] = + ZLayer.fromZIO { + for { + cfg <- ZIO.service[Server.Config] + handler <- ZIO.service[SimpleChannelInboundHandler[HttpObject]] + } yield ServerChannelInitializer(cfg, handler) + } } diff --git a/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala b/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala index ea29c1b182..d388f89fed 100644 --- a/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala +++ b/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala @@ -27,14 +27,12 @@ import zio.http._ import zio.http.model._ import zio.http.netty._ import zio.http.netty.model.Conversions -import zio.http.netty.server.ServerInboundHandler.isReadKey import zio.http.netty.socket.NettySocketProtocol import io.netty.channel.ChannelHandler.Sharable import io.netty.channel._ import io.netty.handler.codec.http._ import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler -import io.netty.util.AttributeKey @Sharable private[zio] final case class ServerInboundHandler( @@ -98,17 +96,7 @@ private[zio] final case class ServerInboundHandler( } else app.runZIOOrNull(req) if (!attemptImmediateWrite(ctx, exit, time)) { - - if ( - jReq.method() == HttpMethod.TRACE || - jReq.headers().contains(HttpHeaderNames.CONTENT_LENGTH) || - jReq.headers().contains(HttpHeaderNames.TRANSFER_ENCODING) - ) - ctx.channel().config().setAutoRead(false) - - writeResponse(ctx, env, exit, jReq) { () => - val _ = ctx.channel().config().setAutoRead(true) - } + writeResponse(ctx, env, exit, jReq) { () => } } case msg: HttpContent => @@ -134,15 +122,13 @@ private[zio] final case class ServerInboundHandler( super.exceptionCaught(ctx, t) } - private def addAsyncBodyHandler(ctx: ChannelHandlerContext, async: NettyBody.UnsafeAsync): Unit = { - if (ctx.channel().attr(isReadKey).get()) - throw new RuntimeException("Unable to add the async body handler as the content has already been read.") - + private def addAsyncBodyHandler(ctx: ChannelHandlerContext): AsyncBodyReader = { + val handler = new ServerAsyncBodyHandler ctx .channel() .pipeline() - .addAfter(Names.HttpRequestHandler, Names.HttpContentHandler, new ServerAsyncBodyHandler(async)) - ctx.channel().attr(isReadKey).set(true) + .addAfter(Names.HttpRequestHandler, Names.HttpContentHandler, handler) + handler } private def attemptFastWrite( @@ -195,8 +181,6 @@ private[zio] final case class ServerInboundHandler( ZIO.succeed(true) _ <- ZIO.attempt(ctx.flush()).when(!flushed) } yield () - - _ <- ZIO.attempt(ctx.channel().attr(isReadKey).set(false)) } yield () } @@ -242,9 +226,10 @@ private[zio] final case class ServerInboundHandler( remoteAddress, ) case nettyReq: HttpRequest => - val body = NettyBody.fromAsync( + val handler = addAsyncBodyHandler(ctx) + val body = NettyBody.fromAsync( { async => - addAsyncBodyHandler(ctx, async) + handler.connect(async) }, contentType, ) @@ -358,8 +343,6 @@ private[zio] final case class ServerInboundHandler( object ServerInboundHandler { - private[zio] val isReadKey = AttributeKey.newInstance[Boolean]("IS_READ_KEY") - val live: ZLayer[ ServerTime with Server.Config with NettyRuntime with AppRef, Nothing, diff --git a/zio-http/src/test/scala/zio/http/ClientSpec.scala b/zio-http/src/test/scala/zio/http/ClientSpec.scala index 01aadac798..9f513096d9 100644 --- a/zio-http/src/test/scala/zio/http/ClientSpec.scala +++ b/zio-http/src/test/scala/zio/http/ClientSpec.scala @@ -72,7 +72,7 @@ object ClientSpec extends HttpRunnableSpec { override def spec = { suite("Client") { serve(DynamicServer.app).as(List(clientSpec)) - }.provideShared(DynamicServer.live, severTestLayer, Client.default, Scope.default) @@ + }.provideSomeShared[Scope](DynamicServer.live, severTestLayer, Client.default) @@ timeout(5 seconds) @@ sequential } } diff --git a/zio-http/src/test/scala/zio/http/ClientStreamingSpec.scala b/zio-http/src/test/scala/zio/http/ClientStreamingSpec.scala index 9996c7fad9..0c6ac36d99 100644 --- a/zio-http/src/test/scala/zio/http/ClientStreamingSpec.scala +++ b/zio-http/src/test/scala/zio/http/ClientStreamingSpec.scala @@ -16,41 +16,211 @@ package zio.http -import zio.test.Assertion.equalTo -import zio.test.TestAspect.{sequential, timeout} -import zio.test.{Spec, TestEnvironment, assertZIO} -import zio.{Scope, ZLayer, durationInt} +import zio._ +import zio.test.TestAspect.{nonFlaky, withLiveClock} +import zio.test.{Spec, TestEnvironment, assertTrue} -import zio.stream.ZStream +import zio.stream.{ZStream, ZStreamAspect} -import zio.http.internal.{DynamicServer, HttpRunnableSpec, severTestLayer} -import zio.http.model.Method +import zio.http.Server.RequestStreaming +import zio.http.internal.HttpRunnableSpec +import zio.http.model.{Headers, Method, Status, Version} import zio.http.netty.NettyConfig -import zio.http.netty.client.NettyClientDriver +import zio.http.netty.NettyConfig.LeakDetectionLevel object ClientStreamingSpec extends HttpRunnableSpec { - def clientStreamingSpec = suite("ClientStreamingSpec")( - test("streaming content from server - extended") { - val app = Http.collect[Request] { case req => Response(body = Body.fromStream(req.body.asStream)) } - val stream = ZStream.fromIterable(List("This ", "is ", "a ", "longer ", "text."), chunkSize = 1) - val res = app.deployChunked.body - .run(method = Method.POST, body = Body.fromStream(stream)) - .flatMap(_.asString) - assertZIO(res)(equalTo("This is a longer text.")) - }, - ) - - override def spec: Spec[TestEnvironment with Scope, Any] = suite("ClientProxy") { - serve(DynamicServer.app).as(List(clientStreamingSpec)) - }.provideShared( - DynamicServer.live, - severTestLayer, - Client.customized, - ZLayer.succeed(ZClient.Config.default.useObjectAggregator(false)), - NettyClientDriver.live, - DnsResolver.default, - ZLayer.succeed(NettyConfig.default), - ) @@ - timeout(5 seconds) @@ sequential + val app = Http + .collectZIO[Request] { + case Method.GET -> !! / "simple-get" => + ZIO.succeed(Response.text("simple response")) + case Method.GET -> !! / "streaming-get" => + ZIO.succeed( + Response(body = + Body.fromStream(ZStream.fromIterable("streaming response".getBytes) @@ ZStreamAspect.rechunk(3)), + ), + ) + case req @ Method.POST -> !! / "simple-post" => + req.ignoreBody.as(Response.ok) + case req @ Method.POST -> !! / "streaming-echo" => + ZIO.succeed(Response(body = Body.fromStream(req.body.asStream))) + } + .withDefaultErrorResponse + + // TODO: test failure cases + + private def tests(streamingServer: Boolean): Seq[Spec[Client, Throwable]] = + Seq( + test("simple get") { + for { + port <- server(streamingServer) + client <- ZIO.service[Client] + response <- client.request( + Version.Http_1_1, + Method.GET, + URL.decode(s"http://localhost:$port/simple-get").toOption.get, + Headers.empty, + Body.empty, + None, + ) + body <- response.body.asString + } yield assertTrue(response.status == Status.Ok, body == "simple response") + }, + test("streaming get") { + for { + port <- server(streamingServer) + client <- ZIO.service[Client] + response <- client.request( + Version.Http_1_1, + Method.GET, + URL.decode(s"http://localhost:$port/streaming-get").toOption.get, + Headers.empty, + Body.empty, + None, + ) + body <- response.body.asStream.chunks.map(chunk => new String(chunk.toArray)).runCollect + } yield assertTrue( + response.status == Status.Ok, + body == Chunk( + "str", + "eam", + "ing", + " re", + "spo", + "nse", + "", + ), + ) + }, + test("simple post") { + for { + port <- server(streamingServer) + client <- ZIO.service[Client] + response <- client + .request( + Version.Http_1_1, + Method.POST, + URL.decode(s"http://localhost:$port/simple-post").toOption.get, + Headers.empty, + Body.fromStream( + (ZStream.fromIterable("streaming request".getBytes) @@ ZStreamAspect.rechunk(3)) + .schedule(Schedule.fixed(10.millis)), + ), + None, + ) + } yield assertTrue(response.status == Status.Ok) + }, + test("echo") { + for { + port <- server(streamingServer) + client <- ZIO.service[Client] + response <- client + .request( + Version.Http_1_1, + Method.POST, + URL.decode(s"http://localhost:$port/streaming-echo").toOption.get, + Headers.empty, + Body.fromStream( + ZStream.fromIterable("streaming request".getBytes) @@ ZStreamAspect.rechunk(3), + ), + None, + ) + body <- response.body.asStream.chunks.map(chunk => new String(chunk.toArray)).runCollect + expectedBody = + if (streamingServer) + Chunk( + "str", + "eam", + "ing", + " re", + "que", + "st", + "", + ) + else + Chunk("streaming request", "") + } yield assertTrue( + response.status == Status.Ok, + body == expectedBody, + ) + }, + ) + + private def streamingOnlyTests = + Seq( + test("echo with sync point") { + for { + port <- server(streaming = true) + client <- ZIO.service[Client] + sync <- Promise.make[Nothing, Unit] + response <- client + .request( + Version.Http_1_1, + Method.POST, + URL.decode(s"http://localhost:$port/streaming-echo").toOption.get, + Headers.empty, + Body.fromStream( + (ZStream.fromIterable("streaming request".getBytes) @@ ZStreamAspect.rechunk(3)).chunks.tap { chunk => + if (chunk == Chunk.fromArray(" re".getBytes)) + sync.await + else + ZIO.unit + }.flattenChunks, + ), + None, + ) + body <- response.body.asStream.chunks + .map(chunk => new String(chunk.toArray)) + .tap { chunk => + if (chunk == "ing") sync.succeed(()) else ZIO.unit + } + .runCollect + expectedBody = + Chunk( + "str", + "eam", + "ing", + " re", + "que", + "st", + "", + ) + } yield assertTrue( + response.status == Status.Ok, + body == expectedBody, + ) + } @@ nonFlaky, + ) + + override def spec: Spec[TestEnvironment with Scope, Any] = + suite("Client streaming")( + suite("streaming server")( + tests(streamingServer = true) ++ + streamingOnlyTests: _*, + ), + suite("non-streaming server")( + tests(streamingServer = false): _*, + ), + ).provide( + Client.default, + ) @@ withLiveClock + + private def server(streaming: Boolean): ZIO[Any, Throwable, Int] = + for { + portPromise <- Promise.make[Throwable, Int] + _ <- Server + .install(app) + .intoPromise(portPromise) + .zipRight(ZIO.never) + .provide( + ZLayer.succeed(NettyConfig.default.leakDetection(LeakDetectionLevel.PARANOID)), + ZLayer.succeed( + Server.Config.default.onAnyOpenPort + .withRequestStreaming(if (streaming) RequestStreaming.Enabled else RequestStreaming.Disabled(1024)), + ), + Server.customized, + ) + .fork + port <- portPromise.await + } yield port } diff --git a/zio-http/src/test/scala/zio/http/DnsResolverSpec.scala b/zio-http/src/test/scala/zio/http/DnsResolverSpec.scala index 2e561238cd..81319210d3 100644 --- a/zio-http/src/test/scala/zio/http/DnsResolverSpec.scala +++ b/zio-http/src/test/scala/zio/http/DnsResolverSpec.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2021 - 2023 Sporta Technologies PVT LTD & the ZIO HTTP contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package zio.http import java.net.{InetAddress, UnknownHostException} diff --git a/zio-http/src/test/scala/zio/http/NettyMaxHeaderLengthSpec.scala b/zio-http/src/test/scala/zio/http/NettyMaxHeaderLengthSpec.scala index 0b09d7ebd8..8f3c79c6e2 100644 --- a/zio-http/src/test/scala/zio/http/NettyMaxHeaderLengthSpec.scala +++ b/zio-http/src/test/scala/zio/http/NettyMaxHeaderLengthSpec.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2021 - 2023 Sporta Technologies PVT LTD & the ZIO HTTP contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package zio.http import zio.test._ diff --git a/zio-http/src/test/scala/zio/http/RequestStreamingServerSpec.scala b/zio-http/src/test/scala/zio/http/RequestStreamingServerSpec.scala index 9b52c71162..dea1b59d0d 100644 --- a/zio-http/src/test/scala/zio/http/RequestStreamingServerSpec.scala +++ b/zio-http/src/test/scala/zio/http/RequestStreamingServerSpec.scala @@ -21,6 +21,7 @@ import zio.test.TestAspect.{diagnose, sequential, shrinks, timeout} import zio.test.assertZIO import zio.{Scope, ZIO, ZLayer, durationInt} +import zio.http.Server.RequestStreaming import zio.http.ServerSpec.requestBodySpec import zio.http.internal.{DynamicServer, HttpRunnableSpec} import zio.http.model.Status @@ -31,7 +32,7 @@ object RequestStreamingServerSpec extends HttpRunnableSpec { Server.Config.default .port(0) .requestDecompression(true) - .objectAggregator(-1) + .enableRequestStreaming private val appWithReqStreaming = serve(DynamicServer.app) diff --git a/zio-http/src/test/scala/zio/http/ServerSpec.scala b/zio-http/src/test/scala/zio/http/ServerSpec.scala index 46dc748315..a4d59057cd 100644 --- a/zio-http/src/test/scala/zio/http/ServerSpec.scala +++ b/zio-http/src/test/scala/zio/http/ServerSpec.scala @@ -26,6 +26,7 @@ import zio.{Chunk, Scope, ZIO, ZLayer, durationInt} import zio.stream.{ZPipeline, ZStream} +import zio.http.Server.RequestStreaming import zio.http.html.{body, div, id} import zio.http.internal.{DynamicServer, HttpGen, HttpRunnableSpec} import zio.http.model._ @@ -42,7 +43,7 @@ object ServerSpec extends HttpRunnableSpec { private val MaxSize = 1024 * 10 val configApp = Server.Config.default .requestDecompression(true) - .objectAggregator(MaxSize) + .disableRequestStreaming(MaxSize) .responseCompression() private val app = serve(DynamicServer.app) diff --git a/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala b/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala index d989dca60b..e8d68839e7 100644 --- a/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala +++ b/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala @@ -81,12 +81,15 @@ abstract class HttpRunnableSpec extends ZIOSpecDefault { self => for { port <- Handler.fromZIO(DynamicServer.port) id <- Handler.fromZIO(DynamicServer.deploy[R](app)) + client <- Handler.fromZIO(ZIO.service[Client]) response <- Handler.fromFunctionZIO[Request] { params => - Client.request( - params - .addHeader(DynamicServer.APP_ID, id) - .copy(url = URL(params.url.path, Location.Absolute(Scheme.HTTP, "localhost", port))), - ) + client + .request( + params + .addHeader(DynamicServer.APP_ID, id) + .copy(url = URL(params.url.path, Location.Absolute(Scheme.HTTP, "localhost", port))), + ) + .flatMap(_.collect) } } yield response } diff --git a/zio-http/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala b/zio-http/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala index 0e882d0a33..51a0b1f044 100644 --- a/zio-http/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala +++ b/zio-http/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala @@ -33,7 +33,8 @@ object NettyConnectionPoolSpec extends HttpRunnableSpec { private val app = Http.collectZIO[Request] { case req @ Method.POST -> !! / "streaming" => ZIO.succeed(Response(body = Body.fromStream(req.body.asStream))) case Method.GET -> !! / "slow" => ZIO.sleep(1.hour).as(Response.text("done")) - case req => req.body.asString.map(Response.text(_)) + case req => + req.body.asString.map(Response.text(_)) } private val connectionCloseHeader = Headers(Header.Connection.Close) @@ -67,16 +68,17 @@ object NettyConnectionPoolSpec extends HttpRunnableSpec { ) } @@ nonFlaky(10), test("streaming request") { - val res = ZIO.foreachPar((1 to N).toList) { idx => - val stream = ZStream.fromIterable(List("a", "b", "c-", idx.toString), chunkSize = 1) - app.deploy.body - .run( - method = Method.POST, - body = Body.fromStream(stream), - headers = extraHeaders, - ) - .flatMap(_.asString) - } + val res = ZIO + .foreachPar((1 to N).toList) { idx => + val stream = ZStream.fromIterable(List("a", "b", "c-", idx.toString), chunkSize = 1) + app.deploy.body + .run( + method = Method.POST, + body = Body.fromStream(stream), + headers = extraHeaders, + ) + .flatMap(_.asString) + } val expected = (1 to N).map(idx => s"abc-$idx").toList assertZIO(res)(equalTo(expected)) } @@ nonFlaky(10), @@ -210,7 +212,7 @@ object NettyConnectionPoolSpec extends HttpRunnableSpec { ) override def spec: Spec[Scope, Throwable] = { - connectionPoolSpec @@ timeout(30.seconds) @@ diagnose(30.seconds) @@ sequential @@ withLiveClock + connectionPoolSpec @@ timeout(30.seconds) @@ sequential @@ withLiveClock } } diff --git a/zio-http/src/test/scala/zio/http/netty/client/ClientRequestEncoderSpec.scala b/zio-http/src/test/scala/zio/http/netty/client/NettyRequestEncoderSpec.scala similarity index 97% rename from zio-http/src/test/scala/zio/http/netty/client/ClientRequestEncoderSpec.scala rename to zio-http/src/test/scala/zio/http/netty/client/NettyRequestEncoderSpec.scala index cfd2f960a4..614e194143 100644 --- a/zio-http/src/test/scala/zio/http/netty/client/ClientRequestEncoderSpec.scala +++ b/zio-http/src/test/scala/zio/http/netty/client/NettyRequestEncoderSpec.scala @@ -26,7 +26,8 @@ import zio.http.{Body, Request} import io.netty.handler.codec.http.HttpHeaderNames -object ClientRequestEncoderSpec extends ZIOSpecDefault with ClientRequestEncoder { +object NettyRequestEncoderSpec extends ZIOSpecDefault { + import NettyRequestEncoder._ val anyClientParam: Gen[Sized, Request] = HttpGen.requestGen( HttpGen.body(