Skip to content

Commit

Permalink
par-eval-v3
Browse files Browse the repository at this point in the history
  • Loading branch information
nikiforo committed Nov 16, 2021
1 parent 66fc1e3 commit 0c15b22
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 261 deletions.
25 changes: 15 additions & 10 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2025,8 +2025,6 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
)(implicit F2: Applicative[F2]): Stream[F2, O] =
new Stream(Pull.acquire[F2, Unit](F2.unit, (_, ec) => f(ec)).flatMap(_ => underlying))

private type Att[O2] = Either[Throwable, O2]

/** Like [[Stream#evalMap]], but will evaluate effects in parallel, emitting the results
* downstream in the same order as the input stream. The number of concurrent effects
* is limited by the `maxConcurrent` parameter.
Expand All @@ -2043,9 +2041,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
def parEvalMap[F2[x] >: F[x], O2](
maxConcurrent: Int
)(f: O => F2[O2])(implicit F: Concurrent[F2]): Stream[F2, O2] = {
def init(ch: Channel[F2, F2[Att[O2]]], release: F2[Unit]) =
Deferred[F2, Att[O2]].flatTap(value => ch.send(value.get <* release))
def send(v: Deferred[F2, Att[O2]]) = (el: Att[O2]) => v.complete(el).void
def init(ch: Channel[F2, F2[Either[Throwable, O2]]], release: F2[Unit]) =
Deferred[F2, Either[Throwable, O2]].flatTap(value => ch.send(value.get <* release))
def send(v: Deferred[F2, Either[Throwable, O2]]) =
(el: Either[Throwable, O2]) => v.complete(el).void
parEvalMapAction(maxConcurrent, f)((ch, release) => init(ch, release).map(send))
}

Expand All @@ -2066,8 +2065,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
maxConcurrent: Int
)(f: O => F2[O2]): Stream[F2, O2] = {
val init = ().pure[F2]
def send(ch: Channel[F2, F2[Att[O2]]], release: F2[Unit]) =
(el: Att[O2]) => ch.send(el.pure[F2]) *> release
def send(ch: Channel[F2, F2[Either[Throwable, O2]]], release: F2[Unit]) =
(el: Either[Throwable, O2]) => ch.send(el.pure[F2]) *> release
parEvalMapAction(maxConcurrent, f)((ch, release) => init.as(send(ch, release)))
}

Expand All @@ -2076,7 +2075,12 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
]: Concurrent, O2, T](
maxConcurrent: Int,
f: O => F2[O2]
)(initFork: (Channel[F2, F2[Att[O2]]], F2[Unit]) => F2[Att[O2] => F2[Unit]]): Stream[F2, O2] =
)(
initFork: (
Channel[F2, F2[Either[Throwable, O2]]],
F2[Unit]
) => F2[Either[Throwable, O2] => F2[Unit]]
): Stream[F2, O2] =
if (maxConcurrent == 1) evalMap(f)
else {
val F = Concurrent[F2]
Expand All @@ -2087,7 +2091,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
val action =
(
Semaphore[F2](concurrency.toLong),
Channel.bounded[F2, F2[Att[O2]]](concurrency),
Channel.bounded[F2, F2[Either[Throwable, O2]]](concurrency),
Deferred[F2, Unit],
Deferred[F2, Unit]
).mapN { (semaphore, channel, stop, end) =>
Expand All @@ -2102,7 +2106,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
F.uncancelable { poll =>
poll(semaphore.acquire) <*
Deferred[F2, Unit].flatMap { pushed =>
initFork(channel, pushed.complete(()).void).flatMap { send =>
val init = initFork(channel, pushed.complete(()).void)
poll(init).onCancel(releaseAndCheckCompletion).flatMap { send =>
val action = stop.get.race(f(el).attempt.flatMap(send) <* pushed.get)
F.start(poll(action).guarantee(releaseAndCheckCompletion))
}
Expand Down
218 changes: 0 additions & 218 deletions core/shared/src/test/scala/fs2/ParEvalMapSuite.scala

This file was deleted.

Loading

0 comments on commit 0c15b22

Please sign in to comment.