Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore features in FeatureRow that are not requested in the import spec #101

Merged
merged 5 commits into from
Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2018 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package feast.ingestion.transform.fn;

import feast.types.FeatureProto.Feature;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.transforms.DoFn;

/**
 * A {@link DoFn} that strips each incoming {@link FeatureRow} down to the features whose IDs were
 * supplied at construction time.
 *
 * <p>All other fields of the row are carried over to the output as-is. Exactly one row is emitted
 * per input row, even if none of its features pass the filter.
 */
public class FilterFeatureRowDoFn extends DoFn<FeatureRow, FeatureRow> {
  private final Set<String> featureIds;

  /**
   * @param featureIds IDs of the features to keep; the list is copied into a set, so duplicates
   *     are harmless and later changes to the list are not observed
   */
  public FilterFeatureRowDoFn(List<String> featureIds) {
    this.featureIds = new HashSet<>(featureIds);
  }

  /**
   * Emits a copy of the current element containing only the features with a retained ID, in
   * their original order.
   */
  @ProcessElement
  public void processElement(ProcessContext context) {
    FeatureRow row = context.element();

    // Start from a full copy so every non-feature field is preserved, then rebuild the
    // feature list from scratch.
    FeatureRow.Builder filtered = FeatureRow.newBuilder(row);
    filtered.clearFeatures();

    row.getFeaturesList().stream()
        .filter(feature -> featureIds.contains(feature.getId()))
        .forEachOrdered(feature -> filtered.addFeatures(feature));

    context.output(filtered.build());
  }
}
43 changes: 27 additions & 16 deletions ingestion/src/main/java/feast/source/kafka/KafkaFeatureSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import static com.google.common.base.Preconditions.checkArgument;

import com.google.auto.service.AutoService;
import com.google.common.base.Strings;
import feast.ingestion.transform.fn.FilterFeatureRowDoFn;
import feast.options.Options;
import feast.options.OptionsParser;
import feast.source.FeatureSource;
import feast.source.FeatureSourceFactory;
import feast.source.kafka.deserializer.FeatureRowDeserializer;
import feast.source.kafka.deserializer.FeatureRowKeyDeserializer;
import feast.specs.ImportSpecProto.Field;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FeatureRowProto.FeatureRowKey;
Expand All @@ -51,7 +54,6 @@ public class KafkaFeatureSource extends FeatureSource {
public static final String KAFKA_FEATURE_SOURCE_TYPE = "kafka";
private ImportSpec importSpec;


@Override
public PCollection<FeatureRow> expand(PInput input) {
checkArgument(importSpec.getType().equals(KAFKA_FEATURE_SOURCE_TYPE));
Expand All @@ -71,24 +73,33 @@ public PCollection<FeatureRow> expand(PInput input) {
PCollection<KafkaRecord<FeatureRowKey, FeatureRow>> featureRowRecord =
input.getPipeline().apply(kafkaIOReader);

return
featureRowRecord.apply(
ParDo.of(
new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() {
@ProcessElement
public void processElement(ProcessContext processContext) {
KafkaRecord<FeatureRowKey, FeatureRow> record = processContext.element();
processContext.output(record.getKV().getValue());
}
}));
PCollection<FeatureRow> featureRow = featureRowRecord.apply(
ParDo.of(
new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() {
@ProcessElement
public void processElement(ProcessContext processContext) {
KafkaRecord<FeatureRowKey, FeatureRow> record = processContext.element();
processContext.output(record.getKV().getValue());
}
}));

if (options.discardUnknownFeatures) {
List<String> featureIds = new ArrayList<>();
for(Field field: importSpec.getSchema().getFieldsList()) {
String featureId = field.getFeatureId();
if (!Strings.isNullOrEmpty(featureId)) {
featureIds.add(featureId);
}
}
return featureRow.apply(ParDo.of(new FilterFeatureRowDoFn(featureIds)));
}
return featureRow;
}

public static class KafkaReadOptions implements Options {

@NotEmpty
public String server;
@NotEmpty
public String topics;
@NotEmpty public String server;
@NotEmpty public String topics;
public boolean discardUnknownFeatures = false;
}

@AutoService(FeatureSourceFactory.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,24 @@
import static com.google.common.base.Preconditions.checkArgument;

import com.google.auto.service.AutoService;
import com.google.common.base.Strings;
import feast.ingestion.transform.fn.FilterFeatureRowDoFn;
import feast.options.Options;
import feast.options.OptionsParser;
import feast.source.FeatureSource;
import feast.source.FeatureSourceFactory;
import feast.specs.ImportSpecProto.Field;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.ArrayList;
import java.util.List;
import javax.validation.constraints.AssertTrue;
import lombok.Builder;
import lombok.NonNull;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.hadoop.hbase.util.Strings;


/**
* Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow} proto messages
Expand All @@ -50,8 +54,7 @@ public class PubSubFeatureSource extends FeatureSource {

public static final String PUBSUB_FEATURE_SOURCE_TYPE = "pubsub";

@NonNull
private ImportSpec importSpec;
@NonNull private ImportSpec importSpec;

@Override
public PCollection<FeatureRow> expand(PInput input) {
Expand All @@ -61,32 +64,43 @@ public PCollection<FeatureRow> expand(PInput input) {

PubsubIO.Read<FeatureRow> read = readProtos();

if (!Strings.isEmpty(options.subscription)) {
if (!Strings.isNullOrEmpty(options.subscription)) {
read = read.fromSubscription(options.subscription);
} else if (!Strings.isEmpty(options.topic)) {
} else if (!Strings.isNullOrEmpty(options.topic)) {
read = read.fromTopic(options.topic);
}
return input.getPipeline().apply(read);
PCollection<FeatureRow> featureRow = input.getPipeline().apply(read);

if (options.discardUnknownFeatures) {
List<String> featureIds = new ArrayList<>();
for(Field field: importSpec.getSchema().getFieldsList()) {
String featureId = field.getFeatureId();
if (!Strings.isNullOrEmpty(featureId)) {
featureIds.add(featureId);
}
}
return featureRow.apply(ParDo.of(new FilterFeatureRowDoFn(featureIds)));
}
return featureRow;
}

/**
 * Creates the Pub/Sub read transform that decodes messages as {@link FeatureRow} protos.
 *
 * <p>NOTE(review): package-private rather than private, presumably so tests can substitute a
 * different reader; confirm against the test sources.
 *
 * @return the result of {@code PubsubIO.readProtos(FeatureRow.class)}
 */
PubsubIO.Read<FeatureRow> readProtos() {
return PubsubIO.readProtos(FeatureRow.class);
}

public static class PubSubReadOptions implements Options {

public String subscription;
public String topic;

@AssertTrue(message = "subscription or topic must be set")
boolean isValid() {
return !Strings.isEmpty(subscription) || !Strings.isEmpty(topic);
return !Strings.isNullOrEmpty(subscription) || !Strings.isNullOrEmpty(topic);
}
public boolean discardUnknownFeatures = false;
}

@AutoService(FeatureSourceFactory.class)
public static class Factory implements FeatureSourceFactory {

@Override
public String getType() {
return PUBSUB_FEATURE_SOURCE_TYPE;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2018 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package feast.ingestion.transform.fn;

import feast.types.FeatureProto.Feature;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.ValueProto.Value;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

/** Pipeline-level test for {@link FilterFeatureRowDoFn}. */
public class FilterFeatureRowDoFnTest {
  @Rule public TestPipeline testPipeline = TestPipeline.create();

  /** A feature whose ID was not passed to the DoFn must be dropped from the output row. */
  @Test
  public void shouldIgnoreUnspecifiedFeatureID() {
    String featureId1 = "testentity.none.feature1";
    String featureId2 = "testentity.hour.feature2";
    String featureId3 = "testentity.day.feature3";
    List<String> specifiedFeatureIds = Arrays.asList(featureId1, featureId2, featureId3);

    // The row we expect to come out: only the three specified features.
    FeatureRow expRow =
        FeatureRow.newBuilder()
            .setEntityKey("1234")
            .setEntityName("testentity")
            .addFeatures(int64Feature(featureId1, 10))
            .addFeatures(int64Feature(featureId2, 11))
            .addFeatures(int64Feature(featureId3, 12))
            .build();

    // The row we feed in: the expected row plus one feature that should be ignored.
    FeatureRow row =
        expRow
            .toBuilder()
            .addFeatures(Feature.newBuilder().setId("testEntity.none.unknown_feature"))
            .build();

    PCollection<FeatureRow> output =
        testPipeline
            .apply(Create.of(row))
            .apply(ParDo.of(new FilterFeatureRowDoFn(specifiedFeatureIds)));

    PAssert.that(output).containsInAnyOrder(expRow);

    testPipeline.run();
  }

  /** Builds a feature with the given ID carrying an int64 value. */
  private static Feature int64Feature(String id, long value) {
    return Feature.newBuilder().setId(id).setValue(Value.newBuilder().setInt64Val(value)).build();
  }
}