Skip to content

Commit

Permalink
Attempt ID in listener event should be an option.
Browse files Browse the repository at this point in the history
For backwards compatibility.
  • Loading branch information
Marcelo Vanzin committed Apr 8, 2015
1 parent 88b1de8 commit cbe8bba
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 26 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1404,17 +1404,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
def stop() {
// Use the stopping variable to ensure no contention for the stop scenario.
// Still track the stopped variable for use elsewhere in the code.

if (!stopped.compareAndSet(false, true)) {
logInfo("SparkContext already stopped.")
return
}

postApplicationEnd()
ui.foreach(_.stop())
env.metricsSystem.report()
metadataCleaner.cancel()
cleaner.foreach(_.stop())
cleaner.foreach(_.stop())
executorAllocationManager.foreach(_.stop())
dagScheduler.stop()
dagScheduler = null
Expand Down Expand Up @@ -1757,7 +1757,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser, applicationAttemptId))
startTime, sparkUser, Some(applicationAttemptId)))
}

/** Post the application end event */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = Some(applicationStart.appName)
appId = applicationStart.appId
appAttemptId = Some(applicationStart.appAttemptId)
appAttemptId = applicationStart.appAttemptId
startTime = Some(applicationStart.time)
sparkUser = Some(applicationStart.sparkUser)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationStart(appName: String, appId: Option[String],
time: Long, sparkUser: String, appAttemptId: String = "") extends SparkListenerEvent
case class SparkListenerApplicationStart(appName: String, appId: Option[String],
time: Long, sparkUser: String, appAttemptId: Option[String]) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ private[spark] object JsonProtocol {
val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String]
val appAttemptId = (json \ "appAttemptId").extract[String]
val appAttemptId = Utils.jsonOption(json \ "appAttemptId").map(_.extract[String])
SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,28 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
// Write a new-style application log.
val newAppComplete = newLogFile("new1", "", inProgress = false)
writeFile(newAppComplete, true, None,
SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(5L)
)

// Write a new-style application log.
val newAppCompressedComplete = newLogFile("new1compressed", "", inProgress = false, Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test"),
SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(4L))

// Write an unfinished app, new-style.
val newAppIncomplete = newLogFile("new2", "", inProgress = true)
writeFile(newAppIncomplete, true, None,
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
)

// Write an old-style application log.
val oldAppComplete = new File(testDir, "old1")
oldAppComplete.mkdir()
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("old-app-complete", None, 2L, "test"),
SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
Expand All @@ -97,7 +97,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
oldAppIncomplete.mkdir()
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test")
SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
)

// Force a reload of data from the log directory, and check that both logs are loaded.
Expand Down Expand Up @@ -144,7 +144,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
logDir.mkdir()
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
SparkListenerApplicationStart("app2", None, 2L, "test"),
SparkListenerApplicationStart("app2", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
Expand All @@ -167,12 +167,12 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
test("SPARK-3697: ignore directories that cannot be read.") {
val logFile1 = newLogFile("new1", "", inProgress = false)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1-1", None, 1L, "test"),
SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
val logFile2 = newLogFile("new2", "", inProgress = false)
writeFile(logFile2, true, None,
SparkListenerApplicationStart("app1-2", None, 1L, "test"),
SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
logFile2.setReadable(false, false)
Expand All @@ -188,7 +188,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers

val logFile1 = newLogFile("app1", "", inProgress = true)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
updateAndCheck(provider) { list =>
Expand All @@ -210,7 +210,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers

val logFile1 = newLogFile("app1", "", inProgress = true)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L))

val oldLog = new File(testDir, "old1")
Expand All @@ -226,7 +226,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers

val attempt1 = newLogFile("app1", "attempt1", inProgress = false)
writeFile(attempt1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", "attempt1"),
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
SparkListenerApplicationEnd(2L)
)

Expand All @@ -237,7 +237,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers

val attempt2 = newLogFile("app1", "attempt2", inProgress = true)
writeFile(attempt2, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", "attempt2")
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
)

updateAndCheck(provider) { list =>
Expand All @@ -249,7 +249,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
val completedAttempt2 = newLogFile("app1", "attempt2", inProgress = false)
attempt2.delete()
writeFile(attempt2, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", "attempt2"),
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
SparkListenerApplicationEnd(4L)
)

Expand All @@ -262,7 +262,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers

val app2Attempt1 = newLogFile("app2", "attempt1", inProgress = false)
writeFile(attempt2, true, None,
SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", "attempt1"),
SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
SparkListenerApplicationEnd(6L)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
val listenerBus = new LiveListenerBus
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey")
125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)

// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
val fstream = fileSystem.create(logFilePath)
val writer = new PrintWriter(fstream)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey")
125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class JsonProtocolSuite extends FunSuite {
val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield",
None)
val applicationEnd = SparkListenerApplicationEnd(42L)
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
Expand Down Expand Up @@ -274,7 +275,7 @@ class JsonProtocolSuite extends FunSuite {

test("SparkListenerApplicationStart backwards compatibility") {
// SparkListenerApplicationStart in Spark 1.0.0 do not have an "appId" property.
val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user", None)
val oldEvent = JsonProtocol.applicationStartToJson(applicationStart)
.removeField({ _._1 == "App ID" })
assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent))
Expand Down

0 comments on commit cbe8bba

Please sign in to comment.