Skip to content

Commit

Permalink
Set correct call site for streaming jobs so that it is displayed corr…
Browse files Browse the repository at this point in the history
…ectly on the Spark UI
  • Loading branch information
tdas committed Nov 25, 2014
1 parent 0fe54cf commit 69fc26f
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class ForEachDStream[T: ClassTag] (
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
ssc.sparkContext.setCallSite(creationSite)
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,16 +336,20 @@ package object testPackage extends Assertions {

// Verify creation site of generated RDDs
var rddGenerated = false
var rddCreationSiteCorrect = true
var rddCreationSiteCorrect = false
var foreachCallSiteCorrect = false

inputStream.foreachRDD { rdd =>
rddCreationSiteCorrect = rdd.creationSite == creationSite
foreachCallSiteCorrect =
rdd.sparkContext.getCallSite().shortForm.contains("StreamingContextSuite")
rddGenerated = true
}
ssc.start()

eventually(timeout(10000 millis), interval(10 millis)) {
assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was not correct")
assert(rddGenerated && foreachCallSiteCorrect, "Call site in foreachRDD was not correct")
}
} finally {
ssc.stop()
Expand Down

0 comments on commit 69fc26f

Please sign in to comment.