-
Notifications
You must be signed in to change notification settings - Fork 49
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
12 changed files
with
971 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,3 +13,4 @@ jmh-result.json | |
*.jfr | ||
*.json | ||
*.gpg | ||
test-output |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
kyo-reactive-streams/shared/src/main/scala/kyo/interop/flow/package.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package kyo.interop | ||
|
||
import java.util.concurrent.Flow.* | ||
import kyo.* | ||
import kyo.interop.reactivestreams | ||
import kyo.kernel.Boundary | ||
import org.reactivestreams.FlowAdapters | ||
|
||
package object flow: | ||
inline def fromPublisher[T]( | ||
publisher: Publisher[T], | ||
bufferSize: Int | ||
)( | ||
using | ||
Frame, | ||
Tag[T] | ||
): Stream[T, Async] < IO = reactivestreams.fromPublisher(FlowAdapters.toPublisher(publisher), bufferSize) | ||
|
||
def subscribeToStream[T, Ctx]( | ||
stream: Stream[T, Ctx], | ||
subscriber: Subscriber[? >: T] | ||
)( | ||
using | ||
Boundary[Ctx, IO], | ||
Frame, | ||
Tag[T] | ||
): Subscription < (Resource & IO & Ctx) = | ||
reactivestreams.subscribeToStream(stream, FlowAdapters.toSubscriber(subscriber)).map { subscription => | ||
new Subscription: | ||
override def request(n: Long): Unit = subscription.request(n) | ||
override def cancel(): Unit = subscription.cancel() | ||
} | ||
end flow |
124 changes: 124 additions & 0 deletions
124
...reactive-streams/shared/src/main/scala/kyo/interop/reactive-streams/StreamPublisher.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
package kyo.interop.reactivestreams | ||
|
||
import kyo.* | ||
import kyo.interop.reactivestreams.StreamSubscription.StreamFinishState | ||
import kyo.kernel.Boundary | ||
import kyo.kernel.ContextEffect.Isolated | ||
import kyo.kernel.Safepoint | ||
import kyo.scheduler.IOTask | ||
import org.reactivestreams.* | ||
|
||
abstract class StreamPublisher[V, Ctx] private ( | ||
stream: Stream[V, Ctx] | ||
) extends Publisher[V]: | ||
|
||
protected def bind(subscriber: Subscriber[? >: V]): Unit | ||
|
||
override def subscribe(subscriber: Subscriber[? >: V]): Unit = | ||
if isNull(subscriber) then | ||
throw new NullPointerException("Subscriber must not be null.") | ||
else | ||
bind(subscriber) | ||
end subscribe | ||
|
||
end StreamPublisher | ||
|
||
object StreamPublisher: | ||
def apply[V, Ctx]( | ||
stream: Stream[V, Ctx], | ||
capacity: Int = Int.MaxValue | ||
)( | ||
using | ||
boundary: Boundary[Ctx, IO], | ||
frame: Frame, | ||
tag: Tag[V] | ||
): StreamPublisher[V, Ctx] < (Resource & IO & Ctx) = | ||
inline def interruptPanic = Result.Panic(Fiber.Interrupted(frame)) | ||
|
||
def discardSubscriber(subscriber: Subscriber[? >: V]): Unit = | ||
subscriber.onSubscribe(new Subscription: | ||
override def request(n: Long): Unit = () | ||
override def cancel(): Unit = () | ||
) | ||
subscriber.onComplete() | ||
end discardSubscriber | ||
|
||
def consumeChannel( | ||
channel: Channel[Subscriber[? >: V]], | ||
supervisorPromise: Fiber.Promise[Nothing, Unit] | ||
): Unit < (Async & Ctx) = | ||
Loop(()) { _ => | ||
channel.closed.map { | ||
if _ then | ||
Loop.done | ||
else | ||
val result = Abort.run[Closed] { | ||
for | ||
subscriber <- channel.take | ||
subscription <- IO.Unsafe(new StreamSubscription[V, Ctx](stream, subscriber)) | ||
fiber <- subscription.subscribe.andThen(subscription.consume) | ||
_ <- supervisorPromise.onComplete(_ => discard(fiber.interrupt(interruptPanic))) | ||
yield () | ||
} | ||
result.map { | ||
case Result.Success(_) => Loop.continue(()) | ||
case _ => Loop.done | ||
} | ||
} | ||
} | ||
|
||
IO.Unsafe { | ||
for | ||
channel <- | ||
Resource.acquireRelease(Channel.init[Subscriber[? >: V]](capacity))( | ||
_.close.map(_.foreach(_.foreach(discardSubscriber(_)))) | ||
) | ||
publisher <- IO.Unsafe { | ||
new StreamPublisher[V, Ctx](stream): | ||
override protected def bind( | ||
subscriber: Subscriber[? >: V] | ||
): Unit = | ||
channel.unsafe.offer(subscriber) match | ||
case Result.Success(true) => () | ||
case _ => discardSubscriber(subscriber) | ||
} | ||
supervisorPromise <- Fiber.Promise.init[Nothing, Unit] | ||
_ <- Resource.acquireRelease(boundary((trace, context) => | ||
Fiber.fromTask(IOTask(consumeChannel(channel, supervisorPromise), trace, context)) | ||
))( | ||
_.interrupt.map(discard(_)) | ||
) | ||
yield publisher | ||
end for | ||
} | ||
end apply | ||
|
||
object Unsafe: | ||
def apply[V, Ctx]( | ||
stream: Stream[V, Ctx], | ||
subscribeCallback: (Fiber[Nothing, StreamFinishState] < (IO & Ctx)) => Unit | ||
)( | ||
using | ||
allowance: AllowUnsafe, | ||
boundary: Boundary[Ctx, IO], | ||
frame: Frame, | ||
tag: Tag[V] | ||
): StreamPublisher[V, Ctx] = | ||
new StreamPublisher[V, Ctx](stream): | ||
override protected def bind( | ||
subscriber: Subscriber[? >: V] | ||
): Unit = | ||
discard(StreamSubscription.Unsafe.subscribe( | ||
stream, | ||
subscriber | ||
)( | ||
subscribeCallback | ||
)( | ||
using | ||
allowance, | ||
boundary, | ||
frame, | ||
tag | ||
)) | ||
end Unsafe | ||
end StreamPublisher |
Oops, something went wrong.