Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance: Improve HttpData toByteBuf #1137

Merged
merged 11 commits into from
Mar 20, 2022
5 changes: 3 additions & 2 deletions example/src/main/scala/example/StaticServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ object StaticServer extends zio.App {
// A simple app to serve static resource files from a local directory.
val app = Http.collectHttp[Request] { case Method.GET -> "static" /: path =>
for {
file <- Http.getResourceAsFile(path.encode)
file <- Http.getResourceAsFile(path.encode.tail)
http <-
// Rendering a custom UI to list all the files in the directory
if (file.isDirectory) {
Expand Down Expand Up @@ -40,7 +40,8 @@ object StaticServer extends zio.App {
else if (file.isFile) Http.fromFile(file)

// Return a 404 if the file doesn't exist
else Http.empty
else
Http.empty
} yield http
}

Expand Down
12 changes: 6 additions & 6 deletions zio-http/src/main/scala/zhttp/http/Http.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import zhttp.http.headers.HeaderModifier
import zhttp.service.server.ServerTime
import zhttp.service.{Handler, HttpRuntime, Server}
import zio._
import zio.blocking.Blocking
import zio.blocking.{Blocking, effectBlocking}
import zio.clock.Clock
import zio.duration.Duration
import zio.stream.ZStream
Expand Down Expand Up @@ -783,7 +783,7 @@ object Http {
/**
* Creates an Http app from the contents of a file.
*/
def fromFile(file: => java.io.File): HttpApp[Any, Throwable] = Http.fromFileZIO(Task(file))
def fromFile(file: => java.io.File): HttpApp[Any, Throwable] = Http.fromFileZIO(UIO(file))

/**
* Creates an Http app from the contents of a file which is produced from an
Expand Down Expand Up @@ -859,7 +859,7 @@ object Http {
/**
* Creates an Http app from a resource path
*/
def fromResource(path: String): HttpApp[Any, Throwable] =
def fromResource(path: String): HttpApp[Blocking, Throwable] =
Http.getResource(path).flatMap(url => Http.fromFile(new File(url.getPath)))

/**
Expand All @@ -884,15 +884,15 @@ object Http {
/**
* Attempts to retrieve files from the classpath.
*/
def getResource(path: String): Http[Any, Throwable, Any, net.URL] =
def getResource(path: String): Http[Blocking, Throwable, Any, net.URL] =
Http
.fromZIO(Blocking.Service.live.effectBlockingIO(getClass.getClassLoader.getResource(path)))
.fromZIO(effectBlocking(getClass.getClassLoader.getResource(path)))
.flatMap { resource => if (resource == null) Http.empty else Http.succeed(resource) }

/**
* Attempts to retrieve files from the classpath.
*/
def getResourceAsFile(path: String): Http[Any, Throwable, Any, File] =
def getResourceAsFile(path: String): Http[Blocking, Throwable, Any, File] =
Http.getResource(path).map(url => new File(url.getPath))

/**
Expand Down
245 changes: 190 additions & 55 deletions zio-http/src/main/scala/zhttp/http/HttpData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package zhttp.http
import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.{HttpContent, LastHttpContent}
import zio.blocking.Blocking.Service.live.effectBlocking
import zhttp.http.HttpData.ByteBufConfig
import zio.stream.ZStream
import zio.{Chunk, Task, UIO, ZIO}

Expand All @@ -16,12 +16,23 @@ import java.nio.charset.Charset
sealed trait HttpData { self =>

/**
* Returns true if HttpData is a stream
* Encodes the HttpData into a ByteBuf. Takes in ByteBufConfig to have a more
* fine grained control over the encoding.
*/
final def isChunked: Boolean = self match {
case HttpData.BinaryStream(_) => true
case _ => false
}
def toByteBuf(config: ByteBufConfig): Task[ByteBuf]

/**
* Encodes the HttpData into a Stream of ByteBufs. Takes in ByteBufConfig to
* have a more fine grained control over the encoding.
*/
def toByteBufStream(config: ByteBufConfig): ZStream[Any, Throwable, ByteBuf]

/**
* Encodes the HttpData into a Http of ByeBuf. This could be more performant
* in certain cases. Takes in ByteBufConfig to have a more fine grained
* control over the encoding.
*/
def toHttp(config: ByteBufConfig): Http[Any, Throwable, Any, ByteBuf]

/**
* Returns true if HttpData is empty
Expand All @@ -31,21 +42,27 @@ sealed trait HttpData { self =>
case _ => false
}

final def toByteBuf: Task[ByteBuf] = {
self match {
case self: HttpData.Incoming => self.encode
case self: HttpData.Outgoing => self.encode
}
}
/**
* Encodes the HttpData into a ByteBuf.
*/
final def toByteBuf: Task[ByteBuf] = toByteBuf(ByteBufConfig.default)

final def toByteBufStream: ZStream[Any, Throwable, ByteBuf] = self match {
case self: HttpData.Incoming => self.encodeAsStream
case self: HttpData.Outgoing => ZStream.fromEffect(self.encode)
}
/**
* Encodes the HttpData into a Stream of ByteBufs
*/
final def toByteBufStream: ZStream[Any, Throwable, ByteBuf] = toByteBufStream(ByteBufConfig.default)

/**
* A bit more efficient version of toByteBuf in certain cases
*/
final def toHttp: Http[Any, Throwable, Any, ByteBuf] = toHttp(ByteBufConfig.default)
}

object HttpData {

private def collectStream[R, E](stream: ZStream[R, E, ByteBuf]): ZIO[R, E, ByteBuf] =
stream.fold(Unpooled.compositeBuffer()) { case (cmp, buf) => cmp.addComponent(true, buf) }

/**
* Helper to create empty HttpData
*/
Expand All @@ -64,9 +81,7 @@ object HttpData {
/**
* Helper to create HttpData from contents of a file
*/
def fromFile(file: => java.io.File): HttpData = {
RandomAccessFile(() => new java.io.RandomAccessFile(file, "r"))
}
def fromFile(file: => java.io.File): HttpData = JavaFile(() => file)

/**
* Helper to create HttpData from Stream of string
Expand All @@ -85,25 +100,18 @@ object HttpData {
*/
def fromString(text: String, charset: Charset = HTTP_CHARSET): HttpData = Text(text, charset)

private[zhttp] sealed trait Outgoing extends HttpData { self =>
def encode: ZIO[Any, Throwable, ByteBuf] =
self match {
case HttpData.Text(text, charset) => UIO(Unpooled.copiedBuffer(text, charset))
case HttpData.BinaryChunk(data) => UIO(Unpooled.copiedBuffer(data.toArray))
case HttpData.BinaryByteBuf(data) => UIO(data)
case HttpData.Empty => UIO(Unpooled.EMPTY_BUFFER)
case HttpData.BinaryStream(stream) =>
stream
.asInstanceOf[ZStream[Any, Throwable, ByteBuf]]
.fold(Unpooled.compositeBuffer())((c, b) => c.addComponent(true, b))
case HttpData.RandomAccessFile(raf) =>
effectBlocking {
val fis = new FileInputStream(raf().getFD)
val fileContent: Array[Byte] = new Array[Byte](raf().length().toInt)
fis.read(fileContent)
Unpooled.copiedBuffer(fileContent)
}
}
private[zhttp] sealed trait Complete extends HttpData

/**
* Provides a more fine grained control while encoding HttpData into ByteBUfs
*/
case class ByteBufConfig(chunkSize: Int = 1024 * 4) {
def chunkSize(fileLength: Long): Int = {
val actualInt = fileLength.toInt
if (actualInt < 0) chunkSize
else if (actualInt < chunkSize) actualInt
else chunkSize
}
}

private[zhttp] final class UnsafeContent(private val httpContent: HttpContent) extends AnyVal {
Expand All @@ -116,9 +124,13 @@ object HttpData {
def read(): Unit = ctx.read(): Unit
}

private[zhttp] final case class Incoming(unsafeRun: (UnsafeChannel => UnsafeContent => Unit) => Unit)
private[zhttp] final case class UnsafeAsync(unsafeRun: (UnsafeChannel => UnsafeContent => Unit) => Unit)
extends HttpData {
def encode: ZIO[Any, Nothing, ByteBuf] = for {

/**
* Encodes the HttpData into a ByteBuf.
*/
override def toByteBuf(config: ByteBufConfig): Task[ByteBuf] = for {
body <- ZIO.effectAsync[Any, Nothing, ByteBuf](cb =>
unsafeRun(ch => {
val buffer = Unpooled.compositeBuffer()
Expand All @@ -130,21 +142,144 @@ object HttpData {
)
} yield body

def encodeAsStream: ZStream[Any, Nothing, ByteBuf] = ZStream
.effectAsync[Any, Nothing, ByteBuf](cb =>
unsafeRun(ch =>
msg => {
cb(ZIO.succeed(Chunk(msg.content)))
if (msg.isLast) cb(ZIO.fail(None)) else ch.read()
},
),
)
/**
* Encodes the HttpData into a Stream of ByteBufs
*/
override def toByteBufStream(config: ByteBufConfig): ZStream[Any, Throwable, ByteBuf] =
ZStream
.effectAsync[Any, Nothing, ByteBuf](cb =>
unsafeRun(ch =>
msg => {
cb(ZIO.succeed(Chunk(msg.content)))
if (msg.isLast) cb(ZIO.fail(None)) else ch.read()
},
),
)

override def toHttp(config: ByteBufConfig): Http[Any, Throwable, Any, ByteBuf] =
Http.fromZIO(toByteBuf(config))
}

private[zhttp] final case class Text(text: String, charset: Charset) extends Complete {

private def encode = Unpooled.copiedBuffer(text, charset)

/**
* Encodes the HttpData into a ByteBuf.
*/
override def toByteBuf(config: ByteBufConfig): Task[ByteBuf] = UIO(encode)

/**
* Encodes the HttpData into a Stream of ByteBufs
*/
override def toByteBufStream(config: ByteBufConfig): ZStream[Any, Throwable, ByteBuf] =
ZStream.fromEffect(toByteBuf(config))

override def toHttp(config: ByteBufConfig): UHttp[Any, ByteBuf] = Http.succeed(encode)
}

private[zhttp] final case class BinaryChunk(data: Chunk[Byte]) extends Complete {

private def encode = Unpooled.wrappedBuffer(data.toArray)

/**
* Encodes the HttpData into a ByteBuf.
*/
override def toByteBuf(config: ByteBufConfig): Task[ByteBuf] = UIO(encode)

/**
* Encodes the HttpData into a Stream of ByteBufs
*/
override def toByteBufStream(config: ByteBufConfig): ZStream[Any, Throwable, ByteBuf] =
ZStream.fromEffect(toByteBuf(config))

override def toHttp(config: ByteBufConfig): UHttp[Any, ByteBuf] = Http.succeed(encode)
}

private[zhttp] final case class BinaryByteBuf(data: ByteBuf) extends Complete {

/**
* Encodes the HttpData into a ByteBuf.
*/
override def toByteBuf(config: ByteBufConfig): Task[ByteBuf] = Task(data)

/**
* Encodes the HttpData into a Stream of ByteBufs
*/
override def toByteBufStream(config: ByteBufConfig): ZStream[Any, Throwable, ByteBuf] =
ZStream.fromEffect(toByteBuf(config))

override def toHttp(config: ByteBufConfig): UHttp[Any, ByteBuf] = Http.succeed(data)
}

private[zhttp] final case class BinaryStream(stream: ZStream[Any, Throwable, ByteBuf]) extends Complete {

/**
* Encodes the HttpData into a ByteBuf.
*/
override def toByteBuf(config: ByteBufConfig): Task[ByteBuf] =
collectStream(toByteBufStream(config))

/**
* Encodes the HttpData into a Stream of ByteBufs
*/
override def toByteBufStream(config: ByteBufConfig): ZStream[Any, Throwable, ByteBuf] =
stream

override def toHttp(config: ByteBufConfig): Http[Any, Throwable, Any, ByteBuf] =
Http.fromZIO(toByteBuf(config))
}

private[zhttp] final case class JavaFile(unsafeFile: () => java.io.File) extends Complete {

/**
* Encodes the HttpData into a ByteBuf.
*/
override def toByteBuf(config: ByteBufConfig): Task[ByteBuf] =
collectStream(toByteBufStream(config))

/**
* Encodes the HttpData into a Stream of ByteBufs
*/
override def toByteBufStream(config: ByteBufConfig): ZStream[Any, Throwable, ByteBuf] =
ZStream.unwrap {
for {
file <- Task(unsafeFile())
fs <- Task(new FileInputStream(file))
size = config.chunkSize(file.length())
buffer = new Array[Byte](size)
} yield ZStream
.repeatEffectOption[Any, Throwable, ByteBuf] {
for {
len <- Task(fs.read(buffer)).mapError(Some(_))
bytes <- if (len > 0) UIO(Unpooled.copiedBuffer(buffer, 0, len)) else ZIO.fail(None)
} yield bytes
}
.ensuring(UIO(fs.close()))
}

override def toHttp(config: ByteBufConfig): Http[Any, Throwable, Any, ByteBuf] =
Http.fromZIO(toByteBuf(config))
}

object ByteBufConfig {
val default: ByteBufConfig = ByteBufConfig()
}

private[zhttp] case object Empty extends Complete {

/**
* Encodes the HttpData into a ByteBuf.
*/
override def toByteBuf(config: ByteBufConfig): Task[ByteBuf] = UIO(Unpooled.EMPTY_BUFFER)

/**
* Encodes the HttpData into a Stream of ByteBufs
*/
override def toByteBufStream(config: ByteBufConfig): ZStream[Any, Throwable, ByteBuf] =
ZStream.fromEffect(toByteBuf(config))

override def toHttp(config: ByteBufConfig): UHttp[Any, ByteBuf] = Http.empty
}

private[zhttp] final case class Text(text: String, charset: Charset) extends Outgoing
private[zhttp] final case class BinaryChunk(data: Chunk[Byte]) extends Outgoing
private[zhttp] final case class BinaryByteBuf(data: ByteBuf) extends Outgoing
private[zhttp] final case class BinaryStream(stream: ZStream[Any, Throwable, ByteBuf]) extends Outgoing
private[zhttp] final case class RandomAccessFile(unsafeGet: () => java.io.RandomAccessFile) extends Outgoing
private[zhttp] case object Empty extends Outgoing
}
6 changes: 3 additions & 3 deletions zio-http/src/main/scala/zhttp/http/Response.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ final case class Response private (

val jHeaders = self.headers.encode
val jContent = self.data match {
case HttpData.Incoming(_) => null
case data: HttpData.Outgoing =>
case HttpData.UnsafeAsync(_) => null
case data: HttpData.Complete =>
data match {
case HttpData.Text(text, charset) => Unpooled.wrappedBuffer(text.getBytes(charset))
case HttpData.BinaryChunk(data) => Unpooled.copiedBuffer(data.toArray)
case HttpData.BinaryByteBuf(data) => data
case HttpData.BinaryStream(_) => null
case HttpData.Empty => Unpooled.EMPTY_BUFFER
case HttpData.RandomAccessFile(_) => null
case HttpData.JavaFile(_) => null
}
}

Expand Down
2 changes: 1 addition & 1 deletion zio-http/src/main/scala/zhttp/service/Handler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private[zhttp] final case class Handler[R](
jReq,
app,
new Request {
override def data: HttpData = HttpData.Incoming(callback =>
override def data: HttpData = HttpData.UnsafeAsync(callback =>
ctx
.pipeline()
.addAfter(HTTP_REQUEST_HANDLER, HTTP_CONTENT_HANDLER, new RequestBodyHandler(callback)): Unit,
Expand Down
Loading