Skip to content

Commit

Permalink
Expose Spark version and include it in event logs
Browse files Browse the repository at this point in the history
This allows us to deal with event log version
incompatibilities in the future.
  • Loading branch information
andrewor14 committed Apr 9, 2014
1 parent 2282300 commit d02dbaa
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 14 deletions.
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,9 @@ class SparkContext(config: SparkConf) extends Logging {
listenerBus.addListener(listener)
}

/** The version of Spark on which this application is running. */
def version: String = SparkContext.SPARK_VERSION
// Explicit result type: public API members should not rely on type inference,
// so the inferred type cannot silently change if SPARK_VERSION's type changes.

/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
Expand Down Expand Up @@ -1213,6 +1216,8 @@ class SparkContext(config: SparkConf) extends Logging {
*/
object SparkContext extends Logging {

// Version string for this Spark release. Written into event logs (see
// EventLoggingListener) so replayers can detect logs produced by an
// incompatible Spark version.
private[spark] val SPARK_VERSION = "1.0.0"

// Local property key under which the user-supplied job description is stored.
private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

// Local property key identifying the job group a job belongs to.
private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.scheduler

import org.json4s.jackson.JsonMethods._

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{JsonProtocol, FileLogger}

Expand Down Expand Up @@ -58,6 +58,7 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
}
logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
logger.newFile(LOG_PREFIX + logger.fileIndex)
}

Expand Down Expand Up @@ -111,22 +112,33 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
}

/**
 * Naming conventions for the marker files written alongside event logs.
 *
 * Each constant below is the prefix (or, for APPLICATION_COMPLETE, the full
 * name) of an empty marker file whose name encodes a piece of metadata about
 * the log directory; the parse*/is* helpers in this object recover that
 * metadata on replay.
 */
private[spark] object EventLoggingListener {
val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
val LOG_PREFIX = "EVENT_LOG_"
val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"

/** Whether the given file name is a Spark version marker file. */
def isSparkVersionFile(fileName: String): Boolean =
  fileName.startsWith(SPARK_VERSION_PREFIX)

/** Whether the given file name is an event log file. */
def isEventLogFile(fileName: String): Boolean = {
  // startsWith (not contains): only names that begin with the marker prefix
  // count, so unrelated files merely embedding the token are not picked up.
  fileName.startsWith(LOG_PREFIX)
}

/** Whether the given file name is a compression codec marker file. */
def isCompressionCodecFile(fileName: String): Boolean = {
  // startsWith (not contains): the codec marker must be a name prefix,
  // matching how the file is created (COMPRESSION_CODEC_PREFIX + codec).
  fileName.startsWith(COMPRESSION_CODEC_PREFIX)
}

/** Whether the given file name marks the application as having completed. */
def isApplicationCompleteFile(fileName: String): Boolean =
  APPLICATION_COMPLETE == fileName

/**
 * Extract the Spark version encoded in a version marker file name.
 *
 * @param fileName name of a file in the event log directory
 * @return the version string following SPARK_VERSION_PREFIX, or "" if the
 *         file is not a version marker file
 */
def parseSparkVersion(fileName: String): String = {
  if (isSparkVersionFile(fileName)) {
    // stripPrefix removes only the leading marker. replaceAll would (a) treat
    // the prefix as a regex and (b) delete every occurrence anywhere in the
    // name, corrupting versions that happen to contain the token.
    fileName.stripPrefix(SPARK_VERSION_PREFIX)
  } else ""
}

def parseCompressionCodec(fileName: String): String = {
if (isCompressionCodecFile(fileName)) {
fileName.replaceAll(COMPRESSION_CODEC_PREFIX, "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ private[spark] class ReplayListenerBus(
// Convenience constructor: derive the Hadoop FileSystem from the log directory URI.
def this(logDir: String) = this(logDir, Utils.getHadoopFileSystem(logDir))

// State below is populated by start() while scanning the log directory.
// Whether an APPLICATION_COMPLETE marker file was found.
private var applicationComplete = false
// Spark version parsed from the version marker file, if one was found.
private var sparkVersion: Option[String] = None
// Codec for decompressing the event logs, if a codec marker file was found.
private var compressionCodec: Option[CompressionCodec] = None
// Paths of the event log files to replay.
private var logPaths = Array[Path]()
// Set once start() has scanned the directory; accessors assert on this.
private var started = false
/**
 * Prepare state for reading event logs.
 *
 * This gathers relevant files in the given directory and extracts meaning from each category.
 * More specifically, this involves looking for event logs, the Spark version file, the
 * compression codec file (if event logs are compressed), and the application completion
 * file (if the application has run to completion).
 */
def start() {
  val filePaths = getFilePaths(logDir, fileSystem)
  // Event log files to replay, identified by their name prefix.
  logPaths = filePaths
    .filter { file => EventLoggingListener.isEventLogFile(file.getName) }
  // Spark version that produced these logs, if a version marker exists.
  sparkVersion = filePaths
    .find { file => EventLoggingListener.isSparkVersionFile(file.getName) }
    .map { file => EventLoggingListener.parseSparkVersion(file.getName) }
  // Codec the logs were compressed with, if a codec marker exists; build a
  // throwaway SparkConf just to instantiate the codec by name.
  compressionCodec = filePaths
    .find { file => EventLoggingListener.isCompressionCodecFile(file.getName) }
    .map { file =>
      val codec = EventLoggingListener.parseCompressionCodec(file.getName)
      val conf = new SparkConf
      conf.set("spark.io.compression.codec", codec)
      CompressionCodec.createCodec(conf)
    }
  // Whether the application ran to completion (APPLICATION_COMPLETE marker).
  applicationComplete = filePaths
    .exists { file => EventLoggingListener.isApplicationCompleteFile(file.getName) }
  started = true
}

Expand All @@ -80,6 +83,12 @@ private[spark] class ReplayListenerBus(
applicationComplete
}

/** Return the version of Spark on which the given application was run. */
def getSparkVersion: String = {
  assert(started, "ReplayListenerBus not started yet")
  sparkVersion match {
    case Some(version) => version
    case None => "<Unknown>"
  }
}

/**
* Replay each event in the order maintained in the given logs. This should only be called
* exactly once. Return whether event logs are actually found.
Expand Down

0 comments on commit d02dbaa

Please sign in to comment.