Commit c22ad00

Merge branch 'master' of github.com:apache/spark into configure-ports

andrewor14 committed Aug 5, 2014
2 parents 93d359f + 41e0a21 commit c22ad00
Showing 21 changed files with 375 additions and 59 deletions.
19 changes: 15 additions & 4 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -36,15 +36,21 @@ import org.apache.spark.serializer.JavaSerializer
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] (
@transient initialValue: R,
param: AccumulableParam[R, T])
param: AccumulableParam[R, T],
val name: Option[String])
extends Serializable {

val id = Accumulators.newId
def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
this(initialValue, param, None)

val id: Long = Accumulators.newId

@transient private var value_ = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
private var deserialized = false
@@ -219,8 +225,10 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
extends Accumulable[T,T](initialValue, param)
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T,T](initialValue, param, name) {
def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
}

/**
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
@@ -281,4 +289,7 @@ private object Accumulators {
}
}
}

def stringifyPartialValue(partialValue: Any) = "%s".format(partialValue)
def stringifyValue(value: Any) = "%s".format(value)
}
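Taken together, the Accumulators.scala changes thread an optional, human-readable name through `Accumulable` and `Accumulator`. A minimal sketch of the new and old constructors, e.g. in the spark-shell — the `SumParam` object is hypothetical, defined here only to have a concrete `AccumulatorParam`:

import org.apache.spark.{Accumulator, AccumulatorParam}

// Hypothetical param that sums Doubles. zero and addInPlace are the two
// methods an AccumulatorParam must provide.
object SumParam extends AccumulatorParam[Double] {
  def zero(initial: Double): Double = 0.0
  def addInPlace(r1: Double, r2: Double): Double = r1 + r2
}

// New three-argument constructor: Some("running sum") is the name the
// web UI displays.
val named = new Accumulator[Double](0.0, SumParam, Some("running sum"))

// The auxiliary constructor added above keeps the old two-argument
// signature working by passing name = None.
val anonymous = new Accumulator[Double](0.0, SumParam)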
19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -760,6 +760,15 @@ class SparkContext(config: SparkConf) extends Logging {
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param)

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
* in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
* driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
new Accumulator(initialValue, param, Some(name))
}

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
* with `+=`. Only the driver can access the accumulable's `value`.
@@ -769,6 +778,16 @@ class SparkContext(config: SparkConf) extends Logging {
def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
new Accumulable(initialValue, param)

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
* Spark UI. Tasks can add values to the accumulable using the `+=` operator. Only the driver can
* access the accumulable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*/
def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
new Accumulable(initialValue, param, Some(name))

/**
* Create an accumulator from a "mutable collection" type.
*
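In user code the new named overloads are drop-in. A hedged sketch assuming a live SparkContext `sc` (spark-shell or otherwise); the accumulator name and the HDFS path are invented for illustration:

import org.apache.spark.SparkContext._ // brings IntAccumulatorParam and friends into scope

// "bad records" is the display name shown in the web UI for every stage
// that updates this accumulator.
val badRecords = sc.accumulator(0, "bad records")

sc.textFile("hdfs:///logs").foreach { line =>
  if (!line.startsWith("INFO")) badRecords += 1 // tasks may only add
}

println(badRecords.value) // only the driver may read the value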
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -429,40 +429,99 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] =
sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]

/**
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
sc.accumulator(initialValue, name)(IntAccumulatorParam)
.asInstanceOf[Accumulator[java.lang.Integer]]

/**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] =
sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]

/**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
sc.accumulator(initialValue, name)(DoubleAccumulatorParam)
.asInstanceOf[Accumulator[java.lang.Double]]

/**
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)

/**
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
intAccumulator(initialValue, name)

/**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
doubleAccumulator(initialValue)


/**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
doubleAccumulator(initialValue, name)

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `add` method. Only the master can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
sc.accumulator(initialValue)(accumulatorParam)

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T])
: Accumulator[T] =
sc.accumulator(initialValue, name)(accumulatorParam)

/**
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
* can "add" values with `add`. Only the master can access the accumuable's `value`.
*/
def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
sc.accumulable(initialValue)(param)

/**
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
* can "add" values with `add`. Only the master can access the accumuable's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def accumulable[T, R](initialValue: T, name: String, param: AccumulableParam[T, R])
: Accumulable[T, R] =
sc.accumulable(initialValue, name)(param)

/**
* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
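Each Java-facing overload above simply delegates to `sc.accumulator`/`sc.accumulable` with the name wrapped in `Some`, casting the boxed type back on the way out. A sketch of the named variants — written in Scala for consistency with the rest of this page, with a hypothetical local `JavaSparkContext`; from Java the calls look the same:

import org.apache.spark.api.java.JavaSparkContext

val jsc = new JavaSparkContext("local[2]", "named-accumulators")
val errors  = jsc.intAccumulator(0, "error count")        // Accumulator[java.lang.Integer]
val latency = jsc.doubleAccumulator(0.0, "total latency") // Accumulator[java.lang.Double]
errors.add(1) // the Java API uses add(...) rather than +=
jsc.stop()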
46 changes: 46 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Information about an [[org.apache.spark.Accumulable]] modified during a task or stage.
*/
@DeveloperApi
class AccumulableInfo (
val id: Long,
val name: String,
val update: Option[String], // represents a partial update within a task
val value: String) {

override def equals(other: Any): Boolean = other match {
case acc: AccumulableInfo =>
this.id == acc.id && this.name == acc.name &&
this.update == acc.update && this.value == acc.value
case _ => false
}
}

object AccumulableInfo {
def apply(id: Long, name: String, update: Option[String], value: String) =
new AccumulableInfo(id, name, update, value)

def apply(id: Long, name: String, value: String) = new AccumulableInfo(id, name, None, value)
}
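The companion's two `apply` overloads match the two places these rows are built: per-task rows carry a partial `update`, per-stage rows only a terminal value. A small sketch with invented values:

import org.apache.spark.scheduler.AccumulableInfo

// Per-task row: the task's partial update plus the running total.
val taskRow = AccumulableInfo(1L, "records read", Some("10"), "30")

// Per-stage row: terminal value only, so update defaults to None.
val stageRow = AccumulableInfo(1L, "records read", "30")

// equals compares id, name, update and value, so these two differ.
assert(taskRow != stageRow)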
24 changes: 22 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -883,8 +883,14 @@ class DAGScheduler(
val task = event.task
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))

// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
if (event.reason != Success) {
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))
}

if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
return
@@ -906,12 +912,26 @@
if (event.accumUpdates != null) {
try {
Accumulators.add(event.accumUpdates)
event.accumUpdates.foreach { case (id, partialValue) =>
val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
val stringValue = Accumulators.stringifyValue(acc.value)
stage.info.accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
}
} catch {
// If we see an exception during accumulator update, just log the error and move on.
case e: Exception =>
logError(s"Failed to update accumulators for $task", e)
}
}
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))
stage.pendingTasks -= task
task match {
case rt: ResultTask[_, _] =>
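Two details in this hunk are easy to miss: on success, `SparkListenerTaskEnd` is now posted only after the accumulator updates have been applied, so listeners observe a fully populated `TaskInfo.accumulables`; and the `partialValue != acc.zero` guard keeps untouched accumulators off the UI. A worked instance of the guard, assuming an Int accumulator whose `zero` is 0:

val zero = 0
val untouched = 0  // task never called += on the accumulator
val updated   = 42 // task added 42

untouched != zero  // false -> filtered out, no AccumulableInfo row ("UI cruft" avoided)
updated   != zero  // true  -> recorded against the stage and the task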
core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import scala.collection.mutable.HashMap

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.RDDInfo

@@ -37,6 +39,8 @@ class StageInfo(
var completionTime: Option[Long] = None
/** If the stage failed, the reason why. */
var failureReason: Option[String] = None
/** Terminal values of accumulables updated during this stage. */
val accumulables = HashMap[Long, AccumulableInfo]()

def stageFailed(reason: String) {
failureReason = Some(reason)
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import scala.collection.mutable.ListBuffer

import org.apache.spark.annotation.DeveloperApi

/**
@@ -41,6 +43,13 @@ class TaskInfo(
*/
var gettingResultTime: Long = 0

/**
* Intermediate updates to accumulables during this task. Note that it is valid for the same
* accumulable to be updated multiple times in a single task or for two accumulables with the
* same name but different IDs to exist in a task.
*/
val accumulables = ListBuffer[AccumulableInfo]()

/**
* The time when the task has completed successfully (including the time to remotely fetch
* results, if necessary).
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -17,7 +17,7 @@

package org.apache.spark.ui.jobs

import scala.collection.mutable.{HashMap, ListBuffer}
import scala.collection.mutable.{HashMap, ListBuffer, Map}

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
@@ -65,6 +65,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
new StageUIData
})

for ((id, info) <- stageCompleted.stageInfo.accumulables) {
stageData.accumulables(id) = info
}

poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
activeStages.remove(stageId)
if (stage.failureReason.isEmpty) {
@@ -130,6 +134,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
new StageUIData
})

for (accumulableInfo <- info.accumulables) {
stageData.accumulables(accumulableInfo.id) = accumulableInfo
}

val execSummaryMap = stageData.executorSummary
val execSummary = execSummaryMap.getOrElseUpdate(info.executorId, new ExecutorSummary)

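The UI wiring above is the end of the pipeline: `JobProgressListener` copies per-task and per-stage `AccumulableInfo` into its `StageUIData` for rendering. The same data is visible to any listener — a sketch of a custom one, assuming the Spark 1.x listener API and registration via `sc.addSparkListener`:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}

class AccumulablePrinter extends SparkListener {
  // Per task: taskInfo.accumulables is the ListBuffer added in this commit.
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    for (info <- taskEnd.taskInfo.accumulables) {
      println(s"task: ${info.name} += ${info.update.getOrElse("?")} (total ${info.value})")
    }
  }
  // Per stage: stageInfo.accumulables holds the terminal values.
  override def onStageCompleted(stage: SparkListenerStageCompleted): Unit = {
    for ((id, info) <- stage.stageInfo.accumulables) {
      println(s"stage ${stage.stageInfo.stageId}: ${info.name} = ${info.value}")
    }
  }
}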