Skip to content

Commit

Permalink
[kyo-reactive-stream]: fix feedbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
HollandDM committed Dec 11, 2024
1 parent 40f4b2b commit fb04b3a
Show file tree
Hide file tree
Showing 8 changed files with 552 additions and 300 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,71 @@ package kyo.interop
import java.util.concurrent.Flow.*
import kyo.*
import kyo.interop.reactivestreams
import kyo.interop.reactivestreams.StreamSubscriber.EmitStrategy
import kyo.kernel.Boundary
import org.reactivestreams.FlowAdapters
import scala.annotation.nowarn

package object flow:
inline def fromPublisher[T](
publisher: Publisher[T],
bufferSize: Int
bufferSize: Int,
emitStrategy: EmitStrategy = EmitStrategy.Eager
)(
using
Frame,
Tag[T]
): Stream[T, Async] < IO = reactivestreams.fromPublisher(FlowAdapters.toPublisher(publisher), bufferSize)
Tag[Emit[Chunk[T]]],
Tag[Poll[Chunk[T]]]
): Stream[T, Async] < IO = reactivestreams.fromPublisher(FlowAdapters.toPublisher(publisher), bufferSize, emitStrategy)

def subscribeToStream[T, Ctx](
@nowarn("msg=anonymous")
inline def subscribeToStream[T, Ctx](
stream: Stream[T, Ctx],
subscriber: Subscriber[? >: T]
)(
using
Boundary[Ctx, IO],
Frame,
Tag[T]
Tag[Emit[Chunk[T]]],
Tag[Poll[Chunk[T]]]
): Subscription < (Resource & IO & Ctx) =
reactivestreams.subscribeToStream(stream, FlowAdapters.toSubscriber(subscriber)).map { subscription =>
new Subscription:
override def request(n: Long): Unit = subscription.request(n)
override def cancel(): Unit = subscription.cancel()
}

inline def streamToPublisher[T, Ctx](
stream: Stream[T, Ctx]
)(
using
Frame,
Tag[Emit[Chunk[T]]],
Tag[Poll[Chunk[T]]]
): Publisher[T] < (Resource & IO & Ctx) = reactivestreams.streamToPublisher(stream).map { publisher =>
FlowAdapters.toFlowPublisher(publisher)
}

object StreamReactiveStreamsExtensions:
extension [T, Ctx](stream: Stream[T, Ctx])
inline def subscribe(
subscriber: Subscriber[? >: T]
)(
using
Frame,
Tag[Emit[Chunk[T]]],
Tag[Poll[Chunk[T]]]
): Subscription < (Resource & IO & Ctx) =
subscribeToStream(stream, subscriber)

inline def toPublisher(
using
Frame,
Tag[Emit[Chunk[T]]],
Tag[Poll[Chunk[T]]]
): Publisher[T] < (Resource & IO & Ctx) =
streamToPublisher(stream)
end extension
end StreamReactiveStreamsExtensions

export StreamReactiveStreamsExtensions.*
end flow
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import kyo.kernel.ContextEffect.Isolated
import kyo.kernel.Safepoint
import kyo.scheduler.IOTask
import org.reactivestreams.*
import scala.annotation.nowarn

abstract class StreamPublisher[V, Ctx] private (
abstract private[kyo] class StreamPublisher[V, Ctx](
stream: Stream[V, Ctx]
) extends Publisher[V]:

Expand All @@ -24,16 +25,18 @@ abstract class StreamPublisher[V, Ctx] private (
end StreamPublisher

object StreamPublisher:

def apply[V, Ctx](
stream: Stream[V, Ctx],
capacity: Int = Int.MaxValue
)(
using
boundary: Boundary[Ctx, IO],
frame: Frame,
tag: Tag[V]
Boundary[Ctx, IO & Abort[Nothing]],
Frame,
Tag[Emit[Chunk[V]]],
Tag[Poll[Chunk[V]]]
): StreamPublisher[V, Ctx] < (Resource & IO & Ctx) =
inline def interruptPanic = Result.Panic(Fiber.Interrupted(frame))
inline def interruptPanic = Result.Panic(Fiber.Interrupted(scala.compiletime.summonInline[Frame]))

def discardSubscriber(subscriber: Subscriber[? >: V]): Unit =
subscriber.onSubscribe(new Subscription:
Expand All @@ -45,80 +48,67 @@ object StreamPublisher:

def consumeChannel(
channel: Channel[Subscriber[? >: V]],
supervisorPromise: Fiber.Promise[Nothing, Unit]
supervisor: Fiber.Promise[Nothing, Unit]
): Unit < (Async & Ctx) =
Loop(()) { _ =>
channel.closed.map {
if _ then
Loop.done
else
val result = Abort.run[Closed] {
case true => Loop.done
case false =>
Abort.run[Closed] {
for
subscriber <- channel.take
subscription <- IO.Unsafe(new StreamSubscription[V, Ctx](stream, subscriber))
fiber <- subscription.subscribe.andThen(subscription.consume)
_ <- supervisorPromise.onComplete(_ => discard(fiber.interrupt(interruptPanic)))
_ <- supervisor.onComplete(_ => discard(fiber.interrupt(interruptPanic)))
yield ()
}
result.map {
}.map {
case Result.Success(_) => Loop.continue(())
case _ => Loop.done
}
}
}

IO.Unsafe {
for
channel <-
Resource.acquireRelease(Channel.init[Subscriber[? >: V]](capacity))(
_.close.map(_.foreach(_.foreach(discardSubscriber(_))))
)
publisher <- IO.Unsafe {
new StreamPublisher[V, Ctx](stream):
override protected def bind(
subscriber: Subscriber[? >: V]
): Unit =
channel.unsafe.offer(subscriber) match
case Result.Success(true) => ()
case _ => discardSubscriber(subscriber)
}
supervisorPromise <- Fiber.Promise.init[Nothing, Unit]
_ <- Resource.acquireRelease(boundary((trace, context) =>
Fiber.fromTask(IOTask(consumeChannel(channel, supervisorPromise), trace, context))
))(
_.interrupt.map(discard(_))
for
channel <-
Resource.acquireRelease(Channel.init[Subscriber[? >: V]](capacity))(
_.close.map(_.foreach(_.foreach(discardSubscriber(_))))
)
yield publisher
end for
}
publisher <- IO.Unsafe {
new StreamPublisher[V, Ctx](stream):
override protected def bind(
subscriber: Subscriber[? >: V]
): Unit =
channel.unsafe.offer(subscriber) match
case Result.Success(true) => ()
case _ => discardSubscriber(subscriber)
}
supervisor <- Resource.acquireRelease(Fiber.Promise.init[Nothing, Unit])(_.interrupt.map(discard(_)))
_ <- Resource.acquireRelease(Async._run(consumeChannel(channel, supervisor)))(_.interrupt.map(discard(_)))
yield publisher
end for
end apply

object Unsafe:
def apply[V, Ctx](
@nowarn("msg=anonymous")
inline def apply[V, Ctx](
stream: Stream[V, Ctx],
subscribeCallback: (Fiber[Nothing, StreamFinishState] < (IO & Ctx)) => Unit
)(
using
allowance: AllowUnsafe,
boundary: Boundary[Ctx, IO],
frame: Frame,
tag: Tag[V]
AllowUnsafe,
Frame,
Tag[Emit[Chunk[V]]],
Tag[Poll[Chunk[V]]]
): StreamPublisher[V, Ctx] =
new StreamPublisher[V, Ctx](stream):
override protected def bind(
subscriber: Subscriber[? >: V]
): Unit =
discard(StreamSubscription.Unsafe.subscribe(
discard(StreamSubscription.Unsafe._subscribe(
stream,
subscriber
)(
subscribeCallback
)(
using
allowance,
boundary,
frame,
tag
))
end Unsafe
end StreamPublisher
Loading

0 comments on commit fb04b3a

Please sign in to comment.