From 0729a41489967f404e9e7577bfb0652e15fe6e58 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Wed, 13 Nov 2019 19:25:22 +0900
Subject: [PATCH] Address some more tests - leave new TODO/FIXME for tests

---
 .../history/EventLogFileCompactor.scala       |  41 ++---
 .../history/EventLogFileCompactorSuite.scala  | 158 ++++++++++++++++++
 .../FilteredEventLogFileRewriterSuite.scala   | 134 +++++++++++++++
 .../history/FsHistoryProviderSuite.scala      |   2 +
 .../status/ListenerEventsTestHelper.scala     |   1 +
 5 files changed, 309 insertions(+), 27 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala
 create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala
index cdf38fd5669aa..d4a81ed6ecf17 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala
@@ -35,7 +35,6 @@ import org.apache.spark.internal.config.EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN
 import org.apache.spark.scheduler._
 import org.apache.spark.util.{JsonProtocol, Utils}
 
-// FIXME: UTs
 class EventLogFileCompactor(
     sparkConf: SparkConf,
     hadoopConf: Configuration,
@@ -60,11 +59,10 @@ class EventLogFileCompactor(
     if (filesToCompact.isEmpty) {
       filesToRetain
     } else {
-      // first pass
       val bus = new ReplayListenerBus()
 
       val builders = ServiceLoader.load(classOf[EventFilterBuilder],
-        Utils.getContextOrSparkClassLoader).asScala
+        Utils.getContextOrSparkClassLoader).asScala.toSeq
       builders.foreach(bus.addListener)
 
       filesToCompact.foreach { log =>
@@ -73,17 +71,14 @@ class EventLogFileCompactor(
         }
       }
 
-      // second pass
-      val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf,
-        filesToCompact, fs, builders.map(_.createFilter()).toSeq)
-      rewriter.start()
-      rewriter.rewrite()
-      rewriter.stop()
+      val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf, fs,
+        builders.map(_.createFilter()))
+      val compactedPath = rewriter.rewrite(filesToCompact)
 
       // cleanup files which are replaced with new compacted file.
       cleanupCompactedFiles(filesToCompact)
 
-      fs.getFileStatus(new Path(rewriter.logPath)) :: filesToRetain.toList
+      fs.getFileStatus(new Path(compactedPath)) :: filesToRetain.toList
     }
   }
 
@@ -112,36 +107,28 @@ class EventLogFileCompactor(
   }
 }
 
-// FIXME: UTs
 class FilteredEventLogFileRewriter(
     sparkConf: SparkConf,
     hadoopConf: Configuration,
-    eventLogFiles: Seq[FileStatus],
     fs: FileSystem,
     filters: Seq[EventFilter]) extends Logging {
 
-  require(eventLogFiles.nonEmpty)
+  def rewrite(eventLogFiles: Seq[FileStatus]): String = {
+    require(eventLogFiles.nonEmpty)
 
-  private val targetEventLogFilePath = eventLogFiles.last.getPath
-  private val logWriter: CompactedEventLogFileWriter = new CompactedEventLogFileWriter(
-    targetEventLogFilePath, "dummy", None, targetEventLogFilePath.getParent.toUri,
-    sparkConf, hadoopConf)
+    val targetEventLogFilePath = eventLogFiles.last.getPath
+    val logWriter: CompactedEventLogFileWriter = new CompactedEventLogFileWriter(
+      targetEventLogFilePath, "dummy", None, targetEventLogFilePath.getParent.toUri,
+      sparkConf, hadoopConf)
 
-  def logPath: String = logWriter.logPath
-
-  def start(): Unit = {
     logWriter.start()
-  }
-
-  def stop(): Unit = {
+    eventLogFiles.foreach { file => rewriteFile(logWriter, file) }
     logWriter.stop()
-  }
 
-  def rewrite(): Unit = {
-    eventLogFiles.foreach(rewriteFile)
+    logWriter.logPath
   }
 
-  private def rewriteFile(fileStatus: FileStatus): Unit = {
+  private def rewriteFile(logWriter: CompactedEventLogFileWriter, fileStatus: FileStatus): Unit = {
     Utils.tryWithResource(EventLogFileReader.openEventLog(fileStatus.getPath, fs)) { in =>
       val lines = Source.fromInputStream(in)(Codec.UTF8).getLines()
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala
new file mode 100644
index 0000000000000..1ecb9fa10f853
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.File
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.json4s.jackson.JsonMethods.{compact, render}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.config.EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN
+import org.apache.spark.scheduler._
+import org.apache.spark.util.JsonProtocol
+
+class EventLogFileCompactorSuite extends SparkFunSuite {
+  private val sparkConf = testSparkConf()
+  private val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
+
+  test("No event log files") {
+    withTempDir { dir =>
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+      val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs)
+      assert(Seq.empty[FileStatus] === compactor.compact(Seq.empty))
+    }
+  }
+
+  test("No compact file, less origin files available than max files to retain") {
+    withTempDir { dir =>
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val logPath1 = writeDummyEventLogFile(dir, 1)
+      val logPath2 = writeDummyEventLogFile(dir, 2)
+
+      val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs)
+      val fileStatuses = Seq(logPath1, logPath2).map { p => fs.getFileStatus(new Path(p)) }
+      val filesToRead = compactor.compact(fileStatuses)
+
+      assert(filesToRead.map(_.getPath) === fileStatuses.map(_.getPath))
+    }
+  }
+
+  test("No compact file, more origin files available than max files to retain") {
+    withTempDir { dir =>
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val logPaths = (1 to 5).map { idx => writeDummyEventLogFile(dir, idx) }
+
+      val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs)
+      val fileStatuses = logPaths.map { p => fs.getFileStatus(new Path(p)) }
+      val filesToRead = compactor.compact(fileStatuses)
+
+      // 3 (max files to retain) + 1 (compacted file)
+      assert(filesToRead.length === 4)
+      val originalFilesToRead = filesToRead.takeRight(3)
+      val originFileToCompact = fileStatuses.takeRight(4).head.getPath
+      val compactFilePath = filesToRead.head.getPath
+
+      assert(compactFilePath.getName === originFileToCompact.getName + EventLogFileWriter.COMPACTED)
+      assert(originalFilesToRead.map(_.getPath) === fileStatuses.takeRight(3).map(_.getPath))
+
+      // compacted files will be removed
+      fileStatuses.take(2).foreach { status => assert(!fs.exists(status.getPath)) }
+      filesToRead.foreach { status => assert(fs.exists(status.getPath)) }
+    }
+  }
+
+  test("compact file exists, less origin files available than max files to retain") {
+    withTempDir { dir =>
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val logPaths = (1 to 5).map { idx => writeDummyEventLogFile(dir, idx) }
+
+      val fileToCompact = logPaths(2)
+      val compactedPath = new Path(fileToCompact + EventLogFileWriter.COMPACTED)
+      assert(fs.rename(new Path(fileToCompact), compactedPath))
+
+      val newLogPaths = logPaths.take(2) ++ Seq(compactedPath.toString) ++
+        logPaths.takeRight(2)
+
+      val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs)
+      val fileStatuses = newLogPaths.map { p => fs.getFileStatus(new Path(p)) }
+      val filesToRead = compactor.compact(fileStatuses)
+
+      // filesToRead should start from the rightmost compact file, with no new compact file created
+      assert(filesToRead.map(_.getPath) === fileStatuses.takeRight(3).map(_.getPath))
+    }
+  }
+
+  test("compact file exists, more origin files available than max files to retain") {
+    withTempDir { dir =>
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val logPaths = (1 to 10).map { idx => writeDummyEventLogFile(dir, idx) }
+
+      val fileToCompact = logPaths(2)
+      val compactedPath = new Path(fileToCompact + EventLogFileWriter.COMPACTED)
+      assert(fs.rename(new Path(fileToCompact), compactedPath))
+
+      val newLogPaths = logPaths.take(2) ++ Seq(compactedPath.toString) ++
+        logPaths.takeRight(7)
+
+      val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs)
+      val fileStatuses = newLogPaths.map { p => fs.getFileStatus(new Path(p)) }
+      val filesToRead = compactor.compact(fileStatuses)
+
+      // 3 (max files to retain) + 1 (compacted file)
+      assert(filesToRead.length === 4)
+      val originalFilesToRead = filesToRead.takeRight(3)
+      val originFileToCompact = fileStatuses.takeRight(4).head.getPath
+      val compactFilePath = filesToRead.head.getPath
+
+      assert(compactFilePath.getName === originFileToCompact.getName + EventLogFileWriter.COMPACTED)
+      assert(originalFilesToRead.map(_.getPath) === fileStatuses.takeRight(3).map(_.getPath))
+
+      // compacted files will be removed - we don't check files "before" the rightmost compact file
+      fileStatuses.drop(2).dropRight(3).foreach { status => assert(!fs.exists(status.getPath)) }
+      filesToRead.foreach { status => assert(fs.exists(status.getPath)) }
+    }
+  }
+
+  // TODO: Do we want to verify the content of the compacted file here? It is already covered in
+  // FilteredEventLogFileRewriterSuite, and adding the same checks everywhere seems redundant.
+
+  private def writeDummyEventLogFile(dir: File, idx: Int): String = {
+    // To simplify the code, we don't bother making the file name match the naming rule
+    // for event log files.
+    val writer = new SingleEventLogFileWriter(s"app$idx", None, dir.toURI, sparkConf, hadoopConf)
+    writer.start()
+    writer.writeEvent(convertEvent(
+      SparkListenerApplicationStart("app", Some("app"), 0, "user", None)), flushLogger = true)
+    writer.stop()
+    writer.logPath
+  }
+
+  private def convertEvent(event: SparkListenerEvent): String = {
+    compact(render(JsonProtocol.sparkEventToJson(event)))
+  }
+
+  private def testSparkConf(): SparkConf = {
+    new SparkConf().set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 3)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala
new file mode 100644
index 0000000000000..a554fbcc2031a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import scala.collection.mutable
+import scala.io.{Codec, Source}
+
+import org.apache.hadoop.fs.Path
+import org.json4s.jackson.JsonMethods.{compact, render}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+class FilteredEventLogFileRewriterSuite extends SparkFunSuite {
+  test("rewrite files with test filters") {
+    def writeEventToWriter(writer: EventLogFileWriter, event: SparkListenerEvent): String = {
+      val line = convertEvent(event)
+      writer.writeEvent(line, flushLogger = true)
+      line
+    }
+
+    withTempDir { tempDir =>
+      val sparkConf = new SparkConf
+      val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
+      val fs = new Path(tempDir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val writer = new SingleEventLogFileWriter("app", None, tempDir.toURI, sparkConf, hadoopConf)
+      writer.start()
+
+      val expectedLines = new mutable.ArrayBuffer[String]
+
+      // filterApplicationEnd: Some(true) & Some(true) => filter in
+      expectedLines += writeEventToWriter(writer, SparkListenerApplicationEnd(0))
+
+      // filterBlockManagerAdded: Some(true) & Some(false) => filter out
+      writeEventToWriter(writer, SparkListenerBlockManagerAdded(0, BlockManagerId("1", "host1", 1),
+        10))
+
+      // filterApplicationStart: Some(false) & Some(false) => filter out
+      writeEventToWriter(writer, SparkListenerApplicationStart("app", None, 0, "user", None))
+
+      // filterNodeBlacklisted: None & Some(true) => filter in
+      expectedLines += writeEventToWriter(writer, SparkListenerNodeBlacklisted(0, "host1", 1))
+
+      // filterNodeUnblacklisted: None & Some(false) => filter out
+      writeEventToWriter(writer, SparkListenerNodeUnblacklisted(0, "host1"))
+
+      // other events: None & None => filter in
+      expectedLines += writeEventToWriter(writer, SparkListenerUnpersistRDD(0))
+
+      writer.stop()
+
+      val filters = Seq(new TestEventFilter1, new TestEventFilter2)
+
+      val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf, fs, filters)
+      val logPath = new Path(writer.logPath)
+      val newPath = rewriter.rewrite(Seq(fs.getFileStatus(logPath)))
+      assert(new Path(newPath).getName === logPath.getName + EventLogFileWriter.COMPACTED)
+
+      Utils.tryWithResource(EventLogFileReader.openEventLog(new Path(newPath), fs)) { is =>
+        val lines = Source.fromInputStream(is)(Codec.UTF8).getLines()
+        var linesLength = 0
+        lines.foreach { line =>
+          linesLength += 1
+          assert(expectedLines.contains(line))
+        }
+        assert(linesLength === expectedLines.length, "The number of lines in the rewritten " +
+          s"file does not match: expected ${expectedLines.length} / actual $linesLength")
+      }
+    }
+  }
+
+  private def convertEvent(event: SparkListenerEvent): String = {
+    compact(render(JsonProtocol.sparkEventToJson(event)))
+  }
+}
+
+class TestEventFilter1 extends EventFilter {
+  override def filterApplicationEnd(event: SparkListenerApplicationEnd): Option[Boolean] = {
+    Some(true)
+  }
+
+  override def filterBlockManagerAdded(event: SparkListenerBlockManagerAdded): Option[Boolean] = {
+    Some(true)
+  }
+
+  override def filterApplicationStart(event: SparkListenerApplicationStart): Option[Boolean] = {
+    Some(false)
+  }
+}
+
+class TestEventFilter2 extends EventFilter {
+  override def filterApplicationEnd(event: SparkListenerApplicationEnd): Option[Boolean] = {
+    Some(true)
+  }
+
+  override def filterEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Option[Boolean] = {
+    Some(true)
+  }
+
+  override def filterBlockManagerAdded(event: SparkListenerBlockManagerAdded): Option[Boolean] = {
+    Some(false)
+  }
+
+  override def filterApplicationStart(event: SparkListenerApplicationStart): Option[Boolean] = {
+    Some(false)
+  }
+
+  override def filterNodeBlacklisted(event: SparkListenerNodeBlacklisted): Option[Boolean] = {
+    Some(true)
+  }
+
+  override def filterNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Option[Boolean] = {
+    Some(false)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index ed195dd44e917..6259276b1a9d8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1333,6 +1333,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     }
   }
 
+  // FIXME: add an end-to-end test for compaction, or use a mock to verify that compact is called
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
diff --git a/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala
index cf96c3a52d6f7..f9496ecc4740a 100644
--- a/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala
+++ b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala
@@ -20,6 +20,7 @@ package org.apache.spark.status
 import java.util.Properties
 
 import scala.collection.immutable.Map
+
 import org.apache.spark.{AccumulatorSuite, SparkContext}
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListenerExecutorMetricsUpdate, SparkListenerExecutorRemoved, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskStart, StageInfo, TaskInfo, TaskLocality}
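
Reviewer note on the reworked API above: FilteredEventLogFileRewriter no longer receives the event log files through its constructor; callers pass them to rewrite(...), which writes a single compacted file next to the last input file (its name carrying the EventLogFileWriter.COMPACTED suffix) and returns the new path. The sketch below shows how a caller might drive it with a hand-written filter. It is illustrative only: the filter class and its keep/drop choices are not part of this patch, and it assumes that EventFilter callbacks which are not overridden default to None (no opinion), as the test filters above rely on.

package org.apache.spark.deploy.history

import org.apache.hadoop.fs.{FileStatus, FileSystem}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._

// Illustrative filter (not part of this patch): keep application start/end events,
// drop block manager registrations, and stay silent (None) on everything else.
class KeepAppLifecycleEventsFilter extends EventFilter {
  override def filterApplicationStart(event: SparkListenerApplicationStart): Option[Boolean] =
    Some(true)

  override def filterApplicationEnd(event: SparkListenerApplicationEnd): Option[Boolean] =
    Some(true)

  override def filterBlockManagerAdded(event: SparkListenerBlockManagerAdded): Option[Boolean] =
    Some(false)
}

// Illustrative caller: rewrite the given event log files into one compacted file
// and return its path (the last input file's name plus the COMPACTED suffix).
object FilteredRewriteExample {
  def rewriteWithLifecycleFilter(
      sparkConf: SparkConf,
      fs: FileSystem,
      eventLogFiles: Seq[FileStatus]): String = {
    val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
    val rewriter = new FilteredEventLogFileRewriter(
      sparkConf, hadoopConf, fs, Seq(new KeepAppLifecycleEventsFilter))
    rewriter.rewrite(eventLogFiles)
  }
}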