Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support of GC and S3 storages for registry in Java Feature Server #2043

Merged
merged 2 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion java/serving/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,31 @@
<version>1.6.6</version>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>1.118.0</version>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-nio</artifactId>
<version>0.123.10</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.110</version>
</dependency>

<dependency>
<groupId>com.adobe.testing</groupId>
<artifactId>s3mock-testcontainers</artifactId>
<version>2.2.3</version>
<scope>test</scope>
</dependency>

<!--testCompile "io.grpc:grpc-testing:${grpc.version}"-->
<dependency>
Expand Down Expand Up @@ -323,7 +348,7 @@
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>3.0.0</version>
<version>4.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,36 @@ public void setRegistry(final String registry) {
this.registry = registry;
}

private int registryRefreshInterval;

public int getRegistryRefreshInterval() {
return registryRefreshInterval;
}

public void setRegistryRefreshInterval(final int registryRefreshInterval) {
this.registryRefreshInterval = registryRefreshInterval;
}

private String gcpProject;

public String getGcpProject() {
return gcpProject;
}

public void setGcpProject(final String gcpProject) {
this.gcpProject = gcpProject;
}

private String awsRegion;

public String getAwsRegion() {
return awsRegion;
}

public void setAwsRegion(final String awsRegion) {
this.awsRegion = awsRegion;
}

private String transformationServiceEndpoint;

public String getTransformationServiceEndpoint() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2021 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.serving.config;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import feast.serving.registry.*;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Optional;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

@Configuration
public class RegistryConfig {
@Bean
@Lazy
Storage googleStorage(FeastProperties feastProperties) {
return StorageOptions.newBuilder()
.setProjectId(feastProperties.getGcpProject())
.build()
.getService();
}

@Bean
@Lazy
AmazonS3 awsStorage(FeastProperties feastProperties) {
return AmazonS3ClientBuilder.standard().withRegion(feastProperties.getAwsRegion()).build();
}

@Bean
RegistryFile registryFile(FeastProperties feastProperties, ApplicationContext context) {

String registryPath = feastProperties.getRegistry();
Optional<String> scheme = Optional.ofNullable(URI.create(registryPath).getScheme());

switch (scheme.orElseGet(() -> "")) {
case "gs":
return new GSRegistryFile(context.getBean(Storage.class), registryPath);
case "s3":
return new S3RegistryFile(context.getBean(AmazonS3.class), registryPath);
case "":
case "file":
return new LocalRegistryFile(Paths.get(registryPath));
default:
throw new RuntimeException("Registry storage %s is unsupported");
}
}

@Bean
RegistryRepository registryRepository(
RegistryFile registryFile, FeastProperties feastProperties) {
return new RegistryRepository(registryFile, feastProperties.getRegistryRefreshInterval());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@
*/
package feast.serving.config;

import feast.serving.registry.LocalRegistryRepo;
import feast.serving.registry.*;
import feast.serving.service.OnlineServingServiceV2;
import feast.serving.service.OnlineTransformationService;
import feast.serving.service.ServingServiceV2;
import feast.serving.specs.FeatureSpecRetriever;
import feast.serving.specs.RegistryFeatureSpecRetriever;
import feast.storage.api.retriever.OnlineRetrieverV2;
import feast.storage.connectors.redis.retriever.*;
import io.opentracing.Tracer;
import java.nio.file.Paths;
import org.slf4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -36,7 +33,7 @@ public class ServingServiceConfigV2 {

@Bean
public ServingServiceV2 registryBasedServingServiceV2(
FeastProperties feastProperties, Tracer tracer) {
FeastProperties feastProperties, RegistryRepository registryRepository, Tracer tracer) {
final ServingServiceV2 servingService;
final FeastProperties.Store store = feastProperties.getActiveStore();

Expand All @@ -56,23 +53,19 @@ public ServingServiceV2 registryBasedServingServiceV2(
default:
throw new RuntimeException(
String.format(
"Unable to identify online store type: %s for Regsitry Backed Serving Service",
"Unable to identify online store type: %s for Registry Backed Serving Service",
store.getType()));
}

final FeatureSpecRetriever featureSpecRetriever;
log.info("Created RegistryFeatureSpecRetriever");
log.info("Working Directory = " + System.getProperty("user.dir"));
final LocalRegistryRepo repo = new LocalRegistryRepo(Paths.get(feastProperties.getRegistry()));
featureSpecRetriever = new RegistryFeatureSpecRetriever(repo);

final String transformationServiceEndpoint = feastProperties.getTransformationServiceEndpoint();
final OnlineTransformationService onlineTransformationService =
new OnlineTransformationService(transformationServiceEndpoint, featureSpecRetriever);
new OnlineTransformationService(transformationServiceEndpoint, registryRepository);

servingService =
new OnlineServingServiceV2(
retrieverV2, tracer, featureSpecRetriever, onlineTransformationService);
retrieverV2, tracer, registryRepository, onlineTransformationService);

return servingService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import net.devh.boot.grpc.server.service.GrpcService;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.AccessDeniedException;

@GrpcService(
interceptors = {
Expand Down Expand Up @@ -89,19 +88,13 @@ public void getOnlineFeaturesV2(
responseObserver.onNext(onlineFeatures);
responseObserver.onCompleted();
} catch (SpecRetrievalException e) {
log.error("Failed to retrieve specs in SpecService", e);
log.error("Failed to retrieve specs from Registry", e);
responseObserver.onError(
Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asException());
} catch (AccessDeniedException e) {
log.info(String.format("User prevented from accessing one of the projects in request"));
responseObserver.onError(
Status.PERMISSION_DENIED
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
} catch (Exception e) {
log.warn("Failed to get Online Features", e);
responseObserver.onError(e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2021 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.serving.registry;

import com.google.cloud.storage.*;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.proto.core.RegistryProto;
import java.util.Optional;

public class GSRegistryFile implements RegistryFile {
private Blob blob;

public GSRegistryFile(Storage storage, String url) {
blob = storage.get(BlobId.fromGsUtilUri(url));
if (blob == null) {
throw new RuntimeException(String.format("Registry file %s was not found", url));
}
}

public RegistryProto.Registry getContent() {
try {
return RegistryProto.Registry.parseFrom(blob.getContent());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(
String.format(
"Couldn't read remote registry: %s. Error: %s",
blob.getBlobId().toGsUtilUri(), e.getMessage()));
}
}

public Optional<RegistryProto.Registry> getContentIfModified() {
try {
this.blob = blob.reload(Blob.BlobSourceOption.generationNotMatch());
} catch (StorageException e) {
if (e.getCode() == 304) {
// Content not modified
return Optional.empty();
} else {
throw new RuntimeException(
String.format(
"Couldn't read remote registry: %s. Error: %s",
blob.getBlobId().toGsUtilUri(), e.getMessage()));
}
}

return Optional.of(this.getContent());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2021 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.serving.registry;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.proto.core.RegistryProto;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;

public class LocalRegistryFile implements RegistryFile {
private RegistryProto.Registry cachedRegistry;

public LocalRegistryFile(String path) {
try {
cachedRegistry = RegistryProto.Registry.parseFrom(Files.readAllBytes(Paths.get(path)));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(
String.format(
"Couldn't read local registry: %s. Protobuf is invalid: %s", path, e.getMessage()));
} catch (IOException e) {
throw new RuntimeException(
String.format("Couldn't read local registry file: %s. Error: %s", path, e.getMessage()));
}
}

@Override
public RegistryProto.Registry getContent() {
return this.cachedRegistry;
}

@Override
public Optional<RegistryProto.Registry> getContentIfModified() {
return Optional.empty();
}
}
Loading