[SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API

### What changes were proposed in this pull request?

This PR is a retry of apache#47328, which replaces the RDD read / write API with the Dataset API for writing SparkR metadata. In addition, this PR removes `repartition(1)`: we do not need it when the input is a single row, because a single-row local relation creates only one partition:

https://github.com/apache/spark/blob/e5e751b98f9ef5b8640079c07a9a342ef471d75d/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala#L49-L57
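
For reference, here is a minimal sketch of the shape of the change applied in each SparkR wrapper (variable names follow the diffs below; this is illustrative, not the exact patch):

```scala
// Before: RDD API with an explicit single partition.
sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
val rMetadataStr = sc.textFile(rMetadataPath, 1).first()

// After: DataFrame read / write API. A single-row local relation already
// produces a single partition, so no repartition(1) is needed.
sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
val rMetadataStr2 = sparkSession.read.text(rMetadataPath).first().getString(0)
```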

### Why are the changes needed?

To leverage the Catalyst optimizer and the SQL engine. For example, we now use UTF-8 encoding for strings instead of plain JDK serialization. We have made similar changes in the past, e.g., apache#29063, apache#15813, apache#17255 and SPARK-19918.

We also remove `repartition(1)` to avoid an unnecessary shuffle.

With `repartition(1)`:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange SinglePartition, REPARTITION_BY_NUM, [plan_id=6]
   +- LocalTableScan [_1#0]
```

Without `repartition(1)`:

```
== Physical Plan ==
LocalTableScan [_1#2]
```
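
These plans can be reproduced in `spark-shell` with something like the following sketch (assuming a running `SparkSession` named `spark`):

```scala
val df = spark.createDataFrame(Seq(Tuple1("metadata")))
df.repartition(1).explain() // Exchange SinglePartition over LocalTableScan
df.explain()                // LocalTableScan only, no shuffle
```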

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI in this PR should verify the change.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47341 from HyukjinKwon/SPARK-48883-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon committed Jul 16, 2024
1 parent 3755d51 commit c0f6db8
Showing 23 changed files with 110 additions and 44 deletions.
@@ -129,7 +129,9 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalRegressionWrapper] {
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -142,7 +144,8 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalRegressionWrapper] {
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
 
7 changes: 5 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
@@ -94,7 +94,9 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("ratingCol" -> instance.ratingCol)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.alsModel.save(modelPath)
     }
@@ -107,7 +109,8 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
       val rMetadataPath = new Path(path, "rMetadata").toString
       val modelPath = new Path(path, "model").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val ratingCol = (rMetadata \ "ratingCol").extract[String]
       val alsModel = ALSModel.load(modelPath)
@@ -120,7 +120,9 @@ private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapper] {
         ("size" -> instance.size.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -133,7 +135,8 @@ private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapper] {
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val size = (rMetadata \ "size").extract[Array[Long]]
@@ -131,7 +131,9 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeClassifierWrapper] {
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -144,7 +146,8 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeClassifierWrapper] {
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
@@ -114,7 +114,9 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRegressorWrapper] {
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -127,7 +129,8 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRegressorWrapper] {
       val pipelinePath = new Path(path, "pipeline").toString
      val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
@@ -151,7 +151,9 @@ private[r] object FMClassifierWrapper
         ("features" -> instance.features.toImmutableArraySeq) ~
         ("labels" -> instance.labels.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -164,7 +166,8 @@ private[r] object FMClassifierWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val labels = (rMetadata \ "labels").extract[Array[String]]
@@ -132,7 +132,9 @@ private[r] object FMRegressorWrapper
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -145,7 +147,8 @@ private[r] object FMRegressorWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
 
@@ -77,8 +77,9 @@ private[r] object FPGrowthWrapper extends MLReadable[FPGrowthWrapper] {
       val rMetadataJson: String = compact(render(
         "class" -> instance.getClass.getName
       ))
-
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.fpGrowthModel.save(modelPath)
     }
@@ -138,7 +138,9 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper]
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -151,7 +153,8 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper]
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
@@ -122,7 +122,9 @@ private[r] object GBTRegressorWrapper extends MLReadable[GBTRegressorWrapper] {
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -135,7 +137,8 @@ private[r] object GBTRegressorWrapper extends MLReadable[GBTRegressorWrapper] {
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
@@ -113,7 +113,9 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapper] {
         ("logLikelihood" -> instance.logLikelihood)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -126,7 +128,8 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapper] {
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val dim = (rMetadata \ "dim").extract[Int]
       val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
@@ -170,7 +170,9 @@ private[r] object GeneralizedLinearRegressionWrapper
         ("rAic" -> instance.rAic) ~
         ("rNumIterations" -> instance.rNumIterations)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -184,7 +186,8 @@ private[r] object GeneralizedLinearRegressionWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val rFeatures = (rMetadata \ "rFeatures").extract[Array[String]]
       val rCoefficients = (rMetadata \ "rCoefficients").extract[Array[Double]]
@@ -99,7 +99,9 @@ private[r] object IsotonicRegressionWrapper
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -112,7 +114,8 @@ private[r] object IsotonicRegressionWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
 
@@ -123,7 +123,9 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
         ("size" -> instance.size.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -136,7 +138,8 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val size = (rMetadata \ "size").extract[Array[Long]]
7 changes: 5 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
@@ -198,7 +198,9 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] {
         ("logPerplexity" -> instance.logPerplexity) ~
         ("vocabulary" -> instance.vocabulary.toList)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -211,7 +213,8 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] {
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
       val logPerplexity = (rMetadata \ "logPerplexity").extract[Double]
@@ -127,7 +127,9 @@ private[r] object LinearRegressionWrapper
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -140,7 +142,8 @@ private[r] object LinearRegressionWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
 
@@ -137,7 +137,9 @@ private[r] object LinearSVCWrapper
         ("features" -> instance.features.toImmutableArraySeq) ~
         ("labels" -> instance.labels.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -150,7 +152,8 @@ private[r] object LinearSVCWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val labels = (rMetadata \ "labels").extract[Array[String]]
@@ -192,7 +192,9 @@ private[r] object LogisticRegressionWrapper
         ("features" -> instance.features.toImmutableArraySeq) ~
         ("labels" -> instance.labels.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -205,7 +207,8 @@ private[r] object LogisticRegressionWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val labels = (rMetadata \ "labels").extract[Array[String]]
@@ -142,7 +142,9 @@ private[r] object MultilayerPerceptronClassifierWrapper
 
       val rMetadata = "class" -> instance.getClass.getName
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
