Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEMO-338] SkewSamplingPass #193

Merged
merged 30 commits into from
Feb 21, 2019
Merged

[NEMO-338] SkewSamplingPass #193

merged 30 commits into from
Feb 21, 2019

Conversation

johnyangk
Copy link
Contributor

@johnyangk johnyangk commented Feb 13, 2019

JIRA: NEMO-338: SkewSamplingPass

Major changes:

  • SamplingSkewReshapingPass: Inserts SkewSampling and MessageBarrier vertices
  • SamplingVertex: Instantiated with (originalVertex, desiredSampleRate)
  • IRDAG: Automatically inserts IREdges from/to SamplingVertex objects, similar to other insert() methods
  • PhysicalPlanGenerator: Handles SamplingVertex objects appropriately
  • Stage: Uses getTaskIndices(), returns a subset of tasks if consists of SamplingVertex objects, to determine the tasks to execute

Minor changes to note:

  • Refactors other insert() methods to share code as much as possible

Tests for the changes:

  • PerKeyMedianITCase#testLargeShuffleSamplingSkew (combines large shuffle + skew handling optimizations)

Other comments:

  • Sanha(@sanha) wrote the original code. I refactored the code and added comments to create this PR.

Closes #193

@johnyangk johnyangk self-assigned this Feb 13, 2019
@johnyangk johnyangk requested a review from sanha February 13, 2019 03:32
@johnyangk
Copy link
Contributor Author

@sanha Can you check if I missed out anything? Other reviewers are also welcome to take a look, of course.

Copy link
Contributor

@sanha sanha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the work @johnyangk! Here's my first review.

if (edgeToClone.getPropertySnapshot().containsKey(EncoderProperty.class)) {
clone.setProperty(edgeToClone.getPropertySnapshot().get(EncoderProperty.class));
} else {
clone.setProperty(EncoderProperty.of(edgeToClone.getPropertyValue(EncoderProperty.class).get()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use orElseThrow or something like that instead of get.

if (edgeToClone.getPropertySnapshot().containsKey(DecoderProperty.class)) {
clone.setProperty(edgeToClone.getPropertySnapshot().get(DecoderProperty.class));
} else {
clone.setProperty(DecoderProperty.of(edgeToClone.getPropertyValue(DecoderProperty.class).get()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use orElseThrow or something like that instead of get.

});

edgeToClone.getPropertyValue(PartitionerProperty.class).ifPresent(p -> {
if (p.right() == PartitionerProperty.NUM_EQUAL_TO_DST_PARALLELISM) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this code needed? The result will be the same without this if clause.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* @param dst vertex.
* @return the control edge.
*/
public static IREdge createControlEdge(final IRVertex src, final IRVertex dst) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some method level comment for this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* @param messageBarrierVertex to insert.
* @param messageAggregatorVertex to insert.
* @param mbvOutputEncoder to use.
* @param mbvOutputDecoder to use.
* @param edgesToGetStatisticsOf to examine.
* @param edgesToOptimize to optimize.
*/
public void insert(final MessageBarrierVertex messageBarrierVertex,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inner BiFunction (messageFunction) of the messageBarrierVertex is only used in this method.
Why don't we receive the BiFunction instead of MessageBarrierVertex?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* @param samplingVertices to insert.
* @param executeAfterSamplingVertices that must be executed after samplingVertices.
*/
public void insert(final Set<SamplingVertex> samplingVertices,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we get the Set of the original vertices to sample instead of already sampled vertices?
If we receive the sampled vertices, opt pass builders can give some already connected sampling vertices.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a JIRA for this: https://issues.apache.org/jira/browse/NEMO-341
I also added the assertNonExistence() checker in this method and the other insert() methods.

*
* Then, this pass will produce something like:
* P1' - P1 - P2
* - P2' - P2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to sample P2?

.orElseThrow(() -> new IllegalStateException());
final IREdge clonedShuffleEdge = rightBeforeShuffle.getCloneOfOriginalEdge(e);

final KeyExtractor keyExtractor = e.getPropertyValue(KeyExtractorProperty.class).get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to add control edges from the message aggregation vertex to the partitionSources instead of the vertex that receives the original shuffle edge.

For the example DAG that is partitioned into two sub-DAGs as follows: P1 -(shuffle)- P2,
the expected outcome looks like P1' -(o2o)- MCV -(shuffle)- MAV -(control)- P1 -(shuffle)- p2.

This is because that we must optimze the partitioning way of the target shuffle edge before the execution of P1.

Also, the P1' and message collection vertex must be in a single stage. If not, the whole intermediate data will be duplicated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for bringing this up!

(1) Regarding pipelining MessageBarrierVertex within a single stage with parent sampling vertices:

I've changed the semantics of insert() to use SamplingVertex(NewVertex) instead of the NewVertex, if an existing vertex that the NewVertex will connect to is a SamplingVertex. I think this is a reasonable assumption as new vertices that consume outputs from sampling vertices will process a subset of data anyways, and no such new vertex will reach the original DAG except via control edges. With this change Nemo is able to pipeline the MessageBarrierVertex (wrapped inside a SamplingVertex), avoiding duplicate data materialization.

(2) Regarding connecting the message aggregation vertex to the partition Sources:

I'd prefer not to do this, at least considering current use cases we have.

Here's the physical DAG diagram of PerKeyMedianITCase#testLargeShuffleSamplingSkew (including the fix for (1))
https://nemo.snuspl.snu.ac.kr:50443/nemo-dag-out/7a1c136ac24f427ebb4c34a43712da3f.svg

In the diagram the ScheduleGroup property is set such that the sampling partition does always execute prior to the original partition. In particular the ordering ScheduleGroup0(Stage3+Stage5) ==> ScheduleGroup1(Stage4+Stage6) is enforced although Stage4 ==> Stage6 is PUSH (which makes sense when considering each schedule group as a big vertex). The sampling should also happen prior to the execution of the original partition when Stage4 ==> Stage6 is PULL as well, although the schedule groups may differ in this case. I think this shows that a sequence of insert(samplingVertex) and insert(messageVertex) captures our intention fairly well.

I did write some code to try to 'extend' the control edges from (sampling vertices) to (existing vertices), by adding new control edges from (new vertices that connect to sampling vertices) to (existing vertices) upon each insert(). However, I ultimately I felt that this approach complicates the code quite a bit, and reverted the code back to the current approach which I think works for the current use cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For (2), I understood why the current version of the pass works normally.
However, scheduling indirectly depending on the logic of scheduling group is at a risk. (The order of scheduling according to the scheduling group is implicit and might be changed.)
If it is not simple to add the control dependency as I mentioned, please create an issue about it and mark as TODO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I've filed the JIRA: https://issues.apache.org/jira/browse/NEMO-343

Copy link
Contributor Author

@johnyangk johnyangk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @sanha! I've addressed your comments.

});

edgeToClone.getPropertyValue(PartitionerProperty.class).ifPresent(p -> {
if (p.right() == PartitionerProperty.NUM_EQUAL_TO_DST_PARALLELISM) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* @param dst vertex.
* @return the control edge.
*/
public static IREdge createControlEdge(final IRVertex src, final IRVertex dst) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* @param messageBarrierVertex to insert.
* @param messageAggregatorVertex to insert.
* @param mbvOutputEncoder to use.
* @param mbvOutputDecoder to use.
* @param edgesToGetStatisticsOf to examine.
* @param edgesToOptimize to optimize.
*/
public void insert(final MessageBarrierVertex messageBarrierVertex,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* @param samplingVertices to insert.
* @param executeAfterSamplingVertices that must be executed after samplingVertices.
*/
public void insert(final Set<SamplingVertex> samplingVertices,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a JIRA for this: https://issues.apache.org/jira/browse/NEMO-341
I also added the assertNonExistence() checker in this method and the other insert() methods.

.orElseThrow(() -> new IllegalStateException());
final IREdge clonedShuffleEdge = rightBeforeShuffle.getCloneOfOriginalEdge(e);

final KeyExtractor keyExtractor = e.getPropertyValue(KeyExtractorProperty.class).get();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for bringing this up!

(1) Regarding pipelining MessageBarrierVertex within a single stage with parent sampling vertices:

I've changed the semantics of insert() to use SamplingVertex(NewVertex) instead of the NewVertex, if an existing vertex that the NewVertex will connect to is a SamplingVertex. I think this is a reasonable assumption as new vertices that consume outputs from sampling vertices will process a subset of data anyways, and no such new vertex will reach the original DAG except via control edges. With this change Nemo is able to pipeline the MessageBarrierVertex (wrapped inside a SamplingVertex), avoiding duplicate data materialization.

(2) Regarding connecting the message aggregation vertex to the partition Sources:

I'd prefer not to do this, at least considering current use cases we have.

Here's the physical DAG diagram of PerKeyMedianITCase#testLargeShuffleSamplingSkew (including the fix for (1))
https://nemo.snuspl.snu.ac.kr:50443/nemo-dag-out/7a1c136ac24f427ebb4c34a43712da3f.svg

In the diagram the ScheduleGroup property is set such that the sampling partition does always execute prior to the original partition. In particular the ordering ScheduleGroup0(Stage3+Stage5) ==> ScheduleGroup1(Stage4+Stage6) is enforced although Stage4 ==> Stage6 is PUSH (which makes sense when considering each schedule group as a big vertex). The sampling should also happen prior to the execution of the original partition when Stage4 ==> Stage6 is PULL as well, although the schedule groups may differ in this case. I think this shows that a sequence of insert(samplingVertex) and insert(messageVertex) captures our intention fairly well.

I did write some code to try to 'extend' the control edges from (sampling vertices) to (existing vertices), by adding new control edges from (new vertices that connect to sampling vertices) to (existing vertices) upon each insert(). However, I ultimately I felt that this approach complicates the code quite a bit, and reverted the code back to the current approach which I think works for the current use cases.

Copy link
Contributor

@sanha sanha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the change! I left some minor comments. Please check it out.

}

public IRVertex getCloneOfOriginalVertex() {
this.copyExecutionPropertiesTo(cloneOfOriginalVertex);
Copy link
Contributor

@sanha sanha Feb 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the purpose of this method is creating a new clone of the original vertex (for every call), let's create a new clone and copy the EPs instead of returning already cloned cloneOfOriginalVertex.
If not, let's copy the EPs in construction.

Plus, please add some method level comments for these methods.

* This pass effectively partitions the IRDAG by non-oneToOne edges, clones each subDAG partition using SamplingVertex
* to process sampled data, and executes each cloned partition prior to executing the corresponding original partition.
*
* Suppose the IRDAG is partitioned into three sub-DAGs as follows:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

three sub-DAGs connected with shuffle edges?

*
* Then, this pass will produce something like:
* P1' - P1 - P2
* - P2' - P2 - P3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the P2 is executed twice?

.orElseThrow(() -> new IllegalStateException());
final IREdge clonedShuffleEdge = rightBeforeShuffle.getCloneOfOriginalEdge(e);

final KeyExtractor keyExtractor = e.getPropertyValue(KeyExtractorProperty.class).get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For (2), I understood why the current version of the pass works normally.
However, scheduling indirectly depending on the logic of scheduling group is at a risk. (The order of scheduling according to the scheduling group is implicit and might be changed.)
If it is not simple to add the control dependency as I mentioned, please create an issue about it and mark as TODO.

@johnyangk
Copy link
Contributor Author

Thanks @sanha! I've addressed the comments.

Copy link
Contributor

@sanha sanha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @johnyangk! LGTM. I'll merge this.

@sanha sanha merged commit a2b02dc into apache:master Feb 21, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants