Skip to content

Commit

Permalink
issue-2686 - draft
Browse files Browse the repository at this point in the history
  • Loading branch information
nikiforo committed Oct 22, 2021
1 parent 7a28a63 commit 2690903
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2017,9 +2017,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
def parEvalMap[F2[x] >: F[x], O2](
maxConcurrent: Int
)(f: O => F2[O2])(implicit F: Concurrent[F2]): Stream[F2, O2] = {
def init(ch: Channel[F2, F2[O2]]) = Deferred[F2, O2].flatTap(value => ch.send(value.get))
def init(ch: Channel[F2, F2[O2]], release: F2[Unit]) =
Deferred[F2, O2].flatTap(value => ch.send(value.get <* release))
def send(v: Deferred[F2, O2]) = (el: O2) => v.complete(el).void
parEvalMapAction(maxConcurrent, f)(init(_).map(send))
parEvalMapAction(maxConcurrent, f)((ch, release) => init(ch, release).map(send))
}

/** Like [[Stream#evalMap]], but will evaluate effects in parallel, emitting the results
Expand All @@ -2039,16 +2040,17 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
maxConcurrent: Int
)(f: O => F2[O2]): Stream[F2, O2] = {
val init = ().pure[F2]
def send(ch: Channel[F2, F2[O2]]) = (el: O2) => ch.send(el.pure[F2]).void
parEvalMapAction(maxConcurrent, f)(ch => init.as(send(ch)))
def send(ch: Channel[F2, F2[O2]], release: F2[Unit]) =
(el: O2) => ch.send(el.pure[F2]) *> release
parEvalMapAction(maxConcurrent, f)((ch, release) => init.as(send(ch, release)))
}

private def parEvalMapAction[F2[x] >: F[
x
]: Concurrent, O2, T](
maxConcurrent: Int,
f: O => F2[O2]
)(initFork: Channel[F2, F2[O2]] => F2[O2 => F2[Unit]]): Stream[F2, O2] =
)(initFork: (Channel[F2, F2[O2]], F2[Unit]) => F2[O2 => F2[Unit]]): Stream[F2, O2] =
if (maxConcurrent == 1) evalMap(f)
else {
assert(maxConcurrent > 0, "maxConcurrent must be > 0, was: " + maxConcurrent)
Expand Down Expand Up @@ -2096,15 +2098,14 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,

def forkOnElem(el: O): F2[Unit] =
semaphore.acquire *>
initFork(channel).flatMap { send =>
initFork(channel, releaseAndCheckCompletion).flatMap { send =>
f(el).attempt
.race(stopReading.get)
.flatMap {
case Left(Left(ex)) => failed(ex)
case Left(Left(ex)) => failed(ex) *> releaseAndCheckCompletion
case Left(Right(a)) => send(a)
case Right(_) => ().pure[F2]
case Right(_) => releaseAndCheckCompletion
}
.guarantee(releaseAndCheckCompletion)
.start
.void
}
Expand Down

0 comments on commit 2690903

Please sign in to comment.