Zl/ingestion fixes #286

Merged 3 commits on Nov 4, 2019

Changes from 1 commit
85 changes: 61 additions & 24 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -2,9 +2,11 @@

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.SourceProto.Source;
import feast.core.StoreProto.Store;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.transform.ReadFromSource;
import feast.ingestion.transform.ValidateFeatureRows;
import feast.ingestion.transform.WriteFailedElementToBigQuery;
import feast.ingestion.transform.WriteToStore;
import feast.ingestion.transform.metrics.WriteMetricsTransform;
@@ -14,20 +16,26 @@
import feast.ingestion.values.FailedElement;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;

public class ImportJob {

// Tag for main output containing Feature Row that has been successfully processed.
private static final TupleTag<FeatureRow> FEATURE_ROW_OUT = new TupleTag<FeatureRow>() {};
private static final TupleTag<FeatureRow> FEATURE_ROW_OUT = new TupleTag<FeatureRow>() {
};

// Tag for deadletter output containing elements and error messages from invalid input/transform.
private static final TupleTag<FailedElement> DEADLETTER_OUT = new TupleTag<FailedElement>() {};
private static final TupleTag<FailedElement> DEADLETTER_OUT = new TupleTag<FailedElement>() {
};
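
A note on the idiom above: the trailing braces create an anonymous subclass of TupleTag, which lets Beam recover the erased generic element type by reflection when the pipeline is constructed. A minimal sketch of the difference:

    // The anonymous subclass captures FeatureRow as the element type.
    TupleTag<FeatureRow> typed = new TupleTag<FeatureRow>() {};
    // Here the type parameter is erased, and Beam may fail to infer a coder.
    TupleTag<FeatureRow> untyped = new TupleTag<>();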
private static final Logger log = org.slf4j.LoggerFactory.getLogger(ImportJob.class);

/**
@@ -46,14 +54,17 @@ public static PipelineResult runPipeline(ImportOptions options)
/*
* Steps:
* 1. Read messages from Feast Source as FeatureRow
* 2. Write FeatureRow to the corresponding Store
* 3. Write elements that failed to be processed to a dead letter queue.
* 4. Write metrics to a metrics sink
* 2. Validate the feature rows to ensure the schema matches what is registered to the system
* 3. Write FeatureRow to the corresponding Store
* 4. Write elements that failed to be processed to a dead letter queue.
* 5. Write metrics to a metrics sink
*/

PipelineOptionsValidator.validate(ImportOptions.class, options);
Pipeline pipeline = Pipeline.create(options);

log.info("Starting import job with settings: \n{}", options.toString());

List<FeatureSetSpec> featureSetSpecs =
SpecUtil.parseFeatureSetSpecJsonList(options.getFeatureSetSpecJson());
List<Store> stores = SpecUtil.parseStoreJsonList(options.getStoreJson());
@@ -62,48 +73,74 @@ public static PipelineResult runPipeline(ImportOptions options)
List<FeatureSetSpec> subscribedFeatureSets =
SpecUtil.getSubscribedFeatureSets(store.getSubscriptionsList(), featureSetSpecs);

// Generate tags by key
Map<String, TupleTag<FeatureRow>> featureSetTagsByKey = subscribedFeatureSets.stream()
.map(fs -> {
String id = String.format("%s:%s", fs.getName(), fs.getVersion());
return Pair.of(id, new TupleTag<FeatureRow>(id) {
});
})
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
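
For clarity, the stream above is equivalent to this imperative sketch: one TupleTag per subscribed feature set, keyed by the same "name:version" id that incoming FeatureRows carry in their feature set field.

    Map<String, TupleTag<FeatureRow>> tags = new HashMap<>();
    for (FeatureSetSpec fs : subscribedFeatureSets) {
      String id = String.format("%s:%s", fs.getName(), fs.getVersion());
      tags.put(id, new TupleTag<FeatureRow>(id) {});
    }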

// TODO: make the source part of the job initialisation options
Source source = subscribedFeatureSets.get(0).getSource();

// Step 1. Read messages from Feast Source as FeatureRow.
PCollectionTuple convertedFeatureRows =
pipeline.apply(
"ReadFeatureRowFromSource",
ReadFromSource.newBuilder()
.setSource(source)
.setFeatureSetTagByKey(featureSetTagsByKey)
.setFailureTag(DEADLETTER_OUT)
.build());
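
With this change the source is read once, outside the per-feature-set loop, and the resulting PCollectionTuple is demultiplexed by tag. A retrieval sketch (the feature set id is an invented example):

    TupleTag<FeatureRow> tag = featureSetTagsByKey.get("driver_features:1");
    PCollection<FeatureRow> driverRows = convertedFeatureRows.get(tag);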

for (FeatureSetSpec featureSet : subscribedFeatureSets) {
// Ensure Store has valid configuration and Feast can access it.
StoreUtil.setupStore(store, featureSet);
String id = String.format("%s:%s", featureSet.getName(), featureSet.getVersion());

// Step 1. Read messages from Feast Source as FeatureRow.
PCollectionTuple convertedFeatureRows =
pipeline.apply(
"ReadFeatureRowFromSource",
ReadFromSource.newBuilder()
.setSource(featureSet.getSource())
.setFieldByName(SpecUtil.getFieldByName(featureSet))
.setFeatureSetName(featureSet.getName())
.setFeatureSetVersion(featureSet.getVersion())
.setSuccessTag(FEATURE_ROW_OUT)
.setFailureTag(DEADLETTER_OUT)
.build());

// Step 2. Write FeatureRow to the corresponding Store.
convertedFeatureRows
// Step 2. Validate incoming FeatureRows
PCollectionTuple validatedRows = convertedFeatureRows
.get(featureSetTagsByKey.get(id))
.apply(ValidateFeatureRows.newBuilder()
.setFeatureSetSpec(featureSet)
.setSuccessTag(FEATURE_ROW_OUT)
.setFailureTag(DEADLETTER_OUT).build());

// Step 3. Write FeatureRow to the corresponding Store.
validatedRows
.get(FEATURE_ROW_OUT)
.apply(
"WriteFeatureRowToStore",
WriteToStore.newBuilder().setFeatureSetSpec(featureSet).setStore(store).build());

// Step 3. Write FailedElements to a dead letter table in BigQuery.
// Step 4. Write FailedElements to a dead letter table in BigQuery.
if (options.getDeadLetterTableSpec() != null) {
convertedFeatureRows
.get(DEADLETTER_OUT)
.apply(
"WriteFailedElements",
"WriteFailedElements_ReadFromSource",
WriteFailedElementToBigQuery.newBuilder()
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
.setTableSpec(options.getDeadLetterTableSpec())
.build());

validatedRows
.get(DEADLETTER_OUT)
.apply("WriteFailedElements_ValidateRows",
WriteFailedElementToBigQuery.newBuilder()
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
.setTableSpec(options.getDeadLetterTableSpec())
.build());
}
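
Both dead-letter branches write to the same BigQuery table, controlled by one pipeline option. A sketch of enabling it, assuming ImportOptions pairs the getter used above with a setter in the usual Beam options style (the table spec value is an example):

    ImportOptions options = PipelineOptionsFactory.as(ImportOptions.class);
    options.setDeadLetterTableSpec("my-project:feast_errors.deadletter");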

// Step 4. Write metrics to a metrics sink.
// Step 5. Write metrics to a metrics sink.
convertedFeatureRows
.apply("WriteMetrics", WriteMetricsTransform.newBuilder()
.setFeatureSetSpec(featureSet)
.setStoreName(store.getName())
.setSuccessTag(FEATURE_ROW_OUT)
.setFeatureSetTag(featureSetTagsByKey.get(id))
.setFailureTag(DEADLETTER_OUT)
.build());
}
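
For context, a minimal sketch of driving this entry point end to end (argument values are placeholders and exception handling is elided):

    String[] args = {
        "--featureSetSpecJson=...",   // JSON list of FeatureSetSpecs
        "--storeJson=...",            // JSON list of Stores
        "--runner=DirectRunner"
    };
    ImportOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(ImportOptions.class);
    PipelineResult result = ImportJob.runPipeline(options);
    result.waitUntilFinish();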
ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java
@@ -2,41 +2,29 @@

import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.common.collect.Lists;
import feast.core.SourceProto.Source;
import feast.core.SourceProto.SourceType;
import feast.ingestion.transform.fn.KafkaRecordToFeatureRowDoFn;
import feast.ingestion.values.FailedElement;
import feast.ingestion.values.Field;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FieldProto;
import feast.types.ValueProto.Value.ValCase;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.exception.ExceptionUtils;

@AutoValue
public abstract class ReadFromSource extends PTransform<PBegin, PCollectionTuple> {

public abstract Source getSource();

public abstract Map<String, Field> getFieldByName();

public abstract String getFeatureSetName();

public abstract int getFeatureSetVersion();

public abstract TupleTag<FeatureRow> getSuccessTag();
public abstract Map<String, TupleTag<FeatureRow>> getFeatureSetTagByKey();

public abstract TupleTag<FailedElement> getFailureTag();

@@ -49,13 +37,8 @@ public abstract static class Builder {

public abstract Builder setSource(Source source);

public abstract Builder setFeatureSetName(String featureSetName);

public abstract Builder setFeatureSetVersion(int featureSetVersion);

public abstract Builder setFieldByName(Map<String, Field> fieldByName);

public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);
public abstract Builder setFeatureSetTagByKey(
Map<String, TupleTag<FeatureRow>> featureSetTagByKey);

public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);

@@ -93,13 +76,13 @@ public PCollectionTuple expand(PBegin input) {
.commitOffsetsInFinalize())
.apply(
"KafkaRecordToFeatureRow", ParDo.of(KafkaRecordToFeatureRowDoFn.newBuilder()
.setFeatureSetName(getFeatureSetName())
.setFeatureSetVersion(getFeatureSetVersion())
.setFieldByName(getFieldByName())
.setSuccessTag(getSuccessTag())
.setFeatureSetTagByKey(getFeatureSetTagByKey())
.setFailureTag(getFailureTag())
.build())
.withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag())));
.withOutputTags(new TupleTag<FeatureRow>("placeholder") {},
TupleTagList.of(Lists
.newArrayList(getFeatureSetTagByKey().values()))
.and(getFailureTag())));
}
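
ParDo.withOutputTags requires a main output tag even when, as here, every element is routed at runtime to one of the side outputs, hence the never-used "placeholder" tag above. A self-contained illustration of the pattern (simplified, not Feast code):

    TupleTag<String> main = new TupleTag<String>("placeholder") {};
    TupleTag<String> evens = new TupleTag<String>("evens") {};
    TupleTag<String> odds = new TupleTag<String>("odds") {};
    PCollectionTuple out = pipeline
        .apply(Create.of(1, 2, 3))
        .apply(ParDo.of(new DoFn<Integer, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Every element goes to a side output; the main tag never fires.
            c.output(c.element() % 2 == 0 ? evens : odds, c.element().toString());
          }
        }).withOutputTags(main, TupleTagList.of(evens).and(odds)));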

private String generateConsumerGroupId(String jobName) {
ingestion/src/main/java/feast/ingestion/transform/ValidateFeatureRows.java
@@ -0,0 +1,59 @@
package feast.ingestion.transform;

import com.google.auto.value.AutoValue;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.ingestion.transform.fn.ValidateFeatureRowDoFn;
import feast.ingestion.utils.SpecUtil;
import feast.ingestion.values.FailedElement;
import feast.ingestion.values.Field;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.Map;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

@AutoValue
public abstract class ValidateFeatureRows extends
PTransform<PCollection<FeatureRow>, PCollectionTuple> {

public abstract FeatureSetSpec getFeatureSetSpec();

public abstract TupleTag<FeatureRow> getSuccessTag();

public abstract TupleTag<FailedElement> getFailureTag();

public static Builder newBuilder() {
return new AutoValue_ValidateFeatureRows.Builder();
}

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec);

public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);

public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);

public abstract ValidateFeatureRows build();
}

@Override
public PCollectionTuple expand(PCollection<FeatureRow> input) {
Map<String, Field> fieldsByName = SpecUtil
.getFieldByName(getFeatureSetSpec());

return input.apply("ValidateFeatureRows",
ParDo.of(ValidateFeatureRowDoFn.newBuilder()
.setFeatureSetName(getFeatureSetSpec().getName())
.setFeatureSetVersion(getFeatureSetSpec().getVersion())
.setFieldByName(fieldsByName)
.setSuccessTag(getSuccessTag())
.setFailureTag(getFailureTag())
.build())
.withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag())));
}
}
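
As wired up in ImportJob above, the transform is applied once per feature set: rows that match the registered schema come out under the success tag, violations under the failure tag.

    PCollectionTuple validated = rows.apply(
        ValidateFeatureRows.newBuilder()
            .setFeatureSetSpec(featureSet)
            .setSuccessTag(FEATURE_ROW_OUT)
            .setFailureTag(DEADLETTER_OUT)
            .build());
    PCollection<FeatureRow> validRows = validated.get(FEATURE_ROW_OUT);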
ingestion/src/main/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFn.java
@@ -2,6 +2,8 @@

import com.google.auto.value.AutoValue;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.ingestion.transform.ReadFromSource;
import feast.ingestion.transform.ReadFromSource.Builder;
import feast.ingestion.values.FailedElement;
import feast.ingestion.values.Field;
import feast.types.FeatureRowProto.FeatureRow;
@@ -17,14 +19,7 @@
@AutoValue
public abstract class KafkaRecordToFeatureRowDoFn extends
DoFn<KafkaRecord<byte[], byte[]>, FeatureRow> {

public abstract String getFeatureSetName();

public abstract int getFeatureSetVersion();

public abstract Map<String, Field> getFieldByName();

public abstract TupleTag<FeatureRow> getSuccessTag();
public abstract Map<String, TupleTag<FeatureRow>> getFeatureSetTagByKey();

public abstract TupleTag<FailedElement> getFailureTag();

@@ -35,13 +30,7 @@ public static KafkaRecordToFeatureRowDoFn.Builder newBuilder() {
@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setFeatureSetName(String featureSetName);

public abstract Builder setFeatureSetVersion(int featureSetVersion);

public abstract Builder setFieldByName(Map<String, Field> fieldByName);

public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);
public abstract Builder setFeatureSetTagByKey(Map<String, TupleTag<FeatureRow>> featureSetTagByKey);

public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);

@@ -67,55 +56,19 @@ public void processElement(ProcessContext context) {
.build());
return;
}

// If FeatureRow contains field names that do not exist as EntitySpec
// or FeatureSpec in FeatureSetSpec, mark the FeatureRow as FailedElement.
String error = null;
String featureSetId = String.format("%s:%d", getFeatureSetName(), getFeatureSetVersion());
if (featureRow.getFeatureSet().equals(featureSetId)) {

for (FieldProto.Field field : featureRow.getFieldsList()) {
if (!getFieldByName().containsKey(field.getName())) {
error =
String.format(
"FeatureRow contains field '%s' which do not exists in FeatureSet '%s' version '%d'. Please check the FeatureRow data.",
field.getName(), getFeatureSetName(), getFeatureSetVersion());
break;
}
// If value is set in the FeatureRow, make sure the value type matches
// that defined in FeatureSetSpec
if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
int expectedTypeFieldNumber =
getFieldByName().get(field.getName()).getType().getNumber();
int actualTypeFieldNumber = field.getValue().getValCase().getNumber();
if (expectedTypeFieldNumber != actualTypeFieldNumber) {
error =
String.format(
"FeatureRow contains field '%s' with invalid type '%s'. Feast expects the field type to match that in FeatureSet '%s'. Please check the FeatureRow data.",
field.getName(),
field.getValue().getValCase(),
getFieldByName().get(field.getName()).getType());
break;
}
}
}
} else {
error = String.format(
"FeatureRow contains invalid feature set id %s. Please check that the feature rows are being published to the correct topic on the feature stream.",
featureSetId);
}

if (error != null) {
TupleTag<FeatureRow> tag = getFeatureSetTagByKey()
.getOrDefault(featureRow.getFeatureSet(), null);
if (tag == null) {
context.output(
getFailureTag(),
FailedElement.newBuilder()
.setTransformName("KafkaRecordToFeatureRow")
.setJobName(context.getPipelineOptions().getJobName())
.setPayload(featureRow.toString())
.setErrorMessage(error)
.setPayload(new String(Base64.getEncoder().encode(value)))
.setErrorMessage(String.format("Got row with unexpected feature set id %s. Expected one of %s.", featureRow.getFeatureSet(), getFeatureSetTagByKey().keySet()))
.build());
} else {
context.output(featureRow);
return;
}
context.output(tag, featureRow);
}
}
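
Unroutable rows are now dead-lettered with the raw Kafka message bytes base64-encoded, so the original record can be recovered for inspection. A debugging sketch (the payload accessor is an assumption inferred from the builder's setPayload):

    byte[] raw = Base64.getDecoder().decode(failedElement.getPayload());
    // Throws InvalidProtocolBufferException if the bytes never held a valid row.
    FeatureRow row = FeatureRow.parseFrom(raw);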