From 3ec85f9b6ac98150d5f90e85982e923977767ef1 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Tue, 16 Jul 2024 09:00:51 -0700
Subject: [PATCH] [SPARK-48909][ML][MLLIB] Uses SparkSession over SparkContext
 when writing metadata

### What changes were proposed in this pull request?

This PR proposes to use SparkSession over SparkContext when writing metadata.

### Why are the changes needed?

See https://github.com/apache/spark/pull/47347#issuecomment-2229701812

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests should cover it.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47366 from HyukjinKwon/SPARK-48909.

Authored-by: Hyukjin Kwon
Signed-off-by: Dongjoon Hyun
---
 .../main/scala/org/apache/spark/ml/util/ReadWrite.scala   | 8 ++++++--
 .../apache/spark/mllib/classification/NaiveBayes.scala    | 4 ++--
 .../classification/impl/GLMClassificationModel.scala      | 2 +-
 .../spark/mllib/clustering/BisectingKMeansModel.scala     | 6 +++---
 .../spark/mllib/clustering/GaussianMixtureModel.scala     | 2 +-
 .../org/apache/spark/mllib/clustering/KMeansModel.scala   | 4 ++--
 .../org/apache/spark/mllib/clustering/LDAModel.scala      | 4 ++--
 .../spark/mllib/clustering/PowerIterationClustering.scala | 2 +-
 .../org/apache/spark/mllib/feature/ChiSqSelector.scala    | 2 +-
 .../scala/org/apache/spark/mllib/feature/Word2Vec.scala   | 2 +-
 .../main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala  | 2 +-
 .../scala/org/apache/spark/mllib/fpm/PrefixSpan.scala     | 2 +-
 .../mllib/recommendation/MatrixFactorizationModel.scala   | 2 +-
 .../spark/mllib/regression/IsotonicRegression.scala       | 2 +-
 .../spark/mllib/regression/impl/GLMRegressionModel.scala  | 2 +-
 .../apache/spark/mllib/tree/model/DecisionTreeModel.scala | 4 ++--
 .../spark/mllib/tree/model/treeEnsembleModels.scala       | 2 +-
 17 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
index 9b26d0a911aca..021595f76c247 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
@@ -411,7 +411,10 @@
       paramMap: Option[JValue] = None): Unit = {
     val metadataPath = new Path(path, "metadata").toString
     val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap)
-    sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)
+    val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
+    // Note that we should write a single file. If there is more than one row,
+    // it produces more partitions.
+    spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text(metadataPath)
   }
 
   /**
@@ -585,7 +588,8 @@
    */
   def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = ""): Metadata = {
     val metadataPath = new Path(path, "metadata").toString
-    val metadataStr = sc.textFile(metadataPath, 1).first()
+    val spark = SparkSession.getActiveSession.get
+    val metadataStr = spark.read.text(metadataPath).first().getString(0)
     parseMetadata(metadataStr, expectedClassName)
   }
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 3bc1d592f9898..e5b162e83c77b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -198,7 +198,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
 
       // Create Parquet data.
       spark.createDataFrame(Seq(data)).write.parquet(dataPath(path))
@@ -243,7 +243,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
 
       // Create Parquet data.
       spark.createDataFrame(Seq(data)).write.parquet(dataPath(path))
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
index cb18a6003f7f1..439682797d032 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
@@ -57,7 +57,7 @@ private[classification] object GLMClassificationModel {
      val metadata = compact(render(
        ("class" -> modelClass) ~ ("version" -> thisFormatVersion) ~
          ("numFeatures" -> numFeatures) ~ ("numClasses" -> numClasses)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val data = Data(weights, intercept, threshold)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
index 9f3aad9238979..083c3e3e77a9b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
@@ -179,7 +179,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("rootId" -> model.root.index)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val data = getNodes(model.root).map(node => Data(node.index, node.size,
         node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
@@ -215,7 +215,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val data = getNodes(model.root).map(node => Data(node.index, node.size,
         node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
@@ -253,7 +253,7 @@
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure) ~
           ("trainingCost" -> model.trainingCost)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val data = getNodes(model.root).map(node => Data(node.index, node.size,
         node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index 8982a8ca7c6c0..40a810a699ac1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -147,7 +147,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
       // Create JSON metadata.
       val metadata = compact(render
         (("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ ("k" -> weights.length)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val dataArray = Array.tabulate(weights.length) { i =>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 476df64581f7e..e5c0b27072d02 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -172,7 +172,7 @@ object KMeansModel extends Loader[KMeansModel] {
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
       val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq)
         .map { case (p, id) =>
           Cluster(id, p.vector)
@@ -207,7 +207,7 @@
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k) ~
           ("distanceMeasure" -> model.distanceMeasure) ~ ("trainingCost" -> model.trainingCost)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
       val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq)
         .map { case (p, id) =>
           Cluster(id, p.vector)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 831c7a9316fdc..10a81acede0c7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -460,7 +460,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
           ("docConcentration" -> docConcentration.toArray.toImmutableArraySeq) ~
           ("topicConcentration" -> topicConcentration) ~
           ("gammaShape" -> gammaShape)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val topicsDenseMatrix = topicsMatrix.asBreeze.toDenseMatrix
       val topics = Range(0, k).map { topicInd =>
@@ -869,7 +869,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
           ("topicConcentration" -> topicConcentration) ~
           ("iterationTimes" -> iterationTimes.toImmutableArraySeq) ~
           ("gammaShape" -> gammaShape)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val newPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString
       spark.createDataFrame(Seq(Data(Vectors.fromBreeze(globalTopicTotals)))).write.parquet(newPath)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index 9150bb305876b..639e762ef3c87 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -73,7 +73,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       spark.createDataFrame(model.assignments).write.parquet(Loader.dataPath(path))
     }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
index 41eb6567b845a..ed23df70c5771 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
@@ -136,7 +136,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val dataArray = Array.tabulate(model.selectedFeatures.length) { i =>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 499dc09b86211..b5b2233ecb756 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -686,7 +686,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
       val metadata = compact(render(
         ("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~
           ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // We want to partition the model in partitions smaller than
       // spark.kryoserializer.buffer.max
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
index 246146a831f83..fd21e1998ce70 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -108,7 +108,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Get the type of item class
       val sample = model.freqItemsets.first().items(0)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 3c648f34c6100..9c16ac2ecd526 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -655,7 +655,7 @@ object PrefixSpanModel extends Loader[PrefixSpanModel[_]] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Get the type of item class
       val sample = model.freqSequences.first().sequence(0)(0)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index bc888aecec0ab..fb9e8ac7c8920 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -387,7 +387,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       import spark.implicits._
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
       model.userFeatures.toDF("id", "features").write.parquet(userPath(path))
       model.productFeatures.toDF("id", "features").write.parquet(productPath(path))
     }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index e957d8ebd74ac..456580ffa5315 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -188,7 +188,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("isotonic" -> isotonic)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
 
       spark.createDataFrame(
         boundaries.toImmutableArraySeq.zip(predictions).map { case (b, p) => Data(b, p) }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
index bbc513f93b38a..b527797a2f2f9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
@@ -53,7 +53,7 @@ private[regression] object GLMRegressionModel {
       val metadata = compact(render(
         ("class" -> modelClass) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> weights.size)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val data = Data(weights, intercept)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index 2f65dea0c4a89..b45211c1689c7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -226,15 +226,15 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
     }
 
     // Create JSON metadata.
+    val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
     val metadata = compact(render(
       ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
         ("algo" -> model.algo.toString) ~ ("numNodes" -> model.numNodes)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
     // Create Parquet data.
     val nodes = model.topNode.subtreeIterator.toSeq
     val dataRDD = sc.parallelize(nodes).map(NodeData.apply(0, _))
-    val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
     spark.createDataFrame(dataRDD).write.parquet(Loader.dataPath(path))
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index aa2287f3af896..7251dfd07a1fa 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -430,7 +430,7 @@ private[tree] object TreeEnsembleModel extends Logging {
       val metadata = compact(render(
         ("class" -> className) ~ ("version" -> thisFormatVersion) ~
           ("metadata" -> Extraction.decompose(ensembleMetadata))))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val dataRDD = sc.parallelize(model.trees.zipWithIndex.toImmutableArraySeq)
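
Every hunk above applies the same substitution on the write side: a single-row, single-partition RDD text write becomes a one-row DataFrame text write through SparkSession. A minimal standalone sketch of the pattern follows; the object name, output path, and JSON payload are placeholders, and `SparkSession.builder().getOrCreate()` stands in for `builder().sparkContext(sc)`, which is package-private to Spark and only usable by in-tree code like this patch.

```scala
import org.apache.spark.sql.SparkSession

object MetadataWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("metadata-write-sketch")
      .getOrCreate()

    // Placeholder payload; the real callers pass the serialized model metadata.
    val metadataJson = """{"class":"com.example.Model","version":"1.0"}"""

    // A one-element local Seq plans to a single-partition DataFrame, so
    // write.text emits exactly one part file -- the same single-file behavior
    // the removed sc.parallelize(Seq(metadataJson), 1) call provided.
    spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text("/tmp/model-sketch/metadata")

    spark.stop()
  }
}
```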
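The read side in `DefaultParamsReader.loadMetadata` changes symmetrically: `sc.textFile(metadataPath, 1).first()` becomes a DataFrame text read against the active session. A sketch under the same assumptions as above (placeholder path; `getActiveSession.get`, as in the patch, presumes a session already exists):

```scala
import org.apache.spark.sql.SparkSession

object MetadataReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("metadata-read-sketch")
      .getOrCreate()

    // spark.read.text yields a DataFrame with one string column named "value";
    // first().getString(0) recovers the single JSON line written above.
    val metadataStr = spark.read.text("/tmp/model-sketch/metadata").first().getString(0)
    println(metadataStr)

    spark.stop()
  }
}
```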