Merge pull request #1580 from apache/master

Create a new pull request by comparing changes across two branches

GulajavaMinistudio authored Nov 2, 2023
2 parents 6dff7b9 + 30ec6e3 commit a8fbb3e

Showing 123 changed files with 2,240 additions and 761 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
@@ -1063,7 +1063,7 @@ jobs:
export PVC_TESTS_VM_PATH=$PVC_TMP_DIR
minikube mount ${PVC_TESTS_HOST_PATH}:${PVC_TESTS_VM_PATH} --gid=0 --uid=185 &
kubectl create clusterrolebinding serviceaccounts-cluster-admin --clusterrole=cluster-admin --group=system:serviceaccounts || true
kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/v1.8.0/installer/volcano-development.yaml || true
kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/v1.8.1/installer/volcano-development.yaml || true
eval $(minikube docker-env)
build/sbt -Psparkr -Pkubernetes -Pvolcano -Pkubernetes-integration-tests -Dspark.kubernetes.test.driverRequestCores=0.5 -Dspark.kubernetes.test.executorRequestCores=0.2 -Dspark.kubernetes.test.volcanoMaxConcurrencyJobNum=1 -Dtest.exclude.tags=local "kubernetes-integration-tests/test"
- name: Upload Spark on K8S integration tests log files
9 changes: 9 additions & 0 deletions common/utils/src/main/java/org/apache/spark/QueryContext.java
@@ -27,6 +27,9 @@
*/
@Evolving
public interface QueryContext {
// The type of this query context.
QueryContextType contextType();

// The object type of the query which throws the exception.
// If the exception is directly from the main query, it should be an empty string.
// Otherwise, it should be the exact object type in upper case. For example, a "VIEW".
@@ -45,4 +48,10 @@ public interface QueryContext {

// The corresponding fragment of the query which throws the exception.
String fragment();

// The user code (call site of the API) that caused throwing the exception.
String callSite();

// Summary of the exception cause.
String summary();
}
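Taken together, the new methods split QueryContext into two flavors: SQL contexts carry the object and character-offset information that already existed, while DataFrame contexts carry the offending API fragment plus the user call site. As an illustration only (the class name and field values below are hypothetical, not part of this change), a DataFrame-side implementation of the extended interface could look roughly like this in Scala:

// Hypothetical sketch of a DataFrame-flavored QueryContext; not the actual Spark implementation.
import org.apache.spark.{QueryContext, QueryContextType}

class ExampleDataFrameContext(api: String, site: String) extends QueryContext {
  override def contextType(): QueryContextType = QueryContextType.DataFrame
  // The SQL-only fields stay empty or unset for DataFrame contexts.
  override def objectType(): String = ""
  override def objectName(): String = ""
  override def startIndex(): Int = -1
  override def stopIndex(): Int = -1
  // The API fragment and the user call site are the meaningful parts here.
  override def fragment(): String = api
  override def callSite(): String = site
  override def summary(): String = s"== DataFrame ==\n$api was called from $site"
}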
31 changes: 31 additions & 0 deletions common/utils/src/main/java/org/apache/spark/QueryContextType.java
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

import org.apache.spark.annotation.Evolving;

/**
* The type of {@link QueryContext}.
*
* @since 4.0.0
*/
@Evolving
public enum QueryContextType {
SQL,
DataFrame
}
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -1737,6 +1737,11 @@
"Session already exists."
]
},
"SESSION_CLOSED" : {
"message" : [
"Session was closed."
]
},
"SESSION_NOT_FOUND" : {
"message" : [
"Session not found."
@@ -114,13 +114,19 @@ private[spark] object SparkThrowableHelper {
g.writeArrayFieldStart("queryContext")
e.getQueryContext.foreach { c =>
g.writeStartObject()
g.writeStringField("objectType", c.objectType())
g.writeStringField("objectName", c.objectName())
val startIndex = c.startIndex() + 1
if (startIndex > 0) g.writeNumberField("startIndex", startIndex)
val stopIndex = c.stopIndex() + 1
if (stopIndex > 0) g.writeNumberField("stopIndex", stopIndex)
g.writeStringField("fragment", c.fragment())
c.contextType() match {
case QueryContextType.SQL =>
g.writeStringField("objectType", c.objectType())
g.writeStringField("objectName", c.objectName())
val startIndex = c.startIndex() + 1
if (startIndex > 0) g.writeNumberField("startIndex", startIndex)
val stopIndex = c.stopIndex() + 1
if (stopIndex > 0) g.writeNumberField("stopIndex", stopIndex)
g.writeStringField("fragment", c.fragment())
case QueryContextType.DataFrame =>
g.writeStringField("fragment", c.fragment())
g.writeStringField("callSite", c.callSite())
}
g.writeEndObject()
}
g.writeEndArray()
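For illustration, the two branches above yield queryContext entries of roughly the following shapes; the values here are invented placeholders rather than output captured from a real run:

// Hypothetical serialized entries (placeholder values), shown as Scala string literals.
val sqlContextEntry =
  """{"objectType": "VIEW", "objectName": "v1", "startIndex": 8, "stopIndex": 12, "fragment": "1 / 0"}"""
val dataFrameContextEntry =
  """{"fragment": "div", "callSite": "MyApp$.main(MyApp.scala:42)"}"""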
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import scala.collection.immutable

/**
* Implicit methods related to Scala Array.
*/
private[spark] object ArrayImplicits {

implicit class SparkArrayOps[T](xs: Array[T]) {

/**
* Wraps an Array[T] as an immutable.ArraySeq[T] without copying.
*/
def toImmutableArraySeq: immutable.ArraySeq[T] =
if (xs eq null) null
else immutable.ArraySeq.unsafeWrapArray(xs)
}
}
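A quick usage sketch of the new implicit (the values are arbitrary); this is the pattern the SparkSession change below switches to in place of calling immutable.ArraySeq.unsafeWrapArray directly:

import scala.collection.immutable
import org.apache.spark.util.ArrayImplicits._

val xs: Array[Int] = Array(1, 2, 3)
// Wraps the existing array without copying, so later mutation of `xs` is visible through `wrapped`.
val wrapped: immutable.ArraySeq[Int] = xs.toImmutableArraySeq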
@@ -21,7 +21,6 @@ import java.net.URI
import java.util.concurrent.TimeUnit._
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}

import scala.collection.immutable
import scala.jdk.CollectionConverters._
import scala.reflect.runtime.universe.TypeTag

@@ -45,6 +44,7 @@ import org.apache.spark.sql.internal.{CatalogImpl, SqlApiConf}
import org.apache.spark.sql.streaming.DataStreamReader
import org.apache.spark.sql.streaming.StreamingQueryManager
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ArrayImplicits._

/**
* The entry point to programming Spark with the Dataset and DataFrame API.
@@ -248,7 +248,7 @@ class SparkSession private[sql] (
proto.SqlCommand
.newBuilder()
.setSql(sqlText)
.addAllPosArguments(immutable.ArraySeq.unsafeWrapArray(args.map(lit(_).expr)).asJava)))
.addAllPosArguments(args.map(lit(_).expr).toImmutableArraySeq.asJava)))
val plan = proto.Plan.newBuilder().setCommand(cmd)
// .toBuffer forces that the iterator is consumed and closed
val responseSeq = client.execute(plan.build()).toBuffer.toSeq
@@ -665,6 +665,9 @@ class SparkSession private[sql] (
* @since 3.4.0
*/
override def close(): Unit = {
if (releaseSessionOnClose) {
client.releaseSession()
}
client.shutdown()
allocator.close()
SparkSession.onSessionClose(this)
@@ -735,6 +738,11 @@ class SparkSession private[sql] (
* We null out the instance for now.
*/
private def writeReplace(): Any = null

/**
* Set to false to prevent client.releaseSession on close() (testing only)
*/
private[sql] var releaseSessionOnClose = true
}

// The minimal builder needed to create a spark session.
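With this change, close() also releases the server-side session unless the test-only releaseSessionOnClose flag is cleared first. A minimal usage sketch, with a placeholder connection string:

// Sketch: close() now sends ReleaseSession before shutting down the client and allocator.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()
try {
  spark.range(10).count()
} finally {
  spark.close() // further requests against this session id will fail on the server
}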
@@ -120,7 +120,9 @@ class PlanGenerationTestSuite
}

override protected def afterAll(): Unit = {
session.close()
// Don't call client.releaseSession on close(), because the connection details are dummy.
session.releaseSessionOnClose = false
session.stop()
if (cleanOrphanedGoldenFiles) {
cleanOrphanedGoldenFile()
}
@@ -33,18 +33,24 @@ class SparkSessionSuite extends ConnectFunSuite {
private val connectionString2: String = "sc://test.me:14099"
private val connectionString3: String = "sc://doit:16845"

private def closeSession(session: SparkSession): Unit = {
// Don't call client.releaseSession on close(), because the connection details are dummy.
session.releaseSessionOnClose = false
session.close()
}

test("default") {
val session = SparkSession.builder().getOrCreate()
assert(session.client.configuration.host == "localhost")
assert(session.client.configuration.port == 15002)
session.close()
closeSession(session)
}

test("remote") {
val session = SparkSession.builder().remote(connectionString2).getOrCreate()
assert(session.client.configuration.host == "test.me")
assert(session.client.configuration.port == 14099)
session.close()
closeSession(session)
}

test("getOrCreate") {
@@ -53,8 +59,8 @@ class SparkSessionSuite extends ConnectFunSuite {
try {
assert(session1 eq session2)
} finally {
session1.close()
session2.close()
closeSession(session1)
closeSession(session2)
}
}

@@ -65,8 +71,8 @@ class SparkSessionSuite extends ConnectFunSuite {
assert(session1 ne session2)
assert(session1.client.configuration == session2.client.configuration)
} finally {
session1.close()
session2.close()
closeSession(session1)
closeSession(session2)
}
}

@@ -77,8 +83,8 @@ class SparkSessionSuite extends ConnectFunSuite {
assert(session1 ne session2)
assert(session1.client.configuration == session2.client.configuration)
} finally {
session1.close()
session2.close()
closeSession(session1)
closeSession(session2)
}
}

@@ -98,7 +104,7 @@ class SparkSessionSuite extends ConnectFunSuite {
assertThrows[RuntimeException] {
session.range(10).count()
}
session.close()
closeSession(session)
}

test("Default/Active session") {
@@ -136,12 +142,12 @@
assert(SparkSession.getActiveSession.contains(session1))

// Close session1
session1.close()
closeSession(session1)
assert(SparkSession.getDefaultSession.contains(session2))
assert(SparkSession.getActiveSession.isEmpty)

// Close session2
session2.close()
closeSession(session2)
assert(SparkSession.getDefaultSession.isEmpty)
assert(SparkSession.getActiveSession.isEmpty)
}
@@ -187,7 +193,7 @@

// Step 3 - close session 1, no more default session in both scripts
phaser.arriveAndAwaitAdvance()
session1.close()
closeSession(session1)

// Step 4 - no default session, same active session.
phaser.arriveAndAwaitAdvance()
@@ -240,27 +246,27 @@

// Step 7 - close active session in script2
phaser.arriveAndAwaitAdvance()
internalSession.close()
closeSession(internalSession)
assert(SparkSession.getActiveSession.isEmpty)
}
assert(script1.get())
assert(script2.get())
assert(SparkSession.getActiveSession.contains(session2))
session2.close()
closeSession(session2)
assert(SparkSession.getActiveSession.isEmpty)
} finally {
executor.shutdown()
}
}

test("deprecated methods") {
SparkSession
val session = SparkSession
.builder()
.master("yayay")
.appName("bob")
.enableHiveSupport()
.create()
.close()
closeSession(session)
}

test("serialize as null") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,30 @@ message ReleaseExecuteResponse {
optional string operation_id = 2;
}

message ReleaseSessionRequest {
// (Required)
//
// The session_id of the session to release.
// This must be an id of an existing session.
string session_id = 1;

// (Required) User context
//
// user_context.user_id and session_id both identify a unique remote spark session on the
// server side.
UserContext user_context = 2;

// Provides optional information about the client sending the request. This field
// can be used for language or version specific information and is only intended for
// logging purposes and will not be interpreted by the server.
optional string client_type = 3;
}

message ReleaseSessionResponse {
// Session id of the session on which the release executed.
string session_id = 1;
}

message FetchErrorDetailsRequest {

// (Required)
@@ -823,6 +847,13 @@ message FetchErrorDetailsResponse {
// QueryContext defines the schema for the query context of a SparkThrowable.
// It helps users understand where the error occurs while executing queries.
message QueryContext {
// The type of this query context.
enum ContextType {
SQL = 0;
DATAFRAME = 1;
}
ContextType context_type = 10;

// The object type of the query which throws the exception.
// If the exception is directly from the main query, it should be an empty string.
// Otherwise, it should be the exact object type in upper case. For example, a "VIEW".
@@ -841,6 +872,12 @@

// The corresponding fragment of the query which throws the exception.
string fragment = 5;

// The user code (call site of the API) that caused throwing the exception.
string callSite = 6;

// Summary of the exception cause.
string summary = 7;
}

// SparkThrowable defines the schema for SparkThrowable exceptions.
@@ -921,6 +958,12 @@ service SparkConnectService {
// RPC and ReleaseExecute may not be used.
rpc ReleaseExecute(ReleaseExecuteRequest) returns (ReleaseExecuteResponse) {}

// Release a session.
// All the executions in the session will be released. Any further requests for the session with
// that session_id for the given user_id will fail. If the session didn't exist or was already
// released, this is a noop.
rpc ReleaseSession(ReleaseSessionRequest) returns (ReleaseSessionResponse) {}

// FetchErrorDetails retrieves the matched exception with details based on a provided error id.
rpc FetchErrorDetails(FetchErrorDetailsRequest) returns (FetchErrorDetailsResponse) {}
}
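For illustration, a client could build the new request with the generated protobuf classes roughly as below; the identifiers are placeholders and the stub call is sketched in a comment because the Scala client normally goes through its own client layer rather than the raw stub:

// Sketch, assuming the generated classes in org.apache.spark.connect.proto are on the classpath.
import org.apache.spark.connect.proto

val sessionId = "00000000-0000-0000-0000-000000000000" // placeholder session id
val request = proto.ReleaseSessionRequest
  .newBuilder()
  .setSessionId(sessionId)
  .setUserContext(proto.UserContext.newBuilder().setUserId("test_user"))
  .build()
// With a blocking stub generated from SparkConnectService:
//   val response = stub.releaseSession(request)
//   assert(response.getSessionId == sessionId)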