Skip to content

Commit

Permalink
[SPARK-5922][GraphX]: Add diff(other: RDD[VertexId, VD]) in VertexRDD
Browse files Browse the repository at this point in the history
Changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]. This change maintains backwards compatibility and better unifies the VertexRDD methods to match each other.

Author: Brennon York <brennon.york@capitalone.com>

Closes apache#4733 from brennonyork/SPARK-5922 and squashes the following commits:

e800f08 [Brennon York] fixed merge conflicts
b9274af [Brennon York] fixed merge conflicts
f86375c [Brennon York] fixed minor include line
398ddb4 [Brennon York] fixed merge conflicts
aac1810 [Brennon York] updated to aggregateUsingIndex and added test to ensure that method works properly
2af0b88 [Brennon York] removed deprecation line
753c963 [Brennon York] fixed merge conflicts and set preference to use the diff(other: VertexRDD[VD]) method
2c678c6 [Brennon York] added mima exclude to exclude new public diff method from VertexRDD
93186f3 [Brennon York] added back the original diff method to sustain binary compatibility
f18356e [Brennon York] changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]
  • Loading branch information
Brennon York authored and ankurdave committed Mar 16, 2015
1 parent aa6536f commit 45f4c66
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 0 deletions.
9 changes: 9 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ abstract class VertexRDD[VD](
*/
def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

/**
* For each vertex present in both `this` and `other`, `diff` returns only those vertices with
* differing values; for values that are different, keeps the values from `other`. This is
* only guaranteed to work if the VertexRDDs share a common ancestor.
*
* @param other the other RDD[(VertexId, VD)] with which to diff against.
*/
def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]

/**
* For each vertex present in both `this` and `other`, `diff` returns only those vertices with
* differing values; for values that are different, keeps the values from `other`. This is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class VertexRDDImpl[VD] private[graphx] (
override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map(f))

override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
}

override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
val otherPartition = other match {
case other: VertexRDD[_] if this.partitioner == other.partitioner =>
Expand Down
13 changes: 13 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.graphx
import org.scalatest.FunSuite

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

class VertexRDDSuite extends FunSuite with LocalSparkContext {
Expand Down Expand Up @@ -58,6 +59,18 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
}
}

test("diff with RDD[(VertexId, VD)]") {
withSpark { sc =>
val n = 100
val verts = vertices(sc, n).cache()
val flipEvens: RDD[(VertexId, Int)] =
sc.parallelize(0L to 100L)
.map(id => if (id % 2 == 0) (id, -id.toInt) else (id, id.toInt)).cache()
// diff should keep only the changed vertices
assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet)
}
}

test("diff vertices with the non-equal number of partitions") {
withSpark { sc =>
val vertexA = VertexRDD(sc.parallelize(0 until 24, 3).map(i => (i.toLong, 0)))
Expand Down
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock")
) ++ Seq(
// SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, VD)]) to VertexRDD
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff")
)

case v if v.startsWith("1.2") =>
Expand Down

0 comments on commit 45f4c66

Please sign in to comment.