Commit eeee229

WIP
vkostyukov committed Dec 22, 2018
1 parent b0c7402 commit eeee229
Showing 7 changed files with 222 additions and 131 deletions.
25 changes: 25 additions & 0 deletions core/src/main/scala/io/finch/EncodeStreamToReader.scala
@@ -0,0 +1,25 @@
package io.finch

import com.twitter.io.{Buf, Reader}
import java.nio.charset.Charset

/**
* Encodes a stream `S[F, A]` into a `Reader[Buf]`, given a `Charset`. The `ContentType` type
* member captures the Content-Type the encoded stream is served with.
*/
trait EncodeStreamToReader[S[_[_], _], F[_], A] {
type ContentType <: String

def apply(s: S[F, A], cs: Charset): Reader[Buf]
}

object EncodeStreamToReader {

type Aux[S[_[_], _], F[_], A, CT <: String] =
EncodeStreamToReader[S, F, A] { type ContentType = CT }

type Json[S[_[_], _], F[_], A] = Aux[S, F, A, Application.Json]

type Text[S[_[_], _], F[_], A] = Aux[S, F, A, Text.Plain]
}
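
For illustration only (hypothetical, not part of this commit): an instance of this type class
just has to turn a stream into a Reader[Buf]. Below is a minimal sketch for a toy, list-backed
stream; a real instance would feed a Pipe[Buf] asynchronously, as the iteratee module further
down in this diff does.

import com.twitter.io.{Buf, Reader}
import io.finch._
import java.nio.charset.Charset

// Hypothetical toy stream type: a pure list of elements that ignores the effect F.
final case class ListStream[F[_], A](elems: List[A])

implicit def encodeTextListStream[F[_]]: EncodeStreamToReader.Text[ListStream, F, String] =
  new EncodeStreamToReader[ListStream, F, String] {
    type ContentType = Text.Plain

    // Fully buffers the toy stream into a single Buf; shown only to illustrate the shape.
    def apply(s: ListStream[F, String], cs: Charset): Reader[Buf] =
      Reader.fromBuf(
        s.elems.foldLeft(Buf.Empty)((acc, str) => acc.concat(Buf.ByteArray.Owned(str.getBytes(cs))))
      )
  }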


62 changes: 49 additions & 13 deletions core/src/main/scala/io/finch/Input.scala
@@ -3,7 +3,7 @@ package io.finch
import cats.Eq
import com.twitter.finagle.http.{Method, Request}
import com.twitter.finagle.netty3.ChannelBufferBuf
import com.twitter.io.Buf
import com.twitter.io.{Buf, Reader}
import java.nio.charset.{Charset, StandardCharsets}
import org.jboss.netty.handler.codec.http.{DefaultHttpRequest, HttpMethod, HttpVersion}
import org.jboss.netty.handler.codec.http.multipart.{DefaultHttpDataFactory, HttpPostRequestEncoder}
@@ -36,6 +36,17 @@ final case class Input(request: Request, route: List[String]) {
*/
def withBody[CT <: String]: Input.Body[CT] = new Input.Body[CT](this)


/**
* Returns a new `Input` wrapping a given stream as a payload. This requires the content type to
* be passed as the first type parameter (it won't be inferred).
*
* ```
*
* ```
*/
def withStream[CT <: String]: Input.Stream[CT] = new Input.Stream[CT](this)

/**
* Returns the new `Input` with `headers` amended.
*/
@@ -75,7 +86,7 @@ final case class Input(request: Request, route: List[String]) {
)
} else ChannelBufferBuf.Owned(req.getContent)

withBody[Application.WwwFormUrlencoded](content, Some(StandardCharsets.UTF_8))
withBody[Application.WwwFormUrlencoded](content, StandardCharsets.UTF_8)
}
}

@@ -84,12 +95,13 @@ final case class Input(request: Request, route: List[String]) {
*/
object Input {

private final def copyRequest(from: Request): Request = {
val to = Request()
to.version = from.version
to.method = from.method
private final def copyRequest(from: Request): Request =
copyRequestWithReader(from, from.reader)

private final def copyRequestWithReader(from: Request, reader: Reader[Buf]): Request = {
val to = Request(from.version, from.method, from.uri, reader)
to.setChunked(from.isChunked)
to.content = from.content
to.uri = from.uri
from.headerMap.foreach { case (k, v) => to.headerMap.put(k, v) }

to
@@ -99,17 +111,41 @@ object Input {
* A helper class that captures the `Content-Type` of the payload.
*/
class Body[CT <: String](i: Input) {
def apply[A](body: A, charset: Option[Charset] = None)(implicit
e: Encode.Aux[A, CT], w: Witness.Aux[CT]
): Input = {
val content = e(body, charset.getOrElse(StandardCharsets.UTF_8))
def apply[A](body: A)(implicit e: Encode.Aux[A, CT], w: Witness.Aux[CT]): Input =
apply[A](body, StandardCharsets.UTF_8)

def apply[A](body: A, charset: Charset)(implicit
e: Encode.Aux[A, CT], W: Witness.Aux[CT]
): Input = {
val content = e(body, charset)
val copied = copyRequest(i.request)

copied.setChunked(false)
copied.content = content
copied.contentType = w.value
copied.contentType = W.value
copied.contentLength = content.length.toLong
charset.foreach(cs => copied.charset = cs.displayName().toLowerCase)
copied.charset = charset.displayName().toLowerCase

Input(copied, i.route)
}
}

class Stream[CT <: String](i: Input) {
def apply[S[_[_], _], F[_], A](s: S[F, A])(implicit
S: EncodeStreamToReader.Aux[S, F, A, CT], W: Witness.Aux[CT]
): Input = apply[S, F, A](s, StandardCharsets.UTF_8)

def apply[S[_[_], _], F[_], A](s: S[F, A], charset: Charset)(implicit
S: EncodeStreamToReader.Aux[S, F, A, CT],
W: Witness.Aux[CT]
): Input = {
val content = S(s, charset)
val copied = copyRequestWithReader(i.request, content)

copied.setChunked(true)
copied.contentType = W.value
copied.headerMap.setUnsafe("Transfer-Encoding", "chunked")
copied.charset = charset.displayName().toLowerCase

Input(copied, i.route)
}
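
A rough sketch (hypothetical, not part of this commit) of exercising the new Input helpers from a
test. It assumes the iteratee instances added further down in this diff are in implicit scope and
reuses the test-only io.finch.data.Foo codec.

import cats.effect.IO
import io.finch._
import io.finch.data.Foo
import io.finch.iteratee._
import io.iteratee.Enumerator
import java.nio.charset.StandardCharsets

// Buffered body: the charset is now passed directly rather than wrapped in an Option.
val buffered: Input =
  Input.post("/foo").withBody[Application.Json](Foo("bar"), StandardCharsets.UTF_8)

// Chunked body: the enumerator is encoded lazily into the request's Reader[Buf].
val chunked: Input =
  Input.post("/foos").withStream[Application.Json](
    Enumerator.enumList[IO, Foo](List(Foo("a"), Foo("b")))
  )
// chunked.request.isChunked == true
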
11 changes: 11 additions & 0 deletions core/src/main/scala/io/finch/ToResponse.scala
@@ -85,6 +85,17 @@ trait HighPriorityToResponseInstances extends LowPriorityToResponseInstances {

rep
}

implicit def streamToResponse[S[_[_], _], F[_], A, CT <: String](implicit
E: EncodeStreamToReader.Aux[S, F, A, CT],
W: Witness.Aux[CT]
): Aux[S[F, A], CT] = instance { (a, cs) =>
val stream = E(a, cs)
val rep = Response(Version.Http11, Status.Ok, stream)
rep.headerMap.setUnsafe("Content-Type", W.value)

rep
}
}

object ToResponse extends HighPriorityToResponseInstances {
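
With streamToResponse in scope, a ToResponse instance for any stream that has an
EncodeStreamToReader can be summoned directly. A hedged sketch, again assuming the iteratee
instances further down and the test-only Foo codec:

import cats.effect.IO
import com.twitter.finagle.http.Response
import io.finch._
import io.finch.data.Foo
import io.finch.iteratee._
import io.iteratee.Enumerator
import java.nio.charset.StandardCharsets

val tr: ToResponse.Aux[Enumerator[IO, Foo], Application.Json] =
  implicitly[ToResponse.Aux[Enumerator[IO, Foo], Application.Json]]

// The response wraps the stream's Reader and carries the witnessed Content-Type header;
// the body is written out as the enumerator is consumed.
val rep: Response =
  tr(Enumerator.enumList[IO, Foo](List(Foo("a"), Foo("b"))), StandardCharsets.UTF_8)
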
2 changes: 1 addition & 1 deletion core/src/test/scala/io/finch/InputSpec.scala
@@ -46,7 +46,7 @@ class InputSpec extends FinchSpec {
)

check { (i: Input, f: Foo, cs: Charset) =>
val input = i.withBody[Application.Json](f, Some(cs))
val input = i.withBody[Application.Json](f, cs)

input.request.content.asString(cs) === f.s &&
input.request.contentType === Some(s"application/json;charset=${cs.displayName.toLowerCase}")
3 changes: 3 additions & 0 deletions core/src/test/scala/io/finch/data/Foo.scala
@@ -29,6 +29,9 @@ object Foo {
)
)

implicit val encodeJsonFoo: Encode.Json[Foo] =
Encode.json((foo, _) => Buf.Utf8(s"""{s:"${foo.s}""""))

implicit val arbitraryFoo: Arbitrary[Foo] =
Arbitrary(Gen.alphaStr.suchThat(_.nonEmpty).map(Foo.apply))
}
225 changes: 116 additions & 109 deletions iteratee/src/main/scala/io/finch/iteratee/package.scala
@@ -1,109 +1,116 @@
package io.finch

import cats.effect.Effect
import com.twitter.finagle.http.Response
import com.twitter.io._
import com.twitter.util.Future
import io.finch.internal._
import io.finch.items.RequestItem
import io.iteratee.{Enumerator, Iteratee}
import shapeless.Witness

/**
* Iteratee module
*/
package object iteratee extends IterateeInstances {


private[finch] def enumeratorFromReader[F[_] : Effect](reader: Reader[Buf]): Enumerator[F, Buf] = {
def rec(reader: Reader[Buf]): Enumerator[F, Buf] = {
Enumerator.liftM[F, Option[Buf]] {
futureToEffect(reader.read())
}.flatMap {
case None => Enumerator.empty[F, Buf]
case Some(buf) => Enumerator.enumOne[F, Buf](buf).append(rec(reader))
}
}
rec(reader).ensure(Effect[F].delay(reader.discard()))
}

/**
* An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as
* an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def enumeratorBody[F[_] : Effect, A, CT <: String](implicit
decode: Enumerate.Aux[F, A, CT]
): Endpoint[F, Enumerator[F, A]] = new Endpoint[F, Enumerator[F, A]] {
final def apply(input: Input): Endpoint.Result[F, Enumerator[F, A]] = {
if (!input.request.isChunked) EndpointResult.NotMatched[F]
else {
val req = input.request
EndpointResult.Matched(
input,
Trace.empty,
Effect[F].pure(Output.payload(decode(enumeratorFromReader(req.reader), req.charsetOrUtf8)))
)
}
}

final override def item: RequestItem = items.BodyItem
final override def toString: String = "enumeratorBody"
}

/**
* An evaluating [[Endpoint]] that reads a required chunked streaming JSON body, interpreted as
* an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def enumeratorJsonBody[F[_] : Effect, A](implicit
ad: Enumerate.Aux[F, A, Application.Json]
): Endpoint[F, Enumerator[F, A]] = enumeratorBody[F, A, Application.Json].withToString("enumeratorJsonBody")

}

trait IterateeInstances extends LowPriorityInstances {

implicit def enumeratorToJsonResponse[F[_] : Effect, A](implicit
e: Encode.Aux[A, Application.Json],
w: Witness.Aux[Application.Json]
): ToResponse.Aux[Enumerator[F, A], Application.Json] = {
withCustomIteratee[F, A, Application.Json](writer =>
Iteratee.foreachM[F, Buf]((buf: Buf) => futureToEffect(writer.write(buf.concat(ToResponse.NewLine))))
)
}
}

trait LowPriorityInstances {

protected def futureToEffect[F[_] : Effect, A](future: => Future[A]): F[A] = {
Effect[F].async[A](cb => {
future
.onFailure(t => cb(Left(t)))
.onSuccess(b => cb(Right(b)))
})
}

implicit def enumeratorToResponse[F[_] : Effect, A, CT <: String](implicit
e: Encode.Aux[A, CT],
w: Witness.Aux[CT]
): ToResponse.Aux[Enumerator[F, A], CT] = {
withCustomIteratee(writer => Iteratee.foreachM[F, Buf]((buf: Buf) => futureToEffect(writer.write(buf))))
}

protected def withCustomIteratee[F[_] : Effect, A, CT <: String]
(iteratee: Writer[Buf] => Iteratee[F, Buf, Unit])(implicit
e: Encode.Aux[A, CT],
w: Witness.Aux[CT]
): ToResponse.Aux[Enumerator[F, A], CT] = {
ToResponse.instance[Enumerator[F, A], CT]((enum, cs) => {
val response = Response()
response.setChunked(true)
response.contentType = w.value
val writer = response.writer
val stream = {
enum.ensure(Effect[F].suspend(futureToEffect(writer.close()))).map(e.apply(_, cs)).into(iteratee(writer))
}
Effect[F].toIO(stream).unsafeRunAsyncAndForget()
response
})
}
}
package io.finch

import cats.effect.Effect
import com.twitter.io._
import com.twitter.util.Future
import io.finch.internal._
import io.finch.items.RequestItem
import io.iteratee.{Enumerator, Iteratee}
import java.nio.charset.Charset

/**
* Iteratee module
*/
package object iteratee extends IterateeInstances {

private[finch] def enumeratorFromReader[F[_] : Effect](reader: Reader[Buf]): Enumerator[F, Buf] = {
def loop(): Enumerator[F, Buf] = {
Enumerator
.liftM[F, Option[Buf]](toEffect[F, Option[Buf]](reader.read()))
.flatMap {
case None => Enumerator.empty[F, Buf]
case Some(buf) => Enumerator.enumOne[F, Buf](buf).append(loop())
}
}

loop().ensure(Effect[F].delay(reader.discard()))
}

/**
* An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as
* an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def enumeratorBody[F[_] : Effect, A, CT <: String](implicit
decode: Enumerate.Aux[F, A, CT]
): Endpoint[F, Enumerator[F, A]] = new Endpoint[F, Enumerator[F, A]] {
final def apply(input: Input): Endpoint.Result[F, Enumerator[F, A]] = {
if (!input.request.isChunked) EndpointResult.NotMatched[F]
else {
val req = input.request
EndpointResult.Matched(
input,
Trace.empty,
Effect[F].pure(Output.payload(decode(enumeratorFromReader(req.reader), req.charsetOrUtf8)))
)
}
}

final override def item: RequestItem = items.BodyItem
final override def toString: String = "enumeratorBody"
}

/**
* An evaluating [[Endpoint]] that reads a required chunked streaming JSON body, interpreted as
* an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def enumeratorJsonBody[F[_] : Effect, A](implicit
ad: Enumerate.Aux[F, A, Application.Json]
): Endpoint[F, Enumerator[F, A]] = enumeratorBody[F, A, Application.Json].withToString("enumeratorJsonBody")

}

trait IterateeInstances extends LowPriorityIterateeInstances {

implicit def encodeJsonEnumeratorToReader[F[_]: Effect, A](implicit
A: Encode.Json[A]
): EncodeStreamToReader.Json[Enumerator, F, A] =
new EncodeEnumeratorToReader[F, A, Application.Json] {
protected def encodeChunk(chunk: A, cs: Charset): Buf = A(chunk, cs)
override protected def writeChunk(chunk: Buf, w: Writer[Buf]): Future[Unit] =
w.write(chunk.concat(ToResponse.NewLine))
}

implicit def encodeTextEnumeratorToReader[F[_]: Effect, A](implicit
A: Encode.Text[A]
): EncodeStreamToReader.Text[Enumerator, F, A] =
new EncodeEnumeratorToReader[F, A, Text.Plain] {
override protected def encodeChunk(chunk: A, cs: Charset): Buf = A(chunk, cs)
}
}

trait LowPriorityIterateeInstances {

protected def toEffect[F[_], A](f: => Future[A])(implicit F: Effect[F]): F[A] =
F.async[A](
cb => f.onFailure(t => cb(Left(t))).onSuccess(b => cb(Right(b)))
)

protected abstract class EncodeEnumeratorToReader[F[_], A, CT <: String](implicit
F: Effect[F]
) extends EncodeStreamToReader[Enumerator, F, A] {

type ContentType = CT

protected def encodeChunk(chunk: A, cs: Charset): Buf
protected def writeChunk(chunk: Buf, w: Writer[Buf]): Future[Unit] = w.write(chunk)

private def writeIteratee(w: Writer[Buf]): Iteratee[F, Buf, Unit] =
Iteratee.foreachM[F, Buf](chunk => toEffect[F, Unit](writeChunk(chunk, w)))

def apply(s: Enumerator[F, A], cs: Charset): Reader[Buf] = {
val p = new Pipe[Buf]
val run = s
.ensure(F.suspend(toEffect[F, Unit](p.close())))
.map(chunk => encodeChunk(chunk, cs))
.into(writeIteratee(p))

F.toIO(run).unsafeRunAsyncAndForget()
p
}
}

implicit def encodeBufEnumeratorToReader[F[_]: Effect, CT <: String]: EncodeStreamToReader.Aux[Enumerator, F, Buf, CT] =
new EncodeEnumeratorToReader[F, Buf, CT] {
protected def encodeChunk(chunk: Buf, cs: Charset): Buf = chunk
}
}
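
A hedged end-to-end sketch of the new encoding path (hypothetical, test-style, reusing the
test-only Foo codec): the JSON instance above writes every encoded element followed by a newline
into a Pipe[Buf] and hands back its read side, which is exactly what withStream puts on the
request and what streamToResponse wraps into a chunked response.

import cats.effect.IO
import com.twitter.io.{Buf, Reader}
import com.twitter.util.Await
import io.finch._
import io.finch.data.Foo
import io.finch.iteratee._
import io.iteratee.Enumerator
import java.nio.charset.StandardCharsets

// Resolve the instance defined above and encode a two-element enumerator.
val reader: Reader[Buf] =
  implicitly[EncodeStreamToReader.Json[Enumerator, IO, Foo]]
    .apply(Enumerator.enumList[IO, Foo](List(Foo("a"), Foo("b"))), StandardCharsets.UTF_8)

// Each chunk is one encoded element plus a trailing newline (see writeChunk); the underlying
// Pipe is closed once the enumerator is exhausted.
val firstChunk: Option[Buf] = Await.result(reader.read())

An Input built with withStream over the same enumerator would also match enumeratorJsonBody,
since withStream marks the request as chunked.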