diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala new file mode 100644 index 0000000000000..c8956ed3d423d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.{BufferedInputStream, InputStream} +import java.util.concurrent.ConcurrentHashMap +import java.util.zip.{ZipEntry, ZipOutputStream} + +import com.google.common.io.ByteStreams +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.hdfs.DFSInputStream + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.history.EventLogFileWriter.codecName +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** The base class of reader which will read the information of event log file(s). */ +abstract class EventLogFileReader( + protected val fileSystem: FileSystem, + val rootPath: Path) { + + protected def fileSizeForDFS(path: Path): Option[Long] = { + Utils.tryWithResource(fileSystem.open(path)) { in => + in.getWrappedStream match { + case dfsIn: DFSInputStream => Some(dfsIn.getFileLength) + case _ => None + } + } + } + + protected def addFileAsZipEntry( + zipStream: ZipOutputStream, + path: Path, + entryName: String): Unit = { + Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream => + zipStream.putNextEntry(new ZipEntry(entryName)) + ByteStreams.copy(inputStream, zipStream) + zipStream.closeEntry() + } + } + + /** Returns the last index of event log files. None for single event log file. */ + def lastIndex: Option[Long] + + /** + * Returns the size of file for the last index of event log files. Returns its size for + * single event log file. + */ + def fileSizeForLastIndex: Long + + /** Returns whether the application is completed. */ + def completed: Boolean + + /** + * Returns the size of file for the last index (itself for single event log file) of event log + * files, only when underlying input stream is DFSInputStream. Otherwise returns None. + */ + def fileSizeForLastIndexForDFS: Option[Long] + + /** + * Returns the modification time for the last index (itself for single event log file) + * of event log files. + */ + def modificationTime: Long + + /** + * This method compresses the files passed in, and writes the compressed data out into the + * ZipOutputStream passed in. Each file is written as a new ZipEntry with its name being + * the name of the file being compressed. + */ + def zipEventLogFiles(zipStream: ZipOutputStream): Unit + + /** Returns all available event log files. 
*/ + def listEventLogFiles: Seq[FileStatus] + + /** Returns the short compression name if being used. None if it's uncompressed. */ + def compressionCodec: Option[String] + + /** Returns the size of all event log files. */ + def totalSize: Long +} + +object EventLogFileReader { + // A cache for compression codecs to avoid creating the same codec many times + private val codecMap = new ConcurrentHashMap[String, CompressionCodec]() + + def apply( + fs: FileSystem, + path: Path, + lastIndex: Option[Long]): EventLogFileReader = { + lastIndex match { + case Some(_) => new RollingEventLogFilesFileReader(fs, path) + case None => new SingleFileEventLogFileReader(fs, path) + } + } + + def apply(fs: FileSystem, path: Path): Option[EventLogFileReader] = { + apply(fs, fs.getFileStatus(path)) + } + + def apply(fs: FileSystem, status: FileStatus): Option[EventLogFileReader] = { + if (isSingleEventLog(status)) { + Some(new SingleFileEventLogFileReader(fs, status.getPath)) + } else if (isRollingEventLogs(status)) { + Some(new RollingEventLogFilesFileReader(fs, status.getPath)) + } else { + None + } + } + + /** + * Opens an event log file and returns an input stream that contains the event data. + * + * @return input stream that holds one JSON record per line. + */ + def openEventLog(log: Path, fs: FileSystem): InputStream = { + val in = new BufferedInputStream(fs.open(log)) + try { + val codec = codecName(log).map { c => + codecMap.computeIfAbsent(c, CompressionCodec.createCodec(new SparkConf, _)) + } + codec.map(_.compressedContinuousInputStream(in)).getOrElse(in) + } catch { + case e: Throwable => + in.close() + throw e + } + } + + private def isSingleEventLog(status: FileStatus): Boolean = { + !status.isDirectory && + // FsHistoryProvider used to generate a hidden file which can't be read. Accidentally + // reading a garbage file is safe, but we would log an error which can be scary to + // the end-user. + !status.getPath.getName.startsWith(".") + } + + private def isRollingEventLogs(status: FileStatus): Boolean = { + RollingEventLogFilesWriter.isEventLogDir(status) + } +} + +/** + * The reader which will read the information of single event log file. + * + * This reader gets the status of event log file only once when required; + * It may not give "live" status of file that could be changing concurrently, and + * FileNotFoundException could occur if the log file is renamed before getting the + * status of log file. + */ +class SingleFileEventLogFileReader( + fs: FileSystem, + path: Path) extends EventLogFileReader(fs, path) { + private lazy val status = fileSystem.getFileStatus(rootPath) + + override def lastIndex: Option[Long] = None + + override def fileSizeForLastIndex: Long = status.getLen + + override def completed: Boolean = !rootPath.getName.endsWith(EventLogFileWriter.IN_PROGRESS) + + override def fileSizeForLastIndexForDFS: Option[Long] = { + if (completed) { + Some(fileSizeForLastIndex) + } else { + fileSizeForDFS(rootPath) + } + } + + override def modificationTime: Long = status.getModificationTime + + override def zipEventLogFiles(zipStream: ZipOutputStream): Unit = { + addFileAsZipEntry(zipStream, rootPath, rootPath.getName) + } + + override def listEventLogFiles: Seq[FileStatus] = Seq(status) + + override def compressionCodec: Option[String] = EventLogFileWriter.codecName(rootPath) + + override def totalSize: Long = fileSizeForLastIndex +} + +/** + * The reader which will read the information of rolled multiple event log files. 
+ * + * This reader lists the files only once; if caller would like to play with updated list, + * it needs to create another reader instance. + */ +class RollingEventLogFilesFileReader( + fs: FileSystem, + path: Path) extends EventLogFileReader(fs, path) { + import RollingEventLogFilesWriter._ + + private lazy val files: Seq[FileStatus] = { + val ret = fs.listStatus(rootPath).toSeq + require(ret.exists(isEventLogFile), "Log directory must contain at least one event log file!") + require(ret.exists(isAppStatusFile), "Log directory must contain an appstatus file!") + ret + } + + private lazy val appStatusFile = files.find(isAppStatusFile).get + + private lazy val eventLogFiles: Seq[FileStatus] = { + val eventLogFiles = files.filter(isEventLogFile).sortBy { status => + getIndex(status.getPath.getName) + } + val indices = eventLogFiles.map { file => getIndex(file.getPath.getName) }.sorted + require((indices.head to indices.last) == indices, "Found missing event log file, expected" + + s" indices: ${(indices.head to indices.last)}, actual: ${indices}") + eventLogFiles + } + + override def lastIndex: Option[Long] = Some(getIndex(lastEventLogFile.getPath.getName)) + + override def fileSizeForLastIndex: Long = lastEventLogFile.getLen + + override def completed: Boolean = { + !appStatusFile.getPath.getName.endsWith(EventLogFileWriter.IN_PROGRESS) + } + + override def fileSizeForLastIndexForDFS: Option[Long] = { + if (completed) { + Some(fileSizeForLastIndex) + } else { + fileSizeForDFS(lastEventLogFile.getPath) + } + } + + override def modificationTime: Long = lastEventLogFile.getModificationTime + + override def zipEventLogFiles(zipStream: ZipOutputStream): Unit = { + val dirEntryName = rootPath.getName + "/" + zipStream.putNextEntry(new ZipEntry(dirEntryName)) + files.foreach { file => + addFileAsZipEntry(zipStream, file.getPath, dirEntryName + file.getPath.getName) + } + } + + override def listEventLogFiles: Seq[FileStatus] = eventLogFiles + + override def compressionCodec: Option[String] = { + EventLogFileWriter.codecName(eventLogFiles.head.getPath) + } + + override def totalSize: Long = eventLogFiles.map(_.getLen).sum + + private def lastEventLogFile: FileStatus = eventLogFiles.last +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala new file mode 100644 index 0000000000000..3fa5ef94892aa --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io._ +import java.net.URI +import java.nio.charset.StandardCharsets + +import org.apache.commons.compress.utils.CountingOutputStream +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + +/** + * The base class of writer which will write event logs into file. + * + * The following configurable parameters are available to tune the behavior of writing: + * spark.eventLog.compress - Whether to compress logged events + * spark.eventLog.compression.codec - The codec to compress logged events + * spark.eventLog.overwrite - Whether to overwrite any existing files + * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * + * Note that descendant classes can maintain its own parameters: refer the javadoc of each class + * for more details. + * + * NOTE: CountingOutputStream being returned by "initLogFile" counts "non-compressed" bytes. + */ +abstract class EventLogFileWriter( + appId: String, + appAttemptId : Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration) extends Logging { + + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt + protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + protected val compressionCodec = + if (shouldCompress) { + Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) + } else { + None + } + + private[history] val compressionCodecName = compressionCodec.map { c => + CompressionCodec.getShortName(c.getClass.getName) + } + + // Only defined if the file system scheme is not local + protected var hadoopDataStream: Option[FSDataOutputStream] = None + protected var writer: Option[PrintWriter] = None + + protected def requireLogBaseDirAsDirectory(): Unit = { + if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") + } + } + + protected def initLogFile(path: Path)(fnSetupWriter: OutputStream => PrintWriter): Unit = { + if (shouldOverwrite && fileSystem.delete(path, true)) { + logWarning(s"Event log $path already exists. Overwriting...") + } + + val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme + val isDefaultLocal = defaultFs == null || defaultFs == "file" + val uri = path.toUri + + // The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). + // Therefore, for local files, use FileOutputStream instead. 
+ val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { + new FileOutputStream(uri.getPath) + } else { + hadoopDataStream = Some( + SparkHadoopUtil.createFile(fileSystem, path, sparkConf.get(EVENT_LOG_ALLOW_EC))) + hadoopDataStream.get + } + + try { + val cstream = compressionCodec.map(_.compressedContinuousOutputStream(dstream)) + .getOrElse(dstream) + val bstream = new BufferedOutputStream(cstream, outputBufferSize) + fileSystem.setPermission(path, EventLogFileWriter.LOG_FILE_PERMISSIONS) + logInfo(s"Logging events to $path") + writer = Some(fnSetupWriter(bstream)) + } catch { + case e: Exception => + dstream.close() + throw e + } + } + + protected def writeJson(json: String, flushLogger: Boolean = false): Unit = { + // scalastyle:off println + writer.foreach(_.println(json)) + // scalastyle:on println + if (flushLogger) { + writer.foreach(_.flush()) + hadoopDataStream.foreach(_.hflush()) + } + } + + protected def closeWriter(): Unit = { + writer.foreach(_.close()) + } + + protected def renameFile(src: Path, dest: Path, overwrite: Boolean): Unit = { + if (fileSystem.exists(dest)) { + if (overwrite) { + logWarning(s"Event log $dest already exists. Overwriting...") + if (!fileSystem.delete(dest, true)) { + logWarning(s"Error deleting $dest") + } + } else { + throw new IOException(s"Target log file already exists ($dest)") + } + } + fileSystem.rename(src, dest) + // touch file to ensure modtime is current across those filesystems where rename() + // does not set it but support setTimes() instead; it's a no-op on most object stores + try { + fileSystem.setTimes(dest, System.currentTimeMillis(), -1) + } catch { + case e: Exception => logDebug(s"failed to set time of $dest", e) + } + } + + /** initialize writer for event logging */ + def start(): Unit + + /** writes JSON format of event to file */ + def writeEvent(eventJson: String, flushLogger: Boolean = false): Unit + + /** stops writer - indicating the application has been completed */ + def stop(): Unit + + /** returns representative path of log. for tests only. */ + def logPath: String +} + +object EventLogFileWriter { + // Suffix applied to the names of files still being written by applications. + val IN_PROGRESS = ".inprogress" + + val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + def apply( + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration): EventLogFileWriter = { + if (sparkConf.get(EVENT_LOG_ENABLE_ROLLING)) { + new RollingEventLogFilesWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) + } else { + new SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) + } + } + + def nameForAppAndAttempt(appId: String, appAttemptId: Option[String]): String = { + val base = Utils.sanitizeDirName(appId) + if (appAttemptId.isDefined) { + base + "_" + Utils.sanitizeDirName(appAttemptId.get) + } else { + base + } + } + + def codecName(log: Path): Option[String] = { + // Compression codec is encoded as an extension, e.g. app_123.lzf + // Since we sanitize the app ID to not include periods, it is safe to split on it + val logName = log.getName.stripSuffix(IN_PROGRESS) + logName.split("\\.").tail.lastOption + } +} + +/** + * The writer to write event logs into single file. 
+ */ +class SingleEventLogFileWriter( + appId: String, + appAttemptId : Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration) + extends EventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) { + + override val logPath: String = SingleEventLogFileWriter.getLogPath(logBaseDir, appId, + appAttemptId, compressionCodecName) + + private val inProgressPath = logPath + EventLogFileWriter.IN_PROGRESS + + override def start(): Unit = { + requireLogBaseDirAsDirectory() + + initLogFile(new Path(inProgressPath)) { os => + new PrintWriter(new OutputStreamWriter(os, StandardCharsets.UTF_8)) + } + } + + override def writeEvent(eventJson: String, flushLogger: Boolean = false): Unit = { + writeJson(eventJson, flushLogger) + } + + /** + * Stop logging events. The event log file will be renamed so that it loses the + * ".inprogress" suffix. + */ + override def stop(): Unit = { + closeWriter() + renameFile(new Path(inProgressPath), new Path(logPath), shouldOverwrite) + } +} + +object SingleEventLogFileWriter { + /** + * Return a file-system-safe path to the log file for the given application. + * + * Note that because we currently only create a single log file for each application, + * we must encode all the information needed to parse this event log in the file name + * instead of within the file itself. Otherwise, if the file is compressed, for instance, + * we won't know which codec to use to decompress the metadata needed to open the file in + * the first place. + * + * The log file name will identify the compression codec used for the contents, if any. + * For example, app_123 for an uncompressed log, app_123.lzf for an LZF-compressed log. + * + * @param logBaseDir Directory where the log file will be written. + * @param appId A unique app ID. + * @param appAttemptId A unique attempt id of appId. May be the empty string. + * @param compressionCodecName Name to identify the codec used to compress the contents + * of the log, or None if compression is not enabled. + * @return A path which consists of file-system-safe characters. + */ + def getLogPath( + logBaseDir: URI, + appId: String, + appAttemptId: Option[String], + compressionCodecName: Option[String] = None): String = { + val codec = compressionCodecName.map("." + _).getOrElse("") + new Path(logBaseDir).toString.stripSuffix("/") + "/" + + EventLogFileWriter.nameForAppAndAttempt(appId, appAttemptId) + codec + } +} + +/** + * The writer to write event logs into multiple log files, rolled over via configured size. + * + * The class creates one directory per application, and stores event log files as well as + * metadata files. The name of directory and files in the directory would follow: + * + * - The name of directory: eventlog_v2_appId(_[appAttemptId]) + * - The prefix of name on event files: events_[index]_[appId](_[appAttemptId])(.[codec]) + * - "index" would be monotonically increasing value (say, sequence) + * - The name of metadata (app. status) file name: appstatus_[appId](_[appAttemptId])(.inprogress) + * + * The writer will roll over the event log file when configured size is reached. Note that the + * writer doesn't check the size on file being open for write: the writer tracks the count of bytes + * written before compression is applied. + * + * For metadata files, the class will leverage zero-byte file, as it provides minimized cost. 
+ */ +class RollingEventLogFilesWriter( + appId: String, + appAttemptId : Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration) + extends EventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) { + + import RollingEventLogFilesWriter._ + + private val eventFileMaxLength = sparkConf.get(EVENT_LOG_ROLLING_MAX_FILE_SIZE) + + private val logDirForAppPath = getAppEventLogDirPath(logBaseDir, appId, appAttemptId) + + private var countingOutputStream: Option[CountingOutputStream] = None + + // index and event log path will be updated soon in rollEventLogFile, which `start` will call + private var index: Long = 0L + private var currentEventLogFilePath: Path = _ + + override def start(): Unit = { + requireLogBaseDirAsDirectory() + + if (fileSystem.exists(logDirForAppPath) && shouldOverwrite) { + fileSystem.delete(logDirForAppPath, true) + } + + if (fileSystem.exists(logDirForAppPath)) { + throw new IOException(s"Target log directory already exists ($logDirForAppPath)") + } + + fileSystem.mkdirs(logDirForAppPath, EventLogFileWriter.LOG_FILE_PERMISSIONS) + createAppStatusFile(inProgress = true) + rollEventLogFile() + } + + override def writeEvent(eventJson: String, flushLogger: Boolean = false): Unit = { + writer.foreach { w => + val currentLen = countingOutputStream.get.getBytesWritten + if (currentLen + eventJson.length > eventFileMaxLength) { + rollEventLogFile() + } + } + + writeJson(eventJson, flushLogger) + } + + private def rollEventLogFile(): Unit = { + closeWriter() + + index += 1 + currentEventLogFilePath = getEventLogFilePath(logDirForAppPath, appId, appAttemptId, index, + compressionCodecName) + + initLogFile(currentEventLogFilePath) { os => + countingOutputStream = Some(new CountingOutputStream(os)) + new PrintWriter( + new OutputStreamWriter(countingOutputStream.get, StandardCharsets.UTF_8)) + } + } + + override def stop(): Unit = { + closeWriter() + val appStatusPathIncomplete = getAppStatusFilePath(logDirForAppPath, appId, appAttemptId, + inProgress = true) + val appStatusPathComplete = getAppStatusFilePath(logDirForAppPath, appId, appAttemptId, + inProgress = false) + renameFile(appStatusPathIncomplete, appStatusPathComplete, overwrite = true) + } + + override def logPath: String = logDirForAppPath.toString + + private def createAppStatusFile(inProgress: Boolean): Unit = { + val appStatusPath = getAppStatusFilePath(logDirForAppPath, appId, appAttemptId, inProgress) + val outputStream = fileSystem.create(appStatusPath) + // we intentionally create zero-byte file to minimize the cost + outputStream.close() + } +} + +object RollingEventLogFilesWriter { + private[history] val EVENT_LOG_DIR_NAME_PREFIX = "eventlog_v2_" + private[history] val EVENT_LOG_FILE_NAME_PREFIX = "events_" + private[history] val APPSTATUS_FILE_NAME_PREFIX = "appstatus_" + + def getAppEventLogDirPath(logBaseDir: URI, appId: String, appAttemptId: Option[String]): Path = + new Path(new Path(logBaseDir), EVENT_LOG_DIR_NAME_PREFIX + + EventLogFileWriter.nameForAppAndAttempt(appId, appAttemptId)) + + def getAppStatusFilePath( + appLogDir: Path, + appId: String, + appAttemptId: Option[String], + inProgress: Boolean): Path = { + val base = APPSTATUS_FILE_NAME_PREFIX + + EventLogFileWriter.nameForAppAndAttempt(appId, appAttemptId) + val name = if (inProgress) base + EventLogFileWriter.IN_PROGRESS else base + new Path(appLogDir, name) + } + + def getEventLogFilePath( + appLogDir: Path, + appId: String, + appAttemptId: Option[String], + index: Long, + codecName: 
Option[String]): Path = { + val base = s"${EVENT_LOG_FILE_NAME_PREFIX}${index}_" + + EventLogFileWriter.nameForAppAndAttempt(appId, appAttemptId) + val codec = codecName.map("." + _).getOrElse("") + new Path(appLogDir, base + codec) + } + + def isEventLogDir(status: FileStatus): Boolean = { + status.isDirectory && status.getPath.getName.startsWith(EVENT_LOG_DIR_NAME_PREFIX) + } + + def isEventLogFile(status: FileStatus): Boolean = { + status.isFile && status.getPath.getName.startsWith(EVENT_LOG_FILE_NAME_PREFIX) + } + + def isAppStatusFile(status: FileStatus): Boolean = { + status.isFile && status.getPath.getName.startsWith(APPSTATUS_FILE_NAME_PREFIX) + } + + def getIndex(eventLogFileName: String): Long = { + require(eventLogFileName.startsWith(EVENT_LOG_FILE_NAME_PREFIX), "Not an event log file!") + val index = eventLogFileName.stripPrefix(EVENT_LOG_FILE_NAME_PREFIX).split("_")(0) + index.toLong + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index f7360660fa79a..903cb48c0bd20 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -430,27 +430,27 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) - .filter { entry => - !entry.isDirectory() && - // FsHistoryProvider used to generate a hidden file which can't be read. Accidentally - // reading a garbage file is safe, but we would log an error which can be scary to - // the end-user. - !entry.getPath().getName().startsWith(".") && - !isBlacklisted(entry.getPath) - } - .filter { entry => + .filter { entry => !isBlacklisted(entry.getPath) } + .flatMap { entry => EventLogFileReader(fs, entry) } + .filter { reader => try { - val info = listing.read(classOf[LogInfo], entry.getPath().toString()) + val info = listing.read(classOf[LogInfo], reader.rootPath.toString()) if (info.appId.isDefined) { // If the SHS view has a valid application, update the time the file was last seen so // that the entry is not deleted from the SHS listing. Also update the file size, in // case the code below decides we don't need to parse the log. - listing.write(info.copy(lastProcessed = newLastScanTime, fileSize = entry.getLen())) + listing.write(info.copy(lastProcessed = newLastScanTime, + fileSize = reader.fileSizeForLastIndex, + lastIndex = reader.lastIndex, + isComplete = reader.completed)) } - if (shouldReloadLog(info, entry)) { - if (info.appId.isDefined && fastInProgressParsing) { + if (shouldReloadLog(info, reader)) { + // ignore fastInProgressParsing when the status of application is changed from + // in-progress to completed, which is needed for rolling event log. + if (info.appId.isDefined && (info.isComplete == reader.completed) && + fastInProgressParsing) { // When fast in-progress parsing is on, we don't need to re-parse when the // size changes, but we do need to invalidate any existing UIs. 
// Also, we need to update the `lastUpdated time` to display the updated time in @@ -463,6 +463,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) attempt.info.copy(lastUpdated = new Date(newLastScanTime)), attempt.logPath, attempt.fileSize, + attempt.lastIndex, attempt.adminAcls, attempt.viewAcls, attempt.adminAclsGroups, @@ -488,25 +489,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // If the file is currently not being tracked by the SHS, add an entry for it and try // to parse it. This will allow the cleaner code to detect the file as stale later on // if it was not possible to parse it. - listing.write(LogInfo(entry.getPath().toString(), newLastScanTime, None, None, - entry.getLen())) - entry.getLen() > 0 + listing.write(LogInfo(reader.rootPath.toString(), newLastScanTime, LogType.EventLogs, + None, None, reader.fileSizeForLastIndex, reader.lastIndex, + reader.completed)) + reader.fileSizeForLastIndex > 0 } } .sortWith { case (entry1, entry2) => - entry1.getModificationTime() > entry2.getModificationTime() + entry1.modificationTime > entry2.modificationTime } if (updated.nonEmpty) { - logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}") + logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}") } val tasks = updated.flatMap { entry => try { - val task: Future[Unit] = replayExecutor.submit(new Runnable { - override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true) - }, Unit) - Some(task -> entry.getPath) + val task: Future[Unit] = replayExecutor.submit( + () => mergeApplicationListing(entry, newLastScanTime, true)) + Some(task -> entry.rootPath) } catch { // let the iteration over the updated entries break, since an exception on // replayExecutor.submit (..) indicates the ExecutorService is unable @@ -566,22 +567,26 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - private[history] def shouldReloadLog(info: LogInfo, entry: FileStatus): Boolean = { - var result = info.fileSize < entry.getLen - if (!result && info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) { - try { - result = Utils.tryWithResource(fs.open(entry.getPath)) { in => - in.getWrappedStream match { - case dfsIn: DFSInputStream => info.fileSize < dfsIn.getFileLength - case _ => false - } + private[history] def shouldReloadLog(info: LogInfo, reader: EventLogFileReader): Boolean = { + if (info.isComplete != reader.completed) { + true + } else { + var result = if (info.lastIndex.isDefined) { + require(reader.lastIndex.isDefined) + info.lastIndex.get < reader.lastIndex.get || info.fileSize < reader.fileSizeForLastIndex + } else { + info.fileSize < reader.fileSizeForLastIndex + } + if (!result && !reader.completed) { + try { + result = reader.fileSizeForLastIndexForDFS.exists(info.fileSize < _) + } catch { + case e: Exception => + logDebug(s"Failed to check the length for the file : ${info.logPath}", e) } - } catch { - case e: Exception => - logDebug(s"Failed to check the length for the file : ${info.logPath}", e) } + result } - result } private def cleanAppData(appId: String, attemptId: Option[String], logPath: String): Unit = { @@ -628,23 +633,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) attemptId: Option[String], zipStream: ZipOutputStream): Unit = { - /** - * This method compresses the files passed in, and writes the compressed data out into the - * [[OutputStream]] passed in. 
Each file is written as a new [[ZipEntry]] with its name being - * the name of the file being compressed. - */ - def zipFileToStream(file: Path, entryName: String, outputStream: ZipOutputStream): Unit = { - val fs = file.getFileSystem(hadoopConf) - val inputStream = fs.open(file, 1 * 1024 * 1024) // 1MB Buffer - try { - outputStream.putNextEntry(new ZipEntry(entryName)) - ByteStreams.copy(inputStream, outputStream) - outputStream.closeEntry() - } finally { - inputStream.close() - } - } - val app = try { load(appId) } catch { @@ -657,9 +645,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) attemptId .map { id => app.attempts.filter(_.info.attemptId == Some(id)) } .getOrElse(app.attempts) - .map(_.logPath) - .foreach { log => - zipFileToStream(new Path(logDir, log), log, zipStream) + .foreach { attempt => + val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), + attempt.lastIndex) + reader.zipEventLogFiles(zipStream) } } finally { zipStream.close() @@ -670,7 +659,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) * Replay the given log file, saving the application in the listing db. */ protected def mergeApplicationListing( - fileStatus: FileStatus, + reader: EventLogFileReader, scanTime: Long, enableOptimizations: Boolean): Unit = { val eventsFilter: ReplayEventsFilter = { eventString => @@ -680,8 +669,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) eventString.startsWith(ENV_UPDATE_EVENT_PREFIX) } - val logPath = fileStatus.getPath() - val appCompleted = isCompleted(logPath.getName()) + val logPath = reader.rootPath + val appCompleted = reader.completed val reparseChunkSize = conf.get(END_EVENT_REPARSE_CHUNK_SIZE) // Enable halt support in listener if: @@ -691,13 +680,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) ((!appCompleted && fastInProgressParsing) || reparseChunkSize > 0) val bus = new ReplayListenerBus() - val listener = new AppListingListener(fileStatus, clock, shouldHalt) + val listener = new AppListingListener(reader, clock, shouldHalt) bus.addListener(listener) logInfo(s"Parsing $logPath for listing data...") - Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in => - bus.replay(in, logPath.toString, !appCompleted, eventsFilter) - } + val logFiles = reader.listEventLogFiles + parseAppEventLogs(logFiles, bus, !appCompleted, eventsFilter) // If enabled above, the listing listener will halt parsing when there's enough information to // create a listing entry. When the app is completed, or fast parsing is disabled, we still need @@ -719,8 +707,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // current position is, since the replay listener bus buffers data internally. 
val lookForEndEvent = shouldHalt && (appCompleted || !fastInProgressParsing) if (lookForEndEvent && listener.applicationInfo.isDefined) { - Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in => - val target = fileStatus.getLen() - reparseChunkSize + val lastFile = logFiles.last + Utils.tryWithResource(EventLogFileReader.openEventLog(lastFile.getPath, fs)) { in => + val target = lastFile.getLen - reparseChunkSize if (target > 0) { logInfo(s"Looking for end event; skipping $target bytes from $logPath...") var skipped = 0L @@ -737,7 +726,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) source.next() } - bus.replay(source, logPath.toString, !appCompleted, eventsFilter) + bus.replay(source, lastFile.getPath.toString, !appCompleted, eventsFilter) } } @@ -749,13 +738,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // listing data is good. invalidateUI(app.info.id, app.attempts.head.info.attemptId) addListing(app) - listing.write(LogInfo(logPath.toString(), scanTime, Some(app.info.id), - app.attempts.head.info.attemptId, fileStatus.getLen())) + listing.write(LogInfo(logPath.toString(), scanTime, LogType.EventLogs, Some(app.info.id), + app.attempts.head.info.attemptId, reader.fileSizeForLastIndex, + reader.lastIndex, reader.completed)) // For a finished log, remove the corresponding "in progress" entry from the listing DB if // the file is really gone. - if (appCompleted) { - val inProgressLog = logPath.toString() + EventLoggingListener.IN_PROGRESS + // The logic is only valid for single event log, as root path doesn't change for + // rolled event logs. + if (appCompleted && reader.lastIndex.isEmpty) { + val inProgressLog = logPath.toString() + EventLogFileWriter.IN_PROGRESS try { // Fetch the entry first to avoid an RPC when it's already removed. listing.read(classOf[LogInfo], inProgressLog) @@ -772,13 +764,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // mean the end event is before the configured threshold, so call the method again to // re-parse the whole log. logInfo(s"Reparsing $logPath since end event was not found.") - mergeApplicationListing(fileStatus, scanTime, false) + mergeApplicationListing(reader, scanTime, enableOptimizations = false) case _ => // If the app hasn't written down its app ID to the logs, still record the entry in the // listing db, with an empty ID. This will make the log eligible for deletion if the app // does not make progress after the configured max log age. - listing.write(LogInfo(logPath.toString(), scanTime, None, None, fileStatus.getLen())) + listing.write( + LogInfo(logPath.toString(), scanTime, LogType.EventLogs, None, None, + reader.fileSizeForLastIndex, reader.lastIndex, reader.completed)) } } @@ -845,8 +839,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) listing.delete(classOf[LogInfo], log.logPath) } } - // Clean the blacklist from the expired entries. - clearBlacklist(CLEAN_INTERVAL_S) } /** @@ -854,7 +846,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) */ private def rebuildAppStore( store: KVStore, - eventLog: FileStatus, + reader: EventLogFileReader, lastUpdated: Long): Unit = { // Disable async updates, since they cause higher memory usage, and it's ok to take longer // to parse the event logs in the SHS. 
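Elsewhere in this patch the provider's ad-hoc zipFileToStream helper is removed in favor of EventLogFileReader.zipEventLogFiles. A minimal sketch of driving that API directly is shown here; it is illustrative only (not part of the patch), and logDir, logPath, lastIndex and out stand in for values the provider already tracks per attempt.

import java.io.OutputStream
import java.util.zip.ZipOutputStream

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.deploy.history.EventLogFileReader
import org.apache.spark.util.Utils

// Hypothetical helper: streams one attempt's event log(s) into a ZIP.
def zipAttemptLogs(
    fs: FileSystem,
    logDir: String,            // configured event log directory
    logPath: String,           // attempt.logPath from the listing db
    lastIndex: Option[Long],   // attempt.lastIndex; None for a single-file log
    out: OutputStream): Unit = {
  Utils.tryWithResource(new ZipOutputStream(out)) { zipStream =>
    // A single-file log yields one ZIP entry; a rolled log yields the whole
    // eventlog_v2_* directory, i.e. the appstatus file and every events_* file.
    val reader = EventLogFileReader(fs, new Path(logDir, logPath), lastIndex)
    reader.zipEventLogFiles(zipStream)
  }
}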
@@ -871,13 +863,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } replayBus.addListener(listener) try { - val path = eventLog.getPath() - logInfo(s"Parsing $path to re-build UI...") - Utils.tryWithResource(EventLoggingListener.openEventLog(path, fs)) { in => - replayBus.replay(in, path.toString(), maybeTruncated = !isCompleted(path.toString())) - } + logInfo(s"Parsing ${reader.rootPath} to re-build UI...") + parseAppEventLogs(reader.listEventLogFiles, replayBus, !reader.completed) trackingStore.close(false) - logInfo(s"Finished parsing $path") + logInfo(s"Finished parsing ${reader.rootPath}") } catch { case e: Exception => Utils.tryLogNonFatalError { @@ -887,6 +876,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } + private def parseAppEventLogs( + logFiles: Seq[FileStatus], + replayBus: ReplayListenerBus, + maybeTruncated: Boolean, + eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { + // stop replaying next log files if ReplayListenerBus indicates some error or halt + var continueReplay = true + logFiles.foreach { file => + if (continueReplay) { + Utils.tryWithResource(EventLogFileReader.openEventLog(file.getPath, fs)) { in => + continueReplay = replayBus.replay(in, file.getPath.toString, + maybeTruncated = maybeTruncated, eventsFilter = eventsFilter) + } + } + } + } + /** * Checks whether HDFS is in safe mode. * @@ -968,15 +974,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // At this point the disk data either does not exist or was deleted because it failed to // load, so the event log needs to be replayed. - val status = fs.getFileStatus(new Path(logDir, attempt.logPath)) - val isCompressed = EventLoggingListener.codecName(status.getPath()).flatMap { name => - Try(CompressionCodec.getShortName(name)).toOption - }.isDefined + + val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), + attempt.lastIndex) + val isCompressed = reader.compressionCodec.isDefined logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...") - val lease = dm.lease(status.getLen(), isCompressed) + val lease = dm.lease(reader.totalSize, isCompressed) val newStorePath = try { Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store => - rebuildAppStore(store, status, attempt.info.lastUpdated.getTime()) + rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime()) } lease.commit(appId, attempt.info.attemptId) } catch { @@ -990,8 +996,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = { val store = new InMemoryStore() - val status = fs.getFileStatus(new Path(logDir, attempt.logPath)) - rebuildAppStore(store, status, attempt.info.lastUpdated.getTime()) + val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), + attempt.lastIndex) + rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime()) store } @@ -1019,15 +1026,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } } - - private def isCompleted(name: String): Boolean = { - !name.endsWith(EventLoggingListener.IN_PROGRESS) - } - } private[history] object FsHistoryProvider { - private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads" private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\"" @@ -1050,6 +1051,10 @@ private[history] case class FsHistoryProviderMetadata( uiVersion: Long, logDir: String) 
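The parseAppEventLogs helper above generalizes the replay loop to rolled event logs. For reference, a self-contained sketch of the same pattern against the new reader API follows; it assumes the caller supplies fs, logPath and a ReplayListenerBus, and is not part of the patch.

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.deploy.history.EventLogFileReader
import org.apache.spark.scheduler.ReplayListenerBus
import org.apache.spark.util.Utils

// Hypothetical caller: replays every file of a (possibly rolled) event log in order.
def replayAll(fs: FileSystem, logPath: Path, bus: ReplayListenerBus): Unit = {
  EventLogFileReader(fs, logPath).foreach { reader =>
    // Rolled logs are listed sorted by index; a single-file log returns itself.
    var continueReplay = true
    reader.listEventLogFiles.foreach { status =>
      if (continueReplay) {
        Utils.tryWithResource(EventLogFileReader.openEventLog(status.getPath, fs)) { in =>
          // replay() now reports whether the file was read to the end, so a halted
          // or unreadable earlier file stops replay of the later files.
          continueReplay = bus.replay(in, status.getPath.toString,
            maybeTruncated = !reader.completed)
        }
      }
    }
  }
}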
+private[history] object LogType extends Enumeration { + val DriverLogs, EventLogs = Value +} + /** * Tracking info for event logs detected in the configured log directory. Tracks both valid and * invalid logs (e.g. unparseable logs, recorded as logs with no app ID) so that the cleaner @@ -1060,12 +1065,15 @@ private[history] case class LogInfo( @KVIndexParam("lastProcessed") lastProcessed: Long, appId: Option[String], attemptId: Option[String], - fileSize: Long) + fileSize: Long, + lastIndex: Option[Long], + isComplete: Boolean) private[history] class AttemptInfoWrapper( val info: ApplicationAttemptInfo, val logPath: String, val fileSize: Long, + val lastIndex: Option[Long], val adminAcls: Option[String], val viewAcls: Option[String], val adminAclsGroups: Option[String], @@ -1089,12 +1097,13 @@ private[history] class ApplicationInfoWrapper( } private[history] class AppListingListener( - log: FileStatus, + reader: EventLogFileReader, clock: Clock, haltEnabled: Boolean) extends SparkListener { private val app = new MutableApplicationInfo() - private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen()) + private val attempt = new MutableAttemptInfo(reader.rootPath.getName(), + reader.fileSizeForLastIndex, reader.lastIndex) private var gotEnvUpdate = false private var halted = false @@ -1113,7 +1122,7 @@ private[history] class AppListingListener( override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { attempt.endTime = new Date(event.time) - attempt.lastUpdated = new Date(log.getModificationTime()) + attempt.lastUpdated = new Date(reader.modificationTime) attempt.duration = event.time - attempt.startTime.getTime() attempt.completed = true } @@ -1174,7 +1183,7 @@ private[history] class AppListingListener( } - private class MutableAttemptInfo(logPath: String, fileSize: Long) { + private class MutableAttemptInfo(logPath: String, fileSize: Long, lastIndex: Option[Long]) { var attemptId: Option[String] = None var startTime = new Date(-1) var endTime = new Date(-1) @@ -1203,6 +1212,7 @@ private[history] class AppListingListener( apiInfo, logPath, fileSize, + lastIndex, adminAcls, viewAcls, adminAclsGroups, diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index cd4704df7c389..dc801383bf937 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -17,25 +17,20 @@ package org.apache.spark.scheduler -import java.io._ import java.net.URI import java.nio.charset.StandardCharsets -import java.util.Locale import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} -import org.apache.hadoop.fs.permission.FsPermission import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ import org.apache.spark.{SPARK_VERSION, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogFileWriter import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{JsonProtocol, Utils} /** @@ -43,12 +38,12 @@ import org.apache.spark.util.{JsonProtocol, Utils} * * Event logging is specified by the following configurable parameters: * spark.eventLog.enabled - Whether event logging is 
enabled. - * spark.eventLog.logBlockUpdates.enabled - Whether to log block updates - * spark.eventLog.compress - Whether to compress logged events - * spark.eventLog.compression.codec - The codec to compress logged events - * spark.eventLog.overwrite - Whether to overwrite any existing files. * spark.eventLog.dir - Path to the directory in which events are logged. - * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams + * spark.eventLog.logBlockUpdates.enabled - Whether to log block updates + * spark.eventLog.logStageExecutorMetrics.enabled - Whether to log stage executor metrics + * + * Event log file writer maintains its own parameters: refer the doc of [[EventLogFileWriter]] + * and its descendant for more details. */ private[spark] class EventLoggingListener( appId: String, @@ -64,87 +59,43 @@ private[spark] class EventLoggingListener( this(appId, appAttemptId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf)) - private val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) - private val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) - private val shouldLogBlockUpdates = sparkConf.get(EVENT_LOG_BLOCK_UPDATES) - private val testing = sparkConf.get(EVENT_LOG_TESTING) - private val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt - private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) - private val compressionCodec = - if (shouldCompress) { - Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC))) - } else { - None - } - // Visible for tests only. - private[scheduler] val compressionCodecName = compressionCodec.map { c => - CompressionCodec.getShortName(c.getClass.getName) - } - - // Only defined if the file system scheme is not local - private var hadoopDataStream: Option[FSDataOutputStream] = None - - private var writer: Option[PrintWriter] = None + // For testing. + private[scheduler] val logWriter: EventLogFileWriter = + EventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) // For testing. Keep track of all JSON serialized events that have been logged. - private[scheduler] val loggedEvents = new ArrayBuffer[JValue] + private[scheduler] val loggedEvents = new mutable.ArrayBuffer[JValue] + + private val shouldLogBlockUpdates = sparkConf.get(EVENT_LOG_BLOCK_UPDATES) + private val shouldLogStageExecutorMetrics = sparkConf.get(EVENT_LOG_STAGE_EXECUTOR_METRICS) + private val testing = sparkConf.get(EVENT_LOG_TESTING) - // Visible for tests only. - private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) + // map of (stageId, stageAttempt) to executor metric peaks per executor/driver for the stage + private val liveStageExecutorMetrics = + mutable.HashMap.empty[(Int, Int), mutable.HashMap[String, ExecutorMetrics]] /** * Creates the log file in the configured log directory. */ - def start() { - if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { - throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") - } - - val workingPath = logPath + IN_PROGRESS - val path = new Path(workingPath) - val uri = path.toUri - val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme - val isDefaultLocal = defaultFs == null || defaultFs == "file" - - if (shouldOverwrite && fileSystem.delete(path, true)) { - logWarning(s"Event log $path already exists. 
Overwriting...") - } + def start(): Unit = { + logWriter.start() + initEventLog() + } - /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). - * Therefore, for local files, use FileOutputStream instead. */ - val dstream = - if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { - new FileOutputStream(uri.getPath) - } else { - hadoopDataStream = Some(fileSystem.create(path)) - hadoopDataStream.get - } - - try { - val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) - val bstream = new BufferedOutputStream(cstream, outputBufferSize) - - EventLoggingListener.initEventLog(bstream, testing, loggedEvents) - fileSystem.setPermission(path, LOG_FILE_PERMISSIONS) - writer = Some(new PrintWriter(bstream)) - logInfo("Logging events to %s".format(logPath)) - } catch { - case e: Exception => - dstream.close() - throw e + private def initEventLog(): Unit = { + val metadata = SparkListenerLogStart(SPARK_VERSION) + val eventJson = JsonProtocol.logStartToJson(metadata) + val metadataJson = compact(eventJson) + logWriter.writeEvent(metadataJson, flushLogger = true) + if (testing && loggedEvents != null) { + loggedEvents += eventJson } } /** Log the event as JSON. */ - private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) { + private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false): Unit = { val eventJson = JsonProtocol.sparkEventToJson(event) - // scalastyle:off println - writer.foreach(_.println(compact(render(eventJson)))) - // scalastyle:on println - if (flushLogger) { - writer.foreach(_.flush()) - hadoopDataStream.foreach(_.hflush()) - } + logWriter.writeEvent(compact(render(eventJson)), flushLogger) if (testing) { loggedEvents += eventJson } @@ -239,32 +190,9 @@ private[spark] class EventLoggingListener( } } - /** - * Stop logging events. The event log file will be renamed so that it loses the - * ".inprogress" suffix. - */ + /** Stop logging events. */ def stop(): Unit = { - writer.foreach(_.close()) - - val target = new Path(logPath) - if (fileSystem.exists(target)) { - if (shouldOverwrite) { - logWarning(s"Event log $target already exists. Overwriting...") - if (!fileSystem.delete(target, true)) { - logWarning(s"Error deleting $target") - } - } else { - throw new IOException("Target log file already exists (%s)".format(logPath)) - } - } - fileSystem.rename(new Path(logPath + IN_PROGRESS), target) - // touch file to ensure modtime is current across those filesystems where rename() - // does not set it, -and which support setTimes(); it's a no-op on most object stores - try { - fileSystem.setTimes(target, System.currentTimeMillis(), -1) - } catch { - case e: Exception => logDebug(s"failed to set time of $target", e) - } + logWriter.stop() } private[spark] def redactEvent( @@ -285,95 +213,7 @@ private[spark] class EventLoggingListener( } private[spark] object EventLoggingListener extends Logging { - // Suffix applied to the names of files still being written by applications. - val IN_PROGRESS = ".inprogress" val DEFAULT_LOG_DIR = "/tmp/spark-events" - - private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) - - // A cache for compression codecs to avoid creating the same codec many times - private val codecMap = new mutable.HashMap[String, CompressionCodec] - - /** - * Write metadata about an event log to the given stream. - * The metadata is encoded in the first line of the event log as JSON. 
- * - * @param logStream Raw output stream to the event log file. - */ - def initEventLog( - logStream: OutputStream, - testing: Boolean, - loggedEvents: ArrayBuffer[JValue]): Unit = { - val metadata = SparkListenerLogStart(SPARK_VERSION) - val eventJson = JsonProtocol.logStartToJson(metadata) - val metadataJson = compact(eventJson) + "\n" - logStream.write(metadataJson.getBytes(StandardCharsets.UTF_8)) - if (testing && loggedEvents != null) { - loggedEvents += eventJson - } - } - - /** - * Return a file-system-safe path to the log file for the given application. - * - * Note that because we currently only create a single log file for each application, - * we must encode all the information needed to parse this event log in the file name - * instead of within the file itself. Otherwise, if the file is compressed, for instance, - * we won't know which codec to use to decompress the metadata needed to open the file in - * the first place. - * - * The log file name will identify the compression codec used for the contents, if any. - * For example, app_123 for an uncompressed log, app_123.lzf for an LZF-compressed log. - * - * @param logBaseDir Directory where the log file will be written. - * @param appId A unique app ID. - * @param appAttemptId A unique attempt id of appId. May be the empty string. - * @param compressionCodecName Name to identify the codec used to compress the contents - * of the log, or None if compression is not enabled. - * @return A path which consists of file-system-safe characters. - */ - def getLogPath( - logBaseDir: URI, - appId: String, - appAttemptId: Option[String], - compressionCodecName: Option[String] = None): String = { - val base = new Path(logBaseDir).toString.stripSuffix("/") + "/" + sanitize(appId) - val codec = compressionCodecName.map("." + _).getOrElse("") - if (appAttemptId.isDefined) { - base + "_" + sanitize(appAttemptId.get) + codec - } else { - base + codec - } - } - - private def sanitize(str: String): String = { - str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase(Locale.ROOT) - } - - /** - * Opens an event log file and returns an input stream that contains the event data. - * - * @return input stream that holds one JSON record per line. - */ - def openEventLog(log: Path, fs: FileSystem): InputStream = { - val in = new BufferedInputStream(fs.open(log)) - try { - val codec = codecName(log).map { c => - codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) - } - codec.map(_.compressedInputStream(in)).getOrElse(in) - } catch { - case e: Throwable => - in.close() - throw e - } - } - - def codecName(log: Path): Option[String] = { - // Compression codec is encoded as an extension, e.g. 
app_123.lzf - // Since we sanitize the app ID to not include periods, it is safe to split on it - val logName = log.getName.stripSuffix(IN_PROGRESS) - logName.split("\\.").tail.lastOption - } - + // Dummy stage key used by driver in executor metrics updates + val DRIVER_STAGE_KEY = (-1, -1) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 4c6b0c1227b18..53bc03de672ab 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -48,13 +48,15 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { * @param eventsFilter Filter function to select JSON event strings in the log data stream that * should be parsed and replayed. When not specified, all event strings in the log data * are parsed and replayed. + * @return whether it succeeds to replay the log file entirely without error including + * HaltReplayException. false otherwise. */ def replay( logData: InputStream, sourceName: String, maybeTruncated: Boolean = false, - eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { - val lines = Source.fromInputStream(logData).getLines() + eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Boolean = { + val lines = Source.fromInputStream(logData)(Codec.UTF8).getLines() replay(lines, sourceName, maybeTruncated, eventsFilter) } @@ -66,7 +68,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { lines: Iterator[String], sourceName: String, maybeTruncated: Boolean, - eventsFilter: ReplayEventsFilter): Unit = { + eventsFilter: ReplayEventsFilter): Boolean = { var currentLine: String = null var lineNumber: Int = 0 val unrecognizedEvents = new scala.collection.mutable.HashSet[String] @@ -114,17 +116,18 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { } } } + true } catch { case e: HaltReplayException => // Just stop replay. 
- case _: EOFException if maybeTruncated => - case _: IOException if maybeTruncated => - logWarning(s"Failed to read Spark event log: $sourceName") + false + case _: EOFException if maybeTruncated => false case ioe: IOException => throw ioe case e: Exception => logError(s"Exception parsing Spark event log: $sourceName", e) logError(s"Malformed line #$lineNumber: $currentLine\n") + false } } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 1b6f765b8095b..07fd2dc6f432d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -40,10 +40,10 @@ import org.apache.spark.TestUtils.JavaSourceFromString import org.apache.spark.api.r.RUtils import org.apache.spark.deploy.SparkSubmit._ import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate +import org.apache.spark.deploy.history.EventLogFileReader import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.launcher.SparkLauncher -import org.apache.spark.scheduler.EventLoggingListener import org.apache.spark.util.{CommandLineUtils, ResetSystemProperties, Utils} trait TestPrematureExit { @@ -519,7 +519,7 @@ class SparkSubmitSuite unusedJar.toString) runSparkSubmit(args) val listStatus = fileSystem.listStatus(testDirPath) - val logData = EventLoggingListener.openEventLog(listStatus.last.getPath, fileSystem) + val logData = EventLogFileReader.openEventLog(listStatus.last.getPath, fileSystem) Source.fromInputStream(logData).getLines().foreach { line => assert(!line.contains("secret_password")) } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala new file mode 100644 index 0000000000000..a2ce4acdaaf37 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File} +import java.net.URI +import java.nio.charset.StandardCharsets +import java.util.zip.{ZipInputStream, ZipOutputStream} + +import com.google.common.io.{ByteStreams, Files} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper._ +import org.apache.spark.deploy.history.RollingEventLogFilesWriter._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + + +abstract class EventLogFileReadersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter with Logging { + + protected val fileSystem = Utils.getHadoopFileSystem("/", SparkHadoopUtil.get.conf) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { + testDir = Utils.createTempDir(namePrefix = s"event log") + testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { + Utils.deleteRecursively(testDir) + } + + test("Retrieve EventLogFileReader correctly") { + def assertInstanceOfEventLogReader( + expectedClazz: Option[Class[_ <: EventLogFileReader]], + actual: Option[EventLogFileReader]): Unit = { + if (expectedClazz.isEmpty) { + assert(actual.isEmpty, s"Expected no EventLogFileReader instance but was " + + s"${actual.map(_.getClass).getOrElse("")}") + } else { + assert(actual.isDefined, s"Expected an EventLogFileReader instance but was empty") + assert(expectedClazz.get.isAssignableFrom(actual.get.getClass), + s"Expected ${expectedClazz.get} but was ${actual.get.getClass}") + } + } + + def testCreateEventLogReaderWithPath( + path: Path, + isFile: Boolean, + expectedClazz: Option[Class[_ <: EventLogFileReader]]): Unit = { + if (isFile) { + Utils.tryWithResource(fileSystem.create(path)) { is => + is.writeInt(10) + } + } else { + fileSystem.mkdirs(path) + } + + val reader = EventLogFileReader(fileSystem, path) + assertInstanceOfEventLogReader(expectedClazz, reader) + val reader2 = EventLogFileReader(fileSystem, + fileSystem.getFileStatus(path)) + assertInstanceOfEventLogReader(expectedClazz, reader2) + } + + // path with no last index - single event log + val reader1 = EventLogFileReader(fileSystem, new Path(testDirPath, "aaa"), + None) + assertInstanceOfEventLogReader(Some(classOf[SingleFileEventLogFileReader]), Some(reader1)) + + // path with last index - rolling event log + val reader2 = EventLogFileReader(fileSystem, + new Path(testDirPath, s"${EVENT_LOG_DIR_NAME_PREFIX}aaa"), Some(3)) + assertInstanceOfEventLogReader(Some(classOf[RollingEventLogFilesFileReader]), Some(reader2)) + + // path - file (both path and FileStatus) + val eventLogFile = new Path(testDirPath, "bbb") + testCreateEventLogReaderWithPath(eventLogFile, isFile = true, + Some(classOf[SingleFileEventLogFileReader])) + + // path - file starting with "." 
+ val invalidEventLogFile = new Path(testDirPath, ".bbb") + testCreateEventLogReaderWithPath(invalidEventLogFile, isFile = true, None) + + // path - directory with "eventlog_v2_" prefix + val eventLogDir = new Path(testDirPath, s"${EVENT_LOG_DIR_NAME_PREFIX}ccc") + testCreateEventLogReaderWithPath(eventLogDir, isFile = false, + Some(classOf[RollingEventLogFilesFileReader])) + + // path - directory with no "eventlog_v2_" prefix + val invalidEventLogDir = new Path(testDirPath, "ccc") + testCreateEventLogReaderWithPath(invalidEventLogDir, isFile = false, None) + } + + val allCodecs = Seq(None) ++ + CompressionCodec.ALL_COMPRESSION_CODECS.map { c => Some(CompressionCodec.getShortName(c)) } + + allCodecs.foreach { codecShortName => + test(s"get information, list event log files, zip log files - with codec $codecShortName") { + val appId = getUniqueApplicationId + val attemptId = None + + val conf = getLoggingConf(testDirPath, codecShortName) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + + val writer = createWriter(appId, attemptId, testDirPath.toUri, conf, hadoopConf) + writer.start() + + // The test for writing events into EventLogFileWriter is covered to its own test suite. + val dummyData = Seq("dummy1", "dummy2", "dummy3") + dummyData.foreach(writer.writeEvent(_, flushLogger = true)) + + val logPathIncompleted = getCurrentLogPath(writer.logPath, isCompleted = false) + val readerOpt = EventLogFileReader(fileSystem, new Path(logPathIncompleted)) + assertAppropriateReader(readerOpt) + val reader = readerOpt.get + + verifyReader(reader, new Path(logPathIncompleted), codecShortName, isCompleted = false) + + writer.stop() + + val logPathCompleted = getCurrentLogPath(writer.logPath, isCompleted = true) + val readerOpt2 = EventLogFileReader(fileSystem, new Path(logPathCompleted)) + assertAppropriateReader(readerOpt2) + val reader2 = readerOpt2.get + + verifyReader(reader2, new Path(logPathCompleted), codecShortName, isCompleted = true) + } + } + + protected def createWriter( + appId: String, + appAttemptId : Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration): EventLogFileWriter + + protected def getCurrentLogPath(logPath: String, isCompleted: Boolean): String + + protected def assertAppropriateReader(actualReader: Option[EventLogFileReader]): Unit + + protected def verifyReader( + reader: EventLogFileReader, + logPath: Path, + compressionCodecShortName: Option[String], + isCompleted: Boolean): Unit +} + +class SingleFileEventLogFileReaderSuite extends EventLogFileReadersSuite { + override protected def createWriter( + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration): EventLogFileWriter = { + new SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) + } + + override protected def assertAppropriateReader(actualReader: Option[EventLogFileReader]): Unit = { + assert(actualReader.isDefined, s"Expected an EventLogReader instance but was empty") + assert(actualReader.get.isInstanceOf[SingleFileEventLogFileReader], + s"Expected SingleFileEventLogReader but was ${actualReader.get.getClass}") + } + + override protected def getCurrentLogPath(logPath: String, isCompleted: Boolean): String = { + if (!isCompleted) logPath + EventLogFileWriter.IN_PROGRESS else logPath + } + + override protected def verifyReader( + reader: EventLogFileReader, + logPath: Path, + compressionCodecShortName: Option[String], + isCompleted: Boolean): Unit = { + val status = 
fileSystem.getFileStatus(logPath) + + assert(status.isFile) + assert(reader.rootPath === fileSystem.makeQualified(logPath)) + assert(reader.lastIndex.isEmpty) + assert(reader.fileSizeForLastIndex === status.getLen) + assert(reader.completed === isCompleted) + assert(reader.modificationTime === status.getModificationTime) + assert(reader.listEventLogFiles.length === 1) + assert(reader.listEventLogFiles.map(_.getPath.toUri.getPath) === + Seq(logPath.toUri.getPath)) + assert(reader.compressionCodec === compressionCodecShortName) + assert(reader.totalSize === status.getLen) + + val underlyingStream = new ByteArrayOutputStream() + Utils.tryWithResource(new ZipOutputStream(underlyingStream)) { os => + reader.zipEventLogFiles(os) + } + + Utils.tryWithResource(new ZipInputStream( + new ByteArrayInputStream(underlyingStream.toByteArray))) { is => + + var entry = is.getNextEntry + assert(entry != null) + val actual = new String(ByteStreams.toByteArray(is), StandardCharsets.UTF_8) + val expected = Files.toString(new File(logPath.toString), StandardCharsets.UTF_8) + assert(actual === expected) + assert(is.getNextEntry === null) + } + } +} + +class RollingEventLogFilesReaderSuite extends EventLogFileReadersSuite { + allCodecs.foreach { codecShortName => + test(s"rolling event log files - codec $codecShortName") { + val appId = getUniqueApplicationId + val attemptId = None + + val conf = getLoggingConf(testDirPath, codecShortName) + conf.set(EVENT_LOG_ENABLE_ROLLING, true) + conf.set(EVENT_LOG_ROLLING_MAX_FILE_SIZE.key, "10m") + + val writer = createWriter(appId, attemptId, testDirPath.toUri, conf, + SparkHadoopUtil.get.newConfiguration(conf)) + + writer.start() + + // write log more than 20m (intended to roll over to 3 files) + val dummyStr = "dummy" * 1024 + writeTestEvents(writer, dummyStr, 1024 * 1024 * 20) + + val logPathIncompleted = getCurrentLogPath(writer.logPath, isCompleted = false) + val readerOpt = EventLogFileReader(fileSystem, + new Path(logPathIncompleted)) + verifyReader(readerOpt.get, new Path(logPathIncompleted), codecShortName, isCompleted = false) + assert(readerOpt.get.listEventLogFiles.length === 3) + + writer.stop() + + val logPathCompleted = getCurrentLogPath(writer.logPath, isCompleted = true) + val readerOpt2 = EventLogFileReader(fileSystem, new Path(logPathCompleted)) + verifyReader(readerOpt2.get, new Path(logPathCompleted), codecShortName, isCompleted = true) + assert(readerOpt2.get.listEventLogFiles.length === 3) + } + } + + override protected def createWriter( + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration): EventLogFileWriter = { + new RollingEventLogFilesWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) + } + + override protected def assertAppropriateReader(actualReader: Option[EventLogFileReader]): Unit = { + assert(actualReader.isDefined, s"Expected an EventLogReader instance but was empty") + assert(actualReader.get.isInstanceOf[RollingEventLogFilesFileReader], + s"Expected RollingEventLogFilesReader but was ${actualReader.get.getClass}") + } + + override protected def getCurrentLogPath(logPath: String, isCompleted: Boolean): String = logPath + + override protected def verifyReader( + reader: EventLogFileReader, + logPath: Path, + compressionCodecShortName: Option[String], + isCompleted: Boolean): Unit = { + import RollingEventLogFilesWriter._ + + val status = fileSystem.getFileStatus(logPath) + assert(status.isDirectory) + + val statusInDir = fileSystem.listStatus(logPath) + val 
eventFiles = statusInDir.filter(isEventLogFile).sortBy { s => getIndex(s.getPath.getName) } + assert(eventFiles.nonEmpty) + val lastEventFile = eventFiles.last + val allLen = eventFiles.map(_.getLen).sum + + assert(reader.rootPath === fileSystem.makeQualified(logPath)) + assert(reader.lastIndex === Some(getIndex(lastEventFile.getPath.getName))) + assert(reader.fileSizeForLastIndex === lastEventFile.getLen) + assert(reader.completed === isCompleted) + assert(reader.modificationTime === lastEventFile.getModificationTime) + assert(reader.listEventLogFiles.length === eventFiles.length) + assert(reader.listEventLogFiles.map(_.getPath) === eventFiles.map(_.getPath)) + assert(reader.compressionCodec === compressionCodecShortName) + assert(reader.totalSize === allLen) + + val underlyingStream = new ByteArrayOutputStream() + Utils.tryWithResource(new ZipOutputStream(underlyingStream)) { os => + reader.zipEventLogFiles(os) + } + + Utils.tryWithResource(new ZipInputStream( + new ByteArrayInputStream(underlyingStream.toByteArray))) { is => + + val entry = is.getNextEntry + assert(entry != null) + + // directory + assert(entry.getName === logPath.getName + "/") + + val allFileNames = fileSystem.listStatus(logPath).map(_.getPath.getName).toSet + + var count = 0 + var noMoreEntry = false + while (!noMoreEntry) { + val entry = is.getNextEntry + if (entry == null) { + noMoreEntry = true + } else { + count += 1 + + assert(entry.getName.startsWith(logPath.getName + "/")) + val fileName = entry.getName.stripPrefix(logPath.getName + "/") + assert(allFileNames.contains(fileName)) + + val actual = new String(ByteStreams.toByteArray(is), StandardCharsets.UTF_8) + val expected = Files.toString(new File(logPath.toString, fileName), + StandardCharsets.UTF_8) + assert(actual === expected) + } + } + + assert(count === allFileNames.size) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala new file mode 100644 index 0000000000000..c4b40884eebf5 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{File, FileOutputStream, IOException} +import java.net.URI + +import scala.collection.mutable +import scala.io.Source + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper._ +import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils + + +abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkContext + with BeforeAndAfter { + + protected val fileSystem = Utils.getHadoopFileSystem("/", + SparkHadoopUtil.get.newConfiguration(new SparkConf())) + protected var testDir: File = _ + protected var testDirPath: Path = _ + + before { + testDir = Utils.createTempDir(namePrefix = s"event log") + testDirPath = new Path(testDir.getAbsolutePath()) + } + + after { + Utils.deleteRecursively(testDir) + } + + test("create EventLogFileWriter with enable/disable rolling") { + def buildWriterAndVerify(conf: SparkConf, expectedClazz: Class[_]): Unit = { + val writer = EventLogFileWriter( + getUniqueApplicationId, None, testDirPath.toUri, conf, + SparkHadoopUtil.get.newConfiguration(conf)) + val writerClazz = writer.getClass + assert(expectedClazz === writerClazz) + } + + val conf = new SparkConf + conf.set(EVENT_LOG_ENABLED, true) + conf.set(EVENT_LOG_DIR, testDir.toString) + + // default config + buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter]) + + conf.set(EVENT_LOG_ENABLE_ROLLING, true) + buildWriterAndVerify(conf, classOf[RollingEventLogFilesWriter]) + + conf.set(EVENT_LOG_ENABLE_ROLLING, false) + buildWriterAndVerify(conf, classOf[SingleEventLogFileWriter]) + } + + val allCodecs = Seq(None) ++ + CompressionCodec.ALL_COMPRESSION_CODECS.map(c => Some(CompressionCodec.getShortName(c))) + + allCodecs.foreach { codecShortName => + test(s"initialize, write, stop - with codec $codecShortName") { + val appId = getUniqueApplicationId + val attemptId = None + + val conf = getLoggingConf(testDirPath, codecShortName) + val writer = createWriter(appId, attemptId, testDirPath.toUri, conf, + SparkHadoopUtil.get.newConfiguration(conf)) + + writer.start() + + // snappy stream throws exception on empty stream, so we should provide some data to test. + val dummyData = Seq("dummy1", "dummy2", "dummy3") + dummyData.foreach(writer.writeEvent(_, flushLogger = true)) + + writer.stop() + + verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, codecShortName, dummyData) + } + } + + test("spark.eventLog.compression.codec overrides spark.io.compression.codec") { + val conf = new SparkConf + conf.set(EVENT_LOG_COMPRESS, true) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + + val appId = "test" + val appAttemptId = None + + // The default value is `spark.io.compression.codec`. + val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf) + assert(writer.compressionCodecName.contains("lz4")) + + // `spark.eventLog.compression.codec` overrides `spark.io.compression.codec`. 
+ conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd") + val writer2 = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf) + assert(writer2.compressionCodecName.contains("zstd")) + } + + protected def readLinesFromEventLogFile(log: Path, fs: FileSystem): List[String] = { + val logDataStream = EventLogFileReader.openEventLog(log, fs) + try { + Source.fromInputStream(logDataStream).getLines().toList + } finally { + logDataStream.close() + } + } + + protected def createWriter( + appId: String, + appAttemptId : Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration): EventLogFileWriter + + /** + * This should be called with "closed" event log file; No guarantee on reading event log file + * which is being written, especially the file is compressed. SHS also does the best it can. + */ + protected def verifyWriteEventLogFile( + appId: String, + appAttemptId : Option[String], + logBaseDir: URI, + compressionCodecShortName: Option[String], + expectedLines: Seq[String] = Seq.empty): Unit +} + +class SingleEventLogFileWriterSuite extends EventLogFileWritersSuite { + + test("Log overwriting") { + val appId = "test" + val appAttemptId = None + val logUri = SingleEventLogFileWriter.getLogPath(testDir.toURI, appId, appAttemptId) + + val conf = getLoggingConf(testDirPath) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val writer = createWriter(appId, appAttemptId, testDir.toURI, conf, hadoopConf) + + val logPath = new Path(logUri).toUri.getPath + writer.start() + + val dummyData = Seq("dummy1", "dummy2", "dummy3") + dummyData.foreach(writer.writeEvent(_, flushLogger = true)) + + // Create file before writing the event log + new FileOutputStream(new File(logPath)).close() + // Expected IOException, since we haven't enabled log overwrite. + intercept[IOException] { writer.stop() } + + // Try again, but enable overwriting. 
+ conf.set(EVENT_LOG_OVERWRITE, true) + val writer2 = createWriter(appId, appAttemptId, testDir.toURI, conf, hadoopConf) + writer2.start() + dummyData.foreach(writer2.writeEvent(_, flushLogger = true)) + writer2.stop() + } + + test("Event log name") { + val baseDirUri = Utils.resolveURI("/base-dir") + // without compression + assert(s"${baseDirUri.toString}/app1" === SingleEventLogFileWriter.getLogPath( + baseDirUri, "app1", None, None)) + // with compression + assert(s"${baseDirUri.toString}/app1.lzf" === + SingleEventLogFileWriter.getLogPath(baseDirUri, "app1", None, Some("lzf"))) + // illegal characters in app ID + assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1" === + SingleEventLogFileWriter.getLogPath(baseDirUri, + "a fine:mind$dollar{bills}.1", None, None)) + // illegal characters in app ID with compression + assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1.lz4" === + SingleEventLogFileWriter.getLogPath(baseDirUri, + "a fine:mind$dollar{bills}.1", None, Some("lz4"))) + } + + override protected def createWriter( + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration): EventLogFileWriter = { + new SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) + } + + override protected def verifyWriteEventLogFile( + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + compressionCodecShortName: Option[String], + expectedLines: Seq[String]): Unit = { + // read single event log file + val logPath = SingleEventLogFileWriter.getLogPath(logBaseDir, appId, appAttemptId, + compressionCodecShortName) + + val finalLogPath = new Path(logPath) + assert(fileSystem.exists(finalLogPath) && fileSystem.isFile(finalLogPath)) + assert(expectedLines === readLinesFromEventLogFile(finalLogPath, fileSystem)) + } +} + +class RollingEventLogFilesWriterSuite extends EventLogFileWritersSuite { + import RollingEventLogFilesWriter._ + + test("Event log names") { + val baseDirUri = Utils.resolveURI("/base-dir") + val appId = "app1" + val appAttemptId = None + + // happy case with app ID + val logDir = RollingEventLogFilesWriter.getAppEventLogDirPath(baseDirUri, appId, None) + assert(s"${baseDirUri.toString}/${EVENT_LOG_DIR_NAME_PREFIX}${appId}" === logDir.toString) + + // appstatus: inprogress or completed + assert(s"$logDir/${APPSTATUS_FILE_NAME_PREFIX}${appId}${EventLogFileWriter.IN_PROGRESS}" === + RollingEventLogFilesWriter.getAppStatusFilePath(logDir, appId, appAttemptId, + inProgress = true).toString) + assert(s"$logDir/${APPSTATUS_FILE_NAME_PREFIX}${appId}" === + RollingEventLogFilesWriter.getAppStatusFilePath(logDir, appId, appAttemptId, + inProgress = false).toString) + + // without compression + assert(s"$logDir/${EVENT_LOG_FILE_NAME_PREFIX}1_${appId}" === + RollingEventLogFilesWriter.getEventLogFilePath(logDir, appId, appAttemptId, 1, None).toString) + + // with compression + assert(s"$logDir/${EVENT_LOG_FILE_NAME_PREFIX}1_${appId}.lzf" === + RollingEventLogFilesWriter.getEventLogFilePath(logDir, appId, appAttemptId, + 1, Some("lzf")).toString) + + // illegal characters in app ID + assert(s"${baseDirUri.toString}/${EVENT_LOG_DIR_NAME_PREFIX}a-fine-mind_dollar_bills__1" === + RollingEventLogFilesWriter.getAppEventLogDirPath(baseDirUri, + "a fine:mind$dollar{bills}.1", None).toString) + } + + test("Log overwriting") { + val appId = "test" + val appAttemptId = None + val logDirPath = RollingEventLogFilesWriter.getAppEventLogDirPath(testDir.toURI, appId, + appAttemptId) + + val conf = 
getLoggingConf(testDirPath) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val writer = createWriter(appId, appAttemptId, testDir.toURI, conf, hadoopConf) + + val logPath = logDirPath.toUri.getPath + + // Create file before writing the event log directory + // it doesn't matter whether the existing one is file or directory + new FileOutputStream(new File(logPath)).close() + + // Expected IOException, since we haven't enabled log overwrite. + // Note that the place IOException is thrown is different from single event log file. + intercept[IOException] { writer.start() } + + // Try again, but enable overwriting. + conf.set(EVENT_LOG_OVERWRITE, true) + + val writer2 = createWriter(appId, appAttemptId, testDir.toURI, conf, hadoopConf) + writer2.start() + val dummyData = Seq("dummy1", "dummy2", "dummy3") + dummyData.foreach(writer2.writeEvent(_, flushLogger = true)) + writer2.stop() + } + + allCodecs.foreach { codecShortName => + test(s"rolling event log files - codec $codecShortName") { + def assertEventLogFilesIndex( + eventLogFiles: Seq[FileStatus], + expectedLastIndex: Int, + expectedMaxSizeBytes: Long): Unit = { + assert(eventLogFiles.forall(f => f.getLen <= expectedMaxSizeBytes)) + assert((1 to expectedLastIndex) === + eventLogFiles.map(f => getIndex(f.getPath.getName))) + } + + val appId = getUniqueApplicationId + val attemptId = None + + val conf = getLoggingConf(testDirPath, codecShortName) + conf.set(EVENT_LOG_ENABLE_ROLLING, true) + conf.set(EVENT_LOG_ROLLING_MAX_FILE_SIZE.key, "10m") + + val writer = createWriter(appId, attemptId, testDirPath.toUri, conf, + SparkHadoopUtil.get.newConfiguration(conf)) + + writer.start() + + // write log more than 20m (intended to roll over to 3 files) + val dummyStr = "dummy" * 1024 + val expectedLines = writeTestEvents(writer, dummyStr, 1024 * 1024 * 21) + + val logDirPath = getAppEventLogDirPath(testDirPath.toUri, appId, attemptId) + + val eventLogFiles = listEventLogFiles(logDirPath) + assertEventLogFilesIndex(eventLogFiles, 3, 1024 * 1024 * 10) + + writer.stop() + + val eventLogFiles2 = listEventLogFiles(logDirPath) + assertEventLogFilesIndex(eventLogFiles2, 3, 1024 * 1024 * 10) + + verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri, + codecShortName, expectedLines) + } + } + + test(s"rolling event log files - the max size of event log file size less than lower limit") { + val appId = getUniqueApplicationId + val attemptId = None + + val conf = getLoggingConf(testDirPath, None) + conf.set(EVENT_LOG_ENABLE_ROLLING, true) + conf.set(EVENT_LOG_ROLLING_MAX_FILE_SIZE.key, "9m") + + val e = intercept[IllegalArgumentException] { + createWriter(appId, attemptId, testDirPath.toUri, conf, + SparkHadoopUtil.get.newConfiguration(conf)) + } + assert(e.getMessage.contains("should be configured to be at least")) + } + + override protected def createWriter( + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration): EventLogFileWriter = { + new RollingEventLogFilesWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) + } + + override protected def verifyWriteEventLogFile( + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + compressionCodecShortName: Option[String], + expectedLines: Seq[String]): Unit = { + val logDirPath = getAppEventLogDirPath(logBaseDir, appId, appAttemptId) + + assert(fileSystem.exists(logDirPath) && fileSystem.isDirectory(logDirPath)) + + val appStatusFile = getAppStatusFilePath(logDirPath, appId, appAttemptId, inProgress = 
false) + assert(fileSystem.exists(appStatusFile) && fileSystem.isFile(appStatusFile)) + + val eventLogFiles = listEventLogFiles(logDirPath) + val allLines = mutable.ArrayBuffer[String]() + eventLogFiles.foreach { file => + allLines.appendAll(readLinesFromEventLogFile(file.getPath, fileSystem)) + } + + assert(expectedLines === allLines) + } + + private def listEventLogFiles(logDirPath: Path): Seq[FileStatus] = { + fileSystem.listStatus(logDirPath).filter(isEventLogFile) + .sortBy { fs => getIndex(fs.getPath.getName) } + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala new file mode 100644 index 0000000000000..55eddce3968c2 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.nio.charset.StandardCharsets + +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config._ + +object EventLogTestHelper { + def getUniqueApplicationId: String = "test-" + System.currentTimeMillis + + /** + * Get a SparkConf with event logging enabled. It doesn't enable rolling event logs, so caller + * should set it manually. 
+ */ + def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None): SparkConf = { + val conf = new SparkConf + conf.set(EVENT_LOG_ENABLED, true) + conf.set(EVENT_LOG_BLOCK_UPDATES, true) + conf.set(EVENT_LOG_TESTING, true) + conf.set(EVENT_LOG_DIR, logDir.toString) + compressionCodec.foreach { codec => + conf.set(EVENT_LOG_COMPRESS, true) + conf.set(EVENT_LOG_COMPRESSION_CODEC, codec) + } + conf.set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true) + conf + } + + def writeTestEvents( + writer: EventLogFileWriter, + eventStr: String, + desiredSize: Long): Seq[String] = { + val stringLen = eventStr.getBytes(StandardCharsets.UTF_8).length + val repeatCount = Math.floor(desiredSize / stringLen).toInt + (0 until repeatCount).map { _ => + writer.writeEvent(eventStr, flushLogger = true) + eventStr + } + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index a3457cd4b4c5e..4b0464386423f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -44,6 +44,7 @@ import org.apache.spark.io._ import org.apache.spark.scheduler._ import org.apache.spark.security.GroupMappingServiceProvider import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.KVUtils.KVStoreScalaSerializer import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} @@ -70,8 +71,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { appAttemptId: Option[String], inProgress: Boolean, codec: Option[String] = None): File = { - val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else "" - val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId, codec) + val ip = if (inProgress) EventLogFileWriter.IN_PROGRESS else "" + val logUri = SingleEventLogFileWriter.getLogPath(testDir.toURI, appId, appAttemptId, codec) val logPath = new Path(logUri).toUri.getPath + ip new File(logPath) } @@ -82,7 +83,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { } } - private def testAppLogParsing(inMemory: Boolean) { + private def testAppLogParsing(inMemory: Boolean): Unit = { val clock = new ManualClock(12345678) val conf = createTestConf(inMemory = inMemory) val provider = new FsHistoryProvider(conf, clock) @@ -157,10 +158,10 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) { var mergeApplicationListingCall = 0 override protected def mergeApplicationListing( - fileStatus: FileStatus, + reader: EventLogFileReader, lastSeen: Long, enableSkipToEnd: Boolean): Unit = { - super.mergeApplicationListing(fileStatus, lastSeen, enableSkipToEnd) + super.mergeApplicationListing(reader, lastSeen, enableSkipToEnd) mergeApplicationListingCall += 1 } } @@ -195,13 +196,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { ) updateAndCheck(provider) { list => list.size should be (1) - provider.getAttempt("app1", None).logPath should endWith(EventLoggingListener.IN_PROGRESS) + provider.getAttempt("app1", None).logPath should endWith(EventLogFileWriter.IN_PROGRESS) } logFile1.renameTo(newLogFile("app1", None, inProgress = false)) updateAndCheck(provider) { list => list.size should be 
(1) - provider.getAttempt("app1", None).logPath should not endWith(EventLoggingListener.IN_PROGRESS) + provider.getAttempt("app1", None).logPath should not endWith(EventLogFileWriter.IN_PROGRESS) } } @@ -916,28 +917,82 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { when(mockedFs.open(path)).thenReturn(in) when(in.getWrappedStream).thenReturn(dfsIn) when(dfsIn.getFileLength).thenReturn(200) + // FileStatus.getLen is more than logInfo fileSize var fileStatus = new FileStatus(200, false, 0, 0, 0, path) - var logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 100) - assert(mockedProvider.shouldReloadLog(logInfo, fileStatus)) + when(mockedFs.getFileStatus(path)).thenReturn(fileStatus) + var logInfo = new LogInfo(path.toString, 0, LogType.EventLogs, Some("appId"), + Some("attemptId"), 100, None, false) + var reader = EventLogFileReader(mockedFs, path) + assert(reader.isDefined) + assert(mockedProvider.shouldReloadLog(logInfo, reader.get)) fileStatus = new FileStatus() fileStatus.setPath(path) + when(mockedFs.getFileStatus(path)).thenReturn(fileStatus) // DFSInputStream.getFileLength is more than logInfo fileSize - logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 100) - assert(mockedProvider.shouldReloadLog(logInfo, fileStatus)) + logInfo = new LogInfo(path.toString, 0, LogType.EventLogs, Some("appId"), + Some("attemptId"), 100, None, false) + reader = EventLogFileReader(mockedFs, path) + assert(reader.isDefined) + assert(mockedProvider.shouldReloadLog(logInfo, reader.get)) + // DFSInputStream.getFileLength is equal to logInfo fileSize - logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 200) - assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus)) + logInfo = new LogInfo(path.toString, 0, LogType.EventLogs, Some("appId"), + Some("attemptId"), 200, None, false) + reader = EventLogFileReader(mockedFs, path) + assert(reader.isDefined) + assert(!mockedProvider.shouldReloadLog(logInfo, reader.get)) + // in.getWrappedStream returns other than DFSInputStream val bin = mock(classOf[BufferedInputStream]) when(in.getWrappedStream).thenReturn(bin) - assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus)) + reader = EventLogFileReader(mockedFs, path) + assert(reader.isDefined) + assert(!mockedProvider.shouldReloadLog(logInfo, reader.get)) + // fs.open throws exception when(mockedFs.open(path)).thenThrow(new IOException("Throwing intentionally")) - assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus)) + reader = EventLogFileReader(mockedFs, path) + assert(reader.isDefined) + assert(!mockedProvider.shouldReloadLog(logInfo, reader.get)) } + test("backwards compatibility with LogInfo from Spark 2.4") { + case class LogInfoV24( + logPath: String, + lastProcessed: Long, + appId: Option[String], + attemptId: Option[String], + fileSize: Long) + + val oldObj = LogInfoV24("dummy", System.currentTimeMillis(), Some("hello"), + Some("attempt1"), 100) + + val serializer = new KVStoreScalaSerializer() + val serializedOldObj = serializer.serialize(oldObj) + val deserializedOldObj = serializer.deserialize(serializedOldObj, classOf[LogInfo]) + assert(deserializedOldObj.logPath === oldObj.logPath) + assert(deserializedOldObj.lastProcessed === oldObj.lastProcessed) + assert(deserializedOldObj.appId === oldObj.appId) + assert(deserializedOldObj.attemptId === oldObj.attemptId) + assert(deserializedOldObj.fileSize === oldObj.fileSize) + + // SPARK-25118: added logType: LogType.Value - expected 'null' on old format
+ assert(deserializedOldObj.logType === null) + + // SPARK-28869: added lastIndex: Option[Long], isComplete: Boolean - expected 'None' and + // 'false' on old format. The default value for isComplete is wrong value for completed app, + // but the value will be corrected once checkForLogs is called. + assert(deserializedOldObj.lastIndex === None) + assert(deserializedOldObj.isComplete === false) + } + + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. Example: @@ -958,7 +1013,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val fstream = new FileOutputStream(file) val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream) val bstream = new BufferedOutputStream(cstream) - EventLoggingListener.initEventLog(bstream, false, null) + + val metadata = SparkListenerLogStart(org.apache.spark.SPARK_VERSION) + val eventJson = JsonProtocol.logStartToJson(metadata) + val metadataJson = compact(eventJson) + "\n" + bstream.write(metadataJson.getBytes(StandardCharsets.UTF_8)) val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8) Utils.tryWithSafeFinally { diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index c260a1ef1c615..899dc6ec965a4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -17,21 +17,31 @@ package org.apache.spark.scheduler -import java.io.{File, FileOutputStream, IOException, InputStream} +import java.io.{File, InputStream} +import java.util.Arrays + +import scala.collection.immutable.Map import scala.collection.mutable +import scala.collection.mutable.Set import scala.io.Source + import org.apache.hadoop.fs.Path import org.json4s.jackson.JsonMethods._ import org.mockito.Mockito import org.scalatest.BeforeAndAfter + import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.{EventLogFileReader, SingleEventLogFileWriter} +import org.apache.spark.deploy.history.EventLogTestHelper._ +import org.apache.spark.executor.{TaskMetrics} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{EVENT_LOG_COMPRESS, EVENT_LOG_COMPRESSION_CODEC} +import org.apache.spark.internal.config._ import org.apache.spark.io._ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{JsonProtocol, Utils} + /** * Test whether EventLoggingListener logs events properly. 
* @@ -41,7 +51,6 @@ import org.apache.spark.util.{JsonProtocol, Utils} */ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter with Logging { - import EventLoggingListenerSuite._ private val fileSystem = Utils.getHadoopFileSystem("/", SparkHadoopUtil.get.newConfiguration(new SparkConf())) @@ -58,26 +67,6 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit Utils.deleteRecursively(testDir) } - test("Verify log file exist") { - // Verify logging directory exists - val conf = getLoggingConf(testDirPath) - val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf) - eventLogger.start() - - val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS) - assert(fileSystem.exists(logPath)) - val logStatus = fileSystem.getFileStatus(logPath) - assert(!logStatus.isDirectory) - - // Verify log is renamed after stop() - eventLogger.stop() - assert(!fileSystem.getFileStatus(new Path(eventLogger.logPath)).isDirectory) - } - - test("Basic event logging") { - testEventLogging() - } - test("Basic event logging with compression") { CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec => testEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec))) @@ -106,55 +95,10 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit assert(redactedProps(key) == "*********(redacted)") } - test("Log overwriting") { - val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None) - val logPath = new Path(logUri).toUri.getPath - // Create file before writing the event log - new FileOutputStream(new File(logPath)).close() - // Expected IOException, since we haven't enabled log overwrite. - intercept[IOException] { testEventLogging() } - // Try again, but enable overwriting. - testEventLogging(extraConf = Map("spark.eventLog.overwrite" -> "true")) - } - - test("spark.eventLog.compression.codec overrides spark.io.compression.codec") { - val conf = new SparkConf - conf.set(EVENT_LOG_COMPRESS, true) - - // The default value is `spark.io.compression.codec`. - val e = new EventLoggingListener("test", None, testDirPath.toUri(), conf) - assert(e.compressionCodecName.contains("lz4")) - - // `spark.eventLog.compression.codec` overrides `spark.io.compression.codec`. - conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd") - val e2 = new EventLoggingListener("test", None, testDirPath.toUri(), conf) - assert(e2.compressionCodecName.contains("zstd")) - } - - test("Event log name") { - val baseDirUri = Utils.resolveURI("/base-dir") - // without compression - assert(s"${baseDirUri.toString}/app1" === EventLoggingListener.getLogPath( - baseDirUri, "app1", None)) - // with compression - assert(s"${baseDirUri.toString}/app1.lzf" === - EventLoggingListener.getLogPath(baseDirUri, "app1", None, Some("lzf"))) - // illegal characters in app ID - assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1" === - EventLoggingListener.getLogPath(baseDirUri, - "a fine:mind$dollar{bills}.1", None)) - // illegal characters in app ID with compression - assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1.lz4" === - EventLoggingListener.getLogPath(baseDirUri, - "a fine:mind$dollar{bills}.1", None, Some("lz4"))) - } - /* ----------------- * * Actual test logic * * ----------------- */ - import EventLoggingListenerSuite._ - /** * Test basic event logging functionality. 
* @@ -183,7 +127,8 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit eventLogger.stop() // Verify file contains exactly the two events logged - val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) + val logPath = eventLogger.logWriter.logPath + val logData = EventLogFileReader.openEventLog(new Path(logPath), fileSystem) try { val lines = readLines(logData) val logStart = SparkListenerLogStart(SPARK_VERSION) @@ -211,9 +156,10 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit sc = new SparkContext("local-cluster[2,2,1024]", "test", conf) assert(sc.eventLogger.isDefined) val eventLogger = sc.eventLogger.get - val eventLogPath = eventLogger.logPath + + val eventLogPath = eventLogger.logWriter.logPath val expectedLogDir = testDir.toURI() - assert(eventLogPath === EventLoggingListener.getLogPath( + assert(eventLogPath === SingleEventLogFileWriter.getLogPath( expectedLogDir, sc.applicationId, None, compressionCodec.map(CompressionCodec.getShortName))) // Begin listening for events that trigger asserts @@ -228,7 +174,8 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit eventExistenceListener.assertAllCallbacksInvoked() // Make sure expected events exist in the log file. - val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) + val logData = EventLogFileReader.openEventLog(new Path(eventLogger.logWriter.logPath), + fileSystem) val eventSet = mutable.Set( SparkListenerApplicationStart, SparkListenerBlockManagerAdded, @@ -276,19 +223,19 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit var jobEnded = false var appEnded = false - override def onJobStart(jobStart: SparkListenerJobStart) { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { jobStarted = true } - override def onJobEnd(jobEnd: SparkListenerJobEnd) { + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { jobEnded = true } - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { appEnded = true } - def assertAllCallbacksInvoked() { + def assertAllCallbacksInvoked(): Unit = { assert(jobStarted, "JobStart callback not invoked!") assert(jobEnded, "JobEnd callback not invoked!") assert(appEnded, "ApplicationEnd callback not invoked!") @@ -296,23 +243,3 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } - - -object EventLoggingListenerSuite { - - /** Get a SparkConf with event logging enabled. 
*/ - def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None): SparkConf = { - val conf = new SparkConf - conf.set("spark.eventLog.enabled", "true") - conf.set("spark.eventLog.logBlockUpdates.enabled", "true") - conf.set("spark.eventLog.testing", "true") - conf.set("spark.eventLog.dir", logDir.toString) - compressionCodec.foreach { codec => - conf.set("spark.eventLog.compress", "true") - conf.set(EVENT_LOG_COMPRESSION_CODEC, codec) - } - conf - } - - def getUniqueApplicationId: String = "test-" + System.currentTimeMillis -} diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index e24d550a62665..ea508e0534b53 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -18,15 +18,19 @@ package org.apache.spark.scheduler import java.io._ -import java.net.URI import java.util.concurrent.atomic.AtomicInteger +import scala.collection.mutable.ArrayBuffer + import org.apache.hadoop.fs.Path +import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ import org.scalatest.BeforeAndAfter import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogFileReader +import org.apache.spark.deploy.history.EventLogTestHelper._ import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec} import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils} @@ -59,7 +63,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // scalastyle:on println writer.close() - val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) + val conf = getLoggingConf(logFilePath) val logData = fileSystem.open(logFilePath) val eventMonster = new EventMonster(conf) try { @@ -104,14 +108,14 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp } // Read the compressed .inprogress file and verify only first event was parsed. - val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) + val conf = getLoggingConf(logFilePath) val replayer = new ReplayListenerBus() val eventMonster = new EventMonster(conf) replayer.addListener(eventMonster) // Verify the replay returns the events given the input maybe truncated. - val logData = EventLoggingListener.openEventLog(logFilePath, fileSystem) + val logData = EventLogFileReader.openEventLog(logFilePath, fileSystem) Utils.tryWithResource(new EarlyEOFInputStream(logData, buffered.size - 10)) { failingStream => replayer.replay(failingStream, logFilePath.toString, true) @@ -120,7 +124,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp } // Verify the replay throws the EOF exception since the input may not be truncated. 
- val logData2 = EventLoggingListener.openEventLog(logFilePath, fileSystem) + val logData2 = EventLogFileReader.openEventLog(logFilePath, fileSystem) Utils.tryWithResource(new EarlyEOFInputStream(logData2, buffered.size - 10)) { failingStream2 => intercept[EOFException] { replayer.replay(failingStream2, logFilePath.toString, false) @@ -142,7 +146,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // scalastyle:on println writer.close() - val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) + val conf = getLoggingConf(logFilePath) val logData = fileSystem.open(logFilePath) val eventMonster = new EventMonster(conf) try { @@ -188,7 +192,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp val logDirPath = new Path(logDir.toURI) fileSystem.mkdirs(logDirPath) - val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName) + val conf = getLoggingConf(logDirPath, codecName) sc = new SparkContext("local-cluster[2,1,1024]", "Test replay", conf) // Run a few jobs @@ -205,7 +209,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp assert(!eventLog.isDirectory) // Replay events - val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem) + val logData = EventLogFileReader.openEventLog(eventLog.getPath(), fileSystem) val eventMonster = new EventMonster(conf) try { val replayer = new ReplayListenerBus() diff --git a/docs/configuration.md b/docs/configuration.md index 57b114c3cef6f..97e204d0660ff 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -802,6 +802,21 @@ Apart from these, the following properties are also available, and may be useful Buffer size to use when writing to output streams, in KiB unless otherwise specified. +<tr> + <td><code>spark.eventLog.rolling.enabled</code></td> + <td>false</td> + <td> + Whether rolling over event log files is enabled. If set to true, each event log file is + cut down to the configured size. + </td> +</tr> +<tr> + <td><code>spark.eventLog.rolling.maxFileSize</code></td> + <td>128m</td> + <td> + The maximum size of an event log file before it is rolled over. + </td> +</tr> spark.ui.dagGraph.retainedRootRDDs Int.MaxValue
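As a usage illustration of the two new options documented above (not part of the patch itself), here is a minimal driver-side sketch; the application name and event log directory are placeholder assumptions:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical setup: enable event logging and opt into rolling event logs.
val conf = new SparkConf()
  .setAppName("rolling-event-log-demo")               // placeholder app name
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-history") // placeholder log directory
  .set("spark.eventLog.rolling.enabled", "true")      // new option in this change
  .set("spark.eventLog.rolling.maxFileSize", "128m")  // roll over once a file reaches the limit

val sc = new SparkContext(conf)
// ... run jobs; events are written as rolled files under an "eventlog_v2_<appId>" directory,
// which the RollingEventLogFilesFileReader added in this change can list, read, and zip ...
sc.stop()
```

The writer tests above suggest the max file size has a lower bound (a "9m" setting is rejected with "should be configured to be at least"), so values of roughly 10m and up appear to be required; the documented default is 128m.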