-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-3936] Add aggregateMessages, which supersedes mapReduceTriplets #3100
Conversation
1. EdgePartition now stores local vertex ids instead of global ids. This avoids hash lookups when looking up vertex attributes and aggregating messages. 2. Internal iterators in mapReduceTriplets are inlined into a while loop.
Also rename VertexPreservingEdgePartitionBuilder to ExistingEdgePartitionBuilder to better reflect its usage.
aggregateMessages enables neighborhood computation similarly to mapReduceTriplets, but it introduces two API improvements: 1. Messages are sent using an imperative interface based on EdgeContext rather than by returning an iterator of messages. This is more efficient, providing a 20.2% speedup on PageRank over apache#3054 (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge machines, sped up from 403 s to 322 s). 2. Rather than attempting bytecode inspection, the required triplet fields must be explicitly specified by the user by passing a TripletFields object. This fixes SPARK-3936. Subsumes apache#2815.
Test build #22908 has started for PR 3100 at commit
|
Test build #22908 has finished for PR 3100 at commit
|
Test PASSed. |
class TripletFields private ( | ||
val useSrc: Boolean, | ||
val useDst: Boolean, | ||
val useEdge: Boolean) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe I'm just missing it, but it seems like useEdge
is never used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we don't currently use it since it's cheap to access the edge attributes, but I think @jegonzal added it in case our internal representation changes and it becomes useful.
Test build #23248 has started for PR 3100 at commit
|
Test build #23248 has finished for PR 3100 at commit
|
Test PASSed. |
*/ | ||
abstract class EdgeContext[VD, ED, A] { | ||
/** The vertex id of the edge's source vertex. */ | ||
def srcId: VertexId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
didn't we decide we want to get rid of VertexId when we expose it to the user?
Ok I'm merging this. I will make a small patch to clean up some stuff. Ankur - can you add the num change for srcMustBeActive, dstMustBeActive, maySatisfyEither, and remove type alias for vid and pid? Thanks. |
this.useEdge = useEdge; | ||
} | ||
|
||
public static final TripletFields None = new TripletFields(false, false, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find having so many triplet fields confusing. @jegonzal do we really need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I agree though I used many of them in the GraphOps
code and decided maybe it would make sense to go ahead and be exhaustive. I think we could cut a few.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about we just keep:
EdgeOnly, SrcAndEdge, DstAndEdge, All
aggregateMessages enables neighborhood computation similarly to mapReduceTriplets, but it introduces two API improvements: 1. Messages are sent using an imperative interface based on EdgeContext rather than by returning an iterator of messages. 2. Rather than attempting bytecode inspection, the required triplet fields must be explicitly specified by the user by passing a TripletFields object. This fixes SPARK-3936. Additionally, this PR includes the following optimizations for aggregateMessages and EdgePartition: 1. EdgePartition now stores local vertex ids instead of global ids. This avoids hash lookups when looking up vertex attributes and aggregating messages. 2. Internal iterators in aggregateMessages are inlined into a while loop. In total, these optimizations were tested to provide a 37% speedup on PageRank (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge machines, sped up from 513 s to 322 s). Subsumes #2815. Also fixes SPARK-4173. Author: Ankur Dave <ankurdave@gmail.com> Closes #3100 from ankurdave/aggregateMessages and squashes the following commits: f5b65d0 [Ankur Dave] Address @rxin comments on #3054 and #3100 1e80aca [Ankur Dave] Add aggregateMessages, which supersedes mapReduceTriplets 194a2df [Ankur Dave] Test triplet iterator in EdgePartition serialization test e0f8ecc [Ankur Dave] Take activeSet in ExistingEdgePartitionBuilder c85076d [Ankur Dave] Readability improvements b567be2 [Ankur Dave] iter.foreach -> while loop 4a566dc [Ankur Dave] Optimizations for mapReduceTriplets and EdgePartition (cherry picked from commit faeb41d) Signed-off-by: Reynold Xin <rxin@databricks.com>
def aggregateMessages[A: ClassTag]( | ||
sendMsg: EdgeContext[VD, ED, A] => Unit, | ||
mergeMsg: (A, A) => A, | ||
tripletFields: TripletFields = TripletFields.All) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parameter tripletFields: TripletFields = TripletFields.All)
should not expose outside, construct it inside.
and, sendMsg will be construct outside of api, it's not reasonable, i think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand - aggregateMessages needs the user to specify the triplet fields information; it can't reliably infer this on its own. Similarly, the user has to specify sendMsg.
aggregateMessages enables neighborhood computation similarly to mapReduceTriplets, but it introduces two API improvements:
Additionally, this PR includes the following optimizations for aggregateMessages and EdgePartition:
In total, these optimizations were tested to provide a 37% speedup on PageRank (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge machines, sped up from 513 s to 322 s).
Subsumes #2815. Also fixes SPARK-4173.