more debugging. Seems like we have empty partitions.
dcrankshaw committed Feb 26, 2014
1 parent b788bfb commit fad630f
Showing 5 changed files with 137 additions and 72 deletions.
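The commit message points at empty RDD partitions as the suspected cause of the failure being debugged. For context, a minimal diagnostic sketch (not part of this commit; countEmptyPartitions is a hypothetical helper) that counts how many partitions of an RDD contain no elements:

import org.apache.spark.rdd.RDD

// Hypothetical diagnostic, not part of this commit: count the partitions of
// an RDD that contain no elements at all.
def countEmptyPartitions[T](rdd: RDD[T]): Long =
  rdd.mapPartitions { iter =>
    // Emit 1 for an empty partition, 0 otherwise, then sum across partitions.
    Iterator(if (iter.hasNext) 0L else 1L)
  }.reduce(_ + _)

A heavily filtered RDD can easily end up with partitions that contain nothing, which is the situation the GraphImpl change in this commit guards against.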
2 changes: 1 addition & 1 deletion conf/core-site.xml
@@ -12,7 +12,7 @@

<property>
<name>fs.default.name</name>
<value>hdfs://ec2-184-72-130-69.compute-1.amazonaws.com:9000</value>
<value>hdfs://ec2-54-80-197-211.compute-1.amazonaws.com:9000</value>
</property>

<property>
32 changes: 16 additions & 16 deletions conf/slaves
@@ -1,16 +1,16 @@
ec2-54-80-151-251.compute-1.amazonaws.com
ec2-54-242-226-224.compute-1.amazonaws.com
ec2-54-205-35-26.compute-1.amazonaws.com
ec2-184-72-164-33.compute-1.amazonaws.com
ec2-107-20-33-174.compute-1.amazonaws.com
ec2-54-80-2-210.compute-1.amazonaws.com
ec2-50-17-77-137.compute-1.amazonaws.com
ec2-174-129-164-255.compute-1.amazonaws.com
ec2-23-20-81-32.compute-1.amazonaws.com
ec2-54-80-236-6.compute-1.amazonaws.com
ec2-54-226-129-134.compute-1.amazonaws.com
ec2-54-221-94-96.compute-1.amazonaws.com
ec2-23-22-81-129.compute-1.amazonaws.com
ec2-23-23-43-146.compute-1.amazonaws.com
ec2-54-196-107-67.compute-1.amazonaws.com
ec2-23-20-48-31.compute-1.amazonaws.com
ec2-54-81-55-13.compute-1.amazonaws.com
ec2-54-80-42-122.compute-1.amazonaws.com
ec2-107-20-119-33.compute-1.amazonaws.com
ec2-54-224-98-126.compute-1.amazonaws.com
ec2-54-80-27-215.compute-1.amazonaws.com
ec2-54-221-106-106.compute-1.amazonaws.com
ec2-23-21-15-4.compute-1.amazonaws.com
ec2-50-16-37-65.compute-1.amazonaws.com
ec2-54-80-199-27.compute-1.amazonaws.com
ec2-54-227-141-97.compute-1.amazonaws.com
ec2-54-80-200-145.compute-1.amazonaws.com
ec2-23-22-155-180.compute-1.amazonaws.com
ec2-54-197-154-251.compute-1.amazonaws.com
ec2-54-227-50-5.compute-1.amazonaws.com
ec2-54-197-117-246.compute-1.amazonaws.com
ec2-54-196-135-53.compute-1.amazonaws.com
@@ -52,8 +52,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[LongWritable])
kryo.register(classOf[Text])
kryo.register(classOf[WikiArticle])
kryo.register(classOf[JHashSet[VertexId]])
// kryo.register(classOf[JHashSet[VertexId]])
kryo.register(classOf[JTreeSet[VertexId]])
kryo.register(classOf[TrackCounts])
// kryo.register(classOf[MakeString])
// kryo.register(classOf[PrePostProcessWikipedia])
// kryo.register(classOf[(LongWritable, Text)])
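For context on the registrator being edited here: Kryo registration only takes effect when the registrator is named in the Spark configuration. A minimal wiring sketch (the serializer and registrator property names are standard Spark settings; the fully qualified class name is assumed from this repository's layout, not confirmed by the commit):

import org.apache.spark.SparkConf

// Illustrative wiring only; the registrator class name is an assumption.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")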
@@ -14,6 +14,43 @@ import org.apache.spark.Logging
import java.util.{HashSet => JHashSet, TreeSet => JTreeSet}
// import org.apache.spark.graphx.MakeString

class TrackCounts extends Serializable {

var red: Long = 0
var stub: Long = 0
var disambig: Long = 0
var notFound: Long = 0
var titleNull: Long = 0
var relevant: Long = 0
var total: Long = 0

def update(o: TrackCounts) {
red += o.red
stub += o.stub
disambig += o.disambig
notFound += o.notFound
titleNull += o.titleNull
relevant += o.relevant
total += o.total
}

def addArticle(art: WikiArticle) {
if (art.redirect) red += 1
if (art.stub) stub += 1
if (art.disambig) disambig += 1
if (art.title == WikiArticle.notFoundString) notFound += 1
if (art.title == null) titleNull += 1
if (art.relevant) relevant += 1
total += 1
}

override def toString: String = {
s"Redirects: $red, Stubs: $stub, Disambig: $disambig, Not Found: $notFound, Null: $titleNull, RELEVANT: $relevant, TOTAL: $total"

}

}
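The new TrackCounts class is a mutable accumulator intended for RDD.aggregate: addArticle folds a single WikiArticle into the per-partition counts, and update merges two partial results. A short usage sketch (it mirrors the commented-out block further down; allArtsRDD stands for the RDD[WikiArticle] built in processWikipedia and is not defined here):

// Sketch only; allArtsRDD is the RDD[WikiArticle] from processWikipedia below.
val zero = new TrackCounts
val counts = allArtsRDD.aggregate(zero)(
  (acc, art) => { acc.addArticle(art); acc },  // fold one article into the per-partition counts
  (c1, c2) => { c1.update(c2); c1 }            // merge the per-partition counts
)
println(counts)  // prints the "Redirects: ..., Stubs: ..., TOTAL: ..." summary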


object PrePostProcessWikipedia extends Logging {

@@ -104,19 +141,39 @@ object PrePostProcessWikipedia extends Logging {
logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.")

val allArtsRDD = xmlRDD.map { raw => new WikiArticle(raw) }.cache
val numRedirects = allArtsRDD.filter { art => art.redirect }.count
val numStubs = allArtsRDD.filter { art => art.stub }.count
val numDisambig = allArtsRDD.filter { art => art.disambig }.count
val numTitleNotFound = allArtsRDD.filter { art => art.title == WikiArticle.notFoundString }.count
logWarning(s"Filter results:\tRedirects: $numRedirects \tStubs: $numStubs \tDisambiguations: $numDisambig \t Title not found: $numTitleNotFound")
// val numRedirects = allArtsRDD.filter { art => art.redirect }.count
// val numStubs = allArtsRDD.filter { art => art.stub }.count
// val numDisambig = allArtsRDD.filter { art => art.disambig }.count
// val numTitleNotFound = allArtsRDD.filter { art => art.title == WikiArticle.notFoundString }.count
// logWarning(s"Filter results:\tRedirects: $numRedirects \tStubs: $numStubs \tDisambiguations: $numDisambig \t Title not found: $numTitleNotFound")

val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128)
logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles.")
val wikiRDD = allArtsRDD.filter { art => art.relevant }.cache //.repartition(128)
wikiRDD.repartition(128)
// val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128)
val wikiRDDCount = wikiRDD.count
logWarning(s"wikiRDD counted. Found ${wikiRDDCount} relevant articles.")
// logWarning("Counting differently")

// count: redirects, stubs, disambigs, titlenotfound, titlenull, relevant, total
// val zeroCount = new TrackCounts
// val countSeqOp = (curCount: TrackCounts, art: WikiArticle) => {
// curCount.addArticle(art)
// curCount
// }
// val countCombOp = (c1: TrackCounts, c2: TrackCounts) => {
// c1.update(c2)
// c1
// }
//
// val cr = allArtsRDD.aggregate(zeroCount)(countSeqOp, countCombOp)
// logWarning(s"Different count results: $cr")
// System.exit(0)

val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
logWarning("creating graph")
val g = Graph(vertices, edges)
val cleanG = g.subgraph(x => true, (vid, vd) => vd != null)
val cleanG = g.subgraph(x => true, (vid, vd) => vd != null).cache
logWarning(s"DIRTY graph has ${g.triplets.count()} EDGES, ${g.vertices.count()} VERTICES")
logWarning(s"CLEAN graph has ${cleanG.triplets.count()} EDGES, ${cleanG.vertices.count()} VERTICES")
val resultG = pagerankConnComponentsAlt(numIters, cleanG)
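One detail of the new lines above worth flagging: RDDs are immutable, so a bare wikiRDD.repartition(128) on its own line builds a new RDD and discards it; it does not change the partitioning of wikiRDD itself. A sketch of the form that actually takes effect (illustrative only, not part of this commit):

// Sketch only, not part of this commit: to operate on 128 partitions, the
// repartitioned RDD has to be kept and used in place of the original.
val repartitionedWiki = allArtsRDD.filter(art => art.relevant).repartition(128).cache()
val relevantCount = repartitionedWiki.count()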
@@ -134,12 +191,13 @@
var currentGraph = g
logWarning("starting iterations")
for (i <- 0 to numRepetitions) {
currentGraph.cache
val startTime = System.currentTimeMillis
logWarning("starting pagerank")
val pr = PageRank.run(currentGraph, 20)
val pr = PageRank.run(currentGraph, 20).cache
pr.vertices.count
logWarning("Pagerank completed")
val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))}).cache
prAndTitle.vertices.count
logWarning("join completed.")
val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
@@ -149,8 +207,8 @@
val filterTop20 = {(v: VertexId, d: String) =>
!top20verts.contains(v)
}
val newGraph = currentGraph.subgraph(x => true, filterTop20)
val ccGraph = ConnectedComponents.run(newGraph)
val newGraph = currentGraph.subgraph(x => true, filterTop20).cache
val ccGraph = ConnectedComponents.run(newGraph).cache
// val zeroVal = new mutable.HashSet[VertexId]()
// val seqOp = (s: mutable.HashSet[VertexId], vtuple: (VertexId, VertexId)) => {
// s.add(vtuple._2)
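The pattern added throughout this loop, .cache followed by an eager count of the vertices, is the usual way to force a lazily defined graph to materialize at a known point, so the surrounding timing and log lines measure real work. A compact sketch of the idea (currentGraph stands for the graph in the loop above):

import org.apache.spark.graphx.lib.PageRank

// Sketch of the cache-then-count pattern used above: cache marks the graph's
// RDDs for reuse, and the count is the action that actually computes them.
val pr = PageRank.run(currentGraph, 20).cache()
pr.vertices.count()  // forces PageRank to run here rather than at a later action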
90 changes: 48 additions & 42 deletions graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
import scala.reflect.{classTag, ClassTag}

import org.apache.spark.util.collection.PrimitiveVector
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.{HashPartitioner, Partitioner, Logging}
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl._
@@ -29,6 +29,7 @@ import org.apache.spark.graphx.util.BytecodeUtils
import org.apache.spark.rdd.{ShuffledRDD, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.ClosureCleaner
import java.util.NoSuchElementException


/**
Expand All @@ -47,7 +48,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
@transient val edges: EdgeRDD[ED],
@transient val routingTable: RoutingTable,
@transient val replicatedVertexView: ReplicatedVertexView[VD])
extends Graph[VD, ED] with Serializable {
extends Graph[VD, ED] with Serializable with Logging {

/** Default constructor is provided to support serialization */
protected def this() = this(null, null, null, null)
@@ -216,50 +217,55 @@

// Map and combine.
val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) =>
val (ePid, edgePartition) = ePartIter.next()
val (vPid, vPart) = vPartIter.next()
assert(!vPartIter.hasNext)
assert(ePid == vPid)
// Choose scan method
val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
val edgeIter = activeDirectionOpt match {
case Some(EdgeDirection.Both) =>
if (activeFraction < 0.8) {
edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
.filter(e => vPart.isActive(e.dstId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
if (ePartIter.hasNext) {
val (ePid, edgePartition) = ePartIter.next()
val (vPid, vPart) = vPartIter.next()
assert(!vPartIter.hasNext)
assert(ePid == vPid)
// Choose scan method
val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
val edgeIter = activeDirectionOpt match {
case Some(EdgeDirection.Both) =>
if (activeFraction < 0.8) {
edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
.filter(e => vPart.isActive(e.dstId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
}
case Some(EdgeDirection.Either) =>
// TODO: Because we only have a clustered index on the source vertex ID, we can't filter
// the index here. Instead we have to scan all edges and then do the filter.
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
case Some(EdgeDirection.Out) =>
if (activeFraction < 0.8) {
edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
}
case Some(EdgeDirection.In) =>
edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
case _ => // None
edgePartition.iterator
}

// Scan edges and run the map function
val et = new EdgeTriplet[VD, ED]
val mapOutputs = edgeIter.flatMap { e =>
et.set(e)
if (mapUsesSrcAttr) {
et.srcAttr = vPart(e.srcId)
}
case Some(EdgeDirection.Either) =>
// TODO: Because we only have a clustered index on the source vertex ID, we can't filter
// the index here. Instead we have to scan all edges and then do the filter.
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
case Some(EdgeDirection.Out) =>
if (activeFraction < 0.8) {
edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
if (mapUsesDstAttr) {
et.dstAttr = vPart(e.dstId)
}
case Some(EdgeDirection.In) =>
edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
case _ => // None
edgePartition.iterator
}

// Scan edges and run the map function
val et = new EdgeTriplet[VD, ED]
val mapOutputs = edgeIter.flatMap { e =>
et.set(e)
if (mapUsesSrcAttr) {
et.srcAttr = vPart(e.srcId)
}
if (mapUsesDstAttr) {
et.dstAttr = vPart(e.dstId)
mapFunc(et)
}
mapFunc(et)
// Note: This doesn't allow users to send messages to arbitrary vertices.
vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
} else {
logError("preAgg in mapReduceTriplets tried to iterate over empty partition.")
Iterator.empty
}
// Note: This doesn't allow users to send messages to arbitrary vertices.
vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
}

// do the final reduction reusing the index map
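The substance of the GraphImpl change is the if (ePartIter.hasNext) guard: with empty edge partitions, the unconditional ePartIter.next() throws java.util.NoSuchElementException, which is consistent with the commit message. The same defensive pattern in a stripped-down form (hypothetical RDDs, not the GraphX internals above):

import org.apache.spark.rdd.RDD

// Stripped-down sketch of the guard: zip two co-partitioned RDDs but return an
// empty iterator for empty partitions instead of calling next() on them.
def safeZip(left: RDD[Int], right: RDD[String]): RDD[(Int, String)] =
  left.zipPartitions(right, preservesPartitioning = true) { (lIter, rIter) =>
    if (lIter.hasNext && rIter.hasNext) lIter.zip(rIter) else Iterator.empty
  }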
