diff --git a/example/src/main/scala/example/RequestStreaming.scala b/example/src/main/scala/example/RequestStreaming.scala new file mode 100644 index 0000000000..3e662fe4aa --- /dev/null +++ b/example/src/main/scala/example/RequestStreaming.scala @@ -0,0 +1,24 @@ +package example + +import zhttp.http._ +import zhttp.service.Server +import zio._ +object RequestStreaming extends App { + + // Create HTTP route which echos back the request body + val app = Http.collect[Request] { case req @ Method.POST -> !! / "echo" => + // Returns a stream of bytes from the request + // The stream supports back-pressure + val stream = req.bodyAsStream + + // Creating HttpData from the stream + // This works for file of any size + val data = HttpData.fromStream(stream) + + Response(data = data) + } + + // Run it like any simple app + override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = + Server.start(8090, app).exitCode +} diff --git a/zio-http/src/main/scala/zhttp/http/HttpDataExtension.scala b/zio-http/src/main/scala/zhttp/http/HttpDataExtension.scala new file mode 100644 index 0000000000..d631659d30 --- /dev/null +++ b/zio-http/src/main/scala/zhttp/http/HttpDataExtension.scala @@ -0,0 +1,40 @@ +package zhttp.http + +import io.netty.buffer.{ByteBuf, ByteBufUtil} +import zhttp.http.headers.HeaderExtension +import zio.stream.ZStream +import zio.{Chunk, Task, UIO} + +private[zhttp] trait HttpDataExtension[+A] extends HeaderExtension[A] { self: A => + def data: HttpData + + private[zhttp] final def bodyAsByteBuf: Task[ByteBuf] = data.toByteBuf + + final def bodyAsByteArray: Task[Array[Byte]] = + bodyAsByteBuf.flatMap(buf => Task(ByteBufUtil.getBytes(buf)).ensuring(UIO(buf.release(buf.refCnt())))) + + /** + * Decodes the content of request as a Chunk of Bytes + */ + final def body: Task[Chunk[Byte]] = + bodyAsByteArray.map(Chunk.fromArray) + + /** + * Decodes the content of request as string + */ + final def bodyAsString: Task[String] = + bodyAsByteArray.map(new String(_, charset)) + + /** + * Decodes the content of request as stream of bytes + */ + final def bodyAsStream: ZStream[Any, Throwable, Byte] = data.toByteBufStream + .mapM[Any, Throwable, Chunk[Byte]] { buf => + Task { + val bytes = Chunk.fromArray(ByteBufUtil.getBytes(buf)) + buf.release(buf.refCnt()) + bytes + } + } + .flattenChunks +} diff --git a/zio-http/src/main/scala/zhttp/http/Request.scala b/zio-http/src/main/scala/zhttp/http/Request.scala index dbc9449395..6bf4736bff 100644 --- a/zio-http/src/main/scala/zhttp/http/Request.scala +++ b/zio-http/src/main/scala/zhttp/http/Request.scala @@ -1,14 +1,11 @@ package zhttp.http -import io.netty.buffer.{ByteBuf, ByteBufUtil} import io.netty.handler.codec.http.{DefaultFullHttpRequest, HttpRequest} import zhttp.http.headers.HeaderExtension -import zio.stream.ZStream -import zio.{Chunk, Task, UIO} import java.net.InetAddress -trait Request extends HeaderExtension[Request] { self => +trait Request extends HeaderExtension[Request] with HttpDataExtension[Request] { self => /** * Updates the headers using the provided function @@ -41,34 +38,6 @@ trait Request extends HeaderExtension[Request] { self => */ def data: HttpData - final def bodyAsByteArray: Task[Array[Byte]] = - bodyAsByteBuf.flatMap(buf => Task(ByteBufUtil.getBytes(buf)).ensuring(UIO(buf.release(buf.refCnt())))) - - /** - * Decodes the content of request as a Chunk of Bytes - */ - final def body: Task[Chunk[Byte]] = - bodyAsByteArray.map(Chunk.fromArray) - - /** - * Decodes the content of request as string - */ - final def bodyAsString: Task[String] = - bodyAsByteArray.map(new String(_, charset)) - - /** - * Decodes the content of request as stream of bytes - */ - final def bodyAsStream: ZStream[Any, Throwable, Byte] = data.toByteBufStream - .mapM[Any, Throwable, Chunk[Byte]] { buf => - Task { - val bytes = Chunk.fromArray(ByteBufUtil.getBytes(buf)) - buf.release(buf.refCnt()) - bytes - } - } - .flattenChunks - /** * Gets all the headers in the Request */ @@ -124,7 +93,6 @@ trait Request extends HeaderExtension[Request] { self => */ def version: Version - private[zhttp] final def bodyAsByteBuf: Task[ByteBuf] = data.toByteBuf } object Request { diff --git a/zio-http/src/main/scala/zhttp/http/Response.scala b/zio-http/src/main/scala/zhttp/http/Response.scala index ed18bd924d..40faed27a2 100644 --- a/zio-http/src/main/scala/zhttp/http/Response.scala +++ b/zio-http/src/main/scala/zhttp/http/Response.scala @@ -1,12 +1,12 @@ package zhttp.http -import io.netty.buffer.{ByteBuf, ByteBufUtil, Unpooled} +import io.netty.buffer.Unpooled import io.netty.handler.codec.http.HttpVersion.HTTP_1_1 import io.netty.handler.codec.http.{FullHttpResponse, HttpHeaderNames, HttpResponse} import zhttp.html._ import zhttp.http.headers.HeaderExtension import zhttp.socket.{IsWebSocket, Socket, SocketApp} -import zio.{Chunk, Task, UIO, ZIO} +import zio.{Chunk, UIO, ZIO} import java.io.{PrintWriter, StringWriter} import java.nio.charset.Charset @@ -16,7 +16,8 @@ final case class Response private ( headers: Headers, data: HttpData, private[zhttp] val attribute: Response.Attribute, -) extends HeaderExtension[Response] { self => +) extends HeaderExtension[Response] + with HttpDataExtension[Response] { self => /** * Adds cookies in the response headers. @@ -59,16 +60,6 @@ final case class Response private ( */ def withServerTime: Response = self.copy(attribute = self.attribute.withServerTime) - final def bodyAsByteArray: Task[Array[Byte]] = - bodyAsByteBuf.flatMap(buf => Task(ByteBufUtil.getBytes(buf))) - - /** - * Extracts the body as ByteBuf - */ - private[zhttp] def bodyAsByteBuf: Task[ByteBuf] = self.data.toByteBuf - - def bodyAsString: Task[String] = bodyAsByteArray.map(new String(_, charset)) - /** * Encodes the Response into a Netty HttpResponse. Sets default headers such * as `content-length`. For performance reasons, it is possible that it uses a