From 0ec4ac7be4b268ceeff22357427e7c5ad1431fd9 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 3 Aug 2014 21:55:08 -0700 Subject: [PATCH] Java API's --- .../scala/org/apache/spark/Accumulators.scala | 2 +- .../scala/org/apache/spark/SparkContext.scala | 10 ++++ .../spark/api/java/JavaSparkContext.scala | 59 +++++++++++++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 845eaabf43703..92fc037d4d049 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -230,7 +230,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa */ class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String], display: Boolean) extends Accumulable[T,T](initialValue, param, name, display) { - def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None, true) + def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None, false) } /** diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4f669dd20e048..61392507ac32c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -775,6 +775,16 @@ class SparkContext(config: SparkConf) extends Logging { def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) = new Accumulable(initialValue, param) + /** + * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the + * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can + * access the accumuable's `value`. + * @tparam T accumulator type + * @tparam R type that can be added to the accumulator + */ + def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) = + new Accumulable(initialValue, param, Some(name), true) + /** * Create an accumulator from a "mutable collection" type. * diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 1e0493c4855e0..1c83a611ea76a 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -414,6 +414,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] = sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]] + /** + * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values + * to using the `add` method. Only the master can access the accumulator's `value`. + * + * This version supports naming the accumulator for display in Spark's web UI. + */ + def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] = + sc.accumulator(initialValue, name)(IntAccumulatorParam) + .asInstanceOf[Accumulator[java.lang.Integer]] + /** * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values * to using the `add` method. Only the master can access the accumulator's `value`. @@ -421,12 +431,31 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] = sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]] + /** + * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values + * to using the `add` method. Only the master can access the accumulator's `value`. + * + * This version supports naming the accumulator for display in Spark's web UI. + */ + def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] = + sc.accumulator(initialValue, name)(DoubleAccumulatorParam) + .asInstanceOf[Accumulator[java.lang.Double]] + /** * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values * to using the `add` method. Only the master can access the accumulator's `value`. */ def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue) + /** + * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values + * to using the `add` method. Only the master can access the accumulator's `value`. + * + * This version supports naming the accumulator for display in Spark's web UI. + */ + def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] = + intAccumulator(initialValue, name) + /** * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values * to using the `add` method. Only the master can access the accumulator's `value`. @@ -434,6 +463,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork def accumulator(initialValue: Double): Accumulator[java.lang.Double] = doubleAccumulator(initialValue) + + /** + * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values + * to using the `add` method. Only the master can access the accumulator's `value`. + * + * This version supports naming the accumulator for display in Spark's web UI. + */ + def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] = + doubleAccumulator(initialValue, name) + /** * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" * values to using the `add` method. Only the master can access the accumulator's `value`. @@ -441,6 +480,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] = sc.accumulator(initialValue)(accumulatorParam) + /** + * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" + * values to using the `add` method. Only the master can access the accumulator's `value`. + * + * This version supports naming the accumulator for display in Spark's web UI. + */ + def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T]) + : Accumulator[T] = + sc.accumulator(initialValue, name)(accumulatorParam) + /** * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks * can "add" values with `add`. Only the master can access the accumuable's `value`. @@ -448,6 +497,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] = sc.accumulable(initialValue)(param) + /** + * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks + * can "add" values with `add`. Only the master can access the accumuable's `value`. + * + * This version supports naming the accumulator for display in Spark's web UI. + */ + def accumulable[T, R](initialValue: T, name: String, param: AccumulableParam[T, R]) + : Accumulable[T, R] = + sc.accumulable(initialValue, name)(param) + /** * Broadcast a read-only variable to the cluster, returning a * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.