Set scopes for foreachRDD properly
Previously, RDDs created inside the body of `foreachRDD` were not
scoped properly, so low-level Spark operations surfaced in the UI.
This is now fixed: these RDDs are wrapped in the `foreachRDD` scope.
Andrew Or committed May 16, 2015
1 parent 1881802 commit 53b9936
Showing 2 changed files with 15 additions and 15 deletions.
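For context, an illustrative sketch of the kind of user code this change affects; the stream source and operations below are hypothetical, not taken from the commit. Any RDD created inside the `foreachRDD` body (the `lookup` RDD here) is now displayed in the UI under the `foreachRDD` scope instead of surfacing as a bare low-level operation such as `parallelize` or `union`:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDScopeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDScopeDemo")
    val ssc = new StreamingContext(conf, Seconds(1))
    val stream = ssc.socketTextStream("localhost", 9999)

    stream.foreachRDD { (rdd, time) =>
      // This RDD is created inside the foreachRDD body. Before this
      // commit it appeared in the UI as an unscoped parallelize; it
      // is now grouped under the `foreachRDD` scope.
      val lookup = rdd.sparkContext.parallelize(Seq("a", "b"))
      rdd.union(lookup).count()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```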
DStream.scala

```diff
@@ -134,8 +134,8 @@ abstract class DStream[T: ClassTag] (
    */
   private def makeScope(time: Time): Option[RDDOperationScope] = {
     baseScope.map { bsJson =>
-      val formattedBatchTime =
-        UIUtils.formatBatchTime(time.milliseconds, ssc.graph.batchDuration.milliseconds)
+      val formattedBatchTime = UIUtils.formatBatchTime(
+        time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
       val bs = RDDOperationScope.fromJson(bsJson)
       val baseName = bs.name  // e.g. countByWindow, "kafka stream [0]"
       val scopeName =
```
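The hunk above also shortens the batch-time label embedded in the scope name: passing `showYYYYMMSS = false` yields a more compact label, plausibly time-of-day only. A rough illustration of the difference using `java.text.SimpleDateFormat` rather than Spark's `UIUtils` (the exact patterns Spark uses may differ):

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Rough illustration only: Spark's UIUtils.formatBatchTime may use
// different patterns, but the flag controls whether the date portion
// appears in the scope-name label.
object BatchTimeLabelDemo {
  def main(args: Array[String]): Unit = {
    val batchTime = System.currentTimeMillis()
    val withDate  = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date(batchTime))
    val timeOnly  = new SimpleDateFormat("HH:mm:ss").format(new Date(batchTime))
    println(s"showYYYYMMSS = true  -> $withDate")
    println(s"showYYYYMMSS = false -> $timeOnly")
  }
}
```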
```diff
@@ -334,14 +334,20 @@ abstract class DStream[T: ClassTag] (
    * Get the RDD corresponding to the given time; either retrieve it from cache
    * or compute-and-cache it.
    */
-  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
+  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = createRDDWith(time) {
     // If RDD was already generated, then retrieve it from HashMap,
     // or else compute the RDD
     generatedRDDs.get(time).orElse {
       // Compute the RDD if time is valid (e.g. correct time in a sliding window)
       // of RDD generation, else generate nothing.
       if (isTimeValid(time)) {
-        val rddOption = doCompute(time)
+        // Disable checks for existing output directories in jobs launched by the streaming
+        // scheduler, since we may need to write output to an existing directory during checkpoint
+        // recovery; see SPARK-4835 for more details. We need to have this call here because
+        // compute() might cause Spark jobs to be launched.
+        val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+          compute(time)
+        }

         rddOption.foreach { case newRDD =>
           // Register the generated RDD for caching and checkpointing
```
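The shape of the `getOrCompute` change above is a standard Scala idiom: the entire method body becomes a by-name argument to a wrapper that performs setup before it and cleanup after it. A minimal, self-contained sketch of that idiom, with illustrative names rather than Spark's:

```scala
// Minimal sketch of the wrapping pattern used above (names are
// illustrative). The whole method body is passed by name to a
// wrapper that runs setup and teardown around it.
object WrapDemo {
  def withContext[U](label: String)(body: => U): U = {
    println(s"setup: $label")              // e.g. set thread-local properties
    try body
    finally println(s"teardown: $label")   // restore, even if body throws
  }

  // `compute` is wrapped the same way getOrCompute is wrapped above.
  def compute(x: Int): Option[Int] = withContext("compute") {
    if (x > 0) Some(x * 2) else None
  }

  def main(args: Array[String]): Unit = {
    println(compute(3)) // Some(6)
  }
}
```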
```diff
@@ -363,10 +369,10 @@ abstract class DStream[T: ClassTag] (
   }

   /**
-   * Helper method to generate an RDD for the specified time.
-   * This sets and resets the relevant local variables before and after the call to compute.
+   * Wrap a body of code such that the call site and operation scope
+   * information are passed to the RDDs created in this body properly.
    */
-  private def doCompute(time: Time): Option[RDD[T]] = {
+  protected def createRDDWith[U](time: Time)(body: => U): U = {
     val scopeKey = SparkContext.RDD_SCOPE_KEY
     val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
     // Pass this DStream's operation scope and creation site information to RDDs through
```
```diff
@@ -388,13 +394,7 @@ abstract class DStream[T: ClassTag] (
       ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
     }

-    // Disable checks for existing output directories in jobs launched by the streaming
-    // scheduler, since we may need to write output to an existing directory during checkpoint
-    // recovery; see SPARK-4835 for more details. We need to have this call here because
-    // compute() might cause Spark jobs to be launched.
-    PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
-      compute(time)
-    }
+    body
   } finally {
     // Restore any state that was modified before returning
     ssc.sparkContext.setCallSite(prevCallSite)
```
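`PairRDDFunctions.disableOutputSpecValidation` is a `scala.util.DynamicVariable`, and `withValue` overrides its value only for the dynamic extent of the block, restoring the previous value afterwards even if the block throws. A standalone sketch of that mechanism (the variable below is illustrative, not Spark's):

```scala
import scala.util.DynamicVariable

// Standalone sketch of the DynamicVariable mechanism used by
// PairRDDFunctions.disableOutputSpecValidation. withValue sets the
// value for the dynamic extent of the block and then restores the
// previous value.
object DynVarDemo {
  val disableValidation = new DynamicVariable[Boolean](false)

  def main(args: Array[String]): Unit = {
    println(disableValidation.value)       // false
    disableValidation.withValue(true) {
      // Any code called from here observes the overridden value.
      println(disableValidation.value)     // true
    }
    println(disableValidation.value)       // back to false
  }
}
```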
ForEachDStream.scala

```diff
@@ -37,7 +37,7 @@ class ForEachDStream[T: ClassTag] (
   override def generateJob(time: Time): Option[Job] = {
     parent.getOrCompute(time) match {
       case Some(rdd) =>
-        val jobFunc = () => {
+        val jobFunc = () => createRDDWith(time) {
           ssc.sparkContext.setCallSite(creationSite)
           foreachFunc(rdd, time)
         }
```
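The `ForEachDStream` change matters because `jobFunc` is a deferred closure: it executes later, on the job scheduler, where the scope properties set during RDD generation are no longer in effect, so the wrapper must be baked into the closure itself. An illustrative sketch of that timing issue (plain Scala, not Spark code):

```scala
// Illustrative sketch (not Spark code) of why the deferred jobFunc
// needs its own wrapper: a closure runs later, so any ambient context
// present when it was *created* is gone when it *executes* unless the
// closure re-establishes it.
object DeferredScopeDemo {
  private val scope = new ThreadLocal[String]

  // Same shape as DStream.createRDDWith: establish context, run the
  // body, then restore whatever was there before.
  def createRDDWith[U](name: String)(body: => U): U = {
    val prev = scope.get()
    scope.set(name)
    try body finally scope.set(prev)
  }

  def main(args: Array[String]): Unit = {
    // The closure is created now but runs later; wrapping its body
    // re-establishes the scope at execution time.
    val jobFunc = () => createRDDWith("foreachRDD") {
      println(s"scope while running: ${scope.get()}") // foreachRDD
    }
    println(s"scope before running: ${scope.get()}")  // null
    jobFunc()
  }
}
```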
