inconsistent filter+foreach behavior with LazyList #11689

Closed
aschofie opened this issue Aug 16, 2019 · 1 comment

Comments

@aschofie

I have adapted the code below from a Kafka stream processor I am developing, where I discovered the aberrant behavior.

import scala.util.Random
import scala.collection.immutable._
import scala.collection.parallel.CollectionConverters._

// Builds an endless LazyList by repeatedly invoking `generator` for the next batch.
// `lazyAppendedAll` takes its suffix by name, so the recursive call is deferred.
def fromBatchGenerator[T](generator: () => Seq[T]): LazyList[T] = {
  val generation: LazyList[T] =
    LazyList.empty[T].lazyAppendedAll(generator().to(LazyList))
  generation.lazyAppendedAll(fromBatchGenerator(generator))
}

def scan[T](mapFn: Int => T): LazyList[T] = {
  def batchGenerator(): Seq[T] = {
    // Produce a batch of random Ints and map them in parallel.
    val records = Range.inclusive(0, Random.nextInt(10)).map(_ => Random.nextInt())
    val mappedBatch = records.par.map(mapFn)
    mappedBatch.toList
  }
  fromBatchGenerator(batchGenerator)
}

Say you want to print the values continuously. When a reference to the filtered LazyList is kept in the REPL, it works:

val positives = scan(identity).filter(_ > 0)
positives.foreach(println)

Whereas the following just hangs (batchGenerator() is never called):

scan(identity).filter(_ > 0).foreach(println)

Requiring a reference to the filtered LazyList to be maintained means that I cannot write stream processors with LazyList, because they will eventually run out of memory. But I think the problem runs deeper than that, because I can actually get something similar to work without maintaining any REPL references:

import scala.util.Random
import scala.collection.immutable._
import scala.collection.parallel.CollectionConverters._

def fromBatchGenerator[T](generator: () => Seq[T]): LazyList[T] = {
  val generation: LazyList[T] =
    LazyList.empty[T].lazyAppendedAll(generator().to(LazyList))
  generation.lazyAppendedAll(fromBatchGenerator(generator))
}

def batchGenerator(): Seq[Int] = {
  val records = Range.inclusive(0, Random.nextInt(10)).map(_ => Random.nextInt())
  val mappedBatch = records.par.map(identity)
  mappedBatch.toList
}

fromBatchGenerator(batchGenerator).filter(_ > 0).foreach(println)

Bug?

@som-snytt

There are existing issues about thread deadlock during object initialization in the REPL. The workaround is -Yrepl-class-based.

#8119
scala/scala-dev#195
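
The transcript below loads t11689.scala; the REPL output ("defined object Test") suggests the file wraps the definitions from the issue body in a top-level object. A plausible reconstruction of that file (the actual contents are not shown in the thread):

import scala.util.Random
import scala.collection.immutable._
import scala.collection.parallel.CollectionConverters._

// t11689.scala (sketch): the reporter's definitions wrapped in `object Test`
// so that :load defines a single object and `import Test._` brings them into scope.
object Test {
  def fromBatchGenerator[T](generator: () => Seq[T]): LazyList[T] = {
    val generation: LazyList[T] =
      LazyList.empty[T].lazyAppendedAll(generator().to(LazyList))
    generation.lazyAppendedAll(fromBatchGenerator(generator))
  }

  def scan[T](mapFn: Int => T): LazyList[T] = {
    def batchGenerator(): Seq[T] = {
      val records = Range.inclusive(0, Random.nextInt(10)).map(_ => Random.nextInt())
      val mappedBatch = records.par.map(mapFn)
      mappedBatch.toList
    }
    fromBatchGenerator(batchGenerator)
  }
}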

$ scala -Yrepl-class-based -cp ./scala-parallel-collections_2.13-0.2.0.jar 
Welcome to Scala 2.13.0 (OpenJDK 64-Bit Server VM, Java 12.0.1).
Type in expressions for evaluation. Or try :help.

scala> :load t11689.scala
args: Array[String] = Array()
Loading t11689.scala...
import scala.util.Random
import scala.collection.immutable._
import scala.collection.parallel.CollectionConverters._
defined object Test

scala> import Test._
import Test._

scala> scan(identity).filter(_ > 0).foreach(println)
121206514
1374510841
1996822155
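
For completeness, a sketch of how the same workaround could be applied to the REPL started by sbt's console task (this assumes an sbt build and is not from the thread):

// build.sbt (sketch): enable class-based REPL wrappers for the console task
Compile / console / scalacOptions += "-Yrepl-class-based"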
