[SPARK-20464][SS] Add a job group and description for streaming queries and fix cancellation of running jobs using the job group #17765

Closed · wants to merge 8 commits
@@ -252,6 +252,7 @@ class StreamExecution(
    */
   private def runBatches(): Unit = {
     try {
+      sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString)
Contributor: @brkyvz is this okay?

Contributor: yes, this seems fine

zsxwing (Member), Apr 27, 2017: note here it is setJobGroup(..., interruptOnCancel = false). Should we add a sql conf for interruptOnCancel?

Contributor: when would you want to set this to true or false?

Author (Contributor): Had this set to false due to HDFS-1208, but setting it to true since the HDFS bug is 7 years old.

       if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
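For context on the interruptOnCancel discussion above, here is a minimal standalone sketch (not part of this PR) of the SparkContext job-group API the new code relies on. The group id, description, and sleep-based synchronization are made up for illustration; setJobGroup, cancelJobGroup, and the interruptOnCancel flag are the real SparkContext API.

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession

object JobGroupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("job-group-sketch").getOrCreate()
    val sc = spark.sparkContext

    val runner = new Thread(new Runnable {
      override def run(): Unit = {
        // setJobGroup is thread-local: the thread that submits jobs tags its own jobs.
        // interruptOnCancel = true additionally interrupts the threads of running tasks.
        sc.setJobGroup("demo-group", "demo [batch = 0]", interruptOnCancel = true)
        try {
          // A deliberately long job: each task sleeps for a minute.
          sc.parallelize(1 to 4, 4).foreach(_ => Thread.sleep(60000))
        } catch {
          case _: SparkException => // expected: the job group was cancelled
        }
      }
    })
    runner.start()

    Thread.sleep(2000) // crude wait until the job is running; fine for a sketch
    sc.cancelJobGroup("demo-group") // cancels all active jobs tagged with this group id
    runner.join()
    spark.stop()
  }
}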
@@ -308,6 +309,7 @@ class StreamExecution(
           logDebug(s"batch ${currentBatchId} committed")
           // We'll increase currentBatchId after we complete processing current batch's data
           currentBatchId += 1
+          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
Author (Contributor): Update job description with updated currentBatchId after each batch.

         } else {
           currentStatus = currentStatus.copy(isDataAvailable = false)
           updateStatusMessage("Waiting for data to arrive")
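As the author's comment notes, the description is refreshed at every batch boundary. A small sketch of why that is cheap (all names here are illustrative, not from the PR): setJobDescription only swaps the thread-local description picked up by subsequently submitted jobs, leaving the job group itself untouched.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("desc-sketch").getOrCreate()
val sc = spark.sparkContext
sc.setJobGroup("sketch-run", "sketch [batch = init]") // interruptOnCancel defaults to false

for (batchId <- 0 to 2) {
  // Thread-local: only the description of later jobs changes; the group id stays.
  sc.setJobDescription(s"sketch [batch = $batchId]")
  sc.parallelize(1 to 10).count() // this job shows the refreshed description in the UI
}
spark.stop()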
@@ -418,6 +420,7 @@ class StreamExecution(
         /* First assume that we are re-executing the latest known batch
          * in the offset log */
         currentBatchId = latestBatchId
+        sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
         availableOffsets = nextOffsets.toStreamProgress(sources)
         /* Initialize committed offsets to a committed batch, which at this
          * point is the second latest batch id in the offset log. */
@@ -463,6 +466,7 @@ class StreamExecution(
           }
         }
         currentBatchId = latestCommittedBatchId + 1
+        sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
         committedOffsets ++= availableOffsets
         // Construct a new batch by recomputing availableOffsets
         constructNextBatch()
@@ -478,6 +482,7 @@ class StreamExecution(
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new streaming query.")
         currentBatchId = 0
+        sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
         constructNextBatch()
     }
   }
@@ -684,8 +689,11 @@ class StreamExecution(
     // intentionally
     state.set(TERMINATED)
     if (microBatchThread.isAlive) {
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
       microBatchThread.interrupt()
       microBatchThread.join()
+      // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
     }
     logInfo(s"Query $prettyIdString was stopped")
   }
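The ordering above (cancel, interrupt, join, cancel again) guards against a race: between the first cancelJobGroup and the interrupt, microBatchThread may submit one more job. A distilled sketch of the same pattern, with illustrative names (`sc`, `worker`, `groupId` are not from the PR):

import org.apache.spark.SparkContext

// Distilled shutdown pattern; `worker` is a thread that submits jobs under `groupId`.
def stopWorker(sc: SparkContext, worker: Thread, groupId: String): Unit = {
  if (worker.isAlive) {
    sc.cancelJobGroup(groupId) // cancel whatever is running right now
    worker.interrupt()         // unblock the thread if it is waiting or sleeping
    worker.join()              // wait until it has fully exited
    // The worker may have submitted one more job after the first cancel but
    // before the interrupt took effect; cancel again so nothing is leaked.
    sc.cancelJobGroup(groupId)
  }
}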
@@ -825,6 +833,11 @@ class StreamExecution(
     }
   }
 
+  private def getBatchDescriptionString: String = {
+    val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
+    Option(name).map(_ + " ").getOrElse("") +
+      s"[batch = $batchDescription, id = $id, runId = $runId]"
Member: The description looks good to me. @marmbrus what do you think?

Contributor: LGTM

+  }
 }
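To make the resulting descriptions concrete, here is a standalone copy of the helper with the query's name, id, and runId fields turned into explicit (hypothetical) parameters:

// Standalone re-implementation of getBatchDescriptionString for illustration only.
def describe(name: String, currentBatchId: Long, id: String, runId: String): String = {
  val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
  Option(name).map(_ + " ").getOrElse("") +
    s"[batch = $batchDescription, id = $id, runId = $runId]"
}

describe("events", -1L, "query-1", "run-1") // "events [batch = init, id = query-1, runId = run-1]"
describe(null, 5L, "query-1", "run-1")      // "[batch = 5, id = query-1, runId = run-1]"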


@@ -500,6 +500,34 @@ class StreamSuite extends StreamTest {
       }
     }
   }

test("calling stop() on a query cancels related jobs") {
val input = MemoryStream[Int]
val query = input
.toDS()
.map { i =>
while (!org.apache.spark.TaskContext.get().isInterrupted()) {
// keep looping till interrupted by query.stop()
Thread.sleep(100)
}
i
}
.writeStream
.format("console")
.start()

input.addData(1)
// wait for jobs to start
eventually(timeout(streamingTimeout)) {
assert(sparkContext.statusTracker.getActiveJobIds().nonEmpty)
}

query.stop()
// make sure jobs are stopped
eventually(timeout(streamingTimeout)) {
assert(sparkContext.statusTracker.getActiveJobIds().isEmpty)
}
}
}

 abstract class FakeSource extends StreamSourceProvider {