Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEMO-338] SkewSamplingPass #193

Merged
merged 30 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public static void shutdown() {
LOG.info("Wait for the driver to finish");
driverLauncher.wait();
} catch (final InterruptedException e) {
LOG.warn("Interrupted: " + e);
LOG.warn("Interrupted: ", e);
// clean up state...
Thread.currentThread().interrupt();
}
Expand Down
104 changes: 102 additions & 2 deletions common/src/main/java/org/apache/nemo/common/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.common.util;
package org.apache.nemo.common;

import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.*;
import org.apache.nemo.common.ir.vertex.IRVertex;

import java.util.Collection;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;

/**
* Class to hold the utility methods.
*/
public final class Util {
// Assume that this tag is never used in user application
public static final String CONTROL_EDGE_TAG = "CONTROL_EDGE";

/**
* Private constructor for utility class.
Expand All @@ -42,7 +50,7 @@ private Util() {
* @return whether or not we can say that they are equal.
*/
public static boolean checkEqualityOfIntPredicates(final IntPredicate firstPredicate,
final IntPredicate secondPredicate, final int noOfTimes) {
final IntPredicate secondPredicate, final int noOfTimes) {
for (int value = 0; value <= noOfTimes; value++) {
if (firstPredicate.test(value) != secondPredicate.test(value)) {
return false;
Expand All @@ -51,4 +59,96 @@ public static boolean checkEqualityOfIntPredicates(final IntPredicate firstPredi
return true;
}

/**
* @param edgeToClone to copy execution properties from.
* @param newSrc of the new edge.
* @param newDst of the new edge.
* @return the new edge.
*/
public static IREdge cloneEdge(final IREdge edgeToClone,
final IRVertex newSrc,
final IRVertex newDst) {
return cloneEdge(
edgeToClone.getPropertyValue(CommunicationPatternProperty.class).get(), edgeToClone, newSrc, newDst);
}

/**
* Creates a new edge with several execution properties same as the given edge.
* The copied execution properties include those minimally required for execution, such as encoder/decoders.
*
* @param commPattern to use.
* @param edgeToClone to copy execution properties from.
* @param newSrc of the new edge.
* @param newDst of the new edge.
* @return the new edge.
*/
public static IREdge cloneEdge(final CommunicationPatternProperty.Value commPattern,
final IREdge edgeToClone,
final IRVertex newSrc,
final IRVertex newDst) {
final IREdge clone = new IREdge(commPattern, newSrc, newDst);

if (edgeToClone.getPropertySnapshot().containsKey(EncoderProperty.class)) {
clone.setProperty(edgeToClone.getPropertySnapshot().get(EncoderProperty.class));
} else {
clone.setProperty(EncoderProperty.of(edgeToClone.getPropertyValue(EncoderProperty.class)
.orElseThrow(IllegalStateException::new)));
}

if (edgeToClone.getPropertySnapshot().containsKey(DecoderProperty.class)) {
clone.setProperty(edgeToClone.getPropertySnapshot().get(DecoderProperty.class));
} else {
clone.setProperty(DecoderProperty.of(edgeToClone.getPropertyValue(DecoderProperty.class)
.orElseThrow(IllegalStateException::new)));
}

edgeToClone.getPropertyValue(AdditionalOutputTagProperty.class).ifPresent(tag -> {
clone.setProperty(AdditionalOutputTagProperty.of(tag));
});

edgeToClone.getPropertyValue(PartitionerProperty.class).ifPresent(p -> {
if (p.right() == PartitionerProperty.NUM_EQUAL_TO_DST_PARALLELISM) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this code needed? The result will be the same without this if clause.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clone.setProperty(PartitionerProperty.of(p.left()));
} else {
clone.setProperty(PartitionerProperty.of(p.left(), p.right()));
}
});

edgeToClone.getPropertyValue(KeyExtractorProperty.class).ifPresent(ke -> {
clone.setProperty(KeyExtractorProperty.of(ke));
});

return clone;
}

/**
* A control edge enforces an execution ordering between the source vertex and the destination vertex.
* The additional output tag property of control edges is set such that no actual data element is transferred
* via the edges. This minimizes the run-time overhead of executing control edges.
*
* @param src vertex.
* @param dst vertex.
* @return the control edge.
*/
public static IREdge createControlEdge(final IRVertex src, final IRVertex dst) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some method level comment for this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

final IREdge controlEdge = new IREdge(CommunicationPatternProperty.Value.BroadCast, src, dst);
controlEdge.setPropertyPermanently(AdditionalOutputTagProperty.of(CONTROL_EDGE_TAG));
return controlEdge;
}

/**
* @param vertices to stringify ids of.
* @return the string of ids.
*/
public static String stringifyIRVertexIds(final Collection<IRVertex> vertices) {
return vertices.stream().map(IRVertex::getId).collect(Collectors.toSet()).toString();
}

/**
* @param edges to stringify ids of.
* @return the string of ids.
*/
public static String stringifyIREdgeIds(final Collection<IREdge> edges) {
return edges.stream().map(IREdge::getId).collect(Collectors.toSet()).toString();
}
}
5 changes: 5 additions & 0 deletions common/src/main/java/org/apache/nemo/common/dag/DAG.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public List<V> getVertices() {
return vertices;
}

@Override
public List<E> getEdges() {
return incomingEdges.values().stream().flatMap(List::stream).collect(Collectors.toList());
}

@Override
public List<V> getRootVertices() {
return rootVertices;
Expand Down
27 changes: 13 additions & 14 deletions common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
package org.apache.nemo.common.dag;

import org.apache.nemo.common.exception.CompileTimeOptimizationException;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
import org.apache.nemo.common.ir.edge.executionproperty.MessageIdProperty;
import org.apache.nemo.common.ir.vertex.*;
import org.apache.nemo.common.exception.IllegalVertexOperationException;
import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty;
import org.apache.nemo.common.ir.vertex.utility.MessageAggregatorVertex;
import org.apache.nemo.common.ir.vertex.utility.SamplingVertex;

import java.io.Serializable;
import java.util.*;
Expand Down Expand Up @@ -227,7 +226,8 @@ private void sourceCheck() {
final Supplier<Stream<V>> verticesToObserve = () -> vertices.stream().filter(v -> incomingEdges.get(v).isEmpty())
.filter(v -> v instanceof IRVertex);
// They should all match SourceVertex
if (verticesToObserve.get().anyMatch(v -> !(v instanceof SourceVertex))) {
if (!(verticesToObserve.get().allMatch(v -> (v instanceof SourceVertex)
|| (v instanceof SamplingVertex && ((SamplingVertex) v).getOriginalVertex() instanceof SourceVertex)))) {
final String problematicVertices = verticesToObserve.get()
.filter(v -> !(v instanceof SourceVertex))
.map(V::getId)
Expand Down Expand Up @@ -258,16 +258,15 @@ private void sinkCheck() {
* Helper method to check that all execution properties are correct and makes sense.
*/
private void executionPropertyCheck() {
// DataSizeMetricCollection is not compatible with Push (All data have to be stored before the data collection)
vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e)
.filter(e -> e.getPropertyValue(MessageIdProperty.class).isPresent())
.filter(e -> !(e.getDst() instanceof OperatorVertex
&& e.getDst() instanceof MessageAggregatorVertex))
.filter(e -> DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
.forEach(e -> {
throw new CompileTimeOptimizationException("DAG execution property check: "
+ "DataSizeMetricCollection edge is not compatible with push" + e.getId());
}));
final long numOfMAV = vertices.stream().filter(v -> v instanceof MessageAggregatorVertex).count();
final long numOfDistinctMessageIds = vertices.stream()
.filter(v -> v instanceof MessageAggregatorVertex)
.map(v -> ((MessageAggregatorVertex) v).getPropertyValue(MessageIdVertexProperty.class).get())
.distinct()
.count();
if (numOfMAV != numOfDistinctMessageIds) {
throw getException("A unique message id must exist for each MessageAggregator", "");
}
}

/**
Expand Down
18 changes: 12 additions & 6 deletions common/src/main/java/org/apache/nemo/common/dag/DAGInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,45 +44,51 @@ public interface DAGInterface<V extends Vertex, E extends Edge<V>> extends Seria

/**
* Retrieves the vertices of this DAG.
* @return the set of vertices.
* @return the list of vertices.
* Note that the result is never null, ensured by {@link DAGBuilder}.
*/
List<V> getVertices();

/**
* Retrieves the edges of this DAG.
* @return the list of edges.
*/
List<E> getEdges();

/**
* Retrieves the root vertices of this DAG.
* @return the set of root vertices.
* @return the list of root vertices.
*/
List<V> getRootVertices();

/**
* Retrieves the incoming edges of the given vertex.
* @param v the subject vertex.
* @return the set of incoming edges to the vertex.
* @return the list of incoming edges to the vertex.
* Note that the result is never null, ensured by {@link DAGBuilder}.
*/
List<E> getIncomingEdgesOf(final V v);

/**
* Retrieves the incoming edges of the given vertex.
* @param vertexId the ID of the subject vertex.
* @return the set of incoming edges to the vertex.
* @return the list of incoming edges to the vertex.
* Note that the result is never null, ensured by {@link DAGBuilder}.
*/
List<E> getIncomingEdgesOf(final String vertexId);

/**
* Retrieves the outgoing edges of the given vertex.
* @param v the subject vertex.
* @return the set of outgoing edges to the vertex.
* @return the list of outgoing edges to the vertex.
* Note that the result is never null, ensured by {@link DAGBuilder}.
*/
List<E> getOutgoingEdgesOf(final V v);

/**
* Retrieves the outgoing edges of the given vertex.
* @param vertexId the ID of the subject vertex.
* @return the set of outgoing edges to the vertex.
* @return the list of outgoing edges to the vertex.
* Note that the result is never null, ensured by {@link DAGBuilder}.
*/
List<E> getOutgoingEdgesOf(final String vertexId);
Expand Down
Loading