Skip to content

Commit

Permalink
Introduce EncodeStream
Browse files Browse the repository at this point in the history
  • Loading branch information
vkostyukov committed Jan 9, 2019
1 parent 66f15e2 commit 2e841ff
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 173 deletions.
26 changes: 26 additions & 0 deletions core/src/main/scala/io/finch/EncodeStream.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.finch

import com.twitter.io.{Buf, Reader}
import java.nio.charset.Charset

/**
* A type-class that defines encoding of a stream in a shape of `S[F[_], A]` to Finagle's [[Reader]].
*/
trait EncodeStream[S[_[_], _], F[_], A] {

type ContentType <: String

def apply(s: S[F, A], cs: Charset): Reader[Buf]
}

object EncodeStream {

type Aux[S[_[_], _], F[_], A, CT <: String] =
EncodeStream[S, F, A] { type ContentType = CT }

type Json[S[_[_],_], F[_], A] = Aux[S, F, A, Application.Json]

type Text[S[_[_],_], F[_], A] = Aux[S, F, A, Text.Plain]
}


10 changes: 5 additions & 5 deletions core/src/main/scala/io/finch/Endpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -893,8 +893,8 @@ object Endpoint {
* an `S[F, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def streamBinaryBody[F[_], S[_[_], _]](implicit
liftReader: LiftReader[S, F],
F: Effect[F]
liftReader: LiftReader[S, F],
F: Effect[F]
): Endpoint[F, S[F, Buf]] = {
new Endpoint[F, S[F, Buf]] {
final def apply(input: Input): Endpoint.Result[F, S[F, Buf]] = {
Expand All @@ -915,9 +915,9 @@ object Endpoint {
}

def streamJsonBody[F[_], S[_[_], _], A](implicit
streamDecoder: DecodeStream.Aux[S, F, A, Application.Json],
liftReader: LiftReader[S, F],
F: Effect[F]
streamDecoder: DecodeStream.Aux[S, F, A, Application.Json],
liftReader: LiftReader[S, F],
F: Effect[F]
): Endpoint[F, S[F, A]] = new Endpoint[F, S[F, A]] {
final def apply(input: Input): Result[F, S[F, A]] = {
streamBinaryBody.apply(input).map(streamDecoder(_, input.request.charsetOrUtf8))
Expand Down
66 changes: 51 additions & 15 deletions core/src/main/scala/io/finch/Input.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.finch
import cats.Eq
import com.twitter.finagle.http.{Method, Request}
import com.twitter.finagle.netty3.ChannelBufferBuf
import com.twitter.io.Buf
import com.twitter.io.{Buf, Reader}
import java.nio.charset.{Charset, StandardCharsets}
import org.jboss.netty.handler.codec.http.{DefaultHttpRequest, HttpMethod, HttpVersion}
import org.jboss.netty.handler.codec.http.multipart.{DefaultHttpDataFactory, HttpPostRequestEncoder}
Expand All @@ -30,8 +30,21 @@ final case class Input(request: Request, route: List[String]) {
* ```
* import io.finch._, io.circe._
*
* val text: Input = Input.post("/").withBody[Text.Plain]("Text Body")
* val json: Input = Input.post("/").withBody[Application.Json](Map("json" -> "object"))
* val text = Input.post("/").withBody[Text.Plain]("Text Body")
* val json = Input.post("/").withBody[Application.Json](Map("json" -> "object"))
*```
*
* Also possible to create chunked inputs passing a stream as an argument.
*
*```
* import io.finch._, io.finch.iteratee._, cats.effect.IO, io.iteratee.Enumerator
* import io.finch.circe._, io.circe.generic.auto._
*
* val enumerateText = Enumerator.enumerate[IO, String]("foo", "bar")
* val text = Input.post("/").withBody[Text.Plain](enumerateText)
*
* val enumerateJson = Enumerate.enumerate[IO, Map[String, String]](Map("foo" - "bar"))
* val json = Input.post("/").withBody[Application.Json](enumerateJson)
*```
*/
def withBody[CT <: String]: Input.Body[CT] = new Input.Body[CT](this)
Expand Down Expand Up @@ -75,7 +88,7 @@ final case class Input(request: Request, route: List[String]) {
)
} else ChannelBufferBuf.Owned(req.getContent)

withBody[Application.WwwFormUrlencoded](content, Some(StandardCharsets.UTF_8))
withBody[Application.WwwFormUrlencoded](content, StandardCharsets.UTF_8)
}
}

Expand All @@ -84,12 +97,13 @@ final case class Input(request: Request, route: List[String]) {
*/
object Input {

private final def copyRequest(from: Request): Request = {
val to = Request()
to.version = from.version
to.method = from.method
private final def copyRequest(from: Request): Request =
copyRequestWithReader(from, from.reader)

private final def copyRequestWithReader(from: Request, reader: Reader[Buf]): Request = {
val to = Request(from.version, from.method, from.uri, reader)
to.setChunked(from.isChunked)
to.content = from.content
to.uri = from.uri
from.headerMap.foreach { case (k, v) => to.headerMap.put(k, v) }

to
Expand All @@ -99,17 +113,39 @@ object Input {
* A helper class that captures the `Content-Type` of the payload.
*/
class Body[CT <: String](i: Input) {
def apply[A](body: A, charset: Option[Charset] = None)(implicit
e: Encode.Aux[A, CT], w: Witness.Aux[CT]
): Input = {
val content = e(body, charset.getOrElse(StandardCharsets.UTF_8))
def apply[A](body: A)(implicit e: Encode.Aux[A, CT], w: Witness.Aux[CT]): Input =
apply[A](body, StandardCharsets.UTF_8)

def apply[A](body: A, charset: Charset)(implicit
e: Encode.Aux[A, CT], W: Witness.Aux[CT]
): Input = {
val content = e(body, charset)
val copied = copyRequest(i.request)

copied.setChunked(false)
copied.content = content
copied.contentType = w.value
copied.contentType = W.value
copied.contentLength = content.length.toLong
charset.foreach(cs => copied.charset = cs.displayName().toLowerCase)
copied.charset = charset.displayName().toLowerCase

Input(copied, i.route)
}

def apply[S[_[_], _], F[_], A](s: S[F, A])(implicit
S: EncodeStream.Aux[S, F, A, CT], W: Witness.Aux[CT]
): Input = apply[S, F, A](s, StandardCharsets.UTF_8)

def apply[S[_[_], _], F[_], A](s: S[F, A], charset: Charset)(implicit
S: EncodeStream.Aux[S, F, A, CT],
W: Witness.Aux[CT]
): Input = {
val content = S(s, charset)
val copied = copyRequestWithReader(i.request, content)

copied.setChunked(true)
copied.contentType = W.value
copied.headerMap.setUnsafe("Transfer-Encoding", "chunked")
copied.charset = charset.displayName().toLowerCase

Input(copied, i.route)
}
Expand Down
3 changes: 0 additions & 3 deletions core/src/main/scala/io/finch/LiftReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@ import com.twitter.io.{Buf, Reader}
trait LiftReader[S[_[_], _], F[_]] {

def apply(reader: Reader[Buf]): S[F, Buf]

}

object LiftReader {

def instance[S[_[_], _], F[_]](fn: Reader[Buf] => S[F, Buf]): LiftReader[S, F] = new LiftReader[S, F] {
def apply(reader: Reader[Buf]): S[F, Buf] = fn(reader)
}

}
14 changes: 13 additions & 1 deletion core/src/main/scala/io/finch/ToResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ trait HighPriorityToResponseInstances extends LowPriorityToResponseInstances {
w: Witness.Aux[CT]
): Aux[A, CT] = instance { (a, cs) =>
val buf = e(a, cs)
val rep = Response()
val rep = Response(Version.Http11, Status.Ok)

if (!buf.isEmpty) {
rep.content = buf
Expand All @@ -85,6 +85,18 @@ trait HighPriorityToResponseInstances extends LowPriorityToResponseInstances {

rep
}

implicit def streamToResponse[S[_[_], _], F[_], A, CT <: String](implicit
E: EncodeStream.Aux[S, F, A, CT],
W: Witness.Aux[CT]
): Aux[S[F, A], CT] = instance { (a, cs) =>
val stream = E(a, cs)
val rep = Response(Version.Http11, Status.Ok, stream)
rep.headerMap.setUnsafe("Content-Type", W.value)
rep.headerMap.setUnsafe("Transfer-Encoding", "chunked")

rep
}
}

object ToResponse extends HighPriorityToResponseInstances {
Expand Down
38 changes: 30 additions & 8 deletions core/src/test/scala/io/finch/InputSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package io.finch
import java.nio.charset.Charset

import com.twitter.finagle.http.Method
import com.twitter.io.Buf
import com.twitter.io.{Buf, Pipe, Reader}
import com.twitter.util.{Await, Future}
import io.finch.data.Foo
import io.finch.internal.HttpContent

Expand Down Expand Up @@ -34,21 +35,42 @@ class InputSpec extends FinchSpec {
}
}

it should "add content through withBody" in {
it should "add fully-buffered content through withBody" in {
check { (i: Input, b: Buf) =>
i.withBody[Text.Plain](b).request.content === b
}
}

it should "add content corresponding to a class through withBody[JSON]" in {
implicit val encodeFoo: Encode.Json[Foo] = Encode.json(
(a, cs) => Buf.ByteArray.Owned(a.s.getBytes(cs.name))
)
it should "add chunked content throhg withBody" in {
type ListStream[F[_], A] = List[A]
implicit def listToReader[F[_], CT <: String]: EncodeStream.Aux[ListStream, F, Buf, CT] =
new EncodeStream[ListStream, F, Buf] {
type ContentType = CT

def apply(s: ListStream[F, Buf], cs: Charset): Reader[Buf] = {
val p = new Pipe[Buf]

def loop(from: List[Buf]): Future[Unit] = from match {
case h :: t => p.write(h).before(loop(t))
case _ => p.close()
}

loop(s)
p
}
}

check { (i: Input, s: List[Buf]) =>
val out = i.withBody[Application.OctetStream].apply[ListStream, Future, Buf](s).request.reader
s.forall(buf => buf == Await.result(out.read()).get)
}
}

it should "add content corresponding to a class through withBody[JSON]" in {
check { (i: Input, f: Foo, cs: Charset) =>
val input = i.withBody[Application.Json](f, Some(cs))
val input = i.withBody[Application.Json](f, cs)

input.request.content.asString(cs) === f.s &&
input.request.content.asString(cs) === s"""{s:"${f.s}"""" &&
input.request.contentType === Some(s"application/json;charset=${cs.displayName.toLowerCase}")
}
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/test/scala/io/finch/data/Foo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ object Foo {
)
)

implicit val encodeJsonFoo: Encode.Json[Foo] =
Encode.json((foo, cs) => Buf.ByteArray.Owned(s"""{s:"${foo.s}"""".getBytes(cs)))

implicit val arbitraryFoo: Arbitrary[Foo] =
Arbitrary(Gen.alphaStr.suchThat(_.nonEmpty).map(Foo.apply))
}
Expand Down
90 changes: 47 additions & 43 deletions fs2/src/main/scala/io/finch/fs2/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,67 @@ package io.finch

import _root_.fs2.Stream
import cats.effect.Effect
import com.twitter.finagle.http.Response
import com.twitter.io.Buf
import com.twitter.io.{Buf, Pipe, Reader, Writer}
import com.twitter.util.Future
import shapeless.Witness
import java.nio.charset.Charset

package object fs2 extends StreamInstances {

implicit def streamLiftReader[F[_] : Effect](implicit toEffect: ToEffect[Future, F]): LiftReader[Stream, F] =
LiftReader.instance { reader =>
Stream
.repeatEval(Effect[F].defer(toEffect(reader.read)))
.unNoneTerminate
.onFinalize(Effect[F].delay(reader.discard()))
}

implicit def streamToJsonResponse[F[_] : Effect, A](implicit
e: Encode.Aux[A, Application.Json],
w: Witness.Aux[Application.Json]
): ToResponse.Aux[Stream[F, A], Application.Json] = {
mkToResponse[F, A, Application.Json](delimiter = Some(ToResponse.NewLine))
implicit def streamLiftReader[F[_]](implicit
F: Effect[F],
TE: ToEffect[Future, F]
): LiftReader[Stream, F] = LiftReader.instance { reader =>
Stream
.repeatEval(F.defer(TE(reader.read)))
.unNoneTerminate
.onFinalize(F.delay(reader.discard()))
}

implicit def encodeJsonFs2Stream[F[_]: Effect, A](implicit
A: Encode.Json[A]
): EncodeStream.Json[Stream, F, A] =
new EncodeFs2Stream[F, A, Application.Json] {
protected def encodeChunk(chunk: A, cs: Charset): Buf = A(chunk, cs)
override protected def writeChunk(chunk: Buf, w: Writer[Buf]): Future[Unit] =
w.write(chunk.concat(ToResponse.NewLine))
}

implicit def encodeTextFs2Stream[F[_]: Effect, A](implicit
A: Encode.Text[A]
): EncodeStream.Text[Stream, F, A] =
new EncodeFs2Stream[F, A, Text.Plain] {
override protected def encodeChunk(chunk: A, cs: Charset): Buf = A(chunk, cs)
}
}

trait StreamInstances {
protected abstract class EncodeFs2Stream[F[_], A, CT <: String](implicit
F: Effect[F],
TE: ToEffect[Future, F]
) extends EncodeStream[Stream, F, A] {

implicit def streamToResponse[F[_] : Effect, A, CT <: String](implicit
e: Encode.Aux[A, CT],
w: Witness.Aux[CT],
toEffect: ToEffect[Future, F]
): ToResponse.Aux[Stream[F, A], CT] = {
mkToResponse[F, A, CT](delimiter = None)
}
type ContentType = CT

protected def encodeChunk(chunk: A, cs: Charset): Buf
protected def writeChunk(chunk: Buf, w: Writer[Buf]): Future[Unit] = w.write(chunk)

protected def mkToResponse[F[_] : Effect, A, CT <: String](delimiter: Option[Buf])(implicit
e: Encode.Aux[A, CT],
w: Witness.Aux[CT],
toEffect: ToEffect[Future, F]
): ToResponse.Aux[Stream[F, A], CT] = {
ToResponse.instance[Stream[F, A], CT]((stream, cs) => {
val response = Response()
response.setChunked(true)
response.contentType = w.value
val writer = response.writer
val effect = stream
.map(e.apply(_, cs))
.evalMap(buf => toEffect(writer.write(delimiter match {
case Some(d) => buf.concat(d)
case _ => buf
})))
.onFinalize(Effect[F].defer(toEffect(writer.close())))
def apply(s: Stream[F, A], cs: Charset): Reader[Buf] = {
val p = new Pipe[Buf]
val run = s
.map(chunk => encodeChunk(chunk, cs))
.evalMap(chunk => TE(writeChunk(chunk, p)))
.onFinalize(F.suspend(TE(p.close())))
.compile
.drain

Effect[F].toIO(effect).unsafeRunAsyncAndForget()
response
})
F.toIO(run).unsafeRunAsyncAndForget()
p
}
}

implicit def encodeBufFs2[F[_]: Effect, CT <: String]: EncodeStream.Aux[Stream, F, Buf, CT] =
new EncodeFs2Stream[F, Buf, CT] {
protected def encodeChunk(chunk: Buf, cs: Charset): Buf = chunk
}

}
Loading

0 comments on commit 2e841ff

Please sign in to comment.