Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reactive-stream/Flow Interop #896

Merged
merged 4 commits into from
Dec 17, 2024
Merged

Conversation

HollandDM
Copy link
Contributor

Originally, this was created just to resolve #764, but since #881 was requested, I took sometime to create a dedicated PR for it.
This took both ZIO's and Fs2's approaches as the inspiration, and passes the reactive-stream-tck testcases. I also add some test cases derived from Fs2's codebase. Hopefully it covers enough.
With this, solving #764 will be more trivial.

@HollandDM HollandDM force-pushed the reactive-stream-interop branch from 2ac4f2e to d3933f9 Compare December 4, 2024 11:47
@HollandDM HollandDM marked this pull request as ready for review December 4, 2024 11:47
@HollandDM HollandDM marked this pull request as draft December 4, 2024 11:48
@HollandDM HollandDM force-pushed the reactive-stream-interop branch 3 times, most recently from d8a986e to 09a9680 Compare December 4, 2024 11:57
@HollandDM HollandDM marked this pull request as ready for review December 4, 2024 11:57
@HollandDM HollandDM mentioned this pull request Dec 4, 2024
tag: Tag[Emit[Chunk[V]]],
frame: Frame,
safepoint: Safepoint
): Fiber[Nothing, StreamFinishState] < (IO & Ctx) =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, we need to fork this into a new fiber, which will run the stream until completion. Maybe Poll can help us eliminate the need to fork, which results in eliminating boundary

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I can't see why forking is necessary. Wouldn't Poll.run(stream.emit)(poll).map(_._2) directly work?

Copy link
Contributor Author

@HollandDM HollandDM Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other libs like ZIO and Fs2 also have to fork. AFAIK, it's mainly because draining stream is a "runForever" task until interrupted or stream is done. Without forking it will block other thing from being done in the current thread.
I'll try to figure out if I can eliminate it

@HollandDM HollandDM force-pushed the reactive-stream-interop branch from 09a9680 to fe3799a Compare December 6, 2024 11:20

private[reactivestreams] inline def subscribe: Unit < IO = IO(subscriber.onSubscribe(this))

private[reactivestreams] def poll: StreamFinishState < (Async & Poll[Chunk[V]]) =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Poll is great, is make the logic much cleaner then the previous attemp

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool!!

package object reactivestreams:
def fromPublisher[T](
publisher: Publisher[T],
bufferSize: Int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a question regarding this, do we wait for the entire buffer to be filled up before emitting them as a Chunk on the Stream? Is it possible to have some kind of choice with regards to size & time?

The reason I ask this is because on an infinite stream (let's say we're integrating with a reactive streams kafka library like Helidon or Vertx Kafka), if this buffer size > number of messages on the topic unconsumed then the messages will be held in the buffer until number of messages on the topic unconsumed >= buffer size.

I observed this limitation with the Helidon Kafka client when I was integrating with the FS2 Streams Flow interop where if you used a low buffer size, you can overcome this limitation at the cost of performance since chunk-size = 1 has much worse performance

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current implementation tries to emit the value to stream as soon as it receives one. Buffer size was used mostly as a params of subscription’s ’request’.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did notice this behavior of fs2 stream and I’m also not a fan of it. So I took the Zio approach. But maybe we can have some kind of “strategy” or different implementations so that users can choose between them. WDYT

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be excellent! Amazing work 🙇

import kyo.kernel.Boundary
import org.reactivestreams.FlowAdapters

package object flow:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about exposing these operations as Stream extensions in the kyo package instead? I'd be nice to keep a companion object as well since people might want to import it in isolation:

package kyo

object StreamReactiveStreamsExtensions:
     extension [V, S](self: Stream[V, S]):
         // instance methods
     extension (self: Stream.type):
         // companion methods
         
 // export to the kyo package
 export StreamReactiveStreamsExtensions.*

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, I'' update this

Copy link
Contributor Author

@HollandDM HollandDM Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now that I though about it, moving this into kyo package would polute it with reactive-streams dependencies. I don't think that's a good ideal, right?

Copy link
Collaborator

@fwbrasil fwbrasil Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd still be limited to the specific module. kyo-reactive-streams needs to be in the classpath for its extension methods to be available

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand what you mean now, will update!

using
Boundary[Ctx, IO],
Frame,
Tag[T]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be ideal to take Tag[Emit[Chunk[T]]] evidences directly otherwise the tag macro has to dynamically compose tags, which requires allocation.

subscriber: Subscriber[? >: T]
)(
using
Boundary[Ctx, IO],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have some inference issues when using Boundary directly. We've been using a workaround with a user-facing inline method calling another private one that then requires the Boundary evidence. See Async for examples.

): Unit < (Async & Ctx) =
Loop(()) { _ =>
channel.closed.map {
if _ then
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you use case true => .. case false => ... instead?


override def onSubscribe(subscription: Subscription): Unit =
throwIfNull(subscription)
var sideEffect: () => Unit = () => ()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be possible to avoid the mutable value by using state.get and state.set directly instead of state.update?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes its can be done, but It'll need to be converted into loop method for correctness

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. If a Loop is too cumbersome, another option is introducing an update that allows retuning a "tail" computation

safepoint: Safepoint
): Fiber[Nothing, StreamFinishState] < (IO & Ctx) =
boundary { (trace, context) =>
val fiber = Fiber.fromTask(IOTask(Poll.run(stream.emit)(poll).map(_._2), safepoint.copyTrace(trace), context))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IOTask is an internal detail of Fiber/Async. Ideally, we should rely on the existing APIs instead of using it directly

Copy link
Contributor Author

@HollandDM HollandDM Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is just a copy of Async._run with boundary that does not care about Abort[E] (E =:= Nothing in this case). Maybe after #906 is merged I can use _run instead

libraryDependencies ++= Seq(
"org.reactivestreams" % "reactive-streams" % "1.0.4",
"org.reactivestreams" % "reactive-streams-tck" % "1.0.4" % Test,
"org.scalatestplus" %% "testng-7-5" % "3.2.17.0" % Test
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required to reuse the tck tests? Would it be possible to use scalatest like the other modules?

Copy link
Contributor Author

@HollandDM HollandDM Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to reimplement those tests in scalatest though.

@HollandDM HollandDM force-pushed the reactive-stream-interop branch 2 times, most recently from 8933493 to fb04b3a Compare December 11, 2024 01:45
@HollandDM
Copy link
Contributor Author

@fwbrasil updated, please recheck

@HollandDM HollandDM requested a review from fwbrasil December 11, 2024 01:45
): StreamSubscription[V, Ctx] =
_subscribe(stream, subscriber)(subscribeCallback)

private[kyo] inline def _subscribe[V, Ctx](
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same trick with Async._run, now we don't have to pass context params explicitly

Tag[Poll[Chunk[T]]]
): Publisher[T] < (Resource & IO & Ctx) = StreamPublisher[T, Ctx](stream)

object StreamReactiveStreamsExtensions:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extension methods added

abstract private[reactivestreams] class SubscriberDone
private[reactivestreams] case object SubscriberDone extends SubscriberDone

enum EmitStrategy derives CanEqual:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

introduce EmitStrategy so that users can choose the behavior they want

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! We could eventually introduce more advanced strategies including timeouts for example

Frame,
Boundary[Ctx, IO & Abort[Nothing]]
): Fiber[Nothing, StreamFinishState] < (IO & Ctx) =
Async
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace boundary usage with Async._run

end onComplete

private[interop] def await(using Frame): Boolean < Async =
@tailrec def handleAwait: Boolean < Async =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@tailrec def handleAwait: Boolean < Async =
@tailrec def handleAwait(): Boolean < Async =

capacity: Int = Int.MaxValue
)(
using
Boundary[Ctx, IO & Abort[Nothing]],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you probably don't need Abort[Nothing] anymore since it's included in IO

case Result.Success(_) => Loop.continue(())
case _ => Loop.done
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simplification suggestion:

                  Loop(()) { _ =>
                         Abort.recover[Closed](_ => Loop.done) {
                            for
                                subscriber   <- channel.take
                                subscription <- IO.Unsafe(new StreamSubscription[V, Ctx](stream, subscriber))
                                fiber        <- subscription.subscribe.andThen(subscription.consume)
                                _            <- supervisor.onComplete(_ => discard(fiber.interrupt(interruptPanic)))
                            yield Loop.continue(())
                        }
                 }

@fwbrasil
Copy link
Collaborator

@HollandDM could you have a look at #918? It seems it could be useful here

import org.reactivestreams.tck.TestEnvironment
import org.scalatestplus.testng.*

final class StreamPublisherTest extends PublisherVerification[Int](new TestEnvironment(1000L)), TestNGSuiteLike:
Copy link
Collaborator

@fwbrasil fwbrasil Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took at the tck tests and it seems they check mostly for correctness without concurrency. If that's the case, I think we'll need to introduce concurrency tests since the code relies heavily on shared state. There are a few examples of concurrency tests like in ChannelTest and MeterTest.

@HollandDM HollandDM force-pushed the reactive-stream-interop branch from fb04b3a to 2a28269 Compare December 13, 2024 09:54
@HollandDM HollandDM requested a review from fwbrasil December 13, 2024 10:13
@HollandDM
Copy link
Contributor Author

I moved the implement from extending reactive-streams to extending flow. I think it's more suitable for Kyo

override def cancel(): Unit = isCanceled.set(true)
end JavaSubscription

final class SreamSubscriberTest extends Test:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to test StreamSubscriber part only, as it's also the only part that use shared atomic state

end for
}

"single publisher & multiple subscribers" - {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some test between publisher and subscribers

import kyo.*
import kyo.Maybe.*

final class JavaSubscription(subscriber: Subscriber[? >: Int], batchSize: Int, counter: java.util.concurrent.atomic.AtomicInteger)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to test a generic Publisher, so I create a generic Java copies

@HollandDM HollandDM force-pushed the reactive-stream-interop branch from 2a28269 to 781353b Compare December 13, 2024 15:36

private[interop] inline def subscribe(using Frame): Unit < IO = IO(subscriber.onSubscribe(this))

private[interop] def poll(using Tag[Poll[Chunk[V]]], Frame): StreamFinishState < (Async & Poll[Chunk[V]]) =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to convert this into channel.stream(), but this provides more controls over the computation

@HollandDM
Copy link
Contributor Author

@fwbrasil Please recheck if more tests are needed

Tag[Emit[Chunk[V]]],
Tag[Poll[Chunk[V]]]
): StreamPublisher[V, Ctx] < (Resource & IO & Ctx) =
inline def interruptPanic = Result.Panic(Fiber.Interrupted(scala.compiletime.summonInline[Frame]))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You won't need the explicit summon after: #929

Copy link
Collaborator

@fwbrasil fwbrasil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a sizable contribution, congrats! LGTM but I think you'll need to resolve conflicts with #929

@HollandDM HollandDM force-pushed the reactive-stream-interop branch from 781353b to 79cdc47 Compare December 17, 2024 03:18
@HollandDM
Copy link
Contributor Author

@fwbrasil thanks, this PR should be good now

@HollandDM HollandDM requested a review from fwbrasil December 17, 2024 03:34
@fwbrasil fwbrasil merged commit 702fbdc into getkyo:main Dec 17, 2024
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Requests: support sttp streaming
3 participants