diff --git a/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala b/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala
index d0e5647318b2c..6c32a8752df39 100644
--- a/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala
@@ -22,11 +22,11 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.json4s.jackson.JsonMethods._
 
+import org.apache.spark.SparkContext
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 import org.apache.spark.util.FileLogger
 import org.apache.spark.util.JsonProtocol
-import org.apache.spark.SparkContext
 
 private[ui] trait UISparkListener extends SparkListener
 
@@ -74,7 +74,7 @@ private[ui] class GatewayUISparkListener(parent: SparkUI, sc: SparkContext) exte
 
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
     listeners.foreach(_.onStageCompleted(stageCompleted))
-    logEvent(stageCompleted)
+    logEvent(stageCompleted, flushLogger = true)
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) {
@@ -122,7 +122,7 @@ private[ui] class GatewayUISparkListener(parent: SparkUI, sc: SparkContext) exte
     logEvent(unpersistRDD, flushLogger = true)
   }
 
-  def stop() = logger.foreach(_.close())
+  def stop() = logger.foreach(_.stop())
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index f5831e1382c54..f7feed78d80a5 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -24,10 +24,11 @@ import java.util.Date
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.fs.{FSDataOutputStream, Path, FileSystem}
 
 /**
- * A generic class for logging information to file
+ * A generic class for logging information to file.
+ *
  * @param logBaseDir Path to the directory in which files are logged
  * @param name An identifier of each FileLogger instance
  * @param overwriteExistingFiles Whether to overwrite existing files
@@ -42,13 +43,17 @@ class FileLogger(
   private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
   private var fileIndex = 0
 
+  // Only defined if the file system scheme uses the Hadoop API
+  private var hadoopDataStream: Option[FSDataOutputStream] = None
+  private var hadoopFileSystem: Option[FileSystem] = None
+
   private var writer: Option[PrintWriter] = {
     createLogDir()
     createWriter()
   }
 
   /** Create a logging directory with the given path */
-  private def createLogDir() = {
+  private def createLogDir() {
     val dir = new File(logDir)
     if (dir.exists) {
       logWarning("Logging directory already exists: " + logDir)
@@ -60,27 +65,37 @@ class FileLogger(
   }
 
   /**
-   * Create a new writer to the file identified with the given path file.
-   * File systems currently supported include hdfs, s3, and the local file system.
+   * Create a new writer for the file identified by the given path.
+   *
+   * File systems currently supported include HDFS, S3, and the local file system.
+   * The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
+   * Therefore, for local files, use FileOutputStream instead.
    */
   private def createWriter(): Option[PrintWriter] = {
     val logPath = logDir + "/" + fileIndex
     val uri = new URI(logPath)
-    val fileStream = uri.getScheme match {
+
+    val dataStream = uri.getScheme match {
       case "hdfs" | "s3" =>
-        val conf = SparkHadoopUtil.get.newConfiguration()
-        val fs = FileSystem.get(uri, conf)
+        val fs = hadoopFileSystem.getOrElse {
+          val conf = SparkHadoopUtil.get.newConfiguration()
+          hadoopFileSystem = Some(FileSystem.get(uri, conf))
+          hadoopFileSystem.get
+        }
         val path = new Path(logPath)
-        fs.create(path, overwriteExistingFiles)
+        hadoopDataStream = Some(fs.create(path, overwriteExistingFiles))
+        hadoopDataStream.get
+
       case "file" | null =>
-        // org.apache.hadoop.fs.FileSystem (r1.0.4) does not flush on local files
         // Second parameter is whether to append
        new FileOutputStream(logPath, !overwriteExistingFiles)
-      case _ =>
-        logWarning("Given logging directory is invalid: %s".format(logDir))
-        return None
+
+      case unsupportedScheme =>
+        throw new UnsupportedOperationException("File system scheme %s is not supported!"
+          .format(unsupportedScheme))
     }
-    val bufferedStream = new BufferedOutputStream(fileStream)
+
+    val bufferedStream = new BufferedOutputStream(dataStream)
     Some(new PrintWriter(bufferedStream))
   }
 
@@ -89,7 +104,7 @@ class FileLogger(
    * @param msg The message to be logged
    * @param withTime Whether to prepend message with a timestamp
    */
-  def log(msg: String, withTime: Boolean = false) = {
+  def log(msg: String, withTime: Boolean = false) {
     var writeInfo = msg
     if (withTime) {
       val date = new Date(System.currentTimeMillis())
@@ -105,20 +120,39 @@ class FileLogger(
    */
   def logLine(msg: String, withTime: Boolean = false) = log(msg + "\n", withTime)
 
-  /** Flush the writer to disk manually */
-  def flush() = writer.foreach(_.flush())
+  /**
+   * Flush the writer to disk manually.
+   *
+   * If the Hadoop FileSystem is used, the underlying FSDataOutputStream (r1.0.4) must be
+   * sync()'ed manually, as it does not support flush(), which is invoked when higher
+   * level streams are flushed.
+   */
+  def flush() {
+    writer.foreach(_.flush())
+    hadoopDataStream.foreach(_.sync())
+  }
 
   /** Close the writer. Any subsequent calls to log or flush will have no effect. */
-  def close() = {
+  def close() {
     writer.foreach(_.close())
     writer = None
   }
 
   /** Start a new writer (for a new file) if there does not already exist one */
-  def start() = {
+  def start() {
     writer.getOrElse {
       fileIndex += 1
       writer = createWriter()
     }
   }
+
+  /**
+   * Close all open writers, streams, and file systems. Any subsequent uses of this FileLogger
+   * instance will throw exceptions.
+   */
+  def stop() {
+    hadoopDataStream.foreach(_.close())
+    hadoopFileSystem.foreach(_.close())
+    writer.foreach(_.close())
+  }
 }
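
For context, here is a minimal usage sketch of the FileLogger lifecycle after this change (not part of the diff itself). It assumes the constructor takes the three parameters listed in its scaladoc (logBaseDir, name, overwriteExistingFiles); the directory and name below are hypothetical.

import org.apache.spark.util.FileLogger

object FileLoggerSketch {
  def main(args: Array[String]) {
    // Scheme-less path, so createWriter() takes the FileOutputStream branch
    val logger = new FileLogger("/tmp/spark-logs", "example-app", true)

    // flush() now also sync()'s the underlying FSDataOutputStream when the
    // log path is on "hdfs" or "s3", since FSDataOutputStream (r1.0.4) does
    // not support flush()
    logger.logLine("stage 0 completed", withTime = true)
    logger.flush()

    // Roll over to a new file: close() drops the current writer, start()
    // increments fileIndex and opens a writer for the next file
    logger.close()
    logger.start()
    logger.logLine("stage 1 completed", withTime = true)

    // New in this diff: stop() also closes the Hadoop data stream and the
    // cached FileSystem; the instance must not be reused afterwards
    logger.stop()
  }
}

Caching the FileSystem in hadoopFileSystem lets rollovers reuse a single connection across files, which is why stop() must close it explicitly rather than leaving that to close().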