Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kyo-reactive-streams]: Remove un-necessary sleep time in test cases #939

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,25 @@ abstract private class PublisherToSubscriberTest extends Test:
protected def streamSubscriber: StreamSubscriber[Int] < IO

"should have the same output as input" in runJVM {
val stream = Stream
.range(0, MaxStreamLength, 1, BufferSize)
.map { int =>
Random
.use(_.nextInt(10))
.map(millis => Async.sleep(Duration.fromUnits(millis, Duration.Units.Millis)))
.andThen(int)
}
val stream = Stream.range(0, MaxStreamLength, 1, BufferSize)
for
publisher <- stream.toPublisher
subscriber <- streamSubscriber
_ = publisher.subscribe(subscriber)
(isSame, _) <- subscriber.stream
.runFold(true -> 0) { case ((acc, expected), cur) =>
Random
.use(_.nextInt(10))
.map(millis => Async.sleep(Duration.fromUnits(millis, Duration.Units.Millis)))
.andThen((acc && (expected == cur)) -> (expected + 1))
(acc && (expected == cur)) -> (expected + 1)
}
yield assert(isSame)
end for
}

"should propagate errors downstream" in runJVM {
val inputStream = Stream
val inputStream: Stream[Int, IO] = Stream
.range(0, 10, 1, 1)
.map { int =>
if int < 5 then
Async.sleep(Duration.fromUnits(10, Duration.Units.Millis)).andThen(int)
IO(int)
else
Abort.panic(TestError)
}
Expand Down Expand Up @@ -76,19 +66,16 @@ abstract private class PublisherToSubscriberTest extends Test:

"single publisher & multiple subscribers" - {
"contention" in runJVM {
def emit(ack: Ack, counter: AtomicInt): Ack < (Emit[Chunk[Int]] & Async) =
def emit(ack: Ack, counter: AtomicInt): Ack < (Emit[Chunk[Int]] & IO) =
ack match
case Ack.Stop => IO(Ack.Stop)
case Ack.Continue(_) =>
for
millis <- Random.use(_.nextInt(10))
_ <- Async.sleep(Duration.fromUnits(millis, Duration.Units.Millis))
value <- counter.getAndIncrement
nextAck <- if value >= MaxStreamLength then
counter.getAndIncrement.map: value =>
if value >= MaxStreamLength then
IO(Ack.Stop)
else
Emit.andMap(Chunk(value))(emit(_, counter))
yield nextAck
end if
end emit

def checkStrictIncrease(chunk: Chunk[Int]): Boolean =
Expand Down Expand Up @@ -128,20 +115,16 @@ abstract private class PublisherToSubscriberTest extends Test:
}

"one subscriber's failure does not affect others." in runJVM {
def emit(ack: Ack, counter: AtomicInt): Ack < (Emit[Chunk[Int]] & Async) =
def emit(ack: Ack, counter: AtomicInt): Ack < (Emit[Chunk[Int]] & IO) =
ack match
case Ack.Stop => IO(Ack.Stop)
case Ack.Continue(_) =>
for
millis <- Random.use(_.nextInt(10))
_ <- Async.sleep(Duration.fromUnits(millis, Duration.Units.Millis))
value <- counter.getAndIncrement
nextAck <- if value >= MaxStreamLength then
counter.getAndIncrement.map: value =>
if value >= MaxStreamLength then
IO(Ack.Stop)
else
Emit.andMap(Chunk(value))(emit(_, counter))
yield nextAck
end match
end if
end emit

def checkStrictIncrease(chunk: Chunk[Int]): Boolean =
Expand Down Expand Up @@ -188,17 +171,16 @@ abstract private class PublisherToSubscriberTest extends Test:
}

"publisher's interuption should end all subscribed parties" in runJVM {
def emit(ack: Ack, counter: AtomicInt): Ack < (Emit[Chunk[Int]] & Async) =
def emit(ack: Ack, counter: AtomicInt): Ack < (Emit[Chunk[Int]] & IO) =
ack match
case Ack.Stop => IO(Ack.Stop)
case Ack.Continue(_) =>
for
millis <- Random.use(_.nextInt(10))
_ <- Async.sleep(Duration.fromUnits(millis, Duration.Units.Millis))
value <- counter.getAndIncrement
nextAck <- Emit.andMap(Chunk(value))(emit(_, counter))
yield nextAck
end match
counter.getAndIncrement.map: value =>
if value >= MaxStreamLength then
IO(Ack.Stop)
else
Emit.andMap(Chunk(value))(emit(_, counter))
end if
end emit

for
Expand All @@ -208,12 +190,10 @@ abstract private class PublisherToSubscriberTest extends Test:
subscriber2 <- streamSubscriber
subscriber3 <- streamSubscriber
subscriber4 <- streamSubscriber
_ <- Fiber.parallelUnbounded[Nothing, Unit, Any](List(
subscriber1.stream.run.unit,
subscriber2.stream.run.unit,
subscriber3.stream.run.unit,
subscriber4.stream.run.unit
))
fiber1 <- Async.run(subscriber1.stream.run.unit)
fiber2 <- Async.run(subscriber2.stream.run.unit)
fiber3 <- Async.run(subscriber3.stream.run.unit)
fiber4 <- Async.run(subscriber4.stream.run.unit)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used Fiber.gather but it some time just got stuck, will do some more investigate later

Copy link
Collaborator

@fwbrasil fwbrasil Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the goal here? Why not use parallelUnbounded? Fiber.gather is a recent addition, it'd be great if you could try to isolate in case there's an issue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal here is to prove that when publisher fiber got interrupted, all 4 subscribers stream will be interrupted. So ‘gather’ is more correct here.
I’ll try to reproduce the issue later

publisherFiber <- Async.run(Resource.run(
Stream(Emit.andMap(Chunk.empty)(emit(_, counter)))
.toPublisher
Expand All @@ -223,17 +203,16 @@ abstract private class PublisherToSubscriberTest extends Test:
publisher.subscribe(subscriber3)
publisher.subscribe(subscriber4)
}
.andThen(Async.sleep(2.seconds))
.andThen(Async.sleep(1.seconds))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could it be 100ms?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more like a keep alive effect, the whole fiber will be interrupted below in 10.millis. So i dont think duration here is that important

))
_ <- Clock.sleep(1.seconds).map { fiber =>
_ <- Clock.sleep(10.millis).map { fiber =>
fiber.onComplete(_ => publisherFiber.interrupt.unit).andThen(fiber)
}.map(_.get)
count1 <- counter.get
_ <- Async.sleep(1.seconds)
count2 <- counter.get
_ <- Async.sleep(1.seconds)
count3 <- counter.get
yield assert((count2 - count1 <= 4) && (count3 - count2 <= 4))
}.map(_.getResult)
_ <- fiber1.getResult
_ <- fiber2.getResult
_ <- fiber3.getResult
_ <- fiber4.getResult
yield assert(true)
end for
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import org.reactivestreams.tck.PublisherVerification
import org.reactivestreams.tck.TestEnvironment
import org.scalatestplus.testng.*

final class StreamPublisherTest extends PublisherVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike:
final class StreamPublisherTest extends PublisherVerification[Int](new TestEnvironment(10L)), TestNGSuiteLike:
import AllowUnsafe.embrace.danger
given Frame = Frame.internal

Expand All @@ -17,12 +17,7 @@ final class StreamPublisherTest extends PublisherVerification[Int](new TestEnvir
Stream.empty[Int]
else
val chunkSize = Math.sqrt(n).floor.intValue
Stream.range(0, n, 1, chunkSize).map { int =>
Random
.use(_.nextInt(50))
.map(millis => Async.sleep(Duration.fromUnits(millis, Duration.Units.Millis)))
.andThen(int)
}
Stream.range(0, n, 1, chunkSize)

override def createPublisher(n: Long): Publisher[Int] =
if n > Int.MaxValue then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,25 @@ import org.reactivestreams.tck.SubscriberWhiteboxVerification.WhiteboxSubscriber
import org.reactivestreams.tck.TestEnvironment
import org.scalatestplus.testng.*

class EagerStreamSubscriberTest extends SubscriberWhiteboxVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike:
class StreamSubscriberTest extends SubscriberWhiteboxVerification[Int](new TestEnvironment(10L)), TestNGSuiteLike:
import AllowUnsafe.embrace.danger
private val counter = new AtomicInteger()

def createSubscriber(
p: SubscriberWhiteboxVerification.WhiteboxSubscriberProbe[Int]
): Subscriber[Int] =
IO.Unsafe.evalOrThrow(StreamSubscriber[Int](bufferSize = 1).map(s => new WhiteboxSubscriber(s, p)))
val streamSubscriber = Random.nextInt(2)
.map:
case 0 => StreamSubscriber[Int](bufferSize = 16, EmitStrategy.Buffer)
case 1 => StreamSubscriber[Int](bufferSize = 16, EmitStrategy.Eager)
.map: s =>
new WhiteboxSubscriber(s, p)

def createElement(i: Int): Int = counter.getAndIncrement
end EagerStreamSubscriberTest

class BufferStreamSubscriberTest extends SubscriberWhiteboxVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike:
import AllowUnsafe.embrace.danger
private val counter = new AtomicInteger()

def createSubscriber(
p: SubscriberWhiteboxVerification.WhiteboxSubscriberProbe[Int]
): Subscriber[Int] =
IO.Unsafe.evalOrThrow(StreamSubscriber[Int](bufferSize = 16, EmitStrategy.Buffer).map(s =>
new WhiteboxSubscriber(s, p)
))
IO.Unsafe.evalOrThrow(streamSubscriber)
end createSubscriber

def createElement(i: Int): Int = counter.getAndIncrement
end BufferStreamSubscriberTest
end StreamSubscriberTest

final class WhiteboxSubscriber[V](
sub: StreamSubscriber[V],
Expand Down Expand Up @@ -105,30 +99,21 @@ final class StreamSubscriberWrapper[V](val streamSubscriber: StreamSubscriber[V]
override def onComplete(): Unit = streamSubscriber.onComplete()
end StreamSubscriberWrapper

final class EagerSubscriberBlackboxSpec extends SubscriberBlackboxVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike:
final class SubscriberBlackboxSpec extends SubscriberBlackboxVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike:
import AllowUnsafe.embrace.danger
private val counter = new AtomicInteger()

def createSubscriber(): StreamSubscriberWrapper[Int] =
new StreamSubscriberWrapper(IO.Unsafe.evalOrThrow(StreamSubscriber[Int](bufferSize = 1)))

override def triggerRequest(s: Subscriber[? >: Int]): Unit =
val computation: Long < IO = s.asInstanceOf[StreamSubscriberWrapper[Int]].streamSubscriber.request
discard(IO.Unsafe.evalOrThrow(computation))

def createElement(i: Int): Int = counter.incrementAndGet()
end EagerSubscriberBlackboxSpec

final class BufferSubscriberBlackboxSpec extends SubscriberBlackboxVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike:
import AllowUnsafe.embrace.danger
private val counter = new AtomicInteger()

override def createSubscriber(): Subscriber[Int] =
new StreamSubscriberWrapper(IO.Unsafe.evalOrThrow(StreamSubscriber[Int](bufferSize = 16, EmitStrategy.Buffer)))
val streamSubscriber = Random.nextInt(2)
.map:
case 0 => StreamSubscriber[Int](bufferSize = 16, EmitStrategy.Buffer)
case 1 => StreamSubscriber[Int](bufferSize = 16, EmitStrategy.Eager)
new StreamSubscriberWrapper(IO.Unsafe.evalOrThrow(streamSubscriber))
end createSubscriber

override def triggerRequest(s: Subscriber[? >: Int]): Unit =
val computation: Long < IO = s.asInstanceOf[StreamSubscriberWrapper[Int]].streamSubscriber.request
discard(IO.Unsafe.evalOrThrow(computation))

def createElement(i: Int): Int = counter.incrementAndGet()
end BufferSubscriberBlackboxSpec
end SubscriberBlackboxSpec
Loading