[SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API #47328

Closed · wants to merge 1 commit
@@ -129,7 +129,9 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalRegressionWrapper] {
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
Member:

This is a regression because SparkSession is heavier than SparkContext.

Contributor Author:

I think it is not a regression; we are using the active Spark session, not creating a new one.
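For reference, this is roughly how the sparkSession handle used by ML readers/writers is resolved — a simplified sketch of the BaseReadWrite logic from memory, not the exact Spark source (the trait name here is mine):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

// Simplified sketch: the writer caches whatever session getOrCreate()
// resolves, which is the active (or default) session when one exists,
// so no new session is constructed on the save/load path.
trait ReadWriteSessionSketch {
  private var optionSparkSession: Option[SparkSession] = None

  protected final def sparkSession: SparkSession = {
    if (optionSparkSession.isEmpty) {
      optionSparkSession = Some(SparkSession.builder().getOrCreate())
    }
    optionSparkSession.get
  }

  // The SparkContext is derived from that same session.
  protected final def sc: SparkContext = sparkSession.sparkContext
}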

Contributor (@zhengruifeng, Jul 13, 2024):

This is what Scala ML uses for model save and load:

  private case class Data(coefficients: Vector, intercept: Double, scale: Double)

  override protected def saveImpl(path: String): Unit = {
    // Save metadata and Params
    DefaultParamsWriter.saveMetadata(instance, path, sc)
    // Save model data: coefficients, intercept, scale
    val data = Data(instance.coefficients, instance.intercept, instance.scale)
    val dataPath = new Path(path, "data").toString
    sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
  }
}

private class AFTSurvivalRegressionModelReader extends MLReader[AFTSurvivalRegressionModel] {

  /** Checked against metadata when loading model */
  private val className = classOf[AFTSurvivalRegressionModel].getName

  override def load(path: String): AFTSurvivalRegressionModel = {
    val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
    val dataPath = new Path(path, "data").toString
    val data = sparkSession.read.parquet(dataPath)
    val Row(coefficients: Vector, intercept: Double, scale: Double) =
      MLUtils.convertVectorColumnsToML(data, "coefficients")
        .select("coefficients", "intercept", "scale")
        .head()
    val model = new AFTSurvivalRegressionModel(metadata.uid, coefficients, intercept, scale)
    metadata.getAndSetParams(model)
    model
  }

For this R wrapper, I think it is fine to do it the same way.

+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
Member:

Can we avoid repartition?

Contributor (@zhengruifeng, Jul 13, 2024):

No, the single partition is needed to avoid creating empty files, even though there is only one row:

spark.range(1).select(sf.lit("123")).write.text("/tmp/test-test")
(base) ➜  runtime git:(master) ls /tmp/test-test
_SUCCESS                                                 part-00011-e548682e-21a5-41ea-93bc-0089efccbf4f-c000.txt
part-00000-e548682e-21a5-41ea-93bc-0089efccbf4f-c000.txt
(base) ➜  runtime git:(master) wc -l /tmp/test-test/*
       0 /tmp/test-test/_SUCCESS
       0 /tmp/test-test/part-00000-e548682e-21a5-41ea-93bc-0089efccbf4f-c000.txt
       1 /tmp/test-test/part-00011-e548682e-21a5-41ea-93bc-0089efccbf4f-c000.txt
       1 total

But I think we can use coalesce(1) to avoid this small shuffle.
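To illustrate the difference (a minimal sketch; the /tmp paths and the toy DataFrame are only for demonstration): repartition(1) inserts a shuffle exchange to produce one partition, while coalesce(1) merges the existing partitions without shuffling.

// Both produce a single non-empty output file; only the plan differs.
val df = spark.range(1).selectExpr("'123' AS value")

df.repartition(1).write.text("/tmp/with-shuffle")   // Exchange SinglePartition
df.coalesce(1).write.text("/tmp/without-shuffle")   // Coalesce 1, no shuffle

// explain() makes the difference visible in the physical plan:
df.repartition(1).explain()
df.coalesce(1).explain()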

Member:

This is a single row and will produce one partition; see LocalTableScanExec.

spark.createDataFrame(Seq(Tuple1("a"))).write.text("/tmp/abc")

It writes a single file.
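A quick way to check this (a sketch from a spark-shell session; the column name is arbitrary): a DataFrame built from a local Seq plans as a LocalTableScan, and with a single row it yields a single partition, hence a single output file.

val df = spark.createDataFrame(Seq(Tuple1("a"))).toDF("value")

println(df.rdd.getNumPartitions) // 1 for a one-row local relation
df.explain()                     // physical plan shows LocalTableScan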

Member:

Made a follow-up: #47341

Contributor:

Ah, I see, this LocalTableScanExec guarantees single row -> single partition.

It would be better if we could specify the numPartitions somewhere.

Member:

Yeah, I think so, but removing it should work for now.


instance.pipeline.save(pipelinePath)
}
@@ -142,7 +144,8 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalRegressionWrapper] {
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]

7 changes: 5 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
@@ -94,7 +94,9 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
val rMetadata = ("class" -> instance.getClass.getName) ~
("ratingCol" -> instance.ratingCol)
val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)

instance.alsModel.save(modelPath)
}
@@ -107,7 +109,8 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
val rMetadataPath = new Path(path, "rMetadata").toString
val modelPath = new Path(path, "model").toString

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val ratingCol = (rMetadata \ "ratingCol").extract[String]
val alsModel = ALSModel.load(modelPath)
@@ -120,7 +120,9 @@ private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapper] {
("size" -> instance.size.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -133,7 +135,8 @@ private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val size = (rMetadata \ "size").extract[Array[Long]]
@@ -131,7 +131,9 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeClassifierWrapper] {
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -144,7 +146,8 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeClassifierWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
@@ -114,7 +114,9 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRegressorWrapper] {
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -127,7 +129,8 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRegressorWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
@@ -151,7 +151,9 @@ private[r] object FMClassifierWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -164,7 +166,8 @@ private[r] object FMClassifierWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
@@ -132,7 +132,9 @@ private[r] object FMRegressorWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -145,7 +147,8 @@ private[r] object FMRegressorWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]

@@ -78,7 +78,9 @@ private[r] object FPGrowthWrapper extends MLReadable[FPGrowthWrapper] {
"class" -> instance.getClass.getName
))

-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)

instance.fpGrowthModel.save(modelPath)
}
@@ -138,7 +138,9 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper] {
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -151,7 +153,8 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
@@ -122,7 +122,9 @@ private[r] object GBTRegressorWrapper extends MLReadable[GBTRegressorWrapper] {
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -135,7 +137,8 @@ private[r] object GBTRegressorWrapper extends MLReadable[GBTRegressorWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
@@ -113,7 +113,9 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapper] {
("logLikelihood" -> instance.logLikelihood)
val rMetadataJson: String = compact(render(rMetadata))

-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -126,7 +128,8 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val dim = (rMetadata \ "dim").extract[Int]
val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
@@ -170,7 +170,9 @@ private[r] object GeneralizedLinearRegressionWrapper
("rAic" -> instance.rAic) ~
("rNumIterations" -> instance.rNumIterations)
val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -184,7 +186,8 @@ private[r] object GeneralizedLinearRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val rFeatures = (rMetadata \ "rFeatures").extract[Array[String]]
val rCoefficients = (rMetadata \ "rCoefficients").extract[Array[Double]]
@@ -99,7 +99,9 @@ private[r] object IsotonicRegressionWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -112,7 +114,8 @@ private[r] object IsotonicRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]

@@ -123,7 +123,9 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
("size" -> instance.size.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -136,7 +138,8 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val size = (rMetadata \ "size").extract[Array[Long]]
7 changes: 5 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
@@ -198,7 +198,9 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] {
("logPerplexity" -> instance.logPerplexity) ~
("vocabulary" -> instance.vocabulary.toList)
val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -211,7 +213,8 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] {
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
val logPerplexity = (rMetadata \ "logPerplexity").extract[Double]
@@ -127,7 +127,9 @@ private[r] object LinearRegressionWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -140,7 +142,8 @@ private[r] object LinearRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]

@@ -137,7 +137,9 @@ private[r] object LinearSVCWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -150,7 +152,8 @@ private[r] object LinearSVCWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
@@ -192,7 +192,9 @@ private[r] object LogisticRegressionWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -205,7 +207,8 @@ private[r] object LogisticRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
@@ -142,7 +142,9 @@ private[r] object MultilayerPerceptronClassifierWrapper

val rMetadata = "class" -> instance.getClass.getName
val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -102,7 +102,9 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] {
("labels" -> instance.labels.toImmutableArraySeq) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -115,7 +117,8 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] {
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
val rMetadata = parse(rMetadataStr)
val labels = (rMetadata \ "labels").extract[Array[String]]
val features = (rMetadata \ "features").extract[Array[String]]