Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] time shift and control #789

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open

[core] time shift and control #789

wants to merge 16 commits into from

Conversation

fwbrasil
Copy link
Collaborator

This PR uses the new Schedule in kyo-data to reimplement timers in kyo-core. The new design provides two mechanisms to control time: Clock.withTimeShift speeds up or slows down the time for a computation based on a factor and Clock.withTimeControl provides an API for users to manually set the time. Both methods not only affect Clock.now but also scheduled operations in Timer and Async.sleep calls.

def now()(using AllowUnsafe): Instant = Instant.fromJava(java.time.Instant.now())
val executor = Executors.newScheduledThreadPool(2, Threads("kyo-core-clock-executor"))
def now()(using AllowUnsafe) = Instant.fromJava(java.time.Instant.now())
def sleep(duration: Duration) =
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to move the management of timed tasks to Clock so it's possible to customize the behavior for time shifting and control

deadline.toJava.compareTo(other.asInstanceOf[Task].deadline.toJava)
end Task

val queue = new DelayQueue[Task]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As simple timer implementation using DelayQueue. The queue is synchronized so sleep can be called concurrently by multiple fibers

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to change it to a manually-synchronized PriorityQueue because DelayQueue isn't available in js

tick()
}

def tick(): Unit =
Copy link
Collaborator Author

@fwbrasil fwbrasil Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the user changes the time, this method is called to execute the delayed tasks if it's time=

def interrupt(error: Panic)(using AllowUnsafe): Boolean = self.interrupt(error)
def interruptDiscard(error: Panic)(using AllowUnsafe): Unit = discard(self.interrupt(error))
def mask()(using AllowUnsafe): Unsafe[E, A] = self.mask()
def interrupt()(using frame: Frame, allow: AllowUnsafe): Boolean = self.interrupt(Panic(Interrupted(frame)))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new overloaded interrupt without params. The following methods were changed only because of formatting

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be useful! I think kyo-zio can use it.

def repeatWithDelay[E, S](delaySchedule: Schedule)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO =
repeatWithDelay(delaySchedule, ())(_ => f)

def repeatWithDelay[E, A: Flat, S](
Copy link
Collaborator Author

@fwbrasil fwbrasil Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of relying on the underlying java timer to manage repeated tasks, this new Timer implementation spins up new fibers that use the provided Schedule to repeat and Clock to sleep when necessary. This approach also allows scheduled tasks to maintain internal state between iterations and return a value at the end (see the A type).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is much more useful!

@@ -58,7 +59,7 @@ object Duration:
* @return
* A Duration instance
*/
inline def fromNanos(value: Long): Duration =
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something that was quite annoying while working on this change were assertion errors reporting inlined trees from Duration operations. It becomes a large and confusing error message. Since these methods don't take functions, the benefit of inlining them is low so I've decided to remove the modifier

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I think this is the right move. I've noticed this issue with loop as well. If you incorrectly return a non-loop (Done/Continue) value the expanded tree is massive.

inline def toWeeks: Long = self.to(Weeks)
inline def toMonths: Long = self.to(Months)
inline def toYears: Long = self.to(Years)
def to(timeUnit: TimeUnit): Long =
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new methods to integrate with TimeUnit and ChronoUnit

kyo-core/shared/src/main/scala/kyo/Clock.scala Outdated Show resolved Hide resolved
def repeatWithDelay[E, S](delaySchedule: Schedule)(f: => Unit < (Async & Abort[E]))(using Frame): Fiber[E, Unit] < IO =
repeatWithDelay(delaySchedule, ())(_ => f)

def repeatWithDelay[E, A: Flat, S](
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is much more useful!

* - `repeatWithDelay`: Executes tasks with a fixed delay between task completions
* - `repeatAtInterval`: Executes tasks at fixed time intervals, regardless of task duration
*/
object Timer:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a distinct object for Timer? I generally prefer extension methods (or at least avoiding too many distinct objects) for discoverability.

I think most other effect systems don't have a concept of 'Timer'

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it doesn't feel necessary. Perhaps the methods could be in Clock since sleep is already there?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made the change, it seems ok

@@ -58,7 +59,7 @@ object Duration:
* @return
* A Duration instance
*/
inline def fromNanos(value: Long): Duration =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I think this is the right move. I've noticed this issue with loop as well. If you incorrectly return a non-loop (Done/Continue) value the expanded tree is massive.

@@ -116,46 +117,72 @@ object Duration:
val factor: Double = chronoUnit.getDuration.toNanos.toDouble
end Units

object Units:
def fromJava(chronoUnit: ChronoUnit): Units =
Units.values.find(_.chronoUnit.equals(chronoUnit)) match
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a Map?

def interrupt(error: Panic)(using AllowUnsafe): Boolean = self.interrupt(error)
def interruptDiscard(error: Panic)(using AllowUnsafe): Unit = discard(self.interrupt(error))
def mask()(using AllowUnsafe): Unsafe[E, A] = self.mask()
def interrupt()(using frame: Frame, allow: AllowUnsafe): Boolean = self.interrupt(Panic(Interrupted(frame)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be useful! I think kyo-zio can use it.

def now()(using AllowUnsafe) = Instant.fromJava(java.time.Instant.now())
def sleep(duration: Duration) =
Promise.Unsafe.fromIOPromise {
new IOPromise[Nothing, Unit] with Callable[Unit]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to use Callable[Unit] vs Runnable?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had changed it to Runnable but then I remembered why I used Callable. It's what's supported by the ScalaJS stubs:

[info] compiling 27 Scala sources to /home/runner/work/kyo/kyo/kyo-prelude/js/target/scala-3.5.2/test-classes ...
[error] -- [E007] Type Mismatch Error: /home/runner/work/kyo/kyo/kyo-core/shared/src/main/scala/kyo/Clock.scala:516:57 
[error] 516 |                            val task = executor.schedule(this, duration.toNanos, TimeUnit.NANOSECONDS)
[error]     |                                                         ^^^^
[error]     |Found:    ($anon.this : kyo.scheduler.IOPromise[Nothing, Unit] with Runnable {...})
[error]     |Required: java.util.concurrent.Callable[A]
[error]     |
[error]     |where:    A is a type variable
[error]     |
[error]     | longer explanation available when compiling with `-explain`
[error] one error found

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants