Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
HeartSaVioR committed Nov 13, 2019
1 parent 0729a41 commit 316bc17
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package org.apache.spark.deploy.history

import org.apache.spark.scheduler._

trait EventFilterBuilder extends SparkListenerInterface {
private[spark] trait EventFilterBuilder extends SparkListenerInterface {
def createFilter(): EventFilter
}

trait EventFilter {
private[spark] trait EventFilter {
def filterStageCompleted(event: SparkListenerStageCompleted): Option[Boolean] = None

def filterStageSubmitted(event: SparkListenerStageSubmitted): Option[Boolean] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class EventLogFileCompactor(
// sequentially if the last event log file is already a compacted file, everything
// will be skipped
def compact(eventLogFiles: Seq[FileStatus]): Seq[FileStatus] = {
if (eventLogFiles.isEmpty) {
return Seq.empty[FileStatus]
if (eventLogFiles.length <= maxFilesToRetain) {
return eventLogFiles
}

// skip everything if the last file is already a compacted file
Expand Down Expand Up @@ -84,10 +84,14 @@ class EventLogFileCompactor(

private def cleanupCompactedFiles(files: Seq[FileStatus]): Unit = {
files.foreach { file =>
var deleted = false
try {
fs.delete(file.getPath, true)
deleted = fs.delete(file.getPath, true)
} catch {
case _: IOException => logWarning(s"Failed to remove ${file.getPath} / skip removing.")
case _: IOException =>
}
if (!deleted) {
logWarning(s"Failed to remove ${file.getPath} / skip removing.")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ class RollingEventLogFilesFileReader(
private lazy val eventLogFiles: Seq[FileStatus] = {
val eventLogFiles = files.filter(isEventLogFile).sortBy { status =>
val filePath = status.getPath
var idx = getIndex(status.getPath.getName) + 0.0d
var idx = getIndex(filePath.getName) + 0.0d
// trick to place compacted file later than normal file if index is same.
if (EventLogFileWriter.isCompacted(filePath)) {
idx += 0.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,16 +402,20 @@ object RollingEventLogFilesWriter {
status.isDirectory && status.getPath.getName.startsWith(EVENT_LOG_DIR_NAME_PREFIX)
}

def isEventLogFile(fileName: String): Boolean = {
fileName.startsWith(EVENT_LOG_FILE_NAME_PREFIX)
}

def isEventLogFile(status: FileStatus): Boolean = {
status.isFile && status.getPath.getName.startsWith(EVENT_LOG_FILE_NAME_PREFIX)
status.isFile && isEventLogFile(status.getPath.getName)
}

def isAppStatusFile(status: FileStatus): Boolean = {
status.isFile && status.getPath.getName.startsWith(APPSTATUS_FILE_NAME_PREFIX)
}

def getIndex(eventLogFileName: String): Long = {
require(eventLogFileName.startsWith(EVENT_LOG_FILE_NAME_PREFIX), "Not an event log file!")
require(isEventLogFile(eventLogFileName), "Not an event log file!")
val index = eventLogFileName.stripPrefix(EVENT_LOG_FILE_NAME_PREFIX).split("_")(0)
index.toLong
}
Expand Down

0 comments on commit 316bc17

Please sign in to comment.