[SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

## What changes were proposed in this pull request?

Accumulators written against the Spark 1.x API may no longer work with Spark 2.x, because `LegacyAccumulatorWrapper.isZero` can return the wrong answer if the underlying `AccumulableParam` doesn't define `equals`/`hashCode`.
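To see the failure mode, consider an element type that doesn't override `equals`: each call to `param.zero(initialValue)` builds a fresh instance, and `==` on plain references falls back to identity, so the old equality-based check can never be true. A minimal sketch of the problem (the `MyData` class and `param` here mirror the new test added below; the old `isZero` body is inlined for illustration):

```scala
import org.apache.spark.AccumulatorParam

class MyData(val i: Int) extends Serializable  // no equals/hashCode defined

val param = new AccumulatorParam[MyData] {
  override def zero(initialValue: MyData): MyData = new MyData(0)
  override def addInPlace(r1: MyData, r2: MyData): MyData = new MyData(r1.i + r2.i)
}

val value = param.zero(new MyData(0))  // a fresh "zero" instance

// The old check, inlined: two distinct zero instances compared with ==,
// which is reference equality for MyData, so this is false even though
// the accumulator is logically at zero.
println(value == param.zero(new MyData(0)))  // false
```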

This PR fixes it by caching the zero value once and using a reference-equality check in `LegacyAccumulatorWrapper.isZero`.

## How was this patch tested?

A new regression test in `AccumulatorV2Suite` that serializes a `LegacyAccumulatorWrapper` over an element type with no `equals`/`hashCode`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21229 from cloud-fan/accumulator.
cloud-fan committed May 4, 2018
1 parent 7f1b6b1 commit 4d5de4d
Showing 2 changed files with 23 additions and 2 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -486,7 +486,9 @@ class LegacyAccumulatorWrapper[R, T](
     param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] {
   private[spark] var _value = initialValue // Current value on driver
 
-  override def isZero: Boolean = _value == param.zero(initialValue)
+  @transient private lazy val _zero = param.zero(initialValue)
+
+  override def isZero: Boolean = _value.asInstanceOf[AnyRef].eq(_zero.asInstanceOf[AnyRef])
 
   override def copy(): LegacyAccumulatorWrapper[R, T] = {
     val acc = new LegacyAccumulatorWrapper(initialValue, param)
@@ -495,7 +497,7 @@ class LegacyAccumulatorWrapper[R, T](
   }
 
   override def reset(): Unit = {
-    _value = param.zero(initialValue)
+    _value = _zero
   }
 
   override def add(v: T): Unit = _value = param.addAccumulator(_value, v)
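With the cached `_zero`, `reset()` installs that single instance into `_value`, so `isZero` reduces to a cheap identity test that holds exactly until `add` swaps in a new object. An illustrative sketch of the patched semantics (not part of the diff; it reuses `MyData` and `param` from the example above):

```scala
import org.apache.spark.util.LegacyAccumulatorWrapper

val acc = new LegacyAccumulatorWrapper(new MyData(0), param)

acc.reset()             // _value = _zero (the one cached instance)
assert(acc.isZero)      // identity check: _value eq _zero

acc.add(new MyData(1))  // addInPlace returns a fresh MyData
assert(!acc.isZero)     // no longer the cached zero instance
```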
19 changes: 19 additions & 0 deletions core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import org.apache.spark._
+import org.apache.spark.serializer.JavaSerializer
 
 class AccumulatorV2Suite extends SparkFunSuite {
 
@@ -162,4 +163,22 @@ class AccumulatorV2Suite extends SparkFunSuite {
     assert(acc3.isZero)
     assert(acc3.value === "")
   }
+
+  test("LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode") {
+    class MyData(val i: Int) extends Serializable

srowen (Member) commented on Aug 4, 2018:
Weird, in the Scala 2.12 build, this class fails to be serializable, as it captures some scalatest class. I'm going to move it out of the method to see if that does it.

+
+    val param = new AccumulatorParam[MyData] {
+      override def zero(initialValue: MyData): MyData = new MyData(0)
+      override def addInPlace(r1: MyData, r2: MyData): MyData = new MyData(r1.i + r2.i)
+    }
+
+    val acc = new LegacyAccumulatorWrapper(new MyData(0), param)
+    acc.metadata = AccumulatorMetadata(
+      AccumulatorContext.newId(),
+      Some("test"),
+      countFailedValues = false)
+    AccumulatorContext.register(acc)
+
+    val ser = new JavaSerializer(new SparkConf).newInstance()
+    ser.serialize(acc)
+  }
 }
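For context on why plain serialization exercises `isZero`: on the driver, `AccumulatorV2`'s Java-serialization hook replaces the accumulator with a zero-value copy and asserts that the copy is zero. A paraphrased sketch of that hook (based on `AccumulatorV2` in Spark around this release, not code changed by this patch; see the actual source for the full method):

```scala
// Inside AccumulatorV2[IN, OUT] (paraphrased sketch):
protected def writeReplace(): Any = {
  if (atDriverSide) {
    val copyAcc = copyAndReset()  // copy() then reset(), so _value eq _zero
    // With the old ==-based isZero and a MyData lacking equals, this
    // assertion failed, which is why ser.serialize(acc) in the test
    // above used to throw before this patch.
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
    copyAcc.metadata = metadata
    copyAcc
  } else {
    this
  }
}
```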
