From 53b993658eff0edf27117bd5ae43d3bc0588aee1 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 16 May 2015 16:09:08 -0700
Subject: [PATCH] Set scopes for foreachRDD properly

Previously, RDDs created inside the body of foreachRDD were not scoped
properly, so low-level Spark operations surfaced in the UI. This is now
fixed: these RDDs are wrapped in the `foreachRDD` scope.
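For illustration, a minimal driver sketch that exercises the fixed code path
(hypothetical, not part of this patch; the object name, host, and port are
made up). The RDDs created by `map` and `reduceByKey` inside the `foreachRDD`
body are now grouped under the enclosing `foreachRDD` scope in the DAG
visualization instead of surfacing as top-level operations:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical example program, not part of this patch.
object ForeachRDDScopeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ForeachRDDScopeExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    words.foreachRDD { (rdd, time) =>
      // These transformations create new RDDs when each batch's job is
      // generated; with this patch they inherit the "foreachRDD" scope.
      val counts = rdd.map(word => (word, 1)).reduceByKey(_ + _)
      println(s"Batch $time: " + counts.collect().mkString(", "))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

This works because `ForEachDStream.generateJob` now wraps `jobFunc` in
`createRDDWith(time)`, so the operation scope and call site local properties
are in place when `foreachFunc` runs at job-generation time.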
---
 .../spark/streaming/dstream/DStream.scala       | 28 ++++++++++++++--------------
 .../streaming/dstream/ForEachDStream.scala      |  2 +-
 2 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index be6a0d6ec125d..922671be8732d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -134,8 +134,8 @@ abstract class DStream[T: ClassTag] (
    */
   private def makeScope(time: Time): Option[RDDOperationScope] = {
     baseScope.map { bsJson =>
-      val formattedBatchTime =
-        UIUtils.formatBatchTime(time.milliseconds, ssc.graph.batchDuration.milliseconds)
+      val formattedBatchTime = UIUtils.formatBatchTime(
+        time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
       val bs = RDDOperationScope.fromJson(bsJson)
       val baseName = bs.name  // e.g. countByWindow, "kafka stream [0]"
       val scopeName =
@@ -334,14 +334,20 @@ abstract class DStream[T: ClassTag] (
    * Get the RDD corresponding to the given time; either retrieve it from cache
    * or compute-and-cache it.
    */
-  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
+  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = createRDDWith(time) {
     // If RDD was already generated, then retrieve it from HashMap,
     // or else compute the RDD
     generatedRDDs.get(time).orElse {
       // Compute the RDD if time is valid (e.g. correct time in a sliding window)
       // of RDD generation, else generate nothing.
       if (isTimeValid(time)) {
-        val rddOption = doCompute(time)
+        // Disable checks for existing output directories in jobs launched by the streaming
+        // scheduler, since we may need to write output to an existing directory during checkpoint
+        // recovery; see SPARK-4835 for more details. We need to have this call here because
+        // compute() might cause Spark jobs to be launched.
+        val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+          compute(time)
+        }

         rddOption.foreach { case newRDD =>
           // Register the generated RDD for caching and checkpointing
@@ -363,10 +369,10 @@ abstract class DStream[T: ClassTag] (
   }

   /**
-   * Helper method to generate an RDD for the specified time.
-   * This sets and resets the relevant local variables before and after the call to compute.
+   * Wrap a body of code such that the call site and operation scope
+   * information are passed to the RDDs created in this body properly.
    */
-  private def doCompute(time: Time): Option[RDD[T]] = {
+  protected def createRDDWith[U](time: Time)(body: => U): U = {
     val scopeKey = SparkContext.RDD_SCOPE_KEY
     val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
     // Pass this DStream's operation scope and creation site information to RDDs through
@@ -388,13 +394,7 @@ abstract class DStream[T: ClassTag] (
       ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
     }

-    // Disable checks for existing output directories in jobs launched by the streaming
-    // scheduler, since we may need to write output to an existing directory during checkpoint
-    // recovery; see SPARK-4835 for more details. We need to have this call here because
-    // compute() might cause Spark jobs to be launched.
-    PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
-      compute(time)
-    }
+    body
   } finally {
     // Restore any state that was modified before returning
     ssc.sparkContext.setCallSite(prevCallSite)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 685a32e1d280d..a64a77dd661ae 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -37,7 +37,7 @@ class ForEachDStream[T: ClassTag] (
   override def generateJob(time: Time): Option[Job] = {
     parent.getOrCompute(time) match {
       case Some(rdd) =>
-        val jobFunc = () => {
+        val jobFunc = () => createRDDWith(time) {
           ssc.sparkContext.setCallSite(creationSite)
           foreachFunc(rdd, time)
         }