From 557bb3601832455e5e2b22889d05485889519e02 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Fri, 11 Nov 2022 15:09:57 -0700 Subject: [PATCH] Cherry-picked additional tests from #2856 --- .../scala/fs2/concurrent/ChannelSuite.scala | 105 +++++++++++++++++- 1 file changed, 103 insertions(+), 2 deletions(-) diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index 450c402126..ce250c0e07 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -30,6 +30,19 @@ import scala.concurrent.duration._ import org.scalacheck.effect.PropF.forAllF class ChannelSuite extends Fs2Suite { + + test("receives some simple elements above capacity and closes") { + val test = Channel.bounded[IO, Int](5).flatMap { chan => + val senders = 0.until(10).toList.parTraverse_ { i => + IO.sleep(i.millis) *> chan.send(i) + } + + senders &> (IO.sleep(15.millis) *> chan.close *> chan.stream.compile.toVector) + } + + TestControl.executeEmbed(test) + } + test("Channel receives all elements and closes") { forAllF { (source: Stream[Pure, Int]) => Channel.unbounded[IO, Int].flatMap { chan => @@ -133,7 +146,7 @@ class ChannelSuite extends Fs2Suite { p.assertEquals(true) } - test("Channel.synchronous respects fifo") { + test("synchronous respects fifo") { val l = for { chan <- Channel.synchronous[IO, Int] _ <- (0 until 5).toList.traverse_ { i => @@ -147,7 +160,95 @@ class ChannelSuite extends Fs2Suite { result <- IO.sleep(5.seconds) *> chan.stream.compile.toList } yield result - TestControl.executeEmbed(l).assertEquals((0 until 5).toList) + TestControl.executeEmbed(l).assertEquals((0 until 5).toList).parReplicateA_(100) } + test("complete all blocked sends after closure") { + val test = for { + chan <- Channel.bounded[IO, Int](2) + + fiber <- 0.until(5).toList.parTraverse(chan.send(_)).start + _ <- IO.sleep(1.second) + _ <- chan.close + + results <- chan.stream.compile.toList + _ <- IO(assert(results.length == 5)) + _ <- IO(assert(0.until(5).forall(results.contains(_)))) + + sends <- fiber.joinWithNever + _ <- IO(assert(sends.forall(_ == Right(())))) + } yield () + + TestControl.executeEmbed(test).parReplicateA_(100) + } + + test("eagerly close sendAll upstream") { + for { + countR <- IO.ref(0) + chan <- Channel.unbounded[IO, Unit] + + incrementer = Stream.eval(countR.update(_ + 1)).repeat.take(5) + upstream = incrementer ++ Stream.eval(chan.close).drain ++ incrementer + + results <- chan.stream.concurrently(upstream.through(chan.sendAll)).compile.toList + + _ <- IO(assert(results.length == 5)) + count <- countR.get + _ <- IO(assert(count == 6)) // we have to overrun the closure to detect it + } yield () + } + + def blackHole(s: Stream[IO, Unit]) = + s.repeatPull(_.uncons.flatMap { + case None => Pull.pure(None) + case Some((hd, tl)) => + val action = IO.delay(0.until(hd.size).foreach(_ => ())) + Pull.eval(action).as(Some(tl)) + }) + + @inline + private def sendAll(list: List[Unit], action: IO[Unit]) = + list.foldLeft(IO.unit)((acc, _) => acc *> action) + + test("sendPull") { + val test = Channel.bounded[IO, Unit](8).flatMap { channel => + val action = List.fill(64)(()).traverse_(_ => channel.send(()).void) *> channel.close + action.start *> channel.stream.through(blackHole).compile.drain + } + + test.replicateA_(if (isJVM) 1000 else 1) + } + + test("sendPullPar8") { + val lists = List.fill(8)(List.fill(8)(())) + + val test = Channel.bounded[IO, Unit](8).flatMap { channel => + val action = lists.parTraverse_(sendAll(_, channel.send(()).void)) *> channel.close + + action &> channel.stream.through(blackHole).compile.drain + } + + test.replicateA_(if (isJVM) 10000 else 1) + } + + test("synchronous with many concurrents and close") { + val test = Channel.synchronous[IO, Int].flatMap { ch => + 0.until(20).toList.parTraverse_(i => ch.send(i).iterateWhile(_.isRight)) &> + ch.stream.concurrently(Stream.eval(ch.close.delayBy(1.seconds))).compile.drain + } + + test.parReplicateA(100) + } + + test("complete closed immediately without draining") { + val test = Channel.bounded[IO, Int](20).flatMap { ch => + for { + _ <- 0.until(10).toList.parTraverse_(ch.send(_)) + _ <- ch.close + _ <- ch.closed + } yield () + } + + TestControl.executeEmbed(test) + } }