[SPARK-47092][CORE][SQL][K8S] Add getUriBuilder to o.a.s.u.Utils and use it #45168

Closed · wants to merge 2 commits
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
```diff
@@ -22,7 +22,6 @@ import java.net.URI
 import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
-import javax.ws.rs.core.UriBuilder
 
 import scala.collection.Map
 import scala.collection.concurrent.{Map => ScalaConcurrentMap}
@@ -1829,12 +1828,12 @@ class SparkContext(config: SparkConf) extends Logging {
       addedArchives
         .getOrElseUpdate(jobArtifactUUID, new ConcurrentHashMap[String, Long]().asScala)
         .putIfAbsent(
-          UriBuilder.fromUri(new URI(key)).fragment(uri.getFragment).build().toString,
+          Utils.getUriBuilder(new URI(key)).fragment(uri.getFragment).build().toString,
           timestamp).isEmpty) {
       logInfo(s"Added archive $path at $key with timestamp $timestamp")
       // If the scheme is file, use URI to simply copy instead of downloading.
       val uriToUse = if (!isLocal && scheme == "file") uri else new URI(key)
-      val uriToDownload = UriBuilder.fromUri(uriToUse).fragment(null).build()
+      val uriToDownload = Utils.getUriBuilder(uriToUse).fragment(null).build()
       val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
         hadoopConfiguration, timestamp, useCache = false, shouldUntar = false)
       val dest = new File(
```
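The fragment is what makes a builder necessary here at all: an archive URI such as `https://host/archive.zip#mydir` carries the unpack-directory name in its fragment, which must be dropped before fetching and re-attached afterwards. A minimal standalone sketch of that round trip (the URI is illustrative; inside Spark these calls now route through `Utils.getUriBuilder`):

```scala
import java.net.URI
import javax.ws.rs.core.UriBuilder

object FragmentRoundTrip extends App {
  // An archive URI whose fragment names the directory to unpack into.
  val uri = new URI("https://repo.example.com/archive.zip#mydir")

  // Strip the fragment so the fetch layer sees a plain downloadable URI.
  val toDownload = UriBuilder.fromUri(uri).fragment(null).build()
  println(toDownload) // https://repo.example.com/archive.zip

  // Re-attach the original fragment once the local copy exists.
  val restored = UriBuilder.fromUri(toDownload).fragment(uri.getFragment).build()
  println(restored) // https://repo.example.com/archive.zip#mydir
}
```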
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
```diff
@@ -24,7 +24,6 @@ import java.nio.file.Files
 import java.security.PrivilegedExceptionAction
 import java.util.ServiceLoader
 import java.util.jar.JarInputStream
-import javax.ws.rs.core.UriBuilder
 
 import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
@@ -409,7 +408,7 @@ private[spark] class SparkSubmit extends Logging {
         val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
         val localResources = downloadFileList(
           resolvedUris.map(
-            UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
+            Utils.getUriBuilder(_).fragment(null).build().toString).mkString(","),
           targetDir, sparkConf, hadoopConf)
         Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map {
           case (localResources, resolvedUri) =>
@@ -426,7 +425,7 @@
               Files.copy(source.toPath, dest.toPath)
             }
             // Keep the URIs of local files with the given fragments.
-            UriBuilder.fromUri(
+            Utils.getUriBuilder(
               localResources).fragment(resolvedUri.getFragment).build().toString
         }.mkString(",")
       }
```
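Here the same strip/restore has to survive a comma-separated list: fragments are removed before the whole list goes to `downloadFileList`, then restored positionally on the downloaded copies. A hedged sketch of that list round trip (host names and local paths are illustrative):

```scala
import java.net.URI
import javax.ws.rs.core.UriBuilder

object UriListRoundTrip extends App {
  val resolved = "hdfs://nn/tmp/a.zip#x,hdfs://nn/tmp/b.zip#y"
    .split(",").toSeq.map(new URI(_))

  // Fragments are dropped before the list is handed to the downloader...
  val downloadable = resolved
    .map(u => UriBuilder.fromUri(u).fragment(null).build().toString)
    .mkString(",")
  println(downloadable) // hdfs://nn/tmp/a.zip,hdfs://nn/tmp/b.zip

  // ...and restored positionally on the (here hypothetical) local copies.
  val locals = Seq("file:/local/a.zip", "file:/local/b.zip")
  val restored = locals.zip(resolved).map { case (local, orig) =>
    UriBuilder.fromUri(local).fragment(orig.getFragment).build().toString
  }.mkString(",")
  println(restored) // file:/local/a.zip#x,file:/local/b.zip#y
}
```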
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
```diff
@@ -27,7 +27,6 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import javax.annotation.concurrent.GuardedBy
-import javax.ws.rs.core.UriBuilder
 
 import scala.collection.immutable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -1157,7 +1156,7 @@ private[spark] class Executor(
           state.currentArchives.getOrElse(name, -1L) < timestamp) {
           logInfo(s"Fetching $name with timestamp $timestamp")
           val sourceURI = new URI(name)
-          val uriToDownload = UriBuilder.fromUri(sourceURI).fragment(null).build()
+          val uriToDownload = Utils.getUriBuilder(sourceURI).fragment(null).build()
           val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
             hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
           val dest = new File(
```
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
```diff
@@ -32,6 +32,7 @@ import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
 import java.util.concurrent.TimeUnit.NANOSECONDS
 import java.util.zip.{GZIPInputStream, ZipInputStream}
+import javax.ws.rs.core.UriBuilder
 
 import scala.annotation.tailrec
 import scala.collection.Map
@@ -2885,6 +2886,20 @@ private[spark] object Utils
     uri.startsWith(s"$LOCAL_SCHEME:")
   }
 
+  /** Create a UriBuilder from a URI object. */
+  def getUriBuilder(uri: URI): UriBuilder = {
+    // scalastyle:off uribuilder
+    UriBuilder.fromUri(uri)
+    // scalastyle:on uribuilder
+  }
+
+  /** Create a UriBuilder from a URI string. */
+  def getUriBuilder(uri: String): UriBuilder = {
+    // scalastyle:off uribuilder
+    UriBuilder.fromUri(uri)
+    // scalastyle:on uribuilder
+  }
+
   /** Check whether the file of the path is splittable. */
   def isFileSplittable(path: Path, codecFactory: CompressionCodecFactory): Boolean = {
     val codec = codecFactory.getCodec(path)
```
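With the two overloads above, call sites stay lint-clean whether they hold a `java.net.URI` or a raw string, and the `scalastyle:off`/`on` comments confine direct `UriBuilder.fromUri` calls to this single whitelisted spot. A short usage sketch (Spark-internal code, since `Utils` is `private[spark]`; the URIs are illustrative):

```scala
import java.net.URI
import org.apache.spark.util.Utils

// URI overload: strip the fragment from an archive URI.
val noFragment = Utils.getUriBuilder(new URI("hdfs://nn/data/app.zip#lib"))
  .fragment(null).build()

// String overload: force a scheme without hand-parsing the string.
val asFile = Utils.getUriBuilder("/warehouse/t").scheme("file").build()
```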
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
```diff
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import javax.ws.rs.core.UriBuilder
-
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
@@ -171,7 +169,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       conf.get(key).partition(uri => KubernetesUtils.isLocalAndResolvable(uri))
     val value = {
       if (key == ARCHIVES) {
-        localUris.map(UriBuilder.fromUri(_).fragment(null).build()).map(_.toString)
+        localUris.map(Utils.getUriBuilder(_).fragment(null).build()).map(_.toString)
       } else {
         localUris
       }
@@ -180,7 +178,7 @@
     if (resolved.nonEmpty) {
       val resolvedValue = if (key == ARCHIVES) {
         localUris.zip(resolved).map { case (uri, r) =>
-          UriBuilder.fromUri(r).fragment(new java.net.URI(uri).getFragment).build().toString
+          Utils.getUriBuilder(r).fragment(new java.net.URI(uri).getFragment).build().toString
         }
       } else {
         resolved
```
5 changes: 5 additions & 0 deletions scalastyle-config.xml
```diff
@@ -283,6 +283,11 @@ This file is divided into 3 sections:
     of Commons Lang 2 (package org.apache.commons.lang.*)</customMessage>
   </check>
 
+  <check customId="uribuilder" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">UriBuilder\.fromUri</parameter></parameters>
+    <customMessage>Use Utils.getUriBuilder instead.</customMessage>
+  </check>
+
   <check customId="executioncontextglobal" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">scala\.concurrent\.ExecutionContext\.Implicits\.global</parameter></parameters>
     <customMessage> User queries can use global thread pool, causing starvation and eventual OOM.
```
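`RegexChecker` matches raw source text, so any occurrence of `UriBuilder.fromUri` in checked files fails the build with the message above; the only sanctioned escape hatch is the on/off comment pair keyed to the check's `customId`, exactly as the `Utils.scala` hunk uses. A hedged before/after sketch of what the rule enforces at an ordinary call site:

```scala
import java.net.URI
import org.apache.spark.util.Utils

val uri = new URI("hdfs://nn/tmp/a.zip")

// Anywhere outside the whitelisted helper this line now fails scalastyle:
//   val bad = UriBuilder.fromUri(uri)   // error: Use Utils.getUriBuilder instead.

// The compliant spelling routes through the helper:
val good = Utils.getUriBuilder(uri)
```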
sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
```diff
@@ -21,7 +21,6 @@ import java.io.File
 import java.net.{URI, URL, URLClassLoader}
 import java.nio.file.{Files, Path, Paths, StandardCopyOption}
 import java.util.concurrent.CopyOnWriteArrayList
-import javax.ws.rs.core.UriBuilder
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
@@ -174,7 +173,7 @@ class ArtifactManager(session: SparkSession) extends Logging {
       }
     } else if (remoteRelativePath.startsWith(s"archives${File.separator}")) {
       val canonicalUri =
-        fragment.map(UriBuilder.fromUri(new URI(uri)).fragment).getOrElse(new URI(uri))
+        fragment.map(Utils.getUriBuilder(new URI(uri)).fragment).getOrElse(new URI(uri))
       session.sparkContext.addArchive(canonicalUri.toString)
     } else if (remoteRelativePath.startsWith(s"files${File.separator}")) {
       session.sparkContext.addFile(uri)
```
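A subtlety in this hunk: `Utils.getUriBuilder(new URI(uri)).fragment` is an eta-expanded method value of type `String => UriBuilder`, so the builder is only consulted when a fragment is actually present, and the trailing `.toString` collapses either branch (builder or plain `URI`) back to a string. A more explicit, hedged equivalent spelled out with a lambda and an explicit `build()` (URIs are illustrative):

```scala
import java.net.URI
import javax.ws.rs.core.UriBuilder

// Apply the optional fragment only when present, otherwise keep the URI as-is.
def canonical(uri: String, fragment: Option[String]): String =
  fragment
    .map(f => UriBuilder.fromUri(new URI(uri)).fragment(f).build())
    .getOrElse(new URI(uri))
    .toString

println(canonical("hdfs://nn/a.zip", Some("dir"))) // hdfs://nn/a.zip#dir
println(canonical("hdfs://nn/a.zip", None))        // hdfs://nn/a.zip
```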
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
```diff
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.time.ZoneOffset
 import java.util.{Locale, TimeZone}
-import javax.ws.rs.core.UriBuilder
 
 import scala.jdk.CollectionConverters._
 
@@ -42,6 +41,7 @@
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types.StringType
+import org.apache.spark.util.Utils.getUriBuilder
 
 /**
  * Concrete parser for Spark SQL statements.
@@ -862,7 +862,7 @@ class SparkSqlAstBuilder extends AstBuilder {
           throw QueryParsingErrors.unsupportedLocalFileSchemeError(ctx, pathScheme)
         case _ =>
           // force scheme to be file rather than fs.default.name
-          val loc = Some(UriBuilder.fromUri(CatalogUtils.stringToURI(path)).scheme("file").build())
+          val loc = Some(getUriBuilder(CatalogUtils.stringToURI(path)).scheme("file").build())
          storage = storage.copy(locationUri = loc)
       }
     }
```
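Unlike every other call site in this PR, this one rewrites the scheme rather than the fragment: a bare `LOCATION` path must resolve against the local file system instead of `fs.default.name`. A standalone sketch of that normalization (the path is illustrative):

```scala
import java.net.URI
import javax.ws.rs.core.UriBuilder

// A LOCATION clause given as a bare path parses as a scheme-less URI...
val raw = new URI("/user/warehouse/t")

// ...and pinning the scheme keeps it on the local file system instead of
// whatever fs.default.name (e.g. hdfs://nn) would otherwise supply.
val loc = UriBuilder.fromUri(raw).scheme("file").build()
println(loc) // file:/user/warehouse/t
```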