Merge NUMA patch and oap-common dependency from branch-2.4.4-oap-0.8
yeyuqiang authored and jikunshang committed Aug 2, 2020
1 parent 3fdfce3 commit fdec769
Showing 9 changed files with 127 additions and 20 deletions.
4 changes: 4 additions & 0 deletions core/pom.xml
@@ -35,6 +35,10 @@
</properties>

<dependencies>
<dependency>
<groupId>com.intel</groupId>
<artifactId>oap-common</artifactId>
</dependency>
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -179,6 +179,7 @@ object SparkEnv extends Logging {
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
None,
bindAddress,
advertiseAddress,
Option(port),
@@ -197,6 +198,7 @@ private[spark] def createExecutorEnv(
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
numaNodeId: Option[String],
bindAddress: String,
hostname: String,
numCores: Int,
@@ -205,6 +207,7 @@
val env = create(
conf,
executorId,
numaNodeId,
bindAddress,
hostname,
None,
@@ -219,11 +222,12 @@
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
numaNodeId: Option[String],
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
createExecutorEnv(conf, executorId, hostname,
createExecutorEnv(conf, executorId, numaNodeId, hostname,
hostname, numCores, ioEncryptionKey, isLocal)
}

@@ -233,6 +237,7 @@
private def create(
conf: SparkConf,
executorId: String,
numaNodeId: Option[String],
bindAddress: String,
advertiseAddress: String,
port: Option[Int],
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -49,6 +49,7 @@ private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
numaNodeId: Option[String],
bindAddress: String,
hostname: String,
cores: Int,
@@ -253,6 +254,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
case class Arguments(
driverUrl: String,
executorId: String,
numaNodeId: Option[String],
bindAddress: String,
hostname: String,
cores: Int,
@@ -266,8 +268,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.resourcesFileOpt, resourceProfile)
arguments.numaNodeId, arguments.bindAddress, arguments.hostname, arguments.cores,
arguments.userClassPath, env, arguments.resourcesFileOpt, resourceProfile)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
System.exit(0)
@@ -328,8 +330,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}

driverConf.set(EXECUTOR_ID, arguments.executorId)
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.numaNodeId,
arguments.bindAddress, arguments.hostname, arguments.cores, cfg.ioEncryptionKey,
isLocal = false)

env.rpcEnv.setupEndpoint("Executor",
backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
@@ -343,6 +346,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
def parseArguments(args: Array[String], classNameForEntry: String): Arguments = {
var driverUrl: String = null
var executorId: String = null
var numaNodeId: Option[String] = None
var bindAddress: String = null
var hostname: String = null
var cores: Int = 0
@@ -376,6 +380,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
case ("--app-id") :: value :: tail =>
appId = value
argv = tail
case ("--numa-node-id") :: value :: tail =>
numaNodeId = Some(value.trim)
argv = tail
case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
workerUrl = Some(value)
@@ -408,7 +415,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
bindAddress = hostname
}

Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
Arguments(driverUrl, executorId, numaNodeId, bindAddress, hostname, cores, appId, workerUrl,
userClassPath, resourcesFileOpt, resourceProfileId)
}

@@ -422,6 +429,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
| --driver-url <driverUrl>
| --executor-id <executorId>
| --bind-address <bindAddress>
| --numa-node-id <numaNodeId>
| --hostname <hostname>
| --cores <cores>
| --resourcesFile <fileWithJSONResourceInformation>
core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -55,7 +55,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)

// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", None, "host1", "host1",
4, Seq.empty[URL], env, None, resourceProfile)
withTempDir { tmpDir =>
val testResourceArgs: JObject = ("" -> "")
@@ -76,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", None, "host1", "host1",
4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
withTempDir { tmpDir =>
val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -110,7 +110,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", None, "host1", "host1",
4, Seq.empty[URL], env, None, resourceProfile)

withTempDir { tmpDir =>
@@ -137,8 +137,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", None, "host1",
"host1", 4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))

// not enough gpu's on the executor
withTempDir { tmpDir =>
@@ -190,8 +190,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None, resourceProfile)
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", None, "host1",
"host1", 4, Seq.empty[URL], env, None, resourceProfile)

// executor resources < required
withTempDir { tmpDir =>
@@ -221,8 +221,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)

// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", None, "host1",
"host1", 4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))

val parsedResources = backend.parseOrFindResources(None)

@@ -268,7 +268,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)

// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", None, "host1", "host1",
4, Seq.empty[URL], env, None, resourceProfile)
val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
val ja = Extraction.decompose(Seq(gpuArgs))
@@ -293,7 +293,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
try {
val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
val env = createMockEnv(conf, serializer, Some(rpcEnv))
backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1", None,
"host1", "host1", 4, Seq.empty[URL], env, None,
resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
assert(backend.taskResources.isEmpty)
6 changes: 6 additions & 0 deletions pom.xml
@@ -110,6 +110,7 @@
</modules>

<properties>
<oap.common.version>0.8.0</oap.common.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
@@ -347,6 +348,11 @@
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.intel</groupId>
<artifactId>oap-common</artifactId>
<version>${oap.common.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -457,7 +457,7 @@ private[spark] class ApplicationMaster(
val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = _sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
None, "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
dummyRunner.launchContextDebugInfo()
}
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.Records

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.JavaUtils
@@ -49,6 +50,7 @@ private[yarn] class ExecutorRunnable(
sparkConf: SparkConf,
masterAddress: String,
executorId: String,
numaNodeId: Option[String],
hostname: String,
executorMemory: Int,
executorCores: Int,
@@ -200,9 +202,26 @@
Seq("--user-class-path", "file:" + absPath)
}.toSeq

val numaEnabled = sparkConf.get(SPARK_YARN_NUMA_ENABLED)

// NUMA binding is not needed for the driver.
val numaCtlCommand = if (numaEnabled && executorId != "<executorId>" && numaNodeId.nonEmpty) {
val command = s"numactl --cpubind=${numaNodeId.get} --membind=${numaNodeId.get} "
command
} else {
""
}

val numaNodeOpts = if (executorId != "<executorId>" && numaNodeId.nonEmpty) {
val numanode = Seq("--numa-node-id", numaNodeId.get.toString)
numanode
} else {
Nil
}

YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
Seq(numaCtlCommand + Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++
Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
Expand All @@ -211,6 +230,7 @@ private[yarn] class ExecutorRunnable(
"--cores", executorCores.toString,
"--app-id", appId,
"--resourceProfileId", resourceProfileId.toString) ++
numaNodeOpts ++
userClassPath ++
Seq(
s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
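For orientation, here is a rough sketch of the launch command shape that ExecutorRunnable assembles once spark.yarn.numa.enabled is true and the allocator has assigned NUMA node 0 to the container. It is illustrative only: the Java path, driver URL, executor id, hostname, app id, java options, and log paths below are made-up placeholders, not values from a real launch context.

// Illustrative sketch (not the committed code): approximate command Seq with NUMA binding.
val launchCommand: Seq[String] =
  Seq("numactl --cpubind=0 --membind=0 /usr/lib/jvm/java-8/bin/java", "-server") ++
    Seq("-Xmx4g") ++  // stands in for the real javaOpts
    Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
      "--driver-url", "spark://CoarseGrainedScheduler@driver-host:43210",
      "--executor-id", "1",
      "--hostname", "worker-01",
      "--cores", "4",
      "--app-id", "application_1596300000000_0001",
      "--resourceProfileId", "0",
      "--numa-node-id", "0") ++  // the new flag parsed by CoarseGrainedExecutorBackend
    Seq("1>/container/log/dir/stdout", "2>/container/log/dir/stderr")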
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -176,6 +176,8 @@ private[yarn] class YarnAllocator(
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)

private[yarn] val numaManager = new NumaManager(sparkConf)

def getNumExecutorsRunning: Int = runningExecutors.size()

def getNumReleasedContainers: Int = releasedContainers.size()
@@ -534,9 +536,13 @@
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
val executorId = executorIdCounter.toString

// Set the NUMA node id that the executor should bind to.
val numaNodeId = numaManager.assignNumaId(containerId, executorHostname)

assert(container.getResource.getMemory >= resource.getMemory)
logInfo(s"Launching container $containerId on host $executorHostname " +
s"for executor with ID $executorId")
s"for executor with ID $executorId with numa ID $numaNodeId")

def updateInternalState(): Unit = synchronized {
runningExecutors.add(executorId)
@@ -561,6 +567,7 @@
sparkConf,
driverUrl,
executorId,
numaNodeId,
executorHostname,
executorMemory,
executorCores,
@@ -689,6 +696,7 @@
}

allocatedContainerToHostMap.remove(containerId)
numaManager.releaseNuma(containerId, host)
}

containerIdToExecutorId.remove(containerId).foreach { eid =>
@@ -783,6 +791,51 @@

}

// scalastyle:off
// Manages how an executor is bound to a NUMA node. Regardless of whether NUMA binding is
// turned on or off, a NUMA id should be assigned, since persistent memory is always
// associated with a NUMA node.
private[yarn] class NumaManager(sparkConf: SparkConf) extends Logging {
private final val totalNumaNode = sparkConf.get(SPARK_YARN_NUMA_NUM)
private val hostToNumaIds = new ConcurrentHashMap[String, Array[Int]]()
private val hostToContainers = new ConcurrentHashMap[String, mutable.HashMap[ContainerId, String]]()

def assignNumaId(
containerId: ContainerId,
executorHostName: String): Option[String] = {
if (totalNumaNode == 0) return None
if (hostToContainers.containsKey(executorHostName)) {
if (!hostToContainers.get(executorHostName).contains(containerId)) {
this.synchronized {
val numaIds = hostToNumaIds.get(executorHostName)
val v = numaIds.zipWithIndex.min._2
logDebug(s"bind $containerId with $v on host $executorHostName")
hostToContainers.get(executorHostName) += (containerId -> v.toString)
numaIds(v) += 1
Some(v.toString)
}
} else {
hostToContainers.get(executorHostName).get(containerId)
}
} else {
logDebug(s"bind $containerId with 0 on host $executorHostName")
hostToNumaIds.put(executorHostName, Array.fill[Int](totalNumaNode)(0))
hostToNumaIds.get(executorHostName)(0) += 1
hostToContainers.putIfAbsent(executorHostName, mutable.HashMap[ContainerId, String](containerId -> "0"))
Some("0")
}
}

def releaseNuma(containerId: ContainerId, executorHostName: String): Unit = {
if (hostToContainers.get(executorHostName) != null) {
val numaIdToRelease = hostToContainers.get(executorHostName).getOrElseUpdate(containerId, "null")
logDebug(s"release $containerId with $numaIdToRelease on host $executorHostName")
hostToNumaIds.get(executorHostName)(numaIdToRelease.toInt) -= 1
hostToContainers.get(executorHostName).remove(containerId)
}
}

}

private object YarnAllocator {
val MEM_REGEX = "[0-9.]+ [KMG]B"
val VMEM_EXCEEDED_EXIT_CODE = -103
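The NumaManager above always picks, per host, the NUMA node with the fewest containers currently bound to it. As a rough, standalone illustration of that least-loaded policy, here is a simplified sketch that uses plain host strings and Int node ids instead of YARN ContainerIds; it is an assumption-laden stand-in, not the NumaManager code itself.

import scala.collection.mutable

// Simplified sketch of per-host, least-loaded NUMA assignment (illustrative only).
class SimpleNumaAssigner(totalNumaNodes: Int) {
  // host -> container count per NUMA node
  private val counts = mutable.Map.empty[String, Array[Int]]

  def assign(host: String): Option[Int] = {
    if (totalNumaNodes == 0) return None
    val perNode = counts.getOrElseUpdate(host, Array.fill(totalNumaNodes)(0))
    val node = perNode.zipWithIndex.min._2  // index of the least-loaded node
    perNode(node) += 1
    Some(node)
  }

  def release(host: String, node: Int): Unit =
    counts.get(host).foreach(perNode => perNode(node) -= 1)
}

// With two NUMA nodes, consecutive executors on one host alternate between node 0 and 1:
// val a = new SimpleNumaAssigner(2); a.assign("host1")  // Some(0)
//                                    a.assign("host1")  // Some(1)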
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -158,6 +158,17 @@ package object config {

/* Launcher configuration. */

private[spark] val SPARK_YARN_NUMA_ENABLED = ConfigBuilder("spark.yarn.numa.enabled")
.doc("Whether enabling numa binding when executor start up. This is recommend to true " +
"when persistent memory is enabled.")
.booleanConf
.createWithDefault(false)

private[spark] val SPARK_YARN_NUMA_NUM = ConfigBuilder("spark.yarn.numa.num")
.doc("Specify numa node number in the host")
.intConf
.createWithDefault(0)

private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
.doc("In cluster mode, whether to wait for the application to finish before exiting the " +
"launcher process.")
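A minimal sketch of how these two settings might be supplied from application code, assuming hosts with two NUMA nodes; the application name and values are illustrative, not taken from this change.

import org.apache.spark.SparkConf

// Illustrative configuration only: turn NUMA binding on and declare two NUMA nodes per host.
val conf = new SparkConf()
  .setAppName("numa-binding-example")      // hypothetical app name
  .set("spark.yarn.numa.enabled", "true")  // default: false
  .set("spark.yarn.numa.num", "2")         // default: 0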
