[SPARK-48716] Add jobGroupId to SparkListenerSQLExecutionStart
### What changes were proposed in this pull request?
Add a `jobGroupId` field to `SparkListenerSQLExecutionStart`.

### Why are the changes needed?
The job group ID ties together all jobs submitted as part of the same group, so it can be used to combine related jobs. Exposing it on the SQL execution start event makes that grouping easy to do from a listener.
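
As a sketch of why this helps: a listener can now bucket SQL executions by job group straight from the start event. Everything below (`GroupingListener`, the group name, the sleep-based wait) is hypothetical illustration, not code from this PR:

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

// Hypothetical listener: buckets SQL execution IDs by the job group that
// submitted them, using the jobGroupId field added by this PR.
class GroupingListener extends SparkListener {
  val executionsByGroup = mutable.Map.empty[String, Vector[Long]]

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      val group = e.jobGroupId.getOrElse("<no group>")
      executionsByGroup(group) =
        executionsByGroup.getOrElse(group, Vector.empty) :+ e.executionId
    case _ => // ignore other events
  }
}

object GroupingDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
    val listener = new GroupingListener
    spark.sparkContext.addSparkListener(listener)

    spark.sparkContext.setJobGroup("report-42", "all jobs for report 42")
    spark.range(10).count()

    Thread.sleep(1000) // crude wait for async listener delivery in this sketch
    println(listener.executionsByGroup) // e.g. Map(report-42 -> Vector(0))
    spark.stop()
  }
}
```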

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
A new unit test ("jobGroupId property") in `SQLExecutionSuite`.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #47092 from gjxdxh/gjxdxh/SPARK-48716.

Authored-by: Lingkai Kong <lingkai.kong@databricks.com>
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
gjxdxh authored and JoshRosen committed Jul 9, 2024
1 parent d824e9e commit 4c99c4d
Showing 5 changed files with 40 additions and 5 deletions.

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
-import org.apache.spark.{ErrorMessageFormat, JobArtifactSet, SparkEnv, SparkException, SparkThrowable, SparkThrowableHelper}
+import org.apache.spark.{ErrorMessageFormat, JobArtifactSet, SparkContext, SparkEnv, SparkException, SparkThrowable, SparkThrowableHelper}
 import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION, SPARK_JOB_INTERRUPT_ON_CANCEL}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX}

@@ -128,7 +128,8 @@ object SQLExecution extends Logging {
        sparkPlanInfo = SparkPlanInfo.EMPTY,
        time = System.currentTimeMillis(),
        modifiedConfigs = redactedConfigs,
-       jobTags = sc.getJobTags()
+       jobTags = sc.getJobTags(),
+       jobGroupId = Option(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
      )
      try {
        body match {
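
For context on the new line: `SparkContext.setJobGroup` stores the group ID as a thread-local property under the key `spark.jobGroup.id`, which is the value of the `private[spark]` constant `SparkContext.SPARK_JOB_GROUP_ID` read above. A minimal sketch of that round trip, using the string key since the constant is not visible outside Spark:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object JobGroupPropertyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("job-group-demo"))
    try {
      // setJobGroup records the group ID as a thread-local property.
      sc.setJobGroup("nightly-etl", "demo job group")

      // "spark.jobGroup.id" is the value of SparkContext.SPARK_JOB_GROUP_ID,
      // the same key the SQLExecution change above reads back.
      val groupId = Option(sc.getLocalProperty("spark.jobGroup.id"))
      assert(groupId.contains("nightly-etl"))
    } finally {
      sc.stop()
    }
  }
}
```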

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -343,7 +343,7 @@ class SQLAppStatusListener(
 
   private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
     val SparkListenerSQLExecutionStart(executionId, rootExecutionId, description, details,
-      physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs, _) = event
+      physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs, _, _) = event
 
     val planGraph = SparkPlanGraph(sparkPlanInfo)
     val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
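
The extra `_` is needed because a positional extractor pattern must match the case class's full arity, and the event now has ten fields. Listeners outside Spark can avoid this churn by matching on the type and reading fields by name; a hypothetical sketch (not part of this PR):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

// Hypothetical listener: a type pattern keeps compiling even if more
// fields are appended to SparkListenerSQLExecutionStart later.
class ExecutionStartLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${e.executionId} started " +
        s"(jobGroupId = ${e.jobGroupId.getOrElse("<none>")})")
    case _ => // ignore other events
  }
}
```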

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -54,7 +54,8 @@ case class SparkListenerSQLExecutionStart(
     sparkPlanInfo: SparkPlanInfo,
     time: Long,
     modifiedConfigs: Map[String, String] = Map.empty,
-    jobTags: Set[String] = Set.empty)
+    jobTags: Set[String] = Set.empty,
+    jobGroupId: Option[String] = None)
   extends SparkListenerEvent
 
 @DeveloperApi

sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
@@ -227,13 +227,46 @@ class SQLExecutionSuite extends SparkFunSuite with SQLConfHelper {
 
       spark.range(1).collect()
 
+      spark.sparkContext.listenerBus.waitUntilEmpty()
       assert(jobTags.contains(jobTag))
       assert(sqlJobTags.contains(jobTag))
     } finally {
       spark.sparkContext.removeJobTag(jobTag)
       spark.stop()
     }
   }
+
+  test("jobGroupId property") {
+    val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
+    val JobGroupId = "test-JobGroupId"
+    try {
+      spark.sparkContext.setJobGroup(JobGroupId, "job Group id")
+
+      var jobGroupIdOpt: Option[String] = None
+      var sqlJobGroupIdOpt: Option[String] = None
+      spark.sparkContext.addSparkListener(new SparkListener {
+        override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+          jobGroupIdOpt = Some(jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
+        }
+
+        override def onOtherEvent(event: SparkListenerEvent): Unit = {
+          event match {
+            case e: SparkListenerSQLExecutionStart =>
+              sqlJobGroupIdOpt = e.jobGroupId
+          }
+        }
+      })
+
+      spark.range(1).collect()
+
+      spark.sparkContext.listenerBus.waitUntilEmpty()
+      assert(jobGroupIdOpt.contains(JobGroupId))
+      assert(sqlJobGroupIdOpt.contains(JobGroupId))
+    } finally {
+      spark.sparkContext.clearJobGroup()
+      spark.stop()
+    }
+  }
 }
 
 object SQLExecutionSuite {

sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -344,7 +344,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils {
     val listener = new SparkListener {
       override def onOtherEvent(event: SparkListenerEvent): Unit = {
         event match {
-          case SparkListenerSQLExecutionStart(_, _, _, _, planDescription, _, _, _, _) =>
+          case SparkListenerSQLExecutionStart(_, _, _, _, planDescription, _, _, _, _, _) =>
            assert(expected.forall(planDescription.contains))
            checkDone = true
          case _ => // ignore other events
