[SPARK-48941][PYTHON][ML] Replace RDD read / write API invocation with Dataframe read / write API

### What changes were proposed in this pull request?

PySpark ML: replace RDD read / write API invocations with the DataFrame read / write API.

### Why are the changes needed?

Follow-up of apache#47341

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47411 from WeichenXu123/SPARK-48909-follow-up.

Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
WeichenXu123 authored and jingz-db committed Jul 22, 2024
1 parent 0fd23bf commit 20064b8
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions python/pyspark/ml/util.py
```diff
@@ -464,7 +464,10 @@ def saveMetadata(
         metadataJson = DefaultParamsWriter._get_metadata_to_save(
             instance, sc, extraMetadata, paramMap
         )
-        sc.parallelize([metadataJson], 1).saveAsTextFile(metadataPath)
+        spark = SparkSession.getActiveSession()
+        spark.createDataFrame(  # type: ignore[union-attr]
+            [(metadataJson,)], schema=["value"]
+        ).coalesce(1).write.text(metadataPath)
```

@staticmethod
def _get_metadata_to_save(
```diff
@@ -577,7 +580,8 @@ def loadMetadata(path: str, sc: "SparkContext", expectedClassName: str = "") ->
         If non empty, this is checked against the loaded metadata.
         """
         metadataPath = os.path.join(path, "metadata")
-        metadataStr = sc.textFile(metadataPath, 1).first()
+        spark = SparkSession.getActiveSession()
+        metadataStr = spark.read.text(metadataPath).first()[0]  # type: ignore[union-attr,index]
         loadedVals = DefaultParamsReader._parseMetaData(metadataStr, expectedClassName)
         return loadedVals
```

