[SPARK-6765] Fix test code style for graphx.
So we can turn the style checker on for test code.

Author: Reynold Xin <rxin@databricks.com>

Closes #5410 from rxin/test-style-graphx and squashes the following commits:

89e253a [Reynold Xin] [SPARK-6765] Fix test code style for graphx.
rxin committed Apr 8, 2015
1 parent 9d44ddc commit 8d812f9
Showing 6 changed files with 88 additions and 85 deletions.
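
Before the per-file diffs, here is a condensed sketch of the recurring fixes this commit applies: explicit result types on public members, parentheses on RDD actions such as collect(), spaces around binary operators, and wrapping of lines that exceed the length limit. The object and helper names below (StyleFixSketch, evenValues) are invented for illustration and do not appear in the diff itself.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object StyleFixSketch {

  // Rule: public members declare an explicit return type instead of relying
  // on type inference.
  def evenValues(rdd: RDD[Int]): RDD[Int] = rdd.filter(x => x % 2 == 0)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("style-fix-sketch")
    val sc = new SparkContext(conf)
    try {
      // Rule: binary operators get surrounding spaces, e.g. `x + 1`, not `x+1`.
      val pairs = sc.parallelize((0 until 9).map(x => (x, x + 1)))

      // Rule: RDD actions are written with parentheses: `collect()`, `count()`,
      // `sum()` rather than `collect`, `count`, `sum`.
      val collected = pairs.collect().toSet

      // Rule: lines stay within the length limit, so long assertions are
      // wrapped across lines instead of being left on one overlong line.
      assert(collected ==
        (0 until 9).map(x => (x, x + 1)).toSet)

      assert(evenValues(sc.parallelize(0 to 10)).count() == 6)
    } finally {
      sc.stop()
    }
  }
}
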
71 changes: 39 additions & 32 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -38,12 +38,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val doubleRing = ring ++ ring
val graph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1)
assert(graph.edges.count() === doubleRing.size)
assert(graph.edges.collect.forall(e => e.attr == 1))
assert(graph.edges.collect().forall(e => e.attr == 1))

// uniqueEdges option should uniquify edges and store duplicate count in edge attributes
val uniqueGraph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1, Some(RandomVertexCut))
assert(uniqueGraph.edges.count() === ring.size)
assert(uniqueGraph.edges.collect.forall(e => e.attr == 2))
assert(uniqueGraph.edges.collect().forall(e => e.attr == 2))
}
}

@@ -64,7 +64,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert( graph.edges.count() === rawEdges.size )
// Vertices not explicitly provided but referenced by edges should be created automatically
assert( graph.vertices.count() === 100)
graph.triplets.collect.map { et =>
graph.triplets.collect().map { et =>
assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr))
}
@@ -75,15 +75,17 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
(1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect().toSet
=== (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
}
}

test("partitionBy") {
withSpark { sc =>
def mkGraph(edges: List[(Long, Long)]) = Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
def nonemptyParts(graph: Graph[Int, Int]) = {
def mkGraph(edges: List[(Long, Long)]): Graph[Int, Int] = {
Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
}
def nonemptyParts(graph: Graph[Int, Int]): RDD[List[Edge[Int]]] = {
graph.edges.partitionsRDD.mapPartitions { iter =>
Iterator(iter.next()._2.iterator.toList)
}.filter(_.nonEmpty)
@@ -102,7 +104,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D)).count === 1)
// partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
// the same partition
assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
assert(
nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
// partitionBy(EdgePartition2D) puts identical edges in the same partition
assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1)

@@ -140,10 +143,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val g = Graph(
sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
assert(g.triplets.collect.map(_.toTuple).toSet ===
assert(g.triplets.collect().map(_.toTuple).toSet ===
Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
val gPart = g.partitionBy(EdgePartition2D)
assert(gPart.triplets.collect.map(_.toTuple).toSet ===
assert(gPart.triplets.collect().map(_.toTuple).toSet ===
Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
}
}
@@ -154,10 +157,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val star = starGraph(sc, n)
// mapVertices preserving type
val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
assert(mappedVAttrs.vertices.collect().toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
// mapVertices changing type
val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, 1)).toSet)
assert(mappedVAttrs2.vertices.collect().toSet === (0 to n).map(x => (x: VertexId, 1)).toSet)
}
}

@@ -177,12 +180,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
// Trigger initial vertex replication
graph0.triplets.foreach(x => {})
// Change type of replicated vertices, but preserve erased type
val graph1 = graph0.mapVertices {
case (vid, integerOpt) => integerOpt.map((x: java.lang.Integer) => (x.toDouble): java.lang.Double)
val graph1 = graph0.mapVertices { case (vid, integerOpt) =>
integerOpt.map((x: java.lang.Integer) => x.toDouble: java.lang.Double)
}
// Access replicated vertices, exposing the erased type
val graph2 = graph1.mapTriplets(t => t.srcAttr.get)
assert(graph2.edges.map(_.attr).collect.toSet === Set[java.lang.Double](1.0, 2.0, 3.0))
assert(graph2.edges.map(_.attr).collect().toSet === Set[java.lang.Double](1.0, 2.0, 3.0))
}
}

@@ -202,7 +205,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect.toSet ===
assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect().toSet ===
(1L to n).map(x => Edge(0, x, "vv")).toSet)
}
}
@@ -211,7 +214,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexId, 1)).toSet)
assert(star.reverse.outDegrees.collect().toSet === (1 to n).map(x => (x: VertexId, 1)).toSet)
}
}

@@ -221,7 +224,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
val graph = Graph(vertices, edges).reverse
val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(result.collect.toSet === Set((1L, 2)))
assert(result.collect().toSet === Set((1L, 2)))
}
}

@@ -237,7 +240,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(subgraph.vertices.collect().toSet === (0 to n by 2).map(x => (x, "v")).toSet)

// And 4 edges.
assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
assert(subgraph.edges.map(_.copy()).collect().toSet ===
(2 to n by 2).map(x => Edge(0, x, 1)).toSet)
}
}

@@ -273,9 +277,9 @@ class GraphSuite extends FunSuite with LocalSparkContext {
sc.parallelize((1 to n).flatMap(x =>
List((0: VertexId, x: VertexId), (0: VertexId, x: VertexId))), 1), "v")
val star2 = doubleStar.groupEdges { (a, b) => a}
assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
assert(star2.edges.collect().toArray.sorted(Edge.lexicographicOrdering[Int]) ===
star.edges.collect().toArray.sorted(Edge.lexicographicOrdering[Int]))
assert(star2.vertices.collect().toSet === star.vertices.collect().toSet)
}
}

@@ -300,21 +304,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
}
Iterator((et.srcId, 1))
}, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
}, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect().toSet
assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet)

// outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x+1) % n: VertexId)), 3)
val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x + 1) % n: VertexId)), 3)
val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache()
val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) =>
newOpt.getOrElse(old)
}
val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
// Map function should only run on edges with source in the active set
if (et.srcId % 2 != 1) {
throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
}
Iterator((et.dstId, 1))
}, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
}, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect().toSet
assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet)

}
@@ -340,17 +346,18 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val n = 5
val reverseStar = starGraph(sc, n).reverse.cache()
// outerJoinVertices changing type
val reverseStarDegrees =
reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
val reverseStarDegrees = reverseStar.outerJoinVertices(reverseStar.outDegrees) {
(vid, a, bOpt) => bOpt.getOrElse(0)
}
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect.toSet
(a: Int, b: Int) => a + b).collect().toSet
assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
// outerJoinVertices preserving type
val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
val newReverseStar =
reverseStar.outerJoinVertices(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") }
assert(newReverseStar.vertices.map(_._2).collect.toSet ===
assert(newReverseStar.vertices.map(_._2).collect().toSet ===
(0 to n).map(x => "v%d".format(x)).toSet)
}
}
@@ -361,7 +368,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2)
val graph = Graph(verts, edges)
val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr))
.collect.toSet
.collect().toSet
assert(triplets ===
Set((1: VertexId, 2: VertexId, "a", "b"), (2: VertexId, 1: VertexId, "b", "a")))
}
@@ -417,7 +424,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val graph = Graph.fromEdgeTuples(edges, 1)
val neighborAttrSums = graph.mapReduceTriplets[Int](
et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
assert(neighborAttrSums.collect().toSet === Set((0: VertexId, n)))
} finally {
sc.stop()
}
graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
@@ -26,7 +26,7 @@ import org.apache.spark.SparkContext
*/
trait LocalSparkContext {
/** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */
def withSpark[T](f: SparkContext => T) = {
def withSpark[T](f: SparkContext => T): T = {
val conf = new SparkConf()
GraphXUtils.registerKryoClasses(conf)
val sc = new SparkContext("local", "test", conf)
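The withSpark change above and the vertices helper in the next file illustrate the two patterns this diff uses for member result types: either spell the type out on a public member, or narrow the member to private and keep the inferred type. A small hypothetical sketch (trait and method names invented for illustration):

trait SuiteHelpers {
  // Option 1: keep the member public and write the return type explicitly,
  // as done for `withSpark[T](f: SparkContext => T): T` above.
  def chainEdges(n: Int): Seq[(Long, Long)] =
    (0 until n).map(x => (x.toLong, (x + 1).toLong))

  // Option 2: narrow the member to `private`, as done for the `vertices`
  // helper in VertexRDDSuite below; the inferred return type may then stay.
  private def ringEdges(n: Int) = (0 until n).map(x => (x, (x + 1) % n))
}
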
26 changes: 13 additions & 13 deletions graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.StorageLevel

class VertexRDDSuite extends FunSuite with LocalSparkContext {

def vertices(sc: SparkContext, n: Int) = {
private def vertices(sc: SparkContext, n: Int) = {
VertexRDD(sc.parallelize((0 to n).map(x => (x.toLong, x)), 5))
}

@@ -52,7 +52,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexA = VertexRDD(sc.parallelize(0 until 75, 2).map(i => (i.toLong, 0))).cache()
val vertexB = VertexRDD(sc.parallelize(25 until 100, 2).map(i => (i.toLong, 1))).cache()
val vertexC = vertexA.minus(vertexB)
assert(vertexC.map(_._1).collect.toSet === (0 until 25).toSet)
assert(vertexC.map(_._1).collect().toSet === (0 until 25).toSet)
}
}

@@ -62,7 +62,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexB: RDD[(VertexId, Int)] =
sc.parallelize(25 until 100, 2).map(i => (i.toLong, 1)).cache()
val vertexC = vertexA.minus(vertexB)
assert(vertexC.map(_._1).collect.toSet === (0 until 25).toSet)
assert(vertexC.map(_._1).collect().toSet === (0 until 25).toSet)
}
}

@@ -72,7 +72,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexB = VertexRDD(sc.parallelize(50 until 100, 2).map(i => (i.toLong, 1)))
assert(vertexA.partitions.size != vertexB.partitions.size)
val vertexC = vertexA.minus(vertexB)
assert(vertexC.map(_._1).collect.toSet === (0 until 50).toSet)
assert(vertexC.map(_._1).collect().toSet === (0 until 50).toSet)
}
}

@@ -106,7 +106,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexB = VertexRDD(sc.parallelize(8 until 16, 2).map(i => (i.toLong, 1)))
assert(vertexA.partitions.size != vertexB.partitions.size)
val vertexC = vertexA.diff(vertexB)
assert(vertexC.map(_._1).collect.toSet === (8 until 16).toSet)
assert(vertexC.map(_._1).collect().toSet === (8 until 16).toSet)
}
}

@@ -116,11 +116,11 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val verts = vertices(sc, n).cache()
val evens = verts.filter(q => ((q._2 % 2) == 0)).cache()
// leftJoin with another VertexRDD
assert(verts.leftJoin(evens) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet ===
assert(verts.leftJoin(evens) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect().toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet)
// leftJoin with an RDD
val evensRDD = evens.map(identity)
assert(verts.leftJoin(evensRDD) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet ===
assert(verts.leftJoin(evensRDD) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect().toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet)
}
}
@@ -134,7 +134,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexC = vertexA.leftJoin(vertexB) { (vid, old, newOpt) =>
old - newOpt.getOrElse(0)
}
assert(vertexC.filter(v => v._2 != 0).map(_._1).collect.toSet == (1 to 99 by 2).toSet)
assert(vertexC.filter(v => v._2 != 0).map(_._1).collect().toSet == (1 to 99 by 2).toSet)
}
}

@@ -144,11 +144,11 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val verts = vertices(sc, n).cache()
val evens = verts.filter(q => ((q._2 % 2) == 0)).cache()
// innerJoin with another VertexRDD
assert(verts.innerJoin(evens) { (id, a, b) => a - b }.collect.toSet ===
assert(verts.innerJoin(evens) { (id, a, b) => a - b }.collect().toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet)
// innerJoin with an RDD
val evensRDD = evens.map(identity)
assert(verts.innerJoin(evensRDD) { (id, a, b) => a - b }.collect.toSet ===
assert(verts.innerJoin(evensRDD) { (id, a, b) => a - b }.collect().toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet) }
}

@@ -161,7 +161,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexC = vertexA.innerJoin(vertexB) { (vid, old, newVal) =>
old - newVal
}
assert(vertexC.filter(v => v._2 == 0).map(_._1).collect.toSet == (0 to 98 by 2).toSet)
assert(vertexC.filter(v => v._2 == 0).map(_._1).collect().toSet == (0 to 98 by 2).toSet)
}
}

@@ -171,7 +171,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val verts = vertices(sc, n)
val messageTargets = (0 to n) ++ (0 to n by 2)
val messages = sc.parallelize(messageTargets.map(x => (x.toLong, 1)))
assert(verts.aggregateUsingIndex[Int](messages, _ + _).collect.toSet ===
assert(verts.aggregateUsingIndex[Int](messages, _ + _).collect().toSet ===
(0 to n).map(x => (x.toLong, if (x % 2 == 0) 2 else 1)).toSet)
}
}
@@ -183,7 +183,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
// test merge function
assert(rdd.collect.toSet == Set((0L, 0), (1L, 3), (2L, 9)))
assert(rdd.collect().toSet == Set((0L, 0), (1L, 3), (2L, 9)))
}
}

graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
@@ -32,7 +32,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
val ccGraph = gridGraph.connectedComponents()
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum()
assert(maxCCid === 0)
}
} // end of Grid connected components
@@ -42,16 +42,16 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse
val ccGraph = gridGraph.connectedComponents()
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum()
assert(maxCCid === 0)
}
} // end of Grid connected components


test("Chain Connected Components") {
withSpark { sc =>
val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) )
val chain1 = (0 until 9).map(x => (x, x + 1))
val chain2 = (10 until 20).map(x => (x, x + 1))
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0)
val ccGraph = twoChains.connectedComponents()
@@ -73,12 +73,12 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {

test("Reverse Chain Connected Components") {
withSpark { sc =>
val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) )
val chain1 = (0 until 9).map(x => (x, x + 1))
val chain2 = (10 until 20).map(x => (x, x + 1))
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse
val ccGraph = twoChains.connectedComponents()
val vertices = ccGraph.vertices.collect
val vertices = ccGraph.vertices.collect()
for ( (id, cc) <- vertices ) {
if (id < 10) {
assert(cc === 0)
@@ -120,9 +120,9 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
val ccGraph = graph.connectedComponents()
val vertices = ccGraph.vertices.collect
val vertices = ccGraph.vertices.collect()
for ( (id, cc) <- vertices ) {
assert(cc == 0)
assert(cc === 0)
}
}
} // end of toy connected components
(Diffs for the remaining two changed files are not shown here.)