interleaveOrdered unexpected behavior #3156

Closed
aartigao opened this issue Feb 28, 2023 · 2 comments · Fixed by #3161
@aartigao

aartigao commented Feb 28, 2023

Version

3.6.1

Bug description

While investigating an issue with fs2 and Kafka, I found strange behavior in interleaveOrdered. Consider the following code:

import cats.effect.std.Queue
import cats.effect.{IO, IOApp}
import fs2.Stream

import scala.concurrent.duration.DurationInt

object StreamInterleave extends IOApp.Simple {

  override def run: IO[Unit] = pures *> queues

  def pures = IO.println(Stream(1, 2, 3).interleaveOrdered(Stream.empty).toList)

  def queues =
    for {
      q1 <- Queue.unbounded[IO, Option[Int]]
      q2 <- Queue.unbounded[IO, Option[Int]]
      _ <- q1.tryOfferN(List(Some(1), Some(2), Some(3), None))
      _ <- q2.offer(None)
      r <- Stream.fromQueueNoneTerminated(q1).interleaveOrdered(Stream.fromQueueNoneTerminated(q2).debug()).debug().timeout(5.seconds).compile.toList
      _ <- IO.println(r)
    } yield ()

}

pures runs fine and prints

List(1, 2, 3)

whereas queues times out and prints nothing:

java.util.concurrent.TimeoutException: Timed out after 5 seconds
	at fs2.Stream.timeout(Stream.scala:2814)
	at com.stuart.routing.state.StreamInterleave$.$anonfun$queues$4(StreamInterleave.scala:21)
	at get @ fs2.internal.Scope.openScope(Scope.scala:281)
	at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
	at flatMap @ fs2.Pull$.$anonfun$compile$21(Pull.scala:1214)
	at update @ fs2.internal.Scope.releaseChildScope(Scope.scala:224)
	at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
	at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
	at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
	at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
	at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
	at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
	at modify @ fs2.internal.Scope.close(Scope.scala:262)
	at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
	at flatMap @ fs2.Pull$.$anonfun$compile$18(Pull.scala:1213)
	at handleErrorWith @ fs2.Compiler$Target.handleErrorWith(Compiler.scala:161)
	at flatMap @ fs2.Pull$.goCloseScope$1(Pull.scala:1199)
	at map @ fs2.internal.InterruptContext.$anonfun$eval$2(InterruptContext.scala:99)
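
For comparison, here is a minimal pure-list sketch of the result one would expect from both variants. It assumes interleaveOrdered is meant to behave like the merge step of merge sort over two already-sorted inputs, with the left element emitted first on ties. The names InterleaveOrderedModel and mergeSorted are hypothetical, and this is only a model of the semantics, not fs2's implementation.

```scala
// Pure-list model of the expected semantics (an assumption, not fs2 code).
object InterleaveOrderedModel {

  // Merge two already-sorted lists into one sorted list.
  // On ties, the element from `left` is emitted first.
  def mergeSorted[A](left: List[A], right: List[A])(implicit ord: Ordering[A]): List[A] = {
    @annotation.tailrec
    def go(l: List[A], r: List[A], acc: List[A]): List[A] =
      (l, r) match {
        // One side is exhausted: emit whatever remains of the other side.
        case (Nil, rest) => acc.reverse ::: rest
        case (rest, Nil) => acc.reverse ::: rest
        // Both sides have a head: emit the smaller one, preferring the left.
        case (lh :: lt, rh :: rt) =>
          if (ord.lteq(lh, rh)) go(lt, r, lh :: acc)
          else go(l, rt, rh :: acc)
      }
    go(left, right, Nil)
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the inputs above: three elements on the left, nothing on the right.
    println(mergeSorted(List(1, 2, 3), List.empty[Int])) // List(1, 2, 3)
    println(mergeSorted(List(1, 3, 5), List(2, 4, 6)))   // List(1, 2, 3, 4, 5, 6)
  }
}
```

Under that assumption, an empty right-hand side should simply pass the left-hand elements through, which is what pures does and what queues would be expected to do as well.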

Scastie

aartigao added the bug label Feb 28, 2023
@armanbilge
Member

Ping @ivan-klass if you want to take a look :)

@aartigao
Author

Just in case someone is in the same situation as me, I've created a naive merger using queues:

https://gist.github.com/aartigao/43f321f4231c066daf5539ab8be9a1c4

I can imagine this is far from ideal from the performance POV, but it's working for my use case 🙃

ivan-klass added a commit to ivan-klass/fs2 that referenced this issue Mar 11, 2023
mpilquist added a commit that referenced this issue Mar 20, 2023
Fix interleaveOrdered effectful cases [Fixes #3156]