From fbe89b1d47f6f61c2f461874b0fb934a6631e48e Mon Sep 17 00:00:00 2001 From: mpilquist Date: Mon, 6 Dec 2021 10:47:19 -0500 Subject: [PATCH] Fix #2717 --- .../src/main/scala/fs2/internal/Scope.scala | 15 +++++++++++---- .../src/test/scala/fs2/StreamZipSuite.scala | 10 ++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/core/shared/src/main/scala/fs2/internal/Scope.scala b/core/shared/src/main/scala/fs2/internal/Scope.scala index 188967982e..b09da9a3f3 100644 --- a/core/shared/src/main/scala/fs2/internal/Scope.scala +++ b/core/shared/src/main/scala/fs2/internal/Scope.scala @@ -81,7 +81,7 @@ import fs2.internal.InterruptContext.InterruptionOutcome private[fs2] final class Scope[F[_]] private ( val id: Unique.Token, private val parent: Option[Scope[F]], - interruptible: Option[InterruptContext[F]], + private val interruptible: Option[InterruptContext[F]], private val state: Ref[F, Scope.State[F]] )(implicit val F: Compiler.Target[F]) { self => @@ -276,6 +276,13 @@ private[fs2] final class Scope[F[_]] private ( case _: Scope.State.Closed[F] => F.pure(Right(())) } + /** Like `openAncestor` but returns self if open. */ + private def openScope: F[Scope[F]] = + state.get.flatMap { + case _: Scope.State.Open[F] => F.pure(self) + case _: Scope.State.Closed[F] => openAncestor + } + /** Returns closest open parent scope or root. */ def openAncestor: F[Scope[F]] = self.parent.fold(F.pure(self)) { parent => @@ -412,13 +419,13 @@ private[fs2] final class Scope[F[_]] private ( iCtx.completeWhen(outcome) } - /** Checks if current scope is interrupted. + /** Checks if the nearest open scope is interrupted. * If yields to None, scope is not interrupted and evaluation may normally proceed. - * If yields to Some(Right(scope,next)) that yields to next `scope`, that has to be run and `next` stream + * If yields to Some(Right(scope,next)) that yields to next `scope`, that has to be run and `next` stream * to evaluate */ def isInterrupted: F[Option[InterruptionOutcome]] = - interruptible match { + openScope.map(_.interruptible).flatMap { case None => F.pure(None) case Some(iCtx) => iCtx.ref.get } diff --git a/core/shared/src/test/scala/fs2/StreamZipSuite.scala b/core/shared/src/test/scala/fs2/StreamZipSuite.scala index 291062f1db..8e1bab8c83 100644 --- a/core/shared/src/test/scala/fs2/StreamZipSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamZipSuite.scala @@ -416,5 +416,15 @@ class StreamZipSuite extends Fs2Suite { .compile .drain } + + test("#2717 - unexpected behavior of Pull") { + val stream = Stream(1).as(1).scope ++ Stream(2) + val zippedPull = + stream.pull.uncons1.flatMap { case Some((_, s)) => + s.zipWith(Stream(3))((_, _)).pull.echo + } + val actual = zippedPull.stream.map(identity).covary[IO].compile.toList + actual.assertEquals(List((2, 3))) + } } }