Skip to content

Commit

Permalink
Address FIXMEs
Browse files Browse the repository at this point in the history
  • Loading branch information
HeartSaVioR committed Nov 11, 2019
1 parent cff732c commit 404e747
Showing 1 changed file with 25 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.apache.spark.scheduler._
class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder {
val liveJobToStages = new mutable.HashMap[Int, Seq[Int]]
val stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
val stageToRDDs = new mutable.HashMap[Int, mutable.Set[Int]]
val liveExecutors = new mutable.HashSet[String]

override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
liveJobToStages += jobStart.jobId -> jobStart.stageIds
Expand All @@ -33,7 +35,10 @@ class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder {
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
val stages = liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int])
liveJobToStages -= jobEnd.jobId
stages.foreach { stage => stageToTasks -= stage }
stages.foreach { stage =>
stageToTasks -= stage
stageToRDDs -= stage
}
}

override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
Expand All @@ -42,6 +47,14 @@ class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder {
curTasks += taskStart.taskInfo.taskId
}

override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
liveExecutors += executorAdded.executorId
}

override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
liveExecutors -= executorRemoved.executorId
}

override def createFilter(): EventFilter = new BasicEventFilter(this)
}

Expand All @@ -52,6 +65,11 @@ class BasicEventFilter(trackListener: BasicEventFilterBuilder) extends EventFilt
case xs => xs.reduce(_ ++ _).toSet
}

private val liveRDDs: Set[Int] = trackListener.stageToRDDs.values match {
case xs if xs.isEmpty => Set.empty[Int]
case xs => xs.reduce(_ ++ _).toSet
}

if (log.isDebugEnabled) {
logDebug(s"live jobs : ${trackListener.liveJobToStages.keySet}")
logDebug(s"stages in jobs : ${trackListener.liveJobToStages.values.flatten}")
Expand Down Expand Up @@ -88,55 +106,30 @@ class BasicEventFilter(trackListener: BasicEventFilterBuilder) extends EventFilt
}

override def filterUnpersistRDD(event: SparkListenerUnpersistRDD): Option[Boolean] = {
// FIXME: need to track rdd ids?
None
}

override def filterExecutorMetricsUpdate(
event: SparkListenerExecutorMetricsUpdate): Option[Boolean] = {
// FIXME: need to track live executors?
None
Some(liveRDDs.contains(event.rddId))
}

override def filterStageExecutorMetrics(
event: SparkListenerStageExecutorMetrics): Option[Boolean] = {
// FIXME: need to track live executors?
// handle this regardless of liveness of stage - good to restore metrics
None
Some(trackListener.liveExecutors.contains(event.execId))
}

override def filterExecutorAdded(event: SparkListenerExecutorAdded): Option[Boolean] = {
// FIXME: need to track live executors?
None
Some(trackListener.liveExecutors.contains(event.executorId))
}

override def filterExecutorRemoved(event: SparkListenerExecutorRemoved): Option[Boolean] = {
// FIXME: need to track live executors?
None
Some(trackListener.liveExecutors.contains(event.executorId))
}

override def filterExecutorBlacklisted(
event: SparkListenerExecutorBlacklisted): Option[Boolean] = {
// FIXME: need to track live executors?
None
}

override def filterExecutorBlacklistedForStage(
event: SparkListenerExecutorBlacklistedForStage): Option[Boolean] = {
// FIXME: need to track live executors?
Some(trackListener.stageToTasks.contains(event.stageId))
}

override def filterNodeBlacklistedForStage(
event: SparkListenerNodeBlacklistedForStage): Option[Boolean] = {
// FIXME: need to track live executors?
Some(trackListener.stageToTasks.contains(event.stageId))
Some(trackListener.liveExecutors.contains(event.executorId))
}

override def filterExecutorUnblacklisted(
event: SparkListenerExecutorUnblacklisted): Option[Boolean] = {
// FIXME: need to track live executors?
None
Some(trackListener.liveExecutors.contains(event.executorId))
}

override def filterSpeculativeTaskSubmitted(
Expand Down

0 comments on commit 404e747

Please sign in to comment.