Skip to content

Commit

Permalink
rx (feature): Support Scala Native (#3506)
Browse files Browse the repository at this point in the history
- **Implement rx-native**
- **Support Scala Native**
- TODO: #3507
  • Loading branch information
xerial authored Apr 23, 2024
1 parent 689ef45 commit 77e7729
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package wvlet.airframe.rx
import java.util.concurrent.LinkedBlockingQueue

/**
* Blocking queue implementation for supporting gRPC streaming with Rx
*/
class RxBlockingQueue[A] extends RxSource[A]:
override def parents: Seq[Rx[_]] = Seq.empty

private val blockingQueue = new LinkedBlockingQueue[RxEvent]()

override def add(event: RxEvent): Unit =
blockingQueue.add(event)
override def next: RxEvent =
blockingQueue.take()
91 changes: 81 additions & 10 deletions airframe-rx/.native/src/main/scala-3/wvlet/airframe/rx/compat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,94 @@
package wvlet.airframe.rx

import scala.util.Try
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Promise}
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}

import scala.scalanative.posix.timer.timer_create
import scala.scalanative.posix.time.*
import scala.scalanative.posix.signal.*
import scala.scalanative.posix.sys.types.timer_t
import scala.scalanative.unsafe.*

/**
*/
object compat {
def defaultExecutionContext: scala.concurrent.ExecutionContext = ???
object compat:

private def newDaemonThreadFactory(name: String): ThreadFactory = new ThreadFactory:
private val group: ThreadGroup = new ThreadGroup(Thread.currentThread().getThreadGroup(), name)
private val threadNumber = new AtomicInteger(1)

override def newThread(r: Runnable): Thread =
val threadName = s"${name}-${threadNumber.getAndIncrement()}"
val thread = new Thread(group, r, threadName)
thread.setName(threadName)
thread.setDaemon(true)
thread

private lazy val defaultExecutor =
ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(newDaemonThreadFactory("airframe-rx")))

def defaultExecutionContext: scala.concurrent.ExecutionContext = defaultExecutor

def newTimer: Timer = ???
// Zone {
// val timerId: Ptr[timer_t] = alloc[timer_t]()
// val se: sigevent = stackalloc[sigevent]()
// se._1 = SIGEV_THREAD
//// sigev_notify = SIGEV_THREAD,
//// sigev_signo = SIGALRM,
//// sigev_value = sigval(0),
//// sigev_notify_thread_id = pthread_self()
//// )
//// timer_create is not yet implemented in Scala Native
// // timer_create(CLOCK_REALTIME, se, timerId)

def scheduleOnce[U](delayMills: Long)(body: => U): Cancelable = ???
def scheduleOnce[U](delayMills: Long)(body: => U): Cancelable =
val thread = Executors.newScheduledThreadPool(1)
val schedule = thread.schedule(
new Runnable:
override def run(): Unit =
body
,
delayMills,
TimeUnit.MILLISECONDS
)
// Immediately start the thread pool shutdown to avoid thread leak
thread.shutdown()
Cancelable { () =>
try
schedule.cancel(false)
finally
thread.shutdown()
}

def toSeq[A](rx: Rx[A]): Seq[A] = {
throw new UnsupportedOperationException("Rx.toSeq is unsupported in Scala.native")
}
def toSeq[A](rx: Rx[A]): Seq[A] =
val ready = new AtomicBoolean(true)
val s = Seq.newBuilder[A]
var c = Cancelable.empty
c = RxRunner.run(rx) {
case OnNext(v) => s += v.asInstanceOf[A]
case OnError(e) =>
c.cancel
throw e
case OnCompletion =>
c.cancel
ready.set(true)
}
while !ready.get() do {}
s.result()

private[rx] def await[A](rx: RxOps[A]): A = {
throw new UnsupportedOperationException("Rx.await is unsupported in Scala.native")
}
}
private[rx] def await[A](rx: RxOps[A]): A =
val p = Promise[A]()
val c = RxRunner.runOnce(rx) {
case OnNext(v) => p.success(v.asInstanceOf[A])
case OnError(e) => p.failure(e)
case OnCompletion => p.failure(new IllegalStateException(s"OnCompletion should not be issued in: ${rx}"))
}
try
Await.result(p.future, Duration.Inf)
finally
c.cancel
15 changes: 9 additions & 6 deletions airframe-rx/src/test/scala/wvlet/airframe/rx/IntervalTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import wvlet.airspec.AirSpec
/**
*/
class IntervalTest extends AirSpec {
private def pendingScalaJS = if (isScalaJS) {
private def pendingInScalaJSAndScalaNative = if (isScalaJS) {
pending("Async test is required")
} else if (isScalaNative) {
pending("Timer is not yet supported in Scala Native")
}

test("timeIntervalMillis") {
pendingScalaJS
pendingInScalaJSAndScalaNative

val counter = new AtomicInteger(0)
val rx = Rx
Expand Down Expand Up @@ -53,7 +56,7 @@ class IntervalTest extends AirSpec {
}

test("timer/delay") {
pendingScalaJS
pendingInScalaJSAndScalaNative
val counter = new AtomicInteger(0)
val rx = Rx
.delay(1, TimeUnit.MILLISECONDS)
Expand All @@ -78,7 +81,7 @@ class IntervalTest extends AirSpec {
}

test("throttleFirst") {
pendingScalaJS
pendingInScalaJSAndScalaNative
val rx = Rx
.sequence(1, 2, 3, 4, 5, 6)
.throttleFirst(10000, TimeUnit.MILLISECONDS)
Expand All @@ -96,7 +99,7 @@ class IntervalTest extends AirSpec {
}

test("throttleLast") {
pendingScalaJS
pendingInScalaJSAndScalaNative

flaky {
val rx =
Expand All @@ -115,7 +118,7 @@ class IntervalTest extends AirSpec {
}

test("throttleLast of empty seq") {
pendingScalaJS
pendingInScalaJSAndScalaNative
val rx = Rx.fromSeq(Seq.empty[Int]).throttleLast(1, TimeUnit.MILLISECONDS)
val c = rx.run { x => }
compat.scheduleOnce(100) {
Expand Down
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ lazy val nativeProjects: Seq[ProjectReference] = Seq(
metrics.native,
json.native,
msgpack.native,
ulid.native
ulid.native,
rx.native
)

// Integration test projects
Expand Down Expand Up @@ -701,7 +702,7 @@ lazy val jdbc =
.dependsOn(di.jvm, control.jvm, config)

lazy val rx =
crossProject(JVMPlatform, JSPlatform)
crossProject(JVMPlatform, JSPlatform, NativePlatform)
.crossType(CrossType.Pure)
.in(file("airframe-rx"))
.settings(buildSettings)
Expand All @@ -719,6 +720,7 @@ lazy val rx =
// For addressing the fairness issue of the global ExecutorContext https://github.com/scala-js/scala-js/issues/4129
libraryDependencies += "org.scala-js" %%% "scala-js-macrotask-executor" % "1.1.1"
)
.nativeSettings(nativeBuildSettings)
.dependsOn(log)

lazy val http =
Expand Down

0 comments on commit 77e7729

Please sign in to comment.