diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index cb17012eea..de242fea54 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -176,6 +176,7 @@ The services with containerized replacements currently implemented are: - Datastore - DynamoDB - Redis +- Trino You can run `make test-python-integration-container` to run tests against the containerized versions of dependencies. diff --git a/sdk/python/feast/batch_feature_view.py b/sdk/python/feast/batch_feature_view.py new file mode 100644 index 0000000000..2f9fb080db --- /dev/null +++ b/sdk/python/feast/batch_feature_view.py @@ -0,0 +1,58 @@ +from datetime import timedelta +from typing import Dict, List, Optional, Union + +from feast.data_source import DataSource +from feast.entity import Entity +from feast.feature_view import FeatureView +from feast.field import Field +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto + +SUPPORTED_BATCH_SOURCES = { + "BigQuerySource", + "FileSource", + "RedshiftSource", + "SnowflakeSource", + "SparkSource", + "TrinoSource", +} + + +class BatchFeatureView(FeatureView): + def __init__( + self, + *, + name: Optional[str] = None, + entities: Optional[Union[List[Entity], List[str]]] = None, + ttl: Optional[timedelta] = None, + tags: Optional[Dict[str, str]] = None, + online: bool = True, + description: str = "", + owner: str = "", + schema: Optional[List[Field]] = None, + source: Optional[DataSource] = None, + ): + + if source is None: + raise ValueError("Feature views need a source specified") + if ( + type(source).__name__ not in SUPPORTED_BATCH_SOURCES + and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE + ): + raise ValueError( + f"Batch feature views need a batch source, expected one of {SUPPORTED_BATCH_SOURCES} " + f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead " + ) + + super().__init__( + name=name, + entities=entities, + ttl=ttl, + batch_source=None, + stream_source=None, + tags=tags, + online=online, + description=description, + owner=owner, + schema=schema, + source=source, + ) diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py new file mode 100644 index 0000000000..1c51b94a7c --- /dev/null +++ b/sdk/python/feast/stream_feature_view.py @@ -0,0 +1,54 @@ +from datetime import timedelta +from typing import Dict, List, Optional, Union + +from feast.data_source import DataSource +from feast.entity import Entity +from feast.feature_view import FeatureView +from feast.field import Field +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto + +SUPPORTED_STREAM_SOURCES = { + "KafkaSource", + "KinesisSource", +} + + +class StreamFeatureView(FeatureView): + def __init__( + self, + *, + name: Optional[str] = None, + entities: Optional[Union[List[Entity], List[str]]] = None, + ttl: Optional[timedelta] = None, + tags: Optional[Dict[str, str]] = None, + online: bool = True, + description: str = "", + owner: str = "", + schema: Optional[List[Field]] = None, + source: Optional[DataSource] = None, + ): + + if source is None: + raise ValueError("Feature views need a source specified") + if ( + type(source).__name__ not in SUPPORTED_STREAM_SOURCES + and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE + ): + raise ValueError( + f"Stream feature views need a stream source, expected one of {SUPPORTED_STREAM_SOURCES} " + f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead " + ) + + super().__init__( + name=name, + entities=entities, + ttl=ttl, + batch_source=None, + stream_source=None, + tags=tags, + online=online, + description=description, + owner=owner, + schema=schema, + source=source, + ) diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py new file mode 100644 index 0000000000..d78788f3ae --- /dev/null +++ b/sdk/python/tests/unit/test_feature_views.py @@ -0,0 +1,70 @@ +from datetime import timedelta + +import pytest + +from feast.batch_feature_view import BatchFeatureView +from feast.data_format import AvroFormat +from feast.data_source import KafkaSource +from feast.infra.offline_stores.file_source import FileSource +from feast.stream_feature_view import StreamFeatureView + + +def test_create_batch_feature_view(): + batch_source = FileSource(path="some path") + BatchFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=batch_source, + ) + + with pytest.raises(ValueError): + BatchFeatureView( + name="test batch feature view", entities=[], ttl=timedelta(days=30) + ) + + stream_source = KafkaSource( + name="kafka", + event_timestamp_column="", + bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + with pytest.raises(ValueError): + BatchFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + ) + + +def test_create_stream_feature_view(): + stream_source = KafkaSource( + name="kafka", + event_timestamp_column="", + bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + StreamFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + ) + + with pytest.raises(ValueError): + StreamFeatureView( + name="test batch feature view", entities=[], ttl=timedelta(days=30) + ) + + with pytest.raises(ValueError): + StreamFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=FileSource(path="some path"), + )