From 6c988800a511417be5c6b787c4bd39281dbacf42 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Mon, 18 Apr 2022 13:17:53 -0700 Subject: [PATCH 01/15] Update Signed-off-by: Kevin Zhang --- sdk/python/feast/data_source.py | 36 +++++++++++-------- .../infra/offline_stores/bigquery_source.py | 1 + .../spark_offline_store/spark_source.py | 1 + .../trino_offline_store/trino_source.py | 1 - .../feast/infra/offline_stores/file_source.py | 1 + .../infra/offline_stores/redshift_source.py | 1 + .../infra/offline_stores/snowflake_source.py | 1 + 7 files changed, 26 insertions(+), 16 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 6040654784..59f61e0f8b 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -186,6 +186,7 @@ class DataSource(ABC): def __init__( self, + *, event_timestamp_column: Optional[str] = None, created_timestamp_column: Optional[str] = None, field_mapping: Optional[Dict[str, str]] = None, @@ -354,11 +355,12 @@ def get_table_column_names_and_types( def __init__( self, - name: str, - event_timestamp_column: str, - bootstrap_servers: str, - message_format: StreamFormat, - topic: str, + *, + name: Optional[str] = None, + event_timestamp_column: Optional[str] = "", + bootstrap_servers: Optional[str] = "", + message_format: Optional[StreamFormat] = None, + topic: Optional[str] = "", created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", @@ -472,14 +474,17 @@ class RequestSource(DataSource): def __init__( self, - name: str, - schema: Union[Dict[str, ValueType], List[Field]], + *, + name: Optional[str] = None, + schema: Optional[Union[Dict[str, ValueType], List[Field]]] = None, description: Optional[str] = "", tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", ): """Creates a RequestSource object.""" super().__init__(name=name, description=description, tags=tags, owner=owner) + if not schema: + raise ValueError("Schema needs to be provided for Request Source") if isinstance(schema, Dict): warnings.warn( "Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. " @@ -643,12 +648,13 @@ def get_table_query_string(self) -> str: def __init__( self, - name: str, - event_timestamp_column: str, - created_timestamp_column: str, - record_format: StreamFormat, - region: str, - stream_name: str, + *, + name: Optional[str] = None, + event_timestamp_column: Optional[str] = "", + created_timestamp_column: Optional[str] = "", + record_format: Optional[StreamFormat] = None, + region: Optional[str] = "", + stream_name: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", description: Optional[str] = "", @@ -726,8 +732,8 @@ class PushSource(DataSource): def __init__( self, *, - name: str, - batch_source: DataSource, + name: Optional[str] = None, + batch_source: Optional[DataSource] = None, description: Optional[str] = "", tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index 31b0ed617e..cb4cd1b5be 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -16,6 +16,7 @@ class BigQuerySource(DataSource): def __init__( self, + *, event_timestamp_column: Optional[str] = "", table: Optional[str] = None, created_timestamp_column: Optional[str] = "", diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index 65997040cc..dc92e08a50 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -30,6 +30,7 @@ class SparkSourceFormat(Enum): class SparkSource(DataSource): def __init__( self, + *, name: Optional[str] = None, table: Optional[str] = None, query: Optional[str] = None, diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py index 7d6280746e..b8fddee89f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py @@ -88,7 +88,6 @@ def __init__( table: Optional[str] = None, created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, - date_partition_column: Optional[str] = None, query: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = "", diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index 3df0db69b1..7cbb17f9fb 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -20,6 +20,7 @@ class FileSource(DataSource): def __init__( self, + *, path: str, event_timestamp_column: Optional[str] = "", file_format: Optional[FileFormat] = None, diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index f099e307cc..dcfcb50aa6 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -16,6 +16,7 @@ class RedshiftSource(DataSource): def __init__( self, + *, event_timestamp_column: Optional[str] = "", table: Optional[str] = None, schema: Optional[str] = None, diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 1d24cba44a..8f3f2f0bb5 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -15,6 +15,7 @@ class SnowflakeSource(DataSource): def __init__( self, + *, database: Optional[str] = None, warehouse: Optional[str] = None, schema: Optional[str] = None, From 092a1f021bede341445be4f2155af761172899c6 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Mon, 18 Apr 2022 13:33:11 -0700 Subject: [PATCH 02/15] Fix Signed-off-by: Kevin Zhang --- sdk/python/feast/data_source.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 59f61e0f8b..ba9d156f36 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -356,11 +356,11 @@ def get_table_column_names_and_types( def __init__( self, *, - name: Optional[str] = None, - event_timestamp_column: Optional[str] = "", - bootstrap_servers: Optional[str] = "", - message_format: Optional[StreamFormat] = None, - topic: Optional[str] = "", + name: str, + event_timestamp_column: str, + bootstrap_servers: str, + message_format: StreamFormat, + topic: str, created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", @@ -649,12 +649,12 @@ def get_table_query_string(self) -> str: def __init__( self, *, - name: Optional[str] = None, - event_timestamp_column: Optional[str] = "", - created_timestamp_column: Optional[str] = "", - record_format: Optional[StreamFormat] = None, - region: Optional[str] = "", - stream_name: Optional[str] = "", + name: str, + event_timestamp_column: str, + created_timestamp_column: str, + record_format: StreamFormat, + region: str, + stream_name: str, field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", description: Optional[str] = "", From 44af59059605551ff5921c87f74518719e56a591 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Mon, 18 Apr 2022 15:18:31 -0700 Subject: [PATCH 03/15] Update to keyword args Signed-off-by: Kevin Zhang --- sdk/python/feast/data_source.py | 134 ++++++++++++++++++++++++++----- sdk/python/feast/feature_view.py | 2 +- 2 files changed, 115 insertions(+), 21 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index ba9d156f36..d620028f97 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from email import message import enum +from venv import create import warnings from abc import ABC, abstractmethod from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union @@ -355,12 +357,12 @@ def get_table_column_names_and_types( def __init__( self, - *, - name: str, - event_timestamp_column: str, - bootstrap_servers: str, - message_format: StreamFormat, - topic: str, + *args, + name: Optional[str] = None, + event_timestamp_column: Optional[str] = "", + bootstrap_servers: Optional[str] = None, + message_format: Optional[StreamFormat] = None, + topic: Optional[str] = None, created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", @@ -370,22 +372,57 @@ def __init__( timestamp_field: Optional[str] = "", batch_source: Optional[DataSource] = None, ): + positional_attributes = ["name", "event_timestamp_column", "bootstrap_servers", "message_format", "topic"] + _name = name + _event_timestamp_column = event_timestamp_column + _bootstrap_servers = bootstrap_servers or "" + _message_format = message_format + _topic = topic or "" + + + if args: + warnings.warn( + ( + "kafka parameters should be specified as a keyword argument instead of a positional arg." + "Feast 0.23+ will not support positional arguments to construct kafka sources" + ), + DeprecationWarning, + ) + if len(args) > len(positional_attributes): + raise ValueError( + f"Only {', '.join(positional_attributes)} are allowed as positional args when defining " + f"kafka sources, for backwards compatibility." + ) + if len(args) >= 1: + _name = args[0] + if len(args) >= 2: + _event_timestamp_column = args[1] + if len(args) >= 3: + _bootstrap_servers = args[2] + if len(args) >= 4: + _message_format = args[3] + if len(args) >= 5: + _topic = args[4] + + if _message_format is None: + raise ValueError("message format must be specified for kafka source") + super().__init__( - event_timestamp_column=event_timestamp_column, + event_timestamp_column=_event_timestamp_column, created_timestamp_column=created_timestamp_column, field_mapping=field_mapping, date_partition_column=date_partition_column, description=description, tags=tags, owner=owner, - name=name, + name=_name, timestamp_field=timestamp_field, ) self.batch_source = batch_source self.kafka_options = KafkaOptions( - bootstrap_servers=bootstrap_servers, - message_format=message_format, - topic=topic, + bootstrap_servers=_bootstrap_servers, + message_format=_message_format, + topic=_topic, ) def __eq__(self, other): @@ -648,7 +685,7 @@ def get_table_query_string(self) -> str: def __init__( self, - *, + *args, name: str, event_timestamp_column: str, created_timestamp_column: str, @@ -663,10 +700,46 @@ def __init__( timestamp_field: Optional[str] = "", batch_source: Optional[DataSource] = None, ): + positional_attributes = ["name", "event_timestamp_column", "created_timestamp_column", "record_format", "region", "stream_name"] + _name = name + _event_timestamp_column = event_timestamp_column + _created_timestamp_column = created_timestamp_column + _record_format = record_format + _region = region or "" + _stream_name = stream_name or "" + if args: + warnings.warn( + ( + "kinesis parameters should be specified as a keyword argument instead of a positional arg." + "Feast 0.23+ will not support positional arguments to construct kinesis sources" + ), + DeprecationWarning, + ) + if len(args) > len(positional_attributes): + raise ValueError( + f"Only {', '.join(positional_attributes)} are allowed as positional args when defining " + f"kinesis sources, for backwards compatibility." + ) + if len(args) >= 1: + _name = args[0] + if len(args) >= 2: + _event_timestamp_column = args[1] + if len(args) >= 3: + _created_timestamp_column = args[2] + if len(args) >= 4: + _record_format = args[3] + if len(args) >= 5: + _region = args[4] + if len(args) >= 6: + _stream_name = args[5] + + if _record_format is None: + raise ValueError("record format must be specified for kinesis source") + super().__init__( - name=name, - event_timestamp_column=event_timestamp_column, - created_timestamp_column=created_timestamp_column, + name=_name, + event_timestamp_column=_event_timestamp_column, + created_timestamp_column=_created_timestamp_column, field_mapping=field_mapping, date_partition_column=date_partition_column, description=description, @@ -676,7 +749,7 @@ def __init__( ) self.batch_source = batch_source self.kinesis_options = KinesisOptions( - record_format=record_format, region=region, stream_name=stream_name + record_format=_record_format, region=_region, stream_name=_stream_name ) def __eq__(self, other): @@ -731,7 +804,7 @@ class PushSource(DataSource): def __init__( self, - *, + *args, name: Optional[str] = None, batch_source: Optional[DataSource] = None, description: Optional[str] = "", @@ -750,10 +823,31 @@ def __init__( maintainer. """ - super().__init__(name=name, description=description, tags=tags, owner=owner) - self.batch_source = batch_source - if not self.batch_source: + positional_attributes = ["name", "batch_source"] + _name = name + _batch_source=batch_source + if args: + warnings.warn( + ( + "push source parameters should be specified as a keyword argument instead of a positional arg." + "Feast 0.23+ will not support positional arguments to construct push sources" + ), + DeprecationWarning, + ) + if len(args) > len(positional_attributes): + raise ValueError( + f"Only {', '.join(positional_attributes)} are allowed as positional args when defining " + f"push sources, for backwards compatibility." + ) + if len(args) >= 1: + _name = args[0] + if len(args) >= 2: + _batch_source = args[1] + + super().__init__(name=_name, description=description, tags=tags, owner=owner) + if not _batch_source: raise ValueError(f"batch_source is needed for push source {self.name}") + self.batch_source = _batch_source def __eq__(self, other): if not isinstance(other, PushSource): diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index cea8f619cb..ea5953e223 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -137,7 +137,7 @@ def __init__( ValueError: A field mapping conflicts with an Entity or a Feature. """ - positional_attributes = ["name, entities, ttl"] + positional_attributes = ["name", "entities", "ttl"] _name = name _entities = entities From 50bd918be4d55bb02016c68a11280392002538b8 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Mon, 18 Apr 2022 15:19:44 -0700 Subject: [PATCH 04/15] Fix lint Signed-off-by: Kevin Zhang --- sdk/python/feast/data_source.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index d620028f97..c5b364fcb7 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -12,9 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from email import message import enum -from venv import create import warnings from abc import ABC, abstractmethod from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union @@ -372,14 +370,19 @@ def __init__( timestamp_field: Optional[str] = "", batch_source: Optional[DataSource] = None, ): - positional_attributes = ["name", "event_timestamp_column", "bootstrap_servers", "message_format", "topic"] + positional_attributes = [ + "name", + "event_timestamp_column", + "bootstrap_servers", + "message_format", + "topic", + ] _name = name _event_timestamp_column = event_timestamp_column _bootstrap_servers = bootstrap_servers or "" _message_format = message_format _topic = topic or "" - if args: warnings.warn( ( @@ -700,7 +703,14 @@ def __init__( timestamp_field: Optional[str] = "", batch_source: Optional[DataSource] = None, ): - positional_attributes = ["name", "event_timestamp_column", "created_timestamp_column", "record_format", "region", "stream_name"] + positional_attributes = [ + "name", + "event_timestamp_column", + "created_timestamp_column", + "record_format", + "region", + "stream_name", + ] _name = name _event_timestamp_column = event_timestamp_column _created_timestamp_column = created_timestamp_column From 8ddc48c779051c9976a96a08e25f6c8ffb02a3b2 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Mon, 18 Apr 2022 15:22:51 -0700 Subject: [PATCH 05/15] Fix Signed-off-by: Kevin Zhang --- sdk/python/feast/data_source.py | 35 ++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index c5b364fcb7..0b3db9fff5 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -514,7 +514,7 @@ class RequestSource(DataSource): def __init__( self, - *, + *args, name: Optional[str] = None, schema: Optional[Union[Dict[str, ValueType], List[Field]]] = None, description: Optional[str] = "", @@ -522,27 +522,48 @@ def __init__( owner: Optional[str] = "", ): """Creates a RequestSource object.""" + positional_attributes = ["name", "schema"] + _name = name + _schema = schema + if args: + warnings.warn( + ( + "requestsource parameters should be specified as a keyword argument instead of a positional arg." + "Feast 0.23+ will not support positional arguments to construct request sources" + ), + DeprecationWarning, + ) + if len(args) > len(positional_attributes): + raise ValueError( + f"Only {', '.join(positional_attributes)} are allowed as positional args when defining " + f"feature views, for backwards compatibility." + ) + if len(args) >= 1: + _name = args[0] + if len(args) >= 2: + _schema = args[1] + super().__init__(name=name, description=description, tags=tags, owner=owner) - if not schema: + if not _schema: raise ValueError("Schema needs to be provided for Request Source") - if isinstance(schema, Dict): + if isinstance(_schema, Dict): warnings.warn( "Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. " "Please use List[Field] instead for the schema", DeprecationWarning, ) schemaList = [] - for key, valueType in schema.items(): + for key, valueType in _schema.items(): schemaList.append( Field(name=key, dtype=VALUE_TYPES_TO_FEAST_TYPES[valueType]) ) self.schema = schemaList - elif isinstance(schema, List): - self.schema = schema + elif isinstance(_schema, List): + self.schema = _schema else: raise Exception( "Schema type must be either dictionary or list, not " - + str(type(schema)) + + str(type(_schema)) ) def validate(self, config: RepoConfig): From d38859945369ddaa3f3159edfa2ee7305fd427f2 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Mon, 18 Apr 2022 15:23:15 -0700 Subject: [PATCH 06/15] Fix Signed-off-by: Kevin Zhang --- sdk/python/feast/data_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 0b3db9fff5..0127ad6005 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -543,7 +543,7 @@ def __init__( if len(args) >= 2: _schema = args[1] - super().__init__(name=name, description=description, tags=tags, owner=owner) + super().__init__(name=_name, description=description, tags=tags, owner=owner) if not _schema: raise ValueError("Schema needs to be provided for Request Source") if isinstance(_schema, Dict): From 06cd1b69c7870b002c5b8a34a7f851d158442490 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Mon, 18 Apr 2022 15:27:31 -0700 Subject: [PATCH 07/15] Change kinesis to optional Signed-off-by: Kevin Zhang --- sdk/python/feast/data_source.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 0127ad6005..b7b4cbc647 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -710,12 +710,12 @@ def get_table_query_string(self) -> str: def __init__( self, *args, - name: str, - event_timestamp_column: str, - created_timestamp_column: str, - record_format: StreamFormat, - region: str, - stream_name: str, + name: Optional[str] = None, + event_timestamp_column: Optional[str] = "", + created_timestamp_column: Optional[str] = "", + record_format: Optional[StreamFormat] = None, + region: Optional[str] = "", + stream_name: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", description: Optional[str] = "", From f9c374f1ec3526122d42cb55447421b739969693 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 19 Apr 2022 09:51:12 -0700 Subject: [PATCH 08/15] Fix review issues Signed-off-by: Kevin Zhang --- sdk/python/feast/data_source.py | 18 ++++++------- .../feast/infra/offline_stores/file_source.py | 26 ++++++++++++++++--- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index b7b4cbc647..3ada52a618 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -386,15 +386,15 @@ def __init__( if args: warnings.warn( ( - "kafka parameters should be specified as a keyword argument instead of a positional arg." - "Feast 0.23+ will not support positional arguments to construct kafka sources" + "Kafka parameters should be specified as a keyword argument instead of a positional arg." + "Feast 0.23+ will not support positional arguments to construct Kafka sources" ), DeprecationWarning, ) if len(args) > len(positional_attributes): raise ValueError( f"Only {', '.join(positional_attributes)} are allowed as positional args when defining " - f"kafka sources, for backwards compatibility." + f"Kafka sources, for backwards compatibility." ) if len(args) >= 1: _name = args[0] @@ -408,7 +408,7 @@ def __init__( _topic = args[4] if _message_format is None: - raise ValueError("message format must be specified for kafka source") + raise ValueError("Message format must be specified for Kafka source") super().__init__( event_timestamp_column=_event_timestamp_column, @@ -528,7 +528,7 @@ def __init__( if args: warnings.warn( ( - "requestsource parameters should be specified as a keyword argument instead of a positional arg." + "Requestsource parameters should be specified as a keyword argument instead of a positional arg." "Feast 0.23+ will not support positional arguments to construct request sources" ), DeprecationWarning, @@ -741,7 +741,7 @@ def __init__( if args: warnings.warn( ( - "kinesis parameters should be specified as a keyword argument instead of a positional arg." + "Kinesis parameters should be specified as a keyword argument instead of a positional arg." "Feast 0.23+ will not support positional arguments to construct kinesis sources" ), DeprecationWarning, @@ -765,7 +765,7 @@ def __init__( _stream_name = args[5] if _record_format is None: - raise ValueError("record format must be specified for kinesis source") + raise ValueError("Record format must be specified for kinesis source") super().__init__( name=_name, @@ -860,7 +860,7 @@ def __init__( if args: warnings.warn( ( - "push source parameters should be specified as a keyword argument instead of a positional arg." + "Push source parameters should be specified as a keyword argument instead of a positional arg." "Feast 0.23+ will not support positional arguments to construct push sources" ), DeprecationWarning, @@ -877,7 +877,7 @@ def __init__( super().__init__(name=_name, description=description, tags=tags, owner=owner) if not _batch_source: - raise ValueError(f"batch_source is needed for push source {self.name}") + raise ValueError(f"Batch_source is needed for push source {self.name}") self.batch_source = _batch_source def __eq__(self, other): diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index 7cbb17f9fb..e177642a32 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -20,8 +20,8 @@ class FileSource(DataSource): def __init__( self, - *, - path: str, + *args, + path: Optional[str] = None, event_timestamp_column: Optional[str] = "", file_format: Optional[FileFormat] = None, created_timestamp_column: Optional[str] = "", @@ -59,13 +59,31 @@ def __init__( >>> from feast import FileSource >>> file_source = FileSource(path="my_features.parquet", timestamp_field="event_timestamp") """ - if path is None: + positional_attributes = ["path"] + _path = path + if args: + if args: + warnings.warn( + ( + "File Source parameters should be specified as a keyword argument instead of a positional arg." + "Feast 0.23+ will not support positional arguments to construct File sources" + ), + DeprecationWarning, + ) + if len(args) > len(positional_attributes): + raise ValueError( + f"Only {', '.join(positional_attributes)} are allowed as positional args when defining " + f"File sources, for backwards compatibility." + ) + if len(args) >= 1: + _path = args[0] + if _path is None: raise ValueError( 'No "path" argument provided. Please set "path" to the location of your file source.' ) self.file_options = FileOptions( file_format=file_format, - uri=path, + uri=_path, s3_endpoint_override=s3_endpoint_override, ) From 67eca66cd66780b2fbd4d7d6717a139698152d4a Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 19 Apr 2022 09:56:30 -0700 Subject: [PATCH 09/15] Fix lint Signed-off-by: Kevin Zhang --- sdk/python/feast/data_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 3ada52a618..ad1ec8abee 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -856,7 +856,7 @@ def __init__( """ positional_attributes = ["name", "batch_source"] _name = name - _batch_source=batch_source + _batch_source = batch_source if args: warnings.warn( ( From e2922146a785a2528b4f44508010f2615c2f5ff4 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 19 Apr 2022 13:09:49 -0700 Subject: [PATCH 10/15] Add unit tests Signed-off-by: Kevin Zhang --- sdk/python/feast/data_source.py | 6 +-- sdk/python/tests/unit/test_data_sources.py | 58 +++++++++++++++++++++- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index ad1ec8abee..1706f24b1d 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -409,7 +409,7 @@ def __init__( if _message_format is None: raise ValueError("Message format must be specified for Kafka source") - + print("Asdfasdf") super().__init__( event_timestamp_column=_event_timestamp_column, created_timestamp_column=created_timestamp_column, @@ -528,7 +528,7 @@ def __init__( if args: warnings.warn( ( - "Requestsource parameters should be specified as a keyword argument instead of a positional arg." + "Request source parameters should be specified as a keyword argument instead of a positional arg." "Feast 0.23+ will not support positional arguments to construct request sources" ), DeprecationWarning, @@ -877,7 +877,7 @@ def __init__( super().__init__(name=_name, description=description, tags=tags, owner=owner) if not _batch_source: - raise ValueError(f"Batch_source is needed for push source {self.name}") + raise ValueError(f"batch_source parameter is needed for push source {self.name}") self.batch_source = _batch_source def __eq__(self, other): diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 883ab7ddc0..940ed4e80e 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -1,11 +1,18 @@ +from multiprocessing.sharedctypes import Value import pytest from feast import ValueType from feast.data_source import PushSource, RequestDataSource, RequestSource from feast.field import Field -from feast.infra.offline_stores.bigquery_source import BigQuerySource from feast.types import Bool, Float32 - +from feast.data_source import KafkaSource, KinesisSource, PushSource, RequestSource +from feast.infra.offline_stores.bigquery_source import BigQuerySource +from feast.infra.offline_stores.file_source import FileSource +from feast.infra.offline_stores.redshift_source import RedshiftSource +from feast.infra.offline_stores.snowflake_source import SnowflakeSource +from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import SparkSource +from sdk.python.feast.data_format import ProtoFormat +from feast.types import Int64 def test_push_with_batch(): push_source = PushSource( @@ -43,6 +50,7 @@ def test_request_source_primitive_type_to_proto(): deserialized_request_source = RequestSource.from_proto(request_proto) assert deserialized_request_source == request_source +<<<<<<< HEAD def test_hash(): push_source_1 = PushSource( @@ -71,3 +79,49 @@ def test_hash(): s4 = {push_source_1, push_source_2, push_source_3, push_source_4} assert len(s4) == 3 +======= +# TODO(kevjumba): Remove this test in feast 0.23 when positional arguments are removed. +def test_default_data_source_kw_arg_warning(): + # source_class = request.param + with pytest.warns(DeprecationWarning): + source = KafkaSource("name", "column", "bootstrap_servers", ProtoFormat("class_path"), "topic") + assert source.name == "name" + assert source.timestamp_field == "column" + assert source.kafka_options.bootstrap_servers == "bootstrap_servers" + assert source.kafka_options.topic == "topic" + with pytest.raises(ValueError): + KafkaSource("name", "column", "bootstrap_servers", topic="topic") + + with pytest.warns(DeprecationWarning): + source = KinesisSource("name", "column", "c_column", ProtoFormat("class_path"), "region", "stream_name") + assert source.name == "name" + assert source.timestamp_field == "column" + assert source.created_timestamp_column == "c_column" + assert source.kinesis_options.region == "region" + assert source.kinesis_options.stream_name == "stream_name" + + with pytest.raises(ValueError): + KinesisSource("name", "column", "c_column", region="region", stream_name="stream_name") + + with pytest.warns(DeprecationWarning): + source = RequestSource("name", [Field(name="val_to_add", dtype=Int64)], description="description") + assert source.name == "name" + assert source.description == "description" + + with pytest.raises(ValueError): + RequestSource("name") + + with pytest.warns(DeprecationWarning): + source = PushSource("name", BigQuerySource(name="bigquery_source", table="table"), description="description") + assert source.name == "name" + assert source.description == "description" + assert source.batch_source.name == "bigquery_source" + + with pytest.raises(ValueError): + PushSource("name") + + # No name warning for DataSource + with pytest.warns(UserWarning): + source = KafkaSource(event_timestamp_column="column", bootstrap_servers="bootstrap_servers", message_format=ProtoFormat("class_path"), topic="topic") + +>>>>>>> ed13af2f (Add unit tests) From ad3ba3fc6ff0d85d5ee8b350a20ce318ad2a33c9 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 19 Apr 2022 13:10:33 -0700 Subject: [PATCH 11/15] Fix Signed-off-by: Kevin Zhang --- sdk/python/feast/data_source.py | 4 +- sdk/python/tests/unit/test_data_sources.py | 52 ++++++++++++++++------ 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 1706f24b1d..79c6cbdf51 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -877,7 +877,9 @@ def __init__( super().__init__(name=_name, description=description, tags=tags, owner=owner) if not _batch_source: - raise ValueError(f"batch_source parameter is needed for push source {self.name}") + raise ValueError( + f"batch_source parameter is needed for push source {self.name}" + ) self.batch_source = _batch_source def __eq__(self, other): diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 940ed4e80e..1efd2c0101 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -1,10 +1,18 @@ from multiprocessing.sharedctypes import Value + import pytest +from sdk.python.feast.data_format import ProtoFormat from feast import ValueType -from feast.data_source import PushSource, RequestDataSource, RequestSource +from feast.data_source import ( + KafkaSource, + KinesisSource, + PushSource, + RequestDataSource, + RequestSource, +) from feast.field import Field -from feast.types import Bool, Float32 +from feast.types import Bool, Float32, Int64 from feast.data_source import KafkaSource, KinesisSource, PushSource, RequestSource from feast.infra.offline_stores.bigquery_source import BigQuerySource from feast.infra.offline_stores.file_source import FileSource @@ -50,8 +58,6 @@ def test_request_source_primitive_type_to_proto(): deserialized_request_source = RequestSource.from_proto(request_proto) assert deserialized_request_source == request_source -<<<<<<< HEAD - def test_hash(): push_source_1 = PushSource( name="test", batch_source=BigQuerySource(table="test.test"), @@ -79,12 +85,14 @@ def test_hash(): s4 = {push_source_1, push_source_2, push_source_3, push_source_4} assert len(s4) == 3 -======= + # TODO(kevjumba): Remove this test in feast 0.23 when positional arguments are removed. def test_default_data_source_kw_arg_warning(): # source_class = request.param with pytest.warns(DeprecationWarning): - source = KafkaSource("name", "column", "bootstrap_servers", ProtoFormat("class_path"), "topic") + source = KafkaSource( + "name", "column", "bootstrap_servers", ProtoFormat("class_path"), "topic" + ) assert source.name == "name" assert source.timestamp_field == "column" assert source.kafka_options.bootstrap_servers == "bootstrap_servers" @@ -93,7 +101,14 @@ def test_default_data_source_kw_arg_warning(): KafkaSource("name", "column", "bootstrap_servers", topic="topic") with pytest.warns(DeprecationWarning): - source = KinesisSource("name", "column", "c_column", ProtoFormat("class_path"), "region", "stream_name") + source = KinesisSource( + "name", + "column", + "c_column", + ProtoFormat("class_path"), + "region", + "stream_name", + ) assert source.name == "name" assert source.timestamp_field == "column" assert source.created_timestamp_column == "c_column" @@ -101,10 +116,14 @@ def test_default_data_source_kw_arg_warning(): assert source.kinesis_options.stream_name == "stream_name" with pytest.raises(ValueError): - KinesisSource("name", "column", "c_column", region="region", stream_name="stream_name") + KinesisSource( + "name", "column", "c_column", region="region", stream_name="stream_name" + ) with pytest.warns(DeprecationWarning): - source = RequestSource("name", [Field(name="val_to_add", dtype=Int64)], description="description") + source = RequestSource( + "name", [Field(name="val_to_add", dtype=Int64)], description="description" + ) assert source.name == "name" assert source.description == "description" @@ -112,7 +131,11 @@ def test_default_data_source_kw_arg_warning(): RequestSource("name") with pytest.warns(DeprecationWarning): - source = PushSource("name", BigQuerySource(name="bigquery_source", table="table"), description="description") + source = PushSource( + "name", + BigQuerySource(name="bigquery_source", table="table"), + description="description", + ) assert source.name == "name" assert source.description == "description" assert source.batch_source.name == "bigquery_source" @@ -122,6 +145,9 @@ def test_default_data_source_kw_arg_warning(): # No name warning for DataSource with pytest.warns(UserWarning): - source = KafkaSource(event_timestamp_column="column", bootstrap_servers="bootstrap_servers", message_format=ProtoFormat("class_path"), topic="topic") - ->>>>>>> ed13af2f (Add unit tests) + source = KafkaSource( + event_timestamp_column="column", + bootstrap_servers="bootstrap_servers", + message_format=ProtoFormat("class_path"), + topic="topic", + ) From a6007efc67a95938b82d5bcf2641d75c1f63f09a Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 19 Apr 2022 13:11:17 -0700 Subject: [PATCH 12/15] Fix Signed-off-by: Kevin Zhang --- sdk/python/tests/unit/test_data_sources.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 1efd2c0101..3b7b7b801c 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -1,5 +1,3 @@ -from multiprocessing.sharedctypes import Value - import pytest from sdk.python.feast.data_format import ProtoFormat From 2a6797a89dfa0db3244db51d71e73e3decc7e6cb Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 19 Apr 2022 13:18:45 -0700 Subject: [PATCH 13/15] Fix imports Signed-off-by: Kevin Zhang --- sdk/python/tests/unit/test_data_sources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 3b7b7b801c..dcdbd46916 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -1,5 +1,5 @@ import pytest -from sdk.python.feast.data_format import ProtoFormat +from feast.data_format import ProtoFormat from feast import ValueType from feast.data_source import ( From 8c487b492c432e208810aa7568bd8e5786bf71c9 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 19 Apr 2022 13:22:08 -0700 Subject: [PATCH 14/15] Fix lint Signed-off-by: Kevin Zhang --- sdk/python/tests/unit/test_data_sources.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index dcdbd46916..9e105c0873 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -1,7 +1,8 @@ import pytest -from feast.data_format import ProtoFormat +from sdk.python.feast.data_format import ProtoFormat from feast import ValueType +from feast.data_format import ProtoFormat from feast.data_source import ( KafkaSource, KinesisSource, @@ -10,15 +11,9 @@ RequestSource, ) from feast.field import Field -from feast.types import Bool, Float32, Int64 -from feast.data_source import KafkaSource, KinesisSource, PushSource, RequestSource from feast.infra.offline_stores.bigquery_source import BigQuerySource -from feast.infra.offline_stores.file_source import FileSource -from feast.infra.offline_stores.redshift_source import RedshiftSource -from feast.infra.offline_stores.snowflake_source import SnowflakeSource -from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import SparkSource -from sdk.python.feast.data_format import ProtoFormat -from feast.types import Int64 +from feast.types import Bool, Float32, Int64 + def test_push_with_batch(): push_source = PushSource( @@ -56,6 +51,7 @@ def test_request_source_primitive_type_to_proto(): deserialized_request_source = RequestSource.from_proto(request_proto) assert deserialized_request_source == request_source + def test_hash(): push_source_1 = PushSource( name="test", batch_source=BigQuerySource(table="test.test"), @@ -84,6 +80,7 @@ def test_hash(): s4 = {push_source_1, push_source_2, push_source_3, push_source_4} assert len(s4) == 3 + # TODO(kevjumba): Remove this test in feast 0.23 when positional arguments are removed. def test_default_data_source_kw_arg_warning(): # source_class = request.param From e023177c156f857a2b124d84c13ad5a35bc1989d Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 19 Apr 2022 13:23:29 -0700 Subject: [PATCH 15/15] Fix Signed-off-by: Kevin Zhang --- sdk/python/tests/unit/test_data_sources.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 9e105c0873..6bd4baf4fa 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -1,5 +1,4 @@ import pytest -from sdk.python.feast.data_format import ProtoFormat from feast import ValueType from feast.data_format import ProtoFormat