Skip to content

Commit

Permalink
Apply review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
BalmungSan committed Jan 15, 2023
1 parent 7537357 commit 4f59581
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 33 deletions.
7 changes: 1 addition & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,7 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.readBytesFromInputStream"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.readInputStreamGeneric"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.<clinit>"),
// Private cosntructor.
// Added in #3107
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.interop.reactivestreams.StreamUnicastPublisher.this"
)
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.<clinit>")
)

lazy val root = tlCrossRootProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ package fs2
package interop
package reactivestreams

import cats.effect.kernel.{Async, Resource}
import cats.effect.kernel.{Async, Deferred, Resource, Outcome}
import cats.effect.std.{Dispatcher, Queue}
import cats.syntax.all._
import fs2.concurrent.SignallingRef
import cats.effect.syntax.all._
import org.reactivestreams.{Subscription, Subscriber}

/** Implementation of a `org.reactivestreams.Subscription`.
Expand All @@ -37,17 +37,17 @@ import org.reactivestreams.{Subscription, Subscriber}
*/
private[reactivestreams] final class StreamSubscription[F[_], A] private (
requests: Queue[F, StreamSubscription.Request],
canceled: SignallingRef[F, Boolean],
canceled: Deferred[F, Unit],
sub: Subscriber[A],
stream: Stream[F, A],
requestDispatcher: Dispatcher[F]
)(implicit F: Async[F])
extends Subscription {
import StreamSubscription._

// Ensure we are on a terminal state before signaling the subscriber.
private def onError(e: Throwable): F[Unit] =
cancelMe >> F.delay(sub.onError(e))
// Ensure we are on a terminal state; i.e. set `canceled`, before signaling the subscriber.
private def onError(ex: Throwable): F[Unit] =
cancelMe >> F.delay(sub.onError(ex))

private def onComplete: F[Unit] =
cancelMe >> F.delay(sub.onComplete)
Expand All @@ -68,14 +68,21 @@ private[reactivestreams] final class StreamSubscription[F[_], A] private (
go(in).stream
}

stream
.through(subscriptionPipe)
.interruptWhen(canceled)
.evalMap(x => F.delay(sub.onNext(x)))
.handleErrorWith(e => Stream.eval(onError(e)))
.onFinalize(canceled.get.ifM(ifTrue = F.unit, ifFalse = onComplete))
.compile
.drain
val events =
stream
.through(subscriptionPipe)
.foreach(x => F.delay(sub.onNext(x)))
.compile
.drain

val prog =
events.guaranteeCase {
case Outcome.Succeeded(_) => onComplete
case Outcome.Errored(ex) => onError(ex)
case Outcome.Canceled() => F.unit
}.race(canceled.get).void

F.onCancel(prog, cancelMe)
}

// According to the spec, it's acceptable for a concurrent cancel to not
Expand All @@ -85,7 +92,7 @@ private[reactivestreams] final class StreamSubscription[F[_], A] private (
// See https://github.com/zainab-ali/fs2-reactive-streams/issues/29
// and https://github.com/zainab-ali/fs2-reactive-streams/issues/46
private def cancelMe: F[Unit] =
canceled.set(true)
canceled.complete(()).void
override def cancel(): Unit =
try
requestDispatcher.unsafeRunAndForget(cancelMe)
Expand All @@ -96,16 +103,18 @@ private[reactivestreams] final class StreamSubscription[F[_], A] private (

override def request(n: Long): Unit = {
val prog =
canceled.get.flatMap {
case false =>
canceled.tryGet.flatMap {
case None =>
if (n == java.lang.Long.MAX_VALUE)
requests.offer(Infinite)
else if (n > 0)
requests.offer(Finite(n))
else
onError(new IllegalArgumentException(s"Invalid number of elements [${n}]"))
onError(
ex = new IllegalArgumentException(s"Invalid number of elements [${n}]")
)

case true =>
case Some(()) =>
F.unit
}

Expand All @@ -131,7 +140,7 @@ private[reactivestreams] object StreamSubscription {
): Resource[F, StreamSubscription[F, A]] =
(
Dispatcher.sequential[F](await = true),
Resource.eval(SignallingRef(false)),
Resource.eval(Deferred[F, Unit]),
Resource.eval(Queue.unbounded[F, Request])
).mapN { case (requestDispatcher, canceled, requests) =>
new StreamSubscription(
Expand All @@ -149,6 +158,6 @@ private[reactivestreams] object StreamSubscription {
F: Async[F]
): F[Unit] =
apply(stream, subscriber).use { subscription =>
F.onCancel(subscription.run, subscription.cancelMe)
subscription.run
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,27 @@ import scala.util.control.NoStackTrace

/** Implementation of a `org.reactivestreams.Publisher`
*
* This is used to publish elements from a `fs2.Stream` to a downstream reactivestreams system.
* This is used to publish elements from a [[Stream]] to a downstream reactive-streams system.
*
* @note Not longer unicast, this Publisher can be reused for multiple Subscribers.
* @note Not longer unicast, this Publisher can be reused for multiple Subscribers:
* each subscription will re-run the [[Stream]] from the beginning.
* However, a _parallel_ `Dispatcher` is required to allow concurrent subscriptions.
* Please, refer to the `apply` factory in the companion object that only requires a stream.
*
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code]]
*/
final class StreamUnicastPublisher[F[_]: Async, A](
val stream: Stream[F, A],
startDispatcher: Dispatcher[F]
) extends Publisher[A] {
// Added only for bincompat, effectively deprecated.
private[reactivestreams] def this(
stream: Stream[F, A],
startDispatcher: Dispatcher[F],
requestDispatcher: Dispatcher[F]
) =
this(stream, startDispatcher)

def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
nonNull(subscriber)
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,12 @@ package object reactivestreams {

implicit final class StreamOps[F[_], A](val stream: Stream[F, A]) {

/** Creates a [[StreamUnicastPublisher]] from a stream.
/** Creates a [[StreamUnicastPublisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
*
* @note Not longer unicast, this Publisher can be reused for multiple Subscribers:
* each subscription will re-run the [[Stream]] from the beginning.
*/
def toUnicastPublisher(implicit
F: Async[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ final class StreamUnicastPublisherSpec
}

override def createFailedPublisher(): StreamUnicastPublisher[IO, Int] = {
val (publisher, close) =
StreamUnicastPublisher[IO, Int](Stream.empty).allocated.unsafeRunSync()
close.unsafeRunSync() // If the resource is closed then the publisher is failed.
val publisher = // If the resource is closed then the publisher is failed.
StreamUnicastPublisher[IO, Int](Stream.empty).use(IO.pure).unsafeRunSync()
publisher
}
}

0 comments on commit 4f59581

Please sign in to comment.