Fix Online Serving unable to retrieve feature data after Feature Set update. #908

Merged: 16 commits, merged on Aug 1, 2020
4 changes: 3 additions & 1 deletion ingestion/src/test/java/feast/ingestion/ImportJobTest.java
@@ -217,7 +217,9 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow()
.map(FeatureSpec::getName)
.collect(Collectors.toList())
.contains(field.getName()))
.map(field -> field.toBuilder().clearName().build())
.map(
field ->
field.toBuilder().setName(TestUtil.hash(field.getName())).build())
.collect(Collectors.toList());
randomRow =
randomRow
5 changes: 5 additions & 0 deletions ingestion/src/test/java/feast/test/TestUtil.java
@@ -19,6 +19,7 @@
import static feast.common.models.FeatureSet.getFeatureSetStringRef;

import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
@@ -517,4 +518,8 @@ public static void waitUntilAllElementsAreWrittenToStore(
}
}
}

public static String hash(String input) {
return Hashing.murmur3_32().hashString(input, StandardCharsets.UTF_8).toString();
}
}
@@ -16,6 +16,7 @@
*/
package feast.storage.common.testing;

import com.google.common.hash.Hashing;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import feast.proto.core.FeatureSetProto.FeatureSet;
@@ -24,6 +25,7 @@
import feast.proto.types.FeatureRowProto.FeatureRow.Builder;
import feast.proto.types.FieldProto.Field;
import feast.proto.types.ValueProto.*;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.RandomStringUtils;
@@ -191,4 +193,8 @@ public static Field field(String name, Object value, ValueType.Enum valueType) {
throw new IllegalStateException("Unexpected valueType: " + value.getClass());
}
}

public static String hash(String input) {
return Hashing.murmur3_32().hashString(input, StandardCharsets.UTF_8).toString();
}
}
@@ -16,12 +16,17 @@
*/
package feast.storage.connectors.redis.retriever;

import com.google.common.hash.Hashing;
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import feast.proto.core.FeatureSetProto.FeatureSpec;
import feast.proto.types.FeatureRowProto.FeatureRow;
import feast.proto.types.FieldProto.Field;
import feast.proto.types.ValueProto.Value;
import feast.storage.connectors.redis.writer.RedisCustomIO;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -36,60 +41,102 @@ public FeatureRowDecoder(String featureSetRef, FeatureSetSpec spec) {
}

/**
* A feature row is considered encoded if the feature set and field names are not set. This method
* is required for backward compatibility purposes, to allow Feast serving to continue serving non
* encoded Feature Row ingested by an older version of Feast.
* Check whether an encoded feature row can be decoded by the v1 Decoder. The v1 Decoder
* requires that the Feature Row has both its feature set reference and field names unset.
* The number of fields in the feature row should also match the number of fields in the
* Feature Set spec. NOTE: This method is deprecated and will be removed in Feast v0.7.
*
* @param featureRow Feature row
* @return boolean
*/
public boolean isEncoded(FeatureRow featureRow) {
@Deprecated
private boolean isEncodedV1(FeatureRow featureRow) {
return featureRow.getFeatureSet().isEmpty()
&& featureRow.getFieldsList().stream().allMatch(field -> field.getName().isEmpty());
&& featureRow.getFieldsList().stream().allMatch(field -> field.getName().isEmpty())
&& featureRow.getFieldsList().size() == spec.getFeaturesList().size();
}
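
For illustration, a minimal sketch (hypothetical feature values, not part of this PR) of the kind of row the v1 check accepts: no feature set reference, no field names, and one value per feature in the spec:

  // Hypothetical v1-encoded row for a spec with two features: values only,
  // ordered by the spec's feature names sorted alphabetically; names and
  // feature set reference are left unset.
  FeatureRow v1Encoded =
      FeatureRow.newBuilder()
          .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt64Val(42)))
          .addFields(Field.newBuilder().setValue(Value.newBuilder().setFloatVal(0.5f)))
          .build();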

/**
* Validates if an encoded feature row can be decoded without exception.
* Check whether an encoded feature row can be decoded by the v2 Decoder. The v2 Decoder
* requires that every field name in the Feature Row is set (the v2 encoding stores the
* hashed feature name in the name slot).
*
* @param featureRow Feature row
* @return boolean
*/
public boolean isEncodingValid(FeatureRow featureRow) {
return featureRow.getFieldsList().size() == spec.getFeaturesList().size();
private boolean isEncodedV2(FeatureRow featureRow) {
return !featureRow.getFieldsList().stream().anyMatch(field -> field.getName().isEmpty());
}
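
By contrast, a v2-encoded row (sketch with a hypothetical feature name) carries the murmur3_32 hex hash of each feature name in the field name slot, so no name is empty:

  // Hypothetical v2-encoded row: the field name holds the 8-character
  // murmur3_32 hash of the feature name; the feature set reference stays unset.
  FeatureRow v2Encoded =
      FeatureRow.newBuilder()
          .addFields(
              Field.newBuilder()
                  .setName(
                      Hashing.murmur3_32()
                          .hashString("trips_today", StandardCharsets.UTF_8)
                          .toString())
                  .setValue(Value.newBuilder().setInt64Val(42)))
          .build();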

/**
* Decoding feature row by repopulating the field names based on the corresponding feature set
* spec.
* Decode a feature row encoded by {@link RedisCustomIO}. NOTE: support for the v1 encoding
* will be removed in Feast 0.7.
*
* @param encodedFeatureRow encoded Feature Row
* @return decoded Feature Row
* @throws IllegalArgumentException if unable to decode the given feature row
*/
public FeatureRow decode(FeatureRow encodedFeatureRow) {
final List<Field> fieldsWithoutName = encodedFeatureRow.getFieldsList();
if (isEncodedV1(encodedFeatureRow)) {
// TODO: remove v1 feature row decoder in Feast 0.7
// Decode Feature Rows using the v1 Decoder.
final List<Field> fieldsWithoutName = encodedFeatureRow.getFieldsList();
List<String> featureNames =
spec.getFeaturesList().stream()
.sorted(Comparator.comparing(FeatureSpec::getName))
.map(FeatureSpec::getName)
.collect(Collectors.toList());

List<String> featureNames =
spec.getFeaturesList().stream()
.sorted(Comparator.comparing(FeatureSpec::getName))
.map(FeatureSpec::getName)
.collect(Collectors.toList());
List<Field> fields =
IntStream.range(0, featureNames.size())
.mapToObj(
featureNameIndex -> {
String featureName = featureNames.get(featureNameIndex);
return fieldsWithoutName
.get(featureNameIndex)
.toBuilder()
.setName(featureName)
.build();
})
.collect(Collectors.toList());
return encodedFeatureRow
.toBuilder()
.clearFields()
.setFeatureSet(featureSetRef)
.addAllFields(fields)
.build();
List<Field> fields =
IntStream.range(0, featureNames.size())
.mapToObj(
featureNameIndex -> {
String featureName = featureNames.get(featureNameIndex);
return fieldsWithoutName
.get(featureNameIndex)
.toBuilder()
.setName(featureName)
.build();
})
.collect(Collectors.toList());

return encodedFeatureRow
.toBuilder()
.clearFields()
.setFeatureSet(featureSetRef)
.addAllFields(fields)
.build();
}
if (isEncodedV2(encodedFeatureRow)) {
// Decode Feature Rows using the v2 Decoder.
// v2 Decoder input Feature Rows use the hashed feature name as the field name and
// do not have the feature set reference set.
// Decoding restores the original field names and sets the feature set reference.
Map<String, Value> nameHashValueMap =
encodedFeatureRow.getFieldsList().stream()
.collect(Collectors.toMap(field -> field.getName(), field -> field.getValue()));

List<String> featureNames =
spec.getFeaturesList().stream().map(FeatureSpec::getName).collect(Collectors.toList());

List<Field> fields =
featureNames.stream()
.map(
name -> {
String nameHash =
Hashing.murmur3_32().hashString(name, StandardCharsets.UTF_8).toString();
@woop (Member) commented on Jul 30, 2020:

What is the length of the hash string? Just want to make sure it's as small as possible.

Collaborator (Author) replied:

8 characters.
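
For reference, a quick self-contained check (arbitrary example input) confirming that length: murmur3_32 produces a 32-bit hash, and Guava's HashCode.toString() renders two lowercase hex characters per byte, hence 8 characters:

  import com.google.common.hash.Hashing;
  import java.nio.charset.StandardCharsets;

  public class HashLengthCheck {
    public static void main(String[] args) {
      // Any input yields a 32-bit hash, printed as an 8-character hex string.
      String hash =
          Hashing.murmur3_32().hashString("driver_id", StandardCharsets.UTF_8).toString();
      System.out.println(hash.length()); // 8
    }
  }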

Collaborator (Author):

Assuming that the fields stored in the feature row are float values (32-bit, 4 bytes), this would mean a ~3x increase in space consumption.
@woop @pyalex @khorshuheng
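
(Rough arithmetic behind the ~3x figure, assuming float-only fields and ignoring protobuf tag and length overhead: each float value is 4 bytes, and an 8-character ASCII hash adds 8 bytes of name per field, so each field grows from roughly 4 to roughly 12 bytes.)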

@pyalex (Collaborator) commented on Aug 1, 2020:

Not all fields that are stored are floats; there are a lot of strings as well, and int64s. So everything is not so bad.

@pyalex (Collaborator):

But I guess that we can safely cut the hash string to 4-5 chars.

Value value =
nameHashValueMap.getOrDefault(nameHash, Value.newBuilder().build());
return Field.newBuilder().setName(name).setValue(value).build();
})
.collect(Collectors.toList());

return encodedFeatureRow
.toBuilder()
.clearFields()
.setFeatureSet(featureSetRef)
.addAllFields(fields)
.build();
}
throw new IllegalArgumentException("Failed to decode FeatureRow: possible data corruption");
}
}
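
Taken together, a usage sketch of the new contract (hypothetical project, feature set, and feature names; the spec setters follow Feast's FeatureSetProto and ValueProto): RedisCustomIO stores rows with hashed field names and no feature set reference, and the decoder's v2 path restores both:

  // Hypothetical spec with a single INT64 feature.
  FeatureSetSpec spec =
      FeatureSetSpec.newBuilder()
          .setProject("my_project")
          .setName("my_feature_set")
          .addFeatures(
              FeatureSpec.newBuilder().setName("trips_today").setValueType(ValueType.Enum.INT64))
          .build();

  // A row as stored in Redis: hashed field name, no feature set reference (v2 encoding).
  FeatureRow stored =
      FeatureRow.newBuilder()
          .addFields(
              Field.newBuilder()
                  .setName(
                      Hashing.murmur3_32()
                          .hashString("trips_today", StandardCharsets.UTF_8)
                          .toString())
                  .setValue(Value.newBuilder().setInt64Val(3)))
          .build();

  FeatureRow decoded = new FeatureRowDecoder("my_project/my_feature_set", spec).decode(stored);
  // decoded now carries field name "trips_today" and feature set "my_project/my_feature_set".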
@@ -158,17 +158,11 @@ private List<Optional<FeatureRow>> getFeaturesFromRedis(

// decode feature rows from data bytes using decoder.
FeatureRow featureRow = FeatureRow.parseFrom(featureRowBytes);
if (decoder.isEncoded(featureRow)) {
if (decoder.isEncodingValid(featureRow)) {
featureRow = decoder.decode(featureRow);
} else {
// decoding feature row failed: data corruption could have occurred
throw Status.DATA_LOSS
.withDescription(
"Failed to decode FeatureRow from bytes retrieved from redis"
+ ": Possible data corruption")
.asRuntimeException();
}
try {
featureRow = decoder.decode(featureRow);
} catch (IllegalArgumentException e) {
// decoding feature row failed: data corruption could have occurred
throw Status.DATA_LOSS.withCause(e).withDescription(e.getMessage()).asRuntimeException();
}
featureRows.add(Optional.of(featureRow));
}
@@ -151,15 +151,11 @@ private List<Optional<FeatureRow>> getFeaturesFromRedis(

// decode feature rows from data bytes using decoder.
FeatureRow featureRow = FeatureRow.parseFrom(featureRowBytes);
if (decoder.isEncoded(featureRow) && decoder.isEncodingValid(featureRow)) {
try {
featureRow = decoder.decode(featureRow);
} else {
} catch (IllegalArgumentException e) {
// decoding feature row failed: data corruption could have occurred
throw Status.DATA_LOSS
.withDescription(
"Failed to decode FeatureRow from bytes retrieved from redis"
+ ": Possible data corruption")
.asRuntimeException();
throw Status.DATA_LOSS.withCause(e).withDescription(e.getMessage()).asRuntimeException();
}
featureRows.add(Optional.of(featureRow));
}
@@ -18,6 +18,7 @@

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import feast.proto.core.FeatureSetProto.EntitySpec;
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import feast.proto.core.FeatureSetProto.FeatureSpec;
@@ -29,7 +30,9 @@
import feast.storage.api.writer.FailedElement;
import feast.storage.api.writer.WriteResult;
import feast.storage.common.retry.Retriable;
import feast.storage.connectors.redis.retriever.FeatureRowDecoder;
import io.lettuce.core.RedisException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -203,28 +206,45 @@ private byte[] getKey(FeatureRow featureRow, FeatureSetSpec spec) {
return redisKeyBuilder.build().toByteArray();
}

/**
* Encode the Feature Row as bytes to store in Redis. To reduce storage space consumption in
* redis, feature rows are "encoded" by hashing the field names and leaving the feature set
* reference unset. {@link FeatureRowDecoder} is responsible for reversing this "encoding" step.
*/
private byte[] getValue(FeatureRow featureRow, FeatureSetSpec spec) {
List<String> featureNames =
spec.getFeaturesList().stream().map(FeatureSpec::getName).collect(Collectors.toList());
Map<String, Field> fieldValueOnlyMap =

Map<String, Field.Builder> fieldValueOnlyMap =
featureRow.getFieldsList().stream()
.filter(field -> featureNames.contains(field.getName()))
.distinct()
.collect(
Collectors.toMap(
Field::getName,
field -> Field.newBuilder().setValue(field.getValue()).build()));
Field::getName, field -> Field.newBuilder().setValue(field.getValue())));

List<Field> values =
featureNames.stream()
.sorted()
.map(
featureName ->
fieldValueOnlyMap.getOrDefault(
featureName,
Field.newBuilder()
.setValue(ValueProto.Value.getDefaultInstance())
.build()))
featureName -> {
Field.Builder field =
fieldValueOnlyMap.getOrDefault(
featureName,
Field.newBuilder().setValue(ValueProto.Value.getDefaultInstance()));

// Encode the field name as the hash of the feature name.
// Use the hash instead of the name itself to reduce redis storage consumption
// per feature row stored.
String nameHash =
Hashing.murmur3_32()
.hashString(featureName, StandardCharsets.UTF_8)
.toString();
field.setName(nameHash);

return field.build();
})
.collect(Collectors.toList());

return FeatureRow.newBuilder()
Expand Down