From 0b233d1002d566e8624d0126f5abe86b384ef153 Mon Sep 17 00:00:00 2001 From: Rodrigo Palhares <1131748+rodrigopalhares@users.noreply.github.com> Date: Sat, 19 Jan 2019 23:16:13 -0200 Subject: [PATCH] Update monix 3.0.0-RC2 --- build.sbt | 4 ++-- src/main/scala/sangria/streaming/monix.scala | 12 ++++++++---- .../sangria/streaming/MonixIntegrationSpec.scala | 6 +++--- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/build.sbt b/build.sbt index 4977062..d72a7a2 100644 --- a/build.sbt +++ b/build.sbt @@ -20,8 +20,8 @@ scalacOptions ++= { libraryDependencies ++= Seq( "org.sangria-graphql" %% "sangria-streaming-api" % "1.0.0", - "io.monix" %% "monix-execution" % "2.1.2", - "io.monix" %% "monix-reactive" % "2.1.2", + "io.monix" %% "monix-execution" % "3.0.0-RC2", + "io.monix" %% "monix-reactive" % "3.0.0-RC2", "org.scalatest" %% "scalatest" % "3.0.1" % "test" ) diff --git a/src/main/scala/sangria/streaming/monix.scala b/src/main/scala/sangria/streaming/monix.scala index 027588c..ea856a6 100644 --- a/src/main/scala/sangria/streaming/monix.scala +++ b/src/main/scala/sangria/streaming/monix.scala @@ -1,9 +1,10 @@ package sangria.streaming import scala.language.higherKinds - import _root_.monix.execution.Scheduler import _root_.monix.reactive._ +import _root_.monix.eval.Task +import cats.effect.ExitCase import scala.concurrent.Future @@ -22,19 +23,22 @@ object monix { source.mergeMap(a ⇒ Observable.fromFuture(fn(a))) def first[T](s: Observable[T]) = - s.firstOrElseL(throw new IllegalStateException("Promise was not completed - observable haven't produced any elements.")).runAsync + s.firstOrElseL(throw new IllegalStateException("Promise was not completed - observable haven't produced any elements.")).runToFuture def failed[T](e: Throwable) = Observable.raiseError(e) def onComplete[Ctx, Res](result: Observable[Res])(op: ⇒ Unit) = - result.doOnTerminate(op) + result.guaranteeCase { + case ExitCase.Error(e) => Task(op) + case _ => Task(op) + } def flatMapFuture[Ctx, Res, T](future: Future[T])(resultFn: T ⇒ Observable[Res]) = Observable.fromFuture(future).mergeMap(resultFn) def merge[T](streams: Vector[Observable[T]]) = if (streams.nonEmpty) - Observable.merge(streams: _*) + Observable(streams: _*).merge else throw new IllegalStateException("No streams produced!") diff --git a/src/test/scala/sangria/streaming/MonixIntegrationSpec.scala b/src/test/scala/sangria/streaming/MonixIntegrationSpec.scala index 54d807d..7b526ae 100644 --- a/src/test/scala/sangria/streaming/MonixIntegrationSpec.scala +++ b/src/test/scala/sangria/streaming/MonixIntegrationSpec.scala @@ -58,7 +58,7 @@ class MonixIntegrationSpec extends WordSpec with Matchers { subj.onNext(2) subj.onComplete() - Await.ready(updated.toListL.runAsync, 2 seconds) + Await.ready(updated.toListL.runToFuture, 2 seconds) count.get() should be (1) } @@ -72,7 +72,7 @@ class MonixIntegrationSpec extends WordSpec with Matchers { subj.onError(new IllegalStateException("foo")) - Await.ready(updated.toListL.runAsync, 2 seconds) + Await.ready(updated.toListL.runToFuture, 2 seconds) count.get() should be (1) } @@ -138,7 +138,7 @@ class MonixIntegrationSpec extends WordSpec with Matchers { } def res[T](obs: Observable[T]) = - Await.result(obs.toListL.runAsync, 2 seconds) + Await.result(obs.toListL.runToFuture, 2 seconds) def res[T](f: Future[T]) = Await.result(f, 2 seconds)