Skip to content

Commit

Permalink
fix: signal error without Promise in subscriberToSink (#298)
Browse files Browse the repository at this point in the history
* fix: signal error without Promise in subscriberToSink

* fix: 3.1.0 compilation
  • Loading branch information
runtologist authored Dec 19, 2021
1 parent 3c6afc9 commit 15e7e03
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 12 deletions.
20 changes: 12 additions & 8 deletions src/main/scala/zio/interop/reactivestreams/Adapters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,20 @@ object Adapters {

def subscriberToSink[E <: Throwable, I](
subscriber: => Subscriber[I]
)(implicit trace: ZTraceElement): ZManaged[Any, Nothing, (Promise[E, Nothing], ZSink[Any, Nothing, I, I, Unit])] = {
)(implicit trace: ZTraceElement): ZManaged[Any, Nothing, (E => UIO[Unit], ZSink[Any, Nothing, I, I, Unit])] = {
val sub = subscriber
for {
runtime <- ZIO.runtime[Any].toManaged
demand <- Queue.unbounded[Long].toManaged
error <- Promise.make[E, Nothing].toManaged
subscription = createSubscription(sub, demand, runtime)
_ <- ZManaged.succeed(sub.onSubscribe(subscription))
_ <- error.await.catchAll(t => UIO(sub.onError(t)) *> demand.shutdown).toManaged.fork
} yield (error, demandUnfoldSink(sub, demand))
runtime <- ZIO.runtime[Any].toManaged
demand <- Queue.unbounded[Long].toManaged
subscription = createSubscription(sub, demand, runtime)
_ <- ZManaged.succeed(sub.onSubscribe(subscription))
errorSignaled <- Promise.makeManaged[Nothing, Boolean]
} yield {
val signalError =
(e: E) => ZIO.whenZIO(errorSignaled.complete(UIO.succeedNow(true)))(UIO(sub.onError(e)) *> demand.shutdown).unit

(signalError, demandUnfoldSink(sub, demand))
}
}

def publisherToStream[O](
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/zio/interop/reactivestreams/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package zio.interop
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import zio.IO
import zio.Promise
import zio.UIO
import zio.ZIO
import zio.ZManaged
import zio.ZTraceElement
Expand Down Expand Up @@ -60,7 +60,7 @@ package object reactivestreams {
*/
def toSink[E <: Throwable](implicit
trace: ZTraceElement
): ZManaged[Any, Nothing, (Promise[E, Nothing], ZSink[Any, Nothing, I, I, Unit])] =
): ZManaged[Any, Nothing, (E => UIO[Unit], ZSink[Any, Nothing, I, I, Unit])] =
Adapters.subscriberToSink(subscriber)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package zio.interop.reactivestreams

import org.reactivestreams.tck.TestEnvironment
import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport
import zio.durationInt
import zio.IO
import zio.Task
import zio.UIO
import zio.stream.Stream
import zio.stream.ZStream
import zio.test.Assertion._
import zio.test._

Expand Down Expand Up @@ -33,16 +35,42 @@ object SubscriberToSinkSpec extends DefaultRunnableSpec {
makeSubscriber.flatMap(probe =>
probe.underlying
.toSink[Throwable]
.use { case (error, sink) =>
.use { case (signalError, sink) =>
for {
fiber <- (Stream.fromIterable(seq) ++ Stream.fail(e)).run(sink).catchAll(t => error.fail(t)).fork
fiber <- (Stream.fromIterable(seq) ++ Stream.fail(e)).run(sink).catchAll(signalError).fork
_ <- probe.request(length + 1)
elements <- probe.nextElements(length).exit
err <- probe.expectError.exit
_ <- fiber.join
} yield assert(elements)(succeeds(equalTo(seq))) && assert(err)(succeeds(equalTo(e)))
}
)
},
test("transports errors 2") {
makeSubscriber.flatMap(probe =>
probe.underlying
.toSink[Throwable]
.use { case (signalError, sink) =>
for {
_ <- ZStream.fail(e).run(sink).catchAll(signalError)
err <- probe.expectError.exit
} yield assert(err)(succeeds(equalTo(e)))
}
)
},
test("transports errors only once") {
makeSubscriber.flatMap(probe =>
probe.underlying
.toSink[Throwable]
.use { case (signalError, sink) =>
for {
_ <- ZStream.fail(e).run(sink).catchAll(signalError)
err <- probe.expectError.exit
_ <- signalError(e)
err2 <- probe.expectError.timeout(100.millis).exit
} yield assert(err)(succeeds(equalTo(e))) && assert(err2)(fails(anything))
}
)
}
)

Expand Down

0 comments on commit 15e7e03

Please sign in to comment.