
[SPARK-44398][CONNECT] Scala foreachBatch API #41969

Closed
wants to merge 7 commits

Conversation

rangadi
Contributor

@rangadi rangadi commented Jul 12, 2023

This implements Scala foreachBatch(). The implementation is basic and needs further enhancements. The server side will be shared with the Python implementation as well.

One notable hack in this PR is that it runs the user's foreachBatch() with a regular (legacy) DataFrame, rather than setting up a remote Spark Connect session and a Connect DataFrame.

Why are the changes needed?

Adds foreachBatch() support in Scala Spark Connect.

Does this PR introduce any user-facing change?

Yes. Adds foreachBatch() API

How was this patch tested?

  • A simple unit test.
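
For reference, the API being added mirrors the existing `DataStreamWriter.foreachBatch` signature from Structured Streaming, taking a `(DataFrame, Long) => Unit` function that runs once per micro-batch. A minimal usage sketch from the client side (the `rate` test source and the `sc://localhost` endpoint are illustrative assumptions, not part of this PR):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchExample {
  def main(args: Array[String]): Unit = {
    // Connect to a Spark Connect server (endpoint is an example).
    val spark = SparkSession.builder().remote("sc://localhost").getOrCreate()

    // "rate" is a built-in test source that emits rows at a fixed rate.
    val stream = spark.readStream.format("rate").load()

    val query = stream.writeStream
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        // The user function receives each micro-batch as a DataFrame.
        println(s"Batch $batchId has ${batch.count()} rows")
      }
      .start()

    query.awaitTermination()
  }
}
```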

@rangadi
Contributor Author

rangadi commented Jul 12, 2023

cc: @bogao007

Contributor

@bogao007 bogao007 left a comment


LGTM overall; left some questions regarding the hack.

* Handles setting up Scala remote session and other Spark Connect environment and then
* runs the provided foreachBatch function `fn`.
*
* HACK ALERT: This version does not actually set up Spark connect. Directly passes the DataFrame,
Contributor


I have 2 major questions regarding this:

  • Is the missing part about setting up a Spark Connect session, converting the legacy DataFrame to a Spark Connect DataFrame, and executing the function inside that session? Do we have any Scala example of setting up a Spark Connect session on the server side and using it?
  • When is getDataFrameOrThrow() called? Is it only needed for Python, or do we also need to look up the DataFrame by ID inside the Spark Connect session for Scala?

Contributor Author


  • Yes, it is about setting up the Spark remote session. I don't think there are examples of doing that in Scala.
  • Not sure about the second one. Usually df.sparkSession gives access to the session.
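
To make the hack under discussion concrete, here is a rough sketch of what the server-side wrapper could look like (`ForeachBatchHelper` and its method name are illustrative, not necessarily the actual classes in this PR): the user's function is invoked directly with the server-side (legacy) DataFrame, with no remote session or Connect DataFrame in between.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical sketch of the server-side wrapper described above.
object ForeachBatchHelper {
  type FnType = (DataFrame, Long) => Unit

  def scalaForeachBatchWrapper(fn: FnType): FnType = {
    (df: DataFrame, batchId: Long) => {
      // HACK: pass the legacy (server-side) DataFrame straight to the
      // user's function instead of creating a remote Spark Connect
      // session and a Connect DataFrame backed by it.
      fn(df, batchId)
    }
  }
}
```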

@rangadi rangadi marked this pull request as ready for review July 12, 2023 23:46
@rangadi
Contributor Author

rangadi commented Jul 12, 2023

@zhenlineo could you review this? I am going to get the test working right.
We want to merge this ASAP since we are trying to get multiple streams of work done before the 3.5 branch cut.

@xinrong-meng
Member

Merged to master, thanks!

ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024

Closes apache#41969 from rangadi/feb-scala.

Authored-by: Raghu Angadi <raghu.angadi@databricks.com>
Signed-off-by: Xinrong Meng <xinrong@apache.org>