diff --git a/conf/core-site.xml b/conf/core-site.xml
index d52ec9d921984..8917ebab2caf7 100644
--- a/conf/core-site.xml
+++ b/conf/core-site.xml
@@ -12,7 +12,7 @@
 
   <property>
     <name>fs.default.name</name>
-    <value>hdfs://ec2-184-72-130-69.compute-1.amazonaws.com:9000</value>
+    <value>hdfs://ec2-54-80-197-211.compute-1.amazonaws.com:9000</value>
   </property>
 
   <property>
diff --git a/conf/slaves b/conf/slaves
index cb1ffa832a628..ff41e88b997c4 100644
--- a/conf/slaves
+++ b/conf/slaves
@@ -1,16 +1,16 @@
-ec2-54-80-151-251.compute-1.amazonaws.com
-ec2-54-242-226-224.compute-1.amazonaws.com
-ec2-54-205-35-26.compute-1.amazonaws.com
-ec2-184-72-164-33.compute-1.amazonaws.com
-ec2-107-20-33-174.compute-1.amazonaws.com
-ec2-54-80-2-210.compute-1.amazonaws.com
-ec2-50-17-77-137.compute-1.amazonaws.com
-ec2-174-129-164-255.compute-1.amazonaws.com
-ec2-23-20-81-32.compute-1.amazonaws.com
-ec2-54-80-236-6.compute-1.amazonaws.com
-ec2-54-226-129-134.compute-1.amazonaws.com
-ec2-54-221-94-96.compute-1.amazonaws.com
-ec2-23-22-81-129.compute-1.amazonaws.com
-ec2-23-23-43-146.compute-1.amazonaws.com
-ec2-54-196-107-67.compute-1.amazonaws.com
-ec2-23-20-48-31.compute-1.amazonaws.com
+ec2-54-81-55-13.compute-1.amazonaws.com
+ec2-54-80-42-122.compute-1.amazonaws.com
+ec2-107-20-119-33.compute-1.amazonaws.com
+ec2-54-224-98-126.compute-1.amazonaws.com
+ec2-54-80-27-215.compute-1.amazonaws.com
+ec2-54-221-106-106.compute-1.amazonaws.com
+ec2-23-21-15-4.compute-1.amazonaws.com
+ec2-50-16-37-65.compute-1.amazonaws.com
+ec2-54-80-199-27.compute-1.amazonaws.com
+ec2-54-227-141-97.compute-1.amazonaws.com
+ec2-54-80-200-145.compute-1.amazonaws.com
+ec2-23-22-155-180.compute-1.amazonaws.com
+ec2-54-197-154-251.compute-1.amazonaws.com
+ec2-54-227-50-5.compute-1.amazonaws.com
+ec2-54-197-117-246.compute-1.amazonaws.com
+ec2-54-196-135-53.compute-1.amazonaws.com
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
index 3c9145c02f3f8..21ed03a63ad7c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -52,8 +52,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[LongWritable])
     kryo.register(classOf[Text])
     kryo.register(classOf[WikiArticle])
-    kryo.register(classOf[JHashSet[VertexId]])
+    // kryo.register(classOf[JHashSet[VertexId]])
     kryo.register(classOf[JTreeSet[VertexId]])
+    kryo.register(classOf[TrackCounts])
     // kryo.register(classOf[MakeString])
     // kryo.register(classOf[PrePostProcessWikipedia])
     // kryo.register(classOf[(LongWritable, Text)])
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PrePostProcessWiki.scala b/graphx/src/main/scala/org/apache/spark/graphx/PrePostProcessWiki.scala
index 8b59e5619a739..ce71ec1ca3020 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PrePostProcessWiki.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PrePostProcessWiki.scala
@@ -14,6 +14,43 @@ import org.apache.spark.Logging
 import java.util.{HashSet => JHashSet, TreeSet => JTreeSet}
 
 // import org.apache.spark.graphx.MakeString
 
+class TrackCounts extends Serializable {
+
+  var red: Long = 0
+  var stub: Long = 0
+  var disambig: Long = 0
+  var notFound: Long = 0
+  var titleNull: Long = 0
+  var relevant: Long = 0
+  var total: Long = 0
+
+  def update(o: TrackCounts) {
+    red += o.red
+    stub += o.stub
+    disambig += o.disambig
+    notFound += o.notFound
+    titleNull += o.titleNull
+    relevant += o.relevant
+    total += o.total
+  }
+
+  def addArticle(art: WikiArticle) {
+    if (art.redirect) red += 1
+    if (art.stub) stub += 1
+    if (art.disambig) disambig += 1
+    if (art.title == WikiArticle.notFoundString) notFound += 1
+    if (art.title == null) titleNull += 1
+    if (art.relevant) relevant += 1
+    total += 1
+  }
+
+  override def toString: String = {
+    s"Redirects: $red, Stubs: $stub, Disambig: $disambig, Not Found: $notFound, Null: $titleNull, RELEVANT: $relevant, TOTAL: $total"
+  }
+
+}
+
 
 object PrePostProcessWikipedia extends Logging {
@@ -104,19 +141,39 @@
     logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.")
     val allArtsRDD = xmlRDD.map { raw => new WikiArticle(raw) }.cache
-    val numRedirects = allArtsRDD.filter { art => art.redirect }.count
-    val numStubs = allArtsRDD.filter { art => art.stub }.count
-    val numDisambig = allArtsRDD.filter { art => art.disambig }.count
-    val numTitleNotFound = allArtsRDD.filter { art => art.title == WikiArticle.notFoundString }.count
-    logWarning(s"Filter results:\tRedirects: $numRedirects \tStubs: $numStubs \tDisambiguations: $numDisambig \t Title not found: $numTitleNotFound")
+    // val numRedirects = allArtsRDD.filter { art => art.redirect }.count
+    // val numStubs = allArtsRDD.filter { art => art.stub }.count
+    // val numDisambig = allArtsRDD.filter { art => art.disambig }.count
+    // val numTitleNotFound = allArtsRDD.filter { art => art.title == WikiArticle.notFoundString }.count
+    // logWarning(s"Filter results:\tRedirects: $numRedirects \tStubs: $numStubs \tDisambiguations: $numDisambig \t Title not found: $numTitleNotFound")
 
-    val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128)
-    logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles.")
+    // repartition returns a new RDD, so it must be chained here rather than called for a side effect
+    val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128).cache
+    // val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128)
+    val wikiRDDCount = wikiRDD.count
+    logWarning(s"wikiRDD counted. Found ${wikiRDDCount} relevant articles.")
+    // logWarning("Counting differently")
+
+    // count: redirects, stubs, disambigs, titlenotfound, titlenull, relevant, total
+//    val zeroCount = new TrackCounts
+//    val countSeqOp = (curCount: TrackCounts, art: WikiArticle) => {
+//      curCount.addArticle(art)
+//      curCount
+//    }
+//    val countCombOp = (c1: TrackCounts, c2: TrackCounts) => {
+//      c1.update(c2)
+//      c1
+//    }
+//
+//    val cr = allArtsRDD.aggregate(zeroCount)(countSeqOp, countCombOp)
+//    logWarning(s"Different count results: $cr")
+//    System.exit(0)
+
     val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
     val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
     logWarning("creating graph")
     val g = Graph(vertices, edges)
-    val cleanG = g.subgraph(x => true, (vid, vd) => vd != null)
+    val cleanG = g.subgraph(x => true, (vid, vd) => vd != null).cache
     logWarning(s"DIRTY graph has ${g.triplets.count()} EDGES, ${g.vertices.count()} VERTICES")
     logWarning(s"CLEAN graph has ${cleanG.triplets.count()} EDGES, ${cleanG.vertices.count()} VERTICES")
     val resultG = pagerankConnComponentsAlt(numIters, cleanG)
@@ -134,12 +191,13 @@
     var currentGraph = g
     logWarning("starting iterations")
     for (i <- 0 to numRepetitions) {
+      currentGraph.cache
       val startTime = System.currentTimeMillis
       logWarning("starting pagerank")
-      val pr = PageRank.run(currentGraph, 20)
+      val pr = PageRank.run(currentGraph, 20).cache
       pr.vertices.count
       logWarning("Pagerank completed")
-      val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
+      val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))}).cache
       prAndTitle.vertices.count
       logWarning("join completed.")
       val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
@@ -149,8 +207,8 @@
       val filterTop20 = {(v: VertexId, d: String) =>
         !top20verts.contains(v)
       }
-      val newGraph = currentGraph.subgraph(x => true, filterTop20)
-      val ccGraph = ConnectedComponents.run(newGraph)
+      val newGraph = currentGraph.subgraph(x => true, filterTop20).cache
+      val ccGraph = ConnectedComponents.run(newGraph).cache
       // val zeroVal = new mutable.HashSet[VertexId]()
       // val seqOp = (s: mutable.HashSet[VertexId], vtuple: (VertexId, VertexId)) => {
       //   s.add(vtuple._2)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 81f27d0a2b86c..10f4b380bb7a9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.util.collection.PrimitiveVector
-import org.apache.spark.{HashPartitioner, Partitioner}
+import org.apache.spark.{HashPartitioner, Partitioner, Logging}
 import org.apache.spark.SparkContext._
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.impl.GraphImpl._
@@ -29,6 +29,7 @@ import org.apache.spark.graphx.util.BytecodeUtils
 import org.apache.spark.rdd.{ShuffledRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.ClosureCleaner
+import java.util.NoSuchElementException
 
 
 /**
@@ -47,7 +48,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     @transient val edges: EdgeRDD[ED],
     @transient val routingTable: RoutingTable,
     @transient val replicatedVertexView: ReplicatedVertexView[VD])
-  extends Graph[VD, ED] with Serializable {
+  extends Graph[VD, ED] with Serializable with Logging {
 
   /** Default constructor is provided to support serialization */
   protected def this() = this(null, null, null, null)
@@ -216,50 +217,55 @@
     // Map and combine.
     val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) =>
-      val (ePid, edgePartition) = ePartIter.next()
-      val (vPid, vPart) = vPartIter.next()
-      assert(!vPartIter.hasNext)
-      assert(ePid == vPid)
-      // Choose scan method
-      val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
-      val edgeIter = activeDirectionOpt match {
-        case Some(EdgeDirection.Both) =>
-          if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
-              .filter(e => vPart.isActive(e.dstId))
-          } else {
-            edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
+      if (ePartIter.hasNext) {
+        val (ePid, edgePartition) = ePartIter.next()
+        val (vPid, vPart) = vPartIter.next()
+        assert(!vPartIter.hasNext)
+        assert(ePid == vPid)
+        // Choose scan method
+        val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
+        val edgeIter = activeDirectionOpt match {
+          case Some(EdgeDirection.Both) =>
+            if (activeFraction < 0.8) {
+              edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
+                .filter(e => vPart.isActive(e.dstId))
+            } else {
+              edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
+            }
+          case Some(EdgeDirection.Either) =>
+            // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
+            // the index here. Instead we have to scan all edges and then do the filter.
+            edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
+          case Some(EdgeDirection.Out) =>
+            if (activeFraction < 0.8) {
+              edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
+            } else {
+              edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
+            }
+          case Some(EdgeDirection.In) =>
+            edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
+          case _ => // None
+            edgePartition.iterator
+        }
+
+        // Scan edges and run the map function
+        val et = new EdgeTriplet[VD, ED]
+        val mapOutputs = edgeIter.flatMap { e =>
+          et.set(e)
+          if (mapUsesSrcAttr) {
+            et.srcAttr = vPart(e.srcId)
           }
-        case Some(EdgeDirection.Either) =>
-          // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
-          // the index here. Instead we have to scan all edges and then do the filter.
-          edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
-        case Some(EdgeDirection.Out) =>
-          if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
-          } else {
-            edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
+          if (mapUsesDstAttr) {
+            et.dstAttr = vPart(e.dstId)
           }
-        case Some(EdgeDirection.In) =>
-          edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
-        case _ => // None
-          edgePartition.iterator
-      }
-
-      // Scan edges and run the map function
-      val et = new EdgeTriplet[VD, ED]
-      val mapOutputs = edgeIter.flatMap { e =>
-        et.set(e)
-        if (mapUsesSrcAttr) {
-          et.srcAttr = vPart(e.srcId)
-        }
-        if (mapUsesDstAttr) {
-          et.dstAttr = vPart(e.dstId)
+          mapFunc(et)
         }
-        mapFunc(et)
+        // Note: This doesn't allow users to send messages to arbitrary vertices.
+        vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
+      } else {
+        logError("preAgg in mapReduceTriplets tried to iterate over empty partition.")
+        Iterator.empty
       }
-      // Note: This doesn't allow users to send messages to arbitrary vertices.
-      vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
     }
 
     // do the final reduction reusing the index map
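
Note on the commented-out TrackCounts block in PrePostProcessWiki.scala: it replaces the five separate filter/count jobs with a single pass, using rdd.aggregate with a mutable accumulator. The seqOp folds each article into a per-partition accumulator and the combOp merges the per-partition accumulators on the driver. Below is a minimal standalone sketch of that same pattern; Article and Counts are simplified stand-ins for WikiArticle and TrackCounts (which are not shown in the patch), and the local[2] master is purely for illustration.

import org.apache.spark.{SparkConf, SparkContext}

// Stand-in for WikiArticle: only the flags the accumulator inspects.
case class Article(redirect: Boolean, stub: Boolean, relevant: Boolean)

// Mutable accumulator mirroring TrackCounts; one aggregate pass
// produces all counts instead of one filter(...).count job each.
class Counts extends Serializable {
  var red = 0L; var stub = 0L; var relevant = 0L; var total = 0L
  def add(a: Article): Counts = {
    if (a.redirect) red += 1
    if (a.stub) stub += 1
    if (a.relevant) relevant += 1
    total += 1
    this
  }
  def merge(o: Counts): Counts = {
    red += o.red; stub += o.stub; relevant += o.relevant; total += o.total
    this
  }
  override def toString = s"red=$red stub=$stub relevant=$relevant total=$total"
}

object CountDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("count-demo"))
    val arts = sc.parallelize(Seq(
      Article(redirect = true,  stub = false, relevant = false),
      Article(redirect = false, stub = true,  relevant = true),
      Article(redirect = false, stub = false, relevant = true)))
    // seqOp folds articles into the per-partition accumulator;
    // combOp merges the per-partition accumulators on the driver.
    val c = arts.aggregate(new Counts)((acc, a) => acc.add(a), (a, b) => a.merge(b))
    println(c)  // red=1 stub=1 relevant=2 total=3
    sc.stop()
  }
}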
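Note on the GraphImpl.scala change: zipPartitions hands the closure one iterator per co-located partition pair, and calling next() on an empty edge-partition iterator throws NoSuchElementException; the patch guards with hasNext and returns Iterator.empty (after logging) instead. A minimal sketch of the failure mode it defends against, assuming a toy RDD pair with more slices than elements so at least one partition is empty (the names here are illustrative, not from the patch):

import org.apache.spark.{SparkConf, SparkContext}

object ZipPartitionsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("zip-demo"))
    // Same number of partitions on both sides; 3 elements over 4 slices
    // guarantees at least one empty partition.
    val a = sc.parallelize(Seq(1, 2, 3), 4)
    val b = sc.parallelize(Seq("x", "y", "z"), 4)
    val zipped = a.zipPartitions(b, preservesPartitioning = true) { (ia, ib) =>
      // Guard before next(): on an empty partition iterator, next()
      // throws NoSuchElementException -- the failure the patch avoids.
      if (ia.hasNext && ib.hasNext) Iterator((ia.next(), ib.next()))
      else Iterator.empty
    }
    zipped.collect().foreach(println)  // (1,x), (2,y), (3,z)
    sc.stop()
  }
}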