Skip to content

Commit

Permalink
add failing test
Browse files Browse the repository at this point in the history
  • Loading branch information
mukulmurthy committed Jan 10, 2019
1 parent 73c7b12 commit 10f0994
Showing 1 changed file with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.File
import java.util.Locale
import java.util.concurrent.TimeUnit

import scala.compat.Platform.ConcurrentModificationException
import scala.concurrent.duration._

import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -651,4 +652,24 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {

LastOptions.clear()
}

test("SPARK-26586: Streams should have isolated confs") {
import testImplicits._
val input = MemoryStream[Int]
input.addData(1 to 10)
spark.conf.set("testKey1", 0)
val queries = (1 to 10).map { i =>
spark.conf.set("testKey1", i)
input.toDF().writeStream
.foreachBatch { (df: Dataset[Row], id: Long) =>
val v = df.sparkSession.conf.get("testKey1").toInt
if (i != v) {
throw new ConcurrentModificationException(s"Stream $i has the wrong conf value $v")
}
}
.start()
}
queries.foreach(_.processAllAvailable())
queries.foreach(_.stop())
}
}

0 comments on commit 10f0994

Please sign in to comment.