diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 96ef9e46a0..44badcb83b 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -14,7 +14,6 @@ import enum -import re from typing import Callable, Dict, Iterable, Optional, Tuple from pyarrow.parquet import ParquetFile @@ -371,7 +370,7 @@ class DataSource: def __init__( self, - event_timestamp_column: str, + event_timestamp_column: Optional[str] = "", created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", @@ -520,45 +519,11 @@ def to_proto(self) -> DataSourceProto: """ raise NotImplementedError - def _infer_event_timestamp_column(self, ts_column_type_regex_pattern): - ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column" - USER_GUIDANCE = "Please specify event_timestamp_column explicitly." - - if isinstance(self, FileSource) or isinstance(self, BigQuerySource): - event_timestamp_column, matched_flag = None, False - for col_name, col_datatype in self.get_table_column_names_and_types(): - if re.match(ts_column_type_regex_pattern, col_datatype): - if matched_flag: - raise TypeError( - f""" - {ERROR_MSG_PREFIX} due to multiple possible columns satisfying - the criteria. {USER_GUIDANCE} - """ - ) - matched_flag = True - event_timestamp_column = col_name - if matched_flag: - return event_timestamp_column - else: - raise TypeError( - f""" - {ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria. - {USER_GUIDANCE} - """ - ) - else: - raise TypeError( - f""" - {ERROR_MSG_PREFIX} because this DataSource currently does not support this inference. - {USER_GUIDANCE} - """ - ) - class FileSource(DataSource): def __init__( self, - event_timestamp_column: Optional[str] = None, + event_timestamp_column: Optional[str] = "", file_url: Optional[str] = None, path: Optional[str] = None, file_format: FileFormat = None, @@ -598,7 +563,7 @@ def __init__( self._file_options = FileOptions(file_format=file_format, file_url=file_url) super().__init__( - event_timestamp_column or self._infer_event_timestamp_column(r"^timestamp"), + event_timestamp_column, created_timestamp_column, field_mapping, date_partition_column, @@ -662,7 +627,7 @@ def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]: class BigQuerySource(DataSource): def __init__( self, - event_timestamp_column: Optional[str] = None, + event_timestamp_column: Optional[str] = "", table_ref: Optional[str] = None, created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, @@ -672,8 +637,7 @@ def __init__( self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query) super().__init__( - event_timestamp_column - or self._infer_event_timestamp_column("TIMESTAMP|DATETIME"), + event_timestamp_column, created_timestamp_column, field_mapping, date_partition_column, @@ -743,20 +707,12 @@ def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]: from google.cloud import bigquery client = bigquery.Client() - name_type_pairs = [] if self.table_ref is not None: - project_id, dataset_id, table_id = self.table_ref.split(".") - bq_columns_query = f""" - SELECT COLUMN_NAME, DATA_TYPE FROM {project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_NAME = '{table_id}' - """ - table_schema = ( - client.query(bq_columns_query).result().to_dataframe_iterable() - ) - for df in table_schema: - name_type_pairs.extend( - list(zip(df["COLUMN_NAME"].to_list(), df["DATA_TYPE"].to_list())) - ) + table_schema = client.get_table(self.table_ref).schema + if not isinstance(table_schema[0], bigquery.schema.SchemaField): + raise TypeError("Could not parse BigQuery table schema.") + + name_type_pairs = [(field.name, field.field_type) for field in table_schema] else: bq_columns_query = f"SELECT * FROM ({self.query}) LIMIT 1" queryRes = client.query(bq_columns_query).result() diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index a0b3e2bf49..5bf9b27553 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -94,3 +94,11 @@ def __init__( f"The DataFrame from {source} being materialized must have at least {join_key_columns} columns present, " f"but these were missing: {join_key_columns - source_columns} " ) + + +class RegistryInferenceFailure(Exception): + def __init__(self, repo_obj_type: str, specific_issue: str): + super().__init__( + f"Inference to fill in missing information for {repo_obj_type} failed. {specific_issue}. " + "Try filling the information explicitly." + ) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 497ba3a368..83e1cc9a99 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -26,7 +26,10 @@ from feast.entity import Entity from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException from feast.feature_view import FeatureView -from feast.inference import infer_entity_value_type_from_feature_views +from feast.inference import ( + infer_entity_value_type_from_feature_views, + update_data_sources_with_inferred_event_timestamp_col, +) from feast.infra.provider import Provider, RetrievalJob, get_provider from feast.online_response import OnlineResponse, _infer_online_entity_rows from feast.protos.feast.serving.ServingService_pb2 import ( @@ -224,6 +227,11 @@ def apply( entities_to_update = infer_entity_value_type_from_feature_views( [ob for ob in objects if isinstance(ob, Entity)], views_to_update ) + update_data_sources_with_inferred_event_timestamp_col( + [view.input for view in views_to_update] + ) + for view in views_to_update: + view.infer_features_from_input_source() if len(views_to_update) + len(entities_to_update) != len(objects): raise ValueError("Unknown object type provided as part of apply() call") diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index db756dda79..114bb37e61 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -21,6 +21,7 @@ from feast import utils from feast.data_source import BigQuerySource, DataSource, FileSource +from feast.errors import RegistryInferenceFailure from feast.feature import Feature from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto from feast.protos.feast.core.FeatureView_pb2 import ( @@ -64,29 +65,6 @@ def __init__( tags: Optional[Dict[str, str]] = None, online: bool = True, ): - if not features: - features = [] # to handle python's mutable default arguments - columns_to_exclude = { - input.event_timestamp_column, - input.created_timestamp_column, - } | set(entities) - - for col_name, col_datatype in input.get_table_column_names_and_types(): - if col_name not in columns_to_exclude and not re.match( - "^__|__$", col_name - ): - features.append( - Feature( - col_name, - input.source_datatype_to_feast_value_type()(col_datatype), - ) - ) - - if not features: - raise ValueError( - f"Could not infer Features for the FeatureView named {name}. Please specify Features explicitly for this FeatureView." - ) - cols = [entity for entity in entities] + [feat.name for feat in features] for col in cols: if input.field_mapping is not None and col in input.field_mapping.keys(): @@ -241,3 +219,35 @@ def most_recent_end_time(self) -> Optional[datetime]: if len(self.materialization_intervals) == 0: return None return max([interval[1] for interval in self.materialization_intervals]) + + def infer_features_from_input_source(self): + if not self.features: + columns_to_exclude = { + self.input.event_timestamp_column, + self.input.created_timestamp_column, + } | set(self.entities) + + for col_name, col_datatype in self.input.get_table_column_names_and_types(): + if col_name not in columns_to_exclude and not re.match( + "^__|__$", + col_name, # double underscores often signal an internal-use column + ): + feature_name = ( + self.input.field_mapping[col_name] + if col_name in self.input.field_mapping.keys() + else col_name + ) + self.features.append( + Feature( + feature_name, + self.input.source_datatype_to_feast_value_type()( + col_datatype + ), + ) + ) + + if not self.features: + raise RegistryInferenceFailure( + "FeatureView", + f"Could not infer Features for the FeatureView named {self.name}.", + ) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 54105a9bc2..fac2155ee2 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -1,6 +1,9 @@ -from typing import List +import re +from typing import List, Union from feast import Entity +from feast.data_source import BigQuerySource, FileSource +from feast.errors import RegistryInferenceFailure from feast.feature_view import FeatureView from feast.value_type import ValueType @@ -45,12 +48,70 @@ def infer_entity_value_type_from_feature_views( entity.value_type != ValueType.UNKNOWN and entity.value_type != inferred_value_type ) or (len(extracted_entity_name_type_pairs) > 1): - raise ValueError( + raise RegistryInferenceFailure( + "Entity", f"""Entity value_type inference failed for {entity_name} entity. - Multiple viable matches. Please explicitly specify the entity value_type - for this entity.""" + Multiple viable matches. + """, ) entity.value_type = inferred_value_type return entities + + +def update_data_sources_with_inferred_event_timestamp_col( + data_sources: List[Union[BigQuerySource, FileSource]], +) -> None: + ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column" + + for data_source in data_sources: + if ( + data_source.event_timestamp_column is None + or data_source.event_timestamp_column == "" + ): + # prepare right match pattern for data source + ts_column_type_regex_pattern = "" + if isinstance(data_source, FileSource): + ts_column_type_regex_pattern = r"^timestamp" + elif isinstance(data_source, BigQuerySource): + ts_column_type_regex_pattern = "TIMESTAMP|DATETIME" + else: + raise RegistryInferenceFailure( + "DataSource", + """ + DataSource inferencing of event_timestamp_column is currently only supported + for FileSource and BigQuerySource. + """, + ) + # for informing the type checker + assert isinstance(data_source, FileSource) or isinstance( + data_source, BigQuerySource + ) + + # loop through table columns to find singular match + event_timestamp_column, matched_flag = None, False + for ( + col_name, + col_datatype, + ) in data_source.get_table_column_names_and_types(): + if re.match(ts_column_type_regex_pattern, col_datatype): + if matched_flag: + raise RegistryInferenceFailure( + "DataSource", + f""" + {ERROR_MSG_PREFIX} due to multiple possible columns satisfying + the criteria. {ts_column_type_regex_pattern} {col_name} + """, + ) + matched_flag = True + event_timestamp_column = col_name + if matched_flag: + data_source.event_timestamp_column = event_timestamp_column + else: + raise RegistryInferenceFailure( + "DataSource", + f""" + {ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria. + """, + ) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 63ed5c74d7..b3cc7fa0c3 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -13,7 +13,10 @@ from feast import Entity, FeatureTable from feast.feature_view import FeatureView -from feast.inference import infer_entity_value_type_from_feature_views +from feast.inference import ( + infer_entity_value_type_from_feature_views, + update_data_sources_with_inferred_event_timestamp_col, +) from feast.infra.offline_stores.helpers import assert_offline_store_supports_data_source from feast.infra.provider import get_provider from feast.names import adjectives, animals @@ -136,6 +139,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): ), feature_views=repo.feature_views, ) + sys.dont_write_bytecode = False for entity in repo.entities: registry.apply_entity(entity, project=project) @@ -156,6 +160,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): repo_config.offline_store, data_source ) + update_data_sources_with_inferred_event_timestamp_col(data_sources) + for view in repo.feature_views: + view.infer_features_from_input_source() + tables_to_delete = [] for registry_table in registry.list_feature_tables(project=project): if registry_table.name not in repo_table_names: diff --git a/sdk/python/tests/test_inference.py b/sdk/python/tests/test_inference.py index 886aca8ab2..1f626ac3cd 100644 --- a/sdk/python/tests/test_inference.py +++ b/sdk/python/tests/test_inference.py @@ -6,23 +6,12 @@ ) from feast import Entity, ValueType +from feast.errors import RegistryInferenceFailure from feast.feature_view import FeatureView -from feast.inference import infer_entity_value_type_from_feature_views - - -@pytest.mark.integration -def test_data_source_ts_col_inference_success(simple_dataset_1): - with prep_file_source(df=simple_dataset_1) as file_source: - actual_file_source = file_source.event_timestamp_column - actual_bq_1 = simple_bq_source_using_table_ref_arg( - simple_dataset_1 - ).event_timestamp_column - actual_bq_2 = simple_bq_source_using_query_arg( - simple_dataset_1 - ).event_timestamp_column - expected = "ts_1" - - assert expected == actual_file_source == actual_bq_1 == actual_bq_2 +from feast.inference import ( + infer_entity_value_type_from_feature_views, + update_data_sources_with_inferred_event_timestamp_col, +) def test_infer_entity_value_type_from_feature_views(simple_dataset_1, simple_dataset_2): @@ -44,6 +33,30 @@ def test_infer_entity_value_type_from_feature_views(simple_dataset_1, simple_dat assert actual_1 == [Entity(name="id", value_type=ValueType.INT64)] assert actual_2 == [Entity(name="id", value_type=ValueType.STRING)] - with pytest.raises(ValueError): + with pytest.raises(RegistryInferenceFailure): # two viable data types infer_entity_value_type_from_feature_views([Entity(name="id")], [fv1, fv2]) + + +@pytest.mark.integration +def test_infer_event_timestamp_column_for_data_source(simple_dataset_1): + df_with_two_viable_timestamp_cols = simple_dataset_1.copy(deep=True) + df_with_two_viable_timestamp_cols["ts_2"] = simple_dataset_1["ts_1"] + + with prep_file_source(df=simple_dataset_1) as file_source: + data_sources = [ + file_source, + simple_bq_source_using_table_ref_arg(simple_dataset_1), + simple_bq_source_using_query_arg(simple_dataset_1), + ] + update_data_sources_with_inferred_event_timestamp_col(data_sources) + actual_event_timestamp_cols = [ + source.event_timestamp_column for source in data_sources + ] + + assert actual_event_timestamp_cols == ["ts_1", "ts_1", "ts_1"] + + with prep_file_source(df=df_with_two_viable_timestamp_cols) as file_source: + with pytest.raises(RegistryInferenceFailure): + # two viable event_timestamp_columns + update_data_sources_with_inferred_event_timestamp_col([file_source]) diff --git a/sdk/python/tests/utils/data_source_utils.py b/sdk/python/tests/utils/data_source_utils.py index 0aec0c6f1a..c848b8ea64 100644 --- a/sdk/python/tests/utils/data_source_utils.py +++ b/sdk/python/tests/utils/data_source_utils.py @@ -8,7 +8,7 @@ @contextlib.contextmanager -def prep_file_source(df, event_timestamp_column="") -> FileSource: +def prep_file_source(df, event_timestamp_column=None) -> FileSource: with tempfile.NamedTemporaryFile(suffix=".parquet") as f: f.close() df.to_parquet(f.name) @@ -21,7 +21,7 @@ def prep_file_source(df, event_timestamp_column="") -> FileSource: def simple_bq_source_using_table_ref_arg( - df, event_timestamp_column="" + df, event_timestamp_column=None ) -> BigQuerySource: client = bigquery.Client() gcp_project = client.project @@ -46,7 +46,7 @@ def simple_bq_source_using_table_ref_arg( ) -def simple_bq_source_using_query_arg(df, event_timestamp_column="") -> BigQuerySource: +def simple_bq_source_using_query_arg(df, event_timestamp_column=None) -> BigQuerySource: bq_source_using_table_ref = simple_bq_source_using_table_ref_arg( df, event_timestamp_column )