[SPARK-4142][GraphX] Default numEdgePartitions
Changing the default number of edge partitions to match Spark parallelism.

Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>

Closes apache#3006 from jegonzal/default_partitions and squashes the following commits:

a9a5c4f [Joseph E. Gonzalez] Changing the default number of edge partitions to match spark parallelism
jegonzal authored and rxin committed Nov 1, 2014
1 parent 680fd87 commit f4e0b28
Showing 2 changed files with 12 additions and 6 deletions.
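For context, a minimal usage sketch of the new default for callers of GraphLoader.edgeListFile (not part of this commit; the application name, edge-list path, and partition count are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.GraphLoader

    val sc = new SparkContext(new SparkConf().setAppName("EdgeListExample"))

    // With this change, omitting numEdgePartitions (or passing -1) defers to
    // sc.textFile's default partitioning, i.e. Spark's parallelism settings.
    val defaultGraph = GraphLoader.edgeListFile(sc, "hdfs://data/followers.txt")

    // An explicit positive value still pins the edge RDD to that many partitions.
    val pinnedGraph = GraphLoader.edgeListFile(sc, "hdfs://data/followers.txt",
      numEdgePartitions = 16)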
@@ -77,7 +77,7 @@ object Analytics extends Logging {
 val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
 
 val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-  minEdgePartitions = numEPart,
+  numEdgePartitions = numEPart,
   edgeStorageLevel = edgeStorageLevel,
   vertexStorageLevel = vertexStorageLevel).cache()
 val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
@@ -110,7 +110,7 @@ object Analytics extends Logging {
 
 val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
 val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-  minEdgePartitions = numEPart,
+  numEdgePartitions = numEPart,
   edgeStorageLevel = edgeStorageLevel,
   vertexStorageLevel = vertexStorageLevel).cache()
 val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
@@ -131,7 +131,7 @@ object Analytics extends Logging {
 val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
 val graph = GraphLoader.edgeListFile(sc, fname,
   canonicalOrientation = true,
-  minEdgePartitions = numEPart,
+  numEdgePartitions = numEPart,
   edgeStorageLevel = edgeStorageLevel,
   vertexStorageLevel = vertexStorageLevel)
 // TriangleCount requires the graph to be partitioned
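As an aside, the unchanged context lines above rely on an Option.foldLeft idiom: partitionBy is applied only when a PartitionStrategy was supplied on the command line, otherwise the graph passes through untouched. A small standalone sketch of that idiom (the method name and the Int attribute types are illustrative only):

    import org.apache.spark.graphx.{Graph, PartitionStrategy}

    // Applies the strategy if present; returns the input graph unchanged if None.
    def maybePartition(
        unpartitionedGraph: Graph[Int, Int],
        partitionStrategy: Option[PartitionStrategy]): Graph[Int, Int] =
      partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))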
graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala (12 changes: 9 additions & 3 deletions)
@@ -48,23 +48,29 @@ object GraphLoader extends Logging {
    * @param path the path to the file (e.g., /home/data/file or hdfs://file)
    * @param canonicalOrientation whether to orient edges in the positive
    *        direction
-   * @param minEdgePartitions the number of partitions for the edge RDD
+   * @param numEdgePartitions the number of partitions for the edge RDD
+   *        Setting this value to -1 will use the default parallelism.
    * @param edgeStorageLevel the desired storage level for the edge partitions
    * @param vertexStorageLevel the desired storage level for the vertex partitions
    */
   def edgeListFile(
       sc: SparkContext,
       path: String,
       canonicalOrientation: Boolean = false,
-      minEdgePartitions: Int = 1,
+      numEdgePartitions: Int = -1,
       edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
       vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
     : Graph[Int, Int] =
   {
     val startTime = System.currentTimeMillis
 
     // Parse the edge data table directly into edge partitions
-    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
+    val lines =
+      if (numEdgePartitions > 0) {
+        sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions)
+      } else {
+        sc.textFile(path)
+      }
     val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
       val builder = new EdgePartitionBuilder[Int, Int]
       iter.foreach { line =>
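One hedged way to observe the new branch above (assuming an existing SparkContext sc and a small edge-list file at edgePath; the exact counts depend on input size and on spark.default.parallelism):

    // Default (-1): the partition count now follows sc.textFile's default
    // rather than the previous fixed minimum of 1.
    val defaultGraph = GraphLoader.edgeListFile(sc, edgePath)
    println(s"edge partitions (default) = ${defaultGraph.edges.partitions.length}")

    // Explicit request: the file is read with that minimum and coalesced back down,
    // so the edge RDD ends up with at most the requested number of partitions.
    val pinnedGraph = GraphLoader.edgeListFile(sc, edgePath, numEdgePartitions = 4)
    println(s"edge partitions (pinned) = ${pinnedGraph.edges.partitions.length}")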