From 60bc6d57577742e861d62c183ec56d9893e3ea6a Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 21 Mar 2014 18:17:43 -0700 Subject: [PATCH] First complete implementation of HistoryServer (only for finished apps) This involves a change in Spark's event log format. All event logs are now prefixed with EVENT_LOG_. If compression is used, the logger creates a special empty file prefixed with COMPRESSION_CODEC_ that indicates which codec is used. After the application finishes, the logger logs a special empty file named APPLICATION_COMPLETE. The ReplayListenerBus is now responsible for parsing all of the above file formats. In this commit, we establish a one-to-one mapping between ReplayListenerBus and event logging applications. The semantics of the ReplayListenerBus is further clarified (e.g. replay is not allowed before starting, and can only be called once). This commit also adds a control mechanism for the frequency at which HistoryServer accesses the disk to check for log updates. This enforces a minimum interval of N seconds between two checks, where N is arbitrarily chosen to be 5. --- .../scala/org/apache/spark/SparkContext.scala | 5 +- .../spark/deploy/ApplicationDescription.scala | 4 +- .../spark/deploy/history/HistoryServer.scala | 138 +++++++++++------- .../spark/deploy/history/IndexPage.scala | 6 +- .../apache/spark/deploy/master/Master.scala | 30 +--- .../spark/scheduler/ApplicationListener.scala | 8 +- .../scheduler/EventLoggingListener.scala | 55 +++++-- .../spark/scheduler/ReplayListenerBus.scala | 106 ++++++++++---- .../cluster/SparkDeploySchedulerBackend.scala | 3 +- .../apache/spark/storage/FileSegment.scala | 2 +- .../scala/org/apache/spark/ui/SparkUI.scala | 4 +- .../org/apache/spark/util/FileLogger.scala | 27 ++-- 12 files changed, 237 insertions(+), 151 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e9256c10b8e67..b0cac873ba24d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -164,15 +164,12 @@ class SparkContext( private[spark] val eventLogger: Option[EventLoggingListener] = { if (conf.getBoolean("spark.eventLog.enabled", false)) { val logger = new EventLoggingListener(appName, conf) + logger.start() listenerBus.addListener(logger) Some(logger) } else None } - // Information needed to replay logged events, if any - private[spark] val eventLoggingInfo: Option[EventLoggingInfo] = - eventLogger.map { logger => Some(logger.info) }.getOrElse(None) - // At this point, all relevant SparkListeners have been registered, so begin releasing events listenerBus.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 15fa8a7679874..86305d2ea8a09 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -17,8 +17,6 @@ package org.apache.spark.deploy -import org.apache.spark.scheduler.EventLoggingInfo - private[spark] class ApplicationDescription( val name: String, val maxCores: Option[Int], @@ -26,7 +24,7 @@ private[spark] class ApplicationDescription( val command: Command, val sparkHome: Option[String], var appUiUrl: String, - val eventLogInfo: Option[EventLoggingInfo] = None) + val eventLogDir: Option[String] = None) extends Serializable { val user = 
System.getProperty("user.name", "") diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index ea5875d7853bb..b36abbd9d4ff4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -53,6 +53,9 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf) private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir)) private val securityManager = new SecurityManager(conf) + // A timestamp of when the disk was last accessed to check for log updates + private var lastLogCheck = -1L + private val handlers = Seq[ServletContextHandler]( createStaticHandler(HistoryServer.STATIC_RESOURCE_DIR, "/static"), createServletHandler("/", @@ -74,81 +77,106 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf) checkForLogs() } - /** Parse app ID from the given log path. */ - def getAppId(logPath: String): String = logPath.split("/").last - - /** Return the address of this server. */ - def getAddress = "http://" + host + ":" + boundPort - /** - * Check for any updated event logs. + * Check for any updates to event logs in the base directory. + * + * If a new finished application is found, the server renders the associated SparkUI + * from the application's event logs, attaches this UI to itself, and stores metadata + * information for this application. * - * If a new application is found, render the associated SparkUI and remember it. - * If an existing application is updated, re-render the associated SparkUI. - * If an existing application is removed, remove the associated SparkUI. + * If the logs for an existing finished application are no longer found, remove all + * associated information and detach the SparkUI. 
*/ def checkForLogs() { - val logStatus = fileSystem.listStatus(new Path(baseLogDir)) - val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() - - // Render any missing or outdated SparkUI - logDirs.foreach { dir => - val path = dir.getPath.toString - val appId = getAppId(path) - val lastUpdated = { - val logFiles = fileSystem.listStatus(dir.getPath) - if (logFiles != null) logFiles.map(_.getModificationTime).max else dir.getModificationTime - } - if (!appIdToInfo.contains(appId) || appIdToInfo(appId).lastUpdated < lastUpdated) { - maybeRenderUI(appId, path, lastUpdated) + if (logCheckReady) { + lastLogCheck = System.currentTimeMillis + val logStatus = fileSystem.listStatus(new Path(baseLogDir)) + val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() + + // Render SparkUI for any new completed applications + logDirs.foreach { dir => + val path = dir.getPath.toString + val appId = getAppId(path) + val lastUpdated = getModificationTime(dir) + if (!appIdToInfo.contains(appId)) { + maybeRenderUI(appId, path, lastUpdated) + } } - } - // Remove any outdated SparkUIs - val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) } - appIdToInfo.foreach { case (appId, info) => - if (!appIds.contains(appId)) { - detachUI(info.ui) - appIdToInfo.remove(appId) + // Remove any outdated SparkUIs + val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) } + appIdToInfo.foreach { case (appId, info) => + if (!appIds.contains(appId)) { + detachUI(info.ui) + appIdToInfo.remove(appId) + } } } } - /** Attempt to render a new SparkUI from event logs residing in the given log directory. */ + /** + * Render a new SparkUI from the event logs if the associated application is finished. + * + * HistoryServer looks for a special file that indicates application completion in the given + * directory. If this file exists, the associated application is regarded to be complete, in + * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing. 
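+ *
+ * The completion marker is the empty APPLICATION_COMPLETE file that EventLoggingListener
+ * writes when the application stops (see EventLoggingListener.stop()).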
+ */ private def maybeRenderUI(appId: String, logPath: String, lastUpdated: Long) { - val replayBus = new ReplayListenerBus(conf) - val appListener = new ApplicationListener - replayBus.addListener(appListener) - val ui = new SparkUI(conf, replayBus, appId, "/history/%s".format(appId)) - - // Do not call ui.bind() to avoid creating a new server for each application - ui.start() - val success = replayBus.replay(logPath) - if (success) { - attachUI(ui) - if (!appListener.started) { - logWarning("Application has event logs but has not started: %s".format(appId)) + val replayBus = new ReplayListenerBus(logPath) + replayBus.start() + + // If the application completion file is found + if (replayBus.isApplicationComplete) { + val ui = new SparkUI(replayBus, appId, "/history/%s".format(appId)) + val appListener = new ApplicationListener + replayBus.addListener(appListener) + + // Do not call ui.bind() to avoid creating a new server for each application + ui.start() + val success = replayBus.replay() + if (success) { + attachUI(ui) + val appName = if (appListener.applicationStarted) appListener.appName else appId + ui.setAppName("%s (history)".format(appName)) + val startTime = appListener.startTime + val endTime = appListener.endTime + val info = ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, logPath, ui) + appIdToInfo(appId) = info } - val appName = appListener.appName - val startTime = appListener.startTime - val endTime = appListener.endTime - val info = ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, logPath, ui) - - // If the UI already exists, terminate it and replace it - appIdToInfo.remove(appId).foreach { info => detachUI(info.ui) } - appIdToInfo(appId) = info - - // Use mnemonic original app name rather than app ID - val originalAppName = "%s (history)".format(appName) - ui.setAppName(originalAppName) + } else { + logWarning("Skipping incomplete application: %s".format(logPath)) + } + replayBus.stop() + } + + /** Parse app ID from the given log path. */ + def getAppId(logPath: String): String = logPath.split("/").last + + /** Return the address of this server. */ + def getAddress = "http://" + host + ":" + boundPort + + /** Return when this directory is last modified. */ + private def getModificationTime(dir: FileStatus): Long = { + val logFiles = fileSystem.listStatus(dir.getPath) + if (logFiles != null) { + logFiles.map(_.getModificationTime).max + } else { + dir.getModificationTime } } + /** Return whether the last log check has happened sufficiently long ago. 
*/ + private def logCheckReady: Boolean = { + System.currentTimeMillis - lastLogCheck > HistoryServer.UPDATE_INTERVAL_SECONDS * 1000 + } } object HistoryServer { val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR + // Minimum interval between each check for logs, which requires a disk access + val UPDATE_INTERVAL_SECONDS = 5 + def main(argStrings: Array[String]) { val conf = new SparkConf val args = new HistoryServerArguments(argStrings, conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala index a92cf3160dcfc..2b997026b3a47 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala @@ -30,19 +30,18 @@ private[spark] class IndexPage(parent: HistoryServer) { private val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") def render(request: HttpServletRequest): Seq[Node] = { - // Check if logs have been updated parent.checkForLogs() // Populate app table, with most recently modified app first val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated } val appTable = UIUtils.listingTable(appHeader, appRow, appRows) - val content =
      <ul>
        <li>Event Log Location: {parent.baseLogDir}</li>
      </ul>
-     <h4>Applications</h4>
-     {appTable}
+     <h4>Finished Applications</h4>
+     {appTable}
@@ -67,7 +66,6 @@ private[spark] class IndexPage(parent: HistoryServer) { val duration = if (difference > 0) DeployWebUI.formatDuration(difference) else "---" val logDirectory = parent.getAppId(info.logPath) val lastUpdated = dateFmt.format(new Date(info.lastUpdated)) - {appName} {startTime} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 26ecfa406af5b..ad15de5e4ce07 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -149,7 +149,6 @@ private[spark] class Master( override def postStop() { webUi.stop() - appIdToUI.values.foreach(_.stop()) masterMetricsSystem.stop() applicationMetricsSystem.stop() persistenceEngine.close() @@ -622,10 +621,7 @@ private[spark] class Master( if (completedApps.size >= RETAINED_APPLICATIONS) { val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1) completedApps.take(toRemove).foreach( a => { - appIdToUI.remove(a.id).foreach { ui => - ui.stop() - webUi.detachUI(ui) - } + appIdToUI.remove(a.id).foreach { ui => webUi.detachUI(ui) } applicationMetricsSystem.removeSource(a.appSource) }) completedApps.trimStart(toRemove) @@ -663,28 +659,14 @@ private[spark] class Master( */ def startPersistedSparkUI(app: ApplicationInfo): Option[SparkUI] = { val appName = app.desc.name - val eventLogInfo = app.desc.eventLogInfo.getOrElse { return None } - val eventLogDir = eventLogInfo.logDir - val eventCompressionCodec = eventLogInfo.compressionCodec - val appConf = new SparkConf - eventCompressionCodec.foreach { codec => - appConf.set("spark.eventLog.compress", "true") - appConf.set("spark.io.compression.codec", codec) - } - val replayBus = new ReplayListenerBus(appConf) - val ui = new SparkUI( - appConf, - replayBus, - "%s (finished)".format(appName), - "/history/%s".format(app.id)) + val eventLogDir = app.desc.eventLogDir.getOrElse { return None } + val replayBus = new ReplayListenerBus(eventLogDir) + val ui = new SparkUI(replayBus, "%s (finished)".format(appName), "/history/%s".format(app.id)) // Do not call ui.bind() to avoid creating a new server for each application ui.start() - val success = replayBus.replay(eventLogDir) - if (!success) { - ui.stop() - None - } else Some(ui) + val success = replayBus.replay() + if (success) Some(ui) else None } /** Generate a new app ID given a app's submission date */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationListener.scala index ef984ed23dcff..9a20ad1bb5ef4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationListener.scala @@ -29,13 +29,13 @@ private[spark] class ApplicationListener extends SparkListener { var startTime = -1L var endTime = -1L - def started = startTime != -1 + def applicationStarted = startTime != -1 - def finished = endTime != -1 + def applicationFinished = endTime != -1 - def duration: Long = { + def applicationDuration: Long = { val difference = endTime - startTime - if (started && finished && difference > 0) difference else -1L + if (applicationStarted && applicationFinished && difference > 0) difference else -1L } override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 98cbdcb18ce2a..ead6a2904228b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -36,6 +36,8 @@ import org.apache.spark.util.{JsonProtocol, FileLogger} private[spark] class EventLoggingListener(appName: String, conf: SparkConf) extends SparkListener with Logging { + import EventLoggingListener._ + private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false) private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false) private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024 @@ -46,16 +48,19 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf) private val logger = new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite) - // Information needed to replay the events logged by this listener later - val info = { - val compressionCodec = if (shouldCompress) { - Some(conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)) - } else None - EventLoggingInfo(logDir, compressionCodec) + /** + * Begin logging events. If compression is used, log a file that indicates which compression + * library is used. + */ + def start() { + logInfo("Logging events to %s".format(logDir)) + if (shouldCompress) { + val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC) + logger.newFile(COMPRESSION_CODEC_PREFIX + codec) + } + logger.newFile(LOG_PREFIX + logger.fileIndex) } - logInfo("Logging events to %s".format(logDir)) - /** Log the event as JSON */ private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) { val eventJson = compact(render(JsonProtocol.sparkEventToJson(event))) @@ -95,8 +100,36 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf) override def onApplicationEnd(event: SparkListenerApplicationEnd) = logEvent(event, flushLogger = true) - def stop() = logger.stop() + /** + * Stop logging events. In addition, create an empty special file to indicate application + * completion. 
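+ * The file is named APPLICATION_COMPLETE; ReplayListenerBus and the HistoryServer use its
+ * presence to distinguish finished applications from ones that are still running.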
+ */ + def stop() = { + logger.newFile(APPLICATION_COMPLETE) + logger.stop() + } } -// If compression is not enabled, compressionCodec is None -private[spark] case class EventLoggingInfo(logDir: String, compressionCodec: Option[String]) +private[spark] object EventLoggingListener { + val LOG_PREFIX = "EVENT_LOG_" + val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_" + val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" + + def isEventLogFile(fileName: String): Boolean = { + fileName.contains(LOG_PREFIX) + } + + def isCompressionCodecFile(fileName: String): Boolean = { + fileName.contains(COMPRESSION_CODEC_PREFIX) + } + + def isApplicationCompleteFile(fileName: String): Boolean = { + fileName == APPLICATION_COMPLETE + } + + def parseCompressionCodec(fileName: String): String = { + if (isCompressionCodecFile(fileName)) { + fileName.replaceAll(COMPRESSION_CODEC_PREFIX, "") + } else "" + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index db76178b65501..8c61e0742bc1a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -31,38 +31,62 @@ import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{JsonProtocol, Utils} /** - * An EventBus that replays logged events from persisted storage + * An EventBus that replays logged events from persisted storage. + * + * This class expects files to be appropriately prefixed as specified in EventLoggingListener. + * There exists a one-to-one mapping between ReplayListenerBus and event logging applications. */ -private[spark] class ReplayListenerBus(conf: SparkConf) extends SparkListenerBus with Logging { - private val compressed = conf.getBoolean("spark.eventLog.compress", false) - - // Only used if compression is enabled - private lazy val compressionCodec = CompressionCodec.createCodec(conf) +private[spark] class ReplayListenerBus(logDir: String) extends SparkListenerBus with Logging { + private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) + private var applicationComplete = false + private var compressionCodec: Option[CompressionCodec] = None + private var logPaths = Array[Path]() + private var started = false + private var replayed = false /** - * Return a list of paths representing log files in the given directory. + * Prepare state for reading event logs. + * + * This gathers relevant files in the given directory and extracts meaning from each category. + * More specifically, this involves looking for event logs, the compression codec file + * (if event logs are compressed), and the application completion file (if the application + * has run to completion). 
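+ *
+ * A completed application's log directory therefore contains one or more EVENT_LOG_* files,
+ * an optional COMPRESSION_CODEC_<codec> marker, and an APPLICATION_COMPLETE marker.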
*/ - private def getLogFilePaths(logDir: String, fileSystem: FileSystem): Array[Path] = { - val path = new Path(logDir) - if (!fileSystem.exists(path) || !fileSystem.getFileStatus(path).isDir) { - logWarning("Log path provided is not a valid directory: %s".format(logDir)) - return Array[Path]() - } - val logStatus = fileSystem.listStatus(path) - if (logStatus == null || !logStatus.exists(!_.isDir)) { - logWarning("Log path provided contains no log files: %s".format(logDir)) - return Array[Path]() - } - logStatus.filter(!_.isDir).map(_.getPath).sortBy(_.getName) + def start() { + val filePaths = getFilePaths(logDir, fileSystem) + logPaths = filePaths.filter { file => EventLoggingListener.isEventLogFile(file.getName) } + compressionCodec = + filePaths.find { file => + EventLoggingListener.isCompressionCodecFile(file.getName) + }.map { file => + val codec = EventLoggingListener.parseCompressionCodec(file.getName) + val conf = new SparkConf + conf.set("spark.io.compression.codec", codec) + CompressionCodec.createCodec(conf) + } + applicationComplete = + filePaths.exists { file => + EventLoggingListener.isApplicationCompleteFile(file.getName) + } + started = true + } + + /** Return whether the associated application signaled completion. */ + def isApplicationComplete: Boolean = { + assert(started, "ReplayListenerBus not started yet") + applicationComplete } /** - * Replay each event in the order maintained in the given logs. + * Replay each event in the order maintained in the given logs. This should only be called + * exactly once. Return whether event logs are actually found. */ - def replay(logDir: String): Boolean = { - val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) - val logPaths = getLogFilePaths(logDir, fileSystem) + def replay(): Boolean = { + assert(started, "ReplayListenerBus must be started before replaying logged events") + assert(!replayed, "ReplayListenerBus cannot replay events more than once") + if (logPaths.length == 0) { + logWarning("Log path provided contains no log files: %s".format(logDir)) return false } @@ -72,15 +96,11 @@ private[spark] class ReplayListenerBus(conf: SparkConf) extends SparkListenerBus var fileStream: Option[InputStream] = None var bufferedStream: Option[InputStream] = None var compressStream: Option[InputStream] = None - var currentLine = "" + var currentLine = "" try { - currentLine = "" fileStream = Some(fileSystem.open(path)) bufferedStream = Some(new FastBufferedInputStream(fileStream.get)) - compressStream = - if (compressed) { - Some(compressionCodec.compressedInputStream(bufferedStream.get)) - } else bufferedStream + compressStream = Some(wrapForCompression(bufferedStream.get)) // Parse each line as an event and post it to all attached listeners val lines = Source.fromInputStream(compressStream.get).getLines() @@ -98,7 +118,33 @@ private[spark] class ReplayListenerBus(conf: SparkConf) extends SparkListenerBus compressStream.foreach(_.close()) } } - fileSystem.close() + + replayed = true true } + + /** Stop the file system. */ + def stop() { + fileSystem.close() + } + + /** If a compression codec is specified, wrap the given stream in a compression stream. */ + private def wrapForCompression(stream: InputStream): InputStream = { + compressionCodec.map { codec => codec.compressedInputStream(stream) }.getOrElse(stream) + } + + /** Return a list of paths representing files found in the given directory. 
*/ + private def getFilePaths(logDir: String, fileSystem: FileSystem): Array[Path] = { + val path = new Path(logDir) + if (!fileSystem.exists(path) || !fileSystem.getFileStatus(path).isDir) { + logWarning("Log path provided is not a valid directory: %s".format(logDir)) + return Array[Path]() + } + val logStatus = fileSystem.listStatus(path) + if (logStatus == null || !logStatus.exists(!_.isDir)) { + logWarning("No files are found in the given log directory: %s".format(logDir)) + return Array[Path]() + } + logStatus.filter(!_.isDir).map(_.getPath).sortBy(_.getName) + } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 25b7472a99cdb..6a73ab10b5ba6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -48,8 +48,9 @@ private[spark] class SparkDeploySchedulerBackend( val command = Command( "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome() + val eventLogDir = sc.eventLogger.map { logger => Some(logger.logDir) }.getOrElse(None) val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - sparkHome, sc.ui.appUIAddress, sc.eventLoggingInfo) + sparkHome, sc.ui.appUIAddress, eventLogDir) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() diff --git a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala index 555486830a769..132502b75f8cd 100644 --- a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala +++ b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala @@ -23,6 +23,6 @@ import java.io.File * References a particular segment of a file (potentially the entire file), * based off an offset and a length. 
*/ -private[spark] class FileSegment(val file: File, val offset: Long, val length : Long) { +private[spark] class FileSegment(val file: File, val offset: Long, val length: Long) { override def toString = "(name=%s, offset=%d, length=%d)".format(file.getName, offset, length) } diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index dc1b0c986c1d7..dc457a1ddcf06 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -39,8 +39,8 @@ private[spark] class SparkUI( extends WebUI("SparkUI") with Logging { def this(sc: SparkContext) = this(sc, sc.conf, sc.listenerBus, sc.appName) - def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) = - this(null, conf, listenerBus, appName, basePath) + def this(listenerBus: SparkListenerBus, appName: String, basePath: String) = + this(null, new SparkConf, listenerBus, appName, basePath) // If SparkContext is not provided, assume the associated application is not live val live = sc != null diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index f07962096a32c..479f9775a1ed8 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -46,7 +46,7 @@ class FileLogger( private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) - private var fileIndex = 0 + var fileIndex = 0 // Only used if compression is enabled private lazy val compressionCodec = CompressionCodec.createCodec(conf) @@ -54,10 +54,9 @@ class FileLogger( // Only defined if the file system scheme is not local private var hadoopDataStream: Option[FSDataOutputStream] = None - private var writer: Option[PrintWriter] = { - createLogDir() - Some(createWriter()) - } + private var writer: Option[PrintWriter] = None + + createLogDir() /** * Create a logging directory with the given path. @@ -81,8 +80,8 @@ class FileLogger( /** * Create a new writer for the file identified by the given path. */ - private def createWriter(): PrintWriter = { - val logPath = logDir + "/" + fileIndex + private def createWriter(fileName: String): PrintWriter = { + val logPath = logDir + "/" + fileName val uri = new URI(logPath) /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). @@ -144,13 +143,17 @@ class FileLogger( } /** - * Start a writer for a new file if one does not already exit. + * Start a writer for a new file, closing the existing one if it exists. + * @param fileName Name of the new file, defaulting to the file index if not provided. */ - def start() { - writer.getOrElse { - fileIndex += 1 - writer = Some(createWriter()) + def newFile(fileName: String = "") { + fileIndex += 1 + writer.foreach(_.close()) + val name = fileName match { + case "" => fileIndex.toString + case _ => fileName } + writer = Some(createWriter(name)) } /**
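
For reviewers, a minimal sketch of how the new ReplayListenerBus API introduced above is meant to be driven. This is illustrative only and not part of the patch; the log directory path is hypothetical.

    import org.apache.spark.scheduler.{ApplicationListener, ReplayListenerBus}

    // Replay the logs of one application, but only if it signaled completion
    val replayBus = new ReplayListenerBus("/tmp/spark-events/app-20140321181743")  // hypothetical log dir
    replayBus.start()                                 // look for EVENT_LOG_*, codec and completion markers
    if (replayBus.isApplicationComplete) {
      replayBus.addListener(new ApplicationListener)  // any SparkListener works here
      replayBus.replay()                              // post each logged event to attached listeners
    }
    replayBus.stop()                                  // close the underlying file system handle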