diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala index 8ed561de61226..8873be93fdbcb 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala @@ -133,12 +133,14 @@ object Analytics extends Logging { case "kcore" => var numEPart = 4 - var kmax = 3 + var kmax = 4 + var kmin = 1 var partitionStrategy: Option[PartitionStrategy] = None options.foreach{ case ("numEPart", v) => numEPart = v.toInt case ("kmax", v) => kmax = v.toInt + case ("kmin", v) => kmin = v.toInt case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) } @@ -147,6 +149,11 @@ object Analytics extends Logging { logWarning("kmax must be positive") sys.exit(1) } + if (kmax < kmin) { + logWarning("kmax must be greater than or equal to kmin") + sys.exit(1) + } + println("======================================") println("| KCORE |") println("======================================") @@ -156,7 +163,7 @@ object Analytics extends Logging { minEdgePartitions = numEPart).cache() val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) - val result = KCore.run(graph, kmax) + val result = KCore.run(graph, kmax, kmin) println("Size of cores: " + result.vertices.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).collect().mkString(", ")) sc.stop() diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Kcore.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/KCore.scala similarity index 93% rename from graphx/src/main/scala/org/apache/spark/graphx/lib/Kcore.scala rename to graphx/src/main/scala/org/apache/spark/graphx/lib/KCore.scala index 0d29992d0a34e..73502d2db3313 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Kcore.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/KCore.scala @@ -3,6 +3,7 @@ package org.apache.spark.graphx.lib import org.apache.spark.graphx._ import org.apache.spark._ import scala.math._ +import org.apache.spark.SparkContext._ import scala.reflect.ClassTag object KCore extends Logging { @@ -28,12 +29,15 @@ object KCore extends Logging { def run[VD: ClassTag, ED: ClassTag]( graph: Graph[VD, ED], - kmax: Int) + kmax: Int, + kmin: Int = 1) : Graph[Int, ED] = { // Graph[(Int, Boolean), ED] - boolean indicates whether it is active or not var g = graph.outerJoinVertices(graph.degrees)((vid, oldData, newData) => (newData.getOrElse(0), true)).cache - var curK = 1 + var degrees = graph.degrees + println("degree distribution: " + degrees.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).collect().mkString(", ")) + var curK = kmin while (curK <= kmax) { g = computeCurrentKCore(g, curK).cache val testK = curK diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/KcoreSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/KCoreSuite.scala similarity index 100% rename from graphx/src/test/scala/org/apache/spark/graphx/lib/KcoreSuite.scala rename to graphx/src/test/scala/org/apache/spark/graphx/lib/KCoreSuite.scala