
Add List[Stream]#parJoinUnbounded. #3363

Merged · 11 commits · Jan 20, 2024
Conversation

kamilkloch
Contributor

@kamilkloch kamilkloch commented Dec 14, 2023

List[Stream]#parJoinUnbounded is ~7x faster than Stream[Stream]#parJoinUnbounded:

[info] Benchmark                                  Mode  Cnt    Score    Error  Units
[info] ConcurrentBenchmark.listParJoinUnbounded  thrpt    5  489.179 ± 35.120  ops/s
[info] ConcurrentBenchmark.parJoinUnbounded      thrpt    5   66.381 ±  4.879  ops/s
  @Benchmark
  def parJoinUnbounded(): Int = {
    val each = Stream
      .range(0, 1000)
      .map(i => Stream.eval(IO.pure(i)))
    each.parJoinUnbounded.compile.last.unsafeRunSync().get
  }

  @Benchmark
  def listParJoinUnbounded(): Int = {
    val each = List
      .range(0, 1000)
      .map(i => Stream.eval(IO.pure(i)))
    each.parJoinUnbounded.compile.last.unsafeRunSync().get
  }
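For context, these two methods presumably sit inside a JMH benchmark class along the following lines (an illustrative sketch; the class name matches the `ConcurrentBenchmark` shown in the results, but the annotations and imports here are assumptions, not taken from the PR):

```scala
// Hypothetical JMH scaffolding around the two benchmark methods above.
// The @State/@Benchmark annotations and the unsafe runtime import are
// assumptions about the harness, not code from the PR.
import org.openjdk.jmh.annotations.{Benchmark, Scope, State}
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

@State(Scope.Thread)
class ConcurrentBenchmark {
  // ... the parJoinUnbounded and listParJoinUnbounded methods shown above ...
}
```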

@kamilkloch changed the title from "Add List[Stream]#parJoinUnbounded. WIP." to "Add List[Stream]#parJoinUnbounded." on Dec 17, 2023
@kamilkloch kamilkloch marked this pull request as ready for review December 17, 2023 23:45
core/shared/src/main/scala/fs2/Stream.scala (review threads, resolved)
def parJoinUnbounded(implicit F: Concurrent[F]): Stream[F, O] =
  if (xs.size <= 1) xs.headOption.getOrElse(Stream.empty)
  else
    Stream.eval((Channel.bounded[F, Chunk[O]](64), F.deferred[Unit]).tupled).flatMap {
Member
Is this 64 arbitrary?

Contributor Author

It is. Stream#parJoin uses Channel.synchronous; Channel.bounded gives a speed bump, and I could not find a test procedure that distinguishes the two.

Member

and I could not find a test procedure that distinguishes the two

What about something like:

IO.ref(0).flatMap { ref =>
  List(
    Stream.repeatEval(ref.getAndUpdate(_ + 1).void),
    Stream.empty,
  ).parJoinUnbounded.head.compile.drain.timeoutTo(1.second, ref.get.assertEquals(1))
}

Contributor Author

I tried the snippet and it never fails. With the .timeoutTo removed, the results are as follows:

      IO.ref(0).flatMap { ref =>
        Stream(
          Stream.repeatEval(ref.getAndUpdate(_ + 1).void),
          Stream.empty,
        ).parJoinUnbounded.head.compile.drain >> ref.get.assertEquals(1)
      }

fails with 1 != 2,

      IO.ref(0).flatMap { ref =>
        List(
          Stream.repeatEval(ref.getAndUpdate(_ + 1).void),
          Stream.empty,
        ).parJoinUnbounded.head.compile.drain >> ref.get.assertEquals(1)
      }

fails with 1 != 5.

Member

I tried the snippet and it never fails. With the removed .timeoutTo

Oh sorry, you're right, the timeoutTo doesn't make sense :)

In any case, it seems like it should be assertEquals(2) then. Does your List version always complete with 5? Seems like a race condition so I expect you to get inconsistent values.

Contributor Author

Yes, 5 is arbitrary. If it must return 2, we shall revert to Channel.synchronous. And add proper tests :)

Contributor Author

@armanbilge I switched to Channel.synchronous and added the tests. Still, the results are flaky - ref.get returns 1 or 2 (even for Stream#parJoinUnbounded). Both tests assert ref.get <= 2.
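The "ref.get returns 1 or 2" tolerance described above could be expressed as a test shaped roughly like the following (an illustrative sketch only; the method name and assertion style are assumptions, and the actual tests added in the PR may differ):

```scala
// Sketch of a race-tolerant test: the joined stream is cut off after the
// first element, but the producing fiber may have pulled one extra element
// before interruption, so the counter may read 1 or 2, never more.
import cats.effect.IO
import fs2.Stream

def headInterruptsUpstreams: IO[Unit] =
  IO.ref(0).flatMap { ref =>
    List(
      Stream.repeatEval(ref.getAndUpdate(_ + 1).void),
      Stream.empty
    ).parJoinUnbounded.head.compile.drain >>
      ref.get.map(n => assert(n <= 2, s"expected at most 2 increments, got $n"))
  }
```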

Contributor Author

@armanbilge what do you think in general?

/** Provides syntax for a list of streams. */
implicit final class ListStreamOps[F[_], O](private val xs: List[Stream[F, O]]) extends AnyVal {

  /** Nondeterministically merges a (static) list of streams into a single output stream.
Member

Question: should

List(a, b, c).parJoinUnbounded

be semantically equivalent to

a.merge(b).merge(c)

?

If not, what's the difference?

If so, then I think the implementation should look more like merge.

/** Interleaves the two inputs nondeterministically. The output stream
  * halts after BOTH `s1` and `s2` terminate normally, or in the event
  * of an uncaught failure on either `s1` or `s2`. Has the property that
  * `merge(Stream.empty, s) == s` and `merge(raiseError(e), s)` will
  * eventually terminate with `raiseError(e)`, possibly after emitting some
  * elements of `s` first.
  *
  * The implementation always tries to pull one chunk from each side
  * before waiting for it to be consumed by the resulting stream.
  * As such, there may be up to two chunks (one from each stream)
  * waiting to be processed while the resulting stream
  * is processing elements.
  *
  * Also note that if either side produces an empty chunk,
  * processing on that side continues,
  * without the downstream being required to consume the result.
  *
  * If either side does not emit anything (i.e. as a result of drain), that side
  * will continue to run even when the resulting stream has not asked for more data.
  *
  * Note that even when this is equivalent to `Stream(this, that).parJoinUnbounded`,
  * this implementation is a little more efficient.
  *
  * @example {{{
  * scala> import scala.concurrent.duration._, cats.effect.IO, cats.effect.unsafe.implicits.global
  * scala> val s1 = Stream.awakeEvery[IO](500.millis).scan(0)((acc, _) => acc + 1)
  * scala> val s = s1.merge(Stream.sleep_[IO](250.millis) ++ s1)
  * scala> s.take(6).compile.toVector.unsafeRunSync()
  * res0: Vector[Int] = Vector(0, 0, 1, 1, 2, 2)
  * }}}
  */
def merge[F2[x] >: F[x], O2 >: O](
    that: Stream[F2, O2]
)(implicit F: Concurrent[F2]): Stream[F2, O2] =
  Stream.force {
    // `State` describes the state of an upstream stream (`this` and `that` are both upstream streams)
    // None            : the stream has not yet terminated
    // Some(Left(t))   : the stream terminated with an error
    // Some(Right(())) : the stream terminated successfully
    type State = Option[Either[Throwable, Unit]]
    for {
      // `bothStates` keeps track of the state of `this` and `that` stream
      // so we can terminate downstream when both upstreams terminate.
      bothStates <- SignallingRef.of[F2, (State, State)]((None, None))
      // `output` is used to send chunks from upstreams to downstream.
      // It sends streams, not chunks, to tie each chunk with a finalizer.
      output <- Channel.synchronous[F2, Stream[F2, O2]]
      // `stopDef` is used to interrupt the upstreams if a) any of the
      // upstreams raises an error, or b) the downstream terminates.
      stopDef <- Deferred[F2, Unit]
    } yield {
      val signalStop: F2[Unit] = stopDef.complete(()).void
      val stop: F2[Either[Throwable, Unit]] = stopDef.get.as(Right(()))
      def complete(result: Either[Throwable, Unit]): F2[Unit] =
        bothStates.update {
          case (None, None)  => (Some(result), None)
          case (other, None) => (other, Some(result))
          case _             => sys.error("impossible")
        }
      val bothStopped: PartialFunction[(State, State), Either[Throwable, Unit]] = {
        case (Some(r1), Some(r2)) => CompositeFailure.fromResults(r1, r2)
      }
      def run(s: Stream[F2, O2]): F2[Unit] =
        // `guard` ensures we do not pull another chunk until the previous one has been consumed downstream.
        Semaphore[F2](1).flatMap { guard =>
          def sendChunk(chk: Chunk[O2]): F2[Unit] = {
            val outStr = Stream.chunk(chk).onFinalize(guard.release)
            output.send(outStr) >> guard.acquire
          }
          (Stream.exec(guard.acquire) ++ s.chunks.foreach(sendChunk))
            // Stop when the other upstream has errored or the downstream has completed.
            // This may also interrupt the initial call to `guard.acquire` as the call is made at the
            // beginning of the stream.
            .interruptWhen(stop)
            .compile
            .drain
            .attempt
            .flatMap {
              case r @ Left(_) =>
                // On error, interrupt the other upstream and downstream.
                complete(r) >> signalStop
              case r @ Right(()) => complete(r)
            }
        }
      val waitForBoth: F2[Unit] = bothStates.discrete
        .collect(bothStopped)
        .head
        .rethrow
        .compile
        .drain
        .guarantee(output.close.void)
      // There is no need to clean up these fibers. If the downstream is cancelled,
      // both streams will stop gracefully and the fibers will complete.
      val setup: F2[Fiber[F2, Throwable, Unit]] =
        run(this).start >> run(that).start >> waitForBoth.start
      Stream.bracket(setup)(wfb => signalStop >> wfb.joinWithUnit) >>
        output.stream.flatten.interruptWhen(stop)
    }
  }

Also, in that case I wonder if parJoinUnbounded is the right name. What if we exposed this API like:

object Stream {
  def merge[F[_], A](streams: Stream[F, A]*): Stream[F, A] = ???
}

or mergeAll or mergeMany or something?
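If the companion-object shape were chosen, one hypothetical implementation would simply delegate to the list syntax added in this PR (a sketch only; the name `mergeAll` and the delegation are assumptions, and the thread leaves the naming undecided):

```scala
import cats.effect.Concurrent
import fs2.Stream

object StreamMergeSketch {
  // Hypothetical varargs wrapper over the List[Stream] syntax added in this PR.
  // Converts the varargs to a List and reuses List#parJoinUnbounded.
  def mergeAll[F[_]: Concurrent, A](streams: Stream[F, A]*): Stream[F, A] =
    streams.toList.parJoinUnbounded
}
```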

Contributor Author

That is a good question, I would go back one step and ask if

Stream(a, b).parJoinUnbounded

is semantically equivalent to

a.merge(b)

?
Citing the merge scaladoc:

Note that even when this is equivalent to `Stream(this, that).parJoinUnbounded`, this implementation is a little more efficient

Looking at the implementation, I am not sure it is equivalent. merge adds some fairness with regard to chunks? (Also, as for performance, simple benchmarks hint that the opposite is actually true: merge is slower than parJoinUnbounded.)

Collaborator

merge is indeed more fair, and imho the main use case for it is implementing combinators that aren't concurrent in semantics necessarily but need concurrency in the implementation, and want to preserve pull-basedness as much as possible.

The point of the combinator in this PR, instead, feels more like (a.compile.drain, b.compile.drain).tupled, except that you need the results.

In other words, I don't think consistency with merge is required
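The two usage patterns under discussion can be put side by side (an illustrative sketch; the comments paraphrase the merge scaladoc and the analogy above, and are not claims beyond what the thread states):

```scala
import cats.effect.IO
import fs2.Stream

// merge: pull-based; each side pulls at most one chunk ahead of the
// consumer, giving chunk-level fairness between the two sides.
def fair(a: Stream[IO, Int], b: Stream[IO, Int]): Stream[IO, Int] =
  a.merge(b)

// parJoinUnbounded: each upstream runs on its own fiber, closer in spirit
// to (a.compile.drain, b.compile.drain).tupled, except that the outputs
// are also emitted downstream.
def racy(a: Stream[IO, Int], b: Stream[IO, Int]): Stream[IO, Int] =
  List(a, b).parJoinUnbounded
```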

@mpilquist mpilquist merged commit 352f331 into typelevel:main Jan 20, 2024
15 checks passed