Fix HDFS flush behavior
org.apache.hadoop.fs.FSDataOutputStream supports sync() but not flush(). This means that flushing
higher-level streams doesn't actually do anything when the Hadoop FileSystem is used. This is now fixed.

Further, this clarifies why logging to local files is handled as a special case, by referencing a known,
unresolved bug in the Hadoop LocalFileSystem (HADOOP-7844).
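
As an illustration (not part of the commit), here is a minimal sketch of the problem being fixed, assuming Hadoop 1.0.4 and a hypothetical HDFS address: flush() on the wrapping Java streams only drains their buffers, while sync() on the underlying FSDataOutputStream is what actually forces the bytes out.

import java.io.{BufferedOutputStream, PrintWriter}
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FlushVsSyncSketch {
  def main(args: Array[String]) {
    // Hypothetical HDFS location; substitute a real cluster address
    val uri = new URI("hdfs://localhost:9000/tmp/flush-sketch")
    val fs = FileSystem.get(uri, new Configuration())
    val hadoopStream = fs.create(new Path(uri.getPath), true)
    val writer = new PrintWriter(new BufferedOutputStream(hadoopStream))

    writer.println("some event")
    writer.flush()      // drains the PrintWriter and BufferedOutputStream buffers,
                        // but on r1.0.4 this does not persist the bytes to HDFS
    hadoopStream.sync() // forces the data out to HDFS; this is the call that
                        // FileLogger.flush() now makes for Hadoop-backed streams

    writer.close()
    fs.close()
  }
}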
andrewor14 committed Mar 4, 2014
1 parent 36b3e5d commit 03eda0b
Showing 2 changed files with 56 additions and 22 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/UISparkListener.scala
@@ -22,11 +22,11 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.json4s.jackson.JsonMethods._
 
-import org.apache.spark.SparkContext
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 import org.apache.spark.util.FileLogger
 import org.apache.spark.util.JsonProtocol
+import org.apache.spark.SparkContext
 
 private[ui] trait UISparkListener extends SparkListener
 
@@ -74,7 +74,7 @@ private[ui] class GatewayUISparkListener(parent: SparkUI, sc: SparkContext) exte
 
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
     listeners.foreach(_.onStageCompleted(stageCompleted))
-    logEvent(stageCompleted)
+    logEvent(stageCompleted, flushLogger = true)
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) {
@@ -122,7 +122,7 @@ private[ui] class GatewayUISparkListener(parent: SparkUI, sc: SparkContext) exte
     logEvent(unpersistRDD, flushLogger = true)
   }
 
-  def stop() = logger.foreach(_.close())
+  def stop() = logger.foreach(_.stop())
 }
 
 /**
72 changes: 53 additions & 19 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -24,10 +24,11 @@ import java.util.Date
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.fs.{FSDataOutputStream, Path, FileSystem}
 
 /**
- * A generic class for logging information to file
+ * A generic class for logging information to file.
+ *
  * @param logBaseDir Path to the directory in which files are logged
  * @param name An identifier of each FileLogger instance
  * @param overwriteExistingFiles Whether to overwrite existing files
@@ -42,13 +43,17 @@ class FileLogger(
   private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
   private var fileIndex = 0
 
+  // Only defined if the file system scheme uses the Hadoop API
+  private var hadoopDataStream: Option[FSDataOutputStream] = None
+  private var hadoopFileSystem: Option[FileSystem] = None
+
   private var writer: Option[PrintWriter] = {
     createLogDir()
     createWriter()
   }
 
   /** Create a logging directory with the given path */
-  private def createLogDir() = {
+  private def createLogDir() {
     val dir = new File(logDir)
     if (dir.exists) {
       logWarning("Logging directory already exists: " + logDir)
@@ -60,27 +65,37 @@
   }
 
   /**
-   * Create a new writer to the file identified with the given path file.
-   * File systems currently supported include hdfs, s3, and the local file system.
+   * Create a new writer for the file identified by the given path.
+   *
+   * File systems currently supported include HDFS, S3, and the local file system.
+   * The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
+   * Therefore, for local files, use FileOutputStream instead.
    */
   private def createWriter(): Option[PrintWriter] = {
     val logPath = logDir + "/" + fileIndex
     val uri = new URI(logPath)
-    val fileStream = uri.getScheme match {
+
+    val dataStream = uri.getScheme match {
       case "hdfs" | "s3" =>
-        val conf = SparkHadoopUtil.get.newConfiguration()
-        val fs = FileSystem.get(uri, conf)
+        val fs = hadoopFileSystem.getOrElse {
+          val conf = SparkHadoopUtil.get.newConfiguration()
+          hadoopFileSystem = Some(FileSystem.get(uri, conf))
+          hadoopFileSystem.get
+        }
         val path = new Path(logPath)
-        fs.create(path, overwriteExistingFiles)
+        hadoopDataStream = Some(fs.create(path, overwriteExistingFiles))
+        hadoopDataStream.get
+
       case "file" | null =>
+        // org.apache.hadoop.fs.FileSystem (r1.0.4) does not flush on local files
        // Second parameter is whether to append
        new FileOutputStream(logPath, !overwriteExistingFiles)
-      case _ =>
-        logWarning("Given logging directory is invalid: %s".format(logDir))
-        return None
+
+      case unsupportedScheme =>
+        throw new UnsupportedOperationException("File system scheme %s is not supported!"
+          .format(unsupportedScheme))
     }
-    val bufferedStream = new BufferedOutputStream(fileStream)
+
+    val bufferedStream = new BufferedOutputStream(dataStream)
     Some(new PrintWriter(bufferedStream))
   }
 
@@ -89,7 +104,7 @@
    * @param msg The message to be logged
    * @param withTime Whether to prepend message with a timestamp
    */
-  def log(msg: String, withTime: Boolean = false) = {
+  def log(msg: String, withTime: Boolean = false) {
     var writeInfo = msg
     if (withTime) {
       val date = new Date(System.currentTimeMillis())
@@ -105,20 +120,39 @@
    */
   def logLine(msg: String, withTime: Boolean = false) = log(msg + "\n", withTime)
 
-  /** Flush the writer to disk manually */
-  def flush() = writer.foreach(_.flush())
+  /**
+   * Flush the writer to disk manually.
+   *
+   * If the Hadoop FileSystem is used, the underlying FSDataOutputStream (r1.0.4) must be
+   * sync()'ed manually, as it does not support flush(), which is invoked when higher
+   * level streams are flushed.
+   */
+  def flush() {
+    writer.foreach(_.flush())
+    hadoopDataStream.foreach(_.sync())
+  }
 
   /** Close the writer. Any subsequent calls to log or flush will have no effect. */
-  def close() = {
+  def close() {
    writer.foreach(_.close())
    writer = None
  }
 
   /** Start a new writer (for a new file) if there does not already exist one */
-  def start() = {
+  def start() {
    writer.getOrElse {
      fileIndex += 1
      writer = createWriter()
    }
  }
+
+  /**
+   * Close all open writers, streams, and file systems. Any subsequent uses of this FileLogger
+   * instance will throw exceptions.
+   */
+  def stop() {
+    hadoopDataStream.foreach(_.close())
+    hadoopFileSystem.foreach(_.close())
+    writer.foreach(_.close())
+  }
 }
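
For reference, a hypothetical usage sketch of the revised FileLogger (the constructor arguments are assumed from the @param list in its scaladoc above; this snippet is illustrative, not taken from the commit):

val logger = new FileLogger("/tmp/spark-events", "my-app", overwriteExistingFiles = true)
logger.logLine("SparkListenerStageCompleted", withTime = true)
logger.flush()  // flushes the writer and, if a Hadoop stream is open, sync()s it
logger.close()  // finishes the current file; a later start() would begin a new one
logger.stop()   // closes all writers, streams, and file systems for good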
