[SPARK-45516][CONNECT] Include QueryContext in SparkThrowable proto message

### What changes were proposed in this pull request?

- Include QueryContext in SparkThrowable proto message
- Reconstruct QueryContext for SparkThrowable exceptions on the client side
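
As a minimal sketch of what this enables on the client side (hypothetical query; assumes a Spark Connect `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.AnalysisException

// `x` is an unresolved column, so analysis fails on the server; with this
// change the exception reconstructed on the client carries the server-side
// query context.
try {
  spark.sql("select x").collect()
} catch {
  case ex: AnalysisException =>
    ex.getQueryContext.foreach { ctx =>
      println(s"error at [${ctx.startIndex()}, ${ctx.stopIndex()}]: '${ctx.fragment()}'")
    }
}
```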

### Why are the changes needed?

- Better integration with the error framework

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite"`

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#43352 from heyihong/SPARK-45516.

Lead-authored-by: Yihong He <yihong.he@databricks.com>
Co-authored-by: Yihong He <heyihong.cn@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2 people authored and HyukjinKwon committed Oct 12, 2023
1 parent 7663fdf commit e720cce
Showing 6 changed files with 159 additions and 25 deletions.
@@ -129,6 +129,10 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
assert(!ex.messageParameters.isEmpty)
assert(ex.getSqlState != null)
assert(!ex.isInternalError)
assert(ex.getQueryContext.length == 1)
assert(ex.getQueryContext.head.startIndex() == 7)
assert(ex.getQueryContext.head.stopIndex() == 7)
assert(ex.getQueryContext.head.fragment() == "x")
assert(
ex.getStackTrace
.find(_.getClassName.contains("org.apache.spark.sql.catalyst.analysis.CheckAnalysis"))
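
The new assertions pin the unresolved column to 0-based offset 7 of the query text. A hedged reconstruction of the surrounding test (the enclosing `intercept` call is outside this hunk; `spark` is the suite's remote session):

```scala
val ex = intercept[AnalysisException] {
  // "select " occupies offsets 0-6, so the unresolved column "x" sits at offset 7.
  spark.sql("select x").collect()
}
```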
@@ -819,13 +819,39 @@ message FetchErrorDetailsResponse {
int32 line_number = 4;
}

// QueryContext defines the schema for the query context of a SparkThrowable.
// It helps users understand where the error occurs while executing queries.
message QueryContext {
// The object type of the query which throws the exception.
// If the exception is directly from the main query, it should be an empty string.
// Otherwise, it should be the exact object type in upper case. For example, a "VIEW".
string object_type = 1;

// The object name of the query which throws the exception.
// If the exception is directly from the main query, it should be an empty string.
// Otherwise, it should be the object name. For example, a view name "V1".
string object_name = 2;

// The starting index in the query text which throws the exception. The index starts from 0.
int32 start_index = 3;

// The stopping index in the query which throws the exception. The index starts from 0.
int32 stop_index = 4;

// The corresponding fragment of the query which throws the exception.
string fragment = 5;
}

// SparkThrowable defines the schema for SparkThrowable exceptions.
message SparkThrowable {
// Succinct, human-readable, unique, and consistent representation of the error category.
optional string error_class = 1;

// message parameters for the error framework.
// The message parameters for the error framework.
map<string, string> message_parameters = 2;

// The query context of a SparkThrowable.
repeated QueryContext query_contexts = 3;
}

// Error defines the schema for the representing exception.
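
To make the new messages concrete, here is a sketch of how a server might populate them, mirroring the generated builder API used in `ErrorUtils` below (the error class and message parameter values are illustrative, not taken from this diff):

```scala
import org.apache.spark.connect.proto.FetchErrorDetailsResponse

// Query context for an unresolved column "x" at offset 7 of the main query.
val queryContext = FetchErrorDetailsResponse.QueryContext
  .newBuilder()
  .setObjectType("") // empty: the error comes directly from the main query
  .setObjectName("")
  .setStartIndex(7)  // 0-based index into the query text
  .setStopIndex(7)
  .setFragment("x")  // the offending fragment of the query
  .build()

val sparkThrowable = FetchErrorDetailsResponse.SparkThrowable
  .newBuilder()
  .setErrorClass("UNRESOLVED_COLUMN.WITH_SUGGESTION") // illustrative
  .putMessageParameters("objectName", "`x`")          // illustrative
  .addQueryContexts(queryContext)
  .build()
```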
@@ -27,7 +27,7 @@ import io.grpc.protobuf.StatusProto
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods

import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
import org.apache.spark.{QueryContext, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext}
import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
import org.apache.spark.internal.Logging
@@ -167,10 +167,12 @@ private object GrpcExceptionConverter {
private case class ErrorParams(
message: String,
cause: Option[Throwable],
// errorClass will only be set if the error is enriched and SparkThrowable.
// errorClass will only be set if the error is both enriched and SparkThrowable.
errorClass: Option[String],
// messageParameters will only be set if the error is enriched and SparkThrowable.
messageParameters: Map[String, String])
// messageParameters will only be set if the error is both enriched and SparkThrowable.
messageParameters: Map[String, String],
// queryContext will only be set if the error is both enriched and SparkThrowable.
queryContext: Array[QueryContext])

private def errorConstructor[T <: Throwable: ClassTag](
throwableCtr: ErrorParams => T): (String, ErrorParams => Throwable) = {
@@ -192,13 +194,15 @@
Origin(),
Origin(),
errorClass = params.errorClass,
messageParameters = params.messageParameters)),
messageParameters = params.messageParameters,
queryContext = params.queryContext)),
errorConstructor(params =>
new AnalysisException(
params.message,
cause = params.cause,
errorClass = params.errorClass,
messageParameters = params.messageParameters)),
messageParameters = params.messageParameters,
context = params.queryContext)),
errorConstructor(params => new NamespaceAlreadyExistsException(params.message)),
errorConstructor(params => new TableAlreadyExistsException(params.message, params.cause)),
errorConstructor(params => new TempTableAlreadyExistsException(params.message, params.cause)),
@@ -221,7 +225,8 @@
message = params.message,
cause = params.cause.orNull,
errorClass = params.errorClass,
messageParameters = params.messageParameters)))
messageParameters = params.messageParameters,
context = params.queryContext)))

/**
* errorsToThrowable reconstructs the exception based on a list of protobuf messages
@@ -247,16 +252,35 @@
val causeOpt =
if (error.hasCauseIdx) Some(errorsToThrowable(error.getCauseIdx, errors)) else None

val errorClass = if (error.hasSparkThrowable && error.getSparkThrowable.hasErrorClass) {
Some(error.getSparkThrowable.getErrorClass)
} else None

val messageParameters = if (error.hasSparkThrowable) {
error.getSparkThrowable.getMessageParametersMap.asScala.toMap
} else Map.empty[String, String]

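// Rebuild client-side QueryContext instances from the protobuf payload so
// that SparkThrowable.getQueryContext works on the reconstructed exception.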
val queryContext = error.getSparkThrowable.getQueryContextsList.asScala.map { queryCtx =>
new QueryContext {
override def objectType(): String = queryCtx.getObjectType

override def objectName(): String = queryCtx.getObjectName

override def startIndex(): Int = queryCtx.getStartIndex

override def stopIndex(): Int = queryCtx.getStopIndex

override def fragment(): String = queryCtx.getFragment
}
}.toArray

val exception = constructor(
ErrorParams(
message = error.getMessage,
cause = causeOpt,
errorClass = if (error.hasSparkThrowable && error.getSparkThrowable.hasErrorClass) {
Some(error.getSparkThrowable.getErrorClass)
} else None,
messageParameters = if (error.hasSparkThrowable) {
error.getSparkThrowable.getMessageParametersMap.asScala.toMap
} else Map.empty))
errorClass = errorClass,
messageParameters = messageParameters,
queryContext = queryContext))

if (!error.getStackTraceList.isEmpty) {
exception.setStackTrace(error.getStackTraceList.asScala.toArray.map { stackTraceElement =>
@@ -115,6 +115,17 @@ private[connect] object ErrorUtils extends Logging {
if (sparkThrowable.getErrorClass != null) {
sparkThrowableBuilder.setErrorClass(sparkThrowable.getErrorClass)
}
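// Copy each QueryContext of the throwable into the proto response so the
// client can reconstruct it.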
for (queryCtx <- sparkThrowable.getQueryContext) {
sparkThrowableBuilder.addQueryContexts(
FetchErrorDetailsResponse.QueryContext
.newBuilder()
.setObjectType(queryCtx.objectType())
.setObjectName(queryCtx.objectName())
.setStartIndex(queryCtx.startIndex())
.setStopIndex(queryCtx.stopIndex())
.setFragment(queryCtx.fragment())
.build())
}
sparkThrowableBuilder.putAllMessageParameters(sparkThrowable.getMessageParameters)
builder.setSparkThrowable(sparkThrowableBuilder.build())
case _ =>
22 changes: 12 additions & 10 deletions python/pyspark/sql/connect/proto/base_pb2.py

Large diffs are not rendered by default.

69 changes: 68 additions & 1 deletion python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -2869,6 +2869,59 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
self, oneof_group: typing_extensions.Literal["_file_name", b"_file_name"]
) -> typing_extensions.Literal["file_name"] | None: ...

class QueryContext(google.protobuf.message.Message):
"""QueryContext defines the schema for the query context of a SparkThrowable.
It helps users understand where the error occurs while executing queries.
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

OBJECT_TYPE_FIELD_NUMBER: builtins.int
OBJECT_NAME_FIELD_NUMBER: builtins.int
START_INDEX_FIELD_NUMBER: builtins.int
STOP_INDEX_FIELD_NUMBER: builtins.int
FRAGMENT_FIELD_NUMBER: builtins.int
object_type: builtins.str
"""The object type of the query which throws the exception.
If the exception is directly from the main query, it should be an empty string.
Otherwise, it should be the exact object type in upper case. For example, a "VIEW".
"""
object_name: builtins.str
"""The object name of the query which throws the exception.
If the exception is directly from the main query, it should be an empty string.
Otherwise, it should be the object name. For example, a view name "V1".
"""
start_index: builtins.int
"""The starting index in the query text which throws the exception. The index starts from 0."""
stop_index: builtins.int
"""The stopping index in the query which throws the exception. The index starts from 0."""
fragment: builtins.str
"""The corresponding fragment of the query which throws the exception."""
def __init__(
self,
*,
object_type: builtins.str = ...,
object_name: builtins.str = ...,
start_index: builtins.int = ...,
stop_index: builtins.int = ...,
fragment: builtins.str = ...,
) -> None: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
"fragment",
b"fragment",
"object_name",
b"object_name",
"object_type",
b"object_type",
"start_index",
b"start_index",
"stop_index",
b"stop_index",
],
) -> None: ...

class SparkThrowable(google.protobuf.message.Message):
"""SparkThrowable defines the schema for SparkThrowable exceptions."""

@@ -2893,18 +2946,30 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):

ERROR_CLASS_FIELD_NUMBER: builtins.int
MESSAGE_PARAMETERS_FIELD_NUMBER: builtins.int
QUERY_CONTEXTS_FIELD_NUMBER: builtins.int
error_class: builtins.str
"""Succinct, human-readable, unique, and consistent representation of the error category."""
@property
def message_parameters(
self,
) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
"""message parameters for the error framework."""
"""The message parameters for the error framework."""
@property
def query_contexts(
self,
) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
global___FetchErrorDetailsResponse.QueryContext
]:
"""The query context of a SparkThrowable."""
def __init__(
self,
*,
error_class: builtins.str | None = ...,
message_parameters: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
query_contexts: collections.abc.Iterable[
global___FetchErrorDetailsResponse.QueryContext
]
| None = ...,
) -> None: ...
def HasField(
self,
@@ -2921,6 +2986,8 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
b"error_class",
"message_parameters",
b"message_parameters",
"query_contexts",
b"query_contexts",
],
) -> None: ...
def WhichOneof(
