Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picked additional tests from #2856 #3050

Merged
merged 1 commit into from
Nov 12, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 103 additions & 2 deletions core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ import scala.concurrent.duration._
import org.scalacheck.effect.PropF.forAllF

class ChannelSuite extends Fs2Suite {

test("receives some simple elements above capacity and closes") {
val test = Channel.bounded[IO, Int](5).flatMap { chan =>
val senders = 0.until(10).toList.parTraverse_ { i =>
IO.sleep(i.millis) *> chan.send(i)
}

senders &> (IO.sleep(15.millis) *> chan.close *> chan.stream.compile.toVector)
}

TestControl.executeEmbed(test)
}

test("Channel receives all elements and closes") {
forAllF { (source: Stream[Pure, Int]) =>
Channel.unbounded[IO, Int].flatMap { chan =>
Expand Down Expand Up @@ -133,7 +146,7 @@ class ChannelSuite extends Fs2Suite {
p.assertEquals(true)
}

test("Channel.synchronous respects fifo") {
test("synchronous respects fifo") {
val l = for {
chan <- Channel.synchronous[IO, Int]
_ <- (0 until 5).toList.traverse_ { i =>
Expand All @@ -147,7 +160,95 @@ class ChannelSuite extends Fs2Suite {
result <- IO.sleep(5.seconds) *> chan.stream.compile.toList
} yield result

TestControl.executeEmbed(l).assertEquals((0 until 5).toList)
TestControl.executeEmbed(l).assertEquals((0 until 5).toList).parReplicateA_(100)
}

test("complete all blocked sends after closure") {
val test = for {
chan <- Channel.bounded[IO, Int](2)

fiber <- 0.until(5).toList.parTraverse(chan.send(_)).start
_ <- IO.sleep(1.second)
_ <- chan.close

results <- chan.stream.compile.toList
_ <- IO(assert(results.length == 5))
_ <- IO(assert(0.until(5).forall(results.contains(_))))

sends <- fiber.joinWithNever
_ <- IO(assert(sends.forall(_ == Right(()))))
} yield ()

TestControl.executeEmbed(test).parReplicateA_(100)
}

test("eagerly close sendAll upstream") {
for {
countR <- IO.ref(0)
chan <- Channel.unbounded[IO, Unit]

incrementer = Stream.eval(countR.update(_ + 1)).repeat.take(5)
upstream = incrementer ++ Stream.eval(chan.close).drain ++ incrementer

results <- chan.stream.concurrently(upstream.through(chan.sendAll)).compile.toList

_ <- IO(assert(results.length == 5))
count <- countR.get
_ <- IO(assert(count == 6)) // we have to overrun the closure to detect it
} yield ()
}

def blackHole(s: Stream[IO, Unit]) =
s.repeatPull(_.uncons.flatMap {
case None => Pull.pure(None)
case Some((hd, tl)) =>
val action = IO.delay(0.until(hd.size).foreach(_ => ()))
Pull.eval(action).as(Some(tl))
})

@inline
private def sendAll(list: List[Unit], action: IO[Unit]) =
list.foldLeft(IO.unit)((acc, _) => acc *> action)

test("sendPull") {
val test = Channel.bounded[IO, Unit](8).flatMap { channel =>
val action = List.fill(64)(()).traverse_(_ => channel.send(()).void) *> channel.close
action.start *> channel.stream.through(blackHole).compile.drain
}

test.replicateA_(if (isJVM) 1000 else 1)
}

test("sendPullPar8") {
val lists = List.fill(8)(List.fill(8)(()))

val test = Channel.bounded[IO, Unit](8).flatMap { channel =>
val action = lists.parTraverse_(sendAll(_, channel.send(()).void)) *> channel.close

action &> channel.stream.through(blackHole).compile.drain
}

test.replicateA_(if (isJVM) 10000 else 1)
}

test("synchronous with many concurrents and close") {
val test = Channel.synchronous[IO, Int].flatMap { ch =>
0.until(20).toList.parTraverse_(i => ch.send(i).iterateWhile(_.isRight)) &>
ch.stream.concurrently(Stream.eval(ch.close.delayBy(1.seconds))).compile.drain
}

test.parReplicateA(100)
}

test("complete closed immediately without draining") {
val test = Channel.bounded[IO, Int](20).flatMap { ch =>
for {
_ <- 0.until(10).toList.parTraverse_(ch.send(_))
_ <- ch.close
_ <- ch.closed
} yield ()
}

TestControl.executeEmbed(test)
}
}