Skip to content

Commit

Permalink
re-enable the bad accumulator guard
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingCat committed Nov 22, 2014
1 parent 1e9e14d commit 84570d2
Showing 1 changed file with 19 additions and 21 deletions.
40 changes: 19 additions & 21 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -940,26 +940,6 @@ class DAGScheduler(
}
event.reason match {
case Success =>
if (event.accumUpdates != null) {
try {
event.accumUpdates.foreach { case (id, partialValue) =>
val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
val stringValue = Accumulators.stringifyValue(acc.value)
stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
}
} catch {
// If we see an exception during accumulator update, just log the error and move on.
case e: Exception =>
logError(s"Failed to update accumulators for $task", e)
}
}
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
Expand All @@ -969,7 +949,25 @@ class DAGScheduler(
case Some(job) =>
if (!job.finished(rt.outputId)) {
if (event.accumUpdates != null) {
Accumulators.add(event.accumUpdates)
try {
Accumulators.add(event.accumUpdates)
event.accumUpdates.foreach { case (id, partialValue) =>
val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
val stringValue = Accumulators.stringifyValue(acc.value)
stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
}
} catch {
// If we see an exception during accumulator update, just log the error and move on.
case e: Exception =>
logError(s"Failed to update accumulators for $task", e)
}
}
job.finished(rt.outputId) = true
job.numFinished += 1
Expand Down

0 comments on commit 84570d2

Please sign in to comment.