Skip to content

Commit

Permalink
Add store validation and comments to batch retrieval
Browse files Browse the repository at this point in the history
  • Loading branch information
woop committed Oct 23, 2019
1 parent 2174903 commit 9225738
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
12 changes: 12 additions & 0 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
DatasetSource,
DataFormat,
FeatureSetRequest,
FeastServingType,
)
from feast.serving.ServingService_pb2_grpc import ServingServiceStub
from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse
Expand Down Expand Up @@ -326,12 +327,21 @@ def get_batch_features(

try:
fs_request = _build_feature_set_request(feature_ids)

# Validate entity rows based on entities in Feast Core
self._validate_entity_rows_for_batch_retrieval(entity_rows, fs_request)

# Retrieve serving information to determine store type and staging location
serving_info = (
self._serving_service_stub.GetFeastServingInfo()
) # type: GetFeastServingInfoResponse

if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH:
raise Exception(
f'You are connected to a store "{self._serving_url}" which does not support batch retrieval'
)

# Export and upload entity row dataframe to staging location provided by Feast
staged_file = export_dataframe_to_staging_location(
entity_rows, serving_info.job_staging_location
) # type: str
Expand All @@ -344,6 +354,8 @@ def get_batch_features(
)
),
)

# Retrieve Feast Job object to manage life cycle of retrieval
response = self._serving_service_stub.GetBatchFeatures(request)
return Job(response.job, self._serving_service_stub)

Expand Down
4 changes: 3 additions & 1 deletion sdk/python/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
JobStatus,
DataFormat,
GetJobResponse,
FeastServingType,
)
import pytest
from feast.client import Client
Expand Down Expand Up @@ -292,7 +293,8 @@ def test_get_batch_features(self, mock_client, mocker):
mock_client._serving_service_stub,
"GetFeastServingInfo",
return_value=GetFeastServingInfoResponse(
job_staging_location=f"file://{tempfile.mkdtemp()}"
job_staging_location=f"file://{tempfile.mkdtemp()}",
type=FeastServingType.FEAST_SERVING_TYPE_BATCH,
),
)

Expand Down

0 comments on commit 9225738

Please sign in to comment.