From 92257381d1581d82d348f70b320eb47b6e6b516b Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Wed, 23 Oct 2019 18:29:26 +0800 Subject: [PATCH] Add store validation and comments to batch retrieval --- sdk/python/feast/client.py | 12 ++++++++++++ sdk/python/tests/test_client.py | 4 +++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 74bd3ba0ab..f0855de207 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -43,6 +43,7 @@ DatasetSource, DataFormat, FeatureSetRequest, + FeastServingType, ) from feast.serving.ServingService_pb2_grpc import ServingServiceStub from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse @@ -326,12 +327,21 @@ def get_batch_features( try: fs_request = _build_feature_set_request(feature_ids) + + # Validate entity rows based on entities in Feast Core self._validate_entity_rows_for_batch_retrieval(entity_rows, fs_request) + # Retrieve serving information to determine store type and staging location serving_info = ( self._serving_service_stub.GetFeastServingInfo() ) # type: GetFeastServingInfoResponse + if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH: + raise Exception( + f'You are connected to a store "{self._serving_url}" which does not support batch retrieval' + ) + + # Export and upload entity row dataframe to staging location provided by Feast staged_file = export_dataframe_to_staging_location( entity_rows, serving_info.job_staging_location ) # type: str @@ -344,6 +354,8 @@ def get_batch_features( ) ), ) + + # Retrieve Feast Job object to manage life cycle of retrieval response = self._serving_service_stub.GetBatchFeatures(request) return Job(response.job, self._serving_service_stub) diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index d55b28b5d6..7966fabe8d 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -35,6 +35,7 @@ JobStatus, DataFormat, GetJobResponse, + FeastServingType, ) import pytest from feast.client import Client @@ -292,7 +293,8 @@ def test_get_batch_features(self, mock_client, mocker): mock_client._serving_service_stub, "GetFeastServingInfo", return_value=GetFeastServingInfoResponse( - job_staging_location=f"file://{tempfile.mkdtemp()}" + job_staging_location=f"file://{tempfile.mkdtemp()}", + type=FeastServingType.FEAST_SERVING_TYPE_BATCH, ), )