From 8d758278b418092f6727dc0602cb254822e546ad Mon Sep 17 00:00:00 2001 From: zhilingc Date: Mon, 4 Nov 2019 18:03:16 +0800 Subject: [PATCH 1/3] 1. Refactor graph to only have a single read from source 2. Move validation outside of read from source --- .../main/java/feast/ingestion/ImportJob.java | 85 +++++++++---- .../ingestion/transform/ReadFromSource.java | 37 ++---- .../transform/ValidateFeatureRows.java | 59 +++++++++ .../fn/KafkaRecordToFeatureRowDoFn.java | 69 ++-------- .../transform/fn/ValidateFeatureRowDoFn.java | 97 ++++++++++++++ .../transform/metrics/WindowRecords.java | 1 + .../metrics/WriteMetricsTransform.java | 8 +- .../bigquery/FeatureRowToTableRowDoFn.java | 1 + .../fn/KafkaRecordToFeatureRowDoFnTest.java | 119 ------------------ 9 files changed, 244 insertions(+), 232 deletions(-) create mode 100644 ingestion/src/main/java/feast/ingestion/transform/ValidateFeatureRows.java create mode 100644 ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java delete mode 100644 ingestion/src/test/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFnTest.java diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 66d908185f..2816cd7a83 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -2,9 +2,11 @@ import com.google.protobuf.InvalidProtocolBufferException; import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.SourceProto.Source; import feast.core.StoreProto.Store; import feast.ingestion.options.ImportOptions; import feast.ingestion.transform.ReadFromSource; +import feast.ingestion.transform.ValidateFeatureRows; import feast.ingestion.transform.WriteFailedElementToBigQuery; import feast.ingestion.transform.WriteToStore; import feast.ingestion.transform.metrics.WriteMetricsTransform; @@ -14,20 +16,26 @@ import feast.ingestion.values.FailedElement; import feast.types.FeatureRowProto.FeatureRow; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; public class ImportJob { + // Tag for main output containing Feature Row that has been successfully processed. - private static final TupleTag FEATURE_ROW_OUT = new TupleTag() {}; + private static final TupleTag FEATURE_ROW_OUT = new TupleTag() { + }; // Tag for deadletter output containing elements and error messages from invalid input/transform. - private static final TupleTag DEADLETTER_OUT = new TupleTag() {}; + private static final TupleTag DEADLETTER_OUT = new TupleTag() { + }; private static final Logger log = org.slf4j.LoggerFactory.getLogger(ImportJob.class); /** @@ -46,14 +54,17 @@ public static PipelineResult runPipeline(ImportOptions options) /* * Steps: * 1. Read messages from Feast Source as FeatureRow - * 2. Write FeatureRow to the corresponding Store - * 3. Write elements that failed to be processed to a dead letter queue. - * 4. Write metrics to a metrics sink + * 2. Validate the feature rows to ensure the schema matches what is registered to the system + * 3. Write FeatureRow to the corresponding Store + * 4. Write elements that failed to be processed to a dead letter queue. + * 5. Write metrics to a metrics sink */ PipelineOptionsValidator.validate(ImportOptions.class, options); Pipeline pipeline = Pipeline.create(options); + log.info("Starting import job with settings: \n{}", options.toString()); + List featureSetSpecs = SpecUtil.parseFeatureSetSpecJsonList(options.getFeatureSetSpecJson()); List stores = SpecUtil.parseStoreJsonList(options.getStoreJson()); @@ -62,48 +73,74 @@ public static PipelineResult runPipeline(ImportOptions options) List subscribedFeatureSets = SpecUtil.getSubscribedFeatureSets(store.getSubscriptionsList(), featureSetSpecs); + // Generate tags by key + Map> featureSetTagsByKey = subscribedFeatureSets.stream() + .map(fs -> { + String id = String.format("%s:%s", fs.getName(), fs.getVersion()); + return Pair.of(id, new TupleTag(id) { + }); + }) + .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + + // TODO: make the source part of the job initialisation options + Source source = subscribedFeatureSets.get(0).getSource(); + + // Step 1. Read messages from Feast Source as FeatureRow. + PCollectionTuple convertedFeatureRows = + pipeline.apply( + "ReadFeatureRowFromSource", + ReadFromSource.newBuilder() + .setSource(source) + .setFeatureSetTagByKey(featureSetTagsByKey) + .setFailureTag(DEADLETTER_OUT) + .build()); + for (FeatureSetSpec featureSet : subscribedFeatureSets) { // Ensure Store has valid configuration and Feast can access it. StoreUtil.setupStore(store, featureSet); + String id = String.format("%s:%s", featureSet.getName(), featureSet.getVersion()); - // Step 1. Read messages from Feast Source as FeatureRow. - PCollectionTuple convertedFeatureRows = - pipeline.apply( - "ReadFeatureRowFromSource", - ReadFromSource.newBuilder() - .setSource(featureSet.getSource()) - .setFieldByName(SpecUtil.getFieldByName(featureSet)) - .setFeatureSetName(featureSet.getName()) - .setFeatureSetVersion(featureSet.getVersion()) - .setSuccessTag(FEATURE_ROW_OUT) - .setFailureTag(DEADLETTER_OUT) - .build()); - - // Step 2. Write FeatureRow to the corresponding Store. - convertedFeatureRows + // Step 2. Validate incoming FeatureRows + PCollectionTuple validatedRows = convertedFeatureRows + .get(featureSetTagsByKey.get(id)) + .apply(ValidateFeatureRows.newBuilder() + .setFeatureSetSpec(featureSet) + .setSuccessTag(FEATURE_ROW_OUT) + .setFailureTag(DEADLETTER_OUT).build()); + + // Step 3. Write FeatureRow to the corresponding Store. + validatedRows .get(FEATURE_ROW_OUT) .apply( "WriteFeatureRowToStore", WriteToStore.newBuilder().setFeatureSetSpec(featureSet).setStore(store).build()); - // Step 3. Write FailedElements to a dead letter table in BigQuery. + // Step 4. Write FailedElements to a dead letter table in BigQuery. if (options.getDeadLetterTableSpec() != null) { convertedFeatureRows .get(DEADLETTER_OUT) .apply( - "WriteFailedElements", + "WriteFailedElements_ReadFromSource", + WriteFailedElementToBigQuery.newBuilder() + .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson()) + .setTableSpec(options.getDeadLetterTableSpec()) + .build()); + + validatedRows + .get(DEADLETTER_OUT) + .apply("WriteFailedElements_ValidateRows", WriteFailedElementToBigQuery.newBuilder() .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson()) .setTableSpec(options.getDeadLetterTableSpec()) .build()); } - // Step 4. Write metrics to a metrics sink. + // Step 5. Write metrics to a metrics sink. convertedFeatureRows .apply("WriteMetrics", WriteMetricsTransform.newBuilder() .setFeatureSetSpec(featureSet) .setStoreName(store.getName()) - .setSuccessTag(FEATURE_ROW_OUT) + .setFeatureSetTag(featureSetTagsByKey.get(id)) .setFailureTag(DEADLETTER_OUT) .build()); } diff --git a/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java b/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java index 637904d419..012823f84f 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java +++ b/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java @@ -2,20 +2,15 @@ import com.google.auto.value.AutoValue; import com.google.common.base.Preconditions; -import com.google.protobuf.InvalidProtocolBufferException; +import com.google.common.collect.Lists; import feast.core.SourceProto.Source; import feast.core.SourceProto.SourceType; import feast.ingestion.transform.fn.KafkaRecordToFeatureRowDoFn; import feast.ingestion.values.FailedElement; -import feast.ingestion.values.Field; import feast.types.FeatureRowProto.FeatureRow; -import feast.types.FieldProto; -import feast.types.ValueProto.Value.ValCase; -import java.util.Base64; +import java.util.List; import java.util.Map; import org.apache.beam.sdk.io.kafka.KafkaIO; -import org.apache.beam.sdk.io.kafka.KafkaRecord; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; @@ -23,20 +18,13 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -import org.apache.commons.lang3.exception.ExceptionUtils; @AutoValue public abstract class ReadFromSource extends PTransform { public abstract Source getSource(); - public abstract Map getFieldByName(); - - public abstract String getFeatureSetName(); - - public abstract int getFeatureSetVersion(); - - public abstract TupleTag getSuccessTag(); + public abstract Map> getFeatureSetTagByKey(); public abstract TupleTag getFailureTag(); @@ -49,13 +37,8 @@ public abstract static class Builder { public abstract Builder setSource(Source source); - public abstract Builder setFeatureSetName(String featureSetName); - - public abstract Builder setFeatureSetVersion(int featureSetVersion); - - public abstract Builder setFieldByName(Map fieldByName); - - public abstract Builder setSuccessTag(TupleTag successTag); + public abstract Builder setFeatureSetTagByKey( + Map> featureSetTagByKey); public abstract Builder setFailureTag(TupleTag failureTag); @@ -93,13 +76,13 @@ public PCollectionTuple expand(PBegin input) { .commitOffsetsInFinalize()) .apply( "KafkaRecordToFeatureRow", ParDo.of(KafkaRecordToFeatureRowDoFn.newBuilder() - .setFeatureSetName(getFeatureSetName()) - .setFeatureSetVersion(getFeatureSetVersion()) - .setFieldByName(getFieldByName()) - .setSuccessTag(getSuccessTag()) + .setFeatureSetTagByKey(getFeatureSetTagByKey()) .setFailureTag(getFailureTag()) .build()) - .withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag()))); + .withOutputTags(new TupleTag("placeholder") {}, + TupleTagList.of(Lists + .newArrayList(getFeatureSetTagByKey().values())) + .and(getFailureTag()))); } private String generateConsumerGroupId(String jobName) { diff --git a/ingestion/src/main/java/feast/ingestion/transform/ValidateFeatureRows.java b/ingestion/src/main/java/feast/ingestion/transform/ValidateFeatureRows.java new file mode 100644 index 0000000000..b026a4fd52 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/ValidateFeatureRows.java @@ -0,0 +1,59 @@ +package feast.ingestion.transform; + +import com.google.auto.value.AutoValue; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.ingestion.transform.fn.ValidateFeatureRowDoFn; +import feast.ingestion.utils.SpecUtil; +import feast.ingestion.values.FailedElement; +import feast.ingestion.values.Field; +import feast.types.FeatureRowProto.FeatureRow; +import java.util.Map; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +@AutoValue +public abstract class ValidateFeatureRows extends + PTransform, PCollectionTuple> { + + public abstract FeatureSetSpec getFeatureSetSpec(); + + public abstract TupleTag getSuccessTag(); + + public abstract TupleTag getFailureTag(); + + public static Builder newBuilder() { + return new AutoValue_ValidateFeatureRows.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec); + + public abstract Builder setSuccessTag(TupleTag successTag); + + public abstract Builder setFailureTag(TupleTag failureTag); + + public abstract ValidateFeatureRows build(); + } + + @Override + public PCollectionTuple expand(PCollection input) { + Map fieldsByName = SpecUtil + .getFieldByName(getFeatureSetSpec()); + + return input.apply("ValidateFeatureRows", + ParDo.of(ValidateFeatureRowDoFn.newBuilder() + .setFeatureSetName(getFeatureSetSpec().getName()) + .setFeatureSetVersion(getFeatureSetSpec().getVersion()) + .setFieldByName(fieldsByName) + .setSuccessTag(getSuccessTag()) + .setFailureTag(getFailureTag()) + .build()) + .withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag()))); + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFn.java index 5f160c0dc5..9b43a5ade8 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFn.java @@ -2,6 +2,8 @@ import com.google.auto.value.AutoValue; import com.google.protobuf.InvalidProtocolBufferException; +import feast.ingestion.transform.ReadFromSource; +import feast.ingestion.transform.ReadFromSource.Builder; import feast.ingestion.values.FailedElement; import feast.ingestion.values.Field; import feast.types.FeatureRowProto.FeatureRow; @@ -17,14 +19,7 @@ @AutoValue public abstract class KafkaRecordToFeatureRowDoFn extends DoFn, FeatureRow> { - - public abstract String getFeatureSetName(); - - public abstract int getFeatureSetVersion(); - - public abstract Map getFieldByName(); - - public abstract TupleTag getSuccessTag(); + public abstract Map> getFeatureSetTagByKey(); public abstract TupleTag getFailureTag(); @@ -35,13 +30,7 @@ public static KafkaRecordToFeatureRowDoFn.Builder newBuilder() { @AutoValue.Builder public abstract static class Builder { - public abstract Builder setFeatureSetName(String featureSetName); - - public abstract Builder setFeatureSetVersion(int featureSetVersion); - - public abstract Builder setFieldByName(Map fieldByName); - - public abstract Builder setSuccessTag(TupleTag successTag); + public abstract Builder setFeatureSetTagByKey(Map> featureSetTagByKey); public abstract Builder setFailureTag(TupleTag failureTag); @@ -67,55 +56,19 @@ public void processElement(ProcessContext context) { .build()); return; } - - // If FeatureRow contains field names that do not exist as EntitySpec - // or FeatureSpec in FeatureSetSpec, mark the FeatureRow as FailedElement. - String error = null; - String featureSetId = String.format("%s:%d", getFeatureSetName(), getFeatureSetVersion()); - if (featureRow.getFeatureSet().equals(featureSetId)) { - - for (FieldProto.Field field : featureRow.getFieldsList()) { - if (!getFieldByName().containsKey(field.getName())) { - error = - String.format( - "FeatureRow contains field '%s' which do not exists in FeatureSet '%s' version '%d'. Please check the FeatureRow data.", - field.getName(), getFeatureSetName(), getFeatureSetVersion()); - break; - } - // If value is set in the FeatureRow, make sure the value type matches - // that defined in FeatureSetSpec - if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) { - int expectedTypeFieldNumber = - getFieldByName().get(field.getName()).getType().getNumber(); - int actualTypeFieldNumber = field.getValue().getValCase().getNumber(); - if (expectedTypeFieldNumber != actualTypeFieldNumber) { - error = - String.format( - "FeatureRow contains field '%s' with invalid type '%s'. Feast expects the field type to match that in FeatureSet '%s'. Please check the FeatureRow data.", - field.getName(), - field.getValue().getValCase(), - getFieldByName().get(field.getName()).getType()); - break; - } - } - } - } else { - error = String.format( - "FeatureRow contains invalid feature set id %s. Please check that the feature rows are being published to the correct topic on the feature stream.", - featureSetId); - } - - if (error != null) { + TupleTag tag = getFeatureSetTagByKey() + .getOrDefault(featureRow.getFeatureSet(), null); + if (tag == null) { context.output( getFailureTag(), FailedElement.newBuilder() .setTransformName("KafkaRecordToFeatureRow") .setJobName(context.getPipelineOptions().getJobName()) - .setPayload(featureRow.toString()) - .setErrorMessage(error) + .setPayload(new String(Base64.getEncoder().encode(value))) + .setErrorMessage(String.format("Got row with unexpected feature set id %s. Expected one of %s.", featureRow.getFeatureSet(), getFeatureSetTagByKey().keySet())) .build()); - } else { - context.output(featureRow); + return; } + context.output(tag, featureRow); } } diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java new file mode 100644 index 0000000000..2906e220b2 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java @@ -0,0 +1,97 @@ +package feast.ingestion.transform.fn; + +import com.google.auto.value.AutoValue; +import feast.ingestion.values.FailedElement; +import feast.ingestion.values.Field; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto; +import feast.types.ValueProto.Value.ValCase; +import java.util.Map; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.TupleTag; + +@AutoValue +public abstract class ValidateFeatureRowDoFn extends DoFn { + + public abstract String getFeatureSetName(); + + public abstract int getFeatureSetVersion(); + + public abstract Map getFieldByName(); + + public abstract TupleTag getSuccessTag(); + + public abstract TupleTag getFailureTag(); + + public static Builder newBuilder() { + return new AutoValue_ValidateFeatureRowDoFn.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setFeatureSetName(String featureSetName); + + public abstract Builder setFeatureSetVersion(int featureSetVersion); + + public abstract Builder setFieldByName(Map fieldByName); + + public abstract Builder setSuccessTag(TupleTag successTag); + + public abstract Builder setFailureTag(TupleTag failureTag); + + public abstract ValidateFeatureRowDoFn build(); + } + + + @ProcessElement + public void processElement(ProcessContext context) { + String error = null; + String featureSetId = String.format("%s:%d", getFeatureSetName(), getFeatureSetVersion()); + FeatureRow featureRow = context.element(); + if (featureRow.getFeatureSet().equals(featureSetId)) { + + for (FieldProto.Field field : featureRow.getFieldsList()) { + if (!getFieldByName().containsKey(field.getName())) { + error = + String.format( + "FeatureRow contains field '%s' which do not exists in FeatureSet '%s' version '%d'. Please check the FeatureRow data.", + field.getName(), getFeatureSetName(), getFeatureSetVersion()); + break; + } + // If value is set in the FeatureRow, make sure the value type matches + // that defined in FeatureSetSpec + if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) { + int expectedTypeFieldNumber = + getFieldByName().get(field.getName()).getType().getNumber(); + int actualTypeFieldNumber = field.getValue().getValCase().getNumber(); + if (expectedTypeFieldNumber != actualTypeFieldNumber) { + error = + String.format( + "FeatureRow contains field '%s' with invalid type '%s'. Feast expects the field type to match that in FeatureSet '%s'. Please check the FeatureRow data.", + field.getName(), + field.getValue().getValCase(), + getFieldByName().get(field.getName()).getType()); + break; + } + } + } + } else { + error = String.format( + "FeatureRow contains invalid feature set id %s. Please check that the feature rows are being published to the correct topic on the feature stream.", + featureSetId); + } + + if (error != null) { + context.output( + getFailureTag(), + FailedElement.newBuilder() + .setTransformName("ValidateFeatureRow") + .setJobName(context.getPipelineOptions().getJobName()) + .setPayload(featureRow.toString()) + .setErrorMessage(error) + .build()); + } else { + context.output(getSuccessTag(), featureRow); + } + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WindowRecords.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WindowRecords.java index fe82c6950d..4796c83603 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WindowRecords.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WindowRecords.java @@ -4,6 +4,7 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java index d0f31bbf66..c9caa59917 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java @@ -24,7 +24,7 @@ public abstract class WriteMetricsTransform extends PTransform getSuccessTag(); + public abstract TupleTag getFeatureSetTag(); public abstract TupleTag getFailureTag(); @@ -39,7 +39,7 @@ public abstract static class Builder { public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec); - public abstract Builder setSuccessTag(TupleTag successTag); + public abstract Builder setFeatureSetTag(TupleTag featureSetTag); public abstract Builder setFailureTag(TupleTag failureTag); @@ -63,7 +63,7 @@ public PDone expand(PCollectionTuple input) { .setStoreName(getStoreName()) .build())); - input.get(getSuccessTag()) + input.get(getFeatureSetTag()) .apply("Window records", new WindowRecords<>(WINDOW_SIZE_SECONDS)) .apply("Write row metrics", ParDo @@ -76,7 +76,7 @@ public PDone expand(PCollectionTuple input) { return PDone.in(input.getPipeline()); case "none": default: - input.get(getSuccessTag()).apply("Noop", + input.get(getFeatureSetTag()).apply("Noop", ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { diff --git a/ingestion/src/main/java/feast/store/serving/bigquery/FeatureRowToTableRowDoFn.java b/ingestion/src/main/java/feast/store/serving/bigquery/FeatureRowToTableRowDoFn.java index 11f91e412c..a2ac738e01 100644 --- a/ingestion/src/main/java/feast/store/serving/bigquery/FeatureRowToTableRowDoFn.java +++ b/ingestion/src/main/java/feast/store/serving/bigquery/FeatureRowToTableRowDoFn.java @@ -32,6 +32,7 @@ public void processElement(@Element FeatureRow featureRow, OutputReceiver FEATURE_ROW_OUT = new TupleTag() { - }; - - private static final TupleTag DEADLETTER_OUT = new TupleTag() { - }; - - @Rule - public transient TestPipeline p = TestPipeline.create(); - - - @Test - public void shouldOutputInvalidRowsWithFailureTag() { - FeatureRow frWithStringVal = dummyFeatureRow("invalid:1", "field1", "field2") - .toBuilder() - .setFields(0, FieldProto.Field.newBuilder() - .setName("field1").setValue(Value.newBuilder() - .setStringVal("hi").build())) - .build(); - - Values> featureRows = Create - .of(kafkaRecordOf(dummyFeatureRow("invalid:1", "field1", "field2")), // invalid featureset name - kafkaRecordOf(frWithStringVal), // invalid field type - kafkaRecordOf(dummyFeatureRow("valid:1", "field1", "field2", "field3"))) // invalid fields - .withCoder(KafkaRecordCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of())); - - HashMap fieldByName = new HashMap<>(); - fieldByName.put("field1", new Field("field1", Enum.INT64)); - fieldByName.put("field2", new Field("field2", Enum.INT64)); - - PCollectionTuple output = p.apply(featureRows) - .apply(ParDo.of(KafkaRecordToFeatureRowDoFn.newBuilder() - .setFieldByName(fieldByName) - .setSuccessTag(FEATURE_ROW_OUT) - .setSuccessTag(FEATURE_ROW_OUT) - .setFailureTag(DEADLETTER_OUT) - .setFeatureSetName("valid") - .setFeatureSetVersion(1) - .build()).withOutputTags(FEATURE_ROW_OUT, TupleTagList.of(DEADLETTER_OUT))); - - PAssert.that(output.get(FEATURE_ROW_OUT)).empty(); - PAssert.thatSingleton(output.get(DEADLETTER_OUT).apply(Count.globally())).isEqualTo(3L); - p.run(); - } - - @Test - public void shouldOutputValidRowsWithSuccessTag() { - Values> featureRows = Create - .of(kafkaRecordOf(dummyFeatureRow("valid:1", "field1", "field2")), - kafkaRecordOf(dummyFeatureRow("valid:1", "field1", "field2")), - kafkaRecordOf(dummyFeatureRow("valid:1", "field1", "field2"))) - .withCoder(KafkaRecordCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of())); - - HashMap fieldByName = new HashMap<>(); - fieldByName.put("field1", new Field("field1", Enum.INT64)); - fieldByName.put("field2", new Field("field2", Enum.INT64)); - - PCollectionTuple output = p.apply(featureRows) - .apply(ParDo.of(KafkaRecordToFeatureRowDoFn.newBuilder() - .setFieldByName(fieldByName) - .setSuccessTag(FEATURE_ROW_OUT) - .setFailureTag(DEADLETTER_OUT) - .setFeatureSetName("valid") - .setFeatureSetVersion(1) - .build()).withOutputTags(FEATURE_ROW_OUT, TupleTagList.of(DEADLETTER_OUT))); - - - PAssert.that(output.get(DEADLETTER_OUT)).empty(); - PAssert.thatSingleton(output.get(FEATURE_ROW_OUT).apply(Count.globally())).isEqualTo(3L); - p.run(); - } - - private FeatureRow dummyFeatureRow(String featureSet, String... fieldNames) { - Builder builder = FeatureRow.newBuilder().setFeatureSet(featureSet); - for (String fieldName : fieldNames) { - builder.addFields(FieldProto.Field.newBuilder() - .setName(fieldName) - .setValue(Value.newBuilder() - .setInt64Val(1).build()).build()); - } - return builder.build(); - } - - private KafkaRecord kafkaRecordOf(FeatureRow featureRow) { - ConsumerRecord cr = new ConsumerRecord("", 1, 1, new byte[0], - featureRow.toByteArray()); - return new KafkaRecord(cr.topic(), cr.partition(), cr.offset(), 0, - KafkaTimestampType.NO_TIMESTAMP_TYPE, cr.headers(), cr.key(), cr.value()); - } -} \ No newline at end of file From 2b197f8bd8b67a3ae22dc325821bfd0a9a036971 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Mon, 4 Nov 2019 19:07:39 +0800 Subject: [PATCH 2/3] Use post-validation transform for metrics --- ingestion/src/main/java/feast/ingestion/ImportJob.java | 4 ++-- .../transform/metrics/WriteMetricsTransform.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 2816cd7a83..78ad328846 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -136,11 +136,11 @@ public static PipelineResult runPipeline(ImportOptions options) } // Step 5. Write metrics to a metrics sink. - convertedFeatureRows + validatedRows .apply("WriteMetrics", WriteMetricsTransform.newBuilder() .setFeatureSetSpec(featureSet) .setStoreName(store.getName()) - .setFeatureSetTag(featureSetTagsByKey.get(id)) + .setSuccessTag(FEATURE_ROW_OUT) .setFailureTag(DEADLETTER_OUT) .build()); } diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java index c9caa59917..d0f31bbf66 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java @@ -24,7 +24,7 @@ public abstract class WriteMetricsTransform extends PTransform getFeatureSetTag(); + public abstract TupleTag getSuccessTag(); public abstract TupleTag getFailureTag(); @@ -39,7 +39,7 @@ public abstract static class Builder { public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec); - public abstract Builder setFeatureSetTag(TupleTag featureSetTag); + public abstract Builder setSuccessTag(TupleTag successTag); public abstract Builder setFailureTag(TupleTag failureTag); @@ -63,7 +63,7 @@ public PDone expand(PCollectionTuple input) { .setStoreName(getStoreName()) .build())); - input.get(getFeatureSetTag()) + input.get(getSuccessTag()) .apply("Window records", new WindowRecords<>(WINDOW_SIZE_SECONDS)) .apply("Write row metrics", ParDo @@ -76,7 +76,7 @@ public PDone expand(PCollectionTuple input) { return PDone.in(input.getPipeline()); case "none": default: - input.get(getFeatureSetTag()).apply("Noop", + input.get(getSuccessTag()).apply("Noop", ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { From 2728ef0797d486d79d91e90abe45325cf7725d6e Mon Sep 17 00:00:00 2001 From: Chen Zhiling Date: Mon, 4 Nov 2019 19:12:04 +0800 Subject: [PATCH 3/3] Tidy up code Co-Authored-By: Yu-Xi Lim --- ingestion/src/main/java/feast/ingestion/ImportJob.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 78ad328846..d919a18ac5 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -106,7 +106,8 @@ public static PipelineResult runPipeline(ImportOptions options) .apply(ValidateFeatureRows.newBuilder() .setFeatureSetSpec(featureSet) .setSuccessTag(FEATURE_ROW_OUT) - .setFailureTag(DEADLETTER_OUT).build()); + .setFailureTag(DEADLETTER_OUT) + .build()); // Step 3. Write FeatureRow to the corresponding Store. validatedRows