From 69fc26fd870361c345f6482737d6192949ab46b4 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 25 Nov 2014 05:08:32 -0800 Subject: [PATCH] Set correct call site for streaming jobs so that it is displayed correctly on the Spark UI --- .../org/apache/spark/streaming/dstream/ForEachDStream.scala | 1 + .../org/apache/spark/streaming/StreamingContextSuite.scala | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala index 905bc723f69a9..1361c30395b57 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -38,6 +38,7 @@ class ForEachDStream[T: ClassTag] ( parent.getOrCompute(time) match { case Some(rdd) => val jobFunc = () => { + ssc.sparkContext.setCallSite(creationSite) foreachFunc(rdd, time) } Some(new Job(time, jobFunc)) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 4b49c4d251645..9f352bdcb0893 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -336,16 +336,20 @@ package object testPackage extends Assertions { // Verify creation site of generated RDDs var rddGenerated = false - var rddCreationSiteCorrect = true + var rddCreationSiteCorrect = false + var foreachCallSiteCorrect = false inputStream.foreachRDD { rdd => rddCreationSiteCorrect = rdd.creationSite == creationSite + foreachCallSiteCorrect = + rdd.sparkContext.getCallSite().shortForm.contains("StreamingContextSuite") rddGenerated = true } ssc.start() eventually(timeout(10000 millis), interval(10 millis)) { assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was not correct") + assert(rddGenerated && foreachCallSiteCorrect, "Call site in foreachRDD was not correct") } } finally { ssc.stop()