From ed8d46168f9d0f8b31fa227cebb909faf422b9f2 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Sat, 12 Jun 2021 12:23:48 -0700 Subject: [PATCH 1/8] wip1 Signed-off-by: David Y Liu --- sdk/python/feast/data_source.py | 42 ++--------------- sdk/python/feast/feature_store.py | 8 +++- sdk/python/feast/inference.py | 50 +++++++++++++++++++++ sdk/python/feast/repo_operations.py | 9 +++- sdk/python/tests/test_inference.py | 43 +++++++++++------- sdk/python/tests/utils/data_source_utils.py | 6 +-- 6 files changed, 99 insertions(+), 59 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 96ef9e46a0..c563085bb8 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -519,41 +519,7 @@ def to_proto(self) -> DataSourceProto: Converts an DataSourceProto object to its protobuf representation. """ raise NotImplementedError - - def _infer_event_timestamp_column(self, ts_column_type_regex_pattern): - ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column" - USER_GUIDANCE = "Please specify event_timestamp_column explicitly." - - if isinstance(self, FileSource) or isinstance(self, BigQuerySource): - event_timestamp_column, matched_flag = None, False - for col_name, col_datatype in self.get_table_column_names_and_types(): - if re.match(ts_column_type_regex_pattern, col_datatype): - if matched_flag: - raise TypeError( - f""" - {ERROR_MSG_PREFIX} due to multiple possible columns satisfying - the criteria. {USER_GUIDANCE} - """ - ) - matched_flag = True - event_timestamp_column = col_name - if matched_flag: - return event_timestamp_column - else: - raise TypeError( - f""" - {ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria. - {USER_GUIDANCE} - """ - ) - else: - raise TypeError( - f""" - {ERROR_MSG_PREFIX} because this DataSource currently does not support this inference. - {USER_GUIDANCE} - """ - ) - + class FileSource(DataSource): def __init__( @@ -598,7 +564,7 @@ def __init__( self._file_options = FileOptions(file_format=file_format, file_url=file_url) super().__init__( - event_timestamp_column or self._infer_event_timestamp_column(r"^timestamp"), + event_timestamp_column, created_timestamp_column, field_mapping, date_partition_column, @@ -672,8 +638,7 @@ def __init__( self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query) super().__init__( - event_timestamp_column - or self._infer_event_timestamp_column("TIMESTAMP|DATETIME"), + event_timestamp_column, created_timestamp_column, field_mapping, date_partition_column, @@ -897,3 +862,4 @@ def to_proto(self) -> DataSourceProto: data_source_proto.date_partition_column = self.date_partition_column return data_source_proto + diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 497ba3a368..e0e2a9d795 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -26,7 +26,10 @@ from feast.entity import Entity from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException from feast.feature_view import FeatureView -from feast.inference import infer_entity_value_type_from_feature_views +from feast.inference import ( + infer_entity_value_type_from_feature_views, + infer_event_timestamp_column_for_data_sources +) from feast.infra.provider import Provider, RetrievalJob, get_provider from feast.online_response import OnlineResponse, _infer_online_entity_rows from feast.protos.feast.serving.ServingService_pb2 import ( @@ -224,6 +227,9 @@ def apply( entities_to_update = infer_entity_value_type_from_feature_views( [ob for ob in objects if isinstance(ob, Entity)], views_to_update ) + infer_event_timestamp_column_for_data_sources( + [view.input for view in views_to_update] + ) if len(views_to_update) + len(entities_to_update) != len(objects): raise ValueError("Unknown object type provided as part of apply() call") diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 54105a9bc2..89123ddbe1 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -1,6 +1,8 @@ +import re from typing import List from feast import Entity +from feast.data_source import BigQuerySource, DataSource, FileSource from feast.feature_view import FeatureView from feast.value_type import ValueType @@ -54,3 +56,51 @@ def infer_entity_value_type_from_feature_views( entity.value_type = inferred_value_type return entities + + +def infer_event_timestamp_column_for_data_sources(data_sources: List[DataSource]) -> List[DataSource]: + ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column" + USER_GUIDANCE = "Please specify event_timestamp_column explicitly." + + for data_source in data_sources: + if data_source.event_timestamp_column is None: + # prepare right match pattern for data source + ts_column_type_regex_pattern = "" + if isinstance(data_source, FileSource): + ts_column_type_regex_pattern = r"^timestamp" + elif isinstance(data_source, BigQuerySource): + ts_column_type_regex_pattern = "TIMESTAMP|DATETIME" + else: + raise TypeError( + f""" + Inference failed. Data source not supported + """ + ) + + # loop through table columns to find singular match + event_timestamp_column, matched_flag = None, False + for col_name, col_datatype in data_source.get_table_column_names_and_types(): + if re.match(ts_column_type_regex_pattern, col_datatype): + if matched_flag: + raise TypeError( + f""" + {ERROR_MSG_PREFIX} due to multiple possible columns satisfying + the criteria. {USER_GUIDANCE}. {col_name} {col_datatype} REGEX PAT:{ts_column_type_regex_pattern} + """ + ) + matched_flag = True + event_timestamp_column = col_name + if matched_flag: + data_source.event_timestamp_column = event_timestamp_column + else: + raise TypeError( + f""" + {ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria. + {USER_GUIDANCE} + """ + ) + + return data_sources + + + diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 63ed5c74d7..ef00af100b 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -13,7 +13,10 @@ from feast import Entity, FeatureTable from feast.feature_view import FeatureView -from feast.inference import infer_entity_value_type_from_feature_views +from feast.inference import ( + infer_entity_value_type_from_feature_views, + infer_event_timestamp_column_for_data_sources +) from feast.infra.offline_stores.helpers import assert_offline_store_supports_data_source from feast.infra.provider import get_provider from feast.names import adjectives, animals @@ -136,6 +139,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): ), feature_views=repo.feature_views, ) + infer_event_timestamp_column_for_data_sources( + [view.input for view in repo.feature_views] + ) + sys.dont_write_bytecode = False for entity in repo.entities: registry.apply_entity(entity, project=project) diff --git a/sdk/python/tests/test_inference.py b/sdk/python/tests/test_inference.py index 886aca8ab2..4214a6702d 100644 --- a/sdk/python/tests/test_inference.py +++ b/sdk/python/tests/test_inference.py @@ -1,3 +1,4 @@ +import pandas as pd import pytest from utils.data_source_utils import ( prep_file_source, @@ -7,22 +8,10 @@ from feast import Entity, ValueType from feast.feature_view import FeatureView -from feast.inference import infer_entity_value_type_from_feature_views - - -@pytest.mark.integration -def test_data_source_ts_col_inference_success(simple_dataset_1): - with prep_file_source(df=simple_dataset_1) as file_source: - actual_file_source = file_source.event_timestamp_column - actual_bq_1 = simple_bq_source_using_table_ref_arg( - simple_dataset_1 - ).event_timestamp_column - actual_bq_2 = simple_bq_source_using_query_arg( - simple_dataset_1 - ).event_timestamp_column - expected = "ts_1" - - assert expected == actual_file_source == actual_bq_1 == actual_bq_2 +from feast.inference import ( + infer_entity_value_type_from_feature_views, + infer_event_timestamp_column_for_data_sources +) def test_infer_entity_value_type_from_feature_views(simple_dataset_1, simple_dataset_2): @@ -47,3 +36,25 @@ def test_infer_entity_value_type_from_feature_views(simple_dataset_1, simple_dat with pytest.raises(ValueError): # two viable data types infer_entity_value_type_from_feature_views([Entity(name="id")], [fv1, fv2]) + + +def test_infer_event_timestamp_column_for_data_source(simple_dataset_1): + df_with_two_viable_timestamp_cols = simple_dataset_1.copy(deep=True) + df_with_two_viable_timestamp_cols["ts_2"] = simple_dataset_1["ts_1"] + + with prep_file_source(df=simple_dataset_1) as file_source: + actual_processed_data_sources = infer_event_timestamp_column_for_data_sources( + [file_source, simple_bq_source_using_table_ref_arg(simple_dataset_1)] + ) + actual_event_timestamp_cols = [source.event_timestamp_column for source in actual_processed_data_sources] + + assert actual_event_timestamp_cols == ["ts_1", "ts_1"] + + with prep_file_source( + df=df_with_two_viable_timestamp_cols + ) as file_source: + with pytest.raises(TypeError): + # two viable event_timestamp_columns + infer_event_timestamp_column_for_data_sources( + [file_source] + ) diff --git a/sdk/python/tests/utils/data_source_utils.py b/sdk/python/tests/utils/data_source_utils.py index 0aec0c6f1a..c848b8ea64 100644 --- a/sdk/python/tests/utils/data_source_utils.py +++ b/sdk/python/tests/utils/data_source_utils.py @@ -8,7 +8,7 @@ @contextlib.contextmanager -def prep_file_source(df, event_timestamp_column="") -> FileSource: +def prep_file_source(df, event_timestamp_column=None) -> FileSource: with tempfile.NamedTemporaryFile(suffix=".parquet") as f: f.close() df.to_parquet(f.name) @@ -21,7 +21,7 @@ def prep_file_source(df, event_timestamp_column="") -> FileSource: def simple_bq_source_using_table_ref_arg( - df, event_timestamp_column="" + df, event_timestamp_column=None ) -> BigQuerySource: client = bigquery.Client() gcp_project = client.project @@ -46,7 +46,7 @@ def simple_bq_source_using_table_ref_arg( ) -def simple_bq_source_using_query_arg(df, event_timestamp_column="") -> BigQuerySource: +def simple_bq_source_using_query_arg(df, event_timestamp_column=None) -> BigQuerySource: bq_source_using_table_ref = simple_bq_source_using_table_ref_arg( df, event_timestamp_column ) From 91ef76b4132d9cf6f8aa2eed4daafe0db0ba5414 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Sat, 12 Jun 2021 16:07:01 -0700 Subject: [PATCH 2/8] just need to do clean up Signed-off-by: David Y Liu --- sdk/python/feast/data_source.py | 18 +++------- sdk/python/feast/errors.py | 10 ++++++ sdk/python/feast/feature_store.py | 2 ++ sdk/python/feast/feature_view.py | 51 ++++++++++++++++------------- sdk/python/feast/inference.py | 24 ++++++++------ sdk/python/feast/repo_operations.py | 2 ++ sdk/python/tests/test_inference.py | 6 ++-- 7 files changed, 65 insertions(+), 48 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index c563085bb8..a58ce547a8 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -708,20 +708,12 @@ def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]: from google.cloud import bigquery client = bigquery.Client() - name_type_pairs = [] if self.table_ref is not None: - project_id, dataset_id, table_id = self.table_ref.split(".") - bq_columns_query = f""" - SELECT COLUMN_NAME, DATA_TYPE FROM {project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_NAME = '{table_id}' - """ - table_schema = ( - client.query(bq_columns_query).result().to_dataframe_iterable() - ) - for df in table_schema: - name_type_pairs.extend( - list(zip(df["COLUMN_NAME"].to_list(), df["DATA_TYPE"].to_list())) - ) + table_schema = client.get_table(self.table_ref).schema + if not isinstance(table_schema[0], bigquery.schema.SchemaField): + raise TypeError("Could not parse BigQuery table schema.") + + name_type_pairs = [(field.name, field.field_type) for field in table_schema] else: bq_columns_query = f"SELECT * FROM ({self.query}) LIMIT 1" queryRes = client.query(bq_columns_query).result() diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index a0b3e2bf49..4bbe902d0c 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -94,3 +94,13 @@ def __init__( f"The DataFrame from {source} being materialized must have at least {join_key_columns} columns present, " f"but these were missing: {join_key_columns - source_columns} " ) + + +class RegistryInferenceFailure(Exception): + def __init__( + self, repo_obj_type: str, specific_issue: str + ): + super().__init__( + f"Inference to fill in missing information for {repo_obj_type} failed. {specific_issue}. " + "Try filling the information explicitly." + ) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index e0e2a9d795..3c821e9ca0 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -230,6 +230,8 @@ def apply( infer_event_timestamp_column_for_data_sources( [view.input for view in views_to_update] ) + for view in views_to_update: + view.infer_features_from_input_source() if len(views_to_update) + len(entities_to_update) != len(objects): raise ValueError("Unknown object type provided as part of apply() call") diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index db756dda79..f0582c4a12 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -21,6 +21,7 @@ from feast import utils from feast.data_source import BigQuerySource, DataSource, FileSource +from feast.errors import RegistryInferenceFailure from feast.feature import Feature from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto from feast.protos.feast.core.FeatureView_pb2 import ( @@ -64,29 +65,6 @@ def __init__( tags: Optional[Dict[str, str]] = None, online: bool = True, ): - if not features: - features = [] # to handle python's mutable default arguments - columns_to_exclude = { - input.event_timestamp_column, - input.created_timestamp_column, - } | set(entities) - - for col_name, col_datatype in input.get_table_column_names_and_types(): - if col_name not in columns_to_exclude and not re.match( - "^__|__$", col_name - ): - features.append( - Feature( - col_name, - input.source_datatype_to_feast_value_type()(col_datatype), - ) - ) - - if not features: - raise ValueError( - f"Could not infer Features for the FeatureView named {name}. Please specify Features explicitly for this FeatureView." - ) - cols = [entity for entity in entities] + [feat.name for feat in features] for col in cols: if input.field_mapping is not None and col in input.field_mapping.keys(): @@ -241,3 +219,30 @@ def most_recent_end_time(self) -> Optional[datetime]: if len(self.materialization_intervals) == 0: return None return max([interval[1] for interval in self.materialization_intervals]) + + + def infer_features_from_input_source(self): + if not self.features: + columns_to_exclude = { + self.input.event_timestamp_column, + self.input.created_timestamp_column, + } | set(self.entities) + + for col_name, col_datatype in self.input.get_table_column_names_and_types(): + if col_name not in columns_to_exclude and not re.match( + "^__|__$", col_name # double underscores often signal an internal-use column + ): + feature_name = self.input.field_mapping[col_name] if col_name in self.input.field_mapping.keys() else col_name + self.features.append( + Feature( + feature_name, + self.input.source_datatype_to_feast_value_type()(col_datatype), + ) + ) + + if not self.features: + raise RegistryInferenceFailure( + "FeatureView", + f"Could not infer Features for the FeatureView named {self.name}." + ) + diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 89123ddbe1..1f60263cfb 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -3,6 +3,7 @@ from feast import Entity from feast.data_source import BigQuerySource, DataSource, FileSource +from feast.errors import RegistryInferenceFailure from feast.feature_view import FeatureView from feast.value_type import ValueType @@ -47,10 +48,11 @@ def infer_entity_value_type_from_feature_views( entity.value_type != ValueType.UNKNOWN and entity.value_type != inferred_value_type ) or (len(extracted_entity_name_type_pairs) > 1): - raise ValueError( + raise RegistryInferenceFailure( + "Entity", f"""Entity value_type inference failed for {entity_name} entity. - Multiple viable matches. Please explicitly specify the entity value_type - for this entity.""" + Multiple viable matches. + """ ) entity.value_type = inferred_value_type @@ -60,7 +62,6 @@ def infer_entity_value_type_from_feature_views( def infer_event_timestamp_column_for_data_sources(data_sources: List[DataSource]) -> List[DataSource]: ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column" - USER_GUIDANCE = "Please specify event_timestamp_column explicitly." for data_source in data_sources: if data_source.event_timestamp_column is None: @@ -71,9 +72,11 @@ def infer_event_timestamp_column_for_data_sources(data_sources: List[DataSource] elif isinstance(data_source, BigQuerySource): ts_column_type_regex_pattern = "TIMESTAMP|DATETIME" else: - raise TypeError( + raise RegistryInferenceFailure( + "DataSource", f""" - Inference failed. Data source not supported + DataSource inferencing of event_timestamp_column is currently only supported + for FileSource and BigQuerySource. """ ) @@ -82,10 +85,11 @@ def infer_event_timestamp_column_for_data_sources(data_sources: List[DataSource] for col_name, col_datatype in data_source.get_table_column_names_and_types(): if re.match(ts_column_type_regex_pattern, col_datatype): if matched_flag: - raise TypeError( + raise RegistryInferenceFailure( + "DataSource", f""" {ERROR_MSG_PREFIX} due to multiple possible columns satisfying - the criteria. {USER_GUIDANCE}. {col_name} {col_datatype} REGEX PAT:{ts_column_type_regex_pattern} + the criteria. {ts_column_type_regex_pattern} {col_name} """ ) matched_flag = True @@ -93,10 +97,10 @@ def infer_event_timestamp_column_for_data_sources(data_sources: List[DataSource] if matched_flag: data_source.event_timestamp_column = event_timestamp_column else: - raise TypeError( + raise RegistryInferenceFailure( + "DataSource", f""" {ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria. - {USER_GUIDANCE} """ ) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index ef00af100b..b91efdfcce 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -142,6 +142,8 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): infer_event_timestamp_column_for_data_sources( [view.input for view in repo.feature_views] ) + for view in repo.feature_views: + view.infer_features_from_input_source() sys.dont_write_bytecode = False for entity in repo.entities: diff --git a/sdk/python/tests/test_inference.py b/sdk/python/tests/test_inference.py index 4214a6702d..a4b7861436 100644 --- a/sdk/python/tests/test_inference.py +++ b/sdk/python/tests/test_inference.py @@ -7,6 +7,7 @@ ) from feast import Entity, ValueType +from feast.errors import RegistryInferenceFailure from feast.feature_view import FeatureView from feast.inference import ( infer_entity_value_type_from_feature_views, @@ -33,11 +34,12 @@ def test_infer_entity_value_type_from_feature_views(simple_dataset_1, simple_dat assert actual_1 == [Entity(name="id", value_type=ValueType.INT64)] assert actual_2 == [Entity(name="id", value_type=ValueType.STRING)] - with pytest.raises(ValueError): + with pytest.raises(RegistryInferenceFailure): # two viable data types infer_entity_value_type_from_feature_views([Entity(name="id")], [fv1, fv2]) +@pytest.mark.integration def test_infer_event_timestamp_column_for_data_source(simple_dataset_1): df_with_two_viable_timestamp_cols = simple_dataset_1.copy(deep=True) df_with_two_viable_timestamp_cols["ts_2"] = simple_dataset_1["ts_1"] @@ -53,7 +55,7 @@ def test_infer_event_timestamp_column_for_data_source(simple_dataset_1): with prep_file_source( df=df_with_two_viable_timestamp_cols ) as file_source: - with pytest.raises(TypeError): + with pytest.raises(RegistryInferenceFailure): # two viable event_timestamp_columns infer_event_timestamp_column_for_data_sources( [file_source] From 0dab89cebbc971e761debcefee312a56e5078307 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Sat, 12 Jun 2021 16:56:03 -0700 Subject: [PATCH 3/8] linted Signed-off-by: David Y Liu --- sdk/python/feast/data_source.py | 8 +++----- sdk/python/feast/errors.py | 4 +--- sdk/python/feast/feature_store.py | 2 +- sdk/python/feast/feature_view.py | 17 ++++++++++------ sdk/python/feast/inference.py | 30 +++++++++++++++++------------ sdk/python/feast/repo_operations.py | 2 +- sdk/python/tests/test_inference.py | 16 ++++++--------- 7 files changed, 41 insertions(+), 38 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index a58ce547a8..bde966816b 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -14,7 +14,6 @@ import enum -import re from typing import Callable, Dict, Iterable, Optional, Tuple from pyarrow.parquet import ParquetFile @@ -519,7 +518,7 @@ def to_proto(self) -> DataSourceProto: Converts an DataSourceProto object to its protobuf representation. """ raise NotImplementedError - + class FileSource(DataSource): def __init__( @@ -564,7 +563,7 @@ def __init__( self._file_options = FileOptions(file_format=file_format, file_url=file_url) super().__init__( - event_timestamp_column, + event_timestamp_column or "", created_timestamp_column, field_mapping, date_partition_column, @@ -638,7 +637,7 @@ def __init__( self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query) super().__init__( - event_timestamp_column, + event_timestamp_column or "", created_timestamp_column, field_mapping, date_partition_column, @@ -854,4 +853,3 @@ def to_proto(self) -> DataSourceProto: data_source_proto.date_partition_column = self.date_partition_column return data_source_proto - diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 4bbe902d0c..5bf9b27553 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -97,9 +97,7 @@ def __init__( class RegistryInferenceFailure(Exception): - def __init__( - self, repo_obj_type: str, specific_issue: str - ): + def __init__(self, repo_obj_type: str, specific_issue: str): super().__init__( f"Inference to fill in missing information for {repo_obj_type} failed. {specific_issue}. " "Try filling the information explicitly." diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 3c821e9ca0..d90f9d35e1 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -28,7 +28,7 @@ from feast.feature_view import FeatureView from feast.inference import ( infer_entity_value_type_from_feature_views, - infer_event_timestamp_column_for_data_sources + infer_event_timestamp_column_for_data_sources, ) from feast.infra.provider import Provider, RetrievalJob, get_provider from feast.online_response import OnlineResponse, _infer_online_entity_rows diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index f0582c4a12..114bb37e61 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -220,7 +220,6 @@ def most_recent_end_time(self) -> Optional[datetime]: return None return max([interval[1] for interval in self.materialization_intervals]) - def infer_features_from_input_source(self): if not self.features: columns_to_exclude = { @@ -230,19 +229,25 @@ def infer_features_from_input_source(self): for col_name, col_datatype in self.input.get_table_column_names_and_types(): if col_name not in columns_to_exclude and not re.match( - "^__|__$", col_name # double underscores often signal an internal-use column + "^__|__$", + col_name, # double underscores often signal an internal-use column ): - feature_name = self.input.field_mapping[col_name] if col_name in self.input.field_mapping.keys() else col_name + feature_name = ( + self.input.field_mapping[col_name] + if col_name in self.input.field_mapping.keys() + else col_name + ) self.features.append( Feature( feature_name, - self.input.source_datatype_to_feast_value_type()(col_datatype), + self.input.source_datatype_to_feast_value_type()( + col_datatype + ), ) ) if not self.features: raise RegistryInferenceFailure( "FeatureView", - f"Could not infer Features for the FeatureView named {self.name}." + f"Could not infer Features for the FeatureView named {self.name}.", ) - diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 1f60263cfb..7b7c5e4992 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -51,8 +51,8 @@ def infer_entity_value_type_from_feature_views( raise RegistryInferenceFailure( "Entity", f"""Entity value_type inference failed for {entity_name} entity. - Multiple viable matches. - """ + Multiple viable matches. + """, ) entity.value_type = inferred_value_type @@ -60,7 +60,9 @@ def infer_entity_value_type_from_feature_views( return entities -def infer_event_timestamp_column_for_data_sources(data_sources: List[DataSource]) -> List[DataSource]: +def infer_event_timestamp_column_for_data_sources( + data_sources: List[DataSource], +) -> List[DataSource]: ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column" for data_source in data_sources: @@ -74,15 +76,22 @@ def infer_event_timestamp_column_for_data_sources(data_sources: List[DataSource] else: raise RegistryInferenceFailure( "DataSource", - f""" + """ DataSource inferencing of event_timestamp_column is currently only supported for FileSource and BigQuerySource. - """ + """, ) + # for informing the type checker + assert isinstance(data_source, FileSource) or isinstance( + data_source, BigQuerySource + ) # loop through table columns to find singular match event_timestamp_column, matched_flag = None, False - for col_name, col_datatype in data_source.get_table_column_names_and_types(): + for ( + col_name, + col_datatype, + ) in data_source.get_table_column_names_and_types(): if re.match(ts_column_type_regex_pattern, col_datatype): if matched_flag: raise RegistryInferenceFailure( @@ -90,7 +99,7 @@ def infer_event_timestamp_column_for_data_sources(data_sources: List[DataSource] f""" {ERROR_MSG_PREFIX} due to multiple possible columns satisfying the criteria. {ts_column_type_regex_pattern} {col_name} - """ + """, ) matched_flag = True event_timestamp_column = col_name @@ -101,10 +110,7 @@ def infer_event_timestamp_column_for_data_sources(data_sources: List[DataSource] "DataSource", f""" {ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria. - """ - ) + """, + ) return data_sources - - - diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index b91efdfcce..ceabbf4b2c 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -15,7 +15,7 @@ from feast.feature_view import FeatureView from feast.inference import ( infer_entity_value_type_from_feature_views, - infer_event_timestamp_column_for_data_sources + infer_event_timestamp_column_for_data_sources, ) from feast.infra.offline_stores.helpers import assert_offline_store_supports_data_source from feast.infra.provider import get_provider diff --git a/sdk/python/tests/test_inference.py b/sdk/python/tests/test_inference.py index a4b7861436..0b370ed657 100644 --- a/sdk/python/tests/test_inference.py +++ b/sdk/python/tests/test_inference.py @@ -1,8 +1,6 @@ -import pandas as pd import pytest from utils.data_source_utils import ( prep_file_source, - simple_bq_source_using_query_arg, simple_bq_source_using_table_ref_arg, ) @@ -11,7 +9,7 @@ from feast.feature_view import FeatureView from feast.inference import ( infer_entity_value_type_from_feature_views, - infer_event_timestamp_column_for_data_sources + infer_event_timestamp_column_for_data_sources, ) @@ -48,15 +46,13 @@ def test_infer_event_timestamp_column_for_data_source(simple_dataset_1): actual_processed_data_sources = infer_event_timestamp_column_for_data_sources( [file_source, simple_bq_source_using_table_ref_arg(simple_dataset_1)] ) - actual_event_timestamp_cols = [source.event_timestamp_column for source in actual_processed_data_sources] + actual_event_timestamp_cols = [ + source.event_timestamp_column for source in actual_processed_data_sources + ] assert actual_event_timestamp_cols == ["ts_1", "ts_1"] - with prep_file_source( - df=df_with_two_viable_timestamp_cols - ) as file_source: + with prep_file_source(df=df_with_two_viable_timestamp_cols) as file_source: with pytest.raises(RegistryInferenceFailure): # two viable event_timestamp_columns - infer_event_timestamp_column_for_data_sources( - [file_source] - ) + infer_event_timestamp_column_for_data_sources([file_source]) From b24e5715faa85679a0250f62b0d81cd747891815 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Sat, 12 Jun 2021 17:29:52 -0700 Subject: [PATCH 4/8] improve test coverage Signed-off-by: David Y Liu --- sdk/python/feast/inference.py | 5 ++++- sdk/python/tests/test_inference.py | 9 +++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 7b7c5e4992..1648c83713 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -66,7 +66,10 @@ def infer_event_timestamp_column_for_data_sources( ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column" for data_source in data_sources: - if data_source.event_timestamp_column is None: + if ( + data_source.event_timestamp_column is None + or data_source.event_timestamp_column == "" + ): # prepare right match pattern for data source ts_column_type_regex_pattern = "" if isinstance(data_source, FileSource): diff --git a/sdk/python/tests/test_inference.py b/sdk/python/tests/test_inference.py index 0b370ed657..4fc2b6531f 100644 --- a/sdk/python/tests/test_inference.py +++ b/sdk/python/tests/test_inference.py @@ -1,6 +1,7 @@ import pytest from utils.data_source_utils import ( prep_file_source, + simple_bq_source_using_query_arg, simple_bq_source_using_table_ref_arg, ) @@ -44,13 +45,17 @@ def test_infer_event_timestamp_column_for_data_source(simple_dataset_1): with prep_file_source(df=simple_dataset_1) as file_source: actual_processed_data_sources = infer_event_timestamp_column_for_data_sources( - [file_source, simple_bq_source_using_table_ref_arg(simple_dataset_1)] + [ + file_source, + simple_bq_source_using_table_ref_arg(simple_dataset_1), + simple_bq_source_using_query_arg(simple_dataset_1), + ] ) actual_event_timestamp_cols = [ source.event_timestamp_column for source in actual_processed_data_sources ] - assert actual_event_timestamp_cols == ["ts_1", "ts_1"] + assert actual_event_timestamp_cols == ["ts_1", "ts_1", "ts_1"] with prep_file_source(df=df_with_two_viable_timestamp_cols) as file_source: with pytest.raises(RegistryInferenceFailure): From bb3a08ad552770e0a85ff4b7611a222887086bf4 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Tue, 15 Jun 2021 14:37:40 -0700 Subject: [PATCH 5/8] changed placement of inference methods in repo_operation apply_total Signed-off-by: David Y Liu --- sdk/python/feast/repo_operations.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index ceabbf4b2c..e4463d0064 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -139,11 +139,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): ), feature_views=repo.feature_views, ) - infer_event_timestamp_column_for_data_sources( - [view.input for view in repo.feature_views] - ) - for view in repo.feature_views: - view.infer_features_from_input_source() sys.dont_write_bytecode = False for entity in repo.entities: @@ -165,6 +160,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): repo_config.offline_store, data_source ) + infer_event_timestamp_column_for_data_sources(data_sources) + for view in repo.feature_views: + view.infer_features_from_input_source() + tables_to_delete = [] for registry_table in registry.list_feature_tables(project=project): if registry_table.name not in repo_table_names: From 30535997911f52cff26cf24e5425913b5bc5a4a9 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Tue, 15 Jun 2021 15:01:33 -0700 Subject: [PATCH 6/8] updated inference method name + changed to void return since it updates in place Signed-off-by: David Y Liu --- sdk/python/feast/feature_store.py | 4 ++-- sdk/python/feast/inference.py | 12 +++++------- sdk/python/feast/repo_operations.py | 6 +++--- sdk/python/tests/test_inference.py | 6 +++--- 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index d90f9d35e1..83e1cc9a99 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -28,7 +28,7 @@ from feast.feature_view import FeatureView from feast.inference import ( infer_entity_value_type_from_feature_views, - infer_event_timestamp_column_for_data_sources, + update_data_sources_with_inferred_event_timestamp_col, ) from feast.infra.provider import Provider, RetrievalJob, get_provider from feast.online_response import OnlineResponse, _infer_online_entity_rows @@ -227,7 +227,7 @@ def apply( entities_to_update = infer_entity_value_type_from_feature_views( [ob for ob in objects if isinstance(ob, Entity)], views_to_update ) - infer_event_timestamp_column_for_data_sources( + update_data_sources_with_inferred_event_timestamp_col( [view.input for view in views_to_update] ) for view in views_to_update: diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 1648c83713..fac2155ee2 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -1,8 +1,8 @@ import re -from typing import List +from typing import List, Union from feast import Entity -from feast.data_source import BigQuerySource, DataSource, FileSource +from feast.data_source import BigQuerySource, FileSource from feast.errors import RegistryInferenceFailure from feast.feature_view import FeatureView from feast.value_type import ValueType @@ -60,9 +60,9 @@ def infer_entity_value_type_from_feature_views( return entities -def infer_event_timestamp_column_for_data_sources( - data_sources: List[DataSource], -) -> List[DataSource]: +def update_data_sources_with_inferred_event_timestamp_col( + data_sources: List[Union[BigQuerySource, FileSource]], +) -> None: ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column" for data_source in data_sources: @@ -115,5 +115,3 @@ def infer_event_timestamp_column_for_data_sources( {ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria. """, ) - - return data_sources diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index e4463d0064..b3cc7fa0c3 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -15,7 +15,7 @@ from feast.feature_view import FeatureView from feast.inference import ( infer_entity_value_type_from_feature_views, - infer_event_timestamp_column_for_data_sources, + update_data_sources_with_inferred_event_timestamp_col, ) from feast.infra.offline_stores.helpers import assert_offline_store_supports_data_source from feast.infra.provider import get_provider @@ -160,9 +160,9 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): repo_config.offline_store, data_source ) - infer_event_timestamp_column_for_data_sources(data_sources) + update_data_sources_with_inferred_event_timestamp_col(data_sources) for view in repo.feature_views: - view.infer_features_from_input_source() + view.infer_features_from_input_source() tables_to_delete = [] for registry_table in registry.list_feature_tables(project=project): diff --git a/sdk/python/tests/test_inference.py b/sdk/python/tests/test_inference.py index 4fc2b6531f..8d5061cb58 100644 --- a/sdk/python/tests/test_inference.py +++ b/sdk/python/tests/test_inference.py @@ -10,7 +10,7 @@ from feast.feature_view import FeatureView from feast.inference import ( infer_entity_value_type_from_feature_views, - infer_event_timestamp_column_for_data_sources, + update_data_sources_with_inferred_event_timestamp_col, ) @@ -44,7 +44,7 @@ def test_infer_event_timestamp_column_for_data_source(simple_dataset_1): df_with_two_viable_timestamp_cols["ts_2"] = simple_dataset_1["ts_1"] with prep_file_source(df=simple_dataset_1) as file_source: - actual_processed_data_sources = infer_event_timestamp_column_for_data_sources( + actual_processed_data_sources = update_data_sources_with_inferred_event_timestamp_col( [ file_source, simple_bq_source_using_table_ref_arg(simple_dataset_1), @@ -60,4 +60,4 @@ def test_infer_event_timestamp_column_for_data_source(simple_dataset_1): with prep_file_source(df=df_with_two_viable_timestamp_cols) as file_source: with pytest.raises(RegistryInferenceFailure): # two viable event_timestamp_columns - infer_event_timestamp_column_for_data_sources([file_source]) + update_data_sources_with_inferred_event_timestamp_col([file_source]) From d558b0b6b657a15c516a3589a8cbefb9aa88fffe Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Tue, 15 Jun 2021 16:58:54 -0700 Subject: [PATCH 7/8] fixed integration test and added comments Signed-off-by: David Y Liu --- sdk/python/feast/data_source.py | 8 ++++---- sdk/python/tests/test_inference.py | 15 +++++++-------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index bde966816b..ad053b8fd3 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -523,7 +523,7 @@ def to_proto(self) -> DataSourceProto: class FileSource(DataSource): def __init__( self, - event_timestamp_column: Optional[str] = None, + event_timestamp_column: Optional[str] = "", file_url: Optional[str] = None, path: Optional[str] = None, file_format: FileFormat = None, @@ -563,7 +563,7 @@ def __init__( self._file_options = FileOptions(file_format=file_format, file_url=file_url) super().__init__( - event_timestamp_column or "", + event_timestamp_column or "", # for satisfying type checker created_timestamp_column, field_mapping, date_partition_column, @@ -627,7 +627,7 @@ def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]: class BigQuerySource(DataSource): def __init__( self, - event_timestamp_column: Optional[str] = None, + event_timestamp_column: Optional[str] = "", table_ref: Optional[str] = None, created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, @@ -637,7 +637,7 @@ def __init__( self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query) super().__init__( - event_timestamp_column or "", + event_timestamp_column or "", # for satisfying type checker created_timestamp_column, field_mapping, date_partition_column, diff --git a/sdk/python/tests/test_inference.py b/sdk/python/tests/test_inference.py index 8d5061cb58..1f626ac3cd 100644 --- a/sdk/python/tests/test_inference.py +++ b/sdk/python/tests/test_inference.py @@ -44,15 +44,14 @@ def test_infer_event_timestamp_column_for_data_source(simple_dataset_1): df_with_two_viable_timestamp_cols["ts_2"] = simple_dataset_1["ts_1"] with prep_file_source(df=simple_dataset_1) as file_source: - actual_processed_data_sources = update_data_sources_with_inferred_event_timestamp_col( - [ - file_source, - simple_bq_source_using_table_ref_arg(simple_dataset_1), - simple_bq_source_using_query_arg(simple_dataset_1), - ] - ) + data_sources = [ + file_source, + simple_bq_source_using_table_ref_arg(simple_dataset_1), + simple_bq_source_using_query_arg(simple_dataset_1), + ] + update_data_sources_with_inferred_event_timestamp_col(data_sources) actual_event_timestamp_cols = [ - source.event_timestamp_column for source in actual_processed_data_sources + source.event_timestamp_column for source in data_sources ] assert actual_event_timestamp_cols == ["ts_1", "ts_1", "ts_1"] From 9a854bf9f07c3fbb2f6361a4c5b12aedff063d12 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Fri, 18 Jun 2021 12:58:50 -0700 Subject: [PATCH 8/8] Made DataSource event_timestamp_column optional Signed-off-by: David Y Liu --- sdk/python/feast/data_source.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index ad053b8fd3..44badcb83b 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -370,7 +370,7 @@ class DataSource: def __init__( self, - event_timestamp_column: str, + event_timestamp_column: Optional[str] = "", created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", @@ -563,7 +563,7 @@ def __init__( self._file_options = FileOptions(file_format=file_format, file_url=file_url) super().__init__( - event_timestamp_column or "", # for satisfying type checker + event_timestamp_column, created_timestamp_column, field_mapping, date_partition_column, @@ -637,7 +637,7 @@ def __init__( self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query) super().__init__( - event_timestamp_column or "", # for satisfying type checker + event_timestamp_column, created_timestamp_column, field_mapping, date_partition_column,