Skip to content

Commit

Permalink
upgrade zio version (#323)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamgfraser authored May 4, 2022
1 parent e050c84 commit 17a05bf
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 64 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ For this reason `toZIOSink` returns a tuple of a callback and a `Sink`. The call

```scala mdoc
val asSink = subscriber.toZIOSink[Throwable]
val failingStream = Stream.range(3, 13) ++ Stream.fail(new RuntimeException("boom!"))
val failingStream = ZStream.range(3, 13) ++ ZStream.fail(new RuntimeException("boom!"))
runtime.unsafeRun(
asSink.use { case (signalError, sink) =>
failingStream.run(sink).catchAll(signalError)
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Global / onChangedBuildSource := ReloadOnSourceChanges
addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")

val zioVersion = "2.0.0-RC5"
val zioVersion = "2.0.0-RC6"
val rsVersion = "1.0.3"
val collCompatVersion = "2.7.0"

Expand Down
36 changes: 18 additions & 18 deletions src/main/scala/zio/interop/reactivestreams/Adapters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object Adapters {

def streamToPublisher[R, E <: Throwable, O](
stream: => ZStream[R, E, O]
)(implicit trace: ZTraceElement): ZIO[R, Nothing, Publisher[O]] =
)(implicit trace: Trace): ZIO[R, Nothing, Publisher[O]] =
ZIO.runtime.map { runtime => subscriber =>
if (subscriber == null) {
throw new NullPointerException("Subscriber must not be null.")
Expand All @@ -35,7 +35,7 @@ object Adapters {

def subscriberToSink[E <: Throwable, I](
subscriber: => Subscriber[I]
)(implicit trace: ZTraceElement): ZIO[Scope, Nothing, (E => UIO[Unit], ZSink[Any, Nothing, I, I, Unit])] = {
)(implicit trace: Trace): ZIO[Scope, Nothing, (E => UIO[Unit], ZSink[Any, Nothing, I, I, Unit])] = {
val sub = subscriber
for {
error <- Promise.make[E, Nothing]
Expand All @@ -48,7 +48,7 @@ object Adapters {
def publisherToStream[O](
publisher: => Publisher[O],
bufferSize: => Int
)(implicit trace: ZTraceElement): ZStream[Any, Throwable, O] = {
)(implicit trace: Trace): ZStream[Any, Throwable, O] = {

val pullOrFail =
for {
Expand All @@ -66,7 +66,7 @@ object Adapters {
def sinkToSubscriber[R, I, L, Z](
sink: => ZSink[R, Throwable, I, L, Z],
bufferSize: => Int
)(implicit trace: ZTraceElement): ZIO[R with Scope, Throwable, (Subscriber[I], IO[Throwable, Z])] =
)(implicit trace: Trace): ZIO[R with Scope, Throwable, (Subscriber[I], IO[Throwable, Z])] =
for {
subscriberP <- makeSubscriber[I](bufferSize)
(subscriber, p) = subscriberP
Expand Down Expand Up @@ -145,19 +145,19 @@ object Adapters {

override def await(): IO[Option[Throwable], Unit] =
done match {
case Some(value) => IO.fail(value)
case Some(value) => ZIO.fail(value)
case None =>
val p = Promise.unsafeMake[Option[Throwable], Unit](FiberId.None)
toNotify = Some(p)
// An element has arrived in the meantime, we do not need to start waiting.
if (!q.isEmpty()) {
toNotify = None
IO.unit
ZIO.unit
} else
done.fold(p.await) { e =>
// The producer has canceled or errored in the meantime.
toNotify = None
IO.fail(e)
ZIO.fail(e)
}
}

Expand All @@ -166,7 +166,7 @@ object Adapters {
override def onSubscribe(s: Subscription): Unit =
if (s == null) {
val e = new NullPointerException("s was null in onSubscribe")
p.unsafeDone(IO.fail(e))
p.unsafeDone(ZIO.fail(e))
throw e
} else {
val shouldCancel = isSubscribedOrInterrupted.getAndSet(true)
Expand All @@ -181,7 +181,7 @@ object Adapters {
failNPE("t was null in onNext")
} else {
q.offer(t)
toNotify.foreach(_.unsafeDone(IO.unit))
toNotify.foreach(_.unsafeDone(ZIO.unit))
}

override def onError(e: Throwable): Unit =
Expand All @@ -192,7 +192,7 @@ object Adapters {

override def onComplete(): Unit = {
done = Some(None)
toNotify.foreach(_.unsafeDone(IO.fail(None)))
toNotify.foreach(_.unsafeDone(ZIO.fail(None)))
}

private def failNPE(msg: String) = {
Expand All @@ -203,7 +203,7 @@ object Adapters {

private def fail(e: Throwable) = {
done = Some(Some(e))
toNotify.foreach(_.unsafeDone(IO.fail(Some(e))))
toNotify.foreach(_.unsafeDone(ZIO.fail(Some(e))))
}

}
Expand All @@ -217,12 +217,12 @@ object Adapters {
): ZSink[Any, Nothing, I, I, Unit] =
ZSink
.foldChunksZIO[Any, Nothing, I, Boolean](true)(identity) { (_, chunk) =>
IO
ZIO
.iterate(chunk)(!_.isEmpty) { chunk =>
subscription
.offer(chunk.size)
.flatMap { acceptedCount =>
UIO
ZIO
.foreach(chunk.take(acceptedCount))(a => ZIO.succeed(subscriber.onNext(a)))
.as(chunk.drop(acceptedCount))
}
Expand Down Expand Up @@ -252,7 +252,7 @@ object Adapters {
var result: IO[Unit, Int] = null
state.updateAndGet {
case `canceled` =>
result = IO.fail(())
result = ZIO.fail(())
canceled
case State(0L, _) =>
val p = Promise.unsafeMake[Unit, Int](FiberId.None)
Expand All @@ -261,7 +261,7 @@ object Adapters {
case State(requestedCount, _) =>
val newRequestedCount = Math.max(requestedCount - n, 0L)
val accepted = Math.min(requestedCount, n.toLong).toInt
result = IO.succeedNow(accepted)
result = ZIO.succeedNow(accepted)
requested(newRequestedCount)
}
result
Expand All @@ -279,7 +279,7 @@ object Adapters {
val newRequestedCount = requestedCount + n
val accepted = Math.min(offered.toLong, newRequestedCount)
val remaining = newRequestedCount - accepted
notification = () => toNotify.unsafeDone(IO.succeedNow(accepted.toInt))
notification = () => toNotify.unsafeDone(ZIO.succeedNow(accepted.toInt))
requested(remaining)
case State(requestedCount, _) if ((Long.MaxValue - n) > requestedCount) =>
requested(requestedCount + n)
Expand All @@ -290,11 +290,11 @@ object Adapters {
}

override def cancel(): Unit =
state.getAndSet(canceled).toNotify.foreach { case (_, p) => p.unsafeDone(IO.fail(())) }
state.getAndSet(canceled).toNotify.foreach { case (_, p) => p.unsafeDone(ZIO.fail(())) }
}

private def fromPull[R, E, A](zio: ZIO[R with Scope, Nothing, ZIO[R, Option[E], Chunk[A]]])(implicit
trace: ZTraceElement
trace: Trace
): ZStream[R, E, A] =
ZStream.unwrapScoped[R](zio.map(pull => ZStream.repeatZIOChunkOption(pull)))
}
10 changes: 5 additions & 5 deletions src/main/scala/zio/interop/reactivestreams/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package zio.interop

import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import zio.{ Scope, UIO, Task, ZIO, ZTraceElement }
import zio.{ Scope, UIO, Task, ZIO, Trace }
import zio.stream.ZSink
import zio.stream.ZStream

Expand All @@ -13,7 +13,7 @@ package object reactivestreams {
/** Create a `Publisher` from a `Stream`. Every time the `Publisher` is subscribed to, a new instance of the
* `Stream` is run.
*/
def toPublisher(implicit trace: ZTraceElement): ZIO[R, Nothing, Publisher[O]] =
def toPublisher(implicit trace: Trace): ZIO[R, Nothing, Publisher[O]] =
Adapters.streamToPublisher(stream)
}

Expand All @@ -26,7 +26,7 @@ package object reactivestreams {
* The size used as internal buffer. If possible, set to a power of 2 value for best performance.
*/
def toSubscriber(qSize: Int = 16)(implicit
trace: ZTraceElement
trace: Trace
): ZIO[R with Scope, Throwable, (Subscriber[A], Task[Z])] =
Adapters.sinkToSubscriber(sink, qSize)
}
Expand All @@ -37,7 +37,7 @@ package object reactivestreams {
* @param qSize
* The size used as internal buffer. If possible, set to a power of 2 value for best performance.
*/
def toZIOStream(qSize: Int = 16)(implicit trace: ZTraceElement): ZStream[Any, Throwable, O] =
def toZIOStream(qSize: Int = 16)(implicit trace: Trace): ZStream[Any, Throwable, O] =
Adapters.publisherToStream(publisher, qSize)
}

Expand All @@ -54,7 +54,7 @@ package object reactivestreams {
* ```
*/
def toZIOSink[E <: Throwable](implicit
trace: ZTraceElement
trace: Trace
): ZIO[Scope, Nothing, (E => UIO[Unit], ZSink[Any, Nothing, I, I, Unit])] =
Adapters.subscriberToSink(subscriber)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import zio.Chunk
import zio.Exit
import zio.Fiber
import zio.Promise
import zio.Runtime
import zio.Supervisor
import zio.Task
import zio.UIO
import zio.ZEnvironment
import zio.ZIO
import zio.ZTraceElement
import zio.Trace
import zio.durationInt
import zio.stream.Sink
import zio.stream.Stream
import zio.stream.ZSink
import zio.stream.ZStream
import zio.test.Assertion._
import zio.test._

Expand All @@ -26,13 +27,13 @@ object PublisherToStreamSpec extends ZIOSpecDefault {
override def spec =
suite("Converting a `Publisher` to a `Stream`")(
test("works with a well behaved `Publisher`") {
assertM(publish(seq, None))(succeeds(equalTo(seq)))
assertZIO(publish(seq, None))(succeeds(equalTo(seq)))
},
test("fails with an initially failed `Publisher`") {
assertM(publish(Chunk.empty, Some(e)))(fails(equalTo(e)))
assertZIO(publish(Chunk.empty, Some(e)))(fails(equalTo(e)))
},
test("fails with an eventually failing `Publisher`") {
assertM(publish(seq, Some(e)))(fails(equalTo(e)))
assertZIO(publish(seq, Some(e)))(fails(equalTo(e)))
},
test("does not fail a fiber on failing `Publisher`") {

Expand All @@ -51,7 +52,7 @@ object PublisherToStreamSpec extends ZIOSpecDefault {

@transient var failedAFiber = false

def value(implicit trace: ZTraceElement): UIO[Boolean] =
def value(implicit trace: Trace): UIO[Boolean] =
ZIO.succeed(failedAFiber)

def unsafeOnStart[R, E, A](
Expand All @@ -67,24 +68,23 @@ object PublisherToStreamSpec extends ZIOSpecDefault {
}

for {
outerRuntime <- ZIO.runtime[Any]
runtime = outerRuntime.mapRuntimeConfig(_.copy(supervisor = supervisor))
exit <- runtime.run(publisher.toZIOStream().runDrain.exit)
failed <- supervisor.value
runtime <- Runtime.addSupervisor(supervisor).toRuntime
exit <- runtime.run(publisher.toZIOStream().runDrain.exit)
failed <- supervisor.value
} yield assert(exit)(fails(anything)) && assert(failed)(isFalse)

},
test("does not freeze on stream end") {
withProbe(probe =>
for {
fiber <- Stream
fiber <- ZStream
.fromZIO(
ZIO.succeed(
probe.toZIOStream()
)
)
.flatMap(identity)
.run(Sink.collectAll[Int])
.run(ZSink.collectAll[Int])
.fork
_ <- ZIO.attemptBlockingInterrupt(probe.expectRequest())
_ <- ZIO.succeed(probe.sendNext(1))
Expand Down Expand Up @@ -112,11 +112,11 @@ object PublisherToStreamSpec extends ZIOSpecDefault {
_ <- ZIO.succeed(subscriber.onSubscribe(subscription))
_ <- cancelledLatch.await
} yield ()
assertM(tst.exit)(succeeds(anything))
assertZIO(tst.exit)(succeeds(anything))
} @@ TestAspect.nonFlaky @@ TestAspect.timeout(60.seconds),
test("cancels subscription when interrupted after subscription") {
withProbe(probe =>
assertM((for {
assertZIO((for {
fiber <- probe.toZIOStream(bufferSize).runDrain.fork
_ <- ZIO.attemptBlockingInterrupt(probe.expectRequest())
_ <- fiber.interrupt
Expand All @@ -128,7 +128,7 @@ object PublisherToStreamSpec extends ZIOSpecDefault {
} @@ TestAspect.nonFlaky @@ TestAspect.timeout(60.seconds),
test("cancels subscription when interrupted during consumption") {
withProbe(probe =>
assertM((for {
assertZIO((for {
fiber <- probe.toZIOStream(bufferSize).runDrain.fork
demand <- ZIO.attemptBlockingInterrupt(probe.expectRequest())
_ <- ZIO.attempt((1 to demand.toInt).foreach(i => probe.sendNext(i)))
Expand All @@ -141,7 +141,7 @@ object PublisherToStreamSpec extends ZIOSpecDefault {
} @@ TestAspect.nonFlaky @@ TestAspect.timeout(60.seconds),
test("cancels subscription on stream end") {
withProbe(probe =>
assertM((for {
assertZIO((for {
fiber <- probe.toZIOStream(bufferSize).take(1).runDrain.fork
demand <- ZIO.attemptBlockingInterrupt(probe.expectRequest())
_ <- ZIO.attempt((1 to demand.toInt).foreach(i => probe.sendNext(i)))
Expand All @@ -154,7 +154,7 @@ object PublisherToStreamSpec extends ZIOSpecDefault {
},
test("cancels subscription on stream error") {
withProbe(probe =>
assertM(for {
assertZIO(for {
fiber <- probe.toZIOStream(bufferSize).mapZIO(_ => ZIO.fail(new Throwable("boom!"))).runDrain.fork
demand <- ZIO.attemptBlockingInterrupt(probe.expectRequest())
_ <- ZIO.attempt((1 to demand.toInt).foreach(i => probe.sendNext(i)))
Expand Down Expand Up @@ -192,7 +192,7 @@ object PublisherToStreamSpec extends ZIOSpecDefault {
val faillable =
withProbe(probe =>
for {
fiber <- probe.toZIOStream(bufferSize).run(Sink.collectAll[Int]).fork
fiber <- probe.toZIOStream(bufferSize).run(ZSink.collectAll[Int]).fork
_ <- loop(probe, seq)
r <- fiber.join
} yield r
Expand Down
Loading

0 comments on commit 17a05bf

Please sign in to comment.