From 3e85b87d9033e6d9a2634f7598abc3acee77486f Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Sun, 19 Jan 2014 10:16:25 -0800
Subject: [PATCH 1/2] SPARK-1033. Ask for cores in Yarn container requests

---
 docs/running-on-yarn.md                                        | 2 +-
 .../apache/spark/deploy/yarn/YarnAllocationHandler.scala       | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 3bd62646bab06..fb8a043df33d0 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -133,7 +133,7 @@ See [Building Spark with Maven](building-with-maven.html) for instructions on ho
 
 # Important Notes
 
-- We do not requesting container resources based on the number of cores. Thus the numbers of cores given via command line arguments cannot be guaranteed.
+- Before Hadoop 2.2, YARN does not support cores in container resource requests. Thus, when running against an earlier version, the numbers of cores given via command line arguments cannot be guaranteed.
 - The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
 - The --files and --archives options support specifying file names with the # similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name appSees.txt and your application should use the name as appSees.txt to reference it when running on YARN.
 - The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 738ff986d85a5..f53c13013f9cb 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -102,7 +102,8 @@ private[yarn] class YarnAllocationHandler(
   def getNumWorkersFailed: Int = numWorkersFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    (container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+      && container.getResource.getVirtualCores >= workerCores)
   }
 
   def releaseContainer(container: Container) {
@@ -532,15 +533,15 @@ private[yarn] class YarnAllocationHandler(
       priority: Int
     ): ArrayBuffer[ContainerRequest] = {
 
-    val memoryResource = Records.newRecord(classOf[Resource])
-    memoryResource.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    val resource = Resource.newInstance(memoryRequest, workerCores)
 
     val prioritySetting = Records.newRecord(classOf[Priority])
     prioritySetting.setPriority(priority)
 
     val requests = new ArrayBuffer[ContainerRequest]()
     for (i <- 0 until numWorkers) {
-      requests += new ContainerRequest(memoryResource, hosts, racks, prioritySetting)
+      requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
     }
     requests
   }

From adf42611f16daec091af10f2c9b39bd154dc67b3 Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Tue, 21 Jan 2014 00:38:02 -0800
Subject: [PATCH 2/2] Incorporate Tom's comments - update doc and code to reflect that core requests may not always be honored

---
 docs/running-on-yarn.md                                            | 2 +-
 .../org/apache/spark/deploy/yarn/YarnAllocationHandler.scala       | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index fb8a043df33d0..5dadd54492dca 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -133,7 +133,7 @@ See [Building Spark with Maven](building-with-maven.html) for instructions on ho
 
 # Important Notes
 
-- Before Hadoop 2.2, YARN does not support cores in container resource requests. Thus, when running against an earlier version, the numbers of cores given via command line arguments cannot be guaranteed.
+- Before Hadoop 2.2, YARN does not support cores in container resource requests. Thus, when running against an earlier version, the numbers of cores given via command line arguments cannot be passed to YARN. Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
 - The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
 - The --files and --archives options support specifying file names with the # similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name appSees.txt and your application should use the name as appSees.txt to reference it when running on YARN.
 - The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index f53c13013f9cb..1ac61124cb028 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -102,8 +102,7 @@ private[yarn] class YarnAllocationHandler(
   def getNumWorkersFailed: Int = numWorkersFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    (container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
-      && container.getResource.getVirtualCores >= workerCores)
+    container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
  }
 
 def releaseContainer(container: Container) {
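
For reference, a minimal standalone sketch of the kind of request construction the patch performs, written against the Hadoop 2.2+ YARN client API (Resource.newInstance, Priority.newInstance, AMRMClient.ContainerRequest). The names buildRequests, numWorkers, workerMemory, workerCores, and memoryOverhead are illustrative stand-ins, not Spark's actual fields, and host/rack preferences are omitted for brevity.

```scala
// Hypothetical sketch: build per-worker container requests that ask YARN for
// both memory and virtual cores (possible from Hadoop 2.2 onward).
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

object ContainerRequestSketch {
  def buildRequests(
      numWorkers: Int,
      workerMemory: Int,    // MB requested per worker, before overhead
      workerCores: Int,     // virtual cores requested per worker
      memoryOverhead: Int,  // extra MB to cover JVM/native overhead
      priority: Int): ArrayBuffer[ContainerRequest] = {

    // A Resource can carry both memory and virtual cores in Hadoop 2.2+.
    val capability = Resource.newInstance(workerMemory + memoryOverhead, workerCores)
    val prio = Priority.newInstance(priority)

    val requests = new ArrayBuffer[ContainerRequest]()
    for (_ <- 0 until numWorkers) {
      // null hosts/racks means "anywhere"; whether the vcore ask is enforced
      // depends on the YARN scheduler and its configured resource calculator.
      requests += new ContainerRequest(capability, null, null, prio)
    }
    requests
  }

  def main(args: Array[String]): Unit = {
    val reqs = buildRequests(numWorkers = 2, workerMemory = 1024,
      workerCores = 4, memoryOverhead = 384, priority = 1)
    reqs.foreach(r => println(r.getCapability))
  }
}
```

As the doc change in PATCH 2/2 notes, putting cores into the request only expresses the ask; whether it is honored at scheduling time depends on the scheduler in use and its configuration.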