Merge pull request #1583 from apache/master
GulajavaMinistudio authored Nov 9, 2023
2 parents 05be60b + 06d8cbe commit 0038f26
Showing 48 changed files with 615 additions and 254 deletions.
40 changes: 5 additions & 35 deletions .github/labeler.yml
@@ -17,23 +17,6 @@
# under the License.
#

#
# Pull Request Labeler Github Action Configuration: https://github.com/marketplace/actions/labeler
#
# Note that we currently cannot use the negation operator (i.e. `!`) for miniglob matches as they
# would match any file that doesn't touch them. What's needed is the concept of `any`, which takes a
# list of constraints / globs and then matches all of the constraints for either `any` of the files or
# `all` of the files in the change set.
#
# However, `any`/`all` are not supported in a released version and testing off of the `main` branch
# resulted in some other errors when testing.
#
# An issue has been opened upstream requesting that a release be cut that has support for all/any:
# - https://github.com/actions/labeler/issues/111
#
# While we wait for this issue to be handled upstream, we can remove
# the negated / `!` matches for now and at least have labels again.
#
INFRA:
- ".github/**/*"
- "appveyor.yml"
@@ -45,32 +28,24 @@ INFRA:
- "dev/merge_spark_pr.py"
- "dev/run-tests-jenkins*"
BUILD:
# Can be supported when a stable release with correct all/any is released
#- any: ['dev/**/*', '!dev/merge_spark_pr.py', '!dev/.rat-excludes']
- "dev/**/*"
- any: ['dev/**/*', '!dev/merge_spark_pr.py', '!dev/run-tests-jenkins*']
- "build/**/*"
- "project/**/*"
- "assembly/**/*"
- "**/*pom.xml"
- "bin/docker-image-tool.sh"
- "bin/find-spark-home*"
- "scalastyle-config.xml"
# These can be added in the above `any` clause (and the /dev/**/* glob removed) when
# `any`/`all` support is released
# - "!dev/merge_spark_pr.py"
# - "!dev/run-tests-jenkins*"
# - "!dev/.rat-excludes"
DOCS:
- "docs/**/*"
- "**/README.md"
- "**/CONTRIBUTING.md"
- "python/docs/**/*"
EXAMPLES:
- "examples/**/*"
- "bin/run-example*"
# CORE needs to be updated when all/any are released upstream.
CORE:
# - any: ["core/**/*", "!**/*UI.scala", "!**/ui/**/*"] # If any file matches all of the globs defined in the list started by `any`, label is applied.
- "core/**/*"
- any: ["core/**/*", "!**/*UI.scala", "!**/ui/**/*"]
- "common/kvstore/**/*"
- "common/network-common/**/*"
- "common/network-shuffle/**/*"
Expand All @@ -82,12 +57,8 @@ SPARK SHELL:
- "repl/**/*"
- "bin/spark-shell*"
SQL:
#- any: ["**/sql/**/*", "!python/pyspark/sql/avro/**/*", "!python/pyspark/sql/streaming/**/*", "!python/pyspark/sql/tests/streaming/test_streaming.py"]
- "**/sql/**/*"
- any: ["**/sql/**/*", "!python/pyspark/sql/avro/**/*", "!python/pyspark/sql/streaming/**/*", "!python/pyspark/sql/tests/streaming/test_streaming*.py"]
- "common/unsafe/**/*"
#- "!python/pyspark/sql/avro/**/*"
#- "!python/pyspark/sql/streaming/**/*"
#- "!python/pyspark/sql/tests/streaming/test_streaming.py"
- "bin/spark-sql*"
- "bin/beeline*"
- "sbin/*thriftserver*.sh"
@@ -123,7 +94,7 @@ STRUCTURED STREAMING:
- "**/sql/**/streaming/**/*"
- "connector/kafka-0-10-sql/**/*"
- "python/pyspark/sql/streaming/**/*"
- "python/pyspark/sql/tests/streaming/test_streaming.py"
- "python/pyspark/sql/tests/streaming/test_streaming*.py"
- "**/*streaming.R"
PYTHON:
- "bin/pyspark*"
@@ -148,7 +119,6 @@ DEPLOY:
- "sbin/**/*"
CONNECT:
- "connector/connect/**/*"
- "**/sql/sparkconnect/**/*"
- "python/pyspark/sql/**/connect/**/*"
- "python/pyspark/ml/**/connect/**/*"
PROTOBUF:
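The comments removed above explain why negated globs previously had to be dropped: on its own, a negated glob matches every file that does not touch the excluded path, so exclusions only make sense inside an `any` clause, where a single changed file must satisfy the whole list of globs at once. The sketch below illustrates that matching rule under stated assumptions. It is not the labeler's implementation, and it uses Java's glob matcher as a stand-in for the action's minimatch patterns, so edge cases may differ.

```scala
import java.nio.file.{FileSystems, Paths}

// Rough sketch only, not the labeler's implementation. An `any` clause applies the
// label when at least one changed file satisfies every glob in its list, which is
// what makes negated globs such as "!dev/merge_spark_pr.py" usable at all.
object AnyClauseSketch {
  private val fs = FileSystems.getDefault

  // A file satisfies the list when it matches every positive glob and no negated one.
  def matchesAll(file: String, globs: Seq[String]): Boolean = globs.forall { glob =>
    val negated = glob.startsWith("!")
    val pattern = if (negated) glob.drop(1) else glob
    val hit = fs.getPathMatcher("glob:" + pattern).matches(Paths.get(file))
    if (negated) !hit else hit
  }

  def main(args: Array[String]): Unit = {
    val buildAny = Seq("dev/**/*", "!dev/merge_spark_pr.py", "!dev/run-tests-jenkins*")
    // An ordinary dev/ file satisfies all three globs, so the PR would get the BUILD label.
    println(matchesAll("dev/sparktestsupport/modules.py", buildAny)) // true
    // The excluded script fails its negated glob and does not count towards the label.
    println(matchesAll("dev/merge_spark_pr.py", Seq("!dev/merge_spark_pr.py"))) // false
  }
}
```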
13 changes: 0 additions & 13 deletions .github/workflows/labeler.yml
@@ -34,19 +34,6 @@ jobs:
contents: read
pull-requests: write
steps:
# In order to get back the negated matches like in the old config,
# we need the actions/labeler concept of `all` and `any`, which matches
# all of the given constraints / glob patterns for either `all`
# files or `any` file in the change set.
#
# Github issue which requests a timeline for a release with any/all support:
# - https://github.com/actions/labeler/issues/111
# This issue also references the comment noting that any/all are only
# supported on the main branch (previously called master):
# - https://github.com/actions/labeler/issues/73#issuecomment-639034278
#
# However, these are not in a published release and the current `main` branch
# has some issues upon testing.
- uses: actions/labeler@v4
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"
@@ -202,29 +202,27 @@ private static boolean isSymlink(File file) throws IOException {
private static final Map<String, ByteUnit> byteSuffixes;

static {
final Map<String, TimeUnit> timeSuffixesBuilder = new HashMap<>();
timeSuffixesBuilder.put("us", TimeUnit.MICROSECONDS);
timeSuffixesBuilder.put("ms", TimeUnit.MILLISECONDS);
timeSuffixesBuilder.put("s", TimeUnit.SECONDS);
timeSuffixesBuilder.put("m", TimeUnit.MINUTES);
timeSuffixesBuilder.put("min", TimeUnit.MINUTES);
timeSuffixesBuilder.put("h", TimeUnit.HOURS);
timeSuffixesBuilder.put("d", TimeUnit.DAYS);
timeSuffixes = Collections.unmodifiableMap(timeSuffixesBuilder);

final Map<String, ByteUnit> byteSuffixesBuilder = new HashMap<>();
byteSuffixesBuilder.put("b", ByteUnit.BYTE);
byteSuffixesBuilder.put("k", ByteUnit.KiB);
byteSuffixesBuilder.put("kb", ByteUnit.KiB);
byteSuffixesBuilder.put("m", ByteUnit.MiB);
byteSuffixesBuilder.put("mb", ByteUnit.MiB);
byteSuffixesBuilder.put("g", ByteUnit.GiB);
byteSuffixesBuilder.put("gb", ByteUnit.GiB);
byteSuffixesBuilder.put("t", ByteUnit.TiB);
byteSuffixesBuilder.put("tb", ByteUnit.TiB);
byteSuffixesBuilder.put("p", ByteUnit.PiB);
byteSuffixesBuilder.put("pb", ByteUnit.PiB);
byteSuffixes = Collections.unmodifiableMap(byteSuffixesBuilder);
timeSuffixes = Map.of(
"us", TimeUnit.MICROSECONDS,
"ms", TimeUnit.MILLISECONDS,
"s", TimeUnit.SECONDS,
"m", TimeUnit.MINUTES,
"min", TimeUnit.MINUTES,
"h", TimeUnit.HOURS,
"d", TimeUnit.DAYS);

byteSuffixes = Map.ofEntries(
Map.entry("b", ByteUnit.BYTE),
Map.entry("k", ByteUnit.KiB),
Map.entry("kb", ByteUnit.KiB),
Map.entry("m", ByteUnit.MiB),
Map.entry("mb", ByteUnit.MiB),
Map.entry("g", ByteUnit.GiB),
Map.entry("gb", ByteUnit.GiB),
Map.entry("t", ByteUnit.TiB),
Map.entry("tb", ByteUnit.TiB),
Map.entry("p", ByteUnit.PiB),
Map.entry("pb", ByteUnit.PiB));
}

/**
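The hunk above replaces the mutable-builder-plus-`Collections.unmodifiableMap` pattern with `Map.of` / `Map.ofEntries`, which build immutable maps in a single expression and reject nulls and duplicate keys up front. A minimal sketch of the behaviour being relied on, written in Scala against the same JDK API; the object name is illustrative only.

```scala
import java.util.concurrent.TimeUnit

// Minimal sketch: java.util.Map.of constructs an immutable map in one expression,
// so the HashMap builder plus Collections.unmodifiableMap is no longer needed.
object ImmutableMapSketch {
  def main(args: Array[String]): Unit = {
    val timeSuffixes = java.util.Map.of(
      "ms", TimeUnit.MILLISECONDS,
      "s", TimeUnit.SECONDS,
      "min", TimeUnit.MINUTES)
    println(timeSuffixes.get("min")) // MINUTES
    // Mutation fails fast, matching the behaviour of the old unmodifiableMap wrapper.
    try timeSuffixes.put("h", TimeUnit.HOURS)
    catch { case _: UnsupportedOperationException => println("immutable, as expected") }
    // Map.of and Map.ofEntries also reject duplicate keys and nulls at construction time.
  }
}
```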
12 changes: 12 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -850,6 +850,12 @@
],
"sqlState" : "42710"
},
"DATA_SOURCE_NOT_EXIST" : {
"message" : [
"Data source '<provider>' not found. Please make sure the data source is registered."
],
"sqlState" : "42704"
},
"DATA_SOURCE_NOT_FOUND" : {
"message" : [
"Failed to find the data source: <provider>. Please find packages at `https://spark.apache.org/third-party-projects.html`."
@@ -1095,6 +1101,12 @@
],
"sqlState" : "42809"
},
"FOUND_MULTIPLE_DATA_SOURCES" : {
"message" : [
"Detected multiple data sources with the name '<provider>'. Please check the data source isn't simultaneously registered and located in the classpath."
],
"sqlState" : "42710"
},
"GENERATED_COLUMN_WITH_DEFAULT_VALUE" : {
"message" : [
"A column cannot have both a default value and a generation expression but column <colName> has default value: (<defaultValue>) and generation expression: (<genExpr>)."
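The two entries added above follow the error-classes.json convention: a stable error class name, a message template with angle-bracket placeholders, and a SQLSTATE. The toy renderer below only illustrates how such a template gets its placeholders filled from message parameters; it is not Spark's error framework, and the object and method names are invented for the example.

```scala
// Illustrative sketch only, not Spark's error framework: each entry pairs an error
// class with a message template whose <placeholders> are filled when the error is raised.
object ErrorClassSketch {
  def render(template: String, params: Map[String, String]): String =
    params.foldLeft(template) { case (message, (key, value)) => message.replace(s"<$key>", value) }

  def main(args: Array[String]): Unit = {
    val template =
      "Data source '<provider>' not found. Please make sure the data source is registered."
    // DATA_SOURCE_NOT_EXIST with provider = "my_source" would read as:
    println(render(template, Map("provider" -> "my_source")))
  }
}
```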
5 changes: 1 addition & 4 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -17,7 +17,6 @@

package org.apache.spark

import java.util.Collections
import java.util.concurrent.TimeUnit

import scala.concurrent._
@@ -255,8 +254,6 @@ private[spark]
class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T)
extends JavaFutureAction[T] {

import scala.jdk.CollectionConverters._

override def isCancelled: Boolean = futureAction.isCancelled

override def isDone: Boolean = {
@@ -266,7 +263,7 @@ class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S
}

override def jobIds(): java.util.List[java.lang.Integer] = {
Collections.unmodifiableList(futureAction.jobIds.map(Integer.valueOf).asJava)
java.util.List.of(futureAction.jobIds.map(Integer.valueOf): _*)
}

private def getImpl(timeout: Duration): T = {
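The `jobIds()` change above drops `Collections.unmodifiableList` and the `CollectionConverters` import in favour of the varargs overload of `java.util.List.of`. A minimal, self-contained sketch of that idiom; the names are illustrative only.

```scala
// Minimal sketch of the idiom now used in jobIds(): splat a Scala Seq into the varargs
// overload of java.util.List.of to get an immutable java.util.List, with no need for
// Collections.unmodifiableList or a converter import.
object ImmutableListSketch {
  def main(args: Array[String]): Unit = {
    val jobIds = Seq(4, 8, 15)
    val asJavaList: java.util.List[Integer] = java.util.List.of(jobIds.map(Integer.valueOf): _*)
    println(asJavaList) // [4, 8, 15]
    // As with Map.of, the resulting list throws UnsupportedOperationException on mutation.
  }
}
```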
@@ -233,6 +233,12 @@ private[deploy] object DeployMessages {
master: RpcEndpointRef, driverId: String, success: Boolean, message: String)
extends DeployMessage

case object RequestKillAllDrivers extends DeployMessage

case class KillAllDriversResponse(
master: RpcEndpointRef, success: Boolean, message: String)
extends DeployMessage

case class RequestDriverStatus(driverId: String) extends DeployMessage

case class DriverStatusResponse(found: Boolean, state: Option[DriverState],
25 changes: 25 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -460,6 +460,31 @@ private[deploy] class Master(
}
}

case RequestKillAllDrivers =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
s"Can only kill drivers in ALIVE state."
context.reply(KillAllDriversResponse(self, success = false, msg))
} else {
logInfo("Asked to kill all drivers")
drivers.foreach { d =>
val driverId = d.id
if (waitingDrivers.contains(d)) {
waitingDrivers -= d
self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
} else {
// We just notify the worker to kill the driver here. The final bookkeeping occurs
// on the return path when the worker submits a state change back to the master
// to notify it that the driver was successfully killed.
d.worker.foreach { w =>
w.endpoint.send(KillDriver(driverId))
}
}
logInfo(s"Kill request for $driverId submitted")
}
context.reply(KillAllDriversResponse(self, true, "Kill request for all drivers submitted"))
}

case RequestClearCompletedDriversAndApps =>
val numDrivers = completedDrivers.length
val numApps = completedApps.length
@@ -135,6 +135,35 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
response
}

/** Request that the server kill all submissions. */
def killAllSubmissions(): SubmitRestProtocolResponse = {
logInfo(s"Submitting a request to kill all submissions in $master.")
var handled: Boolean = false
var response: SubmitRestProtocolResponse = null
for (m <- masters if !handled) {
validateMaster(m)
val url = getKillAllUrl(m)
try {
response = post(url)
response match {
case k: KillAllSubmissionResponse =>
if (!Utils.responseFromBackup(k.message)) {
handleRestResponse(k)
handled = true
}
case unexpected =>
handleUnexpectedRestResponse(unexpected)
}
} catch {
case e: SubmitRestConnectionException =>
if (handleConnectionException(m)) {
throw new SubmitRestConnectionException("Unable to connect to server", e)
}
}
}
response
}

/** Request that the server clears all submissions and applications. */
def clear(): SubmitRestProtocolResponse = {
logInfo(s"Submitting a request to clear $master.")
@@ -329,6 +358,12 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
new URL(s"$baseUrl/kill/$submissionId")
}

/** Return the REST URL for killing all submissions. */
private def getKillAllUrl(master: String): URL = {
val baseUrl = getBaseUrl(master)
new URL(s"$baseUrl/killall")
}

/** Return the REST URL for clearing all existing submissions and applications. */
private def getClearUrl(master: String): URL = {
val baseUrl = getBaseUrl(master)
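`getKillAllUrl` appends `/killall` to the REST base URL and `killAllSubmissions` POSTs to it, mirroring the existing kill flow. As a rough end-to-end check, the sketch below issues the same empty POST by hand; the host, port, and `/v1/submissions` prefix are assumptions about a local standalone master with the REST server enabled, not values taken from this commit.

```scala
import java.net.{HttpURLConnection, URL}
import scala.io.Source

// Hedged sketch of exercising the new endpoint by hand: an empty POST to /killall should
// come back as a JSON KillAllSubmissionResponse. Host, port, and path prefix are assumed.
object KillAllRequestSketch {
  def main(args: Array[String]): Unit = {
    val url = new URL("http://localhost:6066/v1/submissions/killall")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    conn.getOutputStream.close() // empty request body, mirroring post(url) in the client above
    val body = Source.fromInputStream(conn.getInputStream).mkString
    println(s"HTTP ${conn.getResponseCode}: $body")
  }
}
```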
@@ -54,6 +54,7 @@ private[spark] abstract class RestSubmissionServer(

protected val submitRequestServlet: SubmitRequestServlet
protected val killRequestServlet: KillRequestServlet
protected val killAllRequestServlet: KillAllRequestServlet
protected val statusRequestServlet: StatusRequestServlet
protected val clearRequestServlet: ClearRequestServlet

@@ -64,6 +65,7 @@
protected lazy val contextToServlet = Map[String, RestServlet](
s"$baseContext/create/*" -> submitRequestServlet,
s"$baseContext/kill/*" -> killRequestServlet,
s"$baseContext/killall/*" -> killAllRequestServlet,
s"$baseContext/status/*" -> statusRequestServlet,
s"$baseContext/clear/*" -> clearRequestServlet,
"/*" -> new ErrorServlet // default handler
@@ -229,6 +231,25 @@ private[rest] abstract class KillRequestServlet extends RestServlet {
protected def handleKill(submissionId: String): KillSubmissionResponse
}

/**
* A servlet for handling killAll requests passed to the [[RestSubmissionServer]].
*/
private[rest] abstract class KillAllRequestServlet extends RestServlet {

/**
* Have the Master kill all drivers and return an appropriate response to the client.
* Otherwise, return error.
*/
protected override def doPost(
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
val responseMessage = handleKillAll()
sendResponse(responseMessage, response)
}

protected def handleKillAll(): KillAllSubmissionResponse
}

/**
* A servlet for handling clear requests passed to the [[RestSubmissionServer]].
*/
@@ -331,7 +352,7 @@ private class ErrorServlet extends RestServlet {
"Missing the /submissions prefix."
case `serverVersion` :: "submissions" :: tail =>
// http://host:port/correct-version/submissions/*
"Missing an action: please specify one of /create, /kill, /clear or /status."
"Missing an action: please specify one of /create, /kill, /killall, /clear or /status."
case unknownVersion :: tail =>
// http://host:port/unknown-version/*
versionMismatch = true
@@ -63,6 +63,8 @@ private[deploy] class StandaloneRestServer(
new StandaloneSubmitRequestServlet(masterEndpoint, masterUrl, masterConf)
protected override val killRequestServlet =
new StandaloneKillRequestServlet(masterEndpoint, masterConf)
protected override val killAllRequestServlet =
new StandaloneKillAllRequestServlet(masterEndpoint, masterConf)
protected override val statusRequestServlet =
new StandaloneStatusRequestServlet(masterEndpoint, masterConf)
protected override val clearRequestServlet =
@@ -87,6 +89,23 @@ private[rest] class StandaloneKillRequestServlet(masterEndpoint: RpcEndpointRef,
}
}

/**
* A servlet for handling killAll requests passed to the [[StandaloneRestServer]].
*/
private[rest] class StandaloneKillAllRequestServlet(masterEndpoint: RpcEndpointRef, conf: SparkConf)
extends KillAllRequestServlet {

protected def handleKillAll() : KillAllSubmissionResponse = {
val response = masterEndpoint.askSync[DeployMessages.KillAllDriversResponse](
DeployMessages.RequestKillAllDrivers)
val k = new KillAllSubmissionResponse
k.serverSparkVersion = sparkVersion
k.message = response.message
k.success = response.success
k
}
}

/**
* A servlet for handling status requests passed to the [[StandaloneRestServer]].
*/