
feat: Get Snowflake Query Output As Pyspark Dataframe (#2504) #3358

Merged 1 commit into feast-dev:master on Nov 23, 2022

Conversation

amithadiraju1694 (Contributor)
  1. Added a feature to offline_store -> snowflake.py to return the results of a Snowflake query as a PySpark DataFrame. This helps Spark-based users distribute data that often doesn't fit in driver nodes through the pandas output.

  2. Also added a relevant error class to notify the user of a missing Spark session, particular to this use case.

Signed-off-by: amithadiraju1694 amith.adiraju@gmail.com

What this PR does / why we need it:

This adds a feature to SnowflakeRetrievalJob to return the result of a Snowflake query execution as a PySpark DataFrame.

Which issue(s) this PR fixes:

Fixes #2504
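The flow described above can be sketched as follows. This is a minimal stand-in, not the merged code: the class name `SnowflakeRetrievalJobSketch`, the error name `SparkSessionNotFoundException`, and the pandas round-trip via `createDataFrame` are assumptions based on the PR description.

```python
from typing import Optional


class SparkSessionNotFoundException(Exception):
    """Hypothetical error class standing in for the one the PR adds to
    flag a missing Spark session (the real name may differ)."""


class SnowflakeRetrievalJobSketch:
    """Minimal stand-in for Feast's SnowflakeRetrievalJob, showing only
    the to_spark_df flow described in this PR."""

    def to_df(self):
        # Placeholder for the existing pandas path (runs the Snowflake
        # query and fetches the result into the driver as pandas).
        raise NotImplementedError

    def to_spark_df(self, spark_session: Optional[object] = None):
        # Fail fast with a descriptive error if no session was supplied,
        # rather than letting a later AttributeError surface.
        if spark_session is None:
            raise SparkSessionNotFoundException(
                "to_spark_df() requires an active SparkSession"
            )
        # Hand the fetched result to Spark so downstream work is
        # distributed across executors instead of pinned to the driver.
        return spark_session.createDataFrame(self.to_df())
```

Note that materializing through pandas still pulls the full result onto the driver once; the benefit is that subsequent transformations run distributed.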

@amithadiraju1694 (Contributor Author)

/assign @sfc-gh-madkins

/assign @adchia

@@ -447,6 +459,51 @@ def to_sql(self) -> str:
        with self._query_generator() as query:
            return query

    def to_spark_df(
        self, spark_session: Optional[SparkSession] = None
    ) -> pyspark_DataFrame:
Collaborator:
@amithadiraju1694 can you use just DataFrame here? Not the alias
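One way to follow this suggestion while keeping pyspark out of the runtime import path is a `TYPE_CHECKING` guard; the signature mirrors the diff, but the guard itself and the string-quoted annotations are an illustration, not necessarily what was merged.

```python
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Import under the class's normal name, per the review comment,
    # instead of aliasing it to pyspark_DataFrame.
    from pyspark.sql import DataFrame, SparkSession


def to_spark_df(self, spark_session: Optional["SparkSession"] = None) -> "DataFrame":
    """Signature from the diff, rewritten with un-aliased annotations
    (string-quoted so pyspark is only needed for type checking)."""
    ...
```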

        spark_df: A pyspark dataframe.
        """

        if spark_session == None:
Collaborator:
You should be able to combine these two if statements into one

@sfc-gh-madkins (Collaborator)

/ok-to-test

@sfc-gh-madkins (Collaborator)

@amithadiraju1694 can you run make lint-python

Signed-off-by: amithadiraju1694 <amith.adiraju@gmail.com>
@adchia (Collaborator) left a comment:
/lgtm

@feast-ci-bot (Collaborator)
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: adchia, amithadiraju1694

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@feast-ci-bot feast-ci-bot merged commit 2f18957 into feast-dev:master Nov 23, 2022
@amithadiraju1694 amithadiraju1694 deleted the feat_snow_sparkdf branch November 24, 2022 02:50
kevjumba pushed a commit that referenced this pull request Dec 5, 2022
# [0.27.0](v0.26.0...v0.27.0) (2022-12-05)

### Bug Fixes

* Changing Snowflake template code to avoid query not implemented … ([#3319](#3319)) ([1590d6b](1590d6b))
* Dask zero division error if parquet dataset has only one partition ([#3236](#3236)) ([69e4a7d](69e4a7d))
* Enable Spark materialization on Yarn ([#3370](#3370)) ([0c20a4e](0c20a4e))
* Ensure that Snowflake accounts for number columns that overspecify precision ([#3306](#3306)) ([0ad0ace](0ad0ace))
* Fix memory leak from usage.py not properly cleaning up call stack ([#3371](#3371)) ([a0c6fde](a0c6fde))
* Fix workflow to contain env vars ([#3379](#3379)) ([548bed9](548bed9))
* Update bytewax materialization ([#3368](#3368)) ([4ebe00f](4ebe00f))
* Update the version counts ([#3378](#3378)) ([8112db5](8112db5))
* Updated AWS Athena template ([#3322](#3322)) ([5956981](5956981))
* Wrong UI data source type display ([#3276](#3276)) ([8f28062](8f28062))

### Features

* Cassandra online store, concurrency in bulk write operations ([#3367](#3367)) ([eaf354c](eaf354c))
* Cassandra online store, concurrent fetching for multiple entities ([#3356](#3356)) ([00fa21f](00fa21f))
* Get Snowflake Query Output As Pyspark Dataframe ([#2504](#2504)) ([#3358](#3358)) ([2f18957](2f18957))
Successfully merging this pull request may close these issues.

Get_historical_features() Does Not Have Option To Return Distributed Dataframe Like A Spark DF
4 participants