Skip to content

Commit

Permalink
Changed kcore filenames.
Browse files Browse the repository at this point in the history
  • Loading branch information
dcrankshaw committed Apr 29, 2014
1 parent a641dc1 commit 1e924df
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,14 @@ object Analytics extends Logging {

case "kcore" =>
var numEPart = 4
var kmax = 3
var kmax = 4
var kmin = 1
var partitionStrategy: Option[PartitionStrategy] = None

options.foreach{
case ("numEPart", v) => numEPart = v.toInt
case ("kmax", v) => kmax = v.toInt
case ("kmin", v) => kmin = v.toInt
case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
Expand All @@ -147,6 +149,11 @@ object Analytics extends Logging {
logWarning("kmax must be positive")
sys.exit(1)
}
if (kmax < kmin) {
logWarning("kmax must be greater than or equal to kmin")
sys.exit(1)
}

println("======================================")
println("| KCORE |")
println("======================================")
Expand All @@ -156,7 +163,7 @@ object Analytics extends Logging {
minEdgePartitions = numEPart).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

val result = KCore.run(graph, kmax)
val result = KCore.run(graph, kmax, kmin)
println("Size of cores: " + result.vertices.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).collect().mkString(", "))
sc.stop()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.apache.spark.graphx.lib
import org.apache.spark.graphx._
import org.apache.spark._
import scala.math._
import org.apache.spark.SparkContext._
import scala.reflect.ClassTag

object KCore extends Logging {
Expand All @@ -28,12 +29,15 @@ object KCore extends Logging {

def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
kmax: Int)
kmax: Int,
kmin: Int = 1)
: Graph[Int, ED] = {

// Graph[(Int, Boolean), ED] - boolean indicates whether it is active or not
var g = graph.outerJoinVertices(graph.degrees)((vid, oldData, newData) => (newData.getOrElse(0), true)).cache
var curK = 1
var degrees = graph.degrees
println("degree distribution: " + degrees.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).collect().mkString(", "))
var curK = kmin
while (curK <= kmax) {
g = computeCurrentKCore(g, curK).cache
val testK = curK
Expand Down

0 comments on commit 1e924df

Please sign in to comment.