Skip to content

Commit

Permalink
Merge pull request apache#9372: [Beam-6858] Validate that side-input …
Browse files Browse the repository at this point in the history
…parameters match the type of the PCollectionView
  • Loading branch information
reuvenlax authored and soyrice committed Sep 19, 2019
1 parent d1b1656 commit a9b5ee8
Show file tree
Hide file tree
Showing 12 changed files with 398 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PCollectionViews.TypeDescriptorSupplier;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matchers;
import org.junit.Test;
Expand All @@ -50,12 +52,16 @@ public class CreatePCollectionViewTranslationTest {
CreatePCollectionView.of(
PCollectionViews.singletonView(
testPCollection,
(TypeDescriptorSupplier<String>) () -> TypeDescriptors.strings(),
testPCollection.getWindowingStrategy(),
false,
null,
StringUtf8Coder.of())),
CreatePCollectionView.of(
PCollectionViews.listView(testPCollection, testPCollection.getWindowingStrategy())));
PCollectionViews.listView(
testPCollection,
(TypeDescriptorSupplier<String>) () -> TypeDescriptors.strings(),
testPCollection.getWindowingStrategy())));
}

@Parameter(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.beam.sdk.transforms.Materialization;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -62,6 +63,11 @@ public Object apply(Object o) {
throw new UnsupportedOperationException();
}

@Override
public TypeDescriptor<Object> getTypeDescriptor() {
return new TypeDescriptor<Object>() {};
}

@Override
public boolean equals(Object obj) {
return obj instanceof TestViewFn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -405,7 +407,8 @@ public void createViewWithViewFnDifferentViewFn() {
PCollectionView<Iterable<Integer>> view = input.apply(View.asIterable());

// Purposely create a subclass to get a different class then what was expected.
ViewFn<?, ?> viewFn = new PCollectionViews.IterableViewFn() {};
IterableViewFn<Integer> viewFn =
new PCollectionViews.IterableViewFn<Integer>(() -> TypeDescriptors.integers()) {};
CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);

PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -805,7 +807,11 @@ private <T> void translateTestStream(
new LinkedHashMap<>();
// for PCollectionView compatibility, not used to transform materialization
ViewFn<Iterable<WindowedValue<?>>, ?> viewFn =
(ViewFn) new PCollectionViews.MultimapViewFn<Iterable<WindowedValue<Void>>, Void>();
(ViewFn)
new PCollectionViews.MultimapViewFn<>(
(PCollectionViews.TypeDescriptorSupplier<Iterable<WindowedValue<Void>>>)
() -> TypeDescriptors.iterables(new TypeDescriptor<WindowedValue<Void>>() {}),
(PCollectionViews.TypeDescriptorSupplier<Void>) TypeDescriptors::voids);

for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId :
stagePayload.getSideInputsList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;

/**
Expand Down Expand Up @@ -105,6 +106,11 @@ public Materialization<MultimapView<K, V>> getMaterialization() {
public MultimapView<K, V> apply(MultimapView<K, V> o) {
return o;
}

@Override
public TypeDescriptor<MultimapView<K, V>> getTypeDescriptor() {
throw new UnsupportedOperationException();
}
};

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PCollectionViews.TypeDescriptorSupplier;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
Expand Down Expand Up @@ -1305,9 +1306,12 @@ public PCollectionView<OutputT> expand(PCollection<InputT> input) {
input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
PCollection<KV<Void, OutputT>> materializationInput =
combined.apply(new VoidKeyToMultimapMaterialization<>());
Coder<OutputT> outputCoder = combined.getCoder();
PCollectionView<OutputT> view =
PCollectionViews.singletonView(
materializationInput,
(TypeDescriptorSupplier<OutputT>)
() -> outputCoder != null ? outputCoder.getEncodedTypeDescriptor() : null,
input.getWindowingStrategy(),
insertDefault,
insertDefault ? fn.defaultValue() : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.MethodWithExtraParameters;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SchemaElementParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SideInputParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
Expand Down Expand Up @@ -437,6 +438,29 @@ private static void validateStateApplicableForInput(DoFn<?, ?> fn, PCollection<?
}
}

private static void validateSideInputTypes(
Map<String, PCollectionView<?>> sideInputs, DoFn<?, ?> fn) {
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
DoFnSignature.ProcessElementMethod processElementMethod = signature.processElement();
for (SideInputParameter sideInput : processElementMethod.getSideInputParameters()) {
PCollectionView<?> view = sideInputs.get(sideInput.sideInputId());
checkArgument(
view != null,
"the ProcessElement method expects a side input identified with the tag %s, but no such side input was"
+ " supplied. Use withSideInput(String, PCollectionView) to supply this side input.",
sideInput.sideInputId());
TypeDescriptor<?> viewType = view.getViewFn().getTypeDescriptor();

// Currently check that the types exactly match, even if the types are convertible.
checkArgument(
viewType.equals(sideInput.elementT()),
"Side Input with tag %s and type %s cannot be bound to ProcessElement parameter with type %s",
sideInput.sideInputId(),
viewType,
sideInput.elementT());
}
}

private static FieldAccessDescriptor getFieldAccessDescriptorFromParameter(
@Nullable String fieldAccessString,
Schema inputSchema,
Expand Down Expand Up @@ -865,6 +889,8 @@ public PCollectionTuple expand(PCollection<? extends InputT> input) {
validateStateApplicableForInput(fn, input);
}

validateSideInputTypes(sideInputs, fn);

// TODO: We should validate OutputReceiver<Row> only happens if the output PCollection
// as schema. However coder/schema inference may not have happened yet at this point.
// Need to figure out where to validate this.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PCollectionViews.TypeDescriptorSupplier;

/**
* Transforms for creating {@link PCollectionView PCollectionViews} from {@link PCollection
Expand Down Expand Up @@ -233,9 +234,12 @@ public PCollectionView<List<T>> expand(PCollection<T> input) {

PCollection<KV<Void, T>> materializationInput =
input.apply(new VoidKeyToMultimapMaterialization<>());
Coder<T> inputCoder = input.getCoder();
PCollectionView<List<T>> view =
PCollectionViews.listView(
materializationInput, materializationInput.getWindowingStrategy());
materializationInput,
(TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
materializationInput.apply(CreatePCollectionView.of(view));
return view;
}
Expand Down Expand Up @@ -263,9 +267,12 @@ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {

PCollection<KV<Void, T>> materializationInput =
input.apply(new VoidKeyToMultimapMaterialization<>());
Coder<T> inputCoder = input.getCoder();
PCollectionView<Iterable<T>> view =
PCollectionViews.iterableView(
materializationInput, materializationInput.getWindowingStrategy());
materializationInput,
(TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
materializationInput.apply(CreatePCollectionView.of(view));
return view;
}
Expand Down Expand Up @@ -402,11 +409,17 @@ public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input)
throw new IllegalStateException("Unable to create a side-input view from input", e);
}

KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
Coder<K> keyCoder = kvCoder.getKeyCoder();
Coder<V> valueCoder = kvCoder.getValueCoder();
PCollection<KV<Void, KV<K, V>>> materializationInput =
input.apply(new VoidKeyToMultimapMaterialization<>());
PCollectionView<Map<K, Iterable<V>>> view =
PCollectionViews.multimapView(
materializationInput, materializationInput.getWindowingStrategy());
materializationInput,
(TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
(TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
materializationInput.apply(CreatePCollectionView.of(view));
return view;
}
Expand Down Expand Up @@ -438,11 +451,18 @@ public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}

KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
Coder<K> keyCoder = kvCoder.getKeyCoder();
Coder<V> valueCoder = kvCoder.getValueCoder();

PCollection<KV<Void, KV<K, V>>> materializationInput =
input.apply(new VoidKeyToMultimapMaterialization<>());
PCollectionView<Map<K, V>> view =
PCollectionViews.mapView(
materializationInput, materializationInput.getWindowingStrategy());
materializationInput,
(TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
(TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
materializationInput.apply(CreatePCollectionView.of(view));
return view;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
Expand All @@ -45,4 +46,7 @@ public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable {

/** A function to adapt a primitive view type to a desired view type. */
public abstract ViewT apply(PrimitiveViewT primitiveViewT);

/** Return the {@link TypeDescriptor} describing the output of this fn. */
public abstract TypeDescriptor<ViewT> getTypeDescriptor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SchemaElementParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SideInputParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
Expand Down Expand Up @@ -760,6 +761,14 @@ public List<SchemaElementParameter> getSchemaElementParameters() {
.collect(Collectors.toList());
}

@Nullable
public List<SideInputParameter> getSideInputParameters() {
return extraParameters().stream()
.filter(Predicates.instanceOf(SideInputParameter.class)::apply)
.map(SideInputParameter.class::cast)
.collect(Collectors.toList());
}

/** The {@link OutputReceiverParameter} for a main output, or null if there is none. */
@Nullable
public OutputReceiverParameter getMainOutputReceiver() {
Expand Down
Loading

0 comments on commit a9b5ee8

Please sign in to comment.