Skip to content

Commit

Permalink
Added monix.Observable implementation
Browse files Browse the repository at this point in the history
This is basically a conversion to/from the internal fs2.Stream.
  • Loading branch information
Laurence Lavigne committed Apr 24, 2018
1 parent bb9f8a4 commit 3666518
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 95 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ lazy val `http-server` = project
.dependsOn(common % "compile->compile;test->test")
.dependsOn(internal)
.dependsOn(client % "test->test")
.dependsOn(server % "test->test")
.settings(moduleName := "frees-rpc-http-server")
.settings(rpcHttpServerSettings)
.disablePlugins(ScriptedPlugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@

package freestyle.rpc.http

import cats.Applicative
import cats.effect._
import cats.syntax.applicative._
import freestyle.rpc.protocol._
import fs2.Stream

class GreeterHandler[F[_]: Sync] extends Greeter[F] {
class UnaryGreeterHandler[F[_]: Applicative] extends UnaryGreeter[F] {

import cats.syntax.applicative._
import freestyle.rpc.protocol.Empty

def getHello(request: Empty.type): F[HelloResponse] = HelloResponse("hey").pure

def sayHello(request: HelloRequest): F[HelloResponse] = HelloResponse(request.hello).pure
}

class Fs2GreeterHandler[F[_]: Sync] extends Fs2Greeter[F] {

import fs2.Stream

def sayHellos(requests: Stream[F, HelloRequest]): F[HelloResponse] =
requests.compile.fold(HelloResponse("")) {
Expand All @@ -35,8 +41,30 @@ class GreeterHandler[F[_]: Sync] extends Greeter[F] {
}

def sayHelloAll(request: HelloRequest): Stream[F, HelloResponse] =
fs2.Stream(HelloResponse(request.hello), HelloResponse(request.hello))
Stream(HelloResponse(request.hello), HelloResponse(request.hello))

def sayHellosAll(requests: Stream[F, HelloRequest]): Stream[F, HelloResponse] =
requests.map(request => HelloResponse(request.hello))
}

class MonixGreeterHandler[F[_]: Async](implicit sc: monix.execution.Scheduler)
extends MonixGreeter[F] {

import freestyle.rpc.server.implicits._
import monix.reactive.Observable

def sayHellos(requests: Observable[HelloRequest]): F[HelloResponse] =
requests
.foldLeftL(HelloResponse("")) {
case (response, request) =>
HelloResponse(
if (response.hello.isEmpty) request.hello else s"${response.hello}, ${request.hello}")
}
.to[F]

def sayHelloAll(request: HelloRequest): Observable[HelloResponse] =
Observable(HelloResponse(request.hello), HelloResponse(request.hello))

def sayHellosAll(requests: Observable[HelloRequest]): Observable[HelloResponse] =
requests.map(request => HelloResponse(request.hello))
}
65 changes: 0 additions & 65 deletions modules/http/server/src/test/scala/GreeterRestClient.scala

This file was deleted.

108 changes: 108 additions & 0 deletions modules/http/server/src/test/scala/GreeterRestClients.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2017-2018 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package freestyle.rpc.http

import cats.effect._
import fs2.Stream
import io.circe.generic.auto._
import io.circe.syntax._
import org.http4s._
import org.http4s.circe._
import org.http4s.client._
import org.http4s.dsl.io._

class UnaryGreeterRestClient[F[_]: Effect](uri: Uri) {

private implicit val responseDecoder: EntityDecoder[F, HelloResponse] = jsonOf[F, HelloResponse]

def getHello()(implicit client: Client[F]): F[HelloResponse] = {
val request = Request[F](Method.GET, uri / "getHello")
client.expect[HelloResponse](request)
}

def sayHello(arg: HelloRequest)(implicit client: Client[F]): F[HelloResponse] = {
val request = Request[F](Method.POST, uri / "sayHello")
client.expect[HelloResponse](request.withBody(arg.asJson))
}

}

class Fs2GreeterRestClient[F[_]: Effect](uri: Uri) {

import freestyle.rpc.http.RestClient._

private implicit val responseDecoder: EntityDecoder[F, HelloResponse] = jsonOf[F, HelloResponse]

def sayHellos(arg: Stream[F, HelloRequest])(implicit client: Client[F]): F[HelloResponse] = {
val request = Request[F](Method.POST, uri / "sayHellos")
client.expect[HelloResponse](request.withBody(arg.map(_.asJson)))
}

def sayHelloAll(arg: HelloRequest)(implicit client: Client[F]): Stream[F, HelloResponse] = {
val request = Request[F](Method.POST, uri / "sayHelloAll")
client.streaming(request.withBody(arg.asJson))(_.asStream[HelloResponse])
}

def sayHellosAll(arg: Stream[F, HelloRequest])(
implicit client: Client[F]): Stream[F, HelloResponse] = {
val request = Request[F](Method.POST, uri / "sayHellosAll")
client.streaming(request.withBody(arg.map(_.asJson)))(_.asStream[HelloResponse])
}

}

class MonixGreeterRestClient[F[_]: Effect](uri: Uri)(implicit sc: monix.execution.Scheduler) {

import freestyle.rpc.http.RestClient._
import freestyle.rpc.http.Util._
import monix.reactive.Observable

private implicit val responseDecoder: EntityDecoder[F, HelloResponse] = jsonOf[F, HelloResponse]

def sayHellos(arg: Observable[HelloRequest])(implicit client: Client[F]): F[HelloResponse] = {
val request = Request[F](Method.POST, uri / "sayHellos")
client.expect[HelloResponse](request.withBody(arg.toStream.map(_.asJson)))
}

def sayHelloAll(arg: HelloRequest)(implicit client: Client[F]): Observable[HelloResponse] = {
val request = Request[F](Method.POST, uri / "sayHelloAll")
client.streaming(request.withBody(arg.asJson))(_.asStream[HelloResponse]).toObservable
}

def sayHellosAll(arg: Observable[HelloRequest])(
implicit client: Client[F]): Observable[HelloResponse] = {
val request = Request[F](Method.POST, uri / "sayHellosAll")
client
.streaming(request.withBody(arg.toStream.map(_.asJson)))(_.asStream[HelloResponse])
.toObservable
}

}

object RestClient {

import io.circe.Decoder
import io.circe.jawn.CirceSupportParser.facade
import jawnfs2._

implicit class ResponseOps[F[_]](response: Response[F]) {

def asStream[A](implicit decoder: Decoder[A]): Stream[F, A] =
if (response.status.code != 200) throw UnexpectedStatus(response.status)
else response.body.chunks.parseJsonStream.map(_.as[A]).rethrow
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,19 @@
package freestyle.rpc.http

import cats.effect._
import cats.syntax.applicative._
import cats.syntax.flatMap._
import cats.syntax.functor._
import freestyle.rpc.protocol.Empty
import fs2.Stream
import jawn.ParseException
import io.circe._
import io.circe.Decoder
import io.circe.generic.auto._
import io.circe.jawn.CirceSupportParser.facade
import io.circe.syntax._
import jawnfs2._
import org.http4s._
import org.http4s.circe._
import org.http4s.dsl.Http4sDsl

class GreeterRestService[F[_]: Sync](handler: Greeter[F]) extends Http4sDsl[F] {
class UnaryGreeterRestService[F[_]: Sync](handler: UnaryGreeter[F]) extends Http4sDsl[F] {

import freestyle.rpc.http.GreeterRestService._
import freestyle.rpc.protocol.Empty

private implicit val requestDecoder: EntityDecoder[F, HelloRequest] = jsonOf[F, HelloRequest]

Expand All @@ -47,6 +42,16 @@ class GreeterRestService[F[_]: Sync](handler: Greeter[F]) extends Http4sDsl[F] {
request <- msg.as[HelloRequest]
response <- Ok(handler.sayHello(request).map(_.asJson))
} yield response
}
}

class Fs2GreeterRestService[F[_]: Sync](handler: Fs2Greeter[F]) extends Http4sDsl[F] {

import freestyle.rpc.http.RestService._

private implicit val requestDecoder: EntityDecoder[F, HelloRequest] = jsonOf[F, HelloRequest]

def service: HttpService[F] = HttpService[F] {

case msg @ POST -> Root / "sayHellos" =>
for {
Expand All @@ -65,14 +70,51 @@ class GreeterRestService[F[_]: Sync](handler: Greeter[F]) extends Http4sDsl[F] {
requests <- msg.asStream[HelloRequest]
responses <- Ok(handler.sayHellosAll(requests).map(_.asJson))
} yield responses
}
}

class MonixGreeterRestService[F[_]: Effect](handler: MonixGreeter[F])(
implicit sc: monix.execution.Scheduler)
extends Http4sDsl[F] {

import freestyle.rpc.http.RestService._
import freestyle.rpc.http.Util._

private implicit val requestDecoder: EntityDecoder[F, HelloRequest] = jsonOf[F, HelloRequest]

def service: HttpService[F] = HttpService[F] {

case msg @ POST -> Root / "sayHellos" =>
for {
requests <- msg.asStream[HelloRequest]
observable = requests.toObservable
response <- Ok(handler.sayHellos(observable).map(_.asJson))
} yield response

case msg @ POST -> Root / "sayHelloAll" =>
for {
request <- msg.as[HelloRequest]
responses <- Ok(handler.sayHelloAll(request).map(_.asJson).toStream)
} yield responses

case msg @ POST -> Root / "sayHellosAll" =>
for {
requests <- msg.asStream[HelloRequest]
obsResponses = handler.sayHellosAll(requests.toObservable).map(_.asJson)
responses <- Ok(obsResponses.toStream)
} yield responses
}
}

object GreeterRestService {
object RestService {

implicit class RequestOps[F[_]: Sync](request: Request[F]) {

import cats.syntax.applicative._
import io.circe.jawn.CirceSupportParser.facade
import jawnfs2._
import _root_.jawn.ParseException

def asStream[A](implicit decoder: Decoder[A]): F[Stream[F, A]] =
request.body.chunks.parseJsonStream
.map(_.as[A])
Expand Down
Loading

0 comments on commit 3666518

Please sign in to comment.