Segment.from will never terminate when run #1039

Closed
pchlupacek opened this issue Dec 30, 2017 · 27 comments

@pchlupacek
Contributor

pchlupacek commented Dec 30, 2017

The program

Segment.from(0L).take(5).drain.force.run.map(_.force.toVector)

never terminates.

It seems that no variant of from ever terminates.

@pchlupacek
Contributor Author

I also found that

Segment.unfold(0L)(l => Some((l, l + 1))).take(5).drain.force.run.map(_.force.toVector)

does not terminate, whereas

Segment.unfold(0L)(l => Some((l, l + 1))).take(5).drain.force.toList

behaves as expected, although it discards the result.

@SystemFw
Collaborator

Segment.from(0L).take(5).force.toVector

This works as expected, which is weird.

@pchlupacek
Contributor Author

@mpilquist, can you please take a look? I think run is broken for segments generated by an unfold-like pattern that terminate only through take, drop, etc.

@SystemFw
Collaborator

SystemFw commented Dec 30, 2017

Actually I think this might be correct.

val a = Segment.from(0L).take(5).drain.force.run.map(_.take(5).force.toVector)

evaluates to Vector(5,6,7,8,9)

In fact, the scaladoc for take says:

Lazily takes `n` elements from this segment. The result of the returned segment is either a left
containing the result of the original segment and the number of elements remaining to take when
the end of the source segment was reached, or a right containing the remainder of the source
segment after `n` elements are taken.

So if you take(5) from an infinite segment, you output 5 elements and return the (infinite) remainder as the result. drain suppresses the output, force is needed to run it, and run returns the result, which in this case is an infinite segment. Calling toVector on that infinite segment of course runs forever, which is the behaviour you observed.
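To make the contract concrete, here is a minimal sketch that models take on a plain scala.collection.Iterator rather than an fs2 Segment (TakeContract and takeWithRemainder are hypothetical names, not fs2 API). Taking 5 elements from an infinite source returns the 5 outputs plus the still-infinite remainder, so forcing the whole remainder never returns, while taking another 5 from it yields 5 to 9. It assumes Scala 2.12+ (right-biased Either); the Left case carries only the remaining count because an Iterator has no result value, and edge cases such as a source that ends exactly after n elements may be reported differently than by fs2.

```scala
object TakeContract {
  // Models the shape of Segment#take's result on an Iterator (hypothetical model):
  // Left(remaining) if the source ran out before n elements were taken,
  // Right(remainder) once n elements were taken. fs2's Left also carries the
  // source's result R; an Iterator has none, so only the count is kept here.
  def takeWithRemainder[A](src: Iterator[A], n: Int): (Vector[A], Either[Int, Iterator[A]]) = {
    val out = Vector.newBuilder[A]
    var rem = n
    while (rem > 0 && src.hasNext) { out += src.next(); rem -= 1 }
    if (rem > 0) (out.result(), Left(rem)) else (out.result(), Right(src))
  }

  def main(args: Array[String]): Unit = {
    val (taken, result) = takeWithRemainder(Iterator.from(0), 5)
    println(taken) // Vector(0, 1, 2, 3, 4)
    // result.map(_.toVector) would never return: the remainder is infinite.
    println(result.map(_.take(5).toVector)) // Right(Vector(5, 6, 7, 8, 9))
  }
}
```

Taking from the remainder is the plain-Iterator analogue of the _.take(5) in the snippet above; converting the remainder as a whole is the analogue of _.force.toVector.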

@pchlupacek
Contributor Author

@SystemFw I need something on Segment like

def runFold[B](b: B)(g: (B, O) => B): (R, B)

This is missing. Do you think you could implement it?

@pchlupacek
Contributor Author

I ran into trouble when manually stepping through the segment with uncons, which seemed to ignore certain combinators. I am trying to reproduce that and will post it here shortly.

@SystemFw
Collaborator

I can definitely give it a shot. Could you give me a fuller signature and a few words about the expected behaviour of runFold?

@pchlupacek
Contributor Author

pchlupacek commented Dec 30, 2017

@SystemFw I need essentially the same behaviour as runFold in Algebra, that is:

Segment[O, R] {
  def runFold[B](b: B)(g: (B, O) => B): (R, B) = ???
}

where B is the accumulated value, R is the result and O is the element type of the segment.

@SystemFw
Collaborator

Right, so you want to preserve the result of the original segment, and in addition to that, return the fold of all the output values. I'll start thinking about an implementation.
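As a rough sketch of those semantics, assuming a strict, already-materialized segment (MiniSegment and RunFoldDemo are hypothetical and ignore the staging and laziness of the real fs2 Segment), runFold folds every output and returns the original result alongside the accumulator:

```scala
// Hypothetical, strict stand-in for a segment: its outputs followed by a final result.
final case class MiniSegment[O, R](outputs: List[O], result: R) {
  // Folds every output with g, starting from b, and keeps the segment's own result.
  def runFold[B](b: B)(g: (B, O) => B): (R, B) =
    (result, outputs.foldLeft(b)(g))
}

object RunFoldDemo {
  def main(args: Array[String]): Unit = {
    val seg = MiniSegment(List(1, 2, 3), "done")
    println(seg.runFold(0)(_ + _))                  // (done,6)
    println(seg.runFold(Vector.empty[Int])(_ :+ _)) // (done,Vector(1, 2, 3))
  }
}
```

The second call shows that collecting the outputs is just one particular fold, so a single runFold covers both the "sum" and the "toVector" use cases without discarding R.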

@pchlupacek
Contributor Author

pchlupacek commented Dec 30, 2017

Exactly :-) You're golden.

@pchlupacek
Contributor Author

I tried something along these lines, but somehow it doesn't work well for takeWhile.
This is extracted from code that fails when passed to go as part of the Run algebra. Only this combinator seems to be affected; all the others now pass through fine.

  import scala.annotation.tailrec
  import fs2.Segment

  @tailrec
  def go[O, R](values: Segment[O, R], acc: Vector[O]): (Vector[O], R) = {
    println(s">>> $values")
    values.force.uncons match {
      case Left(result) => (acc, result)
      case Right((h, t)) => go(t, h.fold(acc)({ case (acc, o) => acc :+ o }).force.run)
    }
  }

  val s = Segment.unfold(0)({ i => if (i < 1000) Some((i, i+ 1)) else None}).takeWhile(_ != 5, takeFailure = true)
  val (b, result) = go(s, Vector.empty)
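For reference, the accumulated Vector that go is expected to produce can be checked against a plain Iterator model of takeWhile with takeFailure = true. The sketch assumes that takeFailure = true means the first element failing the predicate is also emitted, and it uses Iterator.unfold (Scala 2.13+); TakeWhileInclusive and takeWhileInclusive are hypothetical names, not fs2 API.

```scala
object TakeWhileInclusive {
  // Hypothetical model of takeWhile(p, takeFailure = true): emit elements while p holds,
  // then also emit the first element for which p is false, and stop.
  def takeWhileInclusive[A](src: Iterator[A])(p: A => Boolean): Vector[A] = {
    val out = Vector.newBuilder[A]
    var going = true
    while (going && src.hasNext) {
      val a = src.next()
      out += a
      going = p(a)
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    // Same source as the unfold above: 0, 1, ..., 999.
    val src = Iterator.unfold(0)(i => if (i < 1000) Some((i, i + 1)) else None)
    println(takeWhileInclusive(src)(_ != 5)) // Vector(0, 1, 2, 3, 4, 5)
  }
}
```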

SystemFw changed the title from "Stream.from will never terminate when run" to "Segment.from will never terminate when run" on Dec 30, 2017
@mpilquist
Member

I think fold does what you want but it discards the original result. Here's a PR that fixes that: #1040

@SystemFw
Collaborator

Well, that was fast :)

@pchlupacek
Contributor Author

@SystemFw @mpilquist thank you so much for this.

@SystemFw
Collaborator

I think this can be closed now?

@pchlupacek
Contributor Author

@mpilquist, @SystemFw I am still struggling to get these segments to run to completion:

Segment.from(0L).filter(_ < 2).take(2).force.toList
Segment.from(0L).filter(_ < 2).take(2).fold(acc)(g).force.run

Somehow the splitAt approach used in Algebra.uncons terminates, but the run approach does not. Is there a reason why these segments never terminate?

@SystemFw
Collaborator

SystemFw commented Dec 31, 2017

I think this might be a bug, and it seems to manifest only with filter combined with take:

  import fs2.Segment

  def a = Segment.from(0L).take(5).force.toList
  // res0: List[Long] = List(0, 1, 2, 3, 4)
  def b = Segment(1,2,3,4,5,6,7,8,9,10).filter(_ % 2 == 0).take(5).force.toList
  // res1: List[Int] = List(2, 4, 6, 8, 10)
  def c = Segment.from(0L).filter(_ < 2).take(5).force.toList
  // hangs
  def d = Segment.from(0L).map(_ + 1).take(5).force.toList
  // res2: List[Long] = List(1, 2, 3, 4, 5)
  def e = Segment(1,2,3,4,5,6,7,8,9,10).filter(_ % 2 == 0).takeWhile(_ < 5).force.toList
  // res3: List[Int] = List(2, 4)
  def f = Segment(1,2,3,4,5,6,7,8,9,10).takeWhile(_ < 7).take(5).force.toList
  // res4: List[Int] = List(1, 2, 3, 4, 5)

@pchlupacek
Contributor Author

@SystemFw This is consistent with the Stream tests that are failing: essentially only takeWhile had a problem, and all the others were passing.

@SystemFw
Collaborator

SystemFw commented Jan 1, 2018

I'm getting closer to fixing this (I'm still on limited bandwidth, however).

@SystemFw
Collaborator

SystemFw commented Jan 1, 2018

@pchlupacek It's a bug in take; I'll post a PR with a possible solution as soon as I have it.
In particular, filter has a call to emits(Chunk.empty) here.

The bug in take is here: done is never called. Most other code paths don't hit that branch, so take generally works.

You can reproduce it by running Segment.from(0L).filter(_ < n).take(m) where m >= n.
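A minimal model of the chunk branch of take, written against plain Iterators (TakeEmitsBranch, filteredChunks and feed are hypothetical names, not fs2 code), illustrates why done is never reached: a chunk is forwarded whenever os.size <= rem, mirroring the shape of take's emits callback, and an empty chunk always passes that test, even when rem is already 0. The model inspects at most maxChunks chunks so that it terminates.

```scala
object TakeEmitsBranch {
  // Naturals in chunks of 3, filtered chunk by chunk. For limit = 2 this yields
  // Vector(0, 1) and then empty chunks forever.
  def filteredChunks(limit: Int): Iterator[Vector[Int]] =
    Iterator.from(0).grouped(3).map(_.toVector.filter(_ < limit))

  // Feeds at most maxChunks chunks to a take(n)-like handler. A chunk that fits in
  // rem is forwarded (the os.size <= rem branch); a larger chunk finishes take.
  // Returns the emitted elements and whether take finished.
  def feed(chunks: Iterator[Vector[Int]], n: Int, maxChunks: Int): (Vector[Int], Boolean) = {
    var rem = n
    var out = Vector.empty[Int]
    var finished = false
    var seen = 0
    while (!finished && seen < maxChunks && chunks.hasNext) {
      val os = chunks.next()
      seen += 1
      if (os.size <= rem) { rem -= os.size; out ++= os } // empty chunks always land here
      else { out ++= os.take(rem); finished = true }
    }
    (out, finished)
  }

  def main(args: Array[String]): Unit = {
    println(filteredChunks(2).take(4).toList)  // List(Vector(0, 1), Vector(), Vector(), Vector())
    println(feed(filteredChunks(2), 2, 1000))  // (Vector(0, 1),false)
    println(feed(filteredChunks(10), 2, 1000)) // (Vector(0, 1),true)
  }
}
```

With filter(_ < 2) the handler has emitted everything it will ever emit after the first chunk, yet after 1000 more (empty) chunks it still has not finished; with filter(_ < 10) a chunk that does not fit in rem arrives and take finishes.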

@SystemFw
Collaborator

SystemFw commented Jan 2, 2018

This is the problem I'm having:

    final def take(n: Long): Segment[O,Either[(R,Long),Segment[O,R]]] =
      if (n <= 0) Segment.pure(Right(this))
      else new Segment[O,Either[(R,Long),Segment[O,R]]] {
        def stage0(depth: Depth, defer: Defer, emit: O => Unit, emits: Chunk[O] => Unit, done: Either[(R,Long),Segment[O,R]] => Unit) = Eval.later {
          var rem = n
          var staged: Step[O,R] = null
          staged = self.stage(depth.increment, defer,
            o => { if (rem > 0) { rem -= 1; emit(o) } else done(Right(staged.remainder.cons(o))) },
            os => {
              // Placeholder: the source's result r is only available in the done callback below
              if (os.size == 0) done(Left(i-need-r-here -> rem))
              else if (os.size <= rem) { rem -= os.size; emits(os) }
              else {
                var i = 0
                while (rem > 0) { rem -= 1; emit(os(i)); i += 1 }
                done(Right(staged.remainder.prepend(Segment.chunk(os.drop(i)))))
              }
            },
            r => done(Left(r -> rem))
          ).value
          staged.mapRemainder(_.take(rem))
        }
      }

The first branch of the if in emits needs the result r to comply with the contract of the function, but that is only available in the done continuation.
Any ideas?

@mpilquist
Member

@SystemFw I don't think that's right -- if an empty chunk is emitted, we're not necessarily at the end of the segment. It's totally fine to emit n empty chunks followed by a non-empty chunk.

@SystemFw
Collaborator

SystemFw commented Jan 2, 2018

@mpilquist Ah, that makes sense.
I think I'm at a point where I need your help (I have tried different things, but can't quite get the exact semantics right). It's true that an empty chunk doesn't necessarily mean we're at the end, but the code behaves as if we can never be at the end of a segment once an empty chunk is emitted (at least I think that's what's happening), which causes the hang.

I had tried using a condition of rem == 0 before, but that breaks in cases like Segment.from(0L).filter(_ < 10).take(15) (the hang happens when rem is 5).

@SystemFw
Collaborator

SystemFw commented Jan 2, 2018

I think the problem might be fundamental, and can be described as:

  • take can only stop independently (calling done on its own) if rem <= 0. If rem doesn't get to zero, take can't know that no more elements will arrive in the future.
  • In cases where take can't stop independently, it stops when the source segment calls done.

So cases where the source segment never calls done, and where there aren't enough elements to bring rem to zero, result in an infinite loop.

Segment.from(0L).filter(_ < 10).take(15) is one such case.

@pchlupacek
Contributor Author

pchlupacek commented Jan 2, 2018

@SystemFw I think Segment.from(0L).filter(_ < 10).take(15) is obviously a never-ending segment that can never finish. However,
Segment.from(0L).filter(_ < 2).take(2).force.toList should finish normally, since it emits exactly 2 elements and that's exactly what you are taking, right?
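The difference between the two cases can be sketched with a plain Iterator model that counts how many elements filter pulls from its source. Both policies below are hypothetical illustrations, not the fix that was merged in fs2: takeStopAtZero finishes as soon as rem reaches zero, while takeWaitForNext (like the branch in the quoted take that calls done only when one more element arrives) waits for the source to end or to deliver an extra element. The source is bounded at 1000 elements so the sketch terminates; with Iterator.from(0) the second policy would never return for filter(_ < 2) with n = 2, and neither policy could finish filter(_ < 10) with n = 15.

```scala
object TakeStopPolicy {
  // Wraps an iterator and counts how many elements are pulled from it.
  final class Counting[A](underlying: Iterator[A]) extends Iterator[A] {
    var pulled = 0
    def hasNext: Boolean = underlying.hasNext
    def next(): A = { pulled += 1; underlying.next() }
  }

  // Policy 1: finish as soon as n elements have been taken (rem reaches zero).
  def takeStopAtZero[A](src: Iterator[A], n: Int): Vector[A] = {
    val out = Vector.newBuilder[A]
    var rem = n
    while (rem > 0 && src.hasNext) { out += src.next(); rem -= 1 }
    out.result()
  }

  // Policy 2: after n elements, keep going until the source ends or yields one more
  // element. (The quoted take conses that element onto the remainder; this sketch
  // simply discards it.)
  def takeWaitForNext[A](src: Iterator[A], n: Int): Vector[A] = {
    val out = Vector.newBuilder[A]
    var rem = n
    var finished = false
    while (!finished && src.hasNext) {
      val a = src.next()
      if (rem > 0) { out += a; rem -= 1 } else finished = true
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val base1 = new Counting(Iterator.range(0, 1000))
    println((takeStopAtZero(base1.filter(_ < 2), 2), base1.pulled))  // (Vector(0, 1),2)
    val base2 = new Counting(Iterator.range(0, 1000))
    println((takeWaitForNext(base2.filter(_ < 2), 2), base2.pulled)) // (Vector(0, 1),1000)
  }
}
```

Both policies return the same elements; the difference is that the second one has to scan the entire source to prove that no third element exists, which is exactly what an infinite source can never provide.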

@pchlupacek
Contributor Author

@mpilquist thanks for the fix

@SystemFw
Collaborator

SystemFw commented Jan 2, 2018

Thanks @mpilquist, I was getting stuck :)

mpilquist added a commit that referenced this issue Jan 2, 2018
mpilquist added this to the 0.10 milestone Jan 6, 2018

3 participants