Skip to content

Commit

Permalink
Ignore features in FeatureRow if they are not requested in the import spec
Browse files Browse the repository at this point in the history
  • Loading branch information
Pradithya Aria committed Jan 23, 2019
1 parent 6797ba4 commit 98d11cf
Show file tree
Hide file tree
Showing 12 changed files with 405 additions and 29 deletions.
9 changes: 9 additions & 0 deletions ingestion/src/main/java/feast/ingestion/model/Specs.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ public FeatureSpec getFeatureSpec(String featureId) {
return featureSpecs.get(featureId);
}

/**
 * Looks up the feature spec stored under the given feature ID.
 *
 * @param featureId ID of the feature whose spec is wanted
 * @return the matching feature spec, or null if none exists
 */
public FeatureSpec tryGetFeatureSpec(String featureId) {
  final FeatureSpec spec = featureSpecs.get(featureId);
  return spec;
}

public List<FeatureSpec> getFeatureSpecByServingStoreId(String storeId) {
List<FeatureSpec> out = new ArrayList<>();
for (FeatureSpec featureSpec : featureSpecs.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@

package feast.ingestion.transform.fn;

import lombok.AllArgsConstructor;
import feast.ingestion.model.Specs;
import feast.ingestion.model.Values;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.types.FeatureProto.Feature;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import feast.types.FeatureRowProto.FeatureRow;
import lombok.AllArgsConstructor;

/**
* Converts the value type of each feature inside a {@link FeatureRowExtended} to the value type
* specified in the corresponding feature spec.
*/
@AllArgsConstructor
public class ConvertTypesDoFn extends BaseFeatureDoFn {
private Specs specs;
Expand All @@ -41,7 +45,10 @@ public void processElementImpl(ProcessContext context) {

for (Feature feature : row.getFeaturesList()) {
String featureId = feature.getId();
FeatureSpec featureSpec = specs.getFeatureSpec(featureId);
FeatureSpec featureSpec = specs.tryGetFeatureSpec(featureId);
if (featureSpec == null) {
continue;
}

rowBuilder.setGranularity(featureSpec.getGranularity());
rowBuilder.addFeatures(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ public void processElement(ProcessContext context) {
Map<TupleTag<FeatureRowExtended>, FeatureRow.Builder> taggedOutput = new HashMap<>();

for (Feature feature : row.getFeaturesList()) {
FeatureSpec featureSpec = specs.getFeatureSpec(feature.getId());
FeatureSpec featureSpec = specs.tryGetFeatureSpec(feature.getId());
if (featureSpec == null) {
continue;
}

TupleTag<FeatureRowExtended> tag = splitStrategy.getTag(featureSpec);
FeatureRow.Builder builder = taggedOutput.get(tag);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package feast.ingestion.transform.fn;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -79,7 +78,6 @@ public void processElementImpl(ProcessContext context) {
FeatureRow row = context.element().getRow();
EntitySpec entitySpec = specs.getEntitySpec(row.getEntityName());
Preconditions.checkNotNull(entitySpec, "Entity spec not found for " + row.getEntityName());
ImportSpec importSpec = specs.getImportSpec();

try {
checkArgument(!row.getEntityKey().isEmpty(), "Entity key must not be empty");
Expand All @@ -98,9 +96,10 @@ public void processElementImpl(ProcessContext context) {
checkArgument(row.getFeaturesCount() > 0, "Must have at least one feature set");

for (Feature feature : row.getFeaturesList()) {
FeatureSpec featureSpec = specs.getFeatureSpec(feature.getId());
checkNotNull(
featureSpec, String.format("Feature spec not found featureId=%s", feature.getId()));
FeatureSpec featureSpec = specs.tryGetFeatureSpec(feature.getId());
if (featureSpec == null) {
continue;
}

String storageStoreId = featureSpec.getDataStores().getServing().getId();
StorageSpec servingStorageSpec = specs.getStorageSpec(storageStoreId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package feast.storage.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import feast.ingestion.util.DateUtil;
import feast.ingestion.model.Specs;
import feast.ingestion.model.Specs;
import feast.ingestion.util.DateUtil;
import feast.specs.FeatureSpecProto.FeatureSpec;
Expand Down Expand Up @@ -63,8 +61,11 @@ public TableRow toTableRow(FeatureRowExtended featureRowExtended) {
.setTimestampVal(DateUtil.toTimestamp(DateTime.now(DateTimeZone.UTC)))));

for (Feature feature : featureRow.getFeaturesList()) {
FeatureSpec featureSpec = specs.tryGetFeatureSpec(feature.getId());
if (featureSpec == null) {
continue;
}
Object featureValue = ValueBigQueryBuilder.bigQueryObjectOf(feature.getValue());
FeatureSpec featureSpec = specs.getFeatureSpec(feature.getId());
tableRow.set(featureSpec.getName(), featureValue);
}
return tableRow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,25 @@

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import feast.SerializableCache;
import feast.ingestion.model.Specs;
import feast.ingestion.util.DateUtil;
import feast.options.OptionsParser;
import feast.specs.EntitySpecProto.EntitySpec;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.storage.BigTableProto.BigTableRowKey;
import feast.types.FeatureProto.Feature;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.GranularityProto.Granularity;
import feast.types.GranularityProto.Granularity.Enum;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;

/**
* DoFn for taking a feature row and making Bigtable mutations out of it. Also keys the mutations by
Expand Down Expand Up @@ -86,7 +86,6 @@ public static long getReversedRoundedMillis(
public void processElement(ProcessContext context) {
FeatureRowExtended rowExtended = context.element();
FeatureRow row = rowExtended.getRow();
EntitySpec entitySpec = specs.getEntitySpec(row.getEntityName());
List<Put> mutations = makePut(rowExtended);
for (Put put : mutations) {
context.output(KV.of(getTableName(row), put));
Expand Down Expand Up @@ -116,14 +115,21 @@ public List<Put> makePut(FeatureRowExtended rowExtended) {
new Put(
makeBigTableRowKey(row.getEntityKey(), row.getEventTimestamp(), Enum.NONE)
.toByteArray());

Put timeseriesPut =
new Put(
makeBigTableRowKey(row.getEntityKey(), row.getEventTimestamp(), granularity)
.toByteArray());
boolean isTimeseries = granularity.getNumber() != Enum.NONE.getNumber();

// keep track of addColumn operation in case there is no mutation needed
int mutationCount = 0;
for (Feature feature : row.getFeaturesList()) {
FeatureSpec featureSpec = specs.getFeatureSpec(feature.getId());
FeatureSpec featureSpec = specs.tryGetFeatureSpec(feature.getId());
if (featureSpec == null) {
continue;
}

BigTableFeatureOptions options = servingOptionsCache.get(featureSpec);

byte[] family = options.family.getBytes(Charsets.UTF_8);
Expand All @@ -135,6 +141,12 @@ public List<Put> makePut(FeatureRowExtended rowExtended) {
if (isTimeseries) {
timeseriesPut.addColumn(family, qualifier, version, value);
}

mutationCount++;
}

if (mutationCount == 0) {
return Collections.emptyList();
}

if (isTimeseries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ public void processElement(ProcessContext context) {

for (Feature feature : row.getFeaturesList()) {
String featureId = feature.getId();
FeatureSpec featureSpec = specs.getFeatureSpec(featureId);
FeatureSpec featureSpec = specs.tryGetFeatureSpec(featureId);
if (featureSpec == null) {
continue;
}
String featureIdHash = getFeatureIdSha1Prefix(featureId);

RedisFeatureOptions options = servingOptionsCache.get(featureSpec);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2018 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package feast.ingestion.transform.fn;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;

import com.google.protobuf.Timestamp;
import feast.ingestion.model.Specs;
import feast.ingestion.service.FileSpecService;
import feast.ingestion.service.SpecService;
import feast.types.FeatureProto.Feature;
import feast.types.FeatureRowExtendedProto.Attempt;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.GranularityProto.Granularity.Enum;
import feast.types.ValueProto.Value;
import java.util.Collections;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class ConvertTypesDoFnTest {

@Rule public TestPipeline testPipeline = TestPipeline.create();
private ConvertTypesDoFn doFn;
private Specs specs;
private SpecService fileSpecService;

@Before
public void setUp() {
fileSpecService =
new FileSpecService(getClass().getClassLoader().getResource("core_specs").getPath());
specs = mock(Specs.class, withSettings().serializable());
doFn = new ConvertTypesDoFn(specs);
}

@Test
public void shouldIgnoreUnknownFeatureId() {
when(specs.tryGetFeatureSpec("testEntity.day.testInt64"))
.thenReturn(
fileSpecService
.getFeatureSpecs(Collections.singletonList("testEntity.day.testInt64"))
.get("testEntity.day.testInt64"));

FeatureRow row =
FeatureRow.newBuilder()
.setEntityKey("1234")
.setEntityName("testEntity")
.addFeatures(
Feature.newBuilder()
.setId("testEntity.day.testInt64")
.setValue(Value.newBuilder().setInt64Val(10)))
// this feature should be ignored
.addFeatures(Feature.newBuilder().setId("testEntity.none.unknown_feature"))
.build();
FeatureRowExtended rowExtended = FeatureRowExtended.newBuilder().setRow(row).build();
PCollection<FeatureRowExtended> p = testPipeline.apply(Create.of(rowExtended));
PCollection<FeatureRowExtended> out = p.apply(ParDo.of(doFn));

FeatureRow expRow =
FeatureRow.newBuilder()
.setEntityKey("1234")
.setEntityName("testEntity")
.addFeatures(
Feature.newBuilder()
.setId("testEntity.day.testInt64")
.setValue(Value.newBuilder().setInt64Val(10)))
.setGranularity(Enum.DAY)
.setEventTimestamp(Timestamp.getDefaultInstance())
.build();

FeatureRowExtended expected =
FeatureRowExtended.newBuilder()
.setRow(expRow)
.setLastAttempt(Attempt.getDefaultInstance())
.build();
PAssert.that(out).containsInAnyOrder(expected);

testPipeline.run();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2018 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package feast.ingestion.transform.fn;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.withSettings;

import feast.ingestion.model.Specs;
import feast.ingestion.transform.SplitFeatures.SplitStrategy;
import feast.types.FeatureProto.Feature;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import feast.types.FeatureRowProto.FeatureRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class SplitFeaturesDoFnTest {
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
Specs specs;
SplitFeaturesDoFn doFn;

@Before
public void setUp() throws Exception {
specs = mock(Specs.class, withSettings().serializable());
SplitStrategy strategy = mock(SplitStrategy.class, withSettings().serializable());
doFn = new SplitFeaturesDoFn(strategy, specs);
}

@Test
public void shouldIgnoreUnknownFeatureId() {
FeatureRow row =
FeatureRow.newBuilder()
.setEntityKey("1234")
.setEntityName("testEntity")
// this feature should be ignored
.addFeatures(Feature.newBuilder().setId("testEntity.none.unknown_feature"))
.build();
FeatureRowExtended rowExtended = FeatureRowExtended.newBuilder().setRow(row).build();
PCollection<FeatureRowExtended> p = pipeline.apply(Create.of(rowExtended));
PCollection<FeatureRowExtended> out = (PCollection<FeatureRowExtended>) p.apply(ParDo.of(doFn));
PAssert.that(out).empty();

pipeline.run();
}
}
Loading

0 comments on commit 98d11cf

Please sign in to comment.