Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-8718] [GRAPHX] Improve EdgePartition2D for non perfect square number of partitions #7104

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ trait PartitionStrategy extends Serializable {
object PartitionStrategy {
/**
* Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
* guaranteeing a `2 * sqrt(numParts) - 1` bound on vertex replication.
* guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note I'm changing the bounds given in the doc since they were never correct for non perfect squares.

*
* Suppose we have a graph with 12 vertices that we want to partition
* over 9 machines. We can use the following sparse matrix representation:
Expand Down Expand Up @@ -61,26 +61,36 @@ object PartitionStrategy {
* that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3,
* P6)` or the last
* row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
* replicated to at most `2 * sqrt(numParts) - 1` machines.
* replicated to at most `2 * sqrt(numParts)` machines.
*
* Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
* balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
* vertex locations.
*
* One of the limitations of this approach is that the number of machines must either be a
* perfect square. We partially address this limitation by computing the machine assignment to
* the next
* largest perfect square and then mapping back down to the actual number of machines.
* Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect
* square is used.
* When the number of partitions requested is not a perfect square we use a slightly different
* method where the last column can have a different number of rows than the others while still
* maintaining the same size per block.
*/
case object EdgePartition2D extends PartitionStrategy {
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
val mixingPrime: VertexId = 1125899906842597L
val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts
if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
// Use old method for perfect squared to ensure we get same results
val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts

} else {
// Otherwise use new method
val cols = ceilSqrtNumParts
val rows = (numParts + cols - 1) / cols
val lastColRows = numParts - rows * (cols - 1)
val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
col * rows + row

}
}
}

Expand Down