Skip to content

Commit

Permalink
reactivestream/flow interop
Browse files Browse the repository at this point in the history
  • Loading branch information
HollandDM committed Dec 4, 2024
1 parent 5d5f0c7 commit d8a986e
Show file tree
Hide file tree
Showing 13 changed files with 974 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ jmh-result.json
*.jfr
*.json
*.gpg
test-output
17 changes: 17 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ lazy val kyoJVM = project
`kyo-stats-registry`.jvm,
`kyo-stats-otel`.jvm,
`kyo-cache`.jvm,
`kyo-reactive-streams`.jvm,
`kyo-sttp`.jvm,
`kyo-tapir`.jvm,
`kyo-caliban`.jvm,
Expand Down Expand Up @@ -306,6 +307,22 @@ lazy val `kyo-cache` =
)
.jvmSettings(mimaCheck(false))

lazy val `kyo-reactive-streams` =
crossProject(JVMPlatform)
.withoutSuffixFor(JVMPlatform)
.crossType(CrossType.Full)
.in(file("kyo-reactive-streams"))
.dependsOn(`kyo-core`)
.settings(
`kyo-settings`,
libraryDependencies ++= Seq(
"org.reactivestreams" % "reactive-streams" % "1.0.4",
"org.reactivestreams" % "reactive-streams-tck" % "1.0.4" % Test,
"org.scalatestplus" %% "testng-7-5" % "3.2.17.0" % Test
)
)
.jvmSettings(mimaCheck(false))

lazy val `kyo-sttp` =
crossProject(JSPlatform, JVMPlatform, NativePlatform)
.withoutSuffixFor(JVMPlatform)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package kyo.interop

import java.util.concurrent.Flow.*
import kyo.*
import kyo.interop.reactivestreams
import kyo.kernel.Boundary
import org.reactivestreams.FlowAdapters

package object flow:
inline def fromPublisher[T](
publisher: Publisher[T],
bufferSize: Int
)(
using
Frame,
Tag[T]
): Stream[T, Async] < IO = reactivestreams.fromPublisher(FlowAdapters.toPublisher(publisher), bufferSize)

def subscribeToStream[T, Ctx](
stream: Stream[T, Ctx],
subscriber: Subscriber[? >: T]
)(
using
Boundary[Ctx, IO],
Frame,
Tag[T]
): Subscription < (Resource & IO & Ctx) =
reactivestreams.subscribeToStream(stream, FlowAdapters.toSubscriber(subscriber)).map { subscription =>
new Subscription:
override def request(n: Long): Unit = subscription.request(n)
override def cancel(): Unit = subscription.cancel()
}
end flow
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package kyo.interop.reactivestreams

import kyo.*
import kyo.interop.reactivestreams.StreamSubscription.StreamFinishState
import kyo.kernel.Boundary
import kyo.kernel.ContextEffect.Isolated
import kyo.kernel.Safepoint
import kyo.scheduler.IOTask
import org.reactivestreams.*

abstract class StreamPublisher[V, Ctx] private (
stream: Stream[V, Ctx]
) extends Publisher[V]:

protected def bind(subscriber: Subscriber[? >: V]): Unit

override def subscribe(subscriber: Subscriber[? >: V]): Unit =
if isNull(subscriber) then
throw new NullPointerException("Subscriber must not be null.")
else
bind(subscriber)
end subscribe

end StreamPublisher

object StreamPublisher:
def apply[V, Ctx](
stream: Stream[V, Ctx],
capacity: Int = Int.MaxValue
)(
using
boundary: Boundary[Ctx, IO],
frame: Frame,
tag: Tag[V]
): StreamPublisher[V, Ctx] < (Resource & IO & Ctx) =
inline def interruptPanic = Result.Panic(Fiber.Interrupted(frame))

def discardSubscriber(subscriber: Subscriber[? >: V]): Unit =
subscriber.onSubscribe(new Subscription:
override def request(n: Long): Unit = ()
override def cancel(): Unit = ()
)
subscriber.onComplete()
end discardSubscriber

def consumeChannel(
channel: Channel[Subscriber[? >: V]],
supervisorPromise: Fiber.Promise[Nothing, Unit]
): Unit < (Async & Ctx) =
Loop(()) { _ =>
channel.closed.map {
if _ then
Loop.done
else
val result = Abort.run[Closed] {
for
subscriber <- channel.take
subscription <- IO.Unsafe(new StreamSubscription[V, Ctx](stream, subscriber))
fiber <- subscription.subscribe.andThen(subscription.consume)
_ <- supervisorPromise.onComplete(_ => discard(fiber.interrupt(interruptPanic)))
yield ()
}
result.map {
case Result.Success(_) => Loop.continue(())
case _ => Loop.done
}
}
}

IO.Unsafe {
for
channel <-
Resource.acquireRelease(Channel.init[Subscriber[? >: V]](capacity))(
_.close.map(_.foreach(_.foreach(discardSubscriber(_))))
)
publisher <- IO.Unsafe {
new StreamPublisher[V, Ctx](stream):
override protected def bind(
subscriber: Subscriber[? >: V]
): Unit =
channel.unsafe.offer(subscriber) match
case Result.Success(true) => ()
case _ => discardSubscriber(subscriber)
}
supervisorPromise <- Fiber.Promise.init[Nothing, Unit]
_ <- Resource.acquireRelease(boundary((trace, context) =>
Fiber.fromTask(IOTask(consumeChannel(channel, supervisorPromise), trace, context))
))(
_.interrupt.map(discard(_))
)
yield publisher
end for
}
end apply

object Unsafe:
def apply[V, Ctx](
stream: Stream[V, Ctx],
subscribeCallback: (Fiber[Nothing, StreamFinishState] < (IO & Ctx)) => Unit
)(
using
allowance: AllowUnsafe,
boundary: Boundary[Ctx, IO],
frame: Frame,
tag: Tag[V]
): StreamPublisher[V, Ctx] =
new StreamPublisher[V, Ctx](stream):
override protected def bind(
subscriber: Subscriber[? >: V]
): Unit =
discard(StreamSubscription.Unsafe.subscribe(
stream,
subscriber
)(
subscribeCallback
)(
using
allowance,
boundary,
frame,
tag
))
end Unsafe
end StreamPublisher
Loading

0 comments on commit d8a986e

Please sign in to comment.