Skip to content

Commit

Permalink
Properly exclude entities from feature inference (feast-dev#2048)
Browse files Browse the repository at this point in the history
* Proper fixes for consistency and to make sure entities aren't inferred as features

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* Leaving out ODFV inference logic move for a different PR

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* updated tests

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>
(cherry picked from commit 63652e0)
  • Loading branch information
mavysavydav authored and Cody Lin committed Nov 19, 2021
1 parent 2bae5b3 commit 0d117d6
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 81 deletions.
6 changes: 4 additions & 2 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from feast.inference import (
update_data_sources_with_inferred_event_timestamp_col,
update_entities_with_inferred_types_from_feature_views,
update_feature_views_with_inferred_features,
)
from feast.infra.provider import Provider, RetrievalJob, get_provider
from feast.on_demand_feature_view import OnDemandFeatureView
Expand Down Expand Up @@ -417,8 +418,9 @@ def apply(
[view.batch_source for view in views_to_update], self.config
)

for view in views_to_update:
view.infer_features_from_batch_source(self.config)
update_feature_views_with_inferred_features(
views_to_update, entities_to_update, self.config
)

for odfv in odfvs_to_update:
odfv.infer_features()
Expand Down
69 changes: 0 additions & 69 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import re
import warnings
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union
Expand All @@ -22,7 +21,6 @@

from feast import utils
from feast.data_source import DataSource
from feast.errors import RegistryInferenceFailure
from feast.feature import Feature
from feast.feature_view_projection import FeatureViewProjection
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
Expand All @@ -35,7 +33,6 @@
from feast.protos.feast.core.FeatureView_pb2 import (
MaterializationInterval as MaterializationIntervalProto,
)
from feast.repo_config import RepoConfig
from feast.usage import log_exceptions
from feast.value_type import ValueType

Expand Down Expand Up @@ -428,69 +425,3 @@ def most_recent_end_time(self) -> Optional[datetime]:
if len(self.materialization_intervals) == 0:
return None
return max([interval[1] for interval in self.materialization_intervals])

def infer_features_from_batch_source(self, config: RepoConfig):
    """
    Infers the set of features associated to this feature view from the input source.

    Nothing happens when the feature view already declares features. Otherwise
    every column reported by the batch source becomes a feature, except:
      * the event and created timestamp columns,
      * the columns named after this view's entities,
      * the names any of the above are mapped to in the source's field mapping,
      * columns whose name starts or ends with a double underscore.

    Args:
        config: Configuration object used to configure the feature store.

    Raises:
        RegistryInferenceFailure: The set of features could not be inferred.
    """
    if self.features:
        return

    source = self.batch_source
    field_mapping = source.field_mapping

    reserved_columns = [
        source.event_timestamp_column,
        source.created_timestamp_column,
        *self.entities,
    ]
    columns_to_exclude = set(reserved_columns)
    # A reserved column may be renamed through the field mapping; the renamed
    # name is excluded as well.
    columns_to_exclude.update(
        field_mapping[column]
        for column in reserved_columns
        if column in field_mapping
    )

    for col_name, col_datatype in source.get_table_column_names_and_types(config):
        if col_name in columns_to_exclude:
            continue
        # Double underscores often signal an internal-use column. re.search is
        # required here: re.match only anchors at the start of the string, so
        # the trailing "__$" alternative could never match a column such as
        # "foo__".
        if re.search(r"^__|__$", col_name):
            continue

        feature_name = (
            field_mapping[col_name] if col_name in field_mapping else col_name
        )
        self.features.append(
            Feature(
                feature_name,
                source.source_datatype_to_feast_value_type()(col_datatype),
            )
        )

    if not self.features:
        raise RegistryInferenceFailure(
            "FeatureView",
            f"Could not infer Features for the FeatureView named {self.name}.",
        )
66 changes: 65 additions & 1 deletion sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import re
from typing import List

from feast import BigQuerySource, Entity, FileSource, RedshiftSource
from feast import BigQuerySource, Entity, Feature, FileSource, RedshiftSource
from feast.data_source import DataSource
from feast.errors import RegistryInferenceFailure
from feast.feature_view import FeatureView
Expand Down Expand Up @@ -118,3 +118,67 @@ def update_data_sources_with_inferred_event_timestamp_col(
{ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria.
""",
)


def update_feature_views_with_inferred_features(
    fvs: List[FeatureView], entities: List[Entity], config: RepoConfig
) -> None:
    """
    Infers the set of features associated to each FeatureView and updates the
    FeatureView with those features.

    Feature views that already declare features are left untouched. For the
    others, each column of the underlying batch source is considered a feature
    except:
      * the data source's event and created timestamp columns (and the names
        they are mapped to in the source's field mapping),
      * the join keys of the FeatureView's entities,
      * columns whose name starts or ends with a double underscore.

    Args:
        fvs: The feature views to update in place.
        entities: Entities used to resolve each feature view's entity names to
            join key column names.
        config: Configuration object passed to the batch source when listing
            its columns.

    Raises:
        RegistryInferenceFailure: No feature could be inferred for a feature view.
    """
    entity_name_to_join_key_map = {entity.name: entity.join_key for entity in entities}

    for fv in fvs:
        if fv.features:
            continue

        source = fv.batch_source
        field_mapping = source.field_mapping

        timestamp_columns = (
            source.event_timestamp_column,
            source.created_timestamp_column,
        )
        columns_to_exclude = set(timestamp_columns)
        # A timestamp column may be renamed through the field mapping; the
        # renamed name is excluded as well.
        columns_to_exclude.update(
            field_mapping[column]
            for column in timestamp_columns
            if column in field_mapping
        )
        # An entity that was not passed in `entities` would otherwise surface
        # as a bare KeyError. Fall back to the entity name itself, which is the
        # column the previous FeatureView-level inference excluded.
        # NOTE(review): assumes an entity's join key defaults to its name.
        columns_to_exclude.update(
            entity_name_to_join_key_map.get(entity_name, entity_name)
            for entity_name in fv.entities
        )

        for col_name, col_datatype in source.get_table_column_names_and_types(
            config
        ):
            if col_name in columns_to_exclude:
                continue
            # Double underscores often signal an internal-use column.
            # re.search is required here: re.match only anchors at the start of
            # the string, so the trailing "__$" alternative could never match a
            # column such as "foo__".
            if re.search(r"^__|__$", col_name):
                continue

            feature_name = (
                field_mapping[col_name] if col_name in field_mapping else col_name
            )
            fv.features.append(
                Feature(
                    feature_name,
                    source.source_datatype_to_feast_value_type()(col_datatype),
                )
            )

        if not fv.features:
            raise RegistryInferenceFailure(
                "FeatureView",
                f"Could not infer Features for the FeatureView named {fv.name}.",
            )
4 changes: 2 additions & 2 deletions sdk/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def simple_dataset_1() -> pd.DataFrame:
now = datetime.utcnow()
ts = pd.Timestamp(now).round("ms")
data = {
"id": [1, 2, 1, 3, 3],
"id_join_key": [1, 2, 1, 3, 3],
"float_col": [0.1, 0.2, 0.3, 4, 5],
"int64_col": [1, 2, 3, 4, 5],
"string_col": ["a", "b", "c", "d", "e"],
Expand All @@ -102,7 +102,7 @@ def simple_dataset_2() -> pd.DataFrame:
now = datetime.utcnow()
ts = pd.Timestamp(now).round("ms")
data = {
"id": ["a", "b", "c", "d", "e"],
"id_join_key": ["a", "b", "c", "d", "e"],
"float_col": [0.1, 0.2, 0.3, 4, 5],
"int64_col": [1, 2, 3, 4, 5],
"string_col": ["a", "b", "c", "d", "e"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source):
with prep_file_source(
df=dataframe_source, event_timestamp_column="ts_1"
) as file_source:
entity = Entity(name="id", join_key="id_join_key", value_type=ValueType.INT64)

fv1 = FeatureView(
name="fv1",
entities=["id"],
Expand Down Expand Up @@ -245,7 +247,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source):
tags={},
)

test_feature_store.apply([fv1, fv2, fv3]) # Register Feature Views
test_feature_store.apply([entity, fv1, fv2, fv3]) # Register Feature Views
feature_view_1 = test_feature_store.list_feature_views()[0]
feature_view_2 = test_feature_store.list_feature_views()[1]
feature_view_3 = test_feature_store.list_feature_views()[2]
Expand Down Expand Up @@ -433,7 +435,7 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source):
df=dataframe_source, event_timestamp_column="ts_1"
) as file_source:

e = Entity(name="id", value_type=ValueType.STRING)
e = Entity(name="id", join_key="id_join_key", value_type=ValueType.STRING)

# Create Feature View
fv1 = FeatureView(
Expand Down
14 changes: 9 additions & 5 deletions sdk/python/tests/integration/registration/test_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,26 @@ def test_update_entities_with_inferred_types_from_feature_views(
name="fv2", entities=["id"], batch_source=file_source_2, ttl=None,
)

actual_1 = Entity(name="id")
actual_2 = Entity(name="id")
actual_1 = Entity(name="id", join_key="id_join_key")
actual_2 = Entity(name="id", join_key="id_join_key")

update_entities_with_inferred_types_from_feature_views(
[actual_1], [fv1], RepoConfig(provider="local", project="test")
)
update_entities_with_inferred_types_from_feature_views(
[actual_2], [fv2], RepoConfig(provider="local", project="test")
)
assert actual_1 == Entity(name="id", value_type=ValueType.INT64)
assert actual_2 == Entity(name="id", value_type=ValueType.STRING)
assert actual_1 == Entity(
name="id", join_key="id_join_key", value_type=ValueType.INT64
)
assert actual_2 == Entity(
name="id", join_key="id_join_key", value_type=ValueType.STRING
)

with pytest.raises(RegistryInferenceFailure):
# two viable data types
update_entities_with_inferred_types_from_feature_views(
[Entity(name="id")],
[Entity(name="id", join_key="id_join_key")],
[fv1, fv2],
RepoConfig(provider="local", project="test"),
)
Expand Down

0 comments on commit 0d117d6

Please sign in to comment.