Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-48970][PYTHON][ML] Avoid using SparkSession.getActiveSession in spark ML reader/writer #47453

Closed
wants to merge 4 commits into from

Conversation

WeichenXu123
Copy link
Contributor

What changes were proposed in this pull request?

SparkSession.getActiveSession is thread-local session, but spark ML reader / writer might be executed in different threads which causes SparkSession.getActiveSession returning None.

Why are the changes needed?

It fixes the bug like:

        spark = SparkSession.getActiveSession()
>       spark.createDataFrame(  # type: ignore[union-attr]
            [(metadataJson,)], schema=["value"]
        ).coalesce(1).write.text(metadataPath)
E       AttributeError: 'NoneType' object has no attribute 'createDataFrame'

Does this PR introduce any user-facing change?

No

How was this patch tested?

Manually.

Was this patch authored or co-authored using generative AI tooling?

No.

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
@HyukjinKwon HyukjinKwon changed the title [SPARK-48970] Avoid using SparkSession.getActiveSession in spark ML reader/writer [SPARK-48970][PYTHON][ML] Avoid using SparkSession.getActiveSession in spark ML reader/writer Jul 23, 2024
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
@WeichenXu123
Copy link
Contributor Author

merged to master.

@@ -588,7 +588,7 @@ private[ml] object DefaultParamsReader {
*/
def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = ""): Metadata = {
val metadataPath = new Path(path, "metadata").toString
val spark = SparkSession.getActiveSession.get
val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
Copy link
Member

@dongjoon-hyun dongjoon-hyun Jul 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @WeichenXu123 , @HyukjinKwon , @zhengruifeng .

This sounds like a regression of

If we cannot get an existing one, I believe we should not create SparkSession here.

Can we recover the existing code?

Copy link
Member

@HyukjinKwon HyukjinKwon Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will not be a regression. This is Spark ML which is DataFrame-based MLlib by definition. Therefore we should always have default session running. Active session is specific to a thread, so it might not exist within the same thread. Alternatively we could use SparkSession.getDefaultSession.

spark.createDataFrame( # type: ignore[union-attr]
[(metadataJson,)], schema=["value"]
).coalesce(1).write.text(metadataPath)
spark = SparkSession._getActiveSessionOrCreate()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

@@ -580,8 +580,8 @@ def loadMetadata(path: str, sc: "SparkContext", expectedClassName: str = "") ->
If non empty, this is checked against the loaded metadata.
"""
metadataPath = os.path.join(path, "metadata")
spark = SparkSession.getActiveSession()
metadataStr = spark.read.text(metadataPath).first()[0] # type: ignore[union-attr,index]
spark = SparkSession._getActiveSessionOrCreate()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

@dongjoon-hyun
Copy link
Member

Initially, the existing PRs assumes that there is no regression because we use the active sessions. AFAIK, this assumption was the same in the dev mailing discussion .

https://lists.apache.org/thread/s24lqtmno0xtoxxz6pk6tyn726bfwp8q

Is this regression inevitable, @HyukjinKwon ?

  • If then, could you add a documentation that ML module starts to use SparkSession always instead of SparkContext?
  • If that is the module's changed minimum requirement, we don't need to discuss this topic again.

@dongjoon-hyun
Copy link
Member

I replied on the existing thread.

@HyukjinKwon
Copy link
Member

There is no regression. This is Spark ML which is DataFrame-based MLlib. There should be a running Spark session always.

@zhengruifeng
Copy link
Contributor

@dongjoon-hyun
DefaultParamsReader.loadMetadata is only used to load the metadata of ml models, let me take LogisticRegressionModel as an example:

private class LogisticRegressionModelReader extends MLReader[LogisticRegressionModel] {
/** Checked against metadata when loading model */
private val className = classOf[LogisticRegressionModel].getName
override def load(path: String): LogisticRegressionModel = {
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
val (major, minor) = VersionUtils.majorMinorVersion(metadata.sparkVersion)
val dataPath = new Path(path, "data").toString
val data = sparkSession.read.format("parquet").load(dataPath)
val model = if (major < 2 || (major == 2 && minor == 0)) {
// 2.0 and before
val Row(numClasses: Int, numFeatures: Int, intercept: Double, coefficients: Vector) =
MLUtils.convertVectorColumnsToML(data, "coefficients")
.select("numClasses", "numFeatures", "intercept", "coefficients")
.head()
val coefficientMatrix =
new DenseMatrix(1, coefficients.size, coefficients.toArray, isTransposed = true)
val interceptVector = Vectors.dense(intercept)
new LogisticRegressionModel(metadata.uid, coefficientMatrix,
interceptVector, numClasses, isMultinomial = false)
} else {
// 2.1+
val Row(numClasses: Int, numFeatures: Int, interceptVector: Vector,
coefficientMatrix: Matrix, isMultinomial: Boolean) = data
.select("numClasses", "numFeatures", "interceptVector", "coefficientMatrix",
"isMultinomial").head()
new LogisticRegressionModel(metadata.uid, coefficientMatrix, interceptVector,
numClasses, isMultinomial)
}
metadata.getAndSetParams(model)
model
}
}

val metadata = DefaultParamsReader.loadMetadata(path, sc, className)

loads the metadata

val data = sparkSession.read.format("parquet").load(dataPath)

then loads the model coefficients, you can see the sparkSession is already avaiable for model loading.

@zhengruifeng
Copy link
Contributor

I think probably we can change the signature of

def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = ""): Metadata

to

def loadMetadata(path: String, spark: SparkSession, expectedClassName: String = ""): Metadata

to avoid such confusion.

I will have a try

@dongjoon-hyun
Copy link
Member

Thank you, @HyukjinKwon and @zhengruifeng . I'm +1 for both to have a clear semantic.

  1. Using SparkSession.getDefaultSession instead of *OrCreate.

Alternatively we could use SparkSession.getDefaultSession.

  1. Having a clear semantic, def loadMetadata(path: String, spark: SparkSession, expectedClassName: String = ""): Metadata.

@dongjoon-hyun
Copy link
Member

For the record and the other reviewers, (2) is implemented and merged to Apache Spark 4.0.0.

ilicmarkodb pushed a commit to ilicmarkodb/spark that referenced this pull request Jul 29, 2024
…n spark ML reader/writer

### What changes were proposed in this pull request?

`SparkSession.getActiveSession` is thread-local session, but spark ML reader / writer might be executed in different threads which causes `SparkSession.getActiveSession` returning None.

### Why are the changes needed?

It fixes the bug like:
```
        spark = SparkSession.getActiveSession()
>       spark.createDataFrame(  # type: ignore[union-attr]
            [(metadataJson,)], schema=["value"]
        ).coalesce(1).write.text(metadataPath)
E       AttributeError: 'NoneType' object has no attribute 'createDataFrame'
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47453 from WeichenXu123/SPARK-48970.

Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
…n spark ML reader/writer

### What changes were proposed in this pull request?

`SparkSession.getActiveSession` is thread-local session, but spark ML reader / writer might be executed in different threads which causes `SparkSession.getActiveSession` returning None.

### Why are the changes needed?

It fixes the bug like:
```
        spark = SparkSession.getActiveSession()
>       spark.createDataFrame(  # type: ignore[union-attr]
            [(metadataJson,)], schema=["value"]
        ).coalesce(1).write.text(metadataPath)
E       AttributeError: 'NoneType' object has no attribute 'createDataFrame'
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47453 from WeichenXu123/SPARK-48970.

Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants