Skip to content

Commit

Permalink
revert unnecessary changes
Browse files Browse the repository at this point in the history
  • Loading branch information
xupefei committed Oct 16, 2024
1 parent feb42dd commit 95f71e1
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 33 deletions.
7 changes: 2 additions & 5 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,10 @@ private[spark] object Utils
}

/**
* Run a segment of code using a different context class loader in the current thread.
*
* If `retainChange` is `true` and `fn` changed the context class loader during execution,
* the class loader will be not reverted to the original one when this method returns.
* Run a segment of code using a different context class loader in the current thread
*/
def withContextClassLoader[T](ctxClassLoader: ClassLoader)(fn: => T): T = {
val oldClassLoader = Thread.currentThread().getContextClassLoader
val oldClassLoader = Thread.currentThread().getContextClassLoader()
try {
Thread.currentThread().setContextClassLoader(ctxClassLoader)
fn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.net.{URI, URL, URLClassLoader}
import java.nio.ByteBuffer
import java.nio.file.{CopyOption, Files, Path, Paths, StandardCopyOption}
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicBoolean

import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
Expand Down Expand Up @@ -88,28 +89,29 @@ class ArtifactManager(session: SparkSession) extends Logging {
}
}

private var shouldApplyClassLoader = false
private val shouldApplyClassLoader = new AtomicBoolean(false)

private var initialContextResourcesCopied = false

private def withClassLoaderIfNeeded[T](f: => T): T = {
if (replIsolated || shouldApplyClassLoader) {
logWarning("shouldApplyClassLoader is true")
val log = s" classloader for session ${session.sessionUUID} because " +
s"replIsolated=$replIsolated and shouldApplyClassLoader=${shouldApplyClassLoader.get()}."
if (replIsolated || shouldApplyClassLoader.get()) {
logDebug(s"Applying $log")
Utils.withContextClassLoader(classloader) {
f
}
} else {
logDebug(s"Not applying $log")
f
}
}

def withResources[T](f: => T): T = {
withClassLoaderIfNeeded {
JobArtifactSet.withActiveJobArtifactState(state) {
// Copy over global initial resources to this session. Often used by spark-submit.
copyInitialContextResourcesIfNeeded()
f
}
def withResources[T](f: => T): T = withClassLoaderIfNeeded {
JobArtifactSet.withActiveJobArtifactState(state) {
// Copy over global initial resources to this session. Often used by spark-submit.
copyInitialContextResourcesIfNeeded()
f
}
}

Expand Down Expand Up @@ -224,7 +226,7 @@ class ArtifactManager(session: SparkSession) extends Logging {
allowOverwrite = true,
deleteSource = deleteStagedFile)

shouldApplyClassLoader = true
shouldApplyClassLoader.set(true)
} else {
val target = ArtifactUtils.concatenatePaths(artifactPath, normalizedRemoteRelativePath)
// Disallow overwriting with modified version
Expand All @@ -249,7 +251,7 @@ class ArtifactManager(session: SparkSession) extends Logging {
(SparkContextResourceType.JAR, normalizedRemoteRelativePath, fragment))
jarsList.add(normalizedRemoteRelativePath)

shouldApplyClassLoader = true
shouldApplyClassLoader.set(true)
} else if (normalizedRemoteRelativePath.startsWith(s"pyfiles${File.separator}")) {
session.sparkContext.addFile(uri)
sparkContextRelativePaths.add(
Expand Down Expand Up @@ -385,7 +387,7 @@ class ArtifactManager(session: SparkSession) extends Logging {
newArtifactManager.cachedBlockIdList.addAll(newBlockIds.asJava)
newArtifactManager.jarsList.addAll(jarsList)
newArtifactManager.pythonIncludeList.addAll(pythonIncludeList)
newArtifactManager.shouldApplyClassLoader = shouldApplyClassLoader
newArtifactManager.shouldApplyClassLoader.set(shouldApplyClassLoader.get())
newArtifactManager
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class DatasetOptimizationSuite extends QueryTest with SharedSparkSession {
createDataset().collect()
// codegen cache should work for Datasets of same type.
val count3 = getCodegenCount()
assert((count2 to (count2 + 5)) contains count3) // allow some variance
assert(count3 == count2)
}

withClue("array type") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,6 @@ class HiveUDFDynamicLoadSuite extends QueryTest with SQLTestUtils with TestHiveS
)
).toImmutableArraySeq


private def assertClassLoaderContains(cl: ClassLoader, target: ClassLoader): Unit = {
var found = false
var current = cl
while (current != null && !found) {
found = current eq target
current = current.getParent
}
assert(found, s"$cl should contain $target")
}

udfTestInfos.foreach { udfInfo =>
// The test jars are built from below commit:
// https://github.com/HeartSaVioR/hive/commit/12f3f036b6efd0299cd1d457c0c0a65e0fd7e5f2
Expand All @@ -168,15 +157,15 @@ class HiveUDFDynamicLoadSuite extends QueryTest with SQLTestUtils with TestHiveS

sql(s"CREATE FUNCTION ${udfInfo.funcName} AS '${udfInfo.className}' USING JAR '$jarUrl'")

assertClassLoaderContains(Thread.currentThread().getContextClassLoader, sparkClassLoader)
assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader)

// JAR will be loaded at first usage, and it will change the current thread's
// context classloader to jar classloader in sharedState.
// See SessionState.addJar for details.
udfInfo.fnVerifyQuery()

assertClassLoaderContains(
Thread.currentThread().getContextClassLoader,
assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
assert(Thread.currentThread().getContextClassLoader eq
spark.sharedState.jarClassLoader)

val udfExpr = udfInfo.fnCreateHiveUDFExpression()
Expand Down

0 comments on commit 95f71e1

Please sign in to comment.