[SPARK-50811] Support enabling JVM profiler on driver
### What changes were proposed in this pull request?

SPARK-46094 added JVM profiling support for executors; this PR extends it to support the driver.

### Why are the changes needed?

It's also valuable to profile the driver when hitting driver-side performance issues; for example, some Catalyst rules may be inefficient and occupy too much CPU time.

### Does this PR introduce _any_ user-facing change?

Yes, new feature.

### How was this patch tested?

I verified that the feature works in both YARN client and cluster modes.

YARN client mode
```
bin/spark-submit run-example \
  --master yarn \
  --deploy-mode client \
  --conf spark.plugins=org.apache.spark.profiler.ProfilerPlugin \
  --conf spark.profiler.driver.enabled=true \
  --conf spark.profiler.executor.enabled=true \
  --conf spark.profiler.executor.fraction=1 \
  --conf spark.profiler.dfsDir=hdfs:///spark-profiling \
  SparkPi 100000
```
```
$ hadoop fs -ls /spark-profiling/application_1736320707252_0035
Found 49 items
-rw-rw----   3 hadoop supergroup   16335722 2025-01-16 11:00 /spark-profiling/application_1736320707252_0035/profile-driver.jfr
-rw-rw----   3 hadoop supergroup    4861636 2025-01-16 11:00 /spark-profiling/application_1736320707252_0035/profile-exec-1.jfr
-rw-rw----   3 hadoop supergroup    3321751 2025-01-16 11:00 /spark-profiling/application_1736320707252_0035/profile-exec-10.jfr
-rw-rw----   3 hadoop supergroup    3084930 2025-01-16 11:00 /spark-profiling/application_1736320707252_0035/profile-exec-11.jfr
...
```

YARN cluster mode
```
bin/spark-submit run-example \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.plugins=org.apache.spark.profiler.ProfilerPlugin \
  --conf spark.profiler.driver.enabled=true \
  --conf spark.profiler.executor.enabled=true \
  --conf spark.profiler.executor.fraction=1 \
  --conf spark.profiler.dfsDir=hdfs:///spark-profiling \
  SparkPi 100000
```

```
$ hadoop fs -ls /spark-profiling/application_1736320707252_0036_1
Found 49 items
-rw-rw----   3 hadoop supergroup   14579297 2025-01-16 11:04 /spark-profiling/application_1736320707252_0036_1/profile-driver.jfr
-rw-rw----   3 hadoop supergroup    4615154 2025-01-16 11:04 /spark-profiling/application_1736320707252_0036_1/profile-exec-1.jfr
-rw-rw----   3 hadoop supergroup    3043193 2025-01-16 11:04 /spark-profiling/application_1736320707252_0036_1/profile-exec-10.jfr
-rw-rw----   3 hadoop supergroup    2969970 2025-01-16 11:04 /spark-profiling/application_1736320707252_0036_1/profile-exec-11.jfr
...
```
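
To inspect the results, the `.jfr` files can be fetched locally (e.g. with `hadoop fs -get`) and opened in JDK Mission Control, or summarized with the JDK's built-in JFR consumer API. A minimal Scala sketch of the latter (not part of this PR; it assumes a JDK 11+ runtime, that the async-profiler output is readable by the standard JFR consumer, and that `profile-driver.jfr` has been copied into the working directory):
```scala
import java.nio.file.Paths

import scala.jdk.CollectionConverters._

import jdk.jfr.consumer.RecordingFile

// Sketch: print the ten most frequent event types in the driver profile.
object JfrSummary {
  def main(args: Array[String]): Unit = {
    val events = RecordingFile.readAllEvents(Paths.get("profile-driver.jfr")).asScala
    events.groupBy(_.getEventType.getName)
      .toSeq
      .sortBy { case (_, evs) => -evs.size }
      .take(10)
      .foreach { case (name, evs) => println(f"${evs.size}%8d  $name") }
  }
}
```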

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49483 from pan3793/SPARK-50811.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
pan3793 authored and dongjoon-hyun committed Jan 16, 2025
1 parent 0ca9633 commit 90801c2
Showing 4 changed files with 81 additions and 29 deletions.
8 changes: 8 additions & 0 deletions connector/profiler/README.md
@@ -49,6 +49,14 @@ Then enable the profiling in the configuration.

<table class="spark-config">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.profiler.driver.enabled</code></td>
<td><code>false</code></td>
<td>
If true, turn on profiling in the driver.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.profiler.executor.enabled</code></td>
<td><code>false</code></td>
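
The table above documents the new `spark.profiler.driver.enabled` flag next to the existing executor options. For reference, a rough programmatic equivalent of the spark-submit flags used in the test runs above (a sketch only; it assumes the connector/profiler jar is already on the driver and executor classpaths):
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch: enable the profiler plugin on the driver and on all executors.
// The config keys are the ones documented in connector/profiler/README.md.
val conf = new SparkConf()
  .set("spark.plugins", "org.apache.spark.profiler.ProfilerPlugin")
  .set("spark.profiler.driver.enabled", "true")
  .set("spark.profiler.executor.enabled", "true")
  .set("spark.profiler.executor.fraction", "1")
  .set("spark.profiler.dfsDir", "hdfs:///spark-profiling")

val spark = SparkSession.builder().config(conf).getOrCreate()
```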
@@ -21,21 +21,48 @@ import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._
import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.EXECUTOR_ID


/**
* Spark plugin to do profiling
*/
class ProfilerPlugin extends SparkPlugin {
override def driverPlugin(): DriverPlugin = null
override def driverPlugin(): DriverPlugin = new ProfilerDriverPlugin

override def executorPlugin(): ExecutorPlugin = new ProfilerExecutorPlugin
}

class ProfilerDriverPlugin extends DriverPlugin with Logging {

private var sparkConf: SparkConf = _
private var pluginCtx: PluginContext = _
private var profiler: SparkAsyncProfiler = _
private var driverProfilingEnabled: Boolean = _

override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
pluginCtx = ctx
sparkConf = ctx.conf()
driverProfilingEnabled = sparkConf.get(PROFILER_DRIVER_ENABLED)
if (driverProfilingEnabled) {
logInfo("Driver starting profiling")
profiler = new SparkAsyncProfiler(sparkConf, pluginCtx.executorID())
profiler.start()
}

Map.empty[String, String].asJava
}

override def shutdown(): Unit = {
logInfo("Driver profiler shutting down")
if (profiler != null) {
profiler.stop()
}
}
}

class ProfilerExecutorPlugin extends ExecutorPlugin with Logging {

private var sparkConf: SparkConf = _
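
For context, `ProfilerDriverPlugin` above plugs into the standard `DriverPlugin` lifecycle: `init()` fires while the `SparkContext` is still being constructed (which is why the `SparkAsyncProfiler` changes below defer resolving the application ID), and `shutdown()` fires when the context stops. A minimal no-op sketch of that contract (illustration only, not part of this PR):
```scala
import java.util.{Collections, Map => JMap}

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, PluginContext}

// Sketch: the two lifecycle hooks ProfilerDriverPlugin overrides.
class NoOpDriverPlugin extends DriverPlugin {
  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] =
    Collections.emptyMap[String, String]()

  override def shutdown(): Unit = ()
}
```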
@@ -24,12 +24,12 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
import org.apache.hadoop.fs.permission.FsPermission

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext.DRIVER_IDENTIFIER
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.PATH
import org.apache.spark.util.{ThreadUtils, Utils}


/**
* A class that wraps AsyncProfiler
*/
@@ -42,14 +42,15 @@ private[spark] class SparkAsyncProfiler(conf: SparkConf, executorId: String) ext
private val profilerLocalDir = conf.get(PROFILER_LOCAL_DIR)
private val writeInterval = conf.get(PROFILER_DFS_WRITE_INTERVAL)

private val appId = try {
conf.getAppId
} catch {
case _: NoSuchElementException => "local-" + System.currentTimeMillis
// app_id and app_attempt_id are unavailable during driver plugin initialization
private def getAppId: Option[String] = conf.getOption("spark.app.id")
private def getAttemptId: Option[String] = conf.getOption("spark.app.attempt.id")

private val profileFile = if (executorId == DRIVER_IDENTIFIER) {
s"profile-$executorId.jfr"
} else {
s"profile-exec-$executorId.jfr"
}
private val appAttemptId = conf.getOption("spark.app.attempt.id")
private val baseName = Utils.nameForAppAndAttempt(appId, appAttemptId)
private val profileFile = s"profile-exec-$executorId.jfr"

private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/$profileFile"
private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/$profileFile"
@@ -59,7 +60,7 @@ private[spark] class SparkAsyncProfiler(conf: SparkConf, executorId: String) ext
private val PROFILER_FOLDER_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
private val PROFILER_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("660", 8).toShort)
private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB
private var outputStream: FSDataOutputStream = _
@volatile private var outputStream: FSDataOutputStream = _
private var inputStream: InputStream = _
private val dataBuffer = new Array[Byte](UPLOAD_SIZE)
private var threadpool: ScheduledExecutorService = _
@@ -108,24 +109,8 @@ private[spark] class SparkAsyncProfiler(conf: SparkConf, executorId: String) ext
}

private def startWriting(): Unit = {
profilerDfsDirOpt.foreach { profilerDfsDir =>
val profilerDirForApp = s"$profilerDfsDir/$baseName"
val profileOutputFile = s"$profilerDirForApp/$profileFile"

val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val fs = Utils.getHadoopFileSystem(profilerDfsDir, hadoopConf)

requireProfilerBaseDirAsDirectory(fs, profilerDfsDir)

val profilerDirForAppPath = new Path(profilerDirForApp)
if (!fs.exists(profilerDirForAppPath)) {
// SPARK-30860: use the class method to avoid the umask causing permission issues
FileSystem.mkdirs(fs, profilerDirForAppPath, PROFILER_FOLDER_PERMISSIONS)
}

outputStream = FileSystem.create(fs, new Path(profileOutputFile), PROFILER_FILE_PERMISSIONS)
profilerDfsDirOpt.foreach { _ =>
try {
logInfo(log"Copying profiling file to ${MDC(PATH, profileOutputFile)}")
inputStream = new BufferedInputStream(
new FileInputStream(s"$profilerLocalDir/$profileFile"))
threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread")
@@ -157,6 +142,31 @@ private[spark] class SparkAsyncProfiler(conf: SparkConf, executorId: String) ext
if (!writing) {
return
}

if (outputStream == null) {
while (getAppId.isEmpty) {
logDebug("Waiting for Spark application started")
Thread.sleep(1000L)
}
val baseName = Utils.nameForAppAndAttempt(getAppId.get, getAttemptId)
val profilerDirForApp = s"${profilerDfsDirOpt.get}/$baseName"
val profileOutputFile = s"$profilerDirForApp/$profileFile"

val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val fs = Utils.getHadoopFileSystem(profilerDfsDirOpt.get, hadoopConf)

requireProfilerBaseDirAsDirectory(fs, profilerDfsDirOpt.get)

val profilerDirForAppPath = new Path(profilerDirForApp)
if (!fs.exists(profilerDirForAppPath)) {
// SPARK-30860: use the class method to avoid the umask causing permission issues
FileSystem.mkdirs(fs, profilerDirForAppPath, PROFILER_FOLDER_PERMISSIONS)
}

logInfo(log"Copying profiling file to ${MDC(PATH, profileOutputFile)}")
outputStream = FileSystem.create(fs, new Path(profileOutputFile), PROFILER_FILE_PERMISSIONS)
}

try {
// stop (pause) the profiler, dump the results and then resume. This is not ideal as we miss
// the events while the file is being dumped, but that is the only way to make sure that
@@ -22,6 +22,13 @@ import org.apache.spark.internal.config.ConfigBuilder

package object profiler {

private[profiler] val PROFILER_DRIVER_ENABLED =
ConfigBuilder("spark.profiler.driver.enabled")
.doc("Turn on profiling in driver.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

private[profiler] val PROFILER_EXECUTOR_ENABLED =
ConfigBuilder("spark.profiler.executor.enabled")
.doc("Turn on profiling in executors.")
