From 5af53ada65f62e6b5987eada288fb48e9211ef9d Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Wed, 26 Nov 2014 16:52:04 -0800
Subject: [PATCH] [SPARK-732][SPARK-3628][CORE][RESUBMIT] eliminate duplicate update on accumulator

https://issues.apache.org/jira/browse/SPARK-3628

In the current implementation, the accumulator is updated for every
successfully finished task, even if the task belongs to a resubmitted stage,
which makes the final accumulator value counter-intuitive.

This patch changes where the DAGScheduler applies accumulator updates: for a
result stage, a task's (accumulator_id, value) pairs are merged only if the
task is the first successful attempt for its partition (the existing
job.finished bookkeeping serves as the guard), so a resubmitted result stage
no longer applies the same update twice.

Author: CodingCat

Closes #2524 from CodingCat/SPARK-732-1 and squashes the following commits:

701a1e8 [CodingCat] roll back change on Accumulator.scala
1433e6f [CodingCat] make MIMA happy
b233737 [CodingCat] address Matei's comments
02261b8 [CodingCat] rollback some changes
6b0aff9 [CodingCat] update document
2b2e8cf [CodingCat] updateAccumulator
83b75f8 [CodingCat] style fix
84570d2 [CodingCat] re-enable the bad accumulator guard
1e9e14d [CodingCat] add NPE guard
21b6840 [CodingCat] simplify the patch
88d1f03 [CodingCat] fix rebase error
f74266b [CodingCat] add test case for resubmitted result stage
5cf586f [CodingCat] de-duplicate on task level
138f9b3 [CodingCat] make MIMA happy
67593d2 [CodingCat] make if allowing duplicate update as an option of accumulator
---
 .../scala/org/apache/spark/Accumulators.scala |  4 +-
 .../apache/spark/scheduler/DAGScheduler.scala | 53 +++++++++++--------
 .../spark/scheduler/DAGSchedulerSuite.scala   | 34 +++++++++---
 docs/programming-guide.md                     |  6 +++
 4 files changed, 67 insertions(+), 30 deletions(-)
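Before the diffs, a user-level sketch of the behavior this patch guarantees (illustration only, not part of the patch; it assumes a running Spark 1.x SparkContext named `sc`):

    // An update performed inside an action: with this patch it is applied
    // exactly once per partition, even if a fetch failure forces the result
    // stage to be resubmitted.
    val acc = sc.accumulator(0, "records")
    sc.parallelize(1 to 100, 4).foreach(_ => acc += 1)
    // acc.value == 100; previously a resubmitted result stage could apply
    // some tasks' updates a second time and report a larger value.
    println(acc.value)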
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index dc1e8f6c21b62..000bbd6b532ad 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.io.{ObjectInputStream, Serializable}
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.generic.Growable
 import scala.collection.mutable.Map
@@ -228,6 +229,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
  */
 class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
     extends Accumulable[T,T](initialValue, param, name) {
+
   def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
 }
 
@@ -282,7 +284,7 @@ private object Accumulators {
   val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
   var lastId: Long = 0
 
-  def newId: Long = synchronized {
+  def newId(): Long = synchronized {
     lastId += 1
     lastId
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b1222af662e9b..cb8ccfbdbdcbb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -449,7 +449,6 @@ class DAGScheduler(
     }
     // data structures based on StageId
     stageIdToStage -= stageId
-
     logDebug("After removal of stage %d, remaining stages = %d"
       .format(stageId, stageIdToStage.size))
   }
@@ -902,6 +901,34 @@ class DAGScheduler(
     }
   }
 
+  /** Merge updates from a task to our local accumulator values */
+  private def updateAccumulators(event: CompletionEvent): Unit = {
+    val task = event.task
+    val stage = stageIdToStage(task.stageId)
+    if (event.accumUpdates != null) {
+      try {
+        Accumulators.add(event.accumUpdates)
+        event.accumUpdates.foreach { case (id, partialValue) =>
+          val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
+          // To avoid UI cruft, ignore cases where value wasn't updated
+          if (acc.name.isDefined && partialValue != acc.zero) {
+            val name = acc.name.get
+            val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
+            val stringValue = Accumulators.stringifyValue(acc.value)
+            stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
+            event.taskInfo.accumulables +=
+              AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
+          }
+        }
+      } catch {
+        // If we see an exception during accumulator update, just log the
+        // error and move on.
+        case e: Exception =>
+          logError(s"Failed to update accumulators for $task", e)
+      }
+    }
+  }
+
   /**
    * Responds to a task finishing. This is called inside the event loop so it assumes that it can
    * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -942,27 +969,6 @@ class DAGScheduler(
     }
     event.reason match {
       case Success =>
-        if (event.accumUpdates != null) {
-          try {
-            Accumulators.add(event.accumUpdates)
-            event.accumUpdates.foreach { case (id, partialValue) =>
-              val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
-              // To avoid UI cruft, ignore cases where value wasn't updated
-              if (acc.name.isDefined && partialValue != acc.zero) {
-                val name = acc.name.get
-                val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
-                val stringValue = Accumulators.stringifyValue(acc.value)
-                stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
-                event.taskInfo.accumulables +=
-                  AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
-              }
-            }
-          } catch {
-            // If we see an exception during accumulator update, just log the error and move on.
-            case e: Exception =>
-              logError(s"Failed to update accumulators for $task", e)
-          }
-        }
         listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
           event.reason, event.taskInfo, event.taskMetrics))
         stage.pendingTasks -= task
@@ -971,6 +977,7 @@ class DAGScheduler(
         stage.resultOfJob match {
           case Some(job) =>
             if (!job.finished(rt.outputId)) {
+              updateAccumulators(event)
               job.finished(rt.outputId) = true
               job.numFinished += 1
               // If the whole job has finished, remove it
@@ -995,6 +1002,7 @@ class DAGScheduler(
             }
 
           case smt: ShuffleMapTask =>
+            updateAccumulators(event)
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
@@ -1083,7 +1091,6 @@ class DAGScheduler(
           }
           failedStages += failedStage
           failedStages += mapStage
-
           // Mark the map whose fetch failed as broken in the map stage
           if (mapId != -1) {
             mapStage.removeOutputLoc(mapId, bmAddress)
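The ordering in the ResultTask branch above carries the fix: updateAccumulators(event) runs inside the !job.finished(rt.outputId) guard, before the partition is marked finished, so a duplicate Success from a resubmitted result stage is dropped together with its accumulator updates. A self-contained toy model of that guard (simplified types for illustration; these are not Spark's scheduler classes):

    import scala.collection.mutable

    final case class Completion(outputId: Int, accumUpdates: Map[Long, Long])

    final class JobModel(numPartitions: Int) {
      private val finished = Array.fill(numPartitions)(false)
      val accums = mutable.Map[Long, Long]().withDefaultValue(0L)

      def handleResultTaskSuccess(c: Completion): Unit = {
        if (!finished(c.outputId)) {
          // Merge accumulator updates only for the first successful
          // completion of this partition...
          c.accumUpdates.foreach { case (id, v) => accums(id) += v }
          // ...then mark the partition finished so later duplicates are dropped.
          finished(c.outputId) = true
        }
      }
    }

    object JobModelDemo extends App {
      val job = new JobModel(numPartitions = 1)
      val event = Completion(outputId = 0, accumUpdates = Map(1L -> 1L))
      job.handleResultTaskSuccess(event) // first attempt: counted
      job.handleResultTaskSuccess(event) // resubmitted duplicate: ignored
      assert(job.accums(1L) == 1L)       // not 2
    }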
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 819f95634bcdc..bdd721dc7eaf7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -207,7 +207,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assert(taskSet.tasks.size >= results.size)
     for ((result, i) <- results.zipWithIndex) {
       if (i < taskSet.tasks.size) {
-        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any](), null, null))
+        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null))
+      }
+    }
+  }
+
+  private def completeWithAccumulator(accumId: Long, taskSet: TaskSet,
+      results: Seq[(TaskEndReason, Any)]) {
+    assert(taskSet.tasks.size >= results.size)
+    for ((result, i) <- results.zipWithIndex) {
+      if (i < taskSet.tasks.size) {
+        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2,
+          Map[Long, Any]((accumId, 1)), null, null))
       }
     }
   }
@@ -493,17 +504,16 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     runEvent(ExecutorLost("exec-hostA"))
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)
-    val noAccum = Map[Long, Any]()
     val taskSet = taskSets(0)
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
     // should work because it's a non-failed host
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, null, null))
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
     // should work because it's a new epoch
     taskSet.tasks(1).epoch = newEpoch
-    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, null, null))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
            Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
     complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -728,6 +738,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assert(scheduler.sc.dagScheduler === null)
   }
 
+  test("accumulator not calculated for resubmitted result stage") {
+    // just to register the accumulator; only its id is used below
+    val accum = new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
+    val finalRdd = new MyRDD(sc, 1, Nil)
+    submit(finalRdd, Array(0))
+    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assert(Accumulators.originals(accum.id).value === 1)
+    assertDataStructuresEmpty
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 49f319ba775e5..c60de6e970531 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1306,6 +1306,12 @@ vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
 
 </div>
 
+For accumulator updates performed inside actions only, Spark guarantees that each task's update to the accumulator
+will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware
+that each task's update may be applied more than once if tasks or job stages are re-executed.
+
+
+
 # Deploying to a Cluster
 
 The [application submission guide](submitting-applications.html) describes how to submit applications to a cluster.
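To complement the documentation added above, a sketch of both cases (illustration only; assumes a Spark 1.x SparkContext named `sc`; the transformation outcome is one possibility, not a guarantee):

    val inAction = sc.accumulator(0, "updated in an action")
    val inTransform = sc.accumulator(0, "updated in a transformation")
    val data = sc.parallelize(1 to 100, 4)

    // Inside an action: each task's update is applied exactly once, even if
    // tasks are restarted.
    data.foreach(_ => inAction += 1)
    // inAction.value == 100

    // Inside a transformation: re-execution re-applies the update. Because
    // `mapped` is not cached, the second count() recomputes the map and the
    // accumulator is incremented again.
    val mapped = data.map { x => inTransform += 1; x }
    mapped.count()
    mapped.count()
    // inTransform.value is now typically 200, not 100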