Skip to content

Commit

Permalink
Ignore features in FeatureRow if it's not requested in import spec (#101
Browse files Browse the repository at this point in the history
)

* Ignore features in FeatureRow if it's not requested in import spec

* Revert "Ignore features in FeatureRow if it's not requested in import spec"

This reverts commit 98d11cf.

* Filter unknown features in PubSub and Kafka source

* Add license

* Set default to false
  • Loading branch information
pradithya authored and feast-ci-bot committed Jan 24, 2019
1 parent a4fcbb5 commit 1691a12
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2018 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package feast.ingestion.transform.fn;

import feast.types.FeatureProto.Feature;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.transforms.DoFn;

/**
* Filter FeatureRow to only contain feature with given IDs
*/
public class FilterFeatureRowDoFn extends DoFn<FeatureRow, FeatureRow> {
private final Set<String> featureIds;

public FilterFeatureRowDoFn(List<String> featureIds) {
this.featureIds = new HashSet<>(featureIds);
}

@ProcessElement
public void processElement(ProcessContext context) {
FeatureRow input = context.element();
FeatureRow.Builder output = FeatureRow.newBuilder(input).clearFeatures();
for (Feature feature : input.getFeaturesList()) {
if (featureIds.contains(feature.getId())) {
output.addFeatures(feature);
}
}
context.output(output.build());
}
}
43 changes: 27 additions & 16 deletions ingestion/src/main/java/feast/source/kafka/KafkaFeatureSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import static com.google.common.base.Preconditions.checkArgument;

import com.google.auto.service.AutoService;
import com.google.common.base.Strings;
import feast.ingestion.transform.fn.FilterFeatureRowDoFn;
import feast.options.Options;
import feast.options.OptionsParser;
import feast.source.FeatureSource;
import feast.source.FeatureSourceFactory;
import feast.source.kafka.deserializer.FeatureRowDeserializer;
import feast.source.kafka.deserializer.FeatureRowKeyDeserializer;
import feast.specs.ImportSpecProto.Field;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FeatureRowProto.FeatureRowKey;
Expand All @@ -51,7 +54,6 @@ public class KafkaFeatureSource extends FeatureSource {
public static final String KAFKA_FEATURE_SOURCE_TYPE = "kafka";
private ImportSpec importSpec;


@Override
public PCollection<FeatureRow> expand(PInput input) {
checkArgument(importSpec.getType().equals(KAFKA_FEATURE_SOURCE_TYPE));
Expand All @@ -71,24 +73,33 @@ public PCollection<FeatureRow> expand(PInput input) {
PCollection<KafkaRecord<FeatureRowKey, FeatureRow>> featureRowRecord =
input.getPipeline().apply(kafkaIOReader);

return
featureRowRecord.apply(
ParDo.of(
new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() {
@ProcessElement
public void processElement(ProcessContext processContext) {
KafkaRecord<FeatureRowKey, FeatureRow> record = processContext.element();
processContext.output(record.getKV().getValue());
}
}));
PCollection<FeatureRow> featureRow = featureRowRecord.apply(
ParDo.of(
new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() {
@ProcessElement
public void processElement(ProcessContext processContext) {
KafkaRecord<FeatureRowKey, FeatureRow> record = processContext.element();
processContext.output(record.getKV().getValue());
}
}));

if (options.discardUnknownFeatures) {
List<String> featureIds = new ArrayList<>();
for(Field field: importSpec.getSchema().getFieldsList()) {
String featureId = field.getFeatureId();
if (!Strings.isNullOrEmpty(featureId)) {
featureIds.add(featureId);
}
}
return featureRow.apply(ParDo.of(new FilterFeatureRowDoFn(featureIds)));
}
return featureRow;
}

public static class KafkaReadOptions implements Options {

@NotEmpty
public String server;
@NotEmpty
public String topics;
@NotEmpty public String server;
@NotEmpty public String topics;
public boolean discardUnknownFeatures = false;
}

@AutoService(FeatureSourceFactory.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,24 @@
import static com.google.common.base.Preconditions.checkArgument;

import com.google.auto.service.AutoService;
import com.google.common.base.Strings;
import feast.ingestion.transform.fn.FilterFeatureRowDoFn;
import feast.options.Options;
import feast.options.OptionsParser;
import feast.source.FeatureSource;
import feast.source.FeatureSourceFactory;
import feast.specs.ImportSpecProto.Field;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.ArrayList;
import java.util.List;
import javax.validation.constraints.AssertTrue;
import lombok.Builder;
import lombok.NonNull;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.hadoop.hbase.util.Strings;


/**
* Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow} proto messages
Expand All @@ -50,8 +54,7 @@ public class PubSubFeatureSource extends FeatureSource {

public static final String PUBSUB_FEATURE_SOURCE_TYPE = "pubsub";

@NonNull
private ImportSpec importSpec;
@NonNull private ImportSpec importSpec;

@Override
public PCollection<FeatureRow> expand(PInput input) {
Expand All @@ -61,32 +64,43 @@ public PCollection<FeatureRow> expand(PInput input) {

PubsubIO.Read<FeatureRow> read = readProtos();

if (!Strings.isEmpty(options.subscription)) {
if (!Strings.isNullOrEmpty(options.subscription)) {
read = read.fromSubscription(options.subscription);
} else if (!Strings.isEmpty(options.topic)) {
} else if (!Strings.isNullOrEmpty(options.topic)) {
read = read.fromTopic(options.topic);
}
return input.getPipeline().apply(read);
PCollection<FeatureRow> featureRow = input.getPipeline().apply(read);

if (options.discardUnknownFeatures) {
List<String> featureIds = new ArrayList<>();
for(Field field: importSpec.getSchema().getFieldsList()) {
String featureId = field.getFeatureId();
if (!Strings.isNullOrEmpty(featureId)) {
featureIds.add(featureId);
}
}
return featureRow.apply(ParDo.of(new FilterFeatureRowDoFn(featureIds)));
}
return featureRow;
}

PubsubIO.Read<FeatureRow> readProtos() {
return PubsubIO.readProtos(FeatureRow.class);
}

public static class PubSubReadOptions implements Options {

public String subscription;
public String topic;

@AssertTrue(message = "subscription or topic must be set")
boolean isValid() {
return !Strings.isEmpty(subscription) || !Strings.isEmpty(topic);
return !Strings.isNullOrEmpty(subscription) || !Strings.isNullOrEmpty(topic);
}
public boolean discardUnknownFeatures = false;
}

@AutoService(FeatureSourceFactory.class)
public static class Factory implements FeatureSourceFactory {

@Override
public String getType() {
return PUBSUB_FEATURE_SOURCE_TYPE;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2018 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package feast.ingestion.transform.fn;

import feast.types.FeatureProto.Feature;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.ValueProto.Value;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class FilterFeatureRowDoFnTest {
@Rule public TestPipeline testPipeline = TestPipeline.create();

@Test
public void shouldIgnoreUnspecifiedFeatureID() {
String featureId1 = "testentity.none.feature1";
String featureId2 = "testentity.hour.feature2";
String featureId3 = "testentity.day.feature3";

List<String> specifiedFeatureIds = Arrays.asList(featureId1, featureId2, featureId3);
FilterFeatureRowDoFn doFn = new FilterFeatureRowDoFn(specifiedFeatureIds);

FeatureRow row =
FeatureRow.newBuilder()
.setEntityKey("1234")
.setEntityName("testentity")
.addFeatures(
Feature.newBuilder().setId(featureId1).setValue(Value.newBuilder().setInt64Val(10)))
.addFeatures(
Feature.newBuilder().setId(featureId2).setValue(Value.newBuilder().setInt64Val(11)))
.addFeatures(
Feature.newBuilder().setId(featureId3).setValue(Value.newBuilder().setInt64Val(12)))
// this feature should be ignored
.addFeatures(Feature.newBuilder().setId("testEntity.none.unknown_feature"))
.build();

PCollection<FeatureRow> output = testPipeline.apply(Create.of(row))
.apply(ParDo.of(doFn));

FeatureRow expRow =
FeatureRow.newBuilder()
.setEntityKey("1234")
.setEntityName("testentity")
.addFeatures(
Feature.newBuilder().setId(featureId1).setValue(Value.newBuilder().setInt64Val(10)))
.addFeatures(
Feature.newBuilder().setId(featureId2).setValue(Value.newBuilder().setInt64Val(11)))
.addFeatures(
Feature.newBuilder().setId(featureId3).setValue(Value.newBuilder().setInt64Val(12)))
.build();
PAssert.that(output).containsInAnyOrder(expRow);

testPipeline.run();
}
}

0 comments on commit 1691a12

Please sign in to comment.