Skip to content

Commit

Permalink
feat: Move get_online_features to OnlineStore interface (feast-dev#4319)
Browse files Browse the repository at this point in the history
* move get_online_features to OnlineStore interface

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>

* fix pydantic warnings

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>

* run ruff format

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>

---------

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>
  • Loading branch information
tokoko authored Jul 1, 2024
1 parent 9451d9c commit 7072fd0
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 182 deletions.
185 changes: 10 additions & 175 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1559,75 +1559,16 @@ def get_online_features(
... )
>>> online_response_dict = online_response.to_dict()
"""
if isinstance(entity_rows, list):
columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()}
for entity_row in entity_rows:
for key, value in entity_row.items():
try:
columnar[key].append(value)
except KeyError as e:
raise ValueError(
"All entity_rows must have the same keys."
) from e

entity_rows = columnar
provider = self._get_provider()

(
join_key_values,
grouped_refs,
entity_name_to_join_key_map,
requested_on_demand_feature_views,
feature_refs,
requested_result_row_names,
online_features_response,
) = utils._prepare_entities_to_read_from_online_store(
return provider.get_online_features(
config=self.config,
features=features,
entity_rows=entity_rows,
registry=self._registry,
project=self.project,
features=features,
entity_values=entity_rows,
full_feature_names=full_feature_names,
native_entity_values=True,
)

provider = self._get_provider()
for table, requested_features in grouped_refs:
# Get the correct set of entity values with the correct join keys.
table_entity_values, idxs = utils._get_unique_entities(
table,
join_key_values,
entity_name_to_join_key_map,
)

# Fetch feature data for the minimum set of Entities.
feature_data = self._read_from_online_store(
table_entity_values,
provider,
requested_features,
table,
)

# Populate the result_rows with the Features from the OnlineStore inplace.
utils._populate_response_from_feature_data(
feature_data,
idxs,
online_features_response,
full_feature_names,
requested_features,
table,
)

if requested_on_demand_feature_views:
utils._augment_response_with_on_demand_transforms(
online_features_response,
feature_refs,
requested_on_demand_feature_views,
full_feature_names,
)

utils._drop_unneeded_columns(
online_features_response, requested_result_row_names
)
return OnlineResponse(online_features_response)

async def get_online_features_async(
self,
Expand Down Expand Up @@ -1664,75 +1605,16 @@ async def get_online_features_async(
Raises:
Exception: No entity with the specified name exists.
"""
if isinstance(entity_rows, list):
columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()}
for entity_row in entity_rows:
for key, value in entity_row.items():
try:
columnar[key].append(value)
except KeyError as e:
raise ValueError(
"All entity_rows must have the same keys."
) from e

entity_rows = columnar
provider = self._get_provider()

(
join_key_values,
grouped_refs,
entity_name_to_join_key_map,
requested_on_demand_feature_views,
feature_refs,
requested_result_row_names,
online_features_response,
) = utils._prepare_entities_to_read_from_online_store(
return await provider.get_online_features_async(
config=self.config,
features=features,
entity_rows=entity_rows,
registry=self._registry,
project=self.project,
features=features,
entity_values=entity_rows,
full_feature_names=full_feature_names,
native_entity_values=True,
)

provider = self._get_provider()
for table, requested_features in grouped_refs:
# Get the correct set of entity values with the correct join keys.
table_entity_values, idxs = utils._get_unique_entities(
table,
join_key_values,
entity_name_to_join_key_map,
)

# Fetch feature data for the minimum set of Entities.
feature_data = await self._read_from_online_store_async(
table_entity_values,
provider,
requested_features,
table,
)

# Populate the result_rows with the Features from the OnlineStore inplace.
utils._populate_response_from_feature_data(
feature_data,
idxs,
online_features_response,
full_feature_names,
requested_features,
table,
)

if requested_on_demand_feature_views:
utils._augment_response_with_on_demand_transforms(
online_features_response,
feature_refs,
requested_on_demand_feature_views,
full_feature_names,
)

utils._drop_unneeded_columns(
online_features_response, requested_result_row_names
)
return OnlineResponse(online_features_response)

def retrieve_online_documents(
self,
Expand Down Expand Up @@ -1806,53 +1688,6 @@ def retrieve_online_documents(
)
return OnlineResponse(online_features_response)

def _read_from_online_store(
self,
entity_rows: Iterable[Mapping[str, Value]],
provider: Provider,
requested_features: List[str],
table: FeatureView,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
"""Read and process data from the OnlineStore for a given FeatureView.
This method guarantees that the order of the data in each element of the
List returned is the same as the order of `requested_features`.
This method assumes that `provider.online_read` returns data for each
combination of Entities in `entity_rows` in the same order as they
are provided.
"""
entity_key_protos = utils._get_entity_key_protos(entity_rows)

# Fetch data for Entities.
read_rows = provider.online_read(
config=self.config,
table=table,
entity_keys=entity_key_protos,
requested_features=requested_features,
)

return utils._convert_rows_to_protobuf(requested_features, read_rows)

async def _read_from_online_store_async(
self,
entity_rows: Iterable[Mapping[str, Value]],
provider: Provider,
requested_features: List[str],
table: FeatureView,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
entity_key_protos = utils._get_entity_key_protos(entity_rows)

# Fetch data for Entities.
read_rows = await provider.online_read_async(
config=self.config,
table=table,
entity_keys=entity_key_protos,
requested_features=requested_features,
)

return utils._convert_rows_to_protobuf(requested_features, read_rows)

def _retrieve_from_online_store(
self,
provider: Provider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
def _create_configuration_map(self, job_id, paths, feature_view, namespace):
"""Create a Kubernetes configmap for this job"""

feature_store_configuration = yaml.dump(self.repo_config.dict(by_alias=True))
feature_store_configuration = yaml.dump(
self.repo_config.model_dump(by_alias=True)
)

materialization_config = yaml.dump(
{"paths": paths, "feature_view": feature_view.name}
Expand Down
Loading

0 comments on commit 7072fd0

Please sign in to comment.