Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] time shift and control #789

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
72 changes: 28 additions & 44 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2357,63 +2357,47 @@ import kyo.*
val a: Unit < IO =
IO(())

// Schedule a delayed task
val b: TimerTask < IO =
Timer.schedule(delay = 1.second)(a)

// Recurring task with
// intial delay
val c: TimerTask < IO =
Timer.scheduleAtFixedRate(
initialDelay = 1.minutes,
period = 1.minutes
// Recurring task with a delay between
// executions
val b: Fiber[Nothing, Unit] < IO =
Timer.repeatWithDelay(
startAfter = 1.minute,
delay = 1.minute
)(a)

// Recurring task without
// initial delay
val d: TimerTask < IO =
Timer.scheduleAtFixedRate(
period = 1.minutes
)(a)

// Schedule with fixed delay between tasks
val e: TimerTask < IO =
Timer.scheduleWithFixedDelay(
initialDelay = 1.minutes,
period = 1.minutes
)(a)
// Without an initial delay
val c: Fiber[Nothing, Unit] < IO =
Timer.repeatWithDelay(1.minute)(a)

// without initial delay
val f: TimerTask < IO =
Timer.scheduleWithFixedDelay(
period = 1.minutes
// Schedule at a specific interval, regarless
// of the duration of each execution
val d: Fiber[Nothing, Unit] < IO =
Timer.repeatAtInterval(
startAfter = 1.minute,
interval = 1.minute
)(a)

// Specify the 'Timer' explictly
val i: TimerTask < IO =
Timer.let(Timer.live)(f)
// Without an initial delay
val e: Fiber[Nothing, Unit] < IO =
Timer.repeatAtInterval(1.minute)(a)
```

`TimerTask` offers methods for more granular control over the scheduled tasks.
Use the returned `Fiber` to control scheduled tasks.

```scala
import kyo.*

// Example TimerTask
val a: TimerTask < IO =
Timer.schedule(1.second)(())
// Example task
val a: Fiber[Nothing, Unit] < IO =
Timer.repeatAtInterval(1.second)(())

// Try to cancel the task
val b: Boolean < IO =
a.map(_.cancel)
// Try to cancel a task
def b(task: Fiber[Nothing, Unit]): Boolean < IO =
task.interrupt

// Check if the task is cancelled
val c: Boolean < IO =
a.map(_.cancelled)

// Check if the task is done
val d: Boolean < IO =
a.map(_.done)
// Check if a task is done
def c(task: Fiber[Nothing, Unit]): Boolean < IO =
task.done
```

### Latch: Countdown Synchronization
Expand Down
35 changes: 16 additions & 19 deletions kyo-core/shared/src/main/scala/kyo/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,9 @@ object Async:
* @param d
* The duration to sleep
*/
def sleep(d: Duration)(using Frame): Unit < Async =
if d == Duration.Zero then ()
else
IO.Unsafe {
val p = Promise.Unsafe.init[Nothing, Unit]()
if d.isFinite then
Timer.schedule(d)(p.completeDiscard(Result.success(()))).map { t =>
IO.ensure(t.cancel.unit)(p.safe.get)
}
else
p.safe.get
end if
}
def sleep(duration: Duration)(using Frame): Unit < Async =
if duration == Duration.Zero then ()
hearnadam marked this conversation as resolved.
Show resolved Hide resolved
else Clock.sleep(duration).map(_.get)

/** Runs a computation with a timeout.
*
Expand All @@ -128,17 +118,24 @@ object Async:
* @return
* The result of the computation, or a Timeout error
*/
def timeout[E, A: Flat, Ctx](d: Duration)(v: => A < (Abort[E] & Async & Ctx))(
def timeout[E, A: Flat, Ctx](after: Duration)(v: => A < (Abort[E] & Async & Ctx))(
using
boundary: Boundary[Ctx, Async & Abort[E | Timeout]],
frame: Frame
): A < (Abort[E | Timeout] & Async & Ctx) =
boundary { (trace, context) =>
val task = IOTask[Ctx, E | Timeout, A](v, trace, context)
Timer.schedule(d)(task.completeDiscard(Result.fail(Timeout(frame)))).map { t =>
IO.ensure(t.cancel.unit)(Async.get(task))
if !after.isFinite then v
else
boundary { (trace, context) =>
Clock.use { clock =>
IO.Unsafe {
val sleepFiber = clock.unsafe.sleep(after)
val task = IOTask[Ctx, E | Timeout, A](v, trace, context)
sleepFiber.onComplete(_ => discard(task.complete(Result.fail(Timeout(frame)))))
task.onComplete(_ => discard(sleepFiber.interrupt()))
Async.get(task)
}
}
}
}
end timeout

/** Races multiple computations and returns the result of the first to complete. When one computation completes, all other computations
Expand Down
Loading
Loading