diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index b335c0f042..5e7287351b 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -167,4 +167,4 @@ jobs: SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }} SNOWFLAKE_CI_ROLE: ${{ secrets.SNOWFLAKE_CI_ROLE }} SNOWFLAKE_CI_WAREHOUSE: ${{ secrets.SNOWFLAKE_CI_WAREHOUSE }} - run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests --integration --durations=5 --timeout=1200 --timeout_method=thread + run: make test-python-integration \ No newline at end of file diff --git a/.github/workflows/pr_local_integration_tests.yml b/.github/workflows/pr_local_integration_tests.yml index 17ff54b1f8..266cdcc9b9 100644 --- a/.github/workflows/pr_local_integration_tests.yml +++ b/.github/workflows/pr_local_integration_tests.yml @@ -61,9 +61,4 @@ jobs: run: make install-python-ci-dependencies - name: Test local integration tests if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak - env: - FEAST_USAGE: "False" - IS_TEST: "True" - FEAST_LOCAL_ONLINE_CONTAINER: "True" - FEAST_IS_LOCAL_TEST: "True" - run: pytest -n 8 --cov=./ --cov-report=xml --color=yes --integration -k "not gcs_registry and not s3_registry and not test_lambda_materialization and not test_snowflake_materialization" sdk/python/tests + run: make test-python-integration-local diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 7e2e3b577a..f3f91bb67f 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -23,17 +23,6 @@ jobs: with: python-version: ${{ matrix.python-version }} architecture: x64 - - name: Install mysql on macOS - if: startsWith(matrix.os, 'macOS') - run: | - brew install mysql - PATH=$PATH:/usr/local/mysql/bin - - name: Work around Homebrew MySQL being broken - # See https://github.com/Homebrew/homebrew-core/issues/130258 for more details. - if: startsWith(matrix.os, 'macOS') - run: | - brew install zlib - ln -sv $(brew --prefix zlib)/lib/libz.dylib $(brew --prefix)/lib/libzlib.dylib - name: Get pip cache dir id: pip-cache run: | @@ -56,7 +45,7 @@ jobs: - name: Install dependencies run: make install-python-ci-dependencies - name: Test Python - run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests + run: make test-python-unit unit-test-ui: diff --git a/Makefile b/Makefile index 0eac7e03a2..55896b8c11 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,7 @@ format: format-python format-java lint: lint-python lint-java -test: test-python test-java +test: test-python-unit test-java protos: compile-protos-python compile-protos-docs @@ -63,32 +63,26 @@ benchmark-python: benchmark-python-local: FEAST_USAGE=False IS_TEST=True FEAST_IS_LOCAL_TEST=True python -m pytest --integration --benchmark --benchmark-autosave --benchmark-save-data sdk/python/tests -test-python: - FEAST_USAGE=False \ - IS_TEST=True \ - python -m pytest -n 8 sdk/python/tests \ +test-python-unit: + python -m pytest -n 8 --color=yes sdk/python/tests test-python-integration: - FEAST_USAGE=False IS_TEST=True python -m pytest -n 8 --integration sdk/python/tests + python -m pytest -n 8 --integration --color=yes --durations=5 --timeout=1200 --timeout_method=thread sdk/python/tests test-python-integration-local: @(docker info > /dev/null 2>&1 && \ - FEAST_USAGE=False \ - IS_TEST=True \ FEAST_IS_LOCAL_TEST=True \ FEAST_LOCAL_ONLINE_CONTAINER=True \ - python -m pytest -n 8 --integration \ + python -m pytest -n 8 --color=yes --integration \ -k "not gcs_registry and \ not s3_registry and \ not test_lambda_materialization and \ - not test_snowflake" \ + not test_snowflake_materialization" \ sdk/python/tests \ ) || echo "This script uses Docker, and it isn't running - please start the Docker Daemon and try again!"; test-python-integration-container: @(docker info > /dev/null 2>&1 && \ - FEAST_USAGE=False \ - IS_TEST=True \ FEAST_LOCAL_ONLINE_CONTAINER=True \ python -m pytest -n 8 --integration sdk/python/tests \ ) || echo "This script uses Docker, and it isn't running - please start the Docker Daemon and try again!"; @@ -97,7 +91,6 @@ test-python-universal-spark: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.spark_repo_configuration \ PYTEST_PLUGINS=feast.infra.offline_stores.contrib.spark_offline_store.tests \ - FEAST_USAGE=False IS_TEST=True \ python -m pytest -n 8 --integration \ -k "not test_historical_retrieval_fails_on_validation and \ not test_historical_retrieval_with_validation and \ @@ -121,7 +114,6 @@ test-python-universal-trino: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.trino_repo_configuration \ PYTEST_PLUGINS=feast.infra.offline_stores.contrib.trino_offline_store.tests \ - FEAST_USAGE=False IS_TEST=True \ python -m pytest -n 8 --integration \ -k "not test_historical_retrieval_fails_on_validation and \ not test_historical_retrieval_with_validation and \ @@ -148,7 +140,6 @@ test-python-universal-mssql: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.mssql_repo_configuration \ PYTEST_PLUGINS=feast.infra.offline_stores.contrib.mssql_offline_store.tests \ - FEAST_USAGE=False IS_TEST=True \ FEAST_LOCAL_ONLINE_CONTAINER=True \ python -m pytest -n 8 --integration \ -k "not gcs_registry and \ @@ -166,7 +157,6 @@ test-python-universal-athena: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.athena_repo_configuration \ PYTEST_PLUGINS=feast.infra.offline_stores.contrib.athena_offline_store.tests \ - FEAST_USAGE=False IS_TEST=True \ ATHENA_REGION=ap-northeast-2 \ ATHENA_DATA_SOURCE=AwsDataCatalog \ ATHENA_DATABASE=default \ @@ -190,7 +180,6 @@ test-python-universal-athena: test-python-universal-duckdb: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.duckdb_repo_configuration \ - FEAST_USAGE=False IS_TEST=True \ python -m pytest -n 8 --integration \ -k "not test_nullable_online_store and \ not gcs_registry and \ @@ -204,8 +193,6 @@ test-python-universal-postgres-offline: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.postgres_repo_configuration \ PYTEST_PLUGINS=sdk.python.feast.infra.offline_stores.contrib.postgres_offline_store.tests \ - FEAST_USAGE=False \ - IS_TEST=True \ python -m pytest -n 8 --integration \ -k "not test_historical_retrieval_with_validation and \ not test_historical_features_persisting and \ @@ -226,8 +213,6 @@ test-python-universal-postgres-online: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.postgres_repo_configuration \ PYTEST_PLUGINS=sdk.python.feast.infra.offline_stores.contrib.postgres_offline_store.tests \ - FEAST_USAGE=False \ - IS_TEST=True \ python -m pytest -n 8 --integration \ -k "not test_universal_cli and \ not test_go_feature_server and \ @@ -247,8 +232,6 @@ test-python-universal-postgres-online: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.mysql_repo_configuration \ PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.mysql \ - FEAST_USAGE=False \ - IS_TEST=True \ python -m pytest -n 8 --integration \ -k "not test_universal_cli and \ not test_go_feature_server and \ @@ -268,8 +251,6 @@ test-python-universal-cassandra: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.cassandra_repo_configuration \ PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.cassandra \ - FEAST_USAGE=False \ - IS_TEST=True \ python -m pytest -x --integration \ sdk/python/tests @@ -277,8 +258,6 @@ test-python-universal-hazelcast: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.hazelcast_repo_configuration \ PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.hazelcast \ - FEAST_USAGE=False \ - IS_TEST=True \ python -m pytest -n 8 --integration \ -k "not test_universal_cli and \ not test_go_feature_server and \ @@ -298,8 +277,6 @@ test-python-universal-cassandra-no-cloud-providers: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.cassandra_repo_configuration \ PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.cassandra \ - FEAST_USAGE=False \ - IS_TEST=True \ python -m pytest -x --integration \ -k "not test_lambda_materialization_consistency and \ not test_apply_entity_integration and \ @@ -314,7 +291,7 @@ test-python-universal-cassandra-no-cloud-providers: sdk/python/tests test-python-universal: - FEAST_USAGE=False IS_TEST=True python -m pytest -n 8 --integration sdk/python/tests + python -m pytest -n 8 --integration sdk/python/tests format-python: # Sort diff --git a/docs/reference/offline-stores/spark.md b/docs/reference/offline-stores/spark.md index 3cca2aab1a..2e2facba64 100644 --- a/docs/reference/offline-stores/spark.md +++ b/docs/reference/offline-stores/spark.md @@ -30,6 +30,8 @@ offline_store: spark.sql.catalogImplementation: "hive" spark.sql.parser.quotedRegexColumnNames: "true" spark.sql.session.timeZone: "UTC" + spark.sql.execution.arrow.fallback.enabled: "true" + spark.sql.execution.arrow.pyspark.enabled: "true" online_store: path: data/online_store.db ``` diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index c43b33c1d2..7a5fec1650 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -27,6 +27,7 @@ import "feast/core/FeatureView.proto"; import "feast/core/FeatureViewProjection.proto"; import "feast/core/Feature.proto"; import "feast/core/DataSource.proto"; +import "feast/core/Transformation.proto"; message OnDemandFeatureView { // User-specified specifications of this feature view. @@ -48,10 +49,10 @@ message OnDemandFeatureViewSpec { // Map of sources for this feature view. map sources = 4; - oneof transformation { - UserDefinedFunction user_defined_function = 5; - OnDemandSubstraitTransformation on_demand_substrait_transformation = 9; - } + UserDefinedFunction user_defined_function = 5 [deprecated = true]; + + // Oneof with {user_defined_function, on_demand_substrait_transformation} + FeatureTransformationV2 feature_transformation = 10; // Description of the on demand feature view. string description = 6; @@ -61,6 +62,7 @@ message OnDemandFeatureViewSpec { // Owner of the on demand feature view. string owner = 8; + string mode = 11; } message OnDemandFeatureViewMeta { @@ -81,6 +83,8 @@ message OnDemandSource { // Serialized representation of python function. message UserDefinedFunction { + option deprecated = true; + // The function name string name = 1; @@ -90,7 +94,3 @@ message UserDefinedFunction { // The string representation of the udf string body_text = 3; } - -message OnDemandSubstraitTransformation { - bytes substrait_plan = 1; -} \ No newline at end of file diff --git a/protos/feast/core/Registry.proto b/protos/feast/core/Registry.proto index 7d80d8c837..0c3f8a53f9 100644 --- a/protos/feast/core/Registry.proto +++ b/protos/feast/core/Registry.proto @@ -27,7 +27,6 @@ import "feast/core/FeatureTable.proto"; import "feast/core/FeatureView.proto"; import "feast/core/InfraObject.proto"; import "feast/core/OnDemandFeatureView.proto"; -import "feast/core/RequestFeatureView.proto"; import "feast/core/StreamFeatureView.proto"; import "feast/core/DataSource.proto"; import "feast/core/SavedDataset.proto"; @@ -41,7 +40,6 @@ message Registry { repeated FeatureView feature_views = 6; repeated DataSource data_sources = 12; repeated OnDemandFeatureView on_demand_feature_views = 8; - repeated RequestFeatureView request_feature_views = 9; repeated StreamFeatureView stream_feature_views = 14; repeated FeatureService feature_services = 7; repeated SavedDataset saved_datasets = 11; diff --git a/protos/feast/core/RequestFeatureView.proto b/protos/feast/core/RequestFeatureView.proto deleted file mode 100644 index 4049053c2b..0000000000 --- a/protos/feast/core/RequestFeatureView.proto +++ /dev/null @@ -1,51 +0,0 @@ -// -// Copyright 2021 The Feast Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - - -syntax = "proto3"; -package feast.core; - -option go_package = "github.com/feast-dev/feast/go/protos/feast/core"; -option java_outer_classname = "RequestFeatureViewProto"; -option java_package = "feast.proto.core"; - -import "feast/core/DataSource.proto"; - -message RequestFeatureView { - // User-specified specifications of this feature view. - RequestFeatureViewSpec spec = 1; -} - -// Next available id: 7 -message RequestFeatureViewSpec { - // Name of the feature view. Must be unique. Not updated. - string name = 1; - - // Name of Feast project that this feature view belongs to. - string project = 2; - - // Request data which contains the underlying data schema and list of associated features - DataSource request_data_source = 3; - - // Description of the request feature view. - string description = 4; - - // User defined metadata. - map tags = 5; - - // Owner of the request feature view. - string owner = 6; -} diff --git a/protos/feast/core/StreamFeatureView.proto b/protos/feast/core/StreamFeatureView.proto index 3181bdf360..cb7da0faf3 100644 --- a/protos/feast/core/StreamFeatureView.proto +++ b/protos/feast/core/StreamFeatureView.proto @@ -29,6 +29,7 @@ import "feast/core/FeatureView.proto"; import "feast/core/Feature.proto"; import "feast/core/DataSource.proto"; import "feast/core/Aggregation.proto"; +import "feast/core/Transformation.proto"; message StreamFeatureView { // User-specified specifications of this feature view. @@ -77,7 +78,8 @@ message StreamFeatureViewSpec { bool online = 12; // Serialized function that is encoded in the streamfeatureview - UserDefinedFunction user_defined_function = 13; + UserDefinedFunction user_defined_function = 13 [deprecated = true]; + // Mode of execution string mode = 14; @@ -87,5 +89,8 @@ message StreamFeatureViewSpec { // Timestamp field for aggregation string timestamp_field = 16; + + // Oneof with {user_defined_function, on_demand_substrait_transformation} + FeatureTransformationV2 feature_transformation = 17; } diff --git a/protos/feast/core/Transformation.proto b/protos/feast/core/Transformation.proto new file mode 100644 index 0000000000..36f1e691fe --- /dev/null +++ b/protos/feast/core/Transformation.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; +package feast.core; + +option go_package = "github.com/feast-dev/feast/go/protos/feast/core"; +option java_outer_classname = "FeatureTransformationProto"; +option java_package = "feast.proto.core"; + +import "google/protobuf/duration.proto"; + +// Serialized representation of python function. +message UserDefinedFunctionV2 { + // The function name + string name = 1; + + // The python-syntax function body (serialized by dill) + bytes body = 2; + + // The string representation of the udf + string body_text = 3; +} + +// A feature transformation executed as a user-defined function +message FeatureTransformationV2 { + oneof transformation { + UserDefinedFunctionV2 user_defined_function = 1; + SubstraitTransformationV2 substrait_transformation = 2; + } +} + +message SubstraitTransformationV2 { + bytes substrait_plan = 1; +} diff --git a/protos/feast/registry/RegistryServer.proto b/protos/feast/registry/RegistryServer.proto index ab324f9bd1..e99987eb2d 100644 --- a/protos/feast/registry/RegistryServer.proto +++ b/protos/feast/registry/RegistryServer.proto @@ -7,7 +7,6 @@ import "feast/core/Registry.proto"; import "feast/core/Entity.proto"; import "feast/core/DataSource.proto"; import "feast/core/FeatureView.proto"; -import "feast/core/RequestFeatureView.proto"; import "feast/core/StreamFeatureView.proto"; import "feast/core/OnDemandFeatureView.proto"; import "feast/core/FeatureService.proto"; @@ -28,10 +27,6 @@ service RegistryServer{ rpc GetFeatureView (GetFeatureViewRequest) returns (feast.core.FeatureView) {} rpc ListFeatureViews (ListFeatureViewsRequest) returns (ListFeatureViewsResponse) {} - // RequestFeatureView RPCs - rpc GetRequestFeatureView (GetRequestFeatureViewRequest) returns (feast.core.RequestFeatureView) {} - rpc ListRequestFeatureViews (ListRequestFeatureViewsRequest) returns (ListRequestFeatureViewsResponse) {} - // StreamFeatureView RPCs rpc GetStreamFeatureView (GetStreamFeatureViewRequest) returns (feast.core.StreamFeatureView) {} rpc ListStreamFeatureViews (ListStreamFeatureViewsRequest) returns (ListStreamFeatureViewsResponse) {} @@ -126,23 +121,6 @@ message ListFeatureViewsResponse { repeated feast.core.FeatureView feature_views = 1; } -// RequestFeatureView - -message GetRequestFeatureViewRequest { - string name = 1; - string project = 2; - bool allow_cache = 3; -} - -message ListRequestFeatureViewsRequest { - string project = 1; - bool allow_cache = 2; -} - -message ListRequestFeatureViewsResponse { - repeated feast.core.RequestFeatureView request_feature_views = 1; -} - // StreamFeatureView message GetStreamFeatureViewRequest { diff --git a/sdk/python/docs/source/feast.protos.feast.core.rst b/sdk/python/docs/source/feast.protos.feast.core.rst index aaed49cd73..5da16d2a26 100644 --- a/sdk/python/docs/source/feast.protos.feast.core.rst +++ b/sdk/python/docs/source/feast.protos.feast.core.rst @@ -228,22 +228,6 @@ feast.protos.feast.core.Registry\_pb2\_grpc module :undoc-members: :show-inheritance: -feast.protos.feast.core.RequestFeatureView\_pb2 module ------------------------------------------------------- - -.. automodule:: feast.protos.feast.core.RequestFeatureView_pb2 - :members: - :undoc-members: - :show-inheritance: - -feast.protos.feast.core.RequestFeatureView\_pb2\_grpc module ------------------------------------------------------------- - -.. automodule:: feast.protos.feast.core.RequestFeatureView_pb2_grpc - :members: - :undoc-members: - :show-inheritance: - feast.protos.feast.core.SavedDataset\_pb2 module ------------------------------------------------ diff --git a/sdk/python/docs/source/feast.rst b/sdk/python/docs/source/feast.rst index b0ed92c4cc..abb8783bf0 100644 --- a/sdk/python/docs/source/feast.rst +++ b/sdk/python/docs/source/feast.rst @@ -273,14 +273,6 @@ feast.repo\_upgrade module :undoc-members: :show-inheritance: -feast.request\_feature\_view module ------------------------------------ - -.. automodule:: feast.request_feature_view - :members: - :undoc-members: - :show-inheritance: - feast.saved\_dataset module --------------------------- diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index d043f1a973..3eff91d65f 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -22,7 +22,6 @@ from .field import Field from .on_demand_feature_view import OnDemandFeatureView from .repo_config import RepoConfig -from .request_feature_view import RequestFeatureView from .stream_feature_view import StreamFeatureView from .value_type import ValueType @@ -49,7 +48,6 @@ "BigQuerySource", "FileSource", "RedshiftSource", - "RequestFeatureView", "SnowflakeSource", "PushSource", "RequestSource", diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index 975537a394..31140e2899 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -13,13 +13,20 @@ # limitations under the License. from abc import ABC, abstractmethod from datetime import datetime -from typing import Dict, List, Optional, Type +from typing import Dict, List, Optional, Type, Union from google.protobuf.json_format import MessageToJson -from proto import Message +from google.protobuf.message import Message from feast.feature_view_projection import FeatureViewProjection from feast.field import Field +from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + OnDemandFeatureView as OnDemandFeatureViewProto, +) +from feast.protos.feast.core.StreamFeatureView_pb2 import ( + StreamFeatureView as StreamFeatureViewProto, +) class BaseFeatureView(ABC): @@ -89,7 +96,9 @@ def proto_class(self) -> Type[Message]: pass @abstractmethod - def to_proto(self) -> Message: + def to_proto( + self, + ) -> Union[FeatureViewProto, OnDemandFeatureViewProto, StreamFeatureViewProto]: pass @classmethod diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 7ce8aaef2b..7673eee20d 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -381,7 +381,6 @@ def feature_view_list(ctx: click.Context): table = [] for feature_view in [ *store.list_feature_views(), - *store.list_request_feature_views(), *store.list_on_demand_feature_views(), ]: entities = set() diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index 15f880e392..106d34bf48 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -20,9 +20,6 @@ OnDemandFeatureView as OnDemandFeatureViewProto, ) from feast.protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureViewSpec -from feast.protos.feast.core.RequestFeatureView_pb2 import ( - RequestFeatureView as RequestFeatureViewProto, -) from feast.protos.feast.core.StreamFeatureView_pb2 import ( StreamFeatureView as StreamFeatureViewProto, ) @@ -110,7 +107,6 @@ def tag_objects_for_keep_delete_update_add( FeatureViewProto, FeatureServiceProto, OnDemandFeatureViewProto, - RequestFeatureViewProto, StreamFeatureViewProto, ValidationReferenceProto, ) @@ -144,11 +140,26 @@ def diff_registry_objects( if _field.name in FIELDS_TO_IGNORE: continue elif getattr(current_spec, _field.name) != getattr(new_spec, _field.name): - if _field.name == "user_defined_function": + if _field.name == "feature_transformation": current_spec = cast(OnDemandFeatureViewSpec, current_spec) new_spec = cast(OnDemandFeatureViewSpec, new_spec) - current_udf = current_spec.user_defined_function - new_udf = new_spec.user_defined_function + # Check if the old proto is populated and use that if it is + feature_transformation_udf = ( + current_spec.feature_transformation.user_defined_function + ) + if ( + current_spec.HasField("user_defined_function") + and not feature_transformation_udf + ): + deprecated_udf = current_spec.user_defined_function + else: + deprecated_udf = None + current_udf = ( + deprecated_udf + if deprecated_udf is not None + else feature_transformation_udf + ) + new_udf = new_spec.feature_transformation.user_defined_function for _udf_field in current_udf.DESCRIPTOR.fields: if _udf_field.name == "body": continue @@ -324,7 +335,6 @@ def apply_diff_to_registry( elif feast_object_diff.feast_object_type in [ FeastObjectType.FEATURE_VIEW, FeastObjectType.ON_DEMAND_FEATURE_VIEW, - FeastObjectType.REQUEST_FEATURE_VIEW, FeastObjectType.STREAM_FEATURE_VIEW, ]: feature_view_obj = cast( @@ -368,7 +378,6 @@ def apply_diff_to_registry( elif feast_object_diff.feast_object_type in [ FeastObjectType.FEATURE_VIEW, FeastObjectType.ON_DEMAND_FEATURE_VIEW, - FeastObjectType.REQUEST_FEATURE_VIEW, FeastObjectType.STREAM_FEATURE_VIEW, ]: registry.apply_feature_view( diff --git a/sdk/python/feast/feast_object.py b/sdk/python/feast/feast_object.py index 7cccf26455..2d06d8d669 100644 --- a/sdk/python/feast/feast_object.py +++ b/sdk/python/feast/feast_object.py @@ -11,12 +11,10 @@ from .protos.feast.core.FeatureService_pb2 import FeatureServiceSpec from .protos.feast.core.FeatureView_pb2 import FeatureViewSpec from .protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureViewSpec -from .protos.feast.core.RequestFeatureView_pb2 import RequestFeatureViewSpec from .protos.feast.core.StreamFeatureView_pb2 import StreamFeatureViewSpec from .protos.feast.core.ValidationProfile_pb2 import ( ValidationReference as ValidationReferenceProto, ) -from .request_feature_view import RequestFeatureView from .saved_dataset import ValidationReference from .stream_feature_view import StreamFeatureView @@ -24,7 +22,6 @@ FeastObject = Union[ FeatureView, OnDemandFeatureView, - RequestFeatureView, BatchFeatureView, StreamFeatureView, Entity, @@ -36,7 +33,6 @@ FeastObjectSpecProto = Union[ FeatureViewSpec, OnDemandFeatureViewSpec, - RequestFeatureViewSpec, StreamFeatureViewSpec, EntitySpecV2, FeatureServiceSpec, diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 44236248fe..9ac2c14527 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -91,7 +91,6 @@ from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value from feast.repo_config import RepoConfig, load_repo_config from feast.repo_contents import RepoContents -from feast.request_feature_view import RequestFeatureView from feast.saved_dataset import SavedDataset, SavedDatasetStorage, ValidationReference from feast.stream_feature_view import StreamFeatureView from feast.type_map import python_values_to_proto_values @@ -266,23 +265,6 @@ def list_feature_views(self, allow_cache: bool = False) -> List[FeatureView]: """ return self._list_feature_views(allow_cache) - @log_exceptions_and_usage - def list_request_feature_views( - self, allow_cache: bool = False - ) -> List[RequestFeatureView]: - """ - Retrieves the list of feature views from the registry. - - Args: - allow_cache: Whether to allow returning entities from a cached registry. - - Returns: - A list of feature views. - """ - return self._registry.list_request_feature_views( - self.project, allow_cache=allow_cache - ) - def _list_feature_views( self, allow_cache: bool = False, @@ -562,7 +544,6 @@ def _validate_all_feature_views( self, views_to_update: List[FeatureView], odfvs_to_update: List[OnDemandFeatureView], - request_views_to_update: List[RequestFeatureView], sfvs_to_update: List[StreamFeatureView], ): """Validates all feature views.""" @@ -577,7 +558,6 @@ def _validate_all_feature_views( [ *views_to_update, *odfvs_to_update, - *request_views_to_update, *sfvs_to_update, ] ) @@ -716,7 +696,6 @@ def plan( ... feature_views=[driver_hourly_stats_view], ... on_demand_feature_views=list(), ... stream_feature_views=list(), - ... request_feature_views=list(), ... entities=[driver], ... feature_services=list())) # register entity and feature view """ @@ -724,7 +703,6 @@ def plan( self._validate_all_feature_views( desired_repo_contents.feature_views, desired_repo_contents.on_demand_feature_views, - desired_repo_contents.request_feature_views, desired_repo_contents.stream_feature_views, ) _validate_data_sources(desired_repo_contents.data_sources) @@ -781,7 +759,6 @@ def apply( Entity, FeatureView, OnDemandFeatureView, - RequestFeatureView, BatchFeatureView, StreamFeatureView, FeatureService, @@ -848,9 +825,6 @@ def apply( ) ] sfvs_to_update = [ob for ob in objects if isinstance(ob, StreamFeatureView)] - request_views_to_update = [ - ob for ob in objects if isinstance(ob, RequestFeatureView) - ] odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)] services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)] data_sources_set_to_update = { @@ -877,16 +851,6 @@ def apply( if fv.stream_source: data_sources_set_to_update.add(fv.stream_source) - if request_views_to_update: - warnings.warn( - "Request feature view is deprecated. " - "Please use request data source instead", - DeprecationWarning, - ) - - for rfv in request_views_to_update: - data_sources_set_to_update.add(rfv.request_source) - for odfv in odfvs_to_update: for v in odfv.source_request_sources.values(): data_sources_set_to_update.add(v) @@ -898,7 +862,7 @@ def apply( # Validate all feature views and make inferences. self._validate_all_feature_views( - views_to_update, odfvs_to_update, request_views_to_update, sfvs_to_update + views_to_update, odfvs_to_update, sfvs_to_update ) self._make_inferences( data_sources_to_update, @@ -912,9 +876,7 @@ def apply( # Add all objects to the registry and update the provider's infrastructure. for ds in data_sources_to_update: self._registry.apply_data_source(ds, project=self.project, commit=False) - for view in itertools.chain( - views_to_update, odfvs_to_update, request_views_to_update, sfvs_to_update - ): + for view in itertools.chain(views_to_update, odfvs_to_update, sfvs_to_update): self._registry.apply_feature_view(view, project=self.project, commit=False) for ent in entities_to_update: self._registry.apply_entity(ent, project=self.project, commit=False) @@ -943,9 +905,6 @@ def apply( and not isinstance(ob, StreamFeatureView) ) ] - request_views_to_delete = [ - ob for ob in objects_to_delete if isinstance(ob, RequestFeatureView) - ] odfvs_to_delete = [ ob for ob in objects_to_delete if isinstance(ob, OnDemandFeatureView) ] @@ -974,10 +933,6 @@ def apply( self._registry.delete_feature_view( view.name, project=self.project, commit=False ) - for request_view in request_views_to_delete: - self._registry.delete_feature_view( - request_view.name, project=self.project, commit=False - ) for odfv in odfvs_to_delete: self._registry.delete_feature_view( odfv.name, project=self.project, commit=False @@ -1088,43 +1043,26 @@ def get_historical_features( _feature_refs = self._get_features(features) ( all_feature_views, - all_request_feature_views, all_on_demand_feature_views, ) = self._get_feature_views_to_use(features) - if all_request_feature_views: - warnings.warn( - "Request feature view is deprecated. " - "Please use request data source instead", - DeprecationWarning, - ) - # TODO(achal): _group_feature_refs returns the on demand feature views, but it's not passed into the provider. # This is a weird interface quirk - we should revisit the `get_historical_features` to # pass in the on demand feature views as well. - fvs, odfvs, request_fvs, request_fv_refs = _group_feature_refs( + fvs, odfvs = _group_feature_refs( _feature_refs, all_feature_views, - all_request_feature_views, all_on_demand_feature_views, ) feature_views = list(view for view, _ in fvs) on_demand_feature_views = list(view for view, _ in odfvs) - request_feature_views = list(view for view, _ in request_fvs) set_usage_attribute("odfv", bool(on_demand_feature_views)) - set_usage_attribute("request_fv", bool(request_feature_views)) # Check that the right request data is present in the entity_df if type(entity_df) == pd.DataFrame: if self.config.coerce_tz_aware: entity_df = utils.make_df_tzaware(cast(pd.DataFrame, entity_df)) - for fv in request_feature_views: - for feature in fv.features: - if feature.name not in entity_df.columns: - raise RequestDataNotFoundInEntityDfException( - feature_name=feature.name, feature_view_name=fv.name - ) for odfv in on_demand_feature_views: odfv_request_data_schema = odfv.get_request_data_schema() for feature_name in odfv_request_data_schema.keys(): @@ -1135,9 +1073,6 @@ def get_historical_features( ) _validate_feature_refs(_feature_refs, full_feature_names) - # Drop refs that refer to RequestFeatureViews since they don't need to be fetched and - # already exist in the entity_df - _feature_refs = [ref for ref in _feature_refs if ref not in request_fv_refs] provider = self._get_provider() job = provider.get_historical_features( @@ -1615,19 +1550,11 @@ def _get_online_features( _feature_refs = self._get_features(features, allow_cache=True) ( requested_feature_views, - requested_request_feature_views, requested_on_demand_feature_views, ) = self._get_feature_views_to_use( features=features, allow_cache=True, hide_dummy_entity=False ) - if requested_request_feature_views: - warnings.warn( - "Request feature view is deprecated. " - "Please use request data source instead", - DeprecationWarning, - ) - ( entity_name_to_join_key_map, entity_type_map, @@ -1648,19 +1575,12 @@ def _get_online_features( num_rows = _validate_entity_values(entity_proto_values) _validate_feature_refs(_feature_refs, full_feature_names) - ( - grouped_refs, - grouped_odfv_refs, - grouped_request_fv_refs, - _, - ) = _group_feature_refs( + (grouped_refs, grouped_odfv_refs,) = _group_feature_refs( _feature_refs, requested_feature_views, - requested_request_feature_views, requested_on_demand_feature_views, ) set_usage_attribute("odfv", bool(grouped_odfv_refs)) - set_usage_attribute("request_fv", bool(grouped_request_fv_refs)) # All requested features should be present in the result. requested_result_row_names = { @@ -1673,23 +1593,14 @@ def _get_online_features( feature_views = list(view for view, _ in grouped_refs) - needed_request_data, needed_request_fv_features = self.get_needed_request_data( - grouped_odfv_refs, grouped_request_fv_refs - ) + needed_request_data = self.get_needed_request_data(grouped_odfv_refs) join_key_values: Dict[str, List[Value]] = {} request_data_features: Dict[str, List[Value]] = {} # Entity rows may be either entities or request data. for join_key_or_entity_name, values in entity_proto_values.items(): # Found request data - if ( - join_key_or_entity_name in needed_request_data - or join_key_or_entity_name in needed_request_fv_features - ): - if join_key_or_entity_name in needed_request_fv_features: - # If the data was requested as a feature then - # make sure it appears in the result. - requested_result_row_names.add(join_key_or_entity_name) + if join_key_or_entity_name in needed_request_data: request_data_features[join_key_or_entity_name] = values else: if join_key_or_entity_name in join_keys_set: @@ -1711,7 +1622,7 @@ def _get_online_features( join_key_values[join_key] = values self.ensure_request_data_values_exist( - needed_request_data, needed_request_fv_features, request_data_features + needed_request_data, request_data_features ) # Populate online features response proto with join keys and request data features @@ -1870,33 +1781,21 @@ def _populate_result_rows_from_columnar( @staticmethod def get_needed_request_data( grouped_odfv_refs: List[Tuple[OnDemandFeatureView, List[str]]], - grouped_request_fv_refs: List[Tuple[RequestFeatureView, List[str]]], - ) -> Tuple[Set[str], Set[str]]: + ) -> Set[str]: needed_request_data: Set[str] = set() - needed_request_fv_features: Set[str] = set() for odfv, _ in grouped_odfv_refs: odfv_request_data_schema = odfv.get_request_data_schema() needed_request_data.update(odfv_request_data_schema.keys()) - for request_fv, _ in grouped_request_fv_refs: - for feature in request_fv.features: - needed_request_fv_features.add(feature.name) - return needed_request_data, needed_request_fv_features + return needed_request_data @staticmethod def ensure_request_data_values_exist( needed_request_data: Set[str], - needed_request_fv_features: Set[str], request_data_features: Dict[str, List[Any]], ): - if len(needed_request_data) + len(needed_request_fv_features) != len( - request_data_features.keys() - ): + if len(needed_request_data) != len(request_data_features.keys()): missing_features = [ - x - for x in itertools.chain( - needed_request_data, needed_request_fv_features - ) - if x not in request_data_features + x for x in needed_request_data if x not in request_data_features ] raise RequestDataNotFoundInEntityRowsException( feature_names=missing_features @@ -2161,7 +2060,7 @@ def _get_feature_views_to_use( features: Optional[Union[List[str], FeatureService]], allow_cache=False, hide_dummy_entity: bool = True, - ) -> Tuple[List[FeatureView], List[RequestFeatureView], List[OnDemandFeatureView]]: + ) -> Tuple[List[FeatureView], List[OnDemandFeatureView]]: fvs = { fv.name: fv for fv in [ @@ -2172,13 +2071,6 @@ def _get_feature_views_to_use( ] } - request_fvs = { - fv.name: fv - for fv in self._registry.list_request_feature_views( - project=self.project, allow_cache=allow_cache - ) - } - od_fvs = { fv.name: fv for fv in self._registry.list_on_demand_feature_views( @@ -2187,7 +2079,7 @@ def _get_feature_views_to_use( } if isinstance(features, FeatureService): - fvs_to_use, request_fvs_to_use, od_fvs_to_use = [], [], [] + fvs_to_use, od_fvs_to_use = [], [] for fv_name, projection in [ (projection.name, projection) for projection in features.feature_view_projections @@ -2196,10 +2088,6 @@ def _get_feature_views_to_use( fvs_to_use.append( fvs[fv_name].with_projection(copy.copy(projection)) ) - elif fv_name in request_fvs: - request_fvs_to_use.append( - request_fvs[fv_name].with_projection(copy.copy(projection)) - ) elif fv_name in od_fvs: odfv = od_fvs[fv_name].with_projection(copy.copy(projection)) od_fvs_to_use.append(odfv) @@ -2214,11 +2102,10 @@ def _get_feature_views_to_use( f"{fv_name} which doesn't exist. Please make sure that you have created the feature view" f'{fv_name} and that you have registered it by running "apply".' ) - views_to_use = (fvs_to_use, request_fvs_to_use, od_fvs_to_use) + views_to_use = (fvs_to_use, od_fvs_to_use) else: views_to_use = ( [*fvs.values()], - [*request_fvs.values()], [*od_fvs.values()], ) @@ -2456,24 +2343,15 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F def _group_feature_refs( features: List[str], all_feature_views: List[FeatureView], - all_request_feature_views: List[RequestFeatureView], all_on_demand_feature_views: List[OnDemandFeatureView], ) -> Tuple[ - List[Tuple[FeatureView, List[str]]], - List[Tuple[OnDemandFeatureView, List[str]]], - List[Tuple[RequestFeatureView, List[str]]], - Set[str], + List[Tuple[FeatureView, List[str]]], List[Tuple[OnDemandFeatureView, List[str]]] ]: """Get list of feature views and corresponding feature names based on feature references""" # view name to view proto view_index = {view.projection.name_to_use(): view for view in all_feature_views} - # request view name to proto - request_view_index = { - view.projection.name_to_use(): view for view in all_request_feature_views - } - # on demand view to on demand view proto on_demand_view_index = { view.projection.name_to_use(): view for view in all_on_demand_feature_views @@ -2481,8 +2359,6 @@ def _group_feature_refs( # view name to feature names views_features = defaultdict(set) - request_views_features = defaultdict(set) - request_view_refs = set() # on demand view name to feature names on_demand_view_features = defaultdict(set) @@ -2503,26 +2379,17 @@ def _group_feature_refs( ].source_feature_view_projections.values(): for input_feat in input_fv_projection.features: views_features[input_fv_projection.name].add(input_feat.name) - elif view_name in request_view_index: - request_view_index[view_name].projection.get_feature( - feat_name - ) # For validation - request_views_features[view_name].add(feat_name) - request_view_refs.add(ref) else: raise FeatureViewNotFoundException(view_name) fvs_result: List[Tuple[FeatureView, List[str]]] = [] odfvs_result: List[Tuple[OnDemandFeatureView, List[str]]] = [] - request_fvs_result: List[Tuple[RequestFeatureView, List[str]]] = [] for view_name, feature_names in views_features.items(): fvs_result.append((view_index[view_name], list(feature_names))) - for view_name, feature_names in request_views_features.items(): - request_fvs_result.append((request_view_index[view_name], list(feature_names))) for view_name, feature_names in on_demand_view_features.items(): odfvs_result.append((on_demand_view_index[view_name], list(feature_names))) - return fvs_result, odfvs_result, request_fvs_result, request_view_refs + return fvs_result, odfvs_result def _print_materialization_log( diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 9b300d7bf4..17a8e20f78 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -94,7 +94,7 @@ def pull_latest_from_table_or_query( FROM ( SELECT {a_field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row - FROM ({from_expression}) a + FROM {from_expression} a WHERE a."{timestamp_field}" BETWEEN '{start_date}'::timestamptz AND '{end_date}'::timestamptz ) b WHERE _feast_row = 1 diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index 8cd392ce5d..0809043a01 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -185,6 +185,19 @@ def get_table_query_string(self) -> str: return f"`{tmp_table_name}`" + def __eq__(self, other): + base_eq = super().__eq__(other) + if not base_eq: + return False + return ( + self.table == other.table + and self.query == other.query + and self.path == other.path + ) + + def __hash__(self): + return super().__hash__() + class SparkOptions: allowed_formats = [format.value for format in SparkSourceFormat] diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index 9ee3bbbabc..c67164103e 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -12,13 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. import json +import warnings from abc import ABC, abstractmethod from collections import defaultdict from datetime import datetime from typing import Any, Dict, List, Optional from google.protobuf.json_format import MessageToJson -from proto import Message +from google.protobuf.message import Message from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource @@ -29,9 +30,10 @@ from feast.on_demand_feature_view import OnDemandFeatureView from feast.project_metadata import ProjectMetadata from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.request_feature_view import RequestFeatureView from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView +from feast.transformation.pandas_transformation import PandasTransformation +from feast.transformation.substrait_transformation import SubstraitTransformation class BaseRegistry(ABC): @@ -347,41 +349,6 @@ def list_feature_views( """ raise NotImplementedError - # request feature view operations - @abstractmethod - def get_request_feature_view( - self, name: str, project: str, allow_cache: bool = False - ) -> RequestFeatureView: - """ - Retrieves a request feature view. - - Args: - name: Name of request feature view - project: Feast project that this feature view belongs to - allow_cache: Allow returning feature view from the cached registry - - Returns: - Returns either the specified feature view, or raises an exception if - none is found - """ - raise NotImplementedError - - @abstractmethod - def list_request_feature_views( - self, project: str, allow_cache: bool = False - ) -> List[RequestFeatureView]: - """ - Retrieve a list of request feature views from the registry - - Args: - allow_cache: Allow returning feature views from the cached registry - project: Filter feature views based on project name - - Returns: - List of request feature views - """ - raise NotImplementedError - @abstractmethod def apply_materialization( self, @@ -662,18 +629,40 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: key=lambda on_demand_feature_view: on_demand_feature_view.name, ): odfv_dict = self._message_to_sorted_dict(on_demand_feature_view.to_proto()) - - odfv_dict["spec"]["userDefinedFunction"][ - "body" - ] = on_demand_feature_view.transformation.udf_string - registry_dict["onDemandFeatureViews"].append(odfv_dict) - for request_feature_view in sorted( - self.list_request_feature_views(project=project), - key=lambda request_feature_view: request_feature_view.name, - ): - registry_dict["requestFeatureViews"].append( - self._message_to_sorted_dict(request_feature_view.to_proto()) + # We are logging a warning because the registry object may be read from a proto that is not updated + # i.e., we have to submit dual writes but in order to ensure the read behavior succeeds we have to load + # both objects to compare any changes in the registry + warnings.warn( + "We will be deprecating the usage of spec.userDefinedFunction in a future release please upgrade cautiously.", + DeprecationWarning, ) + if on_demand_feature_view.feature_transformation: + if isinstance( + on_demand_feature_view.feature_transformation, PandasTransformation + ): + if "userDefinedFunction" not in odfv_dict["spec"]: + odfv_dict["spec"]["userDefinedFunction"] = {} + odfv_dict["spec"]["userDefinedFunction"][ + "body" + ] = on_demand_feature_view.feature_transformation.udf_string + odfv_dict["spec"]["featureTransformation"]["userDefinedFunction"][ + "body" + ] = on_demand_feature_view.feature_transformation.udf_string + elif isinstance( + on_demand_feature_view.feature_transformation, + SubstraitTransformation, + ): + odfv_dict["spec"]["featureTransformation"]["substraitPlan"][ + "body" + ] = on_demand_feature_view.feature_transformation.substrait_plan + else: + odfv_dict["spec"]["featureTransformation"]["userDefinedFunction"][ + "body" + ] = None + odfv_dict["spec"]["featureTransformation"]["substraitPlan"][ + "body" + ] = None + registry_dict["onDemandFeatureViews"].append(odfv_dict) for stream_feature_view in sorted( self.list_stream_feature_views(project=project), key=lambda stream_feature_view: stream_feature_view.name, @@ -684,6 +673,7 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: "body" ] = stream_feature_view.udf_string registry_dict["streamFeatureViews"].append(sfv_dict) + for saved_dataset in sorted( self.list_saved_datasets(project=project), key=lambda item: item.name ): diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 4c408b0a46..3101b073d5 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -14,7 +14,6 @@ from feast.infra.registry.base_registry import BaseRegistry from feast.on_demand_feature_view import OnDemandFeatureView from feast.project_metadata import ProjectMetadata -from feast.request_feature_view import RequestFeatureView from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView @@ -145,34 +144,6 @@ def list_on_demand_feature_views( ) return self._list_on_demand_feature_views(project) - @abstractmethod - def _get_request_feature_view(self, name: str, project: str) -> RequestFeatureView: - pass - - def get_request_feature_view( - self, name: str, project: str, allow_cache: bool = False - ) -> RequestFeatureView: - if allow_cache: - self._refresh_cached_registry_if_necessary() - return proto_registry_utils.get_request_feature_view( - self.cached_registry_proto, name, project - ) - return self._get_request_feature_view(name, project) - - @abstractmethod - def _list_request_feature_views(self, project: str) -> List[RequestFeatureView]: - pass - - def list_request_feature_views( - self, project: str, allow_cache: bool = False - ) -> List[RequestFeatureView]: - if allow_cache: - self._refresh_cached_registry_if_necessary() - return proto_registry_utils.list_request_feature_views( - self.cached_registry_proto, project - ) - return self._list_request_feature_views(project) - @abstractmethod def _get_stream_feature_view(self, name: str, project: str) -> StreamFeatureView: pass diff --git a/sdk/python/feast/infra/registry/contrib/postgres/postgres_registry_store.py b/sdk/python/feast/infra/registry/contrib/postgres/postgres_registry_store.py index 362ec9f485..877e0a018a 100644 --- a/sdk/python/feast/infra/registry/contrib/postgres/postgres_registry_store.py +++ b/sdk/python/feast/infra/registry/contrib/postgres/postgres_registry_store.py @@ -1,3 +1,4 @@ +import warnings from typing import Optional import psycopg2 @@ -37,6 +38,11 @@ def __init__(self, config: PostgresRegistryConfig, registry_path: str): sslcert_path=getattr(config, "sslcert_path", None), sslrootcert_path=getattr(config, "sslrootcert_path", None), ) + warnings.warn( + "PostgreSQLRegistryStore is deprecated and will be removed in the future releases. Please use SqlRegistry instead.", + DeprecationWarning, + ) + self.table_name = config.path self.cache_ttl_seconds = config.cache_ttl_seconds diff --git a/sdk/python/feast/infra/registry/proto_registry_utils.py b/sdk/python/feast/infra/registry/proto_registry_utils.py index e93f513b69..4d2e16cb02 100644 --- a/sdk/python/feast/infra/registry/proto_registry_utils.py +++ b/sdk/python/feast/infra/registry/proto_registry_utils.py @@ -19,7 +19,6 @@ from feast.project_metadata import ProjectMetadata from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.request_feature_view import RequestFeatureView from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView @@ -99,16 +98,6 @@ def get_stream_feature_view( raise FeatureViewNotFoundException(name, project) -def get_request_feature_view(registry_proto: RegistryProto, name: str, project: str): - for feature_view_proto in registry_proto.feature_views: - if ( - feature_view_proto.spec.name == name - and feature_view_proto.spec.project == project - ): - return RequestFeatureView.from_proto(feature_view_proto) - raise FeatureViewNotFoundException(name, project) - - def get_on_demand_feature_view( registry_proto: RegistryProto, name: str, project: str ) -> OnDemandFeatureView: @@ -180,19 +169,6 @@ def list_feature_views( return feature_views -@registry_proto_cache -def list_request_feature_views( - registry_proto: RegistryProto, project: str -) -> List[RequestFeatureView]: - feature_views: List[RequestFeatureView] = [] - for request_feature_view_proto in registry_proto.request_feature_views: - if request_feature_view_proto.spec.project == project: - feature_views.append( - RequestFeatureView.from_proto(request_feature_view_proto) - ) - return feature_views - - @registry_proto_cache def list_stream_feature_views( registry_proto: RegistryProto, project: str diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index a9d6c44f38..d949b6079d 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -20,7 +20,7 @@ from urllib.parse import urlparse from google.protobuf.internal.containers import RepeatedCompositeFieldContainer -from proto import Message +from google.protobuf.message import Message from feast import usage from feast.base_feature_view import BaseFeatureView @@ -46,7 +46,6 @@ from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.repo_config import RegistryConfig from feast.repo_contents import RepoContents -from feast.request_feature_view import RequestFeatureView from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView @@ -73,7 +72,6 @@ class FeastObjectType(Enum): ENTITY = "entity" FEATURE_VIEW = "feature view" ON_DEMAND_FEATURE_VIEW = "on demand feature view" - REQUEST_FEATURE_VIEW = "request feature view" STREAM_FEATURE_VIEW = "stream feature view" FEATURE_SERVICE = "feature service" @@ -88,9 +86,6 @@ def get_objects_from_registry( FeastObjectType.ON_DEMAND_FEATURE_VIEW: registry.list_on_demand_feature_views( project=project ), - FeastObjectType.REQUEST_FEATURE_VIEW: registry.list_request_feature_views( - project=project - ), FeastObjectType.STREAM_FEATURE_VIEW: registry.list_stream_feature_views( project=project, ), @@ -108,7 +103,6 @@ def get_objects_from_repo_contents( FeastObjectType.ENTITY: repo_contents.entities, FeastObjectType.FEATURE_VIEW: repo_contents.feature_views, FeastObjectType.ON_DEMAND_FEATURE_VIEW: repo_contents.on_demand_feature_views, - FeastObjectType.REQUEST_FEATURE_VIEW: repo_contents.request_feature_views, FeastObjectType.STREAM_FEATURE_VIEW: repo_contents.stream_feature_views, FeastObjectType.FEATURE_SERVICE: repo_contents.feature_services, } @@ -402,10 +396,6 @@ def apply_feature_view( existing_feature_views_of_same_type = ( self.cached_registry_proto.on_demand_feature_views ) - elif isinstance(feature_view, RequestFeatureView): - existing_feature_views_of_same_type = ( - self.cached_registry_proto.request_feature_views - ) else: raise ValueError(f"Unexpected feature view type: {type(feature_view)}") @@ -532,24 +522,6 @@ def list_feature_views( ) return proto_registry_utils.list_feature_views(registry_proto, project) - def get_request_feature_view( - self, name: str, project: str, allow_cache: bool = False - ): - registry_proto = self._get_registry_proto( - project=project, allow_cache=allow_cache - ) - return proto_registry_utils.get_request_feature_view( - registry_proto, name, project - ) - - def list_request_feature_views( - self, project: str, allow_cache: bool = False - ) -> List[RequestFeatureView]: - registry_proto = self._get_registry_proto( - project=project, allow_cache=allow_cache - ) - return proto_registry_utils.list_request_feature_views(registry_proto, project) - def get_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> FeatureView: @@ -601,18 +573,6 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True): self.commit() return - for idx, existing_request_feature_view_proto in enumerate( - self.cached_registry_proto.request_feature_views - ): - if ( - existing_request_feature_view_proto.spec.name == name - and existing_request_feature_view_proto.spec.project == project - ): - del self.cached_registry_proto.request_feature_views[idx] - if commit: - self.commit() - return - for idx, existing_on_demand_feature_view_proto in enumerate( self.cached_registry_proto.on_demand_feature_views ): @@ -890,10 +850,7 @@ def _existing_feature_view_names_to_fvs(self) -> Dict[str, Message]: for fv in self.cached_registry_proto.on_demand_feature_views } fvs = {fv.spec.name: fv for fv in self.cached_registry_proto.feature_views} - request_fvs = { - fv.spec.name: fv for fv in self.cached_registry_proto.request_feature_views - } sfv = { fv.spec.name: fv for fv in self.cached_registry_proto.stream_feature_views } - return {**odfvs, **fvs, **request_fvs, **sfv} + return {**odfvs, **fvs, **sfv} diff --git a/sdk/python/feast/infra/registry/remote.py b/sdk/python/feast/infra/registry/remote.py index 67d61ffec7..f93e1ab1c0 100644 --- a/sdk/python/feast/infra/registry/remote.py +++ b/sdk/python/feast/infra/registry/remote.py @@ -19,7 +19,6 @@ from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.registry import RegistryServer_pb2, RegistryServer_pb2_grpc from feast.repo_config import RegistryConfig -from feast.request_feature_view import RequestFeatureView from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView @@ -215,31 +214,6 @@ def list_feature_views( for feature_view in response.feature_views ] - def get_request_feature_view( - self, name: str, project: str, allow_cache: bool = False - ) -> RequestFeatureView: - request = RegistryServer_pb2.GetRequestFeatureViewRequest( - name=name, project=project, allow_cache=allow_cache - ) - - response = self.stub.GetRequestFeatureView(request) - - return RequestFeatureView.from_proto(response) - - def list_request_feature_views( - self, project: str, allow_cache: bool = False - ) -> List[RequestFeatureView]: - request = RegistryServer_pb2.ListRequestFeatureViewsRequest( - project=project, allow_cache=allow_cache - ) - - response = self.stub.ListRequestFeatureViews(request) - - return [ - RequestFeatureView.from_proto(request_feature_view) - for request_feature_view in response.request_feature_views - ] - def apply_materialization( self, feature_view: FeatureView, diff --git a/sdk/python/feast/infra/registry/snowflake.py b/sdk/python/feast/infra/registry/snowflake.py index cdf79c78b5..326d2e0226 100644 --- a/sdk/python/feast/infra/registry/snowflake.py +++ b/sdk/python/feast/infra/registry/snowflake.py @@ -44,9 +44,6 @@ OnDemandFeatureView as OnDemandFeatureViewProto, ) from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.protos.feast.core.RequestFeatureView_pb2 import ( - RequestFeatureView as RequestFeatureViewProto, -) from feast.protos.feast.core.SavedDataset_pb2 import SavedDataset as SavedDatasetProto from feast.protos.feast.core.StreamFeatureView_pb2 import ( StreamFeatureView as StreamFeatureViewProto, @@ -55,7 +52,6 @@ ValidationReference as ValidationReferenceProto, ) from feast.repo_config import RegistryConfig -from feast.request_feature_view import RequestFeatureView from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView @@ -370,7 +366,6 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True): deleted_count = 0 for table in { "FEATURE_VIEWS", - "REQUEST_FEATURE_VIEWS", "ON_DEMAND_FEATURE_VIEWS", "STREAM_FEATURE_VIEWS", }: @@ -529,25 +524,6 @@ def get_on_demand_feature_view( FeatureViewNotFoundException, ) - def get_request_feature_view( - self, name: str, project: str, allow_cache: bool = False - ) -> RequestFeatureView: - if allow_cache: - self._refresh_cached_registry_if_necessary() - return proto_registry_utils.get_request_feature_view( - self.cached_registry_proto, name, project - ) - return self._get_object( - "REQUEST_FEATURE_VIEWS", - name, - project, - RequestFeatureViewProto, - RequestFeatureView, - "REQUEST_FEATURE_VIEW_NAME", - "REQUEST_FEATURE_VIEW_PROTO", - FeatureViewNotFoundException, - ) - def get_saved_dataset( self, name: str, project: str, allow_cache: bool = False ) -> SavedDataset: @@ -709,22 +685,6 @@ def list_on_demand_feature_views( "ON_DEMAND_FEATURE_VIEW_PROTO", ) - def list_request_feature_views( - self, project: str, allow_cache: bool = False - ) -> List[RequestFeatureView]: - if allow_cache: - self._refresh_cached_registry_if_necessary() - return proto_registry_utils.list_request_feature_views( - self.cached_registry_proto, project - ) - return self._list_objects( - "REQUEST_FEATURE_VIEWS", - project, - RequestFeatureViewProto, - RequestFeatureView, - "REQUEST_FEATURE_VIEW_PROTO", - ) - def list_saved_datasets( self, project: str, allow_cache: bool = False ) -> List[SavedDataset]: @@ -809,7 +769,7 @@ def apply_materialization( fv_column_name = fv_table_str[:-1] python_class, proto_class = self._infer_fv_classes(feature_view) - if python_class in {RequestFeatureView, OnDemandFeatureView}: + if python_class in {OnDemandFeatureView}: raise ValueError( f"Cannot apply materialization for feature {feature_view.name} of type {python_class}" ) @@ -933,7 +893,6 @@ def proto(self) -> RegistryProto: (self.list_feature_views, r.feature_views), (self.list_data_sources, r.data_sources), (self.list_on_demand_feature_views, r.on_demand_feature_views), - (self.list_request_feature_views, r.request_feature_views), (self.list_stream_feature_views, r.stream_feature_views), (self.list_feature_services, r.feature_services), (self.list_saved_datasets, r.saved_datasets), @@ -968,7 +927,6 @@ def _get_all_projects(self) -> Set[str]: "ENTITIES", "FEATURE_VIEWS", "ON_DEMAND_FEATURE_VIEWS", - "REQUEST_FEATURE_VIEWS", "STREAM_FEATURE_VIEWS", ] @@ -1010,8 +968,6 @@ def _infer_fv_classes(self, feature_view): python_class, proto_class = FeatureView, FeatureViewProto elif isinstance(feature_view, OnDemandFeatureView): python_class, proto_class = OnDemandFeatureView, OnDemandFeatureViewProto - elif isinstance(feature_view, RequestFeatureView): - python_class, proto_class = RequestFeatureView, RequestFeatureViewProto else: raise ValueError(f"Unexpected feature view type: {type(feature_view)}") return python_class, proto_class @@ -1023,8 +979,6 @@ def _infer_fv_table(self, feature_view) -> str: table = "FEATURE_VIEWS" elif isinstance(feature_view, OnDemandFeatureView): table = "ON_DEMAND_FEATURE_VIEWS" - elif isinstance(feature_view, RequestFeatureView): - table = "REQUEST_FEATURE_VIEWS" else: raise ValueError(f"Unexpected feature view type: {type(feature_view)}") return table diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 597c9b8513..f9030a6875 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -50,9 +50,6 @@ OnDemandFeatureView as OnDemandFeatureViewProto, ) from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.protos.feast.core.RequestFeatureView_pb2 import ( - RequestFeatureView as RequestFeatureViewProto, -) from feast.protos.feast.core.SavedDataset_pb2 import SavedDataset as SavedDatasetProto from feast.protos.feast.core.StreamFeatureView_pb2 import ( StreamFeatureView as StreamFeatureViewProto, @@ -61,7 +58,6 @@ ValidationReference as ValidationReferenceProto, ) from feast.repo_config import RegistryConfig -from feast.request_feature_view import RequestFeatureView from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView @@ -96,16 +92,6 @@ Column("user_metadata", LargeBinary, nullable=True), ) -request_feature_views = Table( - "request_feature_views", - metadata, - Column("feature_view_name", String(50), primary_key=True), - Column("project_id", String(50), primary_key=True), - Column("last_updated_timestamp", BigInteger, nullable=False), - Column("feature_view_proto", LargeBinary, nullable=False), - Column("user_metadata", LargeBinary, nullable=True), -) - stream_feature_views = Table( "stream_feature_views", metadata, @@ -216,7 +202,6 @@ def teardown(self): feature_views, feature_services, on_demand_feature_views, - request_feature_views, saved_datasets, validation_references, }: @@ -292,18 +277,6 @@ def _get_on_demand_feature_view( not_found_exception=FeatureViewNotFoundException, ) - def _get_request_feature_view(self, name: str, project: str): - return self._get_object( - table=request_feature_views, - name=name, - project=project, - proto_class=RequestFeatureViewProto, - python_class=RequestFeatureView, - id_field_name="feature_view_name", - proto_field_name="feature_view_proto", - not_found_exception=FeatureViewNotFoundException, - ) - def _get_feature_service(self, name: str, project: str) -> FeatureService: return self._get_object( table=feature_services, @@ -363,7 +336,6 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True): deleted_count = 0 for table in { feature_views, - request_feature_views, on_demand_feature_views, stream_feature_views, }: @@ -459,15 +431,6 @@ def _list_saved_datasets(self, project: str) -> List[SavedDataset]: "saved_dataset_proto", ) - def _list_request_feature_views(self, project: str) -> List[RequestFeatureView]: - return self._list_objects( - request_feature_views, - project, - RequestFeatureViewProto, - RequestFeatureView, - "feature_view_proto", - ) - def _list_on_demand_feature_views(self, project: str) -> List[OnDemandFeatureView]: return self._list_objects( on_demand_feature_views, @@ -532,7 +495,7 @@ def apply_materialization( table = self._infer_fv_table(feature_view) python_class, proto_class = self._infer_fv_classes(feature_view) - if python_class in {RequestFeatureView, OnDemandFeatureView}: + if python_class in {OnDemandFeatureView}: raise ValueError( f"Cannot apply materialization for feature {feature_view.name} of type {python_class}" ) @@ -628,8 +591,6 @@ def _infer_fv_table(self, feature_view): table = feature_views elif isinstance(feature_view, OnDemandFeatureView): table = on_demand_feature_views - elif isinstance(feature_view, RequestFeatureView): - table = request_feature_views else: raise ValueError(f"Unexpected feature view type: {type(feature_view)}") return table @@ -641,8 +602,6 @@ def _infer_fv_classes(self, feature_view): python_class, proto_class = FeatureView, FeatureViewProto elif isinstance(feature_view, OnDemandFeatureView): python_class, proto_class = OnDemandFeatureView, OnDemandFeatureViewProto - elif isinstance(feature_view, RequestFeatureView): - python_class, proto_class = RequestFeatureView, RequestFeatureViewProto else: raise ValueError(f"Unexpected feature view type: {type(feature_view)}") return python_class, proto_class @@ -671,7 +630,6 @@ def proto(self) -> RegistryProto: (self.list_feature_views, r.feature_views), (self.list_data_sources, r.data_sources), (self.list_on_demand_feature_views, r.on_demand_feature_views), - (self.list_request_feature_views, r.request_feature_views), (self.list_stream_feature_views, r.stream_feature_views), (self.list_feature_services, r.feature_services), (self.list_saved_datasets, r.saved_datasets), @@ -733,7 +691,10 @@ def _apply_object( } update_stmt = ( update(table) - .where(getattr(table.c, id_field_name) == name) + .where( + getattr(table.c, id_field_name) == name, + table.c.project_id == project, + ) .values( values, ) @@ -905,7 +866,6 @@ def _get_all_projects(self) -> Set[str]: entities, data_sources, feature_views, - request_feature_views, on_demand_feature_views, stream_feature_views, }: diff --git a/sdk/python/feast/infra/utils/snowflake/registry/snowflake_table_creation.sql b/sdk/python/feast/infra/utils/snowflake/registry/snowflake_table_creation.sql index 4b53d6bb3f..aa35caeac4 100644 --- a/sdk/python/feast/infra/utils/snowflake/registry/snowflake_table_creation.sql +++ b/sdk/python/feast/infra/utils/snowflake/registry/snowflake_table_creation.sql @@ -57,15 +57,6 @@ CREATE TABLE IF NOT EXISTS REGISTRY_PATH."ON_DEMAND_FEATURE_VIEWS" ( PRIMARY KEY (on_demand_feature_view_name, project_id) ); -CREATE TABLE IF NOT EXISTS REGISTRY_PATH."REQUEST_FEATURE_VIEWS" ( - request_feature_view_name VARCHAR, - project_id VARCHAR, - last_updated_timestamp TIMESTAMP_LTZ NOT NULL, - request_feature_view_proto BINARY NOT NULL, - user_metadata BINARY, - PRIMARY KEY (request_feature_view_name, project_id) -); - CREATE TABLE IF NOT EXISTS REGISTRY_PATH."SAVED_DATASETS" ( saved_dataset_name VARCHAR, project_id VARCHAR, diff --git a/sdk/python/feast/infra/utils/snowflake/registry/snowflake_table_deletion.sql b/sdk/python/feast/infra/utils/snowflake/registry/snowflake_table_deletion.sql index 7f5c1991ea..a355c72062 100644 --- a/sdk/python/feast/infra/utils/snowflake/registry/snowflake_table_deletion.sql +++ b/sdk/python/feast/infra/utils/snowflake/registry/snowflake_table_deletion.sql @@ -12,8 +12,6 @@ DROP TABLE IF EXISTS REGISTRY_PATH."MANAGED_INFRA"; DROP TABLE IF EXISTS REGISTRY_PATH."ON_DEMAND_FEATURE_VIEWS"; -DROP TABLE IF EXISTS REGISTRY_PATH."REQUEST_FEATURE_VIEWS"; - DROP TABLE IF EXISTS REGISTRY_PATH."SAVED_DATASETS"; DROP TABLE IF EXISTS REGISTRY_PATH."STREAM_FEATURE_VIEWS"; diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 586286a3d4..ce416fff2a 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -17,8 +17,6 @@ from feast.feature_view import FeatureView from feast.feature_view_projection import FeatureViewProjection from feast.field import Field, from_value_type -from feast.on_demand_pandas_transformation import OnDemandPandasTransformation -from feast.on_demand_substrait_transformation import OnDemandSubstraitTransformation from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( OnDemandFeatureView as OnDemandFeatureViewProto, ) @@ -27,6 +25,14 @@ OnDemandFeatureViewSpec, OnDemandSource, ) +from feast.protos.feast.core.Transformation_pb2 import ( + FeatureTransformationV2 as FeatureTransformationProto, +) +from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunctionV2 as UserDefinedFunctionProto, +) +from feast.transformation.pandas_transformation import PandasTransformation +from feast.transformation.substrait_transformation import SubstraitTransformation from feast.type_map import ( feast_value_type_to_pandas_type, python_type_to_feast_value_type, @@ -51,7 +57,7 @@ class OnDemandFeatureView(BaseFeatureView): sources with type FeatureViewProjection. source_request_sources: A map from input source names to the actual input sources with type RequestSource. - transformation: The user defined transformation. + feature_transformation: The user defined transformation. description: A human-readable description. tags: A dictionary of key-value pairs to store arbitrary metadata. owner: The owner of the on demand feature view, typically the email of the primary @@ -62,7 +68,7 @@ class OnDemandFeatureView(BaseFeatureView): features: List[Field] source_feature_view_projections: Dict[str, FeatureViewProjection] source_request_sources: Dict[str, RequestSource] - transformation: Union[OnDemandPandasTransformation] + feature_transformation: Union[PandasTransformation, SubstraitTransformation] description: str tags: Dict[str, str] owner: str @@ -82,7 +88,9 @@ def __init__( # noqa: C901 ], udf: Optional[FunctionType] = None, udf_string: str = "", - transformation: Optional[Union[OnDemandPandasTransformation]] = None, + feature_transformation: Optional[ + Union[PandasTransformation, SubstraitTransformation] + ] = None, description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", @@ -100,7 +108,7 @@ def __init__( # noqa: C901 udf (deprecated): The user defined transformation function, which must take pandas dataframes as inputs. udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI) - transformation: The user defined transformation. + feature_transformation: The user defined transformation. description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email @@ -114,13 +122,13 @@ def __init__( # noqa: C901 owner=owner, ) - if not transformation: + if not feature_transformation: if udf: warnings.warn( "udf and udf_string parameters are deprecated. Please use transformation=OnDemandPandasTransformation(udf, udf_string) instead.", DeprecationWarning, ) - transformation = OnDemandPandasTransformation(udf, udf_string) + feature_transformation = PandasTransformation(udf, udf_string) else: raise Exception( "OnDemandFeatureView needs to be initialized with either transformation or udf arguments" @@ -138,7 +146,7 @@ def __init__( # noqa: C901 odfv_source.name ] = odfv_source.projection - self.transformation = transformation + self.feature_transformation = feature_transformation @property def proto_class(self) -> Type[OnDemandFeatureViewProto]: @@ -150,7 +158,7 @@ def __copy__(self): schema=self.features, sources=list(self.source_feature_view_projections.values()) + list(self.source_request_sources.values()), - transformation=self.transformation, + feature_transformation=self.feature_transformation, description=self.description, tags=self.tags, owner=self.owner, @@ -171,7 +179,7 @@ def __eq__(self, other): self.source_feature_view_projections != other.source_feature_view_projections or self.source_request_sources != other.source_request_sources - or self.transformation != other.transformation + or self.feature_transformation != other.feature_transformation ): return False @@ -205,16 +213,19 @@ def to_proto(self) -> OnDemandFeatureViewProto: request_data_source=request_sources.to_proto() ) + feature_transformation = FeatureTransformationProto( + user_defined_function=self.feature_transformation.to_proto() + if isinstance(self.feature_transformation, PandasTransformation) + else None, + substrait_transformation=self.feature_transformation.to_proto() + if isinstance(self.feature_transformation, SubstraitTransformation) + else None, + ) spec = OnDemandFeatureViewSpec( name=self.name, features=[feature.to_proto() for feature in self.features], sources=sources, - user_defined_function=self.transformation.to_proto() - if type(self.transformation) == OnDemandPandasTransformation - else None, - on_demand_substrait_transformation=self.transformation.to_proto() # type: ignore - if type(self.transformation) == OnDemandSubstraitTransformation - else None, + feature_transformation=feature_transformation, description=self.description, tags=self.tags, owner=self.owner, @@ -254,18 +265,37 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): ) if ( - on_demand_feature_view_proto.spec.WhichOneof("transformation") + on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( + "transformation" + ) == "user_defined_function" + and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text + != "" ): - transformation = OnDemandPandasTransformation.from_proto( - on_demand_feature_view_proto.spec.user_defined_function + transformation = PandasTransformation.from_proto( + on_demand_feature_view_proto.spec.feature_transformation.user_defined_function ) elif ( - on_demand_feature_view_proto.spec.WhichOneof("transformation") - == "on_demand_substrait_transformation" + on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( + "transformation" + ) + == "substrait_transformation" ): - transformation = OnDemandSubstraitTransformation.from_proto( - on_demand_feature_view_proto.spec.on_demand_substrait_transformation + transformation = SubstraitTransformation.from_proto( + on_demand_feature_view_proto.spec.feature_transformation.substrait_transformation + ) + elif ( + hasattr(on_demand_feature_view_proto.spec, "user_defined_function") + and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text + == "" + ): + backwards_compatible_udf = UserDefinedFunctionProto( + name=on_demand_feature_view_proto.spec.user_defined_function.name, + body=on_demand_feature_view_proto.spec.user_defined_function.body, + body_text=on_demand_feature_view_proto.spec.user_defined_function.body_text, + ) + transformation = PandasTransformation.from_proto( + user_defined_function_proto=backwards_compatible_udf, ) else: raise Exception("At least one transformation type needs to be provided") @@ -280,7 +310,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): for feature in on_demand_feature_view_proto.spec.features ], sources=sources, - transformation=transformation, + feature_transformation=transformation, description=on_demand_feature_view_proto.spec.description, tags=dict(on_demand_feature_view_proto.spec.tags), owner=on_demand_feature_view_proto.spec.owner, @@ -340,7 +370,9 @@ def get_transformed_features_df( # Compute transformed values and apply to each result row - df_with_transformed_features = self.transformation.transform(df_with_features) + df_with_transformed_features = self.feature_transformation.transform( + df_with_features + ) # Work out whether the correct columns names are used. rename_columns: Dict[str, str] = {} @@ -390,7 +422,7 @@ def infer_features(self) -> None: dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type()) sample_val = rand_df_value[dtype] if dtype in rand_df_value else None df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype) - output_df: pd.DataFrame = self.transformation.transform(df) + output_df: pd.DataFrame = self.feature_transformation.transform(df) inferred_features = [] for f, dt in zip(output_df.columns, output_df.dtypes): inferred_features.append( @@ -487,7 +519,7 @@ def decorator(user_function): input_fields: Field = [] for s in sources: - if type(s) == FeatureView: + if isinstance(s, FeatureView): fields = s.projection.features else: fields = s.features @@ -506,19 +538,19 @@ def decorator(user_function): expr = user_function(ibis.table(input_fields, "t")) - transformation = OnDemandSubstraitTransformation( + transformation = SubstraitTransformation( substrait_plan=compiler.compile(expr).SerializeToString() ) else: udf_string = dill.source.getsource(user_function) mainify(user_function) - transformation = OnDemandPandasTransformation(user_function, udf_string) + transformation = PandasTransformation(user_function, udf_string) on_demand_feature_view_obj = OnDemandFeatureView( name=user_function.__name__, sources=sources, schema=schema, - transformation=transformation, + feature_transformation=transformation, description=description, tags=tags, owner=owner, diff --git a/sdk/python/feast/registry_server.py b/sdk/python/feast/registry_server.py index 221715480e..7de0cc43e1 100644 --- a/sdk/python/feast/registry_server.py +++ b/sdk/python/feast/registry_server.py @@ -59,23 +59,6 @@ def ListFeatureViews(self, request, context): ] ) - def GetRequestFeatureView( - self, request: RegistryServer_pb2.GetRequestFeatureViewRequest, context - ): - return self.proxied_registry.get_request_feature_view( - name=request.name, project=request.project, allow_cache=request.allow_cache - ).to_proto() - - def ListRequestFeatureViews(self, request, context): - return RegistryServer_pb2.ListRequestFeatureViewsResponse( - request_feature_views=[ - request_feature_view.to_proto() - for request_feature_view in self.proxied_registry.list_request_feature_views( - project=request.project, allow_cache=request.allow_cache - ) - ] - ) - def GetStreamFeatureView( self, request: RegistryServer_pb2.GetStreamFeatureViewRequest, context ): diff --git a/sdk/python/feast/repo_contents.py b/sdk/python/feast/repo_contents.py index fe5cbd284b..33b99f29b2 100644 --- a/sdk/python/feast/repo_contents.py +++ b/sdk/python/feast/repo_contents.py @@ -19,7 +19,6 @@ from feast.feature_view import FeatureView from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.request_feature_view import RequestFeatureView from feast.stream_feature_view import StreamFeatureView @@ -31,7 +30,6 @@ class RepoContents(NamedTuple): data_sources: List[DataSource] feature_views: List[FeatureView] on_demand_feature_views: List[OnDemandFeatureView] - request_feature_views: List[RequestFeatureView] stream_feature_views: List[StreamFeatureView] entities: List[Entity] feature_services: List[FeatureService] @@ -46,9 +44,6 @@ def to_registry_proto(self) -> RegistryProto: registry_proto.on_demand_feature_views.extend( [fv.to_proto() for fv in self.on_demand_feature_views] ) - registry_proto.request_feature_views.extend( - [fv.to_proto() for fv in self.request_feature_views] - ) registry_proto.feature_services.extend( [fs.to_proto() for fs in self.feature_services] ) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 120f6e7a42..000e000438 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -29,7 +29,6 @@ from feast.on_demand_feature_view import OnDemandFeatureView from feast.repo_config import RepoConfig from feast.repo_contents import RepoContents -from feast.request_feature_view import RequestFeatureView from feast.stream_feature_view import StreamFeatureView from feast.usage import log_exceptions_and_usage @@ -114,7 +113,6 @@ def parse_repo(repo_root: Path) -> RepoContents: feature_services=[], on_demand_feature_views=[], stream_feature_views=[], - request_feature_views=[], ) for repo_file in get_repo_files(repo_root): @@ -196,10 +194,6 @@ def parse_repo(repo_root: Path) -> RepoContents: (obj is odfv) for odfv in res.on_demand_feature_views ): res.on_demand_feature_views.append(obj) - elif isinstance(obj, RequestFeatureView) and not any( - (obj is rfv) for rfv in res.request_feature_views - ): - res.request_feature_views.append(obj) res.entities.append(DUMMY_ENTITY) return res @@ -250,7 +244,6 @@ def extract_objects_for_apply_delete(project, registry, repo): Union[ Entity, FeatureView, - RequestFeatureView, OnDemandFeatureView, StreamFeatureView, FeatureService, @@ -264,7 +257,6 @@ def extract_objects_for_apply_delete(project, registry, repo): Union[ Entity, FeatureView, - RequestFeatureView, OnDemandFeatureView, StreamFeatureView, FeatureService, diff --git a/sdk/python/feast/request_feature_view.py b/sdk/python/feast/request_feature_view.py deleted file mode 100644 index 7248ffe989..0000000000 --- a/sdk/python/feast/request_feature_view.py +++ /dev/null @@ -1,137 +0,0 @@ -import copy -import warnings -from typing import Dict, List, Optional, Type - -from feast.base_feature_view import BaseFeatureView -from feast.data_source import RequestSource -from feast.feature_view_projection import FeatureViewProjection -from feast.field import Field -from feast.protos.feast.core.RequestFeatureView_pb2 import ( - RequestFeatureView as RequestFeatureViewProto, -) -from feast.protos.feast.core.RequestFeatureView_pb2 import RequestFeatureViewSpec -from feast.usage import log_exceptions - - -class RequestFeatureView(BaseFeatureView): - """ - [Experimental] A RequestFeatureView defines a logical group of features that should - be available as an input to an on demand feature view at request time. - - Attributes: - name: The unique name of the request feature view. - request_source: The request source that specifies the schema and - features of the request feature view. - features: The list of features defined as part of this request feature view. - description: A human-readable description. - tags: A dictionary of key-value pairs to store arbitrary metadata. - owner: The owner of the request feature view, typically the email of the primary - maintainer. - """ - - name: str - request_source: RequestSource - features: List[Field] - description: str - tags: Dict[str, str] - owner: str - - @log_exceptions - def __init__( - self, - name: str, - request_data_source: RequestSource, - description: str = "", - tags: Optional[Dict[str, str]] = None, - owner: str = "", - ): - """ - Creates a RequestFeatureView object. - - Args: - name: The unique name of the request feature view. - request_data_source: The request data source that specifies the schema and - features of the request feature view. - description (optional): A human-readable description. - tags (optional): A dictionary of key-value pairs to store arbitrary metadata. - owner (optional): The owner of the request feature view, typically the email - of the primary maintainer. - """ - warnings.warn( - "Request feature view is deprecated. " - "Please use request data source instead", - DeprecationWarning, - ) - - if isinstance(request_data_source.schema, Dict): - new_features = [ - Field(name=name, dtype=dtype) - for name, dtype in request_data_source.schema.items() - ] - else: - new_features = request_data_source.schema - - super().__init__( - name=name, - features=new_features, - description=description, - tags=tags, - owner=owner, - ) - self.request_source = request_data_source - - @property - def proto_class(self) -> Type[RequestFeatureViewProto]: - return RequestFeatureViewProto - - def to_proto(self) -> RequestFeatureViewProto: - """ - Converts an request feature view object to its protobuf representation. - - Returns: - A RequestFeatureViewProto protobuf. - """ - spec = RequestFeatureViewSpec( - name=self.name, - request_data_source=self.request_source.to_proto(), - description=self.description, - tags=self.tags, - owner=self.owner, - ) - - return RequestFeatureViewProto(spec=spec) - - @classmethod - def from_proto(cls, request_feature_view_proto: RequestFeatureViewProto): - """ - Creates a request feature view from a protobuf representation. - - Args: - request_feature_view_proto: A protobuf representation of an request feature view. - - Returns: - A RequestFeatureView object based on the request feature view protobuf. - """ - - request_feature_view_obj = cls( - name=request_feature_view_proto.spec.name, - request_data_source=RequestSource.from_proto( - request_feature_view_proto.spec.request_data_source - ), - description=request_feature_view_proto.spec.description, - tags=dict(request_feature_view_proto.spec.tags), - owner=request_feature_view_proto.spec.owner, - ) - - # FeatureViewProjections are not saved in the RequestFeatureView proto. - # Create the default projection. - request_feature_view_obj.projection = FeatureViewProjection.from_definition( - request_feature_view_obj - ) - - return request_feature_view_obj - - def __copy__(self): - fv = RequestFeatureView(name=self.name, request_data_source=self.request_source) - fv.projection = copy.copy(self.projection) - return fv diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 13abbc5e28..301cf6cba5 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -25,6 +25,13 @@ from feast.protos.feast.core.StreamFeatureView_pb2 import ( StreamFeatureViewSpec as StreamFeatureViewSpecProto, ) +from feast.protos.feast.core.Transformation_pb2 import ( + FeatureTransformationV2 as FeatureTransformationProto, +) +from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunctionV2 as UserDefinedFunctionProtoV2, +) +from feast.transformation.pandas_transformation import PandasTransformation warnings.simplefilter("once", RuntimeWarning) @@ -73,6 +80,7 @@ class StreamFeatureView(FeatureView): materialization_intervals: List[Tuple[datetime, datetime]] udf: Optional[FunctionType] udf_string: Optional[str] + feature_transformation: Optional[PandasTransformation] def __init__( self, @@ -91,6 +99,7 @@ def __init__( timestamp_field: Optional[str] = "", udf: Optional[FunctionType] = None, udf_string: Optional[str] = "", + feature_transformation: Optional[Union[PandasTransformation]] = None, ): if not flags_helper.is_test(): warnings.warn( @@ -118,6 +127,7 @@ def __init__( self.timestamp_field = timestamp_field or "" self.udf = udf self.udf_string = udf_string + self.feature_transformation = feature_transformation super().__init__( name=name, @@ -171,19 +181,30 @@ def to_proto(self): stream_source_proto = self.stream_source.to_proto() stream_source_proto.data_source_class_type = f"{self.stream_source.__class__.__module__}.{self.stream_source.__class__.__name__}" - udf_proto = None + udf_proto, feature_transformation = None, None if self.udf: udf_proto = UserDefinedFunctionProto( name=self.udf.__name__, body=dill.dumps(self.udf, recurse=True), body_text=self.udf_string, ) + udf_proto_v2 = UserDefinedFunctionProtoV2( + name=self.udf.__name__, + body=dill.dumps(self.udf, recurse=True), + body_text=self.udf_string, + ) + + feature_transformation = FeatureTransformationProto( + user_defined_function=udf_proto_v2, + ) + spec = StreamFeatureViewSpecProto( name=self.name, entities=self.entities, entity_columns=[field.to_proto() for field in self.entity_columns], features=[field.to_proto() for field in self.schema], user_defined_function=udf_proto, + feature_transformation=feature_transformation, description=self.description, tags=self.tags, owner=self.owner, @@ -220,6 +241,11 @@ def from_proto(cls, sfv_proto): if sfv_proto.spec.HasField("user_defined_function") else None ) + # feature_transformation = ( + # sfv_proto.spec.feature_transformation.user_defined_function.body_text + # if sfv_proto.spec.HasField("feature_transformation") + # else None + # ) stream_feature_view = cls( name=sfv_proto.spec.name, description=sfv_proto.spec.description, @@ -238,6 +264,9 @@ def from_proto(cls, sfv_proto): mode=sfv_proto.spec.mode, udf=udf, udf_string=udf_string, + feature_transformation=PandasTransformation(udf, udf_string) + if udf + else None, aggregations=[ Aggregation.from_proto(agg_proto) for agg_proto in sfv_proto.spec.aggregations @@ -294,6 +323,7 @@ def __copy__(self): timestamp_field=self.timestamp_field, source=self.stream_source if self.stream_source else self.batch_source, udf=self.udf, + feature_transformation=self.feature_transformation, ) fv.entities = self.entities fv.features = copy.copy(self.features) @@ -343,6 +373,7 @@ def decorator(user_function): schema=schema, udf=user_function, udf_string=udf_string, + feature_transformation=PandasTransformation(user_function, udf_string), description=description, tags=tags, online=online, diff --git a/sdk/python/feast/templates/spark/feature_repo/feature_store.yaml b/sdk/python/feast/templates/spark/feature_repo/feature_store.yaml index f72c7c65f4..08383a29e1 100644 --- a/sdk/python/feast/templates/spark/feature_repo/feature_store.yaml +++ b/sdk/python/feast/templates/spark/feature_repo/feature_store.yaml @@ -12,6 +12,8 @@ offline_store: spark.sql.catalogImplementation: "hive" spark.sql.parser.quotedRegexColumnNames: "true" spark.sql.session.timeZone: "UTC" + spark.sql.execution.arrow.fallback.enabled: "true" + spark.sql.execution.arrow.pyspark.enabled: "true" online_store: path: data/online_store.db entity_key_serialization_version: 2 diff --git a/sdk/python/feast/transformation/__init__.py b/sdk/python/feast/transformation/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/on_demand_pandas_transformation.py b/sdk/python/feast/transformation/pandas_transformation.py similarity index 78% rename from sdk/python/feast/on_demand_pandas_transformation.py rename to sdk/python/feast/transformation/pandas_transformation.py index 32cb44b429..76f17e2106 100644 --- a/sdk/python/feast/on_demand_pandas_transformation.py +++ b/sdk/python/feast/transformation/pandas_transformation.py @@ -3,12 +3,12 @@ import dill import pandas as pd -from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( - UserDefinedFunction as UserDefinedFunctionProto, +from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunctionV2 as UserDefinedFunctionProto, ) -class OnDemandPandasTransformation: +class PandasTransformation: def __init__(self, udf: FunctionType, udf_string: str = ""): """ Creates an OnDemandPandasTransformation object. @@ -25,9 +25,9 @@ def transform(self, df: pd.DataFrame) -> pd.DataFrame: return self.udf.__call__(df) def __eq__(self, other): - if not isinstance(other, OnDemandPandasTransformation): + if not isinstance(other, PandasTransformation): raise TypeError( - "Comparisons should only involve OnDemandPandasTransformation class objects." + "Comparisons should only involve PandasTransformation class objects." ) if ( @@ -47,7 +47,7 @@ def to_proto(self) -> UserDefinedFunctionProto: @classmethod def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto): - return OnDemandPandasTransformation( + return PandasTransformation( udf=dill.loads(user_defined_function_proto.body), udf_string=user_defined_function_proto.body_text, ) diff --git a/sdk/python/feast/on_demand_substrait_transformation.py b/sdk/python/feast/transformation/substrait_transformation.py similarity index 54% rename from sdk/python/feast/on_demand_substrait_transformation.py rename to sdk/python/feast/transformation/substrait_transformation.py index 4e92e77dc8..b3dbe7a4b4 100644 --- a/sdk/python/feast/on_demand_substrait_transformation.py +++ b/sdk/python/feast/transformation/substrait_transformation.py @@ -2,15 +2,15 @@ import pyarrow import pyarrow.substrait as substrait # type: ignore # noqa -from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( - OnDemandSubstraitTransformation as OnDemandSubstraitTransformationProto, +from feast.protos.feast.core.Transformation_pb2 import ( + SubstraitTransformationV2 as SubstraitTransformationProto, ) -class OnDemandSubstraitTransformation: +class SubstraitTransformation: def __init__(self, substrait_plan: bytes): """ - Creates an OnDemandSubstraitTransformation object. + Creates an SubstraitTransformation object. Args: substrait_plan: The user-provided substrait plan. @@ -27,9 +27,9 @@ def table_provider(names, schema: pyarrow.Schema): return table.to_pandas() def __eq__(self, other): - if not isinstance(other, OnDemandSubstraitTransformation): + if not isinstance(other, SubstraitTransformation): raise TypeError( - "Comparisons should only involve OnDemandSubstraitTransformation class objects." + "Comparisons should only involve SubstraitTransformation class objects." ) if not super().__eq__(other): @@ -37,14 +37,14 @@ def __eq__(self, other): return self.substrait_plan == other.substrait_plan - def to_proto(self) -> OnDemandSubstraitTransformationProto: - return OnDemandSubstraitTransformationProto(substrait_plan=self.substrait_plan) + def to_proto(self) -> SubstraitTransformationProto: + return SubstraitTransformationProto(substrait_plan=self.substrait_plan) @classmethod def from_proto( cls, - on_demand_substrait_transformation_proto: OnDemandSubstraitTransformationProto, + substrait_transformation_proto: SubstraitTransformationProto, ): - return OnDemandSubstraitTransformation( - substrait_plan=on_demand_substrait_transformation_proto.substrait_plan + return SubstraitTransformation( + substrait_plan=substrait_transformation_proto.substrait_plan ) diff --git a/sdk/python/pytest.ini b/sdk/python/pytest.ini index 07a5e869dc..83317d36c9 100644 --- a/sdk/python/pytest.ini +++ b/sdk/python/pytest.ini @@ -1,4 +1,8 @@ [pytest] markers = universal_offline_stores: mark a test as using all offline stores. - universal_online_stores: mark a test as using all online stores. \ No newline at end of file + universal_online_stores: mark a test as using all online stores. + +env = + FEAST_USAGE=False + IS_TEST=True \ No newline at end of file diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 737271eee1..ac1994da37 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -61,11 +61,11 @@ black==22.12.0 # via feast (setup.py) bleach==6.1.0 # via nbconvert -boto3==1.34.67 +boto3==1.34.69 # via # feast (setup.py) # moto -botocore==1.34.67 +botocore==1.34.69 # via # boto3 # moto @@ -205,7 +205,7 @@ geojson==2.5.0 # via rockset geomet==0.2.1.post1 # via cassandra-driver -google-api-core[grpc]==2.17.1 +google-api-core[grpc]==2.18.0 # via # feast (setup.py) # firebase-admin @@ -217,7 +217,7 @@ google-api-core[grpc]==2.17.1 # google-cloud-datastore # google-cloud-firestore # google-cloud-storage -google-api-python-client==2.122.0 +google-api-python-client==2.123.0 # via firebase-admin google-auth==2.29.0 # via @@ -337,7 +337,7 @@ importlib-metadata==6.11.0 # via # dask # feast (setup.py) -importlib-resources==6.3.2 +importlib-resources==6.4.0 # via feast (setup.py) iniconfig==2.0.0 # via pytest @@ -489,7 +489,7 @@ mypy-protobuf==3.3.0 # via feast (setup.py) nbclient==0.10.0 # via nbconvert -nbconvert==7.16.2 +nbconvert==7.16.3 # via jupyter-server nbformat==5.10.3 # via @@ -587,6 +587,7 @@ prompt-toolkit==3.0.43 # via ipython proto-plus==1.23.0 # via + # google-api-core # google-cloud-bigquery # google-cloud-bigquery-storage # google-cloud-bigtable @@ -693,6 +694,7 @@ pytest==7.4.4 # feast (setup.py) # pytest-benchmark # pytest-cov + # pytest-env # pytest-lazy-fixture # pytest-mock # pytest-ordering @@ -702,6 +704,8 @@ pytest-benchmark==3.4.1 # via feast (setup.py) pytest-cov==4.1.0 # via feast (setup.py) +pytest-env==1.1.3 + # via feast (setup.py) pytest-lazy-fixture==0.6.3 # via feast (setup.py) pytest-mock==1.10.4 @@ -779,7 +783,7 @@ requests==2.31.0 # snowflake-connector-python # sphinx # trino -requests-oauthlib==1.4.0 +requests-oauthlib==2.0.0 # via kubernetes responses==0.25.0 # via moto @@ -893,6 +897,7 @@ tomli==2.0.1 # pip-tools # pyproject-hooks # pytest + # pytest-env tomlkit==0.12.4 # via snowflake-connector-python toolz==0.12.1 @@ -930,7 +935,7 @@ traitlets==5.14.2 # nbformat trino==0.328.0 # via feast (setup.py) -typeguard==4.1.5 +typeguard==4.2.1 # via feast (setup.py) types-protobuf==3.19.22 # via diff --git a/sdk/python/requirements/py3.10-requirements.txt b/sdk/python/requirements/py3.10-requirements.txt index 240f43b57e..6603171d45 100644 --- a/sdk/python/requirements/py3.10-requirements.txt +++ b/sdk/python/requirements/py3.10-requirements.txt @@ -62,7 +62,7 @@ importlib-metadata==6.11.0 # via # dask # feast (setup.py) -importlib-resources==6.3.2 +importlib-resources==6.4.0 # via feast (setup.py) jinja2==3.1.3 # via feast (setup.py) @@ -158,7 +158,7 @@ toolz==0.12.1 # partd tqdm==4.66.2 # via feast (setup.py) -typeguard==4.1.5 +typeguard==4.2.1 # via feast (setup.py) types-protobuf==4.24.0.20240311 # via mypy-protobuf diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index f2585a7978..367b5dc050 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -61,11 +61,11 @@ black==22.12.0 # via feast (setup.py) bleach==6.1.0 # via nbconvert -boto3==1.34.67 +boto3==1.34.69 # via # feast (setup.py) # moto -botocore==1.34.67 +botocore==1.34.69 # via # boto3 # moto @@ -205,7 +205,7 @@ geojson==2.5.0 # via rockset geomet==0.2.1.post1 # via cassandra-driver -google-api-core[grpc]==2.17.1 +google-api-core[grpc]==2.18.0 # via # feast (setup.py) # firebase-admin @@ -217,7 +217,7 @@ google-api-core[grpc]==2.17.1 # google-cloud-datastore # google-cloud-firestore # google-cloud-storage -google-api-python-client==2.122.0 +google-api-python-client==2.123.0 # via firebase-admin google-auth==2.29.0 # via @@ -345,7 +345,7 @@ importlib-metadata==6.11.0 # nbconvert # sphinx # typeguard -importlib-resources==6.3.2 +importlib-resources==6.4.0 # via feast (setup.py) iniconfig==2.0.0 # via pytest @@ -497,7 +497,7 @@ mypy-protobuf==3.3.0 # via feast (setup.py) nbclient==0.10.0 # via nbconvert -nbconvert==7.16.2 +nbconvert==7.16.3 # via jupyter-server nbformat==5.10.3 # via @@ -595,6 +595,7 @@ prompt-toolkit==3.0.43 # via ipython proto-plus==1.23.0 # via + # google-api-core # google-cloud-bigquery # google-cloud-bigquery-storage # google-cloud-bigtable @@ -701,6 +702,7 @@ pytest==7.4.4 # feast (setup.py) # pytest-benchmark # pytest-cov + # pytest-env # pytest-lazy-fixture # pytest-mock # pytest-ordering @@ -710,6 +712,8 @@ pytest-benchmark==3.4.1 # via feast (setup.py) pytest-cov==4.1.0 # via feast (setup.py) +pytest-env==1.1.3 + # via feast (setup.py) pytest-lazy-fixture==0.6.3 # via feast (setup.py) pytest-mock==1.10.4 @@ -787,7 +791,7 @@ requests==2.31.0 # snowflake-connector-python # sphinx # trino -requests-oauthlib==1.4.0 +requests-oauthlib==2.0.0 # via kubernetes responses==0.25.0 # via moto @@ -903,6 +907,7 @@ tomli==2.0.1 # pip-tools # pyproject-hooks # pytest + # pytest-env tomlkit==0.12.4 # via snowflake-connector-python toolz==0.12.1 @@ -940,7 +945,7 @@ traitlets==5.14.2 # nbformat trino==0.328.0 # via feast (setup.py) -typeguard==4.1.5 +typeguard==4.2.1 # via feast (setup.py) types-protobuf==3.19.22 # via diff --git a/sdk/python/requirements/py3.9-requirements.txt b/sdk/python/requirements/py3.9-requirements.txt index 43b0191ed4..3b8f555ca7 100644 --- a/sdk/python/requirements/py3.9-requirements.txt +++ b/sdk/python/requirements/py3.9-requirements.txt @@ -63,7 +63,7 @@ importlib-metadata==6.11.0 # dask # feast (setup.py) # typeguard -importlib-resources==6.3.2 +importlib-resources==6.4.0 # via feast (setup.py) jinja2==3.1.3 # via feast (setup.py) @@ -159,7 +159,7 @@ toolz==0.12.1 # partd tqdm==4.66.2 # via feast (setup.py) -typeguard==4.1.5 +typeguard==4.2.1 # via feast (setup.py) types-protobuf==4.24.0.20240311 # via mypy-protobuf diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 743a1ce4a0..1c9a958ce3 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -13,7 +13,6 @@ # limitations under the License. import logging import multiprocessing -import os import random from datetime import datetime, timedelta from multiprocessing import Process @@ -24,8 +23,6 @@ import pytest from _pytest.nodes import Item -os.environ["FEAST_USAGE"] = "False" -os.environ["IS_TEST"] = "True" from feast.feature_store import FeatureStore # noqa: E402 from feast.wait import wait_retry_backoff # noqa: E402 from tests.data.data_creator import create_basic_driver_dataset # noqa: E402 diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 421ef41601..55d2ed8425 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -15,7 +15,7 @@ ) from feast.data_source import DataSource, RequestSource from feast.feature_view_projection import FeatureViewProjection -from feast.on_demand_feature_view import OnDemandPandasTransformation +from feast.on_demand_feature_view import PandasTransformation from feast.types import Array, FeastType, Float32, Float64, Int32, Int64 from tests.integration.feature_repos.universal.entities import ( customer, @@ -71,7 +71,7 @@ def conv_rate_plus_100_feature_view( name=conv_rate_plus_100.__name__, schema=[] if infer_features else _features, sources=sources, - transformation=OnDemandPandasTransformation( + feature_transformation=PandasTransformation( udf=conv_rate_plus_100, udf_string="raw udf source" ), ) @@ -110,7 +110,7 @@ def similarity_feature_view( name=similarity.__name__, sources=sources, schema=[] if infer_features else _fields, - transformation=OnDemandPandasTransformation( + feature_transformation=PandasTransformation( udf=similarity, udf_string="similarity raw udf" ), ) diff --git a/sdk/python/tests/unit/diff/test_registry_diff.py b/sdk/python/tests/unit/diff/test_registry_diff.py index ce40295f8b..c209f1e0e0 100644 --- a/sdk/python/tests/unit/diff/test_registry_diff.py +++ b/sdk/python/tests/unit/diff/test_registry_diff.py @@ -137,13 +137,14 @@ def post_changed(inputs: pd.DataFrame) -> pd.DataFrame: # if no code is changed assert len(feast_object_diffs.feast_object_property_diffs) == 3 assert feast_object_diffs.feast_object_property_diffs[0].property_name == "name" + # Note we should only now be looking at changes for the feature_transformation field assert ( feast_object_diffs.feast_object_property_diffs[1].property_name - == "user_defined_function.name" + == "feature_transformation.name" ) assert ( feast_object_diffs.feast_object_property_diffs[2].property_name - == "user_defined_function.body_text" + == "feature_transformation.body_text" ) diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 2ad9680703..0220d1a8a9 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -1,22 +1,16 @@ -import copy from datetime import timedelta import pytest from typeguard import TypeCheckError -from feast.aggregation import Aggregation from feast.batch_feature_view import BatchFeatureView from feast.data_format import AvroFormat -from feast.data_source import KafkaSource, PushSource +from feast.data_source import KafkaSource from feast.entity import Entity from feast.feature_view import FeatureView from feast.field import Field from feast.infra.offline_stores.file_source import FileSource -from feast.protos.feast.core.StreamFeatureView_pb2 import ( - StreamFeatureView as StreamFeatureViewProto, -) from feast.protos.feast.types.Value_pb2 import ValueType -from feast.stream_feature_view import StreamFeatureView, stream_feature_view from feast.types import Float32 @@ -65,169 +59,10 @@ def test_create_batch_feature_view(): ) -def test_create_stream_feature_view(): - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - StreamFeatureView( - name="test kafka stream feature view", - entities=[], - ttl=timedelta(days=30), - source=stream_source, - aggregations=[], - ) - - push_source = PushSource( - name="push source", batch_source=FileSource(path="some path") - ) - StreamFeatureView( - name="test push source feature view", - entities=[], - ttl=timedelta(days=30), - source=push_source, - aggregations=[], - ) - - with pytest.raises(TypeError): - StreamFeatureView( - name="test batch feature view", - entities=[], - ttl=timedelta(days=30), - aggregations=[], - ) - - with pytest.raises(ValueError): - StreamFeatureView( - name="test batch feature view", - entities=[], - ttl=timedelta(days=30), - source=FileSource(path="some path"), - aggregations=[], - ) - - def simple_udf(x: int): return x + 3 -def test_stream_feature_view_serialization(): - entity = Entity(name="driver_entity", join_keys=["test_key"]) - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - - sfv = StreamFeatureView( - name="test kafka stream feature view", - entities=[entity], - ttl=timedelta(days=30), - owner="test@example.com", - online=True, - schema=[Field(name="dummy_field", dtype=Float32)], - description="desc", - aggregations=[ - Aggregation( - column="dummy_field", - function="max", - time_window=timedelta(days=1), - ) - ], - timestamp_field="event_timestamp", - mode="spark", - source=stream_source, - udf=simple_udf, - tags={}, - ) - - sfv_proto = sfv.to_proto() - - new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) - assert new_sfv == sfv - - -def test_stream_feature_view_udfs(): - entity = Entity(name="driver_entity", join_keys=["test_key"]) - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - - @stream_feature_view( - entities=[entity], - ttl=timedelta(days=30), - owner="test@example.com", - online=True, - schema=[Field(name="dummy_field", dtype=Float32)], - description="desc", - aggregations=[ - Aggregation( - column="dummy_field", - function="max", - time_window=timedelta(days=1), - ) - ], - timestamp_field="event_timestamp", - source=stream_source, - ) - def pandas_udf(pandas_df): - import pandas as pd - - assert type(pandas_df) == pd.DataFrame - df = pandas_df.transform(lambda x: x + 10, axis=1) - return df - - import pandas as pd - - df = pd.DataFrame({"A": [1, 2, 3], "B": [10, 20, 30]}) - sfv = pandas_udf - sfv_proto = sfv.to_proto() - new_sfv = StreamFeatureView.from_proto(sfv_proto) - new_df = new_sfv.udf(df) - - expected_df = pd.DataFrame({"A": [11, 12, 13], "B": [20, 30, 40]}) - - assert new_df.equals(expected_df) - - -def test_stream_feature_view_initialization_with_optional_fields_omitted(): - entity = Entity(name="driver_entity", join_keys=["test_key"]) - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - - sfv = StreamFeatureView( - name="test kafka stream feature view", - entities=[entity], - schema=[], - description="desc", - timestamp_field="event_timestamp", - source=stream_source, - tags={}, - ) - sfv_proto = sfv.to_proto() - - new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) - assert new_sfv == sfv - - def test_hash(): file_source = FileSource(name="my-file-source", path="test.parquet") feature_view_1 = FeatureView( @@ -282,41 +117,3 @@ def test_hash(): def test_field_types(): with pytest.raises(TypeCheckError): Field(name="name", dtype=ValueType.INT32) - - -def test_stream_feature_view_proto_type(): - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - sfv = StreamFeatureView( - name="test stream featureview proto class", - entities=[], - ttl=timedelta(days=30), - source=stream_source, - aggregations=[], - ) - assert sfv.proto_class is StreamFeatureViewProto - - -def test_stream_feature_view_copy(): - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=FileSource(path="some path"), - ) - sfv = StreamFeatureView( - name="test stream featureview proto class", - entities=[], - ttl=timedelta(days=30), - source=stream_source, - aggregations=[], - ) - assert sfv == copy.copy(sfv) diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index 66d02c65d1..d561bd8e84 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -18,10 +18,7 @@ from feast.feature_view import FeatureView from feast.field import Field from feast.infra.offline_stores.file_source import FileSource -from feast.on_demand_feature_view import ( - OnDemandFeatureView, - OnDemandPandasTransformation, -) +from feast.on_demand_feature_view import OnDemandFeatureView, PandasTransformation from feast.types import Float32 @@ -59,7 +56,7 @@ def test_hash(): Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - transformation=OnDemandPandasTransformation( + feature_transformation=PandasTransformation( udf=udf1, udf_string="udf1 source code" ), ) @@ -70,7 +67,7 @@ def test_hash(): Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - transformation=OnDemandPandasTransformation( + feature_transformation=PandasTransformation( udf=udf1, udf_string="udf1 source code" ), ) @@ -81,7 +78,7 @@ def test_hash(): Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - transformation=OnDemandPandasTransformation( + feature_transformation=PandasTransformation( udf=udf2, udf_string="udf2 source code" ), ) @@ -92,7 +89,7 @@ def test_hash(): Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - transformation=OnDemandPandasTransformation( + feature_transformation=PandasTransformation( udf=udf2, udf_string="udf2 source code" ), description="test", @@ -126,6 +123,68 @@ def test_hash(): } assert len(s4) == 3 - assert on_demand_feature_view_5.transformation == OnDemandPandasTransformation( + assert on_demand_feature_view_5.feature_transformation == PandasTransformation( udf2, "udf2 source code" ) + + +@pytest.mark.filterwarnings("ignore:udf and udf_string parameters are deprecated") +def test_from_proto_backwards_compatible_udf(): + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + sources = [feature_view] + on_demand_feature_view = OnDemandFeatureView( + name="my-on-demand-feature-view", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + feature_transformation=PandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), + ) + + # We need a proto with the "udf1 source code" in the user_defined_function.body_text + # and to populate it in feature_transformation + proto = on_demand_feature_view.to_proto() + assert ( + on_demand_feature_view.feature_transformation.udf_string + == proto.spec.feature_transformation.user_defined_function.body_text + ) + # Because of the current set of code this is just confirming it is empty + assert proto.spec.user_defined_function.body_text == "" + assert proto.spec.user_defined_function.body == b"" + assert proto.spec.user_defined_function.name == "" + + # Assuming we pull it from the registry we set it to the feature_transformation proto values + proto.spec.user_defined_function.name = ( + proto.spec.feature_transformation.user_defined_function.name + ) + proto.spec.user_defined_function.body = ( + proto.spec.feature_transformation.user_defined_function.body + ) + proto.spec.user_defined_function.body_text = ( + proto.spec.feature_transformation.user_defined_function.body_text + ) + + # And now we're going to null the feature_transformation proto object before reserializing the entire proto + # proto.spec.user_defined_function.body_text = on_demand_feature_view.transformation.udf_string + proto.spec.feature_transformation.user_defined_function.name = "" + proto.spec.feature_transformation.user_defined_function.body = b"" + proto.spec.feature_transformation.user_defined_function.body_text = "" + + # And now we expect the to get the same object back under feature_transformation + reserialized_proto = OnDemandFeatureView.from_proto(proto) + assert ( + reserialized_proto.feature_transformation.udf_string + == on_demand_feature_view.feature_transformation.udf_string + ) diff --git a/sdk/python/tests/unit/test_stream_feature_view.py b/sdk/python/tests/unit/test_stream_feature_view.py new file mode 100644 index 0000000000..b53f9a593a --- /dev/null +++ b/sdk/python/tests/unit/test_stream_feature_view.py @@ -0,0 +1,252 @@ +import copy +from datetime import timedelta + +import pytest + +from feast.aggregation import Aggregation +from feast.batch_feature_view import BatchFeatureView +from feast.data_format import AvroFormat +from feast.data_source import KafkaSource, PushSource +from feast.entity import Entity +from feast.field import Field +from feast.infra.offline_stores.file_source import FileSource +from feast.protos.feast.core.StreamFeatureView_pb2 import ( + StreamFeatureView as StreamFeatureViewProto, +) +from feast.stream_feature_view import StreamFeatureView, stream_feature_view +from feast.types import Float32 + + +def test_create_batch_feature_view(): + batch_source = FileSource(path="some path") + BatchFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=batch_source, + ) + + with pytest.raises(TypeError): + BatchFeatureView( + name="test batch feature view", entities=[], ttl=timedelta(days=30) + ) + + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + with pytest.raises(ValueError): + BatchFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + ) + + +def test_create_stream_feature_view(): + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + StreamFeatureView( + name="test kafka stream feature view", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + aggregations=[], + ) + + push_source = PushSource( + name="push source", batch_source=FileSource(path="some path") + ) + StreamFeatureView( + name="test push source feature view", + entities=[], + ttl=timedelta(days=30), + source=push_source, + aggregations=[], + ) + + with pytest.raises(TypeError): + StreamFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + aggregations=[], + ) + + with pytest.raises(ValueError): + StreamFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=FileSource(path="some path"), + aggregations=[], + ) + + +def simple_udf(x: int): + return x + 3 + + +def test_stream_feature_view_serialization(): + entity = Entity(name="driver_entity", join_keys=["test_key"]) + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + + sfv = StreamFeatureView( + name="test kafka stream feature view", + entities=[entity], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ) + ], + timestamp_field="event_timestamp", + mode="spark", + source=stream_source, + udf=simple_udf, + tags={}, + ) + + sfv_proto = sfv.to_proto() + + new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) + assert new_sfv == sfv + assert ( + sfv_proto.spec.feature_transformation.user_defined_function.name == "simple_udf" + ) + + +def test_stream_feature_view_udfs(): + entity = Entity(name="driver_entity", join_keys=["test_key"]) + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + + @stream_feature_view( + entities=[entity], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ) + ], + timestamp_field="event_timestamp", + source=stream_source, + ) + def pandas_udf(pandas_df): + import pandas as pd + + assert type(pandas_df) == pd.DataFrame + df = pandas_df.transform(lambda x: x + 10, axis=1) + return df + + import pandas as pd + + df = pd.DataFrame({"A": [1, 2, 3], "B": [10, 20, 30]}) + sfv = pandas_udf + sfv_proto = sfv.to_proto() + new_sfv = StreamFeatureView.from_proto(sfv_proto) + new_df = new_sfv.udf(df) + + expected_df = pd.DataFrame({"A": [11, 12, 13], "B": [20, 30, 40]}) + + assert new_df.equals(expected_df) + + +def test_stream_feature_view_initialization_with_optional_fields_omitted(): + entity = Entity(name="driver_entity", join_keys=["test_key"]) + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + + sfv = StreamFeatureView( + name="test kafka stream feature view", + entities=[entity], + schema=[], + description="desc", + timestamp_field="event_timestamp", + source=stream_source, + tags={}, + ) + sfv_proto = sfv.to_proto() + + new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) + assert new_sfv == sfv + + +def test_stream_feature_view_proto_type(): + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + sfv = StreamFeatureView( + name="test stream featureview proto class", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + aggregations=[], + ) + assert sfv.proto_class is StreamFeatureViewProto + + +def test_stream_feature_view_copy(): + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + sfv = StreamFeatureView( + name="test stream featureview proto class", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + aggregations=[], + ) + assert sfv == copy.copy(sfv) diff --git a/setup.py b/setup.py index ca89b09bf6..2d7bf63778 100644 --- a/setup.py +++ b/setup.py @@ -177,6 +177,7 @@ "pytest-timeout==1.4.2", "pytest-ordering~=0.6.0", "pytest-mock==1.10.4", + "pytest-env", "Sphinx>4.0.0,<7", "testcontainers>=3.5,<4", "firebase-admin>=5.2.0,<6", diff --git a/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx b/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx index ee8e41bbf6..aac3f6ac5b 100644 --- a/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx +++ b/ui/src/pages/feature-views/OnDemandFeatureViewOverviewTab.tsx @@ -57,7 +57,7 @@ const OnDemandFeatureViewOverviewTab = ({ - {data?.spec?.userDefinedFunction?.bodyText} + {data?.spec?.featureTransformation?.userDefinedFunction?.bodyText}