Skip to content

Commit

Permalink
Fix: Request Streaming (#1242)
Browse files Browse the repository at this point in the history
* chore: run tests in parallel

* fix: request body issues

* test: fix server spec test for decompression
  • Loading branch information
tusharmath committed May 8, 2022
1 parent ece26f2 commit daca599
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
24 changes: 13 additions & 11 deletions zio-http/src/main/scala/zhttp/service/Handler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,20 @@ private[zhttp] final case class Handler[R](
case throwable: Throwable => resWriter.write(throwable, jReq)
}
case jReq: HttpRequest =>
if (canHaveBody(jReq)) {
ctx.channel().config().setAutoRead(false): Unit
}
val hasBody = canHaveBody(jReq)
if (hasBody) ctx.channel().config().setAutoRead(false): Unit
try
unsafeRun(
jReq,
app,
new Request {
override def data: HttpData = HttpData.UnsafeAsync(callback =>
ctx
.pipeline()
.addAfter(HTTP_REQUEST_HANDLER, HTTP_CONTENT_HANDLER, new RequestBodyHandler(callback)): Unit,
)
override def data: HttpData = if (hasBody) asyncData else HttpData.empty
private final def asyncData =
HttpData.UnsafeAsync(callback =>
ctx
.pipeline()
.addAfter(HTTP_REQUEST_HANDLER, HTTP_CONTENT_HANDLER, new RequestBodyHandler(callback)): Unit,
)

override def headers: Headers = Headers.make(jReq.headers())

Expand Down Expand Up @@ -101,9 +102,10 @@ private[zhttp] final case class Handler[R](
}
}

private def canHaveBody(req: HttpRequest): Boolean = req.method() match {
case HttpMethod.GET | HttpMethod.HEAD | HttpMethod.OPTIONS | HttpMethod.TRACE => false
case _ => true
private def canHaveBody(req: HttpRequest): Boolean = {
req.method() == HttpMethod.TRACE ||
req.headers().get(HttpHeaderNames.CONTENT_LENGTH) != null ||
req.headers().get(HttpHeaderNames.TRANSFER_ENCODING) != null
}

/**
Expand Down
7 changes: 4 additions & 3 deletions zio-http/src/test/scala/zhttp/service/ServerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ object ServerSpec extends HttpRunnableSpec {

private val app =
serve(DynamicServer.app, Some(Server.requestDecompression(true) ++ Server.enableObjectAggregator(4096)))
private val appWithReqStreaming = serve(DynamicServer.app, None)
private val appWithReqStreaming = serve(DynamicServer.app, Some(Server.requestDecompression(true)))

def dynamicAppSpec = suite("DynamicAppSpec") {
suite("success") {
Expand Down Expand Up @@ -164,7 +164,7 @@ object ServerSpec extends HttpRunnableSpec {
}
}

def responseSpec = suite("ResponseSpec") {
def responseSpec = suite("ResponseSpec") {
testM("data") {
checkAllM(nonEmptyContent) { case (string, data) =>
val res = Http.fromData(data).deploy.bodyAsString.run()
Expand Down Expand Up @@ -258,6 +258,7 @@ object ServerSpec extends HttpRunnableSpec {
}
}
}

def requestBodySpec = suite("RequestBodySpec") {
testM("POST Request stream") {
val app: Http[Any, Throwable, Request, Response] = Http.collect[Request] { case req =>
Expand Down Expand Up @@ -298,6 +299,6 @@ object ServerSpec extends HttpRunnableSpec {
val spec = dynamicAppSpec + responseSpec + requestSpec + requestBodySpec + serverErrorSpec
suiteM("app without request streaming") { app.as(List(spec)).useNow } +
suiteM("app with request streaming") { appWithReqStreaming.as(List(spec)).useNow }
}.provideCustomLayerShared(env) @@ timeout(30 seconds) @@ sequential
}.provideCustomLayerShared(env) @@ timeout(20 seconds)

}

0 comments on commit daca599

Please sign in to comment.