Skip to content

Commit

Permalink
Backward compatibility for old history server
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Nov 5, 2014
1 parent 1e50f71 commit dfb0032
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 29 deletions.
25 changes: 23 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,32 @@ case class FetchFailed(
case class ExceptionFailure(
className: String,
description: String,
stackTrace: String,
stackTrace: Array[StackTraceElement], // backwards compatibility
metrics: Option[TaskMetrics])
extends TaskFailedReason {

override def toErrorString: String = stackTrace
/** The stack trace message with a full (recursive) stack trace. */
private var fullStackTrace: String = null

private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) {
this(e.getClass.getName, e.getMessage, e.getStackTrace, metrics)
fullStackTrace = Utils.exceptionString(e)
}

private[spark] def getFullStackTrace: String = fullStackTrace

private[spark] def setFullStackTrace(fullStackTrace: String): this.type = {
this.fullStackTrace = fullStackTrace
this
}

override def toErrorString: String =
if (fullStackTrace == null) {
Utils.exceptionString(className, description, stackTrace)
}
else {
fullStackTrace
}
}

/**
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,7 @@ private[spark] class Executor(
m.executorRunTime = serviceTime
m.jvmGCTime = gcTime - startGCTime
}
val reason = ExceptionFailure(t.getClass.getName, t.getMessage, Utils.exceptionString(t),
metrics)
val reason = new ExceptionFailure(t, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

// Don't forcibly exit unless the exception was inherently fatal, to avoid
Expand Down
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,12 @@ private[spark] object JsonProtocol {
("Reduce ID" -> fetchFailed.reduceId) ~
("Message" -> fetchFailed.message)
case exceptionFailure: ExceptionFailure =>
val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
("Class Name" -> exceptionFailure.className) ~
("Description" -> exceptionFailure.description) ~
("Full Stack Trace" -> exceptionFailure.stackTrace) ~
("Stack Trace" -> stackTrace) ~
("Full Stack Trace" -> exceptionFailure.getFullStackTrace) ~
("Metrics" -> metrics)
case ExecutorLostFailure(executorId) =>
("Executor ID" -> executorId)
Expand Down Expand Up @@ -635,14 +637,12 @@ private[spark] object JsonProtocol {
case `exceptionFailure` =>
val className = (json \ "Class Name").extract[String]
val description = (json \ "Description").extract[String]
val stackTrace = Utils.jsonOption(json \ "Full Stack Trace").map(_.extract[String]).
getOrElse {
// backward compatibility
val oldStackTrace = stackTraceFromJson(json \ "Stack Trace")
Utils.exceptionString(className, description, oldStackTrace)
}
val stackTrace = stackTraceFromJson(json \ "Stack Trace")
val fullStackTrace = Utils.jsonOption(json \ "Full Stack Trace").
map(_.extract[String]).orNull
val metrics = Utils.jsonOption(json \ "Metrics").map(taskMetricsFromJson)
ExceptionFailure(className, description, stackTrace, metrics)
ExceptionFailure(className, description, stackTrace, metrics).
setFullStackTrace(fullStackTrace)
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
case `executorLostFailure` =>
Expand Down
24 changes: 7 additions & 17 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ class JsonProtocolSuite extends FunSuite {
// TaskEndReason
val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
"Some exception")
val exceptionFailure = ExceptionFailure("To be", "or not to be",
Utils.exceptionString(exception), None)
val exceptionFailure = new ExceptionFailure(exception, None)
testTaskEndReason(Success)
testTaskEndReason(Resubmitted)
testTaskEndReason(fetchFailed)
Expand All @@ -129,20 +128,10 @@ class JsonProtocolSuite extends FunSuite {
}

test("ExceptionFailure backward compatibility") {
val exceptionFailureJson =
"""{"Reason":"ExceptionFailure","Class Name":"To be","Description":"or not to be",
|"Stack Trace":[{"Declaring Class":"Apollo","Method Name":"Venus","File Name":"Mercury",
|"Line Number":42},{"Declaring Class":"Afollo","Method Name":"Vemus","File Name":"Mercurry"
|,"Line Number":420},{"Declaring Class":"Ayollo","Method Name":"Vesus","File Name":"Blackbe
|rry","Line Number":4200}]}""".stripMargin.replaceAll("\r|\n", "")

val exception = new Exception("Out of Memory! Please restock film.")
exception.setStackTrace(stackTrace)
val expectedExceptionFailure = ExceptionFailure("To be", "or not to be",
Utils.exceptionString("To be", "or not to be", stackTrace), None)

val exceptionFailure = JsonProtocol.taskEndReasonFromJson(parse(exceptionFailureJson))
assertEquals(expectedExceptionFailure, exceptionFailure)
val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
val oldEvent = JsonProtocol.taskEndReasonToJson(exceptionFailure)
.removeField({ _._1 == "Full Stack Trace" })
assertEquals(exceptionFailure, JsonProtocol.taskEndReasonFromJson(oldEvent))
}

test("StageInfo backward compatibility") {
Expand Down Expand Up @@ -419,8 +408,9 @@ class JsonProtocolSuite extends FunSuite {
case (r1: ExceptionFailure, r2: ExceptionFailure) =>
assert(r1.className === r2.className)
assert(r1.description === r2.description)
assert(r1.stackTrace === r2.stackTrace)
assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals)
assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
assert(r1.getFullStackTrace === r2.getFullStackTrace)
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) =>
Expand Down

0 comments on commit dfb0032

Please sign in to comment.