Skip to content

Commit

Permalink
[core] STM effect (#878)
Browse files Browse the repository at this point in the history
See scaladocs for more info
  • Loading branch information
fwbrasil authored Dec 2, 2024
1 parent e4aac39 commit b09199e
Show file tree
Hide file tree
Showing 17 changed files with 2,179 additions and 4 deletions.
12 changes: 12 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ lazy val kyoJVM = project
`kyo-prelude`.jvm,
`kyo-core`.jvm,
`kyo-direct`.jvm,
`kyo-stm`.jvm,
`kyo-stats-registry`.jvm,
`kyo-stats-otel`.jvm,
`kyo-cache`.jvm,
Expand Down Expand Up @@ -120,6 +121,7 @@ lazy val kyoJS = project
`kyo-prelude`.js,
`kyo-core`.js,
`kyo-direct`.js,
`kyo-stm`.js,
`kyo-stats-registry`.js,
`kyo-sttp`.js,
`kyo-test`.js,
Expand Down Expand Up @@ -250,6 +252,16 @@ lazy val `kyo-direct` =
.nativeSettings(`native-settings`)
.jsSettings(`js-settings`)

lazy val `kyo-stm` =
crossProject(JSPlatform, JVMPlatform)
.withoutSuffixFor(JVMPlatform)
.crossType(CrossType.Full)
.in(file("kyo-stm"))
.dependsOn(`kyo-core`)
.settings(`kyo-settings`)
.jvmSettings(mimaCheck(false))
.jsSettings(`js-settings`)

lazy val `kyo-stats-registry` =
crossProject(JSPlatform, JVMPlatform, NativePlatform)
.withoutSuffixFor(JVMPlatform)
Expand Down
4 changes: 2 additions & 2 deletions kyo-core/shared/src/test/scala/kyo/FiberTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ class FiberTest extends Test:
adder <- LongAdder.init
result <-
Fiber.race(Seq(
Async.delay(15.millis)(adder.increment.andThen(24)),
Async.delay(5.millis)((adder.increment.andThen(42)))
Async.delay(1.second)(adder.increment.andThen(24)),
Async.delay(1.millis)((adder.increment.andThen(42)))
)).map(_.get)
_ <- Async.sleep(50.millis)
executed <- adder.get
Expand Down
6 changes: 5 additions & 1 deletion kyo-core/shared/src/test/scala/kyo/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ abstract class Test extends AsyncFreeSpec with BaseKyoTest[Abort[Any] & Async &

def run(v: Future[Assertion] < (Abort[Any] & Async & Resource)): Future[Assertion] =
import AllowUnsafe.embrace.danger
val a = Async.run(Abort.run(Resource.run(v)).map(_.fold(e => throw new IllegalStateException(s"Test aborted with $e"))(identity)))
val a = Async.run(Abort.run(Resource.run(v)).map(_.fold {
_.getFailure match
case ex: Throwable => throw ex
case e => throw new IllegalStateException(s"Test aborted with $e")
}(identity)))
val b = a.map(_.toFuture).map(_.flatten)
IO.Unsafe.run(b).eval
end run
Expand Down
39 changes: 39 additions & 0 deletions kyo-prelude/shared/src/main/scala/kyo/Kyo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,45 @@ object Kyo:
end match
end collectDiscard

/** Finds the first element in a sequence that satisfies a predicate.
*
* @param seq
* The input sequence
* @param f
* The effect-producing predicate function
* @return
* A new effect that produces Maybe of the first matching element
*/
def findFirst[A, B, S](seq: Seq[A])(f: Safepoint ?=> A => Maybe[B] < S)(using Frame, Safepoint): Maybe[B] < S =
seq.knownSize match
case 0 => Maybe.empty
case 1 => f(seq(0))
case _ =>
seq match
case seq: List[A] =>
Loop(seq) { seq =>
seq match
case Nil => Loop.done(Maybe.empty)
case head :: tail =>
f(head).map {
case Absent => Loop.continue(tail)
case Present(v) => Loop.done(Maybe(v))
}
}
case seq =>
val indexed = toIndexed(seq)
val size = indexed.size
Loop.indexed { idx =>
if idx == size then Loop.done(Maybe.empty)
else
f(indexed(idx)).map {
case Absent => Loop.continue
case Present(v) => Loop.done(Maybe(v))
}
}
end match
end findFirst

/** Takes elements from a sequence while a predicate holds true.
*
* @param seq
Expand Down
17 changes: 16 additions & 1 deletion kyo-prelude/shared/src/main/scala/kyo/Var.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ object Var:
inline def set[V](inline value: V)(using inline tag: Tag[Var[V]], inline frame: Frame): V < Var[V] =
ArrowEffect.suspend[Unit](tag, value: Op[V])

/** Sets a new value and then executes another computation.
*
* @param value
* The new value to set in the Var
* @param f
* The computation to execute after setting the value
* @return
* The result of the computation after setting the new value
*/
private[kyo] inline def setAndThen[V, A, S](inline value: V)(inline f: => A < S)(using
inline tag: Tag[Var[V]],
inline frame: Frame
): A < (Var[V] & S) =
ArrowEffect.suspendAndMap[Unit](tag, value: Op[V])(_ => f)

/** Sets a new value and returns `Unit`.
*
* @param value
Expand Down Expand Up @@ -98,7 +113,7 @@ object Var:
inline def updateDiscard[V](inline f: V => V)(using inline tag: Tag[Var[V]], inline frame: Frame): Unit < Var[V] =
ArrowEffect.suspendAndMap[Unit](tag, (v => f(v)): Update[V])(_ => ())

private inline def runWith[V, A: Flat, S, B, S2](state: V)(v: A < (Var[V] & S))(
private[kyo] inline def runWith[V, A: Flat, S, B, S2](state: V)(v: A < (Var[V] & S))(
inline f: (V, A) => B < S2
)(using inline tag: Tag[Var[V]], inline frame: Frame): B < (S & S2) =
ArrowEffect.handleState(tag, state, v)(
Expand Down
57 changes: 57 additions & 0 deletions kyo-prelude/shared/src/test/scala/kyo/KyoTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,61 @@ class KyoTest extends Test:
assert(result.eval == 10)
}
}

"findFirst" - {
"empty sequence" in {
assert(Kyo.findFirst(Seq.empty[Int])(v => Maybe(v)).eval == Maybe.empty)
}

"single element - found" in {
assert(Kyo.findFirst(Seq(1))(v => Maybe(v)).eval == Maybe(1))
}

"single element - not found" in {
assert(Kyo.findFirst(Seq(1))(v => Maybe.empty).eval == Maybe.empty)
}

"multiple elements - first match" in {
assert(Kyo.findFirst(Seq(1, 2, 3))(v => if v > 0 then Maybe(v) else Maybe.empty).eval == Maybe(1))
}

"multiple elements - middle match" in {
assert(Kyo.findFirst(Seq(1, 2, 3))(v => if v == 2 then Maybe(v) else Maybe.empty).eval == Maybe(2))
}

"multiple elements - no match" in {
assert(Kyo.findFirst(Seq(1, 2, 3))(v => if v > 5 then Maybe(v) else Maybe.empty).eval == Maybe.empty)
}

"works with effects" in {
var count = 0
val result = Env.run(42)(
Kyo.findFirst(Seq(1, 2, 3)) { v =>
Env.use[Int] { env =>
count += 1
if v == env then Maybe(v) else Maybe.empty
}
}
).eval
assert(result == Maybe.empty)
assert(count == 3)
}

"short circuits" in {
var count = 0
val result = Kyo.findFirst(Seq(1, 2, 3, 4, 5)) { v =>
count += 1
if v == 2 then Maybe(v) else Maybe.empty
}.eval
assert(result == Maybe(2))
assert(count == 2)
}

"works with different sequence types" in {
val pred = (v: Int) => if v == 2 then Maybe(v) else Maybe.empty
assert(Kyo.findFirst(List(1, 2, 3))(pred).eval == Maybe(2))
assert(Kyo.findFirst(Vector(1, 2, 3))(pred).eval == Maybe(2))
assert(Kyo.findFirst(Chunk(1, 2, 3))(pred).eval == Maybe(2))
}
}
end KyoTest
7 changes: 7 additions & 0 deletions kyo-prelude/shared/src/test/scala/kyo/VarTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ class VarTest extends Test:
assert(r == (2, 3))
}

"setAndThen" in {
val result = Var.run(1) {
Var.setAndThen(2)(Var.use[Int](_ * 2))
}.eval
assert(result == 4)
}

"scope" - {
"should not affect the outer state" in {
val result = Var.run(42)(
Expand Down
40 changes: 40 additions & 0 deletions kyo-stm/shared/src/main/scala/kyo/RefLog.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package kyo

/** A log of transactional operations performed on TRefs within an STM transaction.
*
* RefLog maintains a mapping from transactional references to their pending read/write operations within a transaction. It tracks both
* read entries (which record the version of data read) and write entries (which contain the new values to be committed).
*
* This type is used internally by the STM implementation and should not be accessed directly by application code.
*
* @note
* This is a private implementation detail of the STM system
*/
opaque type RefLog = Map[TRef[Any], RefLog.Entry[Any]]

private[kyo] object RefLog:

given Tag[RefLog] = Tag[Map[TRef[Any], Entry[Any]]]

val empty: RefLog = Map.empty

extension (self: RefLog)

def put[A](ref: TRef[A], entry: Entry[A]): RefLog =
self.updated(ref.asInstanceOf[TRef[Any]], entry.asInstanceOf[Entry[Any]])

def get[A](ref: TRef[A]): Maybe[Entry[A]] =
val refAny = ref.asInstanceOf[TRef[Any]]
Maybe.when(self.contains(refAny))(self(refAny).asInstanceOf[Entry[A]])

def toSeq: Seq[(TRef[Any], Entry[Any])] =
self.toSeq
end extension

sealed abstract class Entry[A]:
def tid: Long
def value: A

case class Read[A](tid: Long, value: A) extends Entry[A]
case class Write[A](tid: Long, value: A) extends Entry[A]
end RefLog
148 changes: 148 additions & 0 deletions kyo-stm/shared/src/main/scala/kyo/STM.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package kyo

import scala.annotation.tailrec

/** A FailedTransaction exception that is thrown when a transaction fails to commit. Contains the frame where the failure occurred.
*/
case class FailedTransaction(frame: Frame) extends Exception(frame.position.show)

/** Software Transactional Memory (STM) provides concurrent access to shared state using optimistic locking. Rather than acquiring locks
* upfront, transactions execute speculatively and automatically retry if conflicts are detected during commit. While this enables better
* composability than manual locking, applications must be designed to handle potentially frequent transaction retries.
*
* > IMPORTANT: Transactions are atomic, isolated, and composable but may retry multiple times before success. Side effects (like I/O)
* inside transactions must be used with caution as they will be re-executed on retry. Pure operations that only modify transactional
* references are safe and encouraged, while external side effects should be performed after the transaction commits.
*
* The core operations are:
* - TRef.init and TRef.initNow create transactional references that can be shared between threads
* - TRef.get and TRef.set read and modify references within transactions
* - STM.run executes transactions that either fully commit or rollback
* - STM.retry and STM.retryIf provide manual control over transaction retry behavior
* - Configurable retry schedules via STM.run's retrySchedule parameter
*
* The implementation uses optimistic execution with lock-based validation during commit:
* - Transactions execute without acquiring locks, tracking reads and writes in a local log
* - During commit, read-write locks are acquired on affected TRefs to ensure consistency:
* - Multiple readers can hold shared locks on a TRef during commit
* - Writers require an exclusive lock during commit
* - No global locks are used - operations on different refs can commit independently
* - Lock acquisition is ordered by TRef identity to prevent deadlocks
* - Early conflict detection aborts transactions that would fail validation
*
* STM is most effective for operations that rarely conflict and complete quickly. Long-running transactions or high contention scenarios
* may face performance challenges from repeated retries. The approach particularly excels at read-heavy workloads due to its support for
* concurrent readers, while write-heavy workloads may experience more contention due to the need for exclusive write access. The
* fine-grained locking strategy means that transactions only conflict if they actually touch the same references, allowing for high
* concurrency when different transactions operate on different refs.
*/
opaque type STM <: (Var[RefLog] & Abort[FailedTransaction] & Async) =
Var[RefLog] & Abort[FailedTransaction] & Async

object STM:

/** The default retry schedule for failed transactions */
val defaultRetrySchedule = Schedule.fixed(1.millis * 0.5).take(20)

/** Forces a transaction retry by aborting the current transaction and rolling back all changes. This is useful when a transaction
* detects that it cannot proceed due to invalid state.
*
* @return
* Nothing, as this operation always aborts the transaction
*/
def retry(using frame: Frame): Nothing < STM = Abort.fail(FailedTransaction(frame))

/** Conditionally retries a transaction based on a boolean condition. If the condition is true, the transaction will be retried.
* Otherwise, execution continues normally.
*
* @param cond
* The condition that determines whether to retry
*/
def retryIf(cond: Boolean)(using frame: Frame): Unit < STM = Abort.when(cond)(FailedTransaction(frame))

/** Executes a transactional computation with explicit state isolation. This version of run supports additional effects beyond Abort and
* Async through the provided isolate, which ensures proper state management during transaction retries and rollbacks.
*
* @param isolate
* The isolation scope for the transaction
* @param retrySchedule
* The schedule for retrying failed transactions
* @param v
* The transactional computation to run
* @return
* The result of the computation if successful
*/
def run[E, A: Flat, S](isolate: Isolate[S], retrySchedule: Schedule = defaultRetrySchedule)(v: A < (STM & Abort[E] & Async & S))(
using frame: Frame
): A < (S & Async & Abort[E | FailedTransaction]) =
isolate.use { st =>
run(retrySchedule)(isolate.resume(st, v)).map(isolate.restore(_, _))
}

/** Executes a transactional computation with default retry behavior. This version only supports Abort and Async effects within the
* transaction, but provides a simpler interface when additional effect isolation is not needed.
*
* @param v
* The transactional computation to run
* @return
* The result of the computation if successful
*/
def run[E, A: Flat](v: A < (STM & Abort[E] & Async))(using frame: Frame): A < (Async & Abort[E | FailedTransaction]) =
run(defaultRetrySchedule)(v)

/** Executes a transactional computation with custom retry behavior. Like the version above, this only supports Abort and Async effects
* but allows configuring how transaction conflicts are retried.
*
* @param retrySchedule
* The schedule for retrying failed transactions
* @param v
* The transactional computation to run
* @return
* The result of the computation if successful
*/
def run[E, A: Flat](retrySchedule: Schedule)(v: A < (STM & Abort[E] & Async))(
using frame: Frame
): A < (Async & Abort[E | FailedTransaction]) =
TID.use {
case -1L =>
// New transaction without a parent, use regular commit flow
Retry[FailedTransaction](retrySchedule) {
TID.useNew { tid =>
Var.runWith(RefLog.empty)(v) { (log, result) =>
IO.Unsafe {
// Attempt to acquire locks and commit the transaction
val (locked, unlocked) =
// Sort references by identity to prevent deadlocks
log.toSeq.sortBy((ref, _) => ref.hashCode)
.span((ref, entry) => ref.lock(entry))

if unlocked.nonEmpty then
// Failed to acquire some locks - rollback and retry
locked.foreach((ref, entry) => ref.unlock(entry))
Abort.fail(FailedTransaction(frame))
else
// Successfully locked all references - commit changes
locked.foreach((ref, entry) => ref.commit(tid, entry))
// Release all locks
locked.foreach((ref, entry) => ref.unlock(entry))
result
end if
}
}
}
}
case parent =>
// Nested transaction inherits parent's transaction context but isolates RefLog.
// On success: changes propagate to parent. On failure: changes are rolled back
// without affecting parent's state.
val result = Var.isolate.update[RefLog].run(v)

// Can't return `result` directly since it has a pending STM effect
// but it's safe to cast because, if there's a parent transaction,
// then there's a frame upper in the stack that will handle the
// STM effect in the parent transaction's `run`.
result.asInstanceOf[A < (Async & Abort[E | FailedTransaction])]
}

end run
end STM
Loading

0 comments on commit b09199e

Please sign in to comment.