diff --git a/core/pom.xml b/core/pom.xml
index ad743a77128..faa616ab385 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -35,6 +35,10 @@
+
+ com.intel
+ oap-common
+
com.thoughtworks.paranamer
paranamer
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 8ba17398318..12b8cdf8269 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -179,6 +179,7 @@ object SparkEnv extends Logging {
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
+ None,
bindAddress,
advertiseAddress,
Option(port),
@@ -197,6 +198,7 @@ object SparkEnv extends Logging {
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
+ numaNodeId: Option[String],
bindAddress: String,
hostname: String,
numCores: Int,
@@ -205,6 +207,7 @@ object SparkEnv extends Logging {
val env = create(
conf,
executorId,
+ numaNodeId,
bindAddress,
hostname,
None,
@@ -219,11 +222,12 @@ object SparkEnv extends Logging {
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
+ numaNodeId: Option[String],
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
- createExecutorEnv(conf, executorId, hostname,
+ createExecutorEnv(conf, executorId, numaNodeId, hostname,
hostname, numCores, ioEncryptionKey, isLocal)
}
@@ -233,6 +237,7 @@ object SparkEnv extends Logging {
private def create(
conf: SparkConf,
executorId: String,
+ numaNodeId: Option[String],
bindAddress: String,
advertiseAddress: String,
port: Option[Int],
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 25c5b9812fa..575b10c8cad 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -49,6 +49,7 @@ private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
+ numaNodeId: Option[String],
bindAddress: String,
hostname: String,
cores: Int,
@@ -253,6 +254,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
case class Arguments(
driverUrl: String,
executorId: String,
+ numaNodeId: Option[String],
bindAddress: String,
hostname: String,
cores: Int,
@@ -266,8 +268,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
- arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
- arguments.resourcesFileOpt, resourceProfile)
+ arguments.numaNodeId, arguments.bindAddress, arguments.hostname, arguments.cores,
+ arguments.userClassPath, env, arguments.resourcesFileOpt, resourceProfile)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
System.exit(0)
@@ -328,8 +330,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
driverConf.set(EXECUTOR_ID, arguments.executorId)
- val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
- arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
+ val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.numaNodeId,
+ arguments.bindAddress, arguments.hostname, arguments.cores, cfg.ioEncryptionKey,
+ isLocal = false)
env.rpcEnv.setupEndpoint("Executor",
backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
@@ -343,6 +346,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
def parseArguments(args: Array[String], classNameForEntry: String): Arguments = {
var driverUrl: String = null
var executorId: String = null
+ var numaNodeId: Option[String] = None
var bindAddress: String = null
var hostname: String = null
var cores: Int = 0
@@ -376,6 +380,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
case ("--app-id") :: value :: tail =>
appId = value
argv = tail
+ case ("--numa-node-id") :: value :: tail =>
+ numaNodeId = Some(value.trim)
+ argv = tail
case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
workerUrl = Some(value)
@@ -408,7 +415,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
bindAddress = hostname
}
- Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
+ Arguments(driverUrl, executorId, numaNodeId, bindAddress, hostname, cores, appId, workerUrl,
userClassPath, resourcesFileOpt, resourceProfileId)
}
@@ -422,6 +429,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
| --driver-url
| --executor-id
| --bind-address
+ | --numa-node-id
| --hostname
| --cores
| --resourcesFile
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 3134a738b33..e0d5db3312b 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -55,7 +55,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
- val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
+ val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", None, "host1", "host1",
4, Seq.empty[URL], env, None, resourceProfile)
withTempDir { tmpDir =>
val testResourceArgs: JObject = ("" -> "")
@@ -76,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
- val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
+ val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", None, "host1", "host1",
4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
withTempDir { tmpDir =>
val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -110,7 +110,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
- val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
+ val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", None, "host1", "host1",
4, Seq.empty[URL], env, None, resourceProfile)
withTempDir { tmpDir =>
@@ -137,8 +137,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
- val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
- 4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+ val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", None, "host1",
+ "host1", 4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
// not enough gpu's on the executor
withTempDir { tmpDir =>
@@ -190,8 +190,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
- val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
- 4, Seq.empty[URL], env, None, resourceProfile)
+ val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", None, "host1",
+ "host1", 4, Seq.empty[URL], env, None, resourceProfile)
// executor resources < required
withTempDir { tmpDir =>
@@ -221,8 +221,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
- val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
- 4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+ val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", None, "host1",
+ "host1", 4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
val parsedResources = backend.parseOrFindResources(None)
@@ -268,7 +268,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
- val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
+ val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", None, "host1", "host1",
4, Seq.empty[URL], env, None, resourceProfile)
val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
val ja = Extraction.decompose(Seq(gpuArgs))
@@ -293,7 +293,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
try {
val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
val env = createMockEnv(conf, serializer, Some(rpcEnv))
- backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
+ backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1", None,
"host1", "host1", 4, Seq.empty[URL], env, None,
resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
assert(backend.taskResources.isEmpty)
diff --git a/pom.xml b/pom.xml
index 2d8a52cfe72..7ce3f9c60dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,6 +110,7 @@
+ 0.8.0
UTF-8
UTF-8
1.8
@@ -347,6 +348,11 @@
+
+ com.intel
+ oap-common
+ ${oap.common.version}
+
org.apache.spark
spark-tags_${scala.binary.version}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 862acd8c03a..1248028a630 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -457,7 +457,7 @@ private[spark] class ApplicationMaster(
val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = _sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "",
- "", executorMemory, executorCores, appId, securityMgr, localResources,
+ None, "", executorMemory, executorCores, appId, securityMgr, localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
dummyRunner.launchContextDebugInfo()
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index d9262bbac65..0dc005da923 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.JavaUtils
@@ -49,6 +50,7 @@ private[yarn] class ExecutorRunnable(
sparkConf: SparkConf,
masterAddress: String,
executorId: String,
+ numaNodeId: Option[String],
hostname: String,
executorMemory: Int,
executorCores: Int,
@@ -200,9 +202,26 @@ private[yarn] class ExecutorRunnable(
Seq("--user-class-path", "file:" + absPath)
}.toSeq
+ val numaEnabled = sparkConf.get(SPARK_YARN_NUMA_ENABLED)
+
+ // Don't need numa binding for driver.
+ val numaCtlCommand = if (numaEnabled && executorId != "" && numaNodeId.nonEmpty) {
+ val command = s"numactl --cpubind=${numaNodeId.get} --membind=${numaNodeId.get} "
+ command
+ } else {
+ ""
+ }
+
+ val numaNodeOpts = if (executorId != "" && numaNodeId.nonEmpty) {
+ val numanode = Seq("--numa-node-id", numaNodeId.get.toString)
+ numanode
+ } else {
+ Nil
+ }
+
YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
val commands = prefixEnv ++
- Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
+ Seq(numaCtlCommand + Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++
Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
@@ -211,6 +230,7 @@ private[yarn] class ExecutorRunnable(
"--cores", executorCores.toString,
"--app-id", appId,
"--resourceProfileId", resourceProfileId.toString) ++
+ numaNodeOpts ++
userClassPath ++
Seq(
s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 09414cbbe50..1ecdc679149 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -176,6 +176,8 @@ private[yarn] class YarnAllocator(
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)
+ private[yarn] val numaManager = new NumaManager(sparkConf)
+
def getNumExecutorsRunning: Int = runningExecutors.size()
def getNumReleasedContainers: Int = releasedContainers.size()
@@ -534,9 +536,13 @@ private[yarn] class YarnAllocator(
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
val executorId = executorIdCounter.toString
+
+ // Pick the NUMA node id that this executor should bind to.
+ val numaNodeId = numaManager.assignNumaId(containerId, executorHostname)
+
assert(container.getResource.getMemory >= resource.getMemory)
logInfo(s"Launching container $containerId on host $executorHostname " +
- s"for executor with ID $executorId")
+ s"for executor with ID $executorId with numa ID $numaNodeId")
def updateInternalState(): Unit = synchronized {
runningExecutors.add(executorId)
@@ -561,6 +567,7 @@ private[yarn] class YarnAllocator(
sparkConf,
driverUrl,
executorId,
+ numaNodeId,
executorHostname,
executorMemory,
executorCores,
@@ -689,6 +696,7 @@ private[yarn] class YarnAllocator(
}
allocatedContainerToHostMap.remove(containerId)
+ numaManager.releaseNuma(containerId, host)
}
containerIdToExecutorId.remove(containerId).foreach { eid =>
@@ -783,6 +791,51 @@ private[yarn] class YarnAllocator(
}
+// scalastyle:off
+/**
+ * Decides which NUMA node each executor container is associated with, per host.
+ *
+ * A NUMA id is handed out whether or not numactl binding is switched on, because
+ * persistent memory is always associated with a NUMA node and the executor needs that id.
+ *
+ * For every host this keeps a per-node counter of assigned containers and remembers the
+ * node chosen for each container. New containers get the least-loaded node (lowest id wins
+ * on ties). All state changes happen under this object's lock, because the per-host
+ * container maps and counter arrays are plain mutable structures.
+ *
+ * @param sparkConf configuration providing SPARK_YARN_NUMA_NUM (NUMA node count per host)
+ */
+private[yarn] class NumaManager(sparkConf: SparkConf) extends Logging {
+  // Number of NUMA nodes assumed on every host; a value <= 0 disables id assignment.
+  private final val totalNumaNode = sparkConf.get(SPARK_YARN_NUMA_NUM)
+  // host -> per-node count of containers currently assigned to that node.
+  private val hostToNumaIds = new ConcurrentHashMap[String, Array[Int]]()
+  // host -> (container -> NUMA id assigned to it, as a string).
+  private val hostToContainers =
+    new ConcurrentHashMap[String, mutable.HashMap[ContainerId, String]]()
+
+  /**
+   * Returns the NUMA id for `containerId` on `executorHostName`, assigning one if needed.
+   *
+   * Calling this again for a container that already has an id returns the same id without
+   * touching the counters.
+   *
+   * @return None when the configured node count is not positive, otherwise Some(id)
+   */
+  def assignNumaId(
+      containerId: ContainerId,
+      executorHostName: String): Option[String] = {
+    if (totalNumaNode <= 0) {
+      // Also covers negative values, which would otherwise yield an empty counter array.
+      None
+    } else {
+      this.synchronized {
+        val usage = Option(hostToNumaIds.get(executorHostName)).getOrElse {
+          val fresh = Array.fill[Int](totalNumaNode)(0)
+          hostToNumaIds.put(executorHostName, fresh)
+          fresh
+        }
+        val containers = Option(hostToContainers.get(executorHostName)).getOrElse {
+          val fresh = mutable.HashMap.empty[ContainerId, String]
+          hostToContainers.put(executorHostName, fresh)
+          fresh
+        }
+        containers.get(containerId).orElse {
+          // Least-loaded node; indexOf returns the first (lowest) index holding the minimum.
+          val numaId = usage.indexOf(usage.min)
+          usage(numaId) += 1
+          containers.put(containerId, numaId.toString)
+          logDebug(s"bind $containerId with $numaId on host $executorHostName")
+          Some(numaId.toString)
+        }
+      }
+    }
+  }
+
+  /**
+   * Forgets the NUMA assignment of `containerId` on `executorHostName` and decrements the
+   * counter of the node it occupied. Unknown hosts and containers that are not tracked
+   * (never assigned, or already released) are ignored.
+   */
+  def releaseNuma(containerId: ContainerId, executorHostName: String): Unit = {
+    this.synchronized {
+      for {
+        containers <- Option(hostToContainers.get(executorHostName))
+        numaId <- containers.remove(containerId)
+      } {
+        logDebug(s"release $containerId with $numaId on host $executorHostName")
+        Option(hostToNumaIds.get(executorHostName)).foreach { usage =>
+          val idx = numaId.toInt
+          // Never let a counter go negative; that would skew later assignments.
+          if (usage(idx) > 0) {
+            usage(idx) -= 1
+          }
+        }
+      }
+    }
+  }
+
+}
+// scalastyle:on
+
private object YarnAllocator {
val MEM_REGEX = "[0-9.]+ [KMG]B"
val VMEM_EXCEEDED_EXIT_CODE = -103
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 3797491bb2e..0fbc6ee19b2 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -158,6 +158,17 @@ package object config {
/* Launcher configuration. */
+  // When true (and a NUMA id was assigned), ExecutorRunnable prefixes the executor's java
+  // command with `numactl --cpubind=<id> --membind=<id>`.
+  private[spark] val SPARK_YARN_NUMA_ENABLED = ConfigBuilder("spark.yarn.numa.enabled")
+    .doc("Whether enabling numa binding when executor start up. This is recommend to true " +
+      "when persistent memory is enabled.")
+    .booleanConf
+    .createWithDefault(false)
+
+  // Read by NumaManager as the number of NUMA nodes per host; the default 0 means no NUMA id
+  // is assigned. Negative values are rejected up front instead of failing later during
+  // container allocation.
+  private[spark] val SPARK_YARN_NUMA_NUM = ConfigBuilder("spark.yarn.numa.num")
+    .doc("Specify numa node number in the host")
+    .intConf
+    .checkValue(_ >= 0, "spark.yarn.numa.num must be a non-negative integer.")
+    .createWithDefault(0)
+
private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
.doc("In cluster mode, whether to wait for the application to finish before exiting the " +
"launcher process.")