Skip to content

Commit

Permalink
Modify NUMA patch for Spark 3.0 and delete oap-common dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
HongW2019 authored and jikunshang committed Aug 2, 2020
1 parent fdec769 commit 4888064
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 86 deletions.
4 changes: 0 additions & 4 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@
</properties>

<dependencies>
<dependency>
<groupId>com.intel</groupId>
<artifactId>oap-common</artifactId>
</dependency>
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
Expand Down
7 changes: 1 addition & 6 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ object SparkEnv extends Logging {
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
None,
bindAddress,
advertiseAddress,
Option(port),
Expand All @@ -198,7 +197,6 @@ object SparkEnv extends Logging {
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
numaNodeId: Option[String],
bindAddress: String,
hostname: String,
numCores: Int,
Expand All @@ -207,7 +205,6 @@ object SparkEnv extends Logging {
val env = create(
conf,
executorId,
numaNodeId,
bindAddress,
hostname,
None,
Expand All @@ -222,12 +219,11 @@ object SparkEnv extends Logging {
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
numaNodeId: Option[String],
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
createExecutorEnv(conf, executorId, numaNodeId, hostname,
createExecutorEnv(conf, executorId, hostname,
hostname, numCores, ioEncryptionKey, isLocal)
}

Expand All @@ -237,7 +233,6 @@ object SparkEnv extends Logging {
private def create(
conf: SparkConf,
executorId: String,
numaNodeId: Option[String],
bindAddress: String,
advertiseAddress: String,
port: Option[Int],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}

driverConf.set(EXECUTOR_ID, arguments.executorId)
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.numaNodeId,
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId,
arguments.bindAddress, arguments.hostname, arguments.cores, cfg.ioEncryptionKey,
isLocal = false)
SparkEnv.get.conf.set("spark.executor.numa.id", s"${arguments.numaNodeId.getOrElse(-1)}")

env.rpcEnv.setupEndpoint("Executor",
backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
Expand Down
6 changes: 0 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@
</modules>

<properties>
<oap.common.version>0.8.0</oap.common.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
Expand Down Expand Up @@ -348,11 +347,6 @@
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.intel</groupId>
<artifactId>oap-common</artifactId>
<version>${oap.common.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,21 +204,18 @@ private[yarn] class ExecutorRunnable(

val numaEnabled = sparkConf.get(SPARK_YARN_NUMA_ENABLED)

logInfo(s"[NUMACHECK] numaEnabled $numaEnabled executorId $executorId")
// Don't need numa binding for driver.
val numaCtlCommand = if (numaEnabled && executorId != "<executorId>" && numaNodeId.nonEmpty) {
val (numaCtlCommand, numaNodeOpts) = if (numaEnabled && executorId != "<executorId>"
&& numaNodeId.nonEmpty) {
logInfo(s"numaNodeId ${numaNodeId.get}")
val command = s"numactl --cpubind=${numaNodeId.get} --membind=${numaNodeId.get} "
command
(command, Seq("--numa-node-id", numaNodeId.get.toString))
} else {
""
}

val numaNodeOpts = if (executorId != "<executorId>" && numaNodeId.nonEmpty) {
val numanode = Seq("--numa-node-id", numaNodeId.get.toString)
numanode
} else {
Nil
("", Nil)
}

logInfo(s"[NUMACHECK] numactl command $numaCtlCommand")
YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
val commands = prefixEnv ++
Seq(numaCtlCommand + Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
Expand All @@ -236,6 +233,7 @@ private[yarn] class ExecutorRunnable(
s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

logInfo(s"[NUMACHECK] container command $commands")
// TODO: it would be nicer to just make sure there are no null commands here
commands.map(s => if (s == null) "null" else s).toList
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,6 @@ private[yarn] class YarnAllocator(
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)

private[yarn] val numaManager = new NumaManager(sparkConf)

def getNumExecutorsRunning: Int = runningExecutors.size()

def getNumReleasedContainers: Int = releasedContainers.size()
Expand All @@ -186,6 +184,14 @@ private[yarn] class YarnAllocator(

def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted

// The total number of NUMA nodes
private[yarn] val totalNumaNumber = sparkConf.get(SPARK_YARN_NUMA_NUMBER)
// Mapping from host to executor counter
// Per-host NUMA bookkeeping.
//  - cotainer2numa: container id (as a string) -> index of the NUMA node assigned to it
//  - numaUsed: number of containers currently assigned to each NUMA node, indexed by node id
private[yarn] case class NumaInfo(cotainer2numa: mutable.HashMap[String, Int],
numaUsed: Array[Int])

private[yarn] val hostToNumaInfo = new mutable.HashMap[String, NumaInfo]()

/**
* A sequence of pending container requests that have not yet been fulfilled.
*/
Expand Down Expand Up @@ -534,12 +540,22 @@ private[yarn] class YarnAllocator(
for (container <- containersToUse) {
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
// Set the NUMA id that the executor should bind to.
// new numaid binding method
val numaInfo = hostToNumaInfo.getOrElseUpdate(executorHostname,
NumaInfo(new mutable.HashMap[String, Int], new Array[Int](totalNumaNumber)))
val minUsed = numaInfo.numaUsed.min
val newNumaNodeId = numaInfo.numaUsed.indexOf(minUsed)
numaInfo.cotainer2numa.put(container.getId.toString, newNumaNodeId)
numaInfo.numaUsed(newNumaNodeId) += 1

val numaNodeId = newNumaNodeId.toString
logInfo(s"numaNodeId: $numaNodeId on host $executorHostname," +
"container: " + container.getId.toString +
", minUsed: " + minUsed)

val containerId = container.getId
val executorId = executorIdCounter.toString

// Set the NUMA id that the executor should bind to.
val numaNodeId = numaManager.assignNumaId(containerId, executorHostname)

assert(container.getResource.getMemory >= resource.getMemory)
logInfo(s"Launching container $containerId on host $executorHostname " +
s"for executor with ID $executorId with numa ID $numaNodeId")
Expand Down Expand Up @@ -567,7 +583,7 @@ private[yarn] class YarnAllocator(
sparkConf,
driverUrl,
executorId,
numaNodeId,
Some(numaNodeId),
executorHostname,
executorMemory,
executorCores,
Expand Down Expand Up @@ -626,6 +642,17 @@ private[yarn] class YarnAllocator(
// there are some exit status' we shouldn't necessarily count against us, but for
// now I think its ok as none of the containers are expected to exit.
val exitStatus = completedContainer.getExitStatus

var numaNodeId = -1
val hostName = hostOpt.getOrElse("nohost")
val numaInfoOp = hostToNumaInfo.get(hostName)
numaInfoOp match {
case Some(numaInfo) =>
numaNodeId = numaInfo.cotainer2numa.get(containerId.toString).getOrElse(-1)
if(-1 != numaNodeId) numaInfo.numaUsed(numaNodeId) -= 1
case _ => numaNodeId = -1
}

val (exitCausedByApp, containerExitReason) = exitStatus match {
case ContainerExitStatus.SUCCESS =>
(false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
Expand Down Expand Up @@ -696,7 +723,6 @@ private[yarn] class YarnAllocator(
}

allocatedContainerToHostMap.remove(containerId)
numaManager.releaseNuma(containerId, host)
}

containerIdToExecutorId.remove(containerId).foreach { eid =>
Expand Down Expand Up @@ -791,51 +817,6 @@ private[yarn] class YarnAllocator(

}

// scalastyle:off
// Manages how a NUMA node is bound to an executor. Whether NUMA binding is turned on or off,
// we should still assign a NUMA id, since Persistent Memory is always associated with a NUMA id.
/**
 * Tracks which NUMA node each container on each host has been bound to.
 *
 * For every host it keeps a per-node usage counter and a container -> NUMA id mapping.
 * A new container is bound to the least-used node (the lowest index wins ties) and the
 * counter is decremented again when the container is released.
 *
 * Both public methods lock on this instance, so the check-then-update sequences on the
 * per-host state are atomic with respect to each other.
 */
private[yarn] class NumaManager(sparkConf: SparkConf) extends Logging {
  private final val totalNumaNode = sparkConf.get(SPARK_YARN_NUMA_NUM)
  private val hostToNumaIds = new ConcurrentHashMap[String, Array[Int]]()
  private val hostToContainers =
    new ConcurrentHashMap[String, mutable.HashMap[ContainerId, String]]()

  /**
   * Returns the NUMA id bound to `containerId` on `executorHostName`, assigning one first
   * if the container has none yet.
   *
   * @param containerId the container to bind
   * @param executorHostName the host the container runs on
   * @return the NUMA id as a string, or None when the configured node count is not positive
   */
  def assignNumaId(
      containerId: ContainerId,
      executorHostName: String): Option[String] = this.synchronized {
    if (totalNumaNode <= 0) {
      // Nothing to bind to; a non-positive count would also yield an empty counter array.
      None
    } else {
      // Lazily create the per-host state the first time the host is seen.
      val containers = Option(hostToContainers.get(executorHostName)).getOrElse {
        val created = mutable.HashMap.empty[ContainerId, String]
        hostToContainers.put(executorHostName, created)
        created
      }
      val numaIds = Option(hostToNumaIds.get(executorHostName)).getOrElse {
        val created = Array.fill[Int](totalNumaNode)(0)
        hostToNumaIds.put(executorHostName, created)
        created
      }
      // Reuse an existing binding; otherwise pick the least-used node.
      containers.get(containerId).orElse {
        val v = numaIds.indexOf(numaIds.min)
        logDebug(s"bind $containerId with $v on host $executorHostName")
        containers += (containerId -> v.toString)
        numaIds(v) += 1
        Some(v.toString)
      }
    }
  }

  /**
   * Releases the NUMA binding of `containerId` on `executorHostName`.
   *
   * Does nothing when the host or the container is unknown. Previously an unknown
   * container was recorded as the string "null" and then parsed with `toInt`, which
   * threw a NumberFormatException.
   *
   * @param containerId the container whose binding is released
   * @param executorHostName the host the container ran on
   */
  def releaseNuma(containerId: ContainerId, executorHostName: String): Unit = this.synchronized {
    for {
      containers <- Option(hostToContainers.get(executorHostName))
      numaIdToRelease <- containers.remove(containerId)
    } {
      logDebug(s"release $containerId with $numaIdToRelease on host $executorHostName")
      // Stored ids are always produced by Int.toString above, so toInt cannot fail here.
      Option(hostToNumaIds.get(executorHostName)).foreach { numaIds =>
        numaIds(numaIdToRelease.toInt) -= 1
      }
    }
  }

}

private object YarnAllocator {
val MEM_REGEX = "[0-9.]+ [KMG]B"
val VMEM_EXCEEDED_EXIT_CODE = -103
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val SPARK_YARN_NUMA_NUM = ConfigBuilder("spark.yarn.numa.num")
.doc("Specify numa node number in the host")
private[spark] val SPARK_YARN_NUMA_NUMBER = ConfigBuilder("spark.yarn.numa.number")
.doc("Total number of numanodes in physical server")
.intConf
.createWithDefault(0)
.createWithDefault(2)

private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
.doc("In cluster mode, whether to wait for the application to finish before exiting the " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
numaNodeId: Option[String],
bindAddress: String,
hostname: String,
cores: Int,
Expand All @@ -46,6 +47,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
rpcEnv,
driverUrl,
executorId,
numaNodeId,
bindAddress,
hostname,
cores,
Expand Down Expand Up @@ -73,8 +75,8 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.resourcesFileOpt, resourceProfile)
arguments.numaNodeId, arguments.bindAddress, arguments.hostname, arguments.cores,
arguments.userClassPath, env, arguments.resourcesFileOpt, resourceProfile)
}
val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
this.getClass.getCanonicalName.stripSuffix("$"))
Expand Down
2 changes: 1 addition & 1 deletion scalastyle-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ This file is divided into 3 sections:
</check>

<check customId="argcount" level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
<parameters><parameter name="maxParameters"><![CDATA[10]]></parameter></parameters>
<parameters><parameter name="maxParameters"><![CDATA[12]]></parameter></parameters>
</check>

<check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
Expand Down

0 comments on commit 4888064

Please sign in to comment.