diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 363c0fddcc1f7..a0ab8a1becb21 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -71,12 +71,6 @@ class EdgePartition[ Some(activeSet)) } - /** Return a new `EdgePartition` with the specified active set. */ - def withActiveSet(activeSet: Option[VertexSet]): EdgePartition[ED, VD] = { - new EdgePartition( - localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs, activeSet) - } - /** Return a new `EdgePartition` with updates to vertex attributes specified in `iter`. */ def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = { val newVertexAttrs = new Array[VD](vertexAttrs.length) @@ -116,8 +110,8 @@ class EdgePartition[ * @return a new edge partition with all edges reversed. */ def reverse: EdgePartition[ED, VD] = { - val builder = new VertexPreservingEdgePartitionBuilder( - global2local, local2global, vertexAttrs, size)(classTag[ED], classTag[VD]) + val builder = new ExistingEdgePartitionBuilder[ED, VD]( + global2local, local2global, vertexAttrs, activeSet, size) var i = 0 while (i < size) { val localSrcId = localSrcIds(i) @@ -128,7 +122,7 @@ class EdgePartition[ builder.add(dstId, srcId, localDstId, localSrcId, attr) i += 1 } - builder.toEdgePartition.withActiveSet(activeSet) + builder.toEdgePartition } /** @@ -189,8 +183,8 @@ class EdgePartition[ def filter( epred: EdgeTriplet[VD, ED] => Boolean, vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = { - val builder = new VertexPreservingEdgePartitionBuilder[ED, VD]( - global2local, local2global, vertexAttrs) + val builder = new ExistingEdgePartitionBuilder[ED, VD]( + global2local, local2global, vertexAttrs, activeSet) var i = 0 while (i < size) { // The user sees the EdgeTriplet, so we can't reuse it and must create one per edge. @@ -207,7 +201,7 @@ class EdgePartition[ } i += 1 } - builder.toEdgePartition.withActiveSet(activeSet) + builder.toEdgePartition } /** @@ -227,8 +221,8 @@ class EdgePartition[ * @return a new edge partition without duplicate edges */ def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = { - val builder = new VertexPreservingEdgePartitionBuilder[ED, VD]( - global2local, local2global, vertexAttrs) + val builder = new ExistingEdgePartitionBuilder[ED, VD]( + global2local, local2global, vertexAttrs, activeSet) var currSrcId: VertexId = null.asInstanceOf[VertexId] var currDstId: VertexId = null.asInstanceOf[VertexId] var currLocalSrcId = -1 @@ -260,7 +254,7 @@ class EdgePartition[ if (size > 0) { builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr) } - builder.toEdgePartition.withActiveSet(activeSet) + builder.toEdgePartition } /** @@ -276,8 +270,8 @@ class EdgePartition[ def innerJoin[ED2: ClassTag, ED3: ClassTag] (other: EdgePartition[ED2, _]) (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3, VD] = { - val builder = new VertexPreservingEdgePartitionBuilder[ED3, VD]( - global2local, local2global, vertexAttrs) + val builder = new ExistingEdgePartitionBuilder[ED3, VD]( + global2local, local2global, vertexAttrs, activeSet) var i = 0 var j = 0 // For i = index of each edge in `this`... @@ -296,7 +290,7 @@ class EdgePartition[ } i += 1 } - builder.toEdgePartition.withActiveSet(activeSet) + builder.toEdgePartition } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index 675247d1686a9..95a9dca3d16e7 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -77,14 +77,15 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla /** * Constructs an EdgePartition from an existing EdgePartition with the same vertex set. This enables - * reuse of the local vertex ids. + * reuse of the local vertex ids. Intended for internal use in EdgePartition only. */ -private[graphx] -class VertexPreservingEdgePartitionBuilder[ +private[impl] +class ExistingEdgePartitionBuilder[ @specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag]( global2local: GraphXPrimitiveKeyOpenHashMap[VertexId, Int], local2global: Array[VertexId], vertexAttrs: Array[VD], + activeSet: Option[VertexSet], size: Int = 64) { var edges = new PrimitiveVector[EdgeWithLocalIds[ED]](size) @@ -119,14 +120,14 @@ class VertexPreservingEdgePartitionBuilder[ } new EdgePartition( - localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs) + localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs, activeSet) } } -private[graphx] case class EdgeWithLocalIds[@specialized ED]( +private[impl] case class EdgeWithLocalIds[@specialized ED]( srcId: VertexId, dstId: VertexId, localSrcId: Int, localDstId: Int, attr: ED) -private[graphx] object EdgeWithLocalIds { +private[impl] object EdgeWithLocalIds { implicit def lexicographicOrdering[ED] = new Ordering[EdgeWithLocalIds[ED]] { override def compare(a: EdgeWithLocalIds[ED], b: EdgeWithLocalIds[ED]): Int = { if (a.srcId == b.srcId) {