[SPARK-48909][ML][MLLIB] Uses SparkSession over SparkContext when writing metadata

### What changes were proposed in this pull request?

This PR proposes to use SparkSession instead of SparkContext when writing metadata.
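Concretely, the same one-line substitution is applied in every touched file: an RDD-based text write on `SparkContext` becomes a DataFrame-based write on `SparkSession`. A minimal sketch of the before/after shape (the helper name is illustrative only; note that `Builder.sparkContext(...)` is `private[spark]`, so outside Spark itself one would use plain `SparkSession.builder().getOrCreate()`):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

// Illustrative helper showing the pattern applied throughout this diff.
def writeMetadataJson(sc: SparkContext, metadataJson: String, metadataPath: String): Unit = {
  // Before: RDD API, forcing a single partition explicitly.
  // sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)

  // After: DataFrame API. A one-row local Seq yields a single non-empty
  // partition, so exactly one text file is written.
  val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
  spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text(metadataPath)
}
```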

### Why are the changes needed?

See apache#47347 (comment)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests should cover it.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47366 from HyukjinKwon/SPARK-48909.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
HyukjinKwon authored and jingz-db committed Jul 22, 2024
1 parent bb5267c commit a402f51
Showing 17 changed files with 28 additions and 24 deletions.
8 changes: 6 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
@@ -411,7 +411,10 @@ private[ml] object DefaultParamsWriter {
paramMap: Option[JValue] = None): Unit = {
val metadataPath = new Path(path, "metadata").toString
val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap)
-    sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)
+    val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
+    // Note that we should write a single file. If there is more than one row,
+    // it produces more partitions.
+    spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text(metadataPath)
}
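A note on the comment above: `DataFrameWriter.text` emits one `part-*` file per non-empty partition, and a one-element local Seq parallelizes into a single non-empty partition, hence a single output file. A quick standalone sketch of that behavior (local master and `/tmp` paths are assumptions for illustration only):

```scala
import org.apache.spark.sql.SparkSession

object SingleFileWriteDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()

    // One row -> one non-empty partition -> exactly one part-* file.
    spark.createDataFrame(Seq(Tuple1("""{"class":"Demo","version":"1.0"}""")))
      .write.text("/tmp/meta-single")

    // Several rows may be split across partitions and thus across several
    // part-* files; keeping the metadata JSON in one row avoids that, since
    // the reader takes only the first row.
    spark.createDataFrame(Seq(Tuple1("row1"), Tuple1("row2")))
      .write.text("/tmp/meta-multi")

    spark.stop()
  }
}
```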

/**
@@ -585,7 +588,8 @@ private[ml] object DefaultParamsReader {
*/
def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = ""): Metadata = {
val metadataPath = new Path(path, "metadata").toString
-    val metadataStr = sc.textFile(metadataPath, 1).first()
+    val spark = SparkSession.getActiveSession.get
+    val metadataStr = spark.read.text(metadataPath).first().getString(0)
parseMetadata(metadataStr, expectedClassName)
}
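On the read side, the metadata directory is read back as a text DataFrame and its single row is taken as the JSON string. A hedged sketch of the same shape (the helper name is mine; `getActiveSession.get` assumes a session is already active on the driver thread and throws otherwise, `getOrCreate()` being the defensive alternative):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative stand-in for the loadMetadata change above.
def readMetadataJson(metadataPath: String): String = {
  // Assumes an active SparkSession on this thread, as loadMetadata now does.
  val spark = SparkSession.getActiveSession.get
  // spark.read.text yields a single "value" column of StringType.
  spark.read.text(metadataPath).first().getString(0)
}
```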

@@ -198,7 +198,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))

// Create Parquet data.
spark.createDataFrame(Seq(data)).write.parquet(dataPath(path))
@@ -243,7 +243,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))

// Create Parquet data.
spark.createDataFrame(Seq(data)).write.parquet(dataPath(path))
@@ -57,7 +57,7 @@ private[classification] object GLMClassificationModel {
val metadata = compact(render(
("class" -> modelClass) ~ ("version" -> thisFormatVersion) ~
("numFeatures" -> numFeatures) ~ ("numClasses" -> numClasses)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

// Create Parquet data.
val data = Data(weights, intercept, threshold)
@@ -179,7 +179,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
~ ("rootId" -> model.root.index)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

val data = getNodes(model.root).map(node => Data(node.index, node.size,
node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
@@ -215,7 +215,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
~ ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

val data = getNodes(model.root).map(node => Data(node.index, node.size,
node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
@@ -253,7 +253,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
~ ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure)
~ ("trainingCost" -> model.trainingCost)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

val data = getNodes(model.root).map(node => Data(node.index, node.size,
node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
@@ -147,7 +147,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
// Create JSON metadata.
val metadata = compact(render
(("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ ("k" -> weights.length)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

// Create Parquet data.
val dataArray = Array.tabulate(weights.length) { i =>
@@ -172,7 +172,7 @@ object KMeansModel extends Loader[KMeansModel] {
val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq)
.map { case (p, id) =>
Cluster(id, p.vector)
@@ -207,7 +207,7 @@ object KMeansModel extends Loader[KMeansModel] {
("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
~ ("k" -> model.k) ~ ("distanceMeasure" -> model.distanceMeasure)
~ ("trainingCost" -> model.trainingCost)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq)
.map { case (p, id) =>
Cluster(id, p.vector)
@@ -460,7 +460,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
("docConcentration" -> docConcentration.toArray.toImmutableArraySeq) ~
("topicConcentration" -> topicConcentration) ~
("gammaShape" -> gammaShape)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

val topicsDenseMatrix = topicsMatrix.asBreeze.toDenseMatrix
val topics = Range(0, k).map { topicInd =>
@@ -869,7 +869,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
("topicConcentration" -> topicConcentration) ~
("iterationTimes" -> iterationTimes.toImmutableArraySeq) ~
("gammaShape" -> gammaShape)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

val newPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString
spark.createDataFrame(Seq(Data(Vectors.fromBreeze(globalTopicTotals)))).write.parquet(newPath)
@@ -73,7 +73,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode

val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

spark.createDataFrame(model.assignments).write.parquet(Loader.dataPath(path))
}
@@ -136,7 +136,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {

val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

// Create Parquet data.
val dataArray = Array.tabulate(model.selectedFeatures.length) { i =>
@@ -686,7 +686,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
val metadata = compact(render(
("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~
("vectorSize" -> vectorSize) ~ ("numWords" -> numWords)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

// We want to partition the model in partitions smaller than
// spark.kryoserializer.buffer.max
@@ -108,7 +108,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {

val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

// Get the type of item class
val sample = model.freqItemsets.first().items(0)
@@ -655,7 +655,7 @@ object PrefixSpanModel extends Loader[PrefixSpanModel[_]] {

val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

// Get the type of item class
val sample = model.freqSequences.first().sequence(0)(0)
@@ -387,7 +387,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
import spark.implicits._
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
model.userFeatures.toDF("id", "features").write.parquet(userPath(path))
model.productFeatures.toDF("id", "features").write.parquet(productPath(path))
}
@@ -188,7 +188,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
("isotonic" -> isotonic)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))

spark.createDataFrame(
boundaries.toImmutableArraySeq.zip(predictions).map { case (b, p) => Data(b, p) }
@@ -53,7 +53,7 @@ private[regression] object GLMRegressionModel {
val metadata = compact(render(
("class" -> modelClass) ~ ("version" -> thisFormatVersion) ~
("numFeatures" -> weights.size)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

// Create Parquet data.
val data = Data(weights, intercept)
@@ -226,15 +226,15 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
}

// Create JSON metadata.
+    val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
("algo" -> model.algo.toString) ~ ("numNodes" -> model.numNodes)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

// Create Parquet data.
val nodes = model.topNode.subtreeIterator.toSeq
val dataRDD = sc.parallelize(nodes).map(NodeData.apply(0, _))
-    val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
spark.createDataFrame(dataRDD).write.parquet(Loader.dataPath(path))
}

@@ -430,7 +430,7 @@ private[tree] object TreeEnsembleModel extends Logging {
val metadata = compact(render(
("class" -> className) ~ ("version" -> thisFormatVersion) ~
("metadata" -> Extraction.decompose(ensembleMetadata))))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

// Create Parquet data.
val dataRDD = sc.parallelize(model.trees.zipWithIndex.toImmutableArraySeq)
