
[SPARK-732][SPARK-3628][CORE][RESUBMIT] eliminate duplicate update on accumulator #2524

Closed · wants to merge 15 commits
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import java.io.{ObjectInputStream, Serializable}
import java.util.concurrent.atomic.AtomicLong

import scala.collection.generic.Growable
import scala.collection.mutable.Map
@@ -228,6 +229,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T,T](initialValue, param, name) {

def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
}

@@ -252,7 +254,7 @@ private object Accumulators {
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
var lastId: Long = 0
Contributor:
You can delete the lastId variable if you use AtomicLong.


- def newId: Long = synchronized {
+ def newId(): Long = synchronized {
lastId += 1
lastId
}
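A minimal sketch of the reviewer's suggestion (not part of this PR; the field name `nextId` is illustrative): an AtomicLong makes both the mutable `lastId` and the `synchronized` block unnecessary.

```scala
import java.util.concurrent.atomic.AtomicLong

private object Accumulators {
  // A single atomic counter replaces the mutable lastId + synchronized pair.
  private val nextId = new AtomicLong(0L)

  // incrementAndGet is atomic, so concurrent callers still get unique IDs.
  def newId(): Long = nextId.incrementAndGet()
}
```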
53 changes: 30 additions & 23 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -449,7 +449,6 @@ class DAGScheduler(
}
// data structures based on StageId
stageIdToStage -= stageId

logDebug("After removal of stage %d, remaining stages = %d"
.format(stageId, stageIdToStage.size))
}
@@ -901,6 +900,34 @@ class DAGScheduler(
}
}

/** Merge updates from a task to our local accumulator values */
private def updateAccumulators(event: CompletionEvent): Unit = {
val task = event.task
val stage = stageIdToStage(task.stageId)
if (event.accumUpdates != null) {
try {
Accumulators.add(event.accumUpdates)
event.accumUpdates.foreach { case (id, partialValue) =>
val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
val stringValue = Accumulators.stringifyValue(acc.value)
stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
}
} catch {
// If we see an exception during accumulator update, just log the
// error and move on.
case e: Exception =>
logError(s"Failed to update accumulators for $task", e)
}
}
}

/**
* Responds to a task finishing. This is called inside the event loop so it assumes that it can
* modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -941,27 +968,6 @@ class DAGScheduler(
}
event.reason match {
case Success =>
if (event.accumUpdates != null) {
try {
Accumulators.add(event.accumUpdates)
event.accumUpdates.foreach { case (id, partialValue) =>
val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
val stringValue = Accumulators.stringifyValue(acc.value)
stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
}
} catch {
// If we see an exception during accumulator update, just log the error and move on.
case e: Exception =>
logError(s"Failed to update accumulators for $task", e)
}
}
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
@@ -970,6 +976,7 @@
stage.resultOfJob match {
case Some(job) =>
if (!job.finished(rt.outputId)) {
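// Only the first completion of this output partition gets past this guard,
// so a resubmitted ResultTask cannot apply its accumulator updates twice.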
updateAccumulators(event)
job.finished(rt.outputId) = true
job.numFinished += 1
// If the whole job has finished, remove it
@@ -994,6 +1001,7 @@
}

case smt: ShuffleMapTask =>
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
@@ -1082,7 +1090,6 @@
}
failedStages += failedStage
failedStages += mapStage

// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -207,7 +207,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
- runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any](), null, null))
+ runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null))
}
}
}

private def completeWithAccumulator(accumId: Long, taskSet: TaskSet,
results: Seq[(TaskEndReason, Any)]) {
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2,
Map[Long, Any]((accumId, 1)), null, null))
}
}
}
@@ -493,17 +504,16 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
runEvent(ExecutorLost("exec-hostA"))
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
- val noAccum = Map[Long, Any]()
val taskSet = taskSets(0)
// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
// should work because it's a non-failed host
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, null, null))
// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
// should work because it's a new epoch
taskSet.tasks(1).epoch = newEpoch
- runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, null, null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
Expand Down Expand Up @@ -728,6 +738,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assert(scheduler.sc.dagScheduler === null)
}

test("accumulator not calculated for resubmitted result stage") {
// created just to register the accumulator with the SparkContext
val accum = new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
val finalRdd = new MyRDD(sc, 1, Nil)
submit(finalRdd, Array(0))
completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assert(Accumulators.originals(accum.id).value === 1)
assertDataStructuresEmpty
}
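The `value === 1` assertion is the point of this test: the result stage's only task set is completed twice with the same accumulator update, but only the first completion's update is applied.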

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
6 changes: 6 additions & 0 deletions docs/programming-guide.md
@@ -1306,6 +1306,12 @@ vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

</div>

For accumulator updates performed inside <b>actions only</b>, Spark guarantees that each task's update to the accumulator
will be applied only once, i.e., restarted tasks will not update the value. In transformations, users should be aware
that each task's update may be applied more than once if tasks or job stages are re-executed.
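A small Scala illustration of the distinction (hypothetical snippet: `sc` is assumed to be an existing SparkContext and `data` an existing RDD, neither defined here):

```scala
val mapAccum = sc.accumulator(0)     // updated in a transformation
val actionAccum = sc.accumulator(0)  // updated in an action

// Transformation: if the stage is recomputed (e.g. after an executor is
// lost), this increment can run again for the same element, so mapAccum
// may over-count.
val doubled = data.map { x => mapAccum += 1; x * 2 }
doubled.count()  // an action, forcing the map to actually run

// Action: each task's update is applied exactly once, even if the task is
// resubmitted, so actionAccum reliably ends up equal to data.count().
data.foreach { _ => actionAccum += 1 }
```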



# Deploying to a Cluster

The [application submission guide](submitting-applications.html) describes how to submit applications to a cluster.