From 80f7f62caa3e467edb915238573d83b03342e588 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Fri, 4 Mar 2022 12:58:46 -0800 Subject: [PATCH] feat: Feast Spark Offline Store (#2349) * State of feast Signed-off-by: Kevin Zhang * Clean up changes Signed-off-by: Kevin Zhang * Fix random incorrect changes Signed-off-by: Kevin Zhang * Fix lint Signed-off-by: Kevin Zhang * Fix build errors Signed-off-by: Kevin Zhang * Fix lint Signed-off-by: Kevin Zhang * Add spark offline store components to test against current integration tests Signed-off-by: Kevin Zhang * Fix lint Signed-off-by: Kevin Zhang * Rename to pass checks Signed-off-by: Kevin Zhang * Fix issues Signed-off-by: Kevin Zhang * Fix type checking issues Signed-off-by: Kevin Zhang * Fix lint Signed-off-by: Kevin Zhang * Clean up print statements for first review Signed-off-by: Kevin Zhang * Fix lint Signed-off-by: Kevin Zhang * Fix flake 8 lint tests Signed-off-by: Kevin Zhang * Add warnings for alpha version release Signed-off-by: Kevin Zhang * Format Signed-off-by: Kevin Zhang * Address review Signed-off-by: Kevin Zhang * Address review Signed-off-by: Kevin Zhang * Fix lint Signed-off-by: Kevin Zhang * Add file store functionality Signed-off-by: Kevin Zhang * lint Signed-off-by: Kevin Zhang * Add example feature repo Signed-off-by: Kevin Zhang * Update data source creator Signed-off-by: Kevin Zhang * Make cli work for feast init with spark Signed-off-by: Kevin Zhang * Update the docs Signed-off-by: Kevin Zhang * Clean up code Signed-off-by: Kevin Zhang * Clean up more code Signed-off-by: Kevin Zhang * Uncomment repo configs Signed-off-by: Kevin Zhang * Fix setup.py Signed-off-by: Kevin Zhang * Update dependencies Signed-off-by: Kevin Zhang * Fix ci dependencies Signed-off-by: Kevin Zhang * Screwed up rebase Signed-off-by: Kevin Zhang * Screwed up rebase Signed-off-by: Kevin Zhang * Screwed up rebase Signed-off-by: Kevin Zhang * Realign with master Signed-off-by: Kevin Zhang * Fix accidental changes Signed-off-by: Kevin Zhang * Make type map change cleaner Signed-off-by: Kevin Zhang * Address review comments Signed-off-by: Kevin Zhang * Fix tests accidentally broken Signed-off-by: Kevin Zhang * Add comments Signed-off-by: Kevin Zhang * Reformat Signed-off-by: Kevin Zhang * Fix logger Signed-off-by: Kevin Zhang * Remove unused imports Signed-off-by: Kevin Zhang * Fix imports Signed-off-by: Kevin Zhang * Fix CI dependencies Signed-off-by: Danny Chiao * Prefix destinations with project name Signed-off-by: Kevin Zhang * Update comment Signed-off-by: Kevin Zhang * Fix 3.8 Signed-off-by: Kevin Zhang * temporary fix Signed-off-by: Kevin Zhang * rollback Signed-off-by: Kevin Zhang * update Signed-off-by: Kevin Zhang * Update ci? 
Signed-off-by: Kevin Zhang * Move third party to contrib Signed-off-by: Kevin Zhang * Fix imports Signed-off-by: Kevin Zhang * Remove third_party refactor Signed-off-by: Kevin Zhang * Revert ci requirements and update comment in type map Signed-off-by: Kevin Zhang * Revert 3.8-requirements Signed-off-by: Kevin Zhang Co-authored-by: Danny Chiao Signed-off-by: Achal Shah --- docs/reference/data-sources/README.md | 2 + docs/reference/data-sources/spark.md | 45 ++ docs/reference/offline-stores/README.md | 2 + docs/reference/offline-stores/spark.md | 38 ++ sdk/python/feast/__init__.py | 4 + sdk/python/feast/cli.py | 4 +- sdk/python/feast/inference.py | 7 +- .../contrib/spark_offline_store/__init__.py | 0 .../contrib/spark_offline_store/spark.py | 522 ++++++++++++++++++ .../spark_offline_store/spark_source.py | 242 ++++++++ sdk/python/feast/repo_config.py | 1 + sdk/python/feast/templates/spark/__init__.py | 0 sdk/python/feast/templates/spark/bootstrap.py | 50 ++ sdk/python/feast/templates/spark/example.py | 64 +++ .../feast/templates/spark/feature_store.yaml | 14 + sdk/python/feast/type_map.py | 90 ++- .../requirements/py3.7-ci-requirements.txt | 13 +- .../requirements/py3.8-ci-requirements.txt | 162 +++--- .../requirements/py3.9-ci-requirements.txt | 162 +++--- sdk/python/setup.py | 20 +- .../data_sources/spark_data_source_creator.py | 108 ++++ 21 files changed, 1366 insertions(+), 184 deletions(-) create mode 100644 docs/reference/data-sources/spark.md create mode 100644 docs/reference/offline-stores/spark.md create mode 100644 sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/__init__.py create mode 100644 sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py create mode 100644 sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py create mode 100644 sdk/python/feast/templates/spark/__init__.py create mode 100644 sdk/python/feast/templates/spark/bootstrap.py create mode 100644 sdk/python/feast/templates/spark/example.py create mode 100644 sdk/python/feast/templates/spark/feature_store.yaml create mode 100644 sdk/python/tests/integration/feature_repos/universal/data_sources/spark_data_source_creator.py diff --git a/docs/reference/data-sources/README.md b/docs/reference/data-sources/README.md index fc6e136a9c..cba652a91a 100644 --- a/docs/reference/data-sources/README.md +++ b/docs/reference/data-sources/README.md @@ -9,3 +9,5 @@ Please see [Data Source](../../getting-started/concepts/feature-view.md#data-sou {% page-ref page="bigquery.md" %} {% page-ref page="redshift.md" %} + +{% page-ref page="spark.md" %} diff --git a/docs/reference/data-sources/spark.md b/docs/reference/data-sources/spark.md new file mode 100644 index 0000000000..25b69c7355 --- /dev/null +++ b/docs/reference/data-sources/spark.md @@ -0,0 +1,45 @@ +# Spark + +## Description + +**NOTE**: Spark data source api is currently in alpha development and the API is not completely stable. The API may change or update in the future. + +The spark data source API allows for the retrieval of historical feature values from file/database sources for building training datasets as well as materializing features into an online store. + +* Either a table name, a SQL query, or a file path can be provided. 
+ +## Examples + +Using a table reference from SparkSession (for example, either in memory or a Hive Metastore) + +```python +from feast import SparkSource + +my_spark_source = SparkSource( + table="FEATURE_TABLE", +) +``` + +Using a query + +```python +from feast import SparkSource + +my_spark_source = SparkSource( + query="SELECT timestamp as ts, created, f1, f2 " + "FROM spark_table", +) +``` + +Using a file reference + +```python +from feast import SparkSource + +my_spark_source = SparkSource( + path=f"{CURRENT_DIR}/data/driver_hourly_stats", + file_format="parquet", + event_timestamp_column="event_timestamp", + created_timestamp_column="created", +) +``` diff --git a/docs/reference/offline-stores/README.md b/docs/reference/offline-stores/README.md index 141a34d03b..b3c85470b9 100644 --- a/docs/reference/offline-stores/README.md +++ b/docs/reference/offline-stores/README.md @@ -9,3 +9,5 @@ Please see [Offline Store](../../getting-started/architecture-and-components/off {% page-ref page="bigquery.md" %} {% page-ref page="redshift.md" %} + +{% page-ref page="spark.md" %} diff --git a/docs/reference/offline-stores/spark.md b/docs/reference/offline-stores/spark.md new file mode 100644 index 0000000000..48ddf46d17 --- /dev/null +++ b/docs/reference/offline-stores/spark.md @@ -0,0 +1,38 @@ +# Spark + +## Description + +The Spark offline store is currently in alpha development and provides support for reading [SparkSources](../data-sources/spark.md). + +## Disclaimer + +The Spark offline store does not yet have full test coverage and still fails some integration tests in the Feast universal test suite. Please do NOT assume the API is stable. + +* Spark tables and views are allowed as sources that are loaded in from some Spark store (e.g. in Hive or in memory). +* Entity dataframes can be provided as a SQL query or as a Pandas dataframe. Pandas dataframes will be converted to a Spark dataframe and processed as a temporary view. +* A `SparkRetrievalJob` is returned when calling `get_historical_features()`. + * This allows you to call + * `to_df` to retrieve the data as a pandas dataframe. + * `to_arrow` to retrieve the data as a pyarrow Table. + * `to_spark_df` to retrieve the data as a Spark dataframe (see the sketch below).
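+
+For example, a minimal retrieval sketch (the feature view name, entity values, and timestamps are illustrative and assume the example repository generated by the Spark template):
+
+```python
+import pandas as pd
+
+from feast import FeatureStore
+
+# Assumes a repository whose feature_store.yaml configures `offline_store: type: spark`
+# (see the example below) and a registered `driver_hourly_stats` feature view.
+store = FeatureStore(repo_path=".")
+
+entity_df = pd.DataFrame(
+    {
+        "driver_id": [1001, 1002],
+        "event_timestamp": pd.to_datetime(["2022-01-02", "2022-01-03"]),
+    }
+)
+
+job = store.get_historical_features(
+    entity_df=entity_df,
+    features=[
+        "driver_hourly_stats:conv_rate",
+        "driver_hourly_stats:avg_daily_trips",
+    ],
+)
+
+training_df = job.to_df()     # pandas DataFrame
+arrow_table = job.to_arrow()  # pyarrow Table
+spark_df = job.to_spark_df()  # Spark DataFrame (specific to the Spark offline store)
+```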
+ +## Example + +{% code title="feature_store.yaml" %} +```yaml +project: my_project +registry: data/registry.db +provider: local +offline_store: + type: spark + spark_conf: + spark.master: "local[*]" + spark.ui.enabled: "false" + spark.eventLog.enabled: "false" + spark.sql.catalogImplementation: "hive" + spark.sql.parser.quotedRegexColumnNames: "true" + spark.sql.session.timeZone: "UTC" +online_store: + path: data/online_store.db +``` +{% endcode %} diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index 83b504b0cb..0af226aa05 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -3,6 +3,9 @@ from pkg_resources import DistributionNotFound, get_distribution from feast.infra.offline_stores.bigquery_source import BigQuerySource +from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( + SparkSource, +) from feast.infra.offline_stores.file_source import FileSource from feast.infra.offline_stores.redshift_source import RedshiftSource from feast.infra.offline_stores.snowflake_source import SnowflakeSource @@ -47,4 +50,5 @@ "RedshiftSource", "RequestFeatureView", "SnowflakeSource", + "SparkSource", ] diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index c23c3d104a..febac56fcc 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -477,7 +477,9 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List @click.option( "--template", "-t", - type=click.Choice(["local", "gcp", "aws", "snowflake"], case_sensitive=False), + type=click.Choice( + ["local", "gcp", "aws", "snowflake", "spark"], case_sensitive=False + ), help="Specify a template for the created project", default="local", ) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index ce8fa919f1..3fc6f054f1 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -8,6 +8,7 @@ FileSource, RedshiftSource, SnowflakeSource, + SparkSource, ) from feast.data_source import DataSource from feast.errors import RegistryInferenceFailure @@ -84,7 +85,9 @@ def update_data_sources_with_inferred_event_timestamp_col( ): # prepare right match pattern for data source ts_column_type_regex_pattern = "" - if isinstance(data_source, FileSource): + if isinstance(data_source, FileSource) or isinstance( + data_source, SparkSource + ): ts_column_type_regex_pattern = r"^timestamp" elif isinstance(data_source, BigQuerySource): ts_column_type_regex_pattern = "TIMESTAMP|DATETIME" @@ -97,7 +100,7 @@ def update_data_sources_with_inferred_event_timestamp_col( "DataSource", """ DataSource inferencing of event_timestamp_column is currently only supported - for FileSource and BigQuerySource. + for FileSource, SparkSource, BigQuerySource, RedshiftSource, and SnowflakeSource. 
""", ) # for informing the type checker diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/__init__.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py new file mode 100644 index 0000000000..95e306aa60 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -0,0 +1,522 @@ +import warnings +from datetime import datetime +from typing import Dict, List, Optional, Tuple, Union + +import numpy as np +import pandas +import pandas as pd +import pyarrow +import pyspark +from pydantic import StrictStr +from pyspark import SparkConf +from pyspark.sql import SparkSession +from pytz import utc + +from feast import FeatureView, OnDemandFeatureView +from feast.data_source import DataSource +from feast.errors import InvalidEntityType +from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL +from feast.infra.offline_stores import offline_utils +from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( + SparkSource, +) +from feast.infra.offline_stores.offline_store import ( + OfflineStore, + RetrievalJob, + RetrievalMetadata, +) +from feast.registry import Registry +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.saved_dataset import SavedDatasetStorage +from feast.type_map import spark_schema_to_np_dtypes + + +class SparkOfflineStoreConfig(FeastConfigBaseModel): + type: StrictStr = "spark" + """ Offline store type selector""" + + spark_conf: Optional[Dict[str, str]] = None + """ Configuration overlay for the spark session """ + # sparksession is not serializable and we dont want to pass it around as an argument + + +class SparkOfflineStore(OfflineStore): + @staticmethod + def pull_latest_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + event_timestamp_column: str, + created_timestamp_column: Optional[str], + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + spark_session = get_spark_session_or_start_new_with_repoconfig( + config.offline_store + ) + assert isinstance(config.offline_store, SparkOfflineStoreConfig) + assert isinstance(data_source, SparkSource) + + warnings.warn( + "The spark offline store is an experimental feature in alpha development. 
" + "Some functionality may still be unstable so functionality can change in the future.", + RuntimeWarning, + ) + + print("Pulling latest features from spark offline store") + + from_expression = data_source.get_table_query_string() + + partition_by_join_key_string = ", ".join(join_key_columns) + if partition_by_join_key_string != "": + partition_by_join_key_string = ( + "PARTITION BY " + partition_by_join_key_string + ) + timestamps = [event_timestamp_column] + if created_timestamp_column: + timestamps.append(created_timestamp_column) + timestamp_desc_string = " DESC, ".join(timestamps) + " DESC" + field_string = ", ".join(join_key_columns + feature_name_columns + timestamps) + + start_date_str = _format_datetime(start_date) + end_date_str = _format_datetime(end_date) + query = f""" + SELECT + {field_string} + {f", {repr(DUMMY_ENTITY_VAL)} AS {DUMMY_ENTITY_ID}" if not join_key_columns else ""} + FROM ( + SELECT {field_string}, + ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS feast_row_ + FROM {from_expression} t1 + WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}') + ) t2 + WHERE feast_row_ = 1 + """ + + return SparkRetrievalJob( + spark_session=spark_session, + query=query, + full_feature_names=False, + on_demand_feature_views=None, + ) + + @staticmethod + def get_historical_features( + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: List[str], + entity_df: Union[pandas.DataFrame, str], + registry: Registry, + project: str, + full_feature_names: bool = False, + ) -> RetrievalJob: + assert isinstance(config.offline_store, SparkOfflineStoreConfig) + warnings.warn( + "The spark offline store is an experimental feature in alpha development. 
" + "Some functionality may still be unstable so functionality can change in the future.", + RuntimeWarning, + ) + spark_session = get_spark_session_or_start_new_with_repoconfig( + store_config=config.offline_store + ) + tmp_entity_df_table_name = offline_utils.get_temp_entity_table_name() + + entity_schema = _upload_entity_df_and_get_entity_schema( + spark_session=spark_session, + table_name=tmp_entity_df_table_name, + entity_df=entity_df, + ) + event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df( + entity_schema=entity_schema, + ) + entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range( + entity_df, event_timestamp_col, spark_session, + ) + + expected_join_keys = offline_utils.get_expected_join_keys( + project=project, feature_views=feature_views, registry=registry + ) + offline_utils.assert_expected_columns_in_entity_df( + entity_schema=entity_schema, + join_keys=expected_join_keys, + entity_df_event_timestamp_col=event_timestamp_col, + ) + + query_context = offline_utils.get_feature_view_query_context( + feature_refs, + feature_views, + registry, + project, + entity_df_event_timestamp_range, + ) + + query = offline_utils.build_point_in_time_query( + feature_view_query_contexts=query_context, + left_table_query_string=tmp_entity_df_table_name, + entity_df_event_timestamp_col=event_timestamp_col, + entity_df_columns=entity_schema.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=full_feature_names, + ) + + return SparkRetrievalJob( + spark_session=spark_session, + query=query, + full_feature_names=full_feature_names, + on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs( + feature_refs, project, registry + ), + metadata=RetrievalMetadata( + features=feature_refs, + keys=list(set(entity_schema.keys()) - {event_timestamp_col}), + min_event_timestamp=entity_df_event_timestamp_range[0], + max_event_timestamp=entity_df_event_timestamp_range[1], + ), + ) + + @staticmethod + def pull_all_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + event_timestamp_column: str, + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + """ + Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column + have all already been mapped to column names of the source table and those column names are the values passed + into this function. + """ + assert isinstance(data_source, SparkSource) + warnings.warn( + "The spark offline store is an experimental feature in alpha development. 
" + "This API is unstable and it could and most probably will be changed in the future.", + RuntimeWarning, + ) + from_expression = data_source.get_table_query_string() + + field_string = ( + '"' + + '", "'.join( + join_key_columns + feature_name_columns + [event_timestamp_column] + ) + + '"' + ) + start_date = start_date.astimezone(tz=utc) + end_date = end_date.astimezone(tz=utc) + + query = f""" + SELECT {field_string} + FROM {from_expression} + WHERE "{event_timestamp_column}" BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + """ + spark_session = get_spark_session_or_start_new_with_repoconfig( + store_config=config.offline_store + ) + return SparkRetrievalJob( + spark_session=spark_session, query=query, full_feature_names=False + ) + + +class SparkRetrievalJob(RetrievalJob): + def __init__( + self, + spark_session: SparkSession, + query: str, + full_feature_names: bool, + on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None, + metadata: Optional[RetrievalMetadata] = None, + ): + super().__init__() + self.spark_session = spark_session + self.query = query + self._full_feature_names = full_feature_names + self._on_demand_feature_views = on_demand_feature_views + self._metadata = metadata + + @property + def full_feature_names(self) -> bool: + return self._full_feature_names + + @property + def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]: + return self._on_demand_feature_views + + def to_spark_df(self) -> pyspark.sql.DataFrame: + statements = self.query.split( + "---EOS---" + ) # TODO can do better than this dirty split + *_, last = map(self.spark_session.sql, statements) + return last + + def _to_df_internal(self) -> pd.DataFrame: + """Return dataset as Pandas DataFrame synchronously""" + return self.to_spark_df().toPandas() + + def _to_arrow_internal(self) -> pyarrow.Table: + """Return dataset as pyarrow Table synchronously""" + df = self.to_df() + return pyarrow.Table.from_pandas(df) # noqa + + def persist(self, storage: SavedDatasetStorage): + """ + Run the retrieval and persist the results in the same offline store used for read. + """ + pass + + @property + def metadata(self) -> Optional[RetrievalMetadata]: + """ + Return metadata information about retrieval. + Should be available even before materializing the dataset itself. 
+ """ + return self._metadata + + +def get_spark_session_or_start_new_with_repoconfig( + store_config: SparkOfflineStoreConfig, +) -> SparkSession: + spark_session = SparkSession.getActiveSession() + if not spark_session: + spark_builder = SparkSession.builder + spark_conf = store_config.spark_conf + if spark_conf: + spark_builder = spark_builder.config( + conf=SparkConf().setAll([(k, v) for k, v in spark_conf.items()]) + ) + + spark_session = spark_builder.getOrCreate() + spark_session.conf.set("spark.sql.parser.quotedRegexColumnNames", "true") + return spark_session + + +def _get_entity_df_event_timestamp_range( + entity_df: Union[pd.DataFrame, str], + entity_df_event_timestamp_col: str, + spark_session: SparkSession, +) -> Tuple[datetime, datetime]: + if isinstance(entity_df, pd.DataFrame): + entity_df_event_timestamp = entity_df.loc[ + :, entity_df_event_timestamp_col + ].infer_objects() + if pd.api.types.is_string_dtype(entity_df_event_timestamp): + entity_df_event_timestamp = pd.to_datetime( + entity_df_event_timestamp, utc=True + ) + entity_df_event_timestamp_range = ( + entity_df_event_timestamp.min().to_pydatetime(), + entity_df_event_timestamp.max().to_pydatetime(), + ) + elif isinstance(entity_df, str): + # If the entity_df is a string (SQL query), determine range + # from table + df = spark_session.sql(entity_df).select(entity_df_event_timestamp_col) + # TODO(kzhang132): need utc conversion here. + entity_df_event_timestamp_range = ( + df.agg({entity_df_event_timestamp_col: "max"}).collect()[0][0], + df.agg({entity_df_event_timestamp_col: "min"}).collect()[0][0], + ) + else: + raise InvalidEntityType(type(entity_df)) + + return entity_df_event_timestamp_range + + +def _upload_entity_df_and_get_entity_schema( + spark_session: SparkSession, + table_name: str, + entity_df: Union[pandas.DataFrame, str], +) -> Dict[str, np.dtype]: + if isinstance(entity_df, pd.DataFrame): + spark_session.createDataFrame(entity_df).createOrReplaceTempView(table_name) + return dict(zip(entity_df.columns, entity_df.dtypes)) + elif isinstance(entity_df, str): + spark_session.sql(entity_df).createOrReplaceTempView(table_name) + limited_entity_df = spark_session.table(table_name) + return dict( + zip( + limited_entity_df.columns, + spark_schema_to_np_dtypes(limited_entity_df.dtypes), + ) + ) + else: + raise InvalidEntityType(type(entity_df)) + + +def _format_datetime(t: datetime) -> str: + # Since Hive does not support timezone, need to transform to utc. 
+ if t.tzinfo: + t = t.astimezone(tz=utc) + dt = t.strftime("%Y-%m-%d %H:%M:%S.%f") + return dt + + +MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """/* + Compute a deterministic hash for the `left_table_query_string` that will be used throughout + all the logic as the field to GROUP BY the data +*/ +CREATE OR REPLACE TEMPORARY VIEW entity_dataframe AS ( + SELECT *, + {{entity_df_event_timestamp_col}} AS entity_timestamp + {% for featureview in featureviews %} + ,CONCAT( + {% for entity in featureview.entities %} + CAST({{entity}} AS STRING), + {% endfor %} + CAST({{entity_df_event_timestamp_col}} AS STRING) + ) AS {{featureview.name}}__entity_row_unique_id + {% endfor %} + FROM {{ left_table_query_string }} +); +---EOS--- +-- Start create temporary table *__base +{% for featureview in featureviews %} +CREATE OR REPLACE TEMPORARY VIEW {{ featureview.name }}__base AS +WITH {{ featureview.name }}__entity_dataframe AS ( + SELECT + {{ featureview.entities | join(', ')}}, + entity_timestamp, + {{featureview.name}}__entity_row_unique_id + FROM entity_dataframe + GROUP BY {{ featureview.entities | join(', ')}}, entity_timestamp, {{featureview.name}}__entity_row_unique_id +), +/* + This query template performs the point-in-time correctness join for a single feature set table + to the provided entity table. + 1. We first join the current feature_view to the entity dataframe that has been passed. + This JOIN has the following logic: + - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + is less than the one provided in the entity dataframe + - If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + is higher the the one provided minus the TTL + - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been + computed previously + The output of this CTE will contain all the necessary information and already filtered out most + of the data that is not relevant. 
+*/ +{{ featureview.name }}__subquery AS ( + SELECT + {{ featureview.event_timestamp_column }} as event_timestamp, + {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} + {{ featureview.entity_selections | join(', ')}}, + {% for feature in featureview.features %} + {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %} + {% endfor %} + FROM {{ featureview.table_subquery }} AS subquery + INNER JOIN ( + SELECT MAX(entity_timestamp) as max_entity_timestamp_ + {% if featureview.ttl == 0 %}{% else %} + ,(MIN(entity_timestamp) - interval '{{ featureview.ttl }}' second) as min_entity_timestamp_ + {% endif %} + FROM entity_dataframe + ) AS temp + ON ( + {{ featureview.event_timestamp_column }} <= max_entity_timestamp_ + {% if featureview.ttl == 0 %}{% else %} + AND {{ featureview.event_timestamp_column }} >= min_entity_timestamp_ + {% endif %} + ) +) +SELECT + subquery.*, + entity_dataframe.entity_timestamp, + entity_dataframe.{{featureview.name}}__entity_row_unique_id +FROM {{ featureview.name }}__subquery AS subquery +INNER JOIN ( + SELECT * + {% if featureview.ttl == 0 %}{% else %} + , (entity_timestamp - interval '{{ featureview.ttl }}' second) as ttl_entity_timestamp + {% endif %} + FROM {{ featureview.name }}__entity_dataframe +) AS entity_dataframe +ON ( + subquery.event_timestamp <= entity_dataframe.entity_timestamp + {% if featureview.ttl == 0 %}{% else %} + AND subquery.event_timestamp >= entity_dataframe.ttl_entity_timestamp + {% endif %} + {% for entity in featureview.entities %} + AND subquery.{{ entity }} = entity_dataframe.{{ entity }} + {% endfor %} +); +---EOS--- +{% endfor %} +-- End create temporary table *__base +{% for featureview in featureviews %} +{% if loop.first %}WITH{% endif %} +/* + 2. If the `created_timestamp_column` has been set, we need to + deduplicate the data first. This is done by calculating the + `MAX(created_at_timestamp)` for each event_timestamp. + We then join the data on the next CTE +*/ +{% if featureview.created_timestamp_column %} +{{ featureview.name }}__dedup AS ( + SELECT + {{featureview.name}}__entity_row_unique_id, + event_timestamp, + MAX(created_timestamp) as created_timestamp + FROM {{ featureview.name }}__base + GROUP BY {{featureview.name}}__entity_row_unique_id, event_timestamp +), +{% endif %} +/* + 3. The data has been filtered during the first CTE "*__base" + Thus we only need to compute the latest timestamp of each feature. +*/ +{{ featureview.name }}__latest AS ( + SELECT + base.{{featureview.name}}__entity_row_unique_id, + MAX(base.event_timestamp) AS event_timestamp + {% if featureview.created_timestamp_column %} + ,MAX(base.created_timestamp) AS created_timestamp + {% endif %} + FROM {{ featureview.name }}__base AS base + {% if featureview.created_timestamp_column %} + INNER JOIN {{ featureview.name }}__dedup AS dedup + ON ( + dedup.{{featureview.name}}__entity_row_unique_id=base.{{featureview.name}}__entity_row_unique_id + AND dedup.event_timestamp=base.event_timestamp + AND dedup.created_timestamp=base.created_timestamp + ) + {% endif %} + GROUP BY base.{{featureview.name}}__entity_row_unique_id +), +/* + 4. 
Once we know the latest value of each feature for a given timestamp, + we can join again the data back to the original "base" dataset +*/ +{{ featureview.name }}__cleaned AS ( + SELECT base.* + FROM {{ featureview.name }}__base AS base + INNER JOIN {{ featureview.name }}__latest AS latest + ON ( + base.{{featureview.name}}__entity_row_unique_id=latest.{{featureview.name}}__entity_row_unique_id + AND base.event_timestamp=latest.event_timestamp + {% if featureview.created_timestamp_column %} + AND base.created_timestamp=latest.created_timestamp + {% endif %} + ) +){% if loop.last %}{% else %}, {% endif %} +{% endfor %} +/* + Joins the outputs of multiple time travel joins to a single table. + The entity_dataframe dataset being our source of truth here. + */ +SELECT `(entity_timestamp|{% for featureview in featureviews %}{{featureview.name}}__entity_row_unique_id{% if loop.last %}{% else %}|{% endif %}{% endfor %})?+.+` +FROM entity_dataframe +{% for featureview in featureviews %} +LEFT JOIN ( + SELECT + {{featureview.name}}__entity_row_unique_id + {% for feature in featureview.features %} + ,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %} + {% endfor %} + FROM {{ featureview.name }}__cleaned +) AS {{ featureview.name }}__joined +ON ( + {{ featureview.name }}__joined.{{featureview.name}}__entity_row_unique_id=entity_dataframe.{{featureview.name}}__entity_row_unique_id +) +{% endfor %}""" diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py new file mode 100644 index 0000000000..50e365a631 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -0,0 +1,242 @@ +import logging +import pickle +import traceback +import warnings +from enum import Enum +from typing import Any, Callable, Dict, Iterable, Optional, Tuple + +from pyspark.sql import SparkSession + +from feast.data_source import DataSource +from feast.infra.offline_stores.offline_utils import get_temp_entity_table_name +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.SavedDataset_pb2 import ( + SavedDatasetStorage as SavedDatasetStorageProto, +) +from feast.repo_config import RepoConfig +from feast.saved_dataset import SavedDatasetStorage +from feast.type_map import spark_to_feast_value_type +from feast.value_type import ValueType + +logger = logging.getLogger(__name__) + + +class SparkSourceFormat(Enum): + csv = "csv" + json = "json" + parquet = "parquet" + + +class SparkSource(DataSource): + def __init__( + self, + table: Optional[str] = None, + query: Optional[str] = None, + path: Optional[str] = None, + file_format: Optional[str] = None, + event_timestamp_column: Optional[str] = None, + created_timestamp_column: Optional[str] = None, + field_mapping: Optional[Dict[str, str]] = None, + date_partition_column: Optional[str] = None, + ): + super().__init__( + event_timestamp_column, + created_timestamp_column, + field_mapping, + date_partition_column, + ) + warnings.warn( + "The spark data source API is an experimental feature in alpha development. " + "This API is unstable and it could and most probably will be changed in the future.", + RuntimeWarning, + ) + self.allowed_formats = [format.value for format in SparkSourceFormat] + + # Check that only one of the ways to load a spark dataframe can be used. 
+ if sum([(arg is not None) for arg in [table, query, path]]) != 1: + raise ValueError( + "Exactly one of params(table, query, path) must be specified." + ) + + if path is not None: + if file_format is None: + raise ValueError( + "If 'path' is specified, then 'file_format' is required." + ) + if file_format not in self.allowed_formats: + raise ValueError( + f"'file_format' should be one of {self.allowed_formats}" + ) + + self.spark_options = SparkOptions( + table=table, query=query, path=path, file_format=file_format, + ) + + @property + def table(self): + """ + Returns the table of this feature data source + """ + return self.spark_options.table + + @property + def query(self): + """ + Returns the query of this feature data source + """ + return self.spark_options.query + + @property + def path(self): + """ + Returns the path of the spark data source file. + """ + return self.spark_options.path + + @property + def file_format(self): + """ + Returns the file format of this feature data source. + """ + return self.spark_options.file_format + + @staticmethod + def from_proto(data_source: DataSourceProto) -> Any: + assert data_source.HasField("custom_options") + + spark_options = SparkOptions.from_proto(data_source.custom_options) + return SparkSource( + field_mapping=dict(data_source.field_mapping), + table=spark_options.table, + query=spark_options.query, + path=spark_options.path, + file_format=spark_options.file_format, + event_timestamp_column=data_source.event_timestamp_column, + created_timestamp_column=data_source.created_timestamp_column, + date_partition_column=data_source.date_partition_column, + ) + + def to_proto(self) -> DataSourceProto: + data_source_proto = DataSourceProto( + type=DataSourceProto.CUSTOM_SOURCE, + field_mapping=self.field_mapping, + custom_options=self.spark_options.to_proto(), + ) + + data_source_proto.event_timestamp_column = self.event_timestamp_column + data_source_proto.created_timestamp_column = self.created_timestamp_column + data_source_proto.date_partition_column = self.date_partition_column + + return data_source_proto + + def validate(self, config: RepoConfig): + self.get_table_column_names_and_types(config) + + @staticmethod + def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: + return spark_to_feast_value_type + + def get_table_column_names_and_types( + self, config: RepoConfig + ) -> Iterable[Tuple[str, str]]: + from feast.infra.offline_stores.contrib.spark_offline_store.spark import ( + get_spark_session_or_start_new_with_repoconfig, + ) + + spark_session = get_spark_session_or_start_new_with_repoconfig( + store_config=config.offline_store + ) + df = spark_session.sql(f"SELECT * FROM {self.get_table_query_string()}") + return ( + (fields["name"], fields["type"]) + for fields in df.schema.jsonValue()["fields"] + ) + + def get_table_query_string(self) -> str: + """Returns a string that can directly be used to reference this table in SQL""" + if self.table: + # Backticks make sure that spark sql knows this a table reference. + return f"`{self.table}`" + if self.query: + return f"({self.query})" + + # If both the table query string and the actual query are null, we can load from file. 
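+ # A file-backed source is read into a dataframe and registered as a temporary view so the generated SQL can reference it by name.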
+ spark_session = SparkSession.getActiveSession() + if spark_session is None: + raise AssertionError("Could not find an active spark session.") + try: + df = spark_session.read.format(self.file_format).load(self.path) + except Exception: + logger.exception( + "Spark read of file source failed.\n" + traceback.format_exc() + ) + tmp_table_name = get_temp_entity_table_name() + df.createOrReplaceTempView(tmp_table_name) + + return f"`{tmp_table_name}`" + + +class SparkOptions: + def __init__( + self, + table: Optional[str] = None, + query: Optional[str] = None, + path: Optional[str] = None, + file_format: Optional[str] = None, + ): + self.table = table + self.query = query + self.path = path + self.file_format = file_format + + @classmethod + def from_proto(cls, spark_options_proto: DataSourceProto.CustomSourceOptions): + """ + Creates a SparkOptions from a protobuf representation of a spark option + args: + spark_options_proto: a protobuf representation of a datasource + Returns: + Returns a SparkOptions object based on the spark_options protobuf + """ + spark_configuration = pickle.loads(spark_options_proto.configuration) + + spark_options = cls( + table=spark_configuration.table, + query=spark_configuration.query, + path=spark_configuration.path, + file_format=spark_configuration.file_format, + ) + return spark_options + + def to_proto(self) -> DataSourceProto.CustomSourceOptions: + """ + Converts an SparkOptionsProto object to its protobuf representation. + Returns: + SparkOptionsProto protobuf + """ + + spark_options_proto = DataSourceProto.CustomSourceOptions( + configuration=pickle.dumps(self), + ) + + return spark_options_proto + + +class SavedDatasetSparkStorage(SavedDatasetStorage): + _proto_attr_name = "spark_storage" + + spark_options: SparkOptions + + def __init__(self, table_ref: Optional[str] = None, query: Optional[str] = None): + self.spark_options = SparkOptions(table=table_ref, query=query) + + @staticmethod + def from_proto(storage_proto: SavedDatasetStorageProto) -> SavedDatasetStorage: + # TODO: implementation is not correct. Needs fix and update to protos. 
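+ # Until the protos carry the Spark options, the stored table/query cannot be recovered here and a placeholder is returned.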
+ return SavedDatasetSparkStorage(table_ref="", query=None) + + def to_proto(self) -> SavedDatasetStorageProto: + return SavedDatasetStorageProto() + + def to_data_source(self) -> DataSource: + return SparkSource(table=self.spark_options.table) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 0570958f9f..3744c0096b 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -39,6 +39,7 @@ "bigquery": "feast.infra.offline_stores.bigquery.BigQueryOfflineStore", "redshift": "feast.infra.offline_stores.redshift.RedshiftOfflineStore", "snowflake.offline": "feast.infra.offline_stores.snowflake.SnowflakeOfflineStore", + "spark": "feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStore", } FEATURE_SERVER_CONFIG_CLASS_FOR_TYPE = { diff --git a/sdk/python/feast/templates/spark/__init__.py b/sdk/python/feast/templates/spark/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/templates/spark/bootstrap.py b/sdk/python/feast/templates/spark/bootstrap.py new file mode 100644 index 0000000000..155a86bf48 --- /dev/null +++ b/sdk/python/feast/templates/spark/bootstrap.py @@ -0,0 +1,50 @@ +from datetime import datetime, timedelta +from pathlib import Path + +from pyspark.sql import SparkSession + +from feast.driver_test_data import ( + create_customer_daily_profile_df, + create_driver_hourly_stats_df, +) + +CURRENT_DIR = Path(__file__).parent +DRIVER_ENTITIES = [1001, 1002, 1003] +CUSTOMER_ENTITIES = [201, 202, 203] +START_DATE = datetime.strptime("2022-01-01", "%Y-%m-%d") +END_DATE = START_DATE + timedelta(days=7) + + +def bootstrap(): + # Bootstrap() will automatically be called from the init_repo() during `feast init` + generate_example_data( + spark_session=SparkSession.builder.getOrCreate(), base_dir=str(CURRENT_DIR), + ) + + +def example_data_exists(base_dir: str) -> bool: + for path in [ + Path(base_dir) / "data" / "driver_hourly_stats", + Path(base_dir) / "data" / "customer_daily_profile", + ]: + if not path.exists(): + return False + return True + + +def generate_example_data(spark_session: SparkSession, base_dir: str) -> None: + spark_session.createDataFrame( + data=create_driver_hourly_stats_df(DRIVER_ENTITIES, START_DATE, END_DATE) + ).write.parquet( + path=str(Path(base_dir) / "data" / "driver_hourly_stats"), mode="overwrite", + ) + + spark_session.createDataFrame( + data=create_customer_daily_profile_df(CUSTOMER_ENTITIES, START_DATE, END_DATE) + ).write.parquet( + path=str(Path(base_dir) / "data" / "customer_daily_profile"), mode="overwrite", + ) + + +if __name__ == "__main__": + bootstrap() diff --git a/sdk/python/feast/templates/spark/example.py b/sdk/python/feast/templates/spark/example.py new file mode 100644 index 0000000000..ddda73b787 --- /dev/null +++ b/sdk/python/feast/templates/spark/example.py @@ -0,0 +1,64 @@ +# # # # # # # # # # # # # # # # # # # # # # # # +# This is an example feature definition file # +# # # # # # # # # # # # # # # # # # # # # # # # + +from pathlib import Path + +from google.protobuf.duration_pb2 import Duration + +from feast import Entity, Feature, FeatureView, ValueType +from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( + SparkSource, +) + +# Constants related to the generated data sets +CURRENT_DIR = Path(__file__).parent + + +# Entity definitions +driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) +customer = Entity( + name="customer_id", value_type=ValueType.INT64, 
description="customer id", +) + +# Sources +driver_hourly_stats = SparkSource( + path=f"{CURRENT_DIR}/data/driver_hourly_stats", + file_format="parquet", + event_timestamp_column="event_timestamp", + created_timestamp_column="created", +) +customer_daily_profile = SparkSource( + path=f"{CURRENT_DIR}/data/customer_daily_profile", + file_format="parquet", + event_timestamp_column="event_timestamp", + created_timestamp_column="created", +) + +# Feature Views +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=["driver_id"], + ttl=Duration(seconds=86400 * 7), # one week + features=[ + Feature(name="conv_rate", dtype=ValueType.FLOAT), + Feature(name="acc_rate", dtype=ValueType.FLOAT), + Feature(name="avg_daily_trips", dtype=ValueType.INT64), + ], + online=True, + batch_source=driver_hourly_stats, + tags={}, +) +customer_daily_profile_view = FeatureView( + name="customer_daily_profile", + entities=["customer_id"], + ttl=Duration(seconds=86400 * 7), # one week + features=[ + Feature(name="current_balance", dtype=ValueType.FLOAT), + Feature(name="avg_passenger_count", dtype=ValueType.FLOAT), + Feature(name="lifetime_trip_count", dtype=ValueType.INT64), + ], + online=True, + batch_source=customer_daily_profile, + tags={}, +) diff --git a/sdk/python/feast/templates/spark/feature_store.yaml b/sdk/python/feast/templates/spark/feature_store.yaml new file mode 100644 index 0000000000..2ea0ddfcc9 --- /dev/null +++ b/sdk/python/feast/templates/spark/feature_store.yaml @@ -0,0 +1,14 @@ +project: my_project +registry: data/registry.db +provider: local +offline_store: + type: spark + spark_conf: + spark.master: "local[*]" + spark.ui.enabled: "false" + spark.eventLog.enabled: "false" + spark.sql.catalogImplementation: "hive" + spark.sql.parser.quotedRegexColumnNames: "true" + spark.sql.session.timeZone: "UTC" +online_store: + path: data/online_store.db diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 82827bce2a..713b952d09 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -12,10 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +from collections import defaultdict from datetime import datetime, timezone from typing import ( Any, Dict, + Iterator, List, Optional, Sequence, @@ -326,7 +328,14 @@ def _python_value_to_proto_value( ProtoValue(unix_timestamp_list_val=Int64List(val=ts)) # type: ignore for ts in int_timestamps_lists ] - + if feast_value_type == ValueType.BOOL_LIST: + # ProtoValue does not support conversion of np.bool_ so we need to convert it to support np.bool_. + return [ + ProtoValue(**{field_name: proto_type(val=[bool(e) for e in value])}) # type: ignore + if value is not None + else ProtoValue() + for value in values + ] return [ ProtoValue(**{field_name: proto_type(val=value)}) # type: ignore if value is not None @@ -345,15 +354,28 @@ def _python_value_to_proto_value( # ProtoValue does actually accept `np.int_` but the typing complains. return [ProtoValue(unix_timestamp_val=ts) for ts in int_timestamps] # type: ignore + ( + field_name, + func, + valid_scalar_types, + ) = PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE[feast_value_type] + if valid_scalar_types: + assert type(sample) in valid_scalar_types + if feast_value_type == ValueType.BOOL: + # ProtoValue does not support conversion of np.bool_ so we need to convert it to support np.bool_. 
+ return [ + ProtoValue( + **{ + field_name: func( + bool(value) if type(value) is np.bool_ else value # type: ignore + ) + } + ) + if not pd.isnull(value) + else ProtoValue() + for value in values + ] if feast_value_type in PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE: - ( - field_name, - func, - valid_scalar_types, - ) = PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE[feast_value_type] - if valid_scalar_types: - assert type(sample) in valid_scalar_types - return [ ProtoValue(**{field_name: func(value)}) if not pd.isnull(value) @@ -566,3 +588,53 @@ def _non_empty_value(value: Any) -> bool: return value is not None and ( not isinstance(value, Sized) or len(value) > 0 or isinstance(value, str) ) + + +def spark_to_feast_value_type(spark_type_as_str: str) -> ValueType: + # TODO not all spark types are convertible + # Current non-convertible types: interval, map, struct, structfield, decimal, binary + type_map: Dict[str, ValueType] = { + "null": ValueType.UNKNOWN, + "byte": ValueType.BYTES, + "string": ValueType.STRING, + "int": ValueType.INT32, + "short": ValueType.INT32, + "bigint": ValueType.INT64, + "long": ValueType.INT64, + "double": ValueType.DOUBLE, + "float": ValueType.FLOAT, + "boolean": ValueType.BOOL, + "timestamp": ValueType.UNIX_TIMESTAMP, + "array<byte>": ValueType.BYTES_LIST, + "array<string>": ValueType.STRING_LIST, + "array<int>": ValueType.INT32_LIST, + "array<bigint>": ValueType.INT64_LIST, + "array<double>": ValueType.DOUBLE_LIST, + "array<float>": ValueType.FLOAT_LIST, + "array<boolean>": ValueType.BOOL_LIST, + "array<timestamp>": ValueType.UNIX_TIMESTAMP_LIST, + } + # TODO: Find better way of doing this. + if type(spark_type_as_str) != str or spark_type_as_str not in type_map: + return ValueType.NULL + return type_map[spark_type_as_str.lower()] + + +def spark_schema_to_np_dtypes(dtypes: List[Tuple[str, str]]) -> Iterator[np.dtype]: + # TODO recheck all typing (also tz for timestamp) + # https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#timestamp-with-time-zone-semantics + + type_map = defaultdict( + lambda: np.dtype("O"), + { + "boolean": np.dtype("bool"), + "double": np.dtype("float64"), + "float": np.dtype("float64"), + "int": np.dtype("int64"), + "bigint": np.dtype("int64"), + "smallint": np.dtype("int64"), + "timestamp": np.dtype("datetime64[ns]"), + }, + ) + + return (type_map[t] for _, t in dtypes) diff --git a/sdk/python/requirements/py3.7-ci-requirements.txt b/sdk/python/requirements/py3.7-ci-requirements.txt index e93355420f..4e5b523fdf 100644 --- a/sdk/python/requirements/py3.7-ci-requirements.txt +++ b/sdk/python/requirements/py3.7-ci-requirements.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile with python 3.7 # To update, run: # -# pip-compile --extra=ci --output-file=requirements/py3.7-ci-requirements.txt setup.py +# pip-compile --extra=ci --output-file=requirements/py3.7-ci-requirements.txt # absl-py==1.0.0 # via tensorflow-metadata @@ -54,7 +54,7 @@ attrs==21.4.0 # pytest avro==1.10.0 # via feast (setup.py) -azure-core==1.22.1 +azure-core==1.23.0 # via # adlfs # azure-identity @@ -514,6 +514,8 @@ py==1.11.0 # pytest-forked py-cpuinfo==8.0.0 # via pytest-benchmark +py4j==0.10.9.3 + # via pyspark pyarrow==6.0.1 # via # feast (setup.py) @@ -556,6 +558,8 @@ pyparsing==2.4.7 # packaging pyrsistent==0.18.1 # via jsonschema +pyspark==3.2.1 + # via feast (setup.py) pytest==7.0.1 # via # feast (setup.py) @@ -714,7 +718,7 @@ tensorflow-metadata==1.7.0 # via feast (setup.py) termcolor==1.1.0 # via great-expectations -terminado==0.13.1 +terminado==0.13.2 # via notebook testcontainers==3.4.2 # via
feast (setup.py) @@ -789,6 +793,7 @@ typing-extensions==4.1.1 # argon2-cffi # asgiref # async-timeout + # azure-core # great-expectations # h11 # importlib-metadata @@ -847,4 +852,4 @@ zipp==3.7.0 # The following packages are considered to be unsafe in a requirements file: # pip -# setuptools +# setuptools \ No newline at end of file diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt index f1e603fb33..2fa1d94177 100644 --- a/sdk/python/requirements/py3.8-ci-requirements.txt +++ b/sdk/python/requirements/py3.8-ci-requirements.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile with python 3.8 # To update, run: # -# pip-compile --extra=ci --output-file=requirements/py3.8-ci-requirements.txt setup.py +# pip-compile --extra=ci --output-file=requirements/py3.8-ci-requirements.txt # absl-py==1.0.0 # via tensorflow-metadata @@ -42,6 +42,8 @@ asn1crypto==1.4.0 # snowflake-connector-python assertpy==1.1 # via feast (setup.py) +asttokens==2.0.5 + # via stack-data async-timeout==4.0.2 # via aiohttp attrs==21.4.0 @@ -52,14 +54,14 @@ attrs==21.4.0 # pytest avro==1.10.0 # via feast (setup.py) -azure-core==1.21.1 +azure-core==1.23.0 # via # adlfs # azure-identity # azure-storage-blob azure-datalake-store==0.0.52 # via adlfs -azure-identity==1.7.1 +azure-identity==1.8.0 # via adlfs azure-storage-blob==12.9.0 # via adlfs @@ -75,11 +77,11 @@ black==19.10b0 # via feast (setup.py) bleach==4.1.0 # via nbconvert -boto3==1.20.46 +boto3==1.21.11 # via # feast (setup.py) # moto -botocore==1.23.46 +botocore==1.24.11 # via # boto3 # moto @@ -102,12 +104,12 @@ cffi==1.15.0 # snowflake-connector-python cfgv==3.3.1 # via pre-commit -charset-normalizer==2.0.11 +charset-normalizer==2.0.12 # via # aiohttp # requests # snowflake-connector-python -click==8.0.3 +click==8.0.4 # via # black # feast (setup.py) @@ -118,7 +120,7 @@ cloudpickle==2.0.0 # via dask colorama==0.4.4 # via feast (setup.py) -coverage[toml]==6.3 +coverage[toml]==6.3.2 # via pytest-cov cryptography==3.3.2 # via @@ -155,20 +157,22 @@ docutils==0.17.1 # via # sphinx # sphinx-rtd-theme -entrypoints==0.3 +entrypoints==0.4 # via # altair # jupyter-client # nbconvert execnet==1.9.0 # via pytest-xdist -fastapi==0.73.0 +executing==0.8.3 + # via stack-data +fastapi==0.74.1 # via feast (setup.py) fastavro==1.4.9 # via # feast (setup.py) # pandavro -filelock==3.4.2 +filelock==3.6.0 # via virtualenv firebase-admin==4.5.2 # via feast (setup.py) @@ -178,12 +182,12 @@ frozenlist==1.3.0 # via # aiohttp # aiosignal -fsspec==2022.1.0 +fsspec==2022.2.0 # via # adlfs # dask # gcsfs -gcsfs==2022.1.0 +gcsfs==2022.2.0 # via feast (setup.py) google-api-core[grpc]==1.31.5 # via @@ -195,7 +199,7 @@ google-api-core[grpc]==1.31.5 # google-cloud-core # google-cloud-datastore # google-cloud-firestore -google-api-python-client==2.36.0 +google-api-python-client==2.39.0 # via firebase-admin google-auth==1.35.0 # via @@ -208,11 +212,11 @@ google-auth==1.35.0 # google-cloud-storage google-auth-httplib2==0.1.0 # via google-api-python-client -google-auth-oauthlib==0.4.6 +google-auth-oauthlib==0.5.0 # via gcsfs -google-cloud-bigquery==2.32.0 +google-cloud-bigquery==2.34.1 # via feast (setup.py) -google-cloud-bigquery-storage==2.11.0 +google-cloud-bigquery-storage==2.12.0 # via feast (setup.py) google-cloud-core==1.7.2 # via @@ -221,7 +225,7 @@ google-cloud-core==1.7.2 # google-cloud-datastore # google-cloud-firestore # google-cloud-storage -google-cloud-datastore==2.4.0 +google-cloud-datastore==2.5.0 # via feast 
(setup.py) google-cloud-firestore==2.3.4 # via firebase-admin @@ -241,9 +245,9 @@ googleapis-common-protos==1.52.0 # feast (setup.py) # google-api-core # tensorflow-metadata -great-expectations==0.14.4 +great-expectations==0.14.8 # via feast (setup.py) -grpcio==1.43.0 +grpcio==1.44.0 # via # feast (setup.py) # google-api-core @@ -251,7 +255,7 @@ grpcio==1.43.0 # grpcio-reflection # grpcio-testing # grpcio-tools -grpcio-reflection==1.43.0 +grpcio-reflection==1.44.0 # via feast (setup.py) grpcio-testing==1.34.0 # via feast (setup.py) @@ -261,13 +265,13 @@ h11==0.13.0 # via uvicorn hiredis==2.0.0 # via feast (setup.py) -httplib2==0.20.2 +httplib2==0.20.4 # via # google-api-python-client # google-auth-httplib2 httptools==0.3.0 # via uvicorn -identify==2.4.7 +identify==2.4.11 # via pre-commit idna==3.3 # via @@ -277,17 +281,17 @@ idna==3.3 # yarl imagesize==1.3.0 # via sphinx -importlib-metadata==4.2.0 +importlib-metadata==4.11.2 # via great-expectations importlib-resources==5.4.0 # via jsonschema iniconfig==1.1.1 # via pytest -ipykernel==6.7.0 +ipykernel==6.9.1 # via # ipywidgets # notebook -ipython==7.31.1 +ipython==8.1.1 # via # ipykernel # ipywidgets @@ -332,7 +336,7 @@ jupyter-client==7.1.2 # ipykernel # nbclient # notebook -jupyter-core==4.9.1 +jupyter-core==4.9.2 # via # jupyter-client # nbconvert @@ -342,13 +346,9 @@ jupyterlab-pygments==0.1.2 # via nbconvert jupyterlab-widgets==1.0.2 # via ipywidgets -libcst==0.4.1 - # via - # google-cloud-bigquery-storage - # google-cloud-datastore locket==0.2.1 # via partd -markupsafe==2.0.1 +markupsafe==2.1.0 # via # jinja2 # moto @@ -368,9 +368,9 @@ mmh3==3.0.0 # via feast (setup.py) mock==2.0.0 # via feast (setup.py) -moto==3.0.2 +moto==3.0.5 # via feast (setup.py) -msal==1.16.0 +msal==1.17.0 # via # azure-identity # msal-extensions @@ -391,14 +391,12 @@ multidict==6.0.2 mypy==0.931 # via feast (setup.py) mypy-extensions==0.4.3 - # via - # mypy - # typing-inspect + # via mypy mypy-protobuf==3.1.0 # via feast (setup.py) -nbclient==0.5.10 +nbclient==0.5.11 # via nbconvert -nbconvert==6.4.1 +nbconvert==6.4.2 # via notebook nbformat==5.1.3 # via @@ -416,7 +414,7 @@ nodeenv==1.6.0 # via pre-commit notebook==6.4.8 # via widgetsnbextension -numpy==1.21.5 +numpy==1.22.2 # via # altair # great-expectations @@ -455,7 +453,7 @@ partd==1.2.0 # via dask pathspec==0.9.0 # via black -pbr==5.8.0 +pbr==5.8.1 # via mock pep517==0.12.0 # via pip-tools @@ -463,19 +461,19 @@ pexpect==4.8.0 # via ipython pickleshare==0.7.5 # via ipython -pip-tools==6.4.0 +pip-tools==6.5.1 # via feast (setup.py) -platformdirs==2.4.1 +platformdirs==2.5.1 # via virtualenv pluggy==1.0.0 # via pytest -portalocker==2.3.2 +portalocker==2.4.0 # via msal-extensions pre-commit==2.17.0 # via feast (setup.py) prometheus-client==0.13.1 # via notebook -prompt-toolkit==3.0.26 +prompt-toolkit==3.0.28 # via ipython proto-plus==1.19.6 # via @@ -502,12 +500,16 @@ ptyprocess==0.7.0 # via # pexpect # terminado +pure-eval==0.2.2 + # via stack-data py==1.11.0 # via # pytest # pytest-forked py-cpuinfo==8.0.0 # via pytest-benchmark +py4j==0.10.9.3 + # via pyspark pyarrow==6.0.1 # via # feast (setup.py) @@ -522,7 +524,7 @@ pycodestyle==2.8.0 # via flake8 pycparser==2.21 # via cffi -pycryptodomex==3.14.0 +pycryptodomex==3.14.1 # via snowflake-connector-python pydantic==1.9.0 # via @@ -550,7 +552,9 @@ pyparsing==2.4.7 # packaging pyrsistent==0.18.1 # via jsonschema -pytest==6.2.5 +pyspark==3.2.1 + # via feast (setup.py) +pytest==7.0.1 # via # feast (setup.py) # pytest-benchmark @@ -602,7 +606,6 @@ pyyaml==6.0 # 
via # dask # feast (setup.py) - # libcst # pre-commit # uvicorn pyzmq==22.3.0 @@ -615,7 +618,7 @@ redis==3.5.3 # redis-py-cluster redis-py-cluster==2.1.3 # via feast (setup.py) -regex==2022.1.18 +regex==2022.3.2 # via black requests==2.27.1 # via @@ -641,23 +644,24 @@ requests-oauthlib==1.3.1 # via # google-auth-oauthlib # msrest -responses==0.17.0 +responses==0.18.0 # via moto rsa==4.8 # via google-auth ruamel-yaml==0.17.17 # via great-expectations -ruamel-yaml-clib==0.2.6 - # via ruamel-yaml -s3transfer==0.5.0 +ruamel.yaml.clib==0.2.6 + # via ruamel.yaml +s3transfer==0.5.2 # via boto3 -scipy==1.7.3 +scipy==1.8.0 # via great-expectations send2trash==1.8.0 # via notebook six==1.16.0 # via # absl-py + # asttokens # azure-core # azure-identity # bleach @@ -674,13 +678,12 @@ six==1.16.0 # pandavro # pyopenssl # python-dateutil - # responses # virtualenv sniffio==1.2.0 # via anyio snowballstemmer==2.2.0 # via sphinx -snowflake-connector-python[pandas]==2.7.3 +snowflake-connector-python[pandas]==2.7.4 # via feast (setup.py) sphinx==4.3.2 # via @@ -700,33 +703,35 @@ sphinxcontrib-qthelp==1.0.3 # via sphinx sphinxcontrib-serializinghtml==1.1.5 # via sphinx +stack-data==0.2.0 + # via ipython starlette==0.17.1 # via fastapi tabulate==0.8.9 # via feast (setup.py) tenacity==8.0.1 # via feast (setup.py) -tensorflow-metadata==1.6.0 +tensorflow-metadata==1.7.0 # via feast (setup.py) termcolor==1.1.0 # via great-expectations -terminado==0.13.1 +terminado==0.13.2 # via notebook testcontainers==3.4.2 # via feast (setup.py) -testpath==0.5.0 +testpath==0.6.0 # via nbconvert toml==0.10.2 # via # black # feast (setup.py) # pre-commit - # pytest -tomli==2.0.0 +tomli==2.0.1 # via # coverage # mypy # pep517 + # pytest toolz==0.11.2 # via # altair @@ -738,7 +743,7 @@ tornado==6.1 # jupyter-client # notebook # terminado -tqdm==4.62.3 +tqdm==4.63.0 # via # feast (setup.py) # great-expectations @@ -756,37 +761,32 @@ traitlets==5.1.1 # notebook typed-ast==1.5.2 # via black -types-futures==3.3.8 - # via types-protobuf -types-protobuf==3.19.7 +types-protobuf==3.19.12 # via # feast (setup.py) # mypy-protobuf types-python-dateutil==2.8.9 # via feast (setup.py) -types-pytz==2021.3.4 +types-pytz==2021.3.5 # via feast (setup.py) types-pyyaml==6.0.4 # via feast (setup.py) -types-redis==4.1.13 +types-redis==4.1.17 # via feast (setup.py) -types-requests==2.27.8 +types-requests==2.27.11 # via feast (setup.py) -types-setuptools==57.4.8 +types-setuptools==57.4.9 # via feast (setup.py) types-tabulate==0.8.5 # via feast (setup.py) -types-urllib3==1.26.8 +types-urllib3==1.26.10 # via types-requests -typing-extensions==4.0.1 +typing-extensions==4.1.1 # via + # azure-core # great-expectations - # libcst # mypy # pydantic - # typing-inspect -typing-inspect==0.7.1 - # via libcst tzdata==2021.5 # via pytz-deprecation-shim tzlocal==4.1 @@ -800,11 +800,11 @@ urllib3==1.26.8 # minio # requests # responses -uvicorn[standard]==0.17.1 +uvicorn[standard]==0.17.5 # via feast (setup.py) uvloop==0.16.0 # via uvicorn -virtualenv==20.13.0 +virtualenv==20.13.2 # via pre-commit watchgod==0.7 # via uvicorn @@ -812,11 +812,11 @@ wcwidth==0.2.5 # via prompt-toolkit webencodings==0.5.1 # via bleach -websocket-client==1.2.3 +websocket-client==1.3.1 # via docker -websockets==10.1 +websockets==10.2 # via uvicorn -werkzeug==2.0.2 +werkzeug==2.0.3 # via moto wheel==0.37.1 # via pip-tools @@ -835,4 +835,4 @@ zipp==3.7.0 # The following packages are considered to be unsafe in a requirements file: # pip -# setuptools +# setuptools \ No newline at end of file diff 
--git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index 35b32531a5..333ebf614d 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile with python 3.9 # To update, run: # -# pip-compile --extra=ci --output-file=requirements/py3.9-ci-requirements.txt setup.py +# pip-compile --extra=ci --output-file=requirements/py3.9-ci-requirements.txt # absl-py==1.0.0 # via tensorflow-metadata @@ -42,6 +42,8 @@ asn1crypto==1.4.0 # snowflake-connector-python assertpy==1.1 # via feast (setup.py) +asttokens==2.0.5 + # via stack-data async-timeout==4.0.2 # via aiohttp attrs==21.4.0 @@ -52,14 +54,14 @@ attrs==21.4.0 # pytest avro==1.10.0 # via feast (setup.py) -azure-core==1.21.1 +azure-core==1.23.0 # via # adlfs # azure-identity # azure-storage-blob azure-datalake-store==0.0.52 # via adlfs -azure-identity==1.7.1 +azure-identity==1.8.0 # via adlfs azure-storage-blob==12.9.0 # via adlfs @@ -71,11 +73,11 @@ black==19.10b0 # via feast (setup.py) bleach==4.1.0 # via nbconvert -boto3==1.20.46 +boto3==1.21.11 # via # feast (setup.py) # moto -botocore==1.23.46 +botocore==1.24.11 # via # boto3 # moto @@ -98,12 +100,12 @@ cffi==1.15.0 # snowflake-connector-python cfgv==3.3.1 # via pre-commit -charset-normalizer==2.0.11 +charset-normalizer==2.0.12 # via # aiohttp # requests # snowflake-connector-python -click==8.0.3 +click==8.0.4 # via # black # feast (setup.py) @@ -114,7 +116,7 @@ cloudpickle==2.0.0 # via dask colorama==0.4.4 # via feast (setup.py) -coverage[toml]==6.3 +coverage[toml]==6.3.2 # via pytest-cov cryptography==3.3.2 # via @@ -151,20 +153,22 @@ docutils==0.17.1 # via # sphinx # sphinx-rtd-theme -entrypoints==0.3 +entrypoints==0.4 # via # altair # jupyter-client # nbconvert execnet==1.9.0 # via pytest-xdist -fastapi==0.73.0 +executing==0.8.3 + # via stack-data +fastapi==0.74.1 # via feast (setup.py) fastavro==1.4.9 # via # feast (setup.py) # pandavro -filelock==3.4.2 +filelock==3.6.0 # via virtualenv firebase-admin==4.5.2 # via feast (setup.py) @@ -174,12 +178,12 @@ frozenlist==1.3.0 # via # aiohttp # aiosignal -fsspec==2022.1.0 +fsspec==2022.2.0 # via # adlfs # dask # gcsfs -gcsfs==2022.1.0 +gcsfs==2022.2.0 # via feast (setup.py) google-api-core[grpc]==1.31.5 # via @@ -191,7 +195,7 @@ google-api-core[grpc]==1.31.5 # google-cloud-core # google-cloud-datastore # google-cloud-firestore -google-api-python-client==2.36.0 +google-api-python-client==2.39.0 # via firebase-admin google-auth==1.35.0 # via @@ -204,11 +208,11 @@ google-auth==1.35.0 # google-cloud-storage google-auth-httplib2==0.1.0 # via google-api-python-client -google-auth-oauthlib==0.4.6 +google-auth-oauthlib==0.5.0 # via gcsfs -google-cloud-bigquery==2.32.0 +google-cloud-bigquery==2.34.1 # via feast (setup.py) -google-cloud-bigquery-storage==2.11.0 +google-cloud-bigquery-storage==2.12.0 # via feast (setup.py) google-cloud-core==1.7.2 # via @@ -217,7 +221,7 @@ google-cloud-core==1.7.2 # google-cloud-datastore # google-cloud-firestore # google-cloud-storage -google-cloud-datastore==2.4.0 +google-cloud-datastore==2.5.0 # via feast (setup.py) google-cloud-firestore==2.3.4 # via firebase-admin @@ -237,9 +241,9 @@ googleapis-common-protos==1.52.0 # feast (setup.py) # google-api-core # tensorflow-metadata -great-expectations==0.14.4 +great-expectations==0.14.8 # via feast (setup.py) -grpcio==1.43.0 +grpcio==1.44.0 # via # feast (setup.py) # google-api-core @@ -247,7 +251,7 @@ 
grpcio==1.43.0 # grpcio-reflection # grpcio-testing # grpcio-tools -grpcio-reflection==1.43.0 +grpcio-reflection==1.44.0 # via feast (setup.py) grpcio-testing==1.34.0 # via feast (setup.py) @@ -257,13 +261,13 @@ h11==0.13.0 # via uvicorn hiredis==2.0.0 # via feast (setup.py) -httplib2==0.20.2 +httplib2==0.20.4 # via # google-api-python-client # google-auth-httplib2 httptools==0.3.0 # via uvicorn -identify==2.4.7 +identify==2.4.11 # via pre-commit idna==3.3 # via @@ -273,15 +277,15 @@ idna==3.3 # yarl imagesize==1.3.0 # via sphinx -importlib-metadata==4.2.0 +importlib-metadata==4.11.2 # via great-expectations iniconfig==1.1.1 # via pytest -ipykernel==6.7.0 +ipykernel==6.9.1 # via # ipywidgets # notebook -ipython==7.31.1 +ipython==8.1.1 # via # ipykernel # ipywidgets @@ -326,7 +330,7 @@ jupyter-client==7.1.2 # ipykernel # nbclient # notebook -jupyter-core==4.9.1 +jupyter-core==4.9.2 # via # jupyter-client # nbconvert @@ -336,13 +340,9 @@ jupyterlab-pygments==0.1.2 # via nbconvert jupyterlab-widgets==1.0.2 # via ipywidgets -libcst==0.4.1 - # via - # google-cloud-bigquery-storage - # google-cloud-datastore locket==0.2.1 # via partd -markupsafe==2.0.1 +markupsafe==2.1.0 # via # jinja2 # moto @@ -362,9 +362,9 @@ mmh3==3.0.0 # via feast (setup.py) mock==2.0.0 # via feast (setup.py) -moto==3.0.2 +moto==3.0.5 # via feast (setup.py) -msal==1.16.0 +msal==1.17.0 # via # azure-identity # msal-extensions @@ -385,14 +385,12 @@ multidict==6.0.2 mypy==0.931 # via feast (setup.py) mypy-extensions==0.4.3 - # via - # mypy - # typing-inspect + # via mypy mypy-protobuf==3.1.0 # via feast (setup.py) -nbclient==0.5.10 +nbclient==0.5.11 # via nbconvert -nbconvert==6.4.1 +nbconvert==6.4.2 # via notebook nbformat==5.1.3 # via @@ -410,7 +408,7 @@ nodeenv==1.6.0 # via pre-commit notebook==6.4.8 # via widgetsnbextension -numpy==1.21.5 +numpy==1.22.2 # via # altair # great-expectations @@ -449,7 +447,7 @@ partd==1.2.0 # via dask pathspec==0.9.0 # via black -pbr==5.8.0 +pbr==5.8.1 # via mock pep517==0.12.0 # via pip-tools @@ -457,19 +455,19 @@ pexpect==4.8.0 # via ipython pickleshare==0.7.5 # via ipython -pip-tools==6.4.0 +pip-tools==6.5.1 # via feast (setup.py) -platformdirs==2.4.1 +platformdirs==2.5.1 # via virtualenv pluggy==1.0.0 # via pytest -portalocker==2.3.2 +portalocker==2.4.0 # via msal-extensions pre-commit==2.17.0 # via feast (setup.py) prometheus-client==0.13.1 # via notebook -prompt-toolkit==3.0.26 +prompt-toolkit==3.0.28 # via ipython proto-plus==1.19.6 # via @@ -496,12 +494,16 @@ ptyprocess==0.7.0 # via # pexpect # terminado +pure-eval==0.2.2 + # via stack-data py==1.11.0 # via # pytest # pytest-forked py-cpuinfo==8.0.0 # via pytest-benchmark +py4j==0.10.9.3 + # via pyspark pyarrow==6.0.1 # via # feast (setup.py) @@ -516,7 +518,7 @@ pycodestyle==2.8.0 # via flake8 pycparser==2.21 # via cffi -pycryptodomex==3.14.0 +pycryptodomex==3.14.1 # via snowflake-connector-python pydantic==1.9.0 # via @@ -544,7 +546,9 @@ pyparsing==2.4.7 # packaging pyrsistent==0.18.1 # via jsonschema -pytest==6.2.5 +pyspark==3.2.1 + # via feast (setup.py) +pytest==7.0.1 # via # feast (setup.py) # pytest-benchmark @@ -596,7 +600,6 @@ pyyaml==6.0 # via # dask # feast (setup.py) - # libcst # pre-commit # uvicorn pyzmq==22.3.0 @@ -609,7 +612,7 @@ redis==3.5.3 # redis-py-cluster redis-py-cluster==2.1.3 # via feast (setup.py) -regex==2022.1.18 +regex==2022.3.2 # via black requests==2.27.1 # via @@ -635,23 +638,24 @@ requests-oauthlib==1.3.1 # via # google-auth-oauthlib # msrest -responses==0.17.0 +responses==0.18.0 # via moto rsa==4.8 # 
via google-auth -ruamel-yaml==0.17.17 +ruamel.yaml==0.17.17 # via great-expectations -ruamel-yaml-clib==0.2.6 - # via ruamel-yaml -s3transfer==0.5.0 +ruamel.yaml.clib==0.2.6 + # via ruamel.yaml +s3transfer==0.5.2 # via boto3 -scipy==1.7.3 +scipy==1.8.0 # via great-expectations send2trash==1.8.0 # via notebook six==1.16.0 # via # absl-py + # asttokens # azure-core # azure-identity # bleach @@ -668,13 +672,12 @@ six==1.16.0 # pandavro # pyopenssl # python-dateutil - # responses # virtualenv sniffio==1.2.0 # via anyio snowballstemmer==2.2.0 # via sphinx -snowflake-connector-python[pandas]==2.7.3 +snowflake-connector-python[pandas]==2.7.4 # via feast (setup.py) sphinx==4.3.2 # via @@ -694,33 +697,35 @@ sphinxcontrib-qthelp==1.0.3 # via sphinx sphinxcontrib-serializinghtml==1.1.5 # via sphinx +stack-data==0.2.0 + # via ipython starlette==0.17.1 # via fastapi tabulate==0.8.9 # via feast (setup.py) tenacity==8.0.1 # via feast (setup.py) -tensorflow-metadata==1.6.0 +tensorflow-metadata==1.7.0 # via feast (setup.py) termcolor==1.1.0 # via great-expectations -terminado==0.13.1 +terminado==0.13.2 # via notebook testcontainers==3.4.2 # via feast (setup.py) -testpath==0.5.0 +testpath==0.6.0 # via nbconvert toml==0.10.2 # via # black # feast (setup.py) # pre-commit - # pytest -tomli==2.0.0 +tomli==2.0.1 # via # coverage # mypy # pep517 + # pytest toolz==0.11.2 # via # altair @@ -732,7 +737,7 @@ tornado==6.1 # jupyter-client # notebook # terminado -tqdm==4.62.3 +tqdm==4.63.0 # via # feast (setup.py) # great-expectations @@ -750,37 +755,32 @@ traitlets==5.1.1 # notebook typed-ast==1.5.2 # via black -types-futures==3.3.8 - # via types-protobuf -types-protobuf==3.19.7 +types-protobuf==3.19.12 # via # feast (setup.py) # mypy-protobuf types-python-dateutil==2.8.9 # via feast (setup.py) -types-pytz==2021.3.4 +types-pytz==2021.3.5 # via feast (setup.py) types-pyyaml==6.0.4 # via feast (setup.py) -types-redis==4.1.13 +types-redis==4.1.17 # via feast (setup.py) -types-requests==2.27.8 +types-requests==2.27.11 # via feast (setup.py) -types-setuptools==57.4.8 +types-setuptools==57.4.9 # via feast (setup.py) types-tabulate==0.8.5 # via feast (setup.py) -types-urllib3==1.26.8 +types-urllib3==1.26.10 # via types-requests -typing-extensions==4.0.1 +typing-extensions==4.1.1 # via + # azure-core # great-expectations - # libcst # mypy # pydantic - # typing-inspect -typing-inspect==0.7.1 - # via libcst tzdata==2021.5 # via pytz-deprecation-shim tzlocal==4.1 @@ -794,11 +794,11 @@ urllib3==1.26.8 # minio # requests # responses -uvicorn[standard]==0.17.1 +uvicorn[standard]==0.17.5 # via feast (setup.py) uvloop==0.16.0 # via uvicorn -virtualenv==20.13.0 +virtualenv==20.13.2 # via pre-commit watchgod==0.7 # via uvicorn @@ -806,11 +806,11 @@ wcwidth==0.2.5 # via prompt-toolkit webencodings==0.5.1 # via bleach -websocket-client==1.2.3 +websocket-client==1.3.1 # via docker -websockets==10.1 +websockets==10.2 # via uvicorn -werkzeug==2.0.2 +werkzeug==2.0.3 # via moto wheel==0.37.1 # via pip-tools diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 20033237ef..aefc47dca4 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -90,7 +90,13 @@ "snowflake-connector-python[pandas]>=2.7.3", ] -GE_REQUIRED = ["great_expectations>=0.14.0,<0.15.0"] +SPARK_REQUIRED = [ + "pyspark>=3.0.0", +] + +GE_REQUIRED = [ + "great_expectations>=0.14.0,<0.15.0" +] CI_REQUIRED = ( [ @@ -134,11 +140,12 @@ "types-setuptools", "types-tabulate", ] - + GCP_REQUIRED - + REDIS_REQUIRED - + AWS_REQUIRED - + SNOWFLAKE_REQUIRED - + GE_REQUIRED + + 
GCP_REQUIRED + + REDIS_REQUIRED + + AWS_REQUIRED + + SNOWFLAKE_REQUIRED + + SPARK_REQUIRED + + GE_REQUIRED ) DEV_REQUIRED = ["mypy-protobuf>=3.1.0", "grpcio-testing==1.*"] + CI_REQUIRED @@ -323,6 +330,7 @@ def run(self): "aws": AWS_REQUIRED, "redis": REDIS_REQUIRED, "snowflake": SNOWFLAKE_REQUIRED, + "spark": SPARK_REQUIRED, "ge": GE_REQUIRED, }, include_package_data=True, diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/spark_data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/spark_data_source_creator.py new file mode 100644 index 0000000000..4284c3cf4c --- /dev/null +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/spark_data_source_creator.py @@ -0,0 +1,108 @@ +import uuid +from typing import Dict, List + +import pandas as pd +from pyspark import SparkConf +from pyspark.sql import SparkSession + +from feast.data_source import DataSource +from feast.infra.offline_stores.contrib.spark_offline_store.spark import ( + SparkOfflineStoreConfig, +) +from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( + SavedDatasetSparkStorage, + SparkSource, +) +from tests.integration.feature_repos.universal.data_source_creator import ( + DataSourceCreator, +) + + +class SparkDataSourceCreator(DataSourceCreator): + tables: List[str] = [] + spark_offline_store_config = None + spark_session = None + + def __init__(self, project_name: str): + self.spark_conf = { + "master": "local[*]", + "spark.ui.enabled": "false", + "spark.eventLog.enabled": "false", + "spark.sql.parser.quotedRegexColumnNames": "true", + "spark.sql.session.timeZone": "UTC", + } + self.project_name = project_name + if not self.spark_offline_store_config: + self.create_offline_store_config() + if not self.spark_session: + self.spark_session = ( + SparkSession.builder.config( + conf=SparkConf().setAll( + [(k, v) for k, v in self.spark_conf.items()] + ) + ) + .appName("pytest-pyspark-local-testing") + .getOrCreate() + ) + self.tables: List[str] = [] + + def teardown(self): + self.spark_session.stop() + + def create_offline_store_config(self): + self.spark_offline_store_config = SparkOfflineStoreConfig() + self.spark_offline_store_config.type = "spark" + self.spark_offline_store_config.spark_conf = self.spark_conf + return self.spark_offline_store_config + + def create_data_source( + self, + df: pd.DataFrame, + destination_name: str, + event_timestamp_column="ts", + created_timestamp_column="created_ts", + field_mapping: Dict[str, str] = None, + **kwargs, + ) -> DataSource: + if event_timestamp_column in df: + df[event_timestamp_column] = pd.to_datetime( + df[event_timestamp_column], utc=True + ) + # Make sure the field mapping is correct and convert the datetime datasources. 
+ if field_mapping: + timestamp_mapping = {value: key for key, value in field_mapping.items()} + if ( + event_timestamp_column in timestamp_mapping + and timestamp_mapping[event_timestamp_column] in df + ): + col = timestamp_mapping[event_timestamp_column] + df[col] = pd.to_datetime(df[col], utc=True) + destination_name = self.get_prefixed_table_name(destination_name) + if not self.spark_session: + self.spark_session = ( + SparkSession.builder.config( + conf=SparkConf().setAll( + [(k, v) for k, v in self.spark_conf.items()] + ) + ) + .appName("pytest-pyspark-local-testing") + .getOrCreate() + ) + self.spark_session.createDataFrame(df).createOrReplaceTempView(destination_name) + self.tables.append(destination_name) + + return SparkSource( + table=destination_name, + event_timestamp_column=event_timestamp_column, + created_timestamp_column=created_timestamp_column, + date_partition_column="", + # maps certain column names to other names + field_mapping=field_mapping or {"ts_1": "ts"}, + ) + + def create_saved_dataset_destination(self) -> SavedDatasetSparkStorage: + table = f"persisted_{str(uuid.uuid4()).replace('-', '_')}" + return SavedDatasetSparkStorage(table_ref=table, query="") + + def get_prefixed_table_name(self, suffix: str) -> str: + return f"{self.project_name}_{suffix}"
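
The net effect of the `setup.py` change above is a new `spark` extra, so the offline store's `pyspark` dependency can be pulled in with `pip install "feast[spark]"`; the same extra is folded into the CI requirements, which is what produces the `pyspark`/`py4j` entries in the lockfile churn earlier in the patch. The new `SparkDataSourceCreator` is what the universal integration tests use to turn a pandas DataFrame into a registered Spark temp view plus a `SparkSource`. As a rough illustration of the call sequence only, here is a minimal standalone sketch; it assumes this patch and `pyspark` are installed and that the `tests` package is importable (e.g. running from `sdk/python`), and the sample DataFrame, column names, and `demo` project name are invented for the example rather than taken from the patch.

```python
# Rough sketch only: exercising the new SparkDataSourceCreator by hand.
# Assumes this patch and pyspark are installed and that the `tests` package
# is importable (e.g. running from sdk/python). The DataFrame contents,
# column names, and project name below are illustrative, not from the patch.
import pandas as pd

from tests.integration.feature_repos.universal.data_sources.spark_data_source_creator import (
    SparkDataSourceCreator,
)

df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "conv_rate": [0.25, 0.75],
        "ts": pd.to_datetime(["2022-03-01 10:00", "2022-03-02 10:00"]),
        "created_ts": pd.to_datetime(["2022-03-01 10:05", "2022-03-02 10:05"]),
    }
)

creator = SparkDataSourceCreator(project_name="demo")

# Registers the DataFrame as a temp view named "demo_driver_stats" in the
# creator's local SparkSession and returns a SparkSource pointing at it.
source = creator.create_data_source(df, destination_name="driver_stats")

# Builds a SparkOfflineStoreConfig (type="spark") carrying the same spark_conf
# the creator used for its session.
offline_config = creator.create_offline_store_config()
print(type(source).__name__, offline_config.type)

creator.teardown()  # stops the local SparkSession
```

In the test suite itself the creator is driven by the universal test framework rather than called directly, so the snippet above is only meant to show how the pieces introduced in this patch fit together.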