From 299650fa27f0d6e082fbe2190c1bc02bf4a128b0 Mon Sep 17 00:00:00 2001 From: Daniele Martinoli <86618610+dmartinol@users.noreply.github.com> Date: Fri, 10 May 2024 12:04:12 +0200 Subject: [PATCH 1/3] call do_put only once, postpone the invocation of do_put and simplified _make_flight_info --- .../feast/infra/offline_stores/remote.py | 58 ++++++++----------- sdk/python/feast/offline_server.py | 56 ++++++++---------- 2 files changed, 46 insertions(+), 68 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/remote.py b/sdk/python/feast/infra/offline_stores/remote.py index 9fa67d4b34..a7011872be 100644 --- a/sdk/python/feast/infra/offline_stores/remote.py +++ b/sdk/python/feast/infra/offline_stores/remote.py @@ -1,3 +1,4 @@ +import json import uuid from datetime import datetime from pathlib import Path @@ -39,45 +40,29 @@ def __init__( entity_df: Union[pd.DataFrame, str], # TODO add missing parameters from the OfflineStore API ): - # Generate unique command identifier - self.command = str(uuid.uuid4()) # Initialize the client connection self.client = fl.connect( f"grpc://{config.offline_store.host}:{config.offline_store.port}" ) - # Put API parameters - self._put_parameters(feature_refs, entity_df) - - def _put_parameters(self, feature_refs, entity_df): - historical_flight_descriptor = fl.FlightDescriptor.for_command(self.command) - - entity_df_table = pa.Table.from_pandas(entity_df) + self.feature_refs = feature_refs + self.entity_df = entity_df + + # TODO add one specialized implementation for each OfflineStore API + # This can result in a dictionary of functions indexed by api (e.g., "get_historical_features") + def _put_parameters(self, command_descriptor, command): + entity_df_table = pa.Table.from_pandas(self.entity_df) + + api_info = { + "command": command, + "api": "get_historical_features", + "features": json.dumps(self.feature_refs), + } writer, _ = self.client.do_put( - historical_flight_descriptor, - entity_df_table.schema.with_metadata( - { - "command": self.command, - "api": "get_historical_features", - "param": "entity_df", - } - ), + command_descriptor, + entity_df_table.schema.with_metadata({"api-info": json.dumps(api_info)}), ) - writer.write_table(entity_df_table) - writer.close() - features_array = pa.array(feature_refs) - features_batch = pa.RecordBatch.from_arrays([features_array], ["features"]) - writer, _ = self.client.do_put( - historical_flight_descriptor, - features_batch.schema.with_metadata( - { - "command": self.command, - "api": "get_historical_features", - "param": "features", - } - ), - ) - writer.write_batch(features_batch) + writer.write_table(entity_df_table) writer.close() # Invoked to realize the Pandas DataFrame @@ -88,8 +73,12 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: # Invoked to synchronously execute the underlying query and return the result as an arrow table # This is where do_get service is invoked def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table: - upload_descriptor = fl.FlightDescriptor.for_command(self.command) - flight = self.client.get_flight_info(upload_descriptor) + # Generate unique command identifier + command = str(uuid.uuid4()) + command_descriptor = fl.FlightDescriptor.for_command(command) + + self._put_parameters(command_descriptor, command) + flight = self.client.get_flight_info(command_descriptor) ticket = flight.endpoints[0].ticket reader = self.client.do_get(ticket) @@ -112,7 +101,6 @@ def get_historical_features( project: str, full_feature_names: bool = False, ) -> RemoteRetrievalJob: - print(f"config.offline_store is {type(config.offline_store)}") assert isinstance(config.offline_store, RemoteOfflineStoreConfig) # TODO: extend RemoteRetrievalJob API with all method parameters diff --git a/sdk/python/feast/offline_server.py b/sdk/python/feast/offline_server.py index b87db9930b..72928d2746 100644 --- a/sdk/python/feast/offline_server.py +++ b/sdk/python/feast/offline_server.py @@ -1,6 +1,7 @@ import ast +import json import traceback -from typing import Dict +from typing import Any, Dict import pyarrow as pa import pyarrow.flight as fl @@ -12,7 +13,10 @@ class OfflineServer(fl.FlightServerBase): def __init__(self, store: FeatureStore, location: str, **kwargs): super(OfflineServer, self).__init__(location, **kwargs) self._location = location + # A dictionary of configured flights, e.g. API calls received and not yet served self.flights: Dict[str, Dict[str, str]] = {} + # The data-stream associated to each API call and not yet served + self.data: Dict[str, Any] = {} self.store = store @classmethod @@ -23,20 +27,12 @@ def descriptor_to_key(self, descriptor): tuple(descriptor.path or tuple()), ) - # TODO: since we cannot anticipate here the call to get_historical_features call, what data should we return? - # ATM it returns the metadata of the "entity_df" table def _make_flight_info(self, key, descriptor, params): - table = params["entity_df"] endpoints = [fl.FlightEndpoint(repr(key), [self._location])] - mock_sink = pa.MockOutputStream() - stream_writer = pa.RecordBatchStreamWriter(mock_sink, table.schema) - stream_writer.write_table(table) - stream_writer.close() - data_size = mock_sink.size() - - return fl.FlightInfo( - table.schema, descriptor, endpoints, table.num_rows, data_size - ) + # TODO calculate actual schema from the given features + schema = pa.schema([]) + + return fl.FlightInfo(schema, descriptor, endpoints, -1, -1) def get_flight_info(self, context, descriptor): key = OfflineServer.descriptor_to_key(descriptor) @@ -59,23 +55,17 @@ def list_flights(self, context, criteria): def do_put(self, context, descriptor, reader, writer): key = OfflineServer.descriptor_to_key(descriptor) - if key in self.flights: - params = self.flights[key] - else: - params = {} decoded_metadata = { key.decode(): value.decode() for key, value in reader.schema.metadata.items() } - if "command" in decoded_metadata: - command = decoded_metadata["command"] - api = decoded_metadata["api"] - param = decoded_metadata["param"] - value = reader.read_all() - # Merge the existing dictionary for the same key, as we have multiple calls to do_put for the same key - params.update({"command": command, "api": api, param: value}) - - self.flights[key] = params + if "api-info" in decoded_metadata: + api_info = decoded_metadata["api-info"] + data = reader.read_all() + self.flights[key] = api_info + self.data[key] = data + else: + print(f"No 'api-info' field in metadata: {decoded_metadata}") # Extracts the API parameters from the flights dictionary, delegates the execution to the FeatureStore instance # and returns the stream of data @@ -85,18 +75,17 @@ def do_get(self, context, ticket): print(f"Unknown key {key}") return None - api = self.flights[key]["api"] + api_info = json.loads(self.flights[key]) + api = api_info["api"] # print(f"get key is {key}") # print(f"requested api is {api}") if api == "get_historical_features": - # Extract parameters from the internal flight descriptor - entity_df_value = self.flights[key]["entity_df"] + # Extract parameters from the internal data dictionary + entity_df_value = self.data[key] entity_df = pa.Table.to_pandas(entity_df_value) # print(f"entity_df is {entity_df}") - features_value = self.flights[key]["features"] - features = pa.RecordBatch.to_pylist(features_value) - features = [item["features"] for item in features] + features = json.loads(api_info["features"]) # print(f"features is {features}") print( @@ -113,8 +102,9 @@ def do_get(self, context, ticket): traceback.print_exc() table = pa.Table.from_pandas(training_df) - # Get service is consumed, so we clear the corresponding flight + # Get service is consumed, so we clear the corresponding flight and data del self.flights[key] + del self.data[key] return fl.RecordBatchStream(table) else: From f29771197d325a5fd9ee8df6480373bb14dcd31a Mon Sep 17 00:00:00 2001 From: Daniele Martinoli <86618610+dmartinol@users.noreply.github.com> Date: Fri, 10 May 2024 13:35:29 +0200 Subject: [PATCH 2/3] added primitive parameters to the command descriptor --- .../feast/infra/offline_stores/remote.py | 20 ++++++++----- sdk/python/feast/offline_server.py | 30 +++++++------------ 2 files changed, 23 insertions(+), 27 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/remote.py b/sdk/python/feast/infra/offline_stores/remote.py index a7011872be..de2cbfac0f 100644 --- a/sdk/python/feast/infra/offline_stores/remote.py +++ b/sdk/python/feast/infra/offline_stores/remote.py @@ -52,14 +52,9 @@ def __init__( def _put_parameters(self, command_descriptor, command): entity_df_table = pa.Table.from_pandas(self.entity_df) - api_info = { - "command": command, - "api": "get_historical_features", - "features": json.dumps(self.feature_refs), - } writer, _ = self.client.do_put( command_descriptor, - entity_df_table.schema.with_metadata({"api-info": json.dumps(api_info)}), + entity_df_table.schema, ) writer.write_table(entity_df_table) @@ -74,8 +69,17 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: # This is where do_get service is invoked def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table: # Generate unique command identifier - command = str(uuid.uuid4()) - command_descriptor = fl.FlightDescriptor.for_command(command) + command_id = str(uuid.uuid4()) + command = { + "command_id": command_id, + "api": "get_historical_features", + "features": self.feature_refs, + } + command_descriptor = fl.FlightDescriptor.for_command( + json.dumps( + command, + ) + ) self._put_parameters(command_descriptor, command) flight = self.client.get_flight_info(command_descriptor) diff --git a/sdk/python/feast/offline_server.py b/sdk/python/feast/offline_server.py index 72928d2746..077421f73c 100644 --- a/sdk/python/feast/offline_server.py +++ b/sdk/python/feast/offline_server.py @@ -14,9 +14,7 @@ def __init__(self, store: FeatureStore, location: str, **kwargs): super(OfflineServer, self).__init__(location, **kwargs) self._location = location # A dictionary of configured flights, e.g. API calls received and not yet served - self.flights: Dict[str, Dict[str, str]] = {} - # The data-stream associated to each API call and not yet served - self.data: Dict[str, Any] = {} + self.flights: Dict[str, Any] = {} self.store = store @classmethod @@ -55,17 +53,12 @@ def list_flights(self, context, criteria): def do_put(self, context, descriptor, reader, writer): key = OfflineServer.descriptor_to_key(descriptor) - decoded_metadata = { - key.decode(): value.decode() - for key, value in reader.schema.metadata.items() - } - if "api-info" in decoded_metadata: - api_info = decoded_metadata["api-info"] + command = json.loads(key[1]) + if "api" in command: data = reader.read_all() - self.flights[key] = api_info - self.data[key] = data + self.flights[key] = data else: - print(f"No 'api-info' field in metadata: {decoded_metadata}") + print(f"No 'api' field in command: {command}") # Extracts the API parameters from the flights dictionary, delegates the execution to the FeatureStore instance # and returns the stream of data @@ -75,17 +68,17 @@ def do_get(self, context, ticket): print(f"Unknown key {key}") return None - api_info = json.loads(self.flights[key]) - api = api_info["api"] - # print(f"get key is {key}") + command = json.loads(key[1]) + api = command["api"] + # print(f"get command is {command}") # print(f"requested api is {api}") if api == "get_historical_features": - # Extract parameters from the internal data dictionary - entity_df_value = self.data[key] + # Extract parameters from the internal flights dictionary + entity_df_value = self.flights[key] entity_df = pa.Table.to_pandas(entity_df_value) # print(f"entity_df is {entity_df}") - features = json.loads(api_info["features"]) + features = command["features"] # print(f"features is {features}") print( @@ -104,7 +97,6 @@ def do_get(self, context, ticket): # Get service is consumed, so we clear the corresponding flight and data del self.flights[key] - del self.data[key] return fl.RecordBatchStream(table) else: From 31d1fe82f4bd9dc9c74a12184af6494edfa137e6 Mon Sep 17 00:00:00 2001 From: Daniele Martinoli <86618610+dmartinol@users.noreply.github.com> Date: Mon, 13 May 2024 08:40:27 +0200 Subject: [PATCH 3/3] removed redundant param --- sdk/python/feast/infra/offline_stores/remote.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/remote.py b/sdk/python/feast/infra/offline_stores/remote.py index de2cbfac0f..ccdfbd014c 100644 --- a/sdk/python/feast/infra/offline_stores/remote.py +++ b/sdk/python/feast/infra/offline_stores/remote.py @@ -49,7 +49,7 @@ def __init__( # TODO add one specialized implementation for each OfflineStore API # This can result in a dictionary of functions indexed by api (e.g., "get_historical_features") - def _put_parameters(self, command_descriptor, command): + def _put_parameters(self, command_descriptor): entity_df_table = pa.Table.from_pandas(self.entity_df) writer, _ = self.client.do_put( @@ -81,7 +81,7 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table: ) ) - self._put_parameters(command_descriptor, command) + self._put_parameters(command_descriptor) flight = self.client.get_flight_info(command_descriptor) ticket = flight.endpoints[0].ticket