Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored Core API: ListFeatureSets, ListStore, and GetFeatureSet #309

Merged
merged 8 commits into from
Nov 17, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .prow/scripts/test-end-to-end.sh
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ redis_config:
host: localhost
port: 6379
subscriptions:
- name: .*
- name: "*"
version: ">0"
EOF

Expand Down
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ store {
name: "SERVING"
type: REDIS
subscriptions {
name: ".*"
name: "*"
version: ">0"
}
redis_config {
Expand All @@ -76,7 +76,7 @@ store {
name: "WAREHOUSE"
type: BIGQUERY
subscriptions {
name: ".*"
name: "*"
version: ">0"
}
bigquery_config {
Expand Down
2 changes: 1 addition & 1 deletion core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ If you have [grpc_cli](https://github.com/grpc/grpc/blob/master/doc/command_line
```
grpc_cli ls localhost:6565
grpc_cli call localhost:6565 GetFeastCoreVersion ""
grpc_cli call localhost:6565 GetStores ""
grpc_cli call localhost:6565 ListStores ""
```
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ public interface FeatureSetRepository extends JpaRepository<FeatureSet, String>

// find all versions of featureSets with names matching the regex
@Query(nativeQuery=true, value="SELECT * FROM feature_sets WHERE name LIKE ?1")
List<FeatureSet> findByNameRegex(String regex);
List<FeatureSet> findByNameWithWildcard(String name);
}
61 changes: 40 additions & 21 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import feast.core.CoreServiceProto.ApplyFeatureSetResponse;
import feast.core.CoreServiceProto.GetFeastCoreVersionRequest;
import feast.core.CoreServiceProto.GetFeastCoreVersionResponse;
import feast.core.CoreServiceProto.GetFeatureSetsRequest;
import feast.core.CoreServiceProto.GetFeatureSetsResponse;
import feast.core.CoreServiceProto.GetStoresRequest;
import feast.core.CoreServiceProto.GetStoresRequest.Filter;
import feast.core.CoreServiceProto.GetStoresResponse;
import feast.core.CoreServiceProto.GetFeatureSetRequest;
import feast.core.CoreServiceProto.GetFeatureSetResponse;
import feast.core.CoreServiceProto.ListFeatureSetsRequest;
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
import feast.core.CoreServiceProto.ListStoresRequest;
import feast.core.CoreServiceProto.ListStoresRequest.Filter;
import feast.core.CoreServiceProto.ListStoresResponse;
import feast.core.CoreServiceProto.UpdateStoreRequest;
import feast.core.CoreServiceProto.UpdateStoreResponse;
import feast.core.CoreServiceProto.UpdateStoreResponse.Status;
Expand All @@ -37,11 +39,9 @@
import feast.core.StoreProto.Store;
import feast.core.StoreProto.Store.Subscription;
import feast.core.exception.RetrievalException;
import feast.core.model.Source;
import feast.core.service.JobCoordinatorService;
import feast.core.service.SpecService;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -73,28 +73,42 @@ public void getFeastCoreVersion(

@Override
@Transactional
public void getFeatureSets(
GetFeatureSetsRequest request, StreamObserver<GetFeatureSetsResponse> responseObserver) {
public void getFeatureSet(
GetFeatureSetRequest request, StreamObserver<GetFeatureSetResponse> responseObserver) {
try {
GetFeatureSetsResponse response = specService.getFeatureSets(request.getFilter());
GetFeatureSetResponse response = specService.getFeatureSet(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (RetrievalException | InvalidProtocolBufferException e) {
log.error("Exception has occurred in GetFeatureSets method: ", e);
log.error("Exception has occurred in GetFeatureSet method: ", e);
responseObserver.onError(e);
}
}

@Override
@Transactional
public void getStores(
GetStoresRequest request, StreamObserver<GetStoresResponse> responseObserver) {
public void listFeatureSets(
ListFeatureSetsRequest request, StreamObserver<ListFeatureSetsResponse> responseObserver) {
try {
GetStoresResponse response = specService.getStores(request.getFilter());
ListFeatureSetsResponse response = specService.listFeatureSets(request.getFilter());
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (RetrievalException | InvalidProtocolBufferException e) {
log.error("Exception has occurred in ListFeatureSet method: ", e);
responseObserver.onError(e);
}
}

@Override
@Transactional
public void listStores(
ListStoresRequest request, StreamObserver<ListStoresResponse> responseObserver) {
try {
ListStoresResponse response = specService.listStores(request.getFilter());
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (RetrievalException e) {
log.error("Exception has occurred in GetStores method: ", e);
log.error("Exception has occurred in ListStores method: ", e);
responseObserver.onError(e);
}
}
Expand All @@ -106,22 +120,27 @@ public void applyFeatureSet(
try {
ApplyFeatureSetResponse response = specService.applyFeatureSet(request.getFeatureSet());
String featureSetName = response.getFeatureSet().getName();
GetStoresResponse stores = specService.getStores(Filter.newBuilder().build());
ListStoresResponse stores = specService.listStores(Filter.newBuilder().build());
for (Store store : stores.getStoreList()) {
List<Subscription> relevantSubscriptions =
store.getSubscriptionsList().stream()
.filter(
sub -> {
Pattern p = Pattern.compile(sub.getName());
String subString = sub.getName();
if (!subString.contains(".*"))
{
subString = subString.replace("*", ".*");
}
Pattern p = Pattern.compile(subString);
return p.matcher(featureSetName).matches();
})
.collect(Collectors.toList());
Set<FeatureSetSpec> featureSetSpecs = new HashSet<>();
for (Subscription subscription : relevantSubscriptions) {
featureSetSpecs.addAll(
specService
.getFeatureSets(
GetFeatureSetsRequest.Filter.newBuilder()
.listFeatureSets(
ListFeatureSetsRequest.Filter.newBuilder()
.setFeatureSetName(subscription.getName())
.setFeatureSetVersion(subscription.getVersion())
.build())
Expand Down Expand Up @@ -157,8 +176,8 @@ public void updateStore(UpdateStoreRequest request,
Store store = response.getStore();
for (Subscription subscription : store.getSubscriptionsList()) {
featureSetSpecs.addAll(
specService.getFeatureSets(
GetFeatureSetsRequest.Filter.newBuilder()
specService.listFeatureSets(
ListFeatureSetsRequest.Filter.newBuilder()
.setFeatureSetName(subscription.getName())
.setFeatureSetVersion(subscription.getVersion())
.build())
Expand Down
94 changes: 74 additions & 20 deletions core/src/main/java/feast/core/service/SpecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@

package feast.core.service;

import static feast.core.validators.Matchers.checkValidCharacters;
import static feast.core.validators.Matchers.checkValidFeatureSetFilterName;

import com.google.common.collect.Ordering;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.CoreServiceProto.ApplyFeatureSetResponse;
import feast.core.CoreServiceProto.ApplyFeatureSetResponse.Status;
import feast.core.CoreServiceProto.GetFeatureSetsRequest;
import feast.core.CoreServiceProto.GetFeatureSetsResponse;
import feast.core.CoreServiceProto.GetStoresRequest;
import feast.core.CoreServiceProto.GetStoresResponse;
import feast.core.CoreServiceProto.GetStoresResponse.Builder;
import feast.core.CoreServiceProto.GetFeatureSetRequest;
import feast.core.CoreServiceProto.GetFeatureSetResponse;
import feast.core.CoreServiceProto.ListFeatureSetsRequest;
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
import feast.core.CoreServiceProto.ListStoresRequest;
import feast.core.CoreServiceProto.ListStoresResponse;
import feast.core.CoreServiceProto.ListStoresResponse.Builder;
import feast.core.CoreServiceProto.UpdateStoreRequest;
import feast.core.CoreServiceProto.UpdateStoreResponse;
import feast.core.FeatureSetProto.FeatureSetSpec;
Expand Down Expand Up @@ -72,6 +77,59 @@ public SpecService(
this.defaultSource = defaultSource;
}

/**
* Get a feature set matching the feature name and version provided in the filter. The name
* is required. If the version is provided then it will be used for the lookup. If the version
* is omitted then the latest version will be returned.
*
* @param GetFeatureSetRequest containing the name and version of the feature set
* @return GetFeatureSetResponse containing a single feature set
*/
public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request)
throws InvalidProtocolBufferException {

// Validate input arguments
checkValidCharacters(request.getName(), "featureSetName");
if (request.getName().isEmpty()) {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("No feature set name provided")
.asRuntimeException();
}
if (request.getVersion() < 0){
throw new IllegalArgumentException("Version number cannot be less than 0");
woop marked this conversation as resolved.
Show resolved Hide resolved
}

// Find a list of feature sets with the requested name
List<FeatureSet> featureSets = featureSetRepository.findByNameWithWildcard(request.getName());

// Filter the list based on version
if (request.getVersion() == 0){
// Version is not set, filter list to latest version
featureSets = Ordering.natural().reverse()
.sortedCopy(featureSets).subList(0, featureSets.size() == 0 ? 0 : 1);
} else if(request.getVersion() > 0) {
// Version is set, find specific version
featureSets = featureSets.stream()
.filter(fs -> request.getVersion() == fs.getVersion()).collect(Collectors.toList());
}

// Validate remaining items
if (featureSets.size() == 0){
throw io.grpc.Status.NOT_FOUND
.withDescription("Feature set could not be found")
.asRuntimeException();
}
if (featureSets.size() > 1){
throw io.grpc.Status.INTERNAL
.withDescription(String.format("Multiple feature sets found with the name %s and "
+ "version %s", request.getName(), request.getVersion()))
.asRuntimeException();
}

// Only a single item in list, return successfully
return GetFeatureSetResponse.newBuilder().setFeatureSet(featureSets.get(0).toProto()).build();
}

/**
* Get featureSets matching the feature name and version provided in the filter. If the feature
* name is not provided, the method will return all featureSets currently registered to Feast.
Expand All @@ -84,25 +142,21 @@ public SpecService(
* comparator (<, <=, >, etc) and a version number, e.g. 10, <10, >=1
*
* @param filter filter containing the desired featureSet name and version filter
* @return GetFeatureSetsResponse with list of featureSets found matching the filter
* @return ListFeatureSetsResponse with list of featureSets found matching the filter
*/
public GetFeatureSetsResponse getFeatureSets(GetFeatureSetsRequest.Filter filter)
public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter filter)
throws InvalidProtocolBufferException {
String name = filter.getFeatureSetName();
checkValidFeatureSetFilterName(name, "featureSetName");
List<FeatureSet> featureSets;
if (name.equals("")) {
featureSets = featureSetRepository.findAll();
} else {
featureSets = featureSetRepository.findByNameRegex(name);
if (filter.getFeatureSetVersion().equals("latest")) {
featureSets = Ordering.natural().reverse()
.sortedCopy(featureSets).subList(0, featureSets.size() == 0 ? 0 : 1);
} else {
featureSets = featureSets.stream().filter(getVersionFilter(filter.getFeatureSetVersion()))
.collect(Collectors.toList());
}
featureSets = featureSetRepository.findByNameWithWildcard(name.replace('*', '%'));
featureSets = featureSets.stream().filter(getVersionFilter(filter.getFeatureSetVersion()))
.collect(Collectors.toList());
}
GetFeatureSetsResponse.Builder response = GetFeatureSetsResponse.newBuilder();
ListFeatureSetsResponse.Builder response = ListFeatureSetsResponse.newBuilder();
for (FeatureSet featureSet : featureSets) {
response.addFeatureSets(featureSet.toProto());
}
Expand All @@ -114,13 +168,13 @@ public GetFeatureSetsResponse getFeatureSets(GetFeatureSetsRequest.Filter filter
* the method will return all stores currently registered to Feast.
*
* @param filter filter containing the desired store name
* @return GetStoresResponse containing list of stores found matching the filter
* @return ListStoresResponse containing list of stores found matching the filter
*/
public GetStoresResponse getStores(GetStoresRequest.Filter filter) {
public ListStoresResponse listStores(ListStoresRequest.Filter filter) {
try {
String name = filter.getName();
if (name.equals("")) {
Builder responseBuilder = GetStoresResponse.newBuilder();
Builder responseBuilder = ListStoresResponse.newBuilder();
for (Store store : storeRepository.findAll()) {
responseBuilder.addStore(store.toProto());
}
Expand All @@ -129,7 +183,7 @@ public GetStoresResponse getStores(GetStoresRequest.Filter filter) {
Store store = storeRepository.findById(name)
.orElseThrow(() -> new RetrievalException(String.format("Store with name '%s' not found",
name)));
return GetStoresResponse.newBuilder()
return ListStoresResponse.newBuilder()
.addStore(store.toProto())
.build();
} catch (InvalidProtocolBufferException e) {
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/java/feast/core/validators/Matchers.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

package feast.core.validators;

import com.google.common.base.Strings;

import java.util.regex.Pattern;

public class Matchers {

private static Pattern UPPER_SNAKE_CASE_REGEX = Pattern.compile("^[A-Z0-9]+(_[A-Z0-9]+)*$");
private static Pattern LOWER_SNAKE_CASE_REGEX = Pattern.compile("^[a-z0-9]+(_[a-z0-9]+)*$");
private static Pattern VALID_CHARACTERS_REGEX = Pattern.compile("^[a-zA-Z0-9\\-_]*$");
private static Pattern VALID_CHARACTERS_FSET_FILTER_REGEX = Pattern.compile("^[a-zA-Z0-9\\-_*]*$");

private static String ERROR_MESSAGE_TEMPLATE = "invalid value for field %s: %s";

Expand Down Expand Up @@ -61,4 +60,15 @@ public static void checkValidCharacters(String input, String fieldName)
"argument must only contain alphanumeric characters, dashes and underscores."));
}
}

public static void checkValidFeatureSetFilterName(String input, String fieldName)
throws IllegalArgumentException {
if (!VALID_CHARACTERS_FSET_FILTER_REGEX.matcher(input).matches()) {
throw new IllegalArgumentException(
String.format(
ERROR_MESSAGE_TEMPLATE,
fieldName,
"argument must only contain alphanumeric characters, dashes, underscores, or an asterisk."));
}
}
}
Loading