diff --git a/.dockerignore b/.dockerignore
index 0e3b22687d..bef7cf7069 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -1,3 +1,4 @@
 docs
 !docs/coverage
 charts
+env
diff --git a/Makefile b/Makefile
index d8b0ff2acc..d88cbf66d8 100644
--- a/Makefile
+++ b/Makefile
@@ -61,7 +61,7 @@ build-java-no-tests:
 # Python SDK
 
 install-python-ci-dependencies:
-	pip install -r sdk/python/requirements-ci.txt
+	pip install --no-cache-dir -r sdk/python/requirements-ci.txt
 
 compile-protos-python: install-python-ci-dependencies
 	@$(foreach dir,$(PROTO_TYPE_SUBDIRS),cd ${ROOT_DIR}/protos; python -m grpc_tools.protoc -I. --python_out=../sdk/python/ --mypy_out=../sdk/python/ feast/$(dir)/*.proto;)
@@ -121,8 +121,9 @@ build-push-docker:
 	@$(MAKE) push-serving-docker registry=$(REGISTRY) version=$(VERSION)
 	@$(MAKE) push-ci-docker registry=$(REGISTRY) version=$(VERSION)
 	@$(MAKE) push-jobcontroller-docker registry=$(REGISTRY) version=$(VERSION)
+	@$(MAKE) push-jobservice-docker registry=$(REGISTRY) version=$(VERSION)
 
-build-docker: build-core-docker build-serving-docker build-ci-docker build-jobcontroller-docker
+build-docker: build-core-docker build-serving-docker build-ci-docker build-jobcontroller-docker build-jobservice-docker
 
 push-core-docker:
 	docker push $(REGISTRY)/feast-core:$(VERSION)
@@ -130,6 +131,9 @@ push-core-docker:
 
 push-jobcontroller-docker:
 	docker push $(REGISTRY)/feast-jobcontroller:$(VERSION)
 
+push-jobservice-docker:
+	docker push $(REGISTRY)/feast-jobservice:$(VERSION)
+
 push-serving-docker:
 	docker push $(REGISTRY)/feast-serving:$(VERSION)
@@ -142,6 +146,9 @@ push-jupyter-docker:
 
 build-core-docker:
 	docker build -t $(REGISTRY)/feast-core:$(VERSION) -f infra/docker/core/Dockerfile .
 
+build-jobservice-docker:
+	docker build -t $(REGISTRY)/feast-jobservice:$(VERSION) -f infra/docker/jobservice/Dockerfile .
+
 build-jobcontroller-docker:
 	docker build -t $(REGISTRY)/feast-jobcontroller:$(VERSION) -f infra/docker/jobcontroller/Dockerfile .
@@ -200,4 +207,4 @@ lint-versions:
 
 # Performance
 test-load:
-	./infra/scripts/test-load.sh $(GIT_SHA)
\ No newline at end of file
+	./infra/scripts/test-load.sh $(GIT_SHA)
diff --git a/infra/docker/jobservice/Dockerfile b/infra/docker/jobservice/Dockerfile
new file mode 100644
index 0000000000..1f1d303157
--- /dev/null
+++ b/infra/docker/jobservice/Dockerfile
@@ -0,0 +1,21 @@
+FROM python:3.7-slim-buster
+
+USER root
+WORKDIR /feast
+
+COPY sdk/python sdk/python
+COPY Makefile Makefile
+COPY protos protos
+
+# Install make and git
+RUN apt-get update && apt-get -y install make git
+
+# Install Python dependencies and compile protos
+RUN make compile-protos-python
+
+# Install Feast SDK
+COPY .git .git
+COPY README.md README.md
+RUN pip install -e sdk/python -U
+
+CMD ["feast", "server"]
diff --git a/protos/feast/core/JobService.proto b/protos/feast/core/JobService.proto
new file mode 100644
index 0000000000..861f3b74a5
--- /dev/null
+++ b/protos/feast/core/JobService.proto
@@ -0,0 +1,197 @@
+//
+// Copyright 2018 The Feast Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+syntax = "proto3";
+package feast.core;
+
+option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
+option java_outer_classname = "JobServiceProto";
+option java_package = "feast.proto.core";
+
+import "google/protobuf/timestamp.proto";
+import "feast/core/DataSource.proto";
+import "feast/serving/ServingService.proto";
+
+service JobService {
+  // Start job to ingest data from offline store into online store
+  rpc StartOfflineToOnlineIngestionJob (StartOfflineToOnlineIngestionJobRequest) returns (StartOfflineToOnlineIngestionJobResponse);
+
+  // Produce a training dataset, return a job id that will provide a file reference
+  rpc GetHistoricalFeatures (GetHistoricalFeaturesRequest) returns (GetHistoricalFeaturesResponse);
+
+  // Start job to ingest data from stream into online store
+  rpc StartStreamToOnlineIngestionJob (StartStreamToOnlineIngestionJobRequest) returns (StartStreamToOnlineIngestionJobResponse);
+
+  // List all types of jobs
+  rpc ListJobs (ListJobsRequest) returns (ListJobsResponse);
+
+  // Stop a single job
+  rpc StopJob (StopJobRequest) returns (StopJobResponse);
+
+  // Get details of a single job
+  rpc GetJob (GetJobRequest) returns (GetJobResponse);
+}
+
+
+enum JobType {
+  INVALID_JOB = 0;
+  OFFLINE_TO_ONLINE_JOB = 1;
+  STREAM_TO_ONLINE_JOB = 2;
+  EXPORT_JOB = 4;
+}
+
+enum JobStatus {
+  JOB_STATUS_INVALID = 0;
+  // The Job has been registered and is waiting to be scheduled to run
+  JOB_STATUS_PENDING = 1;
+  // The Job is currently processing its task
+  JOB_STATUS_RUNNING = 2;
+  // The Job has successfully completed its task
+  JOB_STATUS_DONE = 3;
+  // The Job has encountered an error while processing its task
+  JOB_STATUS_ERROR = 4;
+}
+
+message Job {
+  // Identifier of the Job
+  string id = 1;
+  // External identifier of the Job assigned by the Spark executor
+  string external_id = 2;
+  // Type of the Job
+  JobType type = 3;
+  // Current job status
+  JobStatus status = 4;
+  // Timestamp on when the job was created
+  google.protobuf.Timestamp created_timestamp = 5;
+  // Timestamp on when the job stopped.
+  google.protobuf.Timestamp stop_timestamp = 6;
+
+  message ExportJobMeta {
+    // Glob of the exported files that should be retrieved to reconstruct
+    // the dataframe with retrieved features.
+    repeated string file_glob = 1;
+    // The Historical Features request that triggered this export job
+    GetHistoricalFeaturesRequest request = 2;
+  }
+
+  message OfflineToOnlineMeta {
+    // Reference to the Feature Table being populated by this job
+    string project = 1;
+    string table_name = 2;
+  }
+
+  message StreamToOnlineMeta {
+    // Reference to the Feature Table being populated by this job
+    string project = 1;
+    string table_name = 2;
+  }
+
+  // JobType specific metadata on the job
+  oneof meta {
+    ExportJobMeta export = 7;
+    OfflineToOnlineMeta offline_to_online = 8;
+    StreamToOnlineMeta stream_to_online = 9;
+  }
+}
+
+// Ingest data from offline store into online store
+message StartOfflineToOnlineIngestionJobRequest {
+  // Feature table to ingest
+  string project = 1;
+  string table_name = 2;
+
+  // Start of time range for source data from offline store
+  google.protobuf.Timestamp start_date = 3;
+
+  // End of time range for source data from offline store
+  google.protobuf.Timestamp end_date = 4;
+}
+
+message StartOfflineToOnlineIngestionJobResponse {
+  // Job ID assigned by Feast
+  string id = 1;
+}
+
+message GetHistoricalFeaturesRequest {
+  // List of features that are being retrieved
+  repeated feast.serving.FeatureReferenceV2 features = 1;
+
+  // Batch DataSource that can be used to obtain entity values for historical retrieval.
+  // For each entity value, a feature value will be retrieved for that value/timestamp.
+  // Only 'BATCH_*' source types are supported; currently only BATCH_FILE is supported.
+  DataSource entities_source = 2;
+
+  // Optional field to specify project name override. If specified, uses the
+  // given project for retrieval. Overrides the projects specified in
+  // Feature References if both are specified.
+  string project = 3;
+
+  // Specifies the path in a bucket to write the exported feature data files
+  // Export to AWS S3 - s3://path/to/features
+  // Export to GCP GCS - gs://path/to/features
+  string destination_path = 4;
+}
+
+message GetHistoricalFeaturesResponse {
+  // Export Job with ID assigned by Feast
+  string id = 1;
+}
+
+message StartStreamToOnlineIngestionJobRequest {
+  // Feature table to ingest
+  string project = 1;
+  string table_name = 2;
+}
+
+message StartStreamToOnlineIngestionJobResponse {
+  // Job ID assigned by Feast
+  string id = 1;
+}
+
+message ListJobsRequest {
+  Filter filter = 1;
+  message Filter {
+    // Filter jobs by job type
+    JobType type = 1;
+    // Filter jobs by current job status
+    JobStatus status = 2;
+  }
+}
+
+message ListJobsResponse {
+  repeated Job jobs = 1;
+}
+
+message GetJobRequest {
+  string job_id = 1;
+}
+
+message GetJobResponse {
+  Job job = 1;
+}
+
+message RestartJobRequest {
+  string job_id = 1;
+}
+
+message RestartJobResponse {}
+
+message StopJobRequest {
+  string job_id = 1;
+}
+
+message StopJobResponse {}
\ No newline at end of file
diff --git a/protos/feast/serving/ServingService.proto b/protos/feast/serving/ServingService.proto
index aa6523deca..9b98c071b8 100644
--- a/protos/feast/serving/ServingService.proto
+++ b/protos/feast/serving/ServingService.proto
@@ -84,7 +84,7 @@ message FeatureReference {
 message FeatureReferenceV2 {
   // Name of the Feature Table to retrieve the feature from.
   string feature_table = 1;
-  
+
   // Name of the Feature to retrieve the feature from.
   string name = 2;
 }
@@ -120,27 +120,26 @@ message GetOnlineFeaturesRequest {
 message GetOnlineFeaturesRequestV2 {
   // List of features that are being retrieved
   repeated FeatureReferenceV2 features = 4;
-  
+
   // List of entity rows, containing entity id and timestamp data.
   // Used during retrieval of feature rows and for joining feature
   // rows into a final dataset
   repeated EntityRow entity_rows = 2;
-  
+
   // Optional field to specify project name override. If specified, uses the
   // given project for retrieval. Overrides the projects specified in
   // Feature References if both are specified.
   string project = 5;
-  
+
   message EntityRow {
-      // Request timestamp of this row. This value will be used,
-      // together with maxAge, to determine feature staleness.
-      google.protobuf.Timestamp timestamp = 1;
-
-      // Map containing mapping of entity name to entity value.
-      map<string,feast.types.Value> fields = 2;
+    // Request timestamp of this row. This value will be used,
+    // together with maxAge, to determine feature staleness.
+    google.protobuf.Timestamp timestamp = 1;
+
+    // Map containing mapping of entity name to entity value.
+    map<string,feast.types.Value> fields = 2;
   }
- }
-
+}
 
 message GetBatchFeaturesRequest {
   // List of features that are being retrieved
diff --git a/sdk/go/go.mod b/sdk/go/go.mod
index 6a3b3dd477..712241d356 100644
--- a/sdk/go/go.mod
+++ b/sdk/go/go.mod
@@ -7,7 +7,6 @@ require (
 	github.com/golang/protobuf v1.4.2
 	github.com/google/go-cmp v0.5.1
 	github.com/opentracing/opentracing-go v1.1.0
-	github.com/stretchr/testify v1.4.0 // indirect
 	go.opencensus.io v0.22.4
 	golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
 	google.golang.org/api v0.30.0
diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py
index 46d0675b56..f0b1ea8b2f 100644
--- a/sdk/python/feast/cli.py
+++ b/sdk/python/feast/cli.py
@@ -26,6 +26,7 @@
 from feast.constants import CONFIG_SPARK_LAUNCHER
 from feast.entity import Entity
 from feast.feature_table import FeatureTable
+from feast.job_service import start_job_service
 from feast.loaders.yaml import yaml_loader
 
 _logger = logging.getLogger(__name__)
@@ -33,6 +34,7 @@
 _common_options = [
     click.option("--core-url", help="Set Feast core URL to connect to"),
     click.option("--serving-url", help="Set Feast serving URL to connect to"),
+    click.option("--job-service-url", help="Set Feast job service URL to connect to"),
 ]
 
 
@@ -491,5 +493,13 @@ def get_historical_features(features: str, entity_df_path: str, destination: str
     print(job.get_output_file_uri())
 
 
+@cli.command(name="server")
+def server():
+    """
+    Start Feast Job Service
+    """
+    start_job_service()
+
+
 if __name__ == "__main__":
     cli()
diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py
index b96613ae30..94703e6af5 100644
--- a/sdk/python/feast/client.py
+++ b/sdk/python/feast/client.py
@@ -32,6 +32,9 @@
     CONFIG_CORE_URL_KEY,
     CONFIG_ENABLE_AUTH_KEY,
     CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY,
+    CONFIG_JOB_SERVICE_ENABLE_SSL_KEY,
+    CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY,
+    CONFIG_JOB_SERVICE_URL_KEY,
     CONFIG_PROJECT_KEY,
     CONFIG_SERVING_ENABLE_SSL_KEY,
     CONFIG_SERVING_SERVER_SSL_CERT_KEY,
@@ -63,6 +66,7 @@
     ListProjectsResponse,
 )
 from feast.core.CoreService_pb2_grpc import CoreServiceStub
+from feast.core.JobService_pb2_grpc import JobServiceStub
 from feast.data_format import ParquetFormat
 from feast.data_source import BigQuerySource, FileSource
 from feast.entity import Entity
@@ -132,6 +136,7 @@ def __init__(self, options: Optional[Dict[str, str]] = None, **kwargs):
         self._core_service_stub: Optional[CoreServiceStub] = None
         self._serving_service_stub: Optional[ServingServiceStub] = None
+        self._job_service_stub: Optional[JobServiceStub] = None
         self._auth_metadata: Optional[grpc.AuthMetadataPlugin] = None
 
         # Configure Auth Metadata Plugin if auth is enabled
@@ -178,6 +183,27 @@ def _serving_service(self):
             self._serving_service_stub = ServingServiceStub(channel)
         return self._serving_service_stub
 
+    @property
+    def _job_service(self):
+        """
+        Creates or returns the gRPC Feast Job Service Stub
+
+        Returns: JobServiceStub
+        """
+        if not self._job_service_stub:
+            channel = create_grpc_channel(
+                url=self._config.get(CONFIG_JOB_SERVICE_URL_KEY),
+                enable_ssl=self._config.getboolean(CONFIG_JOB_SERVICE_ENABLE_SSL_KEY),
+                enable_auth=self._config.getboolean(CONFIG_ENABLE_AUTH_KEY),
+                ssl_server_cert_path=self._config.get(
+                    CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY
+                ),
+                auth_metadata_plugin=self._auth_metadata,
+                timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY),
+            )
+            self._job_service_stub = JobServiceStub(channel)
+        return self._job_service_stub
+
     @property
     def core_url(self) -> str:
         """
@@ -201,7 +227,7 @@ def core_url(self, value: str):
     @property
     def serving_url(self) -> str:
         """
-        Retrieve Serving Core URL
+        Retrieve Feast Serving URL
 
         Returns:
             Feast Serving URL string
@@ -218,6 +244,26 @@ def serving_url(self, value: str):
         """
         self._config.set(CONFIG_SERVING_URL_KEY, value)
 
+    @property
+    def job_service_url(self) -> str:
+        """
+        Retrieve Feast Job Service URL
+
+        Returns:
+            Feast Job Service URL string
+        """
+        return self._config.get(CONFIG_JOB_SERVICE_URL_KEY)
+
+    @job_service_url.setter
+    def job_service_url(self, value: str):
+        """
+        Set the Feast Job Service URL
+
+        Args:
+            value: Feast Job Service URL
+        """
+        self._config.set(CONFIG_JOB_SERVICE_URL_KEY, value)
+
     @property
     def core_secure(self) -> bool:
         """
@@ -258,6 +304,26 @@ def serving_secure(self, value: bool):
         """
         self._config.set(CONFIG_SERVING_ENABLE_SSL_KEY, value)
 
+    @property
+    def job_service_secure(self) -> bool:
+        """
+        Retrieve Feast Job Service client-side SSL/TLS setting
+
+        Returns:
+            Whether client-side SSL/TLS is enabled
+        """
+        return self._config.getboolean(CONFIG_JOB_SERVICE_ENABLE_SSL_KEY)
+
+    @job_service_secure.setter
+    def job_service_secure(self, value: bool):
+        """
+        Set the Feast Job Service client-side SSL/TLS setting
+
+        Args:
+            value: True to enable client-side SSL/TLS
+        """
+        self._config.set(CONFIG_JOB_SERVICE_ENABLE_SSL_KEY, value)
+
     def version(self):
         """
         Returns version information from Feast Core and Feast Serving
diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py
index 77ccf90790..c62ca88d77 100644
--- a/sdk/python/feast/constants.py
+++ b/sdk/python/feast/constants.py
@@ -49,6 +49,9 @@ class AuthProvider(Enum):
 CONFIG_SERVING_URL_KEY = "serving_url"
 CONFIG_SERVING_ENABLE_SSL_KEY = "serving_enable_ssl"
 CONFIG_SERVING_SERVER_SSL_CERT_KEY = "serving_server_ssl_cert"
+CONFIG_JOB_SERVICE_URL_KEY = "job_service_url"
+CONFIG_JOB_SERVICE_ENABLE_SSL_KEY = "job_service_enable_ssl"
+CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY = "job_service_server_ssl_cert"
 CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY = "grpc_connection_timeout_default"
 CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = "grpc_connection_timeout_apply"
 CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY = (
diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py
new file mode 100644
index 0000000000..767046fb17
--- /dev/null
+++ b/sdk/python/feast/job_service.py
@@ -0,0 +1,37 @@
+from concurrent.futures import ThreadPoolExecutor
+
+import grpc
+
+import feast
+from feast.core import JobService_pb2_grpc
+
+
+class JobServiceServicer(JobService_pb2_grpc.JobServiceServicer):
+    def __init__(self):
+        self.client = feast.Client()
+
+    def StartOfflineToOnlineIngestionJob(self, request, context):
+        """Start job to ingest data from offline store into online store
+        """
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details("Method not implemented!")
+        raise NotImplementedError("Method not implemented!")
+
+    def GetHistoricalFeatures(self, request, context):
+        """Produce a training dataset, return a job id that will provide a file reference
+        """
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details("Method not implemented!")
+        raise NotImplementedError("Method not implemented!")
+
+
+def start_job_service():
+    """
+    Start Feast Job Service
+    """
+    server = grpc.server(ThreadPoolExecutor())
+    JobService_pb2_grpc.add_JobServiceServicer_to_server(JobServiceServicer(), server)
+    server.add_insecure_port("[::]:6568")
+    server.start()
+    print("Feast job server listening on port :6568")
+    server.wait_for_termination()
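Reviewer note: a minimal sketch of how the new pieces could be exercised end to end once this lands. It assumes the service was started with `feast server` and is reachable on localhost:6568 (the port hard-coded in `start_job_service`), that `feast.Client` forwards the new `job_service_url` config key the same way it forwards `core_url`, and that `JobService_pb2` / `JobService_pb2_grpc` are the modules generated from `JobService.proto` above. It is an illustration only, not part of this diff.

```python
# Illustrative sketch; assumes `feast server` is running locally on port 6568.
import grpc

from feast import Client
from feast.core import JobService_pb2, JobService_pb2_grpc

# SDK side: job_service_url is the config key added in constants.py; the
# localhost addresses here are assumptions for a local setup.
client = Client(core_url="localhost:6565", job_service_url="localhost:6568")
print(client.job_service_url)  # -> localhost:6568

# Raw gRPC call against the stub generated from JobService.proto.
channel = grpc.insecure_channel("localhost:6568")
stub = JobService_pb2_grpc.JobServiceStub(channel)
try:
    stub.ListJobs(JobService_pb2.ListJobsRequest())
except grpc.RpcError as rpc_error:
    # Every RPC currently returns UNIMPLEMENTED until the servicer methods
    # in job_service.py are filled in.
    print(rpc_error.code())
```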