[SPARK-732][SPARK-3628][CORE][RESUBMIT] eliminate duplicate update on accumulator

https://issues.apache.org/jira/browse/SPARK-3628

In the current implementation, the accumulator is updated for every successfully finished task, even if the task belongs to a resubmitted stage, which makes the accumulator value counter-intuitive.

In this patch, I changed the way the DAGScheduler updates the accumulator:

The DAGScheduler maintains a HashTable mapping each stage id to the received <accumulator_id, value> pairs. Only when the stage becomes independent (no job needs it any more) do we accumulate the values of those <accumulator_id, value> pairs. When a task finishes, we check whether the HashTable already contains that stageId, and save the (accumulator_id, value) pair only when the task is the first finished task of a new stage or the stage is running its first attempt.
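For illustration only (this snippet is not part of the patch; it assumes an existing SparkContext `sc` and the Spark 1.x accumulator API), the user-visible symptom looks roughly like this:

```scala
// Illustrative sketch of the behavior this patch addresses, not code from the commit.
val acc = sc.accumulator(0)

// Each element increments the accumulator once inside an action (foreach).
sc.parallelize(1 to 100, 4).foreach(_ => acc += 1)

// Expected: acc.value == 100. Before this change, if the stage was resubmitted
// (for example after a fetch failure), the re-run tasks applied their updates
// again and acc.value could end up greater than 100.
println(acc.value)
```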

Author: CodingCat <zhunansjtu@gmail.com>

Closes #2524 from CodingCat/SPARK-732-1 and squashes the following commits:

701a1e8 [CodingCat] roll back change on Accumulator.scala
1433e6f [CodingCat] make MIMA happy
b233737 [CodingCat] address Matei's comments
02261b8 [CodingCat] rollback  some changes
6b0aff9 [CodingCat] update document
2b2e8cf [CodingCat] updateAccumulator
83b75f8 [CodingCat] style fix
84570d2 [CodingCat] re-enable  the bad accumulator guard
1e9e14d [CodingCat] add NPE guard
21b6840 [CodingCat] simplify the patch
88d1f03 [CodingCat] fix rebase error
f74266b [CodingCat] add test case for resubmitted result stage
5cf586f [CodingCat] de-duplicate on task level
138f9b3 [CodingCat] make MIMA happy
67593d2 [CodingCat] make if allowing duplicate update as an option of accumulator
CodingCat authored and mateiz committed Nov 27, 2014
1 parent 561d31d commit 5af53ad
Showing 4 changed files with 67 additions and 30 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import java.io.{ObjectInputStream, Serializable}
import java.util.concurrent.atomic.AtomicLong

import scala.collection.generic.Growable
import scala.collection.mutable.Map
@@ -228,6 +229,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T,T](initialValue, param, name) {

def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
}

@@ -282,7 +284,7 @@ private object Accumulators {
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
var lastId: Long = 0

def newId: Long = synchronized {
def newId(): Long = synchronized {
lastId += 1
lastId
}
53 changes: 30 additions & 23 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -449,7 +449,6 @@ class DAGScheduler(
}
// data structures based on StageId
stageIdToStage -= stageId

logDebug("After removal of stage %d, remaining stages = %d"
.format(stageId, stageIdToStage.size))
}
@@ -902,6 +901,34 @@ class DAGScheduler(
}
}

/** Merge updates from a task to our local accumulator values */
private def updateAccumulators(event: CompletionEvent): Unit = {
val task = event.task
val stage = stageIdToStage(task.stageId)
if (event.accumUpdates != null) {
try {
Accumulators.add(event.accumUpdates)
event.accumUpdates.foreach { case (id, partialValue) =>
val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
val stringValue = Accumulators.stringifyValue(acc.value)
stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
}
} catch {
// If we see an exception during accumulator update, just log the
// error and move on.
case e: Exception =>
logError(s"Failed to update accumulators for $task", e)
}
}
}

/**
* Responds to a task finishing. This is called inside the event loop so it assumes that it can
* modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -942,27 +969,6 @@
}
event.reason match {
case Success =>
if (event.accumUpdates != null) {
try {
Accumulators.add(event.accumUpdates)
event.accumUpdates.foreach { case (id, partialValue) =>
val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
val stringValue = Accumulators.stringifyValue(acc.value)
stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
}
} catch {
// If we see an exception during accumulator update, just log the error and move on.
case e: Exception =>
logError(s"Failed to update accumulators for $task", e)
}
}
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
@@ -971,6 +977,7 @@
stage.resultOfJob match {
case Some(job) =>
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
job.finished(rt.outputId) = true
job.numFinished += 1
// If the whole job has finished, remove it
@@ -995,6 +1002,7 @@
}

case smt: ShuffleMapTask =>
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
@@ -1083,7 +1091,6 @@
}
failedStages += failedStage
failedStages += mapStage

// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
34 changes: 28 additions & 6 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -207,7 +207,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any](), null, null))
runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null))
}
}
}

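// Like the completion helper above, but each CompletionEvent carries an
// accumulator update of 1 for accumId.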
private def completeWithAccumulator(accumId: Long, taskSet: TaskSet,
results: Seq[(TaskEndReason, Any)]) {
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2,
Map[Long, Any]((accumId, 1)), null, null))
}
}
}
@@ -493,17 +504,16 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
runEvent(ExecutorLost("exec-hostA"))
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
val noAccum = Map[Long, Any]()
val taskSet = taskSets(0)
// should be ignored for being too old
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
// should work because it's a non-failed host
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, null, null))
// should be ignored for being too old
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
// should work because it's a new epoch
taskSet.tasks(1).epoch = newEpoch
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, null, null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -728,6 +738,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assert(scheduler.sc.dagScheduler === null)
}

test("accumulator not calculated for resubmitted result stage") {
// just to register the accumulator; it is referenced by its id below
val accum = new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
val finalRdd = new MyRDD(sc, 1, Nil)
submit(finalRdd, Array(0))
completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
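// Completing the same task set a second time simulates a resubmitted result stage;
// the duplicate accumulator update should be ignored (asserted below).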
completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assert(Accumulators.originals(accum.id).value === 1)
assertDataStructuresEmpty
}

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
6 changes: 6 additions & 0 deletions docs/programming-guide.md
@@ -1306,6 +1306,12 @@ vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

</div>

For accumulator updates performed inside <b>actions only</b>, Spark guarantees that each task's update to the accumulator
will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware
that each task's update may be applied more than once if tasks or job stages are re-executed.
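As an illustrative sketch of this distinction (assuming an existing SparkContext `sc`; this example is not part of the guide text above):

```scala
val acc = sc.accumulator(0)

// Inside an action: each task's update is applied exactly once,
// even if tasks are restarted or the stage is resubmitted.
sc.parallelize(1 to 10).foreach(x => acc += x)

// Inside a transformation: the update may be applied more than once
// if the stage is re-executed (e.g. recomputed after a failure).
val mapped = sc.parallelize(1 to 10).map { x => acc += x; x }
mapped.count()  // acc may now include duplicate contributions
```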



# Deploying to a Cluster

The [application submission guide](submitting-applications.html) describes how to submit applications to a cluster.
