Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Beam-6858] Validate that side-input parameters match the type of the PCollectionView #9372

Merged
merged 6 commits into from
Sep 7, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PCollectionViews.TypeDescriptorSupplier;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matchers;
import org.junit.Test;
Expand All @@ -50,12 +52,16 @@ public class CreatePCollectionViewTranslationTest {
CreatePCollectionView.of(
PCollectionViews.singletonView(
testPCollection,
(TypeDescriptorSupplier<String>) () -> TypeDescriptors.strings(),
testPCollection.getWindowingStrategy(),
false,
null,
StringUtf8Coder.of())),
CreatePCollectionView.of(
PCollectionViews.listView(testPCollection, testPCollection.getWindowingStrategy())));
PCollectionViews.listView(
testPCollection,
(TypeDescriptorSupplier<String>) () -> TypeDescriptors.strings(),
testPCollection.getWindowingStrategy())));
}

@Parameter(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.beam.sdk.transforms.Materialization;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -62,6 +63,11 @@ public Object apply(Object o) {
throw new UnsupportedOperationException();
}

@Override
public TypeDescriptor<Object> getTypeDescriptor() {
return new TypeDescriptor<Object>() {};
}

@Override
public boolean equals(Object obj) {
return obj instanceof TestViewFn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -405,7 +407,8 @@ public void createViewWithViewFnDifferentViewFn() {
PCollectionView<Iterable<Integer>> view = input.apply(View.asIterable());

// Purposely create a subclass to get a different class then what was expected.
ViewFn<?, ?> viewFn = new PCollectionViews.IterableViewFn() {};
IterableViewFn<Integer> viewFn =
new PCollectionViews.IterableViewFn<Integer>(() -> TypeDescriptors.integers()) {};
CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);

PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -805,7 +807,11 @@ private <T> void translateTestStream(
new LinkedHashMap<>();
// for PCollectionView compatibility, not used to transform materialization
ViewFn<Iterable<WindowedValue<?>>, ?> viewFn =
(ViewFn) new PCollectionViews.MultimapViewFn<Iterable<WindowedValue<Void>>, Void>();
(ViewFn)
new PCollectionViews.MultimapViewFn<>(
(PCollectionViews.TypeDescriptorSupplier<Iterable<WindowedValue<Void>>>)
() -> TypeDescriptors.iterables(new TypeDescriptor<WindowedValue<Void>>() {}),
(PCollectionViews.TypeDescriptorSupplier<Void>) TypeDescriptors::voids);

for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId :
stagePayload.getSideInputsList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;

/**
Expand Down Expand Up @@ -105,6 +106,11 @@ public Materialization<MultimapView<K, V>> getMaterialization() {
public MultimapView<K, V> apply(MultimapView<K, V> o) {
return o;
}

@Override
public TypeDescriptor<MultimapView<K, V>> getTypeDescriptor() {
throw new UnsupportedOperationException();
}
};

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PCollectionViews.TypeDescriptorSupplier;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
Expand Down Expand Up @@ -1305,9 +1306,12 @@ public PCollectionView<OutputT> expand(PCollection<InputT> input) {
input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
PCollection<KV<Void, OutputT>> materializationInput =
combined.apply(new VoidKeyToMultimapMaterialization<>());
Coder<OutputT> outputCoder = combined.getCoder();
PCollectionView<OutputT> view =
PCollectionViews.singletonView(
materializationInput,
(TypeDescriptorSupplier<OutputT>)
() -> outputCoder != null ? outputCoder.getEncodedTypeDescriptor() : null,
input.getWindowingStrategy(),
insertDefault,
insertDefault ? fn.defaultValue() : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.MethodWithExtraParameters;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SchemaElementParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SideInputParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
Expand Down Expand Up @@ -437,6 +438,29 @@ private static void validateStateApplicableForInput(DoFn<?, ?> fn, PCollection<?
}
}

private static void validateSideInputTypes(
Map<String, PCollectionView<?>> sideInputs, DoFn<?, ?> fn) {
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
DoFnSignature.ProcessElementMethod processElementMethod = signature.processElement();
for (SideInputParameter sideInput : processElementMethod.getSideInputParameters()) {
PCollectionView<?> view = sideInputs.get(sideInput.sideInputId());
checkArgument(
view != null,
"the ProcessElement method expects a side input identified with the tag %s, but no such side input was"
+ " supplied. Use withSideInput(String, PCollectionView) to supply this side input.",
sideInput.sideInputId());
TypeDescriptor<?> viewType = view.getViewFn().getTypeDescriptor();

// Currently check that the types exactly match, even if the types are convertible.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there plans to handle convertible types? Should there be a JIRA thats linked here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have the same issue with Element parameters. I'll check to see if we have a JIRA for this (I think a user actually filed one some time back).

checkArgument(
viewType.equals(sideInput.elementT()),
"Side Input with tag %s and type %s cannot be bound to ProcessElement parameter with type %s",
sideInput.sideInputId(),
viewType,
sideInput.elementT());
}
}

private static FieldAccessDescriptor getFieldAccessDescriptorFromParameter(
@Nullable String fieldAccessString,
Schema inputSchema,
Expand Down Expand Up @@ -865,6 +889,8 @@ public PCollectionTuple expand(PCollection<? extends InputT> input) {
validateStateApplicableForInput(fn, input);
}

validateSideInputTypes(sideInputs, fn);

// TODO: We should validate OutputReceiver<Row> only happens if the output PCollection
// as schema. However coder/schema inference may not have happened yet at this point.
// Need to figure out where to validate this.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PCollectionViews.TypeDescriptorSupplier;

/**
* Transforms for creating {@link PCollectionView PCollectionViews} from {@link PCollection
Expand Down Expand Up @@ -233,9 +234,12 @@ public PCollectionView<List<T>> expand(PCollection<T> input) {

PCollection<KV<Void, T>> materializationInput =
input.apply(new VoidKeyToMultimapMaterialization<>());
Coder<T> inputCoder = input.getCoder();
PCollectionView<List<T>> view =
PCollectionViews.listView(
materializationInput, materializationInput.getWindowingStrategy());
materializationInput,
(TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
materializationInput.apply(CreatePCollectionView.of(view));
return view;
}
Expand Down Expand Up @@ -263,9 +267,12 @@ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {

PCollection<KV<Void, T>> materializationInput =
input.apply(new VoidKeyToMultimapMaterialization<>());
Coder<T> inputCoder = input.getCoder();
PCollectionView<Iterable<T>> view =
PCollectionViews.iterableView(
materializationInput, materializationInput.getWindowingStrategy());
materializationInput,
(TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
materializationInput.apply(CreatePCollectionView.of(view));
return view;
}
Expand Down Expand Up @@ -402,11 +409,17 @@ public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input)
throw new IllegalStateException("Unable to create a side-input view from input", e);
}

KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
Coder<K> keyCoder = kvCoder.getKeyCoder();
Coder<V> valueCoder = kvCoder.getValueCoder();
PCollection<KV<Void, KV<K, V>>> materializationInput =
input.apply(new VoidKeyToMultimapMaterialization<>());
PCollectionView<Map<K, Iterable<V>>> view =
PCollectionViews.multimapView(
materializationInput, materializationInput.getWindowingStrategy());
materializationInput,
(TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
(TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
materializationInput.apply(CreatePCollectionView.of(view));
return view;
}
Expand Down Expand Up @@ -438,11 +451,18 @@ public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}

KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
Coder<K> keyCoder = kvCoder.getKeyCoder();
Coder<V> valueCoder = kvCoder.getValueCoder();

PCollection<KV<Void, KV<K, V>>> materializationInput =
input.apply(new VoidKeyToMultimapMaterialization<>());
PCollectionView<Map<K, V>> view =
PCollectionViews.mapView(
materializationInput, materializationInput.getWindowingStrategy());
materializationInput,
(TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
(TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
materializationInput.apply(CreatePCollectionView.of(view));
return view;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
Expand All @@ -45,4 +46,7 @@ public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable {

/** A function to adapt a primitive view type to a desired view type. */
public abstract ViewT apply(PrimitiveViewT primitiveViewT);

/** Return the {@link TypeDescriptor} describing the output of this fn. */
public abstract TypeDescriptor<ViewT> getTypeDescriptor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SchemaElementParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SideInputParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
Expand Down Expand Up @@ -760,6 +761,14 @@ public List<SchemaElementParameter> getSchemaElementParameters() {
.collect(Collectors.toList());
}

@Nullable
public List<SideInputParameter> getSideInputParameters() {
return extraParameters().stream()
.filter(Predicates.instanceOf(SideInputParameter.class)::apply)
.map(SideInputParameter.class::cast)
.collect(Collectors.toList());
}

/** The {@link OutputReceiverParameter} for a main output, or null if there is none. */
@Nullable
public OutputReceiverParameter getMainOutputReceiver() {
Expand Down
Loading