diff --git a/core/src/main/scala/io/finch/EncodeStreamToReader.scala b/core/src/main/scala/io/finch/EncodeStreamToReader.scala new file mode 100644 index 000000000..bc0eb642c --- /dev/null +++ b/core/src/main/scala/io/finch/EncodeStreamToReader.scala @@ -0,0 +1,25 @@ +package io.finch + +import com.twitter.io.{Buf, Reader} +import java.nio.charset.Charset + +/** + * + */ +trait EncodeStreamToReader[S[_[_], _], F[_], A] { + type ContentType <: String + + def apply(s: S[F, A], cs: Charset): Reader[Buf] +} + +object EncodeStreamToReader { + + type Aux[S[_[_], _], F[_], A, CT <: String] = + EncodeStreamToReader[S, F, A] { type ContentType = CT } + + type Json[S[_[_],_], F[_], A] = Aux[S, F, A, Application.Json] + + type Text[S[_[_],_], F[_], A] = Aux[S, F, A, Text.Plain] +} + + diff --git a/core/src/main/scala/io/finch/Input.scala b/core/src/main/scala/io/finch/Input.scala index fc5baaa8c..61caee153 100644 --- a/core/src/main/scala/io/finch/Input.scala +++ b/core/src/main/scala/io/finch/Input.scala @@ -3,7 +3,7 @@ package io.finch import cats.Eq import com.twitter.finagle.http.{Method, Request} import com.twitter.finagle.netty3.ChannelBufferBuf -import com.twitter.io.Buf +import com.twitter.io.{Buf, Reader} import java.nio.charset.{Charset, StandardCharsets} import org.jboss.netty.handler.codec.http.{DefaultHttpRequest, HttpMethod, HttpVersion} import org.jboss.netty.handler.codec.http.multipart.{DefaultHttpDataFactory, HttpPostRequestEncoder} @@ -36,6 +36,17 @@ final case class Input(request: Request, route: List[String]) { */ def withBody[CT <: String]: Input.Body[CT] = new Input.Body[CT](this) + + /** + * Returns a new `Input` wrapping a given stream as a payload. This requires content-type as a + * first type parameter (won't be inferred). + * + * ``` + * + * ``` + */ + def withStream[CT <: String]: Input.Stream[CT] = new Input.Stream[CT](this) + /** * Returns the new `Input` with `headers` amended. */ @@ -75,7 +86,7 @@ final case class Input(request: Request, route: List[String]) { ) } else ChannelBufferBuf.Owned(req.getContent) - withBody[Application.WwwFormUrlencoded](content, Some(StandardCharsets.UTF_8)) + withBody[Application.WwwFormUrlencoded](content, StandardCharsets.UTF_8) } } @@ -84,12 +95,13 @@ final case class Input(request: Request, route: List[String]) { */ object Input { - private final def copyRequest(from: Request): Request = { - val to = Request() - to.version = from.version - to.method = from.method + private final def copyRequest(from: Request): Request = + copyRequestWithReader(from, from.reader) + + private final def copyRequestWithReader(from: Request, reader: Reader[Buf]): Request = { + val to = Request(from.version, from.method, from.uri, reader) + to.setChunked(from.isChunked) to.content = from.content - to.uri = from.uri from.headerMap.foreach { case (k, v) => to.headerMap.put(k, v) } to @@ -99,17 +111,41 @@ object Input { * A helper class that captures the `Content-Type` of the payload. */ class Body[CT <: String](i: Input) { - def apply[A](body: A, charset: Option[Charset] = None)(implicit - e: Encode.Aux[A, CT], w: Witness.Aux[CT] - ): Input = { - val content = e(body, charset.getOrElse(StandardCharsets.UTF_8)) + def apply[A](body: A)(implicit e: Encode.Aux[A, CT], w: Witness.Aux[CT]): Input = + apply[A](body, StandardCharsets.UTF_8) + def apply[A](body: A, charset: Charset)(implicit + e: Encode.Aux[A, CT], W: Witness.Aux[CT] + ): Input = { + val content = e(body, charset) val copied = copyRequest(i.request) + copied.setChunked(false) copied.content = content - copied.contentType = w.value + copied.contentType = W.value copied.contentLength = content.length.toLong - charset.foreach(cs => copied.charset = cs.displayName().toLowerCase) + copied.charset = charset.displayName().toLowerCase + + Input(copied, i.route) + } + } + + class Stream[CT <: String](i: Input) { + def apply[S[_[_], _], F[_], A](s: S[F, A])(implicit + S: EncodeStreamToReader.Aux[S, F, A, CT], W: Witness.Aux[CT] + ): Input = apply[S, F, A](s, StandardCharsets.UTF_8) + + def apply[S[_[_], _], F[_], A](s: S[F, A], charset: Charset)(implicit + S: EncodeStreamToReader.Aux[S, F, A, CT], + W: Witness.Aux[CT] + ): Input = { + val content = S(s, charset) + val copied = copyRequestWithReader(i.request, content) + + copied.setChunked(true) + copied.contentType = W.value + copied.headerMap.setUnsafe("Transfer-Encoding", "chunked") + copied.charset = charset.displayName().toLowerCase Input(copied, i.route) } diff --git a/core/src/main/scala/io/finch/ToResponse.scala b/core/src/main/scala/io/finch/ToResponse.scala index c14d52bc5..32f791867 100644 --- a/core/src/main/scala/io/finch/ToResponse.scala +++ b/core/src/main/scala/io/finch/ToResponse.scala @@ -85,6 +85,17 @@ trait HighPriorityToResponseInstances extends LowPriorityToResponseInstances { rep } + + implicit def streamToResponse[S[_[_], _], F[_], A, CT <: String](implicit + E: EncodeStreamToReader.Aux[S, F, A, CT], + W: Witness.Aux[CT] + ): Aux[S[F, A], CT] = instance { (a, cs) => + val stream = E(a, cs) + val rep = Response(Version.Http11, Status.Ok, stream) + rep.headerMap.setUnsafe("Content-Type", W.value) + + rep + } } object ToResponse extends HighPriorityToResponseInstances { diff --git a/core/src/test/scala/io/finch/InputSpec.scala b/core/src/test/scala/io/finch/InputSpec.scala index 6ae96152f..49d088942 100644 --- a/core/src/test/scala/io/finch/InputSpec.scala +++ b/core/src/test/scala/io/finch/InputSpec.scala @@ -46,7 +46,7 @@ class InputSpec extends FinchSpec { ) check { (i: Input, f: Foo, cs: Charset) => - val input = i.withBody[Application.Json](f, Some(cs)) + val input = i.withBody[Application.Json](f, cs) input.request.content.asString(cs) === f.s && input.request.contentType === Some(s"application/json;charset=${cs.displayName.toLowerCase}") diff --git a/core/src/test/scala/io/finch/data/Foo.scala b/core/src/test/scala/io/finch/data/Foo.scala index ecd86a7e5..bcea89e71 100644 --- a/core/src/test/scala/io/finch/data/Foo.scala +++ b/core/src/test/scala/io/finch/data/Foo.scala @@ -29,6 +29,9 @@ object Foo { ) ) + implicit val encodeJsonFoo: Encode.Json[Foo] = + Encode.json((foo, _) => Buf.Utf8(s"""{s:"${foo.s}"""")) + implicit val arbitraryFoo: Arbitrary[Foo] = Arbitrary(Gen.alphaStr.suchThat(_.nonEmpty).map(Foo.apply)) } diff --git a/iteratee/src/main/scala/io/finch/iteratee/package.scala b/iteratee/src/main/scala/io/finch/iteratee/package.scala index e22347686..e809b5ecd 100644 --- a/iteratee/src/main/scala/io/finch/iteratee/package.scala +++ b/iteratee/src/main/scala/io/finch/iteratee/package.scala @@ -1,109 +1,116 @@ -package io.finch - -import cats.effect.Effect -import com.twitter.finagle.http.Response -import com.twitter.io._ -import com.twitter.util.Future -import io.finch.internal._ -import io.finch.items.RequestItem -import io.iteratee.{Enumerator, Iteratee} -import shapeless.Witness - -/** - * Iteratee module - */ -package object iteratee extends IterateeInstances { - - - private[finch] def enumeratorFromReader[F[_] : Effect](reader: Reader[Buf]): Enumerator[F, Buf] = { - def rec(reader: Reader[Buf]): Enumerator[F, Buf] = { - Enumerator.liftM[F, Option[Buf]] { - futureToEffect(reader.read()) - }.flatMap { - case None => Enumerator.empty[F, Buf] - case Some(buf) => Enumerator.enumOne[F, Buf](buf).append(rec(reader)) - } - } - rec(reader).ensure(Effect[F].delay(reader.discard())) - } - - /** - * An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as - * an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests. - */ - def enumeratorBody[F[_] : Effect, A, CT <: String](implicit - decode: Enumerate.Aux[F, A, CT] - ): Endpoint[F, Enumerator[F, A]] = new Endpoint[F, Enumerator[F, A]] { - final def apply(input: Input): Endpoint.Result[F, Enumerator[F, A]] = { - if (!input.request.isChunked) EndpointResult.NotMatched[F] - else { - val req = input.request - EndpointResult.Matched( - input, - Trace.empty, - Effect[F].pure(Output.payload(decode(enumeratorFromReader(req.reader), req.charsetOrUtf8))) - ) - } - } - - final override def item: RequestItem = items.BodyItem - final override def toString: String = "enumeratorBody" - } - - /** - * An evaluating [[Endpoint]] that reads a required chunked streaming JSON body, interpreted as - * an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests. - */ - def enumeratorJsonBody[F[_] : Effect, A](implicit - ad: Enumerate.Aux[F, A, Application.Json] - ): Endpoint[F, Enumerator[F, A]] = enumeratorBody[F, A, Application.Json].withToString("enumeratorJsonBody") - -} - -trait IterateeInstances extends LowPriorityInstances { - - implicit def enumeratorToJsonResponse[F[_] : Effect, A](implicit - e: Encode.Aux[A, Application.Json], - w: Witness.Aux[Application.Json] - ): ToResponse.Aux[Enumerator[F, A], Application.Json] = { - withCustomIteratee[F, A, Application.Json](writer => - Iteratee.foreachM[F, Buf]((buf: Buf) => futureToEffect(writer.write(buf.concat(ToResponse.NewLine)))) - ) - } -} - -trait LowPriorityInstances { - - protected def futureToEffect[F[_] : Effect, A](future: => Future[A]): F[A] = { - Effect[F].async[A](cb => { - future - .onFailure(t => cb(Left(t))) - .onSuccess(b => cb(Right(b))) - }) - } - - implicit def enumeratorToResponse[F[_] : Effect, A, CT <: String](implicit - e: Encode.Aux[A, CT], - w: Witness.Aux[CT] - ): ToResponse.Aux[Enumerator[F, A], CT] = { - withCustomIteratee(writer => Iteratee.foreachM[F, Buf]((buf: Buf) => futureToEffect(writer.write(buf)))) - } - - protected def withCustomIteratee[F[_] : Effect, A, CT <: String] - (iteratee: Writer[Buf] => Iteratee[F, Buf, Unit])(implicit - e: Encode.Aux[A, CT], - w: Witness.Aux[CT] - ): ToResponse.Aux[Enumerator[F, A], CT] = { - ToResponse.instance[Enumerator[F, A], CT]((enum, cs) => { - val response = Response() - response.setChunked(true) - response.contentType = w.value - val writer = response.writer - val stream = { - enum.ensure(Effect[F].suspend(futureToEffect(writer.close()))).map(e.apply(_, cs)).into(iteratee(writer)) - } - Effect[F].toIO(stream).unsafeRunAsyncAndForget() - response - }) - } -} +package io.finch + +import cats.effect.Effect +import com.twitter.io._ +import com.twitter.util.Future +import io.finch.internal._ +import io.finch.items.RequestItem +import io.iteratee.{Enumerator, Iteratee} +import java.nio.charset.Charset + +/** + * Iteratee module + */ +package object iteratee extends IterateeInstances { + + private[finch] def enumeratorFromReader[F[_] : Effect](reader: Reader[Buf]): Enumerator[F, Buf] = { + def loop(): Enumerator[F, Buf] = { + Enumerator + .liftM[F, Option[Buf]](toEffect[F, Option[Buf]](reader.read())) + .flatMap { + case None => Enumerator.empty[F, Buf] + case Some(buf) => Enumerator.enumOne[F, Buf](buf).append(loop()) + } + } + + loop().ensure(Effect[F].delay(reader.discard())) + } + + /** + * An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as + * an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests. + */ + def enumeratorBody[F[_] : Effect, A, CT <: String](implicit + decode: Enumerate.Aux[F, A, CT] + ): Endpoint[F, Enumerator[F, A]] = new Endpoint[F, Enumerator[F, A]] { + final def apply(input: Input): Endpoint.Result[F, Enumerator[F, A]] = { + if (!input.request.isChunked) EndpointResult.NotMatched[F] + else { + val req = input.request + EndpointResult.Matched( + input, + Trace.empty, + Effect[F].pure(Output.payload(decode(enumeratorFromReader(req.reader), req.charsetOrUtf8))) + ) + } + } + + final override def item: RequestItem = items.BodyItem + final override def toString: String = "enumeratorBody" + } + + /** + * An evaluating [[Endpoint]] that reads a required chunked streaming JSON body, interpreted as + * an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests. + */ + def enumeratorJsonBody[F[_] : Effect, A](implicit + ad: Enumerate.Aux[F, A, Application.Json] + ): Endpoint[F, Enumerator[F, A]] = enumeratorBody[F, A, Application.Json].withToString("enumeratorJsonBody") + +} + +trait IterateeInstances extends LowPriorityIterateeInstances { + + implicit def encodeJsonEnumeratorToReader[F[_]: Effect, A](implicit + A: Encode.Json[A] + ): EncodeStreamToReader.Json[Enumerator, F, A] = + new EncodeEnumeratorToReader[F, A, Application.Json] { + protected def encodeChunk(chunk: A, cs: Charset): Buf = A(chunk, cs) + override protected def writeChunk(chunk: Buf, w: Writer[Buf]): Future[Unit] = + w.write(chunk.concat(ToResponse.NewLine)) + } + + implicit def encodeTextEnumeratorToReader[F[_]: Effect, A](implicit + A: Encode.Text[A] + ): EncodeStreamToReader.Text[Enumerator, F, A] = + new EncodeEnumeratorToReader[F, A, Text.Plain] { + override protected def encodeChunk(chunk: A, cs: Charset): Buf = A(chunk, cs) + } +} + +trait LowPriorityIterateeInstances { + + protected def toEffect[F[_], A](f: => Future[A])(implicit F: Effect[F]): F[A] = + F.async[A]( + cb => f.onFailure(t => cb(Left(t))).onSuccess(b => cb(Right(b))) + ) + + protected abstract class EncodeEnumeratorToReader[F[_], A, CT <: String](implicit + F: Effect[F] + ) extends EncodeStreamToReader[Enumerator, F, A] { + + type ContentType = CT + + protected def encodeChunk(chunk: A, cs: Charset): Buf + protected def writeChunk(chunk: Buf, w: Writer[Buf]): Future[Unit] = w.write(chunk) + + private def writeIteratee(w: Writer[Buf]): Iteratee[F, Buf, Unit] = + Iteratee.foreachM[F, Buf](chunk => toEffect[F, Unit](writeChunk(chunk, w))) + + def apply(s: Enumerator[F, A], cs: Charset): Reader[Buf] = { + val p = new Pipe[Buf] + val run = s + .ensure(F.suspend(toEffect[F, Unit](p.close()))) + .map(chunk => encodeChunk(chunk, cs)) + .into(writeIteratee(p)) + + F.toIO(run).unsafeRunAsyncAndForget() + p + } + } + + implicit def encodeBufEnumeratorToReader[F[_]: Effect, CT <: String]: EncodeStreamToReader.Aux[Enumerator, F, Buf, CT] = + new EncodeEnumeratorToReader[F, Buf, CT] { + protected def encodeChunk(chunk: Buf, cs: Charset): Buf = chunk + } +} diff --git a/iteratee/src/test/scala/io/finch/iteratee/ToResponseSpec.scala b/iteratee/src/test/scala/io/finch/iteratee/ToResponseSpec.scala index 80538f398..220afab00 100644 --- a/iteratee/src/test/scala/io/finch/iteratee/ToResponseSpec.scala +++ b/iteratee/src/test/scala/io/finch/iteratee/ToResponseSpec.scala @@ -3,8 +3,10 @@ package io.finch.iteratee import java.nio.charset.StandardCharsets import cats.effect.IO +import com.twitter.finagle.http.Response import com.twitter.io.Buf -import io.finch.{Application, FinchSpec, Text, ToResponse} +import io.finch._ +import io.finch.data.Foo import io.iteratee.Enumerator import org.scalatest.prop.GeneratorDrivenPropertyChecks @@ -14,23 +16,30 @@ class ToResponseSpec extends FinchSpec with GeneratorDrivenPropertyChecks { it should "correctly encode Enumerator to Response" in { forAll { data: List[Buf] => - enumeratorFromReader[IO](response[Buf, Text.Plain](data).reader).toVector.unsafeRunSync() should { + enumeratorFromReader[IO](toResponse[Text.Plain](data).reader).toVector.unsafeRunSync() should { contain theSameElementsAs data } } } it should "insert new lines after each chunk" in { - forAll { data: List[Buf] => - enumeratorFromReader[IO](response[Buf, Application.Json](data).reader).toVector.unsafeRunSync() should { - contain theSameElementsAs data.map(_.concat(ToResponse.NewLine)) + forAll { data: List[Foo] => + val encoded = data.map(foo => + Encode[Foo, Application.Json].apply(foo, StandardCharsets.UTF_8).concat(ToResponse.NewLine) + ) + + enumeratorFromReader[IO](toResponse[Application.Json](data).reader).toVector.unsafeRunSync() should { + contain theSameElementsAs encoded } } } - private def response[A, CT <: String](data: List[A])(implicit tr: ToResponse.Aux[Enumerator[IO, A], CT]) = { - val enumerator = Enumerator.enumList[IO, A](data) + private class toResponse[CT <: String] { + def apply[A](data: List[A])(implicit A: ToResponse.Aux[Enumerator[IO, A], CT]): Response = + A(Enumerator.enumList[IO, A](data), StandardCharsets.UTF_8) + } - tr(enumerator, StandardCharsets.UTF_8) + private object toResponse { + def apply[CT <: String]: toResponse[CT] = new toResponse[CT] } }