Skip to content

Commit

Permalink
Add a unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Oct 16, 2015
1 parent 4706ec0 commit 2d7c030
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ private[spark] class TaskSetManager(
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
return Some((index, TaskLocality.NO_PREF, false))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, TaskLocality}
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.ReceiverInputDStream
Expand Down Expand Up @@ -80,6 +82,28 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
}
}

test("SPARK-11063: TaskSetManager should use Receiver RDD's preferredLocations") {
// Use ManualClock to prevent from starting batches so that we can make sure the only task is
// for starting the Receiver
val _conf = conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
withStreamingContext(new StreamingContext(_conf, Milliseconds(100))) { ssc =>
@volatile var receiverTaskLocality: TaskLocality = null
ssc.sparkContext.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
receiverTaskLocality = taskStart.taskInfo.taskLocality
}
})
val input = ssc.receiverStream(new TestReceiver)
val output = new TestOutputStream(input)
output.register()
ssc.start()
eventually(timeout(10 seconds), interval(10 millis)) {
// If preferredLocations is set correctly, receiverTaskLocality should be NODE_LOCAL
assert(receiverTaskLocality === TaskLocality.NODE_LOCAL)
}
}
}
}

/** An input DStream with for testing rate controlling */
Expand Down

0 comments on commit 2d7c030

Please sign in to comment.