
[SPARK-20464][SS] Add a job group and description for streaming queries and fix cancellation of running jobs using the job group #17765

Closed · wants to merge 8 commits
core/src/main/scala/org/apache/spark/ui/UIUtils.scala (2 changes: 1 addition & 1 deletion)
@@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")

// Verify that this has only anchors and span (we are wrapping in span)
val allowedNodeLabels = Set("a", "span")
val allowedNodeLabels = Set("a", "span", "br")
kunalkhamar (Contributor, Author) commented:
@tdas Is this change okay? Need it to add line breaks in the job description cells.

val illegalNodes = xml \\ "_" filterNot { case node: Node =>
allowedNodeLabels.contains(node.label)
}
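
For readers skimming the diff, here is a minimal standalone sketch of the whitelist check this hunk touches (the object and method names below are hypothetical; the wrapping and validation lines are paraphrased from the UIUtils code shown above). It illustrates why "br" must now be allowed once job descriptions carry `<br/>` separators:

```scala
import scala.xml.XML

object DescriptionWhitelistSketch {
  // Illustrative copy of the UIUtils whitelist; names here are hypothetical.
  private val allowedNodeLabels = Set("a", "span", "br")

  def isSafeDescription(desc: String): Boolean = {
    // Wrap in a span so the description parses as a single XML document.
    val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")
    // \\ "_" selects every element; anything outside the whitelist is illegal.
    val illegalNodes = xml \\ "_" filterNot { node => allowedNodeLabels.contains(node.label) }
    illegalNodes.isEmpty
  }

  def main(args: Array[String]): Unit = {
    println(isSafeDescription("query [batch = 0,<br/>id = abc]")) // true: br is now allowed
    println(isSafeDescription("<script>alert(1)</script>"))       // false: script is rejected
  }
}
```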
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -252,6 +252,8 @@ class StreamExecution(
*/
private def runBatches(): Unit = {
try {
sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
interruptOnCancel = true)
if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}
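
As background, a self-contained sketch of the mechanics this hunk relies on (not part of the patch; the group id, app name, and timings are made up). setJobGroup tags every job submitted from the calling thread, and a later cancelJobGroup on that group id cancels them all, interrupting running tasks when interruptOnCancel = true:

```scala
import org.apache.spark.sql.SparkSession

object JobGroupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("job-group-sketch").getOrCreate()
    val sc = spark.sparkContext

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        // Mirrors runBatches(): the group is set on the thread that submits the jobs.
        sc.setJobGroup("my-run-id", "batch = init", interruptOnCancel = true)
        try {
          sc.parallelize(1 to 4, 4).foreach(_ => Thread.sleep(60000)) // long-running tasks
        } catch {
          case _: Exception => // cancellation surfaces here as a SparkException
        }
      }
    })
    worker.start()
    Thread.sleep(2000)             // give the job time to start
    sc.cancelJobGroup("my-run-id") // interrupts the sleeping tasks on every executor
    worker.join()
    spark.stop()
  }
}
```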
@@ -289,6 +291,7 @@ class StreamExecution(
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets(sparkSessionToRunBatches)
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
kunalkhamar (Contributor, Author) commented on Apr 27, 2017:
Update job description with correct currentBatchId after initializing starting offsets.

logDebug(s"Stream running from $committedOffsets to $availableOffsets")
} else {
constructNextBatch()
@@ -308,6 +311,7 @@ class StreamExecution(
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
kunalkhamar (Contributor, Author) commented:
Update job description with updated currentBatchId after each batch.

} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
@@ -684,8 +688,11 @@ class StreamExecution(
// intentionally
state.set(TERMINATED)
if (microBatchThread.isAlive) {
sparkSession.sparkContext.cancelJobGroup(runId.toString)
microBatchThread.interrupt()
microBatchThread.join()
// microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
sparkSession.sparkContext.cancelJobGroup(runId.toString)
}
logInfo(s"Query $prettyIdString was stopped")
}
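
Condensed, the shutdown ordering this hunk establishes looks like the following (illustrative sketch only; the object and method names are not the actual StreamExecution members):

```scala
import org.apache.spark.SparkContext

object StopOrderingSketch {
  // Cancel first so running tasks are interrupted promptly, then stop the
  // driver-side thread. The interrupted thread may still submit one last job
  // on its way out, so cancel the group again after join() to close that race.
  def stopQuery(sc: SparkContext, runId: String, microBatchThread: Thread): Unit = {
    if (microBatchThread.isAlive) {
      sc.cancelJobGroup(runId)
      microBatchThread.interrupt()
      microBatchThread.join()
      sc.cancelJobGroup(runId) // sweeps up any job spawned between cancel and exit
    }
  }
}
```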
@@ -825,6 +832,11 @@ class StreamExecution(
}
}

private def getBatchDescriptionString: String = {
val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
Option(name).map(_ + " ").getOrElse("") +
s"[batch = $batchDescription,<br/>id = $id,<br/>runId = $runId]"
kunalkhamar (Contributor, Author) commented:
@marmbrus @zsxwing @tdas Is this format good for description?

A reviewer (Contributor) commented:
I would get rid of the [] if you are going to use newlines

kunalkhamar (Contributor, Author) replied:
Yes, updated.

}
}
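
To make the rendered description concrete, here is a standalone rendition of the same formatting logic with made-up identifiers (the real method reads the query's name, id, and runId fields; note the reviewer's suggestion above to drop the brackets in a later revision):

```scala
object DescriptionFormatSketch {
  // Same shape as getBatchDescriptionString, parameterized for illustration.
  def describe(name: String, currentBatchId: Long, id: String, runId: String): String = {
    val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
    Option(name).map(_ + " ").getOrElse("") +
      s"[batch = $batchDescription,<br/>id = $id,<br/>runId = $runId]"
  }

  def main(args: Array[String]): Unit = {
    println(describe("memStream", -1, "id-1", "run-1"))
    // memStream [batch = init,<br/>id = id-1,<br/>runId = run-1]
    println(describe(null, 2, "id-1", "run-1"))
    // [batch = 2,<br/>id = id-1,<br/>runId = run-1]
  }
}
```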


sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -25,6 +25,8 @@ import scala.util.control.ControlThrowable

import org.apache.commons.io.FileUtils

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.command.ExplainCommand
@@ -500,6 +502,69 @@ class StreamSuite extends StreamTest {
}
}
}

test("calling stop() on a query cancels related jobs") {
val input = MemoryStream[Int]
val query = input
.toDS()
.map { i =>
while (!org.apache.spark.TaskContext.get().isInterrupted()) {
// keep looping till interrupted by query.stop()
Thread.sleep(100)
}
i
}
.writeStream
.format("console")
.start()

input.addData(1)
// wait for jobs to start
eventually(timeout(streamingTimeout)) {
assert(sparkContext.statusTracker.getActiveJobIds().nonEmpty)
}

query.stop()
// make sure jobs are stopped
eventually(timeout(streamingTimeout)) {
assert(sparkContext.statusTracker.getActiveJobIds().isEmpty)
}
}

test("batch id is updated correctly in the job description") {
val queryName = "memStream"
@volatile var jobDescription: String = null
def assertContainsNameAndBatch(batch: Integer): Unit = {
// wait for listener event to be processed
spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
assert(jobDescription.contains(queryName) && jobDescription.contains(s"batch = $batch"))
}

spark.sparkContext.addSparkListener(new SparkListener {
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
jobDescription = jobStart.properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)
}
})

val input = MemoryStream[Int]
val query = input
.toDS()
.map(_ + 1)
.writeStream
.format("memory")
.queryName(queryName)
.start()

input.addData(1)
query.processAllAvailable()
assertContainsNameAndBatch(batch = 0)
input.addData(2, 3)
query.processAllAvailable()
assertContainsNameAndBatch(batch = 1)
input.addData(4)
query.processAllAvailable()
assertContainsNameAndBatch(batch = 2)
}
}

abstract class FakeSource extends StreamSourceProvider {