From 6b93cf46f5805c80ec2d5d901c17fbb8f93e21cd Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Wed, 24 Oct 2018 09:08:26 -0500 Subject: [PATCH] [SPARK-16775][CORE] Remove deprecated accumulator v1 APIs ## What changes were proposed in this pull request? Remove deprecated accumulator v1 ## How was this patch tested? Existing tests. Closes #22730 from srowen/SPARK-16775. Authored-by: Sean Owen Signed-off-by: Sean Owen --- .../java/org/apache/spark/package-info.java | 4 +- .../scala/org/apache/spark/Accumulable.scala | 226 ------------------ .../scala/org/apache/spark/Accumulator.scala | 117 --------- .../scala/org/apache/spark/SparkContext.scala | 73 +----- .../spark/api/java/JavaSparkContext.scala | 113 --------- .../spark/scheduler/AccumulableInfo.scala | 2 +- .../org/apache/spark/util/AccumulatorV2.scala | 31 --- .../test/org/apache/spark/JavaAPISuite.java | 54 +---- .../org/apache/spark/AccumulatorSuite.scala | 148 +----------- .../spark/util/AccumulatorV2Suite.scala | 53 ---- .../org/apache/sparktest/ImplicitSuite.scala | 20 -- project/MimaExcludes.scala | 19 ++ 12 files changed, 30 insertions(+), 830 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/Accumulable.scala delete mode 100644 core/src/main/scala/org/apache/spark/Accumulator.scala diff --git a/core/src/main/java/org/apache/spark/package-info.java b/core/src/main/java/org/apache/spark/package-info.java index 4426c7afcebdd..a029931f9e4c0 100644 --- a/core/src/main/java/org/apache/spark/package-info.java +++ b/core/src/main/java/org/apache/spark/package-info.java @@ -16,8 +16,8 @@ */ /** - * Core Spark classes in Scala. A few classes here, such as {@link org.apache.spark.Accumulator} - * and {@link org.apache.spark.storage.StorageLevel}, are also used in Java, but the + * Core Spark classes in Scala. A few classes here, such as + * {@link org.apache.spark.storage.StorageLevel}, are also used in Java, but the * {@link org.apache.spark.api.java} package contains the main Java API. */ package org.apache.spark; diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala deleted file mode 100644 index 3092074232d18..0000000000000 --- a/core/src/main/scala/org/apache/spark/Accumulable.scala +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -import java.io.Serializable - -import scala.collection.generic.Growable -import scala.reflect.ClassTag - -import org.apache.spark.scheduler.AccumulableInfo -import org.apache.spark.serializer.JavaSerializer -import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LegacyAccumulatorWrapper} - - -/** - * A data type that can be accumulated, i.e. 
has a commutative and associative "add" operation, - * but where the result type, `R`, may be different from the element type being added, `T`. - * - * You must define how to add data, and how to merge two of these together. For some data types, - * such as a counter, these might be the same operation. In that case, you can use the simpler - * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are - * accumulating a set. You will add items to the set, and you will union two sets together. - * - * Operations are not thread-safe. - * - * @param id ID of this accumulator; for internal use only. - * @param initialValue initial value of accumulator - * @param param helper object defining how to add elements of type `R` and `T` - * @param name human-readable name for use in Spark's web UI - * @param countFailedValues whether to accumulate values from failed tasks. This is set to true - * for system and time metrics like serialization time or bytes spilled, - * and false for things with absolute values like number of input rows. - * This should be used for internal metrics only. - * @tparam R the full accumulated data (result type) - * @tparam T partial data that can be added in - */ -@deprecated("use AccumulatorV2", "2.0.0") -class Accumulable[R, T] private ( - val id: Long, - // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile - @transient private val initialValue: R, - param: AccumulableParam[R, T], - val name: Option[String], - private[spark] val countFailedValues: Boolean) - extends Serializable { - - private[spark] def this( - initialValue: R, - param: AccumulableParam[R, T], - name: Option[String], - countFailedValues: Boolean) = { - this(AccumulatorContext.newId(), initialValue, param, name, countFailedValues) - } - - private[spark] def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = { - this(initialValue, param, name, false /* countFailedValues */) - } - - def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None) - - val zero = param.zero(initialValue) - private[spark] val newAcc = new LegacyAccumulatorWrapper(initialValue, param) - newAcc.metadata = AccumulatorMetadata(id, name, countFailedValues) - // Register the new accumulator in ctor, to follow the previous behaviour. - AccumulatorContext.register(newAcc) - - /** - * Add more data to this accumulator / accumulable - * @param term the data to add - */ - def += (term: T) { newAcc.add(term) } - - /** - * Add more data to this accumulator / accumulable - * @param term the data to add - */ - def add(term: T) { newAcc.add(term) } - - /** - * Merge two accumulable objects together - * - * Normally, a user will not want to use this version, but will instead call `+=`. - * @param term the other `R` that will get merged with this - */ - def ++= (term: R) { newAcc._value = param.addInPlace(newAcc._value, term) } - - /** - * Merge two accumulable objects together - * - * Normally, a user will not want to use this version, but will instead call `add`. - * @param term the other `R` that will get merged with this - */ - def merge(term: R) { newAcc._value = param.addInPlace(newAcc._value, term) } - - /** - * Access the accumulator's current value; only allowed on driver. - */ - def value: R = { - if (newAcc.isAtDriverSide) { - newAcc.value - } else { - throw new UnsupportedOperationException("Can't read accumulator value in task") - } - } - - /** - * Get the current value of this accumulator from within a task. 
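// A minimal AccumulatorV2-based sketch of the set-accumulation pattern described in the
// scaladoc above (add single items in tasks, union partial sets when merging), which is
// what replaces Accumulable/AccumulableParam after this change. The SetAccumulator name
// is illustrative, not a Spark class; usage assumes an active SparkContext `sc`.
import scala.collection.mutable
import org.apache.spark.util.AccumulatorV2

class SetAccumulator[T] extends AccumulatorV2[T, mutable.Set[T]] {
  private val _set = mutable.Set.empty[T]
  override def isZero: Boolean = _set.isEmpty
  override def copy(): SetAccumulator[T] = {
    val acc = new SetAccumulator[T]
    acc._set ++= _set
    acc
  }
  override def reset(): Unit = _set.clear()
  // Add one element: the role previously played by `+=` / addAccumulator.
  override def add(v: T): Unit = _set += v
  // Union two partial results: the role previously played by `++=` / addInPlace.
  override def merge(other: AccumulatorV2[T, mutable.Set[T]]): Unit = _set ++= other.value
  override def value: mutable.Set[T] = _set
}
// val acc = new SetAccumulator[Int]
// sc.register(acc, "distinctValues")          // the name is shown in the web UI
// sc.parallelize(1 to 100).foreach(x => acc.add(x))
// acc.value                                   // readable only on the driver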
- * - * This is NOT the global value of the accumulator. To get the global value after a - * completed operation on the dataset, call `value`. - * - * The typical use of this method is to directly mutate the local value, eg., to add - * an element to a Set. - */ - def localValue: R = newAcc.value - - /** - * Set the accumulator's value; only allowed on driver. - */ - def value_= (newValue: R) { - if (newAcc.isAtDriverSide) { - newAcc._value = newValue - } else { - throw new UnsupportedOperationException("Can't assign accumulator value in task") - } - } - - /** - * Set the accumulator's value. For internal use only. - */ - def setValue(newValue: R): Unit = { newAcc._value = newValue } - - /** - * Set the accumulator's value. For internal use only. - */ - private[spark] def setValueAny(newValue: Any): Unit = { setValue(newValue.asInstanceOf[R]) } - - /** - * Create an [[AccumulableInfo]] representation of this [[Accumulable]] with the provided values. - */ - private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { - val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) - new AccumulableInfo(id, name, update, value, isInternal, countFailedValues) - } - - override def toString: String = if (newAcc._value == null) "null" else newAcc._value.toString -} - - -/** - * Helper object defining how to accumulate values of a particular type. An implicit - * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type. - * - * @tparam R the full accumulated data (result type) - * @tparam T partial data that can be added in - */ -@deprecated("use AccumulatorV2", "2.0.0") -trait AccumulableParam[R, T] extends Serializable { - /** - * Add additional data to the accumulator value. Is allowed to modify and return `r` - * for efficiency (to avoid allocating objects). - * - * @param r the current value of the accumulator - * @param t the data to be added to the accumulator - * @return the new value of the accumulator - */ - def addAccumulator(r: R, t: T): R - - /** - * Merge two accumulated values together. Is allowed to modify and return the first value - * for efficiency (to avoid allocating objects). - * - * @param r1 one set of accumulated data - * @param r2 another set of accumulated data - * @return both data sets merged together - */ - def addInPlace(r1: R, r2: R): R - - /** - * Return the "zero" (identity) value for an accumulator type, given its initial value. For - * example, if R was a vector of N dimensions, this would return a vector of N zeroes. - */ - def zero(initialValue: R): R -} - - -@deprecated("use AccumulatorV2", "2.0.0") -private[spark] class -GrowableAccumulableParam[R : ClassTag, T] - (implicit rg: R => Growable[T] with TraversableOnce[T] with Serializable) - extends AccumulableParam[R, T] { - - def addAccumulator(growable: R, elem: T): R = { - growable += elem - growable - } - - def addInPlace(t1: R, t2: R): R = { - t1 ++= t2 - t1 - } - - def zero(initialValue: R): R = { - // We need to clone initialValue, but it's hard to specify that R should also be Cloneable. - // Instead we'll serialize it to a buffer and load it back. 
- val ser = new JavaSerializer(new SparkConf(false)).newInstance() - val copy = ser.deserialize[R](ser.serialize(initialValue)) - copy.clear() // In case it contained stuff - copy - } -} diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala deleted file mode 100644 index 9d5fbefc824ad..0000000000000 --- a/core/src/main/scala/org/apache/spark/Accumulator.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -/** - * A simpler value of [[Accumulable]] where the result type being accumulated is the same - * as the types of elements being merged, i.e. variables that are only "added" to through an - * associative and commutative operation and can therefore be efficiently supported in parallel. - * They can be used to implement counters (as in MapReduce) or sums. Spark natively supports - * accumulators of numeric value types, and programmers can add support for new types. - * - * An accumulator is created from an initial value `v` by calling `SparkContext.accumulator`. - * Tasks running on the cluster can then add to it using the `+=` operator. - * However, they cannot read its value. Only the driver program can read the accumulator's value, - * using its [[#value]] method. - * - * The interpreter session below shows an accumulator being used to add up the elements of an array: - * - * {{{ - * scala> val accum = sc.accumulator(0) - * accum: org.apache.spark.Accumulator[Int] = 0 - * - * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) - * ... - * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s - * - * scala> accum.value - * res2: Int = 10 - * }}} - * - * @param initialValue initial value of accumulator - * @param param helper object defining how to add elements of type `T` - * @param name human-readable name associated with this accumulator - * @param countFailedValues whether to accumulate values from failed tasks - * @tparam T result type -*/ -@deprecated("use AccumulatorV2", "2.0.0") -class Accumulator[T] private[spark] ( - // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile - @transient private val initialValue: T, - param: AccumulatorParam[T], - name: Option[String] = None, - countFailedValues: Boolean = false) - extends Accumulable[T, T](initialValue, param, name, countFailedValues) - - -/** - * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add - * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be - * available when you create Accumulators of a specific type. 
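// For comparison, the interpreter session in the scaladoc above maps onto the built-in
// LongAccumulator, which needs no implicit param object; a hedged sketch assuming an
// active SparkContext `sc`.
val accum = sc.longAccumulator("counter")                       // was: sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))    // was: accum += x
accum.value                                                     // 10, readable only on the driver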
- * - * @tparam T type of value to accumulate - */ -@deprecated("use AccumulatorV2", "2.0.0") -trait AccumulatorParam[T] extends AccumulableParam[T, T] { - def addAccumulator(t1: T, t2: T): T = { - addInPlace(t1, t2) - } -} - - -@deprecated("use AccumulatorV2", "2.0.0") -object AccumulatorParam { - - // The following implicit objects were in SparkContext before 1.2 and users had to - // `import SparkContext._` to enable them. Now we move them here to make the compiler find - // them automatically. However, as there are duplicate codes in SparkContext for backward - // compatibility, please update them accordingly if you modify the following implicit objects. - - @deprecated("use AccumulatorV2", "2.0.0") - implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { - def addInPlace(t1: Double, t2: Double): Double = t1 + t2 - def zero(initialValue: Double): Double = 0.0 - } - - @deprecated("use AccumulatorV2", "2.0.0") - implicit object IntAccumulatorParam extends AccumulatorParam[Int] { - def addInPlace(t1: Int, t2: Int): Int = t1 + t2 - def zero(initialValue: Int): Int = 0 - } - - @deprecated("use AccumulatorV2", "2.0.0") - implicit object LongAccumulatorParam extends AccumulatorParam[Long] { - def addInPlace(t1: Long, t2: Long): Long = t1 + t2 - def zero(initialValue: Long): Long = 0L - } - - @deprecated("use AccumulatorV2", "2.0.0") - implicit object FloatAccumulatorParam extends AccumulatorParam[Float] { - def addInPlace(t1: Float, t2: Float): Float = t1 + t2 - def zero(initialValue: Float): Float = 0f - } - - // Note: when merging values, this param just adopts the newer value. This is used only - // internally for things that shouldn't really be accumulated across tasks, like input - // read method, which should be the same across all tasks in the same stage. - @deprecated("use AccumulatorV2", "2.0.0") - private[spark] object StringAccumulatorParam extends AccumulatorParam[String] { - def addInPlace(t1: String, t2: String): String = t2 - def zero(initialValue: String): String = "" - } -} diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 10f3168a4c2db..b3c9c030487cd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReferenc import scala.collection.JavaConverters._ import scala.collection.Map -import scala.collection.generic.Growable import scala.collection.mutable.HashMap import scala.language.implicitConversions import scala.reflect.{classTag, ClassTag} @@ -51,7 +50,7 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} +import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend import org.apache.spark.scheduler.local.LocalSchedulerBackend import org.apache.spark.status.{AppStatusSource, AppStatusStore} import org.apache.spark.status.api.v1.ThreadStackTrace @@ -1337,76 +1336,6 @@ class SparkContext(config: SparkConf) extends Logging { // Methods for creating shared variables - /** - * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" - * values to using the `+=` method. Only the driver can access the accumulator's `value`. 
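// The numeric AccumulatorParam singletons removed above are covered by the built-in
// LongAccumulator/DoubleAccumulator, which also track count and average; a brief sketch,
// assuming an active SparkContext `sc`.
val dAcc = sc.doubleAccumulator("timings")
sc.parallelize(Seq(1.5, 2.5, 3.0)).foreach(x => dAcc.add(x))
(dAcc.sum, dAcc.count, dAcc.avg)   // on the driver: (7.0, 3, 2.333...)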
- */ - @deprecated("use AccumulatorV2", "2.0.0") - def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = { - val acc = new Accumulator(initialValue, param) - cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc)) - acc - } - - /** - * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display - * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the - * driver can access the accumulator's `value`. - */ - @deprecated("use AccumulatorV2", "2.0.0") - def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) - : Accumulator[T] = { - val acc = new Accumulator(initialValue, param, Option(name)) - cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc)) - acc - } - - /** - * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values - * with `+=`. Only the driver can access the accumulable's `value`. - * @tparam R accumulator result type - * @tparam T type that can be added to the accumulator - */ - @deprecated("use AccumulatorV2", "2.0.0") - def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) - : Accumulable[R, T] = { - val acc = new Accumulable(initialValue, param) - cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc)) - acc - } - - /** - * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the - * Spark UI. Tasks can add values to the accumulable using the `+=` operator. Only the driver can - * access the accumulable's `value`. - * @tparam R accumulator result type - * @tparam T type that can be added to the accumulator - */ - @deprecated("use AccumulatorV2", "2.0.0") - def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) - : Accumulable[R, T] = { - val acc = new Accumulable(initialValue, param, Option(name)) - cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc)) - acc - } - - /** - * Create an accumulator from a "mutable collection" type. - * - * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by - * standard mutable collections. So you can use this with mutable Map, Set, etc. - */ - @deprecated("use AccumulatorV2", "2.0.0") - def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T] - (initialValue: R): Accumulable[R, T] = { - // TODO the context bound (<%) above should be replaced with simple type bound and implicit - // conversion but is a breaking change. This should be fixed in Spark 3.x. - val param = new GrowableAccumulableParam[R, T] - val acc = new Accumulable(initialValue, param) - cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc)) - acc - } - /** * Register the given accumulator. 
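// The removed accumulableCollection variants map onto SparkContext.collectionAccumulator
// (or a custom AccumulatorV2 passed to the register() call mentioned just above).
// A minimal sketch, assuming an active SparkContext `sc`; note the result is a
// java.util.List, so duplicates are kept, unlike a Set-backed accumulable.
val names = sc.collectionAccumulator[String]("names")
sc.parallelize(Seq("a", "b", "a")).foreach(x => names.add(x))
names.value   // java.util.List[String] on the driver, e.g. [a, b, a] in some order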
* diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 09c83849e26b2..ef15f95b3fe5b 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -30,7 +30,6 @@ import org.apache.hadoop.mapred.{InputFormat, JobConf} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark._ -import org.apache.spark.AccumulatorParam._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.broadcast.Broadcast import org.apache.spark.input.PortableDataStream @@ -530,118 +529,6 @@ class JavaSparkContext(val sc: SparkContext) new JavaDoubleRDD(sc.union(rdds)) } - /** - * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values - * to using the `add` method. Only the master can access the accumulator's `value`. - */ - @deprecated("use sc().longAccumulator()", "2.0.0") - def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] = - sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]] - - /** - * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values - * to using the `add` method. Only the master can access the accumulator's `value`. - * - * This version supports naming the accumulator for display in Spark's web UI. - */ - @deprecated("use sc().longAccumulator(String)", "2.0.0") - def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] = - sc.accumulator(initialValue, name)(IntAccumulatorParam) - .asInstanceOf[Accumulator[java.lang.Integer]] - - /** - * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values - * to using the `add` method. Only the master can access the accumulator's `value`. - */ - @deprecated("use sc().doubleAccumulator()", "2.0.0") - def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] = - sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]] - - /** - * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values - * to using the `add` method. Only the master can access the accumulator's `value`. - * - * This version supports naming the accumulator for display in Spark's web UI. - */ - @deprecated("use sc().doubleAccumulator(String)", "2.0.0") - def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] = - sc.accumulator(initialValue, name)(DoubleAccumulatorParam) - .asInstanceOf[Accumulator[java.lang.Double]] - - /** - * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values - * to using the `add` method. Only the master can access the accumulator's `value`. - */ - @deprecated("use sc().longAccumulator()", "2.0.0") - def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue) - - /** - * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values - * to using the `add` method. Only the master can access the accumulator's `value`. - * - * This version supports naming the accumulator for display in Spark's web UI. 
- */ - @deprecated("use sc().longAccumulator(String)", "2.0.0") - def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] = - intAccumulator(initialValue, name) - - /** - * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values - * to using the `add` method. Only the master can access the accumulator's `value`. - */ - @deprecated("use sc().doubleAccumulator()", "2.0.0") - def accumulator(initialValue: Double): Accumulator[java.lang.Double] = - doubleAccumulator(initialValue) - - - /** - * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values - * to using the `add` method. Only the master can access the accumulator's `value`. - * - * This version supports naming the accumulator for display in Spark's web UI. - */ - @deprecated("use sc().doubleAccumulator(String)", "2.0.0") - def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] = - doubleAccumulator(initialValue, name) - - /** - * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" - * values to using the `add` method. Only the master can access the accumulator's `value`. - */ - @deprecated("use AccumulatorV2", "2.0.0") - def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] = - sc.accumulator(initialValue)(accumulatorParam) - - /** - * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" - * values to using the `add` method. Only the master can access the accumulator's `value`. - * - * This version supports naming the accumulator for display in Spark's web UI. - */ - @deprecated("use AccumulatorV2", "2.0.0") - def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T]) - : Accumulator[T] = - sc.accumulator(initialValue, name)(accumulatorParam) - - /** - * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks - * can "add" values with `add`. Only the master can access the accumulable's `value`. - */ - @deprecated("use AccumulatorV2", "2.0.0") - def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] = - sc.accumulable(initialValue)(param) - - /** - * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks - * can "add" values with `add`. Only the master can access the accumulable's `value`. - * - * This version supports naming the accumulator for display in Spark's web UI. - */ - @deprecated("use AccumulatorV2", "2.0.0") - def accumulable[T, R](initialValue: T, name: String, param: AccumulableParam[T, R]) - : Accumulable[T, R] = - sc.accumulable(initialValue, name)(param) - /** * Broadcast a read-only variable to the cluster, returning a * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions. diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala index 0a5fe5a1d3ee1..d745345f4e0d2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala @@ -22,7 +22,7 @@ import org.apache.spark.annotation.DeveloperApi /** * :: DeveloperApi :: - * Information about an [[org.apache.spark.Accumulable]] modified during a task or stage. + * Information about an [[org.apache.spark.util.AccumulatorV2]] modified during a task or stage. 
* * @param id accumulator ID * @param name accumulator name diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index bf618b4afbce0..d5b3ce36e742a 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -485,34 +485,3 @@ class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { _list.addAll(newValue) } } - - -class LegacyAccumulatorWrapper[R, T]( - initialValue: R, - param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] { - private[spark] var _value = initialValue // Current value on driver - - @transient private lazy val _zero = param.zero(initialValue) - - override def isZero: Boolean = _value.asInstanceOf[AnyRef].eq(_zero.asInstanceOf[AnyRef]) - - override def copy(): LegacyAccumulatorWrapper[R, T] = { - val acc = new LegacyAccumulatorWrapper(initialValue, param) - acc._value = _value - acc - } - - override def reset(): Unit = { - _value = _zero - } - - override def add(v: T): Unit = _value = param.addAccumulator(_value, v) - - override def merge(other: AccumulatorV2[T, R]): Unit = other match { - case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.value) - case _ => throw new UnsupportedOperationException( - s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") - } - - override def value: R = _value -} diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java index 3992ab7049bdd..365a93d2601e7 100644 --- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java @@ -33,8 +33,6 @@ import java.util.Map; import java.util.concurrent.*; -import org.apache.spark.Accumulator; -import org.apache.spark.AccumulatorParam; import org.apache.spark.Partitioner; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; @@ -186,7 +184,7 @@ public void randomSplit() { long s1 = splits[1].count(); long s2 = splits[2].count(); assertTrue(s0 + " not within expected range", s0 > 150 && s0 < 250); - assertTrue(s1 + " not within expected range", s1 > 250 && s0 < 350); + assertTrue(s1 + " not within expected range", s1 > 250 && s1 < 350); assertTrue(s2 + " not within expected range", s2 > 430 && s2 < 570); } @@ -956,7 +954,7 @@ public void wholeTextFiles() throws Exception { } @Test - public void textFilesCompressed() throws IOException { + public void textFilesCompressed() { String outputDir = new File(tempDir, "output").getAbsolutePath(); JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4)); rdd.saveAsTextFile(outputDir, DefaultCodec.class); @@ -1183,46 +1181,6 @@ public void zipPartitions() { assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); } - @SuppressWarnings("deprecation") - @Test - public void accumulators() { - JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - - Accumulator intAccum = sc.intAccumulator(10); - rdd.foreach(intAccum::add); - assertEquals((Integer) 25, intAccum.value()); - - Accumulator doubleAccum = sc.doubleAccumulator(10.0); - rdd.foreach(x -> doubleAccum.add((double) x)); - assertEquals((Double) 25.0, doubleAccum.value()); - - // Try a custom accumulator type - AccumulatorParam floatAccumulatorParam = new AccumulatorParam() { - @Override - public Float addInPlace(Float r, Float t) { - return r + t; - } - - @Override - public Float 
addAccumulator(Float r, Float t) { - return r + t; - } - - @Override - public Float zero(Float initialValue) { - return 0.0f; - } - }; - - Accumulator floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); - rdd.foreach(x -> floatAccum.add((float) x)); - assertEquals((Float) 25.0f, floatAccum.value()); - - // Test the setValue method - floatAccum.setValue(5.0f); - assertEquals((Float) 5.0f, floatAccum.value()); - } - @Test public void keyBy() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2)); @@ -1410,13 +1368,13 @@ public void sampleByKeyExact() { JavaPairRDD wrExact = rdd2.sampleByKeyExact(true, fractions, 1L); Map wrExactCounts = wrExact.countByKey(); assertEquals(2, wrExactCounts.size()); - assertTrue(wrExactCounts.get(0) == 2); - assertTrue(wrExactCounts.get(1) == 4); + assertEquals(2, (long) wrExactCounts.get(0)); + assertEquals(4, (long) wrExactCounts.get(1)); JavaPairRDD worExact = rdd2.sampleByKeyExact(false, fractions, 1L); Map worExactCounts = worExact.countByKey(); assertEquals(2, worExactCounts.size()); - assertTrue(worExactCounts.get(0) == 2); - assertTrue(worExactCounts.get(1) == 4); + assertEquals(2, (long) worExactCounts.get(0)); + assertEquals(4, (long) worExactCounts.get(1)); } private static class SomeCustomClass implements Serializable { diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 5d0ffd92647bc..435665d8a1ce2 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -28,7 +28,6 @@ import scala.util.control.NonFatal import org.scalatest.Matchers import org.scalatest.exceptions.TestFailedException -import org.apache.spark.AccumulatorParam.StringAccumulatorParam import org.apache.spark.scheduler._ import org.apache.spark.serializer.JavaSerializer import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, AccumulatorV2, LongAccumulator} @@ -45,21 +44,6 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex } } - implicit def setAccum[A]: AccumulableParam[mutable.Set[A], A] = - new AccumulableParam[mutable.Set[A], A] { - def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = { - t1 ++= t2 - t1 - } - def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = { - t1 += t2 - t1 - } - def zero(t: mutable.Set[A]) : mutable.Set[A] = { - new mutable.HashSet[A]() - } - } - test("accumulator serialization") { val ser = new JavaSerializer(new SparkConf).newInstance() val acc = createLongAccum("x") @@ -81,122 +65,6 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex assert(acc3.isAtDriverSide) } - test ("basic accumulation") { - sc = new SparkContext("local", "test") - val acc: Accumulator[Int] = sc.accumulator(0) - - val d = sc.parallelize(1 to 20) - d.foreach{x => acc += x} - acc.value should be (210) - - val longAcc = sc.accumulator(0L) - val maxInt = Integer.MAX_VALUE.toLong - d.foreach{x => longAcc += maxInt + x} - longAcc.value should be (210L + maxInt * 20) - } - - test("value not assignable from tasks") { - sc = new SparkContext("local", "test") - val acc: Accumulator[Int] = sc.accumulator(0) - - val d = sc.parallelize(1 to 20) - intercept[SparkException] { - d.foreach(x => acc.value = x) - } - } - - test ("add value to collection accumulators") { - val maxI = 1000 - for (nThreads <- List(1, 10)) { // test single & multi-threaded - sc = new SparkContext("local[" + nThreads + "]", "test") - val 
acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]()) - val d = sc.parallelize(1 to maxI) - d.foreach { - x => acc += x - } - val v = acc.value.asInstanceOf[mutable.Set[Int]] - for (i <- 1 to maxI) { - v should contain(i) - } - resetSparkContext() - } - } - - test("value not readable in tasks") { - val maxI = 1000 - for (nThreads <- List(1, 10)) { // test single & multi-threaded - sc = new SparkContext("local[" + nThreads + "]", "test") - val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]()) - val d = sc.parallelize(1 to maxI) - an [SparkException] should be thrownBy { - d.foreach { - x => acc.value += x - } - } - resetSparkContext() - } - } - - test ("collection accumulators") { - val maxI = 1000 - for (nThreads <- List(1, 10)) { - // test single & multi-threaded - sc = new SparkContext("local[" + nThreads + "]", "test") - val setAcc = sc.accumulableCollection(mutable.HashSet[Int]()) - val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]()) - val mapAcc = sc.accumulableCollection(mutable.HashMap[Int, String]()) - val d = sc.parallelize((1 to maxI) ++ (1 to maxI)) - d.foreach { - x => {setAcc += x; bufferAcc += x; mapAcc += (x -> x.toString)} - } - - // Note that this is typed correctly -- no casts necessary - setAcc.value.size should be (maxI) - bufferAcc.value.size should be (2 * maxI) - mapAcc.value.size should be (maxI) - for (i <- 1 to maxI) { - setAcc.value should contain(i) - bufferAcc.value should contain(i) - mapAcc.value should contain (i -> i.toString) - } - resetSparkContext() - } - } - - test ("localValue readable in tasks") { - val maxI = 1000 - for (nThreads <- List(1, 10)) { // test single & multi-threaded - sc = new SparkContext("local[" + nThreads + "]", "test") - val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]()) - val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet} - val d = sc.parallelize(groupedInts) - d.foreach { - x => acc.localValue ++= x - } - acc.value should be ((0 to maxI).toSet) - resetSparkContext() - } - } - - test ("garbage collection") { - // Create an accumulator and let it go out of scope to test that it's properly garbage collected - sc = new SparkContext("local", "test") - var acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]()) - val accId = acc.id - val ref = WeakReference(acc) - - // Ensure the accumulator is present - assert(ref.get.isDefined) - - // Remove the explicit reference to it and allow weak reference to get garbage collected - acc = null - System.gc() - assert(ref.get.isEmpty) - - AccumulatorContext.remove(accId) - assert(!AccumulatorContext.get(accId).isDefined) - } - test("get accum") { // Don't register with SparkContext for cleanup var acc = createLongAccum("a") @@ -221,20 +89,6 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex assert(AccumulatorContext.get(100000).isEmpty) } - test("string accumulator param") { - val acc = new Accumulator("", StringAccumulatorParam, Some("darkness")) - assert(acc.value === "") - acc.setValue("feeds") - assert(acc.value === "feeds") - acc.add("your") - assert(acc.value === "your") // value is overwritten, not concatenated - acc += "soul" - assert(acc.value === "soul") - acc ++= "with" - assert(acc.value === "with") - acc.merge("kindness") - assert(acc.value === "kindness") - } } private[spark] object AccumulatorSuite { @@ -256,7 +110,7 @@ private[spark] object AccumulatorSuite { } /** - * Make an 
`AccumulableInfo` out of an [[Accumulable]] with the intent to use the + * Make an `AccumulableInfo` out of an `AccumulatorV2` with the intent to use the * info as an accumulator update. */ def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None) diff --git a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala index 94c79388e3639..621399af731f7 100644 --- a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala +++ b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala @@ -18,7 +18,6 @@ package org.apache.spark.util import org.apache.spark._ -import org.apache.spark.serializer.JavaSerializer class AccumulatorV2Suite extends SparkFunSuite { @@ -128,58 +127,6 @@ class AccumulatorV2Suite extends SparkFunSuite { assert(acc3.value.isEmpty) } - test("LegacyAccumulatorWrapper") { - val acc = new LegacyAccumulatorWrapper("default", AccumulatorParam.StringAccumulatorParam) - assert(acc.value === "default") - assert(!acc.isZero) - - acc.add("foo") - assert(acc.value === "foo") - assert(!acc.isZero) - - acc.add(new java.lang.String("bar")) - - val acc2 = acc.copyAndReset() - assert(acc2.value === "") - assert(acc2.isZero) - - assert(acc.value === "bar") - assert(!acc.isZero) - - acc2.add("baz") - assert(acc2.value === "baz") - assert(!acc2.isZero) - - // Test merging - acc.merge(acc2) - assert(acc.value === "baz") - assert(!acc.isZero) - - val acc3 = acc.copy() - assert(acc3.value === "baz") - assert(!acc3.isZero) - - acc3.reset() - assert(acc3.isZero) - assert(acc3.value === "") - } - - test("LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode") { - val param = new AccumulatorParam[MyData] { - override def zero(initialValue: MyData): MyData = new MyData(0) - override def addInPlace(r1: MyData, r2: MyData): MyData = new MyData(r1.i + r2.i) - } - - val acc = new LegacyAccumulatorWrapper(new MyData(0), param) - acc.metadata = AccumulatorMetadata( - AccumulatorContext.newId(), - Some("test"), - countFailedValues = false) - AccumulatorContext.register(acc) - - val ser = new JavaSerializer(new SparkConf).newInstance() - ser.serialize(acc) - } } class MyData(val i: Int) extends Serializable diff --git a/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala index 2fb09ead4b2d8..24762ea2f4e6b 100644 --- a/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala +++ b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala @@ -74,26 +74,6 @@ class ImplicitSuite { rdd.stats() } - def testDoubleAccumulatorParam(): Unit = { - val sc = mockSparkContext - sc.accumulator(123.4) - } - - def testIntAccumulatorParam(): Unit = { - val sc = mockSparkContext - sc.accumulator(123) - } - - def testLongAccumulatorParam(): Unit = { - val sc = mockSparkContext - sc.accumulator(123L) - } - - def testFloatAccumulatorParam(): Unit = { - val sc = mockSparkContext - sc.accumulator(123F) - } - def testIntWritableConverter(): Unit = { val sc = mockSparkContext sc.sequenceFile[Int, Int]("/a/test/path") diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index a5d6d6366ede9..d6beac14bed66 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,25 @@ object MimaExcludes { // Exclude rules for 3.0.x lazy val v30excludes = v24excludes ++ Seq( + // [SPARK-16775] Remove deprecated accumulator v1 APIs + 
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Accumulable"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.AccumulatorParam"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Accumulator"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Accumulator$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.AccumulableParam"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.AccumulatorParam$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.AccumulatorParam$FloatAccumulatorParam$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.AccumulatorParam$DoubleAccumulatorParam$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.AccumulatorParam$LongAccumulatorParam$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.AccumulatorParam$IntAccumulatorParam$"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.accumulable"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.accumulableCollection"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.accumulator"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.LegacyAccumulatorWrapper"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.intAccumulator"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.accumulable"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.doubleAccumulator"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.accumulator"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.SnappyCompressionCodec.version"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.api.java.JavaPairRDD.flatMapValues"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.api.java.JavaPairDStream.flatMapValues"),