Skip to content

Commit

Permalink
Update monix 3.0.0-RC2
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigopalhares committed Jan 20, 2019
1 parent feeb2b3 commit 0b233d1
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ scalacOptions ++= {

libraryDependencies ++= Seq(
"org.sangria-graphql" %% "sangria-streaming-api" % "1.0.0",
"io.monix" %% "monix-execution" % "2.1.2",
"io.monix" %% "monix-reactive" % "2.1.2",
"io.monix" %% "monix-execution" % "3.0.0-RC2",
"io.monix" %% "monix-reactive" % "3.0.0-RC2",
"org.scalatest" %% "scalatest" % "3.0.1" % "test"
)

Expand Down
12 changes: 8 additions & 4 deletions src/main/scala/sangria/streaming/monix.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package sangria.streaming

import scala.language.higherKinds

import _root_.monix.execution.Scheduler
import _root_.monix.reactive._
import _root_.monix.eval.Task
import cats.effect.ExitCase

import scala.concurrent.Future

Expand All @@ -22,19 +23,22 @@ object monix {
source.mergeMap(a Observable.fromFuture(fn(a)))

def first[T](s: Observable[T]) =
s.firstOrElseL(throw new IllegalStateException("Promise was not completed - observable haven't produced any elements.")).runAsync
s.firstOrElseL(throw new IllegalStateException("Promise was not completed - observable haven't produced any elements.")).runToFuture

def failed[T](e: Throwable) = Observable.raiseError(e)

def onComplete[Ctx, Res](result: Observable[Res])(op: Unit) =
result.doOnTerminate(op)
result.guaranteeCase {
case ExitCase.Error(e) => Task(op)
case _ => Task(op)
}

def flatMapFuture[Ctx, Res, T](future: Future[T])(resultFn: T Observable[Res]) =
Observable.fromFuture(future).mergeMap(resultFn)

def merge[T](streams: Vector[Observable[T]]) =
if (streams.nonEmpty)
Observable.merge(streams: _*)
Observable(streams: _*).merge
else
throw new IllegalStateException("No streams produced!")

Expand Down
6 changes: 3 additions & 3 deletions src/test/scala/sangria/streaming/MonixIntegrationSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class MonixIntegrationSpec extends WordSpec with Matchers {
subj.onNext(2)
subj.onComplete()

Await.ready(updated.toListL.runAsync, 2 seconds)
Await.ready(updated.toListL.runToFuture, 2 seconds)

count.get() should be (1)
}
Expand All @@ -72,7 +72,7 @@ class MonixIntegrationSpec extends WordSpec with Matchers {

subj.onError(new IllegalStateException("foo"))

Await.ready(updated.toListL.runAsync, 2 seconds)
Await.ready(updated.toListL.runToFuture, 2 seconds)

count.get() should be (1)
}
Expand Down Expand Up @@ -138,7 +138,7 @@ class MonixIntegrationSpec extends WordSpec with Matchers {
}

def res[T](obs: Observable[T]) =
Await.result(obs.toListL.runAsync, 2 seconds)
Await.result(obs.toListL.runToFuture, 2 seconds)

def res[T](f: Future[T]) =
Await.result(f, 2 seconds)
Expand Down

0 comments on commit 0b233d1

Please sign in to comment.