Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable Arrow-based columnar data transfers #3996

Merged
merged 1 commit into from
Mar 21, 2024

Conversation

ElliotNguyen68
Copy link
Contributor

What this PR does / why we need it: When to_df for Spark Retrieval Job, we will need to transfer from spark dataframe to pandas dataframe, to be better handle this transfer process, I add a config to spark session, refer to https://docs.databricks.com/en/pandas/pyspark-pandas-conversion.html

Which issue(s) this PR fixes:

Fixes #

@ElliotNguyen68
Copy link
Contributor Author

Hi @sudohainguyen could you reivew the pr ?

@sudohainguyen
Copy link
Collaborator

can we eliminate type hint changes and leave it for another PR?
it's better one thing focused only 😄

@sudohainguyen
Copy link
Collaborator

another comment, have you tested the performance before and after enabling arrow transfer? how was it?
genuinely wondering

@ElliotNguyen68
Copy link
Contributor Author

can we eliminate type hint changes and leave it for another PR? it's better one thing focused only 😄

got it

@ElliotNguyen68
Copy link
Contributor Author

ElliotNguyen68 commented Mar 9, 2024

another comment, have you tested the performance before and after enabling arrow transfer? how was it? genuinely wondering

yes I tested it using databrick (3 worker nodes), the data contains over 6 milions rows, with the config enable, this take about 11sec, when not even over 6 minutes still cannot get the result

@ElliotNguyen68
Copy link
Contributor Author

@sudohainguyen detail the tests
image
image
image

spark_session = get_spark_session_or_start_new_with_repoconfig(
self._config.offline_store
)
spark_session.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
Copy link
Collaborator

@HaoXuAI HaoXuAI Mar 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can explicitly claim this in the document, the spark offline store is leveraging arrow to process data. And it's probably better to have a config for it to enable or not as wished by the user.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So how can we achieve this @HaoXuAI ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @HaoXuAI , I check on snowflake onffline store source code, snowflake.py and can see that the code owner also use spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true') within the codebase, without from a config, so I think we can also follow this, because this is a good thing for feast performance, (1 more thinkg to notice is that our requirement for feast is pyspark version >=3.0.0, and the default value for this config is false for spark session (https://spark.apache.org/docs/latest/configuration.html#:~:text=3.0%2C%20please%20set%20%27-,spark.sql.execution.arrow.pyspark.enabled,-%27.)), what do you think ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest turning it into a offline store configuration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you mean we will let the user to config this when they init FeatureStore object using repo config or feature_store.yml , isnt it ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ElliotNguyen68 Yes, I agree. But I still don't like the idea of having two methods of setting the same config. We will have to handle cases when user sets both, for example and stuff like that. Maybe we should just document this option better on spark page. Let's include it in the example and also add a sentence about why it might be useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok so how to include it in the document 🙂?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/feast-dev/feast/blob/master/docs/reference/offline-stores/spark.md. this is the spark document code, you can add it here.
Also looks like this is the place to initialize the spark config:

spark_session.conf.set("spark.sql.parser.quotedRegexColumnNames", "true")
, can you instead add it here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ElliotNguyen68 https://github.com/feast-dev/feast/tree/master/sdk/python/feast/templates/spark can you add it to spark template as well? Makes sense that people might use it when bootstrapping a new project.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @HaoXuAI , @tokoko , I change those changes to md and template file already, do you guys think that is ok for now ?

Signed-off-by: tanlocnguyen <tanlocnguyen296@gmail.com>
Copy link
Collaborator

@HaoXuAI HaoXuAI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@HaoXuAI HaoXuAI merged commit d8d7567 into feast-dev:master Mar 21, 2024
15 checks passed
franciscojavierarceo pushed a commit that referenced this pull request Apr 16, 2024
# [0.36.0](v0.35.0...v0.36.0) (2024-04-16)

### Bug Fixes

* Add __eq__, __hash__ to SparkSource for correct comparison ([#4028](#4028)) ([e703b40](e703b40))
* Add conn.commit() to Postgresonline_write_batch.online_write_batch ([#3904](#3904)) ([7d75fc5](7d75fc5))
* Add missing __init__.py to embedded_go ([#4051](#4051)) ([6bb4c73](6bb4c73))
* Add missing init files in infra utils ([#4067](#4067)) ([54910a1](54910a1))
* Added registryPath parameter documentation in WebUI reference ([#3983](#3983)) ([5e0af8f](5e0af8f)), closes [#3974](#3974) [#3974](#3974)
* Adding missing init files in materialization modules ([#4052](#4052)) ([df05253](df05253))
* Allow trancated timestamps when converting ([#3861](#3861)) ([bdd7dfb](bdd7dfb))
* Azure blob storage support in Java feature server ([#2319](#2319)) ([#4014](#4014)) ([b9aabbd](b9aabbd))
* Bugfix for grabbing historical data from Snowflake with array type features. ([#3964](#3964)) ([1cc94f2](1cc94f2))
* Bytewax materialization engine fails when loading feature_store.yaml ([#3912](#3912)) ([987f0fd](987f0fd))
* CI unittest warnings ([#4006](#4006)) ([0441b8b](0441b8b))
* Correct the returning class proto type of StreamFeatureView to StreamFeatureViewProto instead of FeatureViewProto. ([#3843](#3843)) ([86d6221](86d6221))
* Create index only if not exists during MySQL online store update ([#3905](#3905)) ([2f99a61](2f99a61))
* Disable minio tests in workflows on master and nightly ([#4072](#4072)) ([c06dda8](c06dda8))
* Disable the Feast Usage feature by default. ([#4090](#4090)) ([b5a7013](b5a7013))
* Dump repo_config by alias ([#4063](#4063)) ([e4bef67](e4bef67))
* Extend SQL registry config with a sqlalchemy_config_kwargs key ([#3997](#3997)) ([21931d5](21931d5))
* Feature Server image startup in OpenShift clusters ([#4096](#4096)) ([9efb243](9efb243))
* Fix copy method for StreamFeatureView ([#3951](#3951)) ([cf06704](cf06704))
* Fix for materializing entityless feature views in Snowflake ([#3961](#3961)) ([1e64c77](1e64c77))
* Fix type mapping spark ([#4071](#4071)) ([3afa78e](3afa78e))
* Fix typo as the cli does not support shortcut-f option. ([#3954](#3954)) ([dd79dbb](dd79dbb))
* Get container host addresses from testcontainers ([#3946](#3946)) ([2cf1a0f](2cf1a0f))
* Handle ComplexFeastType to None comparison ([#3876](#3876)) ([fa8492d](fa8492d))
* Hashlib md5 errors in FIPS for python 3.9+ ([#4019](#4019)) ([6d9156b](6d9156b))
* Making the query_timeout variable as optional int because upstream is considered to be optional ([#4092](#4092)) ([fd5b620](fd5b620))
* Move gRPC dependencies to an extra ([#3900](#3900)) ([f93c5fd](f93c5fd))
* Prevent spamming pull busybox from dockerhub ([#3923](#3923)) ([7153cad](7153cad))
* Quickstart notebook example ([#3976](#3976)) ([b023aa5](b023aa5))
* Raise error when not able read of file source spark source ([#4005](#4005)) ([34cabfb](34cabfb))
* remove not use input parameter in spark source ([#3980](#3980)) ([7c90882](7c90882))
* Remove parentheses in pull_latest_from_table_or_query ([#4026](#4026)) ([dc4671e](dc4671e))
* Remove proto-plus imports ([#4044](#4044)) ([ad8f572](ad8f572))
* Remove unnecessary dependency on mysqlclient ([#3925](#3925)) ([f494f02](f494f02))
* Restore label check for all actions using pull_request_target ([#3978](#3978)) ([591ba4e](591ba4e))
* Revert mypy config ([#3952](#3952)) ([6b8e96c](6b8e96c))
* Rewrite Spark materialization engine to use mapInPandas ([#3936](#3936)) ([dbb59ba](dbb59ba))
* Run feature server w/o gunicorn on windows ([#4024](#4024)) ([584e9b1](584e9b1))
* SqlRegistry _apply_object update statement ([#4042](#4042)) ([ef62def](ef62def))
* Substrait ODFVs for online ([#4064](#4064)) ([26391b0](26391b0))
* Swap security label check on the PR title validation job to explicit permissions instead ([#3987](#3987)) ([f604af9](f604af9))
* Transformation server doesn't generate files from proto ([#3902](#3902)) ([d3a2a45](d3a2a45))
* Trino as an OfflineStore Access Denied when BasicAuthenticaion ([#3898](#3898)) ([49d2988](49d2988))
* Trying to import pyspark lazily to avoid the dependency on the library ([#4091](#4091)) ([a05cdbc](a05cdbc))
* Typo Correction in Feast UI Readme ([#3939](#3939)) ([c16e5af](c16e5af))
* Update actions/setup-python from v3 to v4 ([#4003](#4003)) ([ee4c4f1](ee4c4f1))
* Update typeguard version to >=4.0.0 ([#3837](#3837)) ([dd96150](dd96150))
* Upgrade sqlalchemy from 1.x to 2.x regarding PVE-2022-51668. ([#4065](#4065)) ([ec4c15c](ec4c15c))
* Use CopyFrom() instead of __deepycopy__() for creating a copy of protobuf object. ([#3999](#3999)) ([5561b30](5561b30))
* Using version args to install the correct feast version ([#3953](#3953)) ([b83a702](b83a702))
* Verify the existence of Registry tables in snowflake before calling CREATE sql command. Allow read-only user to call feast apply. ([#3851](#3851)) ([9a3590e](9a3590e))

### Features

* Add duckdb offline store ([#3981](#3981)) ([161547b](161547b))
* Add Entity df in format of a Spark Dataframe instead of just pd.DataFrame or string for SparkOfflineStore ([#3988](#3988)) ([43b2c28](43b2c28))
* Add gRPC Registry Server ([#3924](#3924)) ([373e624](373e624))
* Add local tests for s3 registry using minio ([#4029](#4029)) ([d82d1ec](d82d1ec))
* Add python bytes to array type conversion support proto ([#3874](#3874)) ([8688acd](8688acd))
* Add python client for remote registry server ([#3941](#3941)) ([42a7b81](42a7b81))
* Add Substrait-based ODFV transformation ([#3969](#3969)) ([9e58bd4](9e58bd4))
* Add support for arrays in snowflake ([#3769](#3769)) ([8d6bec8](8d6bec8))
* Added delete_table to redis online store ([#3857](#3857)) ([03dae13](03dae13))
* Adding support for Native Python feature transformations for ODFVs ([#4045](#4045)) ([73bc853](73bc853))
* Bumping requirements ([#4079](#4079)) ([1943056](1943056))
* Decouple transformation types from ODFVs ([#3949](#3949)) ([0a9fae8](0a9fae8))
* Dropping Python 3.8 from local integration tests and integration tests ([#3994](#3994)) ([817995c](817995c))
* Dropping python 3.8 requirements files from the project. ([#4021](#4021)) ([f09c612](f09c612))
* Dropping the support for python 3.8 version from feast ([#4010](#4010)) ([a0f7472](a0f7472))
* Dropping unit tests for Python 3.8 ([#3989](#3989)) ([60f24f9](60f24f9))
* Enable Arrow-based columnar data transfers  ([#3996](#3996)) ([d8d7567](d8d7567))
* Enable Vector database and retrieve_online_documents API ([#4061](#4061)) ([ec19036](ec19036))
* Kubernetes materialization engine written based on bytewax ([#4087](#4087)) ([7617bdb](7617bdb))
* Lint with ruff ([#4043](#4043)) ([7f1557b](7f1557b))
* Make arrow primary interchange for offline ODFV execution ([#4083](#4083)) ([9ed0a09](9ed0a09))
* Pandas v2 compatibility ([#3957](#3957)) ([64459ad](64459ad))
* Pull duckdb from contribs, add to CI ([#4059](#4059)) ([318a2b8](318a2b8))
* Refactor ODFV schema inference ([#4076](#4076)) ([c50a9ff](c50a9ff))
* Refactor registry caching logic into a separate class ([#3943](#3943)) ([924f944](924f944))
* Rename OnDemandTransformations to Transformations ([#4038](#4038)) ([9b98eaf](9b98eaf))
* Revert updating dependencies so that feast can be run on 3.11. ([#3968](#3968)) ([d3c68fb](d3c68fb)), closes [#3958](#3958)
* Rewrite ibis point-in-time-join w/o feast abstractions ([#4023](#4023)) ([3980e0c](3980e0c))
* Support s3gov schema by snowflake offline store during materialization ([#3891](#3891)) ([ea8ad17](ea8ad17))
* Update odfv test ([#4054](#4054)) ([afd52b8](afd52b8))
* Update pyproject.toml to use Python 3.9 as default ([#4011](#4011)) ([277b891](277b891))
* Update the Pydantic from v1 to v2 ([#3948](#3948)) ([ec11a7c](ec11a7c))
* Updating dependencies so that feast can be run on 3.11. ([#3958](#3958)) ([59639db](59639db))
* Updating protos to separate transformation ([#4018](#4018)) ([c58ef74](c58ef74))

### Reverts

* Reverting bumping requirements ([#4081](#4081)) ([1ba65b4](1ba65b4)), closes [#4079](#4079)
* Verify the existence of Registry tables in snowflake… ([#3907](#3907)) ([c0d358a](c0d358a)), closes [#3851](#3851)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants