Remove references to ClusterScheduler (SPARK-1140) #9

Closed

@@ -19,7 +19,7 @@ package org.apache.spark.scheduler

/**
* A backend interface for scheduling systems that allows plugging in different ones under
- * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
+ * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as
* machines become available and can launch tasks on them.
*/
private[spark] trait SchedulerBackend {
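
The offer model this comment describes is worth sketching for readers new to the scheduler code: a backend advertises free resources, and the scheduler answers with the tasks to launch on them. A minimal, self-contained sketch in Scala (the names echo Spark's WorkerOffer/resourceOffers API, but these are simplified stand-ins, not the real signatures):

    // Simplified stand-ins for Spark's WorkerOffer and TaskDescription.
    case class WorkerOffer(executorId: String, host: String, cores: Int)
    case class TaskDescription(taskId: Long, executorId: String, name: String)

    // The scheduler side of the handshake: given free resources, it decides
    // which tasks run on which offer.
    trait ToyTaskScheduler {
      def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]
    }

    // The backend side: whenever resources free up (an executor registers,
    // a task finishes), it re-offers them and launches whatever comes back.
    class ToyBackend(scheduler: ToyTaskScheduler) {
      def reviveOffers(free: Seq[WorkerOffer]): Unit =
        for (perOffer <- scheduler.resourceOffers(free); task <- perOffer)
          println(s"launching ${task.name} (task ${task.taskId}) on executor ${task.executorId}")
    }
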
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode

/**
- * Low-level task scheduler interface, currently implemented exclusively by the ClusterScheduler.
+ * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl.
* This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
* for a single SparkContext. These schedulers get sets of tasks submitted to them from the
* DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
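
Reduced to essentials, the contract that comment describes looks roughly like the following hypothetical trait (the real interface also carries lifecycle methods and scheduling-mode state that this hunk does not show):

    // One submitTasks call per stage, issued by the DAGScheduler; the
    // implementation owns execution, retries, and reporting back.
    trait ToyLowLevelScheduler {
      def submitTasks(stageId: Int, tasks: Seq[String]): Unit
      def cancelTasks(stageId: Int): Unit
    }

    // A trivially simple FIFO implementation of the toy contract.
    class ToyFifoScheduler extends ToyLowLevelScheduler {
      private val queue = scala.collection.mutable.Queue.empty[(Int, Seq[String])]
      def submitTasks(stageId: Int, tasks: Seq[String]): Unit = queue.enqueue((stageId, tasks))
      def cancelTasks(stageId: Int): Unit = { queue.dequeueAll(_._1 == stageId); () }
    }
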
@@ -26,13 +26,14 @@ import scala.collection.mutable.HashSet
import scala.math.max
import scala.math.min

- import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
+ import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted,
+   SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.{Clock, SystemClock}

/**
- * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of
+ * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
* each task, retries tasks if they fail (up to a limited number of times), and
* handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
* to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
@@ -41,7 +42,7 @@ import org.apache.spark.util.{Clock, SystemClock}
* THREADING: This class is designed to only be called from code with a lock on the
* TaskScheduler (e.g. its event handlers). It should not be called from other threads.
*
- * @param sched the ClusterScheduler associated with the TaskSetManager
+ * @param sched the TaskSchedulerImpl associated with the TaskSetManager
* @param taskSet the TaskSet to manage scheduling for
* @param maxTaskFailures if any particular task fails more than this number of times, the entire
* task set will be aborted
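
"Delay scheduling", which the class comment leans on, means the manager briefly declines offers that are not data-local, betting that a local slot will free up first. A toy sketch of the fallback rule (the levels mirror Spark's locality levels, but the wait threshold here is illustrative; the real waits are configurable):

    object ToyDelayScheduling {
      sealed trait Level
      case object ProcessLocal extends Level // same executor as the data
      case object NodeLocal extends Level    // same host as the data
      case object AnyHost extends Level      // locality given up

      val waitPerLevelMs = 3000L // illustrative only

      // The best locality the TaskSet may still insist on, given how long
      // it has waited since its last task launch.
      def allowedLevel(msSinceLastLaunch: Long): Level =
        if (msSinceLastLaunch < waitPerLevelMs) ProcessLocal
        else if (msSinceLastLaunch < 2 * waitPerLevelMs) NodeLocal
        else AnyHost
    }
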
@@ -203,7 +203,7 @@ private[spark] class MesosSchedulerBackend(
getResource(offer.getResourcesList, "cpus").toInt)
}

- // Call into the ClusterScheduler
+ // Call into the TaskSchedulerImpl
val taskLists = scheduler.resourceOffers(offerableWorkers)

// Build a list of Mesos tasks for each slave
@@ -35,7 +35,7 @@ private case class KillTask(taskId: Long)
/**
* Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on
* LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend
- * and the ClusterScheduler.
+ * and the TaskSchedulerImpl.
*/
private[spark] class LocalActor(
scheduler: TaskSchedulerImpl,
@@ -76,7 +76,7 @@ private[spark] class LocalActor(

/**
* LocalBackend is used when running a local version of Spark where the executor, backend, and
- * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks
+ * master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks
* on a single Executor (created by the LocalBackend) running locally.
*/
private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
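
The deadlock-avoidance pattern those two comments describe deserves a concrete shape: backend entry points only enqueue a message, so they return immediately instead of blocking while the scheduler holds its lock. A stripped-down Akka-style sketch (hypothetical names; the real LocalActor handles more message types):

    import akka.actor.{Actor, ActorSystem, Props}

    private case object ToyReviveOffers
    private case class ToyStatusUpdate(taskId: Long, finished: Boolean)

    // All state changes happen on the actor's single thread, one message at
    // a time, so no caller ever waits while holding the scheduler's lock.
    class ToySerializingActor extends Actor {
      def receive = {
        case ToyReviveOffers           => // ask the scheduler for tasks and launch them
        case ToyStatusUpdate(id, done) => // record progress, then re-offer resources
      }
    }

    object ToyLocalBackendApp extends App {
      val system = ActorSystem("toy-local-backend")
      val actor  = system.actorOf(Props[ToySerializingActor], "serializer")
      actor ! ToyReviveOffers // returns immediately; handled asynchronously
    }
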
@@ -93,10 +93,10 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
// If this test hangs, it's probably because no resource offers were made after the task
// failed.
val scheduler: TaskSchedulerImpl = sc.taskScheduler match {
- case clusterScheduler: TaskSchedulerImpl =>
-   clusterScheduler
+ case taskScheduler: TaskSchedulerImpl =>
+   taskScheduler
case _ =>
assert(false, "Expect local cluster to use ClusterScheduler")
assert(false, "Expect local cluster to use TaskSchedulerImpl")
throw new ClassCastException
}
scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
@@ -29,9 +29,9 @@ class FakeTaskSetManager(
initPriority: Int,
initStageId: Int,
initNumTasks: Int,
- clusterScheduler: TaskSchedulerImpl,
+ taskScheduler: TaskSchedulerImpl,
taskSet: TaskSet)
- extends TaskSetManager(clusterScheduler, taskSet, 0) {
+ extends TaskSetManager(taskScheduler, taskSet, 0) {

parent = null
weight = 1
@@ -105,7 +105,7 @@ class FakeTaskSetManager(
}
}

- class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
+ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging {

def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, taskSet: TaskSet): FakeTaskSetManager = {
new FakeTaskSetManager(priority, stage, numTasks, cs, taskSet)
@@ -133,8 +133,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
}

test("FIFO Scheduler Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new TaskSchedulerImpl(sc)
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
@@ -144,9 +144,9 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()

- val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet)
- val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet)
- val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet)
+ val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet)
+ val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet)
+ val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, taskScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager0, null)
schedulableBuilder.addTaskSetManager(taskSetManager1, null)
schedulableBuilder.addTaskSetManager(taskSetManager2, null)
@@ -160,8 +160,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
}

test("Fair Scheduler Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new TaskSchedulerImpl(sc)
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
@@ -189,15 +189,15 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
val properties2 = new Properties()
properties2.setProperty("spark.scheduler.pool","2")

- val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
- val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
- val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet)
+ val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, taskScheduler, taskSet)
+ val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, taskScheduler, taskSet)
+ val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, taskScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)

- val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet)
- val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet)
+ val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, taskScheduler, taskSet)
+ val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, taskScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)

@@ -217,8 +217,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
}

test("Nested Pool Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new TaskSchedulerImpl(sc)
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new FakeTask(0)
tasks += task
@@ -240,23 +240,23 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
pool1.addSchedulable(pool10)
pool1.addSchedulable(pool11)

- val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet)
- val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet)
+ val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, taskScheduler, taskSet)
+ val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, taskScheduler, taskSet)
pool00.addSchedulable(taskSetManager000)
pool00.addSchedulable(taskSetManager001)

- val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet)
- val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet)
+ val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, taskScheduler, taskSet)
+ val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, taskScheduler, taskSet)
pool01.addSchedulable(taskSetManager010)
pool01.addSchedulable(taskSetManager011)

- val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet)
- val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet)
+ val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, taskScheduler, taskSet)
+ val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, taskScheduler, taskSet)
pool10.addSchedulable(taskSetManager100)
pool10.addSchedulable(taskSetManager101)

- val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet)
- val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet)
+ val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, taskScheduler, taskSet)
+ val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, taskScheduler, taskSet)
pool11.addSchedulable(taskSetManager110)
pool11.addSchedulable(taskSetManager111)

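
One usage note on the pools this suite exercises: the Properties objects carrying spark.scheduler.pool model what applications do through thread-local properties. A short sketch, assuming an active SparkContext named sc (the property key is Spark's real one; the pool names are just examples):

    // Jobs submitted from this thread now go to fair-scheduler pool "2".
    sc.setLocalProperty("spark.scheduler.pool", "2")
    // ... run jobs that should share pool "2" ...
    sc.setLocalProperty("spark.scheduler.pool", null) // revert to the default pool
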
@@ -26,7 +26,7 @@ import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.FakeClock

- class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) {
+ class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
taskScheduler.startedTasks += taskInfo.index
}
@@ -51,12 +51,12 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler
}

/**
- * A mock ClusterScheduler implementation that just remembers information about tasks started and
+ * A mock TaskSchedulerImpl implementation that just remembers information about tasks started and
* feedback received from the TaskSetManagers. Note that it's important to initialize this with
* a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
* to work, and these are required for locality in TaskSetManager.
*/
- class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
+ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
extends TaskSchedulerImpl(sc)
{
val startedTasks = new ArrayBuffer[Long]
@@ -87,7 +87,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("TaskSet with no preferences") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)

@@ -113,7 +113,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("multiple offers with no preferences") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(3)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)

@@ -144,7 +144,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("basic delay scheduling") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = createTaskSet(4,
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host2", "exec2")),
@@ -188,7 +188,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("delay scheduling with fallback") {
sc = new SparkContext("local", "test")
- val sched = new FakeClusterScheduler(sc,
+ val sched = new FakeTaskScheduler(sc,
("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
val taskSet = createTaskSet(5,
Seq(TaskLocation("host1")),
@@ -228,7 +228,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("delay scheduling with failed hosts") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = createTaskSet(3,
Seq(TaskLocation("host1")),
Seq(TaskLocation("host2")),
@@ -260,7 +260,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("task result lost") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -277,7 +277,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("repeated failures lead to task set abortion") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)