diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index eefc8c232b564..f1924a4573b21 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler /** * A backend interface for scheduling systems that allows plugging in different ones under - * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as + * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as * machines become available and can launch tasks on them. */ private[spark] trait SchedulerBackend { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 1cdfed1d7005e..92616c997e20c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -20,7 +20,7 @@ package org.apache.spark.scheduler import org.apache.spark.scheduler.SchedulingMode.SchedulingMode /** - * Low-level task scheduler interface, currently implemented exclusively by the ClusterScheduler. + * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl. * This interface allows plugging in different task schedulers. Each TaskScheduler schedulers tasks * for a single SparkContext. These schedulers get sets of tasks submitted to them from the * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 1a4b7e599c01e..5ea4557bbf56a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -26,13 +26,14 @@ import scala.collection.mutable.HashSet import scala.math.max import scala.math.min -import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState} +import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted, + SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.TaskMetrics import org.apache.spark.util.{Clock, SystemClock} /** - * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of + * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of * each task, retries tasks if they fail (up to a limited number of times), and * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, @@ -41,7 +42,7 @@ import org.apache.spark.util.{Clock, SystemClock} * THREADING: This class is designed to only be called from code with a lock on the * TaskScheduler (e.g. its event handlers). It should not be called from other threads. * - * @param sched the ClusterScheduler associated with the TaskSetManager + * @param sched the TaskSchedulerImpl associated with the TaskSetManager * @param taskSet the TaskSet to manage scheduling for * @param maxTaskFailures if any particular task fails more than this number of times, the entire * task set will be aborted diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c576beb0c0d38..bcf0ce19a54cd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -203,7 +203,7 @@ private[spark] class MesosSchedulerBackend( getResource(offer.getResourcesList, "cpus").toInt) } - // Call into the ClusterScheduler + // Call into the TaskSchedulerImpl val taskLists = scheduler.resourceOffers(offerableWorkers) // Build a list of Mesos tasks for each slave diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 50f7e79e97dd8..16e2f5cf3076d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -35,7 +35,7 @@ private case class KillTask(taskId: Long) /** * Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on * LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend - * and the ClusterScheduler. + * and the TaskSchedulerImpl. */ private[spark] class LocalActor( scheduler: TaskSchedulerImpl, @@ -76,7 +76,7 @@ private[spark] class LocalActor( /** * LocalBackend is used when running a local version of Spark where the executor, backend, and - * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks + * master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks * on a single Executor (created by the LocalBackend) running locally. */ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index ac07f60e284bb..c4e7a4bb7d385 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -93,10 +93,10 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA // If this test hangs, it's probably because no resource offers were made after the task // failed. val scheduler: TaskSchedulerImpl = sc.taskScheduler match { - case clusterScheduler: TaskSchedulerImpl => - clusterScheduler + case taskScheduler: TaskSchedulerImpl => + taskScheduler case _ => - assert(false, "Expect local cluster to use ClusterScheduler") + assert(false, "Expect local cluster to use TaskSchedulerImpl") throw new ClassCastException } scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala similarity index 79% rename from core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala rename to core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 85e929925e3b5..f4e62c64daf12 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -29,9 +29,9 @@ class FakeTaskSetManager( initPriority: Int, initStageId: Int, initNumTasks: Int, - clusterScheduler: TaskSchedulerImpl, + taskScheduler: TaskSchedulerImpl, taskSet: TaskSet) - extends TaskSetManager(clusterScheduler, taskSet, 0) { + extends TaskSetManager(taskScheduler, taskSet, 0) { parent = null weight = 1 @@ -105,7 +105,7 @@ class FakeTaskSetManager( } } -class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging { +class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging { def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, taskSet: TaskSet): FakeTaskSetManager = { new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet) @@ -133,8 +133,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging } test("FIFO Scheduler Test") { - sc = new SparkContext("local", "ClusterSchedulerSuite") - val clusterScheduler = new TaskSchedulerImpl(sc) + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) var tasks = ArrayBuffer[Task[_]]() val task = new FakeTask(0) tasks += task @@ -144,9 +144,9 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) schedulableBuilder.buildPools() - val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet) - val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet) - val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet) + val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet) + val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet) + val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, taskScheduler, taskSet) schedulableBuilder.addTaskSetManager(taskSetManager0, null) schedulableBuilder.addTaskSetManager(taskSetManager1, null) schedulableBuilder.addTaskSetManager(taskSetManager2, null) @@ -160,8 +160,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging } test("Fair Scheduler Test") { - sc = new SparkContext("local", "ClusterSchedulerSuite") - val clusterScheduler = new TaskSchedulerImpl(sc) + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) var tasks = ArrayBuffer[Task[_]]() val task = new FakeTask(0) tasks += task @@ -189,15 +189,15 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging val properties2 = new Properties() properties2.setProperty("spark.scheduler.pool","2") - val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet) - val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet) - val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet) + val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, taskScheduler, taskSet) + val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, taskScheduler, taskSet) + val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, taskScheduler, taskSet) schedulableBuilder.addTaskSetManager(taskSetManager10, properties1) schedulableBuilder.addTaskSetManager(taskSetManager11, properties1) schedulableBuilder.addTaskSetManager(taskSetManager12, properties1) - val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet) - val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet) + val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, taskScheduler, taskSet) + val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, taskScheduler, taskSet) schedulableBuilder.addTaskSetManager(taskSetManager23, properties2) schedulableBuilder.addTaskSetManager(taskSetManager24, properties2) @@ -217,8 +217,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging } test("Nested Pool Test") { - sc = new SparkContext("local", "ClusterSchedulerSuite") - val clusterScheduler = new TaskSchedulerImpl(sc) + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) var tasks = ArrayBuffer[Task[_]]() val task = new FakeTask(0) tasks += task @@ -240,23 +240,23 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging pool1.addSchedulable(pool10) pool1.addSchedulable(pool11) - val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet) - val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet) + val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, taskScheduler, taskSet) + val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, taskScheduler, taskSet) pool00.addSchedulable(taskSetManager000) pool00.addSchedulable(taskSetManager001) - val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet) - val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet) + val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, taskScheduler, taskSet) + val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, taskScheduler, taskSet) pool01.addSchedulable(taskSetManager010) pool01.addSchedulable(taskSetManager011) - val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet) - val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet) + val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, taskScheduler, taskSet) + val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, taskScheduler, taskSet) pool10.addSchedulable(taskSetManager100) pool10.addSchedulable(taskSetManager101) - val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet) - val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet) + val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, taskScheduler, taskSet) + val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, taskScheduler, taskSet) pool11.addSchedulable(taskSetManager110) pool11.addSchedulable(taskSetManager111) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 34a7d8cefeea2..20f6e503872ac 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.util.FakeClock -class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) { +class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) { override def taskStarted(task: Task[_], taskInfo: TaskInfo) { taskScheduler.startedTasks += taskInfo.index } @@ -51,12 +51,12 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler } /** - * A mock ClusterScheduler implementation that just remembers information about tasks started and + * A mock TaskSchedulerImpl implementation that just remembers information about tasks started and * feedback received from the TaskSetManagers. Note that it's important to initialize this with * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost * to work, and these are required for locality in TaskSetManager. */ -class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */) +class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */) extends TaskSchedulerImpl(sc) { val startedTasks = new ArrayBuffer[Long] @@ -87,7 +87,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("TaskSet with no preferences") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(1) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) @@ -113,7 +113,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("multiple offers with no preferences") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(3) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) @@ -144,7 +144,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("basic delay scheduling") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = createTaskSet(4, Seq(TaskLocation("host1", "exec1")), Seq(TaskLocation("host2", "exec2")), @@ -188,7 +188,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("delay scheduling with fallback") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3")) val taskSet = createTaskSet(5, Seq(TaskLocation("host1")), @@ -228,7 +228,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("delay scheduling with failed hosts") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = createTaskSet(3, Seq(TaskLocation("host1")), Seq(TaskLocation("host2")), @@ -260,7 +260,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("task result lost") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(1) val clock = new FakeClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) @@ -277,7 +277,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("repeated failures lead to task set abortion") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(1) val clock = new FakeClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)