
Address UTs for SQL event filter (builder)
HeartSaVioR committed Nov 13, 2019
1 parent 2944416 commit 916a458
Showing 4 changed files with 390 additions and 44 deletions.
@@ -40,10 +40,8 @@ private[spark] class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder {
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
val stages = _liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int])
_liveJobToStages -= jobEnd.jobId
stages.foreach { stage =>
_stageToTasks -= stage
_stageToRDDs -= stage
}
_stageToTasks --= stages
_stageToRDDs --= stages
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
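For context, the hunk above replaces a per-stage foreach that removed entries from two maps with bulk key removal via --=. A minimal standalone sketch of that operator on Scala mutable maps (the names and values below are illustrative, not taken from the patch):

import scala.collection.mutable

val stageToTasks = mutable.HashMap(1 -> Set(10L), 2 -> Set(20L), 3 -> Set(30L))
val completedStages = Seq(1, 2)
// --= removes every key in the given collection in one call, matching the
// behavior of the removed foreach loop with less code.
stageToTasks --= completedStages
assert(stageToTasks.keySet == Set(3))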
@@ -26,11 +26,17 @@ import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.ui._
import org.apache.spark.sql.streaming.StreamingQueryListener

// FIXME: UTs
class SQLEventFilterBuilder extends SparkListener with EventFilterBuilder {
val liveExecutionToJobs = new mutable.HashMap[Long, mutable.Set[Int]]
val jobToStages = new mutable.HashMap[Int, Seq[Int]]
val stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
private[spark] class SQLEventFilterBuilder extends SparkListener with EventFilterBuilder {
private val _liveExecutionToJobs = new mutable.HashMap[Long, mutable.Set[Int]]
private val _jobToStages = new mutable.HashMap[Int, Seq[Int]]
private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
private val _stageToRDDs = new mutable.HashMap[Int, Seq[Int]]
private val stages = new mutable.HashSet[Int]

def liveExecutionToJobs: Map[Long, Set[Int]] = _liveExecutionToJobs.mapValues(_.toSet).toMap
def jobToStages: Map[Int, Seq[Int]] = _jobToStages.toMap
def stageToTasks: Map[Int, Set[Long]] = _stageToTasks.mapValues(_.toSet).toMap
def stageToRDDs: Map[Int, Seq[Int]] = _stageToRDDs.toMap

override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
val executionIdString = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
@@ -42,17 +48,26 @@ class SQLEventFilterBuilder extends SparkListener with EventFilterBuilder {
val executionId = executionIdString.toLong
val jobId = jobStart.jobId

val jobsForExecution = liveExecutionToJobs.getOrElseUpdate(executionId,
val jobsForExecution = _liveExecutionToJobs.getOrElseUpdate(executionId,
mutable.HashSet[Int]())
jobsForExecution += jobId

jobToStages += jobStart.jobId -> jobStart.stageIds
jobStart.stageIds.foreach { stageId => stageToTasks += stageId -> mutable.HashSet[Long]() }
_jobToStages += jobStart.jobId -> jobStart.stageIds
stages ++= jobStart.stageIds
jobStart.stageIds.foreach { stageId => _stageToTasks += stageId -> mutable.HashSet[Long]() }
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
val stageId = stageSubmitted.stageInfo.stageId
if (stages.contains(stageId)) {
val rddInfos = stageSubmitted.stageInfo.rddInfos
_stageToRDDs += stageId -> rddInfos.map(_.id)
}
}

override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
if (stageToTasks.contains(taskStart.stageId)) {
val curTasks = stageToTasks(taskStart.stageId)
if (_stageToTasks.contains(taskStart.stageId)) {
val curTasks = _stageToTasks(taskStart.stageId)
curTasks += taskStart.taskInfo.taskId
}
}
@@ -64,76 +79,95 @@ class SQLEventFilterBuilder extends SparkListener with EventFilterBuilder {
}

private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
liveExecutionToJobs += event.executionId -> mutable.HashSet[Int]()
_liveExecutionToJobs += event.executionId -> mutable.HashSet[Int]()
}

private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
val jobs = liveExecutionToJobs.getOrElse(event.executionId, mutable.HashSet[Int]())
liveExecutionToJobs -= event.executionId
val jobs = _liveExecutionToJobs.getOrElse(event.executionId, mutable.HashSet[Int]())
_liveExecutionToJobs -= event.executionId

val stages = jobToStages.filter(kv => jobs.contains(kv._1)).values.flatten
jobToStages --= jobs
stageToTasks --= stages
val stagesToDrop = _jobToStages.filter(kv => jobs.contains(kv._1)).values.flatten
_jobToStages --= jobs
stages --= stagesToDrop
_stageToTasks --= stagesToDrop
_stageToRDDs --= stagesToDrop
}

override def createFilter(): EventFilter = {
new SQLLiveEntitiesEventFilter(this)
SQLLiveEntitiesEventFilter(this)
}
}

// FIXME: UTs
class SQLLiveEntitiesEventFilter(trackListener: SQLEventFilterBuilder)
extends EventFilter with Logging {
private[spark] class SQLLiveEntitiesEventFilter(
liveExecutionToJobs: Map[Long, Set[Int]],
jobToStages: Map[Int, Seq[Int]],
stageToTasks: Map[Int, Set[Long]],
stageToRDDs: Map[Int, Seq[Int]]) extends EventFilter with Logging {

private val liveTasks: Set[Long] = trackListener.stageToTasks.values match {
private val liveTasks: Set[Long] = stageToTasks.values match {
case xs if xs.isEmpty => Set.empty[Long]
case xs => xs.reduce(_ ++ _).toSet
}

private val liveRDDs: Set[Int] = stageToRDDs.values match {
case xs if xs.isEmpty => Set.empty[Int]
case xs => xs.reduce(_ ++ _).toSet
}

if (log.isDebugEnabled) {
logDebug(s"live executions : ${trackListener.liveExecutionToJobs.keySet}")
logDebug(s"jobs in live executions : ${trackListener.liveExecutionToJobs.values.flatten}")
logDebug(s"jobs : ${trackListener.jobToStages.keySet}")
logDebug(s"stages in jobs : ${trackListener.jobToStages.values.flatten}")
logDebug(s"stages : ${trackListener.stageToTasks.keySet}")
logDebug(s"tasks in stages : ${trackListener.stageToTasks.values.flatten}")
logDebug(s"live executions : ${liveExecutionToJobs.keySet}")
logDebug(s"jobs in live executions : ${liveExecutionToJobs.values.flatten}")
logDebug(s"jobs : ${jobToStages.keySet}")
logDebug(s"stages in jobs : ${jobToStages.values.flatten}")
logDebug(s"stages : ${stageToTasks.keySet}")
logDebug(s"tasks in stages : ${stageToTasks.values.flatten}")
logDebug(s"RDDs in stages : ${stageToRDDs.values.flatten}")
}

override def filterStageCompleted(event: SparkListenerStageCompleted): Option[Boolean] = {
Some(trackListener.stageToTasks.contains(event.stageInfo.stageId))
trueOrNone(stageToTasks.contains(event.stageInfo.stageId))
}

override def filterStageSubmitted(event: SparkListenerStageSubmitted): Option[Boolean] = {
Some(trackListener.stageToTasks.contains(event.stageInfo.stageId))
trueOrNone(stageToTasks.contains(event.stageInfo.stageId))
}

override def filterTaskStart(event: SparkListenerTaskStart): Option[Boolean] = {
Some(liveTasks.contains(event.taskInfo.taskId))
trueOrNone(liveTasks.contains(event.taskInfo.taskId))
}

override def filterTaskGettingResult(event: SparkListenerTaskGettingResult): Option[Boolean] = {
Some(liveTasks.contains(event.taskInfo.taskId))
trueOrNone(liveTasks.contains(event.taskInfo.taskId))
}

override def filterTaskEnd(event: SparkListenerTaskEnd): Option[Boolean] = {
Some(liveTasks.contains(event.taskInfo.taskId))
trueOrNone(liveTasks.contains(event.taskInfo.taskId))
}

override def filterJobStart(event: SparkListenerJobStart): Option[Boolean] = {
Some(trackListener.jobToStages.contains(event.jobId))
trueOrNone(jobToStages.contains(event.jobId))
}

override def filterJobEnd(event: SparkListenerJobEnd): Option[Boolean] = {
Some(trackListener.jobToStages.contains(event.jobId))
trueOrNone(jobToStages.contains(event.jobId))
}

override def filterUnpersistRDD(event: SparkListenerUnpersistRDD): Option[Boolean] = {
trueOrNone(liveRDDs.contains(event.rddId))
}

override def filterExecutorMetricsUpdate(
event: SparkListenerExecutorMetricsUpdate): Option[Boolean] = {
Some(event.accumUpdates.exists { case (_, stageId, _, _) =>
trackListener.stageToTasks.contains(stageId)
trueOrNone(event.accumUpdates.exists { case (_, stageId, _, _) =>
stageToTasks.contains(stageId)
})
}

override def filterSpeculativeTaskSubmitted(
event: SparkListenerSpeculativeTaskSubmitted): Option[Boolean] = {
trueOrNone(stageToTasks.contains(event.stageId))
}

override def filterOtherEvent(event: SparkListenerEvent): Option[Boolean] = event match {
case e: SparkListenerSQLExecutionStart => filterExecutionStart(e)
case e: SparkListenerSQLAdaptiveExecutionUpdate => filterAdaptiveExecutionUpdate(e)
@@ -146,19 +180,37 @@ class SQLLiveEntitiesEventFilter(trackListener: SQLEventFilterBuilder)
}

def filterExecutionStart(event: SparkListenerSQLExecutionStart): Option[Boolean] = {
Some(trackListener.liveExecutionToJobs.contains(event.executionId))
Some(liveExecutionToJobs.contains(event.executionId))
}

def filterAdaptiveExecutionUpdate(
event: SparkListenerSQLAdaptiveExecutionUpdate): Option[Boolean] = {
Some(trackListener.liveExecutionToJobs.contains(event.executionId))
Some(liveExecutionToJobs.contains(event.executionId))
}

def filterExecutionEnd(event: SparkListenerSQLExecutionEnd): Option[Boolean] = {
Some(trackListener.liveExecutionToJobs.contains(event.executionId))
Some(liveExecutionToJobs.contains(event.executionId))
}

def filterDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Option[Boolean] = {
Some(trackListener.liveExecutionToJobs.contains(event.executionId))
Some(liveExecutionToJobs.contains(event.executionId))
}

private def trueOrNone(booleanValue: Boolean): Option[Boolean] = {
if (booleanValue) {
Some(booleanValue)
} else {
None
}
}
}

private[spark] object SQLLiveEntitiesEventFilter {
def apply(builder: SQLEventFilterBuilder): SQLLiveEntitiesEventFilter = {
new SQLLiveEntitiesEventFilter(
builder.liveExecutionToJobs,
builder.jobToStages,
builder.stageToTasks,
builder.stageToRDDs)
}
}
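A rough sketch of how the builder and filter defined above compose, with event replay elided (illustrative only; the arguments passed to SparkListenerJobStart below are placeholders). Because of trueOrNone, the filter only ever answers Some(true) (the event belongs to a live SQL execution, keep it) or None (no opinion, defer to other filters), never Some(false):

val builder = new SQLEventFilterBuilder
// replay listener events into the builder, e.g. builder.onOtherEvent(executionStart),
// builder.onJobStart(jobStart), builder.onStageSubmitted(stageSubmitted), ...
val filter: EventFilter = builder.createFilter()

// Returns Some(true) for a job registered under a still-live SQL execution,
// and None for anything the builder never saw.
filter.filterJobStart(SparkListenerJobStart(1, 0L, Seq.empty))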
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.history

import org.apache.spark.{SparkFunSuite, Success, TaskState}
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.scheduler.{JobSucceeded, SparkListenerApplicationStart, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerLogStart, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
import org.apache.spark.status.ListenerEventsTestHelper

class SQLEventFilterBuilderSuite extends SparkFunSuite {
import ListenerEventsTestHelper._

override protected def beforeEach(): Unit = {
ListenerEventsTestHelper.reset()
}

test("track live SQL executions") {
case class JobInfo(
stageIds: Seq[Int],
stageToTaskIds: Map[Int, Seq[Long]],
stageToRddIds: Map[Int, Seq[Int]])

def pushJobEventsWithoutJobEnd(
listener: SQLEventFilterBuilder,
jobId: Int,
execIds: Array[String],
sqlExecId: Option[String],
time: Long): JobInfo = {
// Start a job with 1 stage / 4 tasks
val rddsForStage = createRdds(2)
val stage = createStage(rddsForStage, Nil)

val jobProps = createJobProps()
sqlExecId.foreach { id => jobProps.setProperty(SQLExecution.EXECUTION_ID_KEY, id) }

listener.onJobStart(SparkListenerJobStart(jobId, time, Seq(stage), jobProps))

// Submit stage
stage.submissionTime = Some(time)
listener.onStageSubmitted(SparkListenerStageSubmitted(stage, jobProps))

// Start tasks from stage
val s1Tasks = ListenerEventsTestHelper.createTasks(4, execIds, time)
s1Tasks.foreach { task =>
listener.onTaskStart(SparkListenerTaskStart(stage.stageId,
stage.attemptNumber(), task))
}

// Succeed all tasks in stage.
val s1Metrics = TaskMetrics.empty
s1Metrics.setExecutorCpuTime(2L)
s1Metrics.setExecutorRunTime(4L)

s1Tasks.foreach { task =>
task.markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber,
"taskType", Success, task, new ExecutorMetrics, s1Metrics))
}

// End stage.
stage.completionTime = Some(time)
listener.onStageCompleted(SparkListenerStageCompleted(stage))

JobInfo(Seq(stage.stageId), Map(stage.stageId -> s1Tasks.map(_.taskId)),
Map(stage.stageId -> rddsForStage.map(_.id)))
}

var time = 0L

val listener = new SQLEventFilterBuilder

listener.onOtherEvent(SparkListenerLogStart("TestSparkVersion"))

// Start the application.
time += 1
listener.onApplicationStart(SparkListenerApplicationStart(
"name",
Some("id"),
time,
"user",
Some("attempt"),
None))

// Start a couple of executors.
time += 1
val execIds = Array("1", "2")
execIds.foreach { id =>
listener.onExecutorAdded(createExecutorAddedEvent(id, time))
}

// Start SQL Execution
listener.onOtherEvent(SparkListenerSQLExecutionStart(1, "desc1", "details1", "plan",
new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time))

time += 1

// job 1, 2: coupled with SQL execution 1, finished
val jobInfoForJob1 = pushJobEventsWithoutJobEnd(listener, 1, execIds, Some("1"), time)
listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))

val jobInfoForJob2 = pushJobEventsWithoutJobEnd(listener, 2, execIds, Some("1"), time)
listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))

// job 3: not coupled with SQL execution 1, finished
pushJobEventsWithoutJobEnd(listener, 3, execIds, None, time)
listener.onJobEnd(SparkListenerJobEnd(3, time, JobSucceeded))

// job 4: not coupled with SQL execution 1, not finished
pushJobEventsWithoutJobEnd(listener, 4, execIds, None, time)

assert(listener.liveExecutionToJobs.keys === Set(1))
assert(listener.liveExecutionToJobs(1) === Set(1, 2))

// only SQL executions related jobs are tracked
assert(listener.jobToStages.keys === Set(1, 2))
assert(listener.jobToStages(1).toSet === jobInfoForJob1.stageIds.toSet)
assert(listener.jobToStages(2).toSet === jobInfoForJob2.stageIds.toSet)

assert(listener.stageToTasks.keys ===
(jobInfoForJob1.stageIds ++ jobInfoForJob2.stageIds).toSet)
listener.stageToTasks.foreach { case (stageId, tasks) =>
val expectedTasks = jobInfoForJob1.stageToTaskIds.getOrElse(stageId,
jobInfoForJob2.stageToTaskIds.getOrElse(stageId, null))
assert(tasks === expectedTasks.toSet)
}

assert(listener.stageToRDDs.keys ===
(jobInfoForJob1.stageIds ++ jobInfoForJob2.stageIds).toSet)
listener.stageToRDDs.foreach { case (stageId, rdds) =>
val expectedRDDs = jobInfoForJob1.stageToRddIds.getOrElse(stageId,
jobInfoForJob2.stageToRddIds.getOrElse(stageId, null))
assert(rdds.toSet === expectedRDDs.toSet)
}

// End SQL execution
listener.onOtherEvent(SparkListenerSQLExecutionEnd(1, 0))

assert(listener.liveExecutionToJobs.isEmpty)
assert(listener.jobToStages.isEmpty)
assert(listener.stageToTasks.isEmpty)
assert(listener.stageToRDDs.isEmpty)
}
}
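The assertions above read the builder through its public accessors (liveExecutionToJobs, stageToTasks, ...), which return defensive copies. A minimal sketch of why mapValues(_.toSet).toMap, as used in those accessors, yields a stable snapshot (names here are illustrative):

import scala.collection.mutable

val internal = new mutable.HashMap[Long, mutable.Set[Int]]
internal += 1L -> mutable.HashSet(1, 2)

// mapValues is lazy, but toMap forces it and _.toSet copies each inner set,
// so later mutation of the internal state does not leak into the snapshot.
val snapshot: Map[Long, Set[Int]] = internal.mapValues(_.toSet).toMap
internal(1L) += 3
assert(snapshot(1L) == Set(1, 2))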
