From b5cdf1b3721939375bc2fb67bb4b985f524f886e Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Sun, 20 Oct 2024 19:10:40 -0700 Subject: [PATCH 01/15] [core] time shift and control --- .../shared/src/main/scala/kyo/Async.scala | 35 +- .../shared/src/main/scala/kyo/Clock.scala | 207 +++++++-- .../shared/src/main/scala/kyo/Fiber.scala | 9 +- .../shared/src/main/scala/kyo/Meter.scala | 4 +- .../shared/src/main/scala/kyo/Timer.scala | 365 ++++----------- .../shared/src/test/scala/kyo/ClockTest.scala | 418 +++++++++++------- .../src/test/scala/kyo/KyoAppTest.scala | 2 +- .../shared/src/test/scala/kyo/TimerTest.scala | 231 +++++----- .../shared/src/main/scala/kyo/Duration.scala | 101 +++-- .../src/test/scala/kyo/DurationSpec.scala | 32 ++ .../scala/examples/ledger/api/Server.scala | 6 +- 11 files changed, 753 insertions(+), 657 deletions(-) diff --git a/kyo-core/shared/src/main/scala/kyo/Async.scala b/kyo-core/shared/src/main/scala/kyo/Async.scala index ff0e11e28..71a200c86 100644 --- a/kyo-core/shared/src/main/scala/kyo/Async.scala +++ b/kyo-core/shared/src/main/scala/kyo/Async.scala @@ -105,19 +105,9 @@ object Async: * @param d * The duration to sleep */ - def sleep(d: Duration)(using Frame): Unit < Async = - if d == Duration.Zero then () - else - IO.Unsafe { - val p = Promise.Unsafe.init[Nothing, Unit]() - if d.isFinite then - Timer.schedule(d)(p.completeDiscard(Result.success(()))).map { t => - IO.ensure(t.cancel.unit)(p.safe.get) - } - else - p.safe.get - end if - } + def sleep(duration: Duration)(using Frame): Unit < Async = + if duration == Duration.Zero then () + else Clock.sleep(duration).map(_.get) /** Runs a computation with a timeout. * @@ -128,17 +118,24 @@ object Async: * @return * The result of the computation, or a Timeout error */ - def timeout[E, A: Flat, Ctx](d: Duration)(v: => A < (Abort[E] & Async & Ctx))( + def timeout[E, A: Flat, Ctx](after: Duration)(v: => A < (Abort[E] & Async & Ctx))( using boundary: Boundary[Ctx, Async & Abort[E | Timeout]], frame: Frame ): A < (Abort[E | Timeout] & Async & Ctx) = - boundary { (trace, context) => - val task = IOTask[Ctx, E | Timeout, A](v, trace, context) - Timer.schedule(d)(task.completeDiscard(Result.fail(Timeout(frame)))).map { t => - IO.ensure(t.cancel.unit)(Async.get(task)) + if !after.isFinite then v + else + boundary { (trace, context) => + Clock.use { clock => + IO.Unsafe { + val sleepFiber = clock.unsafe.sleep(after) + val task = IOTask[Ctx, E | Timeout, A](v, trace, context) + sleepFiber.onComplete(_ => discard(task.complete(Result.fail(Timeout(frame))))) + task.onComplete(_ => discard(sleepFiber.interrupt())) + Async.get(task) + } + } } - } end timeout /** Races multiple computations and returns the result of the first to complete. When one computation completes, all other computations diff --git a/kyo-core/shared/src/main/scala/kyo/Clock.scala b/kyo-core/shared/src/main/scala/kyo/Clock.scala index 3999c0520..ec205404c 100644 --- a/kyo-core/shared/src/main/scala/kyo/Clock.scala +++ b/kyo-core/shared/src/main/scala/kyo/Clock.scala @@ -1,19 +1,29 @@ package kyo +import java.util.concurrent.Callable +import java.util.concurrent.Delayed +import java.util.concurrent.DelayQueue +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.LockSupport import kyo.Clock.Deadline import kyo.Clock.Stopwatch +import kyo.Result.Panic +import kyo.scheduler.IOPromise +import kyo.scheduler.util.Threads +import scala.annotation.tailrec +import scala.collection.mutable.PriorityQueue /** A clock that provides time-related operations. */ -abstract class Clock: - def unsafe: Clock.Unsafe +final case class Clock(unsafe: Clock.Unsafe): /** Gets the current time as an Instant. * * @return * The current time */ - def now(using Frame): Instant < IO + def now(using Frame): Instant < IO = IO.Unsafe(unsafe.now()) /** Creates a new stopwatch. * @@ -30,6 +40,9 @@ abstract class Clock: * A new Deadline instance */ def deadline(duration: Duration)(using Frame): Clock.Deadline < IO = IO.Unsafe(unsafe.deadline(duration).safe) + + private[kyo] def sleep(duration: Duration)(using Frame): Fiber[Nothing, Unit] < IO = + IO.Unsafe(unsafe.sleep(duration).safe) end Clock /** Companion object for creating and managing Clock instances. */ @@ -47,7 +60,7 @@ object Clock: object Stopwatch: /** WARNING: Low-level API meant for integrations, libraries, and performance-sensitive code. See AllowUnsafe for more details. */ - class Unsafe(start: Instant, clock: Clock.Unsafe): + final class Unsafe(start: Instant, clock: Clock.Unsafe): def elapsed()(using AllowUnsafe): Duration = clock.now() - start def safe: Stopwatch = Stopwatch(this) end Unsafe @@ -72,7 +85,7 @@ object Clock: object Deadline: /** WARNING: Low-level API meant for integrations, libraries, and performance-sensitive code. See AllowUnsafe for more details. */ - class Unsafe(endInstant: Maybe[Instant], clock: Clock.Unsafe): + final class Unsafe(endInstant: Maybe[Instant], clock: Clock.Unsafe): def timeLeft()(using AllowUnsafe): Duration = endInstant.map(_ - clock.now()).getOrElse(Duration.Infinity) @@ -87,7 +100,18 @@ object Clock: val live: Clock = Clock( new Unsafe: - def now()(using AllowUnsafe): Instant = Instant.fromJava(java.time.Instant.now()) + val executor = Executors.newScheduledThreadPool(2, Threads("kyo-core-clock-executor")) + def now()(using AllowUnsafe) = Instant.fromJava(java.time.Instant.now()) + def sleep(duration: Duration) = + Promise.Unsafe.fromIOPromise { + new IOPromise[Nothing, Unit] with Callable[Unit]: + val task = executor.schedule(this, duration.toNanos, TimeUnit.NANOSECONDS) + override def interrupt(error: Panic): Boolean = + task.cancel(true) + super.interrupt(error) + def call(): Unit = completeDiscard(Result.unit) + } + end sleep ) private val local = Local.init(live) @@ -104,13 +128,150 @@ object Clock: def let[A, S](c: Clock)(f: => A < S)(using Frame): A < S = local.let(c)(f) + /** Gets the current Clock instance from the local context. + * + * This is the primary way to access the current Clock instance when you only need to use it directly. + * + * @return + * The current Clock instance + */ + def get(using Frame): Clock < Any = + use(identity) + + /** Uses the current Clock instance from the local context to perform an operation. + * + * This is useful when you want to perform multiple operations with the same Clock instance without having to repeatedly access it. + * + * @param f + * A function that takes a Clock and returns an effect + * @return + * The result of applying the function to the current Clock + */ + def use[A, S](f: Clock => A < S)(using Frame): A < S = + local.use(f) + + /** Runs an effect with a time-shifted Clock where time appears to pass faster or slower. This is particularly useful for testing + * time-dependent behavior without waiting for real time to pass. + * + * @param factor + * The time scaling factor. Values > 1 speed up time, values < 1 slow down time + * @param v + * The effect to run with scaled time + * @return + * The result of running the effect with scaled time + */ + def withTimeShift[A, S](factor: Double)(v: => A < S)(using Frame): A < (IO & S) = + if factor == 1 then v + else + use { clock => + IO.Unsafe { + val shifted = + new Unsafe: + val underlying = clock.unsafe + val start = underlying.now() + val sleepFactor = (1.toDouble / factor) + def now()(using AllowUnsafe) = + val diff = underlying.now() - start + start + (diff * factor) + end now + override def sleep(duration: Duration) = + underlying.sleep(duration * sleepFactor) + let(Clock(shifted))(v) + } + } + end withTimeShift + + /** Interface for controlling time in a test environment. + * + * WARNING: TimeControl is not thread-safe. All operations should be performed sequentially to avoid race conditions and unexpected + * behavior. + */ + trait TimeControl: + /** Sets the current time to a specific instant. + * + * @param now + * The instant to set the current time to + * @return + * Unit effect that updates the current time + */ + def set(now: Instant): Unit < IO + + /** Advances the current time by the specified duration. + * + * @param duration + * The duration to advance time by + * @return + * Unit effect that advances the current time + */ + def advance(duration: Duration): Unit < IO + end TimeControl + + /** Runs an effect with a controlled Clock that allows manual time manipulation. This is primarily intended for testing scenarios where + * precise control over time progression is needed. + * + * Note: TimeControl is not thread-safe. Operations on TimeControl should be performed sequentially within the same fiber. + * + * @param f + * A function that takes a TimeControl and returns an effect + * @return + * The result of running the effect with controlled time + */ + def withTimeControl[A, S](f: TimeControl => A < S)(using Frame): A < (IO & S) = + IO.Unsafe { + val controlled = + new Unsafe with TimeControl: + @volatile var current = Instant.Epoch + + case class Task(deadline: Instant) extends IOPromise[Nothing, Unit] with Delayed: + def getDelay(unit: TimeUnit): Long = + (deadline - current).to(unit) + def compareTo(other: Delayed): Int = + deadline.toJava.compareTo(other.asInstanceOf[Task].deadline.toJava) + end Task + + val queue = new DelayQueue[Task] + + def now()(using AllowUnsafe) = current + + def sleep(duration: Duration): Fiber.Unsafe[Nothing, Unit] = + val task = new Task(current + duration) + queue.add(task) + Promise.Unsafe.fromIOPromise(task) + end sleep + + def set(now: Instant) = + IO { + current = now + tick() + } + + def advance(duration: Duration) = + IO { + current = current + duration + tick() + } + + def tick(): Unit = + val task = queue.poll() + if task != null then + task.completeDiscard(Result.unit) + tick() + end if + end tick + let(Clock(controlled))(f(controlled)) + } + end withTimeControl + /** Gets the current time using the local Clock instance. * * @return * The current time */ def now(using Frame): Instant < IO = - local.use(_.now) + use(_.now) + + private[kyo] def sleep(duration: Duration)(using Frame): Fiber[Nothing, Unit] < IO = + use(_.sleep(duration)) /** Creates a new stopwatch using the local Clock instance. * @@ -118,7 +279,7 @@ object Clock: * A new Stopwatch instance */ def stopwatch(using Frame): Stopwatch < IO = - local.use(_.stopwatch) + use(_.stopwatch) /** Creates a new deadline with the specified duration using the local Clock instance. * @@ -128,31 +289,27 @@ object Clock: * A new Deadline instance */ def deadline(duration: Duration)(using Frame): Deadline < IO = - local.use(_.deadline(duration)) - - /** Creates a new Clock instance from an Unsafe implementation. - * - * @param u - * The Unsafe implementation - * @return - * A new Clock instance - */ - def apply(u: Unsafe): Clock = - new Clock: - def now(using Frame): Instant < IO = - IO.Unsafe(u.now()) - def unsafe: Unsafe = u + use(_.deadline(duration)) /** WARNING: Low-level API meant for integrations, libraries, and performance-sensitive code. See AllowUnsafe for more details. */ abstract class Unsafe: + import AllowUnsafe.embrace.danger + import Unsafe.SleepTask + def now()(using AllowUnsafe): Instant - def stopwatch()(using AllowUnsafe): Stopwatch.Unsafe = Stopwatch.Unsafe(now(), this) + private[kyo] def sleep(duration: Duration): Fiber.Unsafe[Nothing, Unit] - def deadline(duration: Duration)(using AllowUnsafe): Deadline.Unsafe = + final def stopwatch()(using AllowUnsafe): Stopwatch.Unsafe = Stopwatch.Unsafe(now(), this) + + final def deadline(duration: Duration)(using AllowUnsafe): Deadline.Unsafe = if !duration.isFinite then Deadline.Unsafe(Maybe.empty, this) else Deadline.Unsafe(Maybe(now() + duration), this) - def safe: Clock = Clock(this) + final def safe: Clock = Clock(this) end Unsafe + + object Unsafe: + final private class SleepTask(val deadline: Instant) extends IOPromise[Nothing, Unit] + end Clock diff --git a/kyo-core/shared/src/main/scala/kyo/Fiber.scala b/kyo-core/shared/src/main/scala/kyo/Fiber.scala index 46b3c2c5d..111e3eb93 100644 --- a/kyo-core/shared/src/main/scala/kyo/Fiber.scala +++ b/kyo-core/shared/src/main/scala/kyo/Fiber.scala @@ -365,9 +365,10 @@ object Fiber extends FiberPlatformSpecific: def onComplete(f: Result[E, A] => Unit)(using AllowUnsafe): Unit = self.onComplete(f) def onInterrupt(f: Panic => Unit)(using Frame): Unit = self.onInterrupt(f) def block(deadline: Clock.Deadline.Unsafe)(using AllowUnsafe, Frame): Result[E | Timeout, A] = self.block(deadline) - def interrupt(error: Panic)(using AllowUnsafe): Boolean = self.interrupt(error) - def interruptDiscard(error: Panic)(using AllowUnsafe): Unit = discard(self.interrupt(error)) - def mask()(using AllowUnsafe): Unsafe[E, A] = self.mask() + def interrupt()(using frame: Frame, allow: AllowUnsafe): Boolean = self.interrupt(Panic(Interrupted(frame))) + def interrupt(error: Panic)(using AllowUnsafe): Boolean = self.interrupt(error) + def interruptDiscard(error: Panic)(using AllowUnsafe): Unit = discard(self.interrupt(error)) + def mask()(using AllowUnsafe): Unsafe[E, A] = self.mask() def toFuture()(using E <:< Throwable, AllowUnsafe): Future[A] = val r = scala.concurrent.Promise[A]() @@ -460,6 +461,8 @@ object Fiber extends FiberPlatformSpecific: def init[E, A]()(using AllowUnsafe): Unsafe[E, A] = IOPromise() + private[kyo] def fromIOPromise[E, A](p: IOPromise[E, A]): Unsafe[E, A] = p + extension [E, A](self: Unsafe[E, A]) def complete[E2 <: E, A2 <: A](v: Result[E, A])(using AllowUnsafe): Boolean = self.complete(v) def completeDiscard[E2 <: E, A2 <: A](v: Result[E, A])(using AllowUnsafe): Unit = discard(self.complete(v)) diff --git a/kyo-core/shared/src/main/scala/kyo/Meter.scala b/kyo-core/shared/src/main/scala/kyo/Meter.scala index fe39e003d..e6750fb4a 100644 --- a/kyo-core/shared/src/main/scala/kyo/Meter.scala +++ b/kyo-core/shared/src/main/scala/kyo/Meter.scala @@ -114,7 +114,7 @@ object Meter: new Base(rate): val timerTask = // Schedule periodic task to replenish permits - Timer.live.unsafe.scheduleAtFixedRate(period, period)(replenish()) + IO.Unsafe.run(Timer.repeatAtInterval(period)(replenish())).eval def dispatch[A, S](v: => A < S) = // Don't release a permit since it's managed by the timer task @@ -124,7 +124,7 @@ object Meter: if i < rate && release() then replenish(i + 1) - def onClose() = discard(timerTask.cancel()) + def onClose() = discard(timerTask.unsafe.interrupt()) } /** Combines two Meters into a pipeline. diff --git a/kyo-core/shared/src/main/scala/kyo/Timer.scala b/kyo-core/shared/src/main/scala/kyo/Timer.scala index 7ebefc29b..5fb3479a1 100644 --- a/kyo-core/shared/src/main/scala/kyo/Timer.scala +++ b/kyo-core/shared/src/main/scala/kyo/Timer.scala @@ -1,288 +1,87 @@ package kyo -import java.util.concurrent.* -import kyo.scheduler.util.Threads - -/** A timer for scheduling tasks to run after a delay or periodically. */ -abstract class Timer: - def unsafe: Timer.Unsafe - - /** Schedule a task to run after a specified delay. - * - * @param delay - * The time to wait before executing the task - * @param f - * The task to execute - * @return - * A TimerTask that can be used to cancel the scheduled task - */ - def schedule(delay: Duration)(f: => Unit < Async)(using Frame): TimerTask < IO - - /** Schedule a task to run periodically at a fixed rate. - * - * @param initialDelay - * The time to wait before the first execution - * @param period - * The time between successive executions - * @param f - * The task to execute - * @return - * A TimerTask that can be used to cancel the scheduled task - */ - def scheduleAtFixedRate( - initialDelay: Duration, - period: Duration - )(f: => Unit < Async)(using Frame): TimerTask < IO - - /** Schedule a task to run periodically with a fixed delay between executions. - * - * @param initialDelay - * The time to wait before the first execution - * @param period - * The time to wait between the end of one execution and the start of the next - * @param f - * The task to execute - * @return - * A TimerTask that can be used to cancel the scheduled task - */ - def scheduleWithFixedDelay( - initialDelay: Duration, - period: Duration - )(f: => Unit < Async)(using Frame): TimerTask < IO - -end Timer - object Timer: - /** A default Timer implementation using a scheduled thread pool. */ - val live: Timer = - import AllowUnsafe.embrace.danger - Timer(Unsafe(Executors.newScheduledThreadPool(2, Threads("kyo-timer-default")))) - - private val local = Local.init(Timer.live) - - /** Get the current Timer from the local context. - * - * @param timer - * The Timer to use in the given context - * @param v - * The computation to run with the specified Timer - * @return - * The result of the computation with the Timer in context - */ - def let[A, S](timer: Timer)(v: A < S)(using Frame): A < (IO & S) = - local.let(timer)(v) - - /** Schedule a task to run after a specified delay. - * - * @param delay - * The time to wait before executing the task - * @param f - * The task to execute - * @return - * A TimerTask that can be used to cancel the scheduled task - */ - def schedule(delay: Duration)(f: => Unit < Async)(using Frame): TimerTask < IO = - local.use(_.schedule(delay)(f)) - - /** Schedule a task to run periodically at a fixed rate, starting immediately. - * - * @param period - * The time between successive executions - * @param f - * The task to execute - * @return - * A TimerTask that can be used to cancel the scheduled task - */ - def scheduleAtFixedRate( - period: Duration - )(f: => Unit < Async)(using Frame): TimerTask < IO = - scheduleAtFixedRate(Duration.Zero, period)(f) - - /** Schedule a task to run periodically at a fixed rate. - * - * @param initialDelay - * The time to wait before the first execution - * @param period - * The time between successive executions - * @param f - * The task to execute - * @return - * A TimerTask that can be used to cancel the scheduled task - */ - def scheduleAtFixedRate( - initialDelay: Duration, - period: Duration - )(f: => Unit < Async)(using Frame): TimerTask < IO = - local.use(_.scheduleAtFixedRate(initialDelay, period)(f)) - - /** Schedule a task to run periodically with a fixed delay between executions, starting immediately. - * - * @param period - * The time to wait between the end of one execution and the start of the next - * @param f - * The task to execute - * @return - * A TimerTask that can be used to cancel the scheduled task - */ - def scheduleWithFixedDelay( - period: Duration - )(f: => Unit < Async)(using Frame): TimerTask < IO = - scheduleWithFixedDelay(Duration.Zero, period)(f) - - /** Schedule a task to run periodically with a fixed delay between executions. - * - * @param initialDelay - * The time to wait before the first execution - * @param period - * The time to wait between the end of one execution and the start of the next - * @param f - * The task to execute - * @return - * A TimerTask that can be used to cancel the scheduled task - */ - def scheduleWithFixedDelay( - initialDelay: Duration, - period: Duration - )(f: => Unit < Async)(using Frame): TimerTask < IO = - local.use(_.scheduleWithFixedDelay(initialDelay, period)(f)) - - /** Create a new Timer using the provided ScheduledExecutorService. - * - * @param exec - * The ScheduledExecutorService to use for scheduling tasks - * @return - * A new Timer instance - */ - def apply(u: Unsafe): Timer = - new Timer: - def unsafe: Unsafe = u - - private def eval(f: => Unit < Async)(using Frame): Unit = - import AllowUnsafe.embrace.danger - discard(IO.Unsafe.run(Async.run(f)).eval) - - def schedule(delay: Duration)(f: => Unit < Async)(using Frame): TimerTask < IO = - IO.Unsafe(unsafe.schedule(delay)(eval(f)).safe) - - def scheduleAtFixedRate( - initialDelay: Duration, - period: Duration - )(f: => Unit < Async)(using Frame): TimerTask < IO = - IO.Unsafe(unsafe.scheduleAtFixedRate(initialDelay, period)(eval(f)).safe) - - def scheduleWithFixedDelay( - initialDelay: Duration, - period: Duration - )(f: => Unit < Async)(using Frame): TimerTask < IO = - IO.Unsafe(unsafe.scheduleWithFixedDelay(initialDelay, period)(eval(f)).safe) - - /** WARNING: Low-level API meant for integrations, libraries, and performance-sensitive code. See AllowUnsafe for more details. */ - abstract class Unsafe: - def schedule(delay: Duration)(f: => Unit)(using AllowUnsafe): TimerTask.Unsafe - def scheduleAtFixedRate( - initialDelay: Duration, - period: Duration - )(f: => Unit)(using AllowUnsafe): TimerTask.Unsafe - def scheduleWithFixedDelay( - initialDelay: Duration, - period: Duration - )(f: => Unit)(using AllowUnsafe): TimerTask.Unsafe - def safe: Timer = Timer(this) - end Unsafe - - /** WARNING: Low-level API meant for integrations, libraries, and performance-sensitive code. See AllowUnsafe for more details. */ - object Unsafe: - def apply(exec: ScheduledExecutorService)(using AllowUnsafe): Unsafe = new Unsafe: - final private class FutureTimerTask(task: ScheduledFuture[?]) extends TimerTask.Unsafe: - def cancel()(using AllowUnsafe): Boolean = task.cancel(false) - def cancelled()(using AllowUnsafe): Boolean = task.isCancelled() - def done()(using AllowUnsafe): Boolean = task.isDone() - end FutureTimerTask - - def schedule(delay: Duration)(f: => Unit)(using AllowUnsafe): TimerTask.Unsafe = - if delay.isFinite then - val call = new Callable[Unit]: - def call: Unit = f - new FutureTimerTask(exec.schedule(call, delay.toNanos, TimeUnit.NANOSECONDS)) - else - TimerTask.Unsafe.noop - - def scheduleAtFixedRate( - initialDelay: Duration, - period: Duration - )(f: => Unit)(using AllowUnsafe): TimerTask.Unsafe = - if period.isFinite && initialDelay.isFinite then - new FutureTimerTask( - exec.scheduleAtFixedRate( - () => f, - initialDelay.toNanos, - period.toNanos, - TimeUnit.NANOSECONDS - ) - ) - else - TimerTask.Unsafe.noop - - def scheduleWithFixedDelay( - initialDelay: Duration, - period: Duration - )(f: => Unit)(using AllowUnsafe): TimerTask.Unsafe = - if period.isFinite && initialDelay.isFinite then - new FutureTimerTask( - exec.scheduleWithFixedDelay( - () => f, - initialDelay.toNanos, - period.toNanos, - TimeUnit.NANOSECONDS - ) - ) - else - TimerTask.Unsafe.noop - end Unsafe + def repeatWithDelay[E, S](delay: Duration)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = + repeatWithDelay(Duration.Zero, delay)(f) + + def repeatWithDelay[E, S]( + startAfter: Duration, + delay: Duration + )( + f: => Unit < (Async & Abort[E]) + )(using Frame): Fiber[E, Unit] < IO = + repeatWithDelay(startAfter, delay, ())(_ => f) + + def repeatWithDelay[E, A: Flat, S]( + startAfter: Duration, + delay: Duration, + state: A + )( + f: A => A < (Async & Abort[E]) + )(using Frame): Fiber[E, A] < IO = + repeatWithDelay(Schedule.delay(startAfter).andThen(Schedule.fixed(delay)), state)(f) + + def repeatWithDelay[E, S](delaySchedule: Schedule)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = + repeatWithDelay(delaySchedule, ())(_ => f) + + def repeatWithDelay[E, A: Flat, S]( + delaySchedule: Schedule, + state: A + )( + f: A => A < (Async & Abort[E]) + )(using Frame): Fiber[E, A] < IO = + Async.run { + Clock.use { clock => + Loop(state, delaySchedule) { (state, schedule) => + schedule.next match + case Absent => Loop.done(state) + case Present((duration, nextSchedule)) => + clock.sleep(duration).map(_.use(_ => f(state).map(Loop.continue(_, nextSchedule)))) + } + } + } + + def repeatAtInterval[E, S](interval: Duration)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = + repeatAtInterval(Duration.Zero, interval)(f) + + def repeatAtInterval[E, S]( + startAfter: Duration, + interval: Duration + )( + f: => Unit < (Async & Abort[E]) + )(using Frame): Fiber[E, Unit] < IO = + repeatAtInterval(startAfter, interval, ())(_ => f) + + def repeatAtInterval[E, A: Flat, S]( + startAfter: Duration, + interval: Duration, + state: A + )( + f: A => A < (Async & Abort[E]) + )(using Frame): Fiber[E, A] < IO = + repeatAtInterval(Schedule.delay(startAfter).andThen(Schedule.fixed(interval)), state)(f) + + def repeatAtInterval[E, S](intervalSchedule: Schedule)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = + repeatAtInterval(intervalSchedule, ())(_ => f) + + def repeatAtInterval[E, A: Flat, S]( + intervalSchedule: Schedule, + state: A + )( + f: A => A < (Async & Abort[E]) + )(using Frame): Fiber[E, A] < IO = + Async.run { + Clock.use { clock => + clock.now.map { now => + Loop(now, state, intervalSchedule) { (lastExecution, state, period) => + period.next match + case Absent => Loop.done(state) + case Present((duration, nextSchedule)) => + val nextExecution = lastExecution + duration + clock.sleep(duration).map(_.use(_ => f(state).map(Loop.continue(nextExecution, _, nextSchedule)))) + } + } + } + } end Timer - -/** Represents a scheduled task that can be cancelled. */ -final case class TimerTask private (unsafe: TimerTask.Unsafe) extends AnyVal: - /** Attempt to cancel the execution of this task. - * - * @return - * true if the task was successfully cancelled, false otherwise - */ - def cancel(using Frame): Boolean < IO = IO.Unsafe(unsafe.cancel()) - - /** Check if this task has been cancelled. - * - * @return - * true if the task has been cancelled, false otherwise - */ - def cancelled(using Frame): Boolean < IO = IO.Unsafe(unsafe.cancelled()) - - /** Check if this task has completed its execution. - * - * @return - * true if the task has completed, false otherwise - */ - def done(using Frame): Boolean < IO = IO.Unsafe(unsafe.done()) -end TimerTask - -object TimerTask: - /** A no-op TimerTask that is always considered done and cannot be cancelled. */ - val noop = TimerTask(Unsafe.noop) - - /** WARNING: Low-level API meant for integrations, libraries, and performance-sensitive code. See AllowUnsafe for more details. */ - abstract class Unsafe: - def cancel()(using AllowUnsafe): Boolean - def cancelled()(using AllowUnsafe): Boolean - def done()(using AllowUnsafe): Boolean - def safe: TimerTask = TimerTask(this) - end Unsafe - - /** WARNING: Low-level API meant for integrations, libraries, and performance-sensitive code. See AllowUnsafe for more details. */ - object Unsafe: - val noop = new Unsafe: - def cancel()(using AllowUnsafe) = false - def cancelled()(using AllowUnsafe) = false - def done()(using AllowUnsafe) = true - end Unsafe -end TimerTask diff --git a/kyo-core/shared/src/test/scala/kyo/ClockTest.scala b/kyo-core/shared/src/test/scala/kyo/ClockTest.scala index 89fde94c5..f03f5a3b9 100644 --- a/kyo-core/shared/src/test/scala/kyo/ClockTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/ClockTest.scala @@ -3,240 +3,320 @@ package kyo import java.time.temporal.ChronoUnit import kyo.Clock.Deadline import kyo.Clock.Stopwatch +import kyo.Clock.stopwatch class ClockTest extends Test: - given CanEqual[Instant, Instant] = CanEqual.derived - - class TestClock extends Clock: - var currentTime = Instant.fromJava(java.time.Instant.now()) - - val unsafe: Clock.Unsafe = new Clock.Unsafe: - def now()(using AllowUnsafe): Instant = currentTime - def now(using Frame) = IO(currentTime) - - def advance(duration: Duration): Unit = - currentTime = currentTime + duration - - def set(instant: Instant): Unit = - currentTime = instant - end TestClock - "Clock" - { + def javaNow() = Instant.fromJava(java.time.Instant.now()) + "now" in run { - val testClock = new TestClock - val instant = testClock.currentTime - for - result <- Clock.let(testClock)(Clock.now) - yield assert(result == instant) + Clock.now.map { now => + assert(now - javaNow() < 1.milli) + } } "unsafe now" in { import AllowUnsafe.embrace.danger - val testClock = new TestClock - val instant = testClock.currentTime - assert(testClock.unsafe.now() == instant) + val now = Clock.live.unsafe.now() + assert(now - javaNow() < 1.milli) } "now at epoch" in run { - val testClock = new TestClock - testClock.set(Instant.Epoch) - for - result <- Clock.let(testClock)(Clock.now) - yield assert(result == Instant.Epoch) - } - - "now at max instant" in run { - val testClock = new TestClock - testClock.set(Instant.Max) - for - result <- Clock.let(testClock)(Clock.now) - yield assert(result == Instant.Max) - } - - "handle Duration.Zero and Duration.Infinity" - { - "deadline with Zero duration" in run { - val testClock = new TestClock + Clock.withTimeControl { control => for - deadline <- Clock.let(testClock)(Clock.deadline(Duration.Zero)) - isOverdue <- deadline.isOverdue - timeLeft <- deadline.timeLeft - yield - assert(!isOverdue) - assert(timeLeft == Duration.Zero) - end for + _ <- control.set(Instant.Epoch) + now <- Clock.now + yield assert(now == Instant.Epoch) } + } - "deadline with Infinity duration" in run { - val testClock = new TestClock + "now at max instant" in run { + Clock.withTimeControl { control => for - deadline <- Clock.let(testClock)(Clock.deadline(Duration.Infinity)) - isOverdue <- deadline.isOverdue - timeLeft <- deadline.timeLeft - yield - assert(!isOverdue) - assert(timeLeft == Duration.Infinity) - end for + _ <- control.set(Instant.Max) + now <- Clock.now + yield assert(now == Instant.Max) } } } "Stopwatch" - { "elapsed time" in run { - val testClock = new TestClock - for - stopwatch <- Clock.let(testClock)(Clock.stopwatch) - _ = testClock.advance(5.seconds) - elapsed <- stopwatch.elapsed - yield assert(elapsed == 5.seconds) - end for + Clock.withTimeControl { control => + for + stopwatch <- Clock.stopwatch + _ <- control.advance(5.seconds) + elapsed <- stopwatch.elapsed + yield assert(elapsed == 5.seconds) + end for + } } - "unsafe elapsed time" in { + "unsafe elapsed time" in run { import AllowUnsafe.embrace.danger - val testClock = new TestClock - val stopwatch = testClock.unsafe.stopwatch() - testClock.advance(5.seconds) - assert(stopwatch.elapsed() == 5.seconds) + Clock.withTimeControl { control => + for + clock <- Clock.get + stopwatch <- IO.Unsafe(clock.unsafe.stopwatch()) + _ <- control.advance(5.seconds) + yield assert(stopwatch.elapsed() == 5.seconds) + end for + } } "zero elapsed time" in run { - val testClock = new TestClock - for - stopwatch <- Clock.let(testClock)(Clock.stopwatch) - elapsed <- stopwatch.elapsed - yield assert(elapsed == Duration.Zero) - end for - } - - "measure Zero duration" in run { - val testClock = new TestClock - for - stopwatch <- Clock.let(testClock)(Clock.stopwatch) - elapsed <- stopwatch.elapsed - yield assert(elapsed == Duration.Zero) - end for + Clock.withTimeControl { control => + for + stopwatch <- Clock.stopwatch + elapsed <- stopwatch.elapsed + yield assert(elapsed == 0.seconds) + end for + } } } "Deadline" - { "timeLeft" in run { - val testClock = new TestClock - for - deadline <- Clock.let(testClock)(Clock.deadline(10.seconds)) - _ = testClock.advance(3.seconds) - timeLeft <- deadline.timeLeft - yield assert(timeLeft == 7.seconds) - end for + Clock.withTimeControl { control => + for + deadline <- Clock.deadline(10.seconds) + _ <- control.advance(3.seconds) + timeLeft <- deadline.timeLeft + yield assert(timeLeft == 7.seconds) + end for + } } "isOverdue" in run { - val testClock = new TestClock - for - deadline <- Clock.let(testClock)(Clock.deadline(5.seconds)) - notOverdue <- deadline.isOverdue - _ = testClock.advance(6.seconds) - overdue <- deadline.isOverdue - yield assert(!notOverdue && overdue) - end for + Clock.withTimeControl { control => + for + deadline <- Clock.deadline(5.seconds) + notOverdue <- deadline.isOverdue + _ <- control.advance(6.seconds) + overdue <- deadline.isOverdue + yield assert(!notOverdue && overdue) + } } - "unsafe timeLeft" in { + "unsafe timeLeft" in run { import AllowUnsafe.embrace.danger - val testClock = new TestClock - val deadline = testClock.unsafe.deadline(10.seconds) - testClock.advance(3.seconds) - assert(deadline.timeLeft() == 7.seconds) + Clock.withTimeControl { control => + for + clock <- Clock.get + deadline <- IO.Unsafe(clock.unsafe.deadline(10.seconds)) + _ <- control.advance(3.seconds) + yield assert(deadline.timeLeft() == 7.seconds) + } } - "unsafe isOverdue" in { + "unsafe isOverdue" in run { import AllowUnsafe.embrace.danger - val testClock = new TestClock - val deadline = testClock.unsafe.deadline(5.seconds) - assert(!deadline.isOverdue()) - testClock.advance(6.seconds) - assert(deadline.isOverdue()) + Clock.withTimeControl { control => + for + deadline <- Clock.deadline(5.seconds) + _ <- IO.Unsafe(assert(!deadline.unsafe.isOverdue())) + _ <- control.advance(6.seconds) + yield assert(deadline.unsafe.isOverdue()) + } } "zero duration deadline" in run { - val testClock = new TestClock - for - deadline <- Clock.let(testClock)(Clock.deadline(Duration.Zero)) - isOverdue <- deadline.isOverdue - timeLeft <- deadline.timeLeft - yield assert(!isOverdue && timeLeft == Duration.Zero) - end for + Clock.withTimeControl { control => + for + deadline <- Clock.deadline(Duration.Zero) + isOverdue <- deadline.isOverdue + timeLeft <- deadline.timeLeft + yield assert(!isOverdue && timeLeft == Duration.Zero) + } } "deadline exactly at expiration" in run { - val testClock = new TestClock - for - deadline <- Clock.let(testClock)(Clock.deadline(5.seconds)) - _ = testClock.advance(5.seconds) - isOverdue <- deadline.isOverdue - timeLeft <- deadline.timeLeft - yield assert(!isOverdue && timeLeft == Duration.Zero) - end for + Clock.withTimeControl { control => + for + deadline <- Clock.deadline(5.seconds) + _ <- control.advance(5.seconds) + isOverdue <- deadline.isOverdue + timeLeft <- deadline.timeLeft + yield assert(!isOverdue && timeLeft == Duration.Zero) + } } "handle Zero timeLeft" in run { - val testClock = new TestClock + Clock.withTimeControl { control => + for + deadline <- Clock.deadline(1.second) + _ <- control.advance(1.second) + timeLeft <- deadline.timeLeft + isOverdue <- deadline.isOverdue + yield + assert(timeLeft == Duration.Zero) + assert(!isOverdue) + } + } + + "handle Infinity timeLeft" in run { + Clock.withTimeControl { control => + for + deadline <- Clock.deadline(Duration.Infinity) + timeLeft <- deadline.timeLeft + isOverdue <- deadline.isOverdue + yield + assert(timeLeft == Duration.Infinity) + assert(!isOverdue) + } + } + + "handle Duration.Zero and Duration.Infinity" - { + "deadline with Zero duration" in run { + Clock.withTimeControl { control => + for + deadline <- Clock.deadline(Duration.Zero) + isOverdue <- deadline.isOverdue + timeLeft <- deadline.timeLeft + yield + assert(!isOverdue) + assert(timeLeft == Duration.Zero) + } + } + + "deadline with Infinity duration" in run { + Clock.withTimeControl { control => + for + deadline <- Clock.deadline(Duration.Infinity) + isOverdue <- deadline.isOverdue + timeLeft <- deadline.timeLeft + yield + assert(!isOverdue) + assert(timeLeft == Duration.Infinity) + } + } + } + } + + "Integration" - { + "using stopwatch with deadline" in run { + Clock.withTimeControl { control => + for + stopwatch <- Clock.stopwatch + deadline <- Clock.deadline(10.seconds) + _ <- control.advance(7.seconds) + elapsed <- stopwatch.elapsed + timeLeft <- deadline.timeLeft + yield assert(elapsed == 7.seconds && timeLeft == 3.seconds) + } + } + + "multiple stopwatches and deadlines" in run { + Clock.withTimeControl { control => + for + stopwatch1 <- Clock.stopwatch + deadline1 <- Clock.deadline(10.seconds) + _ <- control.advance(3.seconds) + stopwatch2 <- Clock.stopwatch + deadline2 <- Clock.deadline(5.seconds) + _ <- control.advance(4.seconds) + elapsed1 <- stopwatch1.elapsed + elapsed2 <- stopwatch2.elapsed + timeLeft1 <- deadline1.timeLeft + timeLeft2 <- deadline2.timeLeft + yield + assert(elapsed1 == 7.seconds) + assert(elapsed2 == 4.seconds) + assert(timeLeft1 == 3.seconds) + assert(timeLeft2 == 1.second) + } + } + } + + "Sleep" - { + "sleep for specified duration" in run { for - deadline <- Clock.let(testClock)(Clock.deadline(1.second)) - _ <- Clock.let(testClock)(IO { testClock.advance(1.second) }) - timeLeft <- deadline.timeLeft - isOverdue <- deadline.isOverdue + clock <- Clock.get + start <- Clock.now + fiber <- clock.sleep(1.millis) + _ <- fiber.get + end <- Clock.now yield - assert(timeLeft == Duration.Zero) - assert(!isOverdue) - end for + val elapsed = end - start + assert(elapsed >= 1.millis && elapsed < 10.millis) } - "handle Infinity timeLeft" in run { - val testClock = new TestClock + "multiple sequential sleeps" in run { + for + clock <- Clock.get + start <- Clock.now + fiber1 <- clock.sleep(2.millis) + _ <- fiber1.get + mid <- Clock.now + fiber2 <- clock.sleep(2.millis) + _ <- fiber2.get + end <- Clock.now + yield + assert(mid - start >= 2.millis && mid - start < 10.millis) + assert(end - start >= 4.millis && end - start < 15.millis) + } + + "sleep with zero duration" in run { + for + clock <- Clock.get + start <- Clock.now + fiber <- clock.sleep(Duration.Zero) + _ <- fiber.get + end <- Clock.now + yield assert(end - start < 2.millis) + } + + "concurrency" in run { for - deadline <- Clock.let(testClock)(Clock.deadline(Duration.Infinity)) - timeLeft <- deadline.timeLeft - isOverdue <- deadline.isOverdue + clock <- Clock.get + start <- Clock.now + fibers <- Kyo.fill(100)(clock.sleep(1.millis)) + _ <- Kyo.foreachDiscard(fibers)(_.get) + end <- Clock.now yield - assert(timeLeft == Duration.Infinity) - assert(!isOverdue) - end for + val elapsed = end - start + assert(elapsed >= 1.millis && elapsed < 15.millis) } } - "Integration" - { - "using stopwatch with deadline" in run { - val testClock = new TestClock + "TimeShift" - { + "speed up time" in run { for - stopwatch <- Clock.let(testClock)(Clock.stopwatch) - deadline <- Clock.let(testClock)(Clock.deadline(10.seconds)) - _ = testClock.advance(7.seconds) - elapsed <- stopwatch.elapsed - timeLeft <- deadline.timeLeft - yield assert(elapsed == 7.seconds && timeLeft == 3.seconds) - end for + wallStart <- Clock.now + shiftedEnd <- Clock.withTimeShift(2)(Clock.sleep(10.millis).map(_.get.andThen(Clock.now))) + wallEnd <- Clock.now + yield + val elapsedWall = wallEnd - wallStart + val elapsedShifted = shiftedEnd - wallStart + assert(elapsedWall >= 5.millis && elapsedWall < 10.millis) + assert(elapsedShifted > elapsedWall) } - "multiple stopwatches and deadlines" in run { - val testClock = new TestClock + "slow down time" in run { for - stopwatch1 <- Clock.let(testClock)(Clock.stopwatch) - deadline1 <- Clock.let(testClock)(Clock.deadline(10.seconds)) - _ = testClock.advance(3.seconds) - stopwatch2 <- Clock.let(testClock)(Clock.stopwatch) - deadline2 <- Clock.let(testClock)(Clock.deadline(5.seconds)) - _ = testClock.advance(4.seconds) - elapsed1 <- stopwatch1.elapsed - elapsed2 <- stopwatch2.elapsed - timeLeft1 <- deadline1.timeLeft - timeLeft2 <- deadline2.timeLeft + wallStart <- Clock.now + shiftedEnd <- Clock.withTimeShift(0.1)(Clock.sleep(2.millis).map(_.get.andThen(Clock.now))) + wallEnd <- Clock.now yield - assert(elapsed1 == 7.seconds) - assert(elapsed2 == 4.seconds) - assert(timeLeft1 == 3.seconds) - assert(timeLeft2 == 1.second) - end for + val elapsedWall = wallEnd - wallStart + val elapsedShifted = shiftedEnd - wallStart + assert(elapsedWall >= 20.millis && elapsedWall < 50.millis) + assert(elapsedShifted < elapsedWall) + } + + "with time control" in run { + Clock.withTimeControl { control => + Clock.withTimeShift(2.0) { + for + start <- Clock.now + _ <- control.advance(5.seconds) + end <- Clock.now + yield + val elapsed = end - start + assert(elapsed == 10.seconds) + } + } } } end ClockTest diff --git a/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala b/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala index 6ff86b3d4..2b2c007e0 100644 --- a/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala @@ -33,7 +33,7 @@ class KyoAppTest extends Test: "effects" taggedAs jvmOnly in { def run: Int < (Async & Resource & Abort[Throwable]) = for - _ <- Timer.scheduleAtFixedRate(1.second, 1.second)(()) + _ <- Timer.repeatAtInterval(1.second, 1.second)(()) i <- Random.nextInt _ <- Console.println(s"$i") _ <- Clock.now diff --git a/kyo-core/shared/src/test/scala/kyo/TimerTest.scala b/kyo-core/shared/src/test/scala/kyo/TimerTest.scala index 8ac1ed3cd..ce32e84f5 100644 --- a/kyo-core/shared/src/test/scala/kyo/TimerTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/TimerTest.scala @@ -1,127 +1,150 @@ package kyo -import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicInteger as JAtomicInteger import org.scalatest.compatible.Assertion class TimerTest extends Test: - "schedule" in run { - for - p <- Promise.init[Nothing, String] - _ <- Timer.schedule(1.milli)(p.complete(Result.success("hello")).map(require(_))) - hello <- p.get - yield assert(hello == "hello") - } + def intervals(instants: Seq[Instant]): Seq[Duration] = + instants.sliding(2, 1).filter(_.size == 2).map(seq => seq(1) - seq(0)).toSeq - "custom executor" in runJVM { - val exec = Executors.newSingleThreadScheduledExecutor() - import AllowUnsafe.embrace.danger - Timer.let(Timer(Timer.Unsafe(exec))) { + "repeatAtInterval" - { + "executes function at interval" in run { for - p <- Promise.init[Nothing, String] - _ <- Timer.schedule(1.milli)(p.complete(Result.success("hello")).map(require(_))) - hello <- p.get - yield assert(hello == "hello") + channel <- Channel.init[Instant](10) + task <- Timer.repeatAtInterval(1.millis)(Clock.now.map(channel.put)) + instants <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + yield + val avgInterval = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) + assert(avgInterval >= 1.millis && avgInterval < 5.millis) + } + "respects interrupt" in run { + for + channel <- Channel.init[Instant](10) + task <- Timer.repeatAtInterval(1.millis)(Clock.now.map(channel.put)) + instants <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + _ <- Async.sleep(2.millis) + result <- channel.poll + yield assert(result.isEmpty) + } + "with time control" in run { + Clock.withTimeControl { control => + for + running <- AtomicBoolean.init(false) + queue <- Queue.Unbounded.init[Instant]() + task <- Timer.repeatAtInterval(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) + _ <- untilTrue(control.advance(1.milli).andThen(running.get)) + _ <- queue.drain + _ <- control.advance(1.milli).repeat(10) + _ <- task.interrupt + instants <- queue.drain + yield + intervals(instants).foreach(v => assert(v == 1.millis)) + succeed + } + } + "with Schedule parameter" in run { + for + channel <- Channel.init[Instant](10) + task <- Timer.repeatAtInterval(Schedule.fixed(1.millis))(Clock.now.map(channel.put)) + instants <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + yield + val avgInterval = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) + assert(avgInterval >= 1.millis && avgInterval < 5.millis) + } + "with Schedule and state" in run { + for + channel <- Channel.init[Int](10) + task <- Timer.repeatAtInterval(Schedule.fixed(1.millis), 0)(st => channel.put(st).andThen(st + 1)) + numbers <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + yield assert(numbers.toSeq == (0 until 10)) + } + "completes when schedule completes" in run { + for + channel <- Channel.init[Int](10) + task <- Timer.repeatAtInterval(Schedule.fixed(1.millis).maxDuration(10.millis), 0)(st => channel.put(st).andThen(st + 1)) + lastState <- task.get + numbers <- channel.drain + yield assert(lastState == 10 && numbers.toSeq == (0 until 10)) } } - "cancel" in runJVM { - for - p <- Promise.init[Nothing, String] - task <- Timer.schedule(5.seconds)(p.complete(Result.success("hello")).map(require(_))) - _ <- task.cancel - cancelled <- untilTrue(task.cancelled) - done1 <- p.done - _ <- Async.sleep(5.millis) - done2 <- p.done - yield assert(cancelled && !done1 && !done2) - } - - "scheduleAtFixedRate" in run { - for - ref <- AtomicInt.init(0) - task <- Timer.scheduleAtFixedRate( - 1.milli, - 1.milli - )(ref.incrementAndGet.unit) - _ <- Async.sleep(5.millis) - n <- ref.get - cancelled <- task.cancel - yield assert(n > 0 && cancelled) - } - - "scheduleWithFixedDelay" in runJVM { - for - ref <- AtomicInt.init(0) - task <- Timer.scheduleWithFixedDelay( - 1.milli, - 1.milli - )(ref.incrementAndGet.unit) - _ <- Async.sleep(5.millis) - n <- ref.get - cancelled <- task.cancel - yield assert(n > 0 && cancelled) - } - - "scheduleWithFixedDelay 2" in runJVM { - for - ref <- AtomicInt.init(0) - task <- Timer.scheduleWithFixedDelay( - 1.milli - )(ref.incrementAndGet.unit) - _ <- Async.sleep(5.millis) - n <- ref.get - cancelled <- task.cancel - yield assert(n > 0 && cancelled) - } + "repeatWithDelay" - { + "executes function with delay" in run { + for + channel <- Channel.init[Instant](10) + task <- Timer.repeatWithDelay(1.millis)(Clock.now.map(channel.put)) + instants <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + yield + val avgDelay = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) + assert(avgDelay >= 1.millis && avgDelay < 5.millis) + } - "unsafe" - { - import AllowUnsafe.embrace.danger + "respects interrupt" in run { + for + channel <- Channel.init[Instant](10) + task <- Timer.repeatWithDelay(1.millis)(Clock.now.map(channel.put)) + instants <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + _ <- Async.sleep(2.millis) + result <- channel.poll + yield assert(result.isEmpty) + } - "should schedule task correctly" in { - val testUnsafe = new TestUnsafeTimer() - val task = testUnsafe.schedule(1.second)(()) - assert(testUnsafe.scheduledTasks.nonEmpty) - assert(task.isInstanceOf[TimerTask.Unsafe]) + "with time control" in run { + Clock.withTimeControl { control => + for + running <- AtomicBoolean.init(false) + queue <- Queue.Unbounded.init[Instant]() + task <- Timer.repeatWithDelay(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) + _ <- untilTrue(control.advance(1.milli).andThen(running.get)) + _ <- queue.drain + _ <- control.advance(1.milli).repeat(10) + _ <- task.interrupt + instants <- queue.drain + yield + intervals(instants).foreach(v => assert(v == 1.millis)) + succeed + } } - "should schedule at fixed rate correctly" in { - val testUnsafe = new TestUnsafeTimer() - val task = testUnsafe.scheduleAtFixedRate(1.second, 2.seconds)(()) - assert(testUnsafe.fixedRateTasks.nonEmpty) - assert(task.isInstanceOf[TimerTask.Unsafe]) + "works with Schedule parameter" in run { + for + channel <- Channel.init[Instant](10) + task <- Timer.repeatWithDelay(Schedule.fixed(1.millis))(Clock.now.map(channel.put)) + instants <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + yield + val avgDelay = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) + assert(avgDelay >= 1.millis && avgDelay < 5.millis) } - "should schedule with fixed delay correctly" in { - val testUnsafe = new TestUnsafeTimer() - val task = testUnsafe.scheduleWithFixedDelay(1.second, 2.seconds)(()) - assert(testUnsafe.fixedDelayTasks.nonEmpty) - assert(task.isInstanceOf[TimerTask.Unsafe]) + "works with Schedule and state" in run { + val counter = new JAtomicInteger(0) + for + channel <- Channel.init[Int](10) + task <- Timer.repeatWithDelay(Schedule.fixed(1.millis), 0) { state => + channel.put(state).as(state + 1) + } + numbers <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + yield assert(numbers.toSeq == (0 until 10)) + end for } - "should convert to safe Timer" in { - val testUnsafe = new TestUnsafeTimer() - val safeTimer = testUnsafe.safe - assert(safeTimer.isInstanceOf[Timer]) + "completes when schedule completes" in run { + for + channel <- Channel.init[Int](10) + task <- Timer.repeatWithDelay(Schedule.fixed(1.millis).maxDuration(10.millis), 0)(st => channel.put(st).andThen(st + 1)) + lastState <- task.get + numbers <- channel.drain + yield assert(lastState == 10 && numbers.toSeq == (0 until 10)) } } - class TestUnsafeTimer extends Timer.Unsafe: - var scheduledTasks = List.empty[(Duration, () => Unit)] - var fixedRateTasks = List.empty[(Duration, Duration, () => Unit)] - var fixedDelayTasks = List.empty[(Duration, Duration, () => Unit)] - - def schedule(delay: Duration)(f: => Unit)(using AllowUnsafe): TimerTask.Unsafe = - scheduledTasks = (delay, () => f) :: scheduledTasks - TimerTask.Unsafe.noop - - def scheduleAtFixedRate(initialDelay: Duration, period: Duration)(f: => Unit)(using AllowUnsafe): TimerTask.Unsafe = - fixedRateTasks = (initialDelay, period, () => f) :: fixedRateTasks - TimerTask.Unsafe.noop - - def scheduleWithFixedDelay(initialDelay: Duration, period: Duration)(f: => Unit)(using AllowUnsafe): TimerTask.Unsafe = - fixedDelayTasks = (initialDelay, period, () => f) :: fixedDelayTasks - TimerTask.Unsafe.noop - end TestUnsafeTimer - end TimerTest diff --git a/kyo-data/shared/src/main/scala/kyo/Duration.scala b/kyo-data/shared/src/main/scala/kyo/Duration.scala index c16a33ba0..aa1b7d062 100644 --- a/kyo-data/shared/src/main/scala/kyo/Duration.scala +++ b/kyo-data/shared/src/main/scala/kyo/Duration.scala @@ -3,6 +3,7 @@ package kyo import java.time.Duration as JavaDuration import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit.* +import java.util.concurrent.TimeUnit import kyo.Duration.Units import kyo.Duration.Units.* import scala.concurrent.duration.Duration as ScalaDuration @@ -58,7 +59,7 @@ object Duration: * @return * A Duration instance */ - inline def fromNanos(value: Long): Duration = + def fromNanos(value: Long): Duration = if value <= 0 then Duration.Zero else value /** Creates a Duration from a value and unit. @@ -118,44 +119,52 @@ object Duration: extension (self: Duration) - private inline def toLong: Long = self + private def toLong: Long = self - inline infix def >=(that: Duration): Boolean = self.toLong >= that.toLong - inline infix def <=(that: Duration): Boolean = self.toLong <= that.toLong - inline infix def >(that: Duration): Boolean = self.toLong > that.toLong - inline infix def <(that: Duration): Boolean = self.toLong < that.toLong - inline infix def ==(that: Duration): Boolean = self.toLong == that.toLong - inline infix def !=(that: Duration): Boolean = self.toLong != that.toLong + infix def >=(that: Duration): Boolean = self.toLong >= that.toLong + infix def <=(that: Duration): Boolean = self.toLong <= that.toLong + infix def >(that: Duration): Boolean = self.toLong > that.toLong + infix def <(that: Duration): Boolean = self.toLong < that.toLong + infix def ==(that: Duration): Boolean = self.toLong == that.toLong + infix def !=(that: Duration): Boolean = self.toLong != that.toLong - inline infix def +(that: Duration): Duration = + infix def +(that: Duration): Duration = val sum: Long = self.toLong + that.toLong if sum >= 0 then sum else Duration.Infinity - inline infix def -(that: Duration): Duration = + infix def -(that: Duration): Duration = val diff: Long = self.toLong - that.toLong if diff > 0 then diff else Duration.Zero - inline infix def *(factor: Double): Duration = + infix def *(factor: Double): Duration = if factor <= 0 || self.toLong <= 0L then Duration.Zero else if factor <= Long.MaxValue / self.toLong.toDouble then Math.round(self.toLong.toDouble * factor) else Duration.Infinity - inline def max(that: Duration): Duration = Math.max(self.toLong, that.toLong) - inline def min(that: Duration): Duration = Math.min(self.toLong, that.toLong) + def max(that: Duration): Duration = Math.max(self.toLong, that.toLong) + def min(that: Duration): Duration = Math.min(self.toLong, that.toLong) - inline def to(unit: Units): Long = + def to(unit: Units): Long = Math.max(Math.round(self.toLong / unit.factor), Duration.Zero) - inline def toNanos: Long = self.toLong - inline def toMicros: Long = self.to(Micros) - inline def toMillis: Long = self.to(Millis) - inline def toSeconds: Long = self.to(Seconds) - inline def toMinutes: Long = self.to(Minutes) - inline def toHours: Long = self.to(Hours) - inline def toDays: Long = self.to(Days) - inline def toWeeks: Long = self.to(Weeks) - inline def toMonths: Long = self.to(Months) - inline def toYears: Long = self.to(Years) + def to(timeUnit: TimeUnit): Long = + to(timeUnit.toChronoUnit()) + + def to(chronoUnit: ChronoUnit): Long = + Units.values.find(_.chronoUnit.equals(chronoUnit)) match + case None => throw new UnsupportedOperationException("Chrono unit not suppported: " + chronoUnit) + case Some(unit) => to(unit) + + def toNanos: Long = self.toLong + def toMicros: Long = self.to(Micros) + def toMillis: Long = self.to(Millis) + def toSeconds: Long = self.to(Seconds) + def toMinutes: Long = self.to(Minutes) + def toHours: Long = self.to(Hours) + def toDays: Long = self.to(Days) + def toWeeks: Long = self.to(Weeks) + def toMonths: Long = self.to(Months) + def toYears: Long = self.to(Years) /** Converts the Duration to a Scala Duration. * @@ -202,41 +211,41 @@ object Duration: * true if the Duration is finite, false otherwise */ // TODO Is this Robust enough? - private[kyo] inline def isFinite: Boolean = self < Duration.Infinity + private[kyo] def isFinite: Boolean = self < Duration.Infinity end extension end Duration extension (value: Long) /** Creates a Duration of nanoseconds. */ - inline def nanos: Duration = Duration.fromNanos(value) + def nanos: Duration = Duration.fromNanos(value) /** Creates a Duration of microseconds. */ - inline def micros: Duration = value.asUnit(Micros) + def micros: Duration = value.asUnit(Micros) /** Creates a Duration of milliseconds. */ - inline def millis: Duration = value.asUnit(Millis) + def millis: Duration = value.asUnit(Millis) /** Creates a Duration of seconds. */ - inline def seconds: Duration = value.asUnit(Seconds) + def seconds: Duration = value.asUnit(Seconds) /** Creates a Duration of minutes. */ - inline def minutes: Duration = value.asUnit(Minutes) + def minutes: Duration = value.asUnit(Minutes) /** Creates a Duration of hours. */ - inline def hours: Duration = value.asUnit(Hours) + def hours: Duration = value.asUnit(Hours) /** Creates a Duration of days. */ - inline def days: Duration = value.asUnit(Days) + def days: Duration = value.asUnit(Days) /** Creates a Duration of weeks. */ - inline def weeks: Duration = value.asUnit(Weeks) + def weeks: Duration = value.asUnit(Weeks) /** Creates a Duration of months. */ - inline def months: Duration = value.asUnit(Months) + def months: Duration = value.asUnit(Months) /** Creates a Duration of years. */ - inline def years: Duration = value.asUnit(Years) + def years: Duration = value.asUnit(Years) inline def nano: Duration = compiletime.error("please use `.nanos`") inline def micro: Duration = compiletime.error("please use `.micros`") @@ -256,20 +265,20 @@ extension (value: Long) * @return * A Duration instance */ - inline def asUnit(unit: Units): Duration = + def asUnit(unit: Units): Duration = Duration.fromUnits(value, unit) end extension /** Extension methods for the value 1 to create singular Durations. */ extension (value: 1) - inline def nano: Duration = Duration.fromNanos(value) - inline def micro: Duration = value.asUnit(Micros) - inline def milli: Duration = value.asUnit(Millis) - inline def second: Duration = value.asUnit(Seconds) - inline def minute: Duration = value.asUnit(Minutes) - inline def hour: Duration = value.asUnit(Hours) - inline def day: Duration = value.asUnit(Days) - inline def week: Duration = value.asUnit(Weeks) - inline def month: Duration = value.asUnit(Months) - inline def year: Duration = value.asUnit(Years) + def nano: Duration = Duration.fromNanos(value) + def micro: Duration = value.asUnit(Micros) + def milli: Duration = value.asUnit(Millis) + def second: Duration = value.asUnit(Seconds) + def minute: Duration = value.asUnit(Minutes) + def hour: Duration = value.asUnit(Hours) + def day: Duration = value.asUnit(Days) + def week: Duration = value.asUnit(Weeks) + def month: Duration = value.asUnit(Months) + def year: Duration = value.asUnit(Years) end extension diff --git a/kyo-data/shared/src/test/scala/kyo/DurationSpec.scala b/kyo-data/shared/src/test/scala/kyo/DurationSpec.scala index d5fc8445b..9a7f63f50 100644 --- a/kyo-data/shared/src/test/scala/kyo/DurationSpec.scala +++ b/kyo-data/shared/src/test/scala/kyo/DurationSpec.scala @@ -1,5 +1,7 @@ package kyo +import java.time.temporal.ChronoUnit +import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration as ScalaDuration import zio.Duration as ZDuration import zio.test.{Result as _, *} @@ -256,6 +258,36 @@ object DurationSpec extends ZIOSpecDefault: test("subtracting zero") { assertTrue(10.hours - Duration.Zero == 10.hours) } + ), + suite("Duration TimeUnit/ChronoUnit conversions")( + test("TimeUnit conversions") { + val duration = 1000.millis + TestResult.allSuccesses( + assertTrue(duration.to(TimeUnit.NANOSECONDS) == 1_000_000_000L), + assertTrue(duration.to(TimeUnit.MICROSECONDS) == 1_000_000L), + assertTrue(duration.to(TimeUnit.MILLISECONDS) == 1000L), + assertTrue(duration.to(TimeUnit.SECONDS) == 1L), + assertTrue(duration.to(TimeUnit.MINUTES) == 0L), + assertTrue(duration.to(TimeUnit.HOURS) == 0L), + assertTrue(duration.to(TimeUnit.DAYS) == 0L) + ) + }, + test("ChronoUnit conversions") { + val duration = 24.hours + TestResult.allSuccesses( + assertTrue(duration.to(ChronoUnit.NANOS) == 24L * 60 * 60 * 1_000_000_000L), + assertTrue(duration.to(ChronoUnit.MICROS) == 24L * 60 * 60 * 1_000_000L), + assertTrue(duration.to(ChronoUnit.MILLIS) == 24L * 60 * 60 * 1000L), + assertTrue(duration.to(ChronoUnit.SECONDS) == 24L * 60 * 60), + assertTrue(duration.to(ChronoUnit.MINUTES) == 24L * 60), + assertTrue(duration.to(ChronoUnit.HOURS) == 24L), + assertTrue(duration.to(ChronoUnit.DAYS) == 1L) + ) + }, + test("unsupported ChronoUnit throws exception") { + val result = Result.catching[UnsupportedOperationException](1.second.to(ChronoUnit.FOREVER)) + assertTrue(result.isFail) + } ) ) @@ TestAspect.exceptNative end DurationSpec diff --git a/kyo-examples/jvm/src/main/scala/examples/ledger/api/Server.scala b/kyo-examples/jvm/src/main/scala/examples/ledger/api/Server.scala index 0db5b9209..4eab2198d 100644 --- a/kyo-examples/jvm/src/main/scala/examples/ledger/api/Server.scala +++ b/kyo-examples/jvm/src/main/scala/examples/ledger/api/Server.scala @@ -7,10 +7,6 @@ import sttp.tapir.server.netty.* object Server extends KyoApp: - val timer = - import AllowUnsafe.embrace.danger - Timer(Timer.Unsafe(Executors.newSingleThreadScheduledExecutor())) - run { defer { @@ -41,7 +37,7 @@ object Server extends KyoApp: val handler = await(Env.run(db)(Handler.init)) await(Console.println(s"Server starting on port $port...")) - val binding = await(Routes.run(server)(Timer.let(timer)(Env.run(handler)(Endpoints.init)))) + val binding = await(Routes.run(server)(Env.run(handler)(Endpoints.init))) await(Console.println(s"Server started: ${binding.localSocket}")) } } From afaf42b371d68ad34a9095c952baae109572946b Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Mon, 28 Oct 2024 20:15:02 -0700 Subject: [PATCH 02/15] timer scaladocs + cleanup --- .../shared/src/main/scala/kyo/Clock.scala | 4 - .../shared/src/main/scala/kyo/Timer.scala | 126 ++++++++++++++++++ 2 files changed, 126 insertions(+), 4 deletions(-) diff --git a/kyo-core/shared/src/main/scala/kyo/Clock.scala b/kyo-core/shared/src/main/scala/kyo/Clock.scala index ec205404c..347a6d362 100644 --- a/kyo-core/shared/src/main/scala/kyo/Clock.scala +++ b/kyo-core/shared/src/main/scala/kyo/Clock.scala @@ -294,7 +294,6 @@ object Clock: /** WARNING: Low-level API meant for integrations, libraries, and performance-sensitive code. See AllowUnsafe for more details. */ abstract class Unsafe: import AllowUnsafe.embrace.danger - import Unsafe.SleepTask def now()(using AllowUnsafe): Instant @@ -309,7 +308,4 @@ object Clock: final def safe: Clock = Clock(this) end Unsafe - object Unsafe: - final private class SleepTask(val deadline: Instant) extends IOPromise[Nothing, Unit] - end Clock diff --git a/kyo-core/shared/src/main/scala/kyo/Timer.scala b/kyo-core/shared/src/main/scala/kyo/Timer.scala index 5fb3479a1..bea0f0dc0 100644 --- a/kyo-core/shared/src/main/scala/kyo/Timer.scala +++ b/kyo-core/shared/src/main/scala/kyo/Timer.scala @@ -1,10 +1,39 @@ package kyo +/** A utility for scheduling recurring tasks with different timing strategies. + * + * Timer provides two main scheduling approaches: + * - `repeatWithDelay`: Executes tasks with a fixed delay between task completions + * - `repeatAtInterval`: Executes tasks at fixed time intervals, regardless of task duration + */ object Timer: + /** Repeatedly executes a task with a fixed delay between completions. + * + * The delay timer starts after each task completion, making this suitable for tasks that should maintain a minimum gap between + * executions. + * + * @param delay + * The duration to wait after each task completion before starting the next execution + * @param f + * The task to execute + * @return + * A Fiber that can be used to control or interrupt the recurring task + */ def repeatWithDelay[E, S](delay: Duration)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = repeatWithDelay(Duration.Zero, delay)(f) + /** Repeatedly executes a task with a fixed delay between completions, starting after an initial delay. + * + * @param startAfter + * The duration to wait before the first execution + * @param delay + * The duration to wait after each task completion before starting the next execution + * @param f + * The task to execute + * @return + * A Fiber that can be used to control or interrupt the recurring task + */ def repeatWithDelay[E, S]( startAfter: Duration, delay: Duration @@ -13,6 +42,21 @@ object Timer: )(using Frame): Fiber[E, Unit] < IO = repeatWithDelay(startAfter, delay, ())(_ => f) + /** Repeatedly executes a task with a fixed delay between completions, maintaining state between executions. + * + * @param startAfter + * The duration to wait before the first execution + * @param delay + * The duration to wait after each task completion before starting the next execution + * @param state + * The initial state value + * @param f + * A function that takes the current state and returns the next state + * @tparam A + * The type of the state value + * @return + * A Fiber that can be used to control or interrupt the recurring task and access the final state + */ def repeatWithDelay[E, A: Flat, S]( startAfter: Duration, delay: Duration, @@ -22,9 +66,31 @@ object Timer: )(using Frame): Fiber[E, A] < IO = repeatWithDelay(Schedule.delay(startAfter).andThen(Schedule.fixed(delay)), state)(f) + /** Repeatedly executes a task with delays determined by a custom schedule. + * + * @param delaySchedule + * A schedule that determines the timing between executions + * @param f + * The task to execute + * @return + * A Fiber that can be used to control or interrupt the recurring task + */ def repeatWithDelay[E, S](delaySchedule: Schedule)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = repeatWithDelay(delaySchedule, ())(_ => f) + /** Repeatedly executes a task with delays determined by a custom schedule, maintaining state between executions. + * + * @param delaySchedule + * A schedule that determines the timing between executions + * @param state + * The initial state value + * @param f + * A function that takes the current state and returns the next state + * @tparam A + * The type of the state value + * @return + * A Fiber that can be used to control or interrupt the recurring task and access the final state + */ def repeatWithDelay[E, A: Flat, S]( delaySchedule: Schedule, state: A @@ -42,9 +108,32 @@ object Timer: } } + /** Repeatedly executes a task at fixed time intervals. + * + * Unlike repeatWithDelay, this ensures consistent execution intervals regardless of task duration. If a task takes longer than the + * interval, the next execution will start immediately after completion. + * + * @param interval + * The fixed time interval between task starts + * @param f + * The task to execute + * @return + * A Fiber that can be used to control or interrupt the recurring task + */ def repeatAtInterval[E, S](interval: Duration)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = repeatAtInterval(Duration.Zero, interval)(f) + /** Repeatedly executes a task at fixed time intervals, starting after an initial delay. + * + * @param startAfter + * The duration to wait before the first execution + * @param interval + * The fixed time interval between task starts + * @param f + * The task to execute + * @return + * A Fiber that can be used to control or interrupt the recurring task + */ def repeatAtInterval[E, S]( startAfter: Duration, interval: Duration @@ -53,6 +142,21 @@ object Timer: )(using Frame): Fiber[E, Unit] < IO = repeatAtInterval(startAfter, interval, ())(_ => f) + /** Repeatedly executes a task at fixed time intervals, maintaining state between executions. + * + * @param startAfter + * The duration to wait before the first execution + * @param interval + * The fixed time interval between task starts + * @param state + * The initial state value + * @param f + * A function that takes the current state and returns the next state + * @tparam A + * The type of the state value + * @return + * A Fiber that can be used to control or interrupt the recurring task and access the final state + */ def repeatAtInterval[E, A: Flat, S]( startAfter: Duration, interval: Duration, @@ -62,9 +166,31 @@ object Timer: )(using Frame): Fiber[E, A] < IO = repeatAtInterval(Schedule.delay(startAfter).andThen(Schedule.fixed(interval)), state)(f) + /** Repeatedly executes a task with intervals determined by a custom schedule. + * + * @param intervalSchedule + * A schedule that determines the timing between executions + * @param f + * The task to execute + * @return + * A Fiber that can be used to control or interrupt the recurring task + */ def repeatAtInterval[E, S](intervalSchedule: Schedule)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = repeatAtInterval(intervalSchedule, ())(_ => f) + /** Repeatedly executes a task with intervals determined by a custom schedule, maintaining state between executions. + * + * @param intervalSchedule + * A schedule that determines the timing between executions + * @param state + * The initial state value + * @param f + * A function that takes the current state and returns the next state + * @tparam A + * The type of the state value + * @return + * A Fiber that can be used to control or interrupt the recurring task and access the final state + */ def repeatAtInterval[E, A: Flat, S]( intervalSchedule: Schedule, state: A From dbf3e747baac1e99c80fcb45bd467c984f7f7adc Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Mon, 28 Oct 2024 20:32:57 -0700 Subject: [PATCH 03/15] fix scalajs build for kyo-data --- .../shared/src/main/scala/kyo/Clock.scala | 2 +- .../shared/src/main/scala/kyo/Duration.scala | 26 ++++++++++++++++--- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/kyo-core/shared/src/main/scala/kyo/Clock.scala b/kyo-core/shared/src/main/scala/kyo/Clock.scala index 347a6d362..7af292cc2 100644 --- a/kyo-core/shared/src/main/scala/kyo/Clock.scala +++ b/kyo-core/shared/src/main/scala/kyo/Clock.scala @@ -107,7 +107,7 @@ object Clock: new IOPromise[Nothing, Unit] with Callable[Unit]: val task = executor.schedule(this, duration.toNanos, TimeUnit.NANOSECONDS) override def interrupt(error: Panic): Boolean = - task.cancel(true) + discard(task.cancel(true)) super.interrupt(error) def call(): Unit = completeDiscard(Result.unit) } diff --git a/kyo-data/shared/src/main/scala/kyo/Duration.scala b/kyo-data/shared/src/main/scala/kyo/Duration.scala index aa1b7d062..f81695fc0 100644 --- a/kyo-data/shared/src/main/scala/kyo/Duration.scala +++ b/kyo-data/shared/src/main/scala/kyo/Duration.scala @@ -117,6 +117,26 @@ object Duration: val factor: Double = chronoUnit.getDuration.toNanos.toDouble end Units + object Units: + def fromJava(chronoUnit: ChronoUnit): Units = + Units.values.find(_.chronoUnit.equals(chronoUnit)) match + case None => throw new UnsupportedOperationException("Chrono unit not suppported: " + chronoUnit) + case Some(unit) => unit + + def fromJava(timeUnit: TimeUnit): Units = + given CanEqual[TimeUnit, TimeUnit] = CanEqual.derived + timeUnit match + case TimeUnit.NANOSECONDS => Units.Nanos + case TimeUnit.MICROSECONDS => Units.Micros + case TimeUnit.MILLISECONDS => Units.Millis + case TimeUnit.SECONDS => Units.Seconds + case TimeUnit.MINUTES => Units.Minutes + case TimeUnit.HOURS => Units.Hours + case TimeUnit.DAYS => Units.Days + end match + end fromJava + end Units + extension (self: Duration) private def toLong: Long = self @@ -148,12 +168,10 @@ object Duration: Math.max(Math.round(self.toLong / unit.factor), Duration.Zero) def to(timeUnit: TimeUnit): Long = - to(timeUnit.toChronoUnit()) + to(Units.fromJava(timeUnit)) def to(chronoUnit: ChronoUnit): Long = - Units.values.find(_.chronoUnit.equals(chronoUnit)) match - case None => throw new UnsupportedOperationException("Chrono unit not suppported: " + chronoUnit) - case Some(unit) => to(unit) + to(Units.fromJava(chronoUnit)) def toNanos: Long = self.toLong def toMicros: Long = self.to(Micros) From 23d3891216776ff89843a2e4b39bda61cc0d66db Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Mon, 28 Oct 2024 20:59:46 -0700 Subject: [PATCH 04/15] fixing js build for kyo-core --- .../shared/src/main/scala/kyo/Clock.scala | 39 ++++++----- .../shared/src/test/scala/kyo/TimerTest.scala | 64 +++++++++---------- 2 files changed, 55 insertions(+), 48 deletions(-) diff --git a/kyo-core/shared/src/main/scala/kyo/Clock.scala b/kyo-core/shared/src/main/scala/kyo/Clock.scala index 7af292cc2..0dc16d8bc 100644 --- a/kyo-core/shared/src/main/scala/kyo/Clock.scala +++ b/kyo-core/shared/src/main/scala/kyo/Clock.scala @@ -42,7 +42,9 @@ final case class Clock(unsafe: Clock.Unsafe): def deadline(duration: Duration)(using Frame): Clock.Deadline < IO = IO.Unsafe(unsafe.deadline(duration).safe) private[kyo] def sleep(duration: Duration)(using Frame): Fiber[Nothing, Unit] < IO = - IO.Unsafe(unsafe.sleep(duration).safe) + if duration == Duration.Zero then Fiber.unit + else if !duration.isFinite then Fiber.never + else IO.Unsafe(unsafe.sleep(duration).safe) end Clock /** Companion object for creating and managing Clock instances. */ @@ -222,20 +224,16 @@ object Clock: new Unsafe with TimeControl: @volatile var current = Instant.Epoch - case class Task(deadline: Instant) extends IOPromise[Nothing, Unit] with Delayed: - def getDelay(unit: TimeUnit): Long = - (deadline - current).to(unit) - def compareTo(other: Delayed): Int = - deadline.toJava.compareTo(other.asInstanceOf[Task].deadline.toJava) - end Task - - val queue = new DelayQueue[Task] + case class Task(deadline: Instant) extends IOPromise[Nothing, Unit] + val queue = new PriorityQueue[Task](using Ordering.fromLessThan((a, b) => a.deadline < b.deadline)) def now()(using AllowUnsafe) = current def sleep(duration: Duration): Fiber.Unsafe[Nothing, Unit] = val task = new Task(current + duration) - queue.add(task) + queue.synchronized { + queue.enqueue(task) + } Promise.Unsafe.fromIOPromise(task) end sleep @@ -252,12 +250,21 @@ object Clock: } def tick(): Unit = - val task = queue.poll() - if task != null then - task.completeDiscard(Result.unit) - tick() - end if - end tick + queue.synchronized { + queue.headOption match + case Some(task) if task.deadline <= current => + Maybe(queue.dequeue()) + case Some(task) if task.done() => + discard(queue.dequeue()) + Maybe.empty + case _ => + Maybe.empty + } match + case Present(task) => + task.completeDiscard(Result.unit) + tick() + case Absent => + () let(Clock(controlled))(f(controlled)) } end withTimeControl diff --git a/kyo-core/shared/src/test/scala/kyo/TimerTest.scala b/kyo-core/shared/src/test/scala/kyo/TimerTest.scala index ce32e84f5..5043ae4ec 100644 --- a/kyo-core/shared/src/test/scala/kyo/TimerTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/TimerTest.scala @@ -29,22 +29,22 @@ class TimerTest extends Test: result <- channel.poll yield assert(result.isEmpty) } - "with time control" in run { - Clock.withTimeControl { control => - for - running <- AtomicBoolean.init(false) - queue <- Queue.Unbounded.init[Instant]() - task <- Timer.repeatAtInterval(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) - _ <- untilTrue(control.advance(1.milli).andThen(running.get)) - _ <- queue.drain - _ <- control.advance(1.milli).repeat(10) - _ <- task.interrupt - instants <- queue.drain - yield - intervals(instants).foreach(v => assert(v == 1.millis)) - succeed - } - } + // "with time control" in run { + // Clock.withTimeControl { control => + // for + // running <- AtomicBoolean.init(false) + // queue <- Queue.Unbounded.init[Instant]() + // task <- Timer.repeatAtInterval(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) + // _ <- untilTrue(control.advance(1.milli).andThen(running.get)) + // _ <- queue.drain + // _ <- control.advance(1.milli).repeat(10) + // _ <- task.interrupt + // instants <- queue.drain + // yield + // intervals(instants).foreach(v => assert(v == 1.millis)) + // succeed + // } + // } "with Schedule parameter" in run { for channel <- Channel.init[Instant](10) @@ -96,22 +96,22 @@ class TimerTest extends Test: yield assert(result.isEmpty) } - "with time control" in run { - Clock.withTimeControl { control => - for - running <- AtomicBoolean.init(false) - queue <- Queue.Unbounded.init[Instant]() - task <- Timer.repeatWithDelay(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) - _ <- untilTrue(control.advance(1.milli).andThen(running.get)) - _ <- queue.drain - _ <- control.advance(1.milli).repeat(10) - _ <- task.interrupt - instants <- queue.drain - yield - intervals(instants).foreach(v => assert(v == 1.millis)) - succeed - } - } + // "with time control" in run { + // Clock.withTimeControl { control => + // for + // running <- AtomicBoolean.init(false) + // queue <- Queue.Unbounded.init[Instant]() + // task <- Timer.repeatWithDelay(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) + // _ <- untilTrue(control.advance(1.milli).andThen(running.get)) + // _ <- queue.drain + // _ <- control.advance(1.milli).repeat(10) + // _ <- task.interrupt + // instants <- queue.drain + // yield + // intervals(instants).foreach(v => assert(v == 1.millis)) + // succeed + // } + // } "works with Schedule parameter" in run { for From 5f84998921029fd16fde11e09ad107b0a908e8e3 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Mon, 28 Oct 2024 21:04:59 -0700 Subject: [PATCH 05/15] fixing js build for kyo-core --- .../shared/src/main/scala/kyo/Clock.scala | 2 +- .../shared/src/test/scala/kyo/TimerTest.scala | 64 +++++++++---------- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/kyo-core/shared/src/main/scala/kyo/Clock.scala b/kyo-core/shared/src/main/scala/kyo/Clock.scala index 0dc16d8bc..821f562d6 100644 --- a/kyo-core/shared/src/main/scala/kyo/Clock.scala +++ b/kyo-core/shared/src/main/scala/kyo/Clock.scala @@ -225,7 +225,7 @@ object Clock: @volatile var current = Instant.Epoch case class Task(deadline: Instant) extends IOPromise[Nothing, Unit] - val queue = new PriorityQueue[Task](using Ordering.fromLessThan((a, b) => a.deadline < b.deadline)) + val queue = new PriorityQueue[Task](using Ordering.fromLessThan((a, b) => b.deadline < a.deadline)) def now()(using AllowUnsafe) = current diff --git a/kyo-core/shared/src/test/scala/kyo/TimerTest.scala b/kyo-core/shared/src/test/scala/kyo/TimerTest.scala index 5043ae4ec..a4bbec9ee 100644 --- a/kyo-core/shared/src/test/scala/kyo/TimerTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/TimerTest.scala @@ -29,22 +29,22 @@ class TimerTest extends Test: result <- channel.poll yield assert(result.isEmpty) } - // "with time control" in run { - // Clock.withTimeControl { control => - // for - // running <- AtomicBoolean.init(false) - // queue <- Queue.Unbounded.init[Instant]() - // task <- Timer.repeatAtInterval(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) - // _ <- untilTrue(control.advance(1.milli).andThen(running.get)) - // _ <- queue.drain - // _ <- control.advance(1.milli).repeat(10) - // _ <- task.interrupt - // instants <- queue.drain - // yield - // intervals(instants).foreach(v => assert(v == 1.millis)) - // succeed - // } - // } + "with time control" in runJVM { + Clock.withTimeControl { control => + for + running <- AtomicBoolean.init(false) + queue <- Queue.Unbounded.init[Instant]() + task <- Timer.repeatAtInterval(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) + _ <- untilTrue(control.advance(1.milli).andThen(running.get)) + _ <- queue.drain + _ <- control.advance(1.milli).repeat(10) + _ <- task.interrupt + instants <- queue.drain + yield + intervals(instants).foreach(v => assert(v == 1.millis)) + succeed + } + } "with Schedule parameter" in run { for channel <- Channel.init[Instant](10) @@ -96,22 +96,22 @@ class TimerTest extends Test: yield assert(result.isEmpty) } - // "with time control" in run { - // Clock.withTimeControl { control => - // for - // running <- AtomicBoolean.init(false) - // queue <- Queue.Unbounded.init[Instant]() - // task <- Timer.repeatWithDelay(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) - // _ <- untilTrue(control.advance(1.milli).andThen(running.get)) - // _ <- queue.drain - // _ <- control.advance(1.milli).repeat(10) - // _ <- task.interrupt - // instants <- queue.drain - // yield - // intervals(instants).foreach(v => assert(v == 1.millis)) - // succeed - // } - // } + "with time control" in runJVM { + Clock.withTimeControl { control => + for + running <- AtomicBoolean.init(false) + queue <- Queue.Unbounded.init[Instant]() + task <- Timer.repeatWithDelay(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) + _ <- untilTrue(control.advance(1.milli).andThen(running.get)) + _ <- queue.drain + _ <- control.advance(1.milli).repeat(10) + _ <- task.interrupt + instants <- queue.drain + yield + intervals(instants).foreach(v => assert(v == 1.millis)) + succeed + } + } "works with Schedule parameter" in run { for From 412588061be129f6c211dd163ff929b64437e3f8 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Mon, 28 Oct 2024 21:09:56 -0700 Subject: [PATCH 06/15] fix flaky tests --- kyo-core/shared/src/test/scala/kyo/ClockTest.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kyo-core/shared/src/test/scala/kyo/ClockTest.scala b/kyo-core/shared/src/test/scala/kyo/ClockTest.scala index f03f5a3b9..6e5166080 100644 --- a/kyo-core/shared/src/test/scala/kyo/ClockTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/ClockTest.scala @@ -253,8 +253,8 @@ class ClockTest extends Test: _ <- fiber2.get end <- Clock.now yield - assert(mid - start >= 2.millis && mid - start < 10.millis) - assert(end - start >= 4.millis && end - start < 15.millis) + assert(mid - start >= 2.millis && mid - start < 20.millis) + assert(end - start >= 4.millis && end - start < 30.millis) } "sleep with zero duration" in run { @@ -264,7 +264,7 @@ class ClockTest extends Test: fiber <- clock.sleep(Duration.Zero) _ <- fiber.get end <- Clock.now - yield assert(end - start < 2.millis) + yield assert(end - start < 10.millis) } "concurrency" in run { @@ -276,7 +276,7 @@ class ClockTest extends Test: end <- Clock.now yield val elapsed = end - start - assert(elapsed >= 1.millis && elapsed < 15.millis) + assert(elapsed >= 1.millis && elapsed < 100.millis) } } @@ -289,7 +289,7 @@ class ClockTest extends Test: yield val elapsedWall = wallEnd - wallStart val elapsedShifted = shiftedEnd - wallStart - assert(elapsedWall >= 5.millis && elapsedWall < 10.millis) + assert(elapsedWall >= 5.millis && elapsedWall < 20.millis) assert(elapsedShifted > elapsedWall) } From a237ad00dc33aebcb3517efc4266a3c6f40027f1 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Mon, 28 Oct 2024 21:54:02 -0700 Subject: [PATCH 07/15] fix flaky tests --- kyo-core/shared/src/test/scala/kyo/ClockTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kyo-core/shared/src/test/scala/kyo/ClockTest.scala b/kyo-core/shared/src/test/scala/kyo/ClockTest.scala index 6e5166080..fc016b947 100644 --- a/kyo-core/shared/src/test/scala/kyo/ClockTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/ClockTest.scala @@ -253,8 +253,8 @@ class ClockTest extends Test: _ <- fiber2.get end <- Clock.now yield - assert(mid - start >= 2.millis && mid - start < 20.millis) - assert(end - start >= 4.millis && end - start < 30.millis) + assert(mid - start >= 2.millis && mid - start < 30.millis) + assert(end - start >= 4.millis && end - start < 50.millis) } "sleep with zero duration" in run { From e5e19484d1508336b9d6d16d73ddd01c53b4b9b2 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Mon, 28 Oct 2024 22:03:47 -0700 Subject: [PATCH 08/15] simplify clock executor customization --- .../shared/src/main/scala/kyo/Clock.scala | 34 +++++++++++-------- .../scala/examples/ledger/api/Server.scala | 6 +++- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/kyo-core/shared/src/main/scala/kyo/Clock.scala b/kyo-core/shared/src/main/scala/kyo/Clock.scala index 821f562d6..d9fe1f3c6 100644 --- a/kyo-core/shared/src/main/scala/kyo/Clock.scala +++ b/kyo-core/shared/src/main/scala/kyo/Clock.scala @@ -4,6 +4,7 @@ import java.util.concurrent.Callable import java.util.concurrent.Delayed import java.util.concurrent.DelayQueue import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.locks.LockSupport import kyo.Clock.Deadline @@ -100,21 +101,8 @@ object Clock: /** A live Clock instance using the system clock. */ val live: Clock = - Clock( - new Unsafe: - val executor = Executors.newScheduledThreadPool(2, Threads("kyo-core-clock-executor")) - def now()(using AllowUnsafe) = Instant.fromJava(java.time.Instant.now()) - def sleep(duration: Duration) = - Promise.Unsafe.fromIOPromise { - new IOPromise[Nothing, Unit] with Callable[Unit]: - val task = executor.schedule(this, duration.toNanos, TimeUnit.NANOSECONDS) - override def interrupt(error: Panic): Boolean = - discard(task.cancel(true)) - super.interrupt(error) - def call(): Unit = completeDiscard(Result.unit) - } - end sleep - ) + import AllowUnsafe.embrace.danger + Clock(Unsafe(Executors.newScheduledThreadPool(2, Threads("kyo-core-clock-executor")))) private val local = Local.init(live) @@ -315,4 +303,20 @@ object Clock: final def safe: Clock = Clock(this) end Unsafe + object Unsafe: + def apply(executor: ScheduledExecutorService)(using AllowUnsafe): Unsafe = + new Unsafe: + def now()(using AllowUnsafe) = Instant.fromJava(java.time.Instant.now()) + def sleep(duration: Duration) = + Promise.Unsafe.fromIOPromise { + new IOPromise[Nothing, Unit] with Callable[Unit]: + val task = executor.schedule(this, duration.toNanos, TimeUnit.NANOSECONDS) + override def interrupt(error: Panic): Boolean = + discard(task.cancel(true)) + super.interrupt(error) + def call(): Unit = completeDiscard(Result.unit) + } + end sleep + end Unsafe + end Clock diff --git a/kyo-examples/jvm/src/main/scala/examples/ledger/api/Server.scala b/kyo-examples/jvm/src/main/scala/examples/ledger/api/Server.scala index 4eab2198d..4efbfb26b 100644 --- a/kyo-examples/jvm/src/main/scala/examples/ledger/api/Server.scala +++ b/kyo-examples/jvm/src/main/scala/examples/ledger/api/Server.scala @@ -7,6 +7,10 @@ import sttp.tapir.server.netty.* object Server extends KyoApp: + val clock = + import AllowUnsafe.embrace.danger + Clock(Clock.Unsafe(Executors.newSingleThreadScheduledExecutor())) + run { defer { @@ -37,7 +41,7 @@ object Server extends KyoApp: val handler = await(Env.run(db)(Handler.init)) await(Console.println(s"Server starting on port $port...")) - val binding = await(Routes.run(server)(Env.run(handler)(Endpoints.init))) + val binding = await(Routes.run(server)(Clock.let(clock)(Env.run(handler)(Endpoints.init)))) await(Console.println(s"Server started: ${binding.localSocket}")) } } From 0d39616160ba4ba1f2851c9bf5c62d0c11b415c6 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Tue, 29 Oct 2024 08:42:48 -0700 Subject: [PATCH 09/15] Update kyo-core/shared/src/main/scala/kyo/Clock.scala Co-authored-by: Adam Hearn <22334119+hearnadam@users.noreply.github.com> --- kyo-core/shared/src/main/scala/kyo/Clock.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kyo-core/shared/src/main/scala/kyo/Clock.scala b/kyo-core/shared/src/main/scala/kyo/Clock.scala index d9fe1f3c6..4fda04742 100644 --- a/kyo-core/shared/src/main/scala/kyo/Clock.scala +++ b/kyo-core/shared/src/main/scala/kyo/Clock.scala @@ -126,7 +126,7 @@ object Clock: * The current Clock instance */ def get(using Frame): Clock < Any = - use(identity) + local.get /** Uses the current Clock instance from the local context to perform an operation. * From e4f1ac2487663ecaf750d6c1f62adb38a8f0a9bd Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Tue, 29 Oct 2024 09:13:47 -0700 Subject: [PATCH 10/15] fix build --- README.md | 72 ++++++++----------- .../scala/examples/ledger/api/Server.scala | 2 +- 2 files changed, 29 insertions(+), 45 deletions(-) diff --git a/README.md b/README.md index c84469ce1..385545a46 100644 --- a/README.md +++ b/README.md @@ -2352,63 +2352,47 @@ import kyo.* val a: Unit < IO = IO(()) -// Schedule a delayed task -val b: TimerTask < IO = - Timer.schedule(delay = 1.second)(a) - -// Recurring task with -// intial delay -val c: TimerTask < IO = - Timer.scheduleAtFixedRate( - initialDelay = 1.minutes, - period = 1.minutes +// Recurring task with a delay between +// executions +val b: Fiber[Nothing, Unit] < IO = + Timer.repeatWithDelay( + startAfter = 1.minute, + delay = 1.minute )(a) -// Recurring task without -// initial delay -val d: TimerTask < IO = - Timer.scheduleAtFixedRate( - period = 1.minutes - )(a) - -// Schedule with fixed delay between tasks -val e: TimerTask < IO = - Timer.scheduleWithFixedDelay( - initialDelay = 1.minutes, - period = 1.minutes - )(a) +// Without an initial delay +val c: Fiber[Nothing, Unit] < IO = + Timer.repeatWithDelay(1.minute)(a) -// without initial delay -val f: TimerTask < IO = - Timer.scheduleWithFixedDelay( - period = 1.minutes +// Schedule at a specific interval, regarless +// of the duration of each execution +val d: Fiber[Nothing, Unit] < IO = + Timer.repeatAtInterval( + startAfter = 1.minute, + interval = 1.minute )(a) -// Specify the 'Timer' explictly -val i: TimerTask < IO = - Timer.let(Timer.live)(f) +// Without an initial delay +val e: Fiber[Nothing, Unit] < IO = + Timer.repeatAtInterval(1.minute)(a) ``` -`TimerTask` offers methods for more granular control over the scheduled tasks. +Use the returned `Fiber` to control scheduled tasks. ```scala import kyo.* -// Example TimerTask -val a: TimerTask < IO = - Timer.schedule(1.second)(()) +// Example task +val a: Fiber[Nothing, Unit] < IO = + Timer.repeatAtInterval(1.second)(()) -// Try to cancel the task -val b: Boolean < IO = - a.map(_.cancel) +// Try to cancel a task +def b(task: Fiber[Nothing, Unit]): Boolean < IO = + task.interrupt -// Check if the task is cancelled -val c: Boolean < IO = - a.map(_.cancelled) - -// Check if the task is done -val d: Boolean < IO = - a.map(_.done) +// Check if a task is done +def c(task: Fiber[Nothing, Unit]): Boolean < IO = + task.done ``` ### Latch: Countdown Synchronization diff --git a/kyo-examples/jvm/src/main/scala/examples/ledger/api/Server.scala b/kyo-examples/jvm/src/main/scala/examples/ledger/api/Server.scala index 4efbfb26b..a88c0bb17 100644 --- a/kyo-examples/jvm/src/main/scala/examples/ledger/api/Server.scala +++ b/kyo-examples/jvm/src/main/scala/examples/ledger/api/Server.scala @@ -9,7 +9,7 @@ object Server extends KyoApp: val clock = import AllowUnsafe.embrace.danger - Clock(Clock.Unsafe(Executors.newSingleThreadScheduledExecutor())) + Clock(Clock.Unsafe(Executors.newSingleThreadScheduledExecutor())) run { From 52b45a20eadd5eb7332f0184a8403004ac9eb849 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Tue, 29 Oct 2024 14:41:32 -0700 Subject: [PATCH 11/15] fix flaky tests --- kyo-core/shared/src/test/scala/kyo/TimerTest.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kyo-core/shared/src/test/scala/kyo/TimerTest.scala b/kyo-core/shared/src/test/scala/kyo/TimerTest.scala index a4bbec9ee..1c074d856 100644 --- a/kyo-core/shared/src/test/scala/kyo/TimerTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/TimerTest.scala @@ -17,7 +17,7 @@ class TimerTest extends Test: _ <- task.interrupt yield val avgInterval = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) - assert(avgInterval >= 1.millis && avgInterval < 5.millis) + assert(avgInterval >= 1.millis && avgInterval < 10.millis) } "respects interrupt" in run { for @@ -53,7 +53,7 @@ class TimerTest extends Test: _ <- task.interrupt yield val avgInterval = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) - assert(avgInterval >= 1.millis && avgInterval < 5.millis) + assert(avgInterval >= 1.millis && avgInterval < 10.millis) } "with Schedule and state" in run { for @@ -82,7 +82,7 @@ class TimerTest extends Test: _ <- task.interrupt yield val avgDelay = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) - assert(avgDelay >= 1.millis && avgDelay < 5.millis) + assert(avgDelay >= 1.millis && avgDelay < 10.millis) } "respects interrupt" in run { @@ -121,7 +121,7 @@ class TimerTest extends Test: _ <- task.interrupt yield val avgDelay = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) - assert(avgDelay >= 1.millis && avgDelay < 5.millis) + assert(avgDelay >= 1.millis && avgDelay < 10.millis) } "works with Schedule and state" in run { From aef86c4fdad720993ff0fc87ba7d1260f4589852 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Tue, 29 Oct 2024 20:05:15 -0700 Subject: [PATCH 12/15] address review feedback --- kyo-core/shared/src/main/scala/kyo/Clock.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kyo-core/shared/src/main/scala/kyo/Clock.scala b/kyo-core/shared/src/main/scala/kyo/Clock.scala index 4fda04742..0a39197a7 100644 --- a/kyo-core/shared/src/main/scala/kyo/Clock.scala +++ b/kyo-core/shared/src/main/scala/kyo/Clock.scala @@ -309,12 +309,12 @@ object Clock: def now()(using AllowUnsafe) = Instant.fromJava(java.time.Instant.now()) def sleep(duration: Duration) = Promise.Unsafe.fromIOPromise { - new IOPromise[Nothing, Unit] with Callable[Unit]: + new IOPromise[Nothing, Unit] with Runnable: val task = executor.schedule(this, duration.toNanos, TimeUnit.NANOSECONDS) override def interrupt(error: Panic): Boolean = discard(task.cancel(true)) super.interrupt(error) - def call(): Unit = completeDiscard(Result.unit) + def run() = completeDiscard(Result.unit) } end sleep end Unsafe From bb585da42c5b106b2168559848a13d8d6e9a9a59 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Tue, 29 Oct 2024 20:13:11 -0700 Subject: [PATCH 13/15] remove Timer and move its methods to Clock --- README.md | 111 +++++---- .../shared/src/main/scala/kyo/Clock.scala | 203 +++++++++++++++++ .../shared/src/main/scala/kyo/Meter.scala | 2 +- .../shared/src/main/scala/kyo/Timer.scala | 213 ------------------ .../shared/src/test/scala/kyo/ClockTest.scala | 142 ++++++++++++ .../src/test/scala/kyo/KyoAppTest.scala | 2 +- .../shared/src/test/scala/kyo/TimerTest.scala | 150 ------------ 7 files changed, 402 insertions(+), 421 deletions(-) delete mode 100644 kyo-core/shared/src/main/scala/kyo/Timer.scala delete mode 100644 kyo-core/shared/src/test/scala/kyo/TimerTest.scala diff --git a/README.md b/README.md index 199db5b5c..13c29bb99 100644 --- a/README.md +++ b/README.md @@ -1498,7 +1498,7 @@ val f: Unit < (IO & Abort[IOException]) = Console.let(Console.live)(e) ``` -### Clock: Time Management +### Clock: Time Management and Scheduled Tasks The `Clock` effect provides utilities for time-related operations, including getting the current time, creating stopwatches, and managing deadlines. @@ -1545,6 +1545,60 @@ val g: Instant < IO = `Clock` both safe (effectful) and unsafe (non-effectful) versions of its operations. The safe versions are suspended in `IO` and should be used in most cases. The unsafe versions are available through the `unsafe` property and should be used with caution, typically only in performance-critical sections or when integrating with non-effectful code. +`Clock` also offers methods to schedule background tasks: + +```scala +import kyo.* + +// An example computation to +// be scheduled +val a: Unit < IO = + IO(()) + +// Recurring task with a delay between +// executions +val b: Fiber[Nothing, Unit] < IO = + Clock.repeatWithDelay( + startAfter = 1.minute, + delay = 1.minute + )(a) + +// Without an initial delay +val c: Fiber[Nothing, Unit] < IO = + Clock.repeatWithDelay(1.minute)(a) + +// Schedule at a specific interval, regarless +// of the duration of each execution +val d: Fiber[Nothing, Unit] < IO = + Clock.repeatAtInterval( + startAfter = 1.minute, + interval = 1.minute + )(a) + +// Without an initial delay +val e: Fiber[Nothing, Unit] < IO = + Clock.repeatAtInterval(1.minute)(a) +``` + +Use the returned `Fiber` to control scheduled tasks. + +```scala +import kyo.* + +// Example task +val a: Fiber[Nothing, Unit] < IO = + Clock.repeatAtInterval(1.second)(()) + +// Try to cancel a task +def b(task: Fiber[Nothing, Unit]): Boolean < IO = + task.interrupt + +// Check if a task is done +def c(task: Fiber[Nothing, Unit]): Boolean < IO = + task.done +``` + + ### System: Environment Variables and System Properties The `System` effect provides a safe and convenient way to access environment variables and system properties. It offers methods to retrieve values with proper type conversion and fallback options. @@ -2345,61 +2399,6 @@ val e: Maybe[Int] < (Async & Abort[Closed]) = a.map(_.tryRun(Math.cos(42).toInt)) ``` -### Timer: Scheduled Execution - -The `Timer` effect is designed for control over the timing of task execution. - -```scala -import kyo.* - -// An example computation to -// be scheduled -val a: Unit < IO = - IO(()) - -// Recurring task with a delay between -// executions -val b: Fiber[Nothing, Unit] < IO = - Timer.repeatWithDelay( - startAfter = 1.minute, - delay = 1.minute - )(a) - -// Without an initial delay -val c: Fiber[Nothing, Unit] < IO = - Timer.repeatWithDelay(1.minute)(a) - -// Schedule at a specific interval, regarless -// of the duration of each execution -val d: Fiber[Nothing, Unit] < IO = - Timer.repeatAtInterval( - startAfter = 1.minute, - interval = 1.minute - )(a) - -// Without an initial delay -val e: Fiber[Nothing, Unit] < IO = - Timer.repeatAtInterval(1.minute)(a) -``` - -Use the returned `Fiber` to control scheduled tasks. - -```scala -import kyo.* - -// Example task -val a: Fiber[Nothing, Unit] < IO = - Timer.repeatAtInterval(1.second)(()) - -// Try to cancel a task -def b(task: Fiber[Nothing, Unit]): Boolean < IO = - task.interrupt - -// Check if a task is done -def c(task: Fiber[Nothing, Unit]): Boolean < IO = - task.done -``` - ### Latch: Countdown Synchronization The `Latch` effect serves as a coordination mechanism for fibers in a concurrent environment, primarily used for task synchronization. It provides a low-level API for controlling the flow of execution and ensuring certain tasks are completed before others, all while maintaining thread safety. diff --git a/kyo-core/shared/src/main/scala/kyo/Clock.scala b/kyo-core/shared/src/main/scala/kyo/Clock.scala index 0a39197a7..1c1306e20 100644 --- a/kyo-core/shared/src/main/scala/kyo/Clock.scala +++ b/kyo-core/shared/src/main/scala/kyo/Clock.scala @@ -286,6 +286,209 @@ object Clock: def deadline(duration: Duration)(using Frame): Deadline < IO = use(_.deadline(duration)) + /** Repeatedly executes a task with a fixed delay between completions. + * + * The delay timer starts after each task completion, making this suitable for tasks that should maintain a minimum gap between + * executions. + * + * @param delay + * The duration to wait after each task completion before starting the next execution + * @param f + * The task to execute + * @return + * A Fiber that can be used to control or interrupt the recurring task + */ + def repeatWithDelay[E, S](delay: Duration)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = + repeatWithDelay(Duration.Zero, delay)(f) + + /** Repeatedly executes a task with a fixed delay between completions, starting after an initial delay. + * + * @param startAfter + * The duration to wait before the first execution + * @param delay + * The duration to wait after each task completion before starting the next execution + * @param f + * The task to execute + * @return + * A Fiber that can be used to control or interrupt the recurring task + */ + def repeatWithDelay[E, S]( + startAfter: Duration, + delay: Duration + )( + f: => Unit < (Async & Abort[E]) + )(using Frame): Fiber[E, Unit] < IO = + repeatWithDelay(startAfter, delay, ())(_ => f) + + /** Repeatedly executes a task with a fixed delay between completions, maintaining state between executions. + * + * @param startAfter + * The duration to wait before the first execution + * @param delay + * The duration to wait after each task completion before starting the next execution + * @param state + * The initial state value + * @param f + * A function that takes the current state and returns the next state + * @tparam A + * The type of the state value + * @return + * A Fiber that can be used to control or interrupt the recurring task and access the final state + */ + def repeatWithDelay[E, A: Flat, S]( + startAfter: Duration, + delay: Duration, + state: A + )( + f: A => A < (Async & Abort[E]) + )(using Frame): Fiber[E, A] < IO = + repeatWithDelay(Schedule.delay(startAfter).andThen(Schedule.fixed(delay)), state)(f) + + /** Repeatedly executes a task with delays determined by a custom schedule. + * + * @param delaySchedule + * A schedule that determines the timing between executions + * @param f + * The task to execute + * @return + * A Fiber that can be used to control or interrupt the recurring task + */ + def repeatWithDelay[E, S](delaySchedule: Schedule)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = + repeatWithDelay(delaySchedule, ())(_ => f) + + /** Repeatedly executes a task with delays determined by a custom schedule, maintaining state between executions. + * + * @param delaySchedule + * A schedule that determines the timing between executions + * @param state + * The initial state value + * @param f + * A function that takes the current state and returns the next state + * @tparam A + * The type of the state value + * @return + * A Fiber that can be used to control or interrupt the recurring task and access the final state + */ + def repeatWithDelay[E, A: Flat, S]( + delaySchedule: Schedule, + state: A + )( + f: A => A < (Async & Abort[E]) + )(using Frame): Fiber[E, A] < IO = + Async.run { + Clock.use { clock => + Loop(state, delaySchedule) { (state, schedule) => + schedule.next match + case Absent => Loop.done(state) + case Present((duration, nextSchedule)) => + clock.sleep(duration).map(_.use(_ => f(state).map(Loop.continue(_, nextSchedule)))) + } + } + } + + /** Repeatedly executes a task at fixed time intervals. + * + * Unlike repeatWithDelay, this ensures consistent execution intervals regardless of task duration. If a task takes longer than the + * interval, the next execution will start immediately after completion. + * + * @param interval + * The fixed time interval between task starts + * @param f + * The task to execute + * @return + * A Fiber that can be used to control or interrupt the recurring task + */ + def repeatAtInterval[E, S](interval: Duration)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = + repeatAtInterval(Duration.Zero, interval)(f) + + /** Repeatedly executes a task at fixed time intervals, starting after an initial delay. + * + * @param startAfter + * The duration to wait before the first execution + * @param interval + * The fixed time interval between task starts + * @param f + * The task to execute + * @return + * A Fiber that can be used to control or interrupt the recurring task + */ + def repeatAtInterval[E, S]( + startAfter: Duration, + interval: Duration + )( + f: => Unit < (Async & Abort[E]) + )(using Frame): Fiber[E, Unit] < IO = + repeatAtInterval(startAfter, interval, ())(_ => f) + + /** Repeatedly executes a task at fixed time intervals, maintaining state between executions. + * + * @param startAfter + * The duration to wait before the first execution + * @param interval + * The fixed time interval between task starts + * @param state + * The initial state value + * @param f + * A function that takes the current state and returns the next state + * @tparam A + * The type of the state value + * @return + * A Fiber that can be used to control or interrupt the recurring task and access the final state + */ + def repeatAtInterval[E, A: Flat, S]( + startAfter: Duration, + interval: Duration, + state: A + )( + f: A => A < (Async & Abort[E]) + )(using Frame): Fiber[E, A] < IO = + repeatAtInterval(Schedule.delay(startAfter).andThen(Schedule.fixed(interval)), state)(f) + + /** Repeatedly executes a task with intervals determined by a custom schedule. + * + * @param intervalSchedule + * A schedule that determines the timing between executions + * @param f + * The task to execute + * @return + * A Fiber that can be used to control or interrupt the recurring task + */ + def repeatAtInterval[E, S](intervalSchedule: Schedule)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = + repeatAtInterval(intervalSchedule, ())(_ => f) + + /** Repeatedly executes a task with intervals determined by a custom schedule, maintaining state between executions. + * + * @param intervalSchedule + * A schedule that determines the timing between executions + * @param state + * The initial state value + * @param f + * A function that takes the current state and returns the next state + * @tparam A + * The type of the state value + * @return + * A Fiber that can be used to control or interrupt the recurring task and access the final state + */ + def repeatAtInterval[E, A: Flat, S]( + intervalSchedule: Schedule, + state: A + )( + f: A => A < (Async & Abort[E]) + )(using Frame): Fiber[E, A] < IO = + Async.run { + Clock.use { clock => + clock.now.map { now => + Loop(now, state, intervalSchedule) { (lastExecution, state, period) => + period.next match + case Absent => Loop.done(state) + case Present((duration, nextSchedule)) => + val nextExecution = lastExecution + duration + clock.sleep(duration).map(_.use(_ => f(state).map(Loop.continue(nextExecution, _, nextSchedule)))) + } + } + } + } + /** WARNING: Low-level API meant for integrations, libraries, and performance-sensitive code. See AllowUnsafe for more details. */ abstract class Unsafe: import AllowUnsafe.embrace.danger diff --git a/kyo-core/shared/src/main/scala/kyo/Meter.scala b/kyo-core/shared/src/main/scala/kyo/Meter.scala index e6750fb4a..45c0bd661 100644 --- a/kyo-core/shared/src/main/scala/kyo/Meter.scala +++ b/kyo-core/shared/src/main/scala/kyo/Meter.scala @@ -114,7 +114,7 @@ object Meter: new Base(rate): val timerTask = // Schedule periodic task to replenish permits - IO.Unsafe.run(Timer.repeatAtInterval(period)(replenish())).eval + IO.Unsafe.run(Clock.repeatAtInterval(period)(replenish())).eval def dispatch[A, S](v: => A < S) = // Don't release a permit since it's managed by the timer task diff --git a/kyo-core/shared/src/main/scala/kyo/Timer.scala b/kyo-core/shared/src/main/scala/kyo/Timer.scala deleted file mode 100644 index bea0f0dc0..000000000 --- a/kyo-core/shared/src/main/scala/kyo/Timer.scala +++ /dev/null @@ -1,213 +0,0 @@ -package kyo - -/** A utility for scheduling recurring tasks with different timing strategies. - * - * Timer provides two main scheduling approaches: - * - `repeatWithDelay`: Executes tasks with a fixed delay between task completions - * - `repeatAtInterval`: Executes tasks at fixed time intervals, regardless of task duration - */ -object Timer: - - /** Repeatedly executes a task with a fixed delay between completions. - * - * The delay timer starts after each task completion, making this suitable for tasks that should maintain a minimum gap between - * executions. - * - * @param delay - * The duration to wait after each task completion before starting the next execution - * @param f - * The task to execute - * @return - * A Fiber that can be used to control or interrupt the recurring task - */ - def repeatWithDelay[E, S](delay: Duration)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = - repeatWithDelay(Duration.Zero, delay)(f) - - /** Repeatedly executes a task with a fixed delay between completions, starting after an initial delay. - * - * @param startAfter - * The duration to wait before the first execution - * @param delay - * The duration to wait after each task completion before starting the next execution - * @param f - * The task to execute - * @return - * A Fiber that can be used to control or interrupt the recurring task - */ - def repeatWithDelay[E, S]( - startAfter: Duration, - delay: Duration - )( - f: => Unit < (Async & Abort[E]) - )(using Frame): Fiber[E, Unit] < IO = - repeatWithDelay(startAfter, delay, ())(_ => f) - - /** Repeatedly executes a task with a fixed delay between completions, maintaining state between executions. - * - * @param startAfter - * The duration to wait before the first execution - * @param delay - * The duration to wait after each task completion before starting the next execution - * @param state - * The initial state value - * @param f - * A function that takes the current state and returns the next state - * @tparam A - * The type of the state value - * @return - * A Fiber that can be used to control or interrupt the recurring task and access the final state - */ - def repeatWithDelay[E, A: Flat, S]( - startAfter: Duration, - delay: Duration, - state: A - )( - f: A => A < (Async & Abort[E]) - )(using Frame): Fiber[E, A] < IO = - repeatWithDelay(Schedule.delay(startAfter).andThen(Schedule.fixed(delay)), state)(f) - - /** Repeatedly executes a task with delays determined by a custom schedule. - * - * @param delaySchedule - * A schedule that determines the timing between executions - * @param f - * The task to execute - * @return - * A Fiber that can be used to control or interrupt the recurring task - */ - def repeatWithDelay[E, S](delaySchedule: Schedule)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = - repeatWithDelay(delaySchedule, ())(_ => f) - - /** Repeatedly executes a task with delays determined by a custom schedule, maintaining state between executions. - * - * @param delaySchedule - * A schedule that determines the timing between executions - * @param state - * The initial state value - * @param f - * A function that takes the current state and returns the next state - * @tparam A - * The type of the state value - * @return - * A Fiber that can be used to control or interrupt the recurring task and access the final state - */ - def repeatWithDelay[E, A: Flat, S]( - delaySchedule: Schedule, - state: A - )( - f: A => A < (Async & Abort[E]) - )(using Frame): Fiber[E, A] < IO = - Async.run { - Clock.use { clock => - Loop(state, delaySchedule) { (state, schedule) => - schedule.next match - case Absent => Loop.done(state) - case Present((duration, nextSchedule)) => - clock.sleep(duration).map(_.use(_ => f(state).map(Loop.continue(_, nextSchedule)))) - } - } - } - - /** Repeatedly executes a task at fixed time intervals. - * - * Unlike repeatWithDelay, this ensures consistent execution intervals regardless of task duration. If a task takes longer than the - * interval, the next execution will start immediately after completion. - * - * @param interval - * The fixed time interval between task starts - * @param f - * The task to execute - * @return - * A Fiber that can be used to control or interrupt the recurring task - */ - def repeatAtInterval[E, S](interval: Duration)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = - repeatAtInterval(Duration.Zero, interval)(f) - - /** Repeatedly executes a task at fixed time intervals, starting after an initial delay. - * - * @param startAfter - * The duration to wait before the first execution - * @param interval - * The fixed time interval between task starts - * @param f - * The task to execute - * @return - * A Fiber that can be used to control or interrupt the recurring task - */ - def repeatAtInterval[E, S]( - startAfter: Duration, - interval: Duration - )( - f: => Unit < (Async & Abort[E]) - )(using Frame): Fiber[E, Unit] < IO = - repeatAtInterval(startAfter, interval, ())(_ => f) - - /** Repeatedly executes a task at fixed time intervals, maintaining state between executions. - * - * @param startAfter - * The duration to wait before the first execution - * @param interval - * The fixed time interval between task starts - * @param state - * The initial state value - * @param f - * A function that takes the current state and returns the next state - * @tparam A - * The type of the state value - * @return - * A Fiber that can be used to control or interrupt the recurring task and access the final state - */ - def repeatAtInterval[E, A: Flat, S]( - startAfter: Duration, - interval: Duration, - state: A - )( - f: A => A < (Async & Abort[E]) - )(using Frame): Fiber[E, A] < IO = - repeatAtInterval(Schedule.delay(startAfter).andThen(Schedule.fixed(interval)), state)(f) - - /** Repeatedly executes a task with intervals determined by a custom schedule. - * - * @param intervalSchedule - * A schedule that determines the timing between executions - * @param f - * The task to execute - * @return - * A Fiber that can be used to control or interrupt the recurring task - */ - def repeatAtInterval[E, S](intervalSchedule: Schedule)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO = - repeatAtInterval(intervalSchedule, ())(_ => f) - - /** Repeatedly executes a task with intervals determined by a custom schedule, maintaining state between executions. - * - * @param intervalSchedule - * A schedule that determines the timing between executions - * @param state - * The initial state value - * @param f - * A function that takes the current state and returns the next state - * @tparam A - * The type of the state value - * @return - * A Fiber that can be used to control or interrupt the recurring task and access the final state - */ - def repeatAtInterval[E, A: Flat, S]( - intervalSchedule: Schedule, - state: A - )( - f: A => A < (Async & Abort[E]) - )(using Frame): Fiber[E, A] < IO = - Async.run { - Clock.use { clock => - clock.now.map { now => - Loop(now, state, intervalSchedule) { (lastExecution, state, period) => - period.next match - case Absent => Loop.done(state) - case Present((duration, nextSchedule)) => - val nextExecution = lastExecution + duration - clock.sleep(duration).map(_.use(_ => f(state).map(Loop.continue(nextExecution, _, nextSchedule)))) - } - } - } - } -end Timer diff --git a/kyo-core/shared/src/test/scala/kyo/ClockTest.scala b/kyo-core/shared/src/test/scala/kyo/ClockTest.scala index fc016b947..11fe4860d 100644 --- a/kyo-core/shared/src/test/scala/kyo/ClockTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/ClockTest.scala @@ -319,4 +319,146 @@ class ClockTest extends Test: } } } + + def intervals(instants: Seq[Instant]): Seq[Duration] = + instants.sliding(2, 1).filter(_.size == 2).map(seq => seq(1) - seq(0)).toSeq + + "repeatAtInterval" - { + "executes function at interval" in run { + for + channel <- Channel.init[Instant](10) + task <- Clock.repeatAtInterval(1.millis)(Clock.now.map(channel.put)) + instants <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + yield + val avgInterval = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) + assert(avgInterval >= 1.millis && avgInterval < 10.millis) + } + "respects interrupt" in run { + for + channel <- Channel.init[Instant](10) + task <- Clock.repeatAtInterval(1.millis)(Clock.now.map(channel.put)) + instants <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + _ <- Async.sleep(2.millis) + result <- channel.poll + yield assert(result.isEmpty) + } + "with time control" in runJVM { + Clock.withTimeControl { control => + for + running <- AtomicBoolean.init(false) + queue <- Queue.Unbounded.init[Instant]() + task <- Clock.repeatAtInterval(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) + _ <- untilTrue(control.advance(1.milli).andThen(running.get)) + _ <- queue.drain + _ <- control.advance(1.milli).repeat(10) + _ <- task.interrupt + instants <- queue.drain + yield + intervals(instants).foreach(v => assert(v == 1.millis)) + succeed + } + } + "with Schedule parameter" in run { + for + channel <- Channel.init[Instant](10) + task <- Clock.repeatAtInterval(Schedule.fixed(1.millis))(Clock.now.map(channel.put)) + instants <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + yield + val avgInterval = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) + assert(avgInterval >= 1.millis && avgInterval < 10.millis) + } + "with Schedule and state" in run { + for + channel <- Channel.init[Int](10) + task <- Clock.repeatAtInterval(Schedule.fixed(1.millis), 0)(st => channel.put(st).andThen(st + 1)) + numbers <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + yield assert(numbers.toSeq == (0 until 10)) + } + "completes when schedule completes" in run { + for + channel <- Channel.init[Int](10) + task <- Clock.repeatAtInterval(Schedule.fixed(1.millis).maxDuration(10.millis), 0)(st => channel.put(st).andThen(st + 1)) + lastState <- task.get + numbers <- channel.drain + yield assert(lastState == 10 && numbers.toSeq == (0 until 10)) + } + } + + "repeatWithDelay" - { + "executes function with delay" in run { + for + channel <- Channel.init[Instant](10) + task <- Clock.repeatWithDelay(1.millis)(Clock.now.map(channel.put)) + instants <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + yield + val avgDelay = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) + assert(avgDelay >= 1.millis && avgDelay < 10.millis) + } + + "respects interrupt" in run { + for + channel <- Channel.init[Instant](10) + task <- Clock.repeatWithDelay(1.millis)(Clock.now.map(channel.put)) + instants <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + _ <- Async.sleep(2.millis) + result <- channel.poll + yield assert(result.isEmpty) + } + + "with time control" in runJVM { + Clock.withTimeControl { control => + for + running <- AtomicBoolean.init(false) + queue <- Queue.Unbounded.init[Instant]() + task <- Clock.repeatWithDelay(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) + _ <- untilTrue(control.advance(1.milli).andThen(running.get)) + _ <- queue.drain + _ <- control.advance(1.milli).repeat(10) + _ <- task.interrupt + instants <- queue.drain + yield + intervals(instants).foreach(v => assert(v == 1.millis)) + succeed + } + } + + "works with Schedule parameter" in run { + for + channel <- Channel.init[Instant](10) + task <- Clock.repeatWithDelay(Schedule.fixed(1.millis))(Clock.now.map(channel.put)) + instants <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + yield + val avgDelay = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) + assert(avgDelay >= 1.millis && avgDelay < 10.millis) + } + + "works with Schedule and state" in run { + for + channel <- Channel.init[Int](10) + task <- Clock.repeatWithDelay(Schedule.fixed(1.millis), 0) { state => + channel.put(state).as(state + 1) + } + numbers <- Kyo.fill(10)(channel.take) + _ <- task.interrupt + yield assert(numbers.toSeq == (0 until 10)) + end for + } + + "completes when schedule completes" in run { + for + channel <- Channel.init[Int](10) + task <- Clock.repeatWithDelay(Schedule.fixed(1.millis).maxDuration(10.millis), 0)(st => channel.put(st).andThen(st + 1)) + lastState <- task.get + numbers <- channel.drain + yield assert(lastState == 10 && numbers.toSeq == (0 until 10)) + } + } + end ClockTest diff --git a/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala b/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala index 2b2c007e0..9d8b03343 100644 --- a/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala @@ -33,7 +33,7 @@ class KyoAppTest extends Test: "effects" taggedAs jvmOnly in { def run: Int < (Async & Resource & Abort[Throwable]) = for - _ <- Timer.repeatAtInterval(1.second, 1.second)(()) + _ <- Clock.repeatAtInterval(1.second, 1.second)(()) i <- Random.nextInt _ <- Console.println(s"$i") _ <- Clock.now diff --git a/kyo-core/shared/src/test/scala/kyo/TimerTest.scala b/kyo-core/shared/src/test/scala/kyo/TimerTest.scala deleted file mode 100644 index 1c074d856..000000000 --- a/kyo-core/shared/src/test/scala/kyo/TimerTest.scala +++ /dev/null @@ -1,150 +0,0 @@ -package kyo - -import java.util.concurrent.atomic.AtomicInteger as JAtomicInteger -import org.scalatest.compatible.Assertion - -class TimerTest extends Test: - - def intervals(instants: Seq[Instant]): Seq[Duration] = - instants.sliding(2, 1).filter(_.size == 2).map(seq => seq(1) - seq(0)).toSeq - - "repeatAtInterval" - { - "executes function at interval" in run { - for - channel <- Channel.init[Instant](10) - task <- Timer.repeatAtInterval(1.millis)(Clock.now.map(channel.put)) - instants <- Kyo.fill(10)(channel.take) - _ <- task.interrupt - yield - val avgInterval = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) - assert(avgInterval >= 1.millis && avgInterval < 10.millis) - } - "respects interrupt" in run { - for - channel <- Channel.init[Instant](10) - task <- Timer.repeatAtInterval(1.millis)(Clock.now.map(channel.put)) - instants <- Kyo.fill(10)(channel.take) - _ <- task.interrupt - _ <- Async.sleep(2.millis) - result <- channel.poll - yield assert(result.isEmpty) - } - "with time control" in runJVM { - Clock.withTimeControl { control => - for - running <- AtomicBoolean.init(false) - queue <- Queue.Unbounded.init[Instant]() - task <- Timer.repeatAtInterval(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) - _ <- untilTrue(control.advance(1.milli).andThen(running.get)) - _ <- queue.drain - _ <- control.advance(1.milli).repeat(10) - _ <- task.interrupt - instants <- queue.drain - yield - intervals(instants).foreach(v => assert(v == 1.millis)) - succeed - } - } - "with Schedule parameter" in run { - for - channel <- Channel.init[Instant](10) - task <- Timer.repeatAtInterval(Schedule.fixed(1.millis))(Clock.now.map(channel.put)) - instants <- Kyo.fill(10)(channel.take) - _ <- task.interrupt - yield - val avgInterval = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) - assert(avgInterval >= 1.millis && avgInterval < 10.millis) - } - "with Schedule and state" in run { - for - channel <- Channel.init[Int](10) - task <- Timer.repeatAtInterval(Schedule.fixed(1.millis), 0)(st => channel.put(st).andThen(st + 1)) - numbers <- Kyo.fill(10)(channel.take) - _ <- task.interrupt - yield assert(numbers.toSeq == (0 until 10)) - } - "completes when schedule completes" in run { - for - channel <- Channel.init[Int](10) - task <- Timer.repeatAtInterval(Schedule.fixed(1.millis).maxDuration(10.millis), 0)(st => channel.put(st).andThen(st + 1)) - lastState <- task.get - numbers <- channel.drain - yield assert(lastState == 10 && numbers.toSeq == (0 until 10)) - } - } - - "repeatWithDelay" - { - "executes function with delay" in run { - for - channel <- Channel.init[Instant](10) - task <- Timer.repeatWithDelay(1.millis)(Clock.now.map(channel.put)) - instants <- Kyo.fill(10)(channel.take) - _ <- task.interrupt - yield - val avgDelay = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) - assert(avgDelay >= 1.millis && avgDelay < 10.millis) - } - - "respects interrupt" in run { - for - channel <- Channel.init[Instant](10) - task <- Timer.repeatWithDelay(1.millis)(Clock.now.map(channel.put)) - instants <- Kyo.fill(10)(channel.take) - _ <- task.interrupt - _ <- Async.sleep(2.millis) - result <- channel.poll - yield assert(result.isEmpty) - } - - "with time control" in runJVM { - Clock.withTimeControl { control => - for - running <- AtomicBoolean.init(false) - queue <- Queue.Unbounded.init[Instant]() - task <- Timer.repeatWithDelay(1.milli)(running.set(true).andThen(Clock.now.map(queue.add))) - _ <- untilTrue(control.advance(1.milli).andThen(running.get)) - _ <- queue.drain - _ <- control.advance(1.milli).repeat(10) - _ <- task.interrupt - instants <- queue.drain - yield - intervals(instants).foreach(v => assert(v == 1.millis)) - succeed - } - } - - "works with Schedule parameter" in run { - for - channel <- Channel.init[Instant](10) - task <- Timer.repeatWithDelay(Schedule.fixed(1.millis))(Clock.now.map(channel.put)) - instants <- Kyo.fill(10)(channel.take) - _ <- task.interrupt - yield - val avgDelay = intervals(instants).reduce(_ + _) * (1.toDouble / (instants.size - 2)) - assert(avgDelay >= 1.millis && avgDelay < 10.millis) - } - - "works with Schedule and state" in run { - val counter = new JAtomicInteger(0) - for - channel <- Channel.init[Int](10) - task <- Timer.repeatWithDelay(Schedule.fixed(1.millis), 0) { state => - channel.put(state).as(state + 1) - } - numbers <- Kyo.fill(10)(channel.take) - _ <- task.interrupt - yield assert(numbers.toSeq == (0 until 10)) - end for - } - - "completes when schedule completes" in run { - for - channel <- Channel.init[Int](10) - task <- Timer.repeatWithDelay(Schedule.fixed(1.millis).maxDuration(10.millis), 0)(st => channel.put(st).andThen(st + 1)) - lastState <- task.get - numbers <- channel.drain - yield assert(lastState == 10 && numbers.toSeq == (0 until 10)) - } - } - -end TimerTest From 99f540e970b7c8be841ecaf313ccbc6ec17036e4 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Tue, 29 Oct 2024 20:18:51 -0700 Subject: [PATCH 14/15] cache ChronoUnit to Units lookup --- kyo-data/shared/src/main/scala/kyo/Duration.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/kyo-data/shared/src/main/scala/kyo/Duration.scala b/kyo-data/shared/src/main/scala/kyo/Duration.scala index f81695fc0..9022719cd 100644 --- a/kyo-data/shared/src/main/scala/kyo/Duration.scala +++ b/kyo-data/shared/src/main/scala/kyo/Duration.scala @@ -118,10 +118,11 @@ object Duration: end Units object Units: + private val byChronoUnit: Map[ChronoUnit, Units] = Units.values.map(u => (u.chronoUnit, u)).toMap + def fromJava(chronoUnit: ChronoUnit): Units = - Units.values.find(_.chronoUnit.equals(chronoUnit)) match - case None => throw new UnsupportedOperationException("Chrono unit not suppported: " + chronoUnit) - case Some(unit) => unit + byChronoUnit.get(chronoUnit) + .getOrElse(throw new UnsupportedOperationException("Chrono unit not suppported: " + chronoUnit)) def fromJava(timeUnit: TimeUnit): Units = given CanEqual[TimeUnit, TimeUnit] = CanEqual.derived From e07a39b45e428cdf3d76fbe2835ff9dfa1a29fe7 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Tue, 29 Oct 2024 20:27:22 -0700 Subject: [PATCH 15/15] Revert "address review feedback" This reverts commit aef86c4fdad720993ff0fc87ba7d1260f4589852. --- kyo-core/shared/src/main/scala/kyo/Clock.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kyo-core/shared/src/main/scala/kyo/Clock.scala b/kyo-core/shared/src/main/scala/kyo/Clock.scala index 1c1306e20..5b9a939db 100644 --- a/kyo-core/shared/src/main/scala/kyo/Clock.scala +++ b/kyo-core/shared/src/main/scala/kyo/Clock.scala @@ -512,12 +512,12 @@ object Clock: def now()(using AllowUnsafe) = Instant.fromJava(java.time.Instant.now()) def sleep(duration: Duration) = Promise.Unsafe.fromIOPromise { - new IOPromise[Nothing, Unit] with Runnable: + new IOPromise[Nothing, Unit] with Callable[Unit]: val task = executor.schedule(this, duration.toNanos, TimeUnit.NANOSECONDS) override def interrupt(error: Panic): Boolean = discard(task.cancel(true)) super.interrupt(error) - def run() = completeDiscard(Result.unit) + def call(): Unit = completeDiscard(Result.unit) } end sleep end Unsafe