[SPARK-26586][SS] Fix race condition that causes streams to run with unexpected confs

## What changes were proposed in this pull request?

Fix race condition where streams can have unexpected conf values.

New streaming queries should run with isolated SparkSessions so that they aren't affected by conf updates made after they are started. In StreamExecution, the parent SparkSession is cloned and used to run each batch, but this cloning happens on a separate thread and may occur after DataStreamWriter.start() returns. As a result, if a conf key is set immediately after a stream is started, the stream is likely to pick up the new value instead of the value that was in effect when it started.
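To make the race concrete, here is a minimal repro sketch. It is not part of the patch; it assumes a local SparkSession and uses the memory source and console sink purely for illustration:

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").appName("SPARK-26586-repro").getOrCreate()
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

// The conf value this stream is intended to run with.
spark.conf.set("testKey1", "expected")

val input = MemoryStream[Int]
input.addData(1, 2, 3)

// start() returns as soon as the stream's thread is launched; before this fix,
// the parent session was cloned later, on that thread.
val query = input.toDF().writeStream.format("console").start()

// This update races with the clone: the stream may observe "unexpected".
spark.conf.set("testKey1", "unexpected")

query.processAllAvailable()
query.stop()
```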

## How was this patch tested?

New unit test that fails prior to the production change and passes with it.


Closes apache#23513 from mukulmurthy/26586.

Authored-by: Mukul Murthy <mukul.murthy@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
(cherry picked from commit ae382c9)
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
Showing 2 changed files with 27 additions and 2 deletions.
StreamExecution.scala

```diff
@@ -176,6 +176,9 @@ abstract class StreamExecution(
   lazy val streamMetrics = new MetricsReporter(
     this, s"spark.streaming.${Option(name).getOrElse(id)}")
 
+  /** Isolated spark session to run the batches with. */
+  private val sparkSessionForStream = sparkSession.cloneSession()
+
   /**
    * The thread that runs the micro-batches of this stream. Note that this thread must be
    * [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a
@@ -265,8 +268,6 @@ abstract class StreamExecution(
     // force initialization of the logical plan so that the sources can be created
     logicalPlan
 
-    // Isolated spark session to run the batches with.
-    val sparkSessionForStream = sparkSession.cloneSession()
     // Adaptive execution can change num shuffle partitions, disallow
     sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
     // Disable cost-based join optimization as we do not want stateful operations to be rearranged
```
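The essence of the fix: a `private val` member is initialized while the StreamExecution instance is constructed, on the thread that creates the query and before DataStreamWriter.start() returns, whereas the old code cloned the session inside the query's own run loop, on a thread that may not have run yet. A self-contained sketch of that difference, with hypothetical names (this is not Spark code):

```scala
import java.util.concurrent.ConcurrentHashMap

// Stand-in for a SparkSession's mutable runtime conf.
val parentConf = new ConcurrentHashMap[String, String]()
parentConf.put("testKey1", "expected")

class StreamLike {
  // Eager snapshot, taken on the constructing thread, analogous to
  // `private val sparkSessionForStream = sparkSession.cloneSession()` above.
  private val confSnapshot = new java.util.HashMap[String, String](parentConf)

  private val thread = new Thread(() => {
    // Runs later, but reads the snapshot, not the live parent conf.
    println(s"stream sees: ${confSnapshot.get("testKey1")}")
  })

  def start(): Unit = thread.start()
}

new StreamLike().start()
parentConf.put("testKey1", "unexpected") // too late: the snapshot already exists
```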
DataStreamReaderWriterSuite.scala

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming.test
 
 import java.io.File
+import java.util.ConcurrentModificationException
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
@@ -651,4 +652,27 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     LastOptions.clear()
   }
+
+  test("SPARK-26586: Streams should have isolated confs") {
+    import testImplicits._
+    val input = MemoryStream[Int]
+    input.addData(1 to 10)
+    spark.conf.set("testKey1", 0)
+    val queries = (1 to 10).map { i =>
+      spark.conf.set("testKey1", i)
+      input.toDF().writeStream
+        .foreachBatch { (df: Dataset[Row], id: Long) =>
+          val v = df.sparkSession.conf.get("testKey1").toInt
+          if (i != v) {
+            throw new ConcurrentModificationException(s"Stream $i has the wrong conf value $v")
+          }
+        }
+        .start()
+    }
+    try {
+      queries.foreach(_.processAllAvailable())
+    } finally {
+      queries.foreach(_.stop())
+    }
+  }
 }
```
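Note how the test pins down the race: each of the ten queries captures its own expected value `i` in the foreachBatch closure and is started immediately after the conf is bumped, so without the fix some streams would read a later query's value. `processAllAvailable()` blocks until all pending input has been processed, guaranteeing that the check inside each batch actually runs before the queries are stopped.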
