Zl/ingestion fixes (#286)
* 1. Refactor graph to only have a single read from source
2. Move validation outside of read from source

* Use post-validation transform for metrics

* Tidy up code

Co-Authored-By: Yu-Xi Lim <thirteen37@users.noreply.github.com>
2 people authored and feast-ci-bot committed Nov 4, 2019
1 parent f3b1ce7 commit 79eb4ab
Showing 8 changed files with 241 additions and 228 deletions.
86 changes: 62 additions & 24 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -2,9 +2,11 @@

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.SourceProto.Source;
import feast.core.StoreProto.Store;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.transform.ReadFromSource;
import feast.ingestion.transform.ValidateFeatureRows;
import feast.ingestion.transform.WriteFailedElementToBigQuery;
import feast.ingestion.transform.WriteToStore;
import feast.ingestion.transform.metrics.WriteMetricsTransform;
@@ -14,20 +16,26 @@
import feast.ingestion.values.FailedElement;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;

public class ImportJob {

// Tag for main output containing Feature Row that has been successfully processed.
private static final TupleTag<FeatureRow> FEATURE_ROW_OUT = new TupleTag<FeatureRow>() {};
private static final TupleTag<FeatureRow> FEATURE_ROW_OUT = new TupleTag<FeatureRow>() {
};

// Tag for deadletter output containing elements and error messages from invalid input/transform.
private static final TupleTag<FailedElement> DEADLETTER_OUT = new TupleTag<FailedElement>() {};
private static final TupleTag<FailedElement> DEADLETTER_OUT = new TupleTag<FailedElement>() {
};
private static final Logger log = org.slf4j.LoggerFactory.getLogger(ImportJob.class);

/**
@@ -46,14 +54,17 @@ public static PipelineResult runPipeline(ImportOptions options)
/*
* Steps:
* 1. Read messages from Feast Source as FeatureRow
* 2. Write FeatureRow to the corresponding Store
* 3. Write elements that failed to be processed to a dead letter queue.
* 4. Write metrics to a metrics sink
* 2. Validate the feature rows to ensure the schema matches what is registered to the system
* 3. Write FeatureRow to the corresponding Store
* 4. Write elements that failed to be processed to a dead letter queue.
* 5. Write metrics to a metrics sink
*/

PipelineOptionsValidator.validate(ImportOptions.class, options);
Pipeline pipeline = Pipeline.create(options);

log.info("Starting import job with settings: \n{}", options.toString());

List<FeatureSetSpec> featureSetSpecs =
SpecUtil.parseFeatureSetSpecJsonList(options.getFeatureSetSpecJson());
List<Store> stores = SpecUtil.parseStoreJsonList(options.getStoreJson());
@@ -62,44 +73,71 @@ public static PipelineResult runPipeline(ImportOptions options)
List<FeatureSetSpec> subscribedFeatureSets =
SpecUtil.getSubscribedFeatureSets(store.getSubscriptionsList(), featureSetSpecs);

// Generate tags by key
Map<String, TupleTag<FeatureRow>> featureSetTagsByKey = subscribedFeatureSets.stream()
.map(fs -> {
String id = String.format("%s:%s", fs.getName(), fs.getVersion());
return Pair.of(id, new TupleTag<FeatureRow>(id) {
});
})
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));

// TODO: make the source part of the job initialisation options
Source source = subscribedFeatureSets.get(0).getSource();

// Step 1. Read messages from Feast Source as FeatureRow.
PCollectionTuple convertedFeatureRows =
pipeline.apply(
"ReadFeatureRowFromSource",
ReadFromSource.newBuilder()
.setSource(source)
.setFeatureSetTagByKey(featureSetTagsByKey)
.setFailureTag(DEADLETTER_OUT)
.build());

for (FeatureSetSpec featureSet : subscribedFeatureSets) {
// Ensure Store has valid configuration and Feast can access it.
StoreUtil.setupStore(store, featureSet);
String id = String.format("%s:%s", featureSet.getName(), featureSet.getVersion());

// Step 2. Validate incoming FeatureRows
PCollectionTuple validatedRows = convertedFeatureRows
.get(featureSetTagsByKey.get(id))
.apply(ValidateFeatureRows.newBuilder()
.setFeatureSetSpec(featureSet)
.setSuccessTag(FEATURE_ROW_OUT)
.setFailureTag(DEADLETTER_OUT)
.build());

// Step 1. Read messages from Feast Source as FeatureRow.
PCollectionTuple convertedFeatureRows =
pipeline.apply(
"ReadFeatureRowFromSource",
ReadFromSource.newBuilder()
.setSource(featureSet.getSource())
.setFieldByName(SpecUtil.getFieldByName(featureSet))
.setFeatureSetName(featureSet.getName())
.setFeatureSetVersion(featureSet.getVersion())
.setSuccessTag(FEATURE_ROW_OUT)
.setFailureTag(DEADLETTER_OUT)
.build());

// Step 2. Write FeatureRow to the corresponding Store.
convertedFeatureRows
// Step 3. Write FeatureRow to the corresponding Store.
validatedRows
.get(FEATURE_ROW_OUT)
.apply(
"WriteFeatureRowToStore",
WriteToStore.newBuilder().setFeatureSetSpec(featureSet).setStore(store).build());

// Step 3. Write FailedElements to a dead letter table in BigQuery.
// Step 4. Write FailedElements to a dead letter table in BigQuery.
if (options.getDeadLetterTableSpec() != null) {
convertedFeatureRows
.get(DEADLETTER_OUT)
.apply(
"WriteFailedElements",
"WriteFailedElements_ReadFromSource",
WriteFailedElementToBigQuery.newBuilder()
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
.setTableSpec(options.getDeadLetterTableSpec())
.build());

validatedRows
.get(DEADLETTER_OUT)
.apply("WriteFailedElements_ValidateRows",
WriteFailedElementToBigQuery.newBuilder()
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
.setTableSpec(options.getDeadLetterTableSpec())
.build());
}

// Step 4. Write metrics to a metrics sink.
convertedFeatureRows
// Step 5. Write metrics to a metrics sink.
validatedRows
.apply("WriteMetrics", WriteMetricsTransform.newBuilder()
.setFeatureSetSpec(featureSet)
.setStoreName(store.getName())
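The new wiring reads from subscribedFeatureSets.get(0).getSource(), and the TODO above notes that the source should eventually come from the job options. Until then, the code implicitly assumes every subscribed feature set shares that one source. A hypothetical fail-fast helper (not part of this commit; getUniqueSource is an illustrative name) that would make the assumption explicit:

// Hypothetical helper, not in this commit: reject specs that disagree on the
// source, since ImportJob reads only the first feature set's source.
// Assumes java.util.List, java.util.Set, and java.util.stream.Collectors are imported.
private static Source getUniqueSource(List<FeatureSetSpec> subscribedFeatureSets) {
  Set<Source> sources =
      subscribedFeatureSets.stream()
          .map(FeatureSetSpec::getSource)
          .collect(Collectors.toSet());
  if (sources.size() != 1) {
    // Protobuf messages have value-based equals(), so the set deduplicates
    // identical Source configurations.
    throw new IllegalArgumentException(
        "Subscribed feature sets must share a single source, found: " + sources);
  }
  return sources.iterator().next();
}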
ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java
@@ -2,41 +2,29 @@

import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.common.collect.Lists;
import feast.core.SourceProto.Source;
import feast.core.SourceProto.SourceType;
import feast.ingestion.transform.fn.KafkaRecordToFeatureRowDoFn;
import feast.ingestion.values.FailedElement;
import feast.ingestion.values.Field;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FieldProto;
import feast.types.ValueProto.Value.ValCase;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.exception.ExceptionUtils;

@AutoValue
public abstract class ReadFromSource extends PTransform<PBegin, PCollectionTuple> {

public abstract Source getSource();

public abstract Map<String, Field> getFieldByName();

public abstract String getFeatureSetName();

public abstract int getFeatureSetVersion();

public abstract TupleTag<FeatureRow> getSuccessTag();
public abstract Map<String, TupleTag<FeatureRow>> getFeatureSetTagByKey();

public abstract TupleTag<FailedElement> getFailureTag();

@@ -49,13 +37,8 @@ public abstract static class Builder {

public abstract Builder setSource(Source source);

public abstract Builder setFeatureSetName(String featureSetName);

public abstract Builder setFeatureSetVersion(int featureSetVersion);

public abstract Builder setFieldByName(Map<String, Field> fieldByName);

public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);
public abstract Builder setFeatureSetTagByKey(
Map<String, TupleTag<FeatureRow>> featureSetTagByKey);

public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);

@@ -93,13 +76,13 @@ public PCollectionTuple expand(PBegin input) {
.commitOffsetsInFinalize())
.apply(
"KafkaRecordToFeatureRow", ParDo.of(KafkaRecordToFeatureRowDoFn.newBuilder()
.setFeatureSetName(getFeatureSetName())
.setFeatureSetVersion(getFeatureSetVersion())
.setFieldByName(getFieldByName())
.setSuccessTag(getSuccessTag())
.setFeatureSetTagByKey(getFeatureSetTagByKey())
.setFailureTag(getFailureTag())
.build())
.withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag())));
.withOutputTags(new TupleTag<FeatureRow>("placeholder") {},
TupleTagList.of(Lists
.newArrayList(getFeatureSetTagByKey().values()))
.and(getFailureTag())));
}

private String generateConsumerGroupId(String jobName) {
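A note on the new withOutputTags call: Beam's multi-output ParDo always requires a main output tag, but KafkaRecordToFeatureRowDoFn now emits only to the per-feature-set tags and the failure tag, so the main tag is a throwaway placeholder. A minimal, self-contained sketch of this fan-out-by-tag pattern, simplified to String elements (FanOutByTagExample and every name in it are illustrative, not from the Feast codebase):

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

public class FanOutByTagExample {

  public static void main(String[] args) {
    TupleTag<String> tagA = new TupleTag<String>("a") {};
    TupleTag<String> tagB = new TupleTag<String>("b") {};
    TupleTag<String> dead = new TupleTag<String>("dead") {};
    Map<String, TupleTag<String>> tagByKey = ImmutableMap.of("a", tagA, "b", tagB);

    Pipeline pipeline = Pipeline.create();
    PCollectionTuple routed =
        pipeline
            .apply(Create.of("a:1", "b:2", "c:3"))
            .apply(
                ParDo.of(
                        new DoFn<String, String>() {
                          @ProcessElement
                          public void processElement(ProcessContext c) {
                            String key = c.element().split(":")[0];
                            TupleTag<String> tag = tagByKey.get(key);
                            if (tag == null) {
                              c.output(dead, c.element()); // unknown key -> dead letter
                            } else {
                              c.output(tag, c.element()); // route to the per-key output
                            }
                          }
                        })
                    // Beam requires a main output tag even if nothing is ever
                    // emitted to it, hence the placeholder, as in ReadFromSource.
                    .withOutputTags(
                        new TupleTag<String>("placeholder") {},
                        TupleTagList.of(tagA).and(tagB).and(dead)));

    routed.get(tagA); // per-key PCollection<String>, ready for further transforms
    pipeline.run().waitUntilFinish(); // runs on the direct runner if present
  }
}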
ingestion/src/main/java/feast/ingestion/transform/ValidateFeatureRows.java
@@ -0,0 +1,59 @@
package feast.ingestion.transform;

import com.google.auto.value.AutoValue;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.ingestion.transform.fn.ValidateFeatureRowDoFn;
import feast.ingestion.utils.SpecUtil;
import feast.ingestion.values.FailedElement;
import feast.ingestion.values.Field;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.Map;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

@AutoValue
public abstract class ValidateFeatureRows extends
PTransform<PCollection<FeatureRow>, PCollectionTuple> {

public abstract FeatureSetSpec getFeatureSetSpec();

public abstract TupleTag<FeatureRow> getSuccessTag();

public abstract TupleTag<FailedElement> getFailureTag();

public static Builder newBuilder() {
return new AutoValue_ValidateFeatureRows.Builder();
}

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec);

public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);

public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);

public abstract ValidateFeatureRows build();
}

@Override
public PCollectionTuple expand(PCollection<FeatureRow> input) {
Map<String, Field> fieldsByName = SpecUtil
.getFieldByName(getFeatureSetSpec());

return input.apply("ValidateFeatureRows",
ParDo.of(ValidateFeatureRowDoFn.newBuilder()
.setFeatureSetName(getFeatureSetSpec().getName())
.setFeatureSetVersion(getFeatureSetSpec().getVersion())
.setFieldByName(fieldsByName)
.setSuccessTag(getSuccessTag())
.setFailureTag(getFailureTag())
.build())
.withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag())));
}
}
ingestion/src/main/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFn.java
@@ -2,6 +2,8 @@

import com.google.auto.value.AutoValue;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.ingestion.transform.ReadFromSource;
import feast.ingestion.transform.ReadFromSource.Builder;
import feast.ingestion.values.FailedElement;
import feast.ingestion.values.Field;
import feast.types.FeatureRowProto.FeatureRow;
@@ -17,14 +19,7 @@
@AutoValue
public abstract class KafkaRecordToFeatureRowDoFn extends
DoFn<KafkaRecord<byte[], byte[]>, FeatureRow> {

public abstract String getFeatureSetName();

public abstract int getFeatureSetVersion();

public abstract Map<String, Field> getFieldByName();

public abstract TupleTag<FeatureRow> getSuccessTag();
public abstract Map<String, TupleTag<FeatureRow>> getFeatureSetTagByKey();

public abstract TupleTag<FailedElement> getFailureTag();

@@ -35,13 +30,7 @@ public static KafkaRecordToFeatureRowDoFn.Builder newBuilder() {
@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setFeatureSetName(String featureSetName);

public abstract Builder setFeatureSetVersion(int featureSetVersion);

public abstract Builder setFieldByName(Map<String, Field> fieldByName);

public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);
public abstract Builder setFeatureSetTagByKey(Map<String, TupleTag<FeatureRow>> featureSetTagByKey);

public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);

@@ -67,55 +56,19 @@ public void processElement(ProcessContext context) {
.build());
return;
}

// If FeatureRow contains field names that do not exist as EntitySpec
// or FeatureSpec in FeatureSetSpec, mark the FeatureRow as FailedElement.
String error = null;
String featureSetId = String.format("%s:%d", getFeatureSetName(), getFeatureSetVersion());
if (featureRow.getFeatureSet().equals(featureSetId)) {

for (FieldProto.Field field : featureRow.getFieldsList()) {
if (!getFieldByName().containsKey(field.getName())) {
error =
String.format(
"FeatureRow contains field '%s' which do not exists in FeatureSet '%s' version '%d'. Please check the FeatureRow data.",
field.getName(), getFeatureSetName(), getFeatureSetVersion());
break;
}
// If value is set in the FeatureRow, make sure the value type matches
// that defined in FeatureSetSpec
if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
int expectedTypeFieldNumber =
getFieldByName().get(field.getName()).getType().getNumber();
int actualTypeFieldNumber = field.getValue().getValCase().getNumber();
if (expectedTypeFieldNumber != actualTypeFieldNumber) {
error =
String.format(
"FeatureRow contains field '%s' with invalid type '%s'. Feast expects the field type to match that in FeatureSet '%s'. Please check the FeatureRow data.",
field.getName(),
field.getValue().getValCase(),
getFieldByName().get(field.getName()).getType());
break;
}
}
}
} else {
error = String.format(
"FeatureRow contains invalid feature set id %s. Please check that the feature rows are being published to the correct topic on the feature stream.",
featureSetId);
}

if (error != null) {
TupleTag<FeatureRow> tag = getFeatureSetTagByKey()
.getOrDefault(featureRow.getFeatureSet(), null);
if (tag == null) {
context.output(
getFailureTag(),
FailedElement.newBuilder()
.setTransformName("KafkaRecordToFeatureRow")
.setJobName(context.getPipelineOptions().getJobName())
.setPayload(featureRow.toString())
.setErrorMessage(error)
.setPayload(new String(Base64.getEncoder().encode(value)))
.setErrorMessage(String.format("Got row with unexpected feature set id %s. Expected one of %s.", featureRow.getFeatureSet(), getFeatureSetTagByKey().keySet()))
.build());
} else {
context.output(featureRow);
return;
}
context.output(tag, featureRow);
}
}
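ValidateFeatureRowDoFn, which ValidateFeatureRows wraps, is not included in this diff. A plausible sketch of it, reconstructed from the validation logic removed from KafkaRecordToFeatureRowDoFn above, with the AutoValue builder boilerplate omitted; the actual implementation may differ:

package feast.ingestion.transform.fn;

import feast.ingestion.values.FailedElement;
import feast.ingestion.values.Field;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FieldProto;
import feast.types.ValueProto.Value.ValCase;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;

// Sketch only: the real class is presumably @AutoValue-generated with a
// builder, like the other DoFns in this package.
public abstract class ValidateFeatureRowDoFn extends DoFn<FeatureRow, FeatureRow> {

  public abstract String getFeatureSetName();

  public abstract int getFeatureSetVersion();

  public abstract Map<String, Field> getFieldByName();

  public abstract TupleTag<FeatureRow> getSuccessTag();

  public abstract TupleTag<FailedElement> getFailureTag();

  @ProcessElement
  public void processElement(ProcessContext context) {
    FeatureRow featureRow = context.element();
    String error = null;
    for (FieldProto.Field field : featureRow.getFieldsList()) {
      Field expected = getFieldByName().get(field.getName());
      if (expected == null) {
        // Field is not declared as an entity or feature in the FeatureSetSpec.
        error = String.format(
            "FeatureRow contains field '%s' which does not exist in FeatureSet '%s' version '%d'.",
            field.getName(), getFeatureSetName(), getFeatureSetVersion());
        break;
      }
      if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)
          && field.getValue().getValCase().getNumber() != expected.getType().getNumber()) {
        // Value is set but its type does not match the registered field type.
        error = String.format(
            "FeatureRow contains field '%s' with invalid type '%s'; expected '%s'.",
            field.getName(), field.getValue().getValCase(), expected.getType());
        break;
      }
    }
    if (error != null) {
      context.output(
          getFailureTag(),
          FailedElement.newBuilder()
              .setTransformName("ValidateFeatureRow")
              .setJobName(context.getPipelineOptions().getJobName())
              .setPayload(featureRow.toString())
              .setErrorMessage(error)
              .build());
    } else {
      // Success tag is the main output tag per ValidateFeatureRows#expand.
      context.output(featureRow);
    }
  }
}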