diff --git a/Makefile b/Makefile index e1fd342881..a104bf9ca5 100644 --- a/Makefile +++ b/Makefile @@ -259,6 +259,27 @@ test-python-universal-cassandra: python -m pytest -x --integration \ sdk/python/tests +test-python-universal-hazelcast: + PYTHONPATH='.' \ + FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.hazelcast_repo_configuration \ + PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.hazelcast \ + FEAST_USAGE=False \ + IS_TEST=True \ + python -m pytest -n 8 --integration \ + -k "not test_universal_cli and \ + not test_go_feature_server and \ + not test_feature_logging and \ + not test_reorder_columns and \ + not test_logged_features_validation and \ + not test_lambda_materialization_consistency and \ + not test_offline_write and \ + not test_push_features_to_offline_store and \ + not gcs_registry and \ + not s3_registry and \ + not test_universal_types and \ + not test_snowflake" \ + sdk/python/tests + test-python-universal-cassandra-no-cloud-providers: PYTHONPATH='.' 
\ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.cassandra_repo_configuration \ diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index cdca6f3784..9b22d1e286 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -97,6 +97,7 @@ * [Cassandra + Astra DB (contrib)](reference/online-stores/cassandra.md) * [MySQL (contrib)](reference/online-stores/mysql.md) * [Rockset (contrib)](reference/online-stores/rockset.md) + * [Hazelcast (contrib)](reference/online-stores/hazelcast.md) * [Providers](reference/providers/README.md) * [Local](reference/providers/local.md) * [Google Cloud Platform](reference/providers/google-cloud-platform.md) diff --git a/docs/reference/online-stores/README.md b/docs/reference/online-stores/README.md index 64b707a7e5..2fdfd50f7c 100644 --- a/docs/reference/online-stores/README.md +++ b/docs/reference/online-stores/README.md @@ -46,3 +46,8 @@ Please see [Online Store](../../getting-started/architecture-and-components/onli [rockset.md](rockset.md) {% endcontent-ref %} +{% content-ref url="hazelcast.md" %} +[hazelcast.md](hazelcast.md) +{% endcontent-ref %} + + diff --git a/docs/reference/online-stores/hazelcast.md b/docs/reference/online-stores/hazelcast.md new file mode 100644 index 0000000000..ef65f42b31 --- /dev/null +++ b/docs/reference/online-stores/hazelcast.md @@ -0,0 +1,59 @@ +# Hazelcast online store + +## Description + +Hazelcast online store is in alpha development. + +The [Hazelcast](https://hazelcast.com) online store provides support for materializing feature values into a Hazelcast cluster for serving online features in real-time. +In order to use Hazelcast as online store, you need to have a running Hazelcast cluster. You can create a cluster using Hazelcast Viridian Serverless. See this [getting started](https://hazelcast.com/get-started/) page for more details. 
+ +* Each feature view is mapped one-to-one to a specific Hazelcast IMap +* This implementation inherits all strengths of Hazelcast such as high availability, fault-tolerance, and data distribution. +* Secure TLS/SSL connection is supported by Hazelcast online store. +* You can set TTL (Time-To-Live) setting for your features in Hazelcast cluster. + +Each feature view corresponds to an IMap in Hazelcast cluster and the entries in that IMap correspond to features of entities. +Each feature value is stored separately and can be retrieved individually. + +## Getting started + +In order to use Hazelcast online store, you'll need to run `pip install 'feast[hazelcast]'`. You can then get started with the command `feast init REPO_NAME -t hazelcast`. + + +## Examples + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: hazelcast + cluster_name: dev + cluster_members: ["localhost:5701"] + key_ttl_seconds: 36000 +``` +{% endcode %} + +## Functionality Matrix + +| | Hazelcast | +| :-------------------------------------------------------- |:----------| +| write feature values to the online store | yes | +| read feature values from the online store | yes | +| update infrastructure (e.g. tables) in the online store | yes | +| teardown infrastructure (e.g. 
tables) in the online store | yes | +| generate a plan of infrastructure changes | no | +| support for on-demand transforms | yes | +| readable by Python SDK | yes | +| readable by Java | no | +| readable by Go | no | +| support for entityless feature views | yes | +| support for concurrent writing to the same key | yes | +| support for ttl (time to live) at retrieval | yes | +| support for deleting expired data | yes | +| collocated by feature view | no | +| collocated by feature service | no | +| collocated by entity key | yes | + +To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix). + diff --git a/sdk/python/docs/source/feast.infra.online_stores.contrib.hazelcast_online_store.rst b/sdk/python/docs/source/feast.infra.online_stores.contrib.hazelcast_online_store.rst new file mode 100644 index 0000000000..bf3ed9d7d6 --- /dev/null +++ b/sdk/python/docs/source/feast.infra.online_stores.contrib.hazelcast_online_store.rst @@ -0,0 +1,21 @@ +feast.infra.online\_stores.contrib.hazelcast\_online\_store package +=================================================================== + +Submodules +---------- + +feast.infra.online\_stores.contrib.hazelcast\_online\_store.hazelcast\_online\_store module +------------------------------------------------------------------------------------------- + +.. automodule:: feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. 
automodule:: feast.infra.online_stores.contrib.hazelcast_online_store + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst index f10ff306f3..b6c8a404ee 100644 --- a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst +++ b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst @@ -8,6 +8,7 @@ Subpackages :maxdepth: 4 feast.infra.online_stores.contrib.cassandra_online_store + feast.infra.online_stores.contrib.hazelcast_online_store feast.infra.online_stores.contrib.hbase_online_store feast.infra.online_stores.contrib.mysql_online_store feast.infra.online_stores.contrib.rockset_online_store @@ -23,6 +24,14 @@ feast.infra.online\_stores.contrib.cassandra\_repo\_configuration module :undoc-members: :show-inheritance: +feast.infra.online\_stores.contrib.hazelcast\_repo\_configuration module +------------------------------------------------------------------------ + +.. 
automodule:: feast.infra.online_stores.contrib.hazelcast_repo_configuration + :members: + :undoc-members: + :show-inheritance: + feast.infra.online\_stores.contrib.hbase\_repo\_configuration module -------------------------------------------------------------------- diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 8adf115226..e96ab772a6 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -641,6 +641,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List "hbase", "cassandra", "rockset", + "hazelcast", ], case_sensitive=False, ), diff --git a/sdk/python/feast/infra/online_stores/contrib/hazelcast_online_store/README.md b/sdk/python/feast/infra/online_stores/contrib/hazelcast_online_store/README.md new file mode 100644 index 0000000000..8c7f102239 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/hazelcast_online_store/README.md @@ -0,0 +1,89 @@ +# Hazelcast Online Store + +This contribution makes it possible to use [Hazelcast](https://hazelcast.com/) as online store for Feast. + +Once the Hazelcast client configuration is given inside `feature_store.yaml` file, everything else +is handled as with any other online store: schema creation, read/write from/to Hazelcast and remove operations. + +## Quick usage + +The following refers to the [Feast quickstart](https://docs.feast.dev/getting-started/quickstart) page. +Only the Step 2 is different from this tutorial since it requires you to configure your Hazelcast online store. + +### Creating the feature repository + +The easiest way to get started is to use the Feast CLI to initialize a new +feature store. Once Feast is installed, the command + +``` +feast init FEATURE_STORE_NAME -t hazelcast +``` + +will interactively help you create the `feature_store.yaml` with the +required configuration details to access your Hazelcast cluster. 
+ +Alternatively, you can run `feast init FEATURE_STORE_NAME`, as described +in the quickstart, and then manually edit the `online_store` section in +the `feature_store.yaml` file as detailed below. + +The following steps (setup of feature definitions, deployment of the store, +generation of training data, materialization, fetching of online/offline +features) proceed exactly as in the general Feast quickstart instructions. + +#### Hazelcast setup + +In order to use [Hazelcast](https://hazelcast.com) as online store, you need to have a running Hazelcast cluster. +You can create a cluster using Hazelcast Viridian Serverless easily or deploy one on your local/remote machine. +See this [getting started](https://hazelcast.com/get-started/) page for more details. + +Hazelcast online store can connect to a local/remote cluster or to a Hazelcast Viridian Serverless cluster. +Following is an example to connect to a local cluster named "dev" running on port 5701 with TLS/SSL enabled. + +```yaml +[...] +online_store: + type: hazelcast + cluster_name: dev + cluster_members: ["localhost:5701"] + ssl_cafile_path: /path/to/ca/file + ssl_certfile_path: /path/to/cert/file + ssl_keyfile_path: /path/to/key/file + ssl_password: ${SSL_PASSWORD} # The password will be read from the `SSL_PASSWORD` environment variable. + key_ttl_seconds: 86400 # The default is 0 and means infinite. +``` + +If you want to connect to your Hazelcast Viridian cluster instead of a local/remote one, specify your configuration as follows: + +```yaml +[...] +online_store: + type: hazelcast + cluster_name: YOUR_CLUSTER_ID + discovery_token: YOUR_DISCOVERY_TOKEN + ssl_cafile_path: /path/to/ca/file + ssl_certfile_path: /path/to/cert/file + ssl_keyfile_path: /path/to/key/file + ssl_password: ${SSL_PASSWORD} # The password will be read from the `SSL_PASSWORD` environment variable. + key_ttl_seconds: 86400 # The default is 0 and means infinite. 
+``` + +#### TTL configuration + +TTL is the maximum time in seconds for each feature to stay in the map. +It limits the lifetime of the features relative to the time of the last write performed on them. +The features whose age exceeds this limit are expired and evicted automatically. +Reading a feature does not extend its lifetime; only writing it again does. +Valid values are integers between 0 and Integer.MAX_VALUE. +Its default value is 0, which means infinite. + +```yaml +[...] +online_store: + [...] + key_ttl_seconds: 86400 +``` + +### More info + +You can learn more about Hazelcast from the [Hazelcast Documentation](https://docs.hazelcast.com/home/). + diff --git a/sdk/python/feast/infra/online_stores/contrib/hazelcast_online_store/__init__.py b/sdk/python/feast/infra/online_stores/contrib/hazelcast_online_store/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/infra/online_stores/contrib/hazelcast_online_store/hazelcast_online_store.py b/sdk/python/feast/infra/online_stores/contrib/hazelcast_online_store/hazelcast_online_store.py new file mode 100644 index 0000000000..7ec803a69c --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/hazelcast_online_store/hazelcast_online_store.py @@ -0,0 +1,321 @@ +# +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Hazelcast online store for Feast. 
+""" +import base64 +import threading +from datetime import datetime, timezone +from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple + +import pytz +from hazelcast.client import HazelcastClient +from hazelcast.core import HazelcastJsonValue +from hazelcast.discovery import HazelcastCloudDiscovery +from pydantic import StrictStr + +from feast import Entity, FeatureView, RepoConfig +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel +from feast.usage import log_exceptions_and_usage + +# Exception messages +EXCEPTION_HAZELCAST_UNEXPECTED_CONFIGURATION_CLASS = ( + "Unexpected configuration object (not a HazelcastOnlineStoreConfig instance)" +) + +# Hazelcast schema names for each field +D_FEATURE_NAME = "feature_name" +D_FEATURE_VALUE = "feature_value" +D_ENTITY_KEY = "entity_key" +D_EVENT_TS = "event_ts" +D_CREATED_TS = "created_ts" + + +class HazelcastInvalidConfig(Exception): + def __init__(self, msg: str): + super().__init__(msg) + + +class HazelcastOnlineStoreConfig(FeastConfigBaseModel): + """Online store config for Hazelcast store""" + + type: Literal["hazelcast"] = "hazelcast" + """Online store type selector""" + + cluster_name: StrictStr = "dev" + """Name of the cluster you want to connect. 
The default cluster name is `dev`"""
+
+    cluster_members: Optional[List[str]] = ["localhost:5701"]
+    """List of member addresses which is connected to your cluster"""
+
+    discovery_token: Optional[StrictStr] = ""
+    """The discovery token of your Hazelcast Viridian cluster"""
+
+    ssl_cafile_path: Optional[StrictStr] = ""
+    """Absolute path of CA certificates in PEM format."""
+
+    ssl_certfile_path: Optional[StrictStr] = ""
+    """Absolute path of the client certificate in PEM format."""
+
+    ssl_keyfile_path: Optional[StrictStr] = ""
+    """Absolute path of the private key file for the client certificate in the PEM format."""
+
+    ssl_password: Optional[StrictStr] = ""
+    """Password for decrypting the keyfile if it is encrypted."""
+
+    key_ttl_seconds: Optional[int] = 0
+    """Hazelcast key bin TTL (in seconds) for expiring entities"""
+
+
+class HazelcastOnlineStore(OnlineStore):
+    """
+    Hazelcast online store implementation for Feast
+
+    Each feature view is kept in its own Hazelcast map (see `_map_name`). A map
+    entry holds one feature value of one entity; its key is the base64 encoded
+    entity key followed by the feature name.
+
+    Attributes:
+        _client: Hazelcast client connection.
+        _lock: Prevent race condition while creating the client connection
+    """
+
+    _client: Optional[HazelcastClient] = None
+    _lock = threading.Lock()
+
+    def _get_client(self, config: HazelcastOnlineStoreConfig):
+        """
+        Establish the client connection to Hazelcast cluster, if not yet created,
+        and return it.
+
+        The established client connection could be Hazelcast Viridian and SSL enabled based on user config:
+        a non-empty `discovery_token` selects Viridian, otherwise a non-empty
+        `ssl_cafile_path` selects a TLS/SSL connection to `cluster_members`,
+        otherwise a plain connection to `cluster_members` is made.
+
+        Args:
+            config: The HazelcastOnlineStoreConfig for the online store.
+
+        Returns:
+            The client stored on this online store instance.
+        """
+        if self._client is None:
+            with self._lock:
+                # Check again while holding the lock: another thread may have
+                # created the client between the first check and the acquire.
+                if self._client is None:
+                    # Truthiness (instead of `!= ""`) so that an optional
+                    # setting given as None counts as "not configured".
+                    if config.discovery_token:
+                        HazelcastCloudDiscovery._CLOUD_URL_BASE = (
+                            "api.viridian.hazelcast.com"
+                        )
+                        self._client = HazelcastClient(
+                            cluster_name=config.cluster_name,
+                            cloud_discovery_token=config.discovery_token,
+                            statistics_enabled=True,
+                            ssl_enabled=True,
+                            ssl_cafile=config.ssl_cafile_path,
+                            ssl_certfile=config.ssl_certfile_path,
+                            ssl_keyfile=config.ssl_keyfile_path,
+                            ssl_password=config.ssl_password,
+                        )
+                    elif config.ssl_cafile_path:
+                        self._client = HazelcastClient(
+                            cluster_name=config.cluster_name,
+                            # Without the member list the client would ignore
+                            # the configured addresses for TLS/SSL clusters.
+                            cluster_members=config.cluster_members,
+                            statistics_enabled=True,
+                            ssl_enabled=True,
+                            ssl_cafile=config.ssl_cafile_path,
+                            ssl_certfile=config.ssl_certfile_path,
+                            ssl_keyfile=config.ssl_keyfile_path,
+                            ssl_password=config.ssl_password,
+                        )
+                    else:
+                        self._client = HazelcastClient(
+                            statistics_enabled=True,
+                            cluster_members=config.cluster_members,
+                            cluster_name=config.cluster_name,
+                        )
+        return self._client
+
+    @log_exceptions_and_usage(online_store="hazelcast")
+    def online_write_batch(
+        self,
+        config: RepoConfig,
+        table: FeatureView,
+        data: List[
+            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+        ],
+        progress: Optional[Callable[[int], Any]],
+    ) -> None:
+        """
+        Write a batch of feature rows into the map of the given feature view.
+
+        Every feature value becomes its own map entry (a JSON document with the
+        entity key, feature name, base64 encoded value and both timestamps),
+        stored with `key_ttl_seconds` as its time-to-live.
+
+        Args:
+            config: The RepoConfig for the current FeatureStore.
+            table: Feast FeatureView the rows belong to.
+            data: Tuples of entity key, feature values, event timestamp and
+                optional created timestamp. Naive timestamps are taken as UTC.
+            progress: Optional callback, invoked with 1 for every tuple of `data`.
+
+        Raises:
+            HazelcastInvalidConfig: If the online store config is not a
+                HazelcastOnlineStoreConfig.
+        """
+        online_store_config = config.online_store
+        if not isinstance(online_store_config, HazelcastOnlineStoreConfig):
+            raise HazelcastInvalidConfig(
+                EXCEPTION_HAZELCAST_UNEXPECTED_CONFIGURATION_CLASS
+            )
+
+        client = self._get_client(online_store_config)
+        fv_map = client.get_map(_map_name(config.project, table))
+        ttl = online_store_config.key_ttl_seconds
+
+        # The map proxy is used in its non-blocking form (online_read calls
+        # `.result()` on it), so each put hands back a future. Keep them and
+        # wait at the end; otherwise failed writes would pass unnoticed.
+        pending = []
+        for entity_key, values, event_ts, created_ts in data:
+            entity_key_str = _encode_entity_key(entity_key)
+            event_ts_utc = _to_utc_timestamp(event_ts)
+            created_ts_utc = (
+                0.0 if created_ts is None else _to_utc_timestamp(created_ts)
+            )
+            for feature_name, value in values.items():
+                feature_value = base64.b64encode(value.SerializeToString()).decode(
+                    "utf-8"
+                )
+                pending.append(
+                    fv_map.put(
+                        entity_key_str + feature_name,
+                        HazelcastJsonValue(
+                            {
+                                D_ENTITY_KEY: entity_key_str,
+                                D_FEATURE_NAME: feature_name,
+                                D_FEATURE_VALUE: feature_value,
+                                D_EVENT_TS: event_ts_utc,
+                                D_CREATED_TS: created_ts_utc,
+                            }
+                        ),
+                        ttl,
+                    )
+                )
+            if progress:
+                progress(1)
+
+        for future in pending:
+            future.result()
+
+    def online_read(
+        self,
+        config: RepoConfig,
+        table: FeatureView,
+        entity_keys: List[EntityKeyProto],
+        requested_features: Optional[List[str]] = None,
+    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+        """
+        Read feature values of the given entities from the feature view's map.
+
+        Args:
+            config: The RepoConfig for the current FeatureStore.
+            table: Feast FeatureView to read from.
+            entity_keys: Entity keys to look up.
+            requested_features: Feature names to read; all features of `table`
+                when omitted or empty.
+
+        Returns:
+            One tuple per entry of `entity_keys`, in the same order:
+            (event timestamp, {feature name: value}) holding the features found
+            for that entity, or (None, None) when none of them is stored.
+
+        Raises:
+            HazelcastInvalidConfig: If the online store config is not a
+                HazelcastOnlineStoreConfig.
+        """
+        online_store_config = config.online_store
+        if not isinstance(online_store_config, HazelcastOnlineStoreConfig):
+            raise HazelcastInvalidConfig(
+                EXCEPTION_HAZELCAST_UNEXPECTED_CONFIGURATION_CLASS
+            )
+
+        client = self._get_client(online_store_config)
+        fv_map = client.get_map(_map_name(config.project, table))
+
+        feature_names = requested_features or [f.name for f in table.features]
+
+        # One list of map keys per requested entity, kept in request order so
+        # the result stays aligned with `entity_keys` even if an entity key is
+        # requested more than once.
+        keys_per_entity: List[List[str]] = []
+        for entity_key in entity_keys:
+            entity_key_str = _encode_entity_key(entity_key)
+            keys_per_entity.append([entity_key_str + name for name in feature_names])
+
+        # dict.fromkeys() removes duplicate keys while preserving their order.
+        hz_keys = list(dict.fromkeys(k for keys in keys_per_entity for k in keys))
+        rows = {
+            key: json_value.loads()
+            for key, json_value in fv_map.get_all(hz_keys).result().items()
+        }
+
+        entries: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
+        for feature_keys in keys_per_entity:
+            entry: Dict[str, ValueProto] = {}
+            event_ts: Optional[datetime] = None
+            for f_key in feature_keys:
+                row = rows.get(f_key)
+                if row is None:
+                    # This feature is not stored for the entity (never written
+                    # or already expired); the other features are still served.
+                    continue
+                value = ValueProto()
+                value.ParseFromString(base64.b64decode(row[D_FEATURE_VALUE]))
+                entry[row[D_FEATURE_NAME]] = value
+                event_ts = datetime.fromtimestamp(row[D_EVENT_TS], tz=timezone.utc)
+            if entry:
+                entries.append((event_ts, entry))
+            else:
+                entries.append((None, None))
+        return entries
+
+    def update(
+        self,
+        config: RepoConfig,
+        tables_to_delete: Sequence[FeatureView],
+        tables_to_keep: Sequence[FeatureView],
+        entities_to_delete: Sequence[Entity],
+        entities_to_keep: Sequence[Entity],
+        partial: bool,
+    ):
+        """
+        Create or replace the SQL mapping of every feature view to keep, and
+        delete the entries and drop the mapping of every feature view to delete.
+
+        `entities_to_delete`, `entities_to_keep` and `partial` are not used.
+
+        Raises:
+            HazelcastInvalidConfig: If the online store config is not a
+                HazelcastOnlineStoreConfig.
+        """
+        online_store_config = config.online_store
+        if not isinstance(online_store_config, HazelcastOnlineStoreConfig):
+            raise HazelcastInvalidConfig(
+                EXCEPTION_HAZELCAST_UNEXPECTED_CONFIGURATION_CLASS
+            )
+
+        client = self._get_client(online_store_config)
+        project = config.project
+
+        for table in tables_to_keep:
+            client.sql.execute(
+                f"""CREATE OR REPLACE MAPPING {_map_name(project, table)} (
+                        __key VARCHAR,
+                        {D_ENTITY_KEY} VARCHAR,
+                        {D_FEATURE_NAME} VARCHAR,
+                        {D_FEATURE_VALUE} VARCHAR,
+                        {D_EVENT_TS} DECIMAL,
+                        {D_CREATED_TS} DECIMAL
+                    )
+                    TYPE IMap
+                    OPTIONS (
+                        'keyFormat' = 'varchar',
+                        'valueFormat' = 'json-flat'
+                    )
+                """
+            ).result()
+
+        for table in tables_to_delete:
+            client.sql.execute(f"DELETE FROM {_map_name(project, table)}").result()
+            client.sql.execute(
+                f"DROP MAPPING IF EXISTS {_map_name(project, table)}"
+            ).result()
+
+    def teardown(
+        self,
+        config: RepoConfig,
+        tables: Sequence[FeatureView],
+        entities: Sequence[Entity],
+    ):
+        """
+        Delete the entries and drop the SQL mapping of every given feature view.
+
+        `entities` is not used.
+
+        Raises:
+            HazelcastInvalidConfig: If the online store config is not a
+                HazelcastOnlineStoreConfig.
+        """
+        online_store_config = config.online_store
+        if not isinstance(online_store_config, HazelcastOnlineStoreConfig):
+            raise HazelcastInvalidConfig(
+                EXCEPTION_HAZELCAST_UNEXPECTED_CONFIGURATION_CLASS
+            )
+
+        client = self._get_client(online_store_config)
+        project = config.project
+
+        for table in tables:
+            # Wait for each statement, as update() does, so that the entries
+            # are gone before the mapping is dropped and errors are raised here.
+            client.sql.execute(f"DELETE FROM {_map_name(project, table)}").result()
+            client.sql.execute(
+                f"DROP MAPPING IF EXISTS {_map_name(project, table)}"
+            ).result()
+
+
+def _map_name(project: str, table: FeatureView) -> str:
+    """Return the name of the Hazelcast map holding the given feature view."""
+    return f"{project}_{table.name}"
+
+
+def _encode_entity_key(entity_key: EntityKeyProto) -> str:
+    """Return the base64 text form of the serialized (version 2) entity key."""
+    return base64.b64encode(
+        serialize_entity_key(
+            entity_key,
+            entity_key_serialization_version=2,
+        )
+    ).decode("utf-8")
+
+
+def _to_utc_timestamp(ts: datetime) -> float:
+    """Return `ts` as POSIX seconds; a naive `ts` is interpreted as UTC."""
+    if ts.tzinfo is None:
+        return pytz.utc.localize(ts, is_dst=None).timestamp()
+    # localize() rejects aware datetimes, so convert those instead.
+    return ts.astimezone(pytz.utc).timestamp()
diff --git 
a/sdk/python/feast/infra/online_stores/contrib/hazelcast_repo_configuration.py b/sdk/python/feast/infra/online_stores/contrib/hazelcast_repo_configuration.py new file mode 100644 index 0000000000..5b3ea6e307 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/hazelcast_repo_configuration.py @@ -0,0 +1,26 @@ +# +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.universal.online_store.hazelcast import ( + HazelcastOnlineStoreCreator, +) + +FULL_REPO_CONFIGS = [ + IntegrationTestRepoConfig(online_store_creator=HazelcastOnlineStoreCreator), +] diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 200f9d284e..3461ae058b 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -62,6 +62,7 @@ "cassandra": "feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore", "mysql": "feast.infra.online_stores.contrib.mysql_online_store.mysql.MySQLOnlineStore", "rockset": "feast.infra.online_stores.contrib.rockset_online_store.rockset.RocksetOnlineStore", + "hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { diff --git a/sdk/python/feast/templates/hazelcast/__init__.py 
b/sdk/python/feast/templates/hazelcast/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/templates/hazelcast/bootstrap.py b/sdk/python/feast/templates/hazelcast/bootstrap.py new file mode 100644 index 0000000000..e5018e4fe0 --- /dev/null +++ b/sdk/python/feast/templates/hazelcast/bootstrap.py @@ -0,0 +1,176 @@ +# +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pathlib +from datetime import datetime, timedelta + +import click + +from feast.file_utils import ( + remove_lines_from_file, + replace_str_in_file, + write_setting_or_remove, +) + + +def collect_hazelcast_online_store_settings(): + c_cluster_name = None + c_members = None + c_ca_path = None + c_cert_path = None + c_key_path = None + c_discovery_token = None + c_ttl_seconds = None + + cluster_type = click.prompt( + "Would you like to connect a [L]ocal cluster or [V]iridian cluster?", + type=click.Choice(["L", "V"]), + show_choices=False, + default="L", + ) + is_viridian = cluster_type == "V" + + if is_viridian: + c_cluster_name = click.prompt("Cluster ID: ") + c_discovery_token = click.prompt("Discovery Token: ") + c_ca_path = click.prompt("CA file path: ") + c_cert_path = click.prompt("CERT file path: ") + c_key_path = click.prompt("Key file path: ") + else: + c_cluster_name = click.prompt( + "Cluster name: ", + default="dev", + ) + c_members = click.prompt( + "Cluster members:", + default="localhost:5701", + ) + 
needs_ssl = click.confirm("Use TLS/SSL?", default=False) + if needs_ssl: + c_ca_path = click.prompt("CA file path: ") + c_cert_path = click.prompt("CERT file path: ") + c_key_path = click.prompt("Key file path: ") + + c_ttl_seconds = click.prompt( + "Key TTL seconds: ", + default=0, + ) + return { + "c_cluster_name": c_cluster_name, + "c_members": c_members, + "c_ca_path": c_ca_path, + "c_cert_path": c_cert_path, + "c_key_path": c_key_path, + "c_discovery_token": c_discovery_token, + "c_ttl_seconds": c_ttl_seconds, + } + + +def apply_hazelcast_store_settings(config_file, settings): + write_setting_or_remove( + config_file, + settings["c_cluster_name"], + "cluster_name", + "c_cluster_name", + ) + # + write_setting_or_remove( + config_file, + settings["c_discovery_token"], + "discovery_token", + "c_discovery_token", + ) + # + if settings["c_members"] is not None: + settings["c_members"] = "[" + settings["c_members"] + "]" + write_setting_or_remove( + config_file, + settings["c_members"], + "cluster_members", + "c_members", + ) + # + write_setting_or_remove( + config_file, + settings["c_ca_path"], + "ssl_cafile_path", + "c_ca_path", + ) + # + write_setting_or_remove( + config_file, + settings["c_cert_path"], + "ssl_certfile_path", + "c_cert_path", + ) + # + write_setting_or_remove( + config_file, + settings["c_key_path"], + "ssl_keyfile_path", + "c_key_path", + ) + if settings["c_ca_path"] is None: + remove_lines_from_file( + config_file, + "ssl_password: ${SSL_PASSWORD}", + True, + ) + # + replace_str_in_file( + config_file, + "c_ttl_seconds", + f"{settings['c_ttl_seconds']}", + ) + + +def bootstrap(): + """ + Bootstrap() will automatically be called + from the init_repo() during `feast init`. 
+ """ + from feast.driver_test_data import create_driver_hourly_stats_df + + repo_path = pathlib.Path(__file__).parent.absolute() / "feature_repo" + config_file = repo_path / "feature_store.yaml" + + data_path = repo_path / "data" + data_path.mkdir(exist_ok=True) + + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + # + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = create_driver_hourly_stats_df( + driver_entities, + start_date, + end_date, + ) + # + driver_stats_path = data_path / "driver_stats.parquet" + driver_df.to_parquet(path=str(driver_stats_path), allow_truncated_timestamps=True) + + # example_repo.py + example_py_file = repo_path / "example_repo.py" + replace_str_in_file(example_py_file, "%PARQUET_PATH%", str(driver_stats_path)) + + # store config yaml, interact with user and then customize file: + settings = collect_hazelcast_online_store_settings() + apply_hazelcast_store_settings(config_file, settings) + + +if __name__ == "__main__": + bootstrap() diff --git a/sdk/python/feast/templates/hazelcast/feature_repo/__init__.py b/sdk/python/feast/templates/hazelcast/feature_repo/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/templates/hazelcast/feature_repo/example_repo.py b/sdk/python/feast/templates/hazelcast/feature_repo/example_repo.py new file mode 100644 index 0000000000..131f1bcaa6 --- /dev/null +++ b/sdk/python/feast/templates/hazelcast/feature_repo/example_repo.py @@ -0,0 +1,139 @@ +# This is an example feature definition file + +from datetime import timedelta + +import pandas as pd + +from feast import ( + Entity, + FeatureService, + FeatureView, + Field, + FileSource, + PushSource, + RequestSource, +) +from feast.on_demand_feature_view import on_demand_feature_view +from feast.types import Float32, Float64, Int64 + +# Define an entity for the driver. You can think of an entity as a primary key used to +# fetch features. 
+driver = Entity(name="driver", join_keys=["driver_id"]) + +# Read data from parquet files. Parquet is convenient for local development mode. For +# production, you can use your favorite DWH, such as BigQuery. See Feast documentation +# for more info. +driver_stats_source = FileSource( + name="driver_hourly_stats_source", + path="%PARQUET_PATH%", + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +# Our parquet files contain sample data that includes a driver_id column, timestamps and +# three feature column. Here we define a Feature View that will allow us to serve this +# data to our model online. +driver_stats_fv = FeatureView( + # The unique name of this feature view. Two feature views in a single + # project cannot have the same name + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=1), + # The list of features defined below act as a schema to both define features + # for both materialization of features into a store, and are used as references + # during retrieval for building a training dataset or serving features + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_stats_source, + # Tags are user defined key/value pairs that are attached to each + # feature view + tags={"team": "driver_performance"}, +) + +# Define a request data source which encodes features / information only +# available at request time (e.g. 
part of the user initiated HTTP request) +input_request = RequestSource( + name="vals_to_add", + schema=[ + Field(name="val_to_add", dtype=Int64), + Field(name="val_to_add_2", dtype=Int64), + ], +) + + +# Define an on demand feature view which can generate new features based on +# existing feature views and RequestSource features +@on_demand_feature_view( + sources=[driver_stats_fv, input_request], + schema=[ + Field(name="conv_rate_plus_val1", dtype=Float64), + Field(name="conv_rate_plus_val2", dtype=Float64), + ], +) +def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame: + df = pd.DataFrame() + df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"] + df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"] + return df + + +# This groups features into a model version +driver_activity_v1 = FeatureService( + name="driver_activity_v1", + features=[ + driver_stats_fv[["conv_rate"]], # Sub-selects a feature from a feature view + transformed_conv_rate, # Selects all features from the feature view + ], +) +driver_activity_v2 = FeatureService( + name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate] +) + +# Defines a way to push data (to be available offline, online or both) into Feast. +driver_stats_push_source = PushSource( + name="driver_stats_push_source", + batch_source=driver_stats_source, +) + +# Defines a slightly modified version of the feature view from above, where the source +# has been changed to the push source. This allows fresh features to be directly pushed +# to the online store for this feature view. 
+driver_stats_fresh_fv = FeatureView( + name="driver_hourly_stats_fresh", + entities=[driver], + ttl=timedelta(days=1), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_stats_push_source, # Changed from above + tags={"team": "driver_performance"}, +) + + +# Define an on demand feature view which can generate new features based on +# existing feature views and RequestSource features +@on_demand_feature_view( + sources=[driver_stats_fresh_fv, input_request], # relies on fresh version of FV + schema=[ + Field(name="conv_rate_plus_val1", dtype=Float64), + Field(name="conv_rate_plus_val2", dtype=Float64), + ], +) +def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame: + df = pd.DataFrame() + df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"] + df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"] + return df + + +driver_activity_v3 = FeatureService( + name="driver_activity_v3", + features=[driver_stats_fresh_fv, transformed_conv_rate_fresh], +) diff --git a/sdk/python/feast/templates/hazelcast/feature_repo/feature_store.yaml b/sdk/python/feast/templates/hazelcast/feature_repo/feature_store.yaml new file mode 100644 index 0000000000..e26d1bf750 --- /dev/null +++ b/sdk/python/feast/templates/hazelcast/feature_repo/feature_store.yaml @@ -0,0 +1,14 @@ +project: my_project +registry: data/registry.db +provider: local +online_store: + type: hazelcast + cluster_name: c_cluster_name + cluster_members: c_members + discovery_token: c_discovery_token + ssl_cafile_path: c_ca_path + ssl_certfile_path: c_cert_path + ssl_keyfile_path: c_key_path + ssl_password: ${SSL_PASSWORD} # This value will be read from the `SSL_PASSWORD` environment variable. 
+ key_ttl_seconds: c_ttl_seconds +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/hazelcast/feature_repo/test_workflow.py b/sdk/python/feast/templates/hazelcast/feature_repo/test_workflow.py new file mode 100644 index 0000000000..eebeb11311 --- /dev/null +++ b/sdk/python/feast/templates/hazelcast/feature_repo/test_workflow.py @@ -0,0 +1,130 @@ +import subprocess +from datetime import datetime + +import pandas as pd + +from feast import FeatureStore +from feast.data_source import PushMode + + +def run_demo(): + store = FeatureStore(repo_path=".") + print("\n--- Run feast apply ---") + subprocess.run(["feast", "apply"]) + + print("\n--- Historical features for training ---") + fetch_historical_features_entity_df(store, for_batch_scoring=False) + + print("\n--- Historical features for batch scoring ---") + fetch_historical_features_entity_df(store, for_batch_scoring=True) + + print("\n--- Load features into online store ---") + store.materialize_incremental(end_date=datetime.now()) + + print("\n--- Online features ---") + fetch_online_features(store) + + print("\n--- Online features retrieved (instead) through a feature service---") + fetch_online_features(store, source="feature_service") + + print( + "\n--- Online features retrieved (using feature service v3, which uses a feature view with a push source---" + ) + fetch_online_features(store, source="push") + + print("\n--- Simulate a stream event ingestion of the hourly stats df ---") + event_df = pd.DataFrame.from_dict( + { + "driver_id": [1001], + "event_timestamp": [ + datetime.now(), + ], + "created": [ + datetime.now(), + ], + "conv_rate": [1.0], + "acc_rate": [1.0], + "avg_daily_trips": [1000], + } + ) + print(event_df) + store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE_AND_OFFLINE) + + print("\n--- Online features again with updated values from a stream push---") + fetch_online_features(store, source="push") + + print("\n--- Run feast teardown ---") + 
subprocess.run(["feast", "teardown"]) + + +def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool): + # Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve + # for all entities in the offline store instead + entity_df = pd.DataFrame.from_dict( + { + # entity's join key -> entity values + "driver_id": [1001, 1002, 1003], + # "event_timestamp" (reserved key) -> timestamps + "event_timestamp": [ + datetime(2021, 4, 12, 10, 59, 42), + datetime(2021, 4, 12, 8, 12, 10), + datetime(2021, 4, 12, 16, 40, 26), + ], + # (optional) label name -> label values. Feast does not process these + "label_driver_reported_satisfaction": [1, 5, 3], + # values we're using for an on-demand transformation + "val_to_add": [1, 2, 3], + "val_to_add_2": [10, 20, 30], + } + ) + # For batch scoring, we want the latest timestamps + if for_batch_scoring: + entity_df["event_timestamp"] = pd.to_datetime("now", utc=True) + + training_df = store.get_historical_features( + entity_df=entity_df, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + "transformed_conv_rate:conv_rate_plus_val1", + "transformed_conv_rate:conv_rate_plus_val2", + ], + ).to_df() + print(training_df.head()) + + +def fetch_online_features(store, source: str = ""): + entity_rows = [ + # {join_key: entity_value} + { + "driver_id": 1001, + "val_to_add": 1000, + "val_to_add_2": 2000, + }, + { + "driver_id": 1002, + "val_to_add": 1001, + "val_to_add_2": 2002, + }, + ] + if source == "feature_service": + features_to_fetch = store.get_feature_service("driver_activity_v1") + elif source == "push": + features_to_fetch = store.get_feature_service("driver_activity_v3") + else: + features_to_fetch = [ + "driver_hourly_stats:acc_rate", + "transformed_conv_rate:conv_rate_plus_val1", + "transformed_conv_rate:conv_rate_plus_val2", + ] + returned_features = store.get_online_features( + 
features=features_to_fetch, + entity_rows=entity_rows, + ).to_dict() + for key, value in sorted(returned_features.items()): + print(key, " : ", value) + + +if __name__ == "__main__": + run_demo() diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index ae99962f67..07cd01ab70 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -336,6 +336,8 @@ h11==0.14.0 # uvicorn happybase==1.2.0 # via feast (setup.py) +hazelcast-python-client==5.1 + # via feast (setup.py) hiredis==2.2.2 # via feast (setup.py) httpcore==0.16.3 diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt index b1476660b6..a14e731d73 100644 --- a/sdk/python/requirements/py3.8-ci-requirements.txt +++ b/sdk/python/requirements/py3.8-ci-requirements.txt @@ -340,6 +340,8 @@ h11==0.14.0 # uvicorn happybase==1.2.0 # via feast (setup.py) +hazelcast-python-client==5.1 + # via feast (setup.py) hiredis==2.2.2 # via feast (setup.py) httpcore==0.16.3 diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index 9b8dbf9353..77308d5ec8 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -336,6 +336,8 @@ h11==0.14.0 # uvicorn happybase==1.2.0 # via feast (setup.py) +hazelcast-python-client==5.1 + # via feast (setup.py) hiredis==2.2.2 # via feast (setup.py) httpcore==0.16.3 diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/hazelcast.py b/sdk/python/tests/integration/feature_repos/universal/online_store/hazelcast.py new file mode 100644 index 0000000000..65d74135ae --- /dev/null +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/hazelcast.py @@ -0,0 +1,48 @@ +import logging +import random +import string +from typing import Any, Dict + +from 
testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs + +from tests.integration.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) + + +class HazelcastOnlineStoreCreator(OnlineStoreCreator): + + cluster_name: str = "" + container: DockerContainer = None + + def __init__(self, project_name: str, **kwargs): + logging.getLogger("hazelcast").setLevel(logging.ERROR) + super().__init__(project_name) + self.cluster_name = "".join( + random.choice(string.ascii_lowercase) for _ in range(5) + ) + self.container = ( + DockerContainer("hazelcast/hazelcast") + .with_env("HZ_CLUSTERNAME", self.cluster_name) + .with_env("HZ_NETWORK_PORT_AUTOINCREMENT", "true") + .with_exposed_ports(5701) + ) + + def create_online_store(self) -> Dict[str, Any]: + self.container.start() + cluster_member = ( + self.container.get_container_host_ip() + + ":" + + self.container.get_exposed_port(5701) + ) + log_string_to_wait_for = r"Cluster name: " + self.cluster_name + wait_for_logs(self.container, predicate=log_string_to_wait_for, timeout=10) + return { + "type": "hazelcast", + "cluster_name": self.cluster_name, + "cluster_members": [cluster_member], + } + + def teardown(self): + self.container.stop() diff --git a/setup.py b/setup.py index 09a02479cc..3f361f82c3 100644 --- a/setup.py +++ b/setup.py @@ -137,6 +137,10 @@ "rockset>=1.0.3", ] +HAZELCAST_REQUIRED = [ + "hazelcast-python-client>=5.1", +] + CI_REQUIRED = ( [ "build", @@ -195,6 +199,7 @@ + CASSANDRA_REQUIRED + AZURE_REQUIRED + ROCKSET_REQUIRED + + HAZELCAST_REQUIRED ) @@ -361,6 +366,7 @@ def run(self): "hbase": HBASE_REQUIRED, "docs": DOCS_REQUIRED, "cassandra": CASSANDRA_REQUIRED, + "hazelcast": HAZELCAST_REQUIRED, }, include_package_data=True, license="Apache",