Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Request and Response to extend HttpDataExtension #1112

Merged
merged 5 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions example/src/main/scala/example/RequestStreaming.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package example

import zhttp.http._
import zhttp.service.Server
import zio._
object RequestStreaming extends App {

// Create HTTP route which echos back the request body
val app = Http.collect[Request] { case req @ Method.POST -> !! / "echo" =>
// Returns a stream of bytes from the request
// The stream supports back-pressure
val stream = req.bodyAsStream

// Creating HttpData from the stream
// This works for file of any size
val data = HttpData.fromStream(stream)

Response(data = data)
}

// Run it like any simple app
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
Server.start(8090, app).exitCode
}
40 changes: 40 additions & 0 deletions zio-http/src/main/scala/zhttp/http/HttpDataExtension.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package zhttp.http

import io.netty.buffer.{ByteBuf, ByteBufUtil}
import zhttp.http.headers.HeaderExtension
import zio.stream.ZStream
import zio.{Chunk, Task, UIO}

private[zhttp] trait HttpDataExtension[+A] extends HeaderExtension[A] { self: A =>
def data: HttpData

private[zhttp] final def bodyAsByteBuf: Task[ByteBuf] = data.toByteBuf

final def bodyAsByteArray: Task[Array[Byte]] =
bodyAsByteBuf.flatMap(buf => Task(ByteBufUtil.getBytes(buf)).ensuring(UIO(buf.release(buf.refCnt()))))

/**
* Decodes the content of request as a Chunk of Bytes
*/
final def body: Task[Chunk[Byte]] =
bodyAsByteArray.map(Chunk.fromArray)

/**
* Decodes the content of request as string
*/
final def bodyAsString: Task[String] =
bodyAsByteArray.map(new String(_, charset))

/**
* Decodes the content of request as stream of bytes
*/
final def bodyAsStream: ZStream[Any, Throwable, Byte] = data.toByteBufStream
.mapM[Any, Throwable, Chunk[Byte]] { buf =>
Task {
val bytes = Chunk.fromArray(ByteBufUtil.getBytes(buf))
buf.release(buf.refCnt())
bytes
}
}
.flattenChunks
}
34 changes: 1 addition & 33 deletions zio-http/src/main/scala/zhttp/http/Request.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package zhttp.http

import io.netty.buffer.{ByteBuf, ByteBufUtil}
import io.netty.handler.codec.http.{DefaultFullHttpRequest, HttpRequest}
import zhttp.http.headers.HeaderExtension
import zio.stream.ZStream
import zio.{Chunk, Task, UIO}

import java.net.InetAddress

trait Request extends HeaderExtension[Request] { self =>
trait Request extends HeaderExtension[Request] with HttpDataExtension[Request] { self =>

/**
* Updates the headers using the provided function
Expand Down Expand Up @@ -41,34 +38,6 @@ trait Request extends HeaderExtension[Request] { self =>
*/
def data: HttpData

final def bodyAsByteArray: Task[Array[Byte]] =
bodyAsByteBuf.flatMap(buf => Task(ByteBufUtil.getBytes(buf)).ensuring(UIO(buf.release(buf.refCnt()))))

/**
* Decodes the content of request as a Chunk of Bytes
*/
final def body: Task[Chunk[Byte]] =
bodyAsByteArray.map(Chunk.fromArray)

/**
* Decodes the content of request as string
*/
final def bodyAsString: Task[String] =
bodyAsByteArray.map(new String(_, charset))

/**
* Decodes the content of request as stream of bytes
*/
final def bodyAsStream: ZStream[Any, Throwable, Byte] = data.toByteBufStream
.mapM[Any, Throwable, Chunk[Byte]] { buf =>
Task {
val bytes = Chunk.fromArray(ByteBufUtil.getBytes(buf))
buf.release(buf.refCnt())
bytes
}
}
.flattenChunks

/**
* Gets all the headers in the Request
*/
Expand Down Expand Up @@ -124,7 +93,6 @@ trait Request extends HeaderExtension[Request] { self =>
*/
def version: Version

private[zhttp] final def bodyAsByteBuf: Task[ByteBuf] = data.toByteBuf
}

object Request {
Expand Down
17 changes: 4 additions & 13 deletions zio-http/src/main/scala/zhttp/http/Response.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package zhttp.http

import io.netty.buffer.{ByteBuf, ByteBufUtil, Unpooled}
import io.netty.buffer.Unpooled
import io.netty.handler.codec.http.HttpVersion.HTTP_1_1
import io.netty.handler.codec.http.{FullHttpResponse, HttpHeaderNames, HttpResponse}
import zhttp.html._
import zhttp.http.headers.HeaderExtension
import zhttp.socket.{IsWebSocket, Socket, SocketApp}
import zio.{Chunk, Task, UIO, ZIO}
import zio.{Chunk, UIO, ZIO}

import java.io.{PrintWriter, StringWriter}
import java.nio.charset.Charset
Expand All @@ -16,7 +16,8 @@ final case class Response private (
headers: Headers,
data: HttpData,
private[zhttp] val attribute: Response.Attribute,
) extends HeaderExtension[Response] { self =>
) extends HeaderExtension[Response]
with HttpDataExtension[Response] { self =>

/**
* Adds cookies in the response headers.
Expand Down Expand Up @@ -59,16 +60,6 @@ final case class Response private (
*/
def withServerTime: Response = self.copy(attribute = self.attribute.withServerTime)

final def bodyAsByteArray: Task[Array[Byte]] =
bodyAsByteBuf.flatMap(buf => Task(ByteBufUtil.getBytes(buf)))

/**
* Extracts the body as ByteBuf
*/
private[zhttp] def bodyAsByteBuf: Task[ByteBuf] = self.data.toByteBuf

def bodyAsString: Task[String] = bodyAsByteArray.map(new String(_, charset))

/**
* Encodes the Response into a Netty HttpResponse. Sets default headers such
* as `content-length`. For performance reasons, it is possible that it uses a
Expand Down