Add ZPipeline to Processor conversion #364

Open
pragmaxim opened this issue Jul 9, 2023 · 3 comments · May be fixed by #373

Comments

@pragmaxim

org.reactivestreams.Processor is the equivalent of ZPipeline or akka.stream.scaladsl.Flow. When integrating between different reactive "implementations", it is possible to create it using org.reactivestreams.FlowAdapters.toFlowProcessor ... Akka-stream has a flow.toProcessor method, but I can't find a way to turn a ZPipeline into a Processor.

Any idea?
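
For readers unfamiliar with the abstraction, here is a minimal sketch of what a Processor is: a Subscriber and a Publisher at once, which is why it lines up with a pipeline stage. It uses only the JDK's java.util.concurrent.Flow types (the target of the adapter mentioned above), not ZIO or Akka, and it is not the conversion asked for in this issue. `MapProcessor` and `MapProcessorDemo` are hypothetical names invented for illustration.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Flow, SubmissionPublisher, TimeUnit}

// Hypothetical helper: applies `f` to every element it receives and
// republishes the result to its own subscribers.
final class MapProcessor[A, B](f: A => B)
    extends SubmissionPublisher[B]
    with Flow.Processor[A, B] {

  private var subscription: Flow.Subscription = null

  // Subscriber side: request one element at a time from upstream.
  override def onSubscribe(s: Flow.Subscription): Unit = {
    subscription = s
    s.request(1)
  }

  override def onNext(item: A): Unit = {
    submit(f(item))          // Publisher side: emit the transformed element
    subscription.request(1)  // then ask upstream for the next one
  }

  override def onError(t: Throwable): Unit = closeExceptionally(t)
  override def onComplete(): Unit = close()
}

object MapProcessorDemo {
  def main(args: Array[String]): Unit = {
    val source    = new SubmissionPublisher[Int]()
    val processor = new MapProcessor[Int, String](i => s"n=$i")
    val collected = new ConcurrentLinkedQueue[String]()

    // Attach a downstream consumer first, then wire upstream into the processor.
    val done = processor.consume(s => { collected.add(s); () })
    source.subscribe(processor)

    List(1, 2, 3).foreach(i => source.submit(i))
    source.close()

    done.get(5, TimeUnit.SECONDS) // wait until the processor has completed
    println(collected)
  }
}
```

A ZPipeline-to-Processor conversion would need to expose the same two faces, with the pipeline running between the subscriber side and the publisher side.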

@runtologist
Member

Not tested, but shouldn't this work?

  def processorToZPipeline[In, Out](processor: Processor[In, Out]): ZPipeline[Scope, Throwable, In, Out] =
    ZPipeline.unwrap(
      for {
        signalErrorSink    <- processor.toZIOSink
        (signalError, sink) = signalErrorSink
        stream              = processor.toZIOStream()
        sinkPipe            = ZPipeline.fromSink(sink)
        sourcePipe          = ZPipeline.fromChannel(stream.channel)
      } yield sinkPipe >>> sourcePipe
    )

@runtologist
Member

Well, it doesn't.

@mschuwalow
Member

@runtologist are you looking at this one?
If not, I just hit a use case for this at work, so I can take care of it.
