From df6c125306d0a89cd7c99e5ee40dfb3cfb3f1b43 Mon Sep 17 00:00:00 2001 From: HollandDM Date: Tue, 17 Dec 2024 20:57:24 +0700 Subject: [PATCH 1/2] remove unnecessary sleep time in test cases --- .../flow/PublisherToSubscriberTest.scala | 70 +++++++------------ .../StreamPublisherTest.scala | 7 +- .../StreamSubscriberTest.scala | 51 +++++--------- 3 files changed, 43 insertions(+), 85 deletions(-) diff --git a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/flow/PublisherToSubscriberTest.scala b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/flow/PublisherToSubscriberTest.scala index c4d24dc53..31528798f 100644 --- a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/flow/PublisherToSubscriberTest.scala +++ b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/flow/PublisherToSubscriberTest.scala @@ -11,35 +11,25 @@ abstract private class PublisherToSubscriberTest extends Test: protected def streamSubscriber: StreamSubscriber[Int] < IO "should have the same output as input" in runJVM { - val stream = Stream - .range(0, MaxStreamLength, 1, BufferSize) - .map { int => - Random - .use(_.nextInt(10)) - .map(millis => Async.sleep(Duration.fromUnits(millis, Duration.Units.Millis))) - .andThen(int) - } + val stream = Stream.range(0, MaxStreamLength, 1, BufferSize) for publisher <- stream.toPublisher subscriber <- streamSubscriber _ = publisher.subscribe(subscriber) (isSame, _) <- subscriber.stream .runFold(true -> 0) { case ((acc, expected), cur) => - Random - .use(_.nextInt(10)) - .map(millis => Async.sleep(Duration.fromUnits(millis, Duration.Units.Millis))) - .andThen((acc && (expected == cur)) -> (expected + 1)) + (acc && (expected == cur)) -> (expected + 1) } yield assert(isSame) end for } "should propagate errors downstream" in runJVM { - val inputStream = Stream + val inputStream: Stream[Int, IO] = Stream .range(0, 10, 1, 1) .map { int => if int < 5 then - Async.sleep(Duration.fromUnits(10, Duration.Units.Millis)).andThen(int) + IO(int) else Abort.panic(TestError) } @@ -76,19 +66,16 @@ abstract private class PublisherToSubscriberTest extends Test: "single publisher & multiple subscribers" - { "contention" in runJVM { - def emit(ack: Ack, counter: AtomicInt): Ack < (Emit[Chunk[Int]] & Async) = + def emit(ack: Ack, counter: AtomicInt): Ack < (Emit[Chunk[Int]] & IO) = ack match case Ack.Stop => IO(Ack.Stop) case Ack.Continue(_) => - for - millis <- Random.use(_.nextInt(10)) - _ <- Async.sleep(Duration.fromUnits(millis, Duration.Units.Millis)) - value <- counter.getAndIncrement - nextAck <- if value >= MaxStreamLength then + counter.getAndIncrement.map: value => + if value >= MaxStreamLength then IO(Ack.Stop) else Emit.andMap(Chunk(value))(emit(_, counter)) - yield nextAck + end if end emit def checkStrictIncrease(chunk: Chunk[Int]): Boolean = @@ -128,20 +115,16 @@ abstract private class PublisherToSubscriberTest extends Test: } "one subscriber's failure does not affect others." in runJVM { - def emit(ack: Ack, counter: AtomicInt): Ack < (Emit[Chunk[Int]] & Async) = + def emit(ack: Ack, counter: AtomicInt): Ack < (Emit[Chunk[Int]] & IO) = ack match case Ack.Stop => IO(Ack.Stop) case Ack.Continue(_) => - for - millis <- Random.use(_.nextInt(10)) - _ <- Async.sleep(Duration.fromUnits(millis, Duration.Units.Millis)) - value <- counter.getAndIncrement - nextAck <- if value >= MaxStreamLength then + counter.getAndIncrement.map: value => + if value >= MaxStreamLength then IO(Ack.Stop) else Emit.andMap(Chunk(value))(emit(_, counter)) - yield nextAck - end match + end if end emit def checkStrictIncrease(chunk: Chunk[Int]): Boolean = @@ -188,17 +171,16 @@ abstract private class PublisherToSubscriberTest extends Test: } "publisher's interuption should end all subscribed parties" in runJVM { - def emit(ack: Ack, counter: AtomicInt): Ack < (Emit[Chunk[Int]] & Async) = + def emit(ack: Ack, counter: AtomicInt): Ack < (Emit[Chunk[Int]] & IO) = ack match case Ack.Stop => IO(Ack.Stop) case Ack.Continue(_) => - for - millis <- Random.use(_.nextInt(10)) - _ <- Async.sleep(Duration.fromUnits(millis, Duration.Units.Millis)) - value <- counter.getAndIncrement - nextAck <- Emit.andMap(Chunk(value))(emit(_, counter)) - yield nextAck - end match + counter.getAndIncrement.map: value => + if value >= MaxStreamLength then + IO(Ack.Stop) + else + Emit.andMap(Chunk(value))(emit(_, counter)) + end if end emit for @@ -208,7 +190,7 @@ abstract private class PublisherToSubscriberTest extends Test: subscriber2 <- streamSubscriber subscriber3 <- streamSubscriber subscriber4 <- streamSubscriber - _ <- Fiber.parallelUnbounded[Nothing, Unit, Any](List( + gatherFiber <- Fiber.gather[Nothing, Unit, Any](List( subscriber1.stream.run.unit, subscriber2.stream.run.unit, subscriber3.stream.run.unit, @@ -223,17 +205,13 @@ abstract private class PublisherToSubscriberTest extends Test: publisher.subscribe(subscriber3) publisher.subscribe(subscriber4) } - .andThen(Async.sleep(2.seconds)) + .andThen(Async.sleep(1.seconds)) )) - _ <- Clock.sleep(1.seconds).map { fiber => + _ <- Clock.sleep(10.millis).map { fiber => fiber.onComplete(_ => publisherFiber.interrupt.unit).andThen(fiber) }.map(_.get) - count1 <- counter.get - _ <- Async.sleep(1.seconds) - count2 <- counter.get - _ <- Async.sleep(1.seconds) - count3 <- counter.get - yield assert((count2 - count1 <= 4) && (count3 - count2 <= 4)) + _ <- gatherFiber.get + yield assert(true) end for } } diff --git a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamPublisherTest.scala b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamPublisherTest.scala index 114917f9a..3c151b265 100644 --- a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamPublisherTest.scala +++ b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamPublisherTest.scala @@ -17,12 +17,7 @@ final class StreamPublisherTest extends PublisherVerification[Int](new TestEnvir Stream.empty[Int] else val chunkSize = Math.sqrt(n).floor.intValue - Stream.range(0, n, 1, chunkSize).map { int => - Random - .use(_.nextInt(50)) - .map(millis => Async.sleep(Duration.fromUnits(millis, Duration.Units.Millis))) - .andThen(int) - } + Stream.range(0, n, 1, chunkSize) override def createPublisher(n: Long): Publisher[Int] = if n > Int.MaxValue then diff --git a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamSubscriberTest.scala b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamSubscriberTest.scala index 5f9eb8a0b..89f7c6851 100644 --- a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamSubscriberTest.scala +++ b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamSubscriberTest.scala @@ -12,31 +12,25 @@ import org.reactivestreams.tck.SubscriberWhiteboxVerification.WhiteboxSubscriber import org.reactivestreams.tck.TestEnvironment import org.scalatestplus.testng.* -class EagerStreamSubscriberTest extends SubscriberWhiteboxVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike: +class StreamSubscriberTest extends SubscriberWhiteboxVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike: import AllowUnsafe.embrace.danger private val counter = new AtomicInteger() def createSubscriber( p: SubscriberWhiteboxVerification.WhiteboxSubscriberProbe[Int] ): Subscriber[Int] = - IO.Unsafe.evalOrThrow(StreamSubscriber[Int](bufferSize = 1).map(s => new WhiteboxSubscriber(s, p))) + val streamSubscriber = Random.nextInt(2) + .map: + case 0 => StreamSubscriber[Int](bufferSize = 16, EmitStrategy.Buffer) + case 1 => StreamSubscriber[Int](bufferSize = 16, EmitStrategy.Eager) + .map: s => + new WhiteboxSubscriber(s, p) - def createElement(i: Int): Int = counter.getAndIncrement -end EagerStreamSubscriberTest - -class BufferStreamSubscriberTest extends SubscriberWhiteboxVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike: - import AllowUnsafe.embrace.danger - private val counter = new AtomicInteger() - - def createSubscriber( - p: SubscriberWhiteboxVerification.WhiteboxSubscriberProbe[Int] - ): Subscriber[Int] = - IO.Unsafe.evalOrThrow(StreamSubscriber[Int](bufferSize = 16, EmitStrategy.Buffer).map(s => - new WhiteboxSubscriber(s, p) - )) + IO.Unsafe.evalOrThrow(streamSubscriber) + end createSubscriber def createElement(i: Int): Int = counter.getAndIncrement -end BufferStreamSubscriberTest +end StreamSubscriberTest final class WhiteboxSubscriber[V]( sub: StreamSubscriber[V], @@ -105,30 +99,21 @@ final class StreamSubscriberWrapper[V](val streamSubscriber: StreamSubscriber[V] override def onComplete(): Unit = streamSubscriber.onComplete() end StreamSubscriberWrapper -final class EagerSubscriberBlackboxSpec extends SubscriberBlackboxVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike: +final class SubscriberBlackboxSpec extends SubscriberBlackboxVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike: import AllowUnsafe.embrace.danger private val counter = new AtomicInteger() def createSubscriber(): StreamSubscriberWrapper[Int] = - new StreamSubscriberWrapper(IO.Unsafe.evalOrThrow(StreamSubscriber[Int](bufferSize = 1))) - - override def triggerRequest(s: Subscriber[? >: Int]): Unit = - val computation: Long < IO = s.asInstanceOf[StreamSubscriberWrapper[Int]].streamSubscriber.request - discard(IO.Unsafe.evalOrThrow(computation)) - - def createElement(i: Int): Int = counter.incrementAndGet() -end EagerSubscriberBlackboxSpec - -final class BufferSubscriberBlackboxSpec extends SubscriberBlackboxVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike: - import AllowUnsafe.embrace.danger - private val counter = new AtomicInteger() - - override def createSubscriber(): Subscriber[Int] = - new StreamSubscriberWrapper(IO.Unsafe.evalOrThrow(StreamSubscriber[Int](bufferSize = 16, EmitStrategy.Buffer))) + val streamSubscriber = Random.nextInt(2) + .map: + case 0 => StreamSubscriber[Int](bufferSize = 16, EmitStrategy.Buffer) + case 1 => StreamSubscriber[Int](bufferSize = 16, EmitStrategy.Eager) + new StreamSubscriberWrapper(IO.Unsafe.evalOrThrow(streamSubscriber)) + end createSubscriber override def triggerRequest(s: Subscriber[? >: Int]): Unit = val computation: Long < IO = s.asInstanceOf[StreamSubscriberWrapper[Int]].streamSubscriber.request discard(IO.Unsafe.evalOrThrow(computation)) def createElement(i: Int): Int = counter.incrementAndGet() -end BufferSubscriberBlackboxSpec +end SubscriberBlackboxSpec From 5a87bbb114417e967be7466c9c93c57089a1c0fb Mon Sep 17 00:00:00 2001 From: HollandDM Date: Thu, 19 Dec 2024 11:02:09 +0700 Subject: [PATCH 2/2] reduce timeout --- .../flow/PublisherToSubscriberTest.scala | 17 +++++++++-------- .../reactive-streams/StreamPublisherTest.scala | 2 +- .../reactive-streams/StreamSubscriberTest.scala | 2 +- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/flow/PublisherToSubscriberTest.scala b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/flow/PublisherToSubscriberTest.scala index 31528798f..1e9c9703b 100644 --- a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/flow/PublisherToSubscriberTest.scala +++ b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/flow/PublisherToSubscriberTest.scala @@ -190,12 +190,10 @@ abstract private class PublisherToSubscriberTest extends Test: subscriber2 <- streamSubscriber subscriber3 <- streamSubscriber subscriber4 <- streamSubscriber - gatherFiber <- Fiber.gather[Nothing, Unit, Any](List( - subscriber1.stream.run.unit, - subscriber2.stream.run.unit, - subscriber3.stream.run.unit, - subscriber4.stream.run.unit - )) + fiber1 <- Async.run(subscriber1.stream.run.unit) + fiber2 <- Async.run(subscriber2.stream.run.unit) + fiber3 <- Async.run(subscriber3.stream.run.unit) + fiber4 <- Async.run(subscriber4.stream.run.unit) publisherFiber <- Async.run(Resource.run( Stream(Emit.andMap(Chunk.empty)(emit(_, counter))) .toPublisher @@ -209,8 +207,11 @@ abstract private class PublisherToSubscriberTest extends Test: )) _ <- Clock.sleep(10.millis).map { fiber => fiber.onComplete(_ => publisherFiber.interrupt.unit).andThen(fiber) - }.map(_.get) - _ <- gatherFiber.get + }.map(_.getResult) + _ <- fiber1.getResult + _ <- fiber2.getResult + _ <- fiber3.getResult + _ <- fiber4.getResult yield assert(true) end for } diff --git a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamPublisherTest.scala b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamPublisherTest.scala index 3c151b265..30b52f945 100644 --- a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamPublisherTest.scala +++ b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamPublisherTest.scala @@ -8,7 +8,7 @@ import org.reactivestreams.tck.PublisherVerification import org.reactivestreams.tck.TestEnvironment import org.scalatestplus.testng.* -final class StreamPublisherTest extends PublisherVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike: +final class StreamPublisherTest extends PublisherVerification[Int](new TestEnvironment(10L)), TestNGSuiteLike: import AllowUnsafe.embrace.danger given Frame = Frame.internal diff --git a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamSubscriberTest.scala b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamSubscriberTest.scala index 89f7c6851..98cb8c071 100644 --- a/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamSubscriberTest.scala +++ b/kyo-reactive-streams/shared/src/test/scala/kyo/interop/reactive-streams/StreamSubscriberTest.scala @@ -12,7 +12,7 @@ import org.reactivestreams.tck.SubscriberWhiteboxVerification.WhiteboxSubscriber import org.reactivestreams.tck.TestEnvironment import org.scalatestplus.testng.* -class StreamSubscriberTest extends SubscriberWhiteboxVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike: +class StreamSubscriberTest extends SubscriberWhiteboxVerification[Int](new TestEnvironment(10L)), TestNGSuiteLike: import AllowUnsafe.embrace.danger private val counter = new AtomicInteger()