From 704c6ca3c3db7b02c38edbe717df46fc63b1d3e4 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Fri, 3 Jan 2020 12:59:03 +0900 Subject: [PATCH] [SPARK-29779][CORE] Compact old event log files and cleanup - part 1 --- ...he.spark.deploy.history.EventFilterBuilder | 1 + .../history/BasicEventFilterBuilder.scala | 176 +++++++++++++ .../spark/deploy/history/EventFilter.scala | 109 ++++++++ .../history/EventLogFileCompactor.scala | 212 ++++++++++++++++ .../deploy/history/EventLogFileReaders.scala | 28 ++- .../deploy/history/EventLogFileWriters.scala | 28 ++- .../spark/internal/config/package.scala | 18 ++ .../BasicEventFilterBuilderSuite.scala | 236 ++++++++++++++++++ .../history/BasicEventFilterSuite.scala | 197 +++++++++++++++ .../history/EventLogFileCompactorSuite.scala | 233 +++++++++++++++++ .../history/EventLogFileReadersSuite.scala | 6 +- .../history/EventLogFileWritersSuite.scala | 4 +- .../deploy/history/EventLogTestHelper.scala | 78 +++++- .../FilteredEventLogFileRewriterSuite.scala | 90 +++++++ .../spark/status/AppStatusListenerSuite.scala | 38 +-- .../status/ListenerEventsTestHelper.scala | 201 +++++++++++++++ 16 files changed, 1597 insertions(+), 58 deletions(-) create mode 100644 core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder b/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder new file mode 100644 index 0000000000000..784e58270ab42 --- /dev/null +++ b/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder @@ -0,0 +1 @@ +org.apache.spark.deploy.history.BasicEventFilterBuilder \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala new file mode 100644 index 0000000000000..fa51c1b0bc8b6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import scala.collection.mutable + +import org.apache.spark.deploy.history.EventFilter.FilterStatistics +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ + +/** + * This class tracks both live jobs and live executors, and pass the list to the + * [[BasicEventFilter]] to help BasicEventFilter to reject finished jobs (+ stages/tasks/RDDs) + * and dead executors. + */ +private[spark] class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder { + private val _liveJobToStages = new mutable.HashMap[Int, Seq[Int]] + private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]] + private val _stageToRDDs = new mutable.HashMap[Int, Seq[Int]] + private val _liveExecutors = new mutable.HashSet[String] + + private var totalJobs: Long = 0L + private var totalStages: Long = 0L + private var totalTasks: Long = 0L + + def liveJobToStages: Map[Int, Seq[Int]] = _liveJobToStages.toMap + def stageToTasks: Map[Int, Set[Long]] = _stageToTasks.mapValues(_.toSet).toMap + def stageToRDDs: Map[Int, Seq[Int]] = _stageToRDDs.toMap + def liveExecutors: Set[String] = _liveExecutors.toSet + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + totalJobs += 1 + totalStages += jobStart.stageIds.length + _liveJobToStages += jobStart.jobId -> jobStart.stageIds + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + val stages = _liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int]) + _liveJobToStages -= jobEnd.jobId + _stageToTasks --= stages + _stageToRDDs --= stages + } + + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { + _stageToRDDs.getOrElseUpdate(stageSubmitted.stageInfo.stageId, + stageSubmitted.stageInfo.rddInfos.map(_.id)) + } + + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + totalTasks += 1 + val curTasks = _stageToTasks.getOrElseUpdate(taskStart.stageId, + mutable.HashSet[Long]()) + curTasks += taskStart.taskInfo.taskId + } + + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { + _liveExecutors += executorAdded.executorId + } + + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + _liveExecutors -= executorRemoved.executorId + } + + override def createFilter(): EventFilter = new BasicEventFilter(this) + + def statistics(): FilterStatistics = { + FilterStatistics(totalJobs, liveJobToStages.size, totalStages, + liveJobToStages.map(_._2.size).sum, totalTasks, _stageToTasks.map(_._2.size).sum) + } +} + +/** + * This class provides the functionality to reject events which are related to the finished + * jobs based on the given information. This class only deals with job related events, and provides + * a PartialFunction which returns false for rejected events for finished jobs, returns true + * otherwise. 
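+ *
+ * For illustration, a hypothetical caller holding several filters could combine the returned
+ * partial functions by lifting them, in the same way [[EventFilter.applyFilterToFile]] does
+ * (sketch only, the names below are placeholders):
+ * {{{
+ *   val filters: Seq[EventFilter] = ...
+ *   def isAccepted(event: SparkListenerEvent): Boolean = {
+ *     val results = filters.flatMap(_.acceptFn().lift.apply(event))
+ *     // accept if no filter matches the event, or no matching filter rejects it
+ *     results.isEmpty || !results.contains(false)
+ *   }
+ * }}}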
+ */ +private[spark] abstract class JobEventFilter( + stats: Option[FilterStatistics], + jobToStages: Map[Int, Seq[Int]], + stageToTasks: Map[Int, Set[Long]], + stageToRDDs: Map[Int, Seq[Int]]) extends EventFilter with Logging { + + private val liveTasks: Set[Long] = stageToTasks.values.flatten.toSet + private val liveRDDs: Set[Int] = stageToRDDs.values.flatten.toSet + + logDebug(s"jobs : ${jobToStages.keySet}") + logDebug(s"stages in jobs : ${jobToStages.values.flatten}") + logDebug(s"stages : ${stageToTasks.keySet}") + logDebug(s"tasks in stages : ${stageToTasks.values.flatten}") + logDebug(s"RDDs in stages : ${stageToRDDs.values.flatten}") + + override def statistics(): Option[FilterStatistics] = stats + + protected val acceptFnForJobEvents: PartialFunction[SparkListenerEvent, Boolean] = { + case e: SparkListenerStageCompleted => + stageToTasks.contains(e.stageInfo.stageId) + + case e: SparkListenerStageSubmitted => + stageToTasks.contains(e.stageInfo.stageId) + + case e: SparkListenerTaskStart => + liveTasks.contains(e.taskInfo.taskId) + + case e: SparkListenerTaskGettingResult => + liveTasks.contains(e.taskInfo.taskId) + + case e: SparkListenerTaskEnd => + liveTasks.contains(e.taskInfo.taskId) + + case e: SparkListenerJobStart => + jobToStages.contains(e.jobId) + + case e: SparkListenerJobEnd => + jobToStages.contains(e.jobId) + + case e: SparkListenerUnpersistRDD => + liveRDDs.contains(e.rddId) + + case e: SparkListenerExecutorMetricsUpdate => + e.accumUpdates.exists { case (_, stageId, _, _) => + stageToTasks.contains(stageId) + } + + case e: SparkListenerSpeculativeTaskSubmitted => + stageToTasks.contains(e.stageId) + } +} + +/** + * This class rejects events which are related to the finished jobs or dead executors, + * based on the given information. The events which are not related to the job and executor + * will be considered as "Don't mind". + */ +private[spark] class BasicEventFilter( + _stats: FilterStatistics, + _liveJobToStages: Map[Int, Seq[Int]], + _stageToTasks: Map[Int, Set[Long]], + _stageToRDDs: Map[Int, Seq[Int]], + liveExecutors: Set[String]) + extends JobEventFilter(Some(_stats), _liveJobToStages, _stageToTasks, _stageToRDDs) with Logging { + + def this(builder: BasicEventFilterBuilder) = { + this(builder.statistics(), builder.liveJobToStages, builder.stageToTasks, builder.stageToRDDs, + builder.liveExecutors) + } + + logDebug(s"live executors : $liveExecutors") + + private val _acceptFn: PartialFunction[SparkListenerEvent, Boolean] = { + case e: SparkListenerExecutorAdded => liveExecutors.contains(e.executorId) + case e: SparkListenerExecutorRemoved => liveExecutors.contains(e.executorId) + case e: SparkListenerExecutorBlacklisted => liveExecutors.contains(e.executorId) + case e: SparkListenerExecutorUnblacklisted => liveExecutors.contains(e.executorId) + case e: SparkListenerStageExecutorMetrics => liveExecutors.contains(e.execId) + } + + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + _acceptFn.orElse(acceptFnForJobEvents) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala new file mode 100644 index 0000000000000..edbb34bff77d8 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import scala.io.{Codec, Source} +import scala.util.control.NonFatal + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.json4s.jackson.JsonMethods.parse + +import org.apache.spark.deploy.history.EventFilter.FilterStatistics +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.util.{JsonProtocol, Utils} + +/** + * EventFilterBuilder provides the interface to gather the information from events being received + * by [[SparkListenerInterface]], and create a new [[EventFilter]] instance which leverages + * information gathered to decide whether the event should be accepted or not. + */ +private[spark] trait EventFilterBuilder extends SparkListenerInterface { + def createFilter(): EventFilter +} + +/** [[EventFilter]] decides whether the given event should be accepted or rejected. */ +private[spark] trait EventFilter { + /** + * Provide statistic information of event filter, which would be used for measuring the score + * of compaction. + * + * To simplify the condition, currently the fields of statistic are static, since major kinds of + * events compaction would filter out are job related event types. If the filter doesn't track + * with job related events, return None instead. + */ + def statistics(): Option[FilterStatistics] + + /** + * Classify whether the event is accepted or rejected by this filter. + * + * The method should return the partial function which matches the events where the filter can + * decide whether the event should be accepted or rejected. Otherwise it should leave the events + * be unmatched. 
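+ *
+ * For example, a minimal hypothetical filter which only cares about executor events could be
+ * implemented as below, where `liveExecutors` is a set the filter tracks on its own:
+ * {{{
+ *   override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = {
+ *     case e: SparkListenerExecutorAdded => liveExecutors.contains(e.executorId)
+ *     case e: SparkListenerExecutorRemoved => liveExecutors.contains(e.executorId)
+ *   }
+ * }}}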
+ */ + def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] +} + +object EventFilter extends Logging { + case class FilterStatistics( + totalJobs: Long, + liveJobs: Long, + totalStages: Long, + liveStages: Long, + totalTasks: Long, + liveTasks: Long) + + def applyFilterToFile( + fs: FileSystem, + filters: Seq[EventFilter], + path: Path, + onAccepted: (String, SparkListenerEvent) => Unit, + onRejected: (String, SparkListenerEvent) => Unit, + onUnidentified: String => Unit): Unit = { + Utils.tryWithResource(EventLogFileReader.openEventLog(path, fs)) { in => + val lines = Source.fromInputStream(in)(Codec.UTF8).getLines() + + lines.zipWithIndex.foreach { case (line, lineNum) => + try { + val event = try { + Some(JsonProtocol.sparkEventFromJson(parse(line))) + } catch { + // ignore any exception occurred from unidentified json + case NonFatal(_) => + onUnidentified(line) + None + } + + event.foreach { e => + val results = filters.flatMap(_.acceptFn().lift.apply(e)) + if (results.isEmpty || !results.contains(false)) { + onAccepted(line, e) + } else { + onRejected(line, e) + } + } + } catch { + case e: Exception => + logError(s"Exception parsing Spark event log: ${path.getName}", e) + logError(s"Malformed line #$lineNum: $line\n") + throw e + } + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala new file mode 100644 index 0000000000000..686ed7cba0a49 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.IOException +import java.net.URI +import java.util.ServiceLoader + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.history.EventFilter.FilterStatistics +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{EVENT_LOG_COMPACTION_SCORE_THRESHOLD, EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN} +import org.apache.spark.scheduler.ReplayListenerBus +import org.apache.spark.util.Utils + +/** + * This class compacts the old event log files into one compact file, via two phases reading: + * + * 1) Initialize available [[EventFilterBuilder]] instances, and replay the old event log files with + * builders, so that these builders can gather the information to create [[EventFilter]] instances. + * 2) Initialize [[EventFilter]] instances from [[EventFilterBuilder]] instances, and replay the + * old event log files with filters. 
Rewrite the events to the compact file which the filters decide + * to accept. + * + * This class will calculate the score based on statistic from [[EventFilter]] instances, which + * represents approximate rate of filtered-out events. Score is being calculated via applying + * heuristic; task events tend to take most size in event log. + * + * This class assumes caller will provide the sorted list of files which are sorted by the index of + * event log file, with "at most" one compact file placed first if it exists. Caller should keep in + * mind that this class doesn't care about the semantic of ordering. + * + * When compacting the files, the range of compaction for given file list is determined as: + * (first ~ the file where there're `maxFilesToRetain` files on the right side) + * + * If there're not enough files on the range of compaction, compaction will be skipped. + */ +class EventLogFileCompactor( + sparkConf: SparkConf, + hadoopConf: Configuration, + fs: FileSystem) extends Logging { + private val maxFilesToRetain: Int = sparkConf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN) + private val compactionThresholdScore: Double = sparkConf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD) + + def compact(eventLogFiles: Seq[FileStatus]): (CompactionResult.Value, Option[Long]) = { + assertPrecondition(eventLogFiles) + + if (eventLogFiles.length < maxFilesToRetain) { + return (CompactionResult.NOT_ENOUGH_FILES, None) + } + + val filesToCompact = findFilesToCompact(eventLogFiles) + if (filesToCompact.isEmpty) { + (CompactionResult.NOT_ENOUGH_FILES, None) + } else { + val builders = initializeBuilders(fs, filesToCompact.map(_.getPath)) + + val filters = builders.map(_.createFilter()) + val minScore = filters.flatMap(_.statistics()).map(calculateScore).min + + if (minScore < compactionThresholdScore) { + (CompactionResult.LOW_SCORE_FOR_COMPACTION, None) + } else { + val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf, fs, filters) + rewriter.rewrite(filesToCompact) + cleanupCompactedFiles(filesToCompact) + (CompactionResult.SUCCESS, Some(RollingEventLogFilesWriter.getEventLogFileIndex( + filesToCompact.last.getPath.getName))) + } + } + } + + private def assertPrecondition(eventLogFiles: Seq[FileStatus]): Unit = { + val idxCompactedFiles = eventLogFiles.zipWithIndex.filter { case (file, _) => + EventLogFileWriter.isCompacted(file.getPath) + } + require(idxCompactedFiles.size < 2 && idxCompactedFiles.headOption.forall(_._2 == 0), + "The number of compact files should be at most 1, and should be placed first if exists.") + } + + /** + * Loads all available EventFilterBuilders in classloader via ServiceLoader, and initializes + * them via replaying events in given files. + */ + private def initializeBuilders(fs: FileSystem, files: Seq[Path]): Seq[EventFilterBuilder] = { + val bus = new ReplayListenerBus() + + val builders = ServiceLoader.load(classOf[EventFilterBuilder], + Utils.getContextOrSparkClassLoader).asScala.toSeq + builders.foreach(bus.addListener) + + files.foreach { log => + Utils.tryWithResource(EventLogFileReader.openEventLog(log, fs)) { in => + bus.replay(in, log.getName) + } + } + + builders + } + + private def calculateScore(stats: FilterStatistics): Double = { + // For now it's simply measuring how many task events will be filtered out (rejected) + // but it can be sophisticated later once we get more heuristic information and found + // the case where this simple calculation doesn't work. 
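+ // For example (illustrative numbers only): with totalTasks = 10 and liveTasks = 2, the score
+ // is (10 - 2) * 1.0 / 10 = 0.8, which is above the default threshold 0.7 of
+ // spark.eventLog.rolling.compaction.score.threshold, so compaction would proceed.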
+ (stats.totalTasks - stats.liveTasks) * 1.0 / stats.totalTasks + } + + private def cleanupCompactedFiles(files: Seq[FileStatus]): Unit = { + files.foreach { file => + var deleted = false + try { + deleted = fs.delete(file.getPath, true) + } catch { + case _: IOException => + } + if (!deleted) { + logWarning(s"Failed to remove ${file.getPath} / skip removing.") + } + } + } + + private def findFilesToCompact(eventLogFiles: Seq[FileStatus]): Seq[FileStatus] = { + val numNormalEventLogFiles = { + if (EventLogFileWriter.isCompacted(eventLogFiles.head.getPath)) { + eventLogFiles.length - 1 + } else { + eventLogFiles.length + } + } + + // This avoids compacting only compact file. + if (numNormalEventLogFiles > maxFilesToRetain) { + eventLogFiles.dropRight(maxFilesToRetain) + } else { + Seq.empty + } + } +} + +object CompactionResult extends Enumeration { + val SUCCESS, NOT_ENOUGH_FILES, LOW_SCORE_FOR_COMPACTION = Value +} + +/** + * This class rewrites the event log files into one compact file: the compact file will only + * contain the events which pass the filters. Events will be dropped only when all filters + * decide to reject the event or don't mind about the event. Otherwise, the original line for + * the event is written to the compact file as it is. + */ +class FilteredEventLogFileRewriter( + sparkConf: SparkConf, + hadoopConf: Configuration, + fs: FileSystem, + filters: Seq[EventFilter]) { + + def rewrite(eventLogFiles: Seq[FileStatus]): String = { + require(eventLogFiles.nonEmpty) + + val lastIndexEventLogPath = eventLogFiles.last.getPath + val logWriter = new CompactedEventLogFileWriter(lastIndexEventLogPath, "dummy", None, + lastIndexEventLogPath.getParent.toUri, sparkConf, hadoopConf) + + logWriter.start() + eventLogFiles.foreach { file => + EventFilter.applyFilterToFile(fs, filters, file.getPath, + onAccepted = (line, _) => logWriter.writeEvent(line, flushLogger = true), + onRejected = (_, _) => {}, + onUnidentified = line => logWriter.writeEvent(line, flushLogger = true) + ) + } + logWriter.stop() + + logWriter.logPath + } +} + +/** + * This class helps to write compact file; to avoid reimplementing everything, it extends + * [[SingleEventLogFileWriter]], but only `originalFilePath` is used to determine the + * path of compact file. 
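+ *
+ * For example (hypothetical file name), if the last original file in the compaction range is
+ * `/logdir/events_2_app1.lzf`, the compact file is written as
+ * `/logdir/events_2_app1.lzf.compact`, i.e. the original path with
+ * [[EventLogFileWriter.COMPACTED]] appended.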
+ */ +class CompactedEventLogFileWriter( + originalFilePath: Path, + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration) + extends SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) { + + override val logPath: String = originalFilePath.toUri.toString + EventLogFileWriter.COMPACTED +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala index c8956ed3d423d..9f63a6441a838 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala @@ -173,7 +173,8 @@ class SingleFileEventLogFileReader( override def fileSizeForLastIndex: Long = status.getLen - override def completed: Boolean = !rootPath.getName.endsWith(EventLogFileWriter.IN_PROGRESS) + override def completed: Boolean = !rootPath.getName.stripSuffix(EventLogFileWriter.COMPACTED) + .endsWith(EventLogFileWriter.IN_PROGRESS) override def fileSizeForLastIndexForDFS: Option[Long] = { if (completed) { @@ -218,15 +219,23 @@ class RollingEventLogFilesFileReader( private lazy val eventLogFiles: Seq[FileStatus] = { val eventLogFiles = files.filter(isEventLogFile).sortBy { status => - getIndex(status.getPath.getName) + val filePath = status.getPath + var idx = getEventLogFileIndex(filePath.getName).toDouble + // trick to place compacted file later than normal file if index is same. + if (EventLogFileWriter.isCompacted(filePath)) { + idx += 0.1 + } + idx } - val indices = eventLogFiles.map { file => getIndex(file.getPath.getName) }.sorted + val filesToRead = dropBeforeLastCompactFile(eventLogFiles) + val indices = filesToRead.map { file => getEventLogFileIndex(file.getPath.getName) } require((indices.head to indices.last) == indices, "Found missing event log file, expected" + - s" indices: ${(indices.head to indices.last)}, actual: ${indices}") - eventLogFiles + s" indices: ${indices.head to indices.last}, actual: ${indices}") + filesToRead } - override def lastIndex: Option[Long] = Some(getIndex(lastEventLogFile.getPath.getName)) + override def lastIndex: Option[Long] = Some( + getEventLogFileIndex(lastEventLogFile.getPath.getName)) override def fileSizeForLastIndex: Long = lastEventLogFile.getLen @@ -261,4 +270,11 @@ class RollingEventLogFilesFileReader( override def totalSize: Long = eventLogFiles.map(_.getLen).sum private def lastEventLogFile: FileStatus = eventLogFiles.last + + private def dropBeforeLastCompactFile(eventLogFiles: Seq[FileStatus]): Seq[FileStatus] = { + val lastCompactedFileIdx = eventLogFiles.lastIndexWhere { fs => + EventLogFileWriter.isCompacted(fs.getPath) + } + eventLogFiles.drop(lastCompactedFileIdx) + } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala index 3fa5ef94892aa..1d58d054b7825 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala @@ -113,9 +113,9 @@ abstract class EventLogFileWriter( } } - protected def writeJson(json: String, flushLogger: Boolean = false): Unit = { + protected def writeLine(line: String, flushLogger: Boolean = false): Unit = { // scalastyle:off println - writer.foreach(_.println(json)) + writer.foreach(_.println(line)) // scalastyle:on 
println if (flushLogger) { writer.foreach(_.flush()) @@ -164,6 +164,7 @@ abstract class EventLogFileWriter( object EventLogFileWriter { // Suffix applied to the names of files still being written by applications. val IN_PROGRESS = ".inprogress" + val COMPACTED = ".compact" val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) @@ -192,9 +193,11 @@ object EventLogFileWriter { def codecName(log: Path): Option[String] = { // Compression codec is encoded as an extension, e.g. app_123.lzf // Since we sanitize the app ID to not include periods, it is safe to split on it - val logName = log.getName.stripSuffix(IN_PROGRESS) + val logName = log.getName.stripSuffix(COMPACTED).stripSuffix(IN_PROGRESS) logName.split("\\.").tail.lastOption } + + def isCompacted(log: Path): Boolean = log.getName.endsWith(COMPACTED) } /** @@ -211,7 +214,7 @@ class SingleEventLogFileWriter( override val logPath: String = SingleEventLogFileWriter.getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) - private val inProgressPath = logPath + EventLogFileWriter.IN_PROGRESS + protected def inProgressPath = logPath + EventLogFileWriter.IN_PROGRESS override def start(): Unit = { requireLogBaseDirAsDirectory() @@ -222,7 +225,7 @@ class SingleEventLogFileWriter( } override def writeEvent(eventJson: String, flushLogger: Boolean = false): Unit = { - writeJson(eventJson, flushLogger) + writeLine(eventJson, flushLogger) } /** @@ -327,10 +330,11 @@ class RollingEventLogFilesWriter( } } - writeJson(eventJson, flushLogger) + writeLine(eventJson, flushLogger) } - private def rollEventLogFile(): Unit = { + /** exposed for testing only */ + private[history] def rollEventLogFile(): Unit = { closeWriter() index += 1 @@ -399,16 +403,20 @@ object RollingEventLogFilesWriter { status.isDirectory && status.getPath.getName.startsWith(EVENT_LOG_DIR_NAME_PREFIX) } + def isEventLogFile(fileName: String): Boolean = { + fileName.startsWith(EVENT_LOG_FILE_NAME_PREFIX) + } + def isEventLogFile(status: FileStatus): Boolean = { - status.isFile && status.getPath.getName.startsWith(EVENT_LOG_FILE_NAME_PREFIX) + status.isFile && isEventLogFile(status.getPath.getName) } def isAppStatusFile(status: FileStatus): Boolean = { status.isFile && status.getPath.getName.startsWith(APPSTATUS_FILE_NAME_PREFIX) } - def getIndex(eventLogFileName: String): Long = { - require(eventLogFileName.startsWith(EVENT_LOG_FILE_NAME_PREFIX), "Not an event log file!") + def getEventLogFileIndex(eventLogFileName: String): Long = { + require(isEventLogFile(eventLogFileName), "Not an event log file!") val index = eventLogFileName.stripPrefix(EVENT_LOG_FILE_NAME_PREFIX).split("_")(0) index.toLong } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 0e872be9d3c28..2d5d7a5a05701 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -195,6 +195,24 @@ package object config { "configured to be at least 10 MiB.") .createWithDefaultString("128m") + private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN = + ConfigBuilder("spark.eventLog.rolling.maxFilesToRetain") + // TODO: remove this when integrating compactor with FsHistoryProvider + .internal() + .doc("The maximum number of event log files which will be retained as non-compacted. " + + "By default, all event log files will be retained. 
Please set the configuration " + + s"and ${EVENT_LOG_ROLLING_MAX_FILE_SIZE.key} accordingly if you want to control " + + "the overall size of event log files.") + .intConf + .checkValue(_ > 0, "Max event log files to retain should be higher than 0.") + .createWithDefault(Integer.MAX_VALUE) + + private[spark] val EVENT_LOG_COMPACTION_SCORE_THRESHOLD = + ConfigBuilder("spark.eventLog.rolling.compaction.score.threshold") + .internal() + .doubleConf + .createWithDefault(0.7d) + private[spark] val EXECUTOR_ID = ConfigBuilder("spark.executor.id").stringConf.createOptional diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala new file mode 100644 index 0000000000000..ca570c257c025 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import org.apache.spark.{SparkFunSuite, Success, TaskResultLost, TaskState} +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} +import org.apache.spark.scheduler._ +import org.apache.spark.status.ListenerEventsTestHelper + +class BasicEventFilterBuilderSuite extends SparkFunSuite { + import ListenerEventsTestHelper._ + + override protected def beforeEach(): Unit = { + ListenerEventsTestHelper.reset() + } + + test("track live jobs") { + var time = 0L + + val listener = new BasicEventFilterBuilder + listener.onOtherEvent(SparkListenerLogStart("TestSparkVersion")) + + // Start the application. + time += 1 + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + None)) + + // Start a couple of executors. 
+ time += 1 + val execIds = Array("1", "2") + execIds.foreach { id => + listener.onExecutorAdded(createExecutorAddedEvent(id, time)) + } + + // Start a job with 2 stages / 4 tasks each + time += 1 + + val rddsForStage1 = createRdds(2) + val rddsForStage2 = createRdds(2) + + val stage1 = createStage(rddsForStage1, Nil) + val stage2 = createStage(rddsForStage2, Seq(stage1.stageId)) + val stages = Seq(stage1, stage2) + + val jobProps = createJobProps() + listener.onJobStart(SparkListenerJobStart(1, time, stages, jobProps)) + + // Submit stage 1 + time += 1 + stages.head.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.head, jobProps)) + + // Start tasks from stage 1 + time += 1 + + val s1Tasks = ListenerEventsTestHelper.createTasks(4, execIds, time) + s1Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, + stages.head.attemptNumber(), task)) + } + + // Fail one of the tasks, re-start it. + time += 1 + s1Tasks.head.markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber, + "taskType", TaskResultLost, s1Tasks.head, new ExecutorMetrics, null)) + + time += 1 + val reattempt = createTaskWithNewAttempt(s1Tasks.head, time) + listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber, + reattempt)) + + // Succeed all tasks in stage 1. + val pending = s1Tasks.drop(1) ++ Seq(reattempt) + + val s1Metrics = TaskMetrics.empty + s1Metrics.setExecutorCpuTime(2L) + s1Metrics.setExecutorRunTime(4L) + + time += 1 + pending.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber, + "taskType", Success, task, new ExecutorMetrics, s1Metrics)) + } + + // End stage 1. + time += 1 + stages.head.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stages.head)) + + assert(listener.liveJobToStages.keys.toSeq === Seq(1)) + assert(listener.liveJobToStages(1) === Seq(0, 1)) + assert(listener.stageToRDDs.keys.toSeq === Seq(0)) + assert(listener.stageToRDDs(0) === rddsForStage1.map(_.id)) + // stage 1 not yet submitted + assert(listener.stageToTasks.keys.toSeq === Seq(0)) + assert(listener.stageToTasks(0) === (s1Tasks ++ Seq(reattempt)).map(_.taskId).toSet) + + // Submit stage 2. + time += 1 + stages.last.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.last, jobProps)) + + // Start and fail all tasks of stage 2. + time += 1 + val s2Tasks = createTasks(4, execIds, time) + s2Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, + stages.last.attemptNumber, + task)) + } + + time += 1 + s2Tasks.foreach { task => + task.markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptNumber, + "taskType", TaskResultLost, task, new ExecutorMetrics, null)) + } + + // Fail stage 2. + time += 1 + stages.last.completionTime = Some(time) + stages.last.failureReason = Some("uh oh") + listener.onStageCompleted(SparkListenerStageCompleted(stages.last)) + + // - Re-submit stage 2, all tasks, and succeed them and the stage. 
+ val oldS2 = stages.last + val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptNumber + 1, oldS2.name, oldS2.numTasks, + oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics) + + time += 1 + newS2.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, jobProps)) + + val newS2Tasks = createTasks(4, execIds, time) + + newS2Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptNumber, task)) + } + + time += 1 + newS2Tasks.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptNumber, "taskType", + Success, task, new ExecutorMetrics, null)) + } + + time += 1 + newS2.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(newS2)) + + assert(listener.liveJobToStages.keys.toSeq === Seq(1)) + assert(listener.liveJobToStages(1) === Seq(0, 1)) + assert(listener.stageToRDDs.keys === Set(0, 1)) + assert(listener.stageToRDDs(0) === rddsForStage1.map(_.id)) + assert(listener.stageToRDDs(1) === rddsForStage2.map(_.id)) + assert(listener.stageToTasks.keys.toSet === Set(0, 1)) + // stage 0 is finished but it stores the information regarding stage + assert(listener.stageToTasks(0) === (s1Tasks ++ Seq(reattempt)).map(_.taskId).toSet) + // stage 1 is newly added + assert(listener.stageToTasks(1) === (s2Tasks ++ newS2Tasks).map(_.taskId).toSet) + + // Start next job. + time += 1 + + val rddsForStage3 = createRdds(2) + val rddsForStage4 = createRdds(2) + + val stage3 = createStage(rddsForStage3, Nil) + val stage4 = createStage(rddsForStage4, Seq(stage3.stageId)) + val stagesForJob2 = Seq(stage3, stage4) + + listener.onJobStart(SparkListenerJobStart(2, time, stagesForJob2, jobProps)) + + // End job 1. + time += 1 + listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded)) + + // everything related to job 1 should be cleaned up, but not for job 2 + assert(listener.liveJobToStages.keys.toSet === Set(2)) + assert(listener.stageToRDDs.isEmpty) + // stageToTasks has no information for job 2, as no task has been started + assert(listener.stageToTasks.isEmpty) + } + + test("track live executors") { + var time = 0L + + val listener = new BasicEventFilterBuilder + listener.onOtherEvent(SparkListenerLogStart("TestSparkVersion")) + + // Start the application. + time += 1 + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + None)) + + // Start a couple of executors. + time += 1 + val execIds = (1 to 3).map(_.toString) + execIds.foreach { id => + listener.onExecutorAdded(createExecutorAddedEvent(id, time)) + } + + // End one of executors. + time += 1 + listener.onExecutorRemoved(createExecutorRemovedEvent(execIds.head, time)) + + assert(listener.liveExecutors === execIds.drop(1).toSet) + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala new file mode 100644 index 0000000000000..27f2c4bca5c92 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import org.apache.spark.{storage, SparkFunSuite, Success, TaskState} +import org.apache.spark.deploy.history.EventFilter.FilterStatistics +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.scheduler._ +import org.apache.spark.status.ListenerEventsTestHelper._ +import org.apache.spark.storage.{BlockManagerId, RDDBlockId, StorageLevel} + +class BasicEventFilterSuite extends SparkFunSuite { + import BasicEventFilterSuite._ + + test("filter out events for finished jobs") { + // assume finished job 1 with stage 1, tasks (1, 2), rdds (1, 2) + // live job 2 with stages 2, tasks (3, 4), rdds (3, 4) + val liveJobToStages: Map[Int, Seq[Int]] = Map(2 -> Seq(2, 3)) + val stageToTasks: Map[Int, Set[Long]] = Map(2 -> Set(3, 4), 3 -> Set(5, 6)) + val stageToRDDs: Map[Int, Seq[Int]] = Map(2 -> Seq(3, 4), 3 -> Seq(5, 6)) + val liveExecutors: Set[String] = Set("1", "2") + val filterStats = FilterStatistics(2, 1, 2, 1, 4, 2) + + val filter = new BasicEventFilter(filterStats, liveJobToStages, stageToTasks, stageToRDDs, + liveExecutors) + val acceptFn = filter.acceptFn().lift + + // Verifying with finished job 1 + val rddsForStage1 = createRddsWithId(1 to 2) + val stage1 = createStage(1, rddsForStage1, Nil) + val tasksForStage1 = createTasks(Seq(1L, 2L), liveExecutors.toArray, 0) + tasksForStage1.foreach { task => task.markFinished(TaskState.FINISHED, 5) } + + val jobStartEventForJob1 = SparkListenerJobStart(1, 0, Seq(stage1)) + val jobEndEventForJob1 = SparkListenerJobEnd(1, 0, JobSucceeded) + val stageSubmittedEventsForJob1 = SparkListenerStageSubmitted(stage1) + val stageCompletedEventsForJob1 = SparkListenerStageCompleted(stage1) + val unpersistRDDEventsForJob1 = (1 to 2).map(SparkListenerUnpersistRDD) + + // job events for finished job should be rejected + assertFilterJobEvents(acceptFn, jobStartEventForJob1, jobEndEventForJob1, Some(false)) + + // stage events for finished job should be rejected + // NOTE: it doesn't filter out stage events which are also related to the executor + assertFilterStageEvents( + acceptFn, + stageSubmittedEventsForJob1, + stageCompletedEventsForJob1, + unpersistRDDEventsForJob1, + SparkListenerSpeculativeTaskSubmitted(stage1.stageId, stageAttemptId = 1), + Some(false)) + + // task events for finished job should be rejected + assertFilterTaskEvents(acceptFn, tasksForStage1, stage1, Some(false)) + + // Verifying with live job 2 + val rddsForStage2 = createRddsWithId(3 to 4) + val stage2 = createStage(2, rddsForStage2, Nil) + val tasksForStage2 = createTasks(Seq(3L, 4L), liveExecutors.toArray, 0) + tasksForStage1.foreach { task => task.markFinished(TaskState.FINISHED, 5) } + + val jobStartEventForJob2 = SparkListenerJobStart(2, 0, Seq(stage2)) + val stageSubmittedEventsForJob2 = SparkListenerStageSubmitted(stage2) + val stageCompletedEventsForJob2 = SparkListenerStageCompleted(stage2) + val unpersistRDDEventsForJob2 = 
rddsForStage2.map { rdd => SparkListenerUnpersistRDD(rdd.id) } + + // job events for live job should be accepted + assert(acceptFn(jobStartEventForJob2) === Some(true)) + + // stage events for live job should be accepted + assertFilterStageEvents( + acceptFn, + stageSubmittedEventsForJob2, + stageCompletedEventsForJob2, + unpersistRDDEventsForJob2, + SparkListenerSpeculativeTaskSubmitted(stage2.stageId, stageAttemptId = 1), + Some(true)) + + // task events for live job should be accepted + assertFilterTaskEvents(acceptFn, tasksForStage2, stage2, Some(true)) + } + + test("filter out events for dead executors") { + // assume executor 1 was dead, and live executor 2 is available + val liveExecutors: Set[String] = Set("2") + + val filter = new BasicEventFilter(EMPTY_STATS, Map.empty, Map.empty, Map.empty, + liveExecutors) + val acceptFn = filter.acceptFn().lift + + // events for dead executor should be rejected + assert(acceptFn(createExecutorAddedEvent(1)) === Some(false)) + // though the name of event is stage executor metrics, AppStatusListener only deals with + // live executors + assert(acceptFn( + SparkListenerStageExecutorMetrics(1.toString, 0, 0, new ExecutorMetrics)) === + Some(false)) + assert(acceptFn(SparkListenerExecutorBlacklisted(0, 1.toString, 1)) === + Some(false)) + assert(acceptFn(SparkListenerExecutorUnblacklisted(0, 1.toString)) === + Some(false)) + assert(acceptFn(createExecutorRemovedEvent(1)) === Some(false)) + + // events for live executor should be accepted + assert(acceptFn(createExecutorAddedEvent(2)) === Some(true)) + assert(acceptFn( + SparkListenerStageExecutorMetrics(2.toString, 0, 0, new ExecutorMetrics)) === + Some(true)) + assert(acceptFn(SparkListenerExecutorBlacklisted(0, 2.toString, 1)) === + Some(true)) + assert(acceptFn(SparkListenerExecutorUnblacklisted(0, 2.toString)) === + Some(true)) + assert(acceptFn(createExecutorRemovedEvent(2)) === Some(true)) + } + + test("other events should be left to other filters") { + def assertNone(predicate: => Option[Boolean]): Unit = { + assert(predicate === None) + } + + val filter = new BasicEventFilter(EMPTY_STATS, Map.empty, Map.empty, Map.empty, Set.empty) + val acceptFn = filter.acceptFn().lift + + assertNone(acceptFn(SparkListenerEnvironmentUpdate(Map.empty))) + assertNone(acceptFn(SparkListenerApplicationStart("1", Some("1"), 0, "user", None))) + assertNone(acceptFn(SparkListenerApplicationEnd(1))) + val bmId = BlockManagerId("1", "host1", 1) + assertNone(acceptFn(SparkListenerBlockManagerAdded(0, bmId, 1))) + assertNone(acceptFn(SparkListenerBlockManagerRemoved(1, bmId))) + assertNone(acceptFn(SparkListenerBlockUpdated( + storage.BlockUpdatedInfo(bmId, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0, 10)))) + assertNone(acceptFn(SparkListenerNodeBlacklisted(0, "host1", 1))) + assertNone(acceptFn(SparkListenerNodeUnblacklisted(0, "host1"))) + assertNone(acceptFn(SparkListenerLogStart("testVersion"))) + } + + private def assertFilterJobEvents( + acceptFn: SparkListenerEvent => Option[Boolean], + jobStart: SparkListenerJobStart, + jobEnd: SparkListenerJobEnd, + expectedVal: Option[Boolean]): Unit = { + assert(acceptFn(jobStart) === expectedVal) + assert(acceptFn(jobEnd) === expectedVal) + } + + private def assertFilterStageEvents( + acceptFn: SparkListenerEvent => Option[Boolean], + stageSubmitted: SparkListenerStageSubmitted, + stageCompleted: SparkListenerStageCompleted, + unpersistRDDs: Seq[SparkListenerUnpersistRDD], + taskSpeculativeSubmitted: SparkListenerSpeculativeTaskSubmitted, + expectedVal: 
Option[Boolean]): Unit = { + assert(acceptFn(stageSubmitted) === expectedVal) + assert(acceptFn(stageCompleted) === expectedVal) + unpersistRDDs.foreach { event => + assert(acceptFn(event) === expectedVal) + } + assert(acceptFn(taskSpeculativeSubmitted) === expectedVal) + } + + private def assertFilterTaskEvents( + acceptFn: SparkListenerEvent => Option[Boolean], + taskInfos: Seq[TaskInfo], + stageInfo: StageInfo, + expectedVal: Option[Boolean]): Unit = { + taskInfos.foreach { task => + val taskStartEvent = SparkListenerTaskStart(stageInfo.stageId, 0, task) + assert(acceptFn(taskStartEvent) === expectedVal) + + val taskGettingResultEvent = SparkListenerTaskGettingResult(task) + assert(acceptFn(taskGettingResultEvent) === expectedVal) + + val taskEndEvent = SparkListenerTaskEnd(stageInfo.stageId, 0, "taskType", + Success, task, new ExecutorMetrics, null) + assert(acceptFn(taskEndEvent) === expectedVal) + } + } +} + +object BasicEventFilterSuite { + val EMPTY_STATS = FilterStatistics(0, 0, 0, 0, 0, 0) +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala new file mode 100644 index 0000000000000..8c216acb5267a --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import scala.io.{Codec, Source} + +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.json4s.jackson.JsonMethods.parse + +import org.apache.spark.{SparkConf, SparkFunSuite, Success} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper.writeEventsToRollingWriter +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.internal.config.{EVENT_LOG_COMPACTION_SCORE_THRESHOLD, EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN} +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.status.ListenerEventsTestHelper._ +import org.apache.spark.util.{JsonProtocol, Utils} + +class EventLogFileCompactorSuite extends SparkFunSuite { + private val sparkConf = testSparkConf() + private val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) + + test("No event log files") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + + assertNoCompaction(fs, Seq.empty, compactor.compact(Seq.empty), + CompactionResult.NOT_ENOUGH_FILES) + } + } + + test("No compact file, less origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 2).map(_ => testEvent): _*) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertNoCompaction(fs, fileStatuses, compactor.compact(fileStatuses), + CompactionResult.NOT_ENOUGH_FILES) + } + } + + test("No compact file, more origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 5).map(_ => testEvent): _*) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, fileStatuses, compactor.compact(fileStatuses), + expectedNumOfFilesCompacted = 2) + } + } + + test("compact file exists, less origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 2).map(_ => testEvent): _*) + + val fileToCompact = fileStatuses.head.getPath + val compactedPath = new Path(fileToCompact.getParent, + fileToCompact.getName + EventLogFileWriter.COMPACTED) + assert(fs.rename(fileToCompact, compactedPath)) + + val newFileStatuses = Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertNoCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), + CompactionResult.NOT_ENOUGH_FILES) + } + } + + test("compact file exists, number of origin files are same as max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 4).map(_ => testEvent): _*) + + val fileToCompact = fileStatuses.head.getPath + val compactedPath = new Path(fileToCompact.getParent, + fileToCompact.getName + EventLogFileWriter.COMPACTED) + assert(fs.rename(fileToCompact, compactedPath)) + + val newFileStatuses = 
Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertNoCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), + CompactionResult.NOT_ENOUGH_FILES) + } + } + + test("compact file exists, more origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 10).map(_ => testEvent): _*) + + val fileToCompact = fileStatuses.head.getPath + val compactedPath = new Path(fileToCompact.getParent, + fileToCompact.getName + EventLogFileWriter.COMPACTED) + assert(fs.rename(fileToCompact, compactedPath)) + + val newFileStatuses = Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), + expectedNumOfFilesCompacted = 7) + } + } + + test("events for finished job are dropped in new compact file") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + // 1, 2 will be compacted into one file, 3~5 are dummies to ensure max files to retain + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + Seq( + SparkListenerExecutorAdded(0, "exec1", new ExecutorInfo("host1", 1, Map.empty)), + SparkListenerJobStart(1, 0, Seq.empty)), + Seq( + SparkListenerJobEnd(1, 1, JobSucceeded), + SparkListenerExecutorAdded(2, "exec2", new ExecutorInfo("host2", 1, Map.empty))), + testEvent, + testEvent, + testEvent) + + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, fileStatuses, compactor.compact(fileStatuses), + expectedNumOfFilesCompacted = 2) + + val expectCompactFileBasePath = fileStatuses.take(2).last.getPath + val compactFilePath = getCompactFilePath(expectCompactFileBasePath) + Utils.tryWithResource(EventLogFileReader.openEventLog(compactFilePath, fs)) { is => + val lines = Source.fromInputStream(is)(Codec.UTF8).getLines().toList + assert(lines.length === 2, "Compacted file should have only two events being accepted") + lines.foreach { line => + val event = JsonProtocol.sparkEventFromJson(parse(line)) + assert(!event.isInstanceOf[SparkListenerJobStart] && + !event.isInstanceOf[SparkListenerJobEnd]) + } + } + } + } + + test("Don't compact file if score is lower than threshold") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + val newConf = sparkConf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.7d) + + // only one of two tasks is finished, which would score 0.5d + val tasks = createTasks(2, Array("exec1"), 0L).map(createTaskStartEvent(_, 1, 0)) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, newConf, hadoopConf, + tasks, + Seq(SparkListenerTaskEnd(1, 0, "taskType", Success, tasks.head.taskInfo, + new ExecutorMetrics, null)), + testEvent, + testEvent, + testEvent) + + val compactor = new EventLogFileCompactor(newConf, hadoopConf, fs) + assertNoCompaction(fs, fileStatuses, compactor.compact(fileStatuses), + CompactionResult.LOW_SCORE_FOR_COMPACTION) + } + } + + private def assertCompaction( + fs: FileSystem, + originalFiles: Seq[FileStatus], + compactRet: (CompactionResult.Value, Option[Long]), + expectedNumOfFilesCompacted: Int): Unit = { + assert(CompactionResult.SUCCESS === compactRet._1) + + val expectRetainedFiles = 
originalFiles.drop(expectedNumOfFilesCompacted) + expectRetainedFiles.foreach { status => assert(fs.exists(status.getPath)) } + + val expectRemovedFiles = originalFiles.take(expectedNumOfFilesCompacted) + expectRemovedFiles.foreach { status => assert(!fs.exists(status.getPath)) } + + val expectCompactFileBasePath = originalFiles.take(expectedNumOfFilesCompacted).last.getPath + val expectCompactFileIndex = RollingEventLogFilesWriter.getEventLogFileIndex( + expectCompactFileBasePath.getName) + assert(Some(expectCompactFileIndex) === compactRet._2) + + val expectCompactFilePath = getCompactFilePath(expectCompactFileBasePath) + assert(fs.exists(expectCompactFilePath)) + } + + private def getCompactFilePath(expectCompactFileBasePath: Path): Path = { + new Path(expectCompactFileBasePath.getParent, + expectCompactFileBasePath.getName + EventLogFileWriter.COMPACTED) + } + + private def assertNoCompaction( + fs: FileSystem, + originalFiles: Seq[FileStatus], + compactRet: (CompactionResult.Value, Option[Long]), + expectedCompactRet: CompactionResult.Value): Unit = { + assert(compactRet._1 === expectedCompactRet) + assert(None === compactRet._2) + originalFiles.foreach { status => assert(fs.exists(status.getPath)) } + } + + private def testEvent: Seq[SparkListenerEvent] = + Seq(SparkListenerApplicationStart("app", Some("app"), 0, "user", None)) + + private def testSparkConf(): SparkConf = { + new SparkConf() + .set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 3) + // to simplify the tests, we set the score threshold as 0.0d + // individual test can override the value to verify the functionality + .set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d) + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala index a2ce4acdaaf37..8eab2da1a37b7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala @@ -288,13 +288,15 @@ class RollingEventLogFilesReaderSuite extends EventLogFileReadersSuite { assert(status.isDirectory) val statusInDir = fileSystem.listStatus(logPath) - val eventFiles = statusInDir.filter(isEventLogFile).sortBy { s => getIndex(s.getPath.getName) } + val eventFiles = statusInDir.filter(isEventLogFile).sortBy { s => + getEventLogFileIndex(s.getPath.getName) + } assert(eventFiles.nonEmpty) val lastEventFile = eventFiles.last val allLen = eventFiles.map(_.getLen).sum assert(reader.rootPath === fileSystem.makeQualified(logPath)) - assert(reader.lastIndex === Some(getIndex(lastEventFile.getPath.getName))) + assert(reader.lastIndex === Some(getEventLogFileIndex(lastEventFile.getPath.getName))) assert(reader.fileSizeForLastIndex === lastEventFile.getLen) assert(reader.completed === isCompleted) assert(reader.modificationTime === lastEventFile.getModificationTime) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala index c4b40884eebf5..060b878fb8ef2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala @@ -291,7 +291,7 @@ class RollingEventLogFilesWriterSuite extends EventLogFileWritersSuite { expectedMaxSizeBytes: Long): Unit = { assert(eventLogFiles.forall(f => f.getLen <= expectedMaxSizeBytes)) 
assert((1 to expectedLastIndex) === - eventLogFiles.map(f => getIndex(f.getPath.getName))) + eventLogFiles.map(f => getEventLogFileIndex(f.getPath.getName))) } val appId = getUniqueApplicationId @@ -373,6 +373,6 @@ class RollingEventLogFilesWriterSuite extends EventLogFileWritersSuite { private def listEventLogFiles(logDirPath: Path): Seq[FileStatus] = { fileSystem.listStatus(logDirPath).filter(isEventLogFile) - .sortBy { fs => getIndex(fs.getPath.getName) } + .sortBy { fs => getEventLogFileIndex(fs.getPath.getName) } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala index 55eddce3968c2..85a2acb3477f1 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala @@ -17,12 +17,17 @@ package org.apache.spark.deploy.history +import java.io.File import java.nio.charset.StandardCharsets -import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.SparkConf import org.apache.spark.internal.config._ +import org.apache.spark.scheduler._ +import org.apache.spark.util.JsonProtocol object EventLogTestHelper { def getUniqueApplicationId: String = "test-" + System.currentTimeMillis @@ -56,4 +61,75 @@ object EventLogTestHelper { eventStr } } + + def writeEventLogFile( + sparkConf: SparkConf, + hadoopConf: Configuration, + dir: File, + idx: Int, + events: Seq[SparkListenerEvent]): String = { + // to simplify the code, we don't concern about file name being matched with the naming rule + // of event log file + val writer = new SingleEventLogFileWriter(s"app$idx", None, dir.toURI, sparkConf, hadoopConf) + writer.start() + events.foreach { event => writer.writeEvent(convertEvent(event), flushLogger = true) } + writer.stop() + writer.logPath + } + + def writeEventsToRollingWriter( + fs: FileSystem, + appId: String, + dir: File, + sparkConf: SparkConf, + hadoopConf: Configuration, + eventsFiles: Seq[SparkListenerEvent]*): Seq[FileStatus] = { + val writer = new RollingEventLogFilesWriter(appId, None, dir.toURI, sparkConf, hadoopConf) + writer.start() + + eventsFiles.dropRight(1).foreach { events => + writeEventsToRollingWriter(writer, events, rollFile = true) + } + eventsFiles.lastOption.foreach { events => + writeEventsToRollingWriter(writer, events, rollFile = false) + } + + writer.stop() + EventLogFileReader(fs, new Path(writer.logPath)).get.listEventLogFiles + } + + def writeEventsToRollingWriter( + writer: RollingEventLogFilesWriter, + events: Seq[SparkListenerEvent], + rollFile: Boolean): Unit = { + events.foreach { event => writer.writeEvent(convertEvent(event), flushLogger = true) } + if (rollFile) writer.rollEventLogFile() + } + + def convertEvent(event: SparkListenerEvent): String = { + compact(render(JsonProtocol.sparkEventToJson(event))) + } + + class TestEventFilter1 extends EventFilter { + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + case _: SparkListenerApplicationEnd => true + case _: SparkListenerBlockManagerAdded => true + case _: SparkListenerApplicationStart => false + } + + override def statistics(): Option[EventFilter.FilterStatistics] = None + } + + class TestEventFilter2 extends EventFilter { + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + 
case _: SparkListenerApplicationEnd => true + case _: SparkListenerEnvironmentUpdate => true + case _: SparkListenerNodeBlacklisted => true + case _: SparkListenerBlockManagerAdded => false + case _: SparkListenerApplicationStart => false + case _: SparkListenerNodeUnblacklisted => false + } + + override def statistics(): Option[EventFilter.FilterStatistics] = None + } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala new file mode 100644 index 0000000000000..96883a7647b4e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import scala.collection.mutable +import scala.io.{Codec, Source} + +import org.apache.hadoop.fs.Path + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper.{TestEventFilter1, TestEventFilter2} +import org.apache.spark.scheduler._ +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util.Utils + +class FilteredEventLogFileRewriterSuite extends SparkFunSuite { + test("rewrite files with test filters") { + def writeEventToWriter(writer: EventLogFileWriter, event: SparkListenerEvent): String = { + val line = EventLogTestHelper.convertEvent(event) + writer.writeEvent(line, flushLogger = true) + line + } + + withTempDir { tempDir => + val sparkConf = new SparkConf + val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) + val fs = new Path(tempDir.getAbsolutePath).getFileSystem(hadoopConf) + + val writer = new SingleEventLogFileWriter("app", None, tempDir.toURI, sparkConf, hadoopConf) + writer.start() + + val expectedLines = new mutable.ArrayBuffer[String] + + // filterApplicationEnd: Some(true) & Some(true) => filter in + expectedLines += writeEventToWriter(writer, SparkListenerApplicationEnd(0)) + + // filterBlockManagerAdded: Some(true) & Some(false) => filter out + writeEventToWriter(writer, SparkListenerBlockManagerAdded(0, BlockManagerId("1", "host1", 1), + 10)) + + // filterApplicationStart: Some(false) & Some(false) => filter out + writeEventToWriter(writer, SparkListenerApplicationStart("app", None, 0, "user", None)) + + // filterNodeBlacklisted: None & Some(true) => filter in + expectedLines += writeEventToWriter(writer, SparkListenerNodeBlacklisted(0, "host1", 1)) + + // filterNodeUnblacklisted: None & Some(false) => filter out + writeEventToWriter(writer, SparkListenerNodeUnblacklisted(0, "host1")) + + // other events: None & None => filter in + expectedLines += 
writeEventToWriter(writer, SparkListenerUnpersistRDD(0)) + + writer.stop() + + val filters = Seq(new TestEventFilter1, new TestEventFilter2) + + val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf, fs, filters) + val logPath = new Path(writer.logPath) + val newPath = rewriter.rewrite(Seq(fs.getFileStatus(logPath))) + assert(new Path(newPath).getName === logPath.getName + EventLogFileWriter.COMPACTED) + + Utils.tryWithResource(EventLogFileReader.openEventLog(new Path(newPath), fs)) { is => + val lines = Source.fromInputStream(is)(Codec.UTF8).getLines() + var linesLength = 0 + lines.foreach { line => + linesLength += 1 + assert(expectedLines.contains(line)) + } + assert(linesLength === expectedLines.length, "The number of lines for rewritten file " + + s"is not expected: expected ${expectedLines.length} / actual $linesLength") + } + } + } +} diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index a289dddbdc9e6..e7eed7bf4c879 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -32,12 +32,12 @@ import org.apache.spark.internal.config.Status._ import org.apache.spark.metrics.ExecutorMetricType import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster._ +import org.apache.spark.status.ListenerEventsTestHelper._ import org.apache.spark.status.api.v1 import org.apache.spark.storage._ import org.apache.spark.util.Utils class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { - private val conf = new SparkConf() .set(LIVE_ENTITY_UPDATE_PERIOD, 0L) .set(ASYNC_TRACKING_ENABLED, false) @@ -1694,40 +1694,4 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { def blockId: BlockId = RDDBlockId(rddId, partId) } - - /** Create a stage submitted event for the specified stage Id. */ - private def createStageSubmittedEvent(stageId: Int) = { - SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0, - Seq.empty, Seq.empty, "details")) - } - - /** Create a stage completed event for the specified stage Id. */ - private def createStageCompletedEvent(stageId: Int) = { - SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0, - Seq.empty, Seq.empty, "details")) - } - - /** Create an executor added event for the specified executor Id. */ - private def createExecutorAddedEvent(executorId: Int) = { - SparkListenerExecutorAdded(0L, executorId.toString, - new ExecutorInfo("host1", 1, Map.empty, Map.empty)) - } - - /** Create an executor added event for the specified executor Id. */ - private def createExecutorRemovedEvent(executorId: Int) = { - SparkListenerExecutorRemoved(10L, executorId.toString, "test") - } - - /** Create an executor metrics update event, with the specified executor metrics values. 
*/ - private def createExecutorMetricsUpdateEvent( - stageId: Int, - executorId: Int, - executorMetrics: Array[Long]): SparkListenerExecutorMetricsUpdate = { - val taskMetrics = TaskMetrics.empty - taskMetrics.incDiskBytesSpilled(111) - taskMetrics.incMemoryBytesSpilled(222) - val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo))) - val executorUpdates = Map((stageId, 0) -> new ExecutorMetrics(executorMetrics)) - SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates) - } } diff --git a/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala new file mode 100644 index 0000000000000..37a35744ada5e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.Properties + +import scala.collection.immutable.Map + +import org.apache.spark.{AccumulatorSuite, SparkContext, Success, TaskState} +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} +import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorMetricsUpdate, SparkListenerExecutorRemoved, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd, SparkListenerTaskStart, StageInfo, TaskInfo, TaskLocality} +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.storage.{RDDInfo, StorageLevel} + +object ListenerEventsTestHelper { + + private var taskIdTracker = -1L + private var rddIdTracker = -1 + private var stageIdTracker = -1 + + def reset(): Unit = { + taskIdTracker = -1L + rddIdTracker = -1 + stageIdTracker = -1 + } + + def createJobProps(): Properties = { + val jobProps = new Properties() + jobProps.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, "jobDescription") + jobProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup") + jobProps.setProperty(SparkContext.SPARK_SCHEDULER_POOL, "schedPool") + jobProps + } + + def createRddsWithId(ids: Seq[Int]): Seq[RDDInfo] = { + ids.map { rddId => + new RDDInfo(rddId, s"rdd${rddId}", 2, StorageLevel.NONE, false, Nil) + } + } + + def createRdds(count: Int): Seq[RDDInfo] = { + (1 to count).map { _ => + val rddId = nextRddId() + new RDDInfo(rddId, s"rdd${rddId}", 2, StorageLevel.NONE, false, Nil) + } + } + + def createStage(id: Int, rdds: Seq[RDDInfo], parentIds: Seq[Int]): StageInfo = { + new StageInfo(id, 0, s"stage${id}", 4, rdds, parentIds, s"details${id}") + } + + def createStage(rdds: Seq[RDDInfo], parentIds: Seq[Int]): StageInfo = { + createStage(nextStageId(), rdds, parentIds) + } + + def createTasks(ids: 
Seq[Long], execs: Array[String], time: Long): Seq[TaskInfo] = { + ids.zipWithIndex.map { case (id, idx) => + val exec = execs(idx % execs.length) + new TaskInfo(id, idx, 1, time, exec, s"$exec.example.com", + TaskLocality.PROCESS_LOCAL, idx % 2 == 0) + } + } + + def createTasks(count: Int, execs: Array[String], time: Long): Seq[TaskInfo] = { + createTasks((1 to count).map { _ => nextTaskId() }, execs, time) + } + + def createTaskWithNewAttempt(orig: TaskInfo, time: Long): TaskInfo = { + // Task reattempts have a different ID, but the same index as the original. + new TaskInfo(nextTaskId(), orig.index, orig.attemptNumber + 1, time, orig.executorId, + s"${orig.executorId}.example.com", TaskLocality.PROCESS_LOCAL, orig.speculative) + } + + def createTaskStartEvent( + taskInfo: TaskInfo, + stageId: Int, + attemptId: Int): SparkListenerTaskStart = { + SparkListenerTaskStart(stageId, attemptId, taskInfo) + } + + /** Create a stage submitted event for the specified stage Id. */ + def createStageSubmittedEvent(stageId: Int): SparkListenerStageSubmitted = { + SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + } + + /** Create a stage completed event for the specified stage Id. */ + def createStageCompletedEvent(stageId: Int): SparkListenerStageCompleted = { + SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + } + + def createExecutorAddedEvent(executorId: Int): SparkListenerExecutorAdded = { + createExecutorAddedEvent(executorId.toString, 0) + } + + /** Create an executor added event for the specified executor Id. */ + def createExecutorAddedEvent(executorId: String, time: Long): SparkListenerExecutorAdded = { + SparkListenerExecutorAdded(time, executorId, + new ExecutorInfo("host1", 1, Map.empty, Map.empty)) + } + + def createExecutorRemovedEvent(executorId: Int): SparkListenerExecutorRemoved = { + createExecutorRemovedEvent(executorId.toString, 10L) + } + + /** Create an executor added event for the specified executor Id. */ + def createExecutorRemovedEvent(executorId: String, time: Long): SparkListenerExecutorRemoved = { + SparkListenerExecutorRemoved(time, executorId, "test") + } + + /** Create an executor metrics update event, with the specified executor metrics values. 
*/ + def createExecutorMetricsUpdateEvent( + stageId: Int, + executorId: Int, + executorMetrics: Array[Long]): SparkListenerExecutorMetricsUpdate = { + val taskMetrics = TaskMetrics.empty + taskMetrics.incDiskBytesSpilled(111) + taskMetrics.incMemoryBytesSpilled(222) + val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo))) + val executorUpdates = Map((stageId, 0) -> new ExecutorMetrics(executorMetrics)) + SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates) + } + + case class JobInfo( + stageIds: Seq[Int], + stageToTaskIds: Map[Int, Seq[Long]], + stageToRddIds: Map[Int, Seq[Int]]) + + def pushJobEventsWithoutJobEnd( + listener: SparkListener, + jobId: Int, + jobProps: Properties, + execIds: Array[String], + time: Long): JobInfo = { + // Start a job with 1 stages / 4 tasks each + val rddsForStage = createRdds(2) + val stage = createStage(rddsForStage, Nil) + + listener.onJobStart(SparkListenerJobStart(jobId, time, Seq(stage), jobProps)) + + // Submit stage + stage.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stage, jobProps)) + + // Start tasks from stage + val s1Tasks = createTasks(4, execIds, time) + s1Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stage.stageId, + stage.attemptNumber(), task)) + } + + // Succeed all tasks in stage. + val s1Metrics = TaskMetrics.empty + s1Metrics.setExecutorCpuTime(2L) + s1Metrics.setExecutorRunTime(4L) + + s1Tasks.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, + "taskType", Success, task, new ExecutorMetrics, s1Metrics)) + } + + // End stage. + stage.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stage)) + + JobInfo(Seq(stage.stageId), Map(stage.stageId -> s1Tasks.map(_.taskId)), + Map(stage.stageId -> rddsForStage.map(_.id))) + } + + private def nextTaskId(): Long = { + taskIdTracker += 1 + taskIdTracker + } + + private def nextRddId(): Int = { + rddIdTracker += 1 + rddIdTracker + } + + private def nextStageId(): Int = { + stageIdTracker += 1 + stageIdTracker + } +}
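
Note for reviewers (not part of the patch): the pieces above fit together roughly as sketched below. This is a minimal, illustrative sketch only, assuming it is compiled on the core test classpath (EventLogTestHelper is a test-only helper) and placed under org.apache.spark.deploy.history, since several of the helpers it touches are private[spark]; the object name EventLogCompactionSketch and the temp-dir handling are hypothetical, while the config keys, helper signatures, and compactor constructor are the ones introduced in this patch.

package org.apache.spark.deploy.history

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.SparkListenerApplicationStart
import org.apache.spark.util.Utils

// Hypothetical driver object, not part of this patch.
object EventLogCompactionSketch {
  def main(args: Array[String]): Unit = {
    // Mirror testSparkConf() above: keep the last 3 rolled files and always
    // attempt compaction (score threshold 0.0).
    val sparkConf = new SparkConf()
      .set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 3)
      .set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
    val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)

    val dir = Utils.createTempDir()
    val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)

    // Write five rolled event log files, each holding a single application-start event.
    val appStart = Seq(SparkListenerApplicationStart("app", Some("app"), 0, "user", None))
    val fileStatuses = EventLogTestHelper.writeEventsToRollingWriter(
      fs, "app", dir, sparkConf, hadoopConf, appStart, appStart, appStart, appStart, appStart)

    // compact() returns the result code and, on success, the index of the last compacted file.
    val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs)
    val (result, lastCompactedIndex) = compactor.compact(fileStatuses)
    println(s"compaction result: $result, last compacted index: $lastCompactedIndex")
  }
}

With five rolled files and three to retain, the first two should be rewritten into a single file carrying the EventLogFileWriter.COMPACTED suffix, matching expectedNumOfFilesCompacted = 2 in the suite above.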