-
Notifications
You must be signed in to change notification settings - Fork 603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unexpected behaviors around the use of Pull #2717
Comments
This might be a minimization. Result changes if def run: IO[Unit] = runIssue2717
def runIssue2717 =
problematicStream.covary[IO]
.map(s => s) // <- THIS MAP
.compile
.toList
.flatMap(list => IO.println(s"Result $list"))
def stream = Stream(1).as(1) ++ Stream(2)
def problematicStream =
stream.pull.uncons1.flatMap { case Some((_, s)) =>
s.zipWith(Stream(2))((_, _)).pull.echo
}.stream |
28c795b is the first bad commit
:040000 040000 4acd111b77f5588fc68b4f5d89bddf647ebbac14 0dddc9c75af3ecad9cb512d8234e31f8cf4a253f M core |
That's not really a minimisation, I've just changed test("issue-2717 - unexpected behavior of Pull") {
val stream = Stream(1).as(1) ++ Stream(2)
val zippedPull =
stream.pull.uncons1.flatMap { case Some((_, s)) =>
s.zipWith(Stream(3))(Tuple2.apply).pull.echo
}
val actual = zippedPull.stream.covary[IO].map(identity).compile.toList
actual.assertEquals(List((2, 3)))
} |
I have been spending a bit of time on this curious example. Here is another aspect of it: when you look import cats.effect.{IO, IOApp, SyncIO}
import fs2.Pull.StreamPullOps
import fs2.Stream
object DebugFs2Issue extends IOApp.Simple {
val stream = Stream(1).as(1) ++ Stream(2)
val unconsed: Pull[Pure, INothing, Option[(Int, Stream[Pure, Int])]] =
stream.pull.uncons1
val problematicStream: Pull[Pure, (Int, Int), Unit] =
unconsed.flatMap { case Some((_, s)) =>
s.zipWith(Stream(2))((_, _)).pull.echo
}
def run: IO[Unit] = {
val listF = problematicStream.stream
.map(s => s) // <- THIS MAP
.covary[SyncIO] // A
//.covary[IO] // B
.compile
.toList
IO(listF.flatMap { list => SyncIO( println(s"Result $list")) }.unsafeRunSync()) // A
//listF.flatMap(list => IO.println(s"Result $list")) // B
}
} Without removing the map line (
|
Just to clarify, it seems the issue is not .compile(Compiler.target(Compiler.Target.forSync)) This will give |
Changing the fs2/core/shared/src/main/scala/fs2/Compiler.scala Lines 191 to 193 in f0272ce
|
I added a bunch of debug statements to the interpreter. For some reason, the interpreter thinks the scope which emits 2 is interrupted here: fs2/core/shared/src/main/scala/fs2/Pull.scala Line 1085 in b8d2c35
I'm not sure why it thinks that. |
Sample interpreter trace, which shows that a child scope is getting canceled/interrupted:
The interruption is occurring here: https://github.com/typelevel/fs2/blob/main/core/shared/src/main/scala/fs2/internal/InterruptContext.scala#L78-L79 |
I'm not even sure how to describe the problem. Please feel free to change the title to something more appropriate.
<- THIS MAP
, "Printing: (emitted,2)" is printed. This is unexpected.successPipe
instead offailurePipe
, "Printing: (emitted,2)" is also printed. This is also unexpected, since I would predict that_.zipWith(Stream.emit(2))((_, _))
is equivalent to_.map(v => (v, 2))
..emit(1).map(Left(_))
with.emit(Left(1))
, "Printing: (emitted,2)" is printed. I would expect those two commands to be equivalent.The text was updated successfully, but these errors were encountered: