Skip to content

Commit

Permalink
Fix concurrency bug with StreamSubscription
Browse files Browse the repository at this point in the history
  • Loading branch information
BalmungSan committed Jan 13, 2023
1 parent 076be91 commit d4f028f
Showing 1 changed file with 36 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import cats.syntax.all._
import fs2.concurrent.SignallingRef
import org.reactivestreams.{Subscription, Subscriber}

import java.util.concurrent.atomic.AtomicBoolean

/** Implementation of a `org.reactivestreams.Subscription`.
*
* This is used by the [[StreamUnicastPublisher]] to send elements from a `fs2.Stream` to a downstream reactivestreams system.
Expand All @@ -39,11 +37,10 @@ import java.util.concurrent.atomic.AtomicBoolean
*/
private[reactivestreams] final class StreamSubscription[F[_], A] private (
requests: Queue[F, StreamSubscription.Request],
cancelled: SignallingRef[F, Boolean],
canceled: SignallingRef[F, Boolean],
sub: Subscriber[A],
stream: Stream[F, A],
requestDispatcher: Dispatcher[F],
completed: AtomicBoolean
requestDispatcher: Dispatcher[F]
)(implicit F: Async[F])
extends Subscription {
import StreamSubscription._
Expand Down Expand Up @@ -73,38 +70,52 @@ private[reactivestreams] final class StreamSubscription[F[_], A] private (

stream
.through(subscriptionPipe)
.interruptWhen(cancelled)
.interruptWhen(canceled)
.evalMap(x => F.delay(sub.onNext(x)))
.handleErrorWith(e => Stream.eval(onError(e)))
.onFinalize(cancelled.get.ifM(ifTrue = F.unit, ifFalse = onComplete))
.onFinalize(canceled.get.ifM(ifTrue = F.unit, ifFalse = onComplete))
.compile
.drain
}

// According to the spec, it's acceptable for a concurrent cancel to not
// be processed immediately, but if you have synchronous `cancel();
// request()`, then the request _must_ be a no op. Fortunately,
// request()`, then the request _must_ be a NOOP. Fortunately,
// ordering is guaranteed by a sequential dispatcher.
// See https://github.com/zainab-ali/fs2-reactive-streams/issues/29
// and https://github.com/zainab-ali/fs2-reactive-streams/issues/46
private def cancelMe: F[Unit] =
F.delay(completed.set(true)) >> cancelled.set(true)
canceled.set(true)
override def cancel(): Unit =
if (!completed.get()) {
requestDispatcher.unsafeRunSync(cancelMe)
try
requestDispatcher.unsafeRunAndForget(cancelMe)
catch {
case _: IllegalStateException =>
// Dispatcher already shutdown, we are on terminal state, NOOP.
}

override def request(n: Long): Unit =
if (!completed.get()) {
val prog =
if (n == java.lang.Long.MAX_VALUE)
requests.offer(Infinite)
else if (n > 0)
requests.offer(Finite(n))
else
onError(new IllegalArgumentException(s"Invalid number of elements [${n}]"))
requestDispatcher.unsafeRunSync(prog)
override def request(n: Long): Unit = {
val prog =
canceled.get.flatMap {
case false =>
if (n == java.lang.Long.MAX_VALUE)
requests.offer(Infinite)
else if (n > 0)
requests.offer(Finite(n))
else
onError(new IllegalArgumentException(s"Invalid number of elements [${n}]"))

case true =>
F.unit
}

try
requestDispatcher.unsafeRunAndForget(prog)
catch {
case _: IllegalStateException =>
// Dispatcher already shutdown, we are on terminal state, NOOP.
}
}
}

private[reactivestreams] object StreamSubscription {
Expand All @@ -121,16 +132,14 @@ private[reactivestreams] object StreamSubscription {
(
Dispatcher.sequential[F](await = true),
Resource.eval(SignallingRef(false)),
Resource.eval(Queue.unbounded[F, Request]),
Resource.eval(F.delay(new AtomicBoolean(false)))
).mapN { case (requestDispatcher, cancelled, requests, completed) =>
Resource.eval(Queue.unbounded[F, Request])
).mapN { case (requestDispatcher, canceled, requests) =>
new StreamSubscription(
requests,
cancelled,
canceled,
subscriber,
stream,
requestDispatcher,
completed
requestDispatcher
)
}.evalTap { subscription =>
F.delay(subscriber.onSubscribe(subscription))
Expand Down

0 comments on commit d4f028f

Please sign in to comment.