
[SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API #47341

Closed
wants to merge 3 commits

Conversation


@HyukjinKwon (Member) commented Jul 13, 2024

What changes were proposed in this pull request?

This PR is a retry of #47328, which replaces the RDD API with the Dataset API for writing the SparkR metadata. In addition, this PR removes `repartition(1)`: it is unnecessary for single-row input, since a local relation with one row already produces exactly one partition:

```scala
@transient private lazy val rdd: RDD[InternalRow] = {
  if (rows.isEmpty) {
    sparkContext.emptyRDD
  } else {
    val numSlices = math.min(
      unsafeRows.length, session.leafNodeDefaultParallelism)
    sparkContext.parallelize(unsafeRows.toImmutableArraySeq, numSlices)
  }
}
```
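The single-partition behavior is easy to check (a minimal sketch, assuming a running `SparkSession` named `spark`; the literal value is illustrative):

```scala
// A one-row local DataFrame already yields exactly one partition,
// so an explicit repartition(1) before writing it out is redundant.
val df = spark.createDataFrame(Seq(Tuple1("metadata-json")))
assert(df.rdd.getNumPartitions == 1)
```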

Why are the changes needed?

To leverage the Catalyst optimizer and SQL engine. For example, strings are now written with UTF-8 encoding instead of plain JDK serialization/deserialization. We have made similar changes in the past, e.g., #29063, #15813, #17255 and SPARK-19918.

We also remove `repartition(1)` to avoid an unnecessary shuffle.

With `repartition(1)`:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange SinglePartition, REPARTITION_BY_NUM, [plan_id=6]
   +- LocalTableScan [_1#0]
```

Without `repartition(1)`:

```
== Physical Plan ==
LocalTableScan [_1#2]
```
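Both plans can be reproduced with `explain()` (a minimal sketch, assuming an active session named `spark`; the sample value is arbitrary):

```scala
val df = spark.createDataFrame(Seq(Tuple1("x")))
df.repartition(1).explain()  // shows the extra Exchange SinglePartition shuffle
df.explain()                 // plain LocalTableScan, no shuffle
```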

Does this PR introduce any user-facing change?

No.

How was this patch tested?

CI in this PR should verify the change.

Was this patch authored or co-authored using generative AI tooling?

No.

…ocation with Dataframe read / write API""

This reverts commit cc32137.
@HyukjinKwon HyukjinKwon changed the title [SPARK-48883][ML][R][FOLLOW-UP] Avoid repartition when writing out the SparkR metadata [SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API Jul 13, 2024
@HyukjinKwon (Member, Author) commented:

BTW, SparkR does not expose the RDD API, so a Spark session is guaranteed to be running already.
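A minimal sketch of how the writer can obtain that session instead of going through a `SparkContext` (the lookup and error message are illustrative, not the exact committed code):

```scala
import org.apache.spark.sql.SparkSession

// SparkR always runs on top of a SparkSession, so an active one must exist.
val spark = SparkSession.getActiveSession.getOrElse(
  throw new IllegalStateException("No active SparkSession"))
```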

```diff
@@ -129,7 +129,9 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
     val rMetadata = ("class" -> instance.getClass.getName) ~
       ("features" -> instance.features.toImmutableArraySeq)
     val rMetadataJson: String = compact(render(rMetadata))
-    sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+    // Note that we should write single file. If there are more than one row
```
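A minimal sketch of the DataFrame-based write that replaces the removed RDD call (assuming an active session named `spark`; this mirrors the idea, not necessarily the exact committed code):

```scala
// Write the single-row metadata JSON as one text file via the DataFrame API;
// no repartition(1) is needed because one row maps to one partition.
spark.createDataFrame(Seq(Tuple1(rMetadataJson)))
  .write.text(rMetadataPath)
```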
A Contributor commented:

BTW, does it make sense to make `spark.createDataFrame` support `numPartitions: Int` like `spark.range`?
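For reference, `spark.range` already takes an explicit partition count, while `spark.createDataFrame` does not; the second call below is hypothetical:

```scala
// Existing API: spark.range(start, end, step, numPartitions)
val r = spark.range(0L, 100L, 1L, numPartitions = 4)

// Hypothetical shape being suggested (not an existing API):
// spark.createDataFrame(data, numPartitions = 1)
```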

@HyukjinKwon (Member, Author) replied:

We had a discussion about this somewhere and ended up not adding it (because, in general, we want to hide the concept of partitions in DataFrame). But thinking about it again, I think it's probably good to have. SparkR has it, FWIW.

@dongjoon-hyun (Member) commented:

Thank you, @HyukjinKwon and @zhengruifeng.

In the PR description, could you add specific JIRA issue links for the following?

> To leverage the Catalyst optimizer and SQL engine. For example, strings are now written with UTF-8 encoding instead of plain JDK serialization/deserialization.

@dongjoon-hyun (Member) left a comment.

@HyukjinKwon (Member, Author) commented:
Addressed all 👍

@HyukjinKwon (Member, Author) commented:
Separated PR to #47347.

@WeichenXu123 (Contributor) left a comment:

LGTM

dongjoon-hyun pushed a commit that referenced this pull request Jul 16, 2024
### What changes were proposed in this pull request?

This PR proposes to remove `repartition(1)` when writing metadata in ML/MLlib; the write already produces a single file.

### Why are the changes needed?

To remove an unnecessary shuffle; see also #47341.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests should verify them.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47347 from HyukjinKwon/SPARK-48896.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@HyukjinKwon (Member, Author) commented:
Merged to master.

WeichenXu123 added a commit that referenced this pull request Jul 22, 2024
…h Dataframe read / write API

### What changes were proposed in this pull request?

PySpark ML: Replace RDD read / write API invocation with Dataframe read / write API

### Why are the changes needed?

Follow-up of #47341

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47411 from WeichenXu123/SPARK-48909-follow-up.

Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
jingz-db pushed a commit to jingz-db/spark that referenced this pull request Jul 22, 2024
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024