From c0f6db83db7a1007475e30818c86fcc33205647d Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Wed, 17 Jul 2024 08:56:10 +0900
Subject: [PATCH] [SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API

### What changes were proposed in this pull request?

This PR is a retry of https://github.com/apache/spark/pull/47328, which replaces the RDD API with the Dataset API for writing SparkR metadata. In addition, this PR removes `repartition(1)`. We do not actually need it when the input is a single row, since that already creates a single partition:

https://github.com/apache/spark/blob/e5e751b98f9ef5b8640079c07a9a342ef471d75d/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala#L49-L57

### Why are the changes needed?

To leverage the Catalyst optimizer and SQL engine. For example, we now use UTF-8 encoding for strings instead of plain JDK serialization/deserialization. We have made similar changes in the past, e.g., https://github.com/apache/spark/pull/29063, https://github.com/apache/spark/pull/15813, https://github.com/apache/spark/pull/17255 and SPARK-19918.

Removing `repartition(1)` also avoids an unnecessary shuffle.

With `repartition(1)`:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange SinglePartition, REPARTITION_BY_NUM, [plan_id=6]
   +- LocalTableScan [_1#0]
```

Without `repartition(1)`:

```
== Physical Plan ==
LocalTableScan [_1#2]
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI in this PR should verify the change.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47341 from HyukjinKwon/SPARK-48883-followup.

Authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon
---
 .../apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala | 7 +++++--
 .../src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala | 7 +++++--
 .../org/apache/spark/ml/r/BisectingKMeansWrapper.scala | 7 +++++--
 .../apache/spark/ml/r/DecisionTreeClassifierWrapper.scala | 7 +++++--
 .../apache/spark/ml/r/DecisionTreeRegressorWrapper.scala | 7 +++++--
 .../scala/org/apache/spark/ml/r/FMClassifierWrapper.scala | 7 +++++--
 .../scala/org/apache/spark/ml/r/FMRegressorWrapper.scala | 7 +++++--
 .../scala/org/apache/spark/ml/r/FPGrowthWrapper.scala | 5 +++--
 .../org/apache/spark/ml/r/GBTClassifierWrapper.scala | 7 +++++--
 .../scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala | 7 +++++--
 .../org/apache/spark/ml/r/GaussianMixtureWrapper.scala | 7 +++++--
 .../spark/ml/r/GeneralizedLinearRegressionWrapper.scala | 7 +++++--
 .../org/apache/spark/ml/r/IsotonicRegressionWrapper.scala | 7 +++++--
 .../main/scala/org/apache/spark/ml/r/KMeansWrapper.scala | 7 +++++--
 .../src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala | 7 +++++--
 .../org/apache/spark/ml/r/LinearRegressionWrapper.scala | 7 +++++--
 .../scala/org/apache/spark/ml/r/LinearSVCWrapper.scala | 7 +++++--
 .../org/apache/spark/ml/r/LogisticRegressionWrapper.scala | 7 +++++--
 .../ml/r/MultilayerPerceptronClassifierWrapper.scala | 4 +++-
 .../scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala | 7 +++++--
 .../src/main/scala/org/apache/spark/ml/r/RWrappers.scala | 3 ++-
 .../apache/spark/ml/r/RandomForestClassifierWrapper.scala | 8 ++++++--
 .../apache/spark/ml/r/RandomForestRegressorWrapper.scala | 8 ++++++--
 23 files changed, 110 insertions(+), 44 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
index 7eef3ced422e6..4ad9ebbe36beb 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala @@ -129,7 +129,9 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg val rMetadata = ("class" -> instance.getClass.getName) ~ ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -142,7 +144,8 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala index 125cdf7259fef..2b204a0470d09 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala @@ -94,7 +94,9 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] { val rMetadata = ("class" -> instance.getClass.getName) ~ ("ratingCol" -> instance.ratingCol) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.alsModel.save(modelPath) } @@ -107,7 +109,8 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] { val rMetadataPath = new Path(path, "rMetadata").toString val modelPath = new Path(path, "model").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val ratingCol = (rMetadata \ "ratingCol").extract[String] val alsModel = ALSModel.load(modelPath) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala index d4486f1b80a10..4daf0f27546b5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala @@ -120,7 +120,9 @@ private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapp ("size" -> instance.size.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. 
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -133,7 +135,8 @@ private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapp val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] val size = (rMetadata \ "size").extract[Array[Long]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala index 992a0c18819fc..12e824c0fdae4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala @@ -131,7 +131,9 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeC ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -144,7 +146,8 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeC val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val formula = (rMetadata \ "formula").extract[String] val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala index db421b5a1875e..48342fc471410 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala @@ -114,7 +114,9 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRe ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. 
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -127,7 +129,8 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRe val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val formula = (rMetadata \ "formula").extract[String] val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala index 635af0563da0d..7e3c7ab5f2feb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala @@ -151,7 +151,9 @@ private[r] object FMClassifierWrapper ("features" -> instance.features.toImmutableArraySeq) ~ ("labels" -> instance.labels.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -164,7 +166,8 @@ private[r] object FMClassifierWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] val labels = (rMetadata \ "labels").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala index b036a1d102d97..60792dadec5e6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala @@ -132,7 +132,9 @@ private[r] object FMRegressorWrapper val rMetadata = ("class" -> instance.getClass.getName) ~ ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. 
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -145,7 +147,8 @@ private[r] object FMRegressorWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala index b8151d8d90702..86c11eadf8ac4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala @@ -77,8 +77,9 @@ private[r] object FPGrowthWrapper extends MLReadable[FPGrowthWrapper] { val rMetadataJson: String = compact(render( "class" -> instance.getClass.getName )) - - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.fpGrowthModel.save(modelPath) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala index 777191ef5e5c6..5bf021ca3bd4a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala @@ -138,7 +138,9 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper] ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + sparkSession.createDataFrame( + Seq(Tuple1(rMetadataJson)) + ).repartition(1).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -151,7 +153,8 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper] val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val formula = (rMetadata \ "formula").extract[String] val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala index 6e5ca47fabae6..575ae24582228 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala @@ -122,7 +122,9 @@ private[r] object GBTRegressorWrapper extends MLReadable[GBTRegressorWrapper] { ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. 
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -135,7 +137,8 @@ private[r] object GBTRegressorWrapper extends MLReadable[GBTRegressorWrapper] { val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val formula = (rMetadata \ "formula").extract[String] val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala index 9a98a8b18b141..dd6e91e891d66 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala @@ -113,7 +113,9 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapp ("logLikelihood" -> instance.logLikelihood) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -126,7 +128,8 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapp val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val dim = (rMetadata \ "dim").extract[Int] val logLikelihood = (rMetadata \ "logLikelihood").extract[Double] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala index 60cf0631f91de..778af00acc254 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala @@ -170,7 +170,9 @@ private[r] object GeneralizedLinearRegressionWrapper ("rAic" -> instance.rAic) ~ ("rNumIterations" -> instance.rNumIterations) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. 
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -184,7 +186,8 @@ private[r] object GeneralizedLinearRegressionWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val rFeatures = (rMetadata \ "rFeatures").extract[Array[String]] val rCoefficients = (rMetadata \ "rCoefficients").extract[Array[Double]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala index d4a3adea460fa..c8236d7a2a469 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala @@ -99,7 +99,9 @@ private[r] object IsotonicRegressionWrapper val rMetadata = ("class" -> instance.getClass.getName) ~ ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -112,7 +114,8 @@ private[r] object IsotonicRegressionWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala index 78c9a15aac597..063caaee0302b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala @@ -123,7 +123,9 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] { ("size" -> instance.size.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. 
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -136,7 +138,8 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] { val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] val size = (rMetadata \ "size").extract[Array[Long]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala index 943c38178d6f0..cfcd4a85ab27b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala @@ -198,7 +198,9 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] { ("logPerplexity" -> instance.logPerplexity) ~ ("vocabulary" -> instance.vocabulary.toList) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -211,7 +213,8 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] { val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val logLikelihood = (rMetadata \ "logLikelihood").extract[Double] val logPerplexity = (rMetadata \ "logPerplexity").extract[Double] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala index 96b00fab7e344..ee86c55486e6e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala @@ -127,7 +127,9 @@ private[r] object LinearRegressionWrapper val rMetadata = ("class" -> instance.getClass.getName) ~ ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. 
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -140,7 +142,8 @@ private[r] object LinearRegressionWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala index 3645af3e53115..69e4a8ec22632 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala @@ -137,7 +137,9 @@ private[r] object LinearSVCWrapper ("features" -> instance.features.toImmutableArraySeq) ~ ("labels" -> instance.labels.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -150,7 +152,8 @@ private[r] object LinearSVCWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] val labels = (rMetadata \ "labels").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala index cac3d0609b209..ff7fd6e427295 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala @@ -192,7 +192,9 @@ private[r] object LogisticRegressionWrapper ("features" -> instance.features.toImmutableArraySeq) ~ ("labels" -> instance.labels.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. 
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -205,7 +207,8 @@ private[r] object LogisticRegressionWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] val labels = (rMetadata \ "labels").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala index 96c588acc1406..e5a6a0f0853b6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala @@ -142,7 +142,9 @@ private[r] object MultilayerPerceptronClassifierWrapper val rMetadata = "class" -> instance.getClass.getName val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala index d5e8e0ef4890a..bd9905d19aed3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala @@ -102,7 +102,9 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] { ("labels" -> instance.labels.toImmutableArraySeq) ~ ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. 
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -115,7 +117,8 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] { val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val labels = (rMetadata \ "labels").extract[Array[String]] val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala index 551c7514ee85f..3a7539e0937fe 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala @@ -33,7 +33,8 @@ private[r] object RWrappers extends MLReader[Object] { override def load(path: String): Object = { implicit val format = DefaultFormats val rMetadataPath = new Path(path, "rMetadata").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val className = (rMetadata \ "class").extract[String] className match { diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala index 7c4175a6c5914..1c1bd046de62f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala @@ -141,7 +141,10 @@ private[r] object RandomForestClassifierWrapper extends MLReadable[RandomForestC ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. If there are more than one row + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) + instance.pipeline.save(pipelinePath) } } @@ -154,7 +157,8 @@ private[r] object RandomForestClassifierWrapper extends MLReadable[RandomForestC val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val formula = (rMetadata \ "formula").extract[String] val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala index 911571cac77de..700989e34e450 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala @@ -124,7 +124,10 @@ private[r] object RandomForestRegressorWrapper extends MLReadable[RandomForestRe ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write single file. 
If there are more than one row + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) + instance.pipeline.save(pipelinePath) } } @@ -137,7 +140,8 @@ private[r] object RandomForestRegressorWrapper extends MLReadable[RandomForestRe val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val formula = (rMetadata \ "formula").extract[String] val features = (rMetadata \ "features").extract[Array[String]]
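
For reference, the two-line write/read pattern that recurs in every wrapper above can be exercised on its own. Below is a minimal sketch, not part of the patch: the object name, app name, and `/tmp` path are made up for illustration, and `sparkSession` / `rMetadataPath` stand in for what the surrounding `MLWriter` / `MLReader` context provides in the real wrappers.

```scala
// Minimal sketch of the metadata write/read pattern introduced by this patch.
// Assumptions (not from the patch): object/app names and the /tmp path are
// hypothetical; in the real wrappers, sparkSession and the paths come from
// the enclosing MLWriter/MLReader.
import org.apache.spark.sql.SparkSession

object RMetadataIOSketch {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("r-metadata-io-sketch").getOrCreate()
    val rMetadataPath = "/tmp/r-metadata-io-sketch/rMetadata" // hypothetical output path
    val rMetadataJson = """{"class":"org.example.SomeWrapper"}"""

    // Before (removed by this PR): RDD-based text write/read, forced to one partition.
    //   sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
    //   val s = sc.textFile(rMetadataPath, 1).first()

    // After: a one-row local DataFrame written as text. No repartition(1) is
    // needed; a single-row LocalTableScan already yields a single partition.
    sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
    val rMetadataStr = sparkSession.read.text(rMetadataPath).first().getString(0)
    println(rMetadataStr)

    sparkSession.stop()
  }
}
```

Going through a one-row DataFrame routes the metadata string through the UTF-8 text datasource instead of JDK serialization, which is the optimizer/engine benefit called out in the description.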