Commit dd07772

fix

HyukjinKwon committed Jul 13, 2024
1 parent 9eedeaa commit dd07772
Showing 22 changed files with 66 additions and 65 deletions.
@@ -129,9 +129,9 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
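Every hunk in this commit applies the same change: the R metadata JSON is a one-row local DataFrame, which occupies a single partition, so write.text already produces a single part file and the old repartition(1) was redundant. A minimal, self-contained sketch of the pattern (hypothetical object name, JSON, and output path, not taken from the commit):

    import org.apache.spark.sql.SparkSession

    object SingleFileWriteSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("single-file-write-sketch")
          .getOrCreate()

        // A one-row local DataFrame is planned as a single partition, so
        // write.text emits exactly one part file, no repartition(1) needed.
        val rMetadataJson = """{"class":"com.example.SomeWrapper"}"""
        spark.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text("/tmp/rMetadata")

        spark.stop()
      }
    }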
6 changes: 3 additions & 3 deletions mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
@@ -94,9 +94,9 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("ratingCol" -> instance.ratingCol)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.alsModel.save(modelPath)
     }
@@ -120,9 +120,9 @@ private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapp
         ("size" -> instance.size.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -131,9 +131,9 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeC
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -114,9 +114,9 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRe
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -151,9 +151,9 @@ private[r] object FMClassifierWrapper
         ("features" -> instance.features.toImmutableArraySeq) ~
         ("labels" -> instance.labels.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -132,9 +132,9 @@ private[r] object FMRegressorWrapper
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -77,10 +77,9 @@ private[r] object FPGrowthWrapper extends MLReadable[FPGrowthWrapper] {
       val rMetadataJson: String = compact(render(
         "class" -> instance.getClass.getName
       ))
-
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.fpGrowthModel.save(modelPath)
     }
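The new comment records the caveat behind the one-row invariant: a text write emits one part file per partition, so a metadata DataFrame with several rows spread over several partitions would yield several files. A hypothetical illustration (assumes an existing SparkSession named spark and a scratch path):

    // Forcing two partitions produces two part-* files under /tmp/multi,
    // which is exactly the layout the single-file metadata must avoid.
    val rows = Seq(Tuple1("""{"a":1}"""), Tuple1("""{"b":2}"""))
    spark.createDataFrame(rows).repartition(2).write.text("/tmp/multi")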
@@ -122,9 +122,9 @@ private[r] object GBTRegressorWrapper extends MLReadable[GBTRegressorWrapper] {
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -113,9 +113,9 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapp
         ("logLikelihood" -> instance.logLikelihood)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -170,9 +170,9 @@ private[r] object GeneralizedLinearRegressionWrapper
         ("rAic" -> instance.rAic) ~
         ("rNumIterations" -> instance.rNumIterations)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -99,9 +99,9 @@ private[r] object IsotonicRegressionWrapper
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -123,9 +123,9 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
         ("size" -> instance.size.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
6 changes: 3 additions & 3 deletions mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
@@ -198,9 +198,9 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] {
         ("logPerplexity" -> instance.logPerplexity) ~
         ("vocabulary" -> instance.vocabulary.toList)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -127,9 +127,9 @@ private[r] object LinearRegressionWrapper
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
      instance.pipeline.save(pipelinePath)
    }
@@ -137,9 +137,9 @@ private[r] object LinearSVCWrapper
         ("features" -> instance.features.toImmutableArraySeq) ~
         ("labels" -> instance.labels.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -192,9 +192,9 @@ private[r] object LogisticRegressionWrapper
         ("features" -> instance.features.toImmutableArraySeq) ~
         ("labels" -> instance.labels.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -142,9 +142,9 @@ private[r] object MultilayerPerceptronClassifierWrapper
 
       val rMetadata = "class" -> instance.getClass.getName
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -102,9 +102,9 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] {
         ("labels" -> instance.labels.toImmutableArraySeq) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -141,9 +141,9 @@ private[r] object RandomForestClassifierWrapper extends MLReadable[RandomForestC
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -124,9 +124,9 @@ private[r] object RandomForestRegressorWrapper extends MLReadable[RandomForestRe
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      // Note that we should write a single file. If there is more than one row,
+      // it produces more partitions.
+      sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -412,7 +412,9 @@ private[ml] object DefaultParamsWriter {
     val metadataPath = new Path(path, "metadata").toString
     val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap)
     val spark = SparkSession.getActiveSession.get
-    spark.createDataFrame(Seq(Tuple1(metadataJson))).repartition(1).write.text(metadataPath)
+    // Note that we should write a single file. If there is more than one row,
+    // it produces more partitions.
+    spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text(metadataPath)
   }
 
   /**
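DefaultParamsWriter is the generic writer behind most ML persistence, so the single-file invariant matters on the read side too: the matching loader effectively expects the metadata directory to contain one line of JSON. A sketch of that read path (hypothetical path; this mirrors the idea, not necessarily DefaultParamsReader's exact code):

    // Read the directory back as text; with exactly one row written,
    // first() recovers the full JSON document.
    val metadataJson: String = spark.read.text("/tmp/metadata").first().getString(0)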
