Skip to content

Commit

Permalink
[SPARK-38475][CORE] Use error class in org.apache.spark.serializer
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR aims to change exceptions created in the package org.apache.spark.serializer to use error classes.

### Why are the changes needed?
This is to move exceptions created in the package org.apache.spark.serializer to error classes.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes apache#42243 from bozhang2820/spark-38475.

Lead-authored-by: Bo Zhang <bo.zhang@databricks.com>
Co-authored-by: Bo Zhang <bozhang2820@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
  • Loading branch information
2 people authored and MaxGekk committed Aug 7, 2023
1 parent a3a3291 commit 2a23c7a
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 10 deletions.
21 changes: 21 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,11 @@
"Not found an encoder of the type <typeName> to Spark SQL internal representation. Consider to change the input type to one of supported at '<docroot>/sql-ref-datatypes.html'."
]
},
"ERROR_READING_AVRO_UNKNOWN_FINGERPRINT" : {
"message" : [
"Error reading avro data -- encountered an unknown fingerprint: <fingerprint>, not sure what schema to use. This could happen if you registered additional schemas after starting your spark context."
]
},
"EVENT_TIME_IS_NOT_ON_TIMESTAMP_TYPE" : {
"message" : [
"The event time <eventName> has the invalid type <eventType>, but expected \"TIMESTAMP\"."
Expand Down Expand Up @@ -864,6 +869,11 @@
],
"sqlState" : "22018"
},
"FAILED_REGISTER_CLASS_WITH_KRYO" : {
"message" : [
"Failed to register classes with Kryo."
]
},
"FAILED_RENAME_PATH" : {
"message" : [
"Failed to rename <sourcePath> to <targetPath> as destination already exists."
Expand Down Expand Up @@ -1564,6 +1574,12 @@
],
"sqlState" : "22032"
},
"INVALID_KRYO_SERIALIZER_BUFFER_SIZE" : {
"message" : [
"The value of the config \"<bufferSizeConfKey>\" must be less than 2048 MiB, but got <bufferSizeConfValue> MiB."
],
"sqlState" : "F0000"
},
"INVALID_LAMBDA_FUNCTION_CALL" : {
"message" : [
"Invalid lambda function call."
Expand Down Expand Up @@ -2006,6 +2022,11 @@
"The join condition <joinCondition> has the invalid type <conditionType>, expected \"BOOLEAN\"."
]
},
"KRYO_BUFFER_OVERFLOW" : {
"message" : [
"Kryo serialization failed: <exceptionMsg>. To avoid this, increase \"<bufferSizeConfKey>\" value."
]
},
"LOAD_DATA_PATH_NOT_EXISTS" : {
"message" : [
"LOAD DATA input path does not exist: <path>."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ private[serializer] class GenericAvroSerializer[D <: GenericContainer]
case Some(s) => new Schema.Parser().setValidateDefaults(false).parse(s)
case None =>
throw new SparkException(
"Error reading attempting to read avro data -- encountered an unknown " +
s"fingerprint: $fingerprint, not sure what schema to use. This could happen " +
"if you registered additional schemas after starting your spark context.")
errorClass = "ERROR_READING_AVRO_UNKNOWN_FINGERPRINT",
messageParameters = Map("fingerprint" -> fingerprint.toString),
cause = null)
}
})
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,21 @@ class KryoSerializer(conf: SparkConf)
private val bufferSizeKb = conf.get(KRYO_SERIALIZER_BUFFER_SIZE)

if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) {
throw new IllegalArgumentException(s"${KRYO_SERIALIZER_BUFFER_SIZE.key} must be less than " +
s"2048 MiB, got: + ${ByteUnit.KiB.toMiB(bufferSizeKb)} MiB.")
throw new SparkIllegalArgumentException(
errorClass = "INVALID_KRYO_SERIALIZER_BUFFER_SIZE",
messageParameters = Map(
"bufferSizeConfKey" -> KRYO_SERIALIZER_BUFFER_SIZE.key,
"bufferSizeConfValue" -> ByteUnit.KiB.toMiB(bufferSizeKb).toString))
}
private val bufferSize = ByteUnit.KiB.toBytes(bufferSizeKb).toInt

val maxBufferSizeMb = conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE).toInt
if (maxBufferSizeMb >= ByteUnit.GiB.toMiB(2)) {
throw new IllegalArgumentException(s"${KRYO_SERIALIZER_MAX_BUFFER_SIZE.key} must be less " +
s"than 2048 MiB, got: $maxBufferSizeMb MiB.")
throw new SparkIllegalArgumentException(
errorClass = "INVALID_KRYO_SERIALIZER_BUFFER_SIZE",
messageParameters = Map(
"bufferSizeConfKey" -> KRYO_SERIALIZER_MAX_BUFFER_SIZE.key,
"bufferSizeConfValue" -> maxBufferSizeMb.toString))
}
private val maxBufferSize = ByteUnit.MiB.toBytes(maxBufferSizeMb).toInt

Expand Down Expand Up @@ -183,7 +189,10 @@ class KryoSerializer(conf: SparkConf)
.foreach { reg => reg.registerClasses(kryo) }
} catch {
case e: Exception =>
throw new SparkException(s"Failed to register classes with Kryo", e)
throw new SparkException(
errorClass = "FAILED_REGISTER_CLASS_WITH_KRYO",
messageParameters = Map.empty,
cause = e)
}
}

Expand Down Expand Up @@ -442,8 +451,12 @@ private[spark] class KryoSerializerInstance(
kryo.writeClassAndObject(output, t)
} catch {
case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
s"increase ${KRYO_SERIALIZER_MAX_BUFFER_SIZE.key} value.", e)
throw new SparkException(
errorClass = "KRYO_BUFFER_OVERFLOW",
messageParameters = Map(
"exceptionMsg" -> e.getMessage,
"bufferSizeConfKey" -> KRYO_SERIALIZER_MAX_BUFFER_SIZE.key),
cause = e)
} finally {
releaseKryo(kryo)
}
Expand Down
24 changes: 24 additions & 0 deletions docs/sql-error-conditions.md
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,12 @@ SQLSTATE: none assigned

Not found an encoder of the type `<typeName>` to Spark SQL internal representation. Consider to change the input type to one of supported at '`<docroot>`/sql-ref-datatypes.html'.

### ERROR_READING_AVRO_UNKNOWN_FINGERPRINT

SQLSTATE: none assigned

Error reading avro data -- encountered an unknown fingerprint: `<fingerprint>`, not sure what schema to use. This could happen if you registered additional schemas after starting your spark context.

### EVENT_TIME_IS_NOT_ON_TIMESTAMP_TYPE

SQLSTATE: none assigned
Expand Down Expand Up @@ -520,6 +526,12 @@ Failed preparing of the function `<funcName>` for call. Please, double check fun

Failed parsing struct: `<raw>`.

### FAILED_REGISTER_CLASS_WITH_KRYO

SQLSTATE: none assigned

Failed to register classes with Kryo.

### FAILED_RENAME_PATH

[SQLSTATE: 42K04](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
Expand Down Expand Up @@ -972,6 +984,12 @@ Cannot convert JSON root field to target Spark type.

Input schema `<jsonSchema>` can only contain STRING as a key type for a MAP.

### INVALID_KRYO_SERIALIZER_BUFFER_SIZE

SQLSTATE: F0000

The value of the config "`<bufferSizeConfKey>`" must be less than 2048 MiB, but got `<bufferSizeConfValue>` MiB.

### [INVALID_LAMBDA_FUNCTION_CALL](sql-error-conditions-invalid-lambda-function-call-error-class.html)

SQLSTATE: none assigned
Expand Down Expand Up @@ -1163,6 +1181,12 @@ SQLSTATE: none assigned

The join condition `<joinCondition>` has the invalid type `<conditionType>`, expected "BOOLEAN".

### KRYO_BUFFER_OVERFLOW

SQLSTATE: none assigned

Kryo serialization failed: `<exceptionMsg>`. To avoid this, increase "`<bufferSizeConfKey>`" value.

### LOAD_DATA_PATH_NOT_EXISTS

SQLSTATE: none assigned
Expand Down

0 comments on commit 2a23c7a

Please sign in to comment.