Skip to content

Commit

Permalink
[scheduler] scala native support
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Nov 26, 2024
1 parent 70f91b4 commit f362727
Show file tree
Hide file tree
Showing 35 changed files with 51 additions and 19 deletions.
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ lazy val kyoNative = project
)

lazy val `kyo-scheduler` =
crossProject(JSPlatform, JVMPlatform)
crossProject(JSPlatform, JVMPlatform, NativePlatform)
.withoutSuffixFor(JVMPlatform)
.crossType(CrossType.Full)
.dependsOn(`kyo-stats-registry`)
Expand All @@ -154,6 +154,7 @@ lazy val `kyo-scheduler` =
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.5.12" % Test
)
.jvmSettings(mimaCheck(false))
.nativeSettings(`native-settings`)
.jsSettings(
`js-settings`,
libraryDependencies += "org.scala-js" %%% "scala-js-macrotask-executor" % "1.1.1"
Expand Down Expand Up @@ -241,7 +242,7 @@ lazy val `kyo-direct` =
.jsSettings(`js-settings`)

lazy val `kyo-stats-registry` =
crossProject(JSPlatform, JVMPlatform)
crossProject(JSPlatform, JVMPlatform, NativePlatform)
.withoutSuffixFor(JVMPlatform)
.crossType(CrossType.Full)
.in(file("kyo-stats-registry"))
Expand All @@ -253,6 +254,7 @@ lazy val `kyo-stats-registry` =
crossScalaVersions := List(scala3Version, scala212Version, scala213Version)
)
.jvmSettings(mimaCheck(false))
.nativeSettings(`native-settings`)
.jsSettings(`js-settings`)

lazy val `kyo-stats-otel` =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import Worker.State
import java.lang.invoke.MethodHandles
import java.lang.invoke.VarHandle
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.LongAdder
import kyo.scheduler.top.WorkerStatus
import scala.annotation.nowarn
Expand Down Expand Up @@ -92,7 +93,8 @@ abstract private class Worker(

val a1, a2, a3, a4, a5, a6, a7 = 0L // padding

@volatile private var state: State = State.Idle
private val state = new AtomicReference[State](State.Idle)

@volatile private var mount: Thread = null

val b1, b2, b3, b4, b5, b6, b7 = 0L // padding
Expand Down Expand Up @@ -125,7 +127,7 @@ abstract private class Worker(
* arrives for an idle worker.
*/
def wakeup() = {
if ((state eq State.Idle) && stateHandle.compareAndSet(this, State.Idle, State.Running))
if ((state.get() eq State.Idle) && state.compareAndSet(State.Idle, State.Running))
exec.execute(this)
}

Expand Down Expand Up @@ -163,9 +165,9 @@ abstract private class Worker(
* If checks fail while Running, transitions to Stalled and drains queue. Used by scheduler to skip workers that can't make progress.
*/
def checkAvailability(nowMs: Long): Boolean = {
val state = this.state
val available = !checkStalling(nowMs) && (state ne State.Stalled) && !isBlocked()
if (!available && (state eq State.Running) && stateHandle.compareAndSet(this, State.Running, State.Stalled))
val st = this.state.get()
val available = !checkStalling(nowMs) && (st ne State.Stalled) && !isBlocked()
if (!available && (st eq State.Running) && state.compareAndSet(State.Running, State.Stalled))
drain()
available
}
Expand Down Expand Up @@ -199,7 +201,7 @@ abstract private class Worker(

while (true) {
// Mark worker as actively running
state = State.Running
state.set(State.Running)

if (task eq null)
// Try to get a task from our own queue first
Expand Down Expand Up @@ -227,8 +229,8 @@ abstract private class Worker(
}
} else {
// No tasks available - prepare to go idle
state = State.Idle
if (queue.isEmpty() || !stateHandle.compareAndSet(this, State.Idle, State.Running)) {
state.set(State.Idle)
if (queue.isEmpty() || !state.compareAndSet(State.Idle, State.Running)) {
// Either queue is empty or another thread changed our state
// Clean up and exit
mount = null
Expand All @@ -239,7 +241,7 @@ abstract private class Worker(

// Check if we should stop processing tasks
if (shouldStop()) {
state = State.Idle
state.set(State.Idle)
// Reschedule current task if we have one
if (task ne null) schedule(task)
// Drain remaining tasks from queue
Expand Down Expand Up @@ -320,10 +322,6 @@ private object Worker {

private[Worker] object internal {

val stateHandle: VarHandle =
MethodHandles.privateLookupIn(classOf[Worker], MethodHandles.lookup())
.findVarHandle(classOf[Worker], "state", classOf[State])

val local = new ThreadLocal[Worker]

def setCurrent(worker: Worker): Unit =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package kyo

import java.util.logging.*
import kyo.stats.internal.StatsRegistry

package object scheduler {

private[scheduler] val log = Logger.getLogger("kyo.scheduler")

private[scheduler] def statsScope =
StatsRegistry.scope("kyo", "scheduler")

private[scheduler] def bug(msg: String, ex: Throwable) =
log.log(Level.SEVERE, s"🙈 !!Kyo Scheduler Bug!!", new Exception(msg, ex))
new Exception(s"🙈 !!Kyo Scheduler Bug!! " + msg, ex).printStackTrace()

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package kyo.scheduler.top

import java.lang.management.ManagementFactory
import javax.management.MBeanServer
import javax.management.ObjectName
import javax.management.StandardMBean
import kyo.scheduler.InternalTimer
import scala.concurrent.duration.*

class Reporter(
status: () => Status,
enableTopJMX: Boolean,
enableTopConsoleMs: Int,
timer: InternalTimer
) extends TopMBean {

def getStatus() = status()

def close(): Unit = ()
}

trait TopMBean {
def getStatus(): Status
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package kyo.scheduler.util

import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.logging.Logger
import scala.util.control.NonFatal

object LoomSupport {
def tryVirtualize(enabled: Boolean, exec: Executor): Executor = exec
}

0 comments on commit f362727

Please sign in to comment.