Rebase from master and address comments
tnachen committed Nov 11, 2014
1 parent 9ccab09 commit 4ea5dec
Showing 2 changed files with 49 additions and 42 deletions.
MesosSchedulerBackend.scala
@@ -166,29 +166,16 @@ private[spark] class MesosSchedulerBackend(
execArgs
}

private def setClassLoader(): ClassLoader = {
val oldClassLoader = Thread.currentThread.getContextClassLoader
Thread.currentThread.setContextClassLoader(classLoader)
oldClassLoader
}

private def restoreClassLoader(oldClassLoader: ClassLoader) {
Thread.currentThread.setContextClassLoader(oldClassLoader)
}

override def offerRescinded(d: SchedulerDriver, o: OfferID) {}

override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
val oldClassLoader = setClassLoader()
try {
inClassLoader() {
appId = frameworkId.getValue
logInfo("Registered as framework ID " + appId)
registeredLock.synchronized {
isRegistered = true
registeredLock.notifyAll()
}
} finally {
restoreClassLoader(oldClassLoader)
}
}

@@ -200,17 +187,13 @@ private[spark] class MesosSchedulerBackend(
}
}

def toWorkerOffer(offer: Offer) = new WorkerOffer(
offer.getSlaveId.getValue,
offer.getHostname,
getResource(offer.getResourcesList, "cpus").toInt)

private def inClassLoader()(fun: => Unit) = {
val oldClassLoader = setClassLoader()
val oldClassLoader = Thread.currentThread.getContextClassLoader
Thread.currentThread.setContextClassLoader(classLoader)
try {
fun
} finally {
restoreClassLoader(oldClassLoader)
Thread.currentThread.setContextClassLoader(oldClassLoader)
}
}
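
Note: the old setClassLoader/restoreClassLoader pair is folded into inClassLoader, which saves the current context class loader, installs the backend's, runs the callback, and restores the original in a finally block. A minimal, standalone sketch of that pattern (the object and method names here are illustrative only, not part of the backend):

// Illustrative sketch: save/swap/restore the context class loader around a block,
// mirroring what the refactored inClassLoader does around Mesos callbacks.
object ContextClassLoaderSketch {
  def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
    val oldLoader = Thread.currentThread.getContextClassLoader
    Thread.currentThread.setContextClassLoader(loader)
    try {
      body
    } finally {
      // Restore the caller's loader even if body throws.
      Thread.currentThread.setContextClassLoader(oldLoader)
    }
  }

  def main(args: Array[String]): Unit = {
    withContextClassLoader(getClass.getClassLoader) {
      println(Thread.currentThread.getContextClassLoader)
    }
  }
}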

@@ -225,13 +208,30 @@
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
inClassLoader() {
val (acceptedOffers, declinedOffers) = offers.partition(o => {
val (acceptedOffers, declinedOffers) = offers.partition { o =>
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
})
(mem >= MemoryUtils.calculateTotalMemory(sc) &&
// need at least 1 for executor, 1 for task
cpus >= 2 * scheduler.CPUS_PER_TASK) ||
(slaveIdsWithExecutors.contains(slaveId) &&
cpus >= scheduler.CPUS_PER_TASK)
}

val offerableWorkers = acceptedOffers.map(toWorkerOffer)
val offerableWorkers = acceptedOffers.map { o =>
val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
} else {
// If the executor doesn't exist yet, subtract CPU for executor
getResource(o.getResourcesList, "cpus").toInt -
scheduler.CPUS_PER_TASK
}
new WorkerOffer(
o.getSlaveId.getValue,
o.getHostname,
cpus)
}

val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue -> o).toMap
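
To make the new acceptance predicate concrete: an offer is kept either when it can host a brand-new executor (enough memory per MemoryUtils.calculateTotalMemory plus CPUs for the executor and at least one task) or when the slave already runs an executor and still offers at least one task's worth of CPUs. A self-contained sketch, where the two constants are made-up stand-ins for MemoryUtils.calculateTotalMemory(sc) and scheduler.CPUS_PER_TASK:

// Sketch of the acceptance rule only; the constants below are assumed values
// standing in for MemoryUtils.calculateTotalMemory(sc) and scheduler.CPUS_PER_TASK.
object OfferAcceptanceSketch {
  val totalExecutorMem = 896.0  // MB, illustrative value
  val cpusPerTask = 1.0         // illustrative value

  def accept(mem: Double, cpus: Double, slaveHasExecutor: Boolean): Boolean =
    (mem >= totalExecutorMem && cpus >= 2 * cpusPerTask) ||
      (slaveHasExecutor && cpus >= cpusPerTask)

  def main(args: Array[String]): Unit = {
    println(accept(mem = 1024, cpus = 2, slaveHasExecutor = false)) // true: new executor plus one task fit
    println(accept(mem = 512,  cpus = 2, slaveHasExecutor = false)) // false: too little memory for an executor
    println(accept(mem = 512,  cpus = 1, slaveHasExecutor = true))  // true: existing executor, one task's worth of CPU
  }
}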

@@ -240,21 +240,21 @@ private[spark] class MesosSchedulerBackend(
// Call into the TaskSchedulerImpl
scheduler.resourceOffers(offerableWorkers)
.filter(!_.isEmpty)
.foreach(_.foreach(taskDesc => {
val slaveId = taskDesc.executorId
slaveIdsWithExecutors += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(createMesosTask(taskDesc, slaveId))
}))
.foreach { offer =>
offer.foreach { taskDesc =>
val slaveId = taskDesc.executorId
slaveIdsWithExecutors += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(createMesosTask(taskDesc, slaveId))
}
}

// Reply to the offers
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?

mesosTasks.foreach {
case (slaveId, tasks) => {
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}
mesosTasks.foreach { case (slaveId, tasks) =>
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}

declinedOffers.foreach(o => d.declineOffer(o.getId))
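
The launch path itself is unchanged in substance: task descriptions returned by the TaskSchedulerImpl are grouped per slave, then the driver gets one launchTasks call per slave's offer. A rough sketch of that grouping, with a placeholder Task type instead of MesosTaskInfo:

import java.util.{ArrayList => JArrayList, List => JList}
import scala.collection.mutable

// Sketch of the per-slave grouping used before launchTasks: each accepted task
// is appended to a Java list keyed by its slave, so the driver is called once per slave.
object GroupBySlaveSketch {
  final case class Task(taskId: Long, slaveId: String)  // stand-in for MesosTaskInfo

  def main(args: Array[String]): Unit = {
    val tasks = Seq(Task(1, "s1"), Task(2, "s1"), Task(3, "s2"))
    val bySlave = new mutable.HashMap[String, JList[Task]]
    tasks.foreach { t =>
      bySlave.getOrElseUpdate(t.slaveId, new JArrayList[Task]).add(t)
    }
    bySlave.foreach { case (slaveId, ts) =>
      println(s"launchTasks($slaveId, ${ts.size} tasks)")  // one driver call per slave
    }
  }
}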
MesosSchedulerBackendSuite.scala
@@ -18,9 +18,9 @@
package org.apache.spark.scheduler.mesos

import org.scalatest.FunSuite
import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
import org.apache.spark.{scheduler, SparkConf, SparkContext, LocalSparkContext}
import org.apache.spark.scheduler.{TaskDescription, WorkerOffer, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend
import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend}
import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos._
import org.scalatest.mock.EasyMockSugar
@@ -49,18 +49,25 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea

val driver = EasyMock.createMock(classOf[SchedulerDriver])
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
val offers = new java.util.ArrayList[Offer]
offers.add(createOffer(1, 101, 1))
offers.add(createOffer(1, 99, 1))

val sc = EasyMock.createMock(classOf[SparkContext])

EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes()
EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes()
EasyMock.replay(sc)
val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
val minCpu = 4
val offers = new java.util.ArrayList[Offer]
offers.add(createOffer(1, minMem, minCpu))
offers.add(createOffer(1, minMem - 1, minCpu))
val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
val workerOffers = Seq(backend.toWorkerOffer(offers.get(0)))
val workerOffers = Seq(offers.get(0)).map(o => new WorkerOffer(
o.getSlaveId.getValue,
o.getHostname,
2
))
val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffers))).andReturn(Seq(Seq(taskDesc)))
EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
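
A note on the expected WorkerOffer above: the accepted offer carries minCpu = 4 CPUs, but no executor exists on that slave yet, so the backend reserves CPUS_PER_TASK (mocked to 2 here) for the executor and advertises only the remainder to the task scheduler. In miniature:

// Tiny worked example of the CPU accounting the suite expects.
object WorkerOfferCpusSketch {
  def usableCpus(offeredCpus: Int, cpusPerTask: Int, slaveHasExecutor: Boolean): Int =
    if (slaveHasExecutor) offeredCpus else offeredCpus - cpusPerTask

  def main(args: Array[String]): Unit = {
    // Offer has 4 CPUs, CPUS_PER_TASK is mocked to 2, no executor yet => 2,
    // the value the expected WorkerOffer is built with.
    println(usableCpus(offeredCpus = 4, cpusPerTask = 2, slaveHasExecutor = false))
  }
}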
