Skip to content

Commit

Permalink
issue-2686 - draft
Browse files Browse the repository at this point in the history
  • Loading branch information
nikiforo committed Oct 25, 2021
1 parent 7a28a63 commit 99c73f0
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
19 changes: 10 additions & 9 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2017,9 +2017,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
def parEvalMap[F2[x] >: F[x], O2](
maxConcurrent: Int
)(f: O => F2[O2])(implicit F: Concurrent[F2]): Stream[F2, O2] = {
def init(ch: Channel[F2, F2[O2]]) = Deferred[F2, O2].flatTap(value => ch.send(value.get))
def init(ch: Channel[F2, F2[O2]], release: F2[Unit]) =
Deferred[F2, O2].flatTap(value => ch.send(value.get <* release))
def send(v: Deferred[F2, O2]) = (el: O2) => v.complete(el).void
parEvalMapAction(maxConcurrent, f)(init(_).map(send))
parEvalMapAction(maxConcurrent, f)((ch, release) => init(ch, release).map(send))
}

/** Like [[Stream#evalMap]], but will evaluate effects in parallel, emitting the results
Expand All @@ -2039,16 +2040,17 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
maxConcurrent: Int
)(f: O => F2[O2]): Stream[F2, O2] = {
val init = ().pure[F2]
def send(ch: Channel[F2, F2[O2]]) = (el: O2) => ch.send(el.pure[F2]).void
parEvalMapAction(maxConcurrent, f)(ch => init.as(send(ch)))
def send(ch: Channel[F2, F2[O2]], release: F2[Unit]) =
(el: O2) => ch.send(el.pure[F2]) *> release
parEvalMapAction(maxConcurrent, f)((ch, release) => init.as(send(ch, release)))
}

private def parEvalMapAction[F2[x] >: F[
x
]: Concurrent, O2, T](
maxConcurrent: Int,
f: O => F2[O2]
)(initFork: Channel[F2, F2[O2]] => F2[O2 => F2[Unit]]): Stream[F2, O2] =
)(initFork: (Channel[F2, F2[O2]], F2[Unit]) => F2[O2 => F2[Unit]]): Stream[F2, O2] =
if (maxConcurrent == 1) evalMap(f)
else {
assert(maxConcurrent > 0, "maxConcurrent must be > 0, was: " + maxConcurrent)
Expand Down Expand Up @@ -2096,15 +2098,14 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,

def forkOnElem(el: O): F2[Unit] =
semaphore.acquire *>
initFork(channel).flatMap { send =>
initFork(channel, releaseAndCheckCompletion).flatMap { send =>
f(el).attempt
.race(stopReading.get)
.flatMap {
case Left(Left(ex)) => failed(ex)
case Left(Left(ex)) => failed(ex) *> releaseAndCheckCompletion
case Left(Right(a)) => send(a)
case Right(_) => ().pure[F2]
case Right(_) => releaseAndCheckCompletion
}
.guarantee(releaseAndCheckCompletion)
.start
.void
}
Expand Down
6 changes: 3 additions & 3 deletions core/shared/src/test/scala/fs2/StreamSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ class StreamSuite extends Fs2Suite {
Ref[IO]
.of(0)
.flatMap { open =>
stream.covary[IO].parEvalMap(concurrency)(_ => findConc(open)).compile.toList
stream.covary[IO].parEvalMap(concurrency)(_ => findOpen(open)).compile.toList
}
.map(_.forall(_ <= concurrency))
.assert
Expand All @@ -1017,14 +1017,14 @@ class StreamSuite extends Fs2Suite {
Ref[IO]
.of(0)
.flatMap { open =>
stream.covary[IO].parEvalMapUnordered(concurrency)(_ => findConc(open)).compile.toList
stream.covary[IO].parEvalMapUnordered(concurrency)(_ => findOpen(open)).compile.toList
}
.map(_.forall(_ <= concurrency))
.assert
}
}

def findConc(open: Ref[IO, Int]) = {
def findOpen(open: Ref[IO, Int]) = {
val start = open.update(_ + 1) *> IO.sleep(5.millis) *> open.get
val end = IO.sleep(5.millis) *> open.getAndUpdate(_ - 1)
start *> end
Expand Down

0 comments on commit 99c73f0

Please sign in to comment.