Skip to content

Commit

Permalink
data: Schedule (#733)
Browse files Browse the repository at this point in the history
Inspired by ZIO's `Schedule` but as pure data structure.
  • Loading branch information
fwbrasil authored Oct 19, 2024
1 parent 3e5682b commit 5ac1a86
Show file tree
Hide file tree
Showing 11 changed files with 1,307 additions and 133 deletions.
25 changes: 6 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2038,30 +2038,17 @@ import scala.concurrent.duration.*
val unreliableComputation: Int < Abort[Exception] =
Abort.catching[Exception](throw new Exception("Temporary failure"))

// Customize retry policy
val customPolicy = Retry.Policy.default
.limit(5)
.exponential(100.millis, maxBackoff = 5.seconds)
// Customize retry schedule
val shedule =
Schedule.exponentialBackoff(initial = 100.millis, factor = 2, maxBackoff = 5.seconds)
.take(5)

val a: Int < (Abort[Exception] & Async) =
Retry[Exception](customPolicy)(unreliableComputation)
Retry[Exception](shedule)(unreliableComputation)

// Use a custom policy builder
val b: Int < (Abort[Exception] & Async) =
Retry[Exception] { policy =>
policy
.limit(10)
.backoff(attempt => (attempt * 100).millis)
}(unreliableComputation)
```

The `Retry` effect automatically adds the `Async` effect to handle the backoff delays between retry attempts. The `Policy` class allows for fine-tuning of the retry behavior:

- `limit`: Sets the maximum number of retry attempts.
- `exponential`: Configures exponential backoff with a starting delay and optional maximum delay.
- `backoff`: Allows for custom backoff strategies based on the attempt number.

`Retry` will continue attempting the computation until it succeeds, the retry limit is reached, or an unhandled exception is thrown. If all retries fail, the last failure is propagated.
The `Retry` effect automatically adds the `Async` effect to handle the provided `Schedule`. `Retry` will continue attempting the computation until it succeeds, the retry schedule is done, or an unhandled exception is thrown. If all retries fail, the last failure is propagated.

### Queue: Concurrent Queuing

Expand Down
29 changes: 10 additions & 19 deletions kyo-combinators/shared/src/main/scala/kyo/Combinators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,13 @@ extension [A, S](effect: A < S)
* @return
* A computation that produces the result of the last execution
*/
def repeat(policy: Retry.Policy)(using Flat[A], Frame): A < (S & Async) =
Loop.indexed { i =>
if i >= policy.limit then effect.map(Loop.done)
else effect.delayed(policy.backoff(i)).as(Loop.continue)
def repeat(schedule: Schedule)(using Flat[A], Frame): A < (S & Async) =
Loop(schedule) { schedule =>
schedule.next.map { (delay, nextSchedule) =>
effect.delayed(delay).as(Loop.continue(nextSchedule))
}.getOrElse {
effect.map(Loop.done)
}
}

/** Performs this computation repeatedly with a limit.
Expand Down Expand Up @@ -186,8 +189,8 @@ extension [A, S](effect: A < S)
* @return
* A computation that produces the result of this computation with Async and Abort[Throwable] effects
*/
def retry(policy: Retry.Policy)(using Flat[A], Frame): A < (S & Async & Abort[Throwable]) =
Retry[Throwable](policy)(effect)
def retry(schedule: Schedule)(using Flat[A], Frame): A < (S & Async & Abort[Throwable]) =
Retry[Throwable](schedule)(effect)

/** Performs this computation repeatedly with a limit in case of failures.
*
Expand All @@ -197,19 +200,7 @@ extension [A, S](effect: A < S)
* A computation that produces the result of this computation with Async and Abort[Throwable] effects
*/
def retry(n: Int)(using Flat[A], Frame): A < (S & Async & Abort[Throwable]) =
Retry[Throwable](Retry.Policy(_ => Duration.Zero, n))(effect)

/** Performs this computation repeatedly with a backoff policy and a limit in case of failures.
*
* @param backoff
* The backoff policy to use
* @param limit
* The limit to use
* @return
* A computation that produces the result of this computation with Async and Abort[Throwable] effects
*/
def retry(backoff: Int => Duration, n: Int)(using Flat[A], Frame): A < (S & Async & Abort[Throwable]) =
Retry[Throwable](Retry.Policy(backoff, n))(effect)
Retry[Throwable](Schedule.repeat(n))(effect)

/** Performs this computation indefinitely.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ class EffectCombinatorTest extends Test:

"repeat with policy" - {
"repeat with custom policy" in run {
var count = 0
val policy = Retry.Policy(_ => Duration.Zero, 3)
val effect = IO { count += 1; count }.repeat(policy)
var count = 0
val schedule = Schedule.repeat(3)
val effect = IO { count += 1; count }.repeat(schedule)
Async.run(effect).map(_.toFuture).map { handled =>
handled.map { v =>
assert(v == 4)
Expand Down Expand Up @@ -321,7 +321,7 @@ class EffectCombinatorTest extends Test:
"retry with policy" - {
"successful after retries with custom policy" in run {
var count = 0
val policy = Retry.Policy(_ => 10.millis, 3)
val policy = Schedule.fixed(10.millis).take(3)
val effect = IO {
count += 1
if count < 3 then throw new Exception("Retry")
Expand All @@ -333,20 +333,6 @@ class EffectCombinatorTest extends Test:
}
}

"retry with backoff and limit" - {
"successful after retries with exponential backoff" in run {
var count = 0
val backoff = (i: Int) => Math.pow(2, i).toLong.millis
val effect = IO {
count += 1
if count < 3 then throw new Exception("Retry")
else count
}.retry(backoff, 3)
Async.run(effect).map(_.toFuture).map { handled =>
handled.map(v => assert(v == 3))
}
}
}
}

"explicitThrowable" - {
Expand Down
80 changes: 8 additions & 72 deletions kyo-core/shared/src/main/scala/kyo/Retry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,74 +6,12 @@ import scala.util.*
/** Provides utilities for retrying operations with customizable policies. */
object Retry:

/** Represents a retry policy with backoff strategy and attempt limit. */
final case class Policy(backoff: Int => Duration, limit: Int):

/** Creates an exponential backoff strategy.
*
* @param startBackoff
* The initial backoff duration.
* @param factor
* The multiplier for each subsequent backoff.
* @param maxBackoff
* The maximum backoff duration.
* @return
* A new Policy with exponential backoff.
*/
def exponential(
startBackoff: Duration,
factor: Int = 2,
maxBackoff: Duration = Duration.Infinity
): Policy =
backoff { i =>
(startBackoff * factor * (i + 1)).min(maxBackoff)
}

/** Sets a custom backoff function.
*
* @param f
* A function that takes the attempt number and returns a Duration.
* @return
* A new Policy with the custom backoff function.
*/
def backoff(f: Int => Duration): Policy =
copy(backoff = f)

/** Sets the maximum number of retry attempts.
*
* @param v
* The maximum number of attempts.
* @return
* A new Policy with the specified attempt limit.
*/
def limit(v: Int): Policy =
copy(limit = v)
end Policy

object Policy:
/** The default retry policy with no backoff and 3 attempts. */
val default = Policy(_ => Duration.Zero, 3)
/** The default retry schedule. */
val defaultSchedule = Schedule.exponentialBackoff(initial = 100.millis, factor = 2, maxBackoff = 5.seconds).take(3)

/** Provides retry operations for a specific error type. */
final class RetryOps[E >: Nothing](dummy: Unit) extends AnyVal:

/** Retries an operation using the specified policy.
*
* @param policy
* The retry policy to use.
* @param v
* The operation to retry.
* @return
* The result of the operation, or an abort if all retries fail.
*/
def apply[A: Flat, S](policy: Policy)(v: => A < S)(
using
SafeClassTag[E],
Tag[E],
Frame
): A < (Async & Abort[E] & S) =
apply(_ => policy)(v)

/** Retries an operation using a custom policy builder.
*
* @param builder
Expand All @@ -83,21 +21,19 @@ object Retry:
* @return
* The result of the operation, or an abort if all retries fail.
*/
def apply[A: Flat, S](builder: Policy => Policy)(v: => A < (Abort[E] & S))(
def apply[A: Flat, S](schedule: Schedule)(v: => A < (Abort[E] & S))(
using
SafeClassTag[E],
Tag[E],
Frame
): A < (Async & Abort[E] & S) =
val b = builder(Policy.default)
Loop.indexed { attempt =>
Loop(schedule) { schedule =>
Abort.run[E](v).map(_.fold { r =>
if attempt < b.limit then
Async.sleep(b.backoff(attempt)).andThen {
Loop.continue
}
else
schedule.next.map { (delay, nextSchedule) =>
Async.delay(delay)(Loop.continue(nextSchedule))
}.getOrElse {
Abort.get(r)
}
}(Loop.done(_)))
}
end apply
Expand Down
10 changes: 5 additions & 5 deletions kyo-core/shared/src/test/scala/kyo/RetryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class RetryTest extends Test:
"no retries" - {
"ok" in run {
var calls = 0
Retry[Any](_.limit(0)) {
Retry[Any](Schedule.never) {
calls += 1
42
}.map { v =>
Expand All @@ -17,7 +17,7 @@ class RetryTest extends Test:
"nok" in run {
var calls = 0
Abort.run[Exception] {
Retry[Exception](_.limit(0)) {
Retry[Exception](Schedule.never) {
calls += 1
throw ex
}
Expand All @@ -30,7 +30,7 @@ class RetryTest extends Test:
"retries" - {
"ok" in run {
var calls = 0
Retry[Any](_.limit(3)) {
Retry[Any](Schedule.repeat(3)) {
calls += 1
42
}.map { v =>
Expand All @@ -40,7 +40,7 @@ class RetryTest extends Test:
"nok" in run {
var calls = 0
Abort.run[Exception] {
Retry[Exception](_.limit(3)) {
Retry[Exception](Schedule.repeat(3)) {
calls += 1
throw ex
}
Expand All @@ -54,7 +54,7 @@ class RetryTest extends Test:
var calls = 0
val start = java.lang.System.currentTimeMillis()
Abort.run[Exception] {
Retry[Exception](_.limit(4).exponential(1.milli)) {
Retry[Exception](Schedule.exponentialBackoff(1.milli, 2.0, Duration.Infinity).take(4)) {
calls += 1
throw ex
}
Expand Down
4 changes: 4 additions & 0 deletions kyo-data/shared/src/main/scala/kyo/Duration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ object Duration:
val sum: Long = self.toLong + that.toLong
if sum >= 0 then sum else Duration.Infinity

inline infix def -(that: Duration): Duration =
val diff: Long = self.toLong - that.toLong
if diff > 0 then diff else Duration.Zero

inline infix def *(factor: Double): Duration =
if factor <= 0 || self.toLong <= 0L then Duration.Zero
else if factor <= Long.MaxValue / self.toLong.toDouble then Math.round(self.toLong.toDouble * factor)
Expand Down
18 changes: 18 additions & 0 deletions kyo-data/shared/src/main/scala/kyo/Instant.scala
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ object Instant:
def truncatedTo(unit: Duration.Units & Duration.Truncatable): Instant =
instant.truncatedTo(unit.chronoUnit)

/** Returns the minimum of this Instant and another.
*
* @param other
* The other Instant to compare with.
* @return
* The earlier of the two Instants.
*/
infix def min(other: Instant): Instant = if instant.isBefore(other) then instant else other

/** Returns the maximum of this Instant and another.
*
* @param other
* The other Instant to compare with.
* @return
* The later of the two Instants.
*/
infix def max(other: Instant): Instant = if instant.isAfter(other) then instant else other

/** Converts this Instant to a human-readable ISO-8601 formatted string.
*
* @return
Expand Down
Loading

0 comments on commit 5ac1a86

Please sign in to comment.