diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index a3fe7b5fde109..3e271e37eb874 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -127,6 +127,7 @@ object Pregel extends Logging {
     // Loop
     var prevG: Graph[VD, ED] = null.asInstanceOf[Graph[VD, ED]]
     var i = 0
+    logWarning("Starting Pregel.")
     while (activeMessages > 0 && i < maxIterations) {
       // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
       val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
@@ -150,13 +151,13 @@
       SparkEnv.get.blockManager.shuffleBlockManager.removeAllShuffleStuff()
     }

-    logInfo("Pregel finished iteration " + i)
+    logWarning("Pregel finished iteration " + i)

     // Unpersist the RDDs hidden by newly-materialized RDDs
     oldMessages.unpersist(blocking=false)
     newVerts.unpersist(blocking=false)
     prevG.unpersistVertices(blocking=false)
     // count the iteration
     i += 1
   }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index c1da54fa73752..6329168ac2439 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -18,8 +18,11 @@
 package org.apache.spark.graphx.lib

+import scala.math._
+
 import org.apache.spark._
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.PartitionStrategy._
+import org.apache.spark.SparkContext._

 /**
  * Driver program for running graph algorithms.
@@ -127,27 +130,70 @@ object Analytics extends Logging {
         val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

         val cc = ConnectedComponents.run(graph)
-        println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
+        logWarning("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct().count())
+        sc.stop()
+
+      case "kcore" =>
+        var numEPart = 4
+        var kmax = 4
+        var kmin = 1
+        var partitionStrategy: Option[PartitionStrategy] = None
+
+        options.foreach{
+          case ("numEPart", v) => numEPart = v.toInt
+          case ("kmax", v) => kmax = v.toInt
+          case ("kmin", v) => kmin = v.toInt
+          case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+          case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+        }
+
+        if (kmax < 1) {
+          logWarning("kmax must be positive")
+          sys.exit(1)
+        }
+        if (kmax < kmin) {
+          logWarning("kmax must be greater than or equal to kmin")
+          sys.exit(1)
+        }
+
+        println("======================================")
+        println("|               KCORE                |")
+        println("======================================")
+
+        val sc = new SparkContext(host, "KCore(" + fname + ")", conf)
+        val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+          minEdgePartitions = numEPart).cache()
+        val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+        logWarning("Starting kcore")
+        val result = KCore.run(graph, kmax, kmin)
+
+        logWarning("Size of cores: " + result.vertices.map {
+          case (vid, data) => (min(data, kmax), 1)
+        }.reduceByKey(_ + _).collect().mkString(", "))
         sc.stop()

       case "triangles" =>
         var numEPart = 4
         // TriangleCount requires the graph to be partitioned
-        var partitionStrategy: PartitionStrategy = RandomVertexCut
+        var partitionStrategy: Option[PartitionStrategy] = None
         options.foreach{
           case ("numEPart", v) => numEPart = v.toInt
-          case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
+          case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }

         println("======================================")
         println("|           Triangle Count           |")
         println("======================================")

         val sc = new SparkContext(host, "TriangleCount(" + fname + ")", conf)
-        val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
-          minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
+        val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
+          minEdgePartitions = numEPart).cache()
+        val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+        logWarning(s"Graph has ${graph.vertices.count} vertices")
         val triangles = TriangleCount.run(graph)
-        println("Triangles: " + triangles.vertices.map {
+        logWarning("Triangles: " + triangles.vertices.map {
           case (vid,data) => data.toLong
         }.reduce(_ + _) / 3)
         sc.stop()
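For reference, the new "kcore" driver case boils down to the following standalone logic. This is a sketch, not part of the patch; the local master, the KCoreExample object, and the edge-list file edges.txt are all hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.lib.KCore

    object KCoreExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[4]", "KCoreExample", new SparkConf())
        // edges.txt is a hypothetical whitespace-separated edge list
        val graph = GraphLoader.edgeListFile(sc, "edges.txt", minEdgePartitions = 4).cache()
        val result = KCore.run(graph, kmax = 4, kmin = 1)
        // histogram of core numbers, capped at kmax, mirroring the driver's "Size of cores" output
        val sizes = result.vertices.map { case (_, k) => (math.min(k, 4), 1) }
          .reduceByKey(_ + _).collect()
        println(sizes.mkString(", "))
        sc.stop()
      }
    }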
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/KCore.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/KCore.scala
new file mode 100644
index 0000000000000..560909917860e
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/KCore.scala
@@ -0,0 +1,96 @@
+package org.apache.spark.graphx.lib
+
+import scala.math._
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.graphx._
+
+object KCore extends Logging {
+  /**
+   * Compute the k-core decomposition of the graph for all k <= kmax. This
+   * uses the iterative pruning algorithm discussed by Alvarez-Hamelin et al.
+   * in "K-Core Decomposition: a Tool for the Visualization of Large Scale
+   * Networks" (see http://arxiv.org/abs/cs/0504107).
+   *
+   * @tparam VD the vertex attribute type (discarded in the computation)
+   * @tparam ED the edge attribute type (preserved in the computation)
+   *
+   * @param graph the graph for which to compute the k-core decomposition
+   * @param kmax the maximum value of k to use in the decomposition
+   * @param kmin the minimum value of k to use in the decomposition
+   *
+   * @return a graph where the vertex attribute is the minimum of kmax and
+   * the highest value k for which that vertex was a member of the k-core
+   *
+   * @note Rather than returning a single k-core, this method yields the
+   * cores for every k in [kmin, kmax] in a single run.
+   */
+  def run[VD: ClassTag, ED: ClassTag](
+      graph: Graph[VD, ED],
+      kmax: Int,
+      kmin: Int = 1)
+    : Graph[Int, ED] = {
+
+    // Graph[(Int, Boolean), ED] - the Int is the vertex's effective degree and the
+    // Boolean indicates whether the vertex is still active (not yet pruned)
+    var g = graph.outerJoinVertices(graph.degrees) {
+      (vid, oldData, newData) => (newData.getOrElse(0), true)
+    }.cache()
+    var curK = kmin
+    while (curK <= kmax) {
+      g = computeCurrentKCore(g, curK).cache()
+      val testK = curK
+      val vCount = g.vertices.filter { case (vid, (vd, _)) => vd >= testK }.count()
+      val eCount = g.triplets.filter { t =>
+        t.srcAttr._1 >= testK && t.dstAttr._1 >= testK
+      }.count()
+      logWarning(s"K=$curK, V=$vCount, E=$eCount")
+      curK += 1
+    }
+    g.mapVertices { case (_, (k, _)) => k }
+  }
+
+  def computeCurrentKCore[ED: ClassTag](
+      graph: Graph[(Int, Boolean), ED],
+      k: Int): Graph[(Int, Boolean), ED] = {
+    logWarning(s"Computing kcore for k=$k")
+    def sendMsg(et: EdgeTriplet[(Int, Boolean), ED]): Iterator[(VertexId, (Int, Boolean))] = {
+      if (!et.srcAttr._2 || !et.dstAttr._2) {
+        // if either vertex has already been turned off, do nothing
+        Iterator.empty
+      } else if (et.srcAttr._1 < k && et.dstAttr._1 < k) {
+        // tell both vertices to turn off; their counts no longer matter
+        Iterator((et.srcId, (0, false)), (et.dstId, (0, false)))
+      } else if (et.srcAttr._1 < k) {
+        // src is being pruned: turn it off and tell dst to decrement its degree
+        Iterator((et.srcId, (0, false)), (et.dstId, (1, true)))
+      } else if (et.dstAttr._1 < k) {
+        // dst is being pruned: turn it off and tell src to decrement its degree
+        Iterator((et.dstId, (0, false)), (et.srcId, (1, true)))
+      } else {
+        // neither endpoint is being pruned this round, so no message is needed
+        Iterator.empty
+      }
+    }
+
+    // sum the degree decrements from pruned neighbors; any "turn off" message wins
+    def mergeMsg(m1: (Int, Boolean), m2: (Int, Boolean)): (Int, Boolean) = {
+      (m1._1 + m2._1, m1._2 && m2._2)
+    }
+
+    def vProg(vid: VertexId, data: (Int, Boolean), update: (Int, Boolean)): (Int, Boolean) = {
+      var newCount = data._1
+      var on = data._2
+      if (on) {
+        // floor the count at k - 1 so a pruned vertex records the last core it was in
+        newCount = max(k - 1, data._1 - update._1)
+        on = update._2
+      }
+      (newCount, on)
+    }
+
+    // the initial message (0, true) leaves every vertex unchanged
+    Pregel(graph, (0, true))(vProg, sendMsg, mergeMsg)
+  }
+}
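To make the pruning concrete, here is a small worked example, not part of the patch (the local master and spark-shell-style top-level code are assumptions): a triangle with a pendant vertex. The triangle is a 2-core, so its vertices end with core number 2, while the pendant vertex is pruned at k = 2 and keeps core number 1.

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.lib.KCore

    val sc = new SparkContext("local", "KCoreTriangle")
    // triangle 1-2-3 plus a pendant vertex 4 hanging off vertex 3
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(3L, 1L, 0), Edge(3L, 4L, 0)))
    val coreness = KCore.run(Graph.fromEdges(edges, 0), kmax = 3)
    coreness.vertices.collect().sorted.foreach(println)
    // expected: (1,2), (2,2), (3,2), (4,1)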
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index 7c396e6e66a28..d4e5aef59b882 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.graphx.lib

 import scala.reflect.ClassTag

+import org.apache.spark.Logging
 import org.apache.spark.graphx._

@@ -36,10 +37,12 @@ import org.apache.spark.graphx._
  * (i.e. the `sourceId` less than `destId`). Also the graph must have been partitioned
  * using [[org.apache.spark.graphx.Graph#partitionBy]].
  */
-object TriangleCount {
+object TriangleCount extends Logging {

   def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = {
+    logWarning("Entering Triangle Count.")
+
     // Remove redundant edges
     val g = graph.groupEdges((a, b) => a).cache()

     // Construct set representations of the neighborhoods
@@ -56,6 +59,8 @@ object TriangleCount {
       }
       set
     }
+
+    logWarning("Neighbor sets created.")
     // join the sets with the graph
     val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
       (vid, _, optSet) => optSet.getOrElse(null)
@@ -82,12 +87,14 @@
     // compute the intersection along edges
     val counters: VertexRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _ + _)
     // Merge counters with the graph and divide by two since each triangle is counted twice
-    g.outerJoinVertices(counters) {
+    val result = g.outerJoinVertices(counters) {
       (vid, _, optCounter: Option[Int]) =>
         val dblCount = optCounter.getOrElse(0)
         // double count should be even (divisible by two)
         assert((dblCount & 1) == 0)
         dblCount / 2
     }
+    logWarning("Triangle count finished.")
+    result
   } // end of TriangleCount
 }
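Since TriangleCount still requires a canonically oriented, explicitly partitioned graph, a minimal usage sketch looks like the following (the local master and the edge-list file followers.txt are hypothetical):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.lib.TriangleCount

    val sc = new SparkContext("local[4]", "TriangleCountExample")
    val graph = GraphLoader.edgeListFile(sc, "followers.txt", canonicalOrientation = true,
      minEdgePartitions = 4).partitionBy(PartitionStrategy.RandomVertexCut).cache()
    val counts = TriangleCount.run(graph)
    // each triangle is counted once per member vertex, so divide the global sum by three
    val total = counts.vertices.map { case (_, n) => n.toLong }.reduce(_ + _) / 3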
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/KCoreSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/KCoreSuite.scala
new file mode 100644
index 0000000000000..3d96a7359976a
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/KCoreSuite.scala
@@ -0,0 +1,27 @@
+package org.apache.spark.graphx.lib
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.graphx._
+
+class KCoreSuite extends FunSuite with LocalSparkContext {
+
+  def createKCoreEdges(): Seq[Edge[Int]] = {
+    Seq(Edge(11, 31), Edge(12, 31), Edge(31, 33), Edge(31, 32), Edge(31, 34),
+      Edge(33, 34), Edge(33, 32), Edge(34, 32), Edge(32, 13), Edge(32, 23),
+      Edge(34, 23), Edge(23, 14), Edge(34, 21), Edge(34, 22), Edge(21, 22))
+  }
+
+  test("KCore") {
+    withSpark { sc =>
+      val rawEdges = createKCoreEdges()
+      // the expected coreness of each vertex
+      val expectedVerts = Set((11L, 1), (12L, 1), (13L, 1), (14L, 1), (21L, 2), (22L, 2),
+        (23L, 2), (31L, 3), (32L, 3), (33L, 3), (34L, 3))
+      val graph = Graph.fromEdges(sc.parallelize(rawEdges), "a")
+      val resultGraph = KCore.run(graph, 5)
+      val resultVerts = resultGraph.vertices.collect().toSet
+      assert(resultVerts === expectedVerts)
+    }
+  }
+}
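One more case that might be worth covering (a sketch only, not in this patch): a cycle is exactly its own 2-core, which exercises the floor-at-k-1 logic in vProg without any pruning cascades.

    test("KCore on a cycle") {
      withSpark { sc =>
        // every vertex of a 5-cycle has degree 2, so all core numbers are 2
        val cycle = sc.parallelize((0L until 5L).map(i => Edge(i, (i + 1) % 5, 0)))
        val graph = Graph.fromEdges(cycle, 0)
        val result = KCore.run(graph, 3)
        assert(result.vertices.collect().forall { case (_, k) => k == 2 })
      }
    }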