From 2added768ff0a12266ace1784bf61cd3bb6e8410 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 26 Feb 2014 20:39:16 -0800 Subject: [PATCH 1/6] Make it distinct --- .../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 1d029bf009e8c..414555cef0dec 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -388,9 +388,10 @@ object GraphImpl { private def collectVertexIdsFromEdges( edges: EdgeRDD[_], partitioner: Partitioner): RDD[(VertexId, Int)] = { - // TODO: Consider doing map side distinct before shuffle. new ShuffledRDD[VertexId, Int, (VertexId, Int)]( - edges.collectVertexIds.map(vid => (vid, 0)), partitioner) + edges.collectVertexIds.mapPartitions( + (vids => vids.map(vid => (vid, 0)).toStream.distinct.toIterator)), + partitioner) .setSerializer(classOf[VertexIdMsgSerializer].getName) } } // end of object GraphImpl From 3219bff59987dc1deb5ce3710c0c87952776e1f3 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 7 Mar 2014 23:59:13 -0800 Subject: [PATCH 2/6] Use a hash set for unique in graphx instead --- .../scala/org/apache/spark/graphx/impl/GraphImpl.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 414555cef0dec..d89ff19b5adb5 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -17,6 +17,8 @@ package org.apache.spark.graphx.impl +import java.util.HashSet + import scala.reflect.{classTag, ClassTag} import org.apache.spark.util.collection.PrimitiveVector @@ -389,8 +391,10 @@ object GraphImpl { edges: EdgeRDD[_], partitioner: Partitioner): RDD[(VertexId, Int)] = { new ShuffledRDD[VertexId, Int, (VertexId, Int)]( - edges.collectVertexIds.mapPartitions( - (vids => vids.map(vid => (vid, 0)).toStream.distinct.toIterator)), + edges.collectVertexIds.mapPartitions { vids => + val present = new HashSet[VertexId]() + vids.filter(vid => present.add(vid)).map(vid => (vid, 0)) + }), partitioner) .setSerializer(classOf[VertexIdMsgSerializer].getName) } From 4eb59affe53b3d9783c538ddbcb7dd283016ce23 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 8 Mar 2014 00:03:10 -0800 Subject: [PATCH 3/6] Fix typo --- .../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index d89ff19b5adb5..5ed8e16a6cf7b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -394,8 +394,8 @@ object GraphImpl { edges.collectVertexIds.mapPartitions { vids => val present = new HashSet[VertexId]() vids.filter(vid => present.add(vid)).map(vid => (vid, 0)) - }), - partitioner) + }, + partitioner) .setSerializer(classOf[VertexIdMsgSerializer].getName) } } // end of object GraphImpl From e055a31ef3c252b4d38513e267031f5b90dca152 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 8 Mar 2014 00:16:29 -0800 Subject: [PATCH 4/6] Update the OpenHashSet comment to make sense (there is no constant EXISTANCE_MASK), provide default params for rehashIfNeeded, and switch the graphimpl to use it instead of the java hash map which is probably what Reynold ment in the CR feedback --- .../apache/spark/util/collection/OpenHashSet.scala | 5 +++-- .../spark/util/collection/OpenHashSetSuite.scala | 3 +++ .../org/apache/spark/graphx/impl/GraphImpl.scala | 12 ++++++++---- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala index 148c12e64d2ce..c358fb57301e7 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala @@ -114,7 +114,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( * The caller is responsible for calling rehashIfNeeded. * * Use (retval & POSITION_MASK) to get the actual position, and - * (retval & EXISTENCE_MASK) != 0 for prior existence. + * (retval & NONEXISTENCE_MASK) != 0 for prior existence. * * @return The position where the key is placed, plus the highest order bit is set if the key * exists previously. @@ -151,7 +151,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( * @param moveFunc Callback invoked when we move the key from one position (in the old data array) * to a new position (in the new data array). */ - def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) { + def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit = grow, + moveFunc: (Int, Int) => Unit = move) { if (_size > _growThreshold) { rehash(k, allocateFunc, moveFunc) } diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala index ff4a98f5dcd4a..65dcd46b5075e 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala @@ -73,6 +73,9 @@ class OpenHashSetSuite extends FunSuite with ShouldMatchers { assert(set.contains(50)) assert(set.contains(999)) assert(!set.contains(10000)) + + assert(set.addWithoutResize(50) & set.NONEXISTENCE_MASK != 0) + assert(set.addWithoutResize(10000) & set.NONEXISTENCE_MASK === 0) } test("primitive long") { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 5ed8e16a6cf7b..51da8e581db4b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -17,11 +17,10 @@ package org.apache.spark.graphx.impl -import java.util.HashSet - import scala.reflect.{classTag, ClassTag} import org.apache.spark.util.collection.PrimitiveVector +import org.apache.spark.util.collection.OpenHashSet import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.SparkContext._ import org.apache.spark.graphx._ @@ -392,8 +391,13 @@ object GraphImpl { partitioner: Partitioner): RDD[(VertexId, Int)] = { new ShuffledRDD[VertexId, Int, (VertexId, Int)]( edges.collectVertexIds.mapPartitions { vids => - val present = new HashSet[VertexId]() - vids.filter(vid => present.add(vid)).map(vid => (vid, 0)) + val present = new OpenHashSet[VertexId](vids.size) + vids.filter{ vid => + // This is a bit ugly but we can't just call add since add is of type unit + val isPresent = ((present.addWithoutResize(vid) & OpenHashSet.NONEXISTENCE_MASK) != 0) + present.rehashIfNeeded(vid) + isPresent + }.map(vid => (vid, 0)) }, partitioner) .setSerializer(classOf[VertexIdMsgSerializer].getName) From 238a4d8a78a086b3a13c737b4c97e3e008377b55 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 8 Mar 2014 00:24:10 -0800 Subject: [PATCH 5/6] Fix the tests I added and the comment --- .../scala/org/apache/spark/util/collection/OpenHashSet.scala | 4 ++-- .../org/apache/spark/util/collection/OpenHashSetSuite.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala index c358fb57301e7..8b5abb250f78d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala @@ -113,8 +113,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( * Add an element to the set. This one differs from add in that it doesn't trigger rehashing. * The caller is responsible for calling rehashIfNeeded. * - * Use (retval & POSITION_MASK) to get the actual position, and - * (retval & NONEXISTENCE_MASK) != 0 for prior existence. + * Use (retval & _mask) to get the actual position, and + * (retval & NONEXISTENCE_MASK) == 0 for prior existence. * * @return The position where the key is placed, plus the highest order bit is set if the key * exists previously. diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala index 65dcd46b5075e..c9880f0c4a85a 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala @@ -74,8 +74,8 @@ class OpenHashSetSuite extends FunSuite with ShouldMatchers { assert(set.contains(999)) assert(!set.contains(10000)) - assert(set.addWithoutResize(50) & set.NONEXISTENCE_MASK != 0) - assert(set.addWithoutResize(10000) & set.NONEXISTENCE_MASK === 0) + assert((set.addWithoutResize(50) & OpenHashSet.NONEXISTENCE_MASK) === 0) + assert((set.addWithoutResize(10000) & OpenHashSet.NONEXISTENCE_MASK) != 0) } test("primitive long") { From 42d952227cd0737b2e0b84380b28844aa9704811 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 8 Mar 2014 00:49:09 -0800 Subject: [PATCH 6/6] traversableonce you drive me crazy one again --- .../org/apache/spark/graphx/impl/GraphImpl.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 51da8e581db4b..3a52198d97b4b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -391,14 +391,18 @@ object GraphImpl { partitioner: Partitioner): RDD[(VertexId, Int)] = { new ShuffledRDD[VertexId, Int, (VertexId, Int)]( edges.collectVertexIds.mapPartitions { vids => - val present = new OpenHashSet[VertexId](vids.size) + val present = new OpenHashSet[VertexId]() vids.filter{ vid => // This is a bit ugly but we can't just call add since add is of type unit - val isPresent = ((present.addWithoutResize(vid) & OpenHashSet.NONEXISTENCE_MASK) != 0) - present.rehashIfNeeded(vid) - isPresent + val isPresent = ((present.addWithoutResize(vid) & OpenHashSet.NONEXISTENCE_MASK) == 0) + if (!isPresent) { + present.rehashIfNeeded(vid) + true + } else { + false + } }.map(vid => (vid, 0)) - }, + }, partitioner) .setSerializer(classOf[VertexIdMsgSerializer].getName) }