diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java
index 1cfd29b541..f775bc31bb 100644
--- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java
+++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java
@@ -217,7 +217,9 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow()
                         .map(FeatureSpec::getName)
                         .collect(Collectors.toList())
                         .contains(field.getName()))
-            .map(field -> field.toBuilder().clearName().build())
+            .map(
+                field ->
+                    field.toBuilder().setName(TestUtil.hash(field.getName())).build())
             .collect(Collectors.toList());
     randomRow =
         randomRow
diff --git a/ingestion/src/test/java/feast/test/TestUtil.java b/ingestion/src/test/java/feast/test/TestUtil.java
index b003137846..1fb8ea89ea 100644
--- a/ingestion/src/test/java/feast/test/TestUtil.java
+++ b/ingestion/src/test/java/feast/test/TestUtil.java
@@ -19,6 +19,7 @@ import static feast.common.models.FeatureSet.getFeatureSetStringRef;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
@@ -517,4 +518,8 @@ public static void waitUntilAllElementsAreWrittenToStore(
       }
     }
   }
+
+  public static String hash(String input) {
+    return Hashing.murmur3_32().hashString(input, StandardCharsets.UTF_8).toString();
+  }
 }
diff --git a/storage/api/src/main/java/feast/storage/common/testing/TestUtil.java b/storage/api/src/main/java/feast/storage/common/testing/TestUtil.java
index 5f191d276c..773abd57d6 100644
--- a/storage/api/src/main/java/feast/storage/common/testing/TestUtil.java
+++ b/storage/api/src/main/java/feast/storage/common/testing/TestUtil.java
@@ -16,6 +16,7 @@
  */
 package feast.storage.common.testing;
 
+import com.google.common.hash.Hashing;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Timestamp;
 import feast.proto.core.FeatureSetProto.FeatureSet;
@@ -24,6 +25,7 @@ import feast.proto.core.FeatureSetProto.FeatureSetSpec;
 import feast.proto.types.FeatureRowProto.FeatureRow.Builder;
 import feast.proto.types.FieldProto.Field;
 import feast.proto.types.ValueProto.*;
+import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -191,4 +193,8 @@ public static Field field(String name, Object value, ValueType.Enum valueType) {
       throw new IllegalStateException("Unexpected valueType: " + value.getClass());
     }
   }
+
+  public static String hash(String input) {
+    return Hashing.murmur3_32().hashString(input, StandardCharsets.UTF_8).toString();
+  }
 }
diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java
index aad3147f71..d89e537366 100644
--- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java
+++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/FeatureRowDecoder.java
@@ -16,12 +16,17 @@
  */
 package feast.storage.connectors.redis.retriever;
 
+import com.google.common.hash.Hashing;
 import feast.proto.core.FeatureSetProto.FeatureSetSpec;
 import feast.proto.core.FeatureSetProto.FeatureSpec;
 import feast.proto.types.FeatureRowProto.FeatureRow;
 import feast.proto.types.FieldProto.Field;
+import feast.proto.types.ValueProto.Value;
+import feast.storage.connectors.redis.writer.RedisCustomIO;
+import java.nio.charset.StandardCharsets;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -36,60 +41,102 @@ public FeatureRowDecoder(String featureSetRef, FeatureSetSpec spec) {
   }
 
   /**
-   * A feature row is considered encoded if the feature set and field names are not set. This method
-   * is required for backward compatibility purposes, to allow Feast serving to continue serving non
-   * encoded Feature Row ingested by an older version of Feast.
+   * Checks whether an encoded feature row can be decoded by the v1 decoder. The v1 decoder
+   * requires that the feature row has neither its feature set reference nor its field names set.
+   * The number of fields in the feature row must also match the number of fields in the feature
+   * set spec. NOTE: This method is deprecated and will be removed in Feast v0.7.
    *
    * @param featureRow Feature row
    * @return boolean
    */
-  public boolean isEncoded(FeatureRow featureRow) {
+  @Deprecated
+  private boolean isEncodedV1(FeatureRow featureRow) {
     return featureRow.getFeatureSet().isEmpty()
-        && featureRow.getFieldsList().stream().allMatch(field -> field.getName().isEmpty());
+        && featureRow.getFieldsList().stream().allMatch(field -> field.getName().isEmpty())
+        && featureRow.getFieldsList().size() == spec.getFeaturesList().size();
   }
 
   /**
-   * Validates if an encoded feature row can be decoded without exception.
+   * Checks whether an encoded feature row can be decoded by the v2 decoder. The v2 decoder
+   * requires that every field in the feature row has its (hashed) name set.
    *
    * @param featureRow Feature row
    * @return boolean
    */
-  public boolean isEncodingValid(FeatureRow featureRow) {
-    return featureRow.getFieldsList().size() == spec.getFeaturesList().size();
+  private boolean isEncodedV2(FeatureRow featureRow) {
+    return featureRow.getFieldsList().stream().noneMatch(field -> field.getName().isEmpty());
   }
 
   /**
-   * Decoding feature row by repopulating the field names based on the corresponding feature set
-   * spec.
+   * Decodes a feature row encoded by {@link RedisCustomIO}. NOTE: The v1 decoder will be removed
+   * in Feast 0.7.
    *
+   * @throws IllegalArgumentException if unable to decode the given feature row
    * @param encodedFeatureRow Feature row
    * @return boolean
    */
   public FeatureRow decode(FeatureRow encodedFeatureRow) {
-    final List<Field> fieldsWithoutName = encodedFeatureRow.getFieldsList();
+    if (isEncodedV1(encodedFeatureRow)) {
+      // TODO: remove v1 feature row decoder in Feast 0.7
+      // Decode feature rows using the v1 decoder.
+      final List<Field> fieldsWithoutName = encodedFeatureRow.getFieldsList();
+      List<String> featureNames =
+          spec.getFeaturesList().stream()
+              .sorted(Comparator.comparing(FeatureSpec::getName))
+              .map(FeatureSpec::getName)
+              .collect(Collectors.toList());
 
-    List<String> featureNames =
-        spec.getFeaturesList().stream()
-            .sorted(Comparator.comparing(FeatureSpec::getName))
-            .map(FeatureSpec::getName)
-            .collect(Collectors.toList());
-    List<Field> fields =
-        IntStream.range(0, featureNames.size())
-            .mapToObj(
-                featureNameIndex -> {
-                  String featureName = featureNames.get(featureNameIndex);
-                  return fieldsWithoutName
-                      .get(featureNameIndex)
-                      .toBuilder()
-                      .setName(featureName)
-                      .build();
-                })
-            .collect(Collectors.toList());
-    return encodedFeatureRow
-        .toBuilder()
-        .clearFields()
-        .setFeatureSet(featureSetRef)
-        .addAllFields(fields)
-        .build();
+      List<Field> fields =
+          IntStream.range(0, featureNames.size())
+              .mapToObj(
+                  featureNameIndex -> {
+                    String featureName = featureNames.get(featureNameIndex);
+                    return fieldsWithoutName
+                        .get(featureNameIndex)
+                        .toBuilder()
+                        .setName(featureName)
+                        .build();
+                  })
+              .collect(Collectors.toList());
+
+      return encodedFeatureRow
+          .toBuilder()
+          .clearFields()
+          .setFeatureSet(featureSetRef)
+          .addAllFields(fields)
+          .build();
+    }
+    if (isEncodedV2(encodedFeatureRow)) {
+      // Decode feature rows using the v2 decoder.
+      // v2-encoded feature rows use the hashed feature name as the field name and
+      // do not have the feature set reference set.
+      // Decoding restores the unhashed field names and sets the feature set reference.
+      Map<String, Value> nameHashValueMap =
+          encodedFeatureRow.getFieldsList().stream()
+              .collect(Collectors.toMap(field -> field.getName(), field -> field.getValue()));
+
+      List<String> featureNames =
+          spec.getFeaturesList().stream().map(FeatureSpec::getName).collect(Collectors.toList());
+
+      List<Field> fields =
+          featureNames.stream()
+              .map(
+                  name -> {
+                    String nameHash =
+                        Hashing.murmur3_32().hashString(name, StandardCharsets.UTF_8).toString();
+                    Value value =
+                        nameHashValueMap.getOrDefault(nameHash, Value.newBuilder().build());
+                    return Field.newBuilder().setName(name).setValue(value).build();
+                  })
+              .collect(Collectors.toList());
+
+      return encodedFeatureRow
+          .toBuilder()
+          .clearFields()
+          .setFeatureSet(featureSetRef)
+          .addAllFields(fields)
+          .build();
+    }
+    throw new IllegalArgumentException("Failed to decode FeatureRow: possible data corruption");
   }
 }
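To make the new encoding easier to evaluate, here is a minimal, self-contained sketch of the round trip that the writer and this decoder implement together: field names are replaced by their murmur3_32 hex hashes on write, and restored from the feature set spec on read. The class, method, and variable names below are illustrative only; a plain Map stands in for the FeatureRow proto, and Java 9+ is assumed for Map.of/List.of.

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: a plain name -> value map stands in for the
// FeatureRow proto used by the actual PR.
public class HashedFieldRoundTrip {

  static String hash(String name) {
    return Hashing.murmur3_32().hashString(name, StandardCharsets.UTF_8).toString();
  }

  // "Encode": store each value keyed by the murmur3_32 hex hash of its feature name.
  static Map<String, Object> encode(Map<String, Object> row) {
    Map<String, Object> encoded = new LinkedHashMap<>();
    row.forEach((name, value) -> encoded.put(hash(name), value));
    return encoded;
  }

  // "Decode": walk the feature names from the spec, look each value up by its
  // hash, and fall back to null (the real decoder falls back to an unset Value).
  static Map<String, Object> decode(Map<String, Object> encoded, List<String> specNames) {
    Map<String, Object> decoded = new LinkedHashMap<>();
    for (String name : specNames) {
      decoded.put(name, encoded.get(hash(name)));
    }
    return decoded;
  }

  public static void main(String[] args) {
    Map<String, Object> row = Map.of("feature1", 2, "feature2", 1.0f);
    // Extra fields in storage are ignored; the spec names drive the decoded row.
    Map<String, Object> decoded = decode(encode(row), List.of("feature1", "feature2"));
    System.out.println(decoded); // {feature1=2, feature2=1.0}
  }
}

Because the spec drives decoding, missing fields decode to unset values and unknown (extra) hashes are simply skipped, which is exactly what the new e2e tests at the end of this diff exercise.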
diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java
index c006149cd5..2146ec2f87 100644
--- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java
+++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java
@@ -158,17 +158,11 @@ private List<Optional<FeatureRow>> getFeaturesFromRedis(
           // decode feature rows from data bytes using decoder.
           FeatureRow featureRow = FeatureRow.parseFrom(featureRowBytes);
-          if (decoder.isEncoded(featureRow)) {
-            if (decoder.isEncodingValid(featureRow)) {
-              featureRow = decoder.decode(featureRow);
-            } else {
-              // decoding feature row failed: data corruption could have occurred
-              throw Status.DATA_LOSS
-                  .withDescription(
-                      "Failed to decode FeatureRow from bytes retrieved from redis"
-                          + ": Possible data corruption")
-                  .asRuntimeException();
-            }
+          try {
+            featureRow = decoder.decode(featureRow);
+          } catch (IllegalArgumentException e) {
+            // decoding feature row failed: data corruption could have occurred
+            throw Status.DATA_LOSS.withCause(e).withDescription(e.getMessage()).asRuntimeException();
           }
           featureRows.add(Optional.of(featureRow));
         }
diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java
index ef80b06799..049175879d 100644
--- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java
+++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisOnlineRetriever.java
@@ -151,15 +151,11 @@ private List<Optional<FeatureRow>> getFeaturesFromRedis(
           // decode feature rows from data bytes using decoder.
           FeatureRow featureRow = FeatureRow.parseFrom(featureRowBytes);
-          if (decoder.isEncoded(featureRow) && decoder.isEncodingValid(featureRow)) {
+          try {
             featureRow = decoder.decode(featureRow);
-          } else {
+          } catch (IllegalArgumentException e) {
             // decoding feature row failed: data corruption could have occurred
-            throw Status.DATA_LOSS
-                .withDescription(
-                    "Failed to decode FeatureRow from bytes retrieved from redis"
-                        + ": Possible data corruption")
-                .asRuntimeException();
+            throw Status.DATA_LOSS.withCause(e).withDescription(e.getMessage()).asRuntimeException();
           }
           featureRows.add(Optional.of(featureRow));
         }
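Both retrievers now share the same error-mapping contract: any IllegalArgumentException thrown by the decoder surfaces as a gRPC DATA_LOSS status carrying the original cause. A standalone sketch of just that pattern follows; RowDecoder and decodeOrDataLoss are hypothetical stand-ins for illustration, not PR code.

import io.grpc.Status;
import io.grpc.StatusRuntimeException;

// Illustrative sketch of the decode-or-DATA_LOSS pattern used by both retrievers.
// "RowDecoder" is a stand-in interface, not the PR's FeatureRowDecoder.
public class DecodeErrorMapping {

  interface RowDecoder<T> {
    T decode(T row) throws IllegalArgumentException;
  }

  static <T> T decodeOrDataLoss(RowDecoder<T> decoder, T row) {
    try {
      return decoder.decode(row);
    } catch (IllegalArgumentException e) {
      // Surface decoder failures as DATA_LOSS so callers see a storage-level error.
      throw Status.DATA_LOSS.withCause(e).withDescription(e.getMessage()).asRuntimeException();
    }
  }

  public static void main(String[] args) {
    RowDecoder<String> failing =
        row -> {
          throw new IllegalArgumentException("Failed to decode FeatureRow: possible data corruption");
        };
    try {
      decodeOrDataLoss(failing, "raw-bytes");
    } catch (StatusRuntimeException e) {
      System.out.println(e.getStatus()); // DATA_LOSS with the decoder's message as description
    }
  }
}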
diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java
index dcd2e5bfda..f73c458d78 100644
--- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java
+++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java
@@ -18,6 +18,7 @@
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
 import feast.proto.core.FeatureSetProto.EntitySpec;
 import feast.proto.core.FeatureSetProto.FeatureSetSpec;
 import feast.proto.core.FeatureSetProto.FeatureSpec;
@@ -29,7 +30,9 @@
 import feast.storage.api.writer.FailedElement;
 import feast.storage.api.writer.WriteResult;
 import feast.storage.common.retry.Retriable;
+import feast.storage.connectors.redis.retriever.FeatureRowDecoder;
 import io.lettuce.core.RedisException;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -203,28 +206,45 @@ private byte[] getKey(FeatureRow featureRow, FeatureSetSpec spec) {
     return redisKeyBuilder.build().toByteArray();
   }
 
+  /**
+   * Encodes the feature row as bytes to store in Redis. To reduce storage space consumption in
+   * Redis, feature rows are "encoded" by hashing the field names and unsetting the feature set
+   * reference. {@link FeatureRowDecoder} is responsible for reversing this "encoding" step.
+   */
   private byte[] getValue(FeatureRow featureRow, FeatureSetSpec spec) {
     List<String> featureNames =
         spec.getFeaturesList().stream().map(FeatureSpec::getName).collect(Collectors.toList());
-    Map<String, Field> fieldValueOnlyMap =
+
+    Map<String, Field.Builder> fieldValueOnlyMap =
         featureRow.getFieldsList().stream()
             .filter(field -> featureNames.contains(field.getName()))
             .distinct()
             .collect(
                 Collectors.toMap(
-                    Field::getName,
-                    field -> Field.newBuilder().setValue(field.getValue()).build()));
+                    Field::getName, field -> Field.newBuilder().setValue(field.getValue())));
 
     List<Field> values =
         featureNames.stream()
             .sorted()
             .map(
-                featureName ->
-                    fieldValueOnlyMap.getOrDefault(
-                        featureName,
-                        Field.newBuilder()
-                            .setValue(ValueProto.Value.getDefaultInstance())
-                            .build()))
+                featureName -> {
+                  Field.Builder field =
+                      fieldValueOnlyMap.getOrDefault(
+                          featureName,
+                          Field.newBuilder().setValue(ValueProto.Value.getDefaultInstance()));
+
+                  // Encode the field name as the hash of the feature name.
+                  // Using the hash instead of the full name reduces Redis storage
+                  // consumption per feature row stored.
+                  String nameHash =
+                      Hashing.murmur3_32()
+                          .hashString(featureName, StandardCharsets.UTF_8)
+                          .toString();
+                  field.setName(nameHash);
+
+                  return field.build();
+                })
             .collect(Collectors.toList());
 
     return FeatureRow.newBuilder()
diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/FeatureRowDecoderTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/FeatureRowDecoderTest.java
index 63ad7aa26d..c843d31127 100644
--- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/FeatureRowDecoderTest.java
+++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/retriever/FeatureRowDecoderTest.java
@@ -18,6 +18,7 @@
 
 import static org.junit.Assert.*;
 
+import com.google.common.hash.Hashing;
 import com.google.protobuf.Timestamp;
 import feast.proto.core.FeatureSetProto;
 import feast.proto.core.FeatureSetProto.FeatureSetSpec;
@@ -25,6 +26,7 @@
 import feast.proto.types.FieldProto.Field;
 import feast.proto.types.ValueProto.Value;
 import feast.proto.types.ValueProto.ValueType;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import org.junit.Test;
@@ -48,10 +50,29 @@ public class FeatureRowDecoderTest {
           .build();
 
   @Test
-  public void featureRowWithFieldNamesIsNotConsideredAsEncoded() {
-
+  public void shouldDecodeValidEncodedFeatureRowV2() {
     FeatureRowDecoder decoder = new FeatureRowDecoder("feature_set_ref", spec);
-    FeatureRowProto.FeatureRow nonEncodedFeatureRow =
+
+    FeatureRowProto.FeatureRow encodedFeatureRow =
+        FeatureRowProto.FeatureRow.newBuilder()
+            .setEventTimestamp(Timestamp.newBuilder().setNanos(1000))
+            .addFields(
+                Field.newBuilder()
+                    .setName(
+                        Hashing.murmur3_32()
+                            .hashString("feature1", StandardCharsets.UTF_8)
+                            .toString())
+                    .setValue(Value.newBuilder().setInt32Val(2)))
+            .addFields(
+                Field.newBuilder()
+                    .setName(
+                        Hashing.murmur3_32()
+                            .hashString("feature2", StandardCharsets.UTF_8)
+                            .toString())
+                    .setValue(Value.newBuilder().setFloatVal(1.0f)))
+            .build();
+
+    FeatureRowProto.FeatureRow expectedFeatureRow =
         FeatureRowProto.FeatureRow.newBuilder()
+            .setFeatureSet("feature_set_ref")
             .setEventTimestamp(Timestamp.newBuilder().setNanos(1000))
@@ -62,26 +83,88 @@ public void featureRowWithFieldNamesIsNotConsideredAsEncoded() {
                     .setName("feature2")
                     .setValue(Value.newBuilder().setFloatVal(1.0f)))
             .build();
-    assertFalse(decoder.isEncoded(nonEncodedFeatureRow));
+
+    assertEquals(expectedFeatureRow, decoder.decode(encodedFeatureRow));
   }
 
   @Test
-  public void encodingIsInvalidIfNumberOfFeaturesInSpecDiffersFromFeatureRow() {
-
+  public void shouldDecodeValidFeatureRowV2WithIncompleteFields() {
     FeatureRowDecoder decoder = new FeatureRowDecoder("feature_set_ref", spec);
     FeatureRowProto.FeatureRow encodedFeatureRow =
         FeatureRowProto.FeatureRow.newBuilder()
             .setEventTimestamp(Timestamp.newBuilder().setNanos(1000))
-            .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt32Val(2)))
+            .addFields(
+                Field.newBuilder()
+                    .setName(
+                        Hashing.murmur3_32()
+                            .hashString("feature1", StandardCharsets.UTF_8)
+                            .toString())
+                    .setValue(Value.newBuilder().setInt32Val(2)))
+            .build();
+
+    // should decode missing fields as fields with unset value.
+    FeatureRowProto.FeatureRow expectedFeatureRow =
+        FeatureRowProto.FeatureRow.newBuilder()
+            .setFeatureSet("feature_set_ref")
+            .setEventTimestamp(Timestamp.newBuilder().setNanos(1000))
+            .addFields(
+                Field.newBuilder().setName("feature1").setValue(Value.newBuilder().setInt32Val(2)))
+            .addFields(Field.newBuilder().setName("feature2").setValue(Value.newBuilder().build()))
             .build();
 
-    assertFalse(decoder.isEncodingValid(encodedFeatureRow));
+    assertEquals(expectedFeatureRow, decoder.decode(encodedFeatureRow));
   }
 
   @Test
-  public void shouldDecodeValidEncodedFeatureRow() {
+  public void shouldDecodeValidFeatureRowV2AndIgnoreExtraFields() {
+    FeatureRowDecoder decoder = new FeatureRowDecoder("feature_set_ref", spec);
+    FeatureRowProto.FeatureRow encodedFeatureRow =
+        FeatureRowProto.FeatureRow.newBuilder()
+            .setEventTimestamp(Timestamp.newBuilder().setNanos(1000))
+            .addFields(
+                Field.newBuilder()
+                    .setName(
+                        Hashing.murmur3_32()
+                            .hashString("feature1", StandardCharsets.UTF_8)
+                            .toString())
+                    .setValue(Value.newBuilder().setInt32Val(2)))
+            .addFields(
+                Field.newBuilder()
+                    .setName(
+                        Hashing.murmur3_32()
+                            .hashString("feature2", StandardCharsets.UTF_8)
+                            .toString())
+                    .setValue(Value.newBuilder().setFloatVal(1.0f)))
+            .addFields(
+                Field.newBuilder()
+                    .setName(
+                        Hashing.murmur3_32()
+                            .hashString("feature3", StandardCharsets.UTF_8)
+                            .toString())
+                    .setValue(Value.newBuilder().setStringVal("data")))
+            .build();
+
+    // should decode the row and ignore extra fields not present in the feature set spec.
+    FeatureRowProto.FeatureRow expectedFeatureRow =
+        FeatureRowProto.FeatureRow.newBuilder()
+            .setFeatureSet("feature_set_ref")
+            .setEventTimestamp(Timestamp.newBuilder().setNanos(1000))
+            .addFields(
+                Field.newBuilder().setName("feature1").setValue(Value.newBuilder().setInt32Val(2)))
+            .addFields(
+                Field.newBuilder()
+                    .setName("feature2")
+                    .setValue(Value.newBuilder().setFloatVal(1.0f)))
+            .build();
+
+    assertEquals(expectedFeatureRow, decoder.decode(encodedFeatureRow));
+  }
+
+  // TODO: remove this test in Feast 0.7 when support for Feature Row v1 encoding is removed
+  @Test
+  public void shouldDecodeValidEncodedFeatureRowV1() {
     FeatureRowDecoder decoder = new FeatureRowDecoder("feature_set_ref", spec);
 
     FeatureRowProto.FeatureRow encodedFeatureRow =
@@ -103,8 +186,6 @@ public void shouldDecodeValidEncodedFeatureRow() {
                     .setValue(Value.newBuilder().setFloatVal(1.0f)))
             .build();
 
-    assertTrue(decoder.isEncoded(encodedFeatureRow));
-    assertTrue(decoder.isEncodingValid(encodedFeatureRow));
     assertEquals(expectedFeatureRow, decoder.decode(encodedFeatureRow));
   }
 }
diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java
index 2adf0cec47..62ddfff3a7 100644
--- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java
+++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java
@@ -17,6 +17,7 @@
 package feast.storage.connectors.redis.writer;
 
 import static feast.storage.common.testing.TestUtil.field;
+import static feast.storage.common.testing.TestUtil.hash;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -160,7 +161,10 @@ public void shouldWriteToRedis() {
             .build(),
         FeatureRow.newBuilder()
             .setEventTimestamp(Timestamp.getDefaultInstance())
-            .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("one")))
+            .addFields(
+                Field.newBuilder()
+                    .setName(hash("feature"))
+                    .setValue(Value.newBuilder().setStringVal("one")))
             .build());
     kvs.put(
         RedisKey.newBuilder()
@@ -169,7 +173,10 @@ public void shouldWriteToRedis() {
             .build(),
         FeatureRow.newBuilder()
             .setEventTimestamp(Timestamp.getDefaultInstance())
-            .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("two")))
+            .addFields(
+                Field.newBuilder()
+                    .setName(hash("feature"))
+                    .setValue(Value.newBuilder().setStringVal("two")))
             .build());
 
     List<FeatureRow> featureRows =
@@ -205,7 +212,10 @@ public void shouldRetryFailConnection() throws InterruptedException {
             .build(),
         FeatureRow.newBuilder()
             .setEventTimestamp(Timestamp.getDefaultInstance())
-            .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("one")))
+            .addFields(
+                Field.newBuilder()
+                    .setName(hash("feature"))
+                    .setValue(Value.newBuilder().setStringVal("one")))
             .build());
 
     List<FeatureRow> featureRows =
@@ -332,8 +342,14 @@ public void shouldConvertRowWithDuplicateEntitiesToValidKey() {
     FeatureRow expectedValue =
         FeatureRow.newBuilder()
             .setEventTimestamp(Timestamp.newBuilder().setSeconds(10))
-            .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1")))
-            .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001)))
+            .addFields(
+                Field.newBuilder()
+                    .setName(hash("feature_1"))
+                    .setValue(Value.newBuilder().setStringVal("strValue1")))
+            .addFields(
+                Field.newBuilder()
.setName(hash("feature_2")) + .setValue(Value.newBuilder().setInt64Val(1001))) .build(); p.apply(Create.of(offendingRow)).apply(redisClusterFeatureSink.writer()); @@ -383,8 +399,14 @@ public void shouldConvertRowWithOutOfOrderFieldsToValidKey() { List expectedFields = Arrays.asList( - Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1")).build(), - Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001)).build()); + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1")) + .build(), + Field.newBuilder() + .setName(hash("feature_2")) + .setValue(Value.newBuilder().setInt64Val(1001)) + .build()); FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) @@ -443,8 +465,14 @@ public void shouldMergeDuplicateFeatureFields() { FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001))) + .addFields( + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder() + .setName(hash("feature_2")) + .setValue(Value.newBuilder().setInt64Val(1001))) .build(); p.apply(Create.of(featureRowWithDuplicatedFeatureFields)) @@ -492,8 +520,12 @@ public void shouldPopulateMissingFeatureValuesWithDefaultInstance() { FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) - .addFields(Field.newBuilder().setValue(Value.getDefaultInstance())) + .addFields( + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder().setName(hash("feature_2")).setValue(Value.getDefaultInstance())) .build(); p.apply(Create.of(featureRowWithDuplicatedFeatureFields)) diff --git a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java index 63ec136c5d..948b8d0fda 100644 --- a/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java +++ b/storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java @@ -17,6 +17,7 @@ package feast.storage.connectors.redis.writer; import static feast.storage.common.testing.TestUtil.field; +import static feast.storage.common.testing.TestUtil.hash; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -134,7 +135,10 @@ public void shouldWriteToRedis() { .build(), FeatureRow.newBuilder() .setEventTimestamp(Timestamp.getDefaultInstance()) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("one"))) + .addFields( + Field.newBuilder() + .setName(hash("feature")) + .setValue(Value.newBuilder().setStringVal("one"))) .build()); kvs.put( RedisKey.newBuilder() @@ -143,7 +147,10 @@ public void shouldWriteToRedis() { .build(), FeatureRow.newBuilder() .setEventTimestamp(Timestamp.getDefaultInstance()) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("two"))) + .addFields( + Field.newBuilder() + .setName(hash("feature")) + 
.setValue(Value.newBuilder().setStringVal("two"))) .build()); List featureRows = @@ -193,7 +200,10 @@ public void shouldRetryFailConnection() throws InterruptedException { .build(), FeatureRow.newBuilder() .setEventTimestamp(Timestamp.getDefaultInstance()) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("one"))) + .addFields( + Field.newBuilder() + .setName(hash("feature")) + .setValue(Value.newBuilder().setStringVal("one"))) .build()); List featureRows = @@ -251,7 +261,10 @@ public void shouldProduceFailedElementIfRetryExceeded() { .build(), FeatureRow.newBuilder() .setEventTimestamp(Timestamp.getDefaultInstance()) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("one"))) + .addFields( + Field.newBuilder() + .setName(hash("feature")) + .setValue(Value.newBuilder().setStringVal("one"))) .build()); List featureRows = @@ -318,8 +331,14 @@ public void shouldConvertRowWithDuplicateEntitiesToValidKey() { FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001))) + .addFields( + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder() + .setName(hash("feature_2")) + .setValue(Value.newBuilder().setInt64Val(1001))) .build(); p.apply(Create.of(offendingRow)).apply(redisFeatureSink.writer()); @@ -369,8 +388,14 @@ public void shouldConvertRowWithOutOfOrderFieldsToValidKey() { List expectedFields = Arrays.asList( - Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1")).build(), - Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001)).build()); + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1")) + .build(), + Field.newBuilder() + .setName(hash("feature_2")) + .setValue(Value.newBuilder().setInt64Val(1001)) + .build()); FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) @@ -429,8 +454,14 @@ public void shouldMergeDuplicateFeatureFields() { FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001))) + .addFields( + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder() + .setName(hash("feature_2")) + .setValue(Value.newBuilder().setInt64Val(1001))) .build(); p.apply(Create.of(featureRowWithDuplicatedFeatureFields)).apply(redisFeatureSink.writer()); @@ -477,8 +508,12 @@ public void shouldPopulateMissingFeatureValuesWithDefaultInstance() { FeatureRow expectedValue = FeatureRow.newBuilder() .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) - .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) - .addFields(Field.newBuilder().setValue(Value.getDefaultInstance())) + .addFields( + Field.newBuilder() + .setName(hash("feature_1")) + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder().setName(hash("feature_2")).setValue(Value.getDefaultInstance())) .build(); 
diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py
index 341c789f76..1fcae69ed3 100644
--- a/tests/e2e/redis/basic-ingest-redis-serving.py
+++ b/tests/e2e/redis/basic-ingest-redis-serving.py
@@ -675,6 +675,96 @@ def try_get_features():
     assert online_features_actual.to_dict() == online_features_expected
 
 
+@pytest.mark.timeout(600)
+@pytest.mark.run(order=18)
+def test_basic_retrieve_feature_row_missing_fields(client, cust_trans_df):
+    feature_refs = ["daily_transactions", "total_transactions", "null_values"]
+
+    # apply cust_trans_fs and ingest the dataframe
+    client.set_project(PROJECT_NAME + "_basic_retrieve_missing_fields")
+    old_cust_trans_fs = FeatureSet.from_yaml(f"{DIR_PATH}/basic/cust_trans_fs.yaml")
+    client.apply(old_cust_trans_fs)
+    client.ingest(old_cust_trans_fs, cust_trans_df)
+
+    # update cust_trans_fs with one additional feature.
+    # feature rows ingested before the feature set update will be missing a field.
+    new_cust_trans_fs = client.get_feature_set(name="customer_transactions")
+    new_cust_trans_fs.add(Feature("n_trips", ValueType.INT64))
+    client.apply(new_cust_trans_fs)
+    # sleep to ensure the feature set update is propagated
+    time.sleep(15)
+
+    # attempt to retrieve features from feature rows with missing fields
+    def try_get_features():
+        response = client.get_online_features(
+            entity_rows=[
+                {"customer_id": np.int64(cust_trans_df.iloc[0]["customer_id"])}
+            ],
+            feature_refs=feature_refs + ["n_trips"],
+        )  # type: GetOnlineFeaturesResponse
+        # check that the ingested fields can be correctly retrieved.
+        is_ok = all(
+            [
+                check_online_response(ref, cust_trans_df, response)
+                for ref in feature_refs
+            ]
+        )
+        # should return NULL_VALUE status for the missing field n_trips
+        is_missing_ok = (
+            response.field_values[0].statuses["n_trips"]
+            == GetOnlineFeaturesResponse.FieldStatus.NULL_VALUE
+        )
+        return response, is_ok and is_missing_ok
+
+    wait_retry_backoff(
+        retry_fn=try_get_features,
+        timeout_secs=90,
+        timeout_msg="Timed out trying to get online feature values",
+    )
+
+
+@pytest.mark.timeout(600)
+@pytest.mark.run(order=19)
+def test_basic_retrieve_feature_row_extra_fields(client, cust_trans_df):
+    feature_refs = ["daily_transactions", "total_transactions"]
+    # apply cust_trans_fs and ingest the dataframe
+    client.set_project(PROJECT_NAME + "_basic_retrieve_extra_fields")
+    old_cust_trans_fs = FeatureSet.from_yaml(f"{DIR_PATH}/basic/cust_trans_fs.yaml")
+    client.apply(old_cust_trans_fs)
+    client.ingest(old_cust_trans_fs, cust_trans_df)
+
+    # update cust_trans_fs with the null_values feature dropped.
+    # feature rows ingested before the feature set update will have an extra field.
+    new_cust_trans_fs = client.get_feature_set(name="customer_transactions")
+    new_cust_trans_fs.drop("null_values")
+    client.apply(new_cust_trans_fs)
+    # sleep to ensure the feature set update is propagated
+    time.sleep(15)
+
+    # attempt to retrieve features from feature rows with extra fields
+    def try_get_features():
+        response = client.get_online_features(
+            entity_rows=[
+                {"customer_id": np.int64(cust_trans_df.iloc[0]["customer_id"])}
+            ],
+            feature_refs=feature_refs,
+        )  # type: GetOnlineFeaturesResponse
+        # check that the non-dropped fields can be correctly retrieved.
+        is_ok = all(
+            [
+                check_online_response(ref, cust_trans_df, response)
+                for ref in feature_refs
+            ]
+        )
+        return response, is_ok
+
+    wait_retry_backoff(
+        retry_fn=try_get_features,
+        timeout_secs=90,
+        timeout_msg="Timed out trying to get online feature values",
+    )
+
+
 @pytest.fixture(scope="module")
 def all_types_dataframe():
     return pd.DataFrame(