Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix interop.reactivestreams.StreamUnicastPublisher #3107

Merged
merged 7 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,36 @@ package fs2
package interop
package reactivestreams

import cats.effect.kernel._
import cats.effect.kernel.{Async, Deferred, Resource, Outcome}
import cats.effect.std.{Dispatcher, Queue}
import cats.syntax.all._

import fs2.concurrent.SignallingRef
import org.reactivestreams._
import cats.effect.syntax.all._
import org.reactivestreams.{Subscription, Subscriber}

/** Implementation of a `org.reactivestreams.Subscription`.
*
* This is used by the [[StreamUnicastPublisher]] to send elements from a `fs2.Stream` to a downstream reactivestreams system.
*
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#3-subscription-code]]
*/
private[reactivestreams] final class StreamSubscription[F[_], A](
private[reactivestreams] final class StreamSubscription[F[_], A] private (
requests: Queue[F, StreamSubscription.Request],
cancelled: SignallingRef[F, Boolean],
canceled: Deferred[F, Unit],
sub: Subscriber[A],
stream: Stream[F, A],
startDispatcher: Dispatcher[F],
requestDispatcher: Dispatcher[F]
)(implicit F: Async[F])
extends Subscription {
import StreamSubscription._

// We want to make sure `cancelled` is set _before_ signalling the subscriber
def onError(e: Throwable) = cancelled.set(true) >> F.delay(sub.onError(e))
def onComplete = cancelled.set(true) >> F.delay(sub.onComplete)
// Ensure we are on a terminal state; i.e. set `canceled`, before signaling the subscriber.
private def onError(ex: Throwable): F[Unit] =
cancelMe >> F.delay(sub.onError(ex))

private def onComplete: F[Unit] =
cancelMe >> F.delay(sub.onComplete)

def unsafeStart(): Unit = {
private[reactivestreams] def run: F[Unit] = {
def subscriptionPipe: Pipe[F, A, A] =
in => {
def go(s: Stream[F, A]): Pull[F, A, Unit] =
Expand All @@ -67,57 +68,101 @@ private[reactivestreams] final class StreamSubscription[F[_], A](
go(in).stream
}

val s =
val events =
stream
.through(subscriptionPipe)
.interruptWhen(cancelled)
.evalMap(x => F.delay(sub.onNext(x)))
.handleErrorWith(e => Stream.eval(onError(e)))
.onFinalize(cancelled.get.ifM(ifTrue = F.unit, ifFalse = onComplete))
.foreach(x => F.delay(sub.onNext(x)))
.compile
.drain

startDispatcher.unsafeRunAndForget(s)
events
.race(canceled.get)
.guaranteeCase {
case Outcome.Succeeded(result) =>
result.flatMap {
case Left(()) => onComplete // Events finished normally.
case Right(()) => F.unit // Events was canceled.
}
case Outcome.Errored(ex) => onError(ex)
case Outcome.Canceled() => cancelMe
}
.void
}

// According to the spec, it's acceptable for a concurrent cancel to not
// be processed immediately, but if you have synchronous `cancel();
// request()`, then the request _must_ be a no op. Fortunately,
// ordering is guaranteed by a sequential d
// request()`, then the request _must_ be a NOOP. Fortunately,
// ordering is guaranteed by a sequential dispatcher.
// See https://github.com/zainab-ali/fs2-reactive-streams/issues/29
// and https://github.com/zainab-ali/fs2-reactive-streams/issues/46
def cancel(): Unit =
requestDispatcher.unsafeRunAndForget(cancelled.set(true))

def request(n: Long): Unit = {
val request: F[Request] =
if (n == java.lang.Long.MAX_VALUE) (Infinite: Request).pure[F]
else if (n > 0) (Finite(n): Request).pure[F]
else F.raiseError(new IllegalArgumentException(s"3.9 - invalid number of elements [$n]"))
private def cancelMe: F[Unit] =
canceled.complete(()).void

override def cancel(): Unit =
try
requestDispatcher.unsafeRunAndForget(cancelMe)
catch {
case _: IllegalStateException =>
// Dispatcher already shutdown, we are on terminal state, NOOP.
}

val prog = cancelled.get
.ifM(ifTrue = F.unit, ifFalse = request.flatMap(requests.offer).handleErrorWith(onError))
override def request(n: Long): Unit = {
val prog =
canceled.tryGet.flatMap {
case None =>
if (n == java.lang.Long.MAX_VALUE)
requests.offer(Infinite)
else if (n > 0)
requests.offer(Finite(n))
else
onError(
ex = new IllegalArgumentException(s"Invalid number of elements [${n}]")
)

case Some(()) =>
F.unit
}

requestDispatcher.unsafeRunAndForget(prog)
try
requestDispatcher.unsafeRunAndForget(prog)
catch {
case _: IllegalStateException =>
// Dispatcher already shutdown, we are on terminal state, NOOP.
}
}
}

private[reactivestreams] object StreamSubscription {

/** Represents a downstream subscriber's request to publish elements */
sealed trait Request
case object Infinite extends Request
case class Finite(n: Long) extends Request

def apply[F[_]: Async, A](
sub: Subscriber[A],
stream: Stream[F, A],
startDispatcher: Dispatcher[F],
requestDispatcher: Dispatcher[F]
): F[StreamSubscription[F, A]] =
SignallingRef(false).flatMap { cancelled =>
Queue.unbounded[F, Request].map { requests =>
new StreamSubscription(requests, cancelled, sub, stream, startDispatcher, requestDispatcher)
}
private sealed trait Request
private case object Infinite extends Request
private case class Finite(n: Long) extends Request

// Mostly for testing purposes.
def apply[F[_], A](stream: Stream[F, A], subscriber: Subscriber[A])(implicit
F: Async[F]
): Resource[F, StreamSubscription[F, A]] =
(
Dispatcher.sequential[F](await = true),
Resource.eval(Deferred[F, Unit]),
Resource.eval(Queue.unbounded[F, Request])
).mapN { case (requestDispatcher, canceled, requests) =>
new StreamSubscription(
requests,
canceled,
subscriber,
stream,
requestDispatcher
)
}.evalTap { subscription =>
F.delay(subscriber.onSubscribe(subscription))
}

def subscribe[F[_], A](stream: Stream[F, A], subscriber: Subscriber[A])(implicit
F: Async[F]
): F[Unit] =
apply(stream, subscriber).use { subscription =>
subscription.run
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,47 @@ package reactivestreams

import cats.effect.kernel._
import cats.effect.std.Dispatcher
import cats.syntax.all._

import org.reactivestreams._

import scala.util.control.NoStackTrace

/** Implementation of a `org.reactivestreams.Publisher`
*
* This is used to publish elements from a `fs2.Stream` to a downstream reactivestreams system.
* This is used to publish elements from a [[Stream]] to a downstream reactive-streams system.
*
* @note Not longer unicast, this Publisher can be reused for multiple Subscribers:
* each subscription will re-run the [[Stream]] from the beginning.
* However, a _parallel_ `Dispatcher` is required to allow concurrent subscriptions.
* Please, refer to the `apply` factory in the companion object that only requires a stream.
*
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code]]
*/
final class StreamUnicastPublisher[F[_]: Async, A] private (
final class StreamUnicastPublisher[F[_]: Async, A](
val stream: Stream[F, A],
startDispatcher: Dispatcher[F],
requestDispatcher: Dispatcher[F]
startDispatcher: Dispatcher[F]
) extends Publisher[A] {

@deprecated("Use StreamUnicastPublisher.apply", "3.4.0")
def this(stream: Stream[F, A], dispatcher: Dispatcher[F]) = this(stream, dispatcher, dispatcher)
BalmungSan marked this conversation as resolved.
Show resolved Hide resolved
// Added only for bincompat, effectively deprecated.
private[reactivestreams] def this(
stream: Stream[F, A],
startDispatcher: Dispatcher[F],
requestDispatcher: Dispatcher[F]
) =
this(stream, startDispatcher)

def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
nonNull(subscriber)
startDispatcher.unsafeRunAndForget {
StreamSubscription(subscriber, stream, startDispatcher, requestDispatcher)
.flatMap { subscription =>
Sync[F].delay {
subscriber.onSubscribe(subscription)
subscription.unsafeStart()
}
}
try
startDispatcher.unsafeRunAndForget(
StreamSubscription.subscribe(stream, subscriber)
)
catch {
case _: IllegalStateException =>
subscriber.onSubscribe(new Subscription {
override def cancel(): Unit = ()
override def request(x$1: Long): Unit = ()
})
subscriber.onError(StreamUnicastPublisher.CanceledStreamPublisherException)
BalmungSan marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -66,10 +78,18 @@ object StreamUnicastPublisher {
s: Stream[F, A],
dispatcher: Dispatcher[F]
): StreamUnicastPublisher[F, A] =
new StreamUnicastPublisher(s, dispatcher, dispatcher)
new StreamUnicastPublisher(s, dispatcher)

def apply[F[_]: Async, A](
s: Stream[F, A]
): Resource[F, StreamUnicastPublisher[F, A]] =
(Dispatcher.sequential[F], Dispatcher.sequential[F]).mapN(new StreamUnicastPublisher(s, _, _))
Dispatcher.parallel[F](await = false).map { startDispatcher =>
new StreamUnicastPublisher(stream = s, startDispatcher)
}

private object CanceledStreamPublisherException
extends IllegalStateException(
"This StreamPublisher is not longer accepting subscribers"
)
with NoStackTrace
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,39 @@ package object reactivestreams {
fromPublisher(publisher)
}

/** Allows subscribing a `org.reactivestreams.Subscriber` to a [[Stream]].
*
* The returned program will run until
* all the stream elements were consumed.
* Cancelling this program will gracefully shutdown the subscription.
*
* @param stream the [[Stream]] that will be consumed by the subscriber.
* @param subscriber the Subscriber that will receive the elements of the stream.
*/
def subscribeStream[F[_], A](stream: Stream[F, A], subscriber: Subscriber[A])(implicit
F: Async[F]
): F[Unit] =
StreamSubscription.subscribe(stream, subscriber)

implicit final class StreamOps[F[_], A](val stream: Stream[F, A]) {

/** Creates a [[StreamUnicastPublisher]] from a stream.
/** Creates a [[StreamUnicastPublisher]] from a [[Stream]].
*
* This publisher can only have a single subscription.
* The stream is only ran when elements are requested.
*
* @note Not longer unicast, this Publisher can be reused for multiple Subscribers:
* each subscription will re-run the [[Stream]] from the beginning.
*/
def toUnicastPublisher(implicit
F: Async[F]
): Resource[F, StreamUnicastPublisher[F, A]] =
StreamUnicastPublisher(stream)

/** Subscribes the provided `org.reactivestreams.Subscriber` to this stream.
*
* @param subscriber the Subscriber that will receive the elements of the stream.
*/
def subscribe(subscriber: Subscriber[A])(implicit F: Async[F]): F[Unit] =
reactivestreams.subscribeStream(stream, subscriber)
}
}
Loading