From 2d9f6ca424035dcce8d277bfd9c0451def8cfa53 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Wed, 4 Dec 2024 15:47:53 -0800 Subject: [PATCH] [core]: fix: ScalaJS's KyoApp #784 --- .../scala/kyo/KyoAppPlatformSpecific.scala | 26 +++++++++++++ .../scala/kyo/KyoAppPlatformSpecific.scala | 15 ++++++++ .../shared/src/main/scala/kyo/KyoApp.scala | 26 ++++++------- .../src/test/scala/kyo/KyoAppTest.scala | 38 ++++++++++++++++++- .../java/util/concurrent/ThreadFactory.scala | 7 ---- .../scala/kyo/scheduler/util/Threads.scala | 10 +++-- 6 files changed, 96 insertions(+), 26 deletions(-) create mode 100644 kyo-core/js/src/main/scala/kyo/KyoAppPlatformSpecific.scala create mode 100644 kyo-core/jvm-native/src/main/scala/kyo/KyoAppPlatformSpecific.scala delete mode 100644 kyo-scheduler/js/src/main/scala/java/util/concurrent/ThreadFactory.scala diff --git a/kyo-core/js/src/main/scala/kyo/KyoAppPlatformSpecific.scala b/kyo-core/js/src/main/scala/kyo/KyoAppPlatformSpecific.scala new file mode 100644 index 000000000..86e55c5a6 --- /dev/null +++ b/kyo-core/js/src/main/scala/kyo/KyoAppPlatformSpecific.scala @@ -0,0 +1,26 @@ +package kyo + +import kyo.Maybe.Absent +import kyo.Maybe.Present + +abstract class KyoAppPlatformSpecific extends KyoApp.Base[Async & Resource & Abort[Throwable]]: + + private var maybePreviousAsync: Maybe[Unit < (Async & Abort[Throwable])] = Absent + + final override protected def run[A: Flat](v: => A < (Async & Resource & Abort[Throwable]))(using Frame): Unit = + import AllowUnsafe.embrace.danger + val currentAsync: Unit < (Async & Abort[Throwable]) = + Abort.run(handle(v)).map(result => IO(printResult(result)).andThen(Abort.get(result)).unit) + maybePreviousAsync = maybePreviousAsync match + case Absent => Present(currentAsync) + case Present(previousAsync) => Present(previousAsync.map(_ => currentAsync)) + initCode = maybePreviousAsync.map { previousAsync => () => + val racedAsyncIO = Clock.repeatWithDelay(1.hour)(()).map { fiber => + val race = Async.race(Seq(fiber.get, previousAsync)) + Async.timeout(timeout)(race) + } + val _ = IO.Unsafe.run(Async.run(racedAsyncIO)).eval + }.toList + end run + +end KyoAppPlatformSpecific diff --git a/kyo-core/jvm-native/src/main/scala/kyo/KyoAppPlatformSpecific.scala b/kyo-core/jvm-native/src/main/scala/kyo/KyoAppPlatformSpecific.scala new file mode 100644 index 000000000..4b4e08c81 --- /dev/null +++ b/kyo-core/jvm-native/src/main/scala/kyo/KyoAppPlatformSpecific.scala @@ -0,0 +1,15 @@ +package kyo + +import scala.collection.mutable.ListBuffer + +abstract class KyoAppPlatformSpecific extends KyoApp.Base[Async & Resource & Abort[Throwable]]: + + final override protected def run[A: Flat](v: => A < (Async & Resource & Abort[Throwable]))(using Frame): Unit = + import AllowUnsafe.embrace.danger + initCode = initCode.appended(() => + val result = IO.Unsafe.run(Abort.run(Async.runAndBlock(timeout)(handle(v)))).eval + printResult(result) + ) + end run + +end KyoAppPlatformSpecific diff --git a/kyo-core/shared/src/main/scala/kyo/KyoApp.scala b/kyo-core/shared/src/main/scala/kyo/KyoApp.scala index 99b465cee..1ab8e02ce 100644 --- a/kyo-core/shared/src/main/scala/kyo/KyoApp.scala +++ b/kyo-core/shared/src/main/scala/kyo/KyoApp.scala @@ -1,7 +1,5 @@ package kyo -import scala.collection.mutable.ListBuffer - /** An abstract base class for Kyo applications. * * This class provides a foundation for building applications using the Kyo framework, with built-in support for logging, random number @@ -9,14 +7,11 @@ import scala.collection.mutable.ListBuffer * * Note: This class and its methods are unsafe and should only be used as the entrypoint of an application. */ -abstract class KyoApp extends KyoApp.Base[Async & Resource & Abort[Throwable]]: - def timeout: Duration = Duration.Infinity +abstract class KyoApp extends KyoAppPlatformSpecific: + override def timeout: Duration = Duration.Infinity - override protected def handle[A: Flat](v: A < (Async & Resource & Abort[Throwable]))(using Frame): Unit = - import AllowUnsafe.embrace.danger - val result = KyoApp.Unsafe.runAndBlock(timeout)(v) - if !result.exists(().equals(_)) then - println(pprint.apply(result).plainText) + override protected def handle[A: Flat](v: A < (Async & Resource & Abort[Throwable]))(using Frame): A < (Async & Abort[Throwable]) = + Resource.run(v) end handle end KyoApp @@ -30,21 +25,26 @@ object KyoApp: * The effect type used by the application. */ abstract class Base[S]: + protected def timeout: Duration - protected def handle[A: Flat](v: A < S)(using Frame): Unit + protected def handle[A: Flat](v: A < S)(using Frame): A < (Async & Abort[Throwable]) final protected def args: Array[String] = _args private var _args: Array[String] = null - private val initCode = new ListBuffer[() => Unit] + protected var initCode: List[() => Unit] = List.empty final def main(args: Array[String]) = this._args = args for proc <- initCode do proc() + end main + + protected def run[A: Flat](v: => A < S)(using Frame): Unit - protected def run[A: Flat](v: => A < S)(using Frame): Unit = - initCode += (() => handle(v)) + protected def printResult(result: Result[Any, Any]): Unit = + if !result.exists(().equals(_)) then println(pprint.apply(result).plainText) + end printResult end Base /** WARNING: Low-level API meant for integrations, libraries, and performance-sensitive code. See AllowUnsafe for more details. */ diff --git a/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala b/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala index 4606f2c9e..4c1892d07 100644 --- a/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala @@ -1,6 +1,13 @@ package kyo import Tagged.* +import kyo.Clock.Deadline +import kyo.Clock.Stopwatch +import kyo.Clock.Unsafe +import kyo.internal.LayerMacros.Validated.succeed +import org.scalatest.compatible.Assertion +import scala.collection.mutable.ListBuffer +import scala.util.Try class KyoAppTest extends Test: @@ -27,7 +34,18 @@ class KyoAppTest extends Test: runs <- ref.get yield assert(runs == 3) } - "effects" taggedAs jvmOnly in { + "ordered runs" in { + val x = new ListBuffer[Int] + val promise = scala.concurrent.Promise[Assertion]() + val app = new KyoApp: + run { Async.delay(10.millis)(IO(x += 1)) } + run { Async.delay(10.millis)(IO(x += 2)) } + run { Async.delay(10.millis)(IO(x += 3)) } + run { IO(promise.complete(Try(assert(x.toList == List(1, 2, 3))))) } + app.main(Array.empty) + promise.future + } + "effects in JVM" taggedAs jvmOnly in { def run: Int < (Async & Resource & Abort[Throwable]) = for _ <- Clock.repeatAtInterval(1.second, 1.second)(()) @@ -35,12 +53,28 @@ class KyoAppTest extends Test: _ <- Console.printLine(s"$i") _ <- Clock.now _ <- Resource.ensure(()) - _ <- Async.run(()) + _ <- Async.sleep(1.second) yield 1 import AllowUnsafe.embrace.danger assert(KyoApp.Unsafe.runAndBlock(Duration.Infinity)(run) == Result.success(1)) } + "effects in JS" taggedAs jsOnly in { + val promise = scala.concurrent.Promise[Assertion]() + val app = new KyoApp: + run { + for + _ <- Clock.repeatAtInterval(1.second, 1.second)(()) + i <- Random.nextInt + _ <- Console.printLine(s"$i") + _ <- Clock.now + _ <- Resource.ensure(()) + _ <- Async.sleep(1.second) + yield promise.complete(Try(succeed)) + } + app.main(Array.empty) + promise.future + } "failing effects" taggedAs jvmOnly in { def run: Unit < (Async & Resource & Abort[Throwable]) = for diff --git a/kyo-scheduler/js/src/main/scala/java/util/concurrent/ThreadFactory.scala b/kyo-scheduler/js/src/main/scala/java/util/concurrent/ThreadFactory.scala deleted file mode 100644 index f30181c3b..000000000 --- a/kyo-scheduler/js/src/main/scala/java/util/concurrent/ThreadFactory.scala +++ /dev/null @@ -1,7 +0,0 @@ -package java.util.concurrent - -class ThreadFactory - -object ThreadFactoryStub { - val get = new ThreadFactory -} diff --git a/kyo-scheduler/js/src/main/scala/kyo/scheduler/util/Threads.scala b/kyo-scheduler/js/src/main/scala/kyo/scheduler/util/Threads.scala index 17a26a5b0..028a42574 100644 --- a/kyo-scheduler/js/src/main/scala/kyo/scheduler/util/Threads.scala +++ b/kyo-scheduler/js/src/main/scala/kyo/scheduler/util/Threads.scala @@ -1,13 +1,15 @@ package kyo.scheduler.util +import java.lang.Thread import java.util.concurrent.ThreadFactory -import java.util.concurrent.ThreadFactoryStub object Threads { - def apply(name: String): ThreadFactory = - ThreadFactoryStub.get + def apply(name: String): ThreadFactory = apply(name, _ => Thread.currentThread()) def apply(name: String, create: Runnable => Thread): ThreadFactory = - ThreadFactoryStub.get + new ThreadFactory { + override def newThread(runnable: Runnable): Thread = create(runnable) + } + }