Skip to content

Commit

Permalink
clone the spark session in StreamExecution constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
mukulmurthy committed Jan 10, 2019
1 parent 10f0994 commit b35eef0
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ abstract class StreamExecution(
lazy val streamMetrics = new MetricsReporter(
this, s"spark.streaming.${Option(name).getOrElse(id)}")

/** Isolated spark session to run the batches with. */
private val sparkSessionForStream = sparkSession.cloneSession()

/**
* The thread that runs the micro-batches of this stream. Note that this thread must be
* [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a
Expand Down Expand Up @@ -270,8 +273,6 @@ abstract class StreamExecution(
// force initialization of the logical plan so that the sources can be created
logicalPlan

// Isolated spark session to run the batches with.
val sparkSessionForStream = sparkSession.cloneSession()
// Adaptive execution can change num shuffle partitions, disallow
sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
// Disable cost-based join optimization as we do not want stateful operations to be rearranged
Expand Down

0 comments on commit b35eef0

Please sign in to comment.