From 009dce820c7b53190ec357dd69993f9443a96022 Mon Sep 17 00:00:00 2001 From: Kamil Kloch Date: Sun, 17 Dec 2023 13:04:42 +0100 Subject: [PATCH] Properly handle short-circuiting. --- core/shared/src/main/scala/fs2/Stream.scala | 18 +++-- .../test/scala/fs2/StreamParJoinSuite.scala | 65 +++++++++---------- 2 files changed, 44 insertions(+), 39 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 70240b08c0..679a772b7b 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -4200,11 +4200,19 @@ object Stream extends StreamLowPriority { implicit final class ListStreamOps[F[_], O](private val xs: List[Stream[F, O]]) extends AnyVal { def parJoinUnbounded(implicit F: Concurrent[F]): Stream[F, O] = - Stream.eval(Channel.bounded[F, Chunk[O]](64)).flatMap { c => - val producers = xs.parTraverse_(_.chunks.foreach(x => c.send(x).void).compile.drain) - c.stream - .concurrently(Stream.exec(producers *> c.close.void)) - .unchunks + if (xs.size == 1) xs.head + else { + Stream.eval((Channel.bounded[F, Chunk[O]](64), F.deferred[Unit]).tupled).flatMap { + case (c, stopPublishers) => + val outcomes = xs + .parTraverse_(_.chunks.foreach(x => c.send(x).void).compile.drain) + .guarantee(c.close.void) + + Stream + .bracket(F.start(outcomes.race(stopPublishers.get).void))(fiber => + stopPublishers.complete(()) >> fiber.joinWithUnit + ) >> c.stream.unchunks + } } } diff --git a/core/shared/src/test/scala/fs2/StreamParJoinSuite.scala b/core/shared/src/test/scala/fs2/StreamParJoinSuite.scala index 729462cc50..d62bfb33ea 100644 --- a/core/shared/src/test/scala/fs2/StreamParJoinSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamParJoinSuite.scala @@ -254,17 +254,16 @@ class StreamParJoinSuite extends Fs2Suite { .value .flatMap { actual => IO(assertEquals(actual, Left(TestException))) - } - -// List(1, 2, 3) -// .map(f) -// .parJoinUnbounded -// .compile -// .toList -// .value -// .flatMap { actual => -// IO(assertEquals(actual, Left(TestException))) -// } + } >> + List(1, 2, 3) + .map(f) + .parJoinUnbounded + .compile + .toList + .value + .flatMap { actual => + IO(assertEquals(actual, Left(TestException))) + } } test( @@ -322,18 +321,17 @@ class StreamParJoinSuite extends Fs2Suite { .value .flatMap { actual => IO(assertEquals(actual, Some(Set("1", "2", "3")))) - } - - List(1, 2, 3) - .map(f) - .parJoinUnbounded - .compile - .toList - .map(_.toSet) - .value - .flatMap { actual => - IO(assertEquals(actual, Some(Set("1", "2", "3")))) - } + } >> + List(1, 2, 3) + .map(f) + .parJoinUnbounded + .compile + .toList + .map(_.toSet) + .value + .flatMap { actual => + IO(assertEquals(actual, Some(Set("1", "2", "3")))) + } } test( @@ -351,17 +349,16 @@ class StreamParJoinSuite extends Fs2Suite { .value .flatMap { actual => IO(assertEquals(actual, None)) - } - -// List(1, 2, 3) -// .map(f) -// .parJoinUnbounded -// .compile -// .toList -// .value -// .flatMap { actual => -// IO(assertEquals(actual, None)) -// } + } >> + List(1, 2, 3) + .map(f) + .parJoinUnbounded + .compile + .toList + .value + .flatMap { actual => + IO(assertEquals(actual, None)) + } } test("do not block while evaluating an OptionT.none outer stream") {