Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java feature server] Converge ServingService API to make Python and Java feature servers consistent #2166

Merged
merged 16 commits into from
Jan 5, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import feast.proto.serving.ServingAPIProto.FeatureReferenceV2;

public class FeatureV2 {
public class Feature {

/**
* Accepts FeatureReferenceV2 object and returns its reference in String
Expand All @@ -27,10 +27,10 @@ public class FeatureV2 {
* @param featureReference {@link FeatureReferenceV2}
* @return String format of FeatureReferenceV2
*/
public static String getFeatureStringRef(FeatureReferenceV2 featureReference) {
String ref = featureReference.getName();
if (!featureReference.getFeatureTable().isEmpty()) {
ref = featureReference.getFeatureTable() + ":" + ref;
public static String getFeatureReference(FeatureReferenceV2 featureReference) {
String ref = featureReference.getFeatureName();
if (!featureReference.getFeatureViewName().isEmpty()) {
ref = featureReference.getFeatureViewName() + ":" + ref;
}
return ref;
}
Expand All @@ -47,4 +47,12 @@ public static String getFeatureName(String featureReference) {
String[] tokens = featureReference.split(":", 2);
return tokens[tokens.length - 1];
}

public static FeatureReferenceV2 parseFeatureReference(String featureReference) {
String[] tokens = featureReference.split(":", 2);
return FeatureReferenceV2.newBuilder()
.setFeatureViewName(tokens[0])
.setFeatureName(tokens[1])
.build();
}
}
48 changes: 0 additions & 48 deletions java/common/src/main/java/feast/common/models/FeatureTable.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public List<AuditLogEntry> getTestAuditLogs() {
.addAllFeatures(
Arrays.asList(
FeatureReferenceV2.newBuilder()
.setFeatureTable("featuretable_1")
.setName("feature1")
.setFeatureViewName("featuretable_1")
.setFeatureName("feature1")
.build(),
FeatureReferenceV2.newBuilder()
.setFeatureTable("featuretable_1")
.setName("feature2")
.setFeatureViewName("featuretable_1")
.setFeatureName("feature2")
.build()))
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ public class FeaturesTest {
public void setUp() {
featureReference =
FeatureReferenceV2.newBuilder()
.setFeatureTable("featuretable_1")
.setName("feature1")
.setFeatureViewName("featuretable_1")
.setFeatureName("feature1")
.build();
}

@Test
public void shouldReturnFeatureStringRef() {
String actualFeatureStringRef = FeatureV2.getFeatureStringRef(featureReference);
String actualFeatureStringRef = Feature.getFeatureReference(featureReference);
String expectedFeatureStringRef = "featuretable_1:feature1";

assertThat(actualFeatureStringRef, equalTo(expectedFeatureStringRef));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.gojek.feast;
package dev.feast;

import feast.proto.serving.ServingAPIProto.FeatureReferenceV2;
import com.google.common.collect.Lists;
import feast.proto.serving.ServingAPIProto;
import feast.proto.serving.ServingAPIProto.GetFeastServingInfoRequest;
import feast.proto.serving.ServingAPIProto.GetFeastServingInfoResponse;
import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2;
import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2.EntityRow;
import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse;
import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequest;
import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponseV2;
import feast.proto.serving.ServingServiceGrpc;
import feast.proto.serving.ServingServiceGrpc.ServingServiceBlockingStub;
import feast.proto.types.ValueProto;
import io.grpc.CallCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
Expand All @@ -32,9 +33,8 @@
import io.opentracing.contrib.grpc.TracingClientInterceptor;
import io.opentracing.util.GlobalTracer;
import java.io.File;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
Expand Down Expand Up @@ -118,11 +118,60 @@ public GetFeastServingInfoResponse getFeastServingInfo() {
* @param featureRefs list of string feature references to retrieve in the following format
* featureTable:feature, where 'featureTable' and 'feature' refer to the FeatureTable and
* Feature names respectively. Only the Feature name is required.
* @param rows list of {@link Row} to select the entities to retrieve the features for.
* @param entities list of {@link Row} to select the entities to retrieve the features for.
* @return list of {@link Row} containing retrieved data fields.
*/
public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> rows) {
return getOnlineFeatures(featureRefs, rows, "");
public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> entities) {
GetOnlineFeaturesRequest.Builder requestBuilder = GetOnlineFeaturesRequest.newBuilder();

requestBuilder.setFeatures(
ServingAPIProto.FeatureList.newBuilder().addAllVal(featureRefs).build());

requestBuilder.putAllEntities(getEntityValuesMap(entities));

GetOnlineFeaturesResponseV2 response = stub.getOnlineFeatures(requestBuilder.build());

List<Row> results = Lists.newArrayList();
if (response.getResultsCount() == 0) {
return results;
}

for (int rowIdx = 0; rowIdx < response.getResults(0).getValuesCount(); rowIdx++) {
Row row = Row.create();
for (int featureIdx = 0; featureIdx < response.getResultsCount(); featureIdx++) {
row.set(
response.getMetadata().getFeatureNames().getVal(featureIdx),
response.getResults(featureIdx).getValues(rowIdx),
response.getResults(featureIdx).getStatuses(rowIdx));

row.setEntityTimestamp(
Instant.ofEpochSecond(
response.getResults(featureIdx).getEventTimestamps(rowIdx).getSeconds()));
}
for (Map.Entry<String, ValueProto.Value> entry :
entities.get(rowIdx).getFields().entrySet()) {
row.set(entry.getKey(), entry.getValue());
}

results.add(row);
}
return results;
}

private Map<String, ValueProto.RepeatedValue> getEntityValuesMap(List<Row> entities) {
Map<String, ValueProto.RepeatedValue.Builder> columnarEntities = new HashMap<>();
for (Row row : entities) {
for (Map.Entry<String, ValueProto.Value> field : row.getFields().entrySet()) {
if (!columnarEntities.containsKey(field.getKey())) {
columnarEntities.put(field.getKey(), ValueProto.RepeatedValue.newBuilder());
}
columnarEntities.get(field.getKey()).addVal(field.getValue());
}
}

return columnarEntities.entrySet().stream()
.map((e) -> Map.entry(e.getKey(), e.getValue().build()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
Expand All @@ -149,42 +198,7 @@ public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> rows) {
* @return list of {@link Row} containing retrieved data fields.
*/
public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> rows, String project) {
List<FeatureReferenceV2> features = RequestUtil.createFeatureRefs(featureRefs);
// build entity rows and collect entity references
HashSet<String> entityRefs = new HashSet<>();
List<EntityRow> entityRows =
rows.stream()
.map(
row -> {
entityRefs.addAll(row.getFields().keySet());
return EntityRow.newBuilder()
.setTimestamp(row.getEntityTimestamp())
.putAllFields(row.getFields())
.build();
})
.collect(Collectors.toList());

GetOnlineFeaturesResponse response =
stub.getOnlineFeaturesV2(
GetOnlineFeaturesRequestV2.newBuilder()
.addAllFeatures(features)
.addAllEntityRows(entityRows)
.setProject(project)
.build());

return response.getFieldValuesList().stream()
.map(
fieldValues -> {
Row row = Row.create();
for (String fieldName : fieldValues.getFieldsMap().keySet()) {
row.set(
fieldName,
fieldValues.getFieldsMap().get(fieldName),
fieldValues.getStatusesMap().get(fieldName));
}
return row;
})
.collect(Collectors.toList());
return getOnlineFeatures(featureRefs, rows);
}

protected FeastClient(ManagedChannel channel, Optional<CallCredentials> credentials) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.gojek.feast;
package dev.feast;

import feast.proto.serving.ServingAPIProto.FeatureReferenceV2;
import java.util.List;
Expand Down Expand Up @@ -71,8 +71,8 @@ public static FeatureReferenceV2 parseFeatureRef(String featureRefString) {
String[] featureReferenceParts = featureRefString.split(":");
FeatureReferenceV2 featureRef =
FeatureReferenceV2.newBuilder()
.setFeatureTable(featureReferenceParts[0])
.setName(featureReferenceParts[1])
.setFeatureViewName(featureReferenceParts[0])
.setFeatureName(featureReferenceParts[1])
.build();

return featureRef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.gojek.feast;
package dev.feast;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldStatus;
import feast.proto.serving.ServingAPIProto.FieldStatus;
import feast.proto.types.ValueProto.Value;
import feast.proto.types.ValueProto.Value.ValCase;
import java.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.gojek.feast;
package dev.feast;

import com.google.auto.value.AutoValue;
import io.grpc.CallCredentials;
Expand Down
Loading