
Commit

Merge pull request #1528 from apache/master
GulajavaMinistudio authored Jul 21, 2023
2 parents c86b5b6 + e6649eb commit 8d181b9
Showing 162 changed files with 3,071 additions and 768 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/build_and_test.yml
@@ -1009,7 +1009,9 @@ jobs:
- name: start minikube
run: |
# See more in "Installation" https://minikube.sigs.k8s.io/docs/start/
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
# curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
# TODO(SPARK-44495): Resume using the latest minikube for k8s-integration-tests.
curl -LO https://storage.googleapis.com/minikube/releases/v1.30.1/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube
# GitHub Actions limits: 2 CPUs, 6947MB memory; restrict to 2 CPUs / 6GB for better resource accounting
minikube start --cpus 2 --memory 6144
87 changes: 77 additions & 10 deletions common/utils/src/main/resources/error/error-classes.json
@@ -143,6 +143,65 @@
"Could not load Protobuf class with name <protobufClassName>. <explanation>."
]
},
"CANNOT_LOAD_STATE_STORE" : {
"message" : [
"An error occurred during loading state."
],
"subClass" : {
"CANNOT_READ_CHECKPOINT" : {
"message" : [
"Cannot read RocksDB checkpoint metadata. Expected <expectedVersion>, but found <actualVersion>."
]
},
"CANNOT_READ_DELTA_FILE_KEY_SIZE" : {
"message" : [
"Error reading delta file <fileToRead> of <clazz>: key size cannot be <keySize>."
]
},
"CANNOT_READ_DELTA_FILE_NOT_EXISTS" : {
"message" : [
"Error reading delta file <fileToRead> of <clazz>: <fileToRead> does not exist."
]
},
"CANNOT_READ_SNAPSHOT_FILE_KEY_SIZE" : {
"message" : [
"Error reading snapshot file <fileToRead> of <clazz>: key size cannot be <keySize>."
]
},
"CANNOT_READ_SNAPSHOT_FILE_VALUE_SIZE" : {
"message" : [
"Error reading snapshot file <fileToRead> of <clazz>: value size cannot be <valueSize>."
]
},
"CANNOT_READ_STREAMING_STATE_FILE" : {
"message" : [
"Error reading streaming state file of <fileToRead> does not exist. If the stream job is restarted with a new or updated state operation, please create a new checkpoint location or clear the existing checkpoint location."
]
},
"UNCATEGORIZED" : {
"message" : [
""
]
},
"UNEXPECTED_FILE_SIZE" : {
"message" : [
"Copied <dfsFile> to <localFile>, expected <expectedSize> bytes, found <localFileSize> bytes."
]
},
"UNEXPECTED_VERSION" : {
"message" : [
"Version cannot be <version> because it is less than 0."
]
},
"UNRELEASED_THREAD_ERROR" : {
"message" : [
"<loggingId>: RocksDB instance could not be acquired by <newAcquiredThreadInfo> as it was not released by <acquiredThreadInfo> after <timeWaitedMs> ms.",
"Thread holding the lock has trace: <stackTraceOutput>"
]
}
},
"sqlState" : "58030"
},
"CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE" : {
"message" : [
"Failed to merge incompatible data types <left> and <right>. Please check the data types of the columns being merged and ensure that they are compatible. If necessary, consider casting the columns to compatible data types before attempting the merge."
@@ -1383,6 +1442,24 @@
],
"sqlState" : "22023"
},
"INVALID_HANDLE" : {
"message" : [
"The handle <handle> is invalid."
],
"subClass" : {
"ALREADY_EXISTS" : {
"message" : [
"Handle already exists."
]
},
"FORMAT" : {
"message" : [
"Handle has invalid format. Handle must an UUID string of the format '00112233-4455-6677-8899-aabbccddeeff'"
]
}
},
"sqlState" : "HY000"
},
"INVALID_HIVE_COLUMN_NAME" : {
"message" : [
"Cannot create the table <tableName> having the nested column <columnName> whose name contains invalid characters <invalidChars> in Hive metastore."
@@ -5749,16 +5826,6 @@
"Foreach writer has been aborted due to a task failure."
]
},
"_LEGACY_ERROR_TEMP_2258" : {
"message" : [
"Error reading delta file <fileToRead> of <clazz>: key size cannot be <keySize>."
]
},
"_LEGACY_ERROR_TEMP_2259" : {
"message" : [
"Error reading snapshot file <fileToRead> of <clazz>: <message>"
]
},
"_LEGACY_ERROR_TEMP_2260" : {
"message" : [
"Cannot purge as it might break internal state."
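The new CANNOT_LOAD_STATE_STORE sub-classes replace the legacy _LEGACY_ERROR_TEMP_2258/2259 templates removed above. As a rough, hedged illustration (not part of this commit), a condition such as UNEXPECTED_VERSION could be raised through SparkException with the parameter names used in the template; the checkVersion helper below is hypothetical.

import org.apache.spark.SparkException

// Hypothetical helper: reject a negative state store version using the
// CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION template defined above.
def checkVersion(version: Long): Unit = {
  if (version < 0) {
    throw new SparkException(
      errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
      messageParameters = Map("version" -> version.toString),
      cause = null)
  }
}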
@@ -58,6 +58,15 @@ class SparkException(
errorClass = Some(errorClass),
messageParameters = messageParameters)

def this(errorClass: String, messageParameters: Map[String, String], cause: Throwable,
context: Array[QueryContext]) =
this(
message = SparkThrowableHelper.getMessage(errorClass, messageParameters),
cause = cause,
errorClass = Some(errorClass),
messageParameters = messageParameters,
context = context)

override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava

override def getErrorClass: String = errorClass.orNull
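A minimal sketch of how the new four-argument constructor might be called to carry the SQL query context along with the wrapped cause; the error class and parameter values here are only illustrative.

import org.apache.spark.{QueryContext, SparkException}

// Illustrative call site: wrap a lower-level failure while preserving the
// query context of the failing expression.
def wrapFailure(cause: Throwable, ctx: Array[QueryContext]): SparkException =
  new SparkException(
    errorClass = "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE",
    messageParameters = Map("left" -> "INT", "right" -> "STRING"),
    cause = cause,
    context = ctx)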
@@ -16,6 +16,8 @@
*/
package org.apache.spark.util

import scala.util.Try

trait SparkClassUtils {
def getSparkClassLoader: ClassLoader = getClass.getClassLoader

@@ -39,6 +41,11 @@ trait SparkClassUtils {
}
// scalastyle:on classforname
}

/** Determines whether the provided class is loadable in the current thread. */
def classIsLoadable(clazz: String): Boolean = {
Try { classForName(clazz, initialize = false) }.isSuccess
}
}

object SparkClassUtils extends SparkClassUtils
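The classIsLoadable helper added here wraps classForName in a Try so callers can probe for optional classes without triggering initialization. A small usage sketch; the class name is only an example.

import org.apache.spark.util.SparkClassUtils

// Probe for an optional dependency without initializing the class.
val kafkaAvailable: Boolean =
  SparkClassUtils.classIsLoadable("org.apache.kafka.clients.producer.KafkaProducer")
println(s"Kafka client on classpath: $kafkaAvailable")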
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.util

import scala.collection.immutable

trait SparkCollectionUtils {
/**
* Same result as `keys.zipWithIndex.toMap`, but builds the map directly for better performance.
*/
def toMapWithIndex[K](keys: Iterable[K]): Map[K, Int] = {
val builder = immutable.Map.newBuilder[K, Int]
val keyIter = keys.iterator
var idx = 0
while (keyIter.hasNext) {
builder += (keyIter.next(), idx).asInstanceOf[(K, Int)]
idx = idx + 1
}
builder.result()
}
}

object SparkCollectionUtils extends SparkCollectionUtils
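A quick sketch of what the helper produces; the result is the same as keys.zipWithIndex.toMap.

import org.apache.spark.sql.catalyst.util.SparkCollectionUtils

val fields = Seq("id", "name", "value")
// Builds the map in a single pass over the iterator, without the intermediate
// collection that fields.zipWithIndex.toMap would allocate.
val fieldIndex: Map[String, Int] = SparkCollectionUtils.toMapWithIndex(fields)
// fieldIndex: Map(id -> 0, name -> 1, value -> 2)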
@@ -613,14 +613,40 @@ class SparkSession private[sql] (
/**
* Interrupt all operations of this session currently running on the connected server.
*
* TODO/WIP: Currently it will interrupt the Spark Jobs running on the server, triggered from
* ExecutePlan requests. If an operation is not running a Spark Job, it becomes a no-op and the
* operation will continue afterwards, possibly with more Spark Jobs.
* @return
* sequence of operationIds of interrupted operations. Note: there is still a possibility of
* an operation finishing just as it is interrupted.
*
* @since 3.5.0
*/
def interruptAll(): Unit = {
client.interruptAll()
def interruptAll(): Seq[String] = {
client.interruptAll().getInterruptedIdsList.asScala.toSeq
}

/**
* Interrupt all operations of this session with the given operation tag.
*
* @return
* sequence of operationIds of interrupted operations. Note: there is still a possibility of
* an operation finishing just as it is interrupted.
*
* @since 3.5.0
*/
def interruptTag(tag: String): Seq[String] = {
client.interruptTag(tag).getInterruptedIdsList.asScala.toSeq
}

/**
* Interrupt an operation of this session with the given operationId.
*
* @return
* sequence of operationIds of interrupted operations. Note: there is still a possibility of
* an operation finishing just as it is interrupted.
*
* @since 3.5.0
*/
def interruptOperation(operationId: String): Seq[String] = {
client.interruptOperation(operationId).getInterruptedIdsList.asScala.toSeq
}

/**
@@ -641,6 +667,50 @@
allocator.close()
SparkSession.onSessionClose(this)
}

/**
* Add a tag to be assigned to all the operations started by this thread in this session.
*
* @param tag
* The tag to be added. Cannot contain ',' (comma) character or be an empty string.
*
* @since 3.5.0
*/
def addTag(tag: String): Unit = {
client.addTag(tag)
}

/**
* Remove a tag previously added to be assigned to all the operations started by this thread in
* this session. No-op if such a tag was not added earlier.
*
* @param tag
* The tag to be removed. Cannot contain ',' (comma) character or be an empty string.
*
* @since 3.5.0
*/
def removeTag(tag: String): Unit = {
client.removeTag(tag)
}

/**
* Get the tags that are currently set to be assigned to all the operations started by this
* thread.
*
* @since 3.5.0
*/
def getTags(): Set[String] = {
client.getTags()
}

/**
* Clear the current thread's operation tags.
*
* @since 3.5.0
*/
def clearTags(): Unit = {
client.clearTags()
}
}

// The minimal builder needed to create a spark session.
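A hedged usage sketch of the tag and interrupt APIs added above, assuming a Spark Connect session named spark: one thread tags its work, another cancels everything carrying that tag.

// Worker thread: tag all operations it starts in this session.
spark.addTag("etl-backfill")
try {
  spark.range(1000000000L).write.format("noop").mode("overwrite").save()
} finally {
  spark.removeTag("etl-backfill")
}

// Monitoring thread: cancel every operation started under that tag and
// report which operation ids were actually interrupted.
val interrupted: Seq[String] = spark.interruptTag("etl-backfill")
interrupted.foreach(id => println(s"interrupted operation $id"))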
@@ -21,11 +21,15 @@ import java.net.URI
import java.util.UUID
import java.util.concurrent.Executor

import scala.collection.JavaConverters._
import scala.collection.mutable

import com.google.protobuf.ByteString
import io.grpc._

import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.UserContext
import org.apache.spark.sql.connect.common.ProtoUtils
import org.apache.spark.sql.connect.common.config.ConnectCommon

/**
@@ -76,6 +80,7 @@ private[sql] class SparkConnectClient(
.setUserContext(userContext)
.setSessionId(sessionId)
.setClientType(userAgent)
.addAllTags(tags.get.toSeq.asJava)
.build()
bstub.executePlan(request)
}
@@ -195,6 +200,59 @@
bstub.interrupt(request)
}

private[sql] def interruptTag(tag: String): proto.InterruptResponse = {
val builder = proto.InterruptRequest.newBuilder()
val request = builder
.setUserContext(userContext)
.setSessionId(sessionId)
.setClientType(userAgent)
.setInterruptType(proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_TAG)
.setOperationTag(tag)
.build()
bstub.interrupt(request)
}

private[sql] def interruptOperation(id: String): proto.InterruptResponse = {
val builder = proto.InterruptRequest.newBuilder()
val request = builder
.setUserContext(userContext)
.setSessionId(sessionId)
.setClientType(userAgent)
.setInterruptType(proto.InterruptRequest.InterruptType.INTERRUPT_TYPE_OPERATION_ID)
.setOperationId(id)
.build()
bstub.interrupt(request)
}

private[this] val tags = new InheritableThreadLocal[mutable.Set[String]] {
override def childValue(parent: mutable.Set[String]): mutable.Set[String] = {
// Note: make a clone so that changes to the parent's tags are not reflected in
// those of the child threads.
parent.clone()
}
override protected def initialValue(): mutable.Set[String] = new mutable.HashSet[String]()
}

private[sql] def addTag(tag: String): Unit = {
// Validation is also done server-side, but this raises the error earlier.
ProtoUtils.throwIfInvalidTag(tag)
tags.get += tag
}

private[sql] def removeTag(tag: String): Unit = {
// Validation is also done server-side, but this raises the error earlier.
ProtoUtils.throwIfInvalidTag(tag)
tags.get.remove(tag)
}

private[sql] def getTags(): Set[String] = {
tags.get.toSet
}

private[sql] def clearTags(): Unit = {
tags.get.clear()
}

def copy(): SparkConnectClient = configuration.toSparkConnectClient

/**
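The tags are kept in an InheritableThreadLocal whose childValue clones the parent's set, so a child thread starts from a snapshot of the parent's tags and the two sets then evolve independently. A small sketch of that behaviour through the public API, assuming a session named spark.

spark.addTag("parent-tag")

val child = new Thread(() => {
  // The child inherits a copy of the parent's tags...
  assert(spark.getTags() == Set("parent-tag"))
  // ...and changes made here stay local to this thread.
  spark.addTag("child-tag")
})
child.start()
child.join()

// The parent's tag set is unaffected by the child's addTag call.
assert(spark.getTags() == Set("parent-tag"))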
