From 316bc172b8678a1e05c974db8be904ab343050b2 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Wed, 13 Nov 2019 19:38:31 +0900
Subject: [PATCH] Address review comments

---
 .../apache/spark/deploy/history/EventFilter.scala    |  4 ++--
 .../spark/deploy/history/EventLogFileCompactor.scala | 12 ++++++++----
 .../spark/deploy/history/EventLogFileReaders.scala   |  2 +-
 .../spark/deploy/history/EventLogFileWriters.scala   |  8 ++++++--
 4 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala
index da73908bb8ad1..6bb9022241f31 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala
@@ -19,11 +19,11 @@ package org.apache.spark.deploy.history
 
 import org.apache.spark.scheduler._
 
-trait EventFilterBuilder extends SparkListenerInterface {
+private[spark] trait EventFilterBuilder extends SparkListenerInterface {
   def createFilter(): EventFilter
 }
 
-trait EventFilter {
+private[spark] trait EventFilter {
   def filterStageCompleted(event: SparkListenerStageCompleted): Option[Boolean] = None
 
   def filterStageSubmitted(event: SparkListenerStageSubmitted): Option[Boolean] = None
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala
index d4a81ed6ecf17..baa40b9044b01 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala
@@ -46,8 +46,8 @@ class EventLogFileCompactor(
   // sequentially if the last event log file is already a compacted file, everything
   // will be skipped
   def compact(eventLogFiles: Seq[FileStatus]): Seq[FileStatus] = {
-    if (eventLogFiles.isEmpty) {
-      return Seq.empty[FileStatus]
+    if (eventLogFiles.length <= maxFilesToRetain) {
+      return eventLogFiles
     }
 
     // skip everything if the last file is already a compacted file
@@ -84,10 +84,14 @@ class EventLogFileCompactor(
 
   private def cleanupCompactedFiles(files: Seq[FileStatus]): Unit = {
     files.foreach { file =>
+      var deleted = false
       try {
-        fs.delete(file.getPath, true)
+        deleted = fs.delete(file.getPath, true)
       } catch {
-        case _: IOException => logWarning(s"Failed to remove ${file.getPath} / skip removing.")
+        case _: IOException =>
+      }
+      if (!deleted) {
+        logWarning(s"Failed to remove ${file.getPath} / skip removing.")
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
index 597fd3f2671f9..2ec41bfa038ef 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
@@ -220,7 +220,7 @@ class RollingEventLogFilesFileReader(
   private lazy val eventLogFiles: Seq[FileStatus] = {
     val eventLogFiles = files.filter(isEventLogFile).sortBy { status =>
       val filePath = status.getPath
-      var idx = getIndex(status.getPath.getName) + 0.0d
+      var idx = getIndex(filePath.getName) + 0.0d
       // trick to place compacted file later than normal file if index is same.
       if (EventLogFileWriter.isCompacted(filePath)) {
         idx += 0.1
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
index d5f413e681a2c..8b497f4cbea93 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
@@ -402,8 +402,12 @@ object RollingEventLogFilesWriter {
     status.isDirectory && status.getPath.getName.startsWith(EVENT_LOG_DIR_NAME_PREFIX)
   }
 
+  def isEventLogFile(fileName: String): Boolean = {
+    fileName.startsWith(EVENT_LOG_FILE_NAME_PREFIX)
+  }
+
   def isEventLogFile(status: FileStatus): Boolean = {
-    status.isFile && status.getPath.getName.startsWith(EVENT_LOG_FILE_NAME_PREFIX)
+    status.isFile && isEventLogFile(status.getPath.getName)
   }
 
   def isAppStatusFile(status: FileStatus): Boolean = {
@@ -411,7 +415,7 @@ object RollingEventLogFilesWriter {
   }
 
   def getIndex(eventLogFileName: String): Long = {
-    require(eventLogFileName.startsWith(EVENT_LOG_FILE_NAME_PREFIX), "Not an event log file!")
+    require(isEventLogFile(eventLogFileName), "Not an event log file!")
     val index = eventLogFileName.stripPrefix(EVENT_LOG_FILE_NAME_PREFIX).split("_")(0)
     index.toLong
   }