Skip to content

Commit

Permalink
Feature: Request Streaming (#1048)
Browse files Browse the repository at this point in the history
* introduce `Incoming` and `Outgoing` inHttpData

* streaming support

* benchmark disable objectAggregator

* cleanup

* refactor

* cleanup + PR comments

* cleanup + PR comments

* cleanup + PR comments

* refactor: rename variable

* memory leak

* refactor: Handler now extends ChannelInboundHandlerAdapter

* refactor: remove unused methods from UnsafeChannel

* remove bodyAsCharSequenceStream operator

* refactor: remove unnecessary methods on HttpData

* refactor: re-implement `bodyAsStream`

* refactor: remove unsafe modification of pipeline from HttpData

* refactor: rename HttpData types

* fix 2.12 build

* refactor: remove type param

* PR comment

* PR comment

* refaector: simplify releaseRequest

* refactor: reorder methods in ServerResponseHandler

* refactor: make methods final

* refactor: rename HttpData traits

* add `bodyAsByteArray` and derive `body` and `bodyAsString` from it.

* add test: should throw error for HttpData.Incoming

* Introduce `useAggregator` method on settings and use it everywhere

* remove sharable from `ServerResponseHandler`

* Update zio-http/src/main/scala/zhttp/http/Request.scala

* refactor: remove unnecessary pattern matching

* throw exception on unknown message type

* simplify test

* refactor: change order of ContentHandler. Move it before the RequestHandler

* test: update test structure

* refactor: move pattern match logic to WebSocketUpgrade

* revert addBefore Change because of degrade in performance (#1089)

* fix static server issue with streaming

* take case of auto read if body is not used

* autoRead when needed

* Update zio-http/src/main/scala/zhttp/service/RequestBodyHandler.scala

* Update zio-http/src/main/scala/zhttp/http/Response.scala

* Update zio-http/src/main/scala/zhttp/http/Response.scala

* remove test which is not used

* Update zio-http/src/main/scala/zhttp/service/Handler.scala

Co-authored-by: Shrey Mehta <36622672+smehta91@users.noreply.github.com>

* Update zio-http/src/main/scala/zhttp/service/Handler.scala

Co-authored-by: Shrey Mehta <36622672+smehta91@users.noreply.github.com>

* style: fmt

* exclude Head in 404 check

Co-authored-by: Tushar Mathur <tusharmath@gmail.com>
Co-authored-by: Shrey Mehta <36622672+smehta91@users.noreply.github.com>
  • Loading branch information
3 people authored Mar 1, 2022
1 parent 69788f3 commit 8e4f6bf
Show file tree
Hide file tree
Showing 12 changed files with 380 additions and 164 deletions.
120 changes: 85 additions & 35 deletions zio-http/src/main/scala/zhttp/http/HttpData.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package zhttp.http

import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.{HttpContent, LastHttpContent}
import zio.blocking.Blocking.Service.live.effectBlocking
import zio.stream.ZStream
import zio.{Chunk, Task, UIO}
import zio.{Chunk, Task, UIO, ZIO}

import java.io.FileInputStream
import java.nio.charset.Charset
Expand All @@ -16,38 +18,30 @@ sealed trait HttpData { self =>
/**
* Returns true if HttpData is a stream
*/
def isChunked: Boolean = self match {
final def isChunked: Boolean = self match {
case HttpData.BinaryStream(_) => true
case _ => false
}

/**
* Returns true if HttpData is empty
*/
def isEmpty: Boolean = self match {
final def isEmpty: Boolean = self match {
case HttpData.Empty => true
case _ => false
}

def toByteBuf: Task[ByteBuf] = {
final def toByteBuf: Task[ByteBuf] = {
self match {
case HttpData.Text(text, charset) => UIO(Unpooled.copiedBuffer(text, charset))
case HttpData.BinaryChunk(data) => UIO(Unpooled.copiedBuffer(data.toArray))
case HttpData.BinaryByteBuf(data) => UIO(data)
case HttpData.Empty => UIO(Unpooled.EMPTY_BUFFER)
case HttpData.BinaryStream(stream) =>
stream
.asInstanceOf[ZStream[Any, Throwable, ByteBuf]]
.fold(Unpooled.compositeBuffer())((c, b) => c.addComponent(b))
case HttpData.RandomAccessFile(raf) =>
effectBlocking {
val fis = new FileInputStream(raf().getFD)
val fileContent: Array[Byte] = new Array[Byte](raf().length().toInt)
fis.read(fileContent)
Unpooled.copiedBuffer(fileContent)
}
case self: HttpData.Incoming => self.encode
case self: HttpData.Outgoing => self.encode
}
}

final def toByteBufStream: ZStream[Any, Throwable, ByteBuf] = self match {
case self: HttpData.Incoming => self.encodeAsStream
case self: HttpData.Outgoing => ZStream.fromEffect(self.encode)
}
}

object HttpData {
Expand All @@ -68,33 +62,89 @@ object HttpData {
def fromChunk(data: Chunk[Byte]): HttpData = BinaryChunk(data)

/**
* Helper to create HttpData from Stream of bytes
* Helper to create HttpData from contents of a file
*/
def fromStream(stream: ZStream[Any, Throwable, Byte]): HttpData =
HttpData.BinaryStream(stream.mapChunks(chunks => Chunk(Unpooled.copiedBuffer(chunks.toArray))))
def fromFile(file: => java.io.File): HttpData = {
RandomAccessFile(() => new java.io.RandomAccessFile(file, "r"))
}

/**
* Helper to create HttpData from Stream of string
*/
def fromStream(stream: ZStream[Any, Throwable, String], charset: Charset = HTTP_CHARSET): HttpData =
HttpData.BinaryStream(stream.map(str => Unpooled.copiedBuffer(str, charset)))
def fromStream(stream: ZStream[Any, Throwable, CharSequence], charset: Charset = HTTP_CHARSET): HttpData =
HttpData.BinaryStream(stream.map(str => Unpooled.wrappedBuffer(str.toString.getBytes(charset))))

/**
* Helper to create HttpData from String
* Helper to create HttpData from Stream of bytes
*/
def fromString(text: String, charset: Charset = HTTP_CHARSET): HttpData = Text(text, charset)
def fromStream(stream: ZStream[Any, Throwable, Byte]): HttpData =
HttpData.BinaryStream(stream.mapChunks(chunks => Chunk(Unpooled.wrappedBuffer(chunks.toArray))))

/**
* Helper to create HttpData from contents of a file
* Helper to create HttpData from String
*/
def fromFile(file: => java.io.File): HttpData = {
RandomAccessFile(() => new java.io.RandomAccessFile(file, "r"))
def fromString(text: String, charset: Charset = HTTP_CHARSET): HttpData = Text(text, charset)

private[zhttp] sealed trait Outgoing extends HttpData { self =>
def encode: ZIO[Any, Throwable, ByteBuf] =
self match {
case HttpData.Text(text, charset) => UIO(Unpooled.copiedBuffer(text, charset))
case HttpData.BinaryChunk(data) => UIO(Unpooled.copiedBuffer(data.toArray))
case HttpData.BinaryByteBuf(data) => UIO(data)
case HttpData.Empty => UIO(Unpooled.EMPTY_BUFFER)
case HttpData.BinaryStream(stream) =>
stream
.asInstanceOf[ZStream[Any, Throwable, ByteBuf]]
.fold(Unpooled.compositeBuffer())((c, b) => c.addComponent(b))
case HttpData.RandomAccessFile(raf) =>
effectBlocking {
val fis = new FileInputStream(raf().getFD)
val fileContent: Array[Byte] = new Array[Byte](raf().length().toInt)
fis.read(fileContent)
Unpooled.copiedBuffer(fileContent)
}
}
}

private[zhttp] final class UnsafeContent(private val httpContent: HttpContent) extends AnyVal {
def content: ByteBuf = httpContent.content()

def isLast: Boolean = httpContent.isInstanceOf[LastHttpContent]
}

private[zhttp] final class UnsafeChannel(private val ctx: ChannelHandlerContext) extends AnyVal {
def read(): Unit = ctx.read(): Unit
}

private[zhttp] final case class Incoming(unsafeRun: (UnsafeChannel => UnsafeContent => Unit) => Unit)
extends HttpData {
def encode: ZIO[Any, Nothing, ByteBuf] = for {
body <- ZIO.effectAsync[Any, Nothing, ByteBuf](cb =>
unsafeRun(ch => {
val buffer = Unpooled.compositeBuffer()
msg => {
buffer.addComponent(true, msg.content)
if (msg.isLast) cb(UIO(buffer)) else ch.read()
}
}),
)
} yield body

def encodeAsStream: ZStream[Any, Nothing, ByteBuf] = ZStream
.effectAsync[Any, Nothing, ByteBuf](cb =>
unsafeRun(ch =>
msg => {
cb(ZIO.succeed(Chunk(msg.content)))
if (msg.isLast) cb(ZIO.fail(None)) else ch.read()
},
),
)
}

private[zhttp] final case class Text(text: String, charset: Charset) extends HttpData
private[zhttp] final case class BinaryChunk(data: Chunk[Byte]) extends HttpData
private[zhttp] final case class BinaryByteBuf(data: ByteBuf) extends HttpData
private[zhttp] final case class BinaryStream(stream: ZStream[Any, Throwable, ByteBuf]) extends HttpData
private[zhttp] final case class RandomAccessFile(unsafeGet: () => java.io.RandomAccessFile) extends HttpData
private[zhttp] case object Empty extends HttpData
private[zhttp] final case class Text(text: String, charset: Charset) extends Outgoing
private[zhttp] final case class BinaryChunk(data: Chunk[Byte]) extends Outgoing
private[zhttp] final case class BinaryByteBuf(data: ByteBuf) extends Outgoing
private[zhttp] final case class BinaryStream(stream: ZStream[Any, Throwable, ByteBuf]) extends Outgoing
private[zhttp] final case class RandomAccessFile(unsafeGet: () => java.io.RandomAccessFile) extends Outgoing
private[zhttp] case object Empty extends Outgoing
}
27 changes: 22 additions & 5 deletions zio-http/src/main/scala/zhttp/http/Request.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package zhttp.http
import io.netty.buffer.{ByteBuf, ByteBufUtil}
import io.netty.handler.codec.http.{DefaultFullHttpRequest, HttpRequest}
import zhttp.http.headers.HeaderExtension
import zio.stream.ZStream
import zio.{Chunk, Task, UIO}

import java.net.InetAddress
Expand Down Expand Up @@ -33,17 +34,33 @@ trait Request extends HeaderExtension[Request] { self =>
*/
def data: HttpData

final def bodyAsByteArray: Task[Array[Byte]] =
bodyAsByteBuf.flatMap(buf => Task(ByteBufUtil.getBytes(buf)).ensuring(UIO(buf.release(buf.refCnt()))))

/**
* Decodes the content of request as a Chunk of Bytes
*/
def body: Task[Chunk[Byte]] =
bodyAsByteBuf.flatMap(buf => Task(Chunk.fromArray(ByteBufUtil.getBytes(buf))))
final def body: Task[Chunk[Byte]] =
bodyAsByteArray.map(Chunk.fromArray)

/**
* Decodes the content of request as string
*/
def bodyAsString: Task[String] =
bodyAsByteBuf.flatMap(buf => Task(buf.toString(charset)))
final def bodyAsString: Task[String] =
bodyAsByteArray.map(new String(_, charset))

/**
* Decodes the content of request as stream of bytes
*/
final def bodyAsStream: ZStream[Any, Throwable, Byte] = data.toByteBufStream
.mapM[Any, Throwable, Chunk[Byte]] { buf =>
Task {
val bytes = Chunk.fromArray(ByteBufUtil.getBytes(buf))
buf.release(buf.refCnt())
bytes
}
}
.flattenChunks

/**
* Gets all the headers in the Request
Expand Down Expand Up @@ -95,7 +112,7 @@ trait Request extends HeaderExtension[Request] { self =>
*/
def url: URL

private[zhttp] def bodyAsByteBuf: Task[ByteBuf] = data.toByteBuf
private[zhttp] final def bodyAsByteBuf: Task[ByteBuf] = data.toByteBuf
}

object Request {
Expand Down
16 changes: 10 additions & 6 deletions zio-http/src/main/scala/zhttp/http/Response.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,16 @@ final case class Response private (

val jHeaders = self.headers.encode
val jContent = self.data match {
case HttpData.Text(text, charset) => Unpooled.wrappedBuffer(text.getBytes(charset))
case HttpData.BinaryChunk(data) => Unpooled.copiedBuffer(data.toArray)
case HttpData.BinaryByteBuf(data) => data
case HttpData.BinaryStream(_) => null
case HttpData.Empty => Unpooled.EMPTY_BUFFER
case HttpData.RandomAccessFile(_) => null
case HttpData.Incoming(_) => null
case data: HttpData.Outgoing =>
data match {
case HttpData.Text(text, charset) => Unpooled.wrappedBuffer(text.getBytes(charset))
case HttpData.BinaryChunk(data) => Unpooled.copiedBuffer(data.toArray)
case HttpData.BinaryByteBuf(data) => data
case HttpData.BinaryStream(_) => null
case HttpData.Empty => Unpooled.EMPTY_BUFFER
case HttpData.RandomAccessFile(_) => null
}
}

val hasContentLength = jHeaders.contains(HttpHeaderNames.CONTENT_LENGTH)
Expand Down
Loading

0 comments on commit 8e4f6bf

Please sign in to comment.