Skip to content

Commit

Permalink
Real support for streaming in Client (#2083)
Browse files Browse the repository at this point in the history
* Real support for streaming, first step

* Leak fix, withDisabledStreaming

* Fixes

* Fixes and debug prints

* Fixes and cleanup

* Remove some low-level configuration from Server.Config

* Remove isRead attribute

* Readded FlushConsolidationHandler

* Fix for 2.12

* Change position of flush consolidation handler
  • Loading branch information
vigoo authored Apr 9, 2023
1 parent 8140c9f commit aa1750a
Show file tree
Hide file tree
Showing 37 changed files with 799 additions and 467 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ object Dependencies {
val NettyVersion = "4.1.91.Final"
val NettyIncubatorVersion = "0.0.19.Final"
val ScalaCompactCollectionVersion = "2.8.1"
val ZioVersion = "2.0.10+55-a63d1724-SNAPSHOT"
val ZioVersion = "2.0.11"
val ZioSchemaVersion = "0.4.8"
val SttpVersion = "3.3.18"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package example

import zio._

import zio.http.Server.RequestStreaming
import zio.http._
import zio.http.model.Header
import zio.http.netty.NettyConfig
Expand Down Expand Up @@ -38,13 +39,11 @@ object PlainTextBenchmarkServer extends ZIOAppDefault {
private def jsonApp(json: Response): HttpApp[Any, Nothing] =
Handler.response(json).toHttp.whenPathEq(jsonPath)

val app = plainTextApp(frozenPlainTextResponse) ++ jsonApp(frozenJsonResponse)
val app: App[Any] = plainTextApp(frozenPlainTextResponse) ++ jsonApp(frozenJsonResponse)

private val config = Server.Config.default
.port(8080)
.consolidateFlush(true)
.flowControl(false)
.objectAggregator(-1)
.enableRequestStreaming

private val nettyConfig = NettyConfig.default
.leakDetection(LeakDetectionLevel.DISABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package example

import zio._

import zio.http.Server.RequestStreaming
import zio.http._
import zio.http.model.{Header, Method}
import zio.http.netty.NettyConfig
Expand Down Expand Up @@ -38,9 +39,7 @@ object SimpleEffectBenchmarkServer extends ZIOAppDefault {

private val config = Server.Config.default
.port(8080)
.consolidateFlush(true)
.flowControl(false)
.objectAggregator(-1)
.enableRequestStreaming

private val nettyConfig = NettyConfig.default
.leakDetection(LeakDetectionLevel.DISABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ object WebSocketSimpleClient extends ZIOAppDefault {
ZIO.succeed(println("Goodbye!")) *> ch.writeAndFlush(WebSocketFrame.close(1000))
}

val app: ZIO[Any with Client with Scope, Throwable, Response] =
val app: ZIO[Client with Scope, Throwable, Response] =
httpSocket.toSocketApp.connect(url) *> ZIO.never

val run = app.provide(Client.default, Scope.default)
val run: ZIO[ZIOAppArgs with Scope, Throwable, Any] =
ZIO.scoped {
app.provideSome[Scope](Client.default)
}

}
1 change: 0 additions & 1 deletion zio-http/src/main/scala/zio/http/ClientDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ trait ClientDriver {
req: Request,
onResponse: Promise[Throwable, Response],
onComplete: Promise[Throwable, ChannelState],
useAggregator: Boolean,
enableKeepAlive: Boolean,
createSocketApp: () => SocketApp[Any],
)(implicit trace: Trace): ZIO[Scope, Throwable, ChannelInterface]
Expand Down
16 changes: 16 additions & 0 deletions zio-http/src/main/scala/zio/http/DnsResolver.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2021 - 2023 Sporta Technologies PVT LTD & the ZIO HTTP contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package zio.http

import java.net.{InetAddress, UnknownHostException}
Expand Down
16 changes: 16 additions & 0 deletions zio-http/src/main/scala/zio/http/Request.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package zio.http

import java.net.InetAddress

import zio.ZIO

import zio.http.model._
import zio.http.model.headers._

Expand All @@ -35,11 +37,25 @@ final case class Request(
*/
def addTrailingSlash: Request = self.copy(url = self.url.addTrailingSlash)

/**
* Collects the potentially streaming body of the request into a single chunk.
*/
def collect: ZIO[Any, Throwable, Request] =
if (self.body.isComplete) ZIO.succeed(self)
else
self.body.asChunk.map { bytes =>
self.copy(body = Body.fromChunk(bytes))
}

/**
* Drops trailing slash from the path.
*/
def dropTrailingSlash: Request = self.copy(url = self.url.dropTrailingSlash)

/** Consumes the streaming body fully and then drops it */
def ignoreBody: ZIO[Any, Throwable, Request] =
self.collect.map(_.copy(body = Body.empty))

def patch(p: Request.Patch): Request =
Request(
body,
Expand Down
15 changes: 15 additions & 0 deletions zio-http/src/main/scala/zio/http/Response.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ sealed trait Response extends HeaderOps[Response] { self =>

def body: Body

/**
* Collects the potentially streaming body of the response into a single
* chunk.
*/
def collect: ZIO[Any, Throwable, Response] =
if (self.body.isComplete) ZIO.succeed(self)
else
self.body.asChunk.map { bytes =>
self.copy(body = Body.fromChunk(bytes))
}

def copy(
status: Status = self.status,
headers: Headers = self.headers,
Expand Down Expand Up @@ -86,6 +97,10 @@ sealed trait Response extends HeaderOps[Response] { self =>
case _ => None
}

/** Consumes the streaming body fully and then drops it */
final def ignoreBody: ZIO[Any, Throwable, Response] =
self.collect.map(_.copy(body = Body.empty))

final def isWebSocket: Boolean = self match {
case _: SocketAppResponse => self.status == Status.SwitchingProtocols
case _ => false
Expand Down
88 changes: 46 additions & 42 deletions zio-http/src/main/scala/zio/http/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,12 @@ object Server {
address: InetSocketAddress,
acceptContinue: Boolean,
keepAlive: Boolean,
consolidateFlush: Boolean,
flowControl: Boolean,
requestDecompression: Decompression,
responseCompression: Option[ResponseCompressionConfig],
objectAggregator: Int,
requestStreaming: RequestStreaming,
maxHeaderSize: Int,
) {
self =>
def useAggregator: Boolean = objectAggregator >= 0

/**
* Configure the server to use HttpServerExpectContinueHandler to send a 100
Expand All @@ -83,17 +80,14 @@ object Server {
def binding(inetSocketAddress: InetSocketAddress): Config = self.copy(address = inetSocketAddress)

/**
* Configure the server to use FlushConsolidationHandler to control the
* flush operations in a more efficient way if enabled (@see <a
* href="https://netty.io/4.1/api/io/netty/handler/flush/FlushConsolidationHandler.html">FlushConsolidationHandler<a>).
* Disables streaming of request bodies. Payloads larger than
* maxContentLength will be rejected
*/
def consolidateFlush(enable: Boolean): Config = self.copy(consolidateFlush = enable)
def disableRequestStreaming(maxContentLength: Int): Config =
self.copy(requestStreaming = RequestStreaming.Disabled(maxContentLength))

/**
* Configure the server to use netty FlowControlHandler if enable (@see <a
* href="https://netty.io/4.1/api/io/netty/handler/flow/FlowControlHandler.html">FlowControlHandler</a>).
*/
def flowControl(enable: Boolean): Config = self.copy(flowControl = enable)
/** Enables streaming request bodies */
def enableRequestStreaming: Config = self.copy(requestStreaming = RequestStreaming.Enabled)

/**
* Configure the server to use netty's HttpServerKeepAliveHandler to close
Expand All @@ -103,11 +97,10 @@ object Server {
def keepAlive(enable: Boolean): Config = self.copy(keepAlive = enable)

/**
* Configure the server to use HttpObjectAggregator with the specified max
* size of the aggregated content.
* Configure the server to use `maxHeaderSize` value when encode/decode
* headers.
*/
def objectAggregator(maxRequestSize: Int = 1024 * 100): Config =
self.copy(objectAggregator = maxRequestSize)
def maxHeaderSize(headerSize: Int): Config = self.copy(maxHeaderSize = headerSize)

/**
* Configure the server to listen on an available open port
Expand All @@ -119,14 +112,6 @@ object Server {
*/
def port(port: Int): Config = self.copy(address = new InetSocketAddress(port))

/**
* Configure the server to use netty's HttpContentDecompressor to decompress
* Http requests (@see <a href =
* "https://netty.io/4.1/api/io/netty/handler/codec/http/HttpContentDecompressor.html">HttpContentDecompressor</a>).
*/
def requestDecompression(isStrict: Boolean): Config =
self.copy(requestDecompression = if (isStrict) Decompression.Strict else Decompression.NonStrict)

/**
* Configure the new server with netty's HttpContentCompressor to compress
* Http responses (@see <a href =
Expand All @@ -136,15 +121,21 @@ object Server {
self.copy(responseCompression = Option(rCfg))

/**
* Configure the server with the following ssl options.
* Configure the server to use netty's HttpContentDecompressor to decompress
* Http requests (@see <a href =
* "https://netty.io/4.1/api/io/netty/handler/codec/http/HttpContentDecompressor.html">HttpContentDecompressor</a>).
*/
def ssl(sslConfig: SSLConfig): Config = self.copy(sslConfig = Some(sslConfig))
def requestDecompression(isStrict: Boolean): Config =
self.copy(requestDecompression = if (isStrict) Decompression.Strict else Decompression.NonStrict)

/**
* Configure the server to use `maxHeaderSize` value when encode/decode
* headers.
* Configure the server with the following ssl options.
*/
def maxHeaderSize(headerSize: Int): Config = self.copy(maxHeaderSize = headerSize)
def ssl(sslConfig: SSLConfig): Config = self.copy(sslConfig = Some(sslConfig))

/** Enables or disables request body streaming */
def withRequestStreaming(requestStreaming: RequestStreaming): Config =
self.copy(requestStreaming = requestStreaming)
}

object Config {
Expand All @@ -154,11 +145,9 @@ object Server {
zio.Config.int("binding-port").withDefault(Config.default.address.getPort) ++
zio.Config.boolean("accept-continue").withDefault(Config.default.acceptContinue) ++
zio.Config.boolean("keep-alive").withDefault(Config.default.keepAlive) ++
zio.Config.boolean("consolidate-flush").withDefault(Config.default.consolidateFlush) ++
zio.Config.boolean("flow-control").withDefault(Config.default.flowControl) ++
Decompression.config.nested("request-decompression").withDefault(Config.default.requestDecompression) ++
ResponseCompressionConfig.config.nested("response-compression").optional ++
zio.Config.int("max-aggregated-request-size").withDefault(Config.default.objectAggregator) ++
RequestStreaming.config.nested("request-streaming").withDefault(Config.default.requestStreaming) ++
zio.Config.int("max-header-size").withDefault(Config.default.maxHeaderSize)
}.map {
case (
Expand All @@ -167,23 +156,19 @@ object Server {
port,
acceptContinue,
keepAlive,
consolidateFlush,
flowControl,
requestDecompression,
responseCompression,
objectAggregator,
requestStreaming,
maxHeaderSize,
) =>
Config(
sslConfig = sslConfig,
address = new InetSocketAddress(host.getOrElse(Config.default.address.getHostName), port),
acceptContinue = acceptContinue,
keepAlive = keepAlive,
consolidateFlush = consolidateFlush,
flowControl = flowControl,
requestDecompression = requestDecompression,
responseCompression = responseCompression,
objectAggregator = objectAggregator,
requestStreaming = requestStreaming,
maxHeaderSize = maxHeaderSize,
)
}
Expand All @@ -193,11 +178,9 @@ object Server {
address = new InetSocketAddress(8080),
acceptContinue = false,
keepAlive = true,
consolidateFlush = false,
flowControl = true,
requestDecompression = Decompression.No,
responseCompression = None,
objectAggregator = 1024 * 100,
requestStreaming = RequestStreaming.Disabled(1024 * 100),
maxHeaderSize = 8192,
)

Expand Down Expand Up @@ -286,6 +269,27 @@ object Server {
}
}

sealed trait RequestStreaming

object RequestStreaming {

/** Enable streaming request bodies */
case object Enabled extends RequestStreaming

/**
* Disable streaming request bodies. Bodies larger than the configured
* maximum content length will be rejected.
*/
final case class Disabled(maximumContentLength: Int) extends RequestStreaming

lazy val config: zio.Config[RequestStreaming] =
(zio.Config.boolean("enabled").withDefault(true) ++
zio.Config.int("maximum-content-length").withDefault(1024 * 100)).map {
case (true, _) => Enabled
case (false, maxLength) => Disabled(maxLength)
}
}

def serve[R](
httpApp: App[R],
)(implicit trace: Trace): URIO[R with Server, Nothing] =
Expand Down
Loading

0 comments on commit aa1750a

Please sign in to comment.