Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Utility for kotlinx coroutines to cats.effect.IO #25

Merged
merged 2 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ testcontainers = "1.17.6"
hikari = "5.0.1"
dokka = "1.8.10"
logback = "1.4.6"
kotlinx-coroutines = "1.6.4"
scalaMultiversion = "2.0.4"
ciris = "3.1.0"
http4s = "0.23.18"
Expand All @@ -36,6 +37,7 @@ arrow-fx-coroutines = { module = "io.arrow-kt:arrow-fx-coroutines", version.ref
arrow-resilience = { module = "io.arrow-kt:arrow-resilience", version.ref = "arrow" }
open-ai = { module = "com.theokanning.openai-gpt3-java:service", version.ref = "openai" }
kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinx-json" }
kotlinx-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref="kotlinx-coroutines" }
ktor-client = { module = "io.ktor:ktor-client-core", version.ref = "ktor" }
ktor-client-js = { module = "io.ktor:ktor-client-js", version.ref = "ktor" }
ktor-client-cio = { module = "io.ktor:ktor-client-cio", version.ref = "ktor" }
Expand Down
1 change: 1 addition & 0 deletions scala/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ plugins {
}

dependencies {
implementation(libs.kotlinx.coroutines)
implementation(libs.ciris.core)
implementation(libs.ciris.refined)
implementation(libs.ciris.http4s)
Expand Down
115 changes: 115 additions & 0 deletions scala/src/main/scala/com/xebia/functional/kotlin/CoroutineToIO.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package com.xebia.functional.kotlin

import java.util.Collections
import java.util.concurrent.AbstractExecutorService
import java.util.concurrent.CancellationException
import java.util.concurrent.TimeUnit

import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContextExecutorService
import scala.util.control.NoStackTrace
import scala.util.control.NonFatal

import cats.effect.*
import cats.effect.kernel.Async
import cats.implicits.*

import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.*

trait CoroutineToIO[F[_]] {
def runCancelable[A](block: kotlin.jvm.functions.Function2[CoroutineScope, Continuation[? >: A], ?]): F[A]

def runCancelable_(block: kotlin.jvm.functions.Function2[CoroutineScope, Continuation[_ >: kotlin.Unit], Any]): F[Unit]
}

@SuppressWarnings(
Array(
"scalafix:DisableSyntax.null",
"scalafix:DisableSyntax.throw",
"scalafix:DisableSyntax.asInstanceOf"
)
)
object CoroutineToIO:

def apply[F[_]: Async]: CoroutineToIO[F] = new CoroutineToIO[F]:

def runCancelable_(block: kotlin.jvm.functions.Function2[CoroutineScope, Continuation[_ >: kotlin.Unit], Any]): F[Unit] =
runCancelable(block).void

def runCancelable[A](
block: kotlin.jvm.functions.Function2[CoroutineScope, Continuation[? >: A], ?]
): F[A] =
coroutineToIOFactory[A](block, buildCancelToken)

private def dispatcher: F[CoroutineDispatcher] =
Async[F].executionContext.map { other =>
Async[F].executionContext
kotlinx.coroutines.ExecutorsKt.from(
new AbstractExecutorService with ExecutionContextExecutorService {
override def isShutdown = false
override def isTerminated = false
override def shutdown() = ()
override def shutdownNow() = Collections.emptyList[Runnable]
override def execute(runnable: Runnable): Unit = other.execute(runnable)
override def reportFailure(t: Throwable): Unit = other.reportFailure(t)
override def awaitTermination(length: Long, unit: TimeUnit): Boolean = false
}
)
}

private def coroutineToIOFactory[A](
block: kotlin.jvm.functions.Function2[CoroutineScope, Continuation[? >: A], ?],
buildCancelToken: (Deferred[_], DisposableHandle) => Option[F[Unit]]
): F[A] =
dispatcher
.flatMap { dispatcher =>
Async[F].async[A] { cb =>
Async[F].delay {
try {
val context = CoroutineContextKt.newCoroutineContext(
GlobalScope.INSTANCE,
dispatcher.asInstanceOf[CoroutineContext]
)
val deferred = kotlinx.coroutines.BuildersKt.async(
GlobalScope.INSTANCE,
context,
CoroutineStart.DEFAULT,
block
)
try {
val dispose = deferred.invokeOnCompletion { (e: Throwable) =>
e match {
case e: Throwable => cb(Left(e))
case null => cb(Right(deferred.getCompleted))
}
kotlin.Unit.INSTANCE
}
buildCancelToken(deferred, dispose)
} catch {
case NonFatal(e) =>
deferred.cancel(null)
throw e
}
} catch {
case NonFatal(e) =>
cb(Left(e))
None
}
}
}
}

private def buildCancelToken(deferred: Deferred[_], dispose: DisposableHandle): Option[F[Unit]] =
Some(Async[F].defer {
deferred.cancel(PleaseCancel)
dispose.dispose()
coroutineToIOFactory[kotlin.Unit](
(_, cont) => deferred.join(cont),
(_, _) => None
).void
})

private object PleaseCancel extends CancellationException with NoStackTrace
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.xebia.functional.kotlin

import scala.concurrent.duration.*
import cats.effect.*
import munit.CatsEffectSuite

class CoroutineToIOSpec extends CatsEffectSuite:

// Sleeping via Kotlin's coroutines
def kotlinSleep(duration: FiniteDuration): IO[Unit] =
CoroutineToIO[IO].runCancelable_ { (_, cont) =>
// Kotlin suspended function calls...
kotlinx.coroutines.DelayKt.delay(duration.toMillis, cont)
}

test("KotlinCoroutines should convert Kotlin’s coroutines into cats.effect.IO") {

val io = for
_ <- IO.println("Running...")
fiber <- kotlinSleep(10.seconds).start
_ <- IO.sleep(1000.millis)
_ <- fiber.cancel
_ <- fiber.joinWithUnit
str = "Done!"
_ <- IO.println(str)
yield str

assertIO(io, "Done!")
}