
[SPARK-48883][ML][R] Replace RDD read / write API invocations with DataFrame read / write API #47341

Closed · wants to merge 3 commits
mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
@@ -129,7 +129,9 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}

Review thread on the added lines:

Contributor: BTW, does it make sense to make spark.createDataFrame support numPartitions: Int like spark.range?

Member (Author): We had a discussion about this somewhere and ended up not adding it (because we want to hide the concept of partitions in DataFrames in general). But thinking about it again, I think it's probably good to have. SparkR has it, FWIW.
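For context on the thread above, a minimal sketch (local session; the object name and values are hypothetical, not from the PR) of the two APIs being compared: spark.range already exposes an explicit partition count, while createDataFrame does not, leaving repartition(1) as the workaround.

```scala
import org.apache.spark.sql.SparkSession

object NumPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

    // range exposes numPartitions directly in its signature:
    val ids = spark.range(0L, 100L, 1L, numPartitions = 1)
    println(ids.rdd.getNumPartitions) // 1

    // createDataFrame has no numPartitions parameter, so an explicit
    // repartition(1) is the workaround when one partition is required:
    val df = spark.createDataFrame(Seq(Tuple1("meta"))).repartition(1)
    println(df.rdd.getNumPartitions) // 1

    spark.stop()
  }
}
```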
@@ -142,7 +144,8 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]

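The same one-row write and single-line read recur in every wrapper below. A self-contained sketch of the round trip, assuming a local session and an illustrative /tmp path (the JSON value is made up):

```scala
import org.apache.spark.sql.SparkSession

object RMetadataRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("round-trip").getOrCreate()
    val rMetadataPath = "/tmp/rMetadata" // hypothetical path

    val rMetadataJson = """{"class":"com.example.Wrapper","features":["f1","f2"]}"""
    // One row yields one partition, hence a single part file on disk:
    spark.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

    // read.text returns a DataFrame with a single string column named "value":
    val restored = spark.read.text(rMetadataPath).first().getString(0)
    assert(restored == rMetadataJson)

    spark.stop()
  }
}
```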
7 changes: 5 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
@@ -94,7 +94,9 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
val rMetadata = ("class" -> instance.getClass.getName) ~
("ratingCol" -> instance.ratingCol)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

instance.alsModel.save(modelPath)
}
@@ -107,7 +109,8 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
val rMetadataPath = new Path(path, "rMetadata").toString
val modelPath = new Path(path, "model").toString

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val ratingCol = (rMetadata \ "ratingCol").extract[String]
val alsModel = ALSModel.load(modelPath)
mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
@@ -120,7 +120,9 @@ private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapp
("size" -> instance.size.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -133,7 +135,8 @@ private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapp
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val size = (rMetadata \ "size").extract[Array[Long]]
mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
@@ -131,7 +131,9 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeC
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -144,7 +146,8 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeC
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
@@ -114,7 +114,9 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRe
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -127,7 +129,8 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRe
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
@@ -151,7 +151,9 @@ private[r] object FMClassifierWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -164,7 +166,8 @@ private[r] object FMClassifierWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
@@ -132,7 +132,9 @@ private[r] object FMRegressorWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -145,7 +147,8 @@ private[r] object FMRegressorWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]

mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
@@ -77,8 +77,9 @@ private[r] object FPGrowthWrapper extends MLReadable[FPGrowthWrapper] {
val rMetadataJson: String = compact(render(
"class" -> instance.getClass.getName
))

- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

instance.fpGrowthModel.save(modelPath)
}
mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
@@ -138,7 +138,9 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper]
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+   Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
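This is the one file where the change adds an explicit repartition(1). A minimal sketch, assuming a local session and a hypothetical output path, of why that guard matters: a multi-row Seq may span several partitions, and each partition becomes its own part file.

```scala
import org.apache.spark.sql.SparkSession

object SingleFileSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("single-file").getOrCreate()

    val df = spark.createDataFrame(Seq(Tuple1("a"), Tuple1("b"), Tuple1("c")))
    println(df.rdd.getNumPartitions) // often > 1 on local[*]

    // Collapse to one partition so the writer emits exactly one part file:
    df.repartition(1).write.text("/tmp/single-file-out") // hypothetical path

    spark.stop()
  }
}
```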
@@ -151,7 +153,8 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper]
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
@@ -122,7 +122,9 @@ private[r] object GBTRegressorWrapper extends MLReadable[GBTRegressorWrapper] {
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -135,7 +137,8 @@ private[r] object GBTRegressorWrapper extends MLReadable[GBTRegressorWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
@@ -113,7 +113,9 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapp
("logLikelihood" -> instance.logLikelihood)
val rMetadataJson: String = compact(render(rMetadata))

- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -126,7 +128,8 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapp
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val dim = (rMetadata \ "dim").extract[Int]
val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -170,7 +170,9 @@ private[r] object GeneralizedLinearRegressionWrapper
("rAic" -> instance.rAic) ~
("rNumIterations" -> instance.rNumIterations)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -184,7 +186,8 @@ private[r] object GeneralizedLinearRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val rFeatures = (rMetadata \ "rFeatures").extract[Array[String]]
val rCoefficients = (rMetadata \ "rCoefficients").extract[Array[Double]]
mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
@@ -99,7 +99,9 @@ private[r] object IsotonicRegressionWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -112,7 +114,8 @@ private[r] object IsotonicRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]

mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
@@ -123,7 +123,9 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
("size" -> instance.size.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -136,7 +138,8 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val size = (rMetadata \ "size").extract[Array[Long]]
7 changes: 5 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
@@ -198,7 +198,9 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] {
("logPerplexity" -> instance.logPerplexity) ~
("vocabulary" -> instance.vocabulary.toList)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -211,7 +213,8 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] {
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
val logPerplexity = (rMetadata \ "logPerplexity").extract[Double]
mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
@@ -127,7 +127,9 @@ private[r] object LinearRegressionWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -140,7 +142,8 @@ private[r] object LinearRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]

mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
@@ -137,7 +137,9 @@ private[r] object LinearSVCWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -150,7 +152,8 @@ private[r] object LinearSVCWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
@@ -192,7 +192,9 @@ private[r] object LogisticRegressionWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -205,7 +207,8 @@ private[r] object LogisticRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+   .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
@@ -142,7 +142,9 @@ private[r] object MultilayerPerceptronClassifierWrapper

val rMetadata = "class" -> instance.getClass.getName
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ // Note that we should write a single file. If there is more than one row,
+ // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}