From 82074288ea2c6ac11788a07dbd5d5e129a9d5dbc Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Fri, 18 Jul 2014 14:09:10 -0700 Subject: [PATCH 1/4] Refactor mesos scheduler resourceOffers and add unit test --- .../cluster/mesos/MesosSchedulerBackend.scala | 90 ++++++++----------- .../mesos/MesosSchedulerBackendSuite.scala | 84 +++++++++++++++++ 2 files changed, 120 insertions(+), 54 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c5f3493477bc5..615312070d627 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -200,6 +200,11 @@ private[spark] class MesosSchedulerBackend( } } + def toWorkerOffer(offer: Offer) = new WorkerOffer( + offer.getSlaveId.getValue, + offer.getHostname, + getResource(offer.getResourcesList, "cpus").toInt) + override def disconnected(d: SchedulerDriver) {} override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} @@ -212,62 +217,39 @@ private[spark] class MesosSchedulerBackend( override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { val oldClassLoader = setClassLoader() try { - synchronized { - // Build a big list of the offerable workers, and remember their indices so that we can - // figure out which Offer to reply to for each worker - val offerableWorkers = new ArrayBuffer[WorkerOffer] - val offerableIndices = new HashMap[String, Int] - - def sufficientOffer(o: Offer) = { - val mem = getResource(o.getResourcesList, "mem") - val cpus = getResource(o.getResourcesList, "cpus") - val slaveId = o.getSlaveId.getValue - (mem >= MemoryUtils.calculateTotalMemory(sc) && - // need at least 1 for executor, 1 for task - cpus >= 2 * scheduler.CPUS_PER_TASK) || - (slaveIdsWithExecutors.contains(slaveId) && - cpus >= scheduler.CPUS_PER_TASK) - } - - for ((offer, index) <- offers.zipWithIndex if sufficientOffer(offer)) { - val slaveId = offer.getSlaveId.getValue - offerableIndices.put(slaveId, index) - val cpus = if (slaveIdsWithExecutors.contains(slaveId)) { - getResource(offer.getResourcesList, "cpus").toInt - } else { - // If the executor doesn't exist yet, subtract CPU for executor - getResource(offer.getResourcesList, "cpus").toInt - - scheduler.CPUS_PER_TASK - } - offerableWorkers += new WorkerOffer( - offer.getSlaveId.getValue, - offer.getHostname, - cpus) - } - - // Call into the TaskSchedulerImpl - val taskLists = scheduler.resourceOffers(offerableWorkers) - - // Build a list of Mesos tasks for each slave - val mesosTasks = offers.map(o => new JArrayList[MesosTaskInfo]()) - for ((taskList, index) <- taskLists.zipWithIndex) { - if (!taskList.isEmpty) { - for (taskDesc <- taskList) { - val slaveId = taskDesc.executorId - val offerNum = offerableIndices(slaveId) - slaveIdsWithExecutors += slaveId - taskIdToSlaveId(taskDesc.taskId) = slaveId - mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId)) - } - } - } - - // Reply to the offers - val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? - for (i <- 0 until offers.size) { - d.launchTasks(Collections.singleton(offers(i).getId), mesosTasks(i), filters) + val (acceptedOffers, declinedOffers) = offers.partition(o => { + val mem = getResource(o.getResourcesList, "mem") + val slaveId = o.getSlaveId.getValue + mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId) + }) + + val offerableWorkers = acceptedOffers.map(toWorkerOffer) + + val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue -> o).toMap + + val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]] + + // Call into the TaskSchedulerImpl + scheduler.resourceOffers(offerableWorkers) + .filter(!_.isEmpty) + .foreach(_.foreach(taskDesc => { + val slaveId = taskDesc.executorId + slaveIdsWithExecutors += slaveId + taskIdToSlaveId(taskDesc.taskId) = slaveId + mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) + .add(createMesosTask(taskDesc, slaveId)) + })) + + // Reply to the offers + val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? + + mesosTasks.foreach { + case (slaveId, tasks) => { + d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } } + + declinedOffers.foreach(o => d.declineOffer(o.getId)) } finally { restoreClassLoader(oldClassLoader) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala new file mode 100644 index 0000000000000..28791243b3423 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.mesos + +import org.scalatest.FunSuite +import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext} +import org.apache.spark.scheduler.{TaskDescription, WorkerOffer, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend +import org.apache.mesos.SchedulerDriver +import org.apache.mesos.Protos._ +import org.scalatest.mock.EasyMockSugar +import org.apache.mesos.Protos.Value.Scalar +import org.easymock.{Capture, EasyMock} +import java.nio.ByteBuffer +import java.util.Collections +import java.util + +class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar { + test("mesos resource offer is launching tasks") { + def createOffer(id: Int, mem: Int, cpu: Int) = { + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(mem)) + builder.addResourcesBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(cpu)) + builder.setId(OfferID.newBuilder().setValue(id.toString).build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue("s1")).setHostname("localhost").build() + } + + val driver = EasyMock.createMock(classOf[SchedulerDriver]) + val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) + val offers = new java.util.ArrayList[Offer] + offers.add(createOffer(1, 101, 1)) + offers.add(createOffer(1, 99, 1)) + + val conf = new SparkConf + conf.set("spark.executor.memory", "100m") + conf.set("spark.home", "/path") + val sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf) + val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + val workerOffers = Seq(backend.toWorkerOffer(offers.get(0))) + val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) + EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffers))).andReturn(Seq(Seq(taskDesc))) + EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() + EasyMock.replay(taskScheduler) + val capture = new Capture[util.Collection[TaskInfo]] + EasyMock.expect( + driver.launchTasks( + EasyMock.eq(Collections.singleton(offers.get(0).getId)), + EasyMock.capture(capture), + EasyMock.anyObject(classOf[Filters]) + ) + ).andReturn(Status.valueOf(1)) + EasyMock.expect(driver.declineOffer(offers.get(1).getId)).andReturn(Status.valueOf(1)) + EasyMock.replay(driver) + backend.resourceOffers(driver, offers) + assert(capture.getValue.size() == 1) + val taskInfo = capture.getValue.iterator().next() + assert(taskInfo.getName.equals("n1")) + val cpus = taskInfo.getResourcesList.get(0) + assert(cpus.getName.equals("cpus")) + assert(cpus.getScalar.getValue.equals(2.0)) + assert(taskInfo.getSlaveId.getValue.equals("s1")) + } +} From e6494dc15f285cfdad2fa0408a096744f1668ce1 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Fri, 18 Jul 2014 14:21:45 -0700 Subject: [PATCH 2/4] Refactor class loading --- .../cluster/mesos/MesosSchedulerBackend.scala | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 615312070d627..b9f5e7154295f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -205,6 +205,15 @@ private[spark] class MesosSchedulerBackend( offer.getHostname, getResource(offer.getResourcesList, "cpus").toInt) + private def inClassLoader()(fun: => Unit) = { + val oldClassLoader = setClassLoader() + try { + fun + } finally { + restoreClassLoader(oldClassLoader) + } + } + override def disconnected(d: SchedulerDriver) {} override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} @@ -215,8 +224,7 @@ private[spark] class MesosSchedulerBackend( * tasks are balanced across the cluster. */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { - val oldClassLoader = setClassLoader() - try { + inClassLoader() { val (acceptedOffers, declinedOffers) = offers.partition(o => { val mem = getResource(o.getResourcesList, "mem") val slaveId = o.getSlaveId.getValue @@ -233,11 +241,11 @@ private[spark] class MesosSchedulerBackend( scheduler.resourceOffers(offerableWorkers) .filter(!_.isEmpty) .foreach(_.foreach(taskDesc => { - val slaveId = taskDesc.executorId - slaveIdsWithExecutors += slaveId - taskIdToSlaveId(taskDesc.taskId) = slaveId - mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) - .add(createMesosTask(taskDesc, slaveId)) + val slaveId = taskDesc.executorId + slaveIdsWithExecutors += slaveId + taskIdToSlaveId(taskDesc.taskId) = slaveId + mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) + .add(createMesosTask(taskDesc, slaveId)) })) // Reply to the offers @@ -250,8 +258,6 @@ private[spark] class MesosSchedulerBackend( } declinedOffers.foreach(o => d.declineOffer(o.getId)) - } finally { - restoreClassLoader(oldClassLoader) } } @@ -290,8 +296,7 @@ private[spark] class MesosSchedulerBackend( } override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { - val oldClassLoader = setClassLoader() - try { + inClassLoader() { val tid = status.getTaskId.getValue.toLong val state = TaskState.fromMesos(status.getState) synchronized { @@ -304,18 +309,13 @@ private[spark] class MesosSchedulerBackend( } } scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer) - } finally { - restoreClassLoader(oldClassLoader) } } override def error(d: SchedulerDriver, message: String) { - val oldClassLoader = setClassLoader() - try { + inClassLoader() { logError("Mesos error: " + message) scheduler.error(message) - } finally { - restoreClassLoader(oldClassLoader) } } @@ -332,15 +332,12 @@ private[spark] class MesosSchedulerBackend( override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { - val oldClassLoader = setClassLoader() - try { + inClassLoader() { logInfo("Mesos slave lost: " + slaveId.getValue) synchronized { slaveIdsWithExecutors -= slaveId.getValue } scheduler.executorLost(slaveId.getValue, reason) - } finally { - restoreClassLoader(oldClassLoader) } } From 9ccab0960d8186ae101ee46f61e89d919467b043 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Mon, 21 Jul 2014 00:33:36 -0700 Subject: [PATCH 3/4] Address review comments --- .../scheduler/mesos/MesosSchedulerBackendSuite.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index 28791243b3423..1f6b2278db703 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -29,6 +29,7 @@ import org.easymock.{Capture, EasyMock} import java.nio.ByteBuffer import java.util.Collections import java.util +import scala.collection.mutable class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar { test("mesos resource offer is launching tasks") { @@ -52,10 +53,12 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea offers.add(createOffer(1, 101, 1)) offers.add(createOffer(1, 99, 1)) - val conf = new SparkConf - conf.set("spark.executor.memory", "100m") - conf.set("spark.home", "/path") - val sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf) + val sc = EasyMock.createMock(classOf[SparkContext]) + EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() + EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes() + EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() + EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes() + EasyMock.replay(sc) val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") val workerOffers = Seq(backend.toWorkerOffer(offers.get(0))) val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) From 4ea5dec9a95c846e5eb3938210e4acbf389db4bf Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Mon, 10 Nov 2014 18:14:03 -0800 Subject: [PATCH 4/4] Rebase from master and address comments --- .../cluster/mesos/MesosSchedulerBackend.scala | 72 +++++++++---------- .../mesos/MesosSchedulerBackendSuite.scala | 19 +++-- 2 files changed, 49 insertions(+), 42 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index b9f5e7154295f..d13795186c48e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -166,29 +166,16 @@ private[spark] class MesosSchedulerBackend( execArgs } - private def setClassLoader(): ClassLoader = { - val oldClassLoader = Thread.currentThread.getContextClassLoader - Thread.currentThread.setContextClassLoader(classLoader) - oldClassLoader - } - - private def restoreClassLoader(oldClassLoader: ClassLoader) { - Thread.currentThread.setContextClassLoader(oldClassLoader) - } - override def offerRescinded(d: SchedulerDriver, o: OfferID) {} override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { - val oldClassLoader = setClassLoader() - try { + inClassLoader() { appId = frameworkId.getValue logInfo("Registered as framework ID " + appId) registeredLock.synchronized { isRegistered = true registeredLock.notifyAll() } - } finally { - restoreClassLoader(oldClassLoader) } } @@ -200,17 +187,13 @@ private[spark] class MesosSchedulerBackend( } } - def toWorkerOffer(offer: Offer) = new WorkerOffer( - offer.getSlaveId.getValue, - offer.getHostname, - getResource(offer.getResourcesList, "cpus").toInt) - private def inClassLoader()(fun: => Unit) = { - val oldClassLoader = setClassLoader() + val oldClassLoader = Thread.currentThread.getContextClassLoader + Thread.currentThread.setContextClassLoader(classLoader) try { fun } finally { - restoreClassLoader(oldClassLoader) + Thread.currentThread.setContextClassLoader(oldClassLoader) } } @@ -225,13 +208,30 @@ private[spark] class MesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { inClassLoader() { - val (acceptedOffers, declinedOffers) = offers.partition(o => { + val (acceptedOffers, declinedOffers) = offers.partition { o => val mem = getResource(o.getResourcesList, "mem") + val cpus = getResource(o.getResourcesList, "cpus") val slaveId = o.getSlaveId.getValue - mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId) - }) + (mem >= MemoryUtils.calculateTotalMemory(sc) && + // need at least 1 for executor, 1 for task + cpus >= 2 * scheduler.CPUS_PER_TASK) || + (slaveIdsWithExecutors.contains(slaveId) && + cpus >= scheduler.CPUS_PER_TASK) + } - val offerableWorkers = acceptedOffers.map(toWorkerOffer) + val offerableWorkers = acceptedOffers.map { o => + val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) { + getResource(o.getResourcesList, "cpus").toInt + } else { + // If the executor doesn't exist yet, subtract CPU for executor + getResource(o.getResourcesList, "cpus").toInt - + scheduler.CPUS_PER_TASK + } + new WorkerOffer( + o.getSlaveId.getValue, + o.getHostname, + cpus) + } val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue -> o).toMap @@ -240,21 +240,21 @@ private[spark] class MesosSchedulerBackend( // Call into the TaskSchedulerImpl scheduler.resourceOffers(offerableWorkers) .filter(!_.isEmpty) - .foreach(_.foreach(taskDesc => { - val slaveId = taskDesc.executorId - slaveIdsWithExecutors += slaveId - taskIdToSlaveId(taskDesc.taskId) = slaveId - mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) - .add(createMesosTask(taskDesc, slaveId)) - })) + .foreach { offer => + offer.foreach { taskDesc => + val slaveId = taskDesc.executorId + slaveIdsWithExecutors += slaveId + taskIdToSlaveId(taskDesc.taskId) = slaveId + mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) + .add(createMesosTask(taskDesc, slaveId)) + } + } // Reply to the offers val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? - mesosTasks.foreach { - case (slaveId, tasks) => { - d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) - } + mesosTasks.foreach { case (slaveId, tasks) => + d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } declinedOffers.foreach(o => d.declineOffer(o.getId)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index 1f6b2278db703..bef8d3a58ba63 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -18,9 +18,9 @@ package org.apache.spark.scheduler.mesos import org.scalatest.FunSuite -import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext} +import org.apache.spark.{scheduler, SparkConf, SparkContext, LocalSparkContext} import org.apache.spark.scheduler.{TaskDescription, WorkerOffer, TaskSchedulerImpl} -import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend +import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend} import org.apache.mesos.SchedulerDriver import org.apache.mesos.Protos._ import org.scalatest.mock.EasyMockSugar @@ -49,18 +49,25 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea val driver = EasyMock.createMock(classOf[SchedulerDriver]) val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) - val offers = new java.util.ArrayList[Offer] - offers.add(createOffer(1, 101, 1)) - offers.add(createOffer(1, 99, 1)) val sc = EasyMock.createMock(classOf[SparkContext]) + EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes() EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes() EasyMock.replay(sc) + val minMem = MemoryUtils.calculateTotalMemory(sc).toInt + val minCpu = 4 + val offers = new java.util.ArrayList[Offer] + offers.add(createOffer(1, minMem, minCpu)) + offers.add(createOffer(1, minMem - 1, minCpu)) val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") - val workerOffers = Seq(backend.toWorkerOffer(offers.get(0))) + val workerOffers = Seq(offers.get(0)).map(o => new WorkerOffer( + o.getSlaveId.getValue, + o.getHostname, + 2 + )) val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffers))).andReturn(Seq(Seq(taskDesc))) EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()