Commit

Merge pull request #942 from Gscim/branch-3.1.0
Branch 3.1.0
ouyangwen-it authored May 6, 2020
2 parents 345f5ea + 244af8a commit fda43d2
Showing 5 changed files with 69 additions and 50 deletions.
@@ -46,6 +46,8 @@ object LPAExample {
.getOrElse(throw new Exception("checkpoint dir not provided"))
sc.setCheckpointDir(cpDir)

val maxIter = params.getOrElse("maxIter", "10").toInt

val sep = params.getOrElse("sep", "space") match {
case "space" => " "
case "comma" => ","
@@ -59,6 +61,7 @@ object LPAExample {
.setSrcNodeIdCol("src")
.setDstNodeIdCol("dst")
.setUseBalancePartition(useBalancePartition)
.setMaxIter(maxIter)

val df = GraphIO.load(input, isWeighted = false, srcIndex, dstIndex, sep = sep)
val mapping = lpa.transform(df)
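The new maxIter value comes out of the example's generic key/value parameter map and is handed straight to the transformer. A minimal sketch of that lookup, with a hypothetical params map (only the getOrElse fallback to 10 and the setMaxIter call come from the change itself):

val params: Map[String, String] = Map("maxIter" -> "20")   // hypothetical input
val maxIter = params.getOrElse("maxIter", "10").toInt      // 20 here; 10 when the key is absent
val lpa = new LPA().setMaxIter(maxIter)                    // wired through as in the lines above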
@@ -20,7 +20,7 @@ import com.tencent.angel.spark.context.PSContext
import com.tencent.angel.graph.params._
import org.apache.spark.SparkContext
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.{IntParam, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -29,11 +29,13 @@ import org.apache.spark.storage.StorageLevel
class LPA(override val uid: String) extends Transformer
with HasSrcNodeIdCol with HasDstNodeIdCol with HasOutputNodeIdCol with HasOutputCoreIdCol
with HasStorageLevel with HasPartitionNum with HasPSPartitionNum with HasUseBalancePartition {



final val maxIter = new IntParam(this, "maxIter", "maxIter")
final def setMaxIter(numIters: Int): this.type = set(maxIter, numIters)
setDefault(maxIter, 10)

def this() = this(Identifiable.randomUID("LPA"))



override def transform(dataset: Dataset[_]): DataFrame = {
val edges = dataset.select($(srcNodeIdCol), $(dstNodeIdCol)).rdd
.filter(row => !row.anyNull)
@@ -68,7 +70,7 @@ class LPA(override val uid: String) extends Transformer
var curIteration = 0
var numMsgs = model.numMsgs()
var prev = graph

val maxIterNum = $(maxIter)
println(s"numMsgs = $numMsgs")

do {
@@ -81,7 +83,7 @@ class LPA(override val uid: String) extends Transformer
model.resetMsgs()
numMsgs = model.numMsgs()
println(s"curIteration=$curIteration numMsgs=$numMsgs")
} while (curIteration < 10)
} while (curIteration < maxIterNum)

val retRDD = graph.map(_.save).flatMap(f => f._1.zip(f._2))
.sortBy(_._2)
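The maxIter plumbing added to LPA follows the usual Spark ML parameter pattern: declare an IntParam, expose a fluent setter, register a default, and read the value back with $(...) inside transform. A self-contained sketch of that pattern against the same Spark API (the IterParams class and currentMaxIter are illustrative names, not part of the Angel code):

import org.apache.spark.ml.param.{IntParam, ParamMap, Params}
import org.apache.spark.ml.util.Identifiable

class IterParams(override val uid: String) extends Params {
  def this() = this(Identifiable.randomUID("iterParams"))

  // Declare the parameter, a fluent setter, and a default of 10,
  // mirroring what LPA now does with maxIter.
  final val maxIter = new IntParam(this, "maxIter", "maximum number of iterations")
  final def setMaxIter(value: Int): this.type = set(maxIter, value)
  setDefault(maxIter, 10)

  // $(maxIter) resolves to the explicitly set value, or the default otherwise.
  def currentMaxIter: Int = $(maxIter)

  override def copy(extra: ParamMap): Params = defaultCopy(extra)
}

// new IterParams().currentMaxIter                 -> 10
// new IterParams().setMaxIter(20).currentMaxIter  -> 20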
@@ -27,14 +27,14 @@ import org.apache.spark.storage.StorageLevel


class WCC(override val uid: String) extends Transformer
with HasWeightCol with HasSrcNodeIdCol with HasDstNodeIdCol
with HasOutputNodeIdCol with HasOutputCoreIdCol with HasBalancePartitionPercent
with HasIsWeighted with HasPartitionNum with HasPSPartitionNum
with HasStorageLevel with HasBatchSize with HasPullBatchSize
with HasBufferSize with HasUseBalancePartition {
with HasWeightCol with HasSrcNodeIdCol with HasDstNodeIdCol
with HasOutputNodeIdCol with HasOutputCoreIdCol with HasBalancePartitionPercent
with HasIsWeighted with HasPartitionNum with HasPSPartitionNum
with HasStorageLevel with HasBatchSize with HasPullBatchSize
with HasBufferSize with HasUseBalancePartition {

def this() = this(Identifiable.randomUID("WCC"))

override def transform(dataset: Dataset[_]): DataFrame = {
// read edges
val edges = dataset.select($(srcNodeIdCol), $(dstNodeIdCol)).rdd
@@ -43,56 +43,62 @@ class WCC(override val uid: String) extends Transformer
.filter(e => e._1 != e._2)

edges.persist(StorageLevel.DISK_ONLY)

val maxId = edges.map(e => math.max(e._1, e._2)).max() + 1
val minId = edges.map(e => math.min(e._1, e._2)).min()
val nodes = edges.flatMap(e => Iterator(e._1, e._2))
val numEdges = edges.count()

println(s"minId=$minId maxId=$maxId numEdges=$numEdges level=${$(storageLevel)}")

// Start PS and init the model
println("start to run ps")
PSContext.getOrCreate(SparkContext.getOrCreate())

val model = WCCPSModel.fromMinMax(minId, maxId, nodes, $(psPartitionNum), $(useBalancePartition), $(balancePartitionPercent))

// make un-directed graph, for wcc
var graph = edges.flatMap { case (srcId, dstId) => Iterator((srcId, dstId), (dstId, srcId)) }
.groupByKey($(partitionNum))
.mapPartitionsWithIndex((index, adjTable) => Iterator(WCCGraphPartition.apply(index, adjTable)))
.mapPartitionsWithIndex((index, adjTable) => Iterator((0, WCCGraphPartition.apply(index, adjTable))))
graph.persist($(storageLevel))
graph.foreachPartition(_ => Unit)
graph.foreach(_.initMsgs(model))
graph.foreach(_._2.initMsgs(model))

var numMsgs = model.numMsgs()
var curIteration = 0
var prev = graph
println(s"numMsgs=$numMsgs")

// each node change its label into the min id of its neighbors (including itself).
var changedCnt = 0
var changedCnt = 0
do {
curIteration += 1
changedCnt = 0
changedCnt = graph.map(_.process(model, numMsgs, curIteration == 1)).reduce((n1, n2) => n1 + n2)
changedCnt = 0
graph = prev.map(_._2.process(model, numMsgs, curIteration == 1))
graph.persist($(storageLevel))
graph.count()
changedCnt = graph.map(_._1).reduce((n1, n2) => n1 + n2)
prev.unpersist(true)
prev = graph
model.resetMsgs()

println(s"curIteration=$curIteration numMsgs=$changedCnt")
} while (changedCnt > 0)

val retRDD = graph.map(_.save()).flatMap(f => f._1.zip(f._2))
val retRDD = graph.map(_._2.save()).flatMap(f => f._1.zip(f._2))
.map(f => Row.fromSeq(Seq[Any](f._1, f._2)))
dataset.sparkSession.createDataFrame(retRDD, transformSchema(dataset.schema))
}

override def transformSchema(schema: StructType): StructType = {
StructType(Seq(
StructField(s"${$(outputNodeIdCol)}", LongType, nullable = false),
StructField(s"${$(outputCoreIdCol)}", LongType, nullable = false)
))
}

override def copy(extra: ParamMap): Transformer = defaultCopy(extra)

}
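Each round of the rewritten WCC loop builds a new graph RDD from the previous one, persists and materializes it, reads the total change count off the (count, partition) pairs, and only then unpersists the old RDD. Stripped of the Angel specifics, the control flow is roughly the sketch below (iterateUntilStable, S and step are placeholders, not names from this code):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Each element carries (number of labels changed in this partition, new partition state).
def iterateUntilStable[S](start: RDD[(Int, S)])(step: S => (Int, S)): RDD[(Int, S)] = {
  var prev = start
  var changed = 0
  do {
    val next = prev.map { case (_, state) => step(state) }
    next.persist(StorageLevel.MEMORY_AND_DISK)
    next.count()                            // force computation before touching prev
    changed = next.map(_._1).reduce(_ + _)  // total changes this round
    prev.unpersist(blocking = true)         // old lineage no longer needed
    prev = next
  } while (changed > 0)                     // stop once no node changed its label
  prev
}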
@@ -31,30 +31,32 @@ class WCCGraphPartition(index: Int,
val msgs = VFactory.sparseLongKeyLongVector(model.dim)
for (i <- keys.indices){
msgs.set(keys(i), keys(i))
keyLabels(i) = keys(i)
}
model.initMsgs(msgs)
msgs.size().toInt
}

// if label of node is larger than its neighbors',
// change it into min among its neighbors' labels
def process(model: WCCPSModel, numMsgs: Long, isFirstIteration: Boolean): Int = {
def process(model: WCCPSModel, numMsgs: Long, isFirstIteration: Boolean): (Int, WCCGraphPartition) = {
var changedNum = 0
if (numMsgs > indices.length || isFirstIteration) {
val inMsgs = model.readMsgs(indices)
val outMsgs = VFactory.sparseLongKeyLongVector(inMsgs.dim())

for (idx <- keys.indices) {
keyLabels(idx) = inMsgs.get(keys(idx))
val newLabel = minNbrLabel(idx, inMsgs)
if (newLabel < keyLabels(idx)) {
keyLabels(idx) = newLabel
outMsgs.set(keys(idx), newLabel)
changedNum += 1
}
outMsgs.set(keys(idx), newLabel)
}
model.writeMsgs(outMsgs)
changedNum
(changedNum, new WCCGraphPartition(index, keys, indptr, neighbors, keyLabels, indices))

}
else {
val inMsgs = model.readAllMsgs()
@@ -64,20 +66,20 @@ class WCCGraphPartition(index: Int,
val newLabel = minNbrLabel(idx, inMsgs)
if (newLabel < keyLabels(idx)) {
keyLabels(idx) = newLabel
outMsgs.set(keys(idx), newLabel)
changedNum += 1
}
outMsgs.set(keys(idx), newLabel)
}

model.writeMsgs(outMsgs)
changedNum
(changedNum, new WCCGraphPartition(index, keys, indptr, neighbors, keyLabels, indices))
}
}

def save(): (Array[Long], Array[Long]) = {
(keys, keyLabels)
}

def minNbrLabel(idx: Int, inMsgs: LongLongVector): Long = {
var j = indptr(idx)
var minLabel = keyLabels(idx)
@@ -89,7 +91,7 @@ class WCCGraphPartition(index: Int,
j += 1
}
minLabel

}
}

@@ -98,7 +100,7 @@ object WCCGraphPartition {
val indptr = new IntArrayList()
val keys = new LongArrayList()
val neighbors = new LongArrayList()

indptr.add(0)

while (iterator.hasNext) {
@@ -107,7 +109,7 @@ object WCCGraphPartition {
ns.toArray.distinct.foreach(n => neighbors.add(n))
indptr.add(neighbors.size())
}

val keysArray = keys.toLongArray()
val neighborsArray = neighbors.toLongArray()

@@ -26,42 +26,48 @@ import com.tencent.angel.spark.util.VectorUtils
import org.apache.spark.rdd.RDD

class WCCPSModel(var inMsgs: PSVector,
var outMsgs: PSVector) extends Serializable {
var outMsgs: PSVector) extends Serializable {
val dim: Long = inMsgs.dimension

def initMsgs(msgs: Vector): Unit = {
inMsgs.update(msgs)
}

def readMsgs(nodes: Array[Long]): LongLongVector = {
inMsgs.pull(nodes).asInstanceOf[LongLongVector]
}

def readAllMsgs(): LongLongVector = {
inMsgs.pull().asInstanceOf[LongLongVector]
}

def writeMsgs(msgs: Vector): Unit = {
inMsgs.update(msgs)
outMsgs.update(msgs)
}

def numMsgs(): Long = {
VectorUtils.nnz(inMsgs)
}


def resetMsgs(): Unit = {
val temp = inMsgs
inMsgs = outMsgs
outMsgs = temp
outMsgs.reset
}
}

object WCCPSModel {
def fromMinMax(minId: Long, maxId: Long, data: RDD[Long], psNumPartition: Int,
useBalancePartition: Boolean, balancePartitionPercent: Float): WCCPSModel = {
useBalancePartition: Boolean, balancePartitionPercent: Float): WCCPSModel = {
val matrix = new MatrixContext("labels", 2, minId, maxId)
matrix.setValidIndexNum(-1)
matrix.setRowType(RowType.T_LONG_SPARSE_LONGKEY)

if (useBalancePartition) {
LoadBalancePartitioner.partition(data, maxId, psNumPartition, matrix, balancePartitionPercent)
}

PSAgentContext.get().getMasterClient.createMatrix(matrix, 10000L)
val matrixId = PSAgentContext.get().getMasterClient.getMatrix("labels").getId
new WCCPSModel(new PSVectorImpl(matrixId, 0, maxId, matrix.getRowType),
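With the change above, reads come from inMsgs while writeMsgs accumulates into outMsgs, and resetMsgs swaps the two vectors and clears the new write side. A local, in-memory analogue of that double buffering (plain arrays standing in for PS vectors; not the Angel API):

class DoubleBuffer(size: Int) {
  private var in = new Array[Long](size)   // read side, analogous to inMsgs
  private var out = new Array[Long](size)  // write side, analogous to outMsgs

  def read(i: Int): Long = in(i)
  def write(i: Int, v: Long): Unit = out(i) = v

  // Swap the read/write roles and clear the new write buffer, like resetMsgs.
  def swapAndReset(): Unit = {
    val tmp = in
    in = out
    out = tmp
    java.util.Arrays.fill(out, 0L)
  }
}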
