diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index b9f5e7154295f..d13795186c48e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -166,29 +166,16 @@ private[spark] class MesosSchedulerBackend(
     execArgs
   }
 
-  private def setClassLoader(): ClassLoader = {
-    val oldClassLoader = Thread.currentThread.getContextClassLoader
-    Thread.currentThread.setContextClassLoader(classLoader)
-    oldClassLoader
-  }
-
-  private def restoreClassLoader(oldClassLoader: ClassLoader) {
-    Thread.currentThread.setContextClassLoader(oldClassLoader)
-  }
-
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
 
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    val oldClassLoader = setClassLoader()
-    try {
+    inClassLoader() {
       appId = frameworkId.getValue
       logInfo("Registered as framework ID " + appId)
       registeredLock.synchronized {
         isRegistered = true
         registeredLock.notifyAll()
       }
-    } finally {
-      restoreClassLoader(oldClassLoader)
     }
   }
 
@@ -200,17 +187,13 @@ private[spark] class MesosSchedulerBackend(
     }
   }
 
-  def toWorkerOffer(offer: Offer) = new WorkerOffer(
-    offer.getSlaveId.getValue,
-    offer.getHostname,
-    getResource(offer.getResourcesList, "cpus").toInt)
-
   private def inClassLoader()(fun: => Unit) = {
-    val oldClassLoader = setClassLoader()
+    val oldClassLoader = Thread.currentThread.getContextClassLoader
+    Thread.currentThread.setContextClassLoader(classLoader)
     try {
       fun
     } finally {
-      restoreClassLoader(oldClassLoader)
+      Thread.currentThread.setContextClassLoader(oldClassLoader)
     }
   }
 
@@ -225,13 +208,30 @@ private[spark] class MesosSchedulerBackend(
    */
  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     inClassLoader() {
-      val (acceptedOffers, declinedOffers) = offers.partition(o => {
+      val (acceptedOffers, declinedOffers) = offers.partition { o =>
         val mem = getResource(o.getResourcesList, "mem")
+        val cpus = getResource(o.getResourcesList, "cpus")
         val slaveId = o.getSlaveId.getValue
-        mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
-      })
+        (mem >= MemoryUtils.calculateTotalMemory(sc) &&
+          // need at least 1 for executor, 1 for task
+          cpus >= 2 * scheduler.CPUS_PER_TASK) ||
+          (slaveIdsWithExecutors.contains(slaveId) &&
+            cpus >= scheduler.CPUS_PER_TASK)
+      }
 
-      val offerableWorkers = acceptedOffers.map(toWorkerOffer)
+      val offerableWorkers = acceptedOffers.map { o =>
+        val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
+          getResource(o.getResourcesList, "cpus").toInt
+        } else {
+          // If the executor doesn't exist yet, subtract CPU for executor
+          getResource(o.getResourcesList, "cpus").toInt -
+            scheduler.CPUS_PER_TASK
+        }
+        new WorkerOffer(
+          o.getSlaveId.getValue,
+          o.getHostname,
+          cpus)
+      }
 
       val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue -> o).toMap
 
@@ -240,21 +240,21 @@ private[spark] class MesosSchedulerBackend(
       // Call into the TaskSchedulerImpl
       scheduler.resourceOffers(offerableWorkers)
         .filter(!_.isEmpty)
-        .foreach(_.foreach(taskDesc => {
-          val slaveId = taskDesc.executorId
-          slaveIdsWithExecutors += slaveId
-          taskIdToSlaveId(taskDesc.taskId) = slaveId
-          mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
-            .add(createMesosTask(taskDesc, slaveId))
-        }))
+        .foreach { offer =>
+          offer.foreach { taskDesc =>
+            val slaveId = taskDesc.executorId
+            slaveIdsWithExecutors += slaveId
+            taskIdToSlaveId(taskDesc.taskId) = slaveId
+            mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
+              .add(createMesosTask(taskDesc, slaveId))
+          }
+        }
 
       // Reply to the offers
       val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
 
-      mesosTasks.foreach {
-        case (slaveId, tasks) => {
-          d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
-        }
+      mesosTasks.foreach { case (slaveId, tasks) =>
+        d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
       }
 
       declinedOffers.foreach(o => d.declineOffer(o.getId))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index 1f6b2278db703..bef8d3a58ba63 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -18,9 +18,9 @@ package org.apache.spark.scheduler.mesos
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
+import org.apache.spark.{scheduler, SparkConf, SparkContext, LocalSparkContext}
 import org.apache.spark.scheduler.{TaskDescription, WorkerOffer, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend
+import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend}
 import org.apache.mesos.SchedulerDriver
 import org.apache.mesos.Protos._
 import org.scalatest.mock.EasyMockSugar
@@ -49,18 +49,25 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
 
     val driver = EasyMock.createMock(classOf[SchedulerDriver])
     val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
-    val offers = new java.util.ArrayList[Offer]
-    offers.add(createOffer(1, 101, 1))
-    offers.add(createOffer(1, 99, 1))
 
     val sc = EasyMock.createMock(classOf[SparkContext])
+    EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
     EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes()
     EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
     EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes()
     EasyMock.replay(sc)
 
+    val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
+    val minCpu = 4
+    val offers = new java.util.ArrayList[Offer]
+    offers.add(createOffer(1, minMem, minCpu))
+    offers.add(createOffer(1, minMem - 1, minCpu))
     val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
-    val workerOffers = Seq(backend.toWorkerOffer(offers.get(0)))
+    val workerOffers = Seq(offers.get(0)).map(o => new WorkerOffer(
+      o.getSlaveId.getValue,
+      o.getHostname,
+      2
+    ))
     val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
     EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffers))).andReturn(Seq(Seq(taskDesc)))
     EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
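
Note (not part of the patch above): the behavioral change in resourceOffers is the offer-acceptance rule and the CPU accounting. An offer from a slave with no executor yet is accepted only if it has enough memory for a whole executor (MemoryUtils.calculateTotalMemory(sc)) and at least 2 * CPUS_PER_TASK cpus (one share for the executor, one for a task); an offer from a slave that already hosts an executor only needs CPUS_PER_TASK cpus. For a new slave, CPUS_PER_TASK cpus are then reserved for the executor before the remainder is handed to the TaskSchedulerImpl. The snippet below is a minimal standalone sketch of that rule, not code from the patch: ResourceOffer, cpusPerTask and totalExecutorMemMB are hypothetical stand-ins for the Mesos Offer protobuf, scheduler.CPUS_PER_TASK and MemoryUtils.calculateTotalMemory(sc), and the numbers are arbitrary.

// Sketch only: hypothetical stand-ins, not the Spark/Mesos classes touched by the patch.
case class ResourceOffer(slaveId: String, memMB: Double, cpus: Double)

object OfferFilterSketch {
  val cpusPerTask = 2             // stand-in for scheduler.CPUS_PER_TASK
  val totalExecutorMemMB = 896.0  // stand-in for MemoryUtils.calculateTotalMemory(sc)

  // Mirrors the partition predicate: room for a new executor plus one task,
  // or an executor already on the slave and room for one more task.
  def accepts(o: ResourceOffer, slavesWithExecutors: Set[String]): Boolean =
    (o.memMB >= totalExecutorMemMB && o.cpus >= 2 * cpusPerTask) ||
      (slavesWithExecutors.contains(o.slaveId) && o.cpus >= cpusPerTask)

  // Mirrors the WorkerOffer construction: reserve cpusPerTask for the executor
  // the first time a slave is used.
  def offerableCpus(o: ResourceOffer, slavesWithExecutors: Set[String]): Int =
    if (slavesWithExecutors.contains(o.slaveId)) o.cpus.toInt
    else o.cpus.toInt - cpusPerTask

  def main(args: Array[String]): Unit = {
    val running = Set("slave-3")
    val offers = Seq(
      ResourceOffer("slave-1", memMB = 1024, cpus = 4), // accepted: executor plus one task fit
      ResourceOffer("slave-2", memMB = 512, cpus = 4),  // declined: too little memory, no executor yet
      ResourceOffer("slave-3", memMB = 128, cpus = 2))  // accepted: executor already running
    offers.filter(accepts(_, running)).foreach { o =>
      println(s"${o.slaveId}: offer ${offerableCpus(o, running)} cpus to the task scheduler")
    }
  }
}

Reserving CPUS_PER_TASK for the executor up front is what keeps the backend from accepting an offer that can start an executor but never actually run a task on it, which is the case the updated test exercises with minCpu = 4 and a mocked CPUS_PER_TASK of 2.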