Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pipeline conversions and rewrite internals with channels #373

Open
wants to merge 14 commits into
base: master
Choose a base branch
from

Conversation

mschuwalow
Copy link
Member

@mschuwalow mschuwalow commented Oct 11, 2023

Reason to go this way is that for pipeline to work correctly, we need to ensure that the Publisher and the Subscriber run concurrently on different fibers. Imo this is easiest / cleanest by relying on the AsynInput abstractions baked into the channelexecutor.

resolves #364

()
}
}
)(implicit trace: Trace): ZIO[R, Nothing, Publisher[O]] = ZIO.runtime[R].map { runtime => subscriber =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This guy cannot be implemented on top of channelToPublisher due to the missing R <: Scope constraint

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you feel about just adding the constraint? It would be more correct if the fork was scoped IMO.

unsafe { implicit unsafe =>
val subscription = new SubscriptionProducer[I](subscriber)

def reader: ZChannel[Any, ZNothing, Chunk[I], Any, Nothing, Chunk[I], Unit] = ZChannel.readWithCause(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs a custom implementation because we need to prevent defects from getting send to it

)(implicit trace: Trace): ZPipeline[Any, Throwable, I, O] = ZPipeline.unwrapScoped {
val subscription = new SubscriptionProducer[I](processor)(unsafe)
val subscriber = new SubscriberConsumer[O](bufferSize)(unsafe)
val passthrough = new PassthroughAsyncInput(subscription, subscriber)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs a custom implementation so defects are not passed

@mschuwalow mschuwalow marked this pull request as ready for review October 16, 2023 16:11
@kciesielski
Copy link

Bumping this, as it would really help to implement WebSocket support for tapir-netty-zio, where we use netty-reactive-streams underneath and need a Processor[Req, Resp] created out of a ZPipeline. Any chance to push this a little bit forward, @runtologist?

@dispalt
Copy link

dispalt commented May 2, 2024

This PR also makes cancellations of streams work better, previously cancellations only seemed to complete the stream if read after closing the stream.

Copy link
Member

@runtologist runtologist left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this awesome contribution @mschuwalow, and sorry it took so loong before I noticed.

Just a few tiny comments.

()
}
}
)(implicit trace: Trace): ZIO[R, Nothing, Publisher[O]] = ZIO.runtime[R].map { runtime => subscriber =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you feel about just adding the constraint? It would be more correct if the fork was scoped IMO.

if (!isSubscribedOrCanceled.compareAndSet(false, true)) {
s.cancel()
} else {
subscription.unsafe.done(ZIO.succeedNow(s))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

succeedNow is deprecated. Let's switch to succeed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add ZPipeline to Processor conversion
4 participants