Skip to content

Commit

Permalink
introduce KyoException
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Dec 13, 2024
1 parent d093cee commit 54e30aa
Show file tree
Hide file tree
Showing 18 changed files with 90 additions and 73 deletions.
8 changes: 4 additions & 4 deletions kyo-core/shared/src/main/scala/kyo/Admission.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import kyo.scheduler.Scheduler
* @param frame
* The execution frame where the task was rejected
*/
case class Rejected(frame: Frame) extends Exception(frame.position.show)
class Rejected()(using Frame) extends KyoException("Admisssion control has rejected execution to mitigate overloading.")

/** Admission control mechanism that helps prevent system overload by selectively rejecting tasks at the boundary of incoming workload.
*
Expand Down Expand Up @@ -47,7 +47,7 @@ object Admission:
*/
def run[A, S](key: String)(v: A < S)(using frame: Frame): A < (IO & S & Abort[Rejected]) =
IO {
if Scheduler.get.reject(key) then Abort.fail(Rejected(frame))
if Scheduler.get.reject(key) then Abort.fail(Rejected())
else v
}

Expand All @@ -72,7 +72,7 @@ object Admission:
*/
def run[A, S](key: Int)(v: A < S)(using frame: Frame): A < (IO & S & Abort[Rejected]) =
IO {
if Scheduler.get.reject(key) then Abort.fail(Rejected(frame))
if Scheduler.get.reject(key) then Abort.fail(Rejected())
else v
}

Expand All @@ -95,7 +95,7 @@ object Admission:
*/
def run[A, S](v: A < S)(using frame: Frame): A < (IO & S & Abort[Rejected]) =
IO {
if Scheduler.get.reject() then Abort.fail(Rejected(frame))
if Scheduler.get.reject() then Abort.fail(Rejected())
else v
}

Expand Down
2 changes: 1 addition & 1 deletion kyo-core/shared/src/main/scala/kyo/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ object Async:
IO.Unsafe {
val sleepFiber = clock.unsafe.sleep(after)
val task = IOTask[Ctx, E | Timeout, A](v, trace, context)
sleepFiber.onComplete(_ => discard(task.interrupt(Result.Fail(Timeout(frame)))))
sleepFiber.onComplete(_ => discard(task.interrupt(Result.Fail(Timeout()))))
task.onComplete(_ => discard(sleepFiber.interrupt()))
Async.get(task)
}
Expand Down
4 changes: 2 additions & 2 deletions kyo-core/shared/src/main/scala/kyo/Closed.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package kyo

import scala.util.control.NoStackTrace

case class Closed(message: String, createdAt: Frame, failedAt: Frame)
extends Exception(s"Resource created at ${createdAt.position.show} is closed. Failure at ${failedAt.position.show}: $message")
class Closed(resource: Text, createdAt: Frame, details: Text = "")(using Frame)
extends KyoException(t"$resource created at ${createdAt.position.show} is closed.", details)
with NoStackTrace
4 changes: 2 additions & 2 deletions kyo-core/shared/src/main/scala/kyo/Hub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class Hub[A] private[kyo] (
* a Maybe containing any remaining elements in the Hub
*/
def close(using frame: Frame): Maybe[Seq[A]] < IO =
fiber.interruptDiscard(Result.Panic(Closed("Hub", initFrame, frame))).andThen {
fiber.interruptDiscard(Result.Panic(Closed("Hub", initFrame))).andThen {
ch.close.map { r =>
IO {
val array = listeners.toArray()
Expand Down Expand Up @@ -111,7 +111,7 @@ class Hub[A] private[kyo] (
* a new Listener
*/
def listen(bufferSize: Int)(using frame: Frame): Listener[A] < (IO & Abort[Closed]) =
def fail = Abort.fail(Closed("Hub", initFrame, frame))
def fail = Abort.fail(Closed("Hub", initFrame))
closed.map {
case true => fail
case false =>
Expand Down
3 changes: 3 additions & 0 deletions kyo-core/shared/src/main/scala/kyo/Interrupt.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package kyo

class Interrupt()(using Frame) extends KyoException("Fiber has been interrupted.")
2 changes: 1 addition & 1 deletion kyo-core/shared/src/main/scala/kyo/Meter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ object Meter:
val st = state.getAndSet(Int.MinValue)
val ok = st != Int.MinValue // The meter wasn't already closed
if ok then
val fail = Result.fail(Closed("Semaphore is closed", initFrame, frame))
val fail = Result.fail(Closed("Meter", initFrame))
// Complete the closed promise to fail new operations
closed.completeDiscard(fail)
// Drain the pending waiters
Expand Down
2 changes: 1 addition & 1 deletion kyo-core/shared/src/main/scala/kyo/Queue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ object Queue:
final protected val _closed = AtomicRef.Unsafe.init(Maybe.empty[Result.Error[Closed]])

final def close()(using frame: Frame, allow: AllowUnsafe) =
val fail = Result.Fail(Closed("Queue", initFrame, frame))
val fail = Result.Fail(Closed("Queue", initFrame))
Maybe.when(_closed.compareAndSet(Maybe.empty, Maybe(fail)))(_drain())
end close

Expand Down
6 changes: 3 additions & 3 deletions kyo-core/shared/src/main/scala/kyo/Resource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ object Resource:
case Result.Success(_) => ()
case _ =>
throw new Closed(
"Resource finalizer queue already closed. This may happen if " +
"a background fiber escapes the scope of a 'Resource.run' call.",
"Finalizer",
finalizer.createdAt,
frame
"The finalizer queue is already closed. This may happen if " +
"a background fiber escapes the scope of a 'Resource.run' call."
)
}
}
Expand Down
41 changes: 23 additions & 18 deletions kyo-core/shared/src/main/scala/kyo/System.scala
Original file line number Diff line number Diff line change
Expand Up @@ -200,46 +200,51 @@ object System:
* @return
* A Result containing either the parsed value or an error.
*/
def apply(s: String): Result[E, A]
def apply(s: String)(using Frame): Result[E, A]
end Parser

/** Companion object for Parser, containing default implementations. */
object Parser:
given Parser[Nothing, String] = v => Result.success(v)
given Parser[NumberFormatException, Int] = v => Result.catching[NumberFormatException](v.toInt)
given Parser[NumberFormatException, Long] = v => Result.catching[NumberFormatException](v.toLong)
given Parser[NumberFormatException, Float] = v => Result.catching[NumberFormatException](v.toFloat)
given Parser[NumberFormatException, Double] = v => Result.catching[NumberFormatException](v.toDouble)
given Parser[IllegalArgumentException, Boolean] = v => Result.catching[IllegalArgumentException](v.toBoolean)
given Parser[NumberFormatException, Byte] = v => Result.catching[NumberFormatException](v.toByte)
given Parser[NumberFormatException, Short] = v => Result.catching[NumberFormatException](v.toShort)
given Parser[Duration.InvalidDuration, Duration] = v => Duration.parse(v)
def apply[E, A](f: Frame ?=> String => Result[E, A]) =
new Parser[E, A]:
def apply(s: String)(using Frame) = f(s)

given Parser[Nothing, String] = Parser(v => Result.success(v))
given Parser[NumberFormatException, Int] = Parser(v => Result.catching[NumberFormatException](v.toInt))
given Parser[NumberFormatException, Long] = Parser(v => Result.catching[NumberFormatException](v.toLong))
given Parser[NumberFormatException, Float] = Parser(v => Result.catching[NumberFormatException](v.toFloat))
given Parser[NumberFormatException, Double] = Parser(v => Result.catching[NumberFormatException](v.toDouble))
given Parser[IllegalArgumentException, Boolean] = Parser(v => Result.catching[IllegalArgumentException](v.toBoolean))
given Parser[NumberFormatException, Byte] = Parser(v => Result.catching[NumberFormatException](v.toByte))
given Parser[NumberFormatException, Short] = Parser(v => Result.catching[NumberFormatException](v.toShort))
given Parser[Duration.InvalidDuration, Duration] = Parser(v => Duration.parse(v))

given Parser[IllegalArgumentException, java.util.UUID] =
v => Result.catching[IllegalArgumentException](java.util.UUID.fromString(v))
Parser(v => Result.catching[IllegalArgumentException](java.util.UUID.fromString(v)))

given Parser[DateTimeParseException, java.time.LocalDate] =
v => Result.catching[DateTimeParseException](java.time.LocalDate.parse(v))
Parser(v => Result.catching[DateTimeParseException](java.time.LocalDate.parse(v)))

given Parser[DateTimeParseException, java.time.LocalTime] =
v => Result.catching[DateTimeParseException](java.time.LocalTime.parse(v))
Parser(v => Result.catching[DateTimeParseException](java.time.LocalTime.parse(v)))

given Parser[DateTimeParseException, java.time.LocalDateTime] =
v => Result.catching[DateTimeParseException](java.time.LocalDateTime.parse(v))
Parser(v => Result.catching[DateTimeParseException](java.time.LocalDateTime.parse(v)))

given Parser[URISyntaxException, java.net.URI] =
v => Result.catching[URISyntaxException](new java.net.URI(v))
Parser(v => Result.catching[URISyntaxException](new java.net.URI(v)))

given Parser[MalformedURLException, java.net.URL] =
v => Result.catching[MalformedURLException](new java.net.URL(v))
Parser(v => Result.catching[MalformedURLException](new java.net.URL(v)))

given [E, A](using p: Parser[E, A], frame: Frame): Parser[E, Seq[A]] =
v => Result.collect(Chunk.from(v.split(",")).map(v => p(v.trim())))
Parser(v => Result.collect(Chunk.from(v.split(",")).map(v => p(v.trim()))))

given Parser[IllegalArgumentException, Char] =
v =>
Parser { v =>
if v.length() == 1 then Result.success(v(0))
else Result.fail(new IllegalArgumentException("String must have exactly one character"))
}

end Parser

Expand Down
4 changes: 1 addition & 3 deletions kyo-core/shared/src/main/scala/kyo/Timeout.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,4 @@ package kyo

import scala.util.control.NoStackTrace

case class Timeout(frame: Frame)
extends Exception(frame.position.show)
with NoStackTrace
class Timeout()(using Frame) extends KyoException(t"Computation has timed out.")
6 changes: 2 additions & 4 deletions kyo-core/shared/src/main/scala/kyo/scheduler/IOPromise.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private[kyo] class IOPromise[+E, +A](init: State[E, A]) extends Safepoint.Interc
case l: Linked[E, A] @unchecked =>
interruptsLoop(l.p)
case _ =>
try discard(other.interrupt(Result.Panic(Interrupt(frame))))
try discard(other.interrupt(Result.Panic(Interrupt())))
catch
case ex if NonFatal(ex) =>
import AllowUnsafe.embrace.danger
Expand Down Expand Up @@ -194,7 +194,7 @@ private[kyo] class IOPromise[+E, +A](init: State[E, A]) extends Safepoint.Interc
import kyo.AllowUnsafe.embrace.danger
if isNull(result) then
if deadline.isOverdue() then
return Result.fail(Timeout(frame))
return Result.fail(Timeout())
val timeLeft = deadline.timeLeft()
if !timeLeft.isFinite then
LockSupport.park(this)
Expand Down Expand Up @@ -228,8 +228,6 @@ end IOPromise

private[kyo] object IOPromise:

case class Interrupt(origin: Frame) extends Exception with NoStackTrace

type State[+E, +A] = Result[E, A] | Pending[E, A] | Linked[E, A]

case class Linked[+E, +A](p: IOPromise[E, A])
Expand Down
8 changes: 4 additions & 4 deletions kyo-core/shared/src/test/scala/kyo/AsyncTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class AsyncTest extends Test:
.pipe(Async.runAndBlock(Duration.Infinity))
.pipe(Abort.run[Timeout](_))
.map {
case Result.Fail(Timeout(_)) => succeed
case Result.Fail(_: Timeout) => succeed
case v => fail(v.toString())
}
}
Expand All @@ -82,7 +82,7 @@ class AsyncTest extends Test:
.pipe(Async.runAndBlock(10.millis))
.pipe(Abort.run[Timeout](_))
.map {
case Result.Fail(Timeout(_)) => succeed
case Result.Fail(_: Timeout) => succeed
case v => fail(v.toString())
}
}
Expand All @@ -92,7 +92,7 @@ class AsyncTest extends Test:
.pipe(Async.runAndBlock(10.millis))
.pipe(Abort.run[Timeout](_))
.map {
case Result.Fail(Timeout(_)) => succeed
case Result.Fail(_: Timeout) => succeed
case v => fail(v.toString())
}
}
Expand Down Expand Up @@ -520,7 +520,7 @@ class AsyncTest extends Test:
yield value

Abort.run[Timeout](result).map {
case Result.Fail(Timeout(_)) => succeed
case Result.Fail(_: Timeout) => succeed
case other => fail(s"Expected Timeout, got $other")
}
}
Expand Down
6 changes: 3 additions & 3 deletions kyo-core/shared/src/test/scala/kyo/ChannelTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -531,9 +531,9 @@ class ChannelTest extends Test:
stream = c.stream().mapChunk(ch => Chunk(ch))
v <- Abort.run(stream.run)
yield v match
case Result.Success(v) => fail(s"Stream succeeded unexpectedly: ${v}")
case Result.Fail(Closed(_, _, _)) => assert(true)
case Result.Panic(ex) => fail(s"Stream panicked unexpectedly: ${ex}")
case Result.Success(v) => fail(s"Stream succeeded unexpectedly: ${v}")
case Result.Fail(_: Closed) => assert(true)
case Result.Panic(ex) => fail(s"Stream panicked unexpectedly: ${ex}")
}
}

Expand Down
4 changes: 2 additions & 2 deletions kyo-data/shared/src/main/scala/kyo/Duration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ object Duration:
inline given Flat[Duration] = Flat.unsafe.bypass

/** Exception thrown for invalid duration parsing. */
case class InvalidDuration(message: String) extends Exception(message)
class InvalidDuration(message: Text)(using Frame) extends KyoException(message)

/** Parses a string representation of a duration.
*
Expand All @@ -29,7 +29,7 @@ object Duration:
* @return
* A Result containing either the parsed Duration or an InvalidDuration error
*/
def parse(s: String): Result[InvalidDuration, Duration] =
def parse(s: String)(using Frame): Result[InvalidDuration, Duration] =
val pattern = """(\d+)\s*([a-zA-Z]+)""".r
s.trim.toLowerCase match
case "infinity" | "inf" => Result.success(Infinity)
Expand Down
28 changes: 28 additions & 0 deletions kyo-data/shared/src/main/scala/kyo/KyoException.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package kyo

import kyo.*
import kyo.Ansi.*
import scala.annotation.targetName
import scala.util.control.NoStackTrace

class KyoException private[kyo] (
message: Text = null,
cause: Text | Throwable = null
)(using val frame: Frame) extends NoStackTrace:

override def getCause(): Throwable =
cause match
case cause: Throwable => cause
case _ => null

override def getMessage(): String =
val detail =
cause match
case _: Throwable => Absent
case cause: Text @unchecked => Maybe(cause)

val msg = frame.render(("⚠️ KyoException".red.bold :: Maybe(message).toList ::: detail.toList).map(_.show)*)
s"\n$msg\n"
end getMessage

end KyoException
7 changes: 2 additions & 5 deletions kyo-prelude/shared/src/main/scala/kyo/Parse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ object Parse:
Var.use[State](s => fail(Seq(s), message))

private def fail(states: Seq[State], message: String)(using frame: Frame): Nothing < Abort[ParseFailed] =
Abort.fail(ParseFailed(frame, states, message))
Abort.fail(ParseFailed(states, message))

/** Reads and consumes characters from the input as long as they satisfy the given predicate.
*
Expand Down Expand Up @@ -915,7 +915,4 @@ object Parse:

end Parse

case class ParseFailed(frame: Frame, states: Seq[Parse.State], message: String) extends Exception with Serializable:

override def getMessage() = frame.render("Parse failed! ".red.bold + message, states)
end ParseFailed
case class ParseFailed(states: Seq[Parse.State], message: String)(using Frame) extends KyoException(message, Render.asText(states))
14 changes: 6 additions & 8 deletions kyo-stm/shared/src/main/scala/kyo/STM.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import scala.util.control.NoStackTrace

/** A FailedTransaction exception that is thrown when a transaction fails to commit. Contains the frame where the failure occurred.
*/
case class FailedTransaction(frame: Frame) extends Exception(frame.position.show) with NoStackTrace
class FailedTransaction()(using Frame) extends KyoException

/** Software Transactional Memory (STM) provides concurrent access to shared state using optimistic locking. Rather than acquiring locks
* upfront, transactions execute speculatively and automatically retry if conflicts are detected during commit. While this enables better
Expand Down Expand Up @@ -53,15 +53,15 @@ object STM:
* @return
* Nothing, as this operation always aborts the transaction
*/
def retry(using frame: Frame): Nothing < STM = Abort.fail(FailedTransaction(frame))
def retry(using Frame): Nothing < STM = Abort.fail(FailedTransaction())

/** Conditionally retries a transaction based on a boolean condition. If the condition is true, the transaction will be retried.
* Otherwise, execution continues normally.
*
* @param cond
* The condition that determines whether to retry
*/
def retryIf(cond: Boolean)(using frame: Frame): Unit < STM = Abort.when(cond)(FailedTransaction(frame))
def retryIf(cond: Boolean)(using Frame): Unit < STM = Abort.when(cond)(FailedTransaction())

/** Executes a transactional computation with explicit state isolation. This version of run supports additional effects beyond Abort and
* Async through the provided isolate, which ensures proper state management during transaction retries and rollbacks.
Expand Down Expand Up @@ -103,9 +103,7 @@ object STM:
* @return
* The result of the computation if successful
*/
def run[E, A: Flat](retrySchedule: Schedule)(v: A < (STM & Abort[E] & Async))(
using frame: Frame
): A < (Async & Abort[E | FailedTransaction]) =
def run[E, A: Flat](retrySchedule: Schedule)(v: A < (STM & Abort[E] & Async))(using Frame): A < (Async & Abort[E | FailedTransaction]) =
TID.use {
case -1L =>
// New transaction without a parent, use regular commit flow
Expand All @@ -128,7 +126,7 @@ object STM:
ref.unlock(entry)
result
else
Abort.fail(FailedTransaction(frame))
Abort.fail(FailedTransaction())
end if
}
case size =>
Expand All @@ -138,7 +136,7 @@ object STM:
val array = new Array[Any](size * 2)

try
def fail = throw new FailedTransaction(frame)
def fail = throw new FailedTransaction()

var i = 0
// Pre-validate and dump the log to the flat array
Expand Down
Loading

0 comments on commit 54e30aa

Please sign in to comment.