From 66fc1e3b3411f1fda239aaeac69b2e4856d875de Mon Sep 17 00:00:00 2001 From: nikiforo Date: Wed, 10 Nov 2021 13:55:32 +0300 Subject: [PATCH 1/7] par-eval-map --- .../fs2/benchmark/ParEvalMapBenchmark.scala | 4 + core/shared/src/main/scala/fs2/Stream.scala | 124 ++++------ .../src/test/scala/fs2/ParEvalMapSuite.scala | 218 ++++++++++++++++++ .../src/test/scala/fs2/StreamSuite.scala | 43 ++++ .../scala/fs2/concurrent/ChannelSuite.scala | 12 + 5 files changed, 319 insertions(+), 82 deletions(-) create mode 100644 core/shared/src/test/scala/fs2/ParEvalMapSuite.scala diff --git a/benchmark/src/main/scala/fs2/benchmark/ParEvalMapBenchmark.scala b/benchmark/src/main/scala/fs2/benchmark/ParEvalMapBenchmark.scala index 2fb7d2b477..a2791df071 100644 --- a/benchmark/src/main/scala/fs2/benchmark/ParEvalMapBenchmark.scala +++ b/benchmark/src/main/scala/fs2/benchmark/ParEvalMapBenchmark.scala @@ -40,6 +40,10 @@ class ParEvalMapBenchmark { def evalMap(): Unit = execute(getStream.evalMap(_ => dummyLoad)) + @Benchmark + def parEvalMap10(): Unit = + execute(getStream.parEvalMap(10)(_ => dummyLoad)) + @Benchmark def parEvalMapUnordered10(): Unit = execute(getStream.parEvalMapUnordered(10)(_ => dummyLoad)) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 73fc7487ff..85df335e92 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -25,7 +25,7 @@ import scala.annotation.{nowarn, tailrec} import scala.concurrent.TimeoutException import scala.concurrent.duration._ import cats.{Eval => _, _} -import cats.data.{Ior, NonEmptyList} +import cats.data.Ior import cats.effect.{Concurrent, SyncIO} import cats.effect.kernel._ import cats.effect.kernel.implicits._ @@ -2025,6 +2025,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, )(implicit F2: Applicative[F2]): Stream[F2, O] = new Stream(Pull.acquire[F2, Unit](F2.unit, (_, ec) => f(ec)).flatMap(_ => underlying)) + private type Att[O2] = Either[Throwable, O2] + /** Like [[Stream#evalMap]], but will evaluate effects in parallel, emitting the results * downstream in the same order as the input stream. The number of concurrent effects * is limited by the `maxConcurrent` parameter. @@ -2040,40 +2042,12 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, */ def parEvalMap[F2[x] >: F[x], O2]( maxConcurrent: Int - )(f: O => F2[O2])(implicit F: Concurrent[F2]): Stream[F2, O2] = - if (maxConcurrent === 1) evalMap(f) - else { - val fstream: F2[Stream[F2, O2]] = for { - chan <- Channel.bounded[F2, F2[Either[Throwable, O2]]](maxConcurrent) - chanReadDone <- F.deferred[Unit] - } yield { - def forkOnElem(o: O): F2[Stream[F2, Unit]] = - for { - value <- F.deferred[Either[Throwable, O2]] - send = chan.send(value.get).as { - Stream.eval(f(o).attempt.flatMap(value.complete(_).void)) - } - eit <- chanReadDone.get.race(send) - } yield eit match { - case Left(()) => Stream.empty - case Right(stream) => stream - } - - val background = this - .evalMap(forkOnElem) - .parJoin(maxConcurrent) - .onFinalize(chanReadDone.get.race(chan.close).void) - - val foreground = - chan.stream - .evalMap(identity) - .rethrow - .onFinalize(chanReadDone.complete(()).void) - - foreground.concurrently(background) - } - Stream.eval(fstream).flatten - } + )(f: O => F2[O2])(implicit F: Concurrent[F2]): Stream[F2, O2] = { + def init(ch: Channel[F2, F2[Att[O2]]], release: F2[Unit]) = + Deferred[F2, Att[O2]].flatTap(value => ch.send(value.get <* release)) + def send(v: Deferred[F2, Att[O2]]) = (el: Att[O2]) => v.complete(el).void + parEvalMapAction(maxConcurrent, f)((ch, release) => init(ch, release).map(send)) + } /** Like [[Stream#evalMap]], but will evaluate effects in parallel, emitting the results * downstream. The number of concurrent effects is limited by the `maxConcurrent` parameter. @@ -2090,9 +2064,22 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, x ]: Concurrent, O2]( maxConcurrent: Int - )(f: O => F2[O2]): Stream[F2, O2] = + )(f: O => F2[O2]): Stream[F2, O2] = { + val init = ().pure[F2] + def send(ch: Channel[F2, F2[Att[O2]]], release: F2[Unit]) = + (el: Att[O2]) => ch.send(el.pure[F2]) *> release + parEvalMapAction(maxConcurrent, f)((ch, release) => init.as(send(ch, release))) + } + + private def parEvalMapAction[F2[x] >: F[ + x + ]: Concurrent, O2, T]( + maxConcurrent: Int, + f: O => F2[O2] + )(initFork: (Channel[F2, F2[Att[O2]]], F2[Unit]) => F2[Att[O2] => F2[Unit]]): Stream[F2, O2] = if (maxConcurrent == 1) evalMap(f) else { + val F = Concurrent[F2] assert(maxConcurrent > 0, "maxConcurrent must be > 0, was: " + maxConcurrent) // One is taken by inner stream read. @@ -2100,66 +2087,39 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val action = ( Semaphore[F2](concurrency.toLong), - Channel.bounded[F2, O2](concurrency), - Ref[F2].of(none[Either[NonEmptyList[Throwable], Unit]]), + Channel.bounded[F2, F2[Att[O2]]](concurrency), + Deferred[F2, Unit], Deferred[F2, Unit] - ).mapN { (semaphore, channel, result, stopReading) => + ).mapN { (semaphore, channel, stop, end) => val releaseAndCheckCompletion = semaphore.release *> - semaphore.available - .product(result.get) - .flatMap { - case (`concurrency`, Some(_)) => channel.close.void - case _ => ().pure[F2] - } - - val succeed = - result.update { - case None => ().asRight.some - case other => other - } - - val cancelled = stopReading.complete(()) *> succeed - - def failed(ex: Throwable) = - stopReading.complete(()) *> - result.update { - case Some(Left(nel)) => nel.prepend(ex).asLeft.some - case _ => NonEmptyList.one(ex).asLeft.some - } - - val completeStream = - Stream.force { - result.get.map { - case Some(Left(nel)) => Stream.raiseError[F2](CompositeFailure.fromNel(nel)) - case _ => Stream.empty + semaphore.available.flatMap { + case `concurrency` => channel.close *> end.complete(()).void + case _ => ().pure[F2] } - } def forkOnElem(el: O): F2[Unit] = - semaphore.acquire *> - f(el).attempt - .race(stopReading.get) - .flatMap { - case Left(Left(ex)) => failed(ex) - case Left(Right(a)) => channel.send(a).void - case Right(_) => ().pure[F2] + F.uncancelable { poll => + poll(semaphore.acquire) <* + Deferred[F2, Unit].flatMap { pushed => + initFork(channel, pushed.complete(()).void).flatMap { send => + val action = stop.get.race(f(el).attempt.flatMap(send) <* pushed.get) + F.start(poll(action).guarantee(releaseAndCheckCompletion)) + } } - .guarantee(releaseAndCheckCompletion) - .start - .void + } val background = Stream.exec(semaphore.acquire) ++ - interruptWhen(stopReading.get.map(_.asRight[Throwable])) + interruptWhen(stop.get.map(_.asRight[Throwable])) .foreach(forkOnElem) .onFinalizeCase { - case ExitCase.Succeeded => succeed *> releaseAndCheckCompletion - case ExitCase.Errored(ex) => failed(ex) *> releaseAndCheckCompletion - case ExitCase.Canceled => cancelled *> releaseAndCheckCompletion + case ExitCase.Succeeded => releaseAndCheckCompletion + case _ => stop.complete(()) *> releaseAndCheckCompletion } - channel.stream.concurrently(background) ++ completeStream + val foreground = channel.stream.evalMap(identity).rethrow + foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background) } Stream.force(action) diff --git a/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala b/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala new file mode 100644 index 0000000000..656e25040a --- /dev/null +++ b/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 + +import cats.effect.IO +import cats.effect.kernel.{Deferred, Ref} +import cats.effect.std.CountDownLatch +import cats.syntax.all._ +import org.scalacheck.effect.PropF.forAllF + +import scala.concurrent.duration._ + +class ParEvalMapSuite extends Fs2Suite { + + private implicit class verifyOps[T](val action: IO[T]) { + def assertNotCompletes(): IO[Unit] = + IO.race(IO.sleep(1.second), action).assertEquals(Left(())) + + def assertCompletes(expected: T): IO[Unit] = + IO.race(IO.sleep(1.second), action).assertEquals(Right(expected)) + } + + val u: IO[Unit] = ().pure[IO] + val ex: IO[Unit] = IO.raiseError(new RuntimeException) + + group("issue-2686, max distance of concurrently computing elements") { + test("shouldn't exceed maxConcurrent in parEvalMap") { + run(_.parEvalMap(2)(identity)).assertNotCompletes() + } + + test("can exceed maxConcurrent in parEvalMapUnordered") { + val action = run(_.parEvalMapUnordered(2)(identity)) + action.assertCompletes(Right(())) + } + + def run(pipe: Pipe[IO, IO[Unit], Unit]): IO[Either[Unit, Unit]] = + Deferred[IO, Unit].flatMap { d => + val stream = Stream(IO.sleep(1.minute), u, d.complete(()).void).covary[IO] + IO.race(stream.through(pipe).compile.drain, d.get) + } + } + + group("order") { + + test("should be preserved in parEvalMap") { + run(_.parEvalMap(Int.MaxValue)(identity)).assertEquals(List(3, 2, 1)) + } + + test("may not be preserved in parEvalMapUnordered") { + run(_.parEvalMapUnordered(Int.MaxValue)(identity)).assertEquals(List(1, 2, 3)) + } + + def run(pipe: Pipe[IO, IO[Int], Int]) = + Stream + .emits(List(3, 2, 1)) + .map(i => IO.sleep(50.millis * i).as(i)) + .covary[IO] + .through(pipe) + .compile + .toList + } + + group("should limit concurrency in") { + + test("parEvalMapUnordered") { + forAllF { (l: Int, p: Int) => + val length = math.abs(l % 100) + 1 + val parallel = math.abs(p % 20) + 2 + val requested = math.min(length, parallel) + val action = runWithLatch(length, requested, _.parEvalMapUnordered(parallel)(identity)) + action.assertCompletes(()) + } + } + + test("parEvalMap") { + forAllF { (l: Int, p: Int) => + val length = math.abs(l % 100) + 1 + val parallel = math.abs(p % 20) + 2 + val requested = math.min(length, parallel) + val action = runWithLatch(length, requested, _.parEvalMap(parallel)(identity)) + action.assertCompletes(()) + } + } + + test("parEvalMapUnordered can't launch more than Stream size") { + val action = runWithLatch(100, 101, _.parEvalMapUnordered(Int.MaxValue)(identity)) + action.assertNotCompletes() + } + + test("parEvalMap can't launch more than Stream size") { + val action = runWithLatch(100, 101, _.parEvalMap(Int.MaxValue)(identity)) + action.assertNotCompletes() + } + + test("parEvalMapUnordered shouldn't launch more than maxConcurrent") { + val action = runWithLatch(100, 21, _.parEvalMapUnordered(20)(identity)) + action.assertNotCompletes() + } + + test("parEvalMap shouldn't launch more than maxConcurrent") { + val action = runWithLatch(100, 21, _.parEvalMap(20)(identity)) + action.assertNotCompletes() + } + + def runWithLatch(length: Int, parallel: Int, pipe: Pipe[IO, IO[Unit], Unit]) = + CountDownLatch[IO](parallel).flatMap { latch => + Stream(latch.release *> latch.await).repeatN(length).through(pipe).compile.drain + } + } + + group("if two errors happens only one should be reported") { + test("parEvalMapUnordered") { + forAllF { (i: Int) => + val amount = math.abs(i % 10) + 1 + CountDownLatch[IO](amount).flatMap { latch => + val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] + stream + .parEvalMapUnordered(amount)(identity) + .compile + .drain + .intercept[RuntimeException] + .void + } + } + } + + test("parEvalMap") { + forAllF { (i: Int) => + val amount = math.abs(i % 10) + 1 + CountDownLatch[IO](amount).flatMap { latch => + val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] + stream.parEvalMap(amount)(identity).compile.drain.intercept[RuntimeException].void + } + } + } + } + + group("if error happens after stream succeeds error should be lost") { + test("parEvalMapUnordered") { + check(_.parEvalMapUnordered(Int.MaxValue)(identity)) + } + + test("parEvalMap") { + check(_.parEvalMap(Int.MaxValue)(identity)) + } + + def check(pipe: Pipe[IO, IO[Unit], Unit]) = + Deferred[IO, Unit].flatMap { d => + val simple = Stream(u, (d.get *> ex).uncancelable).covary[IO] + val stream = simple.through(pipe).take(1).productL(Stream.eval(d.complete(()).void)) + stream.compile.toList.assertEquals(List(())) + } + } + + group("cancels unneeded") { + + test("parEvalMapUnordered") { + check(_.parEvalMapUnordered(2)(identity)) + } + + test("parEvalMapUnordered") { + check(_.parEvalMap(2)(identity)) + } + + def check(pipe: Pipe[IO, IO[Unit], Unit]) = + Deferred[IO, Unit].flatMap { d => + val cancelled = IO.never.onCancel(d.complete(()).void) + val stream = Stream(u, cancelled).covary[IO] + val action = stream.through(pipe).take(1).compile.drain + action *> d.get.assertCompletes(()) + } + } + + group("waits for uncancellable completion") { + test("parEvalMapUnordered") { + check(_.parEvalMapUnordered(2)(identity)) + } + + test("parEvalMap") { + check(_.parEvalMap(2)(identity)) + } + + def check(pipe: Pipe[IO, IO[Unit], Unit]): IO[Unit] = { + val uncancMsg = "uncancellable" + val onFin2Msg = "onFin2" + + Ref[IO] + .of(List.empty[String]) + .flatMap { ref => + val io = ref.update(uncancMsg :: _).void + val stream = Stream(u, io.uncancelable).covary[IO] + val onFin2 = ref.update(onFin2Msg :: _) + val action = stream.through(pipe).take(1).compile.drain <* onFin2 + action *> ref.get + } + .assertEquals(List(onFin2Msg, uncancMsg)) + } + } +} diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index 7599271cad..f02320e282 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -987,4 +987,47 @@ class StreamSuite extends Fs2Suite { identity(p) // Avoid unused warning assert(compileErrors("Stream.eval(IO(1)).through(p)").nonEmpty) } + + group("parEvalMap") { + + test("should preserve element ordering") { + forAllF { (stream: Stream[Pure, Int]) => + val smaller = stream.map(i => math.abs(i % 10)) + val delayed = smaller.covary[IO].parEvalMap(Int.MaxValue)(i => IO.sleep(i.millis).as(i)) + delayed.compile.toList.assertEquals(smaller.toList) + } + } + + test("should launch no more than maxConcurrent") { + forAllF { (stream: Stream[Pure, Unit], int: Int) => + val concurrency = math.abs(int % 20) + 1 + Ref[IO] + .of(0) + .flatMap { open => + stream.covary[IO].parEvalMap(concurrency)(_ => findOpen(open)).compile.toList + } + .map(_.forall(_ <= concurrency)) + .assert + } + } + + test("unordered should launch no more than maxConcurrent") { + forAllF { (stream: Stream[Pure, Unit], int: Int) => + val concurrency = math.abs(int % 20) + 1 + Ref[IO] + .of(0) + .flatMap { open => + stream.covary[IO].parEvalMapUnordered(concurrency)(_ => findOpen(open)).compile.toList + } + .map(_.forall(_ <= concurrency)) + .assert + } + } + + def findOpen(open: Ref[IO, Int]) = { + val start = open.update(_ + 1) *> IO.sleep(5.millis) *> open.get + val end = IO.sleep(5.millis) *> open.getAndUpdate(_ - 1) + start *> end + } + } } diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index 1ace539fd8..e1d50b23d9 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -107,4 +107,16 @@ class ChannelSuite extends Fs2Suite { p.assertEquals(v) } + test("Closes before channel elements are depleted") { + val p = for { + chan <- Channel.unbounded[IO, Unit] + _ <- chan.send(()) + _ <- chan.close + isClosedBefore <- chan.isClosed + _ <- chan.stream.compile.toVector + } yield isClosedBefore + + p.assertEquals(true) + } + } From 0c15b22be2a103506fd9057e3b6e68dff2be2bda Mon Sep 17 00:00:00 2001 From: nikiforo Date: Tue, 16 Nov 2021 16:21:41 +0300 Subject: [PATCH 2/7] par-eval-v3 --- core/shared/src/main/scala/fs2/Stream.scala | 25 +- .../src/test/scala/fs2/ParEvalMapSuite.scala | 218 ------------------ .../src/test/scala/fs2/StreamSuite.scala | 217 ++++++++++++++--- 3 files changed, 199 insertions(+), 261 deletions(-) delete mode 100644 core/shared/src/test/scala/fs2/ParEvalMapSuite.scala diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 85df335e92..132a11e439 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2025,8 +2025,6 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, )(implicit F2: Applicative[F2]): Stream[F2, O] = new Stream(Pull.acquire[F2, Unit](F2.unit, (_, ec) => f(ec)).flatMap(_ => underlying)) - private type Att[O2] = Either[Throwable, O2] - /** Like [[Stream#evalMap]], but will evaluate effects in parallel, emitting the results * downstream in the same order as the input stream. The number of concurrent effects * is limited by the `maxConcurrent` parameter. @@ -2043,9 +2041,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, def parEvalMap[F2[x] >: F[x], O2]( maxConcurrent: Int )(f: O => F2[O2])(implicit F: Concurrent[F2]): Stream[F2, O2] = { - def init(ch: Channel[F2, F2[Att[O2]]], release: F2[Unit]) = - Deferred[F2, Att[O2]].flatTap(value => ch.send(value.get <* release)) - def send(v: Deferred[F2, Att[O2]]) = (el: Att[O2]) => v.complete(el).void + def init(ch: Channel[F2, F2[Either[Throwable, O2]]], release: F2[Unit]) = + Deferred[F2, Either[Throwable, O2]].flatTap(value => ch.send(value.get <* release)) + def send(v: Deferred[F2, Either[Throwable, O2]]) = + (el: Either[Throwable, O2]) => v.complete(el).void parEvalMapAction(maxConcurrent, f)((ch, release) => init(ch, release).map(send)) } @@ -2066,8 +2065,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, maxConcurrent: Int )(f: O => F2[O2]): Stream[F2, O2] = { val init = ().pure[F2] - def send(ch: Channel[F2, F2[Att[O2]]], release: F2[Unit]) = - (el: Att[O2]) => ch.send(el.pure[F2]) *> release + def send(ch: Channel[F2, F2[Either[Throwable, O2]]], release: F2[Unit]) = + (el: Either[Throwable, O2]) => ch.send(el.pure[F2]) *> release parEvalMapAction(maxConcurrent, f)((ch, release) => init.as(send(ch, release))) } @@ -2076,7 +2075,12 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, ]: Concurrent, O2, T]( maxConcurrent: Int, f: O => F2[O2] - )(initFork: (Channel[F2, F2[Att[O2]]], F2[Unit]) => F2[Att[O2] => F2[Unit]]): Stream[F2, O2] = + )( + initFork: ( + Channel[F2, F2[Either[Throwable, O2]]], + F2[Unit] + ) => F2[Either[Throwable, O2] => F2[Unit]] + ): Stream[F2, O2] = if (maxConcurrent == 1) evalMap(f) else { val F = Concurrent[F2] @@ -2087,7 +2091,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val action = ( Semaphore[F2](concurrency.toLong), - Channel.bounded[F2, F2[Att[O2]]](concurrency), + Channel.bounded[F2, F2[Either[Throwable, O2]]](concurrency), Deferred[F2, Unit], Deferred[F2, Unit] ).mapN { (semaphore, channel, stop, end) => @@ -2102,7 +2106,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, F.uncancelable { poll => poll(semaphore.acquire) <* Deferred[F2, Unit].flatMap { pushed => - initFork(channel, pushed.complete(()).void).flatMap { send => + val init = initFork(channel, pushed.complete(()).void) + poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => val action = stop.get.race(f(el).attempt.flatMap(send) <* pushed.get) F.start(poll(action).guarantee(releaseAndCheckCompletion)) } diff --git a/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala b/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala deleted file mode 100644 index 656e25040a..0000000000 --- a/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright (c) 2013 Functional Streams for Scala - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to - * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of - * the Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -package fs2 - -import cats.effect.IO -import cats.effect.kernel.{Deferred, Ref} -import cats.effect.std.CountDownLatch -import cats.syntax.all._ -import org.scalacheck.effect.PropF.forAllF - -import scala.concurrent.duration._ - -class ParEvalMapSuite extends Fs2Suite { - - private implicit class verifyOps[T](val action: IO[T]) { - def assertNotCompletes(): IO[Unit] = - IO.race(IO.sleep(1.second), action).assertEquals(Left(())) - - def assertCompletes(expected: T): IO[Unit] = - IO.race(IO.sleep(1.second), action).assertEquals(Right(expected)) - } - - val u: IO[Unit] = ().pure[IO] - val ex: IO[Unit] = IO.raiseError(new RuntimeException) - - group("issue-2686, max distance of concurrently computing elements") { - test("shouldn't exceed maxConcurrent in parEvalMap") { - run(_.parEvalMap(2)(identity)).assertNotCompletes() - } - - test("can exceed maxConcurrent in parEvalMapUnordered") { - val action = run(_.parEvalMapUnordered(2)(identity)) - action.assertCompletes(Right(())) - } - - def run(pipe: Pipe[IO, IO[Unit], Unit]): IO[Either[Unit, Unit]] = - Deferred[IO, Unit].flatMap { d => - val stream = Stream(IO.sleep(1.minute), u, d.complete(()).void).covary[IO] - IO.race(stream.through(pipe).compile.drain, d.get) - } - } - - group("order") { - - test("should be preserved in parEvalMap") { - run(_.parEvalMap(Int.MaxValue)(identity)).assertEquals(List(3, 2, 1)) - } - - test("may not be preserved in parEvalMapUnordered") { - run(_.parEvalMapUnordered(Int.MaxValue)(identity)).assertEquals(List(1, 2, 3)) - } - - def run(pipe: Pipe[IO, IO[Int], Int]) = - Stream - .emits(List(3, 2, 1)) - .map(i => IO.sleep(50.millis * i).as(i)) - .covary[IO] - .through(pipe) - .compile - .toList - } - - group("should limit concurrency in") { - - test("parEvalMapUnordered") { - forAllF { (l: Int, p: Int) => - val length = math.abs(l % 100) + 1 - val parallel = math.abs(p % 20) + 2 - val requested = math.min(length, parallel) - val action = runWithLatch(length, requested, _.parEvalMapUnordered(parallel)(identity)) - action.assertCompletes(()) - } - } - - test("parEvalMap") { - forAllF { (l: Int, p: Int) => - val length = math.abs(l % 100) + 1 - val parallel = math.abs(p % 20) + 2 - val requested = math.min(length, parallel) - val action = runWithLatch(length, requested, _.parEvalMap(parallel)(identity)) - action.assertCompletes(()) - } - } - - test("parEvalMapUnordered can't launch more than Stream size") { - val action = runWithLatch(100, 101, _.parEvalMapUnordered(Int.MaxValue)(identity)) - action.assertNotCompletes() - } - - test("parEvalMap can't launch more than Stream size") { - val action = runWithLatch(100, 101, _.parEvalMap(Int.MaxValue)(identity)) - action.assertNotCompletes() - } - - test("parEvalMapUnordered shouldn't launch more than maxConcurrent") { - val action = runWithLatch(100, 21, _.parEvalMapUnordered(20)(identity)) - action.assertNotCompletes() - } - - test("parEvalMap shouldn't launch more than maxConcurrent") { - val action = runWithLatch(100, 21, _.parEvalMap(20)(identity)) - action.assertNotCompletes() - } - - def runWithLatch(length: Int, parallel: Int, pipe: Pipe[IO, IO[Unit], Unit]) = - CountDownLatch[IO](parallel).flatMap { latch => - Stream(latch.release *> latch.await).repeatN(length).through(pipe).compile.drain - } - } - - group("if two errors happens only one should be reported") { - test("parEvalMapUnordered") { - forAllF { (i: Int) => - val amount = math.abs(i % 10) + 1 - CountDownLatch[IO](amount).flatMap { latch => - val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] - stream - .parEvalMapUnordered(amount)(identity) - .compile - .drain - .intercept[RuntimeException] - .void - } - } - } - - test("parEvalMap") { - forAllF { (i: Int) => - val amount = math.abs(i % 10) + 1 - CountDownLatch[IO](amount).flatMap { latch => - val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] - stream.parEvalMap(amount)(identity).compile.drain.intercept[RuntimeException].void - } - } - } - } - - group("if error happens after stream succeeds error should be lost") { - test("parEvalMapUnordered") { - check(_.parEvalMapUnordered(Int.MaxValue)(identity)) - } - - test("parEvalMap") { - check(_.parEvalMap(Int.MaxValue)(identity)) - } - - def check(pipe: Pipe[IO, IO[Unit], Unit]) = - Deferred[IO, Unit].flatMap { d => - val simple = Stream(u, (d.get *> ex).uncancelable).covary[IO] - val stream = simple.through(pipe).take(1).productL(Stream.eval(d.complete(()).void)) - stream.compile.toList.assertEquals(List(())) - } - } - - group("cancels unneeded") { - - test("parEvalMapUnordered") { - check(_.parEvalMapUnordered(2)(identity)) - } - - test("parEvalMapUnordered") { - check(_.parEvalMap(2)(identity)) - } - - def check(pipe: Pipe[IO, IO[Unit], Unit]) = - Deferred[IO, Unit].flatMap { d => - val cancelled = IO.never.onCancel(d.complete(()).void) - val stream = Stream(u, cancelled).covary[IO] - val action = stream.through(pipe).take(1).compile.drain - action *> d.get.assertCompletes(()) - } - } - - group("waits for uncancellable completion") { - test("parEvalMapUnordered") { - check(_.parEvalMapUnordered(2)(identity)) - } - - test("parEvalMap") { - check(_.parEvalMap(2)(identity)) - } - - def check(pipe: Pipe[IO, IO[Unit], Unit]): IO[Unit] = { - val uncancMsg = "uncancellable" - val onFin2Msg = "onFin2" - - Ref[IO] - .of(List.empty[String]) - .flatMap { ref => - val io = ref.update(uncancMsg :: _).void - val stream = Stream(u, io.uncancelable).covary[IO] - val onFin2 = ref.update(onFin2Msg :: _) - val action = stream.through(pipe).take(1).compile.drain <* onFin2 - action *> ref.get - } - .assertEquals(List(onFin2Msg, uncancMsg)) - } - } -} diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index f02320e282..4348c458d3 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -23,16 +23,14 @@ package fs2 import scala.annotation.{nowarn, tailrec} import scala.concurrent.duration._ - import cats.data.Chain import cats.effect.{Deferred, IO, Outcome, Ref, Resource, SyncIO} -import cats.effect.std.Queue +import cats.effect.std.{CountDownLatch, Queue} import cats.syntax.all._ import org.scalacheck.Gen import org.scalacheck.Arbitrary.arbitrary import org.scalacheck.Prop.forAll import org.scalacheck.effect.PropF.forAllF - import fs2.concurrent.SignallingRef @nowarn("cat=w-flag-dead-code") @@ -988,46 +986,199 @@ class StreamSuite extends Fs2Suite { assert(compileErrors("Stream.eval(IO(1)).through(p)").nonEmpty) } - group("parEvalMap") { + private implicit class verifyOps[T](val action: IO[T]) { + def assertNotCompletes(): IO[Unit] = + IO.race(IO.sleep(1.second), action).assertEquals(Left(())) + + def assertCompletes(expected: T): IO[Unit] = + IO.race(IO.sleep(1.second), action).assertEquals(Right(expected)) + } + + val u: IO[Unit] = ().pure[IO] + + val ex: IO[Unit] = IO.raiseError(new RuntimeException) + + group("issue-2686, max distance of concurrently computing elements") { + test("shouldn't exceed maxConcurrent in parEvalMap") { + run(_.parEvalMap(2)(identity)).assertNotCompletes() + } + + test("can exceed maxConcurrent in parEvalMapUnordered") { + val action = run(_.parEvalMapUnordered(2)(identity)) + action.assertCompletes(Right(())) + } + + def run(pipe: Pipe[IO, IO[Unit], Unit]): IO[Either[Unit, Unit]] = + Deferred[IO, Unit].flatMap { d => + val stream = Stream(IO.sleep(1.minute), u, d.complete(()).void).covary[IO] + IO.race(stream.through(pipe).compile.drain, d.get) + } + } + + group("order") { - test("should preserve element ordering") { - forAllF { (stream: Stream[Pure, Int]) => - val smaller = stream.map(i => math.abs(i % 10)) - val delayed = smaller.covary[IO].parEvalMap(Int.MaxValue)(i => IO.sleep(i.millis).as(i)) - delayed.compile.toList.assertEquals(smaller.toList) + test("should be preserved in parEvalMap") { + forAllF { s: Stream[Pure, Int] => + s.zipWithIndex + .covary[IO] + .parEvalMap(Int.MaxValue) { case (i, ind) => IO.sleep((ind % 3).millis).as(i) } + .compile + .toList + .assertEquals(s.toList) } } - test("should launch no more than maxConcurrent") { - forAllF { (stream: Stream[Pure, Unit], int: Int) => - val concurrency = math.abs(int % 20) + 1 - Ref[IO] - .of(0) - .flatMap { open => - stream.covary[IO].parEvalMap(concurrency)(_ => findOpen(open)).compile.toList - } - .map(_.forall(_ <= concurrency)) - .assert + test("may not be preserved in parEvalMapUnordered") { + run(_.parEvalMapUnordered(Int.MaxValue)(identity)).assertEquals(List(1, 2, 3)) + } + + def run(pipe: Pipe[IO, IO[Int], Int]) = + Stream + .emits(List(3, 2, 1)) + .map(i => IO.sleep(50.millis * i).as(i)) + .covary[IO] + .through(pipe) + .compile + .toList + } + + group("should limit concurrency in") { + + test("parEvalMapUnordered") { + forAllF { (l: Int, p: Int) => + val length = math.abs(l % 100) + 1 + val parallel = math.abs(p % 20) + 2 + val requested = math.min(length, parallel) + val action = runWithLatch(length, requested, _.parEvalMapUnordered(parallel)(identity)) + action.assertCompletes(()) } } - test("unordered should launch no more than maxConcurrent") { - forAllF { (stream: Stream[Pure, Unit], int: Int) => - val concurrency = math.abs(int % 20) + 1 - Ref[IO] - .of(0) - .flatMap { open => - stream.covary[IO].parEvalMapUnordered(concurrency)(_ => findOpen(open)).compile.toList - } - .map(_.forall(_ <= concurrency)) - .assert + test("parEvalMap") { + forAllF { (l: Int, p: Int) => + val length = math.abs(l % 100) + 1 + val parallel = math.abs(p % 20) + 2 + val requested = math.min(length, parallel) + val action = runWithLatch(length, requested, _.parEvalMap(parallel)(identity)) + action.assertCompletes(()) + } + } + + test("parEvalMapUnordered can't launch more than Stream size") { + val action = runWithLatch(100, 101, _.parEvalMapUnordered(Int.MaxValue)(identity)) + action.assertNotCompletes() + } + + test("parEvalMap can't launch more than Stream size") { + val action = runWithLatch(100, 101, _.parEvalMap(Int.MaxValue)(identity)) + action.assertNotCompletes() + } + + test("parEvalMapUnordered shouldn't launch more than maxConcurrent") { + val action = runWithLatch(100, 21, _.parEvalMapUnordered(20)(identity)) + action.assertNotCompletes() + } + + test("parEvalMap shouldn't launch more than maxConcurrent") { + val action = runWithLatch(100, 21, _.parEvalMap(20)(identity)) + action.assertNotCompletes() + } + + def runWithLatch(length: Int, parallel: Int, pipe: Pipe[IO, IO[Unit], Unit]) = + CountDownLatch[IO](parallel).flatMap { latch => + Stream(latch.release *> latch.await).repeatN(length).through(pipe).compile.drain + } + } + + group("if two errors happens only one should be reported") { + test("parEvalMapUnordered") { + forAllF { (i: Int) => + val amount = math.abs(i % 10) + 1 + CountDownLatch[IO](amount).flatMap { latch => + val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] + stream + .parEvalMapUnordered(amount)(identity) + .compile + .drain + .intercept[RuntimeException] + .void + } + } + } + + test("parEvalMap") { + forAllF { (i: Int) => + val amount = math.abs(i % 10) + 1 + CountDownLatch[IO](amount).flatMap { latch => + val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] + stream.parEvalMap(amount)(identity).compile.drain.intercept[RuntimeException].void + } + } + } + } + + group("if error happens after stream succeeds error should be lost") { + test("parEvalMapUnordered") { + check(_.parEvalMapUnordered(Int.MaxValue)(identity)) + } + + test("parEvalMap") { + check(_.parEvalMap(Int.MaxValue)(identity)) + } + + def check(pipe: Pipe[IO, IO[Unit], Unit]) = + Deferred[IO, Unit].flatMap { d => + val simple = Stream(u, (d.get *> ex).uncancelable).covary[IO] + val stream = simple.through(pipe).take(1).productL(Stream.eval(d.complete(()).void)) + stream.compile.toList.assertEquals(List(())) + } + } + + group("cancels unneeded") { + + test("parEvalMapUnordered") { + check(_.parEvalMapUnordered(2)(identity)) + } + + test("parEvalMapUnordered") { + check(_.parEvalMap(2)(identity)) + } + + def check(pipe: Pipe[IO, IO[Unit], Unit]) = + Deferred[IO, Unit].flatMap { d => + val cancelled = IO.never.onCancel(d.complete(()).void) + val stream = Stream(u, cancelled).covary[IO] + val action = stream.through(pipe).take(1).compile.drain + action *> d.get.assertCompletes(()) } + } + + group("waits for uncancellable completion") { + test("parEvalMapUnordered") { + check(_.parEvalMapUnordered(2)(identity)) } - def findOpen(open: Ref[IO, Int]) = { - val start = open.update(_ + 1) *> IO.sleep(5.millis) *> open.get - val end = IO.sleep(5.millis) *> open.getAndUpdate(_ - 1) - start *> end + test("parEvalMap") { + check(_.parEvalMap(2)(identity)) + } + + def check(pipe: Pipe[IO, IO[Unit], Unit]): IO[Unit] = { + val uncancMsg = "uncancellable" + val onFin2Msg = "onFin2" + + Ref[IO] + .of(List.empty[String]) + .flatMap { ref => + val io = ref.update(uncancMsg :: _).void + val onFin2 = ref.update(onFin2Msg :: _) + CountDownLatch[IO](2).flatMap { latch => + val w = latch.release *> latch.await + val stream = Stream(w *> u, (w *> io).uncancelable).covary[IO] + val action = stream.through(pipe).take(1).compile.drain <* onFin2 + action *> ref.get + } + } + .assertEquals(List(onFin2Msg, uncancMsg)) } } } From 4cb3ad4b9da85c4d82169e91ece831c696f23a4b Mon Sep 17 00:00:00 2001 From: nikiforo Date: Wed, 17 Nov 2021 17:21:50 +0300 Subject: [PATCH 3/7] parEvalMap --- core/shared/src/main/scala/fs2/Stream.scala | 47 ++++++++++--------- .../src/test/scala/fs2/StreamSuite.scala | 8 +--- 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 132a11e439..9a8f32b557 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -26,7 +26,7 @@ import scala.concurrent.TimeoutException import scala.concurrent.duration._ import cats.{Eval => _, _} import cats.data.Ior -import cats.effect.{Concurrent, SyncIO} +import cats.effect.{Concurrent, IO, SyncIO} import cats.effect.kernel._ import cats.effect.kernel.implicits._ import cats.effect.std.{Console, Queue, QueueSink, QueueSource, Semaphore} @@ -2041,10 +2041,15 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, def parEvalMap[F2[x] >: F[x], O2]( maxConcurrent: Int )(f: O => F2[O2])(implicit F: Concurrent[F2]): Stream[F2, O2] = { - def init(ch: Channel[F2, F2[Either[Throwable, O2]]], release: F2[Unit]) = - Deferred[F2, Either[Throwable, O2]].flatTap(value => ch.send(value.get <* release)) - def send(v: Deferred[F2, Either[Throwable, O2]]) = - (el: Either[Throwable, O2]) => v.complete(el).void + + def init(ch: Channel[F2, F2[Outcome[F2, Throwable, O2]]], release: F2[Unit]) = + Deferred[F2, Outcome[F2, Throwable, O2]].flatTap { value => + ch.send(release *> value.get) + } + + def send(v: Deferred[F2, Outcome[F2, Throwable, O2]]) = + (el: Outcome[F2, Throwable, O2]) => v.complete(el).void + parEvalMapAction(maxConcurrent, f)((ch, release) => init(ch, release).map(send)) } @@ -2059,31 +2064,29 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * res0: Unit = () * }}} */ - def parEvalMapUnordered[F2[x] >: F[ - x - ]: Concurrent, O2]( + def parEvalMapUnordered[F2[x] >: F[x], O2]( maxConcurrent: Int - )(f: O => F2[O2]): Stream[F2, O2] = { + )(f: O => F2[O2])(implicit F: Concurrent[F2]): Stream[F2, O2] = { + val init = ().pure[F2] - def send(ch: Channel[F2, F2[Either[Throwable, O2]]], release: F2[Unit]) = - (el: Either[Throwable, O2]) => ch.send(el.pure[F2]) *> release + + def send(ch: Channel[F2, F2[Outcome[F2, Throwable, O2]]], release: F2[Unit]) = + (el: Outcome[F2, Throwable, O2]) => release <* ch.send(el.pure[F2]) + parEvalMapAction(maxConcurrent, f)((ch, release) => init.as(send(ch, release))) } - private def parEvalMapAction[F2[x] >: F[ - x - ]: Concurrent, O2, T]( + private def parEvalMapAction[F2[x] >: F[x], O2, T]( maxConcurrent: Int, f: O => F2[O2] )( initFork: ( - Channel[F2, F2[Either[Throwable, O2]]], + Channel[F2, F2[Outcome[F2, Throwable, O2]]], F2[Unit] - ) => F2[Either[Throwable, O2] => F2[Unit]] - ): Stream[F2, O2] = + ) => F2[Outcome[F2, Throwable, O2] => F2[Unit]] + )(implicit F: Concurrent[F2]): Stream[F2, O2] = if (maxConcurrent == 1) evalMap(f) else { - val F = Concurrent[F2] assert(maxConcurrent > 0, "maxConcurrent must be > 0, was: " + maxConcurrent) // One is taken by inner stream read. @@ -2091,7 +2094,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val action = ( Semaphore[F2](concurrency.toLong), - Channel.bounded[F2, F2[Either[Throwable, O2]]](concurrency), + Channel.bounded[F2, F2[Outcome[F2, Throwable, O2]]](concurrency), Deferred[F2, Unit], Deferred[F2, Unit] ).mapN { (semaphore, channel, stop, end) => @@ -2108,8 +2111,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Deferred[F2, Unit].flatMap { pushed => val init = initFork(channel, pushed.complete(()).void) poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => - val action = stop.get.race(f(el).attempt.flatMap(send) <* pushed.get) - F.start(poll(action).guarantee(releaseAndCheckCompletion)) + val action = f(el).guaranteeCase(send) *> pushed.get + F.start(stop.get.race(action).guarantee(releaseAndCheckCompletion)) } } } @@ -2123,7 +2126,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, case _ => stop.complete(()) *> releaseAndCheckCompletion } - val foreground = channel.stream.evalMap(identity).rethrow + val foreground = channel.stream.evalMap(_.flatMap(_.embed(F.canceled >> F.never))) foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background) } diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index 4348c458d3..cb2e2f82d9 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1019,12 +1019,8 @@ class StreamSuite extends Fs2Suite { test("should be preserved in parEvalMap") { forAllF { s: Stream[Pure, Int] => - s.zipWithIndex - .covary[IO] - .parEvalMap(Int.MaxValue) { case (i, ind) => IO.sleep((ind % 3).millis).as(i) } - .compile - .toList - .assertEquals(s.toList) + val s2 = s.covary[IO].parEvalMap(Int.MaxValue)(i => IO.sleep(math.abs(i % 3).millis).as(i)) + s2.compile.toList.assertEquals(s.toList) } } From a63b61cc7b8b0fa5391136db61114173627884c9 Mon Sep 17 00:00:00 2001 From: nikiforo Date: Sun, 21 Nov 2021 18:21:28 +0300 Subject: [PATCH 4/7] par-eval-v3 - remove unneeded cancellation handling --- core/shared/src/main/scala/fs2/Stream.scala | 24 ++--- .../src/test/scala/fs2/StreamSuite.scala | 101 +++++++++++------- 2 files changed, 73 insertions(+), 52 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 9a8f32b557..0e7c02f245 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2042,13 +2042,13 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, maxConcurrent: Int )(f: O => F2[O2])(implicit F: Concurrent[F2]): Stream[F2, O2] = { - def init(ch: Channel[F2, F2[Outcome[F2, Throwable, O2]]], release: F2[Unit]) = - Deferred[F2, Outcome[F2, Throwable, O2]].flatTap { value => + def init(ch: Channel[F2, F2[Either[Throwable, O2]]], release: F2[Unit]) = + Deferred[F2, Either[Throwable, O2]].flatTap { value => ch.send(release *> value.get) } - def send(v: Deferred[F2, Outcome[F2, Throwable, O2]]) = - (el: Outcome[F2, Throwable, O2]) => v.complete(el).void + def send(v: Deferred[F2, Either[Throwable, O2]]) = + (el: Either[Throwable, O2]) => v.complete(el).void parEvalMapAction(maxConcurrent, f)((ch, release) => init(ch, release).map(send)) } @@ -2070,8 +2070,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val init = ().pure[F2] - def send(ch: Channel[F2, F2[Outcome[F2, Throwable, O2]]], release: F2[Unit]) = - (el: Outcome[F2, Throwable, O2]) => release <* ch.send(el.pure[F2]) + def send(ch: Channel[F2, F2[Either[Throwable, O2]]], release: F2[Unit]) = + (el: Either[Throwable, O2]) => release <* ch.send(el.pure[F2]) parEvalMapAction(maxConcurrent, f)((ch, release) => init.as(send(ch, release))) } @@ -2081,9 +2081,9 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, f: O => F2[O2] )( initFork: ( - Channel[F2, F2[Outcome[F2, Throwable, O2]]], + Channel[F2, F2[Either[Throwable, O2]]], F2[Unit] - ) => F2[Outcome[F2, Throwable, O2] => F2[Unit]] + ) => F2[Either[Throwable, O2] => F2[Unit]] )(implicit F: Concurrent[F2]): Stream[F2, O2] = if (maxConcurrent == 1) evalMap(f) else { @@ -2094,7 +2094,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val action = ( Semaphore[F2](concurrency.toLong), - Channel.bounded[F2, F2[Outcome[F2, Throwable, O2]]](concurrency), + Channel.bounded[F2, F2[Either[Throwable, O2]]](concurrency), Deferred[F2, Unit], Deferred[F2, Unit] ).mapN { (semaphore, channel, stop, end) => @@ -2111,8 +2111,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Deferred[F2, Unit].flatMap { pushed => val init = initFork(channel, pushed.complete(()).void) poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => - val action = f(el).guaranteeCase(send) *> pushed.get - F.start(stop.get.race(action).guarantee(releaseAndCheckCompletion)) + val action = f(el).attempt.flatMap(send) *> pushed.get + F.start(stop.get.race(action) *> releaseAndCheckCompletion) } } } @@ -2126,7 +2126,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, case _ => stop.complete(()) *> releaseAndCheckCompletion } - val foreground = channel.stream.evalMap(_.flatMap(_.embed(F.canceled >> F.never))) + val foreground = channel.stream.evalMap(_.rethrow) foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background) } diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index cb2e2f82d9..06354753e2 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -987,11 +987,7 @@ class StreamSuite extends Fs2Suite { } private implicit class verifyOps[T](val action: IO[T]) { - def assertNotCompletes(): IO[Unit] = - IO.race(IO.sleep(1.second), action).assertEquals(Left(())) - - def assertCompletes(expected: T): IO[Unit] = - IO.race(IO.sleep(1.second), action).assertEquals(Right(expected)) + def assertNotCompletes(): IO[Unit] = IO.race(IO.sleep(1.second), action).assertEquals(Left(())) } val u: IO[Unit] = ().pure[IO] @@ -1005,7 +1001,7 @@ class StreamSuite extends Fs2Suite { test("can exceed maxConcurrent in parEvalMapUnordered") { val action = run(_.parEvalMapUnordered(2)(identity)) - action.assertCompletes(Right(())) + action.assertEquals(Right(())) } def run(pipe: Pipe[IO, IO[Unit], Unit]): IO[Either[Unit, Unit]] = @@ -1046,7 +1042,7 @@ class StreamSuite extends Fs2Suite { val parallel = math.abs(p % 20) + 2 val requested = math.min(length, parallel) val action = runWithLatch(length, requested, _.parEvalMapUnordered(parallel)(identity)) - action.assertCompletes(()) + action.assertEquals(()) } } @@ -1056,7 +1052,7 @@ class StreamSuite extends Fs2Suite { val parallel = math.abs(p % 20) + 2 val requested = math.min(length, parallel) val action = runWithLatch(length, requested, _.parEvalMap(parallel)(identity)) - action.assertCompletes(()) + action.assertEquals(()) } } @@ -1090,25 +1086,26 @@ class StreamSuite extends Fs2Suite { test("parEvalMapUnordered") { forAllF { (i: Int) => val amount = math.abs(i % 10) + 1 - CountDownLatch[IO](amount).flatMap { latch => - val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] - stream - .parEvalMapUnordered(amount)(identity) - .compile - .drain - .intercept[RuntimeException] - .void - } + CountDownLatch[IO](amount) + .flatMap { latch => + val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] + stream.parEvalMapUnordered(amount)(identity).compile.drain + } + .intercept[RuntimeException] + .void } } test("parEvalMap") { forAllF { (i: Int) => val amount = math.abs(i % 10) + 1 - CountDownLatch[IO](amount).flatMap { latch => - val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] - stream.parEvalMap(amount)(identity).compile.drain.intercept[RuntimeException].void - } + CountDownLatch[IO](amount) + .flatMap { latch => + val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] + stream.parEvalMap(amount)(identity).compile.drain + } + .intercept[RuntimeException] + .void } } } @@ -1123,11 +1120,34 @@ class StreamSuite extends Fs2Suite { } def check(pipe: Pipe[IO, IO[Unit], Unit]) = - Deferred[IO, Unit].flatMap { d => - val simple = Stream(u, (d.get *> ex).uncancelable).covary[IO] - val stream = simple.through(pipe).take(1).productL(Stream.eval(d.complete(()).void)) - stream.compile.toList.assertEquals(List(())) - } + IO.deferred[Unit] + .flatMap { d => + val simple = Stream(u, (d.get *> ex).uncancelable).covary[IO] + val stream = simple.through(pipe).take(1).productL(Stream.eval(d.complete(()).void)) + stream.compile.toList + } + .assertEquals(List(())) + } + + group("cancels running computations when error raised") { + + test("parEvalMapUnordered") { + check(_.parEvalMap(Int.MaxValue)(identity)) + } + + test("parEvalMap") { + check(_.parEvalMap(Int.MaxValue)(identity)) + } + + def check(pipe: Pipe[IO, IO[Unit], Unit]) = + (CountDownLatch[IO](2), IO.deferred[Unit]) + .mapN { (latch, d) => + val w = latch.release *> latch.await + val s = Stream(w *> ex, w *> IO.never.onCancel(d.complete(()).void)).covary[IO] + IO.race(pipe(s).compile.drain, d.get) + } + .flatten + .assertEquals(Right(())) } group("cancels unneeded") { @@ -1136,17 +1156,19 @@ class StreamSuite extends Fs2Suite { check(_.parEvalMapUnordered(2)(identity)) } - test("parEvalMapUnordered") { + test("parEvalMap") { check(_.parEvalMap(2)(identity)) } def check(pipe: Pipe[IO, IO[Unit], Unit]) = - Deferred[IO, Unit].flatMap { d => - val cancelled = IO.never.onCancel(d.complete(()).void) - val stream = Stream(u, cancelled).covary[IO] - val action = stream.through(pipe).take(1).compile.drain - action *> d.get.assertCompletes(()) - } + IO.deferred[Unit] + .flatMap { d => + val cancelled = IO.never.onCancel(d.complete(()).void) + val stream = Stream(u, cancelled).covary[IO] + val action = stream.through(pipe).take(1).compile.drain + action *> d.get + } + .assertEquals(()) } group("waits for uncancellable completion") { @@ -1159,14 +1181,13 @@ class StreamSuite extends Fs2Suite { } def check(pipe: Pipe[IO, IO[Unit], Unit]): IO[Unit] = { - val uncancMsg = "uncancellable" - val onFin2Msg = "onFin2" + val uncancellableMsg = "uncancellable" + val onFinalizeMsg = "onFinalize" - Ref[IO] - .of(List.empty[String]) + IO.ref(Vector.empty[String]) .flatMap { ref => - val io = ref.update(uncancMsg :: _).void - val onFin2 = ref.update(onFin2Msg :: _) + val io = ref.update(_ :+ uncancellableMsg).void + val onFin2 = ref.update(_ :+ onFinalizeMsg) CountDownLatch[IO](2).flatMap { latch => val w = latch.release *> latch.await val stream = Stream(w *> u, (w *> io).uncancelable).covary[IO] @@ -1174,7 +1195,7 @@ class StreamSuite extends Fs2Suite { action *> ref.get } } - .assertEquals(List(onFin2Msg, uncancMsg)) + .assertEquals(Vector(uncancellableMsg, onFinalizeMsg)) } } } From b6000cd0bc21a1efa5c64fe2a1e445aa47b5d7ba Mon Sep 17 00:00:00 2001 From: nikiforo Date: Wed, 1 Dec 2021 01:05:22 +0300 Subject: [PATCH 5/7] par-eval-v3 - added test on issue-2726 --- .../src/test/scala/fs2/StreamSuite.scala | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index 8eba6da44d..4ebdd343be 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -986,7 +986,6 @@ class StreamSuite extends Fs2Suite { assert(compileErrors("Stream.eval(IO(1)).through(p)").nonEmpty) } - group("Stream[F, Either[Throwable, O]]") { test(".evalMap(_.pure.rethrow).mask <-> .rethrow.mask") { forAllF { (stream: Stream[Pure, Int]) => @@ -1156,10 +1155,10 @@ class StreamSuite extends Fs2Suite { .mapN { (latch, d) => val w = latch.release *> latch.await val s = Stream(w *> ex, w *> IO.never.onCancel(d.complete(()).void)).covary[IO] - IO.race(pipe(s).compile.drain, d.get) + pipe(s).compile.drain !> d.get } .flatten - .assertEquals(Right(())) + .assertEquals(()) } group("cancels unneeded") { @@ -1210,4 +1209,21 @@ class StreamSuite extends Fs2Suite { .assertEquals(Vector(uncancellableMsg, onFinalizeMsg)) } } + + group("issue-2726, Stream shouldn't hang after exceptions in") { + test("parEvalMapUnordered") { + check(_.parEvalMapUnordered(Int.MaxValue)(identity)) + } + + test("parEvalMap") { + check(_.parEvalMap(Int.MaxValue)(identity)) + } + + def check(pipe: Pipe[IO, IO[Unit], Unit]): IO[Unit] = { + val iterations = 100 + val stream = Stream(IO.raiseError(new RuntimeException), IO.delay(())).covary[IO] + val action = stream.through(pipe).compile.drain.attempt.timeout(2.seconds) + (1 to iterations).toList.as(action).sequence_ + } + } } From 56560207eae60df77341396595e764dc55942cff Mon Sep 17 00:00:00 2001 From: nikiforo Date: Wed, 1 Dec 2021 17:05:27 +0300 Subject: [PATCH 6/7] increase timed interval --- core/shared/src/main/scala/fs2/Stream.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 4403ef1eb6..116181eadd 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -4455,17 +4455,17 @@ object Stream extends StreamLowPriority { * * As a quick example, let's write a timed pull which emits the * string "late!" whenever a chunk of the stream is not emitted - * within 150 milliseconds: + * within 450 milliseconds: * * @example {{{ * scala> import cats.effect.IO * scala> import cats.effect.unsafe.implicits.global * scala> import scala.concurrent.duration._ - * scala> val s = (Stream("elem") ++ Stream.sleep_[IO](200.millis)).repeat.take(3) + * scala> val s = (Stream("elem") ++ Stream.sleep_[IO](600.millis)).repeat.take(3) * scala> s.pull * | .timed { timedPull => * | def go(timedPull: Pull.Timed[IO, String]): Pull[IO, String, Unit] = - * | timedPull.timeout(150.millis) >> // starts new timeout and stops the previous one + * | timedPull.timeout(450.millis) >> // starts new timeout and stops the previous one * | timedPull.uncons.flatMap { * | case Some((Right(elems), next)) => Pull.output(elems) >> go(next) * | case Some((Left(_), next)) => Pull.output1("late!") >> go(next) From 6ef5b9d3d795260d773532ed0877dde6f3b3f8f5 Mon Sep 17 00:00:00 2001 From: nikiforo Date: Fri, 3 Dec 2021 17:15:14 +0300 Subject: [PATCH 7/7] par-eval-v3 - extracted parEvalMap* tests to a dedicated Suite --- .../src/test/scala/fs2/ParEvalMapSuite.scala | 267 ++++++++++++++++++ .../src/test/scala/fs2/StreamSuite.scala | 230 --------------- 2 files changed, 267 insertions(+), 230 deletions(-) create mode 100644 core/shared/src/test/scala/fs2/ParEvalMapSuite.scala diff --git a/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala b/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala new file mode 100644 index 0000000000..0f0058267a --- /dev/null +++ b/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala @@ -0,0 +1,267 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 + +import cats.effect.std.CountDownLatch +import cats.effect.{Deferred, IO} +import cats.syntax.all._ +import org.scalacheck.effect.PropF.forAllF + +import scala.concurrent.duration._ + +class ParEvalMapSuite extends Fs2Suite { + + private implicit class verifyOps[T](val action: IO[T]) { + def assertNotCompletes(): IO[Unit] = IO.race(IO.sleep(1.second), action).assertEquals(Left(())) + } + + private val u: IO[Unit] = ().pure[IO] + + private val ex: IO[Unit] = IO.raiseError(new RuntimeException) + + group("issue-2686, max distance of concurrently computing elements") { + + test("shouldn't exceed maxConcurrent in parEvalMap") { + run(_.parEvalMap(2)(identity)).assertNotCompletes() + } + + test("can exceed maxConcurrent in parEvalMapUnordered") { + val action = run(_.parEvalMapUnordered(2)(identity)) + action.assertEquals(Right(())) + } + + def run(pipe: Pipe[IO, IO[Unit], Unit]): IO[Either[Unit, Unit]] = + Deferred[IO, Unit].flatMap { d => + val stream = Stream(IO.sleep(1.minute), u, d.complete(()).void).covary[IO] + IO.race(stream.through(pipe).compile.drain, d.get) + } + } + + group("order") { + + test("should be preserved in parEvalMap") { + forAllF { s: Stream[Pure, Int] => + val s2 = s.covary[IO].parEvalMap(Int.MaxValue)(i => IO.sleep(math.abs(i % 3).millis).as(i)) + s2.compile.toList.assertEquals(s.toList) + } + } + + test("may not be preserved in parEvalMapUnordered") { + run(_.parEvalMapUnordered(Int.MaxValue)(identity)).assertEquals(List(1, 2, 3)) + } + + def run(pipe: Pipe[IO, IO[Int], Int]) = + Stream + .emits(List(3, 2, 1)) + .map(i => IO.sleep(50.millis * i).as(i)) + .covary[IO] + .through(pipe) + .compile + .toList + } + + group("should limit concurrency in") { + + test("parEvalMapUnordered") { + forAllF { (l: Int, p: Int) => + val length = math.abs(l % 100) + 1 + val parallel = math.abs(p % 20) + 2 + val requested = math.min(length, parallel) + val action = runWithLatch(length, requested, _.parEvalMapUnordered(parallel)(identity)) + action.assertEquals(()) + } + } + + test("parEvalMap") { + forAllF { (l: Int, p: Int) => + val length = math.abs(l % 100) + 1 + val parallel = math.abs(p % 20) + 2 + val requested = math.min(length, parallel) + val action = runWithLatch(length, requested, _.parEvalMap(parallel)(identity)) + action.assertEquals(()) + } + } + + test("parEvalMapUnordered can't launch more than Stream size") { + val action = runWithLatch(100, 101, _.parEvalMapUnordered(Int.MaxValue)(identity)) + action.assertNotCompletes() + } + + test("parEvalMap can't launch more than Stream size") { + val action = runWithLatch(100, 101, _.parEvalMap(Int.MaxValue)(identity)) + action.assertNotCompletes() + } + + test("parEvalMapUnordered shouldn't launch more than maxConcurrent") { + val action = runWithLatch(100, 21, _.parEvalMapUnordered(20)(identity)) + action.assertNotCompletes() + } + + test("parEvalMap shouldn't launch more than maxConcurrent") { + val action = runWithLatch(100, 21, _.parEvalMap(20)(identity)) + action.assertNotCompletes() + } + + def runWithLatch(length: Int, parallel: Int, pipe: Pipe[IO, IO[Unit], Unit]) = + CountDownLatch[IO](parallel).flatMap { latch => + Stream(latch.release *> latch.await).repeatN(length).through(pipe).compile.drain + } + } + + group("if two errors happens only one should be reported") { + + test("parEvalMapUnordered") { + forAllF { (i: Int) => + val amount = math.abs(i % 10) + 1 + CountDownLatch[IO](amount) + .flatMap { latch => + val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] + stream.parEvalMapUnordered(amount)(identity).compile.drain + } + .intercept[RuntimeException] + .void + } + } + + test("parEvalMap") { + forAllF { (i: Int) => + val amount = math.abs(i % 10) + 1 + CountDownLatch[IO](amount) + .flatMap { latch => + val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] + stream.parEvalMap(amount)(identity).compile.drain + } + .intercept[RuntimeException] + .void + } + } + } + + group("if error happens after stream succeeds error should be lost") { + + test("parEvalMapUnordered") { + check(_.parEvalMapUnordered(Int.MaxValue)(identity)) + } + + test("parEvalMap") { + check(_.parEvalMap(Int.MaxValue)(identity)) + } + + def check(pipe: Pipe[IO, IO[Unit], Unit]) = + IO.deferred[Unit] + .flatMap { d => + val simple = Stream(u, (d.get *> ex).uncancelable).covary[IO] + val stream = simple.through(pipe).take(1).productL(Stream.eval(d.complete(()).void)) + stream.compile.toList + } + .assertEquals(List(())) + } + + group("cancels running computations when error raised") { + + test("parEvalMapUnordered") { + check(_.parEvalMap(Int.MaxValue)(identity)) + } + + test("parEvalMap") { + check(_.parEvalMap(Int.MaxValue)(identity)) + } + + def check(pipe: Pipe[IO, IO[Unit], Unit]) = + (CountDownLatch[IO](2), IO.deferred[Unit]) + .mapN { (latch, d) => + val w = latch.release *> latch.await + val s = Stream(w *> ex, w *> IO.never.onCancel(d.complete(()).void)).covary[IO] + pipe(s).compile.drain !> d.get + } + .flatten + .assertEquals(()) + } + + group("cancels unneeded") { + + test("parEvalMapUnordered") { + check(_.parEvalMapUnordered(2)(identity)) + } + + test("parEvalMap") { + check(_.parEvalMap(2)(identity)) + } + + def check(pipe: Pipe[IO, IO[Unit], Unit]) = + IO.deferred[Unit] + .flatMap { d => + val cancelled = IO.never.onCancel(d.complete(()).void) + val stream = Stream(u, cancelled).covary[IO] + val action = stream.through(pipe).take(1).compile.drain + action *> d.get + } + .assertEquals(()) + } + + group("waits for uncancellable completion") { + + test("parEvalMapUnordered") { + check(_.parEvalMapUnordered(2)(identity)) + } + + test("parEvalMap") { + check(_.parEvalMap(2)(identity)) + } + + def check(pipe: Pipe[IO, IO[Unit], Unit]): IO[Unit] = { + val uncancellableMsg = "uncancellable" + val onFinalizeMsg = "onFinalize" + + IO.ref(Vector.empty[String]) + .flatMap { ref => + val io = ref.update(_ :+ uncancellableMsg).void + val onFin2 = ref.update(_ :+ onFinalizeMsg) + CountDownLatch[IO](2).flatMap { latch => + val w = latch.release *> latch.await + val stream = Stream(w *> u, (w *> io).uncancelable).covary[IO] + val action = stream.through(pipe).take(1).compile.drain <* onFin2 + action *> ref.get + } + } + .assertEquals(Vector(uncancellableMsg, onFinalizeMsg)) + } + } + + group("issue-2726, Stream shouldn't hang after exceptions in") { + + test("parEvalMapUnordered") { + check(_.parEvalMapUnordered(Int.MaxValue)(identity)) + } + + test("parEvalMap") { + check(_.parEvalMap(Int.MaxValue)(identity)) + } + + def check(pipe: Pipe[IO, IO[Unit], Unit]): IO[Unit] = { + val iterations = 100 + val stream = Stream(IO.raiseError(new RuntimeException), IO.delay(())).covary[IO] + val action = stream.through(pipe).compile.drain.attempt.timeout(2.seconds) + (1 to iterations).toList.as(action).sequence_ + } + } +} diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index 4ebdd343be..569ea0e9f5 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -996,234 +996,4 @@ class StreamSuite extends Fs2Suite { } } } - - private implicit class verifyOps[T](val action: IO[T]) { - def assertNotCompletes(): IO[Unit] = IO.race(IO.sleep(1.second), action).assertEquals(Left(())) - } - - val u: IO[Unit] = ().pure[IO] - - val ex: IO[Unit] = IO.raiseError(new RuntimeException) - - group("issue-2686, max distance of concurrently computing elements") { - test("shouldn't exceed maxConcurrent in parEvalMap") { - run(_.parEvalMap(2)(identity)).assertNotCompletes() - } - - test("can exceed maxConcurrent in parEvalMapUnordered") { - val action = run(_.parEvalMapUnordered(2)(identity)) - action.assertEquals(Right(())) - } - - def run(pipe: Pipe[IO, IO[Unit], Unit]): IO[Either[Unit, Unit]] = - Deferred[IO, Unit].flatMap { d => - val stream = Stream(IO.sleep(1.minute), u, d.complete(()).void).covary[IO] - IO.race(stream.through(pipe).compile.drain, d.get) - } - } - - group("order") { - - test("should be preserved in parEvalMap") { - forAllF { s: Stream[Pure, Int] => - val s2 = s.covary[IO].parEvalMap(Int.MaxValue)(i => IO.sleep(math.abs(i % 3).millis).as(i)) - s2.compile.toList.assertEquals(s.toList) - } - } - - test("may not be preserved in parEvalMapUnordered") { - run(_.parEvalMapUnordered(Int.MaxValue)(identity)).assertEquals(List(1, 2, 3)) - } - - def run(pipe: Pipe[IO, IO[Int], Int]) = - Stream - .emits(List(3, 2, 1)) - .map(i => IO.sleep(50.millis * i).as(i)) - .covary[IO] - .through(pipe) - .compile - .toList - } - - group("should limit concurrency in") { - - test("parEvalMapUnordered") { - forAllF { (l: Int, p: Int) => - val length = math.abs(l % 100) + 1 - val parallel = math.abs(p % 20) + 2 - val requested = math.min(length, parallel) - val action = runWithLatch(length, requested, _.parEvalMapUnordered(parallel)(identity)) - action.assertEquals(()) - } - } - - test("parEvalMap") { - forAllF { (l: Int, p: Int) => - val length = math.abs(l % 100) + 1 - val parallel = math.abs(p % 20) + 2 - val requested = math.min(length, parallel) - val action = runWithLatch(length, requested, _.parEvalMap(parallel)(identity)) - action.assertEquals(()) - } - } - - test("parEvalMapUnordered can't launch more than Stream size") { - val action = runWithLatch(100, 101, _.parEvalMapUnordered(Int.MaxValue)(identity)) - action.assertNotCompletes() - } - - test("parEvalMap can't launch more than Stream size") { - val action = runWithLatch(100, 101, _.parEvalMap(Int.MaxValue)(identity)) - action.assertNotCompletes() - } - - test("parEvalMapUnordered shouldn't launch more than maxConcurrent") { - val action = runWithLatch(100, 21, _.parEvalMapUnordered(20)(identity)) - action.assertNotCompletes() - } - - test("parEvalMap shouldn't launch more than maxConcurrent") { - val action = runWithLatch(100, 21, _.parEvalMap(20)(identity)) - action.assertNotCompletes() - } - - def runWithLatch(length: Int, parallel: Int, pipe: Pipe[IO, IO[Unit], Unit]) = - CountDownLatch[IO](parallel).flatMap { latch => - Stream(latch.release *> latch.await).repeatN(length).through(pipe).compile.drain - } - } - - group("if two errors happens only one should be reported") { - test("parEvalMapUnordered") { - forAllF { (i: Int) => - val amount = math.abs(i % 10) + 1 - CountDownLatch[IO](amount) - .flatMap { latch => - val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] - stream.parEvalMapUnordered(amount)(identity).compile.drain - } - .intercept[RuntimeException] - .void - } - } - - test("parEvalMap") { - forAllF { (i: Int) => - val amount = math.abs(i % 10) + 1 - CountDownLatch[IO](amount) - .flatMap { latch => - val stream = Stream(latch.release *> latch.await *> ex).repeatN(amount).covary[IO] - stream.parEvalMap(amount)(identity).compile.drain - } - .intercept[RuntimeException] - .void - } - } - } - - group("if error happens after stream succeeds error should be lost") { - test("parEvalMapUnordered") { - check(_.parEvalMapUnordered(Int.MaxValue)(identity)) - } - - test("parEvalMap") { - check(_.parEvalMap(Int.MaxValue)(identity)) - } - - def check(pipe: Pipe[IO, IO[Unit], Unit]) = - IO.deferred[Unit] - .flatMap { d => - val simple = Stream(u, (d.get *> ex).uncancelable).covary[IO] - val stream = simple.through(pipe).take(1).productL(Stream.eval(d.complete(()).void)) - stream.compile.toList - } - .assertEquals(List(())) - } - - group("cancels running computations when error raised") { - - test("parEvalMapUnordered") { - check(_.parEvalMap(Int.MaxValue)(identity)) - } - - test("parEvalMap") { - check(_.parEvalMap(Int.MaxValue)(identity)) - } - - def check(pipe: Pipe[IO, IO[Unit], Unit]) = - (CountDownLatch[IO](2), IO.deferred[Unit]) - .mapN { (latch, d) => - val w = latch.release *> latch.await - val s = Stream(w *> ex, w *> IO.never.onCancel(d.complete(()).void)).covary[IO] - pipe(s).compile.drain !> d.get - } - .flatten - .assertEquals(()) - } - - group("cancels unneeded") { - - test("parEvalMapUnordered") { - check(_.parEvalMapUnordered(2)(identity)) - } - - test("parEvalMap") { - check(_.parEvalMap(2)(identity)) - } - - def check(pipe: Pipe[IO, IO[Unit], Unit]) = - IO.deferred[Unit] - .flatMap { d => - val cancelled = IO.never.onCancel(d.complete(()).void) - val stream = Stream(u, cancelled).covary[IO] - val action = stream.through(pipe).take(1).compile.drain - action *> d.get - } - .assertEquals(()) - } - - group("waits for uncancellable completion") { - test("parEvalMapUnordered") { - check(_.parEvalMapUnordered(2)(identity)) - } - - test("parEvalMap") { - check(_.parEvalMap(2)(identity)) - } - - def check(pipe: Pipe[IO, IO[Unit], Unit]): IO[Unit] = { - val uncancellableMsg = "uncancellable" - val onFinalizeMsg = "onFinalize" - - IO.ref(Vector.empty[String]) - .flatMap { ref => - val io = ref.update(_ :+ uncancellableMsg).void - val onFin2 = ref.update(_ :+ onFinalizeMsg) - CountDownLatch[IO](2).flatMap { latch => - val w = latch.release *> latch.await - val stream = Stream(w *> u, (w *> io).uncancelable).covary[IO] - val action = stream.through(pipe).take(1).compile.drain <* onFin2 - action *> ref.get - } - } - .assertEquals(Vector(uncancellableMsg, onFinalizeMsg)) - } - } - - group("issue-2726, Stream shouldn't hang after exceptions in") { - test("parEvalMapUnordered") { - check(_.parEvalMapUnordered(Int.MaxValue)(identity)) - } - - test("parEvalMap") { - check(_.parEvalMap(Int.MaxValue)(identity)) - } - - def check(pipe: Pipe[IO, IO[Unit], Unit]): IO[Unit] = { - val iterations = 100 - val stream = Stream(IO.raiseError(new RuntimeException), IO.delay(())).covary[IO] - val action = stream.through(pipe).compile.drain.attempt.timeout(2.seconds) - (1 to iterations).toList.as(action).sequence_ - } - } }