Address some more tests - leave new TODO/FIXME for tests
HeartSaVioR committed Nov 13, 2019
1 parent 916a458 commit 0729a41
Showing 5 changed files with 309 additions and 27 deletions.
@@ -35,7 +35,6 @@ import org.apache.spark.internal.config.EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN
import org.apache.spark.scheduler._
import org.apache.spark.util.{JsonProtocol, Utils}

// FIXME: UTs
class EventLogFileCompactor(
sparkConf: SparkConf,
hadoopConf: Configuration,
@@ -60,11 +59,10 @@ class EventLogFileCompactor(
if (filesToCompact.isEmpty) {
filesToRetain
} else {
// first pass
val bus = new ReplayListenerBus()

val builders = ServiceLoader.load(classOf[EventFilterBuilder],
Utils.getContextOrSparkClassLoader).asScala
Utils.getContextOrSparkClassLoader).asScala.toSeq
builders.foreach(bus.addListener)

filesToCompact.foreach { log =>
@@ -73,17 +71,14 @@
}
}

// second pass
val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf,
filesToCompact, fs, builders.map(_.createFilter()).toSeq)
rewriter.start()
rewriter.rewrite()
rewriter.stop()
val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf, fs,
builders.map(_.createFilter()))
val compactedPath = rewriter.rewrite(filesToCompact)

// Clean up the files which have been replaced by the new compacted file.
cleanupCompactedFiles(filesToCompact)

fs.getFileStatus(new Path(rewriter.logPath)) :: filesToRetain.toList
fs.getFileStatus(new Path(compactedPath)) :: filesToRetain.toList
}
}

@@ -112,36 +107,28 @@ class EventLogFileCompactor(
}
}

// FIXME: UTs
class FilteredEventLogFileRewriter(
sparkConf: SparkConf,
hadoopConf: Configuration,
eventLogFiles: Seq[FileStatus],
fs: FileSystem,
filters: Seq[EventFilter]) extends Logging {

require(eventLogFiles.nonEmpty)
def rewrite(eventLogFiles: Seq[FileStatus]): String = {
require(eventLogFiles.nonEmpty)

private val targetEventLogFilePath = eventLogFiles.last.getPath
private val logWriter: CompactedEventLogFileWriter = new CompactedEventLogFileWriter(
targetEventLogFilePath, "dummy", None, targetEventLogFilePath.getParent.toUri,
sparkConf, hadoopConf)
val targetEventLogFilePath = eventLogFiles.last.getPath
val logWriter: CompactedEventLogFileWriter = new CompactedEventLogFileWriter(
targetEventLogFilePath, "dummy", None, targetEventLogFilePath.getParent.toUri,
sparkConf, hadoopConf)

def logPath: String = logWriter.logPath

def start(): Unit = {
logWriter.start()
}

def stop(): Unit = {
eventLogFiles.foreach { file => rewriteFile(logWriter, file) }
logWriter.stop()
}

def rewrite(): Unit = {
eventLogFiles.foreach(rewriteFile)
logWriter.logPath
}

private def rewriteFile(fileStatus: FileStatus): Unit = {
private def rewriteFile(logWriter: CompactedEventLogFileWriter, fileStatus: FileStatus): Unit = {
Utils.tryWithResource(EventLogFileReader.openEventLog(fileStatus.getPath, fs)) { in =>
val lines = Source.fromInputStream(in)(Codec.UTF8).getLines()

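The main change to the compactor is that FilteredEventLogFileRewriter no longer carries the file list and writer lifecycle as instance state; a single rewrite(files) call performs the pass and returns the path of the compacted file. A minimal caller sketch of the new shape, using only names visible in the diff above and assuming they are in scope:

val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf, fs,
  builders.map(_.createFilter()))

// One call replaces the earlier start()/rewrite()/stop() sequence and hands back
// the location of the newly written compacted log.
val compactedPath = rewriter.rewrite(filesToCompact)

// The compacted file is then listed ahead of the retained originals.
val filesToRead = fs.getFileStatus(new Path(compactedPath)) :: filesToRetain.toList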
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.history

import java.io.File

import org.apache.hadoop.fs.{FileStatus, Path}
import org.json4s.jackson.JsonMethods.{compact, render}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.config.EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN
import org.apache.spark.scheduler._
import org.apache.spark.util.JsonProtocol

class EventLogFileCompactorSuite extends SparkFunSuite {
private val sparkConf = testSparkConf()
private val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)

test("No event log files") {
withTempDir { dir =>
val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs)
assert(Seq.empty[FileStatus] === compactor.compact(Seq.empty))
}
}

test("No compact file, less origin files available than max files to retain") {
withTempDir { dir =>
val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)

val logPath1 = writeDummyEventLogFile(dir, 1)
val logPath2 = writeDummyEventLogFile(dir, 2)

val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs)
val fileStatuses = Seq(logPath1, logPath2).map { p => fs.getFileStatus(new Path(p)) }
val filesToRead = compactor.compact(fileStatuses)

assert(filesToRead.map(_.getPath) === fileStatuses.map(_.getPath))
}
}

test("No compact file, more origin files available than max files to retain") {
withTempDir { dir =>
val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)

val logPaths = (1 to 5).map { idx => writeDummyEventLogFile(dir, idx) }

val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs)
val fileStatuses = logPaths.map { p => fs.getFileStatus(new Path(p)) }
val filesToRead = compactor.compact(fileStatuses)

// 3 (max files to retain) + 1 (compacted file)
assert(filesToRead.length === 4)
val originalFilesToRead = filesToRead.takeRight(3)
val originFileToCompact = fileStatuses.takeRight(4).head.getPath
val compactFilePath = filesToRead.head.getPath

assert(compactFilePath.getName === originFileToCompact.getName + EventLogFileWriter.COMPACTED)
assert(originalFilesToRead.map(_.getPath) === fileStatuses.takeRight(3).map(_.getPath))

// compacted files will be removed
fileStatuses.take(2).foreach { status => assert(!fs.exists(status.getPath)) }
filesToRead.foreach { status => assert(fs.exists(status.getPath)) }
}
}

test("compact file exists, less origin files available than max files to retain") {
withTempDir { dir =>
val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)

val logPaths = (1 to 5).map { idx => writeDummyEventLogFile(dir, idx) }

val fileToCompact = logPaths(2)
val compactedPath = new Path(fileToCompact + EventLogFileWriter.COMPACTED)
assert(fs.rename(new Path(fileToCompact), compactedPath))

val newLogPaths = logPaths.take(2) ++ Seq(compactedPath.toString) ++
logPaths.takeRight(2)

val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs)
val fileStatuses = newLogPaths.map { p => fs.getFileStatus(new Path(p)) }
val filesToRead = compactor.compact(fileStatuses)

// filesToRead will start from the rightmost compact file, and no new compact file is created
assert(filesToRead.map(_.getPath) === fileStatuses.takeRight(3).map(_.getPath))
}
}

test("compact file exists, more origin files available than max files to retain") {
withTempDir { dir =>
val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)

val logPaths = (1 to 10).map { idx => writeDummyEventLogFile(dir, idx) }

val fileToCompact = logPaths(2)
val compactedPath = new Path(fileToCompact + EventLogFileWriter.COMPACTED)
assert(fs.rename(new Path(fileToCompact), compactedPath))

val newLogPaths = logPaths.take(2) ++ Seq(compactedPath.toString) ++
logPaths.takeRight(7)

val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs)
val fileStatuses = newLogPaths.map { p => fs.getFileStatus(new Path(p)) }
val filesToRead = compactor.compact(fileStatuses)

// 3 (max files to retain) + 1 (compacted file)
assert(filesToRead.length === 4)
val originalFilesToRead = filesToRead.takeRight(3)
val originFileToCompact = fileStatuses.takeRight(4).head.getPath
val compactFilePath = filesToRead.head.getPath

assert(compactFilePath.getName === originFileToCompact.getName + EventLogFileWriter.COMPACTED)
assert(originalFilesToRead.map(_.getPath) === fileStatuses.takeRight(3).map(_.getPath))

// compacted files will be removed - we don't check files "before" the rightmost compact file
fileStatuses.drop(2).dropRight(3).foreach { status => assert(!fs.exists(status.getPath)) }
filesToRead.foreach { status => assert(fs.exists(status.getPath)) }
}
}

// TODO: Would we want to verify the result of the compact file here? We already cover it in
// FilteredEventLogFileRewriterSuite, and adding the same tests everywhere seems redundant.

private def writeDummyEventLogFile(dir: File, idx: Int): String = {
// To simplify the code, we don't care whether the file name matches the naming rule
// of event log files.
val writer = new SingleEventLogFileWriter(s"app$idx", None, dir.toURI, sparkConf, hadoopConf)
writer.start()
writer.writeEvent(convertEvent(
SparkListenerApplicationStart("app", Some("app"), 0, "user", None)), flushLogger = true)
writer.stop()
writer.logPath
}

private def convertEvent(event: SparkListenerEvent): String = {
compact(render(JsonProtocol.sparkEventToJson(event)))
}

private def testSparkConf(): SparkConf = {
new SparkConf().set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 3)
}
}
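Taken together, the suite pins down the compactor's contract when EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN is 3. A rough sketch of the expected outcome for the five-file case above (a restatement of the asserts, with f1..f5 as illustrative names for the rolled log files):

val statuses = logPaths.map { p => fs.getFileStatus(new Path(p)) }      // f1..f5
val filesToRead = compactor.compact(statuses)

assert(filesToRead.length === 4)                                         // 1 compact file + 3 retained
assert(filesToRead.head.getPath.getName ===
  statuses(1).getPath.getName + EventLogFileWriter.COMPACTED)            // compact file named after f2
assert(filesToRead.tail.map(_.getPath) === statuses.takeRight(3).map(_.getPath))
statuses.take(2).foreach { s => assert(!fs.exists(s.getPath)) }          // f1 and f2 were folded in and removed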
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.history

import scala.collection.mutable
import scala.io.{Codec, Source}

import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods.{compact, render}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{JsonProtocol, Utils}

class FilteredEventLogFileRewriterSuite extends SparkFunSuite {
test("rewrite files with test filters") {
def writeEventToWriter(writer: EventLogFileWriter, event: SparkListenerEvent): String = {
val line = convertEvent(event)
writer.writeEvent(line, flushLogger = true)
line
}

withTempDir { tempDir =>
val sparkConf = new SparkConf
val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
val fs = new Path(tempDir.getAbsolutePath).getFileSystem(hadoopConf)

val writer = new SingleEventLogFileWriter("app", None, tempDir.toURI, sparkConf, hadoopConf)
writer.start()

val expectedLines = new mutable.ArrayBuffer[String]

// filterApplicationEnd: Some(true) & Some(true) => filter in
expectedLines += writeEventToWriter(writer, SparkListenerApplicationEnd(0))

// filterBlockManagerAdded: Some(true) & Some(false) => filter out
writeEventToWriter(writer, SparkListenerBlockManagerAdded(0, BlockManagerId("1", "host1", 1),
10))

// filterApplicationStart: Some(false) & Some(false) => filter out
writeEventToWriter(writer, SparkListenerApplicationStart("app", None, 0, "user", None))

// filterNodeBlacklisted: None & Some(true) => filter in
expectedLines += writeEventToWriter(writer, SparkListenerNodeBlacklisted(0, "host1", 1))

// filterNodeUnblacklisted: None & Some(false) => filter out
writeEventToWriter(writer, SparkListenerNodeUnblacklisted(0, "host1"))

// other events: None & None => filter in
expectedLines += writeEventToWriter(writer, SparkListenerUnpersistRDD(0))

writer.stop()

val filters = Seq(new TestEventFilter1, new TestEventFilter2)

val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf, fs, filters)
val logPath = new Path(writer.logPath)
val newPath = rewriter.rewrite(Seq(fs.getFileStatus(logPath)))
assert(new Path(newPath).getName === logPath.getName + EventLogFileWriter.COMPACTED)

Utils.tryWithResource(EventLogFileReader.openEventLog(new Path(newPath), fs)) { is =>
val lines = Source.fromInputStream(is)(Codec.UTF8).getLines()
var linesLength = 0
lines.foreach { line =>
linesLength += 1
assert(expectedLines.contains(line))
}
assert(linesLength === expectedLines.length, "The number of lines in the rewritten file " +
s"does not match: expected ${expectedLines.length} / actual $linesLength")
}
}
}

private def convertEvent(event: SparkListenerEvent): String = {
compact(render(JsonProtocol.sparkEventToJson(event)))
}
}

class TestEventFilter1 extends EventFilter {
override def filterApplicationEnd(event: SparkListenerApplicationEnd): Option[Boolean] = {
Some(true)
}

override def filterBlockManagerAdded(event: SparkListenerBlockManagerAdded): Option[Boolean] = {
Some(true)
}

override def filterApplicationStart(event: SparkListenerApplicationStart): Option[Boolean] = {
Some(false)
}
}

class TestEventFilter2 extends EventFilter {
override def filterApplicationEnd(event: SparkListenerApplicationEnd): Option[Boolean] = {
Some(true)
}

override def filterEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Option[Boolean] = {
Some(true)
}

override def filterBlockManagerAdded(event: SparkListenerBlockManagerAdded): Option[Boolean] = {
Some(false)
}

override def filterApplicationStart(event: SparkListenerApplicationStart): Option[Boolean] = {
Some(false)
}

override def filterNodeBlacklisted(event: SparkListenerNodeBlacklisted): Option[Boolean] = {
Some(true)
}

override def filterNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Option[Boolean] = {
Some(false)
}
}
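The comments in the rewrite test suggest how decisions from multiple EventFilters appear to combine: an event is dropped as soon as any filter answers Some(false), and kept otherwise, including when every filter abstains with None. A minimal sketch of that rule, inferred from the expected lines rather than taken from code in this diff:

// keepEvent is a hypothetical helper, not part of the patch.
def keepEvent(decisions: Seq[Option[Boolean]]): Boolean =
  !decisions.exists(_.contains(false))

// Matches the cases exercised above:
//   Seq(Some(true), Some(true))   => true   (ApplicationEnd: kept)
//   Seq(Some(true), Some(false))  => false  (BlockManagerAdded: dropped)
//   Seq(Some(false), Some(false)) => false  (ApplicationStart: dropped)
//   Seq(None, Some(true))         => true   (NodeBlacklisted: kept)
//   Seq(None, Some(false))        => false  (NodeUnblacklisted: dropped)
//   Seq(None, None)               => true   (other events: kept)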
@@ -1333,6 +1333,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
}
}

// FIXME: add an end-to-end test for compaction, or a mock to verify whether compact is being called

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
@@ -20,6 +20,7 @@ package org.apache.spark.status
import java.util.Properties

import scala.collection.immutable.Map

import org.apache.spark.{AccumulatorSuite, SparkContext}
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListenerExecutorMetricsUpdate, SparkListenerExecutorRemoved, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskStart, StageInfo, TaskInfo, TaskLocality}
