
[SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API #47328

Closed

Conversation


@WeichenXu123 (Contributor) commented Jul 12, 2024

What changes were proposed in this pull request?

Replace RDD read / write API invocation with Dataframe read / write API

Why are the changes needed?

In certain storage configurations, the RDD read / write API has issues with storage types that require an account key, while the Dataframe read / write API works.
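
A minimal sketch of the pattern this PR adopts; the `rMetadataJson` / `rMetadataPath` names are taken from the diff discussed below, and `sc` / `sparkSession` are the usual SparkContext and SparkSession handles:

```
// Before: write a one-line JSON metadata string through the RDD API.
sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

// After: write the same string through the DataFrame API.
sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson)))
  .repartition(1)
  .write.text(rMetadataPath)
```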

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

No.

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
@zhengruifeng changed the title from [SPARK-48883][ML] Replace RDD read / write API invocation with Dataframe read / write API to [SPARK-48883][ML][R} Replace RDD read / write API invocation with Dataframe read / write API on Jul 12, 2024
@zhengruifeng changed the title from [SPARK-48883][ML][R} Replace RDD read / write API invocation with Dataframe read / write API to [SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API on Jul 12, 2024
@WeichenXu123 deleted the ml-df-writer-save-2 branch July 12, 2024 14:21
@dongjoon-hyun (Member) left a comment


Hi, @WeichenXu123 and @zhengruifeng. This is an Apache Software Foundation repository. Please avoid mentioning a specific commercial vendor in the PR description and commit logs.

> In databricks runtime, RDD read / write API has some issue for certain storage types that requires the account key, but Dataframe read / write API works.

```
@@ -129,7 +129,9 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
```
Member

This is a regression because SparkSession is heavier than SparkContext.

Contributor Author

I think it is not a regression; we are using the active Spark session, not creating a new one.
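
A small sketch of the point being made (not code from the PR): obtaining the session this way reuses whatever session is already active rather than constructing a new one.

```
import org.apache.spark.sql.SparkSession

// getOrCreate() returns the session already active in this JVM if one exists;
// a brand-new session is only built when none is running.
val spark = SparkSession.getActiveSession
  .getOrElse(SparkSession.builder().getOrCreate())
```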

@zhengruifeng (Contributor) commented Jul 13, 2024

This is what the ML Scala code uses for model save and load:

```
private case class Data(coefficients: Vector, intercept: Double, scale: Double)

override protected def saveImpl(path: String): Unit = {
  // Save metadata and Params
  DefaultParamsWriter.saveMetadata(instance, path, sc)
  // Save model data: coefficients, intercept, scale
  val data = Data(instance.coefficients, instance.intercept, instance.scale)
  val dataPath = new Path(path, "data").toString
  sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
}

private class AFTSurvivalRegressionModelReader extends MLReader[AFTSurvivalRegressionModel] {

  /** Checked against metadata when loading model */
  private val className = classOf[AFTSurvivalRegressionModel].getName

  override def load(path: String): AFTSurvivalRegressionModel = {
    val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
    val dataPath = new Path(path, "data").toString
    val data = sparkSession.read.parquet(dataPath)
    val Row(coefficients: Vector, intercept: Double, scale: Double) =
      MLUtils.convertVectorColumnsToML(data, "coefficients")
        .select("coefficients", "intercept", "scale")
        .head()
    val model = new AFTSurvivalRegressionModel(metadata.uid, coefficients, intercept, scale)
    metadata.getAndSetParams(model)
    model
  }
}
```

For this R wrapper, I think it is fine to do it in the same way.

@WeichenXu123 (Contributor Author)

> Hi, @WeichenXu123 and @zhengruifeng. This is an Apache Software Foundation repository. Please avoid mentioning a specific commercial vendor in the PR description and commit logs.
>
> In databricks runtime, RDD read / write API has some issue for certain storage types that requires the account key, but Dataframe read / write API works.

Oh, my bad. I updated the PR description. Thanks for the reminder.

```
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
```
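
For completeness, a sketch of what the matching DataFrame-based read side could look like (an illustration under the same variable names, not necessarily the exact code in this PR):

```
// Read the single-line metadata file back and take its one string value.
val rMetadataJson = sparkSession.read.text(rMetadataPath)
  .first().getString(0)
```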
Member

Can we avoid repartition?

@zhengruifeng (Contributor) commented Jul 13, 2024

No, the single partition is needed to avoid creating empty files, even though there is only one row:

```
spark.range(1).select(sf.lit("123")).write.text("/tmp/test-test")

(base) ➜  runtime git:(master) ls /tmp/test-test
_SUCCESS                                                 part-00011-e548682e-21a5-41ea-93bc-0089efccbf4f-c000.txt
part-00000-e548682e-21a5-41ea-93bc-0089efccbf4f-c000.txt
(base) ➜  runtime git:(master) wc -l /tmp/test-test/*
       0 /tmp/test-test/_SUCCESS
       0 /tmp/test-test/part-00000-e548682e-21a5-41ea-93bc-0089efccbf4f-c000.txt
       1 /tmp/test-test/part-00011-e548682e-21a5-41ea-93bc-0089efccbf4f-c000.txt
       1 total
```

But I think we can use `coalesce(1)` to avoid this small shuffle.
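
A sketch of the suggested `coalesce(1)` variant, reusing the names from the diff above:

```
// coalesce(1) narrows the plan to a single partition without a shuffle,
// whereas repartition(1) inserts an Exchange.
sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson)))
  .coalesce(1)
  .write.text(rMetadataPath)
```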

Member

This is a single row and will produce one partition; see `LocalTableScanExec`.

```
spark.createDataFrame(Seq(Tuple1("a"))).write.text("/tmp/abc")
```

writes a single file.
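
A quick spark-shell sketch to check this claim (assumptions: a running `spark` session and an arbitrary writable output path):

```
// A single-row local DataFrame is planned as a LocalTableScan with one partition,
// so the text writer emits just one part file.
val df = spark.createDataFrame(Seq(Tuple1("a")))
println(df.rdd.getNumPartitions) // expected: 1
df.write.text("/tmp/abc-check")
```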

Member

Made a followup: #47341

Contributor

Ah, I see, this `LocalTableScanExec` guarantees single row -> single partition.

It would be better if we could specify `numPartitions` somewhere.

Member

Yeah, I think so, but removing it should work for now.

@HyukjinKwon (Member)

Reverted per the dev mailing list discussion

@dongjoon-hyun (Member)

Thank you for reverting this, Hyukjin.

HyukjinKwon added a commit that referenced this pull request Jul 16, 2024
…aframe read / write API

### What changes were proposed in this pull request?

This PR is a retry of #47328, which replaces the RDD API with the Dataset API to write SparkR metadata; in addition, this PR removes `repartition(1)`. We don't actually need it when the input is a single row, as that already creates only a single partition:

https://github.com/apache/spark/blob/e5e751b98f9ef5b8640079c07a9a342ef471d75d/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala#L49-L57

### Why are the changes needed?

In order to leverage the Catalyst optimizer and SQL engine. For example, we now use UTF-8 encoding instead of plain JDK serialization for strings. We have made similar changes in the past, e.g., #29063, #15813, #17255 and SPARK-19918.

Also, we remove `repartition(1)` to avoid an unnecessary shuffle.

With `repartition(1)`:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange SinglePartition, REPARTITION_BY_NUM, [plan_id=6]
   +- LocalTableScan [_1#0]
```

Without `repartition(1)`:

```
== Physical Plan ==
LocalTableScan [_1#2]
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI in this PR should verify the change

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47341 from HyukjinKwon/SPARK-48883-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
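
For reference, a minimal spark-shell sketch (not part of the commit) that reproduces plans like the ones quoted above; the literal "meta" stands in for the metadata JSON string:

```
val df = spark.createDataFrame(Seq(Tuple1("meta")))
df.repartition(1).explain() // AdaptiveSparkPlan + Exchange SinglePartition over LocalTableScan
df.explain()                // LocalTableScan only
```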
jingz-db pushed a commit to jingz-db/spark that referenced this pull request Jul 22, 2024
jingz-db pushed a commit to jingz-db/spark that referenced this pull request Jul 22, 2024
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024