Skip to content

Commit

Permalink
Properly handle short-circuiting.
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilkloch committed Dec 17, 2023
1 parent 86e3e99 commit 009dce8
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 39 deletions.
18 changes: 13 additions & 5 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4200,11 +4200,19 @@ object Stream extends StreamLowPriority {
implicit final class ListStreamOps[F[_], O](private val xs: List[Stream[F, O]]) extends AnyVal {

def parJoinUnbounded(implicit F: Concurrent[F]): Stream[F, O] =
Stream.eval(Channel.bounded[F, Chunk[O]](64)).flatMap { c =>
val producers = xs.parTraverse_(_.chunks.foreach(x => c.send(x).void).compile.drain)
c.stream
.concurrently(Stream.exec(producers *> c.close.void))
.unchunks
if (xs.size == 1) xs.head
else {
Stream.eval((Channel.bounded[F, Chunk[O]](64), F.deferred[Unit]).tupled).flatMap {
case (c, stopPublishers) =>
val outcomes = xs
.parTraverse_(_.chunks.foreach(x => c.send(x).void).compile.drain)
.guarantee(c.close.void)

Stream
.bracket(F.start(outcomes.race(stopPublishers.get).void))(fiber =>
stopPublishers.complete(()) >> fiber.joinWithUnit
) >> c.stream.unchunks
}
}
}

Expand Down
65 changes: 31 additions & 34 deletions core/shared/src/test/scala/fs2/StreamParJoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -254,17 +254,16 @@ class StreamParJoinSuite extends Fs2Suite {
.value
.flatMap { actual =>
IO(assertEquals(actual, Left(TestException)))
}

// List(1, 2, 3)
// .map(f)
// .parJoinUnbounded
// .compile
// .toList
// .value
// .flatMap { actual =>
// IO(assertEquals(actual, Left(TestException)))
// }
} >>
List(1, 2, 3)
.map(f)
.parJoinUnbounded
.compile
.toList
.value
.flatMap { actual =>
IO(assertEquals(actual, Left(TestException)))
}
}

test(
Expand Down Expand Up @@ -322,18 +321,17 @@ class StreamParJoinSuite extends Fs2Suite {
.value
.flatMap { actual =>
IO(assertEquals(actual, Some(Set("1", "2", "3"))))
}

List(1, 2, 3)
.map(f)
.parJoinUnbounded
.compile
.toList
.map(_.toSet)
.value
.flatMap { actual =>
IO(assertEquals(actual, Some(Set("1", "2", "3"))))
}
} >>
List(1, 2, 3)
.map(f)
.parJoinUnbounded
.compile
.toList
.map(_.toSet)
.value
.flatMap { actual =>
IO(assertEquals(actual, Some(Set("1", "2", "3"))))
}
}

test(
Expand All @@ -351,17 +349,16 @@ class StreamParJoinSuite extends Fs2Suite {
.value
.flatMap { actual =>
IO(assertEquals(actual, None))
}

// List(1, 2, 3)
// .map(f)
// .parJoinUnbounded
// .compile
// .toList
// .value
// .flatMap { actual =>
// IO(assertEquals(actual, None))
// }
} >>
List(1, 2, 3)
.map(f)
.parJoinUnbounded
.compile
.toList
.value
.flatMap { actual =>
IO(assertEquals(actual, None))
}
}

test("do not block while evaluating an OptionT.none outer stream") {
Expand Down

0 comments on commit 009dce8

Please sign in to comment.