Skip to content

Commit

Permalink
Adding unit test for this situation
Browse files Browse the repository at this point in the history
  • Loading branch information
pwendell committed Nov 25, 2014
1 parent c4f0697 commit 58c35b5
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ private[spark] class MesosSchedulerBackend(

val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]


val slavesIdsOfAcceptedOffers = HashSet[String]()

// Call into the TaskSchedulerImpl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos._
import org.scalatest.mock.EasyMockSugar
import org.apache.mesos.Protos.Value.Scalar
import org.easymock.{IAnswer, Capture, EasyMock}
import org.easymock.{Capture, EasyMock}
import java.nio.ByteBuffer
import java.util.Collections
import java.util
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {

Expand Down Expand Up @@ -63,22 +62,22 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
val minCpu = 4

val offers = new java.util.ArrayList[Offer]
offers.add(createOffer(1, minMem, minCpu))
offers.add(createOffer(2, minMem - 1, minCpu))
offers.add(createOffer(3, minMem, minCpu))
val mesosOffers = new java.util.ArrayList[Offer]
mesosOffers.add(createOffer(1, minMem, minCpu))
mesosOffers.add(createOffer(2, minMem - 1, minCpu))
mesosOffers.add(createOffer(3, minMem, minCpu))

val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")

val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
expectedWorkerOffers.append(new WorkerOffer(
offers.get(0).getSlaveId.getValue,
offers.get(0).getHostname,
mesosOffers.get(0).getSlaveId.getValue,
mesosOffers.get(0).getHostname,
2
))
expectedWorkerOffers.append(new WorkerOffer(
offers.get(2).getSlaveId.getValue,
offers.get(2).getHostname,
mesosOffers.get(2).getSlaveId.getValue,
mesosOffers.get(2).getHostname,
2
))
val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
Expand All @@ -89,24 +88,38 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
val capture = new Capture[util.Collection[TaskInfo]]
EasyMock.expect(
driver.launchTasks(
EasyMock.eq(Collections.singleton(offers.get(0).getId)),
EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)),
EasyMock.capture(capture),
EasyMock.anyObject(classOf[Filters])
)
).andReturn(Status.valueOf(1)).once
EasyMock.expect(driver.declineOffer(offers.get(1).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.expect(driver.declineOffer(offers.get(2).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.expect(driver.declineOffer(mesosOffers.get(1).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.expect(driver.declineOffer(mesosOffers.get(2).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.replay(driver)
backend.resourceOffers(driver, offers)

backend.resourceOffers(driver, mesosOffers)

EasyMock.verify(driver)
assert(capture.getValue.size() == 1)

val taskInfo = capture.getValue.iterator().next()
assert(taskInfo.getName.equals("n1"))

val cpus = taskInfo.getResourcesList.get(0)
assert(cpus.getName.equals("cpus"))
assert(cpus.getScalar.getValue.equals(2.0))
assert(taskInfo.getSlaveId.getValue.equals("s1"))

// Unwanted resources offered on an existing node. Make sure they are declined
val mesosOffers2 = new java.util.ArrayList[Offer]
mesosOffers2.add(createOffer(1, minMem, minCpu))
EasyMock.reset(taskScheduler)
EasyMock.reset(driver)
EasyMock.expect(taskScheduler.resourceOffers(EasyMock.anyObject(classOf[Seq[WorkerOffer]])).andReturn(Seq(Seq())))
EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
EasyMock.replay(taskScheduler)
EasyMock.expect(driver.declineOffer(mesosOffers2.get(0).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.replay(driver)

backend.resourceOffers(driver, mesosOffers2)
EasyMock.verify(driver)
}
}

0 comments on commit 58c35b5

Please sign in to comment.