Skip to content

Commit

Permalink
Merge pull request #3107 from BalmungSan/improve-reactive-streams
Browse files Browse the repository at this point in the history
Fix interop.reactivestreams.StreamUnicastPublisher
  • Loading branch information
mpilquist authored Jan 17, 2023
2 parents e7b1372 + ec155c7 commit 599029f
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,36 @@ package fs2
package interop
package reactivestreams

import cats.effect.kernel._
import cats.effect.kernel.{Async, Deferred, Resource, Outcome}
import cats.effect.std.{Dispatcher, Queue}
import cats.syntax.all._

import fs2.concurrent.SignallingRef
import org.reactivestreams._
import cats.effect.syntax.all._
import org.reactivestreams.{Subscription, Subscriber}

/** Implementation of a `org.reactivestreams.Subscription`.
*
* This is used by the [[StreamUnicastPublisher]] to send elements from a `fs2.Stream` to a downstream reactivestreams system.
*
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#3-subscription-code]]
*/
private[reactivestreams] final class StreamSubscription[F[_], A](
private[reactivestreams] final class StreamSubscription[F[_], A] private (
requests: Queue[F, StreamSubscription.Request],
cancelled: SignallingRef[F, Boolean],
canceled: Deferred[F, Unit],
sub: Subscriber[A],
stream: Stream[F, A],
startDispatcher: Dispatcher[F],
requestDispatcher: Dispatcher[F]
)(implicit F: Async[F])
extends Subscription {
import StreamSubscription._

// We want to make sure `cancelled` is set _before_ signalling the subscriber
def onError(e: Throwable) = cancelled.set(true) >> F.delay(sub.onError(e))
def onComplete = cancelled.set(true) >> F.delay(sub.onComplete)
// Ensure we are on a terminal state; i.e. set `canceled`, before signaling the subscriber.
private def onError(ex: Throwable): F[Unit] =
cancelMe >> F.delay(sub.onError(ex))

private def onComplete: F[Unit] =
cancelMe >> F.delay(sub.onComplete)

def unsafeStart(): Unit = {
private[reactivestreams] def run: F[Unit] = {
def subscriptionPipe: Pipe[F, A, A] =
in => {
def go(s: Stream[F, A]): Pull[F, A, Unit] =
Expand All @@ -67,57 +68,101 @@ private[reactivestreams] final class StreamSubscription[F[_], A](
go(in).stream
}

val s =
val events =
stream
.through(subscriptionPipe)
.interruptWhen(cancelled)
.evalMap(x => F.delay(sub.onNext(x)))
.handleErrorWith(e => Stream.eval(onError(e)))
.onFinalize(cancelled.get.ifM(ifTrue = F.unit, ifFalse = onComplete))
.foreach(x => F.delay(sub.onNext(x)))
.compile
.drain

startDispatcher.unsafeRunAndForget(s)
events
.race(canceled.get)
.guaranteeCase {
case Outcome.Succeeded(result) =>
result.flatMap {
case Left(()) => onComplete // Events finished normally.
case Right(()) => F.unit // Events was canceled.
}
case Outcome.Errored(ex) => onError(ex)
case Outcome.Canceled() => cancelMe
}
.void
}

// According to the spec, it's acceptable for a concurrent cancel to not
// be processed immediately, but if you have synchronous `cancel();
// request()`, then the request _must_ be a no op. Fortunately,
// ordering is guaranteed by a sequential d
// request()`, then the request _must_ be a NOOP. Fortunately,
// ordering is guaranteed by a sequential dispatcher.
// See https://github.com/zainab-ali/fs2-reactive-streams/issues/29
// and https://github.com/zainab-ali/fs2-reactive-streams/issues/46
def cancel(): Unit =
requestDispatcher.unsafeRunAndForget(cancelled.set(true))

def request(n: Long): Unit = {
val request: F[Request] =
if (n == java.lang.Long.MAX_VALUE) (Infinite: Request).pure[F]
else if (n > 0) (Finite(n): Request).pure[F]
else F.raiseError(new IllegalArgumentException(s"3.9 - invalid number of elements [$n]"))
private def cancelMe: F[Unit] =
canceled.complete(()).void

override def cancel(): Unit =
try
requestDispatcher.unsafeRunAndForget(cancelMe)
catch {
case _: IllegalStateException =>
// Dispatcher already shutdown, we are on terminal state, NOOP.
}

val prog = cancelled.get
.ifM(ifTrue = F.unit, ifFalse = request.flatMap(requests.offer).handleErrorWith(onError))
override def request(n: Long): Unit = {
val prog =
canceled.tryGet.flatMap {
case None =>
if (n == java.lang.Long.MAX_VALUE)
requests.offer(Infinite)
else if (n > 0)
requests.offer(Finite(n))
else
onError(
ex = new IllegalArgumentException(s"Invalid number of elements [${n}]")
)

case Some(()) =>
F.unit
}

requestDispatcher.unsafeRunAndForget(prog)
try
requestDispatcher.unsafeRunAndForget(prog)
catch {
case _: IllegalStateException =>
// Dispatcher already shutdown, we are on terminal state, NOOP.
}
}
}

private[reactivestreams] object StreamSubscription {

/** Represents a downstream subscriber's request to publish elements */
sealed trait Request
case object Infinite extends Request
case class Finite(n: Long) extends Request

def apply[F[_]: Async, A](
sub: Subscriber[A],
stream: Stream[F, A],
startDispatcher: Dispatcher[F],
requestDispatcher: Dispatcher[F]
): F[StreamSubscription[F, A]] =
SignallingRef(false).flatMap { cancelled =>
Queue.unbounded[F, Request].map { requests =>
new StreamSubscription(requests, cancelled, sub, stream, startDispatcher, requestDispatcher)
}
private sealed trait Request
private case object Infinite extends Request
private case class Finite(n: Long) extends Request

// Mostly for testing purposes.
def apply[F[_], A](stream: Stream[F, A], subscriber: Subscriber[A])(implicit
F: Async[F]
): Resource[F, StreamSubscription[F, A]] =
(
Dispatcher.sequential[F](await = true),
Resource.eval(Deferred[F, Unit]),
Resource.eval(Queue.unbounded[F, Request])
).mapN { case (requestDispatcher, canceled, requests) =>
new StreamSubscription(
requests,
canceled,
subscriber,
stream,
requestDispatcher
)
}.evalTap { subscription =>
F.delay(subscriber.onSubscribe(subscription))
}

def subscribe[F[_], A](stream: Stream[F, A], subscriber: Subscriber[A])(implicit
F: Async[F]
): F[Unit] =
apply(stream, subscriber).use { subscription =>
subscription.run
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,47 @@ package reactivestreams

import cats.effect.kernel._
import cats.effect.std.Dispatcher
import cats.syntax.all._

import org.reactivestreams._

import scala.util.control.NoStackTrace

/** Implementation of a `org.reactivestreams.Publisher`
*
* This is used to publish elements from a `fs2.Stream` to a downstream reactivestreams system.
* This is used to publish elements from a [[Stream]] to a downstream reactive-streams system.
*
* @note Not longer unicast, this Publisher can be reused for multiple Subscribers:
* each subscription will re-run the [[Stream]] from the beginning.
* However, a _parallel_ `Dispatcher` is required to allow concurrent subscriptions.
* Please, refer to the `apply` factory in the companion object that only requires a stream.
*
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code]]
*/
final class StreamUnicastPublisher[F[_]: Async, A] private (
final class StreamUnicastPublisher[F[_]: Async, A](
val stream: Stream[F, A],
startDispatcher: Dispatcher[F],
requestDispatcher: Dispatcher[F]
startDispatcher: Dispatcher[F]
) extends Publisher[A] {

@deprecated("Use StreamUnicastPublisher.apply", "3.4.0")
def this(stream: Stream[F, A], dispatcher: Dispatcher[F]) = this(stream, dispatcher, dispatcher)
// Added only for bincompat, effectively deprecated.
private[reactivestreams] def this(
stream: Stream[F, A],
startDispatcher: Dispatcher[F],
requestDispatcher: Dispatcher[F]
) =
this(stream, startDispatcher)

def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
nonNull(subscriber)
startDispatcher.unsafeRunAndForget {
StreamSubscription(subscriber, stream, startDispatcher, requestDispatcher)
.flatMap { subscription =>
Sync[F].delay {
subscriber.onSubscribe(subscription)
subscription.unsafeStart()
}
}
try
startDispatcher.unsafeRunAndForget(
StreamSubscription.subscribe(stream, subscriber)
)
catch {
case _: IllegalStateException =>
subscriber.onSubscribe(new Subscription {
override def cancel(): Unit = ()
override def request(x$1: Long): Unit = ()
})
subscriber.onError(StreamUnicastPublisher.CanceledStreamPublisherException)
}
}

Expand All @@ -66,10 +78,18 @@ object StreamUnicastPublisher {
s: Stream[F, A],
dispatcher: Dispatcher[F]
): StreamUnicastPublisher[F, A] =
new StreamUnicastPublisher(s, dispatcher, dispatcher)
new StreamUnicastPublisher(s, dispatcher)

def apply[F[_]: Async, A](
s: Stream[F, A]
): Resource[F, StreamUnicastPublisher[F, A]] =
(Dispatcher.sequential[F], Dispatcher.sequential[F]).mapN(new StreamUnicastPublisher(s, _, _))
Dispatcher.parallel[F](await = false).map { startDispatcher =>
new StreamUnicastPublisher(stream = s, startDispatcher)
}

private object CanceledStreamPublisherException
extends IllegalStateException(
"This StreamPublisher is not longer accepting subscribers"
)
with NoStackTrace
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,39 @@ package object reactivestreams {
fromPublisher(publisher)
}

/** Allows subscribing a `org.reactivestreams.Subscriber` to a [[Stream]].
*
* The returned program will run until
* all the stream elements were consumed.
* Cancelling this program will gracefully shutdown the subscription.
*
* @param stream the [[Stream]] that will be consumed by the subscriber.
* @param subscriber the Subscriber that will receive the elements of the stream.
*/
def subscribeStream[F[_], A](stream: Stream[F, A], subscriber: Subscriber[A])(implicit
F: Async[F]
): F[Unit] =
StreamSubscription.subscribe(stream, subscriber)

implicit final class StreamOps[F[_], A](val stream: Stream[F, A]) {

/** Creates a [[StreamUnicastPublisher]] from a stream.
/** Creates a [[StreamUnicastPublisher]] from a [[Stream]].
*
* This publisher can only have a single subscription.
* The stream is only ran when elements are requested.
*
* @note Not longer unicast, this Publisher can be reused for multiple Subscribers:
* each subscription will re-run the [[Stream]] from the beginning.
*/
def toUnicastPublisher(implicit
F: Async[F]
): Resource[F, StreamUnicastPublisher[F, A]] =
StreamUnicastPublisher(stream)

/** Subscribes the provided `org.reactivestreams.Subscriber` to this stream.
*
* @param subscriber the Subscriber that will receive the elements of the stream.
*/
def subscribe(subscriber: Subscriber[A])(implicit F: Async[F]): F[Unit] =
reactivestreams.subscribeStream(stream, subscriber)
}
}
Loading

0 comments on commit 599029f

Please sign in to comment.